Feature Request: Stream name in aws kcl consumer when MultiStreamTracker is used
gurudatta-carbon opened this issue · 4 comments
gurudatta-carbon commented
- When using MultiStreamTracker in AWS KCL 2.x, how can I find out which stream a record came from? KinesisClientRecord has no attribute for that.
- If multiple streams have different payload structures, knowing which stream a record came from would help in deserializing it.
- The stream name is not available in ShardRecordProcessor either.
- KCL version: 2.5.2
gurudatta-carbon commented
@stair-aws
Any idea when this can be picked up?
Are there any workarounds in the meantime?
gurudatta-carbon commented
I was able to achieve this by implementing a ShardRecordProcessorFactory:
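private class KinesisShardProcessorFactory(private val shardRecordProcessors: List<StreamConsumer<*>>) :
    ShardRecordProcessorFactory {
    // Overload without a stream identifier; this factory only supports lookup by stream.
    override fun shardRecordProcessor(): ShardRecordProcessor {
        throw UnsupportedOperationException("Use shardRecordProcessor(streamIdentifier) instead")
    }

    // Return the processor whose name matches the stream the shard belongs to.
    override fun shardRecordProcessor(streamIdentifier: StreamIdentifier): ShardRecordProcessor {
        return shardRecordProcessors.first { it.name == streamIdentifier.streamName() }
    }
}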
This could probably be added to the docs for MultiStreamTracker?
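A minimal sketch of the same routing idea in isolation, for anyone who wants to try it without the KCL on the classpath. `StreamIdentifier`, `ShardRecordProcessor`, and `ShardRecordProcessorFactory` here are simplified local stand-ins, not the real KCL types (the real `processRecords` takes a `ProcessRecordsInput` and returns nothing). `StreamAwareProcessor`, `PerStreamFactory`, `exampleFactory`, and the stream names `orders` and `clicks` are hypothetical. The variation from the factory above is that it builds a new processor on every call instead of returning a shared instance, on the assumption that the factory is asked for a processor once per shard.

```kotlin
// Simplified stand-ins for the KCL types, declared locally so the sketch compiles on its own.
class StreamIdentifier(private val name: String) {
    fun streamName(): String = name
}

interface ShardRecordProcessor {
    // Stand-in signature: takes raw payloads and returns the decoded, tagged results.
    fun processRecords(payloads: List<String>): List<String>
}

interface ShardRecordProcessorFactory {
    fun shardRecordProcessor(): ShardRecordProcessor
    fun shardRecordProcessor(streamIdentifier: StreamIdentifier): ShardRecordProcessor
}

// Hypothetical processor that remembers its stream and applies a stream-specific decoder.
class StreamAwareProcessor(
    val streamName: String,
    private val decode: (String) -> String,
) : ShardRecordProcessor {
    override fun processRecords(payloads: List<String>): List<String> =
        payloads.map { "$streamName:${decode(it)}" }
}

// Maps each stream name to a builder, so every call yields a fresh processor.
class PerStreamFactory(
    private val builders: Map<String, (String) -> ShardRecordProcessor>,
) : ShardRecordProcessorFactory {
    override fun shardRecordProcessor(): ShardRecordProcessor =
        throw UnsupportedOperationException("A StreamIdentifier is required by this factory")

    override fun shardRecordProcessor(streamIdentifier: StreamIdentifier): ShardRecordProcessor {
        val name = streamIdentifier.streamName()
        val build = builders[name]
            ?: throw IllegalArgumentException("No processor registered for stream '$name'")
        return build(name)
    }
}

// Hypothetical wiring: two streams whose payloads need different decoding.
fun exampleFactory(): ShardRecordProcessorFactory = PerStreamFactory(
    mapOf(
        "orders" to { name: String -> StreamAwareProcessor(name) { it.uppercase() } },
        "clicks" to { name: String -> StreamAwareProcessor(name) { it.trim() } },
    )
)
```

Because each processor is constructed with its stream name, the deserialization choice is made once at construction rather than per record. An unregistered stream fails with a descriptive `IllegalArgumentException` instead of the `NoSuchElementException` that `first { ... }` would throw.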
zeynepsu commented
Thank you for your suggestion. We will add more clarification to our docs.
kschwarz1116 commented
I've come across this recently too. It doesn't seem like fun shardRecordProcessor() is used if fun shardRecordProcessor(streamIdentifier: StreamIdentifier) is defined. If that's actually the case, it would be great to update the API here so that I don't have to worry about implementing an unused function.
If it's not, then I would like to know how the KCL decides to use each function.
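In the meantime, one possible way to avoid repeating the unused stub in every factory is a small adapter interface that implements the no-argument method once. This is only a sketch: the three KCL type names below are local stand-ins that mirror the two signatures quoted above, and `MultiStreamProcessorFactory`, `NamedProcessor`, and `MyFactory` are hypothetical names, not part of the KCL. It does not settle which overload the KCL calls; it only makes the no-argument one fail loudly if it is ever used.

```kotlin
// Local stand-ins for the KCL types so the sketch compiles without the dependency.
class StreamIdentifier(private val name: String) {
    fun streamName(): String = name
}

interface ShardRecordProcessor

interface ShardRecordProcessorFactory {
    fun shardRecordProcessor(): ShardRecordProcessor
    fun shardRecordProcessor(streamIdentifier: StreamIdentifier): ShardRecordProcessor
}

// Hypothetical adapter: supplies the no-argument overload once, so concrete
// factories only have to implement the stream-aware overload.
interface MultiStreamProcessorFactory : ShardRecordProcessorFactory {
    override fun shardRecordProcessor(): ShardRecordProcessor =
        throw UnsupportedOperationException(
            "shardRecordProcessor() was called, but this factory needs a StreamIdentifier"
        )
}

// Hypothetical processor that simply records which stream it was created for.
class NamedProcessor(val streamName: String) : ShardRecordProcessor

// A concrete factory now overrides a single method.
class MyFactory : MultiStreamProcessorFactory {
    override fun shardRecordProcessor(streamIdentifier: StreamIdentifier): ShardRecordProcessor =
        NamedProcessor(streamIdentifier.streamName())
}
```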