Too large message causes infinite loop / consumer can no longer consume messages
DevQuandt opened this issue · 2 comments
Describe the bug
When a message is larger than the maximum inbound body size configured for the AMQP client ("maxBodyLength"), it is not moved to a dead-letter queue. Instead, the consumer restarts in an endless loop, and no messages can be processed until the oversized message is manually removed from RabbitMQ.
2024-01-10T10:22:38.559+01:00 INFO 17208 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@58ed3837: tags=[[amq.ctag-7WEuQ4gIFVp2Q9UZEzBy7A]], channel=Cached Rabbit Channel: AMQChannel(amqp://*****@127.0.0.1:5673/*****,1), conn: Proxy@59a79443 Shared Rabbit Connection: SimpleConnection@51036fec [delegate=amqp://*****@127.0.0.1:5673/****, localPort=63968], acknowledgeMode=AUTO local queue size=0
2024-01-10T10:22:38.562+01:00 INFO 17208 --- [ntContainer#0-4] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5673]
2024-01-10T10:22:38.589+01:00 INFO 17208 --- [ntContainer#0-4] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#60783105:3/SimpleConnection@42154df2 [delegate=amqp://*****@127.0.0.1:5673/*****, localPort=63969]
2024-01-10T10:22:38.598+01:00 ERROR 17208 --- [ 127.0.0.1:5673] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occurred
java.lang.IllegalStateException: Message body is too large (68157440), maximum configured size is 67108864. See ConnectionFactory#setMaxInboundMessageBodySize if you need to increase the limit.
at com.rabbitmq.client.impl.CommandAssembler.consumeHeaderFrame(CommandAssembler.java:109) ~[amqp-client-5.20.0.jar:5.20.0]
at com.rabbitmq.client.impl.CommandAssembler.handleFrame(CommandAssembler.java:172) ~[amqp-client-5.20.0.jar:5.20.0]
at com.rabbitmq.client.impl.AMQCommand.handleFrame(AMQCommand.java:108) ~[amqp-client-5.20.0.jar:5.20.0]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:123) ~[amqp-client-5.20.0.jar:5.20.0]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:761) ~[amqp-client-5.20.0.jar:5.20.0]
at com.rabbitmq.client.impl.AMQConnection.access$400(AMQConnection.java:48) ~[amqp-client-5.20.0.jar:5.20.0]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:688) ~[amqp-client-5.20.0.jar:5.20.0]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
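For reference, the two byte counts in the exception message correspond to 65 MiB (the body) and 64 MiB (the configured limit). A minimal sketch of that arithmetic follows. The class and method names are hypothetical, and the comparison only illustrates the sizes involved; it is not the client's actual check.

```java
// Sketch only: BodySizeCheck is a hypothetical helper, not part of amqp-client.
final class BodySizeCheck {
    static final long MIB = 1024L * 1024L;

    // Limit reported in the exception message (67108864 bytes).
    static final long MAX_INBOUND_BODY_SIZE = 64 * MIB;

    // Converts a byte count to whole mebibytes.
    static long mebibytes(long bytes) {
        return bytes / MIB;
    }

    // Illustrative comparison; how the client treats a body exactly
    // at the limit is deliberately not modelled here.
    static boolean exceedsLimit(long bodySize) {
        return bodySize > MAX_INBOUND_BODY_SIZE;
    }

    public static void main(String[] args) {
        long body = 65 * MIB; // size sent by the reproduction
        System.out.println(body + " bytes = " + mebibytes(body) + " MiB");
        System.out.println("limit = " + mebibytes(MAX_INBOUND_BODY_SIZE) + " MiB");
        System.out.println("exceeds limit: " + exceedsLimit(body));
    }
}
```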
Reproduction steps
- Create a queue and send a large message to the queue
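    import java.util.Random;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class AmqpdemoApplication {
        public static void main(String[] args) {
            SpringApplication.run(AmqpdemoApplication.class, args);
        }

        // Sends one 65 MiB message on startup, which is above the 64 MiB limit in the log.
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                sendMessage("myQueue", 1, 65, template);
            };
        }

        // Publishes numberOfMessages messages of sizeInMb MiB of random bytes each.
        public void sendMessage(String queueName, int numberOfMessages, int sizeInMb, final RabbitTemplate template) {
            for (int i = 0; i < numberOfMessages; i++) {
                byte[] message = new byte[sizeInMb * 1024 * 1024];
                new Random().nextBytes(message);
                template.convertAndSend(queueName, message);
            }
        }
    }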
- Create a listener which consumes the messages
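    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    @Service
    public class ListenerService {
        // Never invoked for the oversized message: the client fails while reading the frames.
        @RabbitListener(queues = "myQueue")
        public void receiveMessage(final String message) {
            System.out.println("Received message!");
        }
    }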
- Start the application
Expected behavior
When the queue has an "x-dead-letter-exchange" configured, the message should be rejected and moved to the dead-letter queue.
Additional context
Context: this example uses a Spring Boot application with Spring 3.2.0 and 'org.springframework.boot:spring-boot-starter-amqp'.
Rabbit: rabbitmq:3.7.7-management-alpine
amqp-client: 5.20.0
The connection is closed when a message body is too large, so it's unlikely that the Java client tries to restore the consumer: an IllegalStateException does not trigger connection recovery. I suspect Spring AMQP connection recovery kicks in and tries to restore the consumer, which would explain why the consumer gets stuck in an infinite loop. Please do more testing and follow up with the Spring AMQP folks accordingly.
As for rejecting the message to send it to a DLX: the message is lost if the consumer uses automatic acknowledgment, or goes back to the queue if the consumer uses manual acknowledgment. The message can become a poison message if it comes back again and again. This can be mitigated with TTL or with poison message handling in quorum queues.
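As a rough illustration of the quorum-queue mitigation, the queue could be declared with a delivery limit and a dead-letter exchange. This is a sketch under assumptions, not a verified fix for this issue: the exchange name "myDlx" and the limit of 3 are made up, the exchange must be declared separately, quorum queues require RabbitMQ 3.8 or later (so not the 3.7.7 broker from the report), and an existing "myQueue" cannot be redeclared with different arguments.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; names and values are assumptions.
@Configuration
class PoisonMessageQueueConfig {

    @Bean
    Queue myQueue() {
        return QueueBuilder.durable("myQueue")
                // Quorum queues track how often a message has been redelivered.
                .withArgument("x-queue-type", "quorum")
                // After this many redeliveries the broker stops requeuing the message.
                .withArgument("x-delivery-limit", 3)
                // Messages that hit the limit are dead-lettered here ("myDlx" is made up).
                .withArgument("x-dead-letter-exchange", "myDlx")
                .build();
    }
}
```

Whether the redelivery count increases in the exact failure mode described here (the connection closing before the delivery reaches the listener) would need to be confirmed by testing.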
I'm not sure that always rejecting the message without requeuing is what every user would want, so we would need to add an option. The idea would be to send the large message elsewhere and consider it someone else's problem. Not sure there's a definitive answer here.
Any opinion @michaelklishin?
RabbitMQ 3.7 is an old version that reached EOL in November 2020.
We have been recommending against using large messages (say, 50 MiB or larger) for years. Put them in a blob store and transfer their ID in the messages you send (or in the data structures you store).
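The blob-store approach is often called the claim-check pattern. A minimal sketch, assuming an in-memory map as a stand-in for a real blob store; `ClaimCheckSketch` and `InMemoryBlobStore` are hypothetical names and not part of RabbitMQ or any client library:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: publish a small ID instead of the large payload itself.
final class ClaimCheckSketch {

    // Stand-in for a real blob store (object storage, database, shared file system).
    static final class InMemoryBlobStore {
        private final Map<String, byte[]> blobs = new ConcurrentHashMap<>();

        // Stores the payload and returns the ID that travels in the message.
        String put(byte[] payload) {
            String id = UUID.randomUUID().toString();
            blobs.put(id, payload);
            return id;
        }

        // Looks up the payload on the consumer side.
        byte[] get(String id) {
            byte[] payload = blobs.get(id);
            if (payload == null) {
                throw new IllegalArgumentException("Unknown blob ID: " + id);
            }
            return payload;
        }
    }

    public static void main(String[] args) {
        InMemoryBlobStore store = new InMemoryBlobStore();

        // Producer side: 1 MiB here as a stand-in for a much larger body.
        byte[] largePayload = new byte[1024 * 1024];
        String id = store.put(largePayload);
        byte[] messageBody = id.getBytes(StandardCharsets.UTF_8);
        System.out.println("Message body size in bytes: " + messageBody.length);

        // Consumer side: resolve the ID back to the payload.
        byte[] resolved = store.get(new String(messageBody, StandardCharsets.UTF_8));
        System.out.println("Resolved payload size in bytes: " + resolved.length);
    }
}
```

With this shape the broker and the consumer only ever handle the ID, so the message stays well below any inbound size limit regardless of how large the payload is.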
In fact, both RabbitMQ and some of our clients now have explicit limits on the message size they will accept, and that limit has been gradually reduced (it is still over 100 MiB everywhere IIRC).
Dead lettering large messages will in no way avoid some of the key issues they present: unexpected memory footprint spikes (and not just on the broker nodes, consumers are equally affected).
So this is a "wontfix".