Normal execution after cancellation due to onSuccess(...) in StageTransition.set(...)
Closed this issue · 28 comments
Hi,
First of all, thank you for your code, it helped me really a lot as I needed in my GUI the ability to cancel the running underlying task. Moreover, your code enables me to extend this cancellation ability such that I could cancel the whole tree of underlying tasks which the "root" task depends on. This extension is currently in progress, and when finished I would like to share it with you. Would you be interested?
Anyway, I think I found an issue in the original code today. I wrote a simple test (OnSuccessTest
) which runs two async tasks, the first one and then the second one. In the test I cancel the first task but the second one still can be executed which it should not as the first stage should complete exceptionally with CancellationException
. For me the issue occurs when the first task is cancelled (from main thread) using AbstractCompletableTask.cancel(true)
but the onError(...)
in this method is DELAYed (I added some code before). In this case the StageTransition.set(...)
method can be executed from FutureTask.run()
during the DELAY on a thread pool thread and the first stage is completed successfully despite of its task was cancelled.
My solution is to test the completion status of the task in StageTransition.set(...)
and call either onError(...)
when the task was cancelled or onSuccess(...)
otherwise.
ajurcik,
I run your test and it's executed with green colors.
I modified it to add more explicit debug statements -- and it shows expected results.
s1 is canceled, s2 is not even started (see modified test attached).
Did you modify source code of the library itself? What version are you using (0.5.1 is a most recent bug-fix release)?
Could you please attach a test case that fails with assertions? Also, if you altered source code - could you attach a patch with your changes?
Hi Valery,
Yes, I did changes in the library. Please, try the prepared patch which adds a delay to the AbstractCompletableTask.cancel(...)
method. The patch was made on your lastest 0.5.1 release.
The test should now fail. At least it fails for me. If it won't fail, please, try to make the delay longer.
Cheers, Adam
Moreover, your code enables me to extend this cancellation ability such that I could cancel the whole tree of underlying tasks which the "root" task depends on. This extension is currently in progress, and when finished I would like to share it with you. Would you be interested?
Could you elaborate more with what you try to get as final result? Preferably, with some (pseudo-) code?
Because functionality provided with class Promises
(see methods all
, any
, atLeast
) already provide combinators with such cancellation capability...
Yeah. I will try to describe it.
Promise<T1> p1 = CompletableTask.complete(...)
.thenApplyAsync(... Task 1 ...);
Promise<T2> p2 = p1.thenApplyAsync(... Task 2 ...);
return p???
Now I would like be able to return one Promise
which would cancel (or prevent starting) both Task 1 and Task 2. If I return p2
, I will not be able to cancel p1 if it will be still running. On the other hand when returning p1
, I will not be able to cancel p2
after p1
completes.
Looking into Promises
once again it seems that my goal could be achieved using it with caveats. I could create the all
promise pMulti
. The issue is here that I would get a list of results which I see inconvenient when creating and API which should expose only, e.g., last result. First, if I would wrap pMulti
promise into a new promise pLast
then again I would not be able to cancel pMulti
by cancelling pLast
. This could be solved by providing a new specialized Promise
implementation. Second, all my promises which I would like to be to cancel their tasks would need to share a common super type among their results.
Did a make a mistake in my reasoning somewhere?
My implementation is almost complete. I will share a patch with you and you can decide whether it makes sense or not.
Now I would like be able to return one Promise which would cancel (or prevent starting) both Task 1 and Task 2. If I return p2, I will not be able to cancel p1 if it will be still running. On the other hand when returning p1, I will not be able to cancel p2 after p1 completes.
If you cancel p1
in your example then p2
will be completed with CompletionException (with CancellationException as a cause) as well. And the corresponding thread will be interrupted!
Please add the following to your test and see:
p2.whenComplete((r, e) -> {
if (null != e) {
System.out.println("P2 Exception!");
e.printStackTrace(System.out);
} else {
System.out.println("P2 Result: " + r);
}
});
So I see no need for extra behavior besides provided (and specified in CompletionStage API)
Yeah, but this does not test the case when p1 already completed and p2 started. In this case I want to be able to cancel also p2 but p1 already completed normally so it would not make sense to change its completion status to exceptional completion.
This is the right test to see the behaviour:
@Test
public void testChainCancellation() {
State s1 = new State();
State s2 = new State();
Promise<?> p1 = CompletableTask.asyncOn(executor)
.thenRunAsync(() -> longTask(5, s1));
// This stage should not be executed, previous stage will be cancelled
Promise<?> p2 = p1.thenRunAsync(() -> longTask(5, s2));
p2.whenComplete((r, e) -> {
if (null != e) {
System.out.println("P2 Exception!");
e.printStackTrace(System.out);
} else {
System.out.println("P2 Result: " + r);
}
});
// Let the Task 1 complete normally
trySleep(8);
p1.cancel(true);
trySleep(1);
assertTrue("Expected s1 done, but is " + s1, s1.isDone());
// This assertion also fails, uncommment to see
// assertTrue("Expected s2 cancelled, but is " + s2, s2.isCancelled());
// Wait Task 2 to complete as it was NOT cancelled
trySleep(2);
assertTrue("Expected s2 cancelled, but is " + s2, s2.isCancelled());
}
Here tascalate-concurrent-recursive-cancel.patch.zip is the patch for 0.5.2-SNAPSHOT which adds support for recursive cancellation. I added also a file with tests so that it can be directly taken for a test drive :) The patch already contains the previously discussed bug fix.
You can freely modify the code or decide not to use/merge it in any way.
Cheers, Adam
Adam,
I see your point. However, this somewhat contradicts with CompletionStage API contract -- completion stage may be completed only once, either successfully, or with exception, or cancelled. With your suggestion it's somewhat non-intuitive, especially when there is a race between completion and cancellation.
For now I've created a separate branch for your patch (with the patch applied). However, I would like to avoid mixing your changes with the code that adhere to CompletionStage contract. What I'm suggesting:
- Work with the new branch for now.
- Refactor AbstractCompletableTask to allow 2 extension hierachies -- the one for previously existed code, and the second for your extension. Please try to keep changes in AbstractCompletableTask to minimum.
- Probably it's better to have separate method, cancelRecursive or cancelWithDependencies -- to have a separate execution path for this behavior.
At the end, I'll either include both class hierarchies into lib, or you will end up with good enough superclass(es) to extend the lib and add necessary behavior without patching lib sources.
Thank you for your interest & support,
Valery
Hi Valery,
what is the best way to do what you suggested? Should I fork your repository and create pull request for the new branch? Or should I commit to the new branch directly? I suppose, I should choose the first.
Reopening issue, was closed accidentally.
Transitive cancellation is indeed both practical and challenging topic that should be researched.
Yes, the first
I reviewed your code. There are a lot of questions:
doThenComposeAsync
--tempStag
e is a predecessor of thenextStage
, why it's not listed in dependencies?- A lot of methods, like thenApply or exceptionally don't enlist
dependencies
at all. - In
cancel
, if you putonError(new CancellationException());
before cancelling dependencies you probably not needdependenciesMightHaveBeenInterrupted
at all. - Why
thenCombineAsync
haswhenComplete
handler? OncenextStage
is complete you can't cancel it anyway.
Also, I'm a bit skeptic about thenCombineAsync
, applyToEitherAsync
etc methods, that takes CompletionStage as an argument. This dependency is not owned exclusively by the newly created Promise. So cancelling them might be dangerous. Actually, no one guarantees that any Promise here is an exclusive dependency, there are always chances that it's shared. So cancelling it will lead to unpredictable behavior, it would be hard to reason about the program behavior.
Regarding your example.
Promise<?> p1 = ...;
Promise<?> p2 = ...;
Promise<List<?>> cancelationToken = Promises.all(p1, p2);
...
cancelationToken.cancel(true);
This way you can cancel all required promises at once if they are not completed yet. Without any changes to current library code.
Thoughts?
Ad 1. Yeah, it might be more correct if tempStage
is also included into dependencies
. For my usage, I expected the function fn
, which provides the next CompletionStage, to execute quickly.
Ad 2. Not directly, but look into internalCreateCompletionStage(...)
implementations. The current completion stage is added as a dependency in this factory method.
Ad 3. If I put onError(...)
before cancelling then the tests failed for me. Yet, I don't know exactly why. I will look into it more.
Ad 4. This is for the case that the current stage is cancelled. Then I wanted to propagate the cancellation also to provided (combined) completion stage.
I see why are you sceptic. The dependencies might be shared, and in a complex scenario they probably will be. Thus this recursive (or transitive) cancellation would in such cases introduce unexpected behaviour. What about counting dependents? They are known through CallbackRegistry
.
Yes, in the way you shown I am able to cancel all the required promises. The problem arises when I want to provide this "all cancelling" promise as a result of an API method. It is incovenient to provide all results (including intermediate) instead of the producet of the whole task chain.
Hi Valery.
I thought about 3. I can call onError(...)
before cancelling but I still need to set the mightHaveBeenInterrupted
flag even before onError(...)
. The reason is that when onError(...)
is called and there are succeeding stages executed synchronously (such as whenComplete) they are reached recursively from onError(...)
. This is correct and it is not a problem regarding thread interruption at all. However, it is a problem of my forward-cancelling thenCombineAsync
implementation which in this case does not know whether its predecessor could have been interrupted or not. Maybe, I am able to implement it in another way, e.g., I could extend CancellationException and propagate the flag value using it. However, it seems to me that it would be less intuitive to handle the flag in this manner.
Regarding the shared dependencies. Maybe this might be solved by providing new methods that have a new argument (after Executor
) which lets the programmer choose whether he wants to interrupt a possibly shared promise.
prev.thenCombineAsync(CompletionStage<?> other, Executor e, boolean mayCancelOther);
This might be possible by extending Promise
into a new interface (let's say Operation
) which could also contain a new method cancelRecursive
or just cancel with a new parameter, e.g., mayCancelRecursive
. In this manner a new extension hierarchy would be created. Is this what you meant?
Adam,
I reviewed thenComposeAsync
and understand that I need cancellableOrigins
anyway -- at least, for non-exposed helper stages. Please review my code in master
branch -- now in thenComposeAsync
first tempStage
is registered as cancellable origin (so fn.apply()
may be interrupted), then result of fn.apply()
itself is enlisted.
Regarding your implementation of thenCombineAsync
-- I don't understand here why whenComplete
is necessary for you. Ain't it simpler to just add other
parameter as a dependency to nextStage
?
Still thinking on extra-methods with explicit parameter to mark cancellable origin...
Adam, please take a look at the following prototype code:
https://github.com/vsilaev/tascalate-concurrent/blob/master/src/main/java/net/tascalate/concurrent/RecursiveCancellationPromise.java
The idea is:
Promise<?> p1 = CompletableTask
.asyncOn(executor)
.thenRunAsync(() -> longTask(5, s1));
Promise<?> p2 = RecursiveCancellationPromise.from(p1).thenRunAsync(() -> longTask(5, s2));
RecursiveCancellationPromise.class.cast(p2).enlistCancellableOrigin(someOtherPromiseLikeCombineOrEitherParam):
p2.cancel(true);
// This will cancel p1 and its task, p2 and its task as well as everything explicitly enlisted
Thoughts?
Valery, I did not much today. Probably the only thing that I will have finished today is a test which shows a race in the new implementation of thenComposeAsync
. I copy here the comment which I added also to the specific lines into AbstractCompletableTask
:
// TEST: There is a race when fn.apply(r) is completed
// normally and nextStage is cancelled before returned is set
// as its nextStage's cancellableOrigins. In this case,
// execution of returned continues as nextStage cannot be
// cancelled for a second time. However, completion stages after
// nextStage are completed exceptionally (correctly) since when
// moveToNextStage is executed nextStage is already completed
// (cancelled) from cancel(...) -> onError(...). In order to
// cancel returned here, I think you need to know whether
// nextStage might have been interrupted.
I hit this issue before during my implementation and addressed it using the synchronization and possible cancelling using the remembered mayInterrupt...
flag.
Here is a test (for master
) which fails for me. thenComposeAsync-race-test.zip
Now, I will think about thenCombineAsync
more.
Adam,
Thanks, fixed.
Checking nextStage.isCancelled()
after resetting cancellableOrigins
should work without synchronization, so no extra flags required.
Sleep in AbstractCompletableTask
is commented out, so to test uncomment locally
Regards,
Valery
Looks good. No other sync needed since cancelling can only occur before or after reseting cancellable origins, but not in the same time. Sync is implicit due to sync in these methods.
Thanks!
Please look at RecursiveCancellationPromise
This is still work in progress. But! If we can go with such wrapper, with additionally overwritten *Either* and *Both* named methods to enlist CompletionStage parameter -- then I will definitely keep it in library for next release.
Valery
Some thoughts on thenCombineAsync
.
Yes, you can add other
into cancellableOrigins
of nextStage
, but they will be also manipulated (reset) in thenComposeAsync
. In this manner, reseting could remove other
from origins of nextStage
. This could be handled by adding. However, it seemed to me quite elegant to implement thenCombineAsync
using simpler operations such as internal whenComplete
.
What do you think?
Anyway, I also thought about implementing thenComposeAsync
using this approach. This could avoid solving sync issues and could make the code more easy to read and also to extend. I worked today on the second extension hierarchy and become stuck on thenComposeAsync
since I extracted the new fields (origins, lock) from AbstractCompletableTask
to new classes. But these fields were used directly in thenComposeAsync
before.
I looked at RecursiveCancellationPromise
. Do you plan to support fluent style of programming?
I mean this:
RecursiveCancellationTask.supplyAsync(...)
.thenApplyAsync(...)
.thenAcceptBothAsync(p, true); // or something like thenAcceptBothCancellableAsync(p)
This seems to me much more easy to work with than keeping explicit references to your cancellable promises. Also, it might be less error prone, e.g., when you forgot to enlist a task.
Adam,
Yes, I've added missing methods, class is renamed to DependentPromise
DependentPromise<?> p1 = DependentPromise.from(someRegularStage);
DependentPromise<?> p2 = p1.thenApplyAsync(fn, true);
// same as p1.thenApplyAsync(fn);
// "this" is enlisted by default;
DependentPromise<?> p3 = p2.runAfterEitherAsync(otherStage, codeBlock);
// same as p2.runAfterEitherAsync(otherStage, codeBlock, PromiseOrigin.THIS_ONLY);
// "this" and only "this" is enlisted by default;
DependentPromise<?> p4 = p2.acceptEither(otherStage, someConsumer, PromiseOrigin.ALL);
// above both "this" and "otherStage" are enlisted
// otherStage will be cancelled if p4 is cancelled.
Please provide your feedback, probably we need nothing else in AbstractCompletableTask.
Plus, this wrapper may wrap anything besides AbstractCompletableTask, incl. standard CompletableFuture/
Regards,
Valery
Hi Valery, this seems the right direction to go.
Today, I don't have much time so I only modified one test and fixed a small mistake in DependentPromise
. The first test passes and I expect also the others to pass.
I will be on a vacation during the weekend and will be back on Monday. I suggest that I modify all the other tests so that we can be sure about DependentPromise
and I will suggest a DependentTask
that would make the creation of the intial DependentPromise
more covenient (see CompletableTask.runAsync(...)
).
DependentPromise-bugfix-test.zip
What about calling cancelOrigins(mayInterruptIfRunning = true)
in doWrap
? Is this ok if you do not known how the dependent promise was cancelled? For my usage, posibility of interruption is needed.
Last small suggestion. What about including a setting similar to defaultExecutor
for a DependentPromise
, defaultEnlistOrigins
?
Today, I don't have much time so I only modified one test and fixed a small mistake in DependentPromise. The first test passes and I expect also the others to pass.
Thank you! The fix is applied.
and I will suggest a DependentTask that would make the creation of the intial DependentPromise more covenient (see CompletableTask.runAsync(...)).
Well, I don't see it should be included in the library itself, any library user can do this for itself on per-case basis (it's just two lib calls, one wrapped into another)
What about calling cancelOrigins(mayInterruptIfRunning = true) in doWrap? Is this ok if you do not known how the dependent promise was cancelled? For my usage, posibility of interruption is needed.
Please elaborate more. It's exactly cancellation of origins with interruption...
Last small suggestion. What about including a setting similar to defaultExecutor for a DependentPromise, defaultEnlistOrigins?
Thought about this. I'd prefer to keep things explicit while abuse of this feature is quite dangerous. Even adding direct predecessor (as currently implemented) is somewhat dangerous.
What about calling cancelOrigins(mayInterruptIfRunning = true) in doWrap? Is this ok if you do not known how the dependent promise was cancelled? For my usage, posibility of interruption is needed.
Please elaborate more. It's exactly cancellation of origins with interruption...
I thought about a situation when a user wants to cancel a chain of tasks. Let's say we have four thenRunAsync
tasks (T1-4) from which T1 is done, T2 is running, and T3, T4 are waiting. Currently, you can cancel T3 and T4, and interrupt T2. But what if there is a case that you want to prevent T3 and T4 from running, but you want to let T2 to finish?
I don't see that the second scenario is possible now. Or am I missing something?
EDIT: Sorry, the patch was incomplete.
I modified my tests, so that they successfully test DepedentPromise
now. Please, see my patch.
tascalate-concurrent-recursive-tests.patch.zip
Also I suggested removing the to Promise
cast from DependentPromise.doWrap
method. I instead made the type of original
parameters of doWrap
and wrap
stronger regarding type checking. Anyway, you are only able to cancel a task if you know it is a promise, i.e., it implements also Future
.
I thought about a situation when a user wants to cancel a chain of tasks. Let's say we have four thenRunAsync tasks (T1-4) from which T1 is done, T2 is running, and T3, T4 are waiting. Currently, you can cancel T3 and T4, and interrupt T2. But what if there is a case that you want to prevent T3 and T4 from running, but you want to let T2 to finish?
I don't see that the second scenario is possible now. Or am I missing something?
DependentPromise T1 = DependentPromise.from( CompletableTask.supplyAsync(supplier1, executor) );
DependentPromise T2 = T1.thenRunAsync(run2);
DependentPromise T3 = T2.thenRunAsync(run3, false);
DependentPromise T3 = T3.thenRunAsync(run4, false);
...
T3.cancel(true);
T4.cancel(true);
Ok, but it is not possible to cancel T1 and T2 by using, .e.g, T4 now.
Maybe it is too general scenario which might not occur at all. At least for me it won't. I think you can close the issue now.
Adam,
Big thanks for all the work you did for this issue, and especially for tests.
I'm about to release new functionality -- I decided that all methods inherited from CompletionStage
API should not add anything as cancelable origin. Only overloaded method calls should be used, with explicit arguments. I prefer the path of least surprise for the library user: DependentPromise
is a Promise
, and if it is passed to code that expect Promise
(with no cancellation of anything besides the promise itself) it should work like this.
Tests are updated accordingly to use overloaded methods' variants with explicit arguments.
Thank you,
Valery