ConduitIO/conduit-connector-kafka

Bug: Destination connector stops upon handling first message


Bug description

Right after I start a pipeline with two builtin:kafka connectors (a source reading from one topic and a destination writing to another), the destination connector immediately stops:

2022-03-25T18:36:56+00:00 DBG starting pipeline component=pipeline.Service pipeline_id=9155d960-165d-450e-8137-3c923b2b018c request_id=5e20b661-d430-4465-b20c-f8e8722a5359
2022-03-25T18:36:56+00:00 INF pipeline started component=pipeline.Service pipeline_id=9155d960-165d-450e-8137-3c923b2b018c request_id=5e20b661-d430-4465-b20c-f8e8722a5359
2022-03-25T18:36:56+00:00 DBG starting destination connector plugin component=connector.Destination connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a
2022-03-25T18:36:56+00:00 DBG starting source connector plugin component=connector.Source connector_id=0f08dab9-3b6c-46e2-b5c1-68513fd9ba24
2022-03-25T18:36:56+00:00 INF request processed duration=0.321186 grpc_method=/api.v1.PipelineService/StartPipeline grpc_status_code=OK http_endpoint="POST /v1/pipelines/9155d960-165d-450e-8137-3c923b2b018c/start" request_id=5e20b661-d430-4465-b20c-f8e8722a5359
2022-03-25T18:36:56+00:00 DBG configuring destination connector plugin component=connector.Destination connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a
2022-03-25T18:36:56+00:00 INF Configuring a Kafka Destination... connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a plugin_name=kafka plugin_type=destination
2022-03-25T18:36:56+00:00 DBG configuring source connector plugin component=connector.Source connector_id=0f08dab9-3b6c-46e2-b5c1-68513fd9ba24
2022-03-25T18:36:56+00:00 INF destination connector plugin successfully started component=connector.Destination connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a
2022-03-25T18:36:56+00:00 INF Configuring a Kafka Source... component=plugin connector_id=0f08dab9-3b6c-46e2-b5c1-68513fd9ba24 plugin_name=kafka plugin_type=source
2022-03-25T18:36:56+00:00 INF source connector plugin successfully started component=connector.Source connector_id=0f08dab9-3b6c-46e2-b5c1-68513fd9ba24
2022-03-25T18:36:56+00:00 DBG stopping destination connector plugin component=connector.Destination connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a
2022-03-25T18:36:56+00:00 DBG tearing down destination connector plugin component=connector.Destination connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a
2022-03-25T18:36:56+00:00 INF Tearing down a Kafka Destination... connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a plugin_name=kafka plugin_type=destination
2022-03-25T18:36:56+00:00 INF connector plugin successfully torn down component=connector.Destination connector_id=1507c91d-ba2d-45a7-8222-8ee34e90db7a

Steps to reproduce

1️⃣ Create pipeline
2️⃣ Add source kafka connector:

{
  "type": "TYPE_SOURCE",
  "plugin": "builtin:kafka",
  "pipelineId": "b44b63c2-aa26-4ecb-aec3-5a4ea0e1d479",
  "config": {
    "name": "kafka-in",
    "settings": {
      "servers": "localhost:9092",
      "topic": "new-json-struct",
      "readFromBeginning": "true"
    }
  }
}

3️⃣ Add destination kafka connector:

{
  "type": "TYPE_DESTINATION",
  "plugin": "builtin:kafka",
  "pipelineId": "b44b63c2-aa26-4ecb-aec3-5a4ea0e1d479",
  "config": {
    "name": "kafka-out",
    "settings": {
      "servers": "localhost:9092",
      "topic": "output-test-hi"
    }
  }
}

4️⃣ Start the pipeline. The destination connector stops immediately. The destination topic was created on the Kafka cluster, but no messages were written to it.

Possibly helpful: if I stop and restart Conduit, the message is then successfully written to the destination. So I wonder whether something got stuck in a bad state before the restart.

I also tried this with the latest Conduit main branch and the latest commit of this connector, and still had the issue.

Version

v0.2.0-nightly.20220325

> Maybe helpful - if I stop and restart conduit, then the message will successfully write to the destination. So I wonder if something got stuck / into a bad state before restarting?

@samirketema This is helpful! So, after you restart Conduit, and then write more messages to the source topic, do they get written to the destination topic too?

From your message, I also understand that the destination topic (output-test-hi) didn't exist when you created the pipeline?

> This is helpful! So, after you restart Conduit, and then write more messages to the source topic, do they get written to the destination topic too?

Yup, after the restart the messages do get written to the destination topic.

> From your message, I also understand that the destination topic (output-test-hi) didn't exist when you created the pipeline?

Correct. When I first started the pipeline, the destination topic was created, but no messages were written to it.

I think this is the same thing I've run into with some Kafka tools locally. When I consume from or produce to a topic that doesn't exist yet and topic auto-creation is enabled (which I rarely use), the broker takes some time to create the topic, and the client times out in the meantime.

One option is to increase the timeout. But what we must do is find out why Conduit doesn't show the actual error that made the destination stop.

@hariso what is the retry logic here? Does it throw a generic timeout error or something more specific?

@lyuboxa I just tested it: when the topic doesn't exist at the time a message is sent and auto-create is enabled, the error message is:

Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes

The error is a specific one, kafka.LeaderNotAvailable, so we can easily check for it and retry.

Here are the PRs: