ClickHouse/clickhouse-kafka-connect

Can't connect to ClickHouse from local minikube

vladislav-bakanov opened this issue · 6 comments

Describe the bug

Hi! I discovered a bug very similar to this issue with the JDBC driver, but with this connector plugin.

  1. I deployed the Strimzi Kafka Connect CRD on a local k8s cluster (minikube) and successfully connected to my company's Kafka, which is deployed in a test environment.
  2. I built a Docker image based on quay.io/strimzi/kafka:0.39.0-kafka-3.5.0 and installed this plugin into /opt/kafka/plugins. The plugin was successfully detected by the Kafka Connect service.
  3. I added the Strimzi KafkaConnector CRD to the local k8s cluster (minikube). The logs show that the connector was picked up by the Kafka Connect service, and it appears in the list of available connectors (http://localhost:8083/connectors); one way to fetch that list locally is sketched below.
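For reference, one way to fetch that connector list locally is to port-forward the Connect REST API and query it (the service name below is a placeholder; the REST API listens on its default port 8083):

# forward the Kafka Connect REST API and list the registered connectors
kubectl -n kafka port-forward svc/<kafka-connect-api-service> 8083:8083 &
curl -s http://localhost:8083/connectors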

Steps to reproduce

  1. Deploy Strimzi Kafka Operator v0.39.0 in minikube
  2. Deploy Strimzi Kafka resource in minikube
  3. Deploy Strimzi Kafka Connect resource (with docker image with installed clickhouse-kafka-connect plugin) in minikube
  4. Deploy Strimzi Kafka Connector resource

After the last step, every reconciliation of the KafkaConnector fails with a connectivity error.

I also tried to ping the specified server and to query data from the Kafka Connect pod - it works.
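A reachability check along these lines can be run from the Kafka Connect pod (the pod name is a placeholder, and this assumes curl is available in the image); ClickHouse answers "Ok." on /ping when the endpoint is reachable:

# verify DNS resolution and HTTPS reachability of the ClickHouse endpoint from inside the pod
kubectl -n kafka exec -it <kafka-connect-pod> -- \
  curl -sS --max-time 10 "https://XXXXXXXXXXXXXXXXXXXX.europe-west4.gcp.clickhouse.cloud:8443/ping"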

Expected behaviour

The connector successfully connects to the ClickHouse server and starts streaming.

Error log

java.util.concurrent.ExecutionException: com.clickhouse.client.ClickHouseException: The target server failed to respond, server ClickHouseNode [uri=http://XXXXXXXXXXXXXXXXXXXX.europe-west4.gcp.clickhouse.cloud:8443/XXXXXXXXXXXXXXXXXXXX]

Configuration

Environment

  • Kafka cluster operator (Strimzi CRD): 0.39.0
  • Kafka version: 3.5.0
  • Clickhouse Kafka Connect version: 1.0.16 (also tried 1.0.17)
  • Minikube version: v1.32.0 [commit: 8220a6eb95f0a4d75f7f2d7b14cef975f050512d]
  • OS: MacBook M2 [Sonoma 14.3.1 (23D60)]

ClickHouse server

  • ClickHouse Server version: ClickHouse 24.1 [Cloud]

Kafka Connector configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: XXXXXXXXXXXX-kafka-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: XXXXXXXXXXXX-kafkaconnect
spec:
  class: com.clickhouse.kafka.connect.ClickHouseSinkConnector
  tasksMax: 1
  autoRestart:
    enabled: true
  state: running
  config:
    topics: XXXXXXXXXXXXXXXXXXXX
    topic2TableMap: "XXXXXXXXXXXXXXXXXXXX=my_clickhouse_table"

    hostname: XXXXXXXXXXXXXXXXXXXX.europe-west4.gcp.clickhouse.cloud
    port: "8443"
    database: XXXXXXXXXXXXXXXXXXXX
    username: XXXXXXXXXXXXXXXXXXXX
    password: XXXXXXXXXXXXXXXXXXXX
    sslEnable: "true"
    security.protocol: SSL

    suppressTableExistenceException: "false"

    key.deserializer: io.confluent.connect.avro.AvroConverter
    value.deserializer: io.confluent.connect.avro.AvroConverter

    # Schema
    schema.registry.url: http://xxxxxxxxxxxxxxxxxxxx.com/
    value.converter.schemas.enable: "true"
    value.converter.schema.registry.url: http://xxxxxxxxxxxxxxxxxxxx.com/
    key.converter.schema.registry.url: http://xxxxxxxxxxxxxxxxxxxx.com/

    # Transforms
    transforms: flatten
    transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value
    transforms.flatten.delimiter: "."

Kafka Connect configuration:

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: XXXXXXXXXXXXXXXX
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: docker-image-strimzi-kafka-connect-with-clickhouse:0.1.0-dev.1
  replicas: 1
  authentication:
    type: scram-sha-512
    username: kafka-user-dwh
    passwordSecret:
      secretName: kafka-password
      password: password

  bootstrapServers: my-kafka.com:9094
  config:
    group.id: dwh-kafka-connect
    offset.storage.topic: uat.dwh.kafka-connect.offsets
    config.storage.topic: uat.dwh.kafka-connect.configs
    status.storage.topic: uat.dwh.kafka-connect.status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
    
  # Logging
  logging:
    type: inline
    loggers:
      log4j.rootLogger: INFO
      log4j.logger.org.eclipse.jetty.http: TRACE
      log4j.logger.org.apache.kafka.connect: TRACE
      log4j.logger.com.clickhouse.kafka.connect: TRACE

Docker image:

FROM quay.io/strimzi/kafka:0.39.0-kafka-3.5.0

USER root:root
# COPY ./clickhouse-kafka-connect-v1.0.17.jar /opt/kafka/plugins/
COPY ./clickhouse-kafka-connect-v1.0.16.jar /opt/kafka/plugins/

USER 1001

Hi @vladislav-bakanov I think I see the issue - it should be "ssl" rather than "sslEnable"

Please try that and let me know if it still doesn't connect!

The full list of configuration options can be seen at: https://clickhouse.com/docs/en/integrations/kafka/clickhouse-kafka-connect-sink#configuration-options
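If it helps, one quick way to confirm which option the running connector actually picked up is to read its config back from the Connect REST API (using the connector name from the manifest above; the output should contain "ssl": "true", since "sslEnable" is not a recognized option):

# read the effective connector configuration from the Kafka Connect REST API
curl -s http://localhost:8083/connectors/XXXXXXXXXXXX-kafka-connector/config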

Since this seems to be resolved in the other thread, closing this out 🙂

@Paultagoras, yep, thanks.

Just for the record, and for everyone who runs into the same problem, please see my other request for better logging:
#379 (comment)

The main issue here was an incorrect setup of the Kafka consumer and producer options, as well as the common Kafka connectivity options, in the connector configuration.

In my case, Kafka auth required the SASL_PLAINTEXT security protocol with the SCRAM-SHA-512 mechanism.
Kafka Connect requires all of these settings to be specified explicitly for the common (worker) Kafka options AND for the Kafka consumer and producer as well.

How to specify these values

Docker

Use environment variables to specify ALL of these options:

CONNECT_SECURITY_PROTOCOL="SASL_PLAINTEXT"
CONNECT_SASL_MECHANISM="SCRAM-SHA-512"
CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';"

CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_PLAINTEXT"
CONNECT_PRODUCER_SASL_MECHANISM="SCRAM-SHA-512"
CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';"

CONNECT_CONSUMER_SECURITY_PROTOCOL="SASL_PLAINTEXT"
CONNECT_CONSUMER_SASL_MECHANISM="SCRAM-SHA-512"
CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';"

Kubernetes

When using the Helm chart, set the same options via configurationOverrides:

configurationOverrides:
  "security.protocol": "SASL_PLAINTEXT"
  "sasl.mechanism": "SCRAM-SHA-512"
  "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';"

  "producer.security.protocol": "SASL_PLAINTEXT"
  "producer.sasl.mechanism": "SCRAM-SHA-512"
  "producer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';"

  "consumer.security.protocol": "SASL_PLAINTEXT"
  "consumer.sasl.mechanism": "SCRAM-SHA-512"
  "consumer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='...' password='...';"

@Paultagoras I've run into this problem one more time. Can we reopen this issue?

I tried to sink data from Kafka to ClickHouse using Kafka Connect plus this connector, deployed on k8s (wrapped in a Helm chart).
This approach doesn't work due to connectivity issues. Log messages:

[2024-06-03 18:51:33,190] INFO Start SinkTask:  (com.clickhouse.kafka.connect.sink.ClickHouseSinkTask)
[2024-06-03 18:51:33,190] DEBUG ClickHouseSinkConfig: hostname: my-cloud-dns.gcp.clickhouse.cloud, port: 8443, database: sandbox, username: data_user, sslEnabled: true, timeout: 30000, retry: 3, exactlyOnce: true (com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig)
[2024-06-03 18:51:33,190] DEBUG ClickHouseSinkConfig: clickhouseSettings: {send_progress_in_http_headers=1, input_format_skip_unknown_fields=1, wait_end_of_query=1} (com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig)
[2024-06-03 18:51:33,190] DEBUG ClickHouseSinkConfig: topicToTableMap: {uat.dwh.debezium.topic=my_table} (com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig)
[2024-06-03 18:51:33,190] INFO Errant record reporter not configured. (com.clickhouse.kafka.connect.sink.ClickHouseSinkTask)
[2024-06-03 18:51:33,190] INFO Enable ExactlyOnce? true (com.clickhouse.kafka.connect.sink.ProxySinkTask)
[2024-06-03 18:51:33,190] INFO hostname: [my-cloud-dns.gcp.clickhouse.cloud] port [8443] database [sandbox] username [data_user] password [********************] sslEnabled [true] timeout [30000] (com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider)
[2024-06-03 18:51:33,190] INFO ClickHouse URL: https://my-cloud-dns.gcp.clickhouse.cloud:8443/sandbox (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient)
[2024-06-03 18:51:33,190] DEBUG Adding username [data_user] (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient)
[2024-06-03 18:51:33,192] DEBUG Checking [com.clickhouse.client.http.ClickHouseHttpClient@27abc8ca] against [HTTP]... (com.clickhouse.client.ClickHouseNodeSelector)
[2024-06-03 18:51:33,193] DEBUG Server [ClickHouseNode [uri=https://my-cloud-dns.gcp.clickhouse.cloud:8443/sandbox, options={sslmode=STRICT}]@-1309820496] , Timeout [30000] (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient)
[2024-06-03 18:51:33,193] DEBUG Connecting to: ClickHouseNode [uri=https://my-cloud-dns.gcp.clickhouse.cloud:8443/sandbox, options={sslmode=STRICT}]@-1309820496 (com.clickhouse.client.AbstractClient)
[2024-06-03 18:51:33,193] DEBUG url [https://my-cloud-dns.gcp.clickhouse.cloud:8443/?compress=1&extremes=0] (com.clickhouse.client.http.ClickHouseHttpConnection)
[2024-06-03 18:51:33,194] DEBUG Connection established: com.clickhouse.client.http.ApacheHttpConnectionImpl@429e7c85 (com.clickhouse.client.AbstractClient)
[2024-06-03 18:51:33,194] DEBUG Query: SELECT 1 FORMAT TabSeparated (com.clickhouse.client.http.ClickHouseHttpClient)
java.util.concurrent.ExecutionException: com.clickhouse.client.ClickHouseException: Connect to https://my-cloud-dns.gcp.clickhouse.cloud:8443 [my-cloud-dns.gcp.clickhouse.cloud/35.201.102.65] failed: Read timed out, server ClickHouseNode [uri=https://my-cloud-dns.gcp.clickhouse.cloud:8443/sandbox, options={sslmode=STRICT}]@-1309820496
 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
 at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2028)
 at com.clickhouse.client.ClickHouseClient.ping(ClickHouseClient.java:959)
 at com.clickhouse.client.AbstractClient.ping(AbstractClient.java:338)
 at com.clickhouse.client.ClickHouseClientBuilder$Agent.ping(ClickHouseClientBuilder.java:317)
 at com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient.ping(ClickHouseHelperClient.java:104)
 at com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider.<init>(KeeperStateProvider.java:49)
 at com.clickhouse.kafka.connect.sink.ProxySinkTask.<init>(ProxySinkTask.java:45)
 at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.start(ClickHouseSinkTask.java:57)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
 at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829)

But I ran a simple query with the CLI inside the pod, and SELECT 1 works perfectly.
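Roughly, a check like this (mirroring the connector's own SELECT 1 ping from the log above) can be run from inside the pod; the pod name and password are placeholders, and curl availability in the image is assumed:

# mirror the connector's ping query over the same HTTPS interface, with the same credentials
kubectl exec -it <kafka-connect-pod> -- \
  curl -sS --max-time 30 -u 'data_user:PASSWORD' \
  'https://my-cloud-dns.gcp.clickhouse.cloud:8443/?query=SELECT%201'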

Then I just ran the same Docker image (the one used in the Helm deployment) directly with the same configuration, and it works. It even gives me different log messages; for example, it notified me about wrong types in the ClickHouse table, and after fixing the types it started working as I expected.

But as for what's wrong within the minikube deployment - I'm running out of ideas.

My Dockerfile looks like:

FROM confluentinc/cp-kafka-connect:7.5.4
USER root
RUN confluent-hub install --no-prompt clickhouse/clickhouse-kafka-connect:v1.0.16
USER appuser
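For completeness, the image can be built and made visible to minikube with something like the following (the image name and tag are placeholders; minikube image load copies a locally built image into the cluster's container runtime):

# build the Connect image locally and load it into minikube
docker build -t <my-connect-image>:<tag> .
minikube image load <my-connect-image>:<tag>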

My helm deployment looks like:

{{- if .Capabilities.APIVersions.Has "apps/v1" }}
apiVersion: apps/v1
{{- else }}
apiVersion: apps/v1beta2
{{- end }}
kind: Deployment
metadata:
  name: {{ template "cp-kafka-connect.fullname" . }}
  annotations:
    config-checksum: {{ include "cp-kafka-connect.configChecksum" . }}
  labels:
    {{- include "cp-kafka-connect.labels" . | nindent 4 }}
spec:
  replicas: {{ .Values.replicaCount }}
  selector:
    matchLabels:
      {{- include "cp-kafka-connect.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      labels:
          {{- include "cp-kafka-connect.selectorLabels" . | nindent 8 }}
      {{- if or .Values.podAnnotations .Values.prometheus.jmx.enabled }}
      annotations:
      {{- range $key, $value := .Values.podAnnotations }}
        {{ $key }}: {{ $value | quote }}
      {{- end }}
      {{- if .Values.prometheus.jmx.enabled }}
        prometheus.io/scrape: "true"
        prometheus.io/port: {{ .Values.prometheus.jmx.port | quote }}
      {{- end }}
      {{- end }}
    spec:
      containers:
        {{- if .Values.prometheus.jmx.enabled }}
        - name: prometheus-jmx-exporter
          image: "{{ .Values.prometheus.jmx.image }}:{{ .Values.prometheus.jmx.imageTag }}"
          imagePullPolicy: "{{ .Values.prometheus.jmx.imagePullPolicy }}"
          command:
          - java
          - -XX:+UnlockExperimentalVMOptions
          - -XX:+UseCGroupMemoryLimitForHeap
          - -XX:MaxRAMFraction=1
          - -XshowSettings:vm
          - -jar
          - jmx_prometheus_httpserver.jar
          - {{ .Values.prometheus.jmx.port | quote }}
          - /etc/jmx-kafka-connect/jmx-kafka-connect-prometheus.yml
          ports:
          - containerPort: {{ .Values.prometheus.jmx.port }}
          resources:
{{ toYaml .Values.prometheus.jmx.resources | indent 12 }}
          volumeMounts:
          - name: jmx-config
            mountPath: /etc/jmx-kafka-connect
        {{- end }}
        - name: {{ template "cp-kafka-connect.name" . }}-server
          image: "{{ .Values.image }}:{{ .Values.imageTag }}"
          imagePullPolicy: "{{ .Values.imagePullPolicy }}"
          ports:
            - name: kafka-connect
              containerPort: {{ .Values.servicePort}}
              protocol: TCP
            {{- if .Values.prometheus.jmx.enabled }}
            - containerPort: {{ .Values.jmx.port }}
              name: jmx
            {{- end }}
          resources:
{{ toYaml .Values.resources | indent 12 }}
          env:
            {{- if .Values.connectAdvertisedHostName }}
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "{{ .Values.connectAdvertisedHostName }}"
            {{- else }}
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            {{- end }}
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: {{ template "cp-kafka-connect.kafka.bootstrapServers" . }}
            - name: CONNECT_GROUP_ID
              value: {{ template "cp-kafka-connect.groupId" . }}
            - name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
              value: {{ template "cp-kafka-connect.cp-schema-registry.service-name" .}}
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
              value: {{ template "cp-kafka-connect.cp-schema-registry.service-name" .}}
            - name: KAFKA_HEAP_OPTS
              value: "{{ .Values.heapOptions }}"
            # Common Kafka properties
            - name: CONNECT_SECURITY_PROTOCOL
              value: "{{ .Values.auth.kafka.security.protocol }}"
            - name: CONNECT_SASL_MECHANISM
              value: "{{ .Values.auth.kafka.sasl.mechanism }}"
            - name: CONNECT_SASL_JAAS_CONFIG
              valueFrom:
                secretKeyRef:
                  name: {{ template "cp-kafka-connect.fullname" . }}-secrets
                  key: sasl.jaas.config
            # Kafka producer properties
            - name: CONNECT_PRODUCER_SECURITY_PROTOCOL
              value: "{{ .Values.auth.kafka.security.protocol }}"
            - name: CONNECT_PRODUCER_SASL_MECHANISM
              value: "{{ .Values.auth.kafka.sasl.mechanism }}"
            - name: CONNECT_PRODUCER_SASL_JAAS_CONFIG
              valueFrom:
                secretKeyRef:
                  name: {{ template "cp-kafka-connect.fullname" . }}-secrets
                  key: sasl.jaas.config
            # Kafka consumer properties
            - name: CONNECT_CONSUMER_SECURITY_PROTOCOL
              value: "{{ .Values.auth.kafka.security.protocol }}"
            - name: CONNECT_CONSUMER_SASL_MECHANISM
              value: "{{ .Values.auth.kafka.sasl.mechanism }}"
            - name: CONNECT_CONSUMER_SASL_JAAS_CONFIG
              valueFrom:
                secretKeyRef:
                  name: {{ template "cp-kafka-connect.fullname" . }}-secrets
                  key: sasl.jaas.config
            {{- range $key, $value := .Values.configuration }}
            - name: {{ printf "%s" $key | replace "." "_" | upper | quote }}
              value: {{ $value | quote }}
            {{- end }}
            {{- range $key, $value := .Values.customEnv }}
            - name: {{ $key | quote }}
              value: {{ $value | quote }}
            {{- end }}
            {{- if .Values.jmx.port }}
            - name: KAFKA_JMX_PORT
              value: "{{ .Values.jmx.port }}"
            {{- end }}
        {{- if .Values.customEnv.CUSTOM_SCRIPT_PATH }}
          command:
            - /bin/bash
            - -c
            - |
              /etc/confluent/docker/run &
              $CUSTOM_SCRIPT_PATH
              sleep infinity
          {{- if .Values.livenessProbe }}
          livenessProbe:
{{ toYaml .Values.livenessProbe | trim | indent 12 }}
          {{- end }}
        {{- end }}
          {{- if .Values.volumeMounts }}
          volumeMounts:
{{ toYaml .Values.volumeMounts | indent 10 }}
          {{- end}}
      {{- if .Values.imagePullSecrets }}
      imagePullSecrets:
{{ toYaml .Values.imagePullSecrets | indent 8 }}
      {{- end }}
      volumes:
      {{- if .Values.volumes }}
{{ toYaml .Values.volumes | trim | indent 6 }}
      {{- end}}
      {{- if .Values.prometheus.jmx.enabled }}
      - name: jmx-config
        configMap:
          name: {{ template "cp-kafka-connect.fullname" . }}-jmx-configmap
      {{- end }}
      {{- if .Values.nodeSelector }}
      nodeSelector:
{{ toYaml .Values.nodeSelector | indent 8 }}
      {{- end }}
      {{- if .Values.tolerations }}
      tolerations:
{{ toYaml .Values.tolerations | indent 8 }}
      {{- end }}
      {{- if .Values.affinity }}
      affinity:
{{ toYaml .Values.affinity | indent 8 }}
      {{- end }}

Do you have any ideas about what I should check in my minikube setup to fix this issue?

Btw, I have plenty of logs now; if you need anything specific, please tell me.

P.S. The same configuration ran successfully on the test environment in a regular k8s cluster (not minikube). Hence I think the bottleneck here is minikube, but I don't understand how to fix it locally.

@vladislav-bakanov Are you still running into issues with this?

@Paultagoras, hi! Yes, I'm still getting the same error during local deployment on minikube.