OpenHFT/Chronicle-Queue

Roll files never closed

remiville opened this issue · 7 comments

Hello,

I'm using chronicle-queue 5.25ea13 (I had the same issue with 5.23.43).
We use it so that a server application can send events to subscribed applications; only the server application uses the queue.
I run the program on Linux (Ubuntu and CentOS) and use the StoreFileListener methods to delete roll files that are no longer used.
In the onReleased() method:

  • I check whether the released roll file has a cycle older than the tailer's.
  • I check whether the roll file is closed.
  • If both are true, I delete the file.

The issue: some roll files are never released. The onReleased() method is called for each roll file, but some of them stay in the open state no matter what.
The queue is in continuous use, and the disk space used by roll files grows gradually until it reaches the machine's limit.
I get the same result in a container, and when I use Eclipse Memory Analyzer I see that the roll file's file descriptor is held by a CleaningRandomAccessFile (two screenshots attached).

Some roll files are not closed while others are closed and released, so there are "holes" between the unclosed roll files.
Previously I deleted unclosed files as well. They disappeared from the directory, but lsof +L1 showed that they were not released at the system level (they were marked as deleted but still held open).
Now I check the state of each roll file and do not delete it if it is open. It may be better to let Chronicle Queue release these files, but I think the issue is fundamentally the same: some roll files stay in the open state.
For testing I'm using a MINUTELY roll cycle.
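
As a side note for readers, the "deleted but not released" state reported by lsof +L1 is ordinary Unix behaviour rather than something specific to Chronicle Queue. A minimal sketch using only the JDK, with a hypothetical class name and a temp file standing in for a roll file:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical demo class; the temp file only stands in for a roll file.
class DeletedButOpenDemo {

    /**
     * Deletes a file while a descriptor on it is still open and reports
     * whether the data is still readable through that descriptor.
     * Returns false if the OS refuses the delete (as Windows typically does).
     */
    static boolean readableAfterDelete() {
        try {
            File f = Files.createTempFile("roll-", ".cq4").toFile();
            try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
                raf.write(42);
                if (!f.delete()) {
                    f.deleteOnExit();
                    return false;
                }
                // The directory entry is gone, but the open descriptor still
                // pins the data, so the disk space is not reclaimed yet.
                raf.seek(0);
                return !f.exists() && raf.read() == 42;
            } // Only here, when the descriptor closes, is the space freed.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("readable after delete: " + readableAfterDelete());
    }
}
```

This is why deleting an open roll file does not free disk space: the space comes back only once whatever holds the descriptor closes it.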

I understand that trying to close roll files in place of Chronicle Queue is counterproductive.
That is, I don't do:

store = queue.storeForCycle(previousCycle,0,false,null);
eventStore.closeStore(store);

as described here.

I don't know what I'm doing wrong and I'm a little desperate. Maybe you'll see something or have a workaround?

Here are the details:

I created a queue like this:

String basePath = OS.getTarget() + "/chronicle_queues/notifications_from_" + id;
SingleChronicleQueueBuilder singleChronicleQueueBuilder = SingleChronicleQueueBuilder
	.single(basePath)
	.rollCycle(configuration.getRollCycle())
	.storeFileListener(this);

I have a thread that publishes to the queue like this:

public void doPublishOne(Event event, List headers) {
	try (final DocumentContext dc = queue.acquireAppender().writingDocument()) {
		dc.wire().write().object(event);
	} catch (UnrecoverableTimeoutException e) {
		LOG.error(String.format("Failed to write event %s to chronicle queue %s", event, queue), e);
	}
}

I have a thread that initially creates a tailer like this:

tailer = queue.createTailer("id");

Then the thread's infinite loop is:

AffinityLock affinityLock = AffinityLock.acquireLock();
// "AffinityLock: Thread[EventProcessor,5,main] alive=true, allocated: true, bound: true, cpuId: 7"
try {
	while (running) {
		try (DocumentContext dc = tailer.readingDocument()) {
			if (!dc.isPresent()) {
				pauser.pause();
			} else {
				ChronicleQueueEventNotificationBus.this.notify(deserialize((Event) dc.wire().read().object()));
				pauser.reset();
			}
		} catch (EventNotificationProcessingException e) {
			ChronicleQueueEventNotificationBus.this.fatalError(new EventNotificationBusException(e));
			stopProcessor();
		}
	}
} finally {
	if (affinityLock != null) {
		affinityLock.release();
	}
}

And here are the StoreFileListener methods I implemented:

@Override
public void onAcquired(int cycle, File file) {
	LOG.trace(String.format("onAcquired - fileToCycle.put %s -> %s", file, cycle));
	fileToCycle.put(file, cycle);
}

@Override
public void onReleased(int cycle, File file) {
	LOG.trace(String.format("onReleased - fileToCycle.put %s -> %s", file, cycle));
	List<File> removableRollFileCandidates = Stream.of(file.getParentFile().listFiles())
				.filter(FileUtil::hasQueueSuffix).sorted(Comparator.comparing(File::getName))
				.collect(Collectors.toList());
	// If file absent it has already been deleted
	if (removableRollFileCandidates.contains(file)) {
		fileToCycle.put(file, cycle);
	}
	boolean rollFileDeleted = false;
	int tailerCycle = processor.tailer.cycle();
	for (File f : removableRollFileCandidates) {
		Integer fCycle = fileToCycle.get(f);
		if (fCycle == null) {
			rollFileDeleted = deleteRollFile(f) || rollFileDeleted;
		} else if (fCycle >= tailerCycle) {
			LOG.info(
				String.format("%s will not be deleted because its cycle %s is greater or equals to the tailer one %s",
					f,
					fCycle,
					tailerCycle));
			break;
		} else {
			rollFileDeleted = deleteRollFile(f) || rollFileDeleted;
		}
	}
	if (rollFileDeleted) {
		queue.refreshDirectoryListing();
	}
}

private boolean deleteRollFile(File f) {
	boolean tryDeleteRollFile;
	boolean rollFileDeleted = false;
	if (!InternalFileUtil.getAllOpenFilesIsSupportedOnOS()) {
		// On Windows, the system will just refuse to delete an opened file
		tryDeleteRollFile = true;
	} else {
		FileState fileState = FileUtil.state(f);
		LOG.trace(String.format("Roll file %s state is %s", f, fileState));
		// On Unix, deleting an opened file will make the system mark it as "deleted" but will not release the resource, then chronicle queue will never release it
		tryDeleteRollFile = fileState == FileState.CLOSED;
	}
	if (!tryDeleteRollFile) {
		LOG.trace(String.format("Roll file %s is not closed, searching a younger roll file to delete...", f));
	} else if (f.delete()) {
		LOG.info(String.format("Succeeded to delete roll file %s", f));
		fileToCycle.remove(f);
		rollFileDeleted = true;
	} else {
		LOG.error(String.format("Failed to delete roll file %s", f));
		rollFileDeleted = false;
	}
	return rollFileDeleted;
}

Thanks.

We do the following:

  • Move away unused files (similar to what you posted)
  • Make sure all appenders and tailers are closed
  • Close all current queue instances and construct new ones
  • Call queue.refreshDirectoryListing() on the new queue instances
  • Run System.gc()

That's the only way I found to do it reliably. We run on Vert.x, which makes sure that there is no concurrent access to the instances.

Hi @andrm,

Thanks for your hints.

In my case I have one tailer used by one server application; the queue stores application events before they are sent to other subscribed applications.
So I cannot close the queue or the tailer. If I did, I would have to manage:

  • choosing when I can close the queue/tailer without disturbing the whole notification process
  • what to do with events created during the interval in which the queue/tailer is closed and recreated

I really don't want to do that, as it is obviously an anti-pattern and counterproductive.

However, you gave me a glimmer of hope with the "run System.gc()" hint: maybe the roll file is held by Chronicle through some kind of weak reference and is only released after a GC.
I will test that.

@remiville we are busy with customer work and can't triage/investigate this in the short term. If you would like us to prioritise it please review the support offerings at https://chronicle.software/openhft-support/

Hi @remiville,
calling System.gc() potentially also halts the application and increases latency for your events, no matter how they enter or leave the system. It's basically THE anti-pattern :) Since you are using low-latency facilities from Chronicle such as AffinityLock and Pauser, it might be very counterproductive.

The link leads to a password-protected page. @JerryShea, is there a public link to the support offerings?

System.gc() changed nothing. The issue was that I didn't close the ExcerptAppender. I replaced:

try (final DocumentContext dc = queue.acquireAppender().writingDocument())

with:

try (final ExcerptAppender excerptAppender = queue.acquireAppender(); final DocumentContext dc = excerptAppender.writingDocument())

Thanks.
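
For readers wondering why the first form leaves something open: try-with-resources only closes the variables declared in its header, so an object created in the middle of a chained expression is never closed by it. The sketch below illustrates this with hypothetical stand-in classes (Appender, Document), not Chronicle Queue's real types:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these are NOT Chronicle Queue classes.
class ChainedResourceDemo {
    // Records the order in which close() is called.
    static final List<String> closed = new ArrayList<>();

    static class Appender implements AutoCloseable {
        Document writingDocument() { return new Document(); }
        @Override public void close() { closed.add("appender"); }
    }

    static class Document implements AutoCloseable {
        @Override public void close() { closed.add("document"); }
    }

    // Chained form: only the declared Document is a managed resource.
    static List<String> chained() {
        closed.clear();
        try (Document dc = new Appender().writingDocument()) {
            // write to dc here
        }
        return new ArrayList<>(closed);
    }

    // Both objects are declared in the header, so both are closed,
    // in reverse order of declaration.
    static List<String> declared() {
        closed.clear();
        try (Appender appender = new Appender();
             Document dc = appender.writingDocument()) {
            // write to dc here
        }
        return new ArrayList<>(closed);
    }

    public static void main(String[] args) {
        System.out.println(chained());  // prints [document]
        System.out.println(declared()); // prints [document, appender]
    }
}
```

In the chained form the intermediate object stays open along with anything it holds. Whether that fully explains the unreleased roll files here is an inference from the fix reported above, not something confirmed by the maintainers in this thread.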

Great! I'm glad you've solved it.

tgd commented

Great to hear this is resolved - we will close it out. Thank you @andrm for the help in the ticket!