vsilaev/tascalate-concurrent

The cancel is not working on the async chain

Closed this issue · 3 comments

unaor commented

I'm trying to use the library because I must cancel stuck threads (usually stuck on WMI).

However, I'm noticing that despite calling cancel, or specifying a short timeout with the boolean parameter set to true, the chain continues.

To make it easier to see what I'm doing wrong, I'll attach some code and logs.

public Promise<ComplianceCandidate> process(ComplianceCandidate candidate, ExecutorService pool) {

        return CompletableTask.supplyAsync(() -> candidate, pool).thenApplyAsync(CheckIp::doWork)
                .thenApplyAsync(CheckType::doWork).thenApplyAsync(CheckExclusion::doWork)
                .thenApplyAsync(AssignEntity::doWork).thenApplyAsync(DecideWmi::doWork)
                .thenApplyAsync(ObtainWmiConnection::doWork).thenApplyAsync(ObtainRegistryWmiConnection::doWork)
                .thenApplyAsync(RunCompliance::doWork).thenApplyAsync(DecideComplianceResult::doWork)
                .thenApplyAsync(CreateAlert::doWork).thenApplyAsync(ApplyFailureManager::doWork)
                .thenApplyAsync(ApplyDisconnectManager::doWork).thenApplyAsync(EnforceCompliance::doWork)
                .exceptionally(ExceptionHandlerService::handle).orTimeout(Duration.ofSeconds(3), true);
    }

The timeout should take effect in the ObtainWmiConnection task, yet I'm seeing the downstream task (RunCompliance) being executed.
I'm also attaching the relevant logs.

2021-02-19 09:09:18 [pool-2-thread-6] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has entered Decide WMI task
2021-02-19 09:09:18 [pool-2-thread-6] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] needs WMI connection due to condition ComplianceCondition [id=6, type=FileWMI, uniqueName=AVDatFile]
2021-02-19 09:09:18 [pool-2-thread-6] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has passed Decide WMI task
2021-02-19 09:09:18 [pool-2-thread-7] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has entered Obtain WMI task
2021-02-19 09:09:19 [pool-2-thread-7] - Proceeding to establish wmi connection for device: [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server]  wmiuser: localhost\administrator total users: 1
2021-02-19 09:09:23 [pool-3-thread-1] - Error TimeoutException: Timeout after PT3S on demand execution for device [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server]
2021-02-19 09:09:34 [pool-2-thread-7] - Successfully established WMI for device: [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] with WMI user: Credential [id=12, wmiDomain=localhost, wmiUser=administrator], total time to obtain connection is 15 seconds
2021-02-19 09:09:34 [pool-2-thread-10] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has entered run compliance task
2021-02-19 09:09:35 [pool-2-thread-10] - [ipAddress=192.168.99.190, macAddress=0x18A905C1AAA9, type=Server] has has the system name WISE-DEV190 and users WISE-DEV190\Administrator,WISE-DEV190\DWM-2,WISE-DEV190\DWM-1

By the way, the pool is a fixed thread pool.

Read the docs about the dependent() promise functionality: you are cancelling only the latest stage (exceptionally), not the stages before it.
For your specific case the best option is:

return CompletableTask.supplyAsync(() -> candidate, pool)
                .dependent(PromiseOrigin.ALL) //Key point
                .thenApplyAsync(CheckIp::doWork)
                .thenApplyAsync(CheckType::doWork).thenApplyAsync(CheckExclusion::doWork)
                .thenApplyAsync(AssignEntity::doWork).thenApplyAsync(DecideWmi::doWork)
                .thenApplyAsync(ObtainWmiConnection::doWork).thenApplyAsync(ObtainRegistryWmiConnection::doWork)
                .thenApplyAsync(RunCompliance::doWork).thenApplyAsync(DecideComplianceResult::doWork)
                .thenApplyAsync(CreateAlert::doWork).thenApplyAsync(ApplyFailureManager::doWork)
                .thenApplyAsync(ApplyDisconnectManager::doWork).thenApplyAsync(EnforceCompliance::doWork)
                .exceptionally(ExceptionHandlerService::handle).orTimeout(Duration.ofSeconds(3), true);
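
To illustrate why cancelling only the latest stage leaves earlier stages running, here is a minimal sketch using the JDK's own CompletableFuture rather than this library. It is an analogy only: the class and method names are hypothetical, and it does not use dependent() or any other tascalate-concurrent API. It shows that cancelling the last stage of a chain does not touch the stage it was derived from.

```java
import java.util.concurrent.CompletableFuture;

public class LastStageCancelDemo {

    /** Returns true when cancelling the last stage leaves the first stage untouched. */
    public static boolean upstreamSurvivesCancel() {
        // Stand-in for a slow upstream step; it is never completed here.
        CompletableFuture<String> first = new CompletableFuture<>();

        // Two derived stages, mirroring thenApply(...).exceptionally(...) in the chain.
        CompletableFuture<String> last = first
                .thenApply(String::trim)
                .exceptionally(ex -> "fallback");

        // Cancel only the last stage, as a timeout on the end of the chain would.
        last.cancel(true);

        // The last stage is cancelled, but the first one is still pending.
        return last.isCancelled() && !first.isDone();
    }

    public static void main(String[] args) {
        System.out.println("upstream untouched: " + upstreamSurvivesCancel());
    }
}
```

In this sketch the first stage stays pending after the cancel, which is the same shape of problem as the log above, where the WMI step kept running after the timeout fired.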

Next, your code should cooperate with the library: either check Thread.currentThread().isInterrupted() as in this issue, or use libraries that support interruption. For example, I/O streams created with Channels.newInputStream(FileChannel.open(...)) can be interrupted during read/write, but those obtained from FileInputStream cannot. The same is true of different HTTP connection libraries (see here).
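
A minimal sketch of the first option, checking the interrupt flag inside the task. It uses only a plain ExecutorService and Future.cancel(true) from the JDK, not this library, and the class and method names are hypothetical. The busy loop stands in for long-running work that is split into small steps and polls Thread.currentThread().isInterrupted() between them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CooperativeInterruptDemo {

    /**
     * Starts a long-running task, cancels it with interruption, and reports
     * whether the task itself noticed the interrupt and stopped.
     */
    public static boolean cancelAndObserve() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    // Poll the interrupt flag between units of work.
                    while (!Thread.currentThread().isInterrupted()) {
                        // One small unit of work would go here.
                    }
                    sawInterrupt.set(true);
                } finally {
                    finished.countDown();
                }
            });

            started.await();                      // make sure the task is running
            future.cancel(true);                  // true = interrupt the worker thread
            finished.await(5, TimeUnit.SECONDS);  // wait for the task to wind down
            return sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task observed interrupt: " + cancelAndObserve());
    }
}
```

A task that never checks the flag and never calls an interruptible blocking method would keep running after cancel(true), which is why the cooperation is needed.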

unaor commented

Thank you. I tried dependent() without arguments and it didn't work; however, when I tried PromiseOrigin.ALL I got the result I wanted.

Yes, I need to document this feature, plus what unwrap and raw do with regard to the dependent method and other decorators.