
Frizzle for Apache Kafka

Primary LanguageGoMIT LicenseMIT


Travis Build Status Coverage Status MIT licensed GitHub release Go Report Card GoDoc

Frafka is a Kafka implementation for Frizzle based on confluent-go-kafka.

Frizzle is a magic message (Msg) bus designed for parallel processing w many goroutines.

  • Receive() messages from a configured Source
  • Do your processing, possibly Send() each Msg on to one or more Sink destinations
  • Ack() (or Fail()) the Msg to notify the Source that processing completed

Prereqs / Build instructions

Go mod

As of Go 1.11, frafka uses go mod for dependency management.

Install librdkafka

Frafka depends on C library librdkafka (>=v0.11.6). For Debian 9+ (which includes golang docker images), it has to be built from source. Fortunately, there's a script for that.

  # Install librdkafka
  - curl --silent -OL https://raw.githubusercontent.com/confluentinc/confluent-kafka-go/v0.11.4/mk/bootstrap-librdkafka.sh
  - bash bootstrap-librdkafka.sh v0.11.4 /usr/local
  - ldconfig

Once that is installed, should be good to go with

go get github.com/qntfy/frafka
cd frafka
go build

Running the tests

Frafka has integration tests which require a kafka broker to test against. KAFKA_BROKERS environment variable is used by tests. simplesteph/kafka-stack-docker-compose has a great simple docker-compose setup that is used in frafka CI currently.

curl --silent -L -o kafka.yml https://raw.githubusercontent.com/simplesteph/kafka-stack-docker-compose/v5.1.0/zk-single-kafka-single.yml
DOCKER_HOST_IP= docker-compose -f kafka.yml up -d
# takes a while to initialize; can use a tool like wait-for-it.sh in scripting
go test -v --cover ./...


Frafka Sources and Sinks are configured using Viper.

func InitSink(config *viper.Viper) (*Sink, error)

func InitSource(config *viper.Viper) (*Source, error)

We typically initialize Viper through environment variables (but client can do whatever it wants, just needs to provide the configured Viper object with relevant values). The application might use a prefix before the below values.

Variable Required Description Default
KAFKA_BROKERS required address(es) of kafka brokers, space separated
KAFKA_TOPICS source topic(s) to read from
KAFKA_CONSUMER_GROUP source consumer group value for coordinating multiple clients
KAFKA_CONSUME_LATEST_FIRST source (optional) start at the beginning or end of topic earliest
KAFKA_MAX_BUFFER_KB optional How large a buffer to allow for prefetching and batch produing kafka message* 16384

*KAFKA_MAX_BUFFER_KB is passed through to librdkafka. Default is 16MB. Corresponding librdkafka config values are queue.buffering.max.kbytes (Producer) and queued.max.messages.kbytes (Consumer). Note that librdkafka creates one buffer each for the Producer (Sink) and for each topic+partition being consumed by the source. E.g. with default 16MB default, if you are consuming from 4 partitions and also producing then the theoretical max memory usage from the buffer would be 16*(4+1) = 80 MB.

Async Error Handling

Since records are sent in batch fashion, Kafka may report errors or other information asynchronously. Event can be recovered via channels returned by the Sink.Events() and Source.Events() methods. Partition changes and EOF will be reported as non-error Events, other errors will conform to error interface. Where possible, Events will retain underlying type from confluent-kafka-go if more information is desired.


Contributions welcome! Take a look at open issues.