Commit after all connected multicasted observables are finished processing a record
yeenow123 opened this issue · 4 comments
I am creating multiple observables (one per Kafka topic):
val kafkaConsumerConfig = KafkaConsumerConfig.default.copy(
  bootstrapServers = kConfig.bootstrapServers,
  groupId = kConfig.groupId,
  enableAutoCommit = false,
  observableCommitOrder = ObservableCommitOrder.AfterAck,
  observableCommitType = ObservableCommitType.Sync,
  autoOffsetReset = AutoOffsetReset.Latest
)
val observable = Observable.merge(topics.map { provider =>
  KafkaStream.creatConsumer(provider)
}: _*)
val multiCast = observable.multicast(Pipe.publish[ConsumerRecord[A, B]])
... // multiple subscribers subscribe separately to the multiCast
multiCast.connect()
It doesn't seem like the commit back to Kafka is issued after all subscribers have finished processing (I am using a combination of .subscribe() and .foreach to subscribe). It seems to commit after any one of them completes. Is this possible, or am I doing something incorrectly?
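To make the desired ordering concrete, here is a minimal sketch of "commit only after every handler has finished a record". It does not use monix-kafka at all: fast, slow, and commit are hypothetical stand-ins for the separate subscribers and for the offset commit, and the handlers run inside a single mapEval stage instead of as independent subscribers. It assumes Monix 3.2 or later (for Task.parSequence) and Scala 2.13.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object CommitAfterAllHandlers {
  // Thread-safe log of the order in which things happened.
  val log = new ConcurrentLinkedQueue[String]()

  // Hypothetical handlers standing in for the separate subscribers.
  def fast(r: Int): Task[Unit] = Task { log.add(s"fast-$r"); () }
  def slow(r: Int): Task[Unit] =
    Task { log.add(s"slow-$r"); () }.delayExecution(50.millis)

  // Hypothetical stand-in for committing the record's offset.
  def commit(r: Int): Task[Unit] = Task { log.add(s"commit-$r"); () }

  val handlers: List[Int => Task[Unit]] = List(fast, slow)

  def run(): List[String] = {
    Observable(1, 2, 3)
      // mapEval handles one record at a time: all handlers run in
      // parallel, and commit runs only once every one of them is done.
      .mapEval(r => Task.parSequence(handlers.map(h => h(r))).flatMap(_ => commit(r)))
      .completedL
      .runSyncUnsafe()
    log.asScala.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

In this sketch every commit-N entry appears after both fast-N and slow-N. Whether the same guarantee can be had with independent subscribers on a multicast observable is exactly the open question above.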
The issue seems to stem from Observable.merge. I tried Observable.concat and it seems to give me the functionality I need. I guess the ordering semantics of the two operators are different, one being unordered and the other ordered.
EDIT: Spoke too soon. When I use .concat, only one of the observables seems to actually start up.
The issue seems to stem from Observable.merge. I tried Observable.concat and it seems to give me the functionality I need. I guess the ordering semantics of the two operators are different, one being unordered and the other ordered.
That's correct: concat will wait for the first observable to complete before starting the second one, while merge will emit elements from all sources as they arrive.
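The difference is easy to see with two sources that never complete, which is roughly what a Kafka consumer observable looks like. The sketch below assumes Monix 3.x; the endless helper and the "a"/"b" tags are made up for illustration and have nothing to do with monix-kafka.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.duration._

object ConcatVsMerge {
  // A source that ticks forever and never completes, standing in for
  // a per-topic consumer observable.
  def endless(tag: String): Observable[String] =
    Observable.intervalAtFixedRate(10.millis).map(i => s"$tag$i")

  // concat subscribes to the second source only after the first one
  // completes, so with an endless first source "b" is never started.
  def concatenated(): List[String] =
    Observable.concat(endless("a"), endless("b")).take(20).toListL.runSyncUnsafe()

  // merge subscribes to both sources right away and interleaves
  // their elements as they arrive.
  def merged(): List[String] =
    Observable.merge(endless("a"), endless("b")).take(20).toListL.runSyncUnsafe()

  def main(args: Array[String]): Unit = {
    println(concatenated())
    println(merged())
  }
}
```

The concatenated list contains only "a" elements, while the merged list contains elements from both sources. That matches the behaviour reported above, where only one of the observables appears to start under concat.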
How do you commit, and where do you use kafkaConsumerConfig? I don't recognize KafkaStream.creatConsumer(provider).
I copied partial code and made some typos while changing names. The kafkaConsumerConfig is passed into the KafkaConsumerObservable constructor.
As mentioned in the EDIT on my previous comment, concat only seems to start the first observable from the list of observables passed into it.
Probably the first Observable never ends. I'll see if I can reproduce your issue.