spring-projects/spring-integration-aws

Feature Request - customise kinesis detectShardsToConsume

Closed this issue · 3 comments

I would like to customise KinesisMessageDrivenChannelAdapter#detectShardsToConsume(java.lang.String).

Currently this method returns the list of shards to read from: every shard that is not "complete" (a complete shard is a closed shard that has been checkpointed at its endingSequenceNumber).

However, I would like to be able to do the following two things:

  1. Exclude shards with an incomplete parent - when a stream is resharded, shards are either split or merged, and have a parent/child relationship. Records in a parent shard come sequentially before records in its child shards. I want to be able to ensure I have completed any parent shards before starting to read their children. This would reduce throughput, but maintain some degree of ordering.

  2. Only read a given set of shards - I have a special case where I would like to use the adapter to only read certain shards.
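The parent-first rule in point 1 can be sketched as a plain list-to-list function. This is a minimal sketch, not adapter code: it assumes a hypothetical `ShardInfo` record in place of the AWS SDK `Shard` class, and assumes the input list holds every shard that is not yet complete. It also checks the adjacent parent, which a shard produced by a merge has in addition to its parent.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the SDK's Shard model, with only the fields the filter needs.
// parentShardId / adjacentParentShardId are null when the shard has no such parent.
record ShardInfo(String shardId, String parentShardId, String adjacentParentShardId) {}

final class ParentFirstFilter {

    // Keeps only shards whose parent(s) are no longer in the candidate list,
    // i.e. every parent has already been read to completion.
    static final Function<List<ShardInfo>, List<ShardInfo>> PARENT_FIRST = shards -> {
        Set<String> pending = shards.stream()
                .map(ShardInfo::shardId)
                .collect(Collectors.toSet());
        return shards.stream()
                .filter(s -> s.parentShardId() == null || !pending.contains(s.parentShardId()))
                .filter(s -> s.adjacentParentShardId() == null
                        || !pending.contains(s.adjacentParentShardId()))
                .collect(Collectors.toList());
    };

    public static void main(String[] args) {
        // shard-0 was split into shard-1 and shard-2, but shard-0 is still being read.
        List<ShardInfo> open = List.of(
                new ShardInfo("shard-0", null, null),
                new ShardInfo("shard-1", "shard-0", null),
                new ShardInfo("shard-2", "shard-0", null));
        // Only shard-0 is kept until it is complete and drops out of the list.
        System.out.println(PARENT_FIRST.apply(open));
    }
}
```

Building the set of pending ids once keeps the check linear in the number of shards, and the function needs no state beyond the list it is given.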

I would be able to do this if KinesisMessageDrivenChannelAdapter#detectShardsToConsume(java.lang.String) were protected, or could be customised with a Function<String, List<Shard>>.
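The two hooks proposed here (an overridable protected method, or an injectable `Function<String, List<Shard>>`) might be shaped roughly as follows. This is a sketch under assumptions only: `ShardConsumerSketch`, `ShardRef`, `setShardsToConsume` and `defaultShardsToConsume` are hypothetical names, not part of KinesisMessageDrivenChannelAdapter, and the shard listing is injected as a function instead of calling Kinesis.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical shard model standing in for the AWS SDK Shard class.
record ShardRef(String shardId, boolean complete) {}

// Hypothetical stand-in for the adapter; NOT the real KinesisMessageDrivenChannelAdapter,
// only the shape of the two proposed customisation hooks.
class ShardConsumerSketch {

    // Lists all shards of a stream (the real adapter would call Kinesis here).
    private final Function<String, List<ShardRef>> shardLister;

    // Current strategy; defaults to "every shard that is not complete".
    private Function<String, List<ShardRef>> shardsToConsume = this::defaultShardsToConsume;

    ShardConsumerSketch(Function<String, List<ShardRef>> shardLister) {
        this.shardLister = Objects.requireNonNull(shardLister);
    }

    // Hook 1: replace the selection logic with a Function<String, List<...>>.
    void setShardsToConsume(Function<String, List<ShardRef>> strategy) {
        this.shardsToConsume = Objects.requireNonNull(strategy);
    }

    // Hook 2: a protected method that subclasses may override.
    protected List<ShardRef> detectShardsToConsume(String stream) {
        return shardsToConsume.apply(stream);
    }

    // The default behaviour, exposed so a custom strategy can build on it.
    List<ShardRef> defaultShardsToConsume(String stream) {
        return shardLister.apply(stream).stream()
                .filter(s -> !s.complete())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ShardConsumerSketch consumer = new ShardConsumerSketch(stream -> List.of(
                new ShardRef("shard-0", true),
                new ShardRef("shard-1", false),
                new ShardRef("shard-2", false)));
        // Point 2 from the issue: only read shard-2.
        consumer.setShardsToConsume(stream -> consumer.defaultShardsToConsume(stream).stream()
                .filter(s -> s.shardId().equals("shard-2"))
                .collect(Collectors.toList()));
        System.out.println(consumer.detectShardsToConsume("my-stream"));
    }
}
```

Exposing the default selection separately lets a custom strategy narrow the default result rather than re-implement the completeness check.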

Only read a given set of shards - I have a special case where I would like to use the adaptor to only read certain shards

This can already be achieved with the following constructor:

	public KinesisMessageDrivenChannelAdapter(
			AmazonKinesis amazonKinesis, KinesisShardOffset... shardOffsets)

Note the KinesisShardOffset arguments: each one specifies which shard to read and from which offset.

For the first point, I can suggest something like a Predicate<Shard> shardFilter option, called once we have the result of listing the shards, so that every shard in the stream can be checked for any business purpose.
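A per-shard filter of this kind could look like the following. This is a minimal sketch with a hypothetical `ListedShard` record and a hypothetical `applyShardFilter` helper; `shardFilter` here is a plain `java.util.function.Predicate`, not an existing option of the adapter. The example predicate covers the second case from the issue (read only a fixed set of shards).

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical shard model standing in for the AWS SDK Shard class.
record ListedShard(String shardId) {}

final class ShardFilterSketch {

    // Applies a per-shard predicate to the shards returned by the listing call,
    // the way a hypothetical shardFilter option might be applied.
    static List<ListedShard> applyShardFilter(List<ListedShard> listed,
                                              Predicate<ListedShard> shardFilter) {
        return listed.stream().filter(shardFilter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ListedShard> listed = List.of(
                new ListedShard("shardId-000000000000"),
                new ListedShard("shardId-000000000001"),
                new ListedShard("shardId-000000000002"));
        // Only read a fixed set of shards.
        Set<String> wanted = Set.of("shardId-000000000001");
        Predicate<ListedShard> onlyWanted = s -> wanted.contains(s.shardId());
        System.out.println(applyShardFilter(listed, onlyWanted));
    }
}
```

Because the predicate sees one shard at a time, it cannot by itself tell whether some other shard (such as a parent) is still open.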

Feel free to raise a PR and we can take a look together!

For the first point, I can suggest something like a Predicate<Shard> shardFilter option, called once we have the result of listing the shards, so that every shard in the stream can be checked for any business purpose.

The thing is, I want to filter based on the other shards. Using a BiPredicate<Shard, List<Shard>> for that seems a bit convoluted:

		// find all open shards which *don't* have an open parent:
		// a parent is still open if its id matches the shardId of one of the open shards
		BiPredicate<Shard, List<Shard>> include = (Shard shard, List<Shard> shards) -> {
			String parentShardId = shard.getParentShardId();
			if (parentShardId != null) {
				return shards.stream()
						.map(Shard::getShardId)
						.noneMatch(parentShardId::equals);
			}
			return true;
		};
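For comparison, wiring such a BiPredicate into the selection means handing the whole list to the predicate alongside every element. This is a minimal sketch with a hypothetical `OpenShard` record and `BiPredicateWiring` class; the predicate compares the parent id against each open shard's own `shardId`.

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// Hypothetical shard model standing in for the AWS SDK Shard class.
record OpenShard(String shardId, String parentShardId) {}

final class BiPredicateWiring {

    // Include a shard unless its parent is still among the open shards.
    static final BiPredicate<OpenShard, List<OpenShard>> INCLUDE = (shard, shards) -> {
        String parentShardId = shard.parentShardId();
        return parentShardId == null
                || shards.stream().map(OpenShard::shardId).noneMatch(parentShardId::equals);
    };

    // The caller must pass the full list to the predicate for every element.
    static List<OpenShard> select(List<OpenShard> openShards) {
        return openShards.stream()
                .filter(shard -> INCLUDE.test(shard, openShards))
                .collect(Collectors.toList());
    }
}
```

Each call rescans the whole list, so the cost grows quadratically with the number of open shards, and the predicate's signature has to carry context that a list-level function would already have.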

It seems nicer to work with a Function<List<Shard>, List<Shard>>. Is the concern that the user might return a list containing invalid shards?
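If invalid items are the concern, a list-level function can still be guarded. This is a minimal sketch, assuming a hypothetical `CandidateShard` record and `GuardedShardSelector` helper (neither is adapter API): the user function receives an immutable copy of the listed shards, and any returned shard that was not in that list is rejected.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical shard model standing in for the AWS SDK Shard class.
record CandidateShard(String shardId) {}

final class GuardedShardSelector {

    // Runs a user-supplied Function<List<Shard>, List<Shard>> and rejects any
    // result containing a shard that was not among the listed candidates.
    static List<CandidateShard> select(
            List<CandidateShard> listed,
            Function<List<CandidateShard>, List<CandidateShard>> userFunction) {
        Set<String> known = listed.stream()
                .map(CandidateShard::shardId)
                .collect(Collectors.toSet());
        // Hand the user an immutable copy so it cannot mutate the caller's list.
        List<CandidateShard> chosen = Objects.requireNonNull(
                userFunction.apply(List.copyOf(listed)), "shard function returned null");
        for (CandidateShard shard : chosen) {
            if (!known.contains(shard.shardId())) {
                throw new IllegalArgumentException("Unknown shard returned: " + shard.shardId());
            }
        }
        return List.copyOf(chosen);
    }
}
```

Failing fast on an unknown shard id is one possible policy; silently dropping such entries would be an equally simple alternative.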

Here is a PR: #180