bluedenim/log4j-s3-search

Flushing out remaining events in the cache on shutdown

Closed this issue · 15 comments

Hi @bluedenim

Our app runs as a scheduled job. When the shutdown hook is triggered, I would like whatever events remain in the cache to be published to S3 rather than depending on the cacheMonitor() rules at that stage (i.e. stagingBufferSize or stagingBufferAge). Since the cacheMonitor() rules are checked before the staging log is published on shutdown, the last few events of our log, which are critical, are not being sent to S3. Can you suggest the best way to fix or handle this in the code?

Reference snippet of code: https://github.com/bluedenim/log4j-s3-search/blob/master/appender-log4j/src/main/java/com/van/logging/log4j/Log4jAppender.java#L320.L328

Hello,

As you've referenced, there is a shutdown hook that triggers the flushing and publishing operation (LoggingEventCache.flushAndPublish). Note that LoggingEventCache.flushAndPublish does not consult with cache monitors. In fact, the cache monitors call it when they want to actually publish. The only thing flushAndPublish checks is whether there are actually any items to publish.

However, I can see two possibilities why the logs may be missing:

  • The program was terminated by a SIGKILL or other unforgiving means, preventing the shutdown hook from even running. If your scheduled job ends by itself, then this is not the case.
  • The shutdown hook was called, but because the publish process relies on firing off a new thread to do the publishing, that new thread never gets a chance to publish before the program ends.
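To illustrate the second possibility, here is a minimal, self-contained sketch. It is not the appender's code: `ShutdownFlushSketch`, `publishAsync`, and `publishAndWait` are hypothetical names, and the 200 ms sleep stands in for an upload. The point it tries to show is that the JVM halts once all shutdown hooks have returned, so work handed to another thread from a hook only completes if the hook itself waits for it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownFlushSketch {
    // Hypothetical publisher thread. It is a daemon thread so that this
    // sketch never keeps the JVM alive on its own.
    private static final ExecutorService EXECUTOR =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "publisher");
            t.setDaemon(true);
            return t;
        });

    public static final AtomicBoolean published = new AtomicBoolean(false);

    /** Fire-and-forget publish: returns immediately, work runs elsewhere. */
    public static Future<Boolean> publishAsync() {
        return EXECUTOR.submit(() -> {
            Thread.sleep(200);          // simulated upload
            published.set(true);
            return true;
        });
    }

    /** Publish and block the calling thread until done or timed out. */
    public static boolean publishAndWait(long timeoutMs) {
        try {
            return publishAsync().get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            return false;               // timed out, interrupted, or failed
        }
    }

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Calling publishAsync() here and returning right away would let
            // the JVM halt before the upload finishes. Waiting avoids that.
            boolean ok = publishAndWait(5_000);
            System.out.println("Published on shutdown: " + ok);
        }));
        System.out.println("main is done");
    }
}
```

Swapping `publishAndWait(5_000)` for a bare `publishAsync()` in the hook is the situation described in the second bullet.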

Going with the second case, I think it is possible to tweak LoggingEventCache.flushAndPublish (and LoggingEventCache.publishCache) to add a parameter indicating whether to use a new thread (current behavior) or publish in the current thread (new behavior):

    @Override
    public Future<Boolean> flushAndPublish(boolean publishUsingCurrentThread) {
        Future<Boolean> f = null;
        // Only publish when there is something in the cache.
        if (eventCount.get() > 0) {
            f = publishCache(cacheName, publishUsingCurrentThread);
        }
        return f;
    }

    @SuppressWarnings("unchecked")
    Future<Boolean> publishCache(final String name, final boolean publishUsingCurrentThread) {
        if (!publishUsingCurrentThread) {
            // current code ...
        } else {
            // new behavior: publish in current thread
        }
        // ...
    }

During shutdown, the current thread should be used, so only in the shutdown hook should we call LoggingEventCache.flushAndPublish(true). Everybody else will call LoggingEventCache.flushAndPublish(false). Or we add an overloaded variant to reduce the number of changes:

    public Future<Boolean> flushAndPublish() {
        return flushAndPublish(false);
    }

    public Future<Boolean> flushAndPublish(boolean publishUsingCurrentThread) {
        // ... as above
    }
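
As a rough, standalone illustration of the "current thread vs. new thread" switch (not the actual LoggingEventCache code), the sketch below keeps the `Future` return type in both modes by wrapping the inline result in an already-completed future. `PublishModeSketch`, `publish`, and `threadUsed` are hypothetical names introduced only for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class PublishModeSketch {
    // Hypothetical worker thread; daemon so the sketch never blocks JVM exit.
    private final ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "publish-worker");
            t.setDaemon(true);
            return t;
        });

    /** Runs the work inline or on the worker; callers get a Future either way. */
    public <T> Future<T> publish(Supplier<T> work, boolean publishUsingCurrentThread) {
        if (publishUsingCurrentThread) {
            // The work has already finished when this method returns.
            return CompletableFuture.completedFuture(work.get());
        }
        Callable<T> task = work::get;   // explicit type avoids submit() overload ambiguity
        return executor.submit(task);
    }

    /** Helper for the demo: reports which thread actually ran the work. */
    public static String threadUsed(boolean publishUsingCurrentThread) {
        PublishModeSketch sketch = new PublishModeSketch();
        try {
            return sketch.publish(() -> Thread.currentThread().getName(),
                                  publishUsingCurrentThread).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("inline ran on: " + threadUsed(true));
        System.out.println("async ran on:  " + threadUsed(false));
    }
}
```

Returning a completed future in the inline case means existing callers that ignore or poll the `Future` would not need to change.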

BTW, thanks for bringing this up!

#72 is in testing to address this. If you can help test it, that would be great.

Thanks @bluedenim for the details and for working on the fix. I tested #72, but it doesn't seem to be working. Below is a sample log snippet. I see the shutdown hook triggered, but the last temp file created for uploading (e.g. 'toBePublished523361507263822346.tmp' below) doesn't get published, and the log entries after it don't make it to S3 either. Sometimes I even see a 0-byte file uploaded to S3.

[......]
Collecting content into /var/folders/n4/_qsjydm14472jlm3knm53r0cmj_z67/T/toBePublished523361507263822346.tmp before uploading.
[2020-08-02 11:45:11.933] [INFO ] [pool-20-thread-1] [XXXX] - custom.test.spark.etl.pipeline.job.step.duration 2405 1596348909 source=XXXX 
[2020-08-02 11:45:11.933] [INFO ] [pool-20-thread-1] [XXXX] - custom.test.spark.etl.pipeline.job.step.duration 0 1596348909 source=XXXX 
[2020-08-02 11:45:11.933] [INFO ] [pool-20-thread-1] [XXXX] - custom.test.spark.etl.pipeline.job.step.duration 0 1596348909 source=XXXX 

[2020-08-02 11:45:11.934] [INFO ] [ScalaTest-run] [XXXX] - custom.test.spark.etl.pipeline.job.duration 11458 1596348899 source=XXXX
[2020-08-02 11:45:11.934] [INFO ] [ScalaTest-run] [XXXX] [XXXX] - custom.test.spark.etl.pipeline.job.status 1 1596348899 source=XXXX status=success
Publishing staging log on shutdown...

Process finished with exit code 0

Interesting. Ideally I'd like to add more System.out.println() calls to pinpoint what ran and what didn't. However, it may be easier if I can reproduce the problem on my side.

Can you tell me a bit more about how your program ends? Does it process a batch of data and then just exit? It looks like something to do with Spark. Is it a Spark worker process? Another option that comes to mind is just a class method I can add to LoggingEventCache to flush and publish all caches. This class method can be called by your program just before it ends. But I am not clear on how your program works.
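
To make the "class method to flush and publish all caches" idea a bit more concrete, here is a minimal sketch under stated assumptions. Nothing here exists in LoggingEventCache: `CacheRegistrySketch`, `create`, `add`, and `flushAndPublishAll` are hypothetical names, and a plain `List<String>` sink stands in for the real publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CacheRegistrySketch {
    // Hypothetical registry of every cache created through create().
    private static final List<CacheRegistrySketch> INSTANCES = new CopyOnWriteArrayList<>();

    private final List<String> events = new ArrayList<>();
    private final List<String> sink;    // stand-in for the real publish target

    private CacheRegistrySketch(List<String> sink) {
        this.sink = sink;
    }

    /** Factory that registers the new cache so it can be flushed later. */
    public static CacheRegistrySketch create(List<String> sink) {
        CacheRegistrySketch cache = new CacheRegistrySketch(sink);
        INSTANCES.add(cache);
        return cache;
    }

    public synchronized void add(String event) {
        events.add(event);
    }

    /** Publishes buffered events in the calling thread; returns how many. */
    public synchronized int flushAndPublish() {
        int count = events.size();
        sink.addAll(events);
        events.clear();
        return count;
    }

    /** Class-level flush an application could call just before it exits. */
    public static int flushAndPublishAll() {
        int total = 0;
        for (CacheRegistrySketch cache : INSTANCES) {
            total += cache.flushAndPublish();
        }
        return total;
    }
}
```

An application would call `CacheRegistrySketch.flushAndPublishAll()` as its last step, which sidesteps shutdown-hook timing altogether.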

My bad! There was an issue with the updated dependency versions in my pom.xml: it was still picking up the old jars. With the new changes you pushed, I ran multiple tests. I now see the NPE below, but the good news is that all the log entries are getting uploaded to S3.

Cannot publish with com.van.logging.aws.S3PublishHelper@44c3d0d8 due to error: Cannot collect event main 
com.van.example.Main:WARN:2020-08-02 22:53:53,806 WARN [main] com.van.example.Main This is a warning!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@44c3d0d8 due to error: Cannot collect event main com.van.example.Main:ERROR:2020-08-02 22:53:53,806 ERROR [main] com.van.example.Main And this is an error!!!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@44c3d0d8 due to error: Cannot collect event main com.van.example.Main:WARN:2020-08-02 22:53:53,803 WARN [main] com.van.example.Main This is a warning!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@44c3d0d8 due to error: Cannot collect event main com.van.example.Main:INFO:2020-08-02 22:53:53,806 INFO [main] com.van.example.Main Another round through the loop!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@44c3d0d8 due to error: Cannot collect event main com.van.example.Main:ERROR:2020-08-02 22:53:53,804 ERROR [main] com.van.example.Main And this is an error!!!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@44c3d0d8 due to error: Cannot collect event main com.van.example.Main:WARN:2020-08-02 22:53:53,806 WARN [main] com.van.example.Main This is a warning!
java.lang.NullPointerException
: null
java.lang.NullPointerException
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:45)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:7)
	at com.van.logging.BufferPublisher.publish(BufferPublisher.java:53)
	at com.van.logging.LoggingEventCache.publishEventsFromFile(LoggingEventCache.java:143)
	at com.van.logging.LoggingEventCache.lambda$publishCache$0(LoggingEventCache.java:186)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Similar issue when I run my Spark application too.

Hm. Thanks. Let me look into that NPE

I wasn't able to reproduce that NPE from the sample programs. Do you see the exception from your program?

The error, I guess, is due to outputWriter being null. But that only happens after AbstractFilePublishHelper#end. Looking at the code, I don't see how that's possible unless multiple threads are publishing, which should not happen, especially now that we're using the current thread.

A "patch" would be to not attempt to publish if outputWriter is null, but I'd like to know how we got to that state before blindly patching that.

I'm not able to reproduce this issue consistently. So I tweaked the sample main program (appender-log4j-sample) to remove the loop and sleep, added more logger statements (15, to be precise), and set stagingBufferSize=5. When I run the program, I see the two issues below. Weirdly, if I change stagingBufferSize to a larger value, say 10, I don't see the issue. Am I missing something?

Using cache monitor: CapacityBasedBufferMonitor(cacheLimit: 5)
Registering AWS S3 publish helper -> S3 configuration (abhi-XXXX in region us-west-2; compressed: false)
2020-08-03 02:28:30,053 INFO [main] com.van.example.Main Hello from Main!
2020-08-03 02:28:30,061 INFO [main] com.van.example.Main Another round through the loop!
2020-08-03 02:28:30,061 WARN [main] com.van.example.Main This is a warning!
2020-08-03 02:28:30,061 ERROR [main] com.van.example.Main And this is an error!!!
2020-08-03 02:28:30,061 INFO [main] com.van.example.Main Another round through the loop!
Collecting content into /var/folders/n4/_qsjydm14472jlm3knm53r0cmj_z67/T/toBePublished6409093714052893299.tmp before uploading.
2020-08-03 02:28:30,068 WARN [main] com.van.example.Main This is a warning!
2020-08-03 02:28:30,068 ERROR [main] com.van.example.Main And this is an error!!!
2020-08-03 02:28:30,068 INFO [main] com.van.example.Main Another round through the loop!
2020-08-03 02:28:30,068 WARN [main] com.van.example.Main This is a warning!
2020-08-03 02:28:30,068 ERROR [main] com.van.example.Main And this is an error!!!
2020-08-03 02:28:30,069 INFO [main] com.van.example.Main Another round through the loop!
2020-08-03 02:28:30,070 WARN [main] com.van.example.Main This is a warning!
2020-08-03 02:28:30,070 ERROR [main] com.van.example.Main And this is an error!!!
2020-08-03 02:28:30,070 INFO [main] com.van.example.Main Another round through the loop!
Collecting content into /var/folders/n4/_qsjydm14472jlm3knm53r0cmj_z67/T/toBePublished4719331678301763579.tmp before uploading.
2020-08-03 02:28:30,070 WARN [main] com.van.example.Main This is a warning!
2020-08-03 02:28:30,071 ERROR [main] com.van.example.Main And this is an error!!!
Collecting content into /var/folders/n4/_qsjydm14472jlm3knm53r0cmj_z67/T/toBePublished1915447637470166756.tmp before uploading.
java.lang.NullPointerException
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:45)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:7)
	at com.van.logging.BufferPublisher.publish(BufferPublisher.java:53)
	at com.van.logging.LoggingEventCache.publishEventsFromFile(LoggingEventCache.java:134)
	at com.van.logging.LoggingEventCache.lambda$publishCache$0(LoggingEventCache.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:45)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:7)
	at com.van.logging.BufferPublisher.publish(BufferPublisher.java:53)
	at com.van.logging.LoggingEventCache.publishEventsFromFile(LoggingEventCache.java:134)
	at com.van.logging.LoggingEventCache.lambda$publishCache$0(LoggingEventCache.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:45)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:7)
	at com.van.logging.BufferPublisher.publish(BufferPublisher.java:53)
	at com.van.logging.LoggingEventCache.publishEventsFromFile(LoggingEventCache.java:134)
	at com.van.logging.LoggingEventCache.lambda$publishCache$0(LoggingEventCache.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:45)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:7)
	at com.van.logging.BufferPublisher.publish(BufferPublisher.java:53)
	at com.van.logging.LoggingEventCache.publishEventsFromFile(LoggingEventCache.java:134)
	at com.van.logging.LoggingEventCache.lambda$publishCache$0(LoggingEventCache.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
java.lang.NullPointerException
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:45)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:7)
	at com.van.logging.BufferPublisher.publish(BufferPublisher.java:53)
	at com.van.logging.LoggingEventCache.publishEventsFromFile(LoggingEventCache.java:134)
	at com.van.logging.LoggingEventCache.lambda$publishCache$0(LoggingEventCache.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Cannot publish with com.van.logging.aws.S3PublishHelper@3795932f due to error: Cannot collect event main com.van.example.Main:INFO:2020-08-03 02:28:30,069 INFO [main] com.van.example.Main Another round through the loop!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@3795932f due to error: Cannot collect event main com.van.example.Main:WARN:2020-08-03 02:28:30,070 WARN [main] com.van.example.Main This is a warning!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@3795932f due to error: Cannot collect event main com.van.example.Main:ERROR:2020-08-03 02:28:30,070 ERROR [main] com.van.example.Main And this is an error!!!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@3795932f due to error: Cannot collect event main com.van.example.Main:INFO:2020-08-03 02:28:30,070 INFO [main] com.van.example.Main Another round through the loop!
: null
Cannot publish with com.van.logging.aws.S3PublishHelper@3795932f due to error: Cannot collect event main com.van.example.Main:WARN:2020-08-03 02:28:30,070 WARN [main] com.van.example.Main This is a warning!
: null
Publishing staging log on shutdown...
Collecting content into /var/folders/n4/_qsjydm14472jlm3knm53r0cmj_z67/T/toBePublished4985106363162541792.tmp before uploading.
java.io.IOException: Stream closed
	at sun.nio.cs.StreamEncoder.ensureOpen(StreamEncoder.java:45)
	at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:118)
	at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
	at java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
	at java.io.Writer.write(Writer.java:157)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:45)
	at com.van.logging.AbstractFilePublishHelper.publish(AbstractFilePublishHelper.java:7)
Cannot publish with com.van.logging.aws.S3PublishHelper@8f72b77 due to error: Cannot collect event main com.van.example.Main:WARN:2020-08-03 02:30:05,886 WARN [main] com.van.example.Main This is a warning!
	at com.van.logging.BufferPublisher.publish(BufferPublisher.java:53)
: Stream closed
	at com.van.logging.LoggingEventCache.publishEventsFromFile(LoggingEventCache.java:134)
	at com.van.logging.LoggingEventCache.lambda$publishCache$0(LoggingEventCache.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

It does sound like a race condition now. Without the sleep, and with the buffer set to such a low number, I think publishing is not keeping up, so multiple publish threads are being used (I recently changed the executor to use 3 threads instead of the original 1).

https://github.com/bluedenim/log4j-s3-search/blob/master/appender-core/src/main/java/com/van/logging/LoggingEventCache.java#L30

Those 3 threads must be stepping on one another because I did not synchronize them correctly. Changing it back to 1 might temporarily fix the errors, although with that low a buffer and no sleep, we may be stressing the publishing and eventually blocking the program.

Thanks for the info. I'll look into it some more.

Ah. Yes. I think I got it: BufferPublisher is not thread safe. The fix for now is setting the thread pool back to 1.
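
For anyone following along, here is a small sketch of why a pool of 1 sidesteps the problem. It is not BufferPublisher itself: `SingleThreadPublishSketch`, `UnsafePublisher`, and `failedBatches` are hypothetical names, and a `StringBuilder` field stands in for a writer that is opened at the start of a batch and set to null at the end. With one worker thread, batches cannot overlap, so the shared field is never cleared mid-batch. With several threads, one batch may null the field while another is still writing, which can surface as a NullPointerException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleThreadPublishSketch {
    /** Stand-in for a publisher that keeps unsynchronized per-batch state. */
    static class UnsafePublisher {
        private StringBuilder writer;       // shared, unsynchronized

        void publishBatch(int events) {
            writer = new StringBuilder();   // "start" of a batch
            for (int i = 0; i < events; i++) {
                writer.append('x');         // may fail if another batch ran "end"
                Thread.yield();
            }
            writer = null;                  // "end" of a batch
        }
    }

    /** Runs the batches on a pool of the given size; returns how many failed. */
    public static int failedBatches(int poolSize, int batches) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        UnsafePublisher publisher = new UnsafePublisher();
        AtomicInteger failures = new AtomicInteger();
        for (int i = 0; i < batches; i++) {
            pool.submit(() -> {
                try {
                    publisher.publishBatch(50);
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                }
            });
        }
        pool.shutdown();                    // no new tasks; let queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("pool of 1, failed batches: " + failedBatches(1, 20));
        // With 3 threads the count is timing-dependent and may be non-zero.
        System.out.println("pool of 3, failed batches: " + failedBatches(3, 20));
    }
}
```

The trade-off is throughput: a single worker serializes all uploads, so a proper fix would make the publisher's state per-batch or synchronized instead.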

Thanks! Yes, I reset the thread pool to 1 and tested; it's working fine.

OK. Merged. I'll do a new release later today. Thanks for your help!

Release 2.7.0 is out. It has all the latest changes: https://github.com/bluedenim/log4j-s3-search/releases/tag/v2.7.0

https://mvnrepository.com/search?q=therealvan

Thanks again for your help!