vsilaev/tascalate-concurrent

Normal execution after cancellation due to onSuccess(...) in StageTransition.set(...)

Closed this issue · 28 comments

Hi,

First of all, thank you for your code. It has helped me a lot, since my GUI needs the ability to cancel the running underlying task. Moreover, your code enables me to extend this cancellation ability so that I can cancel the whole tree of underlying tasks that the "root" task depends on. This extension is currently in progress, and when it is finished I would like to share it with you. Would you be interested?

Anyway, I think I found an issue in the original code today. I wrote a simple test (OnSuccessTest) that runs two async tasks, the first one and then the second one. In the test I cancel the first task, but the second one can still be executed. It should not be, because the first stage should complete exceptionally with a CancellationException. For me the issue occurs when the first task is cancelled (from the main thread) using AbstractCompletableTask.cancel(true), but the onError(...) call in that method is delayed (I added some delay code before it). In this case StageTransition.set(...) can be executed from FutureTask.run() on a thread pool thread during the delay, and the first stage is completed successfully even though its task was cancelled.
My solution is to check the completion status of the task in StageTransition.set(...) and call onError(...) if the task was cancelled, or onSuccess(...) otherwise.
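A minimal sketch of the proposed guard, assuming a FutureTask-based transition like the one described above. GuardedTransition is a hypothetical stand-in for StageTransition, and the downstream stage is modelled with the JDK's CompletableFuture rather than the library's internals. FutureTask.run() still calls set(...) when the task was cancelled while its callable was running (FutureTask just discards the value), so the override checks isCancelled() before choosing the success or error path:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;

// Hypothetical stand-in for StageTransition (not the library's real class):
// a FutureTask that forwards its outcome to a downstream stage.
class GuardedTransition<V> extends FutureTask<V> {
    final CompletableFuture<V> stage = new CompletableFuture<>();

    GuardedTransition(Callable<V> callable) {
        super(callable);
    }

    @Override
    protected void set(V v) {
        // FutureTask ignores v if cancel(...) already won the race.
        super.set(v);
        if (isCancelled()) {
            stage.completeExceptionally(new CancellationException()); // "onError(...)"
        } else {
            stage.complete(v); // "onSuccess(...)"
        }
    }

    // Cancels the task while its callable is running (without interrupting it),
    // lets the callable finish, and reports whether the stage failed as expected.
    static boolean cancelWhileRunningFailsStage() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        GuardedTransition<String> task = new GuardedTransition<>(() -> {
            started.countDown();
            release.await();
            return "done";
        });
        Thread worker = new Thread(task);
        worker.start();
        try {
            started.await();
            task.cancel(false);   // cancellation wins; the callable keeps running
            release.countDown();  // callable returns and FutureTask.run() calls set(...)
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return task.isCancelled() && task.stage.isCompletedExceptionally();
    }
}
```

Without the isCancelled() check, the same sequence completes the stage normally, which is the behaviour reported above. A second completeExceptionally(...) call is a no-op, so it does not matter if the cancel path reports the error too.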

test.zip

ajurcik,

I ran your test and it passes (green).
I modified it to add more explicit debug statements, and it shows the expected results:
s1 is cancelled, and s2 is never even started (see the modified test attached).

Did you modify the source code of the library itself? What version are you using (0.5.1 is the most recent bug-fix release)?

Could you please attach a test case that fails with assertions? Also, if you altered the source code, could you attach a patch with your changes?

test.zip

Hi Valery,

Yes, I made changes to the library. Please try the prepared patch, which adds a delay to the AbstractCompletableTask.cancel(...) method. The patch was made against your latest 0.5.1 release.

The test should now fail. At least it fails for me. If it doesn't fail, please try making the delay longer.

Cheers, Adam

tascalate-concurrent-cancel-failure.patch.zip

Moreover, your code enables me to extend this cancellation ability so that I can cancel the whole tree of underlying tasks that the "root" task depends on. This extension is currently in progress, and when it is finished I would like to share it with you. Would you be interested?

Could you elaborate on what you are trying to achieve as the final result? Preferably with some (pseudo-)code?
The Promises class (see methods all, any, atLeast) already provides combinators with such cancellation capability...

Yeah. I will try to describe it.

Promise<T1> p1 = CompletableTask.complete(...)
    .thenApplyAsync(... Task 1 ...);
Promise<T2> p2 = p1.thenApplyAsync(... Task 2 ...);

return p???

Now I would like to be able to return one Promise that would cancel (or prevent from starting) both Task 1 and Task 2. If I return p2, I will not be able to cancel p1 while it is still running. On the other hand, if I return p1, I will not be able to cancel p2 after p1 completes.
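The limitation can be reproduced with the JDK's CompletableFuture, used here purely as a stand-in for Promise (an assumption; the thread is about tascalate's own classes). Cancelling the dependent stage completes only that stage, and the stage it depends on is left untouched:

```java
import java.util.concurrent.CompletableFuture;

// JDK CompletableFuture as a stand-in for Promise.
final class UpstreamNotCancelled {
    // Returns {p1.isDone(), p2.isCancelled()} right after cancelling only p2.
    static boolean[] cancelDependentOnly() {
        CompletableFuture<Integer> p1 = new CompletableFuture<>();  // "Task 1", still running
        CompletableFuture<Integer> p2 = p1.thenApply(x -> x + 1);   // "Task 2", waits for p1
        p2.cancel(true);                                            // cancel the stage we returned
        boolean[] observed = { p1.isDone(), p2.isCancelled() };
        p1.complete(41);                                            // Task 1 still runs to completion
        return observed;
    }
}
```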

Looking into Promises once again, it seems my goal could be achieved with it, with caveats. I could create an all promise, pMulti. The issue here is that I would get a list of results, which I find inconvenient when creating an API that should expose only, e.g., the last result. First, if I wrapped the pMulti promise in a new promise pLast, I again would not be able to cancel pMulti by cancelling pLast. This could be solved by providing a new specialized Promise implementation. Second, all the promises whose tasks I would like to be able to cancel would need to share a common supertype for their results.
Did I make a mistake in my reasoning somewhere?

My implementation is almost complete. I will share a patch with you and you can decide whether it makes sense or not.

Now I would like to be able to return one Promise that would cancel (or prevent from starting) both Task 1 and Task 2. If I return p2, I will not be able to cancel p1 while it is still running. On the other hand, if I return p1, I will not be able to cancel p2 after p1 completes.

If you cancel p1 in your example, then p2 will be completed with a CompletionException (with CancellationException as its cause) as well. And the corresponding thread will be interrupted!
Please add the following to your test and see:

        p2.whenComplete((r, e) -> {
            if (null != e) {
                System.out.println("P2 Exception!");
                e.printStackTrace(System.out);
            } else {
                System.out.println("P2 Result: " + r);
            }
        });

So I see no need for extra behavior beyond what is already provided (and specified by the CompletionStage API).
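For comparison, the downstream direction can be checked with the JDK's CompletableFuture as a stand-in: cancelling p1 fails the dependent stage with a CompletionException whose cause is the CancellationException. This sketch does not show thread interruption, because CompletableFuture.cancel(...) ignores its mayInterruptIfRunning argument, unlike the interruptible tasks discussed here:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// JDK CompletableFuture as a stand-in: cancelling p1 propagates to p2.
final class DownstreamCancellation {
    // Cancels p1 and returns what joining the dependent stage p2 throws.
    static Throwable cancelUpstream() {
        CompletableFuture<Void> p1 = new CompletableFuture<>();
        CompletableFuture<Void> p2 = p1.thenRun(() -> { });  // never runs
        p1.cancel(true);
        try {
            p2.join();
            return null;
        } catch (CompletionException e) {
            return e;  // wraps the CancellationException of p1
        }
    }
}
```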

Yes, but this does not test the case where p1 has already completed and p2 has started. In this case I want to be able to cancel p2 as well, but p1 has already completed normally, so it would not make sense to change its completion status to exceptional.
This test shows the behaviour:

@Test
public void testChainCancellation() {
    State s1 = new State();
    State s2 = new State();
        
    Promise<?> p1 = CompletableTask.asyncOn(executor)
            .thenRunAsync(() -> longTask(5, s1));
        
    // This stage should not be executed, previous stage will be cancelled
    Promise<?> p2 = p1.thenRunAsync(() -> longTask(5, s2));
        
    p2.whenComplete((r, e) -> {
        if (null != e) {
            System.out.println("P2 Exception!");
            e.printStackTrace(System.out);
        } else {
            System.out.println("P2 Result: " + r);
        }
    });
        
    // Let Task 1 complete normally
    trySleep(8);
    p1.cancel(true);
    trySleep(1);
        
    assertTrue("Expected s1 done, but is " + s1, s1.isDone());
    // This assertion also fails, uncomment to see
//    assertTrue("Expected s2 cancelled, but is " + s2, s2.isCancelled());
        
    // Wait for Task 2 to complete, as it was NOT cancelled
    trySleep(2);
        
    assertTrue("Expected s2 cancelled, but is " + s2, s2.isCancelled());
}

Here is the patch for 0.5.2-SNAPSHOT, tascalate-concurrent-recursive-cancel.patch.zip, which adds support for recursive cancellation. I also added a file with tests so that it can be taken for a test drive directly :) The patch already contains the bug fix discussed earlier.
You can freely modify the code or decide not to use/merge it in any way.

Cheers, Adam

Adam,

I see your point. However, this somewhat contradicts the CompletionStage API contract: a completion stage may be completed only once, either successfully, with an exception, or by cancellation. Your suggestion makes this non-intuitive, especially when there is a race between completion and cancellation.
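The "completed only once" rule is easy to see with the JDK's CompletableFuture, used here as a stand-in: once a stage has completed normally, a later cancel(...) has no effect and returns false, so the stage can never be turned into a cancelled one after the fact:

```java
import java.util.concurrent.CompletableFuture;

// JDK CompletableFuture as a stand-in for any CompletionStage implementation.
final class CompleteOnce {
    // Completes a stage normally, then tries to cancel it.
    static String cancelAfterCompletion() {
        CompletableFuture<String> p1 = new CompletableFuture<>();
        p1.complete("result");
        boolean cancelled = p1.cancel(true);  // too late: the first completion wins
        return cancelled + "/" + p1.isCancelled() + "/" + p1.join();
    }
}
```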

For now I've created a separate branch for your patch (with the patch applied). However, I would like to avoid mixing your changes with the code that adheres to the CompletionStage contract. What I'm suggesting:

  1. Work with the new branch for now.
  2. Refactor AbstractCompletableTask to allow two extension hierarchies: one for the existing code, and a second one for your extension. Please try to keep changes in AbstractCompletableTask to a minimum.
  3. It's probably better to have a separate method, cancelRecursive or cancelWithDependencies, so that this behavior has its own execution path.

In the end, I'll either include both class hierarchies in the lib, or you will end up with good enough superclass(es) to extend the lib and add the necessary behavior without patching the lib sources.

Thank you for your interest & support,
Valery

Hi Valery,

What is the best way to do what you suggested? Should I fork your repository and create a pull request for the new branch? Or should I commit to the new branch directly? I suppose I should choose the first.

Reopening the issue; it was closed accidentally.
Transitive cancellation is indeed both a practical and a challenging topic that should be researched.

Yes, the first

I reviewed your code. There are a lot of questions:

  1. doThenComposeAsync: tempStage is a predecessor of nextStage, so why isn't it listed in the dependencies?
  2. A lot of methods, like thenApply or exceptionally, don't enlist dependencies at all.
  3. In cancel, if you put onError(new CancellationException()); before cancelling the dependencies, you probably don't need dependenciesMightHaveBeenInterrupted at all.
  4. Why does thenCombineAsync have a whenComplete handler? Once nextStage is complete, you can't cancel it anyway.

Also, I'm a bit skeptical about thenCombineAsync, applyToEitherAsync, etc., the methods that take a CompletionStage as an argument. This dependency is not owned exclusively by the newly created Promise, so cancelling it might be dangerous. Actually, nothing guarantees that any Promise here is an exclusive dependency; there is always a chance that it's shared. Cancelling it will then lead to unpredictable behavior, and it would be hard to reason about the program.

Regarding your example.

Promise<?> p1 = ...;
Promise<?> p2 = ...;
Promise<List<?>> cancelationToken = Promises.all(p1, p2);
...
cancelationToken.cancel(true);

This way you can cancel all the required promises at once, if they have not completed yet, without any changes to the current library code.

Thoughts?

Ad 1. Yes, it might be more correct if tempStage were also included in the dependencies. For my usage, I expected the function fn, which provides the next CompletionStage, to execute quickly.

Ad 2. Not directly, but look at the internalCreateCompletionStage(...) implementations. The current completion stage is added as a dependency in this factory method.

Ad 3. If I put onError(...) before cancelling, the tests fail for me. I don't know exactly why yet. I will look into it more.

Ad 4. This is for the case where the current stage is cancelled. I wanted to propagate the cancellation to the provided (combined) completion stage as well.

I see why you are sceptical. The dependencies might be shared, and in a complex scenario they probably will be. In such cases, recursive (or transitive) cancellation would introduce unexpected behaviour. What about counting dependents? They are known through CallbackRegistry.
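Counting dependents could look roughly like the following sketch. SharedOrigin is a hypothetical helper, not part of the library, and the count is maintained explicitly here rather than derived from CallbackRegistry, whose internals are not shown in this thread. The shared origin is cancelled only when the last interested dependent gives it up:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a shared origin plus an explicit count of the
// dependents that may want to cancel it.
final class SharedOrigin<T> {
    final CompletableFuture<T> future;
    private final AtomicInteger interested = new AtomicInteger();

    SharedOrigin(CompletableFuture<T> future) {
        this.future = future;
    }

    // Each dependent that might later ask for cancellation registers once.
    void register() {
        interested.incrementAndGet();
    }

    // A dependent gives up the origin; only the last one actually cancels it.
    // Returns true if this call cancelled the origin.
    boolean release(boolean mayInterruptIfRunning) {
        return interested.decrementAndGet() == 0
                && future.cancel(mayInterruptIfRunning);
    }
}
```

One caveat of this design: a dependent that attaches to the origin without registering is invisible to the count, so the origin could still be cancelled underneath it.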

Yes, the way you've shown lets me cancel all the required promises. The problem arises when I want to provide this "cancel everything" promise as the result of an API method. It is inconvenient to provide all results (including intermediate ones) instead of the product of the whole task chain.

Hi Valery.

I thought about point 3. I can call onError(...) before cancelling, but I still need to set the mightHaveBeenInterrupted flag even before onError(...). The reason is that when onError(...) is called and there are succeeding stages that execute synchronously (such as whenComplete), they are reached recursively from onError(...). This is correct and not a problem for thread interruption at all. However, it is a problem for my forward-cancelling thenCombineAsync implementation, which in this case does not know whether its predecessor could have been interrupted or not. Maybe I could implement it another way, e.g., by extending CancellationException and propagating the flag value through it. However, it seems to me that handling the flag in this manner would be less intuitive.
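The alternative mentioned above, carrying the flag inside the exception, might look like the following sketch. FlaggedCancellationException and its helper methods are hypothetical names, and CompletableFuture stands in for the library's stages. Synchronously executed successors see the exception wrapped in a CompletionException, so the flag has to be unwrapped first:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical exception type carrying the mayInterruptIfRunning flag.
class FlaggedCancellationException extends CancellationException {
    private static final long serialVersionUID = 1L;

    final boolean mayInterruptIfRunning;

    FlaggedCancellationException(boolean mayInterruptIfRunning) {
        super("cancelled, mayInterruptIfRunning=" + mayInterruptIfRunning);
        this.mayInterruptIfRunning = mayInterruptIfRunning;
    }

    // Unwraps the CompletionException layers added by dependent stages and
    // returns the flag, or null if the failure was not such a cancellation.
    static Boolean flagOf(Throwable failure) {
        Throwable t = failure;
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        if (t instanceof FlaggedCancellationException) {
            return Boolean.valueOf(((FlaggedCancellationException) t).mayInterruptIfRunning);
        }
        return null;
    }

    // Fails an origin stage with the flagged exception and reports the flag
    // observed by a synchronously executed successor (whenComplete).
    static Boolean flagSeenDownstream(boolean mayInterruptIfRunning) {
        CompletableFuture<Void> origin = new CompletableFuture<>();
        AtomicReference<Boolean> seen = new AtomicReference<>();
        origin.thenRun(() -> { })
              .whenComplete((result, failure) -> seen.set(flagOf(failure)));
        origin.completeExceptionally(new FlaggedCancellationException(mayInterruptIfRunning));
        return seen.get();
    }
}
```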

Regarding shared dependencies: maybe this could be solved by providing new methods with an extra argument (after the Executor) that lets the programmer choose whether to interrupt a possibly shared promise.

prev.thenCombineAsync(CompletionStage<?> other, Executor e, boolean mayCancelOther);

This might be possible by extending Promise into a new interface (let's say Operation) which could also contain a new method cancelRecursive or just cancel with a new parameter, e.g., mayCancelRecursive. In this manner a new extension hierarchy would be created. Is this what you meant?
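A rough sketch of the proposed mayCancelOther overload, written as a hypothetical static helper over the JDK's CompletableFuture rather than as a method of Promise. It forwards cancellation to other through an internal whenComplete callback, as in the forward-cancelling approach discussed above, and only when other is also a Future, since only a Future can be cancelled. The helper cannot see the original mayInterruptIfRunning value, so it assumes interruption is acceptable:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical helper, not a library API.
final class ForwardCancel {
    static <T, U, V> CompletableFuture<V> thenCombineAsync(
            CompletableFuture<T> self, CompletionStage<U> other,
            BiFunction<? super T, ? super U, ? extends V> fn,
            Executor executor, boolean mayCancelOther) {
        CompletableFuture<V> next = self.thenCombineAsync(other, fn, executor);
        if (mayCancelOther && other instanceof Future) {
            // Runs when next completes; forwards only an actual cancellation.
            next.whenComplete((result, failure) -> {
                if (next.isCancelled()) {
                    // Assumption: the original flag is unknown here.
                    ((Future<?>) other).cancel(true);
                }
            });
        }
        return next;
    }
}
```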

Adam,

I reviewed thenComposeAsync and understand that I need cancellableOrigins anyway, at least for non-exposed helper stages. Please review my code in the master branch: now, in thenComposeAsync, tempStage is first registered as a cancellable origin (so fn.apply() may be interrupted), and then the result of fn.apply() itself is enlisted.

Regarding your implementation of thenCombineAsync: I don't understand why whenComplete is necessary for you here. Isn't it simpler to just add the other parameter as a dependency of nextStage?

Still thinking about extra methods with an explicit parameter to mark a cancellable origin...

Adam, please take a look at the following prototype code:
https://github.com/vsilaev/tascalate-concurrent/blob/master/src/main/java/net/tascalate/concurrent/RecursiveCancellationPromise.java

The idea is:

Promise<?> p1 = CompletableTask
     .asyncOn(executor)
     .thenRunAsync(() -> longTask(5, s1));

Promise<?> p2 = RecursiveCancellationPromise.from(p1).thenRunAsync(() -> longTask(5, s2));
RecursiveCancellationPromise.class.cast(p2).enlistCancellableOrigin(someOtherPromiseLikeCombineOrEitherParam);

p2.cancel(true);
// This will cancel p1 and its task, p2 and its task as well as everything explicitly enlisted

Thoughts?
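To make the idea concrete, here is a heavily simplified, hypothetical model of such a wrapper over the JDK's CompletableFuture. CascadingStage is not the library's RecursiveCancellationPromise, and only thenRunAsync is shown. Each derived stage enlists its predecessor, extra origins can be enlisted explicitly, and cancel(...) cancels the stage itself before its origins. Note that CompletableFuture ignores mayInterruptIfRunning, so no thread is interrupted in this model:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of a cancellation-propagating wrapper.
final class CascadingStage<T> implements Future<T> {
    final CompletableFuture<T> delegate;
    private final List<Future<?>> origins = new CopyOnWriteArrayList<>();

    private CascadingStage(CompletableFuture<T> delegate) {
        this.delegate = delegate;
    }

    static <T> CascadingStage<T> from(CompletableFuture<T> stage) {
        return new CascadingStage<>(stage);
    }

    // The new stage enlists "this", so cancelling it also cancels this stage.
    CascadingStage<Void> thenRunAsync(Runnable action, Executor executor) {
        CascadingStage<Void> next = new CascadingStage<>(delegate.thenRunAsync(action, executor));
        return next.enlistCancellableOrigin(this);
    }

    // Explicitly enlists another stage, e.g. a combine/either parameter.
    CascadingStage<T> enlistCancellableOrigin(Future<?> origin) {
        origins.add(origin);
        return this;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Cancel this stage first; cancelling an origin first would make this
        // stage fail with a CompletionException instead of being cancelled.
        boolean cancelled = delegate.cancel(mayInterruptIfRunning);
        for (Future<?> origin : origins) {
            origin.cancel(mayInterruptIfRunning);  // recurses into enlisted wrappers
        }
        return cancelled;
    }

    @Override public boolean isCancelled() { return delegate.isCancelled(); }
    @Override public boolean isDone() { return delegate.isDone(); }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return delegate.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.get(timeout, unit);
    }
}
```

The ordering inside cancel(...) is the same concern as point 3 of the review above: once an origin is cancelled, its failure propagates forward, so the stage should already be marked as cancelled by then.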

Valery, I didn't get much done today. Probably the only thing I will finish today is a test that shows a race in the new implementation of thenComposeAsync. I'm copying here the comment that I also added at the specific lines in AbstractCompletableTask:

                // TEST: There is a race when fn.apply(r) is completed
                // normally and nextStage is cancelled before returned is set
                // as its nextStage's cancellableOrigins. In this case,
                // execution of returned continues as nextStage cannot be
                // cancelled for a second time. However, completion stages after
                // nextStage are completed exceptionally (correctly) since when
                // moveToNextStage is executed nextStage is already completed
                // (cancelled) from cancel(...) -> onError(...). In order to
                // cancel returned here, I think you need to know whether
                // nextStage might have been interrupted.

I hit this issue earlier during my implementation and addressed it using synchronization and possible cancellation based on the remembered mayInterrupt... flag.
Here is a test (for master) which fails for me. thenComposeAsync-race-test.zip

Now, I will think about thenCombineAsync more.

Adam,

Thanks, fixed.
Checking nextStage.isCancelled() after resetting cancellableOrigins should work without synchronization, so no extra flags required.

The sleep in AbstractCompletableTask is commented out, so uncomment it locally to test.

Regards,
Valery

Looks good. No other sync is needed, since cancelling can only occur before or after resetting the cancellable origins, not at the same time. Sync is implicit due to the synchronization in these methods.
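The reasoning can be modelled in a few lines. ComposeRaceModel and its fields are hypothetical names, not the library's code. The cancel path marks nextStage as cancelled and then reads the registered origin; the compose path registers returned and then re-reads nextStage.isCancelled(). Because both sides use thread-safe (volatile-backed) objects, at least one path is guaranteed to see the other's write, so returned is cancelled in either order (possibly twice, which is harmless):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the thenComposeAsync race (not the library's code).
final class ComposeRaceModel {
    final CompletableFuture<Object> nextStage = new CompletableFuture<>();
    private final AtomicReference<Future<?>> cancellableOrigin = new AtomicReference<>();

    // Cancellation path: complete nextStage as cancelled first, then cancel
    // whatever origin is registered at this moment.
    boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = nextStage.cancel(mayInterruptIfRunning);
        Future<?> origin = cancellableOrigin.get();
        if (origin != null) {
            origin.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    // Compose path: fn.apply(r) has just produced "returned".
    void onReturned(Future<?> returned) {
        cancellableOrigin.set(returned);  // register first...
        if (nextStage.isCancelled()) {    // ...then re-check
            returned.cancel(true);        // assumption: interruption is acceptable here
        }
    }
}
```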

Thanks!

Please look at RecursiveCancellationPromise.
This is still work in progress. But if we can go with such a wrapper, additionally overriding the *Either* and *Both* methods to enlist their CompletionStage parameter, then I will definitely keep it in the library for the next release.

Valery

Some thoughts on thenCombineAsync.

Yes, you can add other to the cancellableOrigins of nextStage, but they will also be manipulated (reset) in thenComposeAsync, so resetting could remove other from nextStage's origins. This could be handled by adding to the origins instead of resetting them. However, it seemed quite elegant to me to implement thenCombineAsync using simpler operations such as an internal whenComplete.
What do you think?

Anyway, I also thought about implementing thenComposeAsync using this approach. It could avoid the sync issues and make the code easier to read and to extend. Today I worked on the second extension hierarchy and got stuck on thenComposeAsync, because I had extracted the new fields (origins, lock) from AbstractCompletableTask into new classes, but these fields were used directly in thenComposeAsync.

I looked at RecursiveCancellationPromise. Do you plan to support a fluent programming style?
I mean this:

RecursiveCancellationTask.supplyAsync(...)
    .thenApplyAsync(...)
    .thenAcceptBothAsync(p, true); // or something like thenAcceptBothCancellableAsync(p)

This seems much easier to work with than keeping explicit references to your cancellable promises. It might also be less error-prone, e.g., when you forget to enlist a task.

Adam,

Yes, I've added the missing methods, and the class is renamed to DependentPromise.

DependentPromise<?> p1 = DependentPromise.from(someRegularStage);
DependentPromise<?> p2 = p1.thenApplyAsync(fn, true); 
// same as p1.thenApplyAsync(fn);
// "this" is enlisted by default;
DependentPromise<?> p3 = p2.runAfterEitherAsync(otherStage, codeBlock);
// same as p2.runAfterEitherAsync(otherStage, codeBlock, PromiseOrigin.THIS_ONLY);
// "this" and only "this" is enlisted by default;
DependentPromise<?> p4 = p2.acceptEither(otherStage, someConsumer, PromiseOrigin.ALL);
// above both "this" and "otherStage" are enlisted
// otherStage will be cancelled if p4 is cancelled.
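One way to picture what the origin argument controls is a small selector that, given this stage and the stage passed as a parameter, returns the ones to enlist. OriginSelector is a hypothetical stand-in that models only the two values used above, THIS_ONLY and ALL; it is not the library's PromiseOrigin:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical selector (not the library's PromiseOrigin).
enum OriginSelector {
    THIS_ONLY(true, false),
    ALL(true, true);

    private final boolean enlistThis;
    private final boolean enlistParam;

    OriginSelector(boolean enlistThis, boolean enlistParam) {
        this.enlistThis = enlistThis;
        this.enlistParam = enlistParam;
    }

    // Returns the stages a new dependent stage should enlist as cancellable origins.
    <S> List<S> select(S self, S param) {
        List<S> origins = new ArrayList<>(2);
        if (enlistThis) {
            origins.add(self);
        }
        if (enlistParam) {
            origins.add(param);
        }
        return origins;
    }
}
```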

Please provide your feedback; probably we need nothing else in AbstractCompletableTask.
Plus, this wrapper can wrap things other than AbstractCompletableTask, including the standard CompletableFuture.

Regards,
Valery

Hi Valery, this seems the right direction to go.

Today I don't have much time, so I only modified one test and fixed a small mistake in DependentPromise. The first test passes, and I expect the others to pass too.
I will be on vacation over the weekend and back on Monday. I suggest that I modify all the other tests so that we can be sure about DependentPromise, and I will propose a DependentTask that would make creating the initial DependentPromise more convenient (see CompletableTask.runAsync(...)).

DependentPromise-bugfix-test.zip

What about calling cancelOrigins(mayInterruptIfRunning = true) in doWrap? Is this OK if you do not know how the dependent promise was cancelled? For my usage, the possibility of interruption is needed.

Last small suggestion. What about including a setting similar to defaultExecutor for a DependentPromise, defaultEnlistOrigins?

Today I don't have much time, so I only modified one test and fixed a small mistake in DependentPromise. The first test passes, and I expect the others to pass too.

Thank you! The fix is applied.

and I will propose a DependentTask that would make creating the initial DependentPromise more convenient (see CompletableTask.runAsync(...)).

Well, I don't think it should be included in the library itself; any library user can do this on a per-case basis (it's just two library calls, one wrapped in another).

What about calling cancelOrigins(mayInterruptIfRunning = true) in doWrap? Is this OK if you do not know how the dependent promise was cancelled? For my usage, the possibility of interruption is needed.

Please elaborate more. It's exactly cancellation of origins with interruption...

Last small suggestion. What about including a setting similar to defaultExecutor for a DependentPromise, defaultEnlistOrigins?

I thought about this. I'd prefer to keep things explicit, since abuse of this feature is quite dangerous. Even adding the direct predecessor (as currently implemented) is somewhat dangerous.

What about calling cancelOrigins(mayInterruptIfRunning = true) in doWrap? Is this OK if you do not know how the dependent promise was cancelled? For my usage, the possibility of interruption is needed.

Please elaborate more. It's exactly cancellation of origins with interruption...

I was thinking of a situation where a user wants to cancel a chain of tasks. Let's say we have four thenRunAsync tasks (T1-T4), of which T1 is done, T2 is running, and T3 and T4 are waiting. Currently, you can cancel T3 and T4 and interrupt T2. But what if you want to prevent T3 and T4 from running, but let T2 finish?
I don't see how the second scenario is possible now. Or am I missing something?

EDIT: Sorry, the patch was incomplete.
I modified my tests so that they now test DependentPromise successfully. Please see my patch.
tascalate-concurrent-recursive-tests.patch.zip

I also suggested removing the cast to Promise from the DependentPromise.doWrap method. Instead, I made the types of the original parameters of doWrap and wrap stronger with respect to type checking. In any case, you can only cancel a task if you know it is a promise, i.e., that it also implements Future.

I was thinking of a situation where a user wants to cancel a chain of tasks. Let's say we have four thenRunAsync tasks (T1-T4), of which T1 is done, T2 is running, and T3 and T4 are waiting. Currently, you can cancel T3 and T4 and interrupt T2. But what if you want to prevent T3 and T4 from running, but let T2 finish?
I don't see how the second scenario is possible now. Or am I missing something?

DependentPromise<?> T1 = DependentPromise.from( CompletableTask.supplyAsync(supplier1, executor) );
DependentPromise<?> T2 = T1.thenRunAsync(run2);
// false: the predecessor is not enlisted as a cancellable origin
DependentPromise<?> T3 = T2.thenRunAsync(run3, false);
DependentPromise<?> T4 = T3.thenRunAsync(run4, false);
...
T3.cancel(true);
T4.cancel(true);
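The same partial cancellation can be reproduced with the JDK's CompletableFuture, used here as a stand-in because it never cancels predecessors (similar to passing false above). In this sketch T2 is blocked on a latch while T3 is cancelled; T2 then finishes normally, and neither T3's nor T4's action ever runs. With CompletableFuture, cancelling T3 alone is already enough for T4, which fails with a CompletionException instead of being cancelled itself:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// JDK CompletableFuture as a stand-in for a chain whose stages do not enlist
// their predecessors as cancellable origins.
final class PartialChainCancel {
    static String cancelTailOfChain() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch t2Started = new CountDownLatch(1);
        CountDownLatch releaseT2 = new CountDownLatch(1);
        AtomicInteger ranT3 = new AtomicInteger();
        AtomicInteger ranT4 = new AtomicInteger();
        try {
            CompletableFuture<Void> t1 = CompletableFuture.runAsync(() -> { }, executor);
            CompletableFuture<Void> t2 = t1.thenRunAsync(() -> {
                t2Started.countDown();
                awaitQuietly(releaseT2);  // simulates a long-running T2
            }, executor);
            CompletableFuture<Void> t3 = t2.thenRunAsync(ranT3::incrementAndGet, executor);
            CompletableFuture<Void> t4 = t3.thenRunAsync(ranT4::incrementAndGet, executor);

            awaitQuietly(t2Started);  // T1 done, T2 running, T3 and T4 waiting
            t3.cancel(true);          // T4 fails right away as a consequence
            releaseT2.countDown();    // let T2 finish normally
            t2.join();

            return "T2 done=" + !t2.isCompletedExceptionally()
                    + ", T3 cancelled=" + t3.isCancelled()
                    + ", T4 failed=" + t4.isCompletedExceptionally()
                    + ", ran T3/T4=" + ranT3.get() + "/" + ranT4.get();
        } finally {
            executor.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling PartialChainCancel.cancelTailOfChain() returns "T2 done=true, T3 cancelled=true, T4 failed=true, ran T3/T4=0/0".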

OK, but now it is not possible to cancel T1 and T2 through, e.g., T4.

Maybe it is too general a scenario, one that might not occur at all. At least for me it won't. I think you can close the issue now.

Adam,

Big thanks for all the work you did for this issue, and especially for tests.
I'm about to release the new functionality. I decided that none of the methods inherited from the CompletionStage API should add anything as a cancellable origin; only the overloaded methods with explicit arguments should do that. I prefer the path of least surprise for the library user: DependentPromise is a Promise, and if it is passed to code that expects a Promise (with no cancellation of anything besides the promise itself), it should behave like one.

The tests are updated accordingly to use the overloaded method variants with explicit arguments.

Thank you,
Valery