confluentinc/kafka-connect-jdbc

Is it possible to sink data using the JDBC connector without a schema in the message?

ivanilk opened this issue · 2 comments

Hi there, I’m trying to insert data from a Kafka topic into ClickHouse with the JDBC Sink Connector.
This is my connector config

{
  "name": "JdbcSinkConnector",
  "config": {
    "value.converter.schemas.enable": "false",
    "name": "JdbcSinkConnector_my_topic",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.retry.timeout": "100",
    "errors.retry.delay.max.ms": "60000",
    "topics": "my_topic",
    "connection.url": "jdbc:clickhouse://clickhouse:8123/my_db",
    "connection.user": "default",
    "connection.password": "",
    "insert.mode": "insert",
    "batch.size": "1000",
    "table.name.format": "my_tb",
    "pk.mode": "record_value",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

and this is my message

{
  "ts": 1682006722000,
  "sessionId": 194860,
  "auth": "Logged In",
  "level": "free",
  "itemInSession": 5,
  "city": "Erie"
}

but I got this error:

Error: Sink connector 'JdbcSinkConnector_my_topic' is configured with 'delete.enabled=false' and 'pk.mode=record_value' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='my_topic',partition=0,offset=0,timestamp=1687020175058) with a HashMap value and null value schema. (org.apache.kafka.connect.runtime.WorkerSinkTask)

Please help me, thanks all.

A schema is required. Without one, the connector cannot guarantee that any two events have exactly the same fields and types.
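If the producer can be changed, one option is to set "value.converter.schemas.enable": "true" and have the producer wrap each message in the JsonConverter's schema envelope. A sketch for the message above might look like this (field types here are assumptions inferred from the sample values):

```
{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      {"field": "ts", "type": "int64", "optional": false},
      {"field": "sessionId", "type": "int64", "optional": false},
      {"field": "auth", "type": "string", "optional": true},
      {"field": "level", "type": "string", "optional": true},
      {"field": "itemInSession", "type": "int32", "optional": true},
      {"field": "city", "type": "string", "optional": true}
    ]
  },
  "payload": {
    "ts": 1682006722000,
    "sessionId": 194860,
    "auth": "Logged In",
    "level": "free",
    "itemInSession": 5,
    "city": "Erie"
  }
}
```

The other common route is to produce Avro (or Protobuf/JSON Schema) with Schema Registry and use the matching converter, which keeps the schema out of each message payload.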

Worth pointing out that ClickHouse has its own Kafka ingest mechanisms: https://clickhouse.com/docs/en/integrations/kafka#choosing-an-option
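For example, the Kafka table engine can consume schemaless JSON directly, since the column types are declared in the table itself. A minimal sketch, reusing the names from this thread (the broker address, consumer group, and column types are assumptions):

```sql
-- Queue table that reads from the Kafka topic; JSONEachRow parses
-- one plain JSON object per message, no Connect schema envelope needed.
CREATE TABLE my_db.my_topic_queue
(
    ts            UInt64,
    sessionId     UInt64,
    auth          String,
    level         String,
    itemInSession UInt32,
    city          String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',      -- assumed broker address
         kafka_topic_list  = 'my_topic',
         kafka_group_name  = 'clickhouse_consumer',
         kafka_format      = 'JSONEachRow';

-- Materialized view that moves consumed rows into the target table.
CREATE MATERIALIZED VIEW my_db.my_topic_mv TO my_db.my_tb AS
SELECT ts, sessionId, auth, level, itemInSession, city
FROM my_db.my_topic_queue;
```

This avoids Kafka Connect entirely, at the cost of managing the ingest inside ClickHouse.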