This repo is used to practice Kafka Streams & Kafka Connect. It provides a working environment configuration, instructions for running a simple test on it, suggested practice tasks, and an example solution.
Here are several task ideas that can be used for practice.
- Join data from the client table
- Create a status change event
- Configure backups of events
- Configure live synchronization with another database
- Configure reindexing of events
TBD: Here should be the database description and a diagram of the initial stream processing.
- Docker & Docker Compose
- Java 17+
- Python 3 + PIP
You need to download the following Kafka Connect plugins:
- JDBC connector (confluentinc-kafka-connect-jdbc) from Confluent Hub
Also, for the JDBC connector you need to download the JDBC driver for your database. For example, for MySQL you can download it from dev.mysql.com/downloads/connector/j/
Finally, you should end up with the following files:
connect-plugins/
├── confluentinc-kafka-connect-jdbc-10.7.4/
connect-libs/
├── mysql-connector-java-8.0.26.jar
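A minimal sketch of how to lay these files out, assuming the plugin archive and the driver jar are already downloaded into the current directory (file names depend on the versions you picked):
unzip confluentinc-kafka-connect-jdbc-10.7.4.zip -d connect-plugins/
cp mysql-connector-java-8.0.26.jar connect-libs/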
For this demo, Python is used to simplify data generation, since it allows executing database requests with less code.
TBD: Generally, you need Python 3 with a MySQL connector. I will describe how to set up a Python venv with it later.
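Until that section is written, a minimal sketch, assuming the script depends on the mysql-connector-python package (check the imports in data-generator/data-generator.py for the actual dependency):
python3 -m venv .venv
source .venv/bin/activate
pip install mysql-connector-python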
To start the environment, run Docker Compose from the repository's root directory:
docker compose up -d
It can take several minutes to download all the images and start them.
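You can check that all containers are up with:
docker compose ps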
I prepared a list of requests that can be helpful when working with Kafka Connect and the Schema Registry. They can be found in the platform.http file.
To start, you need to create the connectors that extract data from MySQL. To do that, execute the following requests:
### [Connect] Create connector for invoices
### [Connect] Create connector for invoice items
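For reference, such a request has roughly the shape below. This is an illustrative sketch: the connection settings, mode, and column names are assumptions, and the authoritative config lives in platform.http.
POST http://localhost:8083/connectors
Content-Type: application/json

{
  "name": "invoices-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "connection.user": "user",
    "connection.password": "password",
    "table.whitelist": "invoice",
    "mode": "timestamp",
    "timestamp.column.name": "last_updated_at",
    "topic.prefix": "source-mysql-"
  }
}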
Then you can check the connector status:
### [Connect] Connector status - invoices-source
### [Connect] Connector status - invoice-items-source
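Outside of the .http file, the same check works with curl against the Connect REST API:
curl http://localhost:8083/connectors/invoices-source/status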
You should get something like:
{
"name": "invoices-source",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect:8083"
}
],
"type": "source"
}
To check that events are published, we need to generate some data. It can be done using the Python script:
python data-generator/data-generator.py
Then you can use the console consumer from the schema-registry container to check that some events were published:
docker exec -it schema-registry /bin/bash
kafka-avro-console-consumer --bootstrap-server broker:29092 --topic source-mysql-invoice-item --from-beginning
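The Avro console consumer also needs to reach the Schema Registry; inside the schema-registry container it is available on localhost:8081. If decoding fails, pass the URL explicitly:
kafka-avro-console-consumer --bootstrap-server broker:29092 --topic source-mysql-invoice-item --from-beginning --property schema.registry.url=http://localhost:8081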
You should also be able to get the event schemas from the Schema Registry:
### [Schema Registry] Get invoice schema
### [Schema Registry] Get invoice item schema
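The same schemas can be fetched with curl from the Schema Registry REST API; the subject names follow the topic-name-value convention:
curl http://localhost:8081/subjects/source-mysql-invoice-value/versions/latest
curl http://localhost:8081/subjects/source-mysql-invoice-item-value/versions/latest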
We will use specific Avro records in our Kafka Streams application, which requires generating Java classes from the Avro schemas. To do that, you need to do the following:
- Download the Avro schemas from the Schema Registry (you can also do that manually using the platform.http file):
wget http://localhost:8081/subjects/source-mysql-invoice-value/versions/-1/schema -O stream/src/main/avro/Invoice.avsc
wget http://localhost:8081/subjects/source-mysql-invoice-item-value/versions/-1/schema -O stream/src/main/avro/InvoiceItem.avsc
Then you should be able to generate Java classes using the following command:
./gradlew generateAvroJava
The classes will be generated in stream/build/generated-main-avro-java.
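The generateAvroJava task is contributed by a Gradle Avro plugin; as a sketch, assuming the commonly used gradle-avro-plugin is applied in stream/build.gradle (the actual plugin and version in the repo may differ):
plugins {
    id 'com.github.davidmc24.gradle.plugin.avro' version '1.9.1'
}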
Finally, you can run the Kafka Streams application using the following command:
./gradlew bootRun
In the logs, you should see entries like these (it can take some time to execute the join):
2023-10-01T22:11:11.258+02:00 INFO 36384 --- [-StreamThread-1] c.a.jointables.invoice.JoinInvoiceData : Processing finished: invoice event bf3ce37e-c1d3-4d6a-a004-d2ed6f9e9ae6: {"id": "bf3ce37e-c1d3-4d6a-a004-d2ed6f9e9ae6", "code": "xpzAaZQfvOhNTDLo", "items": [{"id": "bf3ce37e-c1d3-4d6a-a004-d2ed6f9e9ae6", "name": "Product 3"}, {"id": "bf3ce37e-c1d3-4d6a-a004-d2ed6f9e9ae6", "name": "Product 3"}], "created_at": 2023-10-01T20:10:22Z, "last_updated_at": 2023-10-01T20:10:22Z}.
2023-10-01T22:11:11.420+02:00 INFO 36384 --- [-StreamThread-1] c.a.jointables.invoice.JoinInvoiceData : Processing finished: invoice event a17ecf53-a4fc-4a05-8b9a-448f8f17a33e: {"id": "a17ecf53-a4fc-4a05-8b9a-448f8f17a33e", "code": "WVqsQBAOmHznIvRB", "items": [{"id": "a17ecf53-a4fc-4a05-8b9a-448f8f17a33e", "name": "Product 2"}, {"id": "a17ecf53-a4fc-4a05-8b9a-448f8f17a33e", "name": "Product 1"}], "created_at": 2023-10-01T20:10:22Z, "last_updated_at": 2023-10-01T20:10:22Z}.
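For reference, the core of such a join topology might look like the sketch below. This is not the exact implementation from the repo: the output type InvoiceEvent, the getInvoiceId accessor, the serde wiring, and the aggregation shape are assumptions; only the topic names come from this walkthrough.
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

// A hypothetical sketch: invoices and their items are read from the CDC topics,
// items are re-keyed by invoice id and collected into a list, and the two tables
// are joined into the enriched event written to invoices-event-log.
// Serde configuration (e.g. SpecificAvroSerde for the generated classes) is
// assumed to come from the application's default serde properties and is omitted.
public class JoinInvoiceDataSketch {

    public void buildTopology(StreamsBuilder builder) {
        // Each invoice row, keyed by invoice id.
        KTable<String, Invoice> invoices = builder.table("source-mysql-invoice");

        // Items arrive one row per event; re-key them by invoice id
        // (getInvoiceId is an assumed accessor on the generated class)
        // and collect them into a list per invoice.
        KTable<String, List<InvoiceItem>> itemsByInvoice = builder
                .<String, InvoiceItem>stream("source-mysql-invoice-item")
                .groupBy((key, item) -> item.getInvoiceId().toString())
                .aggregate(ArrayList::new, (invoiceId, item, items) -> {
                    items.add(item);
                    return items;
                });

        // Join each invoice with its aggregated items and publish the result.
        invoices.join(itemsByInvoice, JoinInvoiceDataSketch::toInvoiceEvent)
                .toStream()
                .to("invoices-event-log");
    }

    // Assumed mapping into the output event shape shown in the logs above.
    private static InvoiceEvent toInvoiceEvent(Invoice invoice, List<InvoiceItem> items) {
        return InvoiceEvent.newBuilder()
                .setId(invoice.getId())
                .setCode(invoice.getCode())
                .setItems(items)
                .setCreatedAt(invoice.getCreatedAt())
                .setLastUpdatedAt(invoice.getLastUpdatedAt())
                .build();
    }
}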
Now, if you check the resulting topic:
docker exec -it schema-registry /bin/bash
kafka-avro-console-consumer --bootstrap-server broker:29092 --topic invoices-event-log --from-beginning
The result should look like this:
{"id":"bf3ce37e-c1d3-4d6a-a004-d2ed6f9e9ae6","code":"xpzAaZQfvOhNTDLo","items":[{"id":"bf3ce37e-c1d3-4d6a-a004-d2ed6f9e9ae6","name":"Product 3"},{"id":"bf3ce37e-c1d3-4d6a-a004-d2ed6f9e9ae6","name":"Product 3"}],"created_at":1696191022000,"last_updated_at":1696191022000}
{"id":"a17ecf53-a4fc-4a05-8b9a-448f8f17a33e","code":"WVqsQBAOmHznIvRB","items":[{"id":"a17ecf53-a4fc-4a05-8b9a-448f8f17a33e","name":"Product 2"},{"id":"a17ecf53-a4fc-4a05-8b9a-448f8f17a33e","name":"Product 1"}],"created_at":1696191022000,"last_updated_at":1696191022000}