Enriching Kafka Stream with Another Stream Using Flink
- Install PostgreSQL 11+
- Set up PostgreSQL to allow Debezium to perform CDC using pgoutput. Reference here
- Set up Apache Kafka (with Kafka Connect) on your machine/cluster
- Install the Debezium PostgreSQL connector from here
- Run Apache Kafka & Kafka Connect
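For a single-machine setup, Kafka and Kafka Connect can be started with the stock scripts shipped in the Kafka distribution. The commands below assume a ZooKeeper-based Kafka install and are run from the Kafka installation directory, each in its own terminal:

```shell
# Start ZooKeeper, the Kafka broker, and Kafka Connect (distributed mode).
# Paths are the defaults inside the Kafka distribution directory.
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-distributed.sh config/connect-distributed.properties
```

Distributed mode exposes the Connect REST interface (port 8083 by default), which the later connector-registration step relies on.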
- Create tables `transactions` and `customers` in PostgreSQL (SQL file in here)
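If you want to create the tables by hand instead of running the SQL file, the commands below are a minimal sketch; the column names and types are illustrative assumptions, and the repository's SQL file remains the source of truth.

```shell
# Hypothetical schema for illustration only -- the SQL file in the repo
# defines the actual column names and types.
psql -h <host> -U <username> -d <dbname> <<'SQL'
CREATE TABLE customers (
    id   SERIAL PRIMARY KEY,
    name TEXT NOT NULL
);
CREATE TABLE transactions (
    id          SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers (id),
    amount      NUMERIC(12, 2) NOT NULL
);
SQL
```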
- Send a POST request to your Kafka Connect REST interface with the request body below:

  {
      "name": "postgres_cdc",
      "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "database.hostname": "<host>",
          "database.port": "<port>",
          "database.user": "<username>",
          "database.password": "****",
          "database.dbname": "<dbname>",
          "database.server.name": "<servername>",
          "table.whitelist": "<schema>.customers,<schema>.transactions",
          "plugin.name": "pgoutput"
      }
  }
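For example, the connector can be registered with curl, assuming the Kafka Connect REST interface listens on its default port 8083 on localhost:

```shell
# Register the Debezium connector via the Kafka Connect REST API
# (assumes Connect runs on localhost:8083; adjust host/port as needed).
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres_cdc",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "<host>",
      "database.port": "<port>",
      "database.user": "<username>",
      "database.password": "****",
      "database.dbname": "<dbname>",
      "database.server.name": "<servername>",
      "table.whitelist": "<schema>.customers,<schema>.transactions",
      "plugin.name": "pgoutput"
    }
  }'
```

A successful registration returns HTTP 201 with the connector configuration echoed back; `GET http://localhost:8083/connectors` lists the registered connectors.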
- Run the JAR
Streaming Job Available Parameters
- `--checkpoint-path`: path to save Flink's checkpoints
- `--debug-result-stream`: whether to print the result stream to the console
- `--environment`: environment to run the app in
- `--auto-offset-reset`: Kafka `auto.offset.reset` parameter
- `--bootstrap-server`: Kafka bootstrap servers
- `--consumer-group-id`: Kafka consumer group ID
- `--offset-strategy`: whether to read from the earliest or latest Kafka offset
- `--source-topic-1`: Kafka transactions stream (topic) name
- `--source-topic-2`: Kafka customers stream (topic) name
- `--target-topic`: target topic to publish the enriched data to
- `--properties-file`: properties file to load parameters from
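Putting the parameters above together, a job launch might look like the following. The JAR name `stream-enrichment.jar` is a placeholder, the parameter values are example choices rather than defaults from the source, and the `<servername>.<schema>.<table>` topic names follow Debezium's default topic-naming convention:

```shell
# Example invocation -- JAR name and all parameter values are illustrative.
flink run stream-enrichment.jar \
  --environment production \
  --checkpoint-path file:///tmp/flink-checkpoints \
  --bootstrap-server localhost:9092 \
  --consumer-group-id enrichment-job \
  --auto-offset-reset earliest \
  --offset-strategy earliest \
  --source-topic-1 <servername>.<schema>.transactions \
  --source-topic-2 <servername>.<schema>.customers \
  --target-topic enriched-transactions \
  --debug-result-stream true
```

Alternatively, the same parameters can be collected in a file and passed with `--properties-file`.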