harlow/kinesis-consumer

ShardIteratorType - LATEST

Closed this issue · 4 comments

I see ShardIteratorType is hardcoded to "TRIM_HORIZON" in case of no sequence number is specified.

Is there a way I can use the "LATEST" ShardIteratorType along with SkipCheckpoint: true option?

This seems works well in order to fetch the latest records without using a checkpoint. I assume I missed an important point why should I use TRIM_HORIZON with checkpoint instead of this simple solution. Correct me if so, Thanks.

Hi @servocoder good question.

I don't have a good reason for it. It's just the way i've used the library in the past. As you mentioned having a configurable ShardIteratorType might be something worth exploring.

It's something we could potentially add as an additional config option:

// ShardIteratorType overrides the starting point for the consumer
func With ShardIteratorType(s string) Option {
	return func(c *Consumer) {
		c.initialShardIteratorType = s
	}
}

and then here:
https://github.com/harlow/kinesis-consumer/blob/master/consumer.go#L257-L262

if lastSeqNum != "" {
		params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
		params.StartingSequenceNumber = aws.String(lastSeqNum)
} else {
		params.ShardIteratorType = aws.String(c.initialShardIteratorType)
}

So this would set the params.ShardIteratorType = c. initialShardIteratorType on the first loop, and then on future pages it would use the lastSeqNumber to continue paging

I feel like this would be nice functionality for the library; feel free to submit a PR, or if you'd like me to get this added I can probably do it over the weekend.

Your example is the exact way I was thinking to implement it, cool!

It would be nice to get the update on Monday, I don't think I could make it better than you.

Thanks in advance

Works like a charm, thanks!

Would be nice to get it documented and released.