sky-uk/kafka-topic-loader

Return a source unifying preload and onGoing reading, with a milestone

Closed this issue · 2 comments

When using the topic loader, we found ourselves calling it with the OnRecord function and then creating a second source to take over once the topic loader ends, with the same parameters and the same record handling. We think it would be helpful to have this implemented in the library rather than as boilerplate code in each client. Moreover, the fact that it takes the OnRecord function, which is a side effect, makes it difficult to test.

I'm happy to contribute this feature, as it mainly concerns Recs at the moment.
The function should accept Kafka configuration parameters for the client, as the current TopicLoader does, plus a function to deserialize records. The signature would look something like:
def awesomeFeature[T: Deserializer](clientConfig: Config): Source[T :+: PreloadComplete, _]
:)

I'm not really a fan of Source[T :+: PreloadComplete, _], because this tells me I have a source that will be emitting one of those two values; it doesn't really describe that I go from one state to another.
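To make the objection concrete, here is a minimal sketch using only the Scala standard library. It models the element type with Either rather than a real coproduct, and the names Element, CoproductSketch and markerCount are made up for illustration; nothing here is the library's API. It shows that the type accepts any interleaving of records and markers, so the "loading, then live" transition is not enforced anywhere.

```scala
// Hypothetical marker, standing in for the PreloadComplete type from the proposal.
case object PreloadComplete

object CoproductSketch {
  // Assumption: Either stands in for the `T :+: PreloadComplete` coproduct.
  type Element[T] = Either[PreloadComplete.type, T]

  // The intended shape: records, exactly one marker, more records.
  val intended: List[Element[String]] =
    List(Right("a"), Right("b"), Left(PreloadComplete), Right("c"))

  // Equally well-typed, but meaningless: the marker shows up twice.
  val alsoCompiles: List[Element[String]] =
    List(Left(PreloadComplete), Right("a"), Left(PreloadComplete))

  // A consumer has to count or track markers itself to know which state it is in.
  def markerCount[T](elements: List[Element[T]]): Int =
    elements.count(_.isLeft)
}
```

Both lists type-check, which is the point: the signature alone cannot tell a consumer that the marker arrives once and only at the boundary.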

IMO the approaches we can choose from are:

  • use the completion of the topic loading stream to identify when it has completed (a Future callback)
  • use a source of sources where the inner source is emitted when the topic loading has finished, with a param to do something when the topic loading has finished

Source[Source[T], NotUsed] or something? not sure
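A rough sketch of what the source-of-sources shape could look like from the caller's side, assuming Akka Streams 2.6+ on the classpath. The loading and ongoingFrom stand-ins are hypothetical and use in-memory sources instead of Kafka, with a Map[String, Long] in place of real partition offsets.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SourceOfSourcesSketch {
  // Hypothetical stand-in for topic loading: emits one element when loading is done.
  def loading: Source[Map[String, Long], NotUsed] =
    Source.single(Map("topic-0" -> 42L))

  // Hypothetical stand-in for the ongoing Kafka source started from those offsets.
  def ongoingFrom(offsets: Map[String, Long]): Source[String, NotUsed] =
    Source(offsets.toList.map { case (partition, offset) => s"$partition@$offset" })

  // The outer source emits the inner source only once loading has finished.
  val nested: Source[Source[String, NotUsed], NotUsed] = loading.map(ongoingFrom)

  def demo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("source-of-sources-sketch")
    try {
      // The caller must run the outer stream, then run the inner one itself.
      val inner = Await.result(nested.runWith(Sink.head), 5.seconds)
      Await.result(inner.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The nesting does express the state change in the type, but it pushes the work of running or flattening the inner source onto every caller.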

I guess what we want for your case is one stream that first does the topic loading and then just consumes from Kafka normally. So you could just inject the function to call inside mapMaterializedValue, which at the moment is hardcoded to logResult. Then you'll obviously want to start the main Kafka source after the loading in this new method.

As discussed in person earlier, I think your use-case and the use-case in the KMS (when we take out the offset committing stuff) would be satisfied by an API like:

def loadAndRun[T](params from other methods, f: Try[Done] => Unit): Source[T, Control]

where f is a callback to be executed when the topic loading source has finished (sending a message to an actor, for example). The callback is attached to the Future[Done] from watchTermination.

This gives a pretty clean API to the user: they just get elements out, but they can trigger something to happen when the topic loading source has finished. Internally it would run the topic loading source, then pass the emitted Map[TopicPartition, Long] to another Alpakka Kafka source with a manual partition assignment, all flattened to a single Source[T, Control].
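Something like the following sketch is one way the pieces could fit together, assuming Akka Streams 2.6+. It is not the library's implementation. The Offsets alias and the ongoingFrom parameter are hypothetical stand-ins for Map[TopicPartition, Long] and the manually assigned Kafka source, and the result is a Source[T, NotUsed] rather than Source[T, Control], because flatMapConcat does not expose the inner source's materialized value; surfacing a Control would need extra work not shown here.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import scala.util.Try

object LoadAndRunSketch {
  // Assumption: stands in for Map[TopicPartition, Long].
  type Offsets = Map[String, Long]

  def loadAndRun[T](
      loading: Source[Offsets, NotUsed],          // emits the offsets once loading is done
      ongoingFrom: Offsets => Source[T, NotUsed], // hypothetical: source resuming from those offsets
      f: Try[Done] => Unit                        // callback run when the loading source terminates
  )(implicit ec: ExecutionContext): Source[T, NotUsed] =
    loading
      .watchTermination()(Keep.right)             // materializes a Future[Done] for the loading part
      .mapMaterializedValue { loaded => loaded.onComplete(f); NotUsed }
      .flatMapConcat(ongoingFrom)                 // flatten into a single stream of T

  def demo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("load-and-run-sketch")
    import system.dispatcher
    try {
      val stream = loadAndRun[String](
        Source.single(Map("topic-0" -> 3L)),
        offsets => Source(offsets.keys.toList),
        result => println(s"topic loading finished: $result")
      )
      Await.result(stream.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The caller only sees elements of T, and the callback fires from the termination future of the loading stage, so nothing about the loading phase leaks into the element type.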