To run the main pipeline:
./gradlew run
To run the fake producer, which generates a message every 20 seconds:
./gradlew fakeProducer
Entrypoint | Description | Credentials |
---|---|---|
http://localhost:8080 | Kafka UI | -- |
docker compose exec -it ksql-cli ksql http://ksqldb:8088
The ksqlDB statements below are taken from https://developer.confluent.io/courses/ksqldb/hands-on-interacting-with-ksqldb/
-- Declare the MOVEMENTS stream over the single-partition Kafka topic 'movements'.
-- PERSON is the record key; LOCATION is carried in the JSON-encoded value.
CREATE STREAM MOVEMENTS (
    PERSON VARCHAR KEY,
    LOCATION VARCHAR
) WITH (
    VALUE_FORMAT='JSON',
    PARTITIONS=1,
    KAFKA_TOPIC='movements'
);

-- Seed the stream with sample rows. Columns are named explicitly so the
-- statements do not depend on the order in which the schema declares them.
INSERT INTO MOVEMENTS (PERSON, LOCATION) VALUES ('Allison', 'Denver');
INSERT INTO MOVEMENTS (PERSON, LOCATION) VALUES ('Robin', 'Leeds');
INSERT INTO MOVEMENTS (PERSON, LOCATION) VALUES ('Robin', 'Ilkley');
INSERT INTO MOVEMENTS (PERSON, LOCATION) VALUES ('Allison', 'Boulder');

-- Read topics from the beginning for the rest of this session; with the
-- default ('latest') the rows inserted above would not be shown by the query below.
SET 'auto.offset.reset' = 'earliest';

-- Push query: keeps streaming rows as they arrive until it is cancelled.
SELECT * FROM MOVEMENTS EMIT CHANGES;
-- Process the source topic from its beginning so that records already present
-- in MOVEMENTS are included in the aggregation (the ksqlDB default is 'latest').
SET 'auto.offset.reset' = 'earliest';

-- Continuously maintained per-person aggregate over MOVEMENTS, stored as Avro:
--   LATEST_LOCATION  - the LOCATION of the most recent record (by offset) for the person
--   LOCATION_CHANGES - the number of MOVEMENTS records seen for the person
--   UNIQUE_LOCATIONS - the number of distinct LOCATION values seen for the person
CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
    SELECT
        PERSON,
        LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION,
        COUNT(*) AS LOCATION_CHANGES,
        COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
    FROM MOVEMENTS
    GROUP BY PERSON
    EMIT CHANGES;
-- List the tables currently registered with the ksqlDB server.
SHOW TABLES;

-- Pull query (no EMIT CHANGES): returns the current rows of PERSON_STATS and then completes.
SELECT *
FROM PERSON_STATS;