fluent/fluent-plugin-kafka

Newly introduced share_producer setting in 0.18.0 for out_kafka2 plugin results in BufferOverflowError

pchheda-lyft opened this issue · 3 comments

Describe the bug

Producing to Kafka works as expected with 0.17.5.

However, with 0.18.0, producing to Kafka results in BufferOverflowError (so it is effectively less performant?). This happens whether the share_producer setting is set to true or false.

To Reproduce

Traffic flow of ~7000 events/sec, with the following config:

```
@type kafka2
# Hence, decreasing verbosity to warn
# For more info, see https://docs.fluentd.org/deployment/logging
brokers "#{ENV['KAFKA_BOOTSTRAP_SERVER']}"
default_topic events
# authentication settings
ssl_ca_cert ["/code/lumosingest/kafka-certs/cert.2.pem", "/code/lumosingest/kafka-certs/cert.1.pem"]
username "writer"
password "#{ENV['PW']}"
scram_mechanism "sha256"
sasl_over_ssl true
ssl_verify_hostname false
# producer settings
required_acks -1
max_send_retries 10
<inject>
  time_key collected_at
  time_type string
  time_format %Y-%m-%dT%H:%M:%S.%3NZ
</inject>
<format>
  @type json
</format>
get_kafka_client_log true

max_send_limit_bytes 2850000

<buffer events>
  flush_at_shutdown true
  flush_mode interval
  flush_interval 1s
  flush_thread_count 16
  queued_chunks_limit_size 16
  chunk_limit_size 3MB
  chunk_full_threshold 0.8
  total_limit_size 1024MB
  overflow_action block
</buffer>
```

Have tried multiple settings: increasing flush_thread_count, queued_chunks_limit_size, chunk size, total limit size, etc., and also removing overflow_action.

This fails even in a staging environment with very little traffic.

Using this in conjunction with the s3-sqs input plugin.

Expected behavior

Expected the share_producer setting to work without impact.

Your Environment

- Fluentd version: 1.15.1
- TD Agent version:
- fluent-plugin-kafka version: 0.18.0
- ruby-kafka version: 1.5.0
- Operating system: 
- Kernel version:

Your Configuration

```
@type kafka2
# Hence, decreasing verbosity to warn
# For more info, see https://docs.fluentd.org/deployment/logging
brokers "#{ENV['KAFKA_BOOTSTRAP_SERVER']}"
default_topic events
# authentication settings
ssl_ca_cert ["/code/lumosingest/kafka-certs/cert.2.pem", "/code/lumosingest/kafka-certs/cert.1.pem"]
username "writer"
password "#{ENV['PW']}"
scram_mechanism "sha256"
sasl_over_ssl true
ssl_verify_hostname false
# producer settings
required_acks -1
max_send_retries 10
<inject>
  time_key collected_at
  time_type string
  time_format %Y-%m-%dT%H:%M:%S.%3NZ
</inject>
<format>
  @type json
</format>
get_kafka_client_log true

max_send_limit_bytes 2850000

<buffer events>
  flush_at_shutdown true
  flush_mode interval
  flush_interval 1s
  flush_thread_count 16
  queued_chunks_limit_size 16
  chunk_limit_size 3MB
  chunk_full_threshold 0.8
  total_limit_size 1024MB
  overflow_action block
</buffer>
```

Your Error Log

```
2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:39:04 +0000 chunk="5e4f82e09e9659921a00f223c259a9ff" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
2022-07-29 22:24:49 +0000 [warn]: #0 Exception Backtrace : /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
/code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:324:in `write'
  2022-07-29 22:24:49 +0000 [warn]: #0 suppressed same stacktrace
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:363:in `block in write'
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:31:13 +0000 chunk="5e4f82d4f428c362d34da82c4fc7a24c" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
/code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
/code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/event.rb:314:in `each'
/code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1180:in `try_flush'
2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:33:00 +0000 chunk="5e4f82dabac326b0b9bc430e6c802f90" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:210:in `produce'
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
  2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/event.rb:314:in `each'
/code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:363:in `block in write'
/code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:501:in `block (2 levels) in start'
2022-07-29 22:24:49 +0000 [debug]: #0 taking back chunk for errors. chunk="5e4f82dbda43ccd13a5217dd5fc50351"
2022-07-29 22:24:49 +0000 [info]: #0 initialized kafka producer: fluentd
  2022-07-29 22:24:49 +0000 [warn]: #0 suppressed same stacktrace
2022-07-29 22:24:49 +0000 [warn]: #0 Send exception occurred: Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached
2022-07-29 22:24:49 +0000 [info]: #0 initialized kafka producer: fluentd
2022-07-29 22:24:49 +0000 [warn]: #0 Exception Backtrace : /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
/code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
```
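
For context: the "max buffer size (1000 messages)" in this error is ruby-kafka's producer-side limit (its `max_buffer_size` option, which defaults to 1000), not a fluentd `<buffer>` setting. A minimal standalone sketch of that behavior; the broker address is hypothetical, and since `produce` only buffers locally, the overflow is raised before anything is sent:

```ruby
require "kafka" # ruby-kafka

# ruby-kafka's producer holds messages in an in-process buffer until
# deliver_messages is called; produce raises Kafka::BufferOverflow once the
# buffered-message count reaches max_buffer_size (default: 1000).
kafka = Kafka.new(["localhost:9092"]) # hypothetical broker address
producer = kafka.producer(max_buffer_size: 1000)

begin
  # Nothing is delivered in between, so the 1001st produce call overflows,
  # mirroring the "max buffer size (1000 messages) reached" error above.
  1001.times { |i| producer.produce(%({"n":#{i}}), topic: "events") }
rescue Kafka::BufferOverflow => e
  puts e.message
ensure
  producer.shutdown
end
```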

Additional context

No response

This issue is also occurring with an older version of fluentd (1.15.0), if that helps.

Also, when adding chunk_limit_records (set to 800), the following errors came up:

```
2022-08-08 16:44:37 +0000 [warn]: #3 chunk size limit exceeds for an emitted event stream: 1016records
2022-08-08 16:44:37 +0000 [warn]: #3 emit transaction failed: error_class=NoMethodError error="undefined method `synchronize' for nil:NilClass" location="/usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/buffer.rb:419:in `block in write'" tag="<log file name>"
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:198:in `rescue in emit_events'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:195:in `emit_events'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:115:in `emit_stream'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:620:in `receive_lines'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1109:in `block in handle_notify'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1145:in `with_io'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1069:in `handle_notify'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `block in on_notify'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `synchronize'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `on_notify'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:822:in `on_notify'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:452:in `construct_watcher'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:457:in `block in start_watchers'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:456:in `each_value'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:456:in `start_watchers'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:389:in `refresh_watchers'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:259:in `start'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:203:in `block in start'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:192:in `block (2 levels) in lifecycle'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:191:in `each'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:191:in `block in lifecycle'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:178:in `each'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:178:in `lifecycle'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:202:in `start'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/engine.rb:248:in `start'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/engine.rb:147:in `run'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:760:in `block in run_worker'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:1036:in `main_process'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:751:in `run_worker'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/command/fluentd.rb:386:in `<top (required)>'
  2022-08-08 16:44:37 +0000 [warn]: #3 <internal:/usr/share/rubygems/rubygems/core_ext/kernel_require.rb>:85:in `require'
  2022-08-08 16:44:37 +0000 [warn]: #3 <internal:/usr/share/rubygems/rubygems/core_ext/kernel_require.rb>:85:in `require'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/bin/fluentd:15:in `<top (required)>'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/bin/fluentd:23:in `load'
  2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/bin/fluentd:23:in `<main>'
```

Config:

```
  <match **>
    @type copy
    <store>
     @type kafka2
     brokers   <broker address>
     default_topic  <kafka topic>
     max_send_retries  3
     required_acks  -1
     get_kafka_client_log  false
     compression_codec  gzip
     <format>
       @type json
     </format>
       <buffer>
         @type file
         path /fluentd/buffer
         chunk_limit_size 128MB
         total_limit_size 2560MB
         flush_mode immediate
         flush_thread_count 15
         flush_thread_interval 1
         retry_type exponential_backoff
         retry_max_interval 90
         chunk_limit_records 800
         compress gzip
       </buffer>
    </store>
    <store>
     @type prometheus
     <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
       hostname "#{IPSocket.getaddress(Socket.gethostname)}"
      </labels>
     </metric>
    </store>
  </match>
```

Hey @pchheda-lyft, thanks for reporting this. I am able to reproduce this with the following config:

```
<source>
    @type sample
    sample {"hello": "world"}
    rate 7000
    tag sample
</source>

<match **>
    @type kafka2

    brokers "broker:29092"

    default_topic events
    get_kafka_client_log true
    share_producer true

    <format>
        @type json
    </format>

    <buffer events>
      flush_at_shutdown true
      flush_mode interval
      flush_interval 1s
      flush_thread_count 16
      queued_chunks_limit_size 16
      chunk_limit_size 3MB
      chunk_full_threshold 0.8
      total_limit_size 1024MB
      overflow_action block
    </buffer>
</match>
```

Adding chunk_limit_records 1250 to the buffer section (1000 messages / 0.8 threshold) seems to have "fixed" the issue. I'm guessing this is far from your ideal throughput, however. I have identified the root cause and will be putting up a fix soon.
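
To spell out the arithmetic behind that workaround (a sketch of the reasoning, not plugin code): fluentd enqueues a chunk once it crosses chunk_full_threshold of its limits, so chunk_limit_records 1250 caps each flushed chunk at roughly 1000 records, which lines up with ruby-kafka's default producer max_buffer_size of 1000:

```ruby
# Sketch of the workaround's arithmetic, assuming ruby-kafka's default
# producer buffer limit of 1000 messages and the chunk_full_threshold 0.8
# from the <buffer> section above.
MAX_BUFFER_SIZE      = 1000  # ruby-kafka producer default (max_buffer_size)
chunk_full_threshold = 0.8   # fluentd enqueues chunks at 80% of their limits

chunk_limit_records = (MAX_BUFFER_SIZE / chunk_full_threshold).round
puts chunk_limit_records  # => 1250, the value suggested above

# Records per flushed chunk then stay within the producer's buffer:
puts (chunk_limit_records * chunk_full_threshold).round <= MAX_BUFFER_SIZE  # => true
```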

Thanks for working on this! Yeah, I don't think adding chunk_limit_records should be needed. Looking forward to the fix! Appreciate the work!