WaterDrop is a gem for sending messages to Kafka in an easy way, with an extra validation layer. It is a part of the Karafka ecosystem.
WaterDrop is based on Zendesk's delivery_boy gem.
It:
- Is thread safe
- Supports sync and async producers
gem install waterdrop
or add this to your Gemfile:
gem 'waterdrop'
and run
bundle install
WaterDrop is a complex tool that contains multiple configuration options. To keep everything organized, all the configuration options are divided into two groups:
- WaterDrop options - options directly related to the Karafka framework and its components
- Ruby-Kafka driver options - options related to Ruby-Kafka/Delivery boy
To apply all those configuration options, you need to use the #setup method:
WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]
end
Option | Description |
---|---|
client_id | This is how the client will identify itself to the Kafka brokers |
logger | Logger that we want to use |
deliver | Should we send messages to Kafka |
Note: We've listed here only the most important configuration options. If you're interested in all the options, please go to the config.rb file for more details.
Note: All the options are subject to validations. In order to check what is and what is not acceptable, please go to the config.rb validation schema file.
Option | Description |
---|---|
delivery_interval | The number of seconds between background message deliveries. Disable timer-based background deliveries by setting this to 0. |
delivery_threshold | The number of buffered messages that will trigger a background message delivery. Disable buffer size based background deliveries by setting this to 0. |
required_acks | The number of Kafka replicas that must acknowledge messages before they're considered as successfully written. |
ack_timeout | The number of seconds a broker can wait for replicas to acknowledge a write before responding with a timeout. |
max_retries | The number of retries when attempting to deliver messages. |
retry_backoff | The number of seconds to wait after a failed attempt to send messages to a Kafka broker before retrying. |
max_buffer_bytesize | The maximum number of bytes allowed in the buffer before new messages are rejected. |
max_buffer_size | The maximum number of messages allowed in the buffer before new messages are rejected. |
max_queue_size | The maximum number of messages allowed in the queue before new messages are rejected. |
sasl_plain_username | The username used to authenticate. |
sasl_plain_password | The password used to authenticate. |
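
A minimal sketch of how a few of the driver options from the table might be set. It assumes that these options live under config.kafka, in the same way as seed_brokers in the setup example; the values are purely illustrative and are not recommended defaults. Check config.rb for the authoritative names and accepted values.

```ruby
require 'waterdrop'

WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]

  # Assumption: driver options are nested under config.kafka, like seed_brokers.
  # All values below are illustrative only.
  config.kafka.delivery_interval = 5    # seconds between background deliveries
  config.kafka.delivery_threshold = 100 # buffered messages that trigger a delivery
  config.kafka.required_acks = 1        # replicas that must acknowledge a write
  config.kafka.max_retries = 2          # delivery attempts after a failure
  config.kafka.retry_backoff = 1        # seconds to wait before retrying
end
```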
This configuration can also be placed in config/initializers and can vary based on the environment:
WaterDrop.setup do |config|
  config.deliver = Rails.env.production?
  config.kafka.seed_brokers = [Rails.env.production? ? 'kafka://prod-host:9091' : 'kafka://localhost:9092']
end
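
As an alternative to branching on the environment name, the broker list could come from an environment variable. The sketch below is only one possible approach: seed_brokers_from is a hypothetical helper and KAFKA_BROKERS is an assumed variable name; WaterDrop does not read either of them by itself.

```ruby
# Hypothetical helper (not part of WaterDrop): turns a comma-separated
# broker list into the array format used for seed_brokers.
def seed_brokers_from(raw, default: 'kafka://localhost:9092')
  list = raw.to_s.split(',').map(&:strip).reject(&:empty?)
  # Fall back to the local broker when nothing usable was provided
  list.empty? ? [default] : list
end

# KAFKA_BROKERS is an assumed variable name chosen for this example
brokers = seed_brokers_from(ENV['KAFKA_BROKERS'])

# In an initializer the result could then be applied as:
#   WaterDrop.setup do |config|
#     config.kafka.seed_brokers = brokers
#   end
```

This keeps host names out of the codebase, at the cost of having to set the variable in every environment that should not use the local default.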
To send Kafka messages, just use one of the producers:
WaterDrop::SyncProducer.call('message', topic: 'my-topic')
# or for async
WaterDrop::AsyncProducer.call('message', topic: 'my-topic')
Both SyncProducer and AsyncProducer accept the following options:
Option | Required | Value type | Description |
---|---|---|---|
topic | true | String | The Kafka topic that should be written to |
key | false | String | The key that should be set on the Kafka message |
partition | false | Integer | A specific partition number that should be written to |
partition_key | false | String | A string that can be used to deterministically select the partition |
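
The table can be mirrored in a small pre-flight check that builds the options hash before it is handed to a producer. This is a sketch only: delivery_options is a hypothetical helper written for this example and is not WaterDrop's own validation layer.

```ruby
# Hypothetical helper (not part of WaterDrop): builds the options hash
# described in the table and rejects values of the wrong type early.
def delivery_options(topic:, key: nil, partition: nil, partition_key: nil)
  expected_types = { topic: String, key: String, partition: Integer, partition_key: String }

  # Drop the optional entries that were not provided
  options = { topic: topic, key: key, partition: partition, partition_key: partition_key }.compact
  raise ArgumentError, 'topic is required' unless options.key?(:topic)

  options.each do |name, value|
    type = expected_types.fetch(name)
    raise ArgumentError, "#{name} must be of type #{type}" unless value.is_a?(type)
  end
  options
end

options = delivery_options(topic: 'my-topic', partition_key: 'user-1')

# The result can then be passed to either producer, for example:
#   WaterDrop::SyncProducer.call('message', **options)
```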
Keep in mind that the message you want to send should be either binary or stringified (to_s, to_json, etc.).
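
One way to make sure a payload is stringified before sending is a small conversion step. The helper below, to_kafka_payload, is hypothetical and only illustrates the idea; choosing JSON for hashes and arrays is an assumption of this example, not a WaterDrop requirement.

```ruby
require 'json'

# Hypothetical helper (not part of WaterDrop): returns a String
# representation of the given object.
def to_kafka_payload(object)
  case object
  when String then object                       # already a string, pass through
  when Hash, Array then JSON.generate(object)   # structured data as JSON
  else object.to_s                              # anything else via to_s
  end
end

payload = to_kafka_payload({ id: 1, name: 'test' })

# The stringified payload can then be sent, for example:
#   WaterDrop::SyncProducer.call(payload, topic: 'my-topic')
```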
- Fork the project.
- Make your feature addition or bug fix.
- Add tests for it. This is important so we don't break it in a future version unintentionally.
- Commit, do not mess with version, or history. (If you want to have your own version, that is fine, but bump the version in a commit by itself that I can ignore when I pull.)
- Send me a pull request. Bonus points for topic branches.
Each pull request must pass our quality requirements. To check if everything is as it should be, we use Coditsu that combines multiple linters and code analyzers for both code and documentation.
Unfortunately, it does not yet support independent forks; however, you should be fine by looking at what we require.
Please run:
bundle exec rspec
to check if everything is in order. After that you can submit a pull request.