# Start a single-node Kafka broker in KRaft mode, using the KRaft sample config.
# Run from the directory that contains the extracted Kafka distribution ("kafka/").
cd kafka
# Generate a cluster ID and format the storage directories with it.
# Only needed the first time; formatting an already-formatted log dir is expected to fail.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
# Runs in the foreground; use a separate terminal for the steps below.
bin/kafka-server-start.sh config/kraft/server.properties
Schema Registry from Confluent Community
# Download and unpack the Confluent Community package, then start Schema Registry.
curl -O https://packages.confluent.io/archive/7.5/confluent-community-7.5.2.tar.gz
# Extract the archive that was actually downloaded above; it unpacks into confluent-7.5.2/.
tar xzf confluent-community-7.5.2.tar.gz
cd confluent-7.5.2
# The Schema Registry config ships under etc/schema-registry/, not in the top-level directory.
# Runs in the foreground; use a separate terminal for the next steps.
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
# Open the ksqlDB CLI (shipped in bin/ of the confluent-7.5.2 directory) against localhost:8088.
# NOTE(review): assumes a ksqlDB server is already running on that port, e.g. started from
# confluent-7.5.2/ with ./bin/ksql-server-start ./etc/ksqldb/ksql-server.properties — confirm.
./bin/ksql http://localhost:8088
-- Start queries issued in this CLI session from the earliest offset,
-- so records already present in the topics are processed.
SET 'auto.offset.reset' = 'earliest';
Easy aggregation: latest record per CLIENT
-- Register the Avro topic 'air-fluff' as a stream. No columns are declared here,
-- so they are taken from the topic's Avro schema in Schema Registry.
CREATE STREAM air_stream WITH (kafka_topic='air-fluff', value_format='AVRO');

-- Materialize one row per CLIENT. Each column holds the value from that
-- client's most recent record (highest offset).
CREATE TABLE air_latest WITH (value_format='AVRO') AS
    SELECT
        CLIENT,
        LATEST_BY_OFFSET(unique_id) AS latest_id,
        LATEST_BY_OFFSET(key)       AS key,
        LATEST_BY_OFFSET(id)        AS id,
        LATEST_BY_OFFSET(message)   AS message
    FROM air_stream
    GROUP BY CLIENT;

-- Push query: keep printing rows of air_latest as they change.
SELECT * FROM air_latest EMIT CHANGES;
Theory only (not verified to run as written):
-- Exploratory pipeline: find the most recent unique_id per CLIENT, then join the
-- raw records back against it. See the NOTE(review) comments for the statements
-- that are not expected to run as written.
LIST TOPICS;
SET 'auto.offset.reset' = 'earliest';

-- air_stream is also created in the "Easy aggregation" section. IF NOT EXISTS keeps
-- this statement from failing when both sections run against the same ksqlDB server.
CREATE STREAM IF NOT EXISTS air_stream WITH (kafka_topic='air-fluff', value_format='AVRO');

-- Re-key (repartition) the raw records by unique_id.
CREATE STREAM air_stream_by_unique_id WITH (value_format='AVRO') AS
    SELECT * FROM air_stream PARTITION BY unique_id;

-- Alternative (not executed): declare a table directly over the re-keyed topic.
-- ksqlDB line comments start with "--"; a leading "#" is not a comment marker.
-- CREATE TABLE air_table_by_unique_id (unique_id varchar primary key) WITH (kafka_topic='AIR_STREAM_BY_UNIQUE_ID', value_format='AVRO');

-- Latest unique_id per CLIENT. CREATE TABLE ... AS SELECT cannot declare its own
-- column list; the table's PRIMARY KEY is the GROUP BY column (CLIENT), not latest_id.
CREATE TABLE air_table_by_unique_id WITH (value_format='AVRO') AS
    SELECT
        LATEST_BY_OFFSET(unique_id) AS latest_id,
        CLIENT
    FROM AIR_STREAM_BY_UNIQUE_ID
    GROUP BY CLIENT
    EMIT CHANGES;

-- NOTE(review): expected to be rejected, kept for reference — confirm against the ksqlDB version in use:
--   * air_table_by_unique_id has columns latest_id and CLIENT; there is no unique_id column to aggregate.
--   * ksqlDB only allows LATEST_BY_OFFSET on stream sources, not on a TABLE.
--   * A table named air_latest is already created in the "Easy aggregation" section.
CREATE TABLE air_latest WITH (value_format='AVRO') AS
    SELECT
        LATEST_BY_OFFSET(unique_id) AS latest_id,
        CLIENT
    FROM air_table_by_unique_id
    GROUP BY CLIENT
    EMIT CHANGES;

-- NOTE(review): a stream-table join must match on the table's PRIMARY KEY. air_latest is
-- keyed by CLIENT (its GROUP BY column), so joining on latest_id is expected to fail.
CREATE STREAM air_stream_latest AS
    SELECT *
    FROM air_stream_by_unique_id
    INNER JOIN air_latest
        ON air_stream_by_unique_id.unique_id = air_latest.latest_id
    EMIT CHANGES;