Kotka Streams - the Kotlin DSL for Kafka Streams

Primary language: Kotlin. License: Apache License 2.0 (Apache-2.0).

Kotka Streams - Kotlin for Kafka Streams

Kotka makes Kafka Streams more pleasant to use from Kotlin.

Quickstart

Add a dependency on kotka-streams-extensions for the basics.

// build.gradle.kts
repositories {
  mavenCentral()
}

dependencies {
  implementation("dev.adamko.kotka:kotka-streams-extensions:$kotkaVersion")
}

Modules

There are three modules. Add a dependency on dev.adamko.kotka:kotka-streams to get them all at once.

dependencies {
  implementation("dev.adamko.kotka:kotka-streams:$kotkaVersion")
}

kotka-streams-extensions

Contains the basic extension functions to make Kafka Streams more Kotlin-esque.

  implementation("dev.adamko.kotka:kotka-streams-extensions:$kotkaVersion")

import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*

data class MusicalBand(
  val name: String,
  val memberNames: List<String>,
)

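// `builder` is assumed to be a Kafka Streams StreamsBuilder created elsewhere
builder.stream<String, MusicalBand>("musical-bands")
  // emit one (member name, band name) record per band member
  .flatMap("band-member-names-to-band-name") { _: String, band: MusicalBand ->
    band.memberNames.map { memberName -> memberName to band.name }
  }
  // group the band names by member name
  .groupByKey(groupedAs("map-of-band-member-to-band-names"))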
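
To make the shape of the records easier to see, here is a minimal in-memory sketch that uses only the Kotlin standard library, with no Kafka Streams or Kotka involved. It applies the same lambda body to a plain list and then groups by key. The helper names `memberToBandPairs` and `bandsByMember` and the sample bands are made up for illustration. A real topology would process records continuously rather than as a finite list.

```kotlin
data class MusicalBand(
  val name: String,
  val memberNames: List<String>,
)

// Same mapping as the flatMap lambda above: one (member name, band name) pair per member.
// Hypothetical helper, operating on a List instead of a KStream.
fun memberToBandPairs(bands: List<MusicalBand>): List<Pair<String, String>> =
  bands.flatMap { band ->
    band.memberNames.map { memberName -> memberName to band.name }
  }

// Rough in-memory analogue of grouping by key: collect the band names for each member.
fun bandsByMember(bands: List<MusicalBand>): Map<String, List<String>> =
  memberToBandPairs(bands).groupBy({ it.first }, { it.second })

fun main() {
  // Invented sample data
  val bands = listOf(
    MusicalBand("The Examples", listOf("Ada", "Grace")),
    MusicalBand("Sample Trio", listOf("Grace", "Linus")),
  )
  println(bandsByMember(bands))
  // prints {Ada=[The Examples], Grace=[The Examples, Sample Trio], Linus=[Sample Trio]}
}
```

Each band member becomes a key, so a member who plays in several bands ends up with several band names under one key.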

kotka-streams-framework

A light framework for structuring topics and records.

  implementation("dev.adamko.kotka:kotka-streams-framework:$kotkaVersion")

Use TopicRecord to standardise the data on each topic. Records can then be converted easily from one type to another.

import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*
import dev.adamko.kotka.topicdata.*

data class Animal(
  val id: Long,
  val name: String,
) : TopicRecord<Long> {
  override val topicKey: Long by ::id
}

data class Pet(
  val id: Long,
  val name: String,
) : TopicRecord<Long> {
  override val topicKey: Long by ::id
}

val petUpdates = builder.stream<Long, Animal>("animals")
  .mapTopicRecords("convert-animals-to-pets") { _, animal ->
    Pet(animal.id, animal.name)
  }
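
The `override val topicKey: Long by ::id` lines use Kotlin's delegation to a property reference, so the record's key always mirrors its `id`. The sketch below shows that idea without any Kafka or Kotka dependency. The `TopicRecord` interface here is a hypothetical one-property stand-in declared only so the snippet compiles by itself (the real Kotka interface may differ), and `animalToPet` is an invented helper holding the same conversion as the lambda above.

```kotlin
// Hypothetical stand-in for Kotka's TopicRecord, declared here so the sketch is self-contained.
interface TopicRecord<K> {
  val topicKey: K
}

data class Animal(
  val id: Long,
  val name: String,
) : TopicRecord<Long> {
  // Delegates to the `id` property: topicKey always returns the current id.
  override val topicKey: Long by ::id
}

data class Pet(
  val id: Long,
  val name: String,
) : TopicRecord<Long> {
  override val topicKey: Long by ::id
}

// Invented helper: the same conversion as the mapTopicRecords lambda above.
fun animalToPet(animal: Animal): Pet = Pet(animal.id, animal.name)

fun main() {
  val pet = animalToPet(Animal(id = 42L, name = "Rex"))
  println(pet.topicKey) // prints 42
}
```

Because the key is derived from the record, the conversion only has to build the new value and never handles the key separately.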

Use KeyValueSerdes<K, V> to define both the key and value serdes for a topic. A TopicDefinition<K, V> ties a topic's name to its serdes.

/** All [Pet] updates */
object PetUpdatesTopic : TopicDefinition<Long, Pet> {
  override val topicName = "pet-updates"
  // PetSerde is a user-defined Serde<Pet>, not shown here
  override val serdes = KeyValueSerdes(Serdes.Long(), PetSerde())
}

petUpdates
  .to(
    PetUpdatesTopic.topicName,
    PetUpdatesTopic.serdes.producer("send-pet-updates-to-pet-update-topic")
  )

kotka-streams-kotlinx-serialization

Use Kotlinx Serialization for topic key/value serdes.

  implementation("dev.adamko.kotka:kotka-streams-kotlinx-serialization:$kotkaVersion")

import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*
import dev.adamko.kotka.topicdata.*
import dev.adamko.kotka.kxs.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

val jsonMapper = Json {}

@Serializable
data class Sku(
  val sku: String
)

@Serializable
data class ShopItem(
  val id: Sku,
  val name: String,
) : TopicRecord<Sku> {
  override val topicKey: Sku by ::id
}

object ShopItemTopic : TopicDefinition<Sku, ShopItem> {
  override val topicName = "shop-item-updates"
  override val serdes = KeyValueSerdes.kxsJson(jsonMapper)
}