Kafka bindings for Haskell backed by the librdkafka C module.
This project is inspired by Haskakafka which unfortunately doesn't seem to be actively maintained.
HaskellWorks Kafka ecosystem is described here: https://github.com/haskell-works/hw-kafka
High level consumers are supported by librdkafka
starting from version 0.9.
High-level consumers provide an abstraction for consuming messages from multiple
partitions and topics. They are also address scalability (up to a number of partitions)
by providing automatic rebalancing functionality. When a new consumer joins a consumer
group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
$ stack build --flag hw-kafka-client:examples
or
$ stack build --exec kafka-client-example --flag hw-kafka-client:examples
A working consumer example can be found here: ConsumerExample.hs
To run an example please compile with the examples
flag:
import Data.Monoid ((<>))
import Kafka
import Kafka.Consumer
-- Global consumer properties
consumerProps :: ConsumerProperties
consumerProps = consumerBrokersList [BrokerAddress "localhost:9092"]
<> groupId (ConsumerGroupId "consumer_example_group")
<> noAutoCommit
<> consumerDebug [DebugAll]
-- Subscription to topics
consumerSub :: Subscription
consumerSub = topics [TopicName "kafka-client-example-topic"]
<> offsetReset Earliest
-- Running an example
runConsumerExample :: IO ()
runConsumerExample = do
res <- runConsumer consumerProps consumerSub processMessages
print res
-------------------------------------------------------------------
processMessages :: KafkaConsumer -> IO (Either KafkaError ())
processMessages kafka = do
mapM_ (\_ -> do
msg1 <- pollMessage kafka (Timeout 1000)
putStrLn $ "Message: " <> show msg1
err <- commitAllOffsets kafka OffsetCommit
putStrLn $ "Offsets: " <> maybe "Committed." show err
) [0 .. 10]
return $ Right ()
kafka-client
producer supports sending messages to multiple topics.
Target topic name is a part of each message that is to be sent by produceMessage
.
A working producer example can be found here: ProducerExample.hs
import Control.Monad (forM_)
import Kafka
import Kafka.Producer
-- Global producer properties
producerProps :: ProducerProperties
producerProps = producerBrokersList [BrokerAddress "localhost:9092"]
-- Topic to send messages to
targetTopic :: TopicName
targetTopic = TopicName "kafka-client-example-topic"
-- Run an example
runProducerExample :: IO ()
runProducerExample = do
res <- runProducer producerProps sendMessages
print res
sendMessages :: KafkaProducer -> IO (Either KafkaError ())
sendMessages prod = do
err1 <- produceMessage prod ProducerRecord
{ prTopic = targetTopic
, prPartition = UnassignedPartition
, prKey = Nothing
, prValue = Just "test from producer"
}
forM_ err1 print
err2 <- produceMessage prod ProducerRecord
{ prTopic = targetTopic
, prPartition = UnassignedPartition
, prKey = Just "key"
, prValue = Just "test from producer (with key)"
}
forM_ err2 print
return $ Right ()
Although librdkafka
is available on many platforms, most of
the distribution packages are too old to support kafka-client
.
As such, we suggest you install from the source:
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make && make install
Sometimes it is helpful to specify openssl includes explicitly:
LDFLAGS=-L/usr/local/opt/openssl/lib CPPFLAGS=-I/usr/local/opt/openssl/include ./configure
The full Kafka guide is at http://kafka.apache.org/documentation.html#quickstart
Alternatively docker-compose
can be used to run Kafka locally inside a Docker container.
To run Kafka inside Docker:
$ docker-compose up