kafka ksqlDB

Ethan Matthew Hunt 2022. 8. 26. 08:35

Download and extract the Confluent Platform.


ksqlDB


• Event Streaming Database (or SQL engine) - not an RDBMS/NoSQL DB
• Confluent Community License
• Simple cluster setup - start multiple ksqlDB servers with the same ksql.service.id (see the sketch below)
• To run multiple separate clusters, just give each one a different ksql.service.id
• Perform stream processing by sending SQL-like statements to ksqlDB
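
A minimal sketch of the cluster idea (file names, ports, and the id are assumptions): two servers join the same cluster simply by sharing ksql.service.id in their server properties:

# server A - ksql-server-a.properties
ksql.service.id=cluster1_
listeners=http://0.0.0.0:8088

# server B - ksql-server-b.properties
ksql.service.id=cluster1_
listeners=http://0.0.0.0:8089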

A streaming database (SQL engine) for Apache Kafka® for building real-time event streaming applications using SQL.

Write KSQL queries <--> see results in real time.

 

 

https://docs.ksqldb.io/en/latest/reference/

 

 

What is a state store?

 

ksql.streams.state.dir

Per query: no

Sets the storage directory for stateful operations, like aggregations and joins, to a durable location. By default, state is stored in the /tmp/kafka-streams directory.

The state storage directory must be unique for every server running on the machine. Otherwise, servers may appear to be stuck and not doing any work.

https://docs.ksqldb.io/en/latest/reference/server-configuration/#ksqlstreamsstatedir
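
For example, to move state off /tmp you might add a line like this to ksql-server.properties (a sketch; the path is an assumption):

ksql.streams.state.dir=/var/lib/ksqldb/state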

 

 


https://docs.ksqldb.io/en/latest/faq/

Where are ksqlDB-related data and metadata stored?

In interactive mode, ksqlDB stores metadata in and builds metadata from the ksqlDB command topic. To secure the metadata, you must secure the command topic.

The ksqlDB command topic stores all data definition language (DDL) statements: CREATE STREAM, CREATE TABLE, DROP STREAM, and DROP TABLE. Also, the ksqlDB command topic stores TERMINATE statements, which stop persistent queries based on CREATE STREAM AS SELECT (CSAS) and CREATE TABLE AS SELECT (CTAS).

Currently, data manipulation language (DML) statements, like UPDATE and DELETE, aren't available.

In headless mode, ksqlDB stores metadata in the config topic. The config topic stores the ksqlDB properties provided to ksqlDB when the application was first started. ksqlDB uses these configs to ensure that your SQL queries are built compatibly on every restart of the server.

https://docs.ksqldb.io/en/latest/operate-and-deploy/how-it-works/

 

Command Topic

In interactive mode, ksqlDB shares statements with servers in the cluster over the command topic. The command topic stores every SQL statement, along with some metadata that ensures the statements are built compatibly across ksqlDB restarts and upgrades. ksqlDB names the command topic _confluent-ksql-<service id>command_topic, where <service id> is the value in the ksql.service.id property.

By convention, the ksql.service.id property should end with a separator character of some form, for example a dash or underscore, as this makes the topic name easier to read. For example, with ksql.service.id=default_ the command topic is named _confluent-ksql-default_command_topic.

https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/show-topics/

SHOW TOPICS does not display hidden topics by default, such as KSQL internal topics (like the KSQL command topic, or changelog & repartition topics) or topics that match any pattern in the ksql.hidden.topics configuration.

SHOW ALL TOPICS lists all topics, including hidden topics

 

Lecture 33: Configuration Settings

Understanding settings

confluent stop

confluent destroy

 

cd /opt/confluent/etc/ksql

vi ksql-server.properties

 

# add this line anywhere in file

ksql.service.id=myservicename

 

confluent start ksql-server

Start KSQL

 

LIST PROPERTIES;

 

 Property                       | Default override | Effective Value
--------------------------------------------------------------------------
 ksql.schema.registry.url       | SERVER           | http://localhost:8081
 ksql.streams.auto.offset.reset | SERVER           | latest
 ksql.service.id                | SERVER           | myservicename          <-- *** Note: this is the one we changed

 

SET 'auto.offset.reset'='earliest';

 

LIST PROPERTIES;

 

 

 Property                       | Default override | Effective Value
--------------------------------------------------------------------------
 ksql.schema.registry.url       | SERVER           | http://localhost:8081
 ksql.streams.auto.offset.reset | SESSION          | earliest               <-- *** Note both the override and value changed
 ksql.service.id                | SERVER           | myservicename

 

Lecture 34: State Stores

State Stores

Start KSQL

LIST PROPERTIES;

 

# Look for ksql.streams.state.dir

 

 Property               | Default override | Effective Value
---------------------------------------------------------------------------------------------------------------------
 ksql.streams.state.dir | SERVER           | /var/folders/1p/3whlrkzx4bs3fkd55_600x4c0000gp/T/confluent.V2kB1p2N/ksql-server/data/kafka-streams

At UNIX

ksql-datagen schema=./datagen/userprofile.avro format=json topic=USERPROFILE key=userid maxInterval=5000 iterations=1000

 

At KSQL

CREATE STREAM userprofile (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) \

  WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE');

 

At UNIX

# note: this will show nothing

find /var/folders/1p/3whlrkzx4bs3fkd55_600x4c0000gp/T/confluent.V2kB1p2N/ksql-server/data/kafka-streams -type f

Run a stateful operation, which should require RocksDB to persist to disk

 

select countrycode, count(*) from userprofile group by countrycode emit changes;

 

At UNIX

# note: this will now show files

find /var/folders/1p/3whlrkzx4bs3fkd55_600x4c0000gp/T/confluent.V2kB1p2N/ksql-server/data/kafka-streams -type f

Complex State Stores example

set 'ksql.sink.partitions' = '1';

 

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic userrequests

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic browsertype

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic location

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic cartype

 

create table browsertype (browser_code bigint, browsername varchar) with (kafka_topic='browsertype', value_format='json', key='browser_code');

 

create table location (location_code bigint, locationname varchar) with (kafka_topic='location', value_format='json', key='location_code');

 

create table cartype (car_code bigint, carname varchar) with (kafka_topic='cartype', value_format='json', key='car_code');
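
Note: the CREATE TABLE statements above use the pre-5.5 KEY property. From ksqlDB 5.5 onwards the key column is declared inline instead; a sketch for browsertype:

create table browsertype (browser_code bigint primary key, browsername varchar) with (kafka_topic='browsertype', value_format='json');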

 

 

create stream userrequests (browser_code bigint, location_code bigint, car_code bigint, silly varchar) with (kafka_topic='userrequests', value_format='json');

 

create stream user_browser as select us.LOCATION_CODE, us.CAR_CODE, us.silly, bt.browsername from userrequests us left join browsertype bt on bt.browser_code=us.browser_code;

 

 

create stream user_browser_location as select ub.CAR_CODE, ub.silly, ub.browsername, l.locationname from user_browser ub left join location l on ub.location_code = l.location_code;

 

create stream user_browser_location_car as select ubl.silly, ubl.browsername, ubl.locationname, c.carname from user_browser_location ubl left join cartype c on ubl.CAR_CODE = c.car_code;

 

Partitioning requirements

https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/

 

 

Data types

https://docs.ksqldb.io/en/latest/reference/sql/data-types/

 

 

1) Character types

STRING (alias VARCHAR) and BYTES.

 

2) Numeric types

There is no LONG type; use INT, BIGINT, DOUBLE, or DECIMAL.

 

3) Time types

TIME, DATE, TIMESTAMP (there is no DATETIME!)

 

4) Compound types (ARRAY, STRUCT, MAP)

 

CREATE TABLE users

(registertime BIGINT, userid VARCHAR, gender VARCHAR, regionid VARCHAR, interests ARRAY<STRING>, contactinfo MAP<STRING,STRING>)

WITH (KAFKA_TOPIC = 'users', VALUE_FORMAT='JSON', KEY = 'userid');

 

(1) array

A sequence of values of a single type. (Java native array)

SELECT interests[0]  AS first_interest, userid, gender, regionid FROM users EMIT CHANGES;

 

(2) struct

A strongly typed structured data type. (org.apache.kafka.connect.data)

Access nested data by declaring a STRUCT and using the dereference operator (->) to access its fields:

SELECT USERID, ADDRESS->STREET, ADDRESS->HOUSE_NUM FROM USERS EMIT CHANGES;

https://docs.ksqldb.io/en/0.7.1-ksqldb/developer-guide/query-with-structured-data/

https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/operators/

 

(3) map

A mapping of keys to values. (java.util.Map)

Access the values of a map by using the [] operator and passing in the key. For example, SOME_MAP['cost'] retrieves the value for the entry with key 'cost', or null if no such entry exists. For more information:

https://docs.ksqldb.io/en/0.7.1-ksqldb/developer-guide/query-with-arrays-and-maps/

 

SELECT contactinfo['zipcode'] AS zipcode, contactinfo['city'] AS city, userid, gender, regionid FROM users EMIT CHANGES;

 

Data definition: stream vs. table

• Key column type: stream uses KEY; table uses PRIMARY KEY
• NON NULL key constraint: stream no; table yes (a message in the Kafka topic with a NULL PRIMARY KEY is ignored)
• Unique key constraint: stream no (a message with the same key as another has no special meaning); table yes (a later message with the same key replaces earlier messages in the table)
• Tombstones: stream no (a message with a NULL value is ignored); table yes (a NULL message value is treated as a tombstone; any existing row with a matching key is deleted from the table)

 

 

Streams

A stream is a partitioned, immutable, append-only collection that represents a series of historical facts. For example, the rows of a stream could model a sequence of financial transactions, like "Alice sent $100 to Bob", followed by "Charlie sent $50 to Bob".

Once a row is inserted into a stream, it can never change. New rows can be appended at the end of the stream, but existing rows can never be updated or deleted.

Each row is stored in a particular partition. Every row, implicitly or explicitly, has a key that represents its identity. All rows with the same key reside in the same partition.

To create a stream, use the CREATE STREAM command. The following example statement specifies a name for the new stream, the names of the columns, and the data type of each column.

 

 

Stream properties

Use the WITH clause to specify details about your stream. The WITH clause supports the following properties.

FORMAT (sets the key and value formats in one property), KAFKA_TOPIC (required), KEY_FORMAT, KEY_SCHEMA_ID, PARTITIONS, REPLICAS, TIMESTAMP, TIMESTAMP_FORMAT, VALUE_DELIMITER, VALUE_FORMAT, VALUE_SCHEMA_ID, WRAP_SINGLE_VALUE

 

 

CREATE STREAM s1 (
    k VARCHAR KEY,
    v1 INT,
    v2 VARCHAR
) WITH (
    kafka_topic = 's1',
    partitions = 3,
    value_format = 'json'
);

 

 For example, if you create a stream named "pageviews_enriched", ksqlDB might assign an ID like "CSAS_PAGEVIEWS_ENRICHED_1". The prepended string, "CSAS", is an acronym for CREATE STREAM AS SELECT.
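
A quick sketch of seeing that generated ID (the pageviews stream is an assumption):

CREATE STREAM pageviews_enriched AS SELECT * FROM pageviews EMIT CHANGES;

SHOW QUERIES;   -- lists a persistent query with an ID like CSAS_PAGEVIEWS_ENRICHED_1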

 

The stream metadata, like the column layout, serialization scheme, and other information, is placed into ksqlDB's command topic, which is its internal cluster communication channel.

https://docs.ksqldb.io/en/latest/operate-and-deploy/how-it-works/#command-topic

 

 

 

-- Create a view that enriches a stream with a table lookup:
CREATE STREAM enriched AS
   SELECT
      cs.*,
      u.name,
      u.classification,
      u.level
   FROM clickstream cs
      JOIN users u ON u.id = cs.userId
   EMIT CHANGES;

-- Create a view that enriches a stream with a table lookup with value serialization schema 
-- defined by VALUE_SCHEMA_ID:
CREATE STREAM enriched WITH (
    VALUE_SCHEMA_ID = 1
  ) AS
  SELECT
     cs.*,
     u.name,
     u.classification,
     u.level
  FROM clickstream cs
    JOIN users u ON u.id = cs.userId
  EMIT CHANGES;

 

 

 

Tables

A table is a mutable, partitioned collection that models change over time. In contrast with a stream, which represents a historical sequence of events, a table represents what is true as of "now". For example, you might use a table to model the locations where someone has lived as a stream: first Miami, then New York, then London, and so forth.

Tables work by leveraging the keys of each row. If a sequence of rows shares a key, the last row for a given key represents the most up-to-date information for that key's identity. A background process periodically runs and deletes all but the newest rows for each key.

Syntactically, declaring a table is similar to declaring a stream. The following example statement declares a current_location table that has a key field named person.

Each row is identified by its PRIMARY KEY. A row's PRIMARY KEY can't be NULL.

if the message's value is NULL, it deletes the row.

 

CREATE TABLE current_location (
    person VARCHAR PRIMARY KEY,
    location VARCHAR
) WITH (
    kafka_topic = 'current_location',
    partitions = 3,
    value_format = 'json'
);
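
A minimal sketch of the last-write-wins behavior, using the current_location table above (the values are made up):

INSERT INTO current_location (person, location) VALUES ('robin', 'Miami');
INSERT INTO current_location (person, location) VALUES ('robin', 'London');

-- the underlying topic keeps both records until compaction, but the table's
-- view of "now" retains only the latest row per key: robin -> London
SELECT person, location FROM current_location EMIT CHANGES;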

 

CREATE STREAM foo WITH (TIMESTAMP='t2') AS
  SELECT * FROM bar
  WINDOW TUMBLING (SIZE 10 SECONDS)
  EMIT CHANGES;




 

Headers

Starting in ksqlDB 0.24, you can mark a column with HEADERS or HEADER('<key>') to indicate that it is populated by the header field of the underlying Kafka record. A column marked with HEADERS must have the type ARRAY<STRUCT<key STRING, value BYTES>> and contains the full list of the Kafka record's header keys and values.
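
A sketch of declaring a headers column (the stream and the other columns are assumptions):

CREATE STREAM api_calls (
    userid VARCHAR KEY,
    endpoint VARCHAR,
    all_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
) WITH (kafka_topic='api_calls', value_format='json', partitions=1);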

 

 

 

Time operations

-- Example timestamp format: yyyy-MM-dd'T'HH:mm:ssX
CREATE STREAM TEST (id BIGINT KEY, event_timestamp VARCHAR)
  WITH (kafka_topic='test_topic',
        value_format='JSON',
        timestamp='event_timestamp',
        timestamp_format='yyyy-MM-dd''T''HH:mm:ssX');

 

 

CREATE CONNECTOR

Synopsis

CREATE SOURCE | SINK CONNECTOR [IF NOT EXISTS] connector_name WITH( property_name = expression [, ...]);

 

Example

CREATE SOURCE CONNECTOR `jdbc-connector` WITH(
    "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
    "connection.url"='jdbc:postgresql://localhost:5432/my.db',
    "mode"='bulk',
    "topic.prefix"='jdbc-',
    "table.whitelist"='users',
    "key"='username');

 

 

 


Lambda Functions
invocation

Lambda functions must be used inside designated invocation functions. 
These are the available Invocations:
TRANSFORM
REDUCE
FILTER

 

https://docs.ksqldb.io/en/latest/concepts/lambda-functions/
https://docs.ksqldb.io/en/latest/how-to-guides/use-lambda-functions/
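
A minimal sketch of all three invocations over an ARRAY<INTEGER> column (the stream and column names are assumptions; lambdas require ksqlDB 0.17+):

CREATE STREAM readings (id VARCHAR KEY, vals ARRAY<INTEGER>)
  WITH (kafka_topic='readings', value_format='json', partitions=1);

SELECT id,
       TRANSFORM(vals, x => x * 2)      AS doubled,   -- apply a function to each element
       FILTER(vals, x => x > 10)        AS big_vals,  -- keep only matching elements
       REDUCE(vals, 0, (s, x) => s + x) AS total      -- fold the elements into one value
FROM readings EMIT CHANGES;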

 

Push queries: query the state of the system in motion and continue to output results until they meet a LIMIT condition or are terminated by the user. This was the default behavior in older versions of KSQL. 'EMIT CHANGES' is used to indicate that a query is a push query (ksqlDB 5.4 onwards).

 

 

Pull queries: query the current state of the system, return a result, and terminate. Use this to select a result as of "now". New from 5.4. KSQL currently only supports pull queries on materialized aggregate tables (sometimes referred to as materialized views), i.e. those created by a 'CREATE TABLE AS SELECT … FROM … GROUP BY …' style statement. A query must use a predicate against ROWKEY.

 

 

WHERE Clause Guidelines

By default, only key lookups are enabled. They have the following requirements:

 

Key column(s) must use an equality comparison to a literal (e.g. KEY = 'abc').

On windowed tables, WINDOWSTART and WINDOWEND can be optionally compared to literals. For more information on windowed tables, see Time and Windows in ksqlDB.

You can loosen the restrictions on the WHERE clause, or eliminate the WHERE clause altogether, by enabling table scans in your current CLI session with the command SET 'ksql.query.pull.table.scan.enabled'='true';. Table scans can also be enabled by default by setting a server configuration property with ksql.query.pull.table.scan.enabled=true. Once table scans are enabled, the following additional expressions are allowed:

 

Key column(s) using range comparisons to literals.

Non-key columns to be used alone, without key references.

Columns to be compared to other columns.

References to subsets of columns from a multi-column key.

Complex expressions without direct column references using UDFs and function calls (e.g. instr(NAME_COL, 'hello') > 0).
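
A sketch of both cases, against the countryDrivers table built in Lecture 15 below:

-- key lookup: allowed by default
SELECT countrycode, numdrivers FROM countryDrivers WHERE countrycode = 'AU';

-- loosen the restrictions for this CLI session; non-key predicates then work too
SET 'ksql.query.pull.table.scan.enabled'='true';

SELECT countrycode, numdrivers FROM countryDrivers WHERE numdrivers > 2;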

 

 

Time and Windows

https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/#event-time

 
For example, to start the ksql-server:

Prior to 5.3 : confluent start ksql-server
From 5.3     : confluent local start ksql-server
From 6.0     : confluent local services ksql-server start

 

 

Lecture 8: Our first KSQL Stream

print 'USERS';
print 'USERS' from beginning;
print 'USERS' from beginning limit 2;
print 'USERS' from beginning interval 2 limit 2 ;

list topics;

 

 

create stream users_stream (name VARCHAR, countrycode VARCHAR) WITH (KAFKA_TOPIC='USERS', VALUE_FORMAT='DELIMITED');

 

select name, countrycode  from users_stream emit changes;

auto.offset.reset - Determines what to do when there is no initial offset in Apache Kafka or if the current offset does not exist on the server. The default value in KSQL is latest, which means all Kafka topics are read from the latest available offset. For example, to change it to earliest by using the KSQL command line:

 

-- default to beginning of time

SET 'auto.offset.reset'='earliest';

 

-- basic aggregate

select countrycode, count(*) from users_stream group by countrycode emit changes;

 

drop stream if exists users_stream delete topic;

 

Lecture 9: Create a Stream with JSON

 

At UNIX prompt

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic USERPROFILE

 

kafka-console-producer --broker-list localhost:9092 --topic USERPROFILE << EOF

{"userid": 1000, "firstname":"Alison", "lastname":"Smith", "countrycode":"GB", "rating":4.7}

EOF

 

kafka-console-producer --broker-list localhost:9092 --topic USERPROFILE << EOF

{"userid": 1001, "firstname":"Bob", "lastname":"Smith", "countrycode":"US", "rating":4.2}

EOF

 

At KSQL prompt

CREATE STREAM userprofile (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) \

  WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE');

SET 'auto.offset.reset'='earliest';

 

select firstname, lastname, countrycode, rating from userprofile emit changes;

Alison | Smith | GB | 4.7

 

 

Lecture 10: KSQL Datagen - Generating Streams

At UNIX prompt

ksql-datagen schema=./datagen/userprofile.avro format=json topic=USERPROFILE key=userid msgRate=1 iterations=1000

At KSQL prompt

-- Review a stream - every 5th row

print 'USERPROFILE' interval 5;

 

 

Lecture 11: Manipulate a Stream

At KSQL prompt

ksql> describe userprofile;

 

Name : USERPROFILE

 Field       | Type
------------------------------
 USERID      | INTEGER
 FIRSTNAME   | VARCHAR(STRING)
 LASTNAME    | VARCHAR(STRING)
 COUNTRYCODE | VARCHAR(STRING)
 RATING      | DOUBLE

 

 

select rowtime, firstname from userprofile emit changes;

Review Scalar functions at https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#scalar-functions

select  TIMESTAMPTOSTRING(rowtime, 'dd/MMM HH:mm') as createtime, firstname + ' ' + ucase(lastname)  as full_name

from userprofile emit changes;

 

 

Lecture 12: Streams from streams and functions

Create a stream from a stream At KSQL prompt

 

select firstname + ' '

+ ucase( lastname)

+ ' from ' + countrycode

+ ' has a rating of ' + cast(rating as varchar) + ' stars. '

+ case when rating < 2.5 then 'Poor'

           when rating between 2.5 and 4.2 then 'Good'

           else 'Excellent'

   end as description

from userprofile emit changes;

 

Bob FAWCETT from IN has a rating of 4.4 stars. | Excellent

Heidi COEN from US has a rating of 4.9 stars. | Excellent

Bob FAWCETT from IN has a rating of 2.2 stars. | Poor

 

At KSQL prompt

Review the script user_profile_pretty.ksql

 

list streams;

 

run script 'user_profile_pretty.ksql';

 

list streams;

 

describe extended user_profile_pretty;

This shows the persistent query CSAS_USER_PROFILE_PRETTY_0, which must be terminated before the user_profile_pretty stream can be dropped.

 

select description from user_profile_pretty emit changes;

 

drop stream user_profile_pretty;

 

terminate CSAS_USER_PROFILE_PRETTY_0;

 

drop stream user_profile_pretty;

 

list streams;

 

drop stream IF EXISTS user_profile_pretty DELETE TOPIC;

 

 

 

Lecture 13: ksqlDB Tables

Create a table

At UNIX prompt

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic COUNTRY-CSV

 

-- version 5.5 and later

kafka-console-producer --broker-list localhost:9092 --topic COUNTRY-CSV --property "parse.key=true"  --property "key.separator=:" << EOF

AU:Australia

IN:India

GB:UK

US:United States

EOF

At KSQL prompt

 

-- version 5.5 and later

CREATE TABLE COUNTRYTABLE (countrycode VARCHAR PRIMARY KEY, countryname VARCHAR) WITH (KAFKA_TOPIC='COUNTRY-CSV', VALUE_FORMAT='DELIMITED');

 

show tables;

 

describe COUNTRYTABLE;

 

describe extended COUNTRYTABLE;

 

SET 'auto.offset.reset'='earliest';

 

select countrycode, countryname from countrytable emit changes;

 

-- Note the countryname is "UK"

select countrycode, countryname from countrytable where countrycode='GB' emit changes limit 1;

 

-- This does not exist

select countrycode, countryname from countrytable where countrycode='FR' emit changes;

Update a table

One record updated (UK->United Kingdom), one record added (FR)

At UNIX prompt

kafka-console-producer --broker-list localhost:9092 --topic COUNTRY-CSV --property "parse.key=true"  --property "key.separator=:" << EOF

GB:United Kingdom

FR:France

EOF

 

At KSQL prompt

select countrycode, countryname from countrytable emit changes;

 

-- Note the countryname has changed to "United Kingdom"

select countrycode, countryname from countrytable where countrycode='GB' emit changes limit 1;

 

-- And now appears

select countrycode, countryname from countrytable where countrycode='FR' emit changes;

 

 

 

Section 5: ksqlDB and KSQL Intermediate

Lecture 14: KSQL Joins

Join user stream to country table

At KSQL prompt

select up.firstname, up.lastname, up.countrycode, ct.countryname

from USERPROFILE up

left join COUNTRYTABLE ct on ct.countrycode=up.countrycode emit changes;

 

create stream up_joined as

select up.firstname

+ ' ' + ucase(up.lastname)

+ ' from ' + ct.countryname

+ ' has a rating of ' + cast(rating as varchar) + ' stars.' as description

, up.countrycode

from USERPROFILE up

left join COUNTRYTABLE ct on ct.countrycode=up.countrycode;

 

select description from up_joined emit changes;

 

 

 

 

Lecture 15: Pull Queries

Pull Queries

Pull Query - new in ksqlDB (5.4 onwards)

SET 'auto.offset.reset'='earliest';

 

-- from 5.5 onwards

CREATE STREAM driverLocations (driverId VARCHAR KEY, countrycode VARCHAR, city VARCHAR, driverName VARCHAR)

  WITH (kafka_topic='driverlocations', value_format='json', partitions=1);

 

INSERT INTO driverLocations (driverId, countrycode, city, driverName) VALUES ('1', 'AU', 'Sydney', 'Alice');

INSERT INTO driverLocations (driverId, countrycode, city, driverName) VALUES ('2', 'AU', 'Melbourne', 'Bob');

INSERT INTO driverLocations (driverId, countrycode, city, driverName) VALUES ('3', 'GB', 'London', 'Carole');

INSERT INTO driverLocations (driverId, countrycode, city, driverName) VALUES ('4', 'US', 'New York', 'Derek');

create table countryDrivers as select countrycode, count(*) as numDrivers from driverLocations group by countrycode;

-- note: as a pull query we don't use "emit"

select countrycode, numdrivers from countryDrivers where countrycode='AU';

 

INSERT INTO driverLocations (driverId, countrycode, city, driverName) VALUES ('5', 'AU', 'Sydney', 'Emma');

 

-- note: as a pull query we don't use "emit"

select countrycode, numdrivers from countryDrivers where countrycode='AU';

 

 

 

Lecture 16: Kafka Connect with ksqlDB

Kafka Connect with ksqlDB. You will run this example using Docker. First, stop the local Confluent Platform:

confluent local services stop

 

Now, start Postgres and Confluent platform together using docker

docker-compose up -d

 

Start the ksqlDB CLI

docker-compose exec  ksqldb-cli ksql http://ksqldb-server:8088

 

Kafka Connect

cat postgres-setup.sql

docker-compose exec postgres psql -U postgres -f /postgres-setup.sql

 

To look at the Postgres table

docker-compose exec postgres psql -U postgres -c "select * from carusers;"

 

CREATE SOURCE CONNECTOR postgres_jdbc_source WITH(

  "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',

  "connection.url"='jdbc:postgresql://postgres:5432/postgres',

  "mode"='incrementing',

  "incrementing.column.name"='ref',

  "table.whitelist"='carusers',

  "connection.password"='password',

  "connection.user"='postgres',

  "topic.prefix"='db-',

  "key"='username');

 

print 'db-carusers' from beginning;

 

In another window, insert a new database row

docker exec -it postgres psql -U postgres -c "INSERT INTO carusers (username) VALUES ('Derek');"

 

 

 

Lecture 17: Data Encodings

Data Formats

Imagine a complaints stream of unhappy customers. Explore the different data formats (CSV, JSON, AVRO).

 Column         | AVRO Type | KSQL Type
-----------------------------------------
 customer_name  | string    | VARCHAR
 complaint_type | string    | VARCHAR
 trip_cost      | float     | DOUBLE
 new_customer   | boolean   | BOOLEAN

 

 

Lecture 18: CSV Delimited Data

At UNIX prompt

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic COMPLAINTS_CSV

 

kafka-console-producer --broker-list localhost:9092 --topic COMPLAINTS_CSV << EOF

Alice, Late arrival, 43.10, true

EOF

 

At KSQL prompt

CREATE STREAM complaints_csv (customer_name VARCHAR, complaint_type VARCHAR, trip_cost DOUBLE, new_customer BOOLEAN) \

  WITH (VALUE_FORMAT = 'DELIMITED', KAFKA_TOPIC = 'COMPLAINTS_CSV');

 

select * from complaints_csv emit changes;

CSV - experience with bad data

At UNIX prompt

 

kafka-console-producer --broker-list localhost:9092 --topic COMPLAINTS_CSV << EOF

Alice, Bob and Carole, Bad driver, 43.10, true

EOF

 

 

 

Lecture 19: JSON Data

JSON - At UNIX prompt

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic COMPLAINTS_JSON

 

kafka-console-producer --broker-list localhost:9092 --topic COMPLAINTS_JSON << EOF

{"customer_name":"Alice, Bob and Carole", "complaint_type":"Bad driver", "trip_cost": 22.40, "new_customer": true}

EOF

 

At KSQL prompt

CREATE STREAM complaints_json (customer_name VARCHAR, complaint_type VARCHAR, trip_cost DOUBLE, new_customer BOOLEAN) \

  WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'COMPLAINTS_JSON');

 

select * from complaints_json emit changes;

JSON - experience with bad data

At UNIX prompt

kafka-console-producer --broker-list localhost:9092 --topic COMPLAINTS_JSON << EOF

{"customer_name":"Bad Data", "complaint_type":"Bad driver", "trip_cost": 22.40, "new_customer": ShouldBeABoolean}

EOF

 

Review the KSQL server logs: confluent local services ksql-server log

Now look at the KSQL server log. We can see the bad data is noticed, but it's hidden in a conversion error message:

  at [Source: (byte[])"{"customer_name":"Bad Data", "complaint_type":"Bad driver", "trip_cost": 22.40, "new_customer": ShouldBeABoolean}"; line: 1, column: 105]

 Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ShouldBeABoolean': was expecting ('true', 'false' or 'null')

 

 

 

Lecture 20: Avro Data

At UNIX prompt

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic COMPLAINTS_AVRO

 

kafka-avro-console-producer  --broker-list localhost:9092 --topic COMPLAINTS_AVRO \

--property value.schema='

{

  "type": "record",

  "name": "myrecord",

  "fields": [

            {"name": "customer_name",  "type": "string" }

             , {"name": "complaint_type", "type": "string" }

             , {"name": "trip_cost", "type": "float" }

             , {"name": "new_customer", "type": "boolean"}

  ]

}' << EOF

{"customer_name":"Carol", "complaint_type":"Late arrival", "trip_cost": 19.60, "new_customer": false}

EOF

 

At KSQL prompt

-- Note no columns or data type specified

create stream complaints_avro with (kafka_topic='COMPLAINTS_AVRO', value_format='AVRO');

 

describe extended complaints_avro;

 

 

AVRO - experience with bad data

At UNIX prompt - note the bad data is caught at serialization time

kafka-avro-console-producer  --broker-list localhost:9092 --topic COMPLAINTS_AVRO \

--property value.schema='

{

  "type": "record",

  "name": "myrecord",

  "fields": [

            {"name": "customer_name",  "type": "string" }

             , {"name": "complaint_type", "type": "string" }

             , {"name": "trip_cost", "type": "float" }

             , {"name": "new_customer", "type": "boolean"}

  ]

}' << EOF

 

{"customer_name":"Bad Data", "complaint_type":"Bad driver", "trip_cost": 22.40, "new_customer": ShouldBeABoolean}

EOF

 

 

Lecture 21: Avro Schema Evolution

 

Create a stream over topic 1 (schema v1) and insert data. Then change the schema (v2) and insert v2 data into topic 1. Querying the original stream does not even show the column added in v2! Re-create a second stream over the same topic (which now picks up schema v2): querying it shows the added column, with null for rows produced before the schema change.
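
A sketch of that re-created stream (the name is an assumption):

create stream complaints_avro_v2 with (kafka_topic='COMPLAINTS_AVRO', value_format='AVRO');

describe complaints_avro_v2;   -- now shows the newly added column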

 

At UNIX prompt

# Optional: start Confluent Control Center

confluent local services start

 

curl -s -X GET http://localhost:8081/subjects/COMPLAINTS_AVRO-value/versions

 

kafka-avro-console-producer  --broker-list localhost:9092 --topic COMPLAINTS_AVRO \

--property value.schema='

{

  "type": "record",

  "name": "myrecord",

  "fields": [

            {"name": "customer_name",  "type": "string" }

             , {"name": "complaint_type", "type": "string" }

             , {"name": "trip_cost", "type": "float" }

             , {"name": "new_customer", "type": "boolean"}

             , {"name": "number_of_rides", "type": "int", "default" : 1}

  ]

}' << EOF

{"customer_name":"Ed", "complaint_type":"Dirty car", "trip_cost": 29.10, "new_customer": false, "number_of_rides": 22}

EOF

 

curl -s -X GET http://localhost:8081/subjects/COMPLAINTS_AVRO-value/versions

 

curl -s -X GET http://localhost:8081/subjects/COMPLAINTS_AVRO-value/versions/1 | jq '.'

 

curl -s -X GET http://localhost:8081/subjects/COMPLAINTS_AVRO-value/versions/2 | jq '.'

At KSQL prompt

 

ksql> describe complaints_avro;

 

 

 

Lecture 22: Nested JSON

Imagine we have data like this

{

  "city": {

             "name": "Sydney",

             "country": "AU",

             "latitude": -33.8688,

             "longitude": 151.2093

  },

  "description": "light rain",

  "clouds": 92,

  "deg": 26,

  "humidity": 94,

  "pressure": 1025.12,

  "rain": 1.25

}

At UNIX prompt

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic WEATHERNESTED

 

cat demo-weather.json | kafka-console-producer --broker-list localhost:9092 --topic WEATHERNESTED

Extract like this - At KSQL prompt

SET 'auto.offset.reset'='earliest';

 

 

CREATE STREAM weather

            (city STRUCT <name VARCHAR, country VARCHAR, latitude DOUBLE, longitude DOUBLE>,

           description VARCHAR,

           clouds BIGINT,

           deg BIGINT,

           humidity BIGINT,

           pressure DOUBLE,

           rain DOUBLE)

WITH (KAFKA_TOPIC='WEATHERNESTED', VALUE_FORMAT='JSON');  

 

SELECT city->name AS city_name, city->country AS city_country, city->latitude as latitude, city->longitude as longitude, description, rain from weather emit changes; 

 
Lecture 23: Build a rekeyed table

 

Create a new keyed stream based on the existing stream,

then create a new table using that stream's topic,

and use that table.

 

 

Create a table based on the rekeyed city field from the weather stream.

At KSQL prompt

create stream weatherraw with (value_format='AVRO') as SELECT city->name AS city_name, city->country AS city_country, city->latitude as latitude, city->longitude as longitude, description, rain from weather ;

 

list streams;

-- note AVRO

 

describe extended weatherraw;

 

Now notice the Key field

ksql> describe extended weatherraw;

 

Name                   : WEATHERRAW

Type                    : STREAM

Key field                 :               <- *** NOTE BLANK ***

Key format              : STRING

Timestamp field         : Not set - using <ROWTIME>

Value format             : AVRO

Kafka topic              : WEATHERRAW (partitions: 4, replication: 1)

create stream weatherrekeyed as select * from weatherraw partition by city_name;

 

describe extended weatherrekeyed;

 

Now notice the Key field

ksql> describe extended weatherrekeyed;

 

Name                   : WEATHERREKEYED

Type                    : STREAM

Key field                 : CITY_NAME  <- ***  Keyed on city ***

Key format              : STRING

Timestamp field         : Not set - using <ROWTIME>

Value format             : AVRO

Kafka topic              : WEATHERREKEYED (partitions: 4, replication: 1)

-- prior to 5.5

create table weathernow with (kafka_topic='WEATHERREKEYED', value_format='AVRO', key='CITY_NAME');

 

-- from 5.5 onwards

create table weathernow (city_name varchar primary key, city_country varchar, latitude double, longitude double, description varchar, rain double) with (kafka_topic='WEATHERREKEYED', value_format='AVRO');

 

 

select * from weathernow emit changes;

 

select * from weathernow where city_name = 'San Diego' emit changes;

 

Let’s make it sunny! At UNIX prompt

cat demo-weather-changes.json | kafka-console-producer --broker-list localhost:9092 --topic WEATHERNESTED

 

At KSQL prompt

select * from weathernow where city_name = 'San Diego' emit changes;

 
Lecture 24: Repartition a Stream

 

Partitions:

The driver-profile stream has 2 partitions.

The country table has 1 partition.

Create a new driver-profile stream from the existing one, matching its partition count to the table's.

Then join the new driver-profile stream with the table!

 

When you use KSQL to join streaming data, you must ensure that your streams and tables are co-partitioned, which means that input records on both sides of the join have the same configuration settings for partitions.

 

At UNIX prompt

kafka-topics --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic DRIVER_PROFILE

 

kafka-console-producer --broker-list localhost:9092 --topic DRIVER_PROFILE << EOF

{"driver_name":"Mr. Speedy", "countrycode":"AU", "rating":2.4}

EOF

 

At KSQL prompt

CREATE STREAM DRIVER_PROFILE (driver_name VARCHAR, countrycode VARCHAR, rating DOUBLE)

  WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'DRIVER_PROFILE');

 

 

 

select dp.driver_name, ct.countryname, dp.rating

from DRIVER_PROFILE dp

left join COUNTRYTABLE ct on ct.countrycode=dp.countrycode emit changes;  

 

Can't join DRIVER_PROFILE with COUNTRYTABLE since the number of partitions don't match. DRIVER_PROFILE partitions = 2; COUNTRYTABLE partitions = 1. Please repartition either one so that the number of partitions match.

 

We can fix this by co-partitioning, use the PARTITION BY clause. At KSQL prompt

create stream driverprofile_rekeyed with (partitions=1) as select * from DRIVER_PROFILE partition by driver_name;

 

select dp2.driver_name, ct.countryname, dp2.rating

from DRIVERPROFILE_REKEYED dp2

left join COUNTRYTABLE ct on ct.countrycode=dp2.countrycode emit changes;  

 

 

 

Lecture 25: Merging Streams

Merging Streams; Concat Topics with INSERT

                     create stream of requested rides in Europe using data gen

                     create stream of requested rides in USA using data gen

                     combine into single stream of all requested rides using INSERT

 

At UNIX prompt

ksql-datagen schema=./datagen/riderequest-europe.avro  format=avro topic=riderequest-europe key=rideid msgRate=1 iterations=1000

 

ksql-datagen schema=./datagen/riderequest-america.avro format=avro topic=riderequest-america key=rideid msgRate=1 iterations=1000

 

At KSQL prompt

create stream rr_america_raw with (kafka_topic='riderequest-america', value_format='avro');

 

create stream rr_europe_raw with (kafka_topic='riderequest-europe', value_format='avro'); 

 

select * from rr_america_raw emit changes;

 

select * from rr_europe_raw emit changes;

 

create stream rr_world as select 'Europe' as data_source, * from rr_europe_raw;

 

insert into rr_world  select 'Americas' as data_source, * from rr_america_raw;

 

select * from rr_world emit changes;

 

 

 

 

Lecture 26: Windowing

                     how many requests are arriving each time period

                     At KSQL prompt

select data_source, city_name, count(*)

from rr_world

window tumbling (size 60 seconds)

group by data_source, city_name emit changes;

select data_source, city_name, COLLECT_LIST(user)

from rr_world

window tumbling (size 60 seconds)

group by data_source, city_name emit changes; 

select data_source, city_name, COLLECT_LIST(user)

from rr_world WINDOW SESSION (60 SECONDS)

group by data_source, city_name emit changes;

 

select TIMESTAMPTOSTRING(WindowStart, 'HH:mm:ss')

, TIMESTAMPTOSTRING(WindowEnd, 'HH:mm:ss')

, data_source

, TOPK(city_name, 3)

, count(*)

FROM rr_world

WINDOW TUMBLING (SIZE 1 minute)

group by data_source

emit changes;
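
Besides tumbling and session windows, ksqlDB also supports hopping windows (overlapping windows that advance by a fixed interval); a sketch:

select data_source, city_name, count(*)
from rr_world
window hopping (size 60 seconds, advance by 20 seconds)
group by data_source, city_name emit changes;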

 

Lecture 27: Geospatial

                     create stream - distance of car to waiting rider

                     At KSQL prompt

 

select * from rr_world emit changes;

 

describe rr_world;

 

create stream requested_journey as

select rr.latitude as from_latitude

, rr.longitude as from_longitude

, rr.user

, rr.city_name as city_name

, w.city_country

, w.latitude as to_latitude

, w.longitude as to_longitude

, w.description as weather_description

, w.rain

from rr_world rr

left join weathernow w on rr.city_name = w.city_name; 

 

 

create stream ridetodest as

select user

, city_name

, city_country

, weather_description

, rain

, GEO_DISTANCE(from_latitude, from_longitude, to_latitude, to_longitude, 'km') as dist

from requested_journey;

 

select user + ' is travelling ' + cast(round(dist) as varchar) +' km to ' + city_name + ' where the weather is reported as ' + weather_description

from ridetodest emit changes;

 

Alice is at (52,0) and is travelling 215 km to Manchester where it is SUNNY

Heidi is at (51,-1) and is travelling 88 km to London where it is heavy rain

Grace is at (50,-1) and is travelling 138 km to London where it is heavy rain

 

 

 

Section 6: ksqlDB and KSQL Extensions - UDF & UDAF

Lecture 28: Extending KSQL - UDF / UDAF

UDF - Build and deploy KSQL User Defined Functions - write a UDF to calculate drive time based on distance to travel and weather conditions

Compile Code to Create the Functions

                     Have a look at the file java/src/main/java/com/vsimon/kafka/streams/TaxiWait.java

                     If you don't want to compile the code, just copy the JAR from java/pre-compiled/ksql-udf-taxi-1.0.jar

                     Download Maven and follow the installation instructions (https://maven.apache.org/)

cd java

mvn clean package

ls target/ksql-udf-taxi-1.0.jar

Deploy KSQL User Defined Functions

Find the location of your extension directory. From KSQL

ksql> LIST PROPERTIES;

 

 Property              | Effective Value

--------------------------------------------

 . . .

 ksql.extension.dir      | ext                    <--  *** Look for this

 . . .

# Stop (just the) KSQL-Server

confluent local services ksql-server stop

 

# Create an ext (extensions) directory in ${CONFLUENT_HOME}/ext

mkdir /opt/confluent/ext

 

# build ksql-udf-taxi.jar as above and copy into ext directory

cp target/ksql-udf-taxi-1.0.jar /opt/confluent/ext

 

# or to use the pre-compile one

cp pre-compiled/ksql-udf-taxi-1.0.jar /opt/confluent/ext

 

# Restart KSQL server

confluent local services ksql-server start

 

 

 

Check KSQL User Defined Functions Available

Start ksql client and verify

ksql> list functions;

 

 Function Name          | Type

-------------------------------

  . . .

 SUM                     | AGGREGATE

 TAXI_WAIT             | SCALAR                <--- You need this one

 TIMESTAMPTOSTRING | SCALAR

 

 

ksql> DESCRIBE FUNCTION TAXI_WAIT;

 

Name             : TAXI_WAIT

Overview          : Return expected wait time in minutes

Type              : scalar

Jar                 : /etc/ksql/ext/ksql-udf-taxi-1.0.jar

Variations  :

 

             Variation   : TAXI_WAIT(VARCHAR, DOUBLE)

             Returns            : DOUBLE

             Description : Given weather and distance return expected wait time in minutes

 

Lecture 29: Using the UDF / UDAF

Use the UDF

describe ridetodest;

 

select user

, round(dist) as dist

, weather_description

, round(TAXI_WAIT(weather_description, dist)) as taxi_wait_min

from ridetodest emit changes;

 

 

select user

+ ' will be waiting ' + cast(round(TAXI_WAIT(weather_description, dist)) as varchar)

+ ' minutes for their trip of '

+ cast(round(dist) as varchar) +' km to ' + city_name

+ ' where it is ' + weather_description

from ridetodest emit changes;

 

Heidi will be waiting 14 minutes for their trip of 358 km to Bristol where it is light rain

Bob will be waiting 4 minutes for their trip of 218 km to Manchester where it is SUNNY

Frank will be waiting 15 minutes for their trip of 193 km to London where it is heavy rain

 

 

Lecture 31: Explain Plan

Explain

create stream my_stream

as select firstname

from userprofile;

 

show queries;

 

explain CSAS_MY_STREAM_1;

 

 

create table my_table

as select firstname, count(*) as cnt

from userprofile

group by firstname;

 

show queries;

 

explain CTAS_MY_TABLE_0;

 

 

 

Kafka Streams Topology Visualizer

Converts an ASCII Kafka Topology description into a hand drawn diagram.

                     See https://zz85.github.io/kafka-streams-viz/

 

 

Lecture 32: Scaling and Load Balancing

If one server is stopped, processing pauses briefly and then carries on.

If both servers are stopped, processing halts completely.

Start one server again much later and it catches up on everything it missed in one go!

Multi Server with docker

docker-compose  -f docker-compose-prod.yml up -d

ksql-datagen schema=./datagen/userprofile.avro format=json topic=USERPROFILE key=userid maxInterval=1000 iterations=100000

In KSQL

 

CREATE STREAM userprofile (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE)

  WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE');

 

create stream up_lastseen as

SELECT TIMESTAMPTOSTRING(rowtime, 'dd/MMM HH:mm:ss') as createtime, firstname

from userprofile;

kafka-console-consumer --bootstrap-server localhost:9092  --topic UP_LASTSEEN

 

 

docker-compose -f docker-compose-prod.yml ps

 

# stop 1

docker-compose -f docker-compose-prod.yml stop ksql-server-1

 

# re-start 1

docker-compose -f docker-compose-prod.yml start ksql-server-1

 

# stop 2

docker-compose -f docker-compose-prod.yml stop ksql-server-2

 

# stop 1

docker-compose -f docker-compose-prod.yml stop ksql-server-1

 

# start 2

docker-compose -f docker-compose-prod.yml start ksql-server-2

 

# start 1

docker-compose -f docker-compose-prod.yml start ksql-server-1

 

 
