vsilaev/tascalate-concurrent

orTimeout() breaks cancellation abilities

chrisly42 opened this issue · 11 comments

Hi Valery,

this simple test case breaks here with the latest release:


    @Test
    public void test_orTimeout_does_not_break_cancellation_ability() {
        State s1 = new State();
        Promise<Void> p = CompletableTask.runAsync(() -> longTask(5, s1), executor)
                .orTimeout(2, TimeUnit.SECONDS);
        trySleep(2);
        p.cancel(true);
        trySleep(1);
        assertCancelled("s1", s1);
    }

Took me a couple of hours to figure out that orTimeout() causes it. Other than that, I think that your library is great to overcome the cumbersomeness of the original implementation. Thanks!

PS: I guess increasing the overall unit test coverage would also help your development as the stuff is pretty complex and multithreading done right is very hard.

chrisly42,

Thank you for the kind words about the library!
I understand that tests coverage is beyond acceptable levels, but it's my 3-d Open Source project that I'm actively maintaining in my free time... )) So if you can contribute some tests in the form of patch it would be excellent.

Regarding the issue itself. Actually, it's not a bug, it's expected behavior. And previously it worked accidentally due to a bug while I used invalid implicit arguments during delegation. Probably it's a design issue, but it's how it's done:

  1. orTimeout / onTimeout has the same meaning as completionStage.thenApply / thenRun / thenAccept. Probably it sounds controversial. But I'd like to keep the same semantic for all composition methods - and I'm treating orTimeout as a regular composition method (btw, corresponding methods in Java 9 CompletableFuture treat timeouts differently)
  2. According to [1] the following is expected:
Promise<String> pMain = CompletableTask.supplyAsync(someProducer, executor);
Promise<String> pDerrived = pMain.thenApplyAsync(someConverter);
...
pDerrived.cancel(true);

Above pDerrived is cancelled, but pMain is not!
3. As long as we treat thenApply and orTimeout as semantically same the following is also expected:

Promise<String> pMain = CompletableTask.supplyAsync(someProducer, executor);
Promise<String> pTimeRestricted = pMain.orTimeout(10, TimeUnit.SECONDS);
...
pTimeRestricted.cancel(true);

Above pTimeRestricted is cancelled, but pMain is not!
4. The library provides a way to cancel a whole chain. I'm talking about Promise.dependent(). So the following code will work for you:

@Test
public void test_orTimeout_does_not_break_cancellation_ability() {
    State s1 = new State();
    Promise<Void> p = CompletableTask
        .runAsync(() -> longTask(5, s1), executor)
        .dependent()
        .orTimeout(2, TimeUnit.SECONDS, true, true);
    trySleep(2);
    p.cancel(true);
    trySleep(1);
    assertCancelled("s1", s1);
}

I hope this explains the behavior you are observing and we can close the issue.

Regards,
Valery

Btw, here is a corresponding block from the Promise interface:

    default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
        Promise<T> timeout = Timeouts.failAfter(duration);
        // Use *async to execute on default "this" executor
        Promise<T> result = this
            .dependent()
            .applyToEitherAsync(timeout, Function.identity(), PromiseOrigin.PARAM_ONLY)
            ;
        
        result.whenComplete(Timeouts.timeoutsCleanup(this, timeout, cancelOnTimeout));
        return result.raw();
    }

See the argument PromiseOrigin.PARAM_ONLY -- if it would be PromiseOrigin.ALL then cancellation will affect the original Promise as well -- but this option is left for DependentPromise only.

Thanks for the clarification and the quick reply. Your work is greatly appreciated.

I tried dependent() and thought that cancelOnTimeout = true (default) would be enough, but i was not aware of enlistOrigin. Thanks for the long explaination! Maybe the docs could also be more elaborative on that (maybe near the cancel-features, because this is one of the main reasons I am using it)?

I've only been playing with your library a couple of days now and I am wondering if you could add another feature, that might come in handy. As far as I know there is no way to join the thread/promise after it was canceled (as .get() and .join() of course always return that the promise was canceled). I wrote a wrapper around the Promise with CountdownLatch (for termination) and AtomicBoolean (for starting) to be able to (almost) await() the termination of the code inside the promise, but maybe you know a better way?

Cheers, Chris

As far as I know there is no way to join the thread/promise after it was canceled (as .get() and .join() of course always return that the promise was canceled). I wrote a wrapper around the Promise with CountdownLatch (for termination) and AtomicBoolean (for starting) to be able to (almost) await() the termination of the code inside the promise, but maybe you know a better way?

Could you elaborate more? After Promise is cancelled you can get / join it, though you must handle an exception. CompletionStage.exceptionally provides an option to convert an exception to some result... So it's a bit unclear what option you are missing...

Usually, .get() and .join() are methods to synchronize the promise and wait for it complete. If it is canceled, .get() and .join() will immediately return, regardless if the backing thread is still running (the thread might have been interrupted, but it might also take some time for it to exit gracefully (or via an exception)).

I would like to still be able to wait for the promise to return and make sure the thread cannot mess up things in the background. I'm not sure the state machine would allow the cancelation to be reset to use .get() or .join() again, especially, if the promise didn't start executing yet.

Currently, I wrap the execution of the payload supplier in this way (simplified, leaving out other stuff regarding whether it started executing or not):

    private CountDownLatch promiseDone = new CountDownLatch(1);

    public void joinThread() throws InterruptedException {
         promiseDone.await();
    }

    private T wrapPromise(Supplier<T> supplier) {
        try {
            return supplier.get();
        } finally {
            promiseDone.countDown();
        }
    }

Are my intentions a bit clearer now?

Regards,
Chris

I'm not sure the state machine would allow the cancelation to be reset to use .get() or .join() again, especially, if the promise didn't start executing yet.

As with CompletableFuture and others alternative implementations, CompletionStage may be completed once and only once -- either with result, or with exception, or via cancellation. All other attempts are just ignored.

Even if promise is not started, like it depends on preceding promises, it may be cancelled as well. No problems here.

So CompletableTask.supplyAsync(supplier, executor) should ultimately provide everything you need. You may safely join() the returned promise to get either a result, or a CompletionException or CancellationException. The later case is possible if you cancel the returned promise before it is completed.

Let me rephrase the problem. I think I got the right words now.

I want to leverage the features of the executor of .isTerminated() and .awaitTermination() onto a Promise without starting each promise in a separate executor that can be awaited regardless of cancellation or not.

I made a test case for you with a slightly changed implementation of long task and state. Notice that all other test cases in the class are still green with this change. The .get() returns immediately, the thread with its is still running.

    @Test
    public void test_get_after_cancellation_waits_for_termination() {
        State s1 = new State();
        Promise<Void> p = CompletableTask.runAsync(() -> longTask(5, s1), executor);
        trySleep(2);
        p.cancel(true);
        trySleep(1);
        assertCancelled("s1", s1);
        try {
            p.get();
        } catch (InterruptedException | ExecutionException | CancellationException e) {
            //
        }
        assertDone("s1", s1);
    }

    private void longTask(int units, State state) {
        state.start();
        try {
            Thread.sleep(units * UNIT);
        } catch (InterruptedException ex) {
            state.cancel();
            try {
                Thread.sleep(units * UNIT); // or do some other longer running cleanup
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            state.cancelDone();
            return;
        }
        state.finish();
    }

    static class State {

        static final int NEW = 0;
        static final int STARTED = 1;
        static final int DONE = 2;
        static final int CANCELLED = 3;
        static final int CANCELLED_DONE = 4;

        AtomicInteger s = new AtomicInteger(NEW);

        void start() {
            s.set(STARTED);
        }

        void finish() {
            s.set(DONE);
        }

        void cancel() {
            s.set(CANCELLED);
        }

        void cancelDone() {
            s.set(CANCELLED_DONE);
        }

        boolean wasStarted() {
            return s.get() >= STARTED;
        }

        boolean isDone() {
            return s.get() == DONE || s.get() == CANCELLED_DONE;
        }

        boolean isCancelled() {
            return s.get() == CANCELLED || s.get() == CANCELLED_DONE;
        }

        @Override
        public String toString() {
            switch (s.get()) {
                case NEW:
                    return "new";
                case STARTED:
                    return "started";
                case DONE:
                    return "done";
                case CANCELLED:
                    return "cancelled";
                case CANCELLED_DONE:
                    return "cancelled-done";
                default:
                    return String.format("unknown (%d)", s.get());
            }
        }
    }

Replacing the p.get() with executor.awaitTermination(1, TimeUnit.SECONDS); would make the test green. But sure, I can only use this for a single promise per executor.

I don't think still having slight control over the promise after cancellation would be such a bad thing. On cancellation, the promise is pretty much "don't care", even if it does have side effects that you would want to avoid by making sure it is "gone". There is no means to obtain either result or exception of the promise returning after cancellation.

Do you see the rationale here? While you successfully expanded the "short running computational task" design of CompletableFutures to "long running tasks", the control over these long running tasks on cancellation is still lacking.

Hmmm... you might be right I could be doing this by nesting two promises. Using a chain does not work because there is no .whenCancelled() and all downstream stuff of the cancelled promise will of course not be executed.

See, cancellation implies thread interruption, and it's somewhat cooperative activity between outer code that calls Promise.cancel and thread runnable block that reacts to interruption request. You are using long-running blocking calls inside InterruptedException handler of the thread body -- and it will never receive another request for interruption - while promise is already resolved as cancelled.

And yes, you need a pair of coordinated promises -- one for primary execution path, the second one for long-running cleanup after interruption

Btw, it would be better (both safer and simpler) if you return State from Supplier, i.e. uses Promise -- this way you will solve all the shortcomings of the coordination of the mutable state inside/outside supplier.

I'll give up. My wrapper workaround works for my use cases, no need for you to change the limited design of your library. But you do realise that the State class is from your own unit testing code and that this was only an example for proving my case?