/kafka-scala-examples

Examples of Avro, Kafka, Schema Registry, Kafka Streams, Interactive Queries, KSQL, and Kafka Connect in Scala

Primary Language: Scala

kafka-scala-examples

Build Status Scala Steward badge

Examples in Scala of

Local environment

# start locally
# - zookeeper
# - kafka
# - kafka-rest
# - kafka-ui
# - schema-registry
# - schema-registry-ui
# - ksql-server
# - ksql-cli
# - kafka-connect
# - kafka-connect-ui
docker-compose up

# (mac|linux) view kafka ui
[open|xdg-open] http://localhost:8000

# (mac|linux) view schema-registry ui
[open|xdg-open] http://localhost:8001

# (mac|linux) view kafka-connect ui
[open|xdg-open] http://localhost:8002

# cleanup
docker-compose down -v

If containers are crashing, make sure Docker has enough resources (memory and CPU) allocated

# verify memory and cpu usage
docker ps -q | xargs docker stats --no-stream

# verify status
docker inspect <CONTAINER_NAME> | jq '.[].State'

avro

Description

Avro serialization and deserialization examples of

Demo

# console
sbt avro/console

# generate avro classes
# avro/target/scala-2.12/classes/com/kafka/demo/User.class
sbt clean avro/compile

# test
sbt clean avro/test

Resources

kafka

Description

Kafka API examples of

Demo

# access kafka
docker exec -it local-kafka bash

# create topic
# convention <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME>
# example [example.no-schema.original|example.no-schema.cakesolutions]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# delete topic
kafka-topics --zookeeper zookeeper:2181 \
  --delete --topic <TOPIC_NAME>

# view topic
kafka-topics --zookeeper zookeeper:2181 --list 
kafka-topics --zookeeper zookeeper:2181 --describe --topic <TOPIC_NAME>

# view topic offset
kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list kafka:9092 \
  --time -1 \
  --topic <TOPIC_NAME>

# list consumer groups
kafka-consumer-groups --bootstrap-server kafka:9092 --list

# view consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --describe

# reset consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --topic <TOPIC_NAME> \
  --reset-offsets \
  --to-earliest \
  --execute

# console producer
kafka-console-producer --broker-list kafka:9092 --topic <TOPIC_NAME>
kafkacat -P -b 0 -t <TOPIC_NAME>

# console consumer
kafka-console-consumer --bootstrap-server kafka:9092 --topic <TOPIC_NAME> --from-beginning
kafkacat -C -b 0 -t <TOPIC_NAME>

# producer example
sbt "kafka/runMain com.kafka.demo.original.Producer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Producer"

# consumer example
sbt "kafka/runMain com.kafka.demo.original.Consumer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Consumer"

# test
sbt clean kafka/test
sbt "test:testOnly *KafkaSpec"

Resources

schema-registry

Description

# register schema
# convention <TOPIC_NAME>-key or <TOPIC_NAME>-value
http -v POST :8081/subjects/example.with-schema.simple-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# import schema from file
http -v POST :8081/subjects/example.with-schema.user-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@avro/src/main/avro/user.avsc

# export schema to file
http :8081/subjects/example.with-schema.user-value/versions/latest \
  | jq -r '.schema|fromjson' \
  | tee avro/src/main/avro/user-latest.avsc

# list subjects
http -v :8081/subjects

# list subject's versions
http -v :8081/subjects/example.with-schema.simple-value/versions

# fetch by version
http -v :8081/subjects/example.with-schema.simple-value/versions/1

# fetch by id
http -v :8081/schemas/ids/1

# test compatibility
http -v POST :8081/compatibility/subjects/example.with-schema.simple-value/versions/latest \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# delete version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/1

# delete latest version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/latest

# delete subject
http -v DELETE :8081/subjects/example.with-schema.simple-value

# stringify
jq tostring avro/src/main/avro/user.avsc

Demo

# generate SpecificRecord classes under "schema-registry/target/scala-2.12/src_managed/main/compiled_avro"
sbt clean schema-registry/avroScalaGenerateSpecific

# (optional) create schema
http -v POST :8081/subjects/example.with-schema.payment-key/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'
http -v POST :8081/subjects/example.with-schema.payment-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@schema-registry/src/main/avro/Payment.avsc

# access kafka
docker exec -it local-kafka bash

# (optional) create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic example.with-schema.payment

# console producer (binary)
kafka-console-producer --broker-list kafka:9092 --topic example.with-schema.payment

# console consumer (binary)
kafka-console-consumer --bootstrap-server kafka:9092 --topic example.with-schema.payment

# access schema-registry
docker exec -it local-schema-registry bash

# avro console producer
# example "MyKey",{"id":"MyId","amount":10}
kafka-avro-console-producer --broker-list kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property parse.key=true \
  --property key.separator=, \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"namespace":"io.confluent.examples.clients.basicavro","type":"record","name":"Payment","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}'

# avro console consumer
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning

# producer example
sbt "schema-registry/runMain com.kafka.demo.specific.Producer"

# consumer example
sbt "schema-registry/runMain com.kafka.demo.specific.Consumer"

# tests
sbt "schema-registry/test:testOnly *KafkaSchemaRegistrySpecificSpec"
# producer example
sbt "schema-registry/runMain com.kafka.demo.generic.Producer"

# consumer example
sbt "schema-registry/runMain com.kafka.demo.generic.Consumer"

# tests
sbt "schema-registry/test:testOnly *KafkaSchemaRegistryGenericSpec"

Resources

Alternatives

TODO

  • generic + schema evolution
  • ovotech
  • multi-schema
  • formulation

kafka-streams

Description

Kafka Streams API examples

Demo-1

# access kafka
docker exec -it local-kafka bash

# create topic
# example [example.to-upper-case-app.input|example.to-upper-case-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# ToUpperCaseApp example (input topic required)
sbt "streams/runMain com.kafka.demo.streams.ToUpperCaseApp"

# produce
kafka-console-producer --broker-list kafka:9092 \
  --topic example.to-upper-case-app.input

# consume
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic example.to-upper-case-app.output

# test
sbt clean streams/test

Demo-2

Tested with embedded-kafka and embedded-kafka-schema-registry

# access kafka
docker exec -it local-kafka bash

# create topic
# example [json.streams-json-to-avro-app.input|avro.streams-json-to-avro-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# produce (default StringSerializer)
kafka-console-producer \
  --broker-list kafka:9092 \
  --property "parse.key=true" \
  --property "key.separator=:" \
  --property "key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --property "value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --topic <TOPIC_NAME>

# consume (default StringDeserializer)
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property "print.key=true" \
  --property "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --property "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --topic <TOPIC_NAME>

# access schema-registry
docker exec -it local-schema-registry bash

# consume avro
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning \
  --topic <TOPIC_NAME>

# JsonToAvroApp example (input topic required)
sbt "streams-json-avro/runMain com.kafka.demo.JsonToAvroApp"

# test
sbt clean streams-json-avro/test

Example

# json
mykey:{"valueInt":42,"valueString":"foo"}

# log
[json.streams-json-to-avro-app.input]: mykey, JsonModel(42,foo)
[avro.streams-json-to-avro-app.output]: KeyAvroModel(mykey), ValueAvroModel(42,FOO)

Demo-3

TODO

  • CatsKafkaStreamsApp [source]
# run app
sbt -jvm-debug 5005 "cats-kafka-streams/runMain com.kafka.demo.CatsKafkaStreamsApp"

Demo-4

TODO

# run app
sbt -jvm-debug 5005 "zio-kafka-streams/runMain com.kafka.demo.ZioKafkaStreamsApp"

Resources

ksql

Description

Setup Kafka

# access kafka
docker exec -it local-kafka bash

# create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic USER_PROFILE

# produce sample data
kafka-console-producer --broker-list kafka:9092 --topic USER_PROFILE << EOF
{"userid": 1000, "firstname": "Alison", "lastname": "Smith", "countrycode": "GB", "rating": 4.7}
EOF

# consume
kafka-console-consumer --bootstrap-server kafka:9092 --topic USER_PROFILE --from-beginning

Access KSQL CLI

  • using the server

    # access ksql-server
    docker exec -it local-ksql-server bash
    
    # start ksql cli
    ksql http://ksql-server:8088
  • using a local instance

    # connect to local cli
    docker exec -it local-ksql-cli ksql http://ksql-server:8088
  • using a temporary instance

    # connect to remote server
    docker run --rm \
      --network=kafka-scala-examples_local_kafka_network \
      -it confluentinc/cp-ksql-cli http://ksql-server:8088

Execute SQL statements

# create stream
CREATE STREAM user_profile (\
  userid INT, \
  firstname VARCHAR, \
  lastname VARCHAR, \
  countrycode VARCHAR, \
  rating DOUBLE \
  ) WITH (KAFKA_TOPIC = 'USER_PROFILE', VALUE_FORMAT = 'JSON');

# verify stream
list streams;
describe user_profile;

# query stream
SELECT userid, firstname, lastname, countrycode, rating FROM user_profile EMIT CHANGES;

Generate more data with the command below; expect both the console consumer and the KSQL query to show the generated data

# generate data
docker run --rm \
  -v $(pwd)/local/ksql:/datagen \
  --network=kafka-scala-examples_local_kafka_network \
  -it confluentinc/ksql-examples ksql-datagen \
  bootstrap-server=kafka:29092 \
  schemaRegistryUrl=http://schema-registry:8081 \
  schema=datagen/user_profile.avro \
  format=json \
  topic=USER_PROFILE \
  key=userid \
  maxInterval=5000 \
  iterations=100

kafka-connect

Setup PostgreSQL locally

# start the main stack (this creates the shared network)
docker-compose up

# start postgres
docker-compose -f docker-compose.postgres.yml up

# (mac|linux) view postgres ui
# [schema=public|database=postgres|username=postgres|password=postgres]
[open|xdg-open] http://localhost:8080

Setup connectors

# list connectors
http -v :8083/connectors

# init data to generate schema
cp local/connect/data/resources-0.txt.orig local/connect/data/resources-0.txt

# setup spooldir source connector
http -v --json POST :8083/connectors < local/connect/config/source-spooldir-connector.json

# ingest data
echo "{\"accountId\":\"123\",\"resourceType\":\"XXX\",\"value\":\"X1\"}" > local/connect/data/resources-1.txt

# setup jdbc sink connector
# topic = SCHEMA.DATABASE = "public.postgres"
http -v --json POST :8083/connectors < local/connect/config/sink-jdbc-connector.json

# verify data
docker exec -it local-postgres bash -c "psql -U postgres postgres"
select * from public.postgres;

# cleanup
docker-compose -f docker-compose.postgres.yml down -v

extra

Resources

Tools

Companies