Memory leak AsyncRabbitTemplate
tetrade opened this issue · 8 comments
Version 3.1.4.
I recently reported a memory leak caused by the internal pending map not being cleaned up after a message was received.
Things work better now, but you can see that for some reason some Message instances still remain in memory indefinitely.
@tetrade,
thank you for the report!
I'd be happy to fix the problem, but I don't understand how to read that object tree.
Yes, I do see RabbitMessageFuture and its TimeoutTask reference. But it is not clear to me what requestMessage is and why that RabbitMessageFuture is a part of it.
Is there any chance you could describe what your code does with this AsyncRabbitTemplate?
@artembilan
requestMessage is the name of a field in RabbitFuture. I guess that the TimeoutTask was, for some reason, not removed from memory and is still referenced by the scheduled task created in org.springframework.amqp.rabbit.AsyncRabbitTemplate#timeoutTask.
In short, I just use Rabbit to send a photo to a third-party service for processing and wait for a response via Direct Reply-to. When all the Futures have completed (many photos have been analyzed), I consider the project processed and upload everything to the user.
I initially thought the mistake was mine, because my code has an internal ConcurrentMap that maps each photo Id to its RabbitFuture, so that on a repeated request I know where to wait for the response.
However, given my implementation, a leak there is unlikely, and the profiler shows that the leak is in Java's scheduled tasks.
As you can see, the logic there looks like this:
@Nullable
private ScheduledFuture<?> timeoutTask(RabbitFuture<?> future) {
    if (this.receiveTimeout > 0) {
        this.lock.lock();
        try {
            if (!this.running) {
                this.pending.remove(future.getCorrelationId());
                throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
            }
            return this.taskScheduler.schedule(
                    new TimeoutTask(future, this.pending, this.directReplyToContainer),
                    Instant.now().plusMillis(this.receiveTimeout));
        }
        finally {
            this.lock.unlock();
        }
    }
    return null;
}
So, such a timeout task is scheduled to be executed after this.receiveTimeout (30 seconds by default).
Let's see if the problem is with that ThreadPoolTaskScheduler, whose default is private volatile int poolSize = 1;!
This way, it feels like "too many messages" would cause such a pool to be exhausted, and therefore we see some kind of memory leak for a period of time, until the queue of those TimeoutTask instances is cleared.
You can set an external ThreadPoolTaskScheduler with a bigger thread pool to mitigate the workload.
Again: this is just an assumption, since I don't see how else those TimeoutTask instances could be held in memory if scheduling is done properly.
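For readers who want to try the external scheduler suggestion, a minimal configuration sketch follows. It assumes a Spring Java configuration with an existing RabbitTemplate bean. The class name AsyncRabbitConfig, the bean name replyTimeoutScheduler, the pool size of 4 and the thread name prefix are illustrative choices, not values taken from this thread.

```java
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

// Hypothetical configuration class; all names and sizes here are assumptions.
@Configuration
public class AsyncRabbitConfig {

    // Dedicated scheduler for reply timeout tasks, with more than the default single thread.
    @Bean
    public ThreadPoolTaskScheduler replyTimeoutScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(4); // illustrative value, tune for your workload
        scheduler.setThreadNamePrefix("async-rabbit-timeout-");
        return scheduler;
    }

    // Direct reply-to template that uses the external scheduler instead of its internal one.
    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate,
            ThreadPoolTaskScheduler replyTimeoutScheduler) {
        AsyncRabbitTemplate template = new AsyncRabbitTemplate(rabbitTemplate);
        template.setTaskScheduler(replyTimeoutScheduler);
        template.setReceiveTimeout(30_000); // the default mentioned above, set explicitly
        return template;
    }
}
```

Whether a bigger pool actually helps depends on the real cause of the retention, which at this point in the discussion is still only an assumption.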
I also had a memory leak in the same scenario [AsyncRabbitTemplate.sendAndReceive()], but on version 2.4.17.
Maybe the bug also exists in 3.1.x, @tetrade.
Causes of the memory leak
- When AsyncRabbitTemplate.sendAndReceive() is called and receiveTimeout > 0, startTimer() is invoked and creates a timeoutTask with the TaskScheduler.
- When sendAndReceive() completes, the timeoutTask is not cleared immediately and remains in the TaskScheduler until receiveTimeout elapses.
- In scenarios with high throughput in a short time, a large number of timeoutTask instances can cause memory leaks!
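The retention described in the list above can be reproduced in isolation with the plain JDK scheduler. The sketch below is not Spring AMQP code: it is a simplified model that uses java.util.concurrent.ScheduledThreadPoolExecutor directly, with a no-op task standing in for TimeoutTask. The class and method names are hypothetical. It shows that delayed tasks stay in the scheduler queue after being cancelled unless the remove-on-cancel policy is enabled.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a JDK-only model of timeout tasks piling up in a scheduler.
public class TimeoutTaskRetentionDemo {

    // Schedules `count` no-op tasks with a 30 second delay, cancels all of them,
    // and returns how many are still held in the scheduler queue.
    static int queuedAfterCancel(boolean removeOnCancel, int count) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        scheduler.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            ScheduledFuture<?>[] tasks = new ScheduledFuture<?>[count];
            for (int i = 0; i < count; i++) {
                // Stand-in for a reply timeout task; a real one would reference the request message.
                tasks[i] = scheduler.schedule(() -> { }, 30, TimeUnit.SECONDS);
            }
            for (ScheduledFuture<?> task : tasks) {
                // Simulates the reply arriving long before the timeout.
                task.cancel(false);
            }
            return scheduler.getQueue().size();
        }
        finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy: " + queuedAfterCancel(false, 1000));   // prints 1000
        System.out.println("remove-on-cancel: " + queuedAfterCancel(true, 1000)); // prints 0
    }
}
```

With the default policy, every cancelled task (and anything it references) stays reachable until its delay elapses, which matches the pattern of a burst of requests keeping memory occupied for roughly receiveTimeout.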
// Class: AsyncRabbitTemplate
@Override
public RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message) {
    String correlationId = getOrSetCorrelationIdAndSetReplyTo(message, null);
    RabbitMessageFuture future = new RabbitMessageFuture(correlationId, message);
    CorrelationData correlationData = null;
    if (this.enableConfirms) {
        correlationData = new CorrelationData(correlationId);
        future.setConfirm(new SettableListenableFuture<>());
    }
    this.pending.put(correlationId, future);
    if (this.container != null) {
        this.template.send(exchange, routingKey, message, correlationData);
    }
    else {
        ChannelHolder channelHolder = this.directReplyToContainer.getChannelHolder();
        future.setChannelHolder(channelHolder);
        sendDirect(channelHolder.getChannel(), exchange, routingKey, message, correlationData);
    }
    // !!! Problem spot
    future.startTimer();
    return future;
}

// Class: RabbitFuture<T>
void startTimer() {
    if (AsyncRabbitTemplate.this.receiveTimeout > 0) {
        synchronized (AsyncRabbitTemplate.this) {
            if (!AsyncRabbitTemplate.this.running) {
                AsyncRabbitTemplate.this.pending.remove(this.correlationId);
                throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
            }
            // !!! Problem spot
            this.timeoutTask = AsyncRabbitTemplate.this.taskScheduler.schedule(new TimeoutTask(),
                    new Date(System.currentTimeMillis() + AsyncRabbitTemplate.this.receiveTimeout));
        }
    }
    else {
        this.timeoutTask = null;
    }
}
I hope this can be fixed in version 2.4.x. Looking forward to your reply, @artembilan.
Thank you for the investigation!
I will look into that code today.
However, we are not going to back-port the fix into the 2.4.x branch. That version is out of Open Source support: https://spring.io/projects/spring-amqp#support
I think this was fixed via #2724 and released as part of 3.1.6.
So, I am treating this as a duplicate and closing it accordingly.
As I said: this is not going to be back-ported to 2.4.x because that branch is no longer supported.
OK, Thanks