Sample Hello World Kafka Consumer with ZIO
sumeetpri opened this issue · 2 comments
I am trying to build a Kafka consumer and run it from the run method of zio.App. I am getting a type mismatch error involving the ZIO environment. I am new to Scala effect systems and am looking for help fixing the error in the code below and understanding how Greyhound takes its environment.
import zio._
import com.wixpress.dst.greyhound.core.consumer.{RecordConsumer, RecordConsumerConfig}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription, RecordHandler}
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import zio.console._
object MyApp extends zio.App {
  val bootstrapServer = "localhost:9092"
  val initialTopics = Set("some-topic")
  val group = "some-consumer-group-id"

  type Env = ZEnv with GreyhoundMetrics with Console

  val handler: RecordHandler[Any, Nothing, Chunk[Byte], Chunk[Byte]] = RecordHandler { record =>
    print(record.value)
    ZIO.succeed(true)
  }

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    val consumer: ZIO[Any, Throwable, Nothing] =
      RecordConsumer
        .make[Any, Throwable](RecordConsumerConfig(bootstrapServer, group, ConsumerSubscription.Topics(initialTopics)), handler)
        .useForever
    consumer.exitCode
  }
}
Hey @sumeetpri, thanks for reporting your issue!
RecordConsumer needs to be provided with a GreyhoundMetrics layer to compile, so it knows how to report events.
To make this code compile, replace the consumer definition as follows:
val consumer =
  RecordConsumer
    .make[Any, Throwable](RecordConsumerConfig(bootstrapServer, group, ConsumerSubscription.Topics(initialTopics)), handler)
    .useForever
    .provideCustomLayer(GreyhoundMetrics.liveLayer)
GreyhoundMetrics.liveLayer is a simple slf4j implementation we provide out of the box, but you can replace it with your own if needed.
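For anyone new to the ZIO environment, here is a minimal sketch of why the extra layer is needed. It assumes ZIO 1.x and does not use Greyhound at all: the Metrics trait and metricsLive layer are hypothetical stand-ins for GreyhoundMetrics and GreyhoundMetrics.liveLayer, not their actual definitions. The point is only that an effect whose environment mentions a service outside ZEnv cannot be returned from run until that service is supplied, and provideCustomLayer supplies it while leaving the ZEnv requirement in place.

```scala
import zio._

object EnvSketch extends zio.App {
  // Hypothetical stand-in for a metrics service such as GreyhoundMetrics.
  trait Metrics {
    def report(event: String): UIO[Unit]
  }

  // Hypothetical "live" layer; a real one might log each event via slf4j.
  val metricsLive: ULayer[Has[Metrics]] =
    ZLayer.succeed[Metrics](new Metrics {
      def report(event: String): UIO[Unit] =
        ZIO.effectTotal(println(s"metric: $event"))
    })

  // This effect requires Has[Metrics], which is not part of ZEnv,
  // so on its own it does not fit the URIO[ZEnv, ExitCode] that run expects.
  val program: URIO[Has[Metrics], Unit] =
    ZIO.accessM[Has[Metrics]](_.get.report("consumer started"))

  // provideCustomLayer satisfies the Has[Metrics] requirement and leaves
  // only ZEnv, which zio.App provides when it runs the effect.
  override def run(args: List[String]): URIO[ZEnv, ExitCode] =
    program.provideCustomLayer(metricsLive).exitCode
}
```

Dropping the provideCustomLayer call in this sketch should produce a type mismatch on the environment, similar in kind to the one in the original question.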