An exploration of Flink + Pulsar + Debezium. This exploration primarily leverages the Flink Table API via the Flink SQL Client.
The following tools were used in this exploration:
- Flink Pulsar connector: to connect Flink to Pulsar
- Flink Debezium format: to interpret the Debezium data format
- Pulsar IO CDC Debezium source connector: to change-data-capture a changelog from a Postgres database into Pulsar
- Flink SQL Client: to easily work with Flink SQL without needing to write a Flink program in Java/Scala
- PostgreSQL: to play the role of a standard relational database, acting as a source or sink
And, here's another exploration that omits messaging middleware altogether: experiment-flink-cdc-connectors
The general theme of "I want to get state from Point-A to Point-B, maybe transform it along the way, and continue to keep it updated, in near real-time" is a fairly common story that can take a variety of forms.
- data integration amongst microservices
- analytic datastore loading and updating
- cache maintenance
- search index syncing
Given these use cases, some interesting questions to explore are:
- Fundamentally, how well does a stream processing paradigm speak to these use cases? (I believe it does quite well. [1, 2, 3])
- How about Flink and its ecosystem?
- From a technological lens: how do performance, scalability, and fault tolerance stack up?
- From a usability lens: what types of personas might be successful using various types of solutions? For example, how easy to use and how powerful are Flink's Table API and SQL Client compared to its more expressive, lower-level APIs? And what types of personas might be good fits for each?
We'll start with a system like this
Then we'll build up a system like this - layering in CDC via Debezium.
Build and start the system
docker-compose build
docker-compose up
Start Flink SQL Client
docker-compose exec flink-sql-client ./sql-client.sh
In the Flink SQL Client, define a Dynamic Table.
-- Flink SQL Client
CREATE TABLE pulsartest (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/avrotest',
'key.format' = 'raw',
'key.fields' = 'key',
'value.format' = 'avro',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'scan.startup.mode' = 'earliest'
);
SELECT * FROM pulsartest;
INSERT INTO pulsartest
VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE);
SELECT * FROM pulsartest;
List Pulsar topics
docker-compose exec pulsar /pulsar/bin/pulsar-admin topics list public/default
# persistent://public/default/avrotest
Subscribe to the Pulsar topic
docker-compose exec pulsar /pulsar/bin/pulsar-client consume -s "mysubscription" persistent://public/default/avrotest -n 0
In the Flink SQL Client, insert a few more rows and observe what happens. Notice that the message data is binary. This is because the `value.format` we used is `avro`.
Now let's try `value.format` as `json`.
-- Flink SQL Client
CREATE TABLE pulsarjsontest (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/jsontest', -- NOTE
'key.format' = 'raw',
'key.fields' = 'key',
'value.format' = 'json', -- NOTE
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'scan.startup.mode' = 'earliest'
);
Subscribe a Pulsar consumer to the new topic
docker-compose exec pulsar /pulsar/bin/pulsar-client consume -s "mysubscription" persistent://public/default/jsontest -n 0
Insert data
-- Flink SQL Client
INSERT INTO pulsarjsontest
VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE);
The Pulsar consumer output now looks like this
----- got message -----
key:[a2V5MQ==], properties:[k11=v11, k12=v12], content:{"physical_1":"data 1","physical_2":1,"key":"key1","physical_3":true}
----- got message -----
key:[a2V5Mg==], properties:[k21=v21, k22=v22], content:{"physical_1":"data 2","physical_2":2,"key":"key2","physical_3":false}
----- got message -----
key:[a2V5Mw==], properties:[k31=v31, k32=v32], content:{"physical_1":"data 3","physical_2":3,"key":"key3","physical_3":true}
We can write directly to Pulsar using other means, too. For example, using the Pulsar client CLI.
# get into pulsar container shell
docker-compose exec pulsar bash
# use pulsar-client to send a message into the topic
./bin/pulsar-client produce persistent://public/default/jsontest -f <(echo '{"physical_1":"data 1","physical_2":1,"key":"key1","physical_3":true}')
Try a Flink query and see the new row appear
-- Flink SQL Client
SELECT * from pulsarjsontest;
The `pulsar` connector simply appends rows. If you'd like your table to update existing rows, you can leverage the `upsert-pulsar` connector.
-- Flink SQL Client
CREATE TABLE pulsarjsontestupsert (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN,
PRIMARY KEY (key) NOT ENFORCED -- NOTE: added this
) WITH (
'connector' = 'upsert-pulsar', -- NOTE: this changed from pulsar to upsert-pulsar
'topic' = 'persistent://public/default/jsontest',
'key.format' = 'raw',
'value.format' = 'json',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080'
);
For reference, the following properties are available (a usage sketch follows the list):
- admin-url
- connector
- key.fields-prefix
- key.format
- key.raw.charset
- key.raw.endianness
- properties
- property-version
- service-url
- sink.parallelism
- topic
- value.fields-include
- value.format
- value.json.fail-on-missing-field
- value.json.ignore-parse-errors
- value.json.map-null-key.literal
- value.json.map-null-key.mode
- value.json.timestamp-format.standard
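To make a few of these concrete, here is an illustrative sketch of how some of them might be set. The table name is made up, and the option values shown ('ALL', 'ISO-8601', etc.) are assumptions based on the analogous Flink Kafka/JSON format options, not values verified against this connector version.
-- Flink SQL Client (sketch only; option values are assumptions)
CREATE TABLE pulsarjsontestupsert_tuned (
`key` STRING,
`physical_1` STRING,
`physical_3` BOOLEAN,
PRIMARY KEY (key) NOT ENFORCED
) WITH (
'connector' = 'upsert-pulsar',
'topic' = 'persistent://public/default/jsontest',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'key.format' = 'raw',
'value.format' = 'json',
'value.fields-include' = 'ALL', -- assumed: also serialize the key column into the value
'value.json.ignore-parse-errors' = 'true', -- skip malformed rows instead of failing the job
'value.json.timestamp-format.standard' = 'ISO-8601',
'sink.parallelism' = '1' -- parallelism of the sink operator
);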
Try a Flink SQL query with the new table
-- Flink SQL Client
SELECT * FROM pulsarjsontestupsert;
Insert and update via Pulsar messages
# pulsar container shell, docker-compose exec pulsar bash
# these should translate into inserts
./bin/pulsar-client produce persistent://public/default/jsontest -f <(echo '{"physical_1":"data 1","physical_2":100,"key":"key1","physical_3":false}')
./bin/pulsar-client produce persistent://public/default/jsontest -f <(echo '{"physical_1":"data 1","physical_2":100,"key":"key2","physical_3":false}')
# this should translate into an update
./bin/pulsar-client produce persistent://public/default/jsontest -f <(echo '{"physical_1":"updated","physical_2":1,"key":"key1","physical_3":true}')
# TODO: How do we delete a row? These don't seem to work. Look more into this
# ./bin/pulsar-client produce persistent://public/default/jsontest -k key2 -f <(echo)
# ./bin/pulsar-client produce persistent://public/default/jsontest -k key3 -f <(echo)
As of v2.7.5.2 of pulsar-flink, the `upsert-pulsar` connector doesn't seem to work with multiple topics via `topic-pattern`.
-- Flink SQL Client
CREATE TABLE pulsarmultitopictest (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN,
PRIMARY KEY (key) NOT ENFORCED
) WITH (
'connector' = 'upsert-pulsar', -- NOTE
'topic-pattern' = 'persistent://public/default/multitopictest.*', -- NOTE
'key.format' = 'raw',
'value.format' = 'json',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080'
);
SELECT * FROM pulsarmultitopictest;
-- [ERROR] Could not execute SQL statement. Reason:
-- org.apache.flink.table.api.ValidationException: One or more required options are missing.
-- Missing required options are:
-- topic
The appending `pulsar` connector seems to work with `topic-pattern`, though.
-- Flink SQL Client
CREATE TABLE pulsarmultitopictest (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN
) WITH (
'connector' = 'pulsar', -- NOTE
'topic-pattern' = 'persistent://public/default/multitopictest.*', -- NOTE
'key.format' = 'raw',
'key.fields' = 'key',
'value.format' = 'json',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'scan.startup.mode' = 'earliest'
);
SELECT * FROM pulsarmultitopictest;
Now send some messages to various topics that match `topic-pattern`.
# inside Pulsar container, docker-compose exec pulsar bash
./bin/pulsar-client produce persistent://public/default/multitopictest1 -f <(echo '{"physical_1":"data 1","physical_2":100,"key":"key1","physical_3":false}')
./bin/pulsar-client produce persistent://public/default/multitopictest2 -f <(echo '{"physical_1":"data 1","physical_2":100,"key":"key1","physical_3":false}')
./bin/pulsar-client produce persistent://public/default/multitopictest3 -f <(echo '{"physical_1":"data 1","physical_2":100,"key":"key1","physical_3":false}')
Note that the Flink SQL query does NOT return these rows.
Stop the query and run it again
-- Flink SQL Client
SELECT * FROM pulsarmultitopictest;
Now the `topic-pattern` detects the matched topics and displays the rows.
Takeaway: the appending `pulsar` connector can source from multiple topics. However, topic detection appears to happen only at job startup.
The Flink Debezium value format is intended to interpret the data format that Debezium provides.
To experiment with this, let's bring up another piece of the system (Pulsar IO Postgres Source Connector) so that our system now looks like this.
Start the Pulsar IO Postgres Debezium source connector in the running Pulsar container. This will start the Postgres → Pulsar change-data-capture connection.
docker-compose exec pulsar /pulsar/bin/pulsar-admin source localrun --source-config-file /debezium-postgres-source-config.yaml
Note: This leverages "local run mode". In production deployments, you'd likely want to leverage "cluster mode" instead; a rough sketch follows.
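For reference, a cluster-mode deployment submits the connector to the Pulsar cluster rather than running it inside the admin CLI process. A rough sketch, assuming the same config file (double-check the exact subcommands against your Pulsar version):
# submit the Debezium source to run on the Pulsar cluster (instead of localrun)
docker-compose exec pulsar /pulsar/bin/pulsar-admin source create --source-config-file /debezium-postgres-source-config.yaml
# list the sources registered in the public/default namespace
docker-compose exec pulsar /pulsar/bin/pulsar-admin source list --tenant public --namespace default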
List Pulsar topics
docker-compose exec pulsar /pulsar/bin/pulsar-admin topics list public/default
# "persistent://public/default/source-db1.public.users"
# "persistent://public/default/debezium-postgres-source-debezium-offset-topic"
# "persistent://public/default/debezium-postgres-topic"
Insert some data into Postgres
-- source-db1 psql, docker-compose exec source-db1 psql experiment experiment
INSERT INTO users (full_name) VALUES ('bob');
For reference: a Debezium message looks like this:
{
"before": null,
"after": {
"id": 2,
"full_name": "bob"
},
"source": {
"version": "1.0.0.Final",
"connector": "postgresql",
"name": "source-db1",
"ts_ms": 1615476804970,
"snapshot": "false",
"db": "experiment",
"schema": "public",
"table": "users",
"txId": 561,
"lsn": 23439752,
"xmin": null
},
"op": "c",
"ts_ms": 1615476806822
}
Set up Flink dynamic table
-- Flink SQL Client
CREATE TABLE pulsardebeziumtest (
`full_name` STRING,
`id` BIGINT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-pulsar', -- NOTE
'topic' = 'persistent://public/default/source-db1.public.users',
'key.format' = 'raw',
'value.format' = 'debezium-json', -- NOTE
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080'
);
SELECT * FROM pulsardebeziumtest;
-- [ERROR] Could not execute SQL statement. Reason:
-- org.apache.flink.table.api.ValidationException: 'upsert-Pulsar' connector doesn't support 'debezium-json' as value format, because 'debezium-json' is not in insert-only mode.
Note: The relevant pulsar-flink code is here.
Now, let's try the appending `pulsar` connector instead.
-- Flink SQL Client
CREATE TABLE pulsardebeziumappendtest (
`id` BIGINT,
`full_name` STRING,
`eventTime` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL,
`source_schema` STRING METADATA FROM 'value.source.schema' VIRTUAL,
`source_database` STRING METADATA FROM 'value.source.database' VIRTUAL,
`properties` MAP<STRING, STRING> METADATA,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL
) WITH (
'connector' = 'pulsar', -- NOTE
'topic' = 'persistent://public/default/source-db1.public.users',
'value.format' = 'debezium-json', -- NOTE
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'scan.startup.mode' = 'earliest'
);
For reference, the DynamicTableSource class org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource supports the following metadata keys for use as columns:
- value.schema
- value.ingestion-timestamp
- value.source.timestamp
- value.source.database
- value.source.schema
- value.source.table
- value.source.properties
- topic
- messageId
- sequenceId
- publishTime
- eventTime
- properties
See here for available Debezium format metadata and how to map them to table columns: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html#available-metadata
Now try again, this time using one table set up with the `pulsar` (append mode) connector to insert into another table set up with the `upsert-pulsar` (upsert mode) connector, using upsert-kafka as a reference (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/upsert-kafka.html).
-- Flink SQL Client
-- Create an append mode table with incoming data from Debezium
CREATE TABLE pulsardebeziumappendtest (
`id` BIGINT,
`full_name` STRING,
`eventTime` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL,
`source_schema` STRING METADATA FROM 'value.source.schema' VIRTUAL,
`source_database` STRING METADATA FROM 'value.source.database' VIRTUAL,
`properties` MAP<STRING, STRING> METADATA,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/source-db1.public.users',
'value.format' = 'debezium-json',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'scan.startup.mode' = 'earliest'
);
-- Create an upsert mode table
CREATE TABLE pulsardebeziumupsertest (
`id` BIGINT,
`full_name` STRING,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-pulsar',
'topic' = 'persistent://public/default/upserttest',
'key.format' = 'raw',
'value.format' = 'json',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080'
);
-- source-db1 psql
INSERT INTO users (full_name) VALUES ('bob');
INSERT INTO users (full_name) VALUES ('bob 2');
UPDATE users SET full_name = 'bobby' WHERE id = 1;
-- DELETE FROM users WHERE id = 1 -- TODO: this doesn't show up in pulsardebeziumappendtest. Look more into this.
-- Flink SQL Client
-- Get the most recent row per key from the append mode table
-- and use that as an insert query into the upsert mode table
--
-- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#deduplication
INSERT INTO pulsardebeziumupsertest (id, full_name, eventTime, properties)
SELECT
id,
full_name,
eventTime,
MAP['k11', 'v11', 'k12', 'v12'] -- TODO: placeholder; use proper values for this column
FROM (
SELECT
id,
full_name,
eventTime,
ROW_NUMBER() OVER ( -- ROW_NUMBER assigns a unique, sequential number to each row, starting with one.
PARTITION BY id -- the partitioning / deduplicate key
ORDER BY eventTime DESC -- the ordering column. It must be a time attribute
) as row_num
FROM pulsardebeziumappendtest)
WHERE row_num = 1;
SELECT * FROM pulsardebeziumupsertest;
-- source-db1 psql
-- INSERT works as expected
INSERT INTO users (full_name) VALUES ('bob');
INSERT INTO users (full_name) VALUES ('bob 2');
-- UPDATE works too
UPDATE users SET full_name = 'bobby' WHERE id = 1;
-- TODO: DELETE breaks the job, unfortunately.
-- I might be doing something weird, though. Look into this some more.
-- DELETE FROM users WHERE id = 1
-- Caused by: java.lang.RuntimeException: Can not retract a non-existent record. This should never happen.
Let's try setting up the connector with `value.format` set to `json`.
Again, for reference, a Debezium message looks like this:
{
"before": null,
"after": {
"id": 2,
"full_name": "bob"
},
"source": {
"version": "1.0.0.Final",
"connector": "postgresql",
"name": "source-db1",
"ts_ms": 1615476804970,
"snapshot": "false",
"db": "experiment",
"schema": "public",
"table": "users",
"txId": 561,
"lsn": 23439752,
"xmin": null
},
"op": "c",
"ts_ms": 1615476806822
}
-- Flink SQL Client
CREATE TABLE pulsardebeziumappendtest (
`before` STRING,
`after` STRING,
`op` STRING,
-- Accessing nested fields from the JSON
`source` ROW(`schema` STRING, `db` STRING, `name` STRING),
-- Flattening nested fields
`source_schema` AS source.schema,
`source_db` AS source.db,
`source_name` AS source.name,
-- Pulsar metadata fields
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/source-db1.public.users',
'value.format' = 'json', -- NOTE
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'scan.startup.mode' = 'earliest'
);
SELECT
*,
-- Nested fields can be accessed like this
source.schema,
source.db,
source.name
FROM pulsardebeziumappendtest;
One challenge with a message-bus-middleware-based approach will be harmonizing bootstrapping/backfilling ("I need enough changelog data to rebuild state") with GDPR data deletion requirements ("There is some state that I want to remove everywhere").
Aside from the "encrypt and throw away key" approach (which has its tradeoffs), there is another approach based on compaction + tombstones. While Kafka's approach to compaction (the most recent message per non-deleted key is retained forever, and tombstoned keys are deleted everywhere) should work for this purpose, Pulsar's approach to compaction (a separate compacted topic is maintained in parallel with the original non-compacted topic) is problematic until the ability to configure the lifecycle (i.e. retention policy) of both compacted and original topic independently is implemented. As of Pulsar 2.7.0, this capability is not yet available.
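For orientation, compaction in Pulsar is triggered per topic, either manually or automatically once a namespace-level backlog threshold is exceeded. A rough sketch of the relevant admin commands (topic name reused from earlier; this is not a full GDPR solution, for the reasons above):
# manually trigger compaction of a topic
docker-compose exec pulsar /pulsar/bin/pulsar-admin topics compact persistent://public/default/jsontest
# or let the broker trigger compaction automatically once a topic's backlog exceeds a size threshold
docker-compose exec pulsar /pulsar/bin/pulsar-admin namespaces set-compaction-threshold --threshold 100M public/default
Note that consumers only see the compacted view if they explicitly opt in (readCompacted), while the original, uncompacted data remains subject to the topic's normal retention policy.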
- Pulsar changelog topics + GDPR + lifecycle policies
- How might we consolidate/merge multiple (logically the same, but physically independent) source tables from distinct Postgres nodes and schemas into one logical dynamic table? For example: with a Postgres + schema-per-tenant database structure. Also, an analogous demuxing at a sink. (A rough sketch of one direction follows.)
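One rough sketch of a direction for the consolidation question, reusing pieces shown earlier (`topic-pattern` plus the `value.source.*` metadata). The topic naming below is an assumption (one Debezium changelog topic per tenant database, all matching a common pattern), and the topic-detection-at-startup caveat noted earlier would apply.
-- Flink SQL Client (sketch only; topic naming is an assumption)
CREATE TABLE all_tenants_users (
`id` BIGINT,
`full_name` STRING,
`source_database` STRING METADATA FROM 'value.source.database' VIRTUAL, -- which Postgres node
`source_schema` STRING METADATA FROM 'value.source.schema' VIRTUAL, -- which tenant schema
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL
) WITH (
'connector' = 'pulsar',
'topic-pattern' = 'persistent://public/default/source-db.*.public.users', -- all tenants' changelog topics
'value.format' = 'debezium-json',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
'scan.startup.mode' = 'earliest'
);
Downstream, these rows could then be deduplicated per (source_database, source_schema, id) and written into a single upsert table, along the lines of the pulsardebeziumupsertest example above.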
List Pulsar topics
docker-compose exec pulsar /pulsar/bin/pulsar-admin topics list public/default
Delete Pulsar topic
docker-compose exec pulsar /pulsar/bin/pulsar-admin topics delete persistent://public/default/source-db1.public.users
- Flink
- Pulsar
- Debezium
- Flink + Debezium
- Flink Debezium Format
- flink-cdc-connectors (not used in this exploration)
- FLIP-95
- FLIP-105
- Flink + Pulsar
- Flink SQL Client
- Pulsar IO CDC Debezium source connector