An exploration of Flink and change data capture (CDC). We will examine what it's like to have Flink directly manage CDC, omitting messaging middleware (Kafka, Pulsar, etc.). For comparison, here's another exploration that does include event log middleware (Apache Pulsar) in the system: experiment-flink-pulsar-debezium.
This exploration primarily leverages:
- flink-cdc-connectors, which embeds Debezium in Flink
- Flink Table API
- Flink SQL Client
It is based on and adapted from these resources.
If you're impatient, jump directly to the takeaways from this exploration.
Otherwise, continue reading below for a hands-on example that you can run on your own.
Here's the system we'll experiment with
The general theme of "I want to get state from Point-A to Point-B, maybe transform it along the way, and continue to keep it updated, in near real-time" is a fairly common story that can take a variety of forms, for example:
- data integration amongst microservices
- analytic datastore loading and updating
- cache maintenance
- search index syncing
Given these use cases, some interesting questions to explore are:
- Fundamentally, how well does a stream processing paradigm speak to these use cases? (I believe it does quite well. [1, 2, 3])
- How about Flink and its ecosystem?
- From a technological lens: how are performance, scalability, and fault tolerance?
- From a usability lens: what types of personas might be successful using various types of solutions? For example, how easy to use and how powerful are Flink's Table API and SQL Client, compared with its more expressive lower-level APIs? And which personas might be good fits for each?
Build and bring up the system
docker-compose build
docker-compose up
For some visibility into the Flink system, Flink provides a web UI. To check it out, visit: http://localhost:8081/#/overview
Log into the source-db1 psql shell
docker-compose exec source-db1 psql experiment experiment
Start the Flink SQL client
docker-compose exec sql-client ./sql-client.sh
Define a Dynamic Table using the source-db1 database's users table. See more connector configuration options here.
-- Flink SQL Client
CREATE TABLE source_db1_users (
id BIGINT NOT NULL,
full_name STRING
) WITH (
'connector' = 'postgres-cdc',
'decoding.plugin.name' = 'pgoutput',
'hostname' = 'source-db1',
'port' = '5432',
'username' = 'experiment',
'password' = 'experiment',
'database-name' = 'experiment',
'schema-name' = 'public',
'table-name' = 'users'
);
In the source-db1 psql shell, examine the replication slots
-- source-db1 psql
SELECT * FROM pg_replication_slots;
-- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-- -----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
-- (0 rows)
Notice that there are no replication slots, yet.
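Logical decoding, and therefore the pgoutput plugin used above, requires wal_level = logical, and two other server limits come into play later in this walkthrough. A quick way to check all three from psql; the values you see depend on how docker-compose.yaml configures the containers:

```sql
-- source-db1 psql
-- wal_level must be 'logical' for the postgres-cdc connector to create its slot;
-- max_wal_senders and max_replication_slots cap concurrent CDC streams.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_wal_senders', 'max_replication_slots');
```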
Now start a Continuous Query via the Flink SQL CLI that connects to source-db1.
-- Flink SQL Client
SELECT * FROM source_db1_users;
In the source-db1 psql shell, examine the replication slots again
-- source-db1 psql
SELECT * FROM pg_replication_slots;
-- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-- -----------+----------+-----------+--------+------------+-----------+--------+------------+------+--------------+-------------+---------------------
-- flink | pgoutput | logical | 16384 | experiment | f | f | | | 560 | 0/1660A30 | 0/1660A68
-- (1 row)
Notice that a replication slot named flink was created.
And insert a row into the users table
-- source-db1 psql
INSERT INTO users (full_name) VALUES ('susan smith');
Notice that the query in the Flink SQL client window now shows this new row!
Update the row
-- source-db1 psql
UPDATE users SET full_name = 'susanna smith' WHERE id = 1;
Notice that the query in the Flink SQL client window now shows the updated value!
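How much of the old row Postgres writes to the WAL for an UPDATE or DELETE is governed by the table's replica identity; by default only the old primary-key values are logged. If full before-images matter to a consumer (for example, tables without a primary key), REPLICA IDENTITY FULL is the Postgres-side knob, at the cost of more WAL. Whether flink-cdc-connectors v1.2.0 needs this for correct results is an assumption not verified in this experiment. A sketch for inspecting and changing the setting:

```sql
-- source-db1 psql
-- relreplident: d = default (primary key), f = full, i = index, n = nothing
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.users'::regclass;

-- Log the complete old row for UPDATE and DELETE (more WAL volume)
ALTER TABLE users REPLICA IDENTITY FULL;
```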
Set up another Dynamic Table, via a connector to another Postgres database, source-db2
-- Flink SQL Client
CREATE TABLE source_db2_users_favorite_color (
id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
favorite_color STRING
) WITH (
'connector' = 'postgres-cdc',
'decoding.plugin.name' = 'pgoutput',
'hostname' = 'source-db2',
'port' = '5432',
'username' = 'experiment',
'password' = 'experiment',
'database-name' = 'experiment',
'schema-name' = 'public',
'table-name' = 'users_favorite_color'
);
Try a query that joins data from the two source tables
-- Flink SQL Client
SELECT * FROM source_db1_users AS u
LEFT JOIN source_db2_users_favorite_color AS ufc
ON u.id = ufc.user_id;
-- id full_name id0 user_id favorite_color
-- 1 susanna smith (NULL) (NULL) (NULL)
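The SQL client's default result view shows the materialized result of the join. Switching it to changelog mode shows the individual change rows instead, which makes the update semantics of a continuous LEFT JOIN visible. A sketch, assuming the Flink 1.12-era SQL client that flink-cdc-connectors v1.2.0 targets:

```sql
-- Flink SQL Client (Flink 1.12-style SET; Flink 1.13+ uses
-- SET 'sql-client.execution.result-mode' = 'changelog';)
SET execution.result-mode=changelog;

SELECT * FROM source_db1_users AS u
LEFT JOIN source_db2_users_favorite_color AS ufc
ON u.id = ufc.user_id;
```

With this query running, inserting a favorite color for user 1 in source-db2 should show the NULL-padded row being retracted and the joined row being added; that retract/add pair is what the sink receives later on. SET execution.result-mode=table switches back.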
Now let's create a JDBC sink that connects to the sink-db1 database's users_full_name_and_favorite_color table.
-- Flink SQL Client
CREATE TABLE sink_db1_users_full_name_and_favorite_color (
id BIGINT NOT NULL,
full_name STRING,
favorite_color STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://sink-db1:5432/experiment',
'username' = 'experiment',
'password' = 'experiment',
'table-name' = 'users_full_name_and_favorite_color'
);
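Because the Flink table declares PRIMARY KEY (id) NOT ENFORCED, the JDBC sink writes in upsert mode, which for Postgres is an INSERT ... ON CONFLICT (id) DO UPDATE statement (visible in the task manager log near the end of this walkthrough). Postgres only accepts ON CONFLICT (id) if id is backed by a unique index or constraint, so the physical sink table needs one. A sketch of a compatible sink-db1 table; the column types are assumptions, and the repo's own init script may differ:

```sql
-- sink-db1 psql (hypothetical DDL)
CREATE TABLE IF NOT EXISTS users_full_name_and_favorite_color (
  id             BIGINT PRIMARY KEY,  -- unique key required by ON CONFLICT (id)
  full_name      VARCHAR,
  favorite_color VARCHAR
);
```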
Let's examine what's at the sink table.
-- sink-db1 psql, docker-compose exec sink-db1 psql experiment experiment
SELECT * FROM users_full_name_and_favorite_color;
-- id | full_name | favorite_color
-- ----+-----------+----------------
-- (0 rows)
It's empty, because nothing has been written to it yet.
Now let's write to the sink using data joined from source-db1 and source-db2. We can do this via a Continuous Query performing an INSERT into the sink.
-- Flink SQL Client
INSERT INTO sink_db1_users_full_name_and_favorite_color (id, full_name, favorite_color)
SELECT
u.id,
u.full_name,
ufc.favorite_color
FROM source_db1_users AS u
LEFT JOIN source_db2_users_favorite_color AS ufc
ON u.id = ufc.user_id;
Check the results at sink-db1
-- sink-db1 psql
SELECT * FROM users_full_name_and_favorite_color;
-- id | full_name | favorite_color
-- ----+---------------+----------------
-- 1 | susanna smith |
-- (1 row)
Insert some new data into source-db2
-- source-db2 psql, docker-compose exec source-db2 psql experiment experiment
INSERT INTO users_favorite_color (user_id, favorite_color) VALUES (1, 'blue');
Check out the results at sink-db1 again
-- sink-db1 psql
SELECT * FROM users_full_name_and_favorite_color;
-- id | full_name | favorite_color
-- ----+---------------+----------------
-- 1 | susanna smith | blue
-- (1 row)
Try out a few updates at the source databases
-- source-db1 psql
UPDATE users SET full_name = 'sue smith' WHERE id = 1;
INSERT INTO users (full_name) VALUES ('bob smith');
-- source-db2 psql
INSERT INTO users_favorite_color (user_id, favorite_color) VALUES (2, 'red');
And refresh the sink...
-- sink-db1 psql
SELECT * FROM users_full_name_and_favorite_color;
-- id | full_name | favorite_color
-- ----+-----------+----------------
-- 1 | sue smith | blue
-- 2 | bob smith | red
-- (2 rows)
Next, let's try adding a concurrent stream from source-db1 in another Flink SQL client session.
Bring up another Flink SQL client session
docker-compose exec sql-client ./sql-client.sh
Set up another source connection
-- Flink SQL Client
CREATE TABLE source_db1_test (
id BIGINT NOT NULL,
hello STRING
) WITH (
'connector' = 'postgres-cdc',
'decoding.plugin.name' = 'pgoutput',
'hostname' = 'source-db1',
'port' = '5432',
'username' = 'experiment',
'password' = 'experiment',
'database-name' = 'experiment',
'schema-name' = 'public',
'table-name' = 'test'
);
SELECT * FROM source_db1_test;
-- [ERROR] Could not execute SQL statement. Reason:
-- org.postgresql.util.PSQLException: FATAL: number of requested standby connections exceeds max_wal_senders (currently 1)
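This is Postgres refusing another replication connection: each running CDC source holds a replication (walsender) connection, and max_wal_senders caps how many can exist at once. The steps below raise the limit in docker-compose.yaml. As an alternative sketch, the same settings can be persisted from psql with ALTER SYSTEM, assuming the experiment role is a superuser (as the official image makes POSTGRES_USER). Both settings need a server restart; values written this way live in the data directory, so they are lost if the container is recreated without a volume, and any -c options on the postgres command line in docker-compose.yaml take precedence over them:

```sql
-- source-db1 psql (repeat on source-db2); requires superuser
ALTER SYSTEM SET max_wal_senders = 10;
ALTER SYSTEM SET max_replication_slots = 10;
-- Both parameters only take effect after a restart, e.g.
-- docker-compose restart source-db1
```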
Set Postgres max_wal_senders=10 at the source databases (in docker-compose.yaml), restart the system, and try again
-- Flink SQL Client
CREATE TABLE source_db1_test (
id BIGINT NOT NULL,
hello STRING
) WITH (
'connector' = 'postgres-cdc',
'decoding.plugin.name' = 'pgoutput',
'hostname' = 'source-db1',
'port' = '5432',
'username' = 'experiment',
'password' = 'experiment',
'database-name' = 'experiment',
'schema-name' = 'public',
'table-name' = 'test'
);
SELECT * FROM source_db1_test;
-- [ERROR] Could not execute SQL statement. Reason:
-- org.postgresql.util.PSQLException: ERROR: replication slot "flink" is active for PID 34
Takeaway: A single Postgres replication slot cannot be shared concurrently.
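To see who holds a slot, join pg_replication_slots with pg_stat_replication. The active_pid column should match the PID in the error above (34 here), which is the walsender process serving the first Flink job:

```sql
-- source-db1 psql
SELECT s.slot_name,
       s.active,
       s.active_pid,
       r.application_name,
       r.state
FROM pg_replication_slots AS s
LEFT JOIN pg_stat_replication AS r ON r.pid = s.active_pid;
```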
OK then, let's try setting up another replication slot.
Set Postgres max_replication_slots=10 at the source databases (in docker-compose.yaml), restart the system, and try again
CREATE TABLE source_db1_test (
id BIGINT NOT NULL,
hello STRING
) WITH (
'connector' = 'postgres-cdc',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'flink2', -- ADDED THIS
'hostname' = 'source-db1',
'port' = '5432',
'username' = 'experiment',
'password' = 'experiment',
'database-name' = 'experiment',
'schema-name' = 'public',
'table-name' = 'test'
);
select * from source_db1_test;
-- [ERROR] Could not execute SQL statement. Reason:
-- org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'postgres-cdc'.
-- Unsupported options:
-- slot.name
-- Supported options:
-- connector
-- database-name
-- decoding.plugin.name
-- hostname
-- password
-- port
-- property-version
-- schema-name
-- table-name
-- username
Takeaway: The bad news is that we need to provide a unique slot name, but cannot yet do so as of v1.2.0 of flink-cdc-connectors. The good news is that the slot.name config is already in the master branch.
Instead of setting up each JDBC sink table individually, the Flink JDBC connector can be set up to access and use Postgres' catalog.
First, if it's still running, stop the old INSERT job via Flink's web interface (http://localhost:8081/#/job/running)
Now set up a new connection via Postgres' catalog.
-- Flink SQL Client
-- Drop the old sink table in Flink's catalog
DROP TABLE sink_db1_users_full_name_and_favorite_color;
-- Connect to Postgres catalog in Flink
CREATE CATALOG sink_db1 WITH (
'type'='jdbc',
'property-version'='1',
'base-url'='jdbc:postgresql://sink-db1:5432/',
'default-database'='experiment',
'username'='experiment',
'password'='experiment'
);
-- Access sink-db1 via Flink's connection to Postgres
SELECT * FROM sink_db1.experiment.users_full_name_and_favorite_color;
-- Create a new INSERT job
INSERT INTO sink_db1.experiment.users_full_name_and_favorite_color (id, full_name, favorite_color)
SELECT
u.id,
u.full_name,
ufc.favorite_color
FROM source_db1_users AS u
LEFT JOIN source_db2_users_favorite_color AS ufc
ON u.id = ufc.user_id;
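Once the catalog is registered, it can also be browsed from the SQL client. A sketch, assuming Flink 1.12-era SQL client statements; tables in Postgres's public schema can be referenced by bare name, while tables in other schemas need a quoted schema.table name:

```sql
-- Flink SQL Client
SHOW CATALOGS;
USE CATALOG sink_db1;   -- current database becomes the catalog's default ('experiment')
SHOW TABLES;
DESCRIBE users_full_name_and_favorite_color;
-- Switch back so unqualified names resolve to the tables defined earlier
USE CATALOG default_catalog;
```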
Take a look at the data at the sink
-- sink-db1 psql
SELECT * FROM users_full_name_and_favorite_color;
-- id | full_name | favorite_color
-- ----+-----------+----------------
-- 2 | bob smith | red
-- 1 | sue smith | blue
-- (2 rows)
Insert new data into a source
-- source-db1 psql
INSERT INTO users (full_name) VALUES ('anne smith');
Take a look at the data at the sink again
-- sink-db1 psql
SELECT * FROM users_full_name_and_favorite_color;
-- id | full_name | favorite_color
-- ----+-----------+----------------
-- 2 | bob smith | red
-- 1 | sue smith | blue
-- 3 | anne smith |
-- (3 rows)
The new entry should appear.
The Postgres catalog can also be used for table schema metadata when creating
tables in Flink's catalog, via CREATE TABLE...LIKE
-- Flink SQL Client
CREATE TABLE sink_db1_users_full_name_and_favorite_color
LIKE sink_db1.experiment.users_full_name_and_favorite_color (INCLUDING OPTIONS);
SELECT * FROM sink_db1_users_full_name_and_favorite_color;
Start a Flink Continuous Query
-- Flink SQL Client
SELECT * FROM source_db1_users;
Now, let's try adding a new column
-- source-db1 psql
ALTER TABLE users
ADD new_column VARCHAR;
The Flink query is still running and ok...
Let's insert some data
-- source-db1 psql
INSERT INTO users (full_name, new_column) VALUES ('fred smith', 'value for new column');
The Flink query is still running and ok...
Now let's try dropping a column that our Flink query was using.
-- source-db1 psql
ALTER TABLE users
DROP COLUMN full_name;
The Flink query is still running and ok...
Now insert some new data at the source.
-- source-db1 psql
INSERT INTO users (new_column) VALUES ('another value for new column');
Now the Flink query fails:
[ERROR] Could not execute SQL statement. Reason:
org.apache.kafka.connect.errors.DataException: full_name is not a valid field name
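The Flink table's schema is fixed when CREATE TABLE runs, so once the source column is gone, the decoded change records presumably no longer line up with what source_db1_users declares. Comparing the two sides makes the mismatch visible:

```sql
-- source-db1 psql: the columns the source table has now
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'users'
ORDER BY ordinal_position;

-- Flink SQL Client: the schema the Flink table was declared with (still includes full_name)
DESCRIBE source_db1_users;
```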
First, let's try adding a new column at the sink
-- sink-db1 psql
ALTER TABLE users_full_name_and_favorite_color
ADD COLUMN new_column VARCHAR;
Insert some data
-- source-db1 psql
INSERT INTO users (full_name) VALUES ('sally');
Try a Flink query
-- Flink SQL CLI
SELECT * FROM sink_db1_users_full_name_and_favorite_color;
The new row is there; everything still works.
Now try removing a column.
-- sink-db1 psql
ALTER TABLE users_full_name_and_favorite_color
DROP COLUMN full_name;
And insert some new data at a source
-- source-db1 psql
INSERT INTO users (full_name) VALUES ('bob');
Check out the Flink web UI (http://localhost:8081). The Flink job performing the INSERT keeps running, and the task manager logs show errors.
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - JDBC executeBatch error, retry times = 0
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO users_full_name_and_favorite_color(id, full_name, favorite_color) VALUES (4, 'sally', NULL) ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, full_name=EXCLUDED.full_name, favorite_color=EXCLUDED.favorite_color was aborted: ERROR: column "full_name" of relation "users_full_name_and_favorite_color" does not exist
The flink-cdc-connectors approach offers a nice, easy way to set up CDC. Conveniently, it does not require additional event log infrastructure (Kafka, Pulsar, etc.) in the system. That said, without that additional layer of indirection, some care and discipline is recommended when managing such a system. Some potentially dangerous things to manage carefully include:
- The Flink connector is able to create replication slots at the sources
- Replication slots that stagnate cause WAL files to accumulate at the sources, because Postgres retains all WAL that a slot has not yet consumed
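To keep an eye on the second risk, the amount of WAL each slot is pinning can be measured from psql. A sketch for PostgreSQL 10+ (on PostgreSQL 13+, the max_slot_wal_keep_size setting can additionally cap how much WAL a slot may retain):

```sql
-- source-db1 psql (function names as of PostgreSQL 10+)
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) DESC NULLS LAST;

-- An abandoned slot (active = f, and no job will ever resume from it) can be dropped
-- so Postgres may recycle the WAL it pins. Its stream position is lost permanently.
-- SELECT pg_drop_replication_slot('flink');
```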
Alternatively, in circumstances that call for better decoupling and independence between source owners and sink owners, another form of CDC connection, one that is more tightly controlled and produces more curated output, will be safer and more manageable. In other words: a more carefully curated "public API", with internal implementation details better shielded.
For example:
Source owners are responsible for:
Source
→ CDC
→ Message bus, raw CDC feed
→ Some process cleanses and prepares the stream for consumers
→ Message bus, cleansed stream; the stream is set up to be compacted w/ indefinite retention for active keys, and tombstoned keys are removed
And changelog consumers' usage will look like:
Message bus, cleansed stream
→ Consumer apps (Flink, plain old Kafka or Pulsar client, etc.) read and leverage the cleansed stream somehow
→ Sink
The message bus approach is explored more in experiment-flink-pulsar-debezium.
One challenge with a message-bus-middleware-based approach will be harmonizing bootstrapping/backfilling ("I need enough changelog data to rebuild state") with GDPR data deletion requirements ("There is some state that I want to remove everywhere").
Aside from the "encrypt and throw away the key" approach (which has its tradeoffs), there is another approach based on compaction + tombstones. While Kafka's approach to compaction (the most recent message per non-deleted key is retained forever, and tombstoned keys are deleted everywhere) should work for this purpose, Pulsar's approach to compaction (a separate compacted topic is maintained in parallel with the original non-compacted topic) is problematic until the ability to configure the lifecycle (i.e. retention policy) of the compacted and original topics independently is implemented. As of Pulsar 2.7.0, this capability is not yet available.
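The retention rule that makes compaction + tombstones work for both bootstrapping and deletion can be emulated over a plain table: keep the latest message per key, then drop keys whose latest message is a tombstone. A sketch in Postgres SQL using a hypothetical changelog table; it shows the end state compaction converges to, not how Kafka implements it (Kafka compacts lazily and keeps tombstones for delete.retention.ms before removing them):

```sql
-- Any Postgres psql session; a temp table, so nothing persists.
-- Hypothetical changelog: one row per message, in log order.
CREATE TEMP TABLE changelog (
  log_offset BIGINT PRIMARY KEY,  -- position in the log
  msg_key    TEXT NOT NULL,       -- message key (e.g. the user id)
  msg_value  TEXT                 -- NULL marks a tombstone (delete)
);

INSERT INTO changelog (log_offset, msg_key, msg_value) VALUES
  (1, 'user-1', 'susan smith'),
  (2, 'user-2', 'bob smith'),
  (3, 'user-1', 'sue smith'),
  (4, 'user-2', NULL);  -- user-2 asked to be forgotten

-- Latest message per key, then drop keys whose latest message is a tombstone
SELECT msg_key, msg_value
FROM (
  SELECT DISTINCT ON (msg_key) msg_key, msg_value
  FROM changelog
  ORDER BY msg_key, log_offset DESC
) AS latest
WHERE msg_value IS NOT NULL;
-- msg_key | msg_value
-- user-1  | sue smith
```

A new consumer bootstrapping from such a stream sees exactly this end state: enough to rebuild current state, with nothing left of user-2.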
CDC Source
- https://github.com/ververica/flink-cdc-connectors
- https://github.com/morsapaes/flink-sql-CDC
- https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html
JDBC Sink
- How might we consolidate/merge multiple (logically same, but physically independent) source tables from distinct Postgres nodes and schemas into one logical dynamic table? For example: With a Postgres + schema per tenant database structure. Also, an analogous demuxing at a sink.
- Given a spectrum of personas ranging in technical proficiency - how might we potentially productize and platform(ize) solutions for less technical personas? For example, the range of those that don't want to think about: Java/Scala, deployment, operations, ... to even SQL?
- Performance?
- Fault tolerance? What if a source goes down? What if a sink goes down? What if Flink goes down?
- State management best practices? State migrations?
- What is the impact of continuous joins on the size of Flink state?
- Governance and access management of sources and sinks?
- Elasticsearch sink
- Temporal joins, lookup cache - https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html#lookup-cache
- Deployment