j-easy/easy-batch

Error threshold should be checked before processing records

fmbenhassine opened this issue · 2 comments

As of v5.3, the error threshold is checked after processing records. This is not correct, as it will read/process an extra record after the threshold is exceeded. Here is a sample job to reproduce the issue:

import java.util.Arrays;

import org.jeasy.batch.core.job.Job;
import org.jeasy.batch.core.job.JobBuilder;
import org.jeasy.batch.core.job.JobExecutor;
import org.jeasy.batch.core.processor.RecordProcessor;
import org.jeasy.batch.core.reader.IterableRecordReader;
import org.jeasy.batch.core.record.Record;
import org.jeasy.batch.core.writer.StandardOutputRecordWriter;

/**
 * Minimal reproducer: runs a three-record job ("foo", "bar", "baz") whose
 * processor throws on every record, with the error threshold set to 2.
 */
public class Launcher {

    /**
     * Builds the job, submits it to a fresh {@link JobExecutor} and then
     * shuts the executor down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // In-memory data source holding the three payloads.
        IterableRecordReader recordReader =
                new IterableRecordReader(Arrays.asList("foo", "bar", "baz"));

        // Processor that fails unconditionally, so each record counts as an error.
        RecordProcessor failingProcessor = new RecordProcessor() {
            @Override
            public Record processRecord(Record record) throws Exception {
                throw new Exception("boom");
            }
        };

        Job job = new JobBuilder()
                .named("hello world job")
                .reader(recordReader)
                .processor(failingProcessor)
                .writer(new StandardOutputRecordWriter())
                .errorThreshold(2)
                .build();

        JobExecutor executor = new JobExecutor();
        executor.execute(job);
        executor.shutdown();
    }
}

It prints:

[pool-1-thread-1] INFO org.jeasy.batch.core.job.BatchJob - Job 'hello world job' starting
[pool-1-thread-1] INFO org.jeasy.batch.core.job.BatchJob - Job 'hello world job' started
[pool-1-thread-1] ERROR org.jeasy.batch.core.job.BatchJob - Unable to process record Record: {header=[number=1, source="In-Memory Iterable", creationDate="Tue Jan 21 23:41:55 CET 2020"], payload=[foo]}
java.lang.Exception: boom
	at org.jeasy.batch.tutorials.basic.helloworld.Launcher$1.processRecord(Launcher.java:56)
	at org.jeasy.batch.core.processor.CompositeRecordProcessor.processRecord(CompositeRecordProcessor.java:61)
	at org.jeasy.batch.core.job.BatchJob.processRecord(BatchJob.java:219)
	at org.jeasy.batch.core.job.BatchJob.readAndProcessBatch(BatchJob.java:188)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:111)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:54)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[pool-1-thread-1] ERROR org.jeasy.batch.core.job.BatchJob - Unable to process record Record: {header=[number=2, source="In-Memory Iterable", creationDate="Tue Jan 21 23:41:55 CET 2020"], payload=[bar]}
java.lang.Exception: boom
	at org.jeasy.batch.tutorials.basic.helloworld.Launcher$1.processRecord(Launcher.java:56)
	at org.jeasy.batch.core.processor.CompositeRecordProcessor.processRecord(CompositeRecordProcessor.java:61)
	at org.jeasy.batch.core.job.BatchJob.processRecord(BatchJob.java:219)
	at org.jeasy.batch.core.job.BatchJob.readAndProcessBatch(BatchJob.java:188)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:111)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:54)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[pool-1-thread-1] ERROR org.jeasy.batch.core.job.BatchJob - Unable to process record Record: {header=[number=3, source="In-Memory Iterable", creationDate="Tue Jan 21 23:41:55 CET 2020"], payload=[baz]}
java.lang.Exception: boom
	at org.jeasy.batch.tutorials.basic.helloworld.Launcher$1.processRecord(Launcher.java:56)
	at org.jeasy.batch.core.processor.CompositeRecordProcessor.processRecord(CompositeRecordProcessor.java:61)
	at org.jeasy.batch.core.job.BatchJob.processRecord(BatchJob.java:219)
	at org.jeasy.batch.core.job.BatchJob.readAndProcessBatch(BatchJob.java:188)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:111)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:54)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[pool-1-thread-1] ERROR org.jeasy.batch.core.job.BatchJob - Error threshold exceeded. Aborting execution
java.lang.Exception: boom
	at org.jeasy.batch.tutorials.basic.helloworld.Launcher$1.processRecord(Launcher.java:56)
	at org.jeasy.batch.core.processor.CompositeRecordProcessor.processRecord(CompositeRecordProcessor.java:61)
	at org.jeasy.batch.core.job.BatchJob.processRecord(BatchJob.java:219)
	at org.jeasy.batch.core.job.BatchJob.readAndProcessBatch(BatchJob.java:188)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:111)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:54)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[pool-1-thread-1] INFO org.jeasy.batch.core.job.BatchJob - Job 'hello world job' finished with status FAILED in 21ms

Record 3 is still read/processed even if errorThreshold has been set to 2.

Fixed in acaefe1. Here is the log of the same app after the fix:

[pool-1-thread-1] INFO org.jeasy.batch.core.job.BatchJob - Job 'hello world job' starting
[pool-1-thread-1] INFO org.jeasy.batch.core.job.BatchJob - Job 'hello world job' started
[pool-1-thread-1] ERROR org.jeasy.batch.core.job.BatchJob - Unable to process record Record: {header=[number=1, source="In-Memory Iterable", creationDate="Wed Jan 22 00:01:46 CET 2020"], payload=[foo]}
java.lang.Exception: boom
	at org.jeasy.batch.tutorials.basic.helloworld.Launcher$1.processRecord(Launcher.java:45)
	at org.jeasy.batch.core.processor.CompositeRecordProcessor.processRecord(CompositeRecordProcessor.java:61)
	at org.jeasy.batch.core.job.BatchJob.processRecord(BatchJob.java:222)
	at org.jeasy.batch.core.job.BatchJob.readAndProcessBatch(BatchJob.java:188)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:111)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:54)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[pool-1-thread-1] ERROR org.jeasy.batch.core.job.BatchJob - Unable to process record Record: {header=[number=2, source="In-Memory Iterable", creationDate="Wed Jan 22 00:01:46 CET 2020"], payload=[bar]}
java.lang.Exception: boom
	at org.jeasy.batch.tutorials.basic.helloworld.Launcher$1.processRecord(Launcher.java:45)
	at org.jeasy.batch.core.processor.CompositeRecordProcessor.processRecord(CompositeRecordProcessor.java:61)
	at org.jeasy.batch.core.job.BatchJob.processRecord(BatchJob.java:222)
	at org.jeasy.batch.core.job.BatchJob.readAndProcessBatch(BatchJob.java:188)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:111)
	at org.jeasy.batch.core.job.BatchJob.call(BatchJob.java:54)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[pool-1-thread-1] ERROR org.jeasy.batch.core.job.BatchJob - Error threshold exceeded. Aborting execution
[pool-1-thread-1] INFO org.jeasy.batch.core.job.BatchJob - Job 'hello world job' finished with status FAILED in 20ms

Record 3 is not read/processed anymore when the error threshold is reached.

This is actually invalid. I should stop working after midnight...

errorThreshold=2 means we tolerate at most 2 errors. If a third one occurs, the job is aborted. So reading/processing the third record is necessary. This is documented in the wiki (see "error limit after which the job is aborted") and is also clear from the error message "Error threshold exceeded": it says "exceeded" and not "reached".