vsilaev/tascalate-concurrent

Unexpected cancellation behavior

adrilo opened this issue · 6 comments

Hi!

I stumbled upon your library and it looks very promising! Thanks for the hard work you put into it.
While testing timeouts and cancellations, the behavior I am seeing is not what I would expect though. Here's an example:

public void test() {
     return CompletableTask
        .supplyAsync(() -> {
            try {
                Thread.sleep(10_000); // sleep 10s
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
                throw new CompletionException(e);
            }
            System.out.println("Interrupted? " + (Thread.currentThread().isInterrupted() ? "yes" : "no"));
            return "foo";
        }, Executors.newCachedThreadPool())
        .orTimeout(Duration.ofMillis(200), true)
        .join();
}

With such a code, I would expect the Thread.sleep() to be interrupted after 200ms but it does not seem to be the case ("Interrupted: no" is printed after 10s). Is that the expected behavior? Am I doing something wrong?

Also, I noticed that different executors will provide different results. Using newCachedThreadPool() will return a result after 200ms while using newSingleThreadExecutor() or Spring's ThreadPoolTaskExecutor will return but only after 10s. Again, is that expected?

Thanks!

Hello,

Yes, true interruption depends on the concrete executor used (mentioned in readme).

Smth. strange with the number of the available threads. For ex, Executors.newFixedThreadPool(2) will work as expected, but Executors.newFixedThreadPool(1) will not. Need to double-check why it happens, looks like there are should be at least 2 threads available.

Spring's ThreadPoolTaskExecutor is merely a wrapper, please try to use it with underlying pool of 2+ threads - it should work.

Issue confirmed -- will fix it shortly.

Fixed, see release 0.9.4

Thansk @vsilaev!
Just to confirm the expected behavior: with the code above, I'm expecting to have the main thread return in 0.2s (which works as long as there are more than 2 threads in the pool). But I'm also expecting to have the task cancelled (and therefore the Thread.sleep interrupted). So the test above should print "Interrupted" and throw.
Instead, it seems like the task is not properly cancelled, as "Interrupted? no" is printed after 10s.

I can confirm that setting cancelOnTimeout = true prevents the sequence from running in case a thenApplyAsync is added for instance.

Is the expectation that the running thread should be interrupted on timeout wrong?

It's interrupted now, your test is in repo now, and here is an output

Interrupted
Exception in thread "main" java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Timeout after PT0.2S
	at net.tascalate.concurrent.SharedFunctions.wrapCompletionException(SharedFunctions.java:46)
	at net.tascalate.concurrent.Promise.join(Promise.java:81)
	at net.tascalate.concurrent.TimeoutDifferentExecutors.main(TimeoutDifferentExecutors.java:23)
Caused by: java.util.concurrent.TimeoutException: Timeout after PT0.2S
	at net.tascalate.concurrent.Try.doneOrTimeout(Try.java:112)
	at net.tascalate.concurrent.Promise.lambda$orTimeout$6(Promise.java:168)
	at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
	at net.tascalate.concurrent.AbstractCompletableTask.lambda$thenApplyAsync$0(AbstractCompletableTask.java:150)
	at net.tascalate.concurrent.CallbackRegistry.lambda$callCallback$0(CallbackRegistry.java:282)
	at net.tascalate.concurrent.CompletableSubTask$DelegatingCallable.call(CompletableSubTask.java:46)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at net.tascalate.concurrent.CompletableSubTask.fireTransition(CompletableSubTask.java:66)
	at net.tascalate.concurrent.CallbackRegistry.lambda$callCallback$1(CallbackRegistry.java:284)
	at net.tascalate.concurrent.PromiseAdapter$1.execute(PromiseAdapter.java:47)
	at net.tascalate.concurrent.CallbackRegistry.callCallback(CallbackRegistry.java:284)
	at net.tascalate.concurrent.CallbackRegistry$IntermediateState.lambda$addCallbacks$0(CallbackRegistry.java:195)
	at net.tascalate.concurrent.CallbackRegistry$IntermediateState.callSuccessCallbacks(CallbackRegistry.java:211)
	at net.tascalate.concurrent.CallbackRegistry.success(CallbackRegistry.java:72)
	at net.tascalate.concurrent.AbstractCompletableTask.success(AbstractCompletableTask.java:130)
	at net.tascalate.concurrent.AbstractCompletableTask.lambda$doApplyToEitherAsync$9(AbstractCompletableTask.java:391)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2144)
	at net.tascalate.concurrent.CompletableFutureWrapper.onSuccess(CompletableFutureWrapper.java:41)
	at net.tascalate.concurrent.CompletableFutureWrapper.success(CompletableFutureWrapper.java:36)
	at net.tascalate.concurrent.Timeouts.lambda$delay$0(Timeouts.java:44)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:835)

Don't sure how you are adding thenApply, but it should be either:

        CompletableTask
           .supplyAsync(() -> {
              ...
           }, Executors.newFixedThreadPool(1))
           .orTimeout(Duration.ofMillis(200), true)
           .thenApply(Function.identity())
           .join();

OR

        CompletableTask
           .supplyAsync(() -> {
              ...
           }, Executors.newFixedThreadPool(1))
           .dependent()
           .thenApply(Function.identity(), true)
           .orTimeout(Duration.ofMillis(200), true)
           .join();

Notice the use of the dependent promise here: otherwise timeout will cancel only the promise returned from thenApply and will not interrupt the thread in supplyAsync

Ah thanks! I was missing the true param on the thenApply.