awslabs/amazon-kinesis-client

Feature Request: Stream name in aws kcl consumer when MultiStreamTracker is used

gurudatta-carbon opened this issue · 4 comments

  • When using MultiStreamTracker in AWS KCL 2.x, how can I find out which stream a record came from? KinesisClientRecord has no attribute for that.
  • If multiple streams have different payload structures, it would be helpful to know which stream a record came from in order to deserialize it.
  • This information is not available in ShardRecordProcessor either.
  • KCL version: 2.5.2

@stair-aws
Any idea when this can be picked up?

Are there any workarounds for this in the meantime?

I was able to achieve this by implementing a ShardRecordProcessorFactory:

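import software.amazon.kinesis.common.StreamIdentifier
import software.amazon.kinesis.processor.ShardRecordProcessor
import software.amazon.kinesis.processor.ShardRecordProcessorFactory

// StreamConsumer (not shown) is assumed to implement ShardRecordProcessor
// and to expose a `name` equal to the name of the stream it handles.
private class KinesisShardProcessorFactory(private val shardRecordProcessors: List<StreamConsumer<*>>) :
    ShardRecordProcessorFactory {

    // Does not appear to be called when a MultiStreamTracker is used.
    override fun shardRecordProcessor(): ShardRecordProcessor {
        throw UnsupportedOperationException("Use shardRecordProcessor(StreamIdentifier) in multi-stream mode")
    }

    // Return the consumer registered for the stream this shard belongs to.
    override fun shardRecordProcessor(streamIdentifier: StreamIdentifier): ShardRecordProcessor {
        return shardRecordProcessors.first { it.name == streamIdentifier.streamName() }
    }
}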

Could this be added to the docs for MultiStreamTracker?
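
For anyone landing here later, a variant of the same idea is sketched below: the factory creates a new processor on each call and hands it the stream name, so the processor can choose a deserializer per stream. StreamAwareProcessor and StreamAwareProcessorFactory are hypothetical names (not part of KCL), and decoding the payload as UTF-8 text is only an assumption for illustration. Treat it as a rough sketch rather than a recommended implementation.

```kotlin
import software.amazon.kinesis.common.StreamIdentifier
import software.amazon.kinesis.lifecycle.events.InitializationInput
import software.amazon.kinesis.lifecycle.events.LeaseLostInput
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput
import software.amazon.kinesis.lifecycle.events.ShardEndedInput
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput
import software.amazon.kinesis.processor.ShardRecordProcessor
import software.amazon.kinesis.processor.ShardRecordProcessorFactory
import java.nio.charset.StandardCharsets

// Hypothetical processor that remembers which stream it was created for.
class StreamAwareProcessor(private val streamName: String) : ShardRecordProcessor {

    override fun initialize(initializationInput: InitializationInput) {}

    override fun processRecords(processRecordsInput: ProcessRecordsInput) {
        for (record in processRecordsInput.records()) {
            // Assumption: payloads are UTF-8 text. Branch on streamName here
            // to pick the right deserializer for each stream.
            val payload = StandardCharsets.UTF_8
                .decode(record.data().asReadOnlyBuffer())
                .toString()
            println("stream=$streamName partitionKey=${record.partitionKey()} payload=$payload")
        }
    }

    override fun leaseLost(leaseLostInput: LeaseLostInput) {}

    override fun shardEnded(shardEndedInput: ShardEndedInput) {
        // Checkpoint at shard end so the shard is marked as fully processed.
        shardEndedInput.checkpointer().checkpoint()
    }

    override fun shutdownRequested(shutdownRequestedInput: ShutdownRequestedInput) {}
}

// Hypothetical factory: builds a fresh processor per call and passes the stream name in.
class StreamAwareProcessorFactory : ShardRecordProcessorFactory {

    override fun shardRecordProcessor(): ShardRecordProcessor =
        throw UnsupportedOperationException("Use shardRecordProcessor(StreamIdentifier) in multi-stream mode")

    override fun shardRecordProcessor(streamIdentifier: StreamIdentifier): ShardRecordProcessor =
        StreamAwareProcessor(streamIdentifier.streamName())
}
```

Creating a new processor per call avoids sharing one instance (and its state) across several shards of the same stream, which the list lookup above would do.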

Thank you for your suggestion. We will add more clarification to our docs.

I've come across this recently too. It doesn't seem like fun shardRecordProcessor() is used if fun shardRecordProcessor(streamIdentifier: StreamIdentifier) is defined. If that's actually the case, it would be great to update the API here so that I don't have to worry about implementing an unused function.

If it's not, then I would like to know how the KCL decides to use each function.