fd4s/fs2-kafka

withCreateConsumer and withCreateProducer methods have been removed

noelwelsh opened this issue · 13 comments

How does one use Kafka's MockConsumer and MockProducer with fs2-kafka?

The documentation (e.g. for ConsumerSettings) mentions withCreateConsumer and withCreateProducer, but these methods have been removed. This commit removed the withCreateConsumer method from ConsumerSettings, and this commit removed the withCreateProducer method from ProducerSettings.

What is the replacement method? Thanks!

Have you tried with com.evolutiongaming.skafka.producer.Producer#fromProducerJ2 and com.evolutiongaming.skafka.consumer.Consumer#fromConsumerJ1?

I'm happy to work on this issue if someone can tell me what the design should be.

Sorry for the slow response to this - good question, I don't think we really thought about this when making the changes you've linked to. I don't have all the context on this currently in my head but it looks like you'd need to override the default instances of MkConsumer/MkProducer with custom instances that create instances of the relevant mock classes. It would be good to have those in a testkit module and we definitely need to update the docs. I'd be happy to take a PR on this - is that enough to go on?

I'll work on something and then we can discuss in a PR.

Umm ... the build doesn't work out of the box due to this:

ThisBuild / latestVersion := tlLatestVersion.value.getOrElse(
  throw new IllegalStateException("No tagged version found")
)

Any suggestions?

That's surprising, I haven't seen that problem - I'll try checking out a fresh copy of the repo later and see if I can reproduce this

In the meantime you can probably just type a value in to be able to start hacking?

👍

I've made a bit of progress. Perhaps you can take a look and tell me if this is in keeping with what you're thinking of:

https://github.com/noelwelsh/fs2-kafka/tree/feature/public-mk-consumer

I'm having a lot of trouble testing this code. The basic problem seems to be that the MockConsumer is not assigned to any of the partitions. I've tried a lot of variations on the code below, which is following the pattern of existing tests. Any insights here would be appreciated!

    it("should use the MkConsumer instance in scope") {
      withTopic { topic =>
        createCustomTopic(topic, partitions = 3)
        val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.NONE)

        implicit val mockMkConsumer = MkConsumer.fromKafkaByteConsumer[IO](mockConsumer)

        val consumed =
          KafkaConsumer[IO]
            .stream(consumerSettings[IO])
            .subscribeTo(topic)
            .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
            .evalTap(
              _ =>
                IO.delay {
                  // mockConsumer.assign(java.util.Collections.singleton(new TopicPartition(topic, 0)))
                  // val partitionInfo = mockConsumer.partitionsFor(topic).get(0)
                  val record = new clients.consumer.ConsumerRecord(
                    topic,
                    0,
                    0L,
                    "Hello".getBytes(),
                    "Kafka".getBytes()
                  )
                  mockConsumer.addRecord(record)
                }
            )
            .records
            .map(committable => committable.record.key -> committable.record.value)
            .interruptAfter(10.seconds)
            .compile
            .toVector
            .unsafeRunSync()

        consumed.size shouldEqual 1
        consumed(0) shouldEqual "Hello" -> "Kafka"
      }
    }
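
For reference, with the plain Kafka client API (no fs2-kafka involved), MockConsumer only returns records for partitions it has been assigned through rebalance, and with the EARLIEST reset strategy it also needs a beginning offset for each partition. The following is a minimal sketch of that sequence, assuming a kafka-clients version where MockConsumer takes an OffsetResetStrategy, as in the test above. The object name MockConsumerSketch and the method pollOnce are made up for illustration, and using EARLIEST instead of NONE is a choice of this sketch (with NONE the consumer has no way to pick a starting position on its own).

```scala
import java.time.Duration
import java.util.Collections

import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, not part of fs2-kafka or of the branch linked above.
object MockConsumerSketch {

  // Subscribes a MockConsumer, assigns partition 0, adds one record,
  // and returns how many records a single poll hands back.
  def pollOnce(topic: String): Int = {
    val consumer  = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.EARLIEST)
    val partition = new TopicPartition(topic, 0)

    consumer.subscribe(Collections.singletonList(topic))

    // Subscribing alone assigns nothing: the mock has no group coordinator,
    // so the test has to simulate the rebalance itself.
    consumer.rebalance(Collections.singletonList(partition))

    // EARLIEST resolves the starting position from the beginning offsets.
    consumer.updateBeginningOffsets(
      Collections.singletonMap[TopicPartition, java.lang.Long](partition, java.lang.Long.valueOf(0L))
    )

    // addRecord is only accepted for partitions that are currently assigned.
    consumer.addRecord(
      new ConsumerRecord[Array[Byte], Array[Byte]](topic, 0, 0L, "Hello".getBytes, "Kafka".getBytes)
    )

    consumer.poll(Duration.ofMillis(100)).count()
  }
}
```

When something else drives the polling, as fs2-kafka does, MockConsumer.schedulePollTask may be a better fit for the rebalance and addRecord calls, since it runs them inside the next poll rather than from the test thread.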

I've tried playing around with creating custom MkConsumer and MkProducer instances in the code I'm trying to update. I'm afraid to say I think this new design is really the wrong thing.

The settings don't encapsulate all the settings any more. Previously the ConsumerSettings and ProducerSettings would encapsulate all the details needed to create a consumer or producer. Now the settings are split across the settings and the MkConsumer and MkProducer instances. So now I have to track down all the points of use of the settings and make sure the right instances are also used.

MkConsumer and MkProducer are not unique for a given type, as clearly shown by this issue, and therefore are not good candidates for type classes.

@noelwelsh that's a fair point about the ergonomics (though the traits are intended as capability traits rather than type classes, so I don't see non-uniqueness as an argument against them in itself). The motivation for introducing the MkX traits was to decouple the effect type of consumer/producer creation from the effect type of the consumer/producer itself - see #588. Another way I can see to do this is to reintroduce withCreateX, but with type () => Consumer (and so on) instead of F[Consumer]. What do you think?

As another option, would it help if there were an option to explicitly pass a MkX instance when creating a consumer/producer?

Regarding the build failure on the tlLatestVersion check reported above: I'm unable to reproduce it when cloning the repo in the normal way:

git clone git@github.com:fd4s/fs2-kafka.git
cd fs2-kafka
sbt compile

Is there anything unusual about your development environment that might cause the tags to be missing?

Sorry about the delay. I don't get time to work on this most of the week.

Summary: I think the best solution is to add a method to pass a MkX instance when creating the settings.

Here's my reasoning:

  • The root problem is that the consumer (or producer; but I'm just going to talk about consumers to keep it simpler) may be created with a different effect type than the settings.

  • Scala 2 function values cannot have type parameters, so we cannot create a builder method on ConsumerSettings that accepts a function to build the consumer.

  • MkConsumer is essentially a function parameterised by an effect type, solving that issue.

  • The issue with the current design is that the settings are split across the ConsumerSettings and the MkConsumer, which must both be provided at the point where we create a consumer.

  • We want to encapsulate all the settings in a single object, and hence should move to providing a MkConsumer instance at the point where we construct the ConsumerSettings.
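
To make the point about type parameters concrete, here is a small self-contained sketch. All the names in it (Settings, FakeConsumer, MkConsumerLike) are stand-ins invented for illustration, not the fs2-kafka types, and Option stands in for a real effect type. It shows a trait whose method stays polymorphic in the settings' effect type G, next to a function value that has to fix G up front.

```scala
// Hypothetical stand-ins for illustration; none of these are fs2-kafka types.
object PolymorphicMkSketch {

  // Settings tagged with an effect type G (unused here, it only marks the type).
  final case class Settings[G[_]](groupId: String)

  final class FakeConsumer(val groupId: String)

  // Creates a consumer in F from settings in any G: the method keeps G open.
  trait MkConsumerLike[F[_]] {
    def apply[G[_]](settings: Settings[G]): F[FakeConsumer]
  }

  // Option plays the role of the creation effect in this toy example.
  val mkInOption: MkConsumerLike[Option] = new MkConsumerLike[Option] {
    def apply[G[_]](settings: Settings[G]): Option[FakeConsumer] =
      Some(new FakeConsumer(settings.groupId))
  }

  // A Scala 2 function value cannot leave G open; it must commit to one G.
  val forListSettingsOnly: Settings[List] => Option[FakeConsumer] =
    settings => mkInOption(settings)

  // The same instance accepts settings with different effect types.
  val fromList: Option[FakeConsumer]   = mkInOption(Settings[List]("group-a"))
  val fromVector: Option[FakeConsumer] = mkInOption(Settings[Vector]("group-b"))
}
```

On Scala 2.12 the higher-kinded type parameters may produce a feature warning unless scala.language.higherKinds is imported.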

So the best way forward seems to be:

  • ConsumerSettings includes a MkConsumer field
  • this field defaults to the default MkConsumer (implicit) value
  • there is a builder method to change the value to a custom value
  • there is a builder method to use the MockConsumer

I don't think there is much value in keeping MkConsumer as an implicit value in this design.
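
A rough sketch of the shape described above, to have something concrete to discuss. Everything in it is hypothetical: Settings, FakeConsumer, MkConsumerLike, Delay, withMkConsumer, withMockConsumer and createConsumer are names invented for this sketch, Option is a toy stand-in for a real effect type, and the way the effect type is threaded through (a polymorphic apply with a Delay capability) is just one possible choice, not a worked-out design.

```scala
// Hypothetical model of the proposal; none of these are fs2-kafka types.
object SettingsWithMkSketch {

  // Stand-in for a Kafka consumer.
  trait FakeConsumer { def name: String }

  // Toy capability for running a side effect in F.
  trait Delay[F[_]] { def delay[A](thunk: => A): F[A] }
  object Delay {
    implicit val forOption: Delay[Option] = new Delay[Option] {
      def delay[A](thunk: => A): Option[A] = Option(thunk)
    }
  }

  // Creates a consumer in whichever effect type the caller asks for.
  trait MkConsumerLike {
    def apply[F[_]](settings: Settings)(implicit F: Delay[F]): F[FakeConsumer]
  }
  object MkConsumerLike {
    // Default behaviour: build a fresh consumer from the settings.
    val default: MkConsumerLike = new MkConsumerLike {
      def apply[F[_]](settings: Settings)(implicit F: Delay[F]): F[FakeConsumer] =
        F.delay(new FakeConsumer { val name = s"real-${settings.groupId}" })
    }
    // Always hand back an existing consumer, e.g. a mock.
    def fromExisting(consumer: FakeConsumer): MkConsumerLike = new MkConsumerLike {
      def apply[F[_]](settings: Settings)(implicit F: Delay[F]): F[FakeConsumer] =
        F.delay(consumer)
    }
  }

  // The settings carry the factory as an ordinary field with a default value.
  final case class Settings(groupId: String, mk: MkConsumerLike = MkConsumerLike.default) {
    def withMkConsumer(custom: MkConsumerLike): Settings = copy(mk = custom)
    def withMockConsumer(mock: FakeConsumer): Settings =
      withMkConsumer(MkConsumerLike.fromExisting(mock))
    def createConsumer[F[_]: Delay]: F[FakeConsumer] = mk.apply[F](this)
  }
}
```

With this shape, a test would build its settings with withMockConsumer once, and every place that creates a consumer from those settings would pick up the mock without needing an implicit in scope.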