leonoel/missionary

race condition on `via` - interruption flag may not be reset

Closed this issue · 2 comments

In Thunk.java : if the run method terminates concurrently with a cancellation, interruption flag may be reset before the runner thread is interrupted by the canceller thread, resulting with the interruption flag set when the runner thread starts the next task.

Not a problem with executors exposed by missionary because j.u.c.ThreadPoolExecutor resets the flag between successive tasks anyways, but could be problematic if via is used with another executor implementation.

PEZ commented

Could it be the reason why this locks up for me?

(let [begin (System/currentTimeMillis)
        ;; create a flow of values generated by asynchronous tasks
        inputs (repeat 100000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
        values (m/ap
                (let [flow (m/seed inputs)     ;; create a flow of tasks to execute
                      task (m/?> ##Inf flow)]  ;; from here, fork on every task in **parallel**
                  (m/? task)))                 ;; get a forked task value when available

        ;; drain the flow of values and count them
        n (m/? ;; tasks are executed, and flow is consume here!
           (m/reduce (fn [acc v]
                       (assert (= "hi" v))
                       (inc acc))
                     0 values))]
    (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

It's from the Tasks & Flows Walkthrough. But with 100K inputs instead of 1K.

Race condition fixed in b.37
ap issue moved to #108