spring-cloud/spring-cloud-stream-binder-aws-kinesis

Current KinesisProducerConfiguration does not work as expected

kabennett opened this issue · 15 comments

In what version(s) of Spring Cloud Stream Binder for AWS Kinesis are you seeing this issue?

4.0.0

Describe the bug

Rather than repeat information already documented, please see the original problem and solution posted here.

Well, I haven't dived into the details, but it sounds like you found a solution that could be incorporated back into this project to spare others similar pain in the future.

Thank you!

You are welcome, Artem!

I probably was not clear with my previous message, so let me fix that.

Are you willing to open a PR with the fix you propose in your SO thread?
Or would you rather leave it to me to deal with your code as possible or appropriate?

Artem, I can work on a PR for you.

Hello, @kabennett !

Any chance of getting an update from you?
If you cannot work on a PR, just post your suggestion here and we will incorporate it into the project.

Thanks

I have just pushed some fix on the matter.
@kothapet, it would be great if you could give 4.0.1-SNAPSHOT a try with your solution.

@artembilan how do I pull the 4.0.1-SNAPSHOT into my project?

@artembilan, yes, it's working fine. Here is the output; the logs show that it's using the KPL.
Thanks


2023-09-08T17:50:33.741-04:00  INFO 10396 --- [           main] c.g.k.scs.ScsKinesisProj1Application     : Started ScsKinesisProj1Application in 6.736 seconds (process running for 7.397)
2023-09-08T17:50:33.747-04:00 DEBUG 10396 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2023-09-08T17:50:33.750-04:00 DEBUG 10396 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
2023-09-08T17:50:33.809-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.800091] [0x000028d9][0x00007f246c5c5840] [info] [logging.cc:89] Set boost log level to info
2023-09-08T17:50:33.809-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.800153] [0x000028d9][0x00007f246c5c5840] [info] [logging.cc:179] Set AWS Log Level to INFO
2023-09-08T17:50:33.814-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.814053] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815286] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:257] Region has been successfully set to us-east-1 from user's input configuration
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815515] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:348] Setting CA path to /tmp/amazon-kinesis-producer-native-binaries/cacerts
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815544] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:383] Starting up main producer
2023-09-08T17:50:33.815-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815575] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815595] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815609] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.815622] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:62] Using default Kinesis endpoint
2023-09-08T17:50:33.816-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.816450] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.817-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.816897] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T17:50:33.817-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.817314] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T17:50:33.817-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.817639] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:62] Using default CloudWatch endpoint
2023-09-08T17:50:33.818-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.818307] [0x000028d9][0x00007f246c5c5840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T17:50:33.818-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.818779] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T17:50:33.819-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.819092] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T17:50:33.819-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.819398] [0x000028d9][0x00007f246c5c5840] [info] [kinesis_producer.cc:62] Using default STS endpoint
2023-09-08T17:50:33.820-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.820111] [0x000028d9][0x00007f246c5c5840] [info] [main.cc:394] Entering join
2023-09-08T17:50:33.820-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.820239] [0x000028d9][0x00007f24699fa6c0] [info] [kinesis_producer.cc:226] Created pipeline for stream "my-test-stream"
2023-09-08T17:50:33.925-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.924900] [0x000028d9][0x00007f24699fa6c0] [info] [pipeline.h:221] StreamARN "arn:aws:kinesis:us-east-1:xxxxxx:stream/my-test-stream" has been successfully configured, and will be used in requests including ListShards and PutRecords
2023-09-08T17:50:33.925-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:33.924979] [0x000028d9][0x00007f24699fa6c0] [info] [shard_map.cc:89] Updating shard map for stream "my-test-stream"
2023-09-08T17:50:34.041-04:00  INFO 10396 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 17:50:34.040942] [0x000028d9][0x00007f243ffff6c0] [info] [shard_map.cc:151] Successfully updated shard map for stream "my-test-stream" (arn: "arn:aws:kinesis:us-east-1:xxxxxxxxx:stream/my-test-stream"). Found 4 shards
mySupplier creating payload 502
mySupplier creating payload 214
mySupplier creating payload 547
mySupplier creating payload 109
mySupplier creating payload 469
mySupplier creating payload 441
mySupplier creating payload 155

@artembilan it works with AWS Kinesis. However, I am getting the same error when using Localstack. Am I missing something? Do you have a full example of a working Localstack setup (see localstack/localstack#507)?

2023-09-08T20:24:01.489-04:00  INFO 31372 --- [           main] c.g.k.scs.ScsKinesisProj1Application     : Started ScsKinesisProj1Application in 5.221 seconds (process running for 5.732)
2023-09-08T20:24:01.493-04:00 DEBUG 31372 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2023-09-08T20:24:01.496-04:00 DEBUG 31372 --- [           main] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
2023-09-08T20:24:01.548-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.537337] [0x00007ac8][0x00007f39f8bc0840] [info] [logging.cc:89] Set boost log level to info
2023-09-08T20:24:01.548-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.537408] [0x00007ac8][0x00007f39f8bc0840] [info] [logging.cc:179] Set AWS Log Level to INFO
2023-09-08T20:24:01.549-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.549447] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.552-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550532] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:257] Region has been successfully set to us-east-1 from user's input configuration
2023-09-08T20:24:01.552-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550711] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:348] Setting CA path to /tmp/amazon-kinesis-producer-native-binaries/cacerts
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550727] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:383] Starting up main producer
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550748] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550765] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550776] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.550788] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:185] Using Kinesis endpoint localhost:4567
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551624] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.553-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551645] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551655] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.551667] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:200] Using CloudWatch endpoint localhost:4567
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552020] [0x00007ac8][0x00007f39f8bc0840] [info] [AWS Log: WARN](ClientConfiguration)Retry Strategy will use the default max attempts.
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552061] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:111] Using Region: us-east-1
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552076] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:134] Using per request threading model.
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552091] [0x00007ac8][0x00007f39f8bc0840] [info] [kinesis_producer.cc:62] Using default STS endpoint
2023-09-08T20:24:01.554-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.552736] [0x00007ac8][0x00007f39f8bc0840] [info] [main.cc:394] Entering join
2023-09-08T20:24:01.557-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.556309] [0x00007ac8][0x00007f39f5ffa6c0] [info] [kinesis_producer.cc:226] Created pipeline for stream "my-test-stream"
2023-09-08T20:24:01.661-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661036] [0x00007ac8][0x00007f39f5ffa6c0] [info] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'InvalidClientTokenId': The security token included in the request is invalid.
2023-09-08T20:24:01.661-04:00  WARN 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661072] [0x00007ac8][0x00007f39f5ffa6c0] [warning] [AWS Log: ERROR](AWSXmlClient)HTTP response code: 403
Resolved remote host IP address: 67.220.245.43
Request ID: adeca5d6-70ab-438a-8d93-01906ea5eaf4
Exception name: InvalidClientTokenId
Error message: The security token included in the request is invalid.
4 response headers:
content-length : 306
content-type : text/xml
date : Sat, 09 Sep 2023 00:24:01 GMT
x-amzn-requestid : adeca5d6-70ab-438a-8d93-01906ea5eaf4
2023-09-08T20:24:01.661-04:00  INFO 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661131] [0x00007ac8][0x00007f39f5ffa6c0] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
2023-09-08T20:24:01.662-04:00 ERROR 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : [2023-09-08 20:24:01.661181] [0x00007ac8][0x00007f39f5ffa6c0] [error] [pipeline.h:228] Failed to get StreamARN using STS GetCallerIdentity | Code: InvalidClientTokenId | Message: The security token included in the request is invalid. | Request was: Action=GetCallerIdentity&Version=2011-06-15
2023-09-08T20:24:01.662-04:00  WARN 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    : terminate called after throwing an instance of 'boost::wrapexcept<boost::exception_detail::error_info_injector<boost::log::v2s_mt_posix::system_error> >'
2023-09-08T20:24:01.662-04:00  WARN 31372 --- [kpl-daemon-0003] c.a.s.k.producer.LogInputStreamReader    :   what():  Failed to set TLS value: Invalid argument
mySupplier creating payload 311
2023-09-08T20:24:02.518-04:00 ERROR 31372 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aws.outbound.KplMessageHandler@6694a721], failedMessage=GenericMessage [payload=byte[3], headers={contentType=application/json, id=a41b8fa8-9bab-6e1a-9343-dc9936275ee0, timestamp=1694219042514}]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)

@artembilan I apologize for not submitting a PR to you. I have been sick lately and am only now getting back to feeling somewhat normal. In fact, I was working on a PR for you today when I saw that you had already made progress on it. Again, I apologize for not working on this sooner.

I just submitted a PR for you with the changes I made to get the solution working with STS for our team.

Sorry, we don't support the AWS KPL project or Localstack here.
It is better to ask such questions in their communities.
I can only speculate based on what I've seen so far in the mentioned localstack/localstack#507:

So now setting .setVerifyCertificate(false) on the KPL is enough.

Therefore I suggest adding a fake bean to the test configuration to disable that option on our auto-configured KinesisProducerConfiguration:

@Bean
String kinesisProducerConfigurationAdjuster(KinesisProducerConfiguration kinesisProducerConfiguration) {
    // Fake bean: it exists only to adjust the auto-configured KinesisProducerConfiguration.
    kinesisProducerConfiguration.setVerifyCertificate(false);
    return null;
}

Of course, you need to be sure to use the latest Localstack.

You can also look at our https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis/src/test/java/org/springframework/cloud/stream/binder/kinesis/LocalstackContainerTest.java to see how we expose the Localstack ENV to auto-configuration properties.

Yes, I followed the instructions in the other thread, set .setVerifyCertificate(false) as shown below, and am using the latest Localstack, but the issue persists.
I raised issues in the Localstack repo (localstack/localstack#9110) and in awslabs/amazon-kinesis-producer#532.

	@Bean
	public KinesisProducerConfiguration kinesisProducerConfiguration() {
		return new KinesisProducerConfiguration()
				.setCredentialsProvider(new DefaultAWSCredentialsProviderChain())
				.setRegion("us-east-1")
				.setStsEndpoint("localhost")
				.setStsPort(4567)
				.setKinesisEndpoint("localhost")
				.setKinesisPort(4567)
				.setCloudwatchEndpoint("localhost")
				.setCloudwatchPort(4567)
				.setVerifyCertificate(false);
	}

Actually, I figured it out. Instead of having the bean return a KinesisProducer, have it return the KinesisProducerConfiguration.
Hopefully this helps someone in the future.

This works

	@Bean
	public KinesisProducerConfiguration kinesisProducerConfiguration() {
		System.out.println("**** KinesisProducerConfiguration ***");
		return new KinesisProducerConfiguration().setCredentialsProvider(getCredentialsProvider())
				.setKinesisEndpoint("localhost").setKinesisPort(4566)
				.setStsEndpoint("localhost").setStsPort(4566)
				.setCloudwatchEndpoint("localhost").setCloudwatchPort(4566)
				.setRegion(region).setVerifyCertificate(false);
	}
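
For anyone who has the Localstack endpoint as a single URL rather than a separate host and port: the setters used above (setKinesisEndpoint/setKinesisPort and the STS and CloudWatch equivalents) take the two values separately, so the URL has to be split first. Below is a minimal sketch of that step using only the JDK. The LocalEndpoint class and the "http://localhost:4566" URL are illustrative assumptions, not part of the binder or the KPL.

```java
import java.net.URI;

/**
 * Hypothetical helper (not part of the binder or the KPL): splits an endpoint URL
 * into the host and port values that the KPL configuration setters take separately.
 */
public final class LocalEndpoint {
    private final String host;
    private final int port;

    private LocalEndpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Parses a URL such as "http://localhost:4566" into its host and port. */
    public static LocalEndpoint parse(String url) {
        URI uri = URI.create(url);
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("No host in endpoint URL: " + url);
        }
        int port = uri.getPort();
        if (port == -1) {
            // No explicit port in the URL: fall back to the scheme default.
            port = "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
        }
        return new LocalEndpoint(uri.getHost(), port);
    }

    public String host() {
        return host;
    }

    public int port() {
        return port;
    }

    public static void main(String[] args) {
        // Assumed example URL; substitute whatever your Localstack instance exposes.
        LocalEndpoint endpoint = parse("http://localhost:4566");
        System.out.println(endpoint.host() + ":" + endpoint.port());
    }
}
```

The resulting host() and port() values could then be passed to each endpoint/port setter pair in the bean above, so all three services point at the same Localstack address.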