clickhouse-sink-connector

Replicate data from MySQL, Postgres and MongoDB to ClickHouse


Altinity Replicator for ClickHouse (Lightweight version)

A new tool to replicate data from MySQL, PostgreSQL, MariaDB, and MongoDB to ClickHouse without additional dependencies. It ships as a single, lightweight executable.

Supports DDL replication for MySQL sources.

Usage

From the command line:

Download the JAR file from the releases page.

Update the YAML configuration file (mysql_config.yaml):

database.hostname: "localhost"
database.port: "3306"
database.user: "root"
database.password: "root"
database.include.list: sbtest
#table.include.list: "sbtest1"
clickhouse.server.url: "localhost"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
clickhouse.server.port: "8123"
clickhouse.server.database: "test"
database.allowPublicKeyRetrieval: "true"
snapshot.mode: "schema_only"
connector.class: "io.debezium.connector.mysql.MySqlConnector"
offset.storage.file.filename: /data/offsets.dat
database.history.file.filename: /data/dbhistory.dat
schema.history.internal.file.filename: /data/schemahistory2.dat

Start the application:

java -jar clickhouse-debezium-embedded-1.0-SNAPSHOT.jar mysql_config.yaml

Configuration

| Configuration | Description |
|---------------|-------------|
| database.hostname | Source database hostname |
| database.port | Source database port number |
| database.user | Source database username |
| database.password | Source database password |
| database.include.list | List of databases to include in replication |
| table.include.list | List of tables to include in replication |
| clickhouse.server.url | ClickHouse URL |
| clickhouse.server.user | ClickHouse username |
| clickhouse.server.pass | ClickHouse password |
| clickhouse.server.port | ClickHouse port |
| clickhouse.server.database | ClickHouse destination database |
| snapshot.mode | "initial" -> data that already exists in the source database is replicated. "schema_only" -> only data added/modified after the connector starts is replicated. MySQL: https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-snapshot-mode, PostgreSQL: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-snapshot-mode, MongoDB ("initial" or "never"): https://debezium.io/documentation/reference/stable/connectors/mongodb.html |
| connector.class | MySQL -> "io.debezium.connector.mysql.MySqlConnector", PostgreSQL -> "io.debezium.connector.postgresql.PostgresConnector", MongoDB -> "io.debezium.connector.mongodb.MongoDbConnector" |
| offset.storage.file.filename | Offset storage file; stores the offsets of the source database (for MySQL: the binlog file and position, and the GTID set). Make sure this file is durable and not stored in a temporary directory. |
| database.history.file.filename | Database history file. Make sure this file is durable and not stored in a temporary directory. |
| schema.history.internal.file.filename | Schema history file. Make sure this file is durable and not stored in a temporary directory. |
| disable.ddl | Optional. Default: false. Set to true to disable DDL execution. |
| enable.ddl.snapshot | Optional. Default: false. If set to true, DDL received as part of the snapshot process is executed. By default, DROP/TRUNCATE statements received during the snapshot are not executed. |
| database.allowPublicKeyRetrieval | Optional, MySQL-specific: true/false |
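For example, a minimal PostgreSQL configuration might look like the following. This is a sketch only: hostnames, credentials, and file paths are placeholder assumptions, and this repository's example shows only the MySQL connector class, so verify the PostgreSQL-specific keys against the Debezium documentation.

# Hypothetical postgres_config.yaml; all values below are placeholders.
database.hostname: "localhost"
database.port: "5432"
database.user: "postgres"
database.password: "postgres"
database.include.list: public   # databases/schemas to replicate (assumed)
clickhouse.server.url: "localhost"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
clickhouse.server.port: "8123"
clickhouse.server.database: "test"
snapshot.mode: "initial"
connector.class: "io.debezium.connector.postgresql.PostgresConnector"
offset.storage.file.filename: /data/offsets.dat
database.history.file.filename: /data/dbhistory.dat
schema.history.internal.file.filename: /data/schemahistory.dat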
Docker

Images are published on GitLab:

registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest

See the Docker setup instructions.

Altinity Sink Connector for ClickHouse

The sink connector transfers data from Kafka to ClickHouse using the Kafka Connect framework.

Features

  • Inserts, updates, and deletes using ReplacingMergeTree
  • Auto-creation of tables in ClickHouse
  • Exactly-once semantics
  • Bulk inserts to ClickHouse
  • Storage of Kafka metadata in ClickHouse
  • Kafka topic to ClickHouse table mapping, for the case where a MySQL table needs to map to a ClickHouse table with a different name
  • Storage of raw data in JSON (for auditing purposes)
  • Monitoring: Grafana/Prometheus dashboard to track lag
  • Kafka offset management in ClickHouse
  • Increased parallelism (customizable thread pool for JDBC connections)

Tested Versions

| Component | Version (tested) |
|-----------|------------------|
| Redpanda | 22.1.3, 22.3.9 |
| Kafka Connect | 1.9.5.Final |
| Debezium | 2.1.0.Alpha1 |
| MySQL | 8.0 |
| ClickHouse | 22.9, 22.10 |
| PostgreSQL | 15 |

Quick Start (Docker-compose)

Docker image for the sink connector (updated December 12, 2022): altinity/clickhouse-sink-connector:latest (https://hub.docker.com/r/altinity/clickhouse-sink-connector)

Recommended Memory Limits

Production usage: in the docker-compose.yml file, it is recommended to set -Xmx to at least 5G (-Xmx5G) for both the Debezium and Sink containers when running in production, or if you encounter an out-of-memory/heap exception error:

- KAFKA_HEAP_OPTS=-Xms2G -Xmx5G
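For context, here is a minimal sketch of where that setting lives in docker-compose.yml; the service name debezium is an assumption, so match it to the services in your compose file:

# Illustrative fragment of docker-compose.yml (service name assumed).
services:
  debezium:
    environment:
      - KAFKA_HEAP_OPTS=-Xms2G -Xmx5G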

Kubernetes

Docker image for the sink connector (with Strimzi): https://hub.docker.com/repository/docker/subkanthi/clickhouse-kafka-sink-connector-strimzi

Docker image for the Debezium MySQL connector (with Strimzi): https://hub.docker.com/repository/docker/subkanthi/debezium-mysql-source-connector

It is recommended to set memory limits of at least 5Gi when running on Kubernetes with Strimzi, for example:

resources:
  limits:
    memory: 6Gi
  requests:
    memory: 6Gi
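In a Strimzi deployment, these limits sit under the KafkaConnect resource's spec. The following is a sketch only; the metadata name, replica count, and bootstrap address are assumptions to be adapted to your cluster:

# Illustrative Strimzi KafkaConnect resource (names and addresses assumed).
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: clickhouse-sink   # assumed name
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092   # assumed address
  image: altinity/clickhouse-sink-connector:latest
  resources:
    limits:
      memory: 6Gi
    requests:
      memory: 6Gi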

MySQL:

cd deploy/docker
./start-docker-compose.sh 

PostgreSQL:

export SINK_VERSION=latest
cd deploy/docker
docker-compose -f docker-compose.yaml -f docker-compose-postgresql.override.yaml up

For detailed setup instructions, see Setup.

Development:

Requirements: a JDK and Maven. Build the project with:

mvn install -DskipTests=true

Data Types

Note: Using float data types is strongly discouraged because of how ClickHouse handles floating-point precision; Decimal is a better choice.

| MySQL | Kafka Connect | ClickHouse |
|-------|---------------|------------|
| Bigint | INT64_SCHEMA | Int64 |
| Bigint Unsigned | INT64_SCHEMA | UInt64 |
| Blob | | String + hex |
| Char | String | String / LowCardinality(String) |
| Date | Schema: INT64, Name: debezium.Date | Date(6) |
| DateTime(6) | Schema: INT64, Name: debezium.Timestamp | DateTime64(6) |
| Decimal(30,12) | Schema: Bytes, Name: kafka.connect.data.Decimal | Decimal(30,12) |
| Double | | Float64 |
| Int | INT32 | Int32 |
| Int Unsigned | INT64 | UInt32 |
| Longblob | | String + hex |
| Mediumblob | | String + hex |
| Mediumint | INT32 | Int32 |
| Mediumint Unsigned | INT32 | UInt32 |
| Smallint | INT16 | Int16 |
| Smallint Unsigned | INT32 | UInt16 |
| Text | String | String |
| Time | | String |
| Time(6) | | String |
| Timestamp | | DateTime64 |
| Tinyint | INT16 | Int8 |
| Tinyint Unsigned | INT16 | UInt8 |
| varbinary(*) | | String + hex |
| varchar(*) | | String |
| JSON | | String |
| BYTES | BYTES, io.debezium.bits | String |
| YEAR | INT32 | Int32 |
| GEOMETRY | Binary of WKB | String |

Sink Connector Configuration

| Property | Default | Description |
|----------|---------|-------------|
| tasks.max | No | Number of SinkConnector tasks (essentially threads); ideally the same as the number of Kafka partitions. |
| topics.regex | No | Regex of matching topics. Example: "SERVER5432.test.(.*)" matches SERVER5432.test.employees and SERVER5432.test.products. |
| topics | No | The list of topics. Either topics or topics.regex has to be provided. |
| clickhouse.server.url | | ClickHouse server URL |
| clickhouse.server.user | | ClickHouse server username |
| clickhouse.server.pass | | ClickHouse server password |
| clickhouse.server.database | | ClickHouse database name |
| clickhouse.server.port | 8123 | ClickHouse server port |
| clickhouse.topic2table.map | No | Map of Kafka topics to table names, <topic_name1>:<table_name1>,<topic_name2>:<table_name2>. Overrides the default mapping of topics to table names. |
| store.kafka.metadata | false | If set to true, Kafka metadata columns are added to ClickHouse. |
| store.raw.data | false | If set to true, the entire row is converted to JSON and stored in the column defined by store.raw.data.column. |
| store.raw.data.column | No | ClickHouse table column to store the raw data in JSON form (String ClickHouse data type). |
| metrics.enable | true | Enable Prometheus scraping. |
| metrics.port | 8084 | Metrics port. |
| buffer.flush.time.ms | 30 | Buffer (batch of records) flush time in milliseconds. |
| thread.pool.size | 10 | Number of threads used to connect to ClickHouse. |
| auto.create.tables | false | Sink connector creates tables in ClickHouse if they do not exist. |
| snowflake.id | true | Use Snowflake ID (timestamp + GTID) as the version column for ReplacingMergeTree. |
| replacingmergetree.delete.column | "sign" | Column used as the sign column for ReplacingMergeTree. |
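To make the shape of a working configuration concrete, here is an illustrative subset of these properties, written in the same key/value style as the replicator config above. This is a sketch only: the topic regex, credentials, and table mapping are placeholder assumptions, and the exact file format depends on how you deploy Kafka Connect.

# Hypothetical sink connector settings; all values are placeholders.
topics.regex: "SERVER5432.test.(.*)"
clickhouse.server.url: "clickhouse"
clickhouse.server.user: "root"
clickhouse.server.pass: "root"
clickhouse.server.port: "8123"
clickhouse.server.database: "test"
clickhouse.topic2table.map: "SERVER5432.test.employees:employees_ch"   # assumed mapping
store.kafka.metadata: "true"
auto.create.tables: "true"
thread.pool.size: "10"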

ClickHouse Loader (Load Data from MySQL to ClickHouse for the Initial Load)

ClickHouse Loader is a program that loads data dumped from MySQL into a ClickHouse database compatible with the sink connector (ReplacingMergeTree with the virtual columns _version and _sign).

Grafana Dashboard

Documentation

Blog articles