dasch/avro_turf

avro_turf and thread safety

steveburkett opened this issue · 4 comments

hi there.

Can someone please help determine whether this is "safe" for use in Puma (where we run N threads for our web app, which uses avro_turf)? We want to push messages into the gem's batching, which then releases them when it hits the threshold, but could this get "messed up" if we're running, say, 5 threads in a Puma process all hitting this code at the same time? Should I push the instance into a thread-local variable?

class AsyncKafkaProducer
  def self.instance
    @@instance ||=
      begin
        OxKafka.instance.async_producer(
          delivery_interval: Rails.application.secrets.kafka[:delivery_interval],
          delivery_threshold: Rails.application.secrets.kafka[:delivery_threshold]
        )
      end
  end
end
class OxKafka
  def self.instance
    @@instance ||=
      begin
        Kafka.new(Rails.application.secrets.kafka[:broker_host].split(','), logger: Rails.logger)
      end
  end
end

dasch commented

Instantiating the class variable is not thread safe (you're first reading it, then setting it if it's nil – but there could be a race condition there).

If you eagerly set the instance at application boot time then it's fine – the async producer is thread safe, so multiple threads can produce using it. That's in fact one of the main use cases for the async producer.
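If the lazy accessor has to stay for some reason, one way to close that race is to guard the check-and-set with a mutex. A minimal sketch, assuming nothing about the Kafka client itself: `FakeProducer` and `LazyProducer` are hypothetical stand-ins, not classes from any gem.

```ruby
# Hypothetical stand-in for an expensive, thread-safe client object.
class FakeProducer; end

class LazyProducer
  # The mutex is created when the class body is evaluated (at load time),
  # so it already exists before any thread calls .instance.
  @mutex = Mutex.new
  @instance = nil

  def self.instance
    # Only one thread at a time can run the nil check and the assignment,
    # so at most one FakeProducer is ever built.
    @mutex.synchronize { @instance ||= FakeProducer.new }
  end
end

# Five threads racing on the accessor all get the same object back.
threads = Array.new(5) { Thread.new { LazyProducer.instance } }
instances = threads.map(&:value)
```

Setting the instance eagerly at boot avoids the lock entirely, which is why it is the simpler option when it is available.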

dasch commented

By application boot time, I mean before the threads are started, so that they all read the same instance.

@dasch thanks so much for your input.

The code above (setting the singleton in the class variable) runs inside a file under config/initializers, so I believe this is what you mean by doing it at boot time (before any request). Is that right?

e.g., this:

AsyncKafkaProducer.instance
KafkaAvroTurf.instance

inside of a config/initializer (so that the class singleton is created at boot time).

Lastly, a teammate of mine asked a question. Since we're using Puma in its forking mode, do avro_turf and the async producer hold file descriptors open at the time of forking? Or is there some other issue this gem could run into when used in a forking mode? He said this is a common problem in Rails (gems that leave connections open while forking occurs) and that it's important for gem authors to close connections before forking and reopen them afterwards.

I do have this in the initializer:

at_exit { AsyncKafkaProducer.instance.shutdown }

dasch commented

This is how you would do it eagerly at boot time:

class OxKafka
  # Assigned when the class body is evaluated at boot, before any threads start.
  @@instance =
    Kafka.new(Rails.application.secrets.kafka[:broker_host].split(','), logger: Rails.logger)

  def self.instance; @@instance; end
end

# Defined after OxKafka because it calls OxKafka.instance while loading.
class AsyncKafkaProducer
  @@instance =
    OxKafka.instance.async_producer(
      delivery_interval: Rails.application.secrets.kafka[:delivery_interval],
      delivery_threshold: Rails.application.secrets.kafka[:delivery_threshold]
    )

  def self.instance; @@instance; end
end

I don't quite understand your setup – are you both forking and threading? For forking, you should defer starting the instances until after the fork happens, but for threading you want to reuse the instances across threads in this case.
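
If the app does both, one possible approach is to key the shared instance on the process ID, so each forked worker builds its own instance while the threads inside that worker keep sharing it. This is only a sketch under that assumption: `ForkAwareHolder` is a hypothetical helper, and the block passed to it uses a plain `Object` as a placeholder where the real producer would be built.

```ruby
# Hypothetical helper: rebuilds the wrapped object whenever the current
# process ID differs from the one that built it (i.e. after a fork).
class ForkAwareHolder
  def initialize(&builder)
    @builder = builder
    @mutex = Mutex.new
    @pid = nil
    @instance = nil
  end

  def instance
    @mutex.synchronize do
      if @pid != Process.pid
        # First call in this process: the instance inherited from the
        # parent (if any) is dropped and a fresh one is built.
        @instance = @builder.call
        @pid = Process.pid
      end
      @instance
    end
  end
end

# Usage with a placeholder object standing in for a real producer.
producer_holder = ForkAwareHolder.new { Object.new }
parent_instance = producer_holder.instance
```

Within one process every thread gets the same object back; a forked child calling `producer_holder.instance` builds its own. The sketch does not shut down the instance inherited from the parent, so any cleanup it needs would have to be handled separately.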