spring-projects/spring-amqp

Memory leak AsyncRabbitTemplate

tetrade opened this issue · 8 comments

Version 3.1.4.

I recently reported a memory leak caused by the internal pending map not being cleaned up after a message was received.

Things work better now, but for some reason some Message instances still remain in memory indefinitely.

[image: heap dump object tree showing RabbitMessageFuture and its TimeoutTask reference]

@tetrade,

thank you for the report!

I'd be happy to fix the problem, but I don't understand how to read that object tree.
Yes, I do see RabbitMessageFuture and its TimeoutTask reference. But it is not clear to me what requestMessage is and why that RabbitMessageFuture is a part of it.

Any chance you could share what your code does with this AsyncRabbitTemplate?

@artembilan
requestMessage is the name of a field in RabbitFuture. I guess that the TimeoutTask, for some reason, was not removed from memory and is still referenced by the scheduled task created in org.springframework.amqp.rabbit.AsyncRabbitTemplate#timeoutTask

@artembilan

In short, I just use Rabbit to send a photo to a third-party service for processing and wait for a response via Direct Reply-to. When all the futures have completed (that is, many photos have been analyzed), I consider the project processed and upload everything to the user.

I initially thought the mistake was mine, because my code has an internal ConcurrentMap keyed by photo ID with the RabbitFuture as the value, so that on a repeated request I know where to wait for the response.

However, given my implementation, a leak there is unlikely. And the profiler shows that the leak is in Java's scheduled tasks.

[image: profiler screenshot of the retained scheduled tasks]
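
The ConcurrentMap mentioned above is not shown in this thread. As a rough sketch only, here is one way such a map can avoid holding on to completed futures. It assumes the reply future is a CompletableFuture (which RabbitFuture extends in 3.x); the class name, method names, and the String reply type are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper, not taken from the reporter's code.
public class PendingPhotoRequests {

    private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Returns the in-flight future for photoId; calls sender only if none is registered. */
    public CompletableFuture<String> requestOnce(String photoId,
            Supplier<CompletableFuture<String>> sender) {
        CompletableFuture<String> future = pending.computeIfAbsent(photoId, id -> sender.get());
        // Drop the entry on success, failure, or timeout. The two-argument remove
        // only removes the mapping if it still points at this exact future.
        future.whenComplete((reply, error) -> pending.remove(photoId, future));
        return future;
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

With cleanup like this, the map itself should not retain futures after they complete, which would be consistent with the profiler pointing at the scheduler rather than at application code.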

As you can see, the logic there looks like this:

	@Nullable
	private ScheduledFuture<?> timeoutTask(RabbitFuture<?> future) {
		if (this.receiveTimeout > 0) {
			this.lock.lock();
			try {
				if (!this.running) {
					this.pending.remove(future.getCorrelationId());
					throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
				}
				return this.taskScheduler.schedule(
						new TimeoutTask(future, this.pending, this.directReplyToContainer),
						Instant.now().plusMillis(this.receiveTimeout));
			}
			finally {
				this.lock.unlock();
			}
		}
		return null;
	}

So, such a timeout task is scheduled to run after this.receiveTimeout (30 seconds by default).
Let's see whether the problem is with the ThreadPoolTaskScheduler, whose default is private volatile int poolSize = 1;!
With a single thread, it feels like "too many messages" would exhaust such a pool, and therefore we see what looks like a memory leak for some period of time, until the queue of those TimeoutTask instances is cleared.

You can set an external ThreadPoolTaskScheduler with a bigger thread pool to mitigate workload.
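
A minimal sketch of that mitigation, assuming a Spring Java configuration class. The bean names, the pool size of 4, and the thread name prefix are arbitrary choices for illustration, not recommendations:

```java
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class AsyncRabbitConfig {

    // Hypothetical bean: a scheduler dedicated to the reply timeout tasks.
    @Bean
    public ThreadPoolTaskScheduler replyTimeoutScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(4); // arbitrary value; the default pool size is 1
        scheduler.setThreadNamePrefix("amqp-reply-timeout-");
        return scheduler;
    }

    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate,
            ThreadPoolTaskScheduler replyTimeoutScheduler) {
        // The single-argument constructor uses direct reply-to for replies.
        AsyncRabbitTemplate template = new AsyncRabbitTemplate(rabbitTemplate);
        template.setTaskScheduler(replyTimeoutScheduler);
        template.setReceiveTimeout(30_000); // milliseconds
        return template;
    }
}
```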

Again: this is just an assumption since I don't see how else those TimeoutTask instances could be held in the memory if scheduling is done properly.

I also hit a memory leak in the same scenario (AsyncRabbitTemplate.sendAndReceive()), but on version 2.4.17.

Maybe the bug also exists in 3.1.x @tetrade

Causes of memory leaks

  1. When AsyncRabbitTemplate.sendAndReceive() is called with receiveTimeout > 0, startTimer() creates a timeoutTask via the TaskScheduler.

  2. When sendAndReceive() completes, the timeoutTask is not cleared immediately; it remains in the TaskScheduler until receiveTimeout elapses.

  3. Under high throughput in a short time, the large number of queued timeoutTask instances can cause a memory leak!

	// Class: AsyncRabbitTemplate
	@Override
	public RabbitMessageFuture sendAndReceive(String exchange, String routingKey, Message message) {
		String correlationId = getOrSetCorrelationIdAndSetReplyTo(message, null);
		RabbitMessageFuture future = new RabbitMessageFuture(correlationId, message);
		CorrelationData correlationData = null;
		if (this.enableConfirms) {
			correlationData = new CorrelationData(correlationId);
			future.setConfirm(new SettableListenableFuture<>());
		}
		this.pending.put(correlationId, future);
		if (this.container != null) {
			this.template.send(exchange, routingKey, message, correlationData);
		}
		else {
			ChannelHolder channelHolder = this.directReplyToContainer.getChannelHolder();
			future.setChannelHolder(channelHolder);
			sendDirect(channelHolder.getChannel(), exchange, routingKey, message, correlationData);
		}
                 
		// !!! Problem spot
		future.startTimer();
		return future;
	}

	// Class: RabbitFuture<T>
	void startTimer() {
		if (AsyncRabbitTemplate.this.receiveTimeout > 0) {
			synchronized (AsyncRabbitTemplate.this) {
				if (!AsyncRabbitTemplate.this.running) {
					AsyncRabbitTemplate.this.pending.remove(this.correlationId);
					throw new IllegalStateException("'AsyncRabbitTemplate' must be started.");
				}

				// !!! Problem spot
				this.timeoutTask = AsyncRabbitTemplate.this.taskScheduler.schedule(new TimeoutTask(),
						new Date(System.currentTimeMillis() + AsyncRabbitTemplate.this.receiveTimeout));
			}
		}
		else {
			this.timeoutTask = null;
		}
	}
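
The retention described in the steps above can be reproduced with the plain JDK scheduler. This is a sketch under assumptions, not the spring-amqp code: it uses a bare ScheduledThreadPoolExecutor, a 1 KB byte array standing in for the request message, and cancels each timeout task right away as if the reply had arrived in time. The class and method names are made up for illustration.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only JDK APIs are used.
public class CancelledTimeoutRetention {

    /**
     * Schedules count timeout tasks 30 seconds in the future, cancels each one
     * immediately, and returns how many tasks are still held in the scheduler queue.
     */
    static int queuedAfterCancel(int count, boolean removeOnCancel) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        scheduler.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            for (int i = 0; i < count; i++) {
                byte[] payload = new byte[1024]; // stands in for the request message
                ScheduledFuture<?> timeout = scheduler.schedule(
                        () -> System.out.println("timed out, payload bytes: " + payload.length),
                        30, TimeUnit.SECONDS);
                timeout.cancel(false); // the reply arrived before the timeout
            }
            return scheduler.getQueue().size();
        }
        finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy: " + queuedAfterCancel(1000, false));
        System.out.println("remove on cancel: " + queuedAfterCancel(1000, true));
    }
}
```

With the default policy, every cancelled task (and whatever it references) stays in the queue until its delay elapses, so the first call reports 1000; with setRemoveOnCancelPolicy(true) the queue is empty right after the cancellations. Spring's ThreadPoolTaskScheduler exposes the same switch as setRemoveOnCancelPolicy(boolean). Whether that matches how the library fix was done is not something this thread confirms.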

I hope this can be fixed in 2.4.x. Looking forward to your reply, @artembilan

@mailingfeng,

Thank you for the investigation!

I will look into that code today.

However, we are not going to back-port the fix into the 2.4.x branch. That version is out of Open Source support: https://spring.io/projects/spring-amqp#support

I think this was fixed via #2724 and released as part of 3.1.6.

So, I am treating this as a duplicate and closing it accordingly.
As I said: this is not going to be back-ported to 2.4.x because that branch is no longer supported.

OK, Thanks