- Docker
- Flink
- Flink SQL Client
Start the local Flink cluster by running the following command in the $FLINK_HOME directory:
./bin/start-cluster.sh
Once the cluster is running, the Flink Web UI is available at http://localhost:8081
Start the Flink SQL Client from the same directory:
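If you prefer to check the cluster from the terminal instead of the browser, the following is a minimal sketch. It assumes the Flink REST API is served on the same address as the Web UI and that curl is installed. The FLINK_REST variable and the helper names flink_url and flink_is_up are illustrative, not part of Flink.

```shell
#!/usr/bin/env bash
# Assumption: the REST API shares the Web UI address (http://localhost:8081).
FLINK_REST="${FLINK_REST:-http://localhost:8081}"

# Build the URL of a REST endpoint, e.g. "overview" or "jobs/overview".
# A trailing slash on the base and a leading slash on the path are tolerated.
flink_url() {
  printf '%s/%s\n' "${FLINK_REST%/}" "${1#/}"
}

# Return 0 if the cluster answers on /overview, non-zero otherwise.
flink_is_up() {
  curl -fsS --max-time 5 "$(flink_url overview)" > /dev/null 2>&1
}

if flink_is_up; then
  echo "Flink cluster is reachable at ${FLINK_REST}"
else
  echo "Flink cluster is not reachable at ${FLINK_REST}"
fi
```

If the check fails right after start-cluster.sh, the cluster may simply need a few more seconds to come up.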
./bin/sql-client.sh
Start the services defined in docker-compose.yaml:
docker-compose up -d
Run the following commands to create the tables and insert some data into MySQL and PostgreSQL:
cat mysql-init.sql | docker-compose -f docker-compose.yaml exec -T mysql mysql -uroot -p123456
cat postgres-init.sql | docker-compose -f docker-compose.yaml exec -T postgres psql -h localhost -U postgres
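To confirm that the init scripts loaded data, you could count the rows in each table. This is a sketch under a few assumptions: the MySQL tables live in the mydb database and the PostgreSQL table in the public schema (as the connector options later in this guide suggest), and the helper names mysql_count and postgres_count are hypothetical. The DOCKER_COMPOSE variable is also an illustrative addition so that the newer "docker compose" form can be substituted.

```shell
#!/usr/bin/env bash
# Compose command and file; override DOCKER_COMPOSE with "docker compose" if needed.
DOCKER_COMPOSE="${DOCKER_COMPOSE:-docker-compose}"
COMPOSE_FILE="${COMPOSE_FILE:-docker-compose.yaml}"

# Print the row count of a table in the MySQL database mydb.
# -N suppresses the column header so only the number is printed.
# $DOCKER_COMPOSE is left unquoted on purpose so "docker compose" splits into two words.
mysql_count() {
  $DOCKER_COMPOSE -f "$COMPOSE_FILE" exec -T mysql \
    mysql -uroot -p123456 -N -e "SELECT COUNT(*) FROM mydb.$1"
}

# Print the row count of a table in the PostgreSQL public schema.
# -t prints tuples only, -A disables column alignment.
postgres_count() {
  $DOCKER_COMPOSE -f "$COMPOSE_FILE" exec -T postgres \
    psql -h localhost -U postgres -t -A -c "SELECT COUNT(*) FROM public.$1"
}
```

After sourcing the snippet, calls such as mysql_count products, mysql_count orders, and postgres_count shipments should each print a non-zero number if the init scripts ran successfully.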
In the Flink SQL Client terminal, enable checkpointing every 3 seconds, then create the source and sink tables:
SET execution.checkpointing.interval = 3s;
Create the source table for the products table in MySQL
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products',
'server-time-zone' = 'UTC'
);
Create the source table for the orders table in MySQL
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders',
'server-time-zone' = 'UTC'
);
Create the source table for the shipments table in PostgreSQL
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments',
'slot.name' = 'flink'
);
Create the sink table that writes to the enriched_orders index in Elasticsearch
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
Join the orders, products, and shipments tables and write the result to the enriched_orders index in Elasticsearch. Running this statement submits a continuously running Flink job:
INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
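Once the INSERT statement has been submitted, the job should appear in the Flink Web UI. As a terminal alternative, the sketch below counts jobs in the RUNNING state using the REST endpoint /jobs/overview. It assumes the REST API is reachable at http://localhost:8081 and that the response contains a "state" field per job; the helper name count_running_jobs is illustrative, and the grep-based parsing is a shortcut rather than a full JSON parser.

```shell
#!/usr/bin/env bash
# Assumption: the REST API shares the Web UI address.
FLINK_REST="${FLINK_REST:-http://localhost:8081}"

# Read a /jobs/overview JSON response on stdin and print how many
# jobs report the state RUNNING (0 if none or if the input is empty).
count_running_jobs() {
  grep -o '"state" *: *"RUNNING"' | wc -l | tr -d ' '
}

running="$(curl -fsS --max-time 5 "${FLINK_REST}/jobs/overview" 2>/dev/null | count_running_jobs)"
echo "Running Flink jobs: ${running}"
```

A count of 0 shortly after submission usually means the job is still starting or has failed; the Web UI shows the exception in the latter case.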
To see changes propagate to enriched_orders in Elasticsearch, modify the source data. Run the two orders statements in MySQL and the two shipments statements in PostgreSQL:
INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
UPDATE orders SET order_status = true WHERE order_id = 10004;
INSERT INTO shipments VALUES (default,10004,'Shanghai','Beijing',false);
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
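To inspect the effect of these statements, you could fetch the enriched document for order 10004 directly from Elasticsearch. This sketch assumes Elasticsearch listens on http://localhost:9200 (the address used in the sink table) and that the connector uses the primary key value as the document ID, which is the behaviour for a single-column key as far as I know. The ES_URL variable and the helper es_doc_url are illustrative.

```shell
#!/usr/bin/env bash
# Assumption: Elasticsearch address taken from the sink table definition.
ES_URL="${ES_URL:-http://localhost:9200}"

# Build the URL of a single document: <base>/<index>/_doc/<id>.
es_doc_url() {
  printf '%s/%s/_doc/%s\n' "${ES_URL%/}" "$1" "$2"
}

# Fetch the enriched order; ?pretty asks Elasticsearch for indented JSON.
curl -fsS --max-time 5 "$(es_doc_url enriched_orders 10004)?pretty" \
  || echo "Document not available (is Elasticsearch running and the job started?)"
```

After the two UPDATE statements have been picked up, the returned document should show order_status and is_arrived as true. Allow a few seconds, since changes are flushed on checkpoints.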
Stop the services
docker-compose down
Stop the local Flink cluster from the $FLINK_HOME directory
./bin/stop-cluster.sh