smol-rs/async-task

Can the extra scheduling of a task be skipped when a Task is cancelled or detached?

Closed this issue · 1 comments

The following code is the core logic of my project, which is based on async-task and uses dynamic calls. Here I do not reschedule the task to run when it is cancelled, and this does not seem to cause any problems.

  • task.rs
    // Cancel the task (called at most once, because ownership is taken by either a cancel or a drop).
    fn set_canceled(&mut self) {
        let raw = unsafe { self.raw.as_mut() };
        let mut state = raw.state.load(Ordering::Acquire);
        loop {
            //If the task has been completed or closed, it does not need to be cancelled.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }
            match raw.state.compare_exchange_weak(
                state,
                state | CLOSED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // If the task is idle (neither scheduled nor running), clean up here:
                    // notify the awaiter, drop the future, and release one reference.
                    // The Task handle is still attached at this point, so releasing the
                    // reference cannot actually destroy the RawTask.
                    // The RawTask is only destroyed once the task is detached.
                    // Any awaiter notified here belongs to some other task.
                    if state & (SCHEDULED | RUNNING) == 0 {
                        raw.notify(None);
                        raw.drop_future();
                        raw.drop_ref();
                    }
                    // The transition succeeded, so there is nothing left to retry.
                    break;
                }
                Err(s) => state = s,
            }
        }
    }
    unsafe fn set_detached(&mut self) -> Option<T> {
        let raw = self.raw.as_mut();
        let mut output = None;
        // Most often a task is detached right after it is created, in which case only the TASK flag needs to be cleared.
        if let Err(mut state) = raw.state.compare_exchange_weak(
            SCHEDULED | TASK | REFERENCE,
            SCHEDULED | REFERENCE,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            // Otherwise, clear the TASK flag to detach from the RawTask.
            state = raw.state.fetch_and(!TASK, Ordering::AcqRel);
            // If the RawTask has completed but is not closed,
            // the task output must be taken out here.
            if state & (COMPLETED | CLOSED) == COMPLETED {
                output = Some((raw.output as *mut T).read());
            }
            // If the reference count is already zero at detach time, the task must be destroyed.
            if state & !(REFERENCE - 1) == 0 {
                raw.destroy();
            }
        }
        output
    }
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        unsafe {
            let raw = self.raw.as_mut();
            let mut state = raw.state.load(Ordering::Acquire);
            // If the task has been cancelled, register this waker and wait for the cancellation to finish.
            // No task output is taken in this case.
            if state & CLOSED != 0 {
                // If the task is scheduled or running, we need to wait until its future is dropped.
                if state & (SCHEDULED | RUNNING) != 0 {
                    // Replace the awaiter with the waker of the current context.
                    raw.register(cx.waker());
                    // Reload the state after registering. It may have changed just
                    // before registration, so we need to check for that.
                    state = raw.state.load(Ordering::Acquire);
                    // If the task is still scheduled or running, keep waiting because its future has not been dropped yet.
                    if state & (SCHEDULED | RUNNING) != 0 {
                        return Poll::Pending;
                    } else {
                        // Otherwise, remove the waker we just registered.
                        let _rtn = raw.take();
                    }
                }
                return Poll::Ready(None);
            }
            // If the task is not completed, register the current waker.
            if state & COMPLETED == 0 {
                // Replace the awaiter with the waker of the current context.
                raw.register(cx.waker());
                // Reload the state after registering. It is possible that the task became
                // completed or closed just before registration, so we need to check for that.
                state = raw.state.load(Ordering::Acquire);

                // The CLOSED check below is unreachable, because only this handle can change the RawTask state at this point.
                // if state & CLOSED != 0 {
                //     continue;
                // }
                // If the task is still not completed, we're blocked on it.
                if state & COMPLETED == 0 {
                    return Poll::Pending;
                } else {
                    raw.take();
                }
            }
            raw.state.fetch_or(CLOSED, Ordering::AcqRel);
            // Take the output from the task.
            let output = raw.output as *mut T;
            return Poll::Ready(Some(output.read()));
        }
    }
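
The cancel transition in task.rs can be isolated into a small, self-contained sketch. This is an illustration only, not the code of this project or of async-task: the flag values are assumed (the issue does not show them), and `CancelAction`, `cancel`, `ref_count`, and `no_references` are hypothetical names. It shows which side ends up responsible for cleanup once CLOSED is set without any extra scheduling, and how a reference count packed above the flag bits can be read.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Assumed flag layout; these values are hypothetical.
const SCHEDULED: usize = 1 << 0;
const RUNNING: usize = 1 << 1;
const COMPLETED: usize = 1 << 2;
const CLOSED: usize = 1 << 3;
const TASK: usize = 1 << 4;
// One reference. The reference count occupies this bit and everything above it.
const REFERENCE: usize = 1 << 8;

/// What the caller of `cancel` is responsible for after the state transition.
#[derive(Debug, PartialEq)]
enum CancelAction {
    /// The task was already completed or closed.
    Nothing,
    /// The task is scheduled or running; whoever runs it next sees CLOSED and cleans up.
    LeftToExecutor,
    /// The task is idle; the canceller cleans up inline, with no extra scheduling.
    CleanUpInline,
}

/// Sets CLOSED with a CAS loop and reports who has to clean up.
fn cancel(state: &AtomicUsize) -> CancelAction {
    let mut cur = state.load(Ordering::Acquire);
    loop {
        if cur & (COMPLETED | CLOSED) != 0 {
            return CancelAction::Nothing;
        }
        match state.compare_exchange_weak(cur, cur | CLOSED, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => {
                return if cur & (SCHEDULED | RUNNING) == 0 {
                    CancelAction::CleanUpInline
                } else {
                    CancelAction::LeftToExecutor
                };
            }
            // Another thread changed the state (or the weak CAS failed spuriously); retry.
            Err(actual) => cur = actual,
        }
    }
}

/// Number of references packed above the flag bits.
fn ref_count(state: usize) -> usize {
    state / REFERENCE
}

/// True when no references remain, the same test as `state & !(REFERENCE - 1) == 0`.
fn no_references(state: usize) -> bool {
    state & !(REFERENCE - 1) == 0
}
```

With this layout, cancelling an idle state such as `TASK | REFERENCE` yields `CleanUpInline`, which is the case where the future is dropped on the cancelling thread rather than by the executor.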

  • raw.rs

    pub(crate) unsafe fn run(&mut self) -> bool {
        println!("running");
        let mut state = self.state.load(Ordering::Acquire);
        loop {
            //Task switched to RUNNING state.
            let new = (state & !SCHEDULED) | RUNNING;
            match self
                .state
                .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => {
                    state = new;
                    break;
                }
                Err(s) => state = s,
            }
        }
        // Build a waker for this task and poll the future.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
            self as *const _ as *const (),
            &Self::RAW_WAKER_VTABLE,
        )));
        let cx = &mut Context::from_waker(&waker);
        let poll = self.dynfn.as_mut().poll(cx);
        match poll {
            Poll::Ready(_) => {
                loop {
                    //Task switch to COMPLETED state
                    let new = if state & TASK == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };
                    match self.state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            if state & TASK == 0 || state & CLOSED != 0 {
                                self.drop_output();
                            }
                            if state & AWAITER != 0 {
                                self.notify();
                            }
                            self.drop_ref();
                            break;
                        }
                        Err(s) => {
                            state = s;
                        }
                    }
                }
            }

            Poll::Pending => loop {
                let new = if state & CLOSED != 0 {
                    state & !RUNNING & !SCHEDULED
                } else {
                    state & !RUNNING
                };
                match self.state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        if state & CLOSED != 0 {
                            if state & AWAITER != 0 {
                                self.notify();
                            }
                            self.drop_future();
                            self.drop_ref();
                        } else if state & SCHEDULED != 0 {
                            self.schedule();
                            return true;
                        }
                        break;
                    }
                    Err(s) => state = s,
                }
            },
        }
        false
    }

The scheduling is intentional. It gives the cancelled Future one more poll to put its resources back into place, which makes cancel-safety a lot more attainable. Since it prevents many footguns, I'm hesitant to remove it.
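
One way to picture the difference is to look at which thread performs the final step on a cancelled future. The sketch below uses only the standard library and does not use async-task. `Resource`, `cancel_inline`, `cancel_via_schedule`, and `demo` are hypothetical names, and the final step is modelled as a plain drop on the executor thread, which is an assumption made to keep the example short.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for a future that owns executor-local resources.
/// On drop it reports the name of the thread that ran its destructor.
struct Resource {
    dropped_on: mpsc::Sender<String>,
}

impl Drop for Resource {
    fn drop(&mut self) {
        let name = thread::current().name().unwrap_or("unnamed").to_string();
        // The receiver may already be gone; ignore that case.
        let _ = self.dropped_on.send(name);
    }
}

/// Cancel by dropping the future right where `cancel` is called.
fn cancel_inline(fut: Resource) {
    drop(fut);
}

/// Cancel by handing the future back to the executor's run queue,
/// so that the executor thread performs the final step.
fn cancel_via_schedule(fut: Resource, run_queue: &mpsc::Sender<Resource>) {
    run_queue.send(fut).expect("executor thread is gone");
}

/// Runs both strategies from a thread named "canceller" and returns the
/// thread names on which the two destructors ran: (inline, scheduled).
fn demo() -> (String, String) {
    let (inline_tx, inline_rx) = mpsc::channel();
    let (sched_tx, sched_rx) = mpsc::channel();
    let (queue_tx, queue_rx) = mpsc::channel::<Resource>();

    // A trivial "executor": it receives cancelled futures and drops them.
    let executor = thread::Builder::new()
        .name("executor".into())
        .spawn(move || {
            for fut in queue_rx {
                drop(fut);
            }
        })
        .unwrap();

    let a = Resource { dropped_on: inline_tx };
    let b = Resource { dropped_on: sched_tx };
    let canceller = thread::Builder::new()
        .name("canceller".into())
        .spawn(move || {
            cancel_inline(a);
            cancel_via_schedule(b, &queue_tx);
            // `queue_tx` is dropped when this closure returns, which closes the run queue.
        })
        .unwrap();

    canceller.join().unwrap();
    executor.join().unwrap();
    (inline_rx.recv().unwrap(), sched_rx.recv().unwrap())
}
```

Calling `demo()` returns the pair of thread names: the inline variant runs the destructor on the cancelling thread, while the scheduled variant runs it on the executor thread, where any executor-local state the future relies on is still reachable.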