Newly introduced share_producer setting in 0.18.0 for out_kafka2 plugin results in BufferOverflowError
pchheda-lyft opened this issue · 3 comments
Describe the bug
Producing to Kafka works as expected with 0.17.5.
However, with 0.18.0, producing to Kafka results in BufferOverflowError (essentially less performant?). This happens whether the share_producer setting is set to true or false.
To Reproduce
Traffic flow is ~7000 events/sec.
```
@type kafka2
# Hence, decreasing verbosity to warn
# For more info, see https://docs.fluentd.org/deployment/logging
brokers "#{ENV['KAFKA_BOOTSTRAP_SERVER']}"
default_topic events
# authentication settings
ssl_ca_cert ["/code/lumosingest/kafka-certs/cert.2.pem", "/code/lumosingest/kafka-certs/cert.1.pem"]
username "writer"
password "#{ENV['PW']}"
scram_mechanism "sha256"
sasl_over_ssl true
ssl_verify_hostname false
# producer settings
required_acks -1
max_send_retries 10
<inject>
  time_key collected_at
  time_type string
  time_format %Y-%m-%dT%H:%M:%S.%3NZ
</inject>
<format>
  @type json
</format>
get_kafka_client_log true
max_send_limit_bytes 2850000
<buffer events>
  flush_at_shutdown true
  flush_mode interval
  flush_interval 1s
  flush_thread_count 16
  queued_chunks_limit_size 16
  chunk_limit_size 3MB
  chunk_full_threshold 0.8
  total_limit_size 1024MB
  overflow_action block
</buffer>
```
I have tried multiple settings: increasing flush_thread_count, queued_chunks_limit_size, chunk_limit_size, and total_limit_size, and also removing overflow_action.
This fails even in a staging environment with very little traffic.
Using this in conjunction with the s3-sqs input plugin.
Expected behavior
Expected the share_producer setting to work without impact.
Your Environment
- Fluentd version: 1.15.1
- TD Agent version:
- fluent-plugin-kafka version: 0.18.0
- ruby-kafka version: 1.5.0
- Operating system:
- Kernel version:
Your Configuration
@type kafka2
# Hence, decreasing verbosity to warn
# For more info, see https://docs.fluentd.org/deployment/logging
brokers "#{ENV['KAFKA_BOOTSTRAP_SERVER']}"
default_topic events
# authentication settings
ssl_ca_cert ["/code/lumosingest/kafka-certs/cert.2.pem", "/code/lumosingest/kafka-certs/cert.1.pem"]
username "writer"
password "#{ENV['PW']}"
scram_mechanism "sha256"
sasl_over_ssl true
ssl_verify_hostname false
# producer settings
required_acks -1
max_send_retries 10
<inject>
time_key collected_at
time_type string
time_format %Y-%m-%dT%H:%M:%S.%3NZ
</inject>
<format>
@type json
</format>
get_kafka_client_log true
max_send_limit_bytes 2850000
<buffer events>
flush_at_shutdown true
flush_mode interval
flush_interval 1s
flush_thread_count 16
queued_chunks_limit_size 16
chunk_limit_size 3MB
chunk_full_threshold 0.8
total_limit_size 1024MB
overflow_action block
</buffer>
Your Error Log
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:39:04 +0000 chunk="5e4f82e09e9659921a00f223c259a9ff" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 Exception Backtrace : /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:324:in `write'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 suppressed same stacktrace
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:363:in `block in write'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:31:13 +0000 chunk="5e4f82d4f428c362d34da82c4fc7a24c" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/event.rb:314:in `each'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1180:in `try_flush'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:33:00 +0000 chunk="5e4f82dabac326b0b9bc430e6c802f90" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:210:in `produce'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/event.rb:314:in `each'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:363:in `block in write'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:501:in `block (2 levels) in start'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [debug]: #0 taking back chunk for errors. chunk="5e4f82dbda43ccd13a5217dd5fc50351"
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [info]: #0 initialized kafka producer: fluentd
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [warn]: #0 suppressed same stacktrace
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [warn]: #0 Send exception occurred: Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [info]: #0 initialized kafka producer: fluentd
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [warn]: #0 Exception Backtrace : /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
Jul 29, 2022 @ 15:24:49.810 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
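For context on what the error itself means: ruby-kafka's producer keeps pending messages in an in-memory buffer and refuses to accept more than max_buffer_size messages (1000 by default) between deliver_messages calls. Below is a minimal sketch, assuming a locally reachable broker and an "events" topic (placeholders, not taken from this setup), that triggers the same Kafka::BufferOverflow outside Fluentd:

```ruby
# Minimal sketch of the ruby-kafka behaviour behind the error above.
# Assumptions: a broker reachable at localhost:9092 and an "events" topic;
# both are placeholders, not taken from the reporter's environment.
require "kafka"

kafka    = Kafka.new(["localhost:9092"], client_id: "buffer-overflow-demo")
producer = kafka.producer(max_buffer_size: 1000) # same 1000-message limit as in the log

begin
  # Buffering one message more than max_buffer_size without delivering
  # in between raises Kafka::BufferOverflow from Producer#produce.
  1_001.times { |i| producer.produce("message #{i}", topic: "events") }
rescue Kafka::BufferOverflow => e
  puts e.message # roughly: "Cannot produce to events, max buffer size (1000 messages) reached"
ensure
  producer.deliver_messages # drain whatever was buffered
  producer.shutdown
end
```

Presumably a chunk carrying more than 1000 records, or a shared producer fed by several flush threads at once, can hit this limit before a deliver call drains the buffer, which matches the workaround suggested further down.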
Additional context
No response
This issue also occurs with an older version of fluentd (1.15.0), if that helps.
Also, when using the chunk_limit_records setting (set to 800), the following errors came up:
2022-08-08 16:44:37 +0000 [warn]: #3 chunk size limit exceeds for an emitted event stream: 1016records
2022-08-08 16:44:37 +0000 [warn]: #3 emit transaction failed: error_class=NoMethodError error="undefined method `synchronize' for nil:NilClass" location="/usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/buffer.rb:419:in `block in write'" tag="<log file name>"
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:195:in `emit_events'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:115:in `emit_stream'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:620:in `receive_lines'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1109:in `block in handle_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1145:in `with_io'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1069:in `handle_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `block in on_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `synchronize'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `on_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:822:in `on_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:452:in `construct_watcher'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:457:in `block in start_watchers'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:456:in `each_value'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:456:in `start_watchers'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:389:in `refresh_watchers'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:259:in `start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:203:in `block in start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:192:in `block (2 levels) in lifecycle'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:191:in `each'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:191:in `block in lifecycle'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:178:in `each'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:178:in `lifecycle'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:202:in `start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/engine.rb:248:in `start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/engine.rb:147:in `run'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:760:in `block in run_worker'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:1036:in `main_process'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:751:in `run_worker'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/command/fluentd.rb:386:in `<top (required)>'
2022-08-08 16:44:37 +0000 [warn]: #3 <internal:/usr/share/rubygems/rubygems/core_ext/kernel_require.rb>:85:in `require'
2022-08-08 16:44:37 +0000 [warn]: #3 <internal:/usr/share/rubygems/rubygems/core_ext/kernel_require.rb>:85:in `require'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/bin/fluentd:15:in `<top (required)>'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/bin/fluentd:23:in `load'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/bin/fluentd:23:in `<main>'
Config:
<match **>
@type copy
<store>
@type kafka2
brokers <broker address>
default_topic <kafka topic>
max_send_retries 3
required_acks -1
get_kafka_client_log false
compression_codec gzip
<format>
@type json
</format>
<buffer>
@type file
path /fluentd/buffer
chunk_limit_size 128MB
total_limit_size 2560MB
flush_mode immediate
flush_thread_count 15
flush_thread_interval 1
retry_type exponential_backoff
retry_max_interval 90
chunk_limit_records 800
compress gzip
</buffer>
</store>
<store>
@type prometheus
<metric>
name fluentd_output_status_num_records_total
type counter
desc The total number of outgoing records
<labels>
hostname "#{IPSocket.getaddress(Socket.gethostname)}"
</labels>
</metric>
</store>
</match>
Hey @pchheda-lyft thanks for reporting this. I am able to reproduce this with the following config
<source>
@type sample
sample {"hello": "world"}
rate 7000
tag sample
</source>
<match **>
@type kafka2
brokers "broker:29092"
default_topic events
get_kafka_client_log true
share_producer true
<format>
@type json
</format>
<buffer events>
flush_at_shutdown true
flush_mode interval
flush_interval 1s
flush_thread_count 16
queued_chunks_limit_size 16
chunk_limit_size 3MB
chunk_full_threshold 0.8
total_limit_size 1024MB
overflow_action block
</buffer>
</match>
Adding chunk_limit_records 1250 to the buffer section (1000 messages / 0.8 threshold) seems to have "fixed" the issue. I'm guessing this is far from your ideal throughput, however. I have identified the root cause and will be putting up a fix soon.
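For reference, a rough sketch of the arithmetic that seems to be behind that number, assuming chunk_full_threshold is also applied to chunk_limit_records so a chunk is enqueued at about 80% of the record limit:

```ruby
# Back-of-the-envelope calculation for the workaround above (an assumption,
# not taken from the plugin source): keep each flushed chunk at or below the
# producer's 1000-message buffer seen in the error log.
max_buffer_size      = 1000   # ruby-kafka producer default, per the error message
chunk_full_threshold = 0.8    # from the buffer section
chunk_limit_records  = (max_buffer_size / chunk_full_threshold).ceil
puts chunk_limit_records      # => 1250
```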
Thanks for working on this! Yeah, I don't think adding chunk_limit_records should be needed. Looking forward to the fix! Appreciate the work!