Read timed out / TIMEOUT WAITING FOR ACK when sending/receiving messages in parallel
Hi,
In what version(s) of Spring AMQP are you seeing this issue?
3.1.7
Describe the bug
We noticed that our application sometimes fails to send messages with "TIMEOUT WAITING FOR ACK". We were able to create a minimal example that fails to send messages as soon as we send them in parallel.
To Reproduce
Please take a look at the following example:
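```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;

public class TestDemoApplication {

    // Roughly 1 MiB payload: 1024 * 1024 copies of "A"
    private static final String MESSAGE = Stream
            .generate( ( ) -> "A" )
            .limit( 1 * 1024 * 1024 )
            .collect( Collectors.joining( ) );

    public static void main( String[] args ) {
        // DemoApplication, RabbitMQConfiguration and Constants come from the reporter's project
        final SpringApplication.Running running = SpringApplication
                .from( DemoApplication::main )
                .with( RabbitMQConfiguration.class )
                .run( args );
        final ConfigurableApplicationContext applicationContext = running.getApplicationContext( );
        final RabbitTemplate rabbitTemplate = applicationContext.getBean( RabbitTemplate.class );
        // Two threads publish 1000 messages concurrently
        final ExecutorService executorService = Executors.newFixedThreadPool( 2 );
        for ( int i = 0; i < 1000; i++ ) {
            final String messageId = "id-" + i;
            executorService.submit( ( ) -> sendMessage( rabbitTemplate, messageId ) );
        }
    }

    private static void sendMessage( final RabbitTemplate rabbitTemplate, final String messageId ) {
        try {
            final CorrelationData correlationData = new CorrelationData( );
            rabbitTemplate.convertAndSend( Constants.EXCHANGE_NAME, "key", MESSAGE, correlationData );
            System.out.println( "Message " + messageId + " has been sent." );
            // Block until the publisher confirm arrives (or 10 seconds pass)
            final CorrelationData.Confirm confirm = correlationData.getFuture( ).get( 10, TimeUnit.SECONDS );
            System.out.println( "Confirm Ack was " + confirm.isAck( ) );
        } catch ( Exception ex ) {
            throw new RuntimeException( "An exception occurred.", ex );
        }
    }
}
```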
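For scale: the MESSAGE constant above consists of 1024 * 1024 copies of "A". The following JDK-only sketch (the class PayloadSize and its methods are hypothetical names for illustration) checks that this is exactly 1 MiB per message when encoded as UTF-8, or roughly 1000 MiB of message bodies across the 1000 sends before any AMQP framing, and that `String.repeat` (Java 11+) builds the same string more directly.

```java
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PayloadSize {

    static final int MESSAGE_COUNT = 1000;

    // Builds the payload the same way as the reproduction above
    static String viaStream() {
        return Stream.generate(() -> "A").limit(1024 * 1024).collect(Collectors.joining());
    }

    // Equivalent, shorter form (String.repeat exists since Java 11)
    static String viaRepeat() {
        return "A".repeat(1024 * 1024);
    }

    public static void main(String[] args) {
        String message = viaRepeat();
        // Each 'A' encodes to a single byte in UTF-8
        long bytesPerMessage = message.getBytes(StandardCharsets.UTF_8).length;
        long totalBytes = bytesPerMessage * MESSAGE_COUNT;
        System.out.println("Bytes per message: " + bytesPerMessage);
        System.out.println("Total MiB for " + MESSAGE_COUNT + " messages: " + totalBytes / (1024 * 1024));
    }
}
```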
The configuration is:
```properties
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
```
The example sends 1000 messages, each with a payload of approximately 1 MiB, using two threads. After some messages, it fails with:
```
2024-11-15T15:05:15.705+01:00 ERROR 8536 --- [demo] [127.0.0.1:33303] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occurred
java.net.SocketTimeoutException: Read timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:278) ~[na:na]
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:304) ~[na:na]
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346) ~[na:na]
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796) ~[na:na]
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1099) ~[na:na]
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:345) ~[na:na]
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420) ~[na:na]
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:399) ~[na:na]
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:208) ~[na:na]
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:179) ~[na:na]
	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:121) ~[amqp-client-5.21.0.jar:5.21.0]
	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:199) ~[amqp-client-5.21.0.jar:5.21.0]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:687) ~[amqp-client-5.21.0.jar:5.21.0]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
2024-11-15T15:05:15.709+01:00 ERROR 8536 --- [demo] [nectionFactory4] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: clean channel shutdown; protocol method: #method<channel.close>(reply-code=406, reply-text=TIMEOUT WAITING FOR ACK, class-id=0, method-id=0)
```
If we add a synchronized block around sendMessage, everything works.
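A JDK-only sketch of that workaround, assuming it is the send-and-wait pair that gets serialized. The hypothetical `simulatedSendAndConfirm` method stands in for `convertAndSend` followed by the wait on `correlationData.getFuture()` (the sleep only simulates that round trip), and a counter records how many sends are ever in flight at once. Class and method names are illustrative, not from the reporter's project.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SerializedSends {

    private static final Object SEND_LOCK = new Object();
    private static final AtomicInteger inFlight = new AtomicInteger();
    private static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for "convertAndSend, then wait for the confirm"
    private static void simulatedSendAndConfirm() throws InterruptedException {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        try {
            Thread.sleep(5); // simulated publish/confirm round trip
        } finally {
            inFlight.decrementAndGet();
        }
    }

    // The reported workaround: a shared lock lets only one thread at a time
    // publish and wait for its confirm
    static void sendMessage() {
        synchronized (SEND_LOCK) {
            try {
                simulatedSendAndConfirm();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Runs the sends on two threads, as in the reproduction, and returns the
    // highest number of sends observed in flight at the same time
    static int run(int messages) {
        maxInFlight.set(0);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < messages; i++) {
            executor.submit(SerializedSends::sendMessage);
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("Max sends in flight: " + run(20));
    }
}
```

With the lock in place the two pool threads never publish concurrently, so the counter never exceeds one; the trade-off is that the thread pool no longer adds any publishing throughput.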
Expected behavior
The messages are sent and acknowledged without any issues.
Thank you and best regards,
Nils
Please share a simple project with us so we can reproduce this on our side.
You probably have something else that causes this effect.
For example, we don't know what that RabbitMQConfiguration is.
I just created one like this:
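```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringAmqpIssue2896Application {

    private static final String MESSAGE = Stream
            .generate(() -> "A")
            .limit(1024 * 1024)
            .collect(Collectors.joining());

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(SpringAmqpIssue2896Application.class);
        final RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        final ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 1000; i++) {
            final String messageId = "id-" + i;
            executorService.submit(() -> sendMessage(rabbitTemplate, messageId));
        }
    }

    private static void sendMessage(final RabbitTemplate rabbitTemplate, final String messageId) {
        try {
            final CorrelationData correlationData = new CorrelationData();
            // The default exchange ("") routes directly to the queue named by the routing key
            rabbitTemplate.convertAndSend("", "someQueue", MESSAGE, correlationData);
            System.out.println("Message " + messageId + " has been sent.");
            final CorrelationData.Confirm confirm = correlationData.getFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Confirm Ack was " + confirm.isAck());
        }
        catch (Exception ex) {
            throw new RuntimeException("An exception occurred.", ex);
        }
    }

    @Bean
    Queue someQueue() {
        return new Queue("someQueue");
    }

}
```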
And the result looks totally OK:
```
...
Message id-993 has been sent.
Message id-994 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-995 has been sent.
Confirm Ack was true
Message id-996 has been sent.
Confirm Ack was true
Message id-997 has been sent.
Confirm Ack was true
Message id-998 has been sent.
Confirm Ack was true
Message id-999 has been sent.
Confirm Ack was true
```
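One detail that applies to both reproductions: they hand the work to `ExecutorService.submit` and never call `get()` on the returned `Future`. An exception thrown by a submitted task, such as the `RuntimeException` wrapping a `TimeoutException` from the 10-second confirm wait, is stored in that `Future` rather than printed, so failed sends can go unreported. A JDK-only sketch (class and method names are hypothetical) that keeps the futures and surfaces task failures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SurfaceTaskFailures {

    // Submits every task, then waits on each Future so that an exception thrown
    // inside a task is reported instead of being silently kept in the Future
    static int countFailures(List<Runnable> tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable task : tasks) {
            futures.add(executor.submit(task));
        }
        int failures = 0;
        try {
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (ExecutionException ex) {
                    // getCause() is the exception the task itself threw
                    System.err.println("Task failed: " + ex.getCause());
                    failures++;
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", ex);
        } finally {
            // Let the pool threads exit once the submitted work is done
            executor.shutdown();
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Runnable> tasks = List.of(
                () -> System.out.println("sent"),
                () -> { throw new RuntimeException("simulated send failure"); },
                () -> System.out.println("sent"));
        System.out.println("Failures: " + countFailures(tasks));
    }
}
```

Applied to the reproductions, this would mean collecting the results of `executorService.submit(...)` in `main` and calling `get()` on each, so a missing confirm shows up as a reported failure rather than disappearing.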
Hi @artembilan,
Sure. Here you go: https://github.com/nils-christian/spring-amqp-issue-2896
The relevant method can be found in TestDemoApplication.java.
Best regards,
Nils
Thanks.
Running your application (I mean TestDemoApplication from the test scope) does not fail for me:
```
2024-11-15T14:10:39.780-05:00  WARN 28292 --- [demo] [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-995 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.800-05:00  WARN 28292 --- [demo] [nectionFactory3] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-996 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.816-05:00  WARN 28292 --- [demo] [nectionFactory1] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-997 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.837-05:00  WARN 28292 --- [demo] [nectionFactory4] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-998 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.855-05:00  WARN 28292 --- [demo] [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Message id-999 has been sent.
Confirm Ack was true
2024-11-15T14:10:39.872-05:00  WARN 28292 --- [demo] [nectionFactory3] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
Confirm Ack was true
2024-11-15T14:10:39.882-05:00  WARN 28292 --- [demo] [nectionFactory3] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
```
And those warnings are expected, because no queue is bound to that routing key.
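A toy model of why those warnings appear, assuming the template's mandatory flag is set as in the reporter's configuration: an unroutable message is handed back to the publisher, and without a returns callback the template can only log it. The class below is hypothetical and models only exact-match bindings (which is how a topic binding without wildcards, such as "key", behaves); it is not the RabbitMQ or Spring AMQP API. As in the logs above, the message is still confirmed: with publisher confirms, RabbitMQ sends the return for an unroutable mandatory message before the ack.

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy model, not the RabbitMQ or Spring AMQP API
class MandatoryRoutingModel {

    record PublishResult(boolean routed, boolean returned, boolean acked) { }

    // Binding key -> queue name (exact matches only)
    private final Map<String, String> bindings;
    // Stand-in for a returns callback; may be null
    private final Consumer<String> returnsCallback;

    MandatoryRoutingModel(Map<String, String> bindings, Consumer<String> returnsCallback) {
        this.bindings = bindings;
        this.returnsCallback = returnsCallback;
    }

    PublishResult publish(String routingKey, String body, boolean mandatory) {
        boolean routed = bindings.containsKey(routingKey);
        boolean returned = !routed && mandatory;
        if (returned) {
            if (returnsCallback != null) {
                returnsCallback.accept(body);
            } else {
                System.out.println("WARN Returned message but no callback available");
            }
        }
        // This model never produces nacks: routed or not, the publish is acked
        return new PublishResult(routed, returned, true);
    }

    public static void main(String[] args) {
        MandatoryRoutingModel noQueue = new MandatoryRoutingModel(Map.of(), null);
        System.out.println(noQueue.publish("key", "payload", true));

        MandatoryRoutingModel withQueue = new MandatoryRoutingModel(Map.of("key", "testQueue"), null);
        System.out.println(withQueue.publish("key", "payload", true));
    }
}
```

In Spring AMQP itself, the counterpart to the model's callback is registering a returns callback on the `RabbitTemplate` via `setReturnsCallback`, which replaces the warning with your own handling.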
When I add a queue and bind it to your exchange:
```java
@Bean
Queue testQueue() {
    return new Queue("testQueue");
}

@Bean
Binding testBinding(Queue testQueue, TopicExchange testTopicExchange) {
    return BindingBuilder.bind(testQueue).to(testTopicExchange).with("key");
}
```
I get the same result as with my own test earlier:
```
Message id-994 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-995 has been sent.
Message id-996 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-997 has been sent.
Message id-998 has been sent.
Confirm Ack was true
Confirm Ack was true
Message id-999 has been sent.
Confirm Ack was true
```
I'm not sure how to reproduce your TIMEOUT WAITING FOR ACK error.
Is there anything else you can share to elaborate on the problem?
Interesting. The timeout happens on my system every time I run the code without the queues. I will try to reproduce the example with the binding and queues.
In the meantime: do you have any other idea what might trigger a "TIMEOUT WAITING FOR ACK"? It occurs in our environment again and again, but resolves itself after some time.
Duplicate of #2907.
That issue seems to have more analysis and clearly states what the problem is.
Thank you, and let's follow up in that issue from now on!