
A Ruby-Kafka-based, aspect-oriented library for generating Kafka messages

Primary language: Ruby. License: MIT.

WaterDrop

Join the chat at https://gitter.im/karafka/karafka

A gem used to send messages to Kafka in an easy way, with an extra validation layer. It is a part of the Karafka ecosystem.

WaterDrop is based on Zendesk's delivery_boy gem.

It is:

  • Thread-safe
  • Supports sync and async producers

Installation

gem install waterdrop

or add this to your Gemfile:

gem 'waterdrop'

and run

bundle install

Setup

WaterDrop is a complex tool that contains multiple configuration options. To keep everything organized, all the configuration options are divided into two groups:

  • WaterDrop options - options directly related to the Karafka framework and its components
  • Ruby-Kafka driver options - options related to Ruby-Kafka and delivery_boy

To apply all those configuration options, you need to use the #setup method:

WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]
end

WaterDrop configuration options

Option | Description
client_id | How the client identifies itself to the Kafka brokers
logger | The logger WaterDrop should use
deliver | Whether messages should actually be sent to Kafka

Ruby-Kafka driver and delivery_boy configuration options

Note: Only the most important configuration options are listed here. If you're interested in all of them, see the config.rb file for more details.

Note: All the options are subject to validation. To check which values are and are not acceptable, see the config.rb validation schema file.

Option | Description
delivery_interval | The number of seconds between background message deliveries. Set this to 0 to disable timer-based background deliveries.
delivery_threshold | The number of buffered messages that triggers a background message delivery. Set this to 0 to disable buffer-size-based background deliveries.
required_acks | The number of Kafka replicas that must acknowledge messages before they are considered successfully written.
ack_timeout | The number of seconds a broker waits for the required replicas to acknowledge a write before responding to the client with a timeout error.
max_retries | The number of times a failed delivery attempt is retried.
retry_backoff | The number of seconds to wait after a failed attempt to send messages to a Kafka broker before retrying.
max_buffer_bytesize | The maximum number of bytes allowed in the buffer before new messages are rejected.
max_buffer_size | The maximum number of messages allowed in the buffer before new messages are rejected.
max_queue_size | The maximum number of messages allowed in the queue before new messages are rejected.
sasl_plain_username | The username used for SASL/PLAIN authentication.
sasl_plain_password | The password used for SASL/PLAIN authentication.
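To make the interplay of delivery_interval, delivery_threshold and max_buffer_size more concrete, here is a minimal Ruby model of the semantics described in the table. It is a hypothetical sketch, not delivery_boy or ruby-kafka code: the BufferModel class, the BufferFull error and the explicit `now` timestamps are assumptions made so the behavior can be followed step by step without a real clock or broker.

```ruby
# Hypothetical model of the documented delivery rules; NOT delivery_boy or
# ruby-kafka code. A value of 0 disables a trigger, and max_buffer_size caps
# how many messages may wait in the buffer.
BufferFull = Class.new(StandardError)

class BufferModel
  attr_reader :deliveries

  def initialize(delivery_interval:, delivery_threshold:, max_buffer_size:)
    @delivery_interval = delivery_interval
    @delivery_threshold = delivery_threshold
    @max_buffer_size = max_buffer_size
    @buffer = []
    @deliveries = []
    @last_delivery_at = 0
  end

  # Buffers a message at time `now` (in seconds); rejects it when the buffer is full.
  def produce(message, now:)
    raise BufferFull, 'max_buffer_size reached' if @buffer.size >= @max_buffer_size

    @buffer << message
    deliver(now) if threshold_reached?
  end

  # Stands in for the periodic timer: delivers if delivery_interval has elapsed.
  def tick(now)
    deliver(now) if interval_elapsed?(now) && !@buffer.empty?
  end

  private

  def threshold_reached?
    @delivery_threshold.positive? && @buffer.size >= @delivery_threshold
  end

  def interval_elapsed?(now)
    @delivery_interval.positive? && now - @last_delivery_at >= @delivery_interval
  end

  # Records the buffered batch as one delivery and empties the buffer.
  def deliver(now)
    @deliveries << @buffer.dup
    @buffer.clear
    @last_delivery_at = now
  end
end

model = BufferModel.new(delivery_interval: 10, delivery_threshold: 3, max_buffer_size: 5)
%w[a b c].each_with_index { |msg, t| model.produce(msg, now: t) } # 3rd message hits the threshold
model.produce('d', now: 3)
model.tick(12) # 10 seconds after the last delivery at t=2
model.deliveries # => [["a", "b", "c"], ["d"]]
```

Passing the time in explicitly keeps the model deterministic; a real implementation would drive `tick` from a background timer thread.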

This configuration can also be placed in config/initializers and can vary depending on the environment:

WaterDrop.setup do |config|
  config.deliver = Rails.env.production?
  config.kafka.seed_brokers = [Rails.env.production? ? 'kafka://prod-host:9091' : 'kafka://localhost:9092']
end
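If you configure WaterDrop outside Rails, the same environment switch can live in a plain helper. The sketch below is hypothetical: kafka_settings_for and the RACK_ENV variable are assumptions, and the broker URLs are the placeholders from the example above.

```ruby
# Hypothetical helper (not part of WaterDrop): computes the same values as the
# Rails initializer above from a plain environment name.
def kafka_settings_for(env)
  production = env.to_s == 'production'
  {
    deliver: production,
    seed_brokers: [production ? 'kafka://prod-host:9091' : 'kafka://localhost:9092']
  }
end

# RACK_ENV is an assumption; use whatever variable names your environment.
settings = kafka_settings_for(ENV.fetch('RACK_ENV', 'development'))

# The result would then be assigned inside the documented setup block:
# WaterDrop.setup do |config|
#   config.deliver = settings[:deliver]
#   config.kafka.seed_brokers = settings[:seed_brokers]
# end
```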

Usage

To send Kafka messages, just use one of the producers:

WaterDrop::SyncProducer.call('message', topic: 'my-topic')
# or for async
WaterDrop::AsyncProducer.call('message', topic: 'my-topic')
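The difference between the two producers can be pictured with a small conceptual model: a synchronous call delivers before it returns, while an asynchronous call only enqueues the message and lets a background thread deliver it later. FakeBroker and ConceptualProducers are hypothetical names invented for this sketch; it does not reflect WaterDrop's internals.

```ruby
# Conceptual model only (hypothetical classes, not WaterDrop's implementation).
# An in-memory stand-in for a Kafka broker; the mutex keeps writes thread-safe.
class FakeBroker
  def initialize
    @messages = []
    @mutex = Mutex.new
  end

  def write(topic, message)
    @mutex.synchronize { @messages << [topic, message] }
  end

  def messages
    @mutex.synchronize { @messages.dup }
  end
end

class ConceptualProducers
  def initialize(broker)
    @broker = broker
    @queue = Queue.new
    # Background worker: delivers queued messages until it sees :stop.
    @worker = Thread.new do
      while (job = @queue.pop) != :stop
        @broker.write(*job)
      end
    end
  end

  # Returns only after the broker has stored the message.
  def sync_call(message, topic:)
    @broker.write(topic, message)
  end

  # Returns immediately; delivery happens later on the worker thread.
  def async_call(message, topic:)
    @queue << [topic, message]
  end

  # Lets the worker drain everything queued so far, then stops it.
  def shutdown
    @queue << :stop
    @worker.join
  end
end

broker = FakeBroker.new
producers = ConceptualProducers.new(broker)
producers.sync_call('message', topic: 'my-topic')  # already stored when this returns
producers.async_call('message', topic: 'my-topic') # may still be queued here
producers.shutdown                                 # waits for the queue to drain
broker.messages.size # => 2
```

The trade-off this illustrates: the synchronous path surfaces delivery errors directly to the caller, while the asynchronous path returns faster but delivers in the background.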

Both SyncProducer and AsyncProducer accept the following options:

Option | Required | Value type | Description
topic | true | String | The Kafka topic that should be written to
key | false | String | The key that should be set on the Kafka message
partition | false | Integer | A specific partition number that should be written to
partition_key | false | String | A string that can be used to deterministically select the partition
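The options table can be read as a simple schema. A hypothetical validator mirroring it might look like this; validate_producer_options and PRODUCER_OPTION_TYPES are illustrative names, and WaterDrop's own validation may apply additional rules.

```ruby
# Hypothetical validator mirroring the producer options table; it is not
# WaterDrop's own schema, which may enforce more rules.
PRODUCER_OPTION_TYPES = {
  topic: String,
  key: String,
  partition: Integer,
  partition_key: String
}.freeze

# Returns a list of error messages; an empty list means the options look valid.
# Options not listed in the table are ignored by this sketch.
def validate_producer_options(options)
  errors = []
  errors << 'topic is required' unless options.key?(:topic)
  options.each do |name, value|
    expected = PRODUCER_OPTION_TYPES[name]
    next if expected.nil? || value.is_a?(expected)

    errors << "#{name} must be of type #{expected}"
  end
  errors
end

validate_producer_options(topic: 'my-topic', partition: 0) # => []
validate_producer_options(partition: '1')
# => ["topic is required", "partition must be of type Integer"]
```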

Keep in mind that the message you want to send should be either binary or a string (convert it with to_s, to_json, etc.).
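A small helper can normalize common Ruby values into a String payload before they reach a producer. to_kafka_payload is a hypothetical name, and encoding hashes and arrays as JSON is an assumption; swap in whatever serialization your consumers expect.

```ruby
require 'json'

# Hypothetical helper: converts common Ruby values into a String payload.
# Strings (text or binary) pass through unchanged; the JSON choice is an assumption.
def to_kafka_payload(value)
  case value
  when String then value
  when Hash, Array then value.to_json
  else value.to_s
  end
end

to_kafka_payload({ id: 1, event: 'signup' }) # => "{\"id\":1,\"event\":\"signup\"}"
to_kafka_payload(42)                          # => "42"
# WaterDrop::SyncProducer.call(to_kafka_payload(payload), topic: 'my-topic')
```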


Note on Patches/Pull Requests

Fork the project. Make your feature addition or bug fix. Add tests for it. This is important so we don't unintentionally break it in a future version. Commit, but do not mess with the version or history. (If you want to have your own version, that is fine, but bump the version in a separate commit that I can ignore when I pull.) Send me a pull request. Bonus points for topic branches.


Each pull request must pass our quality requirements. To check if everything is as it should be, we use Coditsu, which combines multiple linters and code analyzers for both code and documentation.

Unfortunately, it does not yet support independent forks; however, you should be fine if you follow what we require.

Please run:

bundle exec rspec

to check if everything is in order. After that you can submit a pull request.