race condition on `via` - interruption flag may not be reset
Closed this issue · 2 comments
leonoel commented
In Thunk.java
: if the run method terminates concurrently with a cancellation, interruption flag may be reset before the runner thread is interrupted by the canceller thread, resulting with the interruption flag set when the runner thread starts the next task.
Not a problem with executors exposed by missionary because j.u.c.ThreadPoolExecutor
resets the flag between successive tasks anyways, but could be problematic if via
is used with another executor implementation.
PEZ commented
Could it be the reason why this locks up for me?
(let [begin (System/currentTimeMillis)
;; create a flow of values generated by asynchronous tasks
inputs (repeat 100000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
values (m/ap
(let [flow (m/seed inputs) ;; create a flow of tasks to execute
task (m/?> ##Inf flow)] ;; from here, fork on every task in **parallel**
(m/? task))) ;; get a forked task value when available
;; drain the flow of values and count them
n (m/? ;; tasks are executed, and flow is consume here!
(m/reduce (fn [acc v]
(assert (= "hi" v))
(inc acc))
0 values))]
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
It's from the Tasks & Flows Walkthrough. But with 100K inputs instead of 1K.