Reads row-level changes from CockroachDB then sends them to a message queue!
This app utilises Core changefeeds "which stream row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled".
- RabbitMQ
- CockroachDB
- The cursor store is read to check if a cursor is already stored.
- A changefeed is created with the `EXPERIMENTAL CHANGEFEED FOR {table}` expression with the following options:
  - `frequency` - this is how often a new cursor should be returned from CockroachDB.
  - `cursor` - this is the earliest time a change should be returned.
- Response rows are iterated to find rows that describe a change and rows which have the latest resolved cursor.
- If the row describes a change:
  - The arguments are parsed into the `table`, `key` and `value`.
  - These are wrapped into a new struct and encoded into JSON.
  - The new JSON payload is sent to the message queue.
- If the row is a new resolved cursor:
  - The `value` argument is parsed, as the others should be null.
  - The `resolved` JSON field is pulled from the `value` response.
  - This is inserted/updated in the cursor store.
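The row-handling steps above can be sketched roughly as follows. This is an illustrative sketch, not the code this app actually runs: `FeedRow`, `FeedEvent`, `classify` and `extract_resolved` are hypothetical names, the row is assumed to carry three nullable text columns, and the `resolved` field is extracted with a naive string search to keep the example dependency-free (a real implementation would more likely use a JSON parser).

```rust
/// One row of a changefeed response. Hypothetical shape: the three
/// columns (table, key, value) are assumed to be nullable text.
#[derive(Debug, Clone, PartialEq)]
struct FeedRow {
    table: Option<String>,
    key: Option<String>,
    value: Option<String>,
}

/// What a row turned out to describe.
#[derive(Debug, PartialEq)]
enum FeedEvent {
    /// A row-level change that should be published to the message queue.
    Change { table: String, key: String, value: String },
    /// A new resolved cursor that should be written to the cursor store.
    Resolved { cursor: String },
}

/// Naively pulls the string after `"resolved":` out of a JSON object.
/// Illustration only; it does not handle escaped quotes or nesting.
fn extract_resolved(value: &str) -> Option<String> {
    let marker = "\"resolved\"";
    let after_key = &value[value.find(marker)? + marker.len()..];
    let after_colon = after_key.trim_start().strip_prefix(':')?;
    let after_quote = after_colon.trim_start().strip_prefix('"')?;
    let end = after_quote.find('"')?;
    Some(after_quote[..end].to_string())
}

/// Decides whether a row is a change, a resolved cursor, or neither.
fn classify(row: FeedRow) -> Option<FeedEvent> {
    match (row.table, row.key, row.value) {
        // All three columns present: a row-level change.
        (Some(table), Some(key), Some(value)) => {
            Some(FeedEvent::Change { table, key, value })
        }
        // Only `value` present: expected to carry the resolved cursor.
        (None, None, Some(value)) => {
            extract_resolved(&value).map(|cursor| FeedEvent::Resolved { cursor })
        }
        // Anything else is ignored in this sketch.
        _ => None,
    }
}
```

A caller would loop over the response rows, publish every `FeedEvent::Change`, and upsert the cursor from every `FeedEvent::Resolved`.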
SQL:
CREATE TABLE foo (a INT PRIMARY KEY, b int);
INSERT INTO foo VALUES (54);
Published message:
{"table":"foo","key":"[54]","value":{"after": {"a": 54, "b": null}}}
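To make the shape of the published message concrete, here is a minimal sketch of how such a payload could be assembled. It is an assumption-laden illustration rather than the app's real encoder: `encode_message` and `json_string` are hypothetical helpers, the `value` column is assumed to already be valid JSON and is embedded as-is, and hand-rolled escaping stands in for a proper JSON library.

```rust
/// Encodes a Rust string as a JSON string literal (quotes, backslashes
/// and control characters are escaped).
fn json_string(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len() + 2);
    out.push('"');
    for c in raw.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Wraps the three changefeed columns into one JSON object.
/// `value_json` is assumed to be valid JSON already and is not re-encoded.
fn encode_message(table: &str, key: &str, value_json: &str) -> String {
    format!(
        "{{\"table\":{},\"key\":{},\"value\":{}}}",
        json_string(table),
        json_string(key),
        value_json
    )
}
```

Calling `encode_message("foo", "[54]", ...)` with the `value` column from the example reproduces the published message shown above: `key` stays a JSON string while `value` is a nested object.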
Tested on Rust version `1.46.0-nightly (346aec9b0 2020-07-11)`.
docker run duccos/crdb-changefeed-publisher:latest
Example deployments can be found in the .kube directory.
git clone git@github.com:ducc/crdb-changefeed-publisher.git
cd crdb-changefeed-publisher
cargo build --release
{environment vars} target/release/crdb-changefeed-publisher
Argument | Help | Options | Default |
---|---|---|---|
--help | Shows available arguments | ||
--version | Shows the running version | ||
--table | Name of the table to watch changes in | Table name e.g. foo | |
--queue | The message queue to send row changes to | rabbitmq | rabbitmq |
--cursor-store | Where cursor values should be stored | cockroachdb | cockroachdb |
--cursor-frequency | How often cursors should be received | Duration e.g. 10s | 10s |
Variable | Help | Options | Default |
---|---|---|---|
RUST_LOG | Logging level | trace, debug, info, error | info |
PROMETHEUS_ADDR | Address of the Prometheus server | ip:port | 0.0.0.0:8001 |
DATABASE_URL | URL of the cockroachdb server | postgresql://user:pass@ip:port/database | |
AMQP_ADDR | RabbitMQ server address | amqp://ip:port | |
AMQP_QUEUE | RabbitMQ queue topic name | e.g. changes | |
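As a rough illustration of how the variables in this table might be consumed, the sketch below reads them with the listed default for `PROMETHEUS_ADDR`. It assumes that variables without a default are required, which the table implies but does not state. `Settings`, `load_settings` and `settings_from_env` are hypothetical names, not the app's own; `RUST_LOG` is left out because it is normally read by the logging library.

```rust
use std::env;

/// Hypothetical container for the environment-driven settings.
#[derive(Debug, PartialEq)]
struct Settings {
    prometheus_addr: String,
    database_url: String,
    amqp_addr: String,
    amqp_queue: String,
}

/// Builds `Settings` from any variable lookup, so the logic can be
/// exercised without touching the real process environment.
fn load_settings<F>(lookup: F) -> Result<Settings, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Assumption: variables with no default in the table are required.
    let required = |name: &str| {
        lookup(name).ok_or_else(|| format!("missing required environment variable {}", name))
    };
    Ok(Settings {
        // Default taken from the table above.
        prometheus_addr: lookup("PROMETHEUS_ADDR")
            .unwrap_or_else(|| "0.0.0.0:8001".to_string()),
        database_url: required("DATABASE_URL")?,
        amqp_addr: required("AMQP_ADDR")?,
        amqp_queue: required("AMQP_QUEUE")?,
    })
}

/// Convenience wrapper that reads from the process environment.
fn settings_from_env() -> Result<Settings, String> {
    load_settings(|name| env::var(name).ok())
}
```

Passing the lookup as a closure is a design choice made for this example only; it keeps the defaulting rules testable in isolation.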
Prometheus metrics are exposed on the PROMETHEUS_ADDR env var address at /metrics.
Metric | Type |
---|---|
rabbitmq_messages_sent | Counter |
Licensed under MIT. See the LICENSE file in the repository root for the full text. This is a third-party application and is not affiliated with CockroachDB. It is not a direct replacement for the Enterprise Changefeeds offering.
Feel free to submit a merge request! Your changes MUST be submitted under the MIT license.