A generic Kafka client tuned for our own usage.


  • Messages are expected to be hashes and are BSON-serialized
  • Connections will be properly closed on exceptions
  • Consumer will stop gracefully on SIGTERM
  • At most once consumer semantics are used
  • Production via an HTTP proxy (including SSL)


require "cc/kafka"


producer = CC::Kafka::Producer.new("kafka://host:1234/topic", "client-id")
producer.send_message(foo: :bar, baz: :bat)


consumer = CC::Kafka::Consumer.new("client-id", ["kafka://host:1234", "..."], "topic", 0)
consumer.on_message do |message|
  # Given the producer above, message will be
  #   {
  #     "foo" => :bar,
  #     "baz" => :bat,
  #     CC::Kafka::MESSAGE_OFFSET_KEY => "topic-0-1",
  #   }


Note: the value for the MESSAGE_OFFSET_KEY identifies the message's offset within the given topic and partition as <topic>-<partition>-<offset>. It can be used by consumers to tie created data to the message that lead to it and prevent duplicate processing.


  • CC::Kafka.offset_model

    Must respond to find_for_create!(attributes) and return an object that responds to set(attributes).

    The attributes used are topic, partition, and current. And the object returned from find_or_create! must expose methods for each of these.

    A Minidoc-based module is included that can be included in client code for an offset model implementation that will work for many clients.

    class KafkaOffset < Minidoc
      include CC::Kafka::OffsetStorage::Minidoc
    CC::Kafka.offset_model = KafkaOffset

    Note: This is only necessary if using Consumer.

  • Kafka.logger

    This is optional and defaults to Logger.new(STDOUT). The configured object must have the same interface as the standard Ruby logger.


    Kafka.logger = Rails.logger
  • Kafka.statsd

    This is optional and defaults to a null object. The configured object should represent a statsd client and respond to the usual methods, increment, time, etc.

  • Kafka.ssl_ca_file

    Path to a custom SSL Certificate Authority file.

    Will result in:

    http.ca_file = Kafka.ssl_ca_file
  • Kafka.ssl_pem_file

    Path to a custom SSL Certificate (and key) in concatenated, PEM format.

    Will result in:

    pem = File.read(Kafka.ssl_pem_file)
    http.cert = OpenSSL::X509::Certificate.new(pem)
    http.key = OpenSSL::PKey::RSA.new(pem)
