This repository is an adaptation from a WIP article from @rlecomte about Free Monad for process management. This article is really cool and interesting BTW and I will update this repo when it will be available to link it here.
The article is in Scala and even if I like the language I was thinking that it could be done in Kotlin (but uglier). Cause I like to hurt myself I decided to do it and this is the result
You can run the project with ./gradlew run
Use Arrow-fx Resource instead of ZManaged
val stack = startZk()
.zip(startKafka(), startSchemaRegistry())
{ zk, kafka, schema ->
Triple(zk, kafka, schema)
}
stack.use { (zk, kafka, schema) ->
println("Cool stuff bro")
}
Output :
[IO] Start ZooKeeper
[IO] Start Kafka
[IO] Start Schema
Cool stuff bro
[IO] Stop Schema
[IO] Stop Kafka
[IO] Stop ZooKeeper
This part needed to reuse Lightweight higher-kinded polymorphism previously provided by Arrow. I also chose to use new Kotlin feature of context receiver to use this to avoid witness by argument.
The example provide two interpreter for Free[Step, StackConfig]
, one as List<Event>
and another using Flow
(We needed to create an integration between Kotlin Flow
and Arrow Resource
).
with(Step.functor()) {
with(freeMonad()) {
val t = startZkStep().liftF()
.flatMap { zkRuntime ->
startKafkaStep().liftF()
.flatMap { kafkaRuntime ->
startSchemaRegistryStep()
.map { schemaRuntime ->
Triple(zkRuntime, kafkaRuntime, schemaRuntime).also(::println)
}
.liftF()
}
}
println(t)
println("----- List -----")
var listInterruptChannel = Channel<Boolean>()
launch {
delay(5.seconds)
listInterruptChannel.send(true)
}
runList(t, listInterruptChannel)
.forEach(printEvent)
}
}
Output :
[IO] Start ZooKeeper
[IO] Start Kafka
[IO] Start Schema
(ZkRuntime(port=8081), KafkaRuntime(port=8090), SchemaRuntime(port=9092))
[IO] Stop Schema
[IO] Stop Kafka
[IO] Stop ZooKeeper
[Event] StartZookeeker@48503868
[Event] StartKafka@5891e32e
[Event] StartSchema@cb0ed20
[Event] StackStarted(config=(ZkRuntime(port=8081), KafkaRuntime(port=8090), SchemaRuntime(port=9092)))
[Event] StopSchema@8e24743
[Event] StopKafka@74a10858
[Event] StopZookeeker@6895a785
with(Step.functor()) {
with(freeMonad()) {
val t = startZkStep().liftF()
.flatMap { zkRuntime ->
startKafkaStep().liftF()
.flatMap { kafkaRuntime ->
startSchemaRegistryStep()
.map { schemaRuntime ->
Triple(zkRuntime, kafkaRuntime, schemaRuntime).also(::println)
}
.liftF()
}
}
println(t)
var streamInterruptChannel = Channel<Boolean>()
launch {
delay(5.seconds)
streamInterruptChannel.send(true)
}
runStream(t, streamInterruptChannel)
.collect(printEvent)
}
}
Output :
[Event] StartZookeeker@48503868
[IO] Start ZooKeeper
[Event] StartKafka@5891e32e
[IO] Start Kafka
[Event] StartSchema@cb0ed20
[IO] Start Schema
(ZkRuntime(port=8081), KafkaRuntime(port=8090), SchemaRuntime(port=9092))
[Event] StackStarted(config=(ZkRuntime(port=8081), KafkaRuntime(port=8090), SchemaRuntime(port=9092)))
[IO] Stop Schema
[Event] StopSchema@8e24743
[IO] Stop Kafka
[Event] StopKafka@74a10858
[IO] Stop ZooKeeper
[Event] StopZookeeker@6895a785