spring-cloud/spring-cloud-stream-binder-aws-kinesis

consumer stops consuming events and throws KinesisMessageDrivenChannelAdapter : The lock for key 'xxxxxxxx:shardId-00000000000X' was not renewed in time

malvine opened this issue · 43 comments

Hi,

I've got an interesting use case and it's very simple to replicate.
I am using the latest binder version.

My setup:

  • One Kinesis stream with two shards.
  • One consumer app with 3 instances; let's call them instance1, instance2, and instance3.
  • Latest binder version.

To replicate:

  1. Start the consumers: usually one instance becomes the owner of both shards. Let's say instance1 gets all the shards. So far so good: that instance is consuming events.

  2. Restart the actively consuming instance, e.g. instance1. Instance2 will now get shard-0 and instance3 gets shard-1 as owner. (This step might take a couple of attempts, as sometimes instance2 or instance3 gets all the shards. Keep trying until you get one shard per instance.)

  3. Once you get instance1 doing nothing, instance2 owning shard-0 and instance3 owning shard-1, the instances stop consuming any events and both of them log this every 30 seconds:


a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'xxxxxxxx:shardId-000000000002' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1030) ~[spring-integration-aws-2.4.0.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:946) ~[spring-integration-aws-2.4.0.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:856) ~[spring-integration-aws-2.4.0.jar:na]

The only way to stop this is to restart one of the consuming instances so all the shards are back to one instance.

The binder version 2.0.1.RELEASE doesn't have this issue.
Pretty sure it's not even a binder issue - it's probably a spring-integration-aws library issue.

Thanks.

I think there must be some other logs related to the lock processing.
Something like:

logger.error(ex, () -> "Error during locking: " + lock);

Would you mind sharing that one with us as well?

Also: is there a chance to use the actual latest 2.2.0 Kinesis binder version: https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kinesis ?

The spring-integration-aws-2.4.0 is not the latest one...

Yeah, sorry, I was trying with 2.1.0 to see if that one doesn't have the issue.
OK, with the latest version 2.2.0 I get exactly the same.

No error before that or after, just this endlessly on both instances:

2022-05-11 17:39:28.046  INFO 99193 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49629266667711709536924126896771570517185546512247029778', timestamp=null, stream='xxxxxx', shard='shardId-000000000001', reset=false}, state=NEW}] has been started.
2022-05-11 17:39:39.020  INFO 99193 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'xx:xxxxxx:shardId-000000000001' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1950) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2085) ~[na:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) ~[spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) ~[spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) ~[spring-integration-aws-2.5.1.jar:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

OK. Any chance that you can share a simple project to reproduce?
Is it possible for me to run it against LocalStack then?

Yeah, I put a quick project together, and it seems like you can easily replicate the issue on LocalStack once the shards are consumed by two instances instead of one.
Here is the link

Hi @malvine !

Thank you for the sample.
I definitely could reproduce it locally with my Docker.
It's impossible to debug it, but at least I see where we are heading.
So, I guess we can explain the problem like this:
As long as a consumer doesn't take ownership of all the shards of the stream, it doesn't consume even from a shard it owns, nor does it update the lock for that shard in DynamoDB.
I mean, it looks like the lock manager is somehow broken for this partial-ownership situation...

Will look into code today.

Well, I'm not sure, but it looks like the problem is here:


    /**
     * Setting this flag to true will prevent the thread from being blocked (put to sleep) for the lease duration and
     * instead will return the call with the lock not granted exception back to the caller. It is up to the caller to
     * optionally back-off and retry and to acquire the lock.
     */
    private final boolean shouldSkipBlockingWait;

We don't set this in our DynamoDbLockRegistry, and then in the KinesisMessageDrivenChannelAdapter.ShardConsumerManager we block on this line:

if (lock.tryLock()) {
	this.locks.put(key, lock);
}

So, if another instance is the holder of the lock, we don't return immediately, as the tryLock() contract states, but rather step into a busy-wait loop in AmazonDynamoDBLockClient.acquireLock().
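(For reference, this is the java.util.concurrent.locks.Lock contract we rely on; the class below is just an illustrative stand-alone sketch, not the adapter's actual code.)

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockContractDemo {

	public static void main(String[] args) {
		Lock lock = new ReentrantLock(); // stand-in for a registry-provided lock

		// tryLock() acquires the lock only if it is free right now and returns
		// false immediately otherwise - it must never block for the lease duration
		// the way AmazonDynamoDBLockClient.acquireLock() does.
		if (lock.tryLock()) {
			try {
				System.out.println("lock acquired: safe to start the shard consumer");
			}
			finally {
				lock.unlock();
			}
		}
		else {
			System.out.println("lock held elsewhere: return and retry on the next dispatch cycle");
		}
	}

}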

Does it make sense?

I'll try to fix it in my local Spring Integration AWS copy and re-run your solution to verify.

So, yeah... My assumption and investigation are correct.
The tryLock() contract must be honored in the DynamoDbLockRegistry.
Otherwise the AmazonDynamoDBLockClient steps into a busy-wait loop and never fulfills the Future for the lock.

I'm going to push the fix today, but if you need a release for Spring Integration AWS 2.5.2, then I can do that tomorrow.

I'm moving this to Spring Integration AWS project though...

@artembilan thank you very much!

Did this get resolved? I'm running into this exact issue - even when upgrading Spring Integration AWS to 2.5.4.

I was even still able to reproduce it with the quick project that @malvine made, using Spring Integration AWS 2.5.4.

@postalservice14 ,

can you please share an updated sample project so I can see what is going on?
I mean, how are you sure that you have updated Spring Integration AWS to 2.5.4?
There is no explicit dependency for it in the mentioned project.
It may well be overridden transitively by the version the Kinesis Binder provides.

@artembilan thanks for the quick reply! Here's a fork of his project upgraded to 2.5.4 (you can see the latest commit). And I can still get it to happen using his instructions in the readme.

https://github.com/postalservice14/kinesis-lock-was-not-renewed-in-time

@postalservice14 ,

any chance that you can upgrade your solution (not that sample app) to the latest Spring Boot and org.springframework.cloud:spring-cloud-stream-binder-kinesis:3.0.0?
There is no @EnableBinding and friends any more in Spring Cloud Stream: everything has to be mapped via java.util.function primitives.

I'll try to reproduce it over the weekend in a simple unit test with Testcontainers.
I also don't see a reason for a producer side in that sample: according to the issue description, the problem is just with shard assignments - no need for any data!
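(For anyone following along, a rough Testcontainers skeleton for such a test might look like the sketch below; the class and test names are illustrative, not the actual test in the project.)

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class ShardLockDistributionTests {

	// Kinesis for the stream, DynamoDB for the shard locks and checkpoints.
	@Container
	static final LocalStackContainer LOCAL_STACK =
			new LocalStackContainer(DockerImageName.parse("localstack/localstack:1.2.0"))
					.withServices(LocalStackContainer.Service.KINESIS, LocalStackContainer.Service.DYNAMODB);

	@Test
	void shardsAreConsumedWhenOwnershipIsSplit() {
		// Point the AWS clients at LOCAL_STACK.getEndpointOverride(...), use
		// LOCAL_STACK.getAccessKey()/getSecretKey() and LOCAL_STACK.getRegion(),
		// start two adapter "instances" against a two-shard stream and assert
		// that records from both shards are consumed.
	}

}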

Well, I'd prefer a new GH issue with fresh, current and relevant info instead of trying to resurrect this issue against outdated versions.
Thanks for understanding!

OK. I was able to reproduce it locally as a parallel unit test against Testcontainers with the LocalStack image.
Reopening.
Not sure what to do yet...

Found the problem.
When we have several instances competing for shards, we get stuck on the if (lock.tryLock()) { call for exactly leaseDuration on a lock which does not belong to us.
This is not OK and must be fixed one way or another, since the lock.tryLock() contract requires returning immediately if the lock cannot be acquired at that moment.
I think there is something wrong in the AmazonDynamoDBLockClient itself, but I'll see how that can be mitigated on our side.

Thank you for the patience!

So, the code in that Lock client looks like this:

/*
 * Someone else has the lock, and they have the lock for LEASE_DURATION time. At this point, we need
 * to wait at least LEASE_DURATION milliseconds before we can try to acquire the lock.
 */
lockTryingToBeAcquired = existingLock.get();
if (!alreadySleptOnceForOneLeasePeriod) {
    alreadySleptOnceForOneLeasePeriod = true;
    millisecondsToWait += existingLock.get().getLeaseDuration();
}

I really doubt that it is OK to block for that long a period.
But we just don't have a choice since that's how that client works.

Some related discussion is here: spring-projects/spring-integration-aws#219, which leads us to the conclusion that we cannot use shouldSkipBlockingWait, since isExpired is not updated on the item.

I don't know yet what the proper fix should be (perhaps a combination of getLock() and then tryAcquireLock() to avoid the leaseDuration busy-spin loop), but for now the workaround is this:
the leaseDuration must be less than the lockRenewalTimeout on the KinesisMessageDrivenChannelAdapter, which is 10 seconds by default.

Right. See here for a bug on AWS SDK side: awslabs/amazon-dynamodb-lock-client#44

So, I have just made a workaround fix in the DynamoDbLockRegistry to make the time to wait negative.
The lock client then adds the leaseDuration, which realigns the timeout with our tryLock() expectations.
Give the latest 2.5.5-SNAPSHOT a try in your solution!
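(To illustrate the arithmetic of that workaround; the variable names below are mine, not the actual DynamoDbLockRegistry code.)

public class NegativeWaitWorkaroundMath {

	public static void main(String[] args) {
		long requestedTimeoutMs = 0L;    // tryLock(): we want an immediate answer
		long leaseDurationMs = 10_000L;  // lease duration configured on the lock client

		// Workaround: hand the lock client a negative "additional time to wait" ...
		long additionalTimeToWaitMs = requestedTimeoutMs - leaseDurationMs; // -10000

		// ... so that the client's own "millisecondsToWait += leaseDuration" step
		// brings the effective wait back to the timeout we actually asked for.
		long effectiveWaitMs = additionalTimeToWaitMs + leaseDurationMs; // 0

		System.out.println("effective wait (ms): " + effectiveWaitMs);
	}

}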

@artembilan I'm having the same issue, can you provide a URL of the repository that contains 2.5.5-SNAPSHOT?

@artembilan version 2.5.5 doesn't have those messages about the locks, but I now have another issue.

If my app does not release the locks properly (for example, when the JVM dies or during a k8s rolling restart), old locks stay in the lock table, and those locks are not released on the next app startup.
This prevents the app from consuming messages after the restart; it just waits indefinitely.
If I manually remove the locks from the table, it starts to consume messages.

I've tested with the older version and the old locks are not an issue there; it just waits for some time and the locks get released automatically.

Yeah... I think we are facing this one: awslabs/amazon-dynamodb-lock-client#79.
There is simply no info in the lock's DynamoDB record that it has not been used for a while.

It's probably worth trying with a much shorter leaseDuration.

You know, I have just implemented our own DynamoDbLockRepository: spring-projects/spring-integration-aws@594ea58.
It properly sets a TTL attribute which calculates the time in the future when the item has to expire.
This should work nicely with the DynamoDB TTL feature: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html.
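(For reference, a minimal sketch of enabling the DynamoDB TTL feature on the lock table with the AWS SDK for Java; the table and attribute names are illustrative, see the DynamoDbLockRepository Javadocs for the actual ones.)

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.TimeToLiveSpecification;
import com.amazonaws.services.dynamodbv2.model.UpdateTimeToLiveRequest;

public class EnableLockTableTtl {

	public static void main(String[] args) {
		AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

		// Tell DynamoDB which numeric attribute (epoch seconds) marks item expiry;
		// expired lock items are then removed by the DynamoDB TTL background process.
		dynamoDb.updateTimeToLive(
				new UpdateTimeToLiveRequest()
						.withTableName("SpringIntegrationLockRegistry") // illustrative table name
						.withTimeToLiveSpecification(
								new TimeToLiveSpecification()
										.withAttributeName("expireAt") // illustrative attribute name
										.withEnabled(true)));
	}

}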

I still have some changes to do at the binder level to start a new 4.0.0 version based on that breaking change in Spring Integration AWS 3.0.0.

That sounds awesome. I've tried to play with leaseDuration; however, I got inconsistent results.
Please keep us posted about the progress.
Thanks @artembilan

So, here is a Kinesis Binder 4.0.0-SNAPSHOT for your consideration: https://repo.spring.io/snapshot/org/springframework/cloud/spring-cloud-stream-binder-kinesis/4.0.0-SNAPSHOT.
Note: if you use custom values for partitionKey, sortKeyName, sortKey and heartbeatPeriod, they are no longer valid.
See the DynamoDbLockRepository Javadocs for the new table structure.

What are the expected spring-cloud-stream and spring-boot versions for spring-cloud-stream-binder-kinesis/4.0.0-SNAPSHOT ?

4.0.x and 3.0.x. Java 17.
Technically everything should be pulled in transitively when you just use:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
            <version>4.0.0-SNAPSHOT</version>
        </dependency>

With 4.0.0-SNAPSHOT I'm getting this error:
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: Missing the key sortKey in the item (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: 9KHOJDPV4HCGFD19NIDUV9PU7RVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)

It works fine with 3.0.0.

I guess you have to remove your SpringIntegrationLockRegistry table from DynamoDB and let the framework create a fresh one for you.
As I said before: there is no sortKey involved any more in the new DynamoDbLockRepository model.
Spring Integration AWS must be 3.0.0-SNAPSHOT, or just managed transitively by the spring-cloud-stream-binder-kinesis dependency.

I see strange behavior with the new version: locks disappear almost instantly; the difference between created and TTL is 10 ms?
Is there any additional configuration required?
[image attachment]

Well, that is in Epoch Seconds.
By default it is public static final Duration DEFAULT_LEASE_DURATION = Duration.ofSeconds(10);
So, what you show is correct: a 10-second difference between the create time and the TTL.
It is probably not a surprise, then, that the DynamoDB TTL feature cleans them up so quickly.

However, the KinesisMessageDrivenChannelAdapter must renew them every second...

Oh! I see. There is a bug 😄

I do this over there:

				new UpdateItemSpec()
							.withPrimaryKey(KEY_ATTR, lock)
							.withUpdateExpression("SET " + TTL_ATTR + " = :ttl")
							.withConditionExpression(LOCK_EXISTS_EXPRESSION)
							.withValueMap(ownerWithCurrentTimeValues());

	private ValueMap ownerWithCurrentTimeValues() {
		ValueMap valueMap =
				new ValueMap()
						.withNumber(":ttl", currentEpochSeconds());
		valueMap.putAll(this.ownerAttribute);
		return valueMap;
	}

where the new ttl must be the current time plus leaseDuration.
On it.
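(In other words, roughly this computation; a self-contained sketch where the helper name and the Duration parameter are mine, not the actual repository code.)

import java.time.Duration;

import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;

public final class RenewalTtl {

	// The renewed ":ttl" must point leaseDuration into the future, not at "now",
	// otherwise the item looks expired right after every renewal.
	static ValueMap renewalTtlValue(long currentEpochSeconds, Duration leaseDuration) {
		return new ValueMap()
				.withNumber(":ttl", currentEpochSeconds + leaseDuration.getSeconds());
	}

}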

Please, try now with -U for Maven or --refresh-dependencies for Gradle: the fresh Spring Integration AWS SNAPSHOT is already there.

Thank you for assisting in testing!

Thanks for the quick feedback!

I still see that the locks do not live for more than a second.
As with the implementation without TTL, I expect the current implementation to have numberOfStreams*numberOfShards records in the table, but I may catch only 1 or 2 at the same time, and they disappear much faster than 10 seconds, almost instantly.

Right. The number of records really must be equal to the number of active shard consumers.
In my case that is a single stream with two shards, and I saw a proper distribution on a concurrent restart.

Any advice on what tool I can use locally on Windows to browse DynamoDB in a Docker container?
Or perhaps LocalStack comes with some CLI tool to sneak in?
I kinda don't know how to be sure about the DB content unless I confirm the stuff with my unit tests against LocalStack in Docker 😄

You may try to use the awscli, but you will need to set it up somehow to point it at your Docker DynamoDB.

aws dynamodb scan --table-name yourTableName

We can have a pairing session if it helps.

FYI, I'm running the Java service locally using the real AWS Kinesis/DynamoDB.

So, I have this test:

	@Test
	public void testLockRenew() {
		final Lock lock = this.dynamoDbLockRegistry.obtain("foo");

		assertThat(lock.tryLock()).isTrue();
		try {
			this.dynamoDbLockRepository.setLeaseDuration(Duration.ofSeconds(60));
			assertThatNoException().isThrownBy(() -> this.dynamoDbLockRegistry.renewLock("foo"));
			String ttl =
					DYNAMO_DB.getItem(DynamoDbLockRepository.DEFAULT_TABLE_NAME,
									Map.of(DynamoDbLockRepository.KEY_ATTR, new AttributeValue("foo")))
							.getItem()
							.get(DynamoDbLockRepository.TTL_ATTR).getN();
			assertThat(Long.parseLong(ttl))
					.isCloseTo(LocalDateTime.now().plusSeconds(60).toEpochSecond(ZoneOffset.UTC),
							Percentage.withPercentage(10));
		}
		finally {
			lock.unlock();
			this.dynamoDbLockRepository.setLeaseDuration(Duration.ofSeconds(2));
		}
	}

It confirms that the expireAt attribute is updated accordingly when we call renew().

And yes, localstack/localstack:1.2.0 includes an aws CLI. I can get into it from my Docker Desktop terminal.
I still have some problems with AWS credentials, but that's a different story.
Playing with the concurrent test for 10 shards.

And no: I don't have any AWS accounts to test against a real one. 😄

OK. Something is still wrong with my renew algorithm.
With the regular tryLock() it looks good, and sometimes I see shard distribution between different instances.

Looking further into the renew problem...

So, I made some tweaks and did some testing.
Please give the latest Spring Integration AWS SNAPSHOT a shot.

Note: I still could not figure out why the AWS CLI in the LocalStack container fails with that 'The security token included in the request is invalid' error.
I did an aws configure with the values from the LocalStackContainer, but it still doesn't like it. 🤷

It looks like records are expiring right after creation, instantly.
I see the TTL was now increased from the initial 10 seconds to 60, but it doesn't affect the record lifetime at all.

Hm. So, it sounds like LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) in Java is not exactly what epoch seconds means on AWS.
And it also feels like LocalStack does not support the DynamoDB TTL feature.

Would you mind double-checking in your AWS env how our expireAt compares to its current time?

You can probably disable TTL on the table for now, though.
Or we can consider making it optional when the framework creates the table for us.

Here is the value that I see right now:
[image attachment]
and it translates to:
GMT: Tuesday, February 28, 2023 12:21:58 PM
Your time zone: Tuesday, February 28, 2023 7:21:58 AM GMT-05:00
Relative: 5 hours ago

So it is some issue with the timezone.
Locally, System.currentTimeMillis() gives: 1677604766750

Another observation: if I set the JVM timezone to GMT, it starts working as expected,
like: -Duser.timezone="GMT"

OK. I have just pushed the Instant.now().getEpochSecond() fix.
Give it 5-6 minutes for the SNAPSHOT to be published.
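(For context, a minimal JDK-only illustration of the difference between the two approaches; the class is just a sketch.)

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochSecondsComparison {

	public static void main(String[] args) {
		// Zone-independent: the real Unix epoch second, which is what DynamoDB TTL expects.
		long epochSeconds = Instant.now().getEpochSecond();

		// LocalDateTime.now() is the wall-clock time in the JVM default zone with no zone
		// attached; interpreting it as UTC shifts the result by the local UTC offset
		// (5 hours into the past for GMT-05:00), so the TTL looked already expired
		// unless the JVM ran with -Duser.timezone="GMT".
		long skewedSeconds = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);

		System.out.println("difference in seconds: " + (epochSeconds - skewedSeconds));
	}

}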

Looks like it works fine now, thank you!