kafkaex/kafka_ex

KafkaEx.stream timing out trying to fetch more messages even with no_wait_at_logend

yogodoshi opened this issue · 1 comments

Issue description

After banging my head trying to understand why I was getting timeouts from Kafka, I noticed that the timeout occurred on a fetch call whose offset param was exactly the latest offset of the given partition.

Error:

15:38:51.553 [error] GenServer #PID<0.306.0> terminating
** (stop) exited in: GenServer.call(:kafka_ex, {:fetch, %KafkaEx.Protocol.Fetch.Request{auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 1000000, min_bytes: 1, offset: 748734, partition: 2, topic: "sms_4pt_3kk_1kk", wait_time: 60000}}, 60000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:1010: GenServer.call/3
    (kafka_ex) lib/kafka_ex/stream.ex:149: Enumerable.KafkaEx.Stream.fetch_response/2
    (kafka_ex) lib/kafka_ex/stream.ex:28: anonymous fn/3 in Enumerable.KafkaEx.Stream.reduce/3
    (elixir) lib/stream.ex:1407: Stream.do_resource/5
    (gen_stage) lib/gen_stage/streamer.ex:24: GenStage.Streamer.handle_demand/2
    (gen_stage) lib/gen_stage.ex:2099: GenStage.noreply_callback/3
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
Last message: {:"$gen_producer", {#PID<0.307.0>, #Reference<0.1745854737.2488270849.18939>}, {:ask, 500}}
State: #Function<20.35756501/1 in Stream.do_list_resource/6>

Checking that:

iex(3)> KafkaEx.latest_offset("sms_4pt_3kk_1kk", 2)  
[
  %KafkaEx.Protocol.Offset.Response{
    partition_offsets: [
      %{error_code: :no_error, offset: [748734], partition: 2}
    ],
    topic: "sms_4pt_3kk_1kk"
  }
]

Debugging

So I went to check how KafkaEx.stream worked to see if I was doing something wrong and here is what I found out:

In the Stream implementation, there is the next_fun which:

  1. fetches more messages from Kafka
  2. commits them or not
  3. calls stream_control which decides if it should continue the Stream process or not
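The three steps above can be sketched with `Stream.resource/3`. This is only an illustration of the ordering, not the kafka_ex implementation: `StreamLoopSketch`, the in-memory `log` list standing in for a partition, and `batch_size` are all hypothetical.

```elixir
defmodule StreamLoopSketch do
  # Hypothetical stand-in for a partition: a plain list of messages,
  # where a message's offset is its index in the list.
  def stream(log, start_offset, batch_size) do
    Stream.resource(
      # start_fun: the accumulator is the next offset to fetch
      fn -> start_offset end,
      # next_fun: fetch, (optionally) commit, then decide whether to continue
      fn offset ->
        # 1. "fetch" a batch starting at the current offset
        batch = Enum.slice(log, offset, batch_size)

        # 2. a real consumer would optionally commit the last offset here

        # 3. stream control: halt when nothing came back, otherwise continue
        case batch do
          [] -> {:halt, offset}
          _ -> {batch, offset + length(batch)}
        end
      end,
      # after_fun: nothing to clean up in this sketch
      fn _offset -> :ok end
    )
  end
end
```

Note that the decision to halt is only made after the fetch has returned. In this sketch the fetch at the end of the list returns immediately; against a real broker that last fetch is the call that can block.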

The issue seems to be the order in which these things are done: at some point the stream reaches the latest offset, and this is what appears to happen:

  1. it tries to fetch messages from the latest offset (748734 in my case)
  2. Kafka doesn't seem to answer, as it has no new messages to send
  3. the GenServer call times out

Maybe there should be a check before the fetch call to see whether the given offset is the latest, and simply skip the fetch if so? 🤔
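A minimal sketch of such a check, assuming the response has the shape shown in the `KafkaEx.latest_offset` output above. The module `LogEndCheck` and its functions are hypothetical and not part of kafka_ex; they pattern-match on plain map keys, so they work on the struct as well as on a bare map.

```elixir
defmodule LogEndCheck do
  # Pulls the latest offset out of a response shaped like the
  # KafkaEx.latest_offset/2 output shown earlier in this issue.
  def latest_offset([
        %{partition_offsets: [%{error_code: :no_error, offset: [latest | _]} | _]} | _
      ]),
      do: {:ok, latest}

  # Any other shape (error code set, empty list, ...) is treated as unknown.
  def latest_offset(_other), do: :error

  # True when the offset we are about to fetch is at or past the log end.
  # If the latest offset is unknown, returns false so the caller still fetches.
  def at_log_end?(next_offset, response) do
    case latest_offset(response) do
      {:ok, latest} -> next_offset >= latest
      :error -> false
    end
  end
end
```

In an application the response would come from `KafkaEx.latest_offset(topic, partition)`, and the fetch would be skipped when `LogEndCheck.at_log_end?/2` returns true. New messages can still arrive between the check and the fetch, so this only narrows the window rather than removing the need for sensible timeouts.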

I don't know if this is an edge case caused by a combination of timeout settings and Kafka consumer settings, if it is something related to Flow, or if I have this all wrong 😆

Code and dependencies versions

I'm using:

  • kafka_ex at commit 08e91caaaf2e79b676b822efedbcd90c3bbe518e (because I wanted something that was merged after the last release);
  • Kafka 0.11.0
  • Flow 0.14

My code on a high level (let me know if you want details of any part):

topic_name
|> retrieve_partitions_ids
|> create_streams(topic_name)
|> Flow.from_enumerables()
|> Flow.map(&SmsEtl.Flow.GenConsumer.prepare_for_ets/1)
|> Flow.partition(key: {:elem, 0})
|> Flow.reduce(fn -> ets_table_ref end, &save_kafka_message_to_ets/2)
|> Flow.run()
|> create_csvs_and_manifest(table_name)
|> SmsEtl.S3.upload_manifest_and_its_csvs()
|> persist_on_redshift(insert_method)

defp create_streams(partitions_ids, topic_name) do
  Enum.map(partitions_ids, fn partition_id ->
    KafkaEx.stream(topic_name, partition_id,
      no_wait_at_logend: true,
      auto_commit: false,
      wait_time: 30_000,
      offset: 0
    )
  end)
end

In the latest release, 0.11, we fixed the stream API to support the global sync_timeout setting. Perhaps you need the wait_time to be less than the sync_timeout?
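As a rough illustration of that suggestion, the fragment below sets `sync_timeout` above the `wait_time` passed to the stream. The value 60_000 is an assumption chosen only to exceed the 30_000 ms `wait_time` from the code above; the stream options are the ones already used in this issue.

```elixir
# config/config.exs (fragment; older projects would use `use Mix.Config`)
import Config

config :kafka_ex,
  # Assumed value: anything comfortably above the stream's wait_time
  sync_timeout: 60_000

# Elsewhere in the application: keep wait_time below sync_timeout
KafkaEx.stream(topic_name, partition_id,
  no_wait_at_logend: true,
  auto_commit: false,
  wait_time: 30_000,
  offset: 0
)
```

The intent is that the broker's long-poll on an empty partition ends before the synchronous call to the client process gives up.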