spring-cloud/spring-cloud-stream-samples

kafka-streams-word-count error when running KafkaStreamsWordCountApplicationTests

gborobio73 opened this issue · 1 comments

Hello!
Thank you for the examples, they are very useful. I have a problem when running KafkaStreamsWordCountApplicationTests from kafka-streams-word-count.
First I get this error, due to using Java 14:

java.lang.IllegalArgumentException: Unable to canonicalize address 127.0.0.1/<unresolved>:59201 because it's not resolvable

Then I switched to a newer Kafka version as explained here, but the test still fails.
First I see this error in the logs:

2020-06-09 08:16:36,182 ERROR hello-word-count-sample-58e3b82d-9e56-4f85-9067-5eab473cd5f1-StreamThread-1 o.a.z.s.NIOServerCnxnFactory:92 - Thread 	StreamsThread threadId: hello-word-count-sample-58e3b82d-9e56-4f85-9067-5eab473cd5f1-StreamThread-1
TaskManager
	MetadataState:
		GlobalMetadata: []
		GlobalStores: []
		My HostInfo: HostInfo{host='unknown', port=-1}
		null
	Active tasks:
		Running:
		Suspended:
		New:
		Restoring:
	Standby tasks:
		Running:
		Suspended:
		New:
 died
java.lang.NoSuchMethodError: 'java.lang.Object org.apache.kafka.common.utils.Utils.notNull(java.lang.Object)'
	at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:110)
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1049)
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:545)
	at org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.assign(PartitionAssignorAdapter.java:59)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

Then it gets stuck at

2020-06-09 08:16:37,703  INFO controller-event-thread k.c.KafkaController:66 - [Controller id=0] Processing automatic preferred replica leader election

for about one minute and then starts shutting down.

Test fails with:

java.lang.AssertionError:

Expecting:
 <0>
to be greater than or equal to:
 <1>
	at kafka.streams.word.count.KafkaStreamsWordCountApplicationTests.testKafkaStreamsWordCountProcessor(KafkaStreamsWordCountApplicationTests.java:89)

I really appreciate your help!

Thanks a lot!

Hello!
The issue was due to not having the latest kafka-streams dependency. I updated to 2.4.1 and it works perfectly.

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams</artifactId>
	<version>2.4.1</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams-test-utils</artifactId>
	<version>2.4.1</version>
	<scope>test</scope>
</dependency>
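
For anyone hitting the same NoSuchMethodError: the stack trace shows kafka-streams code (InternalTopicManager) calling a method in kafka-clients (Utils.notNull) that is missing at runtime, which suggests the two artifacts were resolved at different versions. A minimal sketch for checking which JAR each class is actually loaded from is below. The class name ClasspathVersionCheck and the helper locate are made up for illustration; run it with the same classpath as the failing test.

```java
import java.security.CodeSource;

public class ClasspathVersionCheck {

    // Hypothetical helper: returns the JAR or directory a class was loaded from,
    // or a short note when the class is missing or has no code source.
    static String locate(String className) {
        try {
            // initialize=false: only look the class up, do not run static initializers
            Class<?> type = Class.forName(className, false,
                    ClasspathVersionCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "no code source (JDK class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // One class from kafka-clients and one from kafka-streams;
        // the version usually appears in the JAR file name.
        String[] classNames = {
            "org.apache.kafka.common.utils.Utils",
            "org.apache.kafka.streams.KafkaStreams"
        };
        for (String name : classNames) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the two printed JAR names carry different versions, that mismatch is the likely cause. Running mvn dependency:tree -Dincludes=org.apache.kafka should show the same information from the build side.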

Thank you!