karafka/rdkafka-ruby

Producing not working + crash for forked processes

thijsc opened this issue · 5 comments

The underlying C rdkafka instance does not survive a fork. Producing and polling do not work, and rd_kafka_destroy hits a failing assertion that crashes the process when it is called via https://github.com/appsignal/rdkafka-ruby/blob/master/lib/rdkafka/config.rb#L149

A fix to not let rd_kafka_destroy crash in this scenario was added in librdkafka: confluentinc/librdkafka@8c67e42

We should move to that release when it's out at the end of February. For the failing produces and polls I see a few options:

  • Raise an exception if our pid changed (see the sketch after this list)
  • Recreate the rdkafka instance if our pid changed
  • Add a forked hook you need to call after forking, possibly with a Unicorn integration
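
For the first option, a minimal sketch of what a pid check could look like, written here as a wrapper rather than as a change to the gem itself (ForkSafeProducer and UsedAfterForkError are illustrative names, not part of rdkafka-ruby):

require "rdkafka"

# Illustrative wrapper only (not rdkafka-ruby's actual API): remember the pid
# the producer was created in and refuse to produce from any other process.
class ForkSafeProducer
  class UsedAfterForkError < StandardError; end

  def initialize(config)
    @producer = Rdkafka::Config.new(config).producer
    @creation_pid = Process.pid
  end

  def produce(**args)
    if Process.pid != @creation_pid
      raise UsedAfterForkError,
            "producer was created in pid #{@creation_pid}, now running in pid #{Process.pid}"
    end
    @producer.produce(**args)
  end
end

The second option would replace the raise with recreating the producer from the stored config when the pid change is detected.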
agis commented

@thijsc I'm trying to produce from a forked process, but apparently nothing is written to the socket (i.e. nothing is produced).

require "rdkafka"

puts Rdkafka::LIBRDKAFKA_VERSION
puts Rdkafka::VERSION

config = {:"bootstrap.servers" => "kafka-a.vm.skroutz.gr:9092"}
producer = Rdkafka::Config.new(config).producer

fork do
  puts "Producing message"

  producer.produce(topic: "test-rb", payload: "foo").wait

  puts "this is never printed"
end

puts Process.wait

The script blocks at the call to wait until the default timeout (60 seconds) is reached:

$ ruby producer.rb
1.2.0
0.7.0
Producing message

^CTraceback (most recent call last):
        6: from producer.rb:9:in `<main>'
        5: from producer.rb:9:in `fork'
        4: from producer.rb:12:in `block in <main>'
        3: from /home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:49:in `wait'
        2: from /home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:49:in `loop'
        1: from /home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:54:in `block in wait'
/home/agis/.gem/ruby/2.6.3/gems/rdkafka-0.7.0/lib/rdkafka/producer/delivery_handle.rb:54:in `sleep': Interrupt
Traceback (most recent call last):
        1: from producer.rb:17:in `<main>'
producer.rb:17:in `wait': Interrupt

The forked process writes nothing to the socket (verified with tcpdump) and the consumer never sees any messages.

The forked process seems to be stuck in a loop of ppoll and read syscalls (output from strace):

ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=100000000}, NULL, 8) = 0 (Timeout)
read(3, 0x7ffc0ce16660, 8)              = -1 EAGAIN (Resource temporarily unavailable)

Is my script supposed to work or am I missing something?

Thanks!

P.S. I'm on Debian Linux with Ruby 2.6.3p62.

This indeed does not work; rdkafka does not survive a fork. It will work if you move producer = Rdkafka::Config.new(config).producer into the fork block.
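
For reference, a minimal variant of the script above that should work, with the producer created inside the forked child (same illustrative broker address as the original script):

require "rdkafka"

config = {:"bootstrap.servers" => "kafka-a.vm.skroutz.gr:9092"}

fork do
  # Creating the producer here means librdkafka's threads and sockets belong
  # to the child process instead of being stale copies from the parent.
  producer = Rdkafka::Config.new(config).producer
  producer.produce(topic: "test-rb", payload: "foo").wait
  puts "this is now printed"
end

Process.wait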

I'm not exactly sure yet how to make this clear and user-friendly, any thoughts on that?

agis commented

So there is no way to reuse a producer object across multiple forks?

9882ce4 gave me the impression that it should work (isn't that spec essentially doing the same thing?). If it doesn't, then I guess that part of the README shouldn't have been removed?

Creating the producer after the fork defeats the purpose of what I'm trying to achieve: create a producer in the parent and reuse it across the children (e.g. this would be an ideal use case for Resque).

agis commented

After reading @edenhill's comment on https://github.com/edenhill/librdkafka/blob/master/tests/0079-fork.c#L37-L42, it's apparent that this use case is not possible in librdkafka. Perhaps we should bring back the relevant README section noting that the client must be created after forking?

Creating the producer after the fork defeats the purpose of what I'm trying to achieve: create a producer in the parent and reuse it across the children (e.g. this would be an ideal use case for Resque).

That's impossible, I'm afraid. When you fork, you create a separate Unix process. That process does not share state with the original one. Even if the producer survived the fork, it would still be a copy in a separate process that is not actually being reused. I actually wrote a blog post about this a few years ago that might be good reading :-).

You'd have to create some setup with Unix sockets, for example, to be able to communicate between the processes and reuse a single producer.
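
As a rough illustration of that kind of setup (using a pipe instead of a Unix socket; broker address and topic are placeholders), the parent owns the single producer and the children only hand it payloads:

require "rdkafka"

config = {:"bootstrap.servers" => "localhost:9092"} # placeholder broker
reader, writer = IO.pipe

# Children never touch rdkafka; they only write payloads to the pipe.
pids = 2.times.map do
  fork do
    reader.close
    writer.puts("message from pid #{Process.pid}")
    writer.close
  end
end
writer.close

# The parent is the only process that creates and uses a producer.
producer = Rdkafka::Config.new(config).producer
reader.each_line do |payload|
  producer.produce(topic: "test-rb", payload: payload.chomp).wait
end

pids.each { |pid| Process.wait(pid) }

The same idea would apply to a Resque-style setup: the workers send whatever needs to be produced to a single long-lived process that owns the producer.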