krasserm/streamz

`null` in `receiveBody` from AWS-KINESIS

PaulAtFormation opened this issue · 4 comments

I have a relatively simple test harness that, among other things, tries to send some JSON through Kinesis. It does this by providing a TypeConverter for Circe's Json type based on Java serialization. A property-based test of the TypeConverter passes and sending a message succeeds, but receiving returns a null body, even though the headers are fine and the Camel logs show the base64-encoded body before decoding and conversion.
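For reference, here is a minimal sketch of the kind of Java-serialization round trip such a TypeConverter relies on. It uses only the JDK and a `LinkedHashMap` as a stand-in for Circe's Json; the class name, method names, and sample values are hypothetical and are not taken from the attached project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;

public class SerializationRoundTrip {

    // Serialize any Serializable value into a ByteBuffer positioned at 0.
    public static ByteBuffer toByteBuffer(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    // Read the object back from a duplicate view, so the caller's
    // buffer position is left untouched.
    public static Object fromByteBuffer(ByteBuffer buffer)
            throws IOException, ClassNotFoundException {
        ByteBuffer view = buffer.duplicate();
        byte[] raw = new byte[view.remaining()];
        view.get(raw);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder payload standing in for a Json value (assumption).
        LinkedHashMap<String, Object> payload = new LinkedHashMap<>();
        payload.put("name", "example");
        payload.put("count", 1L);

        ByteBuffer wire = toByteBuffer(payload);
        Object back = fromByteBuffer(wire);
        System.out.println(payload.equals(back)); // prints "true"
    }
}
```

A round trip like this only exercises the conversion itself. It says nothing about whether the component ever hands the received bytes to the converter.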

Also, I found it necessary to retain the SHARD_ID returned by sending and pass that through the receiveBody URL; otherwise the AWS-KINESIS component appears to sit and poll the wrong shard repeatedly. In the general case, I won't know which shard to request, and would expect receiving to return any messages from all shards.

So on one hand, I may not understand Kinesis or the AWS-KINESIS component. On the other, I don't see how the receiveBody result can possibly be null.

You can run my test suite with it:test in a valid Docker environment. It relies on testcontainers-localstack for testing against AWS APIs.

Any advice would be welcome!

Thanks!
data-playground.zip

Did you try running a pure Camel application that polls from a shard and check if the message body is not null? Can you provide a Java-serialized version of the response message or exchange? This would really help a lot.

From debug logging, the response from the LocalStack container is:

2019-03-02 12:29:04 [Camel (camel-1) thread #2 - aws-kinesis://inbound] DEBUG org.apache.http.wire - http-outgoing-9 << "{"MillisBehindLatest":0,"NextShardIterator":"AAAAAAAAAAH0hGLImjSGkKtv0tW5VizJvaE5la6OrjCe+pN1URp6f+RsPnETl9LLjnbot1SSwwKyWYu/4Zk/kNdtRTsMYlGhsao7uq7y+SPjBPi8IlSw5NwgVO09Y35x26VUVH6mDeF7apV5LM07UyhO3ypAynNuLkctV5jWplrrH5YqP2MTmWKmH3TtcS2CeZhlzNje8gM=","Records":[{"PartitionKey":"person","Data":"rO0ABXNyABVpby5jaXJjZS5Kc29uJEpPYmplY3SdN5sjWbqH6AIAAUwABXZhbHVldAAVTGlvL2NpcmNlL0pzb25PYmplY3Q7eHIADWlvLmNpcmNlLkpzb26ynGPiQtIPTQIAAHhwc3IAK2lvLmNpcmNlLkpzb25PYmplY3QkTGlua2VkSGFzaE1hcEpzb25PYmplY3RPZAjmmmTTxQIAAUwAM2lvJGNpcmNlJEpzb25PYmplY3QkTGlua2VkSGFzaE1hcEpzb25PYmplY3QkJGZpZWxkc3QAGUxqYXZhL3V0aWwvTGlua2VkSGFzaE1hcDt4cgATaW8uY2lyY2UuSnNvbk9iamVjdAokXO7fm8O4AgAAeHBzcgAXamF2YS51dGlsLkxpbmtlZEhhc2hNYXA0wE5cEGzA+wIAAVoAC2FjY2Vzc09yZGVyeHIAEWphdmEudXRpbC5IYXNoTWFwBQfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAAAADHcIAAAAEAAAAAN0AARuYW1lc3IAFWlvLmNpcmNlLkpzb24kSlN0cmluZyLCuaRSGTIoAgABTAAFdmFsdWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cQB+AAJ0AARQYXVsdAADYWdlc3IAFWlvLmNpcmNlLkpzb24kSk51bWJlchKBgJvaF3XzAgABTAAFdmFsdWV0ABVMaW8vY2lyY2UvSnNvbk51bWJlcjt4cQB+AAJzcgARaW8uY2lyY2UuSnNvbkxvbmcQF3adDCwLugIAAUoABXZhbHVleHIAE2lvLmNpcmNlLkpzb25OdW1iZXKWBPtHnyRRaQIAAHhwAAAAAAAAADV0AAZoZWlnaHRzcQB+ABFzcgATaW8uY2lyY2UuSnNvbkRvdWJsZU932SwakUsHAgABRAAFdmFsdWV4cQB+ABVAGQAAAAAAAHgA","ApproximateArrivalTimestamp":1551547742.411,"SequenceNumber":"49593468040026657000809318652465555021718012239217164306"}]}"

When I change my receiveBody type from Json to ByteBuffer, I still get null, which is consistent with the fact that my property-based test of my serialization-based TypeConverter for Circe's Json passes.
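As a quick sanity check on the payload in that log line, the `Data` field can be decoded outside Camel to confirm it starts with the Java serialization stream header. The sketch below uses only the JDK; the class and method names are hypothetical, and the input is just the first eight characters of the `Data` value from the log above.

```java
import java.io.ObjectStreamConstants;
import java.nio.ByteBuffer;
import java.util.Base64;

public class KinesisDataCheck {

    // A Java serialization stream begins with STREAM_MAGIC (0xACED)
    // followed by STREAM_VERSION (5).
    public static boolean looksJavaSerialized(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        if (raw.length < 4) {
            return false;
        }
        ByteBuffer buf = ByteBuffer.wrap(raw);
        return buf.getShort() == ObjectStreamConstants.STREAM_MAGIC
            && buf.getShort() == ObjectStreamConstants.STREAM_VERSION;
    }

    public static void main(String[] args) {
        // Leading characters of the "Data" field from the debug log.
        System.out.println(looksJavaSerialized("rO0ABXNy")); // prints "true"
    }
}
```

This only shows that the record on the wire carries a serialized object. It does not show where the body is lost between the HTTP response and the result of receiveBody.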

In my mind, this all points pretty strongly at Camel itself. The fact that I apparently need to know which shard to request from Kinesis, coupled with pretty strong evidence that it simply doesn't work, is souring me on what I'd hoped would be well-founded advocacy for Camel via streamz-camel-fs2 at work. :-( (No fault of yours, of course. Thanks so much for your efforts!)

I tried running it:test (on Ubuntu 16.04, Docker installed) but got:

2019-03-03 10:32:31 [pool-6-thread-7] ERROR o.t.d.EnvironmentAndSystemPropertyClientProviderStrategy - ping failed with configuration Environment variables, system properties and defaults. Resolved: 
    dockerHost=unix:///var/run/docker.sock
    apiVersion='{UNKNOWN_VERSION}'
    registryUrl='https://index.docker.io/v1/'
    registryUsername='martin'
    registryPassword='null'
    registryEmail='null'
    dockerConfig='DefaultDockerClientConfig[dockerHost=unix:///var/run/docker.sock,registryUsername=martin,registryPassword=<null>,registryEmail=<null>,registryUrl=https://index.docker.io/v1/,dockerConfigPath=/home/martin/.docker,sslConfig=<null>,apiVersion={UNKNOWN_VERSION},dockerConfig=<null>]'
 due to org.rnorth.ducttape.TimeoutException: Timeout waiting for result with exception
org.rnorth.ducttape.TimeoutException: Timeout waiting for result with exception
	at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:51)
	at org.testcontainers.dockerclient.DockerClientProviderStrategy.ping(DockerClientProviderStrategy.java:190)
	at org.testcontainers.dockerclient.EnvironmentAndSystemPropertyClientProviderStrategy.test(EnvironmentAndSystemPropertyClientProviderStrategy.java:42)
	at org.testcontainers.dockerclient.DockerClientProviderStrategy.lambda$getFirstValidStrategy$2(DockerClientProviderStrategy.java:113)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
	at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:302)
	at java.util.stream.Streams$ConcatSpliterator.tryAdvance(Streams.java:731)
	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.findAny(ReferencePipeline.java:469)
	at org.testcontainers.dockerclient.DockerClientProviderStrategy.getFirstValidStrategy(DockerClientProviderStrategy.java:148)
	at org.testcontainers.DockerClientFactory.client(DockerClientFactory.java:105)
	at org.testcontainers.containers.GenericContainer.<init>(GenericContainer.java:142)
	at org.testcontainers.containers.localstack.LocalStackContainer.<init>(LocalStackContainer.java:40)
	at formation.platform.testing.LocalStackContainer.$anonfun$container$1(LocalStackContainer.scala:13)
	at scala.Option.map(Option.scala:163)
	at formation.platform.testing.LocalStackContainer.<init>(LocalStackContainer.scala:13)
	at formation.platform.testing.LocalStackContainer$.apply(LocalStackContainer.scala:26)
	at ai.formation.platform.data.DataSpec.<init>(DataSpec.scala:60)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.lang.Class.newInstance(Class.java:442)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:435)
	at sbt.TestRunner.runTest$1(TestFramework.scala:113)
	at sbt.TestRunner.run(TestFramework.scala:124)
	at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:282)
	at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:246)
	at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:282)
	at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:282)
	at sbt.TestFunction.apply(TestFramework.scala:294)
	at sbt.Tests$.$anonfun$toTask$1(Tests.scala:309)
	at sbt.std.Transform$$anon$3.$anonfun$apply$2(System.scala:46)
	at sbt.std.Transform$$anon$4.work(System.scala:67)
	at sbt.Execute.$anonfun$submit$2(Execute.scala:269)
	at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:16)
	at sbt.Execute.work(Execute.scala:278)
	at sbt.Execute.$anonfun$submit$1(Execute.scala:269)
	at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:178)
	at sbt.CompletionService$$anon$2.call(CompletionService.scala:37)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: com.sun.jna.LastErrorException: [13] Permission denied
	at org.testcontainers.shaded.org.scalasbt.ipcsocket.UnixDomainSocket.<init>(UnixDomainSocket.java:62)
	at org.testcontainers.dockerclient.transport.okhttp.UnixSocketFactory$1.<init>(UnixSocketFactory.java:27)
	at org.testcontainers.dockerclient.transport.okhttp.UnixSocketFactory.createSocket(UnixSocketFactory.java:27)
	at org.testcontainers.shaded.okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:240)
	at org.testcontainers.shaded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:166)
	at org.testcontainers.shaded.okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:257)
	at org.testcontainers.shaded.okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
	at org.testcontainers.shaded.okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
	at org.testcontainers.shaded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
	at org.testcontainers.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at org.testcontainers.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at org.testcontainers.shaded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
	at org.testcontainers.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at org.testcontainers.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at org.testcontainers.shaded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
	at org.testcontainers.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at org.testcontainers.shaded.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
	at org.testcontainers.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at org.testcontainers.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at org.testcontainers.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:254)
	at org.testcontainers.shaded.okhttp3.RealCall.execute(RealCall.java:92)
	at org.testcontainers.dockerclient.transport.okhttp.OkHttpInvocationBuilder.execute(OkHttpInvocationBuilder.java:259)
	at org.testcontainers.dockerclient.transport.okhttp.OkHttpInvocationBuilder.execute(OkHttpInvocationBuilder.java:254)
	at org.testcontainers.dockerclient.transport.okhttp.OkHttpInvocationBuilder.get(OkHttpInvocationBuilder.java:220)
	at org.testcontainers.dockerclient.transport.okhttp.OkHttpDockerCmdExecFactory$1.execute(OkHttpDockerCmdExecFactory.java:124)
	at org.testcontainers.dockerclient.transport.okhttp.OkHttpDockerCmdExecFactory$1.execute(OkHttpDockerCmdExecFactory.java:117)
	at com.github.dockerjava.core.exec.AbstrSyncDockerCmdExec.exec(AbstrSyncDockerCmdExec.java:21)
	at com.github.dockerjava.core.command.AbstrDockerCmd.exec(AbstrDockerCmd.java:35)
	at org.testcontainers.dockerclient.DockerClientProviderStrategy.lambda$null$4(DockerClientProviderStrategy.java:193)
	at org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51)
	at org.testcontainers.dockerclient.DockerClientProviderStrategy.lambda$ping$5(DockerClientProviderStrategy.java:191)
	at org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:41)
	... 4 common frames omitted
Caused by: com.sun.jna.LastErrorException: [13] Permission denied
	at org.testcontainers.shaded.org.scalasbt.ipcsocket.UnixDomainSocketLibrary.connect(Native Method)
	at org.testcontainers.shaded.org.scalasbt.ipcsocket.UnixDomainSocket.<init>(UnixDomainSocket.java:57)
	... 35 common frames omitted

Any idea what I'm doing wrong?

@PaulAtFormation I'm going to close this for now. If you're still experiencing the issue, please reopen it and leave some more info so we can track it down.