wix/greyhound

Sample Hello World Kafka Consumer with ZIO

sumeetpri opened this issue · 2 comments

I am trying to build a Kafka consumer and run using zio.App , run method. I am getting type mismatch error with zio environment. I am new to the scala effect system and looking for information to fix the below error and understand how greyhound takes environment.

image

import zio._
import com.wixpress.dst.greyhound.core.consumer.{RecordConsumer, RecordConsumerConfig}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription, RecordHandler}
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import zio.console._

  object MyApp extends zio.App {
  val bootstrapServer = "localhost:9092"
  val initialTopics = Set("some-topic")
  val group = "some-consumer-group-id"
  type Env = ZEnv with GreyhoundMetrics with Console

  val handler: RecordHandler[Any, Nothing, Chunk[Byte], Chunk[Byte]] = RecordHandler { record =>
    print(record.value)
    ZIO.succeed(true)
  }

  override def run(args: List[String]): URIO[zio.ZEnv,ExitCode] = {

 val consumer:ZIO[Any, Throwable, Nothing] =
    RecordConsumer.make[Any,Throwable](RecordConsumerConfig(bootstrapServer, group, ConsumerSubscription.Topics(initialTopics)), handler).useForever
    consumer.exitCode
  }
}

hey @sumeetpri , thanks for reporting your issue!
RecordConsumer needs to be provided a GreyhoundMetrics layer to compile, so it knows how to report events.

To make this code compile replace the consumer definition as follows:
val consumer = RecordConsumer.make[Any,Throwable](RecordConsumerConfig(bootstrapServer, group, ConsumerSubscription.Topics(initialTopics)), handler).useForever .provideCustomLayer(GreyhoundMetrics.liveLayer)

GreyhoundMetrics.liveLayer is a simple slf4j implementation we provide out of the box, but you can replace it your own if needed.

Thank you @berman7, it worked 👍