AutoMQ/automq

[E2E] kafka producer with version 1.1 may fail to send messages


test_id: kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.old_client_throttling_behavior=True

In this test, a version 1.1 producer tries to send 50000 records under a client-id quota while a consumer consumes them concurrently.

However, the test failed with the following message:

[INFO  - 2024-01-31 11:39:00,668 - runner_client - log - lineno:294]: RunnerClient: kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.old_client_throttling_behavior=True: FAIL: AssertionError("number of produced messages 50000 doesn't equal number of consumed messages 29806")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 170, in test_quota
    assert success, msg
AssertionError: number of produced messages 50000 doesn't equal number of consumed messages 29806

The consumer group lag output showed that fewer than 50000 records had been written to the cluster:

Consumer group 'test-consumer-group' has no active members.

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-consumer-group test_topic      0          5808            5808            0               -               -               -
test-consumer-group test_topic      4          5807            5807            0               -               -               -
test-consumer-group test_topic      3          5808            5808            0               -               -               -
test-consumer-group test_topic      5          5808            5808            0               -               -               -
test-consumer-group test_topic      2          5808            5808            0               -               -               -
test-consumer-group test_topic      1          5806            5806            0               -               -               -
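
As a quick check of the table above, the log-end offsets can be summed to estimate how many records actually reached the topic. This is a minimal sketch, assuming offsets start at 0 and each record occupies exactly one offset; the `rows` literal is copied by hand from the output above and the variable names are illustrative.

```python
# (partition, log_end_offset) pairs copied from the consumer group output above.
rows = [(0, 5808), (4, 5807), (3, 5808), (5, 5808), (2, 5808), (1, 5806)]

expected = 50000  # records the producer was asked to send

# Assumption: offsets start at 0 and one record maps to one offset,
# so the sum of log-end offsets approximates the number of stored records.
stored = sum(offset for _, offset in rows)
missing = expected - stored

print(f"stored={stored} missing={missing}")
```

Under those assumptions, a large share of the records never reached the brokers, which is consistent with the producer-side disconnects shown in the logs.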

The following exceptions were logged in server.log:

[2024-01-31 10:33:17,643] ERROR Closing socket for 10.5.0.5:9093-10.5.0.6:47968-2 because of error (kafka.network.Processor)
java.lang.IllegalStateException: Cannot transition from NOT_MUTED for THROTTLE_STARTED
	at org.apache.kafka.common.network.KafkaChannel.handleChannelMuteEvent(KafkaChannel.java:313)
	at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1(SocketServer.scala:1369)
	at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1$adapted(SocketServer.scala:1369)
	at scala.Option.foreach(Option.scala:437)
	at kafka.network.Processor.handleChannelMuteEvent(SocketServer.scala:1369)
	at kafka.network.Processor.processNewResponses(SocketServer.scala:1023)
	at kafka.network.Processor.run(SocketServer.scala:959)
	at java.base/java.lang.Thread.run(Thread.java:833)
[2024-01-31 10:33:17,650] ERROR Exception while processing response for 10.5.0.5:9093-10.5.0.6:47968-2 (kafka.network.Processor)
java.lang.IllegalStateException: Cannot transition from NOT_MUTED for THROTTLE_STARTED
	at org.apache.kafka.common.network.KafkaChannel.handleChannelMuteEvent(KafkaChannel.java:313)
	at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1(SocketServer.scala:1369)
	at kafka.network.Processor.$anonfun$handleChannelMuteEvent$1$adapted(SocketServer.scala:1369)
	at scala.Option.foreach(Option.scala:437)
	at kafka.network.Processor.handleChannelMuteEvent(SocketServer.scala:1369)
	at kafka.network.Processor.processNewResponses(SocketServer.scala:1023)
	at kafka.network.Processor.run(SocketServer.scala:959)
	at java.base/java.lang.Thread.run(Thread.java:833)
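
The error message suggests that the broker tried to start throttling on a channel that was not in a muted state. The sketch below is a simplified Python model of such a mute state machine, meant only to show why that combination is rejected. Only `NOT_MUTED` and `THROTTLE_STARTED` come from the log above; the other state names, event names, and the transition table are assumptions loosely modeled on upstream Kafka, not taken from the AutoMQ code.

```python
from enum import Enum, auto


class MuteState(Enum):
    NOT_MUTED = auto()  # appears in the log above
    # The remaining states are assumptions for illustration.
    MUTED = auto()
    MUTED_AND_RESPONSE_PENDING = auto()
    MUTED_AND_THROTTLED = auto()
    MUTED_AND_THROTTLED_AND_RESPONSE_PENDING = auto()


class MuteEvent(Enum):
    THROTTLE_STARTED = auto()  # appears in the log above
    # The remaining events are assumptions for illustration.
    REQUEST_RECEIVED = auto()
    RESPONSE_SENT = auto()
    THROTTLE_ENDED = auto()


S, E = MuteState, MuteEvent

# Allowed (state, event) -> next state. Anything not listed is illegal.
TRANSITIONS = {
    (S.MUTED, E.REQUEST_RECEIVED): S.MUTED_AND_RESPONSE_PENDING,
    (S.MUTED_AND_RESPONSE_PENDING, E.RESPONSE_SENT): S.MUTED,
    (S.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING, E.RESPONSE_SENT): S.MUTED_AND_THROTTLED,
    (S.MUTED_AND_RESPONSE_PENDING, E.THROTTLE_STARTED): S.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING,
    (S.MUTED_AND_THROTTLED, E.THROTTLE_ENDED): S.MUTED,
    (S.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING, E.THROTTLE_ENDED): S.MUTED_AND_RESPONSE_PENDING,
}


def handle(state, event):
    """Return the next state, or raise if the transition is not allowed."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise RuntimeError(
            f"Cannot transition from {state.name} for {event.name}"
        ) from None


# Throttling a channel that was never muted is rejected in this model.
try:
    handle(S.NOT_MUTED, E.THROTTLE_STARTED)
except RuntimeError as err:
    print(err)
```

In this model, throttling can only start while a muted channel has a response pending, so a `THROTTLE_STARTED` event arriving in `NOT_MUTED` has no valid target state.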

Other exceptions were logged on the producer side:

[2024-01-31 10:33:17,723] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:173)
	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
	at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
	at org.apache.kafka.common.network.Selector.close(Selector.java:736)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:534)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
	at java.base/java.lang.Thread.run(Thread.java:833)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

How to reproduce

Set a quota for client id 'overridden_id':

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config producer_byte_rate=10,consumer_byte_rate=10 --entity-type clients --entity-name overridden_id

Then, try to send 100 messages with client id 'overridden_id'.
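
The two reproduction steps can be scripted, for example as below. This is a sketch under stated assumptions: the issue does not say which tool was used to send the messages, so `kafka-producer-perf-test.sh` is an assumption, as are the topic name (reused from the test output) and the 100-byte record size. To match the failing test, the producer script would need to come from a Kafka 1.1 client distribution.

```python
import shlex

BOOTSTRAP = "localhost:9092"  # from the kafka-configs.sh command above
CLIENT_ID = "overridden_id"   # from the issue
TOPIC = "test_topic"          # assumption: topic name reused from the test output


def quota_command():
    """Build the quota command exactly as given in the issue."""
    return [
        "bin/kafka-configs.sh", "--bootstrap-server", BOOTSTRAP,
        "--alter", "--add-config",
        "producer_byte_rate=10,consumer_byte_rate=10",
        "--entity-type", "clients", "--entity-name", CLIENT_ID,
    ]


def produce_command(num_records=100, record_size=100):
    """Build a producer command (assumed tool) that uses the throttled client id."""
    return [
        "bin/kafka-producer-perf-test.sh",
        "--topic", TOPIC,
        "--num-records", str(num_records),
        "--record-size", str(record_size),  # assumption: size not given in the issue
        "--throughput", "-1",               # no client-side rate limit
        "--producer-props",
        f"bootstrap.servers={BOOTSTRAP}",
        f"client.id={CLIENT_ID}",
    ]


# Print the commands; pass each list to subprocess.run(cmd, check=True) to execute.
for cmd in (quota_command(), produce_command()):
    print(shlex.join(cmd))
```

With a quota of 10 bytes per second, even the first batch exceeds the limit, so the broker should throttle the connection almost immediately.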

Related PR which causes the problem:

Solution: (attached as an image in the original issue)