Real-time Data Warehouse

Setup Overview The goal of this setup is to create a real-time data warehouse that ingests data from a PostgreSQL database using Debezium, processes it with Apache Flink, and stores it in different layers using Apache Hudi. Finally, it aggregates data and visualizes it using Kibana.

demo_overview

Getting the setup up and running

Docker Compose The setup is orchestrated using Docker Compose, which allows you to define and run multi-container applications easily.

docker compose build

docker compose up -d

Check everything really up and running

docker compose ps

You should be able to access the Flink Web UI (http://localhost:8081), as well as Kibana (http://localhost:5601).

Postgres

Postgres Start a PostgreSQL client to interact with the PostgreSQL source tables.

Check the available tables in the 'claims' schema.

Start the Postgres client to have a look at the source tables and run some DML statements later:

docker compose exec postgres env PGOPTIONS="--search_path=claims" bash -c 'psql -U $POSTGRES_USER postgres'

What tables are we dealing with?

SELECT * FROM information_schema.tables WHERE table_schema = 'claims';

Debezium

Start the Debezium Postgres connector using a provided configuration file (register-postgres.json).

Check that the connector is running.

Monitor changes in the accident_claims table by consuming messages from the corresponding Kafka topic.

Start the Debezium Postgres connector using the configuration provided in the register-postgres.json file:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-members.json

Check that the connector is running:

curl http://localhost:8083/connectors/claims-connector/status # | jq

The first time it connects to a Postgres server, Debezium takes a consistent snapshot of all database schemas; so, you should see that the pre-existing records in the accident_claims table have already been pushed into your Kafka topic:

docker compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic pg_claims.claims.accident_claims

ℹ️ Have a quick read about the structure of these events in the Debezium documentation.

Is it working?

In the tab you used to start the Postgres client, you can now run some DML statements to see that the changes are propagated all the way to your Kafka topic:

INSERT INTO accident_claims (claim_total, claim_total_receipt, claim_currency, member_id, accident_date, accident_type,accident_detail, claim_date, claim_status) VALUES (500, 'PharetraMagnaVestibulum.tiff', 'AUD', 321, '2020-08-01 06:43:03', 'Collision', 'Blue Ringed Octopus','2020-08-10 09:39:31', 'INITIAL');
UPDATE accident_claims
SET claim_total_receipt = 'CorrectReceipt.pdf'
WHERE claim_id = 1001;
DELETE
FROM accident_claims
WHERE claim_id = 1001;

In the output of your Kafka console consumer, you should now see three consecutive events with op values equal to c (an insert event), u (an update event) and d (a delete event).

Flink connectors

This section mentions Flink connectors but doesn't provide specific details. You can explore these links to understand more about Flink connectors:

Flink Table Connectors Flink Packages - Connectors Flink Faker

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/ https://flink-packages.org/categories/connectors https://github.com/knaufk/flink-faker/

Datasource ingestion

Start the Flink SQL Client to execute SQL commands.

Define tables, such as t1, and specify their connectors. In this case, a 'hudi' connector is used.

Insert data into the defined tables.

Register a Postgres catalog to access external tables over JDBC.

Create external tables (e.g., datasource.accident_claims) that fetch data from Kafka topics (Debezium data) or generate data using a datagen connector.

Query and check the data in the external tables.

Start the Flink SQL Client:

docker compose exec sql-client ./sql-client.sh

OR

docker compose exec sql-client ./sql-client-submit.sh

test

CREATE TABLE t1(
  uuid VARCHAR(20), -- you can use 'PRIMARY KEY NOT ENFORCED' syntax to mark the field as record key
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '/data/t1',
  'write.tasks' = '1', -- default is 4 ,required more resource
  'compaction.tasks' = '1', -- default is 10 ,required more resource
  'table.type' = 'COPY_ON_WRITE', -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
  'read.tasks' = '1', -- default is 4 ,required more resource
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.streaming.start-commit' = '0', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

SELECT * FROM t1;

Register a Postgres catalog , so you can access the metadata of the external tables over JDBC:

CREATE CATALOG datasource WITH (
    'type'='jdbc',
    'property-version'='1',
    'base-url'='jdbc:postgresql://postgres:5432/',
    'default-database'='postgres',
    'username'='postgres',
    'password'='postgres'
);
CREATE DATABASE IF NOT EXISTS datasource;
CREATE TABLE datasource.accident_claims WITH (
                                            'connector' = 'kafka',
                                            'topic' = 'pg_claims.claims.accident_claims',
                                            'properties.bootstrap.servers' = 'kafka:9092',
                                            'properties.group.id' = 'accident_claims-consumer-group',
                                            'format' = 'debezium-json',
                                            'scan.startup.mode' = 'earliest-offset'
                                            ) LIKE datasource.postgres.`claims.accident_claims` (EXCLUDING ALL);

OR generate data from datagen connector:

CREATE TABLE datasource.accident_claims(
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3)
                                          ) WITH (
                                            'connector' = 'datagen',
                                            'rows-per-second' = '100'
                                            );

and members table:

CREATE TABLE datasource.members WITH (
                                    'connector' = 'kafka',
                                    'topic' = 'pg_claims.claims.members',
                                    'properties.bootstrap.servers' = 'kafka:9092',
                                    'properties.group.id' = 'members-consumer-group',
                                    'format' = 'debezium-json',
                                    'scan.startup.mode' = 'earliest-offset'
                                    ) LIKE datasource.postgres.`claims.members` ( EXCLUDING OPTIONS);

OR generate data from datagen connector:

CREATE TABLE datasource.members(
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3)
                                    ) WITH (
                                            'connector' = 'datagen',
                                            'rows-per-second' = '100'
                                            );

Check data:

SELECT * FROM datasource.accident_claims;
SELECT * FROM datasource.members;

DWD

DWD (Data Warehouse - Data) Create a 'dwd' (Data Warehouse - Data) database.

Define Hudi tables (e.g., dwd.accident_claims) for data in the Data Warehouse - Data layer.

Insert data into the DWD layer tables, fetching it from the datasource layer (previous layer).

Check the data in the DWD layer tables.

Create a database in DWD layer:

CREATE DATABASE IF NOT EXISTS dwd;
CREATE TABLE dwd.accident_claims
(
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3),
    ds                  DATE,
    PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector'='hudi',
  'path' = '/data/dwd/accident_claims',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'compaction.delta_seconds' = '60',
  'write.precombine.field' = 'ts_updated',
  'read.tasks' = '1',
  'read.streaming.check-interval' = '5',
  'read.streaming.start-commit' = '20210712134429',
  'index.bootstrap.enabled' = 'true'
);
CREATE TABLE dwd.members
(
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3),
    ds                DATE,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
      'connector'='hudi',
      'path'='/data/dwd/members',
      'table.type' = 'MERGE_ON_READ',
      'read.streaming.enabled' = 'true',
      'write.batch.size' = '1',
      'write.tasks' = '1',
      'compaction.tasks' = '1',
      'compaction.delta_seconds' = '60',
      'write.precombine.field' = 'ts_updated',
      'read.tasks' = '1',
      'read.streaming.check-interval' = '5',
      'read.streaming.start-commit' = '20210712134429',
      'index.bootstrap.enabled' = 'true'
);

and submit a continuous query to the Flink cluster that will write the data from datasource into dwd table(ES):

INSERT INTO dwd.accident_claims
SELECT claim_id,
       claim_total,
       claim_total_receipt,
       claim_currency,
       member_id,
       CAST (accident_date as DATE),
       accident_type,
       accident_detail,
       CAST (claim_date as DATE),
       claim_status,
       CAST (ts_created as TIMESTAMP),
       CAST (ts_updated as TIMESTAMP),
       claim_date
       --CAST (SUBSTRING(claim_date, 0, 9) as DATE)
FROM datasource.accident_claims;
INSERT INTO dwd.members
SELECT id,
       first_name,
       last_name,
       address,
       address_city,
       address_country,
       insurance_company,
       insurance_number,
       CAST (ts_created as TIMESTAMP),
       CAST (ts_updated as TIMESTAMP),
       ts_created
       --CAST (SUBSTRING(ts_created, 0, 9) as DATE)
FROM datasource.members;

Check data:

SELECT * FROM dwd.accident_claims;
SELECT * FROM dwd.members;

DWB

DWB (Data Warehouse - Business) Create a 'dwb' (Data Warehouse - Business) database.

Define Hudi tables (e.g., dwb.accident_claims) for data in the Data Warehouse - Business layer.

Insert data into the DWB layer tables, fetching it from the DWD layer (previous layer).

Check the data in the DWB layer tables.

Create a database in DWB layer:

CREATE DATABASE IF NOT EXISTS dwb;
CREATE TABLE dwb.accident_claims
(
    claim_id            BIGINT,
    claim_total         DOUBLE,
    claim_total_receipt VARCHAR(50),
    claim_currency      VARCHAR(3),
    member_id           INT,
    accident_date       DATE,
    accident_type       VARCHAR(20),
    accident_detail     VARCHAR(20),
    claim_date          DATE,
    claim_status        VARCHAR(10),
    ts_created          TIMESTAMP(3),
    ts_updated          TIMESTAMP(3),
    ds                  DATE,
    PRIMARY KEY (claim_id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
  'connector'='hudi',
  'path' = '/data/dwb/accident_claims',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'compaction.delta_seconds' = '60',
  'write.precombine.field' = 'ts_updated',
  'read.tasks' = '1',
  'read.streaming.check-interval' = '5',
  'read.streaming.start-commit' = '20210712134429',
  'index.bootstrap.enabled' = 'true'
);
CREATE TABLE dwb.members
(
    id                BIGINT,
    first_name        VARCHAR(50),
    last_name         VARCHAR(50),
    address           VARCHAR(50),
    address_city      VARCHAR(10),
    address_country   VARCHAR(10),
    insurance_company VARCHAR(25),
    insurance_number  VARCHAR(50),
    ts_created        TIMESTAMP(3),
    ts_updated        TIMESTAMP(3),
    ds                DATE,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (ds) WITH (
      'connector'='hudi',
      'path'='/data/dwb/members',
      'table.type' = 'MERGE_ON_READ',
      'read.streaming.enabled' = 'true',
      'write.batch.size' = '1',
      'write.tasks' = '1',
      'compaction.tasks' = '1',
      'compaction.delta_seconds' = '60',
      'write.precombine.field' = 'ts_updated',
      'read.tasks' = '1',
      'read.streaming.check-interval' = '5',
      'read.streaming.start-commit' = '20210712134429',
      'index.bootstrap.enabled' = 'true'
);
INSERT INTO dwb.accident_claims
SELECT claim_id,
       claim_total,
       claim_total_receipt,
       claim_currency,
       member_id,
       accident_date,
       accident_type,
       accident_detail,
       claim_date,
       claim_status,
       ts_created,
       ts_updated,
       ds
FROM dwd.accident_claims;
INSERT INTO dwb.members
SELECT id,
       first_name,
       last_name,
       address,
       address_city,
       address_country,
       insurance_company,
       insurance_number,
       ts_created,
       ts_updated,
       ds
FROM dwd.members;

Check data:

SELECT * FROM dwb.accident_claims;
SELECT * FROM dwb.members;

DWS

Create a 'dws' (Data Warehouse - Serving) database.

Define Elasticsearch sink tables (e.g., dws.insurance_costs) for serving layer data.

Submit continuous queries that perform aggregations and write data to Elasticsearch.

Kibana Dashboard Create a Kibana dashboard to visualize the aggregated data in Elasticsearch.

Create a database in DWS layer:

CREATE DATABASE IF NOT EXISTS dws;
CREATE TABLE dws.insurance_costs
(
    es_key            STRING PRIMARY KEY NOT ENFORCED,
    insurance_company STRING,
    accident_detail   STRING,
    accident_agg_cost DOUBLE
) WITH (
      'connector' = 'elasticsearch-7', 'hosts' = 'http://elasticsearch:9200', 'index' = 'agg_insurance_costs'
      );

and submit a continuous query to the Flink cluster that will write the aggregated insurance costs per insurance_company, bucketed by accident_detail (or, what animals are causing the most harm in terms of costs):

INSERT INTO dws.insurance_costs
SELECT UPPER(SUBSTRING(m.insurance_company, 0, 4) || '_' || SUBSTRING(ac.accident_detail, 0, 4)) es_key,
       m.insurance_company,
       ac.accident_detail,
       SUM(ac.claim_total) member_total
FROM dwb.accident_claims ac
         JOIN dwb.members m
              ON ac.member_id = m.id
WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;

Finally, create a simple dashboard in Kibana

References