spring-projects/spring-integration-aws

DynamoDB Streams cannot list shards

Closed this issue · 8 comments

acm19 commented

I'm trying to read records from a DynamoDB Stream. I'm getting the following error:

	at com.amazonaws.services.kinesis.AbstractAmazonKinesis.listShards(AbstractAmazonKinesis.java:149)
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.readShardList(KinesisMessageDrivenChannelAdapter.java:544)
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.readShardList(KinesisMessageDrivenChannelAdapter.java:530)
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.detectShardsToConsume(KinesisMessageDrivenChannelAdapter.java:604)
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.detectShardsToConsume(KinesisMessageDrivenChannelAdapter.java:598)
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:664)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

My application.yaml config looks like this:

cloud.aws:
  region.static: us-east-1
  stack.auto: false

spring:
  cloud:
    stream:
      bindings:
        input-in-0:
          destination: test-streams
          group: test-streams-group
      kinesis:
        bindings:
          input-in-0:
            consumer:
              dynamoDbStreams: true
              shardIteratorType: TRIM_HORIZON

The problem is that in the class KinesisMessageChannelBinder a KclMessageDrivenChannelAdapter is created and the AmazonKinesis to be used is set as follows:

AmazonKinesis amazonKinesisClient =
        kinesisConsumerProperties.isDynamoDbStreams()
                ? this.dynamoDBStreamsAdapter
                : this.amazonKinesis;

That basically means that when a DynamoDB stream is being processed, the class AmazonDynamoDBStreamsAdapterClient is used to interact with it. That class doesn't support the listShards operation, which is later called by KinesisMessageDrivenChannelAdapter#readShardList. It seems this problem was introduced in 10e96e2#diff-f7c5bb9b0447b4af040508dfb5f6a9fb0b23aa1c22cfa7c378aa5521dfc7f8f3R544.

Given that there were problems with DescribeStream throttling (that's why the issue was introduced), it should be possible, with minimal changes, to sub-class AmazonDynamoDBStreamsAdapterClient and override listShards so that DescribeStream is used for DynamoDB Streams while ListShards is still used for Kinesis. I'm happy to make the change if you are OK with that approach. The change would have to be in https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis, but I have created the issue here because a change in this repo was the initial trigger for the issue.
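To make the proposed approach concrete, here is a minimal sketch of the idea, not the actual fix. It deliberately avoids the AWS SDK so that it runs on its own: StandInAdapterClient, DescribeStreamPage and DescribeStreamBackedClient are hypothetical stand-ins invented for illustration, and only the shape (a subclass overriding listShards and paging through describeStream) mirrors the proposal. It assumes that DescribeStream returns shards one page at a time and that the last shard ID of a page is passed back as the exclusive start of the next request.

```java
import java.util.ArrayList;
import java.util.List;

final class ListShardsSketch {

    /** Hypothetical stand-in for one page of a DescribeStream result. */
    static final class DescribeStreamPage {
        final List<String> shardIds;
        final String lastEvaluatedShardId; // null when there are no more pages

        DescribeStreamPage(List<String> shardIds, String lastEvaluatedShardId) {
            this.shardIds = shardIds;
            this.lastEvaluatedShardId = lastEvaluatedShardId;
        }
    }

    /** Hypothetical stand-in for the adapter client: describeStream works, listShards does not. */
    static class StandInAdapterClient {
        private final List<String> allShardIds;
        private final int pageSize;

        StandInAdapterClient(List<String> allShardIds, int pageSize) {
            this.allShardIds = allShardIds;
            this.pageSize = pageSize;
        }

        /** Returns at most pageSize shards that come after exclusiveStartShardId. */
        DescribeStreamPage describeStream(String streamName, String exclusiveStartShardId) {
            int from = exclusiveStartShardId == null
                    ? 0
                    : allShardIds.indexOf(exclusiveStartShardId) + 1;
            int to = Math.min(from + pageSize, allShardIds.size());
            String last = to < allShardIds.size() ? allShardIds.get(to - 1) : null;
            return new DescribeStreamPage(new ArrayList<>(allShardIds.subList(from, to)), last);
        }

        List<String> listShards(String streamName) {
            throw new UnsupportedOperationException("listShards is not supported");
        }
    }

    /** Subclass that emulates listShards by paging through describeStream. */
    static class DescribeStreamBackedClient extends StandInAdapterClient {

        DescribeStreamBackedClient(List<String> allShardIds, int pageSize) {
            super(allShardIds, pageSize);
        }

        @Override
        List<String> listShards(String streamName) {
            List<String> shards = new ArrayList<>();
            String exclusiveStartShardId = null;
            do {
                DescribeStreamPage page = describeStream(streamName, exclusiveStartShardId);
                shards.addAll(page.shardIds);
                exclusiveStartShardId = page.lastEvaluatedShardId;
            } while (exclusiveStartShardId != null);
            return shards;
        }
    }

    public static void main(String[] args) {
        StandInAdapterClient client =
                new DescribeStreamBackedClient(List.of("shard-1", "shard-2", "shard-3"), 2);
        System.out.println(client.listShards("test-streams"));
    }
}
```

With a subclass like this passed in for DynamoDB Streams, the caller can keep calling listShards unchanged, while plain Kinesis streams keep using the regular client. Note that this trades the ListShards call back for DescribeStream calls on the DynamoDB side, so any throttling concerns for DescribeStream would still apply there.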

artembilan commented

Well, this sounds more like a feature request for https://github.com/awslabs/dynamodb-streams-kinesis-adapter itself.
That said, I think we could indeed have our own adapter extension in this project and use it in the Kinesis binder. This way everybody benefits from the feature when they deal with DynamoDB Streams over Kinesis.

Feel free to raise a PR here with an optional dependency on dynamodb-streams-kinesis-adapter, but still raise an issue in their repo to make the feature generally available. I think the DescribeStream limitation should be a great argument to convince them to make such an improvement.

Thank you.
Looking forward to your contribution!

acm19 commented

I don't think this is a feature request for https://github.com/awslabs/dynamodb-streams-kinesis-adapter. Their class maps the AWS API, and there is no listShards operation for DynamoDB Streams in the AWS API: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html. So it would be somewhat misleading to implement that method there.

I might as well create a support ticket with AWS to ask whether they are planning to provide a listShards operation for DynamoDB Streams in the near future. In the meantime, I'll prepare a PR as soon as I can.

Thank you!

acm19 commented

Hi @artembilan,

I have created a PR and also opened an issue in the AWS adapter repo, so they can consider whether it makes sense for them to add that functionality there. I've also asked AWS Support to add a ListShards operation to DynamoDB Streams, but that doesn't have an ETA, of course. Let me know what you think.

Regards

artembilan commented

Thanks, but it looks like you misunderstood me a bit on the first point: I want to see the fix in this repo, not in the Binder where you have raised it.

It sounds like such a feature could be really useful for anyone consuming a DynamoDB stream via the Kinesis adapter.
That's why I suggested adding your fix to this SI-AWS repo. The Kinesis binder can then benefit by using the fix from here.

Does it make sense?

acm19 commented

So, I raised the PR in the binder repo because:

  • There is a major functionality regression in the latest release of that repo.
  • The fix can be included to resolve the issue now and removed once/if AWS includes this feature (for them it is a feature, but for the binder it is a major bug fix).
  • I suspect they might decide not to merge such a thing, as their adapter seems to support only methods supported by the AWS API, and this one is not.
  • I have raised this issue on the AWS repo, and if they are willing to merge it I'll create a PR to their repo directly: awslabs/dynamodb-streams-kinesis-adapter#39.

Unfortunately, we cannot use older versions of the binder, as there are bug fixes that force us to use the latest version. But we need to start using DynamoDB Streams, and I guess that is or will be the case for other users at some point. The idea was to fix the issue asap where it is materialising and then try to persuade AWS to include it as part of their adapter lib.

Does that sound sensible to you?

artembilan commented

Sure!
It all makes sense.

I will review and merge it later this week.
And we will definitely have a release soon enough!

acm19 commented

Sounds good, thanks @artembilan. BTW, I think you closed this issue by mistake, as it's still not solved.

If you need any change in the PR just let me know.

artembilan commented

I closed it because the fix is not relevant to this project and, according to our discussion, there is nothing to do in this project.
Your PR is still valid and is going to be applied to the Kinesis Binder project.