Spring Boot and Kafka quickstarts
This repository contains a set of code samples around Kafka Clients and Kafka Streams leveraging Spring Boot
to simplify the development of applications.
This section contains quickstarts around the Producer API with unit tests using the MockProducer API.
| Module | Library | Content |
|--------|---------|---------|
| Avro Generic | Kafka Clients | Produce generic Avro records |
| Avro Specific | Kafka Clients | Produce specific Avro records |
| Headers | Kafka Clients | Produce records with headers |
| Simple | Kafka Clients | Produce String records |
| Transaction | Kafka Clients | Produce to multiple topics while guaranteeing atomicity |
This section contains quickstarts around the Consumer API with unit tests using the MockConsumer API.
| Module | Library | Content |
|--------|---------|---------|
| Avro Generic | Kafka Clients | Consume generic Avro records |
| Avro Specific | Kafka Clients | Consume specific Avro records |
| Circuit breaker | Kafka Clients | Consume records while handling poison pills |
| Headers | Kafka Clients | Consume records with headers |
| Retry external system | Kafka Clients | Consume records while retrying on failed external system calls |
| Simple | Kafka Clients | Consume String records |
| Transaction | Kafka Clients | Consume records from committed transactions |
This section contains quickstarts around the Kafka Streams API with unit tests using the TopologyTestDriver API.
| Module | Library | Content | DSL |
|--------|---------|---------|-----|
| Global Table | Kafka Streams | Source topic as global table | `globalTable()` |
| Table | Kafka Streams | Source topic as table | `table()` |
| Stream | Kafka Streams | Source topic as stream | `stream()` |
| Module | Library | Content | DSL |
|--------|---------|---------|-----|
| Branch | Kafka Streams | Split and create branches from a stream | `split()`, `branch()` |
| Cogroup | Kafka Streams | Aggregate records of multiple streams by key | `cogroup()` |
| Filter | Kafka Streams | Retain or drop records based on a predicate | `filter()`, `filterNot()` |
| FlatMap | Kafka Streams | Change one record into 0, 1 or n records | `flatMap()` |
| FlatMapValues | Kafka Streams | Change one record value into 0, 1 or n record values | `flatMapValues()` |
| Foreach | Kafka Streams | Perform a terminal operation on each record | `foreach()` |
| Map | Kafka Streams | Change one record into another record | `map()` |
| MapValues | Kafka Streams | Change one record value into another record value | `mapValues()` |
| Merge | Kafka Streams | Merge two streams into one stream | `merge()` |
| Print | Kafka Streams | Print a stream to the system output or a file | `print()` |
| Repartition | Kafka Streams | Trigger a repartitioning of the stream | `repartition()` |
| SelectKey | Kafka Streams | Change the key of each record | `selectKey()` |
| Module | Library | Content | DSL |
|--------|---------|---------|-----|
| Aggregate | Kafka Streams | Aggregate a stream by key in a single object | `groupByKey()`, `aggregate()` |
| Aggregate Hopping Window | Kafka Streams | Aggregate a stream by key and by hopping window with a grace period | `groupByKey()`, `aggregate()`, `windowedBy()`, `advanceBy()` |
| Aggregate Sliding Window | Kafka Streams | Aggregate a stream by key and by sliding window with a grace period | `groupByKey()`, `aggregate()`, `windowedBy()` |
| Aggregate Tumbling Window | Kafka Streams | Aggregate a stream by key and by tumbling window with a grace period | `groupByKey()`, `aggregate()`, `windowedBy()` |
| Average | Kafka Streams | Compute an average value of a stream by key | `groupBy()`, `aggregate()` |
| Count | Kafka Streams | Count the number of records of a stream by key | `groupBy()`, `count()` |
| Reduce | Kafka Streams | Reduce the records of a stream by key | `groupBy()`, `reduce()` |
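
To make the "Average" row more concrete, here is a minimal plain-Java sketch of the idea behind a keyed aggregation: an initializer creates an empty accumulator per key, and an aggregator folds each record value into it. It deliberately does not use the Kafka Streams API, and the names `AverageSketch`, `SumCount` and `averageByKey` are hypothetical, not taken from the quickstart modules.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AverageSketch {

    /** Hypothetical accumulator holding the running sum and count for one key. */
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Aggregator step: fold one value into the accumulator. */
        SumCount add(long value) {
            return new SumCount(sum + value, count + 1);
        }

        double average() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    /** Groups records by key and computes the average value per key. */
    static Map<String, Double> averageByKey(List<Map.Entry<String, Long>> records) {
        Map<String, SumCount> state = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : records) {
            // Initializer: start from an empty accumulator the first time a key is seen.
            SumCount current = state.getOrDefault(record.getKey(), new SumCount(0, 0));
            state.put(record.getKey(), current.add(record.getValue()));
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        state.forEach((key, acc) -> averages.put(key, acc.average()));
        return averages;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = List.of(
                Map.entry("a", 10L), Map.entry("b", 4L), Map.entry("a", 20L));
        System.out.println(averageByKey(records)); // prints {a=15.0, b=4.0}
    }
}
```

Unlike this batch-style sketch, a streaming aggregation updates its result continuously as each record arrives; the sketch only shows the accumulator logic.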
| Module | Library | Content | DSL |
|--------|---------|---------|-----|
| Hopping Window | Kafka Streams | Group records by hopping window with a grace period | `windowedBy()`, `advanceBy()` |
| Sliding Window | Kafka Streams | Group records by sliding window with a grace period | `windowedBy()` |
| Tumbling Window | Kafka Streams | Group records by tumbling window with a grace period | `windowedBy()` |
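
The difference between tumbling and hopping windows is mostly arithmetic on record timestamps, so a small plain-Java sketch may help. It assumes epoch-aligned, end-exclusive windows `[start, start + size)` and does not use the Kafka Streams API. The class and method names (`WindowSketch`, `tumblingWindowStart`, `hoppingWindowStarts`) are hypothetical, and grace-period handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSketch {

    /** Start of the single tumbling window that contains the timestamp. */
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Starts of every hopping window [start, start + sizeMs) that contains the timestamp. */
    static List<Long> hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start (a multiple of advanceMs) whose window still covers the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t = 7s with 5s windows falls into the tumbling window starting at 5s.
        System.out.println(tumblingWindowStart(7_000, 5_000)); // prints 5000
        // With a 2s advance, the same record belongs to the windows starting at 4s and 6s.
        System.out.println(hoppingWindowStarts(7_000, 5_000, 2_000)); // prints [4000, 6000]
    }
}
```

A tumbling window is the special case where the advance equals the window size, so each record lands in exactly one window.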
| Module | Library | Content | DSL | Processor API |
|--------|---------|---------|-----|---------------|
| Process | Kafka Streams | Apply a processor to a stream | `process()` | `context()`, `forward()`, `Record#headers()` |
| ProcessValues | Kafka Streams | Apply a fixed key processor to a stream | `processValues()` | `context()`, `forward()`, `Record#headers()` |
| Schedule | Kafka Streams | Schedule punctuation functions based on wall clock time and stream time | `process()` | `schedule()`, `getStateStore()` |
| Schedule Store Cleanup | Kafka Streams | Schedule periodic store cleanup based on stream time | `process()`, `addStateStore()` | `schedule()` |