[E2E] Kafka producer version 1.1 may fail to send messages
mooc9988 commented
test_id: kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.old_client_throttling_behavior=True
In this test, a single producer running client version 1.1 tries to send 50000 records while a client-id quota is in effect, and a consumer consumes those records concurrently.
However, the test failed with the following message:
[INFO - 2024-01-31 11:39:00,668 - runner_client - log - lineno:294]: RunnerClient: kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.old_client_throttling_behavior=True: FAIL: AssertionError("number of produced messages 50000 doesn't equal number of consumed messages 29806")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 170, in test_quota
    assert success, msg
AssertionError: number of produced messages 50000 doesn't equal number of consumed messages 29806
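Describing the consumer group shows zero lag on every partition, but the log-end offsets add up to far fewer than 50000 records, so the cluster never received all of them: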
Consumer group 'test-consumer-group' has no active members.
GROUP                TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test-consumer-group  test_topic  0          5808            5808            0    -            -     -
test-consumer-group  test_topic  4          5807            5807            0    -            -     -
test-consumer-group  test_topic  3          5808            5808            0    -            -     -
test-consumer-group  test_topic  5          5808            5808            0    -            -     -
test-consumer-group  test_topic  2          5808            5808            0    -            -     -
test-consumer-group  test_topic  1          5806            5806            0    -            -     -
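As a quick check of these numbers, the snippet below sums the LOG-END-OFFSET and LAG columns copied from the output above. It assumes each partition's log starts at offset 0 and contains no transaction control records, so that the log-end offset equals the number of records written. Under that assumption the topic holds 34845 records, 15155 short of the 50000 counted as produced (and also different from the 29806 the test consumer counted).

```python
from typing import Tuple

# Rows copied from the consumer-group output in this issue.
LAG_OUTPUT = """\
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-consumer-group test_topic 0 5808 5808 0 - - -
test-consumer-group test_topic 4 5807 5807 0 - - -
test-consumer-group test_topic 3 5808 5808 0 - - -
test-consumer-group test_topic 5 5808 5808 0 - - -
test-consumer-group test_topic 2 5808 5808 0 - - -
test-consumer-group test_topic 1 5806 5806 0 - - -
"""
PRODUCED = 50000  # "number of produced messages" from the test failure


def column_totals(text: str) -> Tuple[int, int]:
    """Sum the LOG-END-OFFSET and LAG columns over all partition rows."""
    lines = text.strip().splitlines()
    header = lines[0].split()
    end_col, lag_col = header.index("LOG-END-OFFSET"), header.index("LAG")
    rows = [line.split() for line in lines[1:]]
    return (sum(int(r[end_col]) for r in rows),
            sum(int(r[lag_col]) for r in rows))


total, lag = column_totals(LAG_OUTPUT)
print(f"records in topic: {total}, total lag: {lag}, missing: {PRODUCED - total}")
```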
The following exceptions were thrown in the broker's server.log:
[2024-01-31 10:33:17,643] ERROR Closing socket for 10.5.0.5:9093-10.5.0.6:47968-2 because of error (kafka.network.Processor)
java.lang.IllegalStateException: Cannot transition from NOT_MUTED for THROTTLE_STARTED
at org.apache.kafka.common.network.KafkaChannel.handleChannelMuteEvent(KafkaChannel.java:313)
at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1(SocketServer.scala:1369)
at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1$adapted(SocketServer.scala:1369)
at scala.Option.foreach(Option.scala:437)
at kafka.network.Processor.handleChannelMuteEvent(SocketServer.scala:1369)
at kafka.network.Processor.processNewResponses(SocketServer.scala:1023)
at kafka.network.Processor.run(SocketServer.scala:959)
at java.base/java.lang.Thread.run(Thread.java:833)
[2024-01-31 10:33:17,650] ERROR Exception while processing response for 10.5.0.5:9093-10.5.0.6:47968-2 (kafka.network.Processor)
java.lang.IllegalStateException: Cannot transition from NOT_MUTED for THROTTLE_STARTED
at org.apache.kafka.common.network.KafkaChannel.handleChannelMuteEvent(KafkaChannel.java:313)
at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1(SocketServer.scala:1369)
at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1$adapted(SocketServer.scala:1369)
at scala.Option.foreach(Option.scala:437)
at kafka.network.Processor.handleChannelMuteEvent(SocketServer.scala:1369)
at kafka.network.Processor.processNewResponses(SocketServer.scala:1023)
at kafka.network.Processor.run(SocketServer.scala:959)
at java.base/java.lang.Thread.run(Thread.java:833)
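For context, the broker keeps a mute state per connection and rejects mute events that are not valid for the current state. The following is a simplified Python model of that state machine, written from a reading of KafkaChannel.handleChannelMuteEvent rather than copied from it. Apart from NOT_MUTED and THROTTLE_STARTED, which appear in the log, the state and event names and the transition table are assumptions to be checked against the broker version under test. In this model THROTTLE_STARTED is only legal while the channel is MUTED_AND_RESPONSE_PENDING, so receiving it on a NOT_MUTED channel produces the same message as server.log.

```python
from enum import Enum, auto


class MuteState(Enum):
    NOT_MUTED = auto()
    MUTED = auto()
    MUTED_AND_RESPONSE_PENDING = auto()
    MUTED_AND_THROTTLED = auto()
    MUTED_AND_THROTTLED_AND_RESPONSE_PENDING = auto()


class MuteEvent(Enum):
    REQUEST_RECEIVED = auto()
    RESPONSE_SENT = auto()
    THROTTLE_STARTED = auto()
    THROTTLE_ENDED = auto()


# Assumed transition table: (current state, event) -> next state.
# Any pair not listed is illegal, mirroring the broker's IllegalStateException.
TRANSITIONS = {
    (MuteState.MUTED, MuteEvent.REQUEST_RECEIVED): MuteState.MUTED_AND_RESPONSE_PENDING,
    (MuteState.MUTED_AND_RESPONSE_PENDING, MuteEvent.RESPONSE_SENT): MuteState.MUTED,
    (MuteState.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING, MuteEvent.RESPONSE_SENT): MuteState.MUTED_AND_THROTTLED,
    (MuteState.MUTED_AND_RESPONSE_PENDING, MuteEvent.THROTTLE_STARTED): MuteState.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING,
    (MuteState.MUTED_AND_THROTTLED, MuteEvent.THROTTLE_ENDED): MuteState.MUTED,
    (MuteState.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING, MuteEvent.THROTTLE_ENDED): MuteState.MUTED_AND_RESPONSE_PENDING,
}


def handle(state: MuteState, event: MuteEvent) -> MuteState:
    """Apply one mute event; RuntimeError stands in for Java's IllegalStateException."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise RuntimeError(
            f"Cannot transition from {state.name} for {event.name}") from None


# One legal sequence in this model: a muted channel receives a request,
# is throttled, sends its response, and the throttle ends.
state = MuteState.MUTED
for ev in (MuteEvent.REQUEST_RECEIVED, MuteEvent.THROTTLE_STARTED,
           MuteEvent.RESPONSE_SENT, MuteEvent.THROTTLE_ENDED):
    state = handle(state, ev)
print(state.name)

# The case from server.log: a throttle starts on a channel that is not muted.
try:
    handle(MuteState.NOT_MUTED, MuteEvent.THROTTLE_STARTED)
except RuntimeError as err:
    print(err)
```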
The producer's log shows the following exceptions at 10:33:17,723, about 80 ms after the broker closed the socket at 10:33:17,643:
[2024-01-31 10:33:17,723] WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:173)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
at org.apache.kafka.common.network.Selector.close(Selector.java:736)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:534)
at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.base/java.lang.Thread.run(Thread.java:833)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
How to reproduce
Set a quota for client id 'overridden_id':
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config producer_byte_rate=10,consumer_byte_rate=10 --entity-type clients --entity-name overridden_id
Then send 100 messages from a producer using client id 'overridden_id'.
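The two reproduction steps can be scripted roughly as follows. This is a sketch, not the kafkatest harness: the helper names, the topic test_topic (taken from the consumer-group output), and the split between a newer admin-tool install and a 1.1 client install are assumptions. The console producer sets client.id through --producer-property; 1.1-era console producers take --broker-list, while 2.5+ also accept --bootstrap-server. The run argument defaults to subprocess.run and can be replaced with a stub to inspect the commands without a cluster.

```python
import subprocess
from typing import Callable, List

CLIENT_ID = "overridden_id"  # client id used in the reproduction steps


def quota_cmd(admin_bin: str, bootstrap: str) -> List[str]:
    """The kafka-configs.sh call from the first step, as an argument list."""
    return [f"{admin_bin}/kafka-configs.sh", "--bootstrap-server", bootstrap,
            "--alter", "--add-config", "producer_byte_rate=10,consumer_byte_rate=10",
            "--entity-type", "clients", "--entity-name", CLIENT_ID]


def producer_cmd(client_bin: str, bootstrap: str, topic: str,
                 broker_flag: str = "--broker-list") -> List[str]:
    """Console producer pinned to the throttled client id (flags assumed, see lead-in)."""
    return [f"{client_bin}/kafka-console-producer.sh", broker_flag, bootstrap,
            "--topic", topic, "--producer-property", f"client.id={CLIENT_ID}"]


def reproduce(admin_bin: str, client_bin: str, bootstrap: str, topic: str,
              count: int = 100, run: Callable[..., object] = subprocess.run) -> None:
    """Set the quota, then pipe `count` messages, one per line, into the producer."""
    run(quota_cmd(admin_bin, bootstrap), check=True)
    payload = "".join(f"message-{i}\n" for i in range(count))
    run(producer_cmd(client_bin, bootstrap, topic), input=payload, text=True, check=True)
```

For example, reproduce("/opt/kafka/bin", "/opt/kafka-1.1/bin", "localhost:9092", "test_topic") against a local broker; both install paths are placeholders.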
superhx commented