behavior of `ap` with huge parallelism
Duplicating comment from #35 as it's likely a separate issue
(require '[missionary.core :as m])

(let [begin  (System/currentTimeMillis)
      ;; create a flow of values generated by asynchronous tasks
      inputs (repeat 100000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
      values (m/ap
               (let [flow (m/seed inputs)       ;; create a flow of tasks to execute
                     task (m/?> ##Inf flow)]    ;; from here, fork on every task in **parallel**
                 (m/? task)))                   ;; get a forked task value when available
      ;; drain the flow of values and count them
      n      (m/? ;; tasks are executed, and the flow is consumed here!
               (m/reduce (fn [acc v]
                           (assert (= "hi" v))
                           (inc acc))
                         0 values))]
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
It's from the Tasks & Flows Walkthrough, but with 100K inputs instead of 1K.
This one is caused by the StackOverflowError generated by the following call chain:
- `Seed::deref()`
- `Seed::transfer()`
- `Seed::more()`
  - notifies downstream
  - calls into `Ambiguous::ready` (which will create a new branch & deref a new value, since it's notified)
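To make the recursion concrete, here is a minimal toy model of that chain. It is not missionary's code: `NaiveChain`, `transfer`, `ready` and `maxDepthFor` are hypothetical names, and the only assumption carried over from the report is that the producer notifies synchronously and the consumer transfers again from inside the notification. Under that assumption the nesting depth grows linearly with the number of inputs.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

/** Toy model (hypothetical, not missionary's implementation) of an unguarded notify/transfer chain. */
public class NaiveChain {
    private final Iterator<Integer> items;
    private int depth = 0;     // current nesting of ready() calls
    private int maxDepth = 0;  // deepest nesting observed

    private NaiveChain(int n) {
        items = IntStream.range(0, n).iterator();
    }

    // Producer: take the next value, but notify the consumer synchronously
    // before returning if more values remain.
    private int transfer() {
        int value = items.next();
        if (items.hasNext()) ready();
        return value;
    }

    // Consumer without a reentrancy guard: every notification transfers right away,
    // so each remaining item adds another pair of stack frames.
    private void ready() {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        transfer();
        depth--;
    }

    /** Deepest nesting of ready() reached while consuming n items (n >= 1). */
    public static int maxDepthFor(int n) {
        NaiveChain chain = new NaiveChain(n);
        chain.ready();
        return chain.maxDepth;
    }

    public static void main(String[] args) {
        System.out.println(maxDepthFor(10));   // prints 10
        System.out.println(maxDepthFor(1000)); // prints 1000
    }
}
```

With a large enough input, this shape of recursion exhausts the thread stack, which is consistent with the StackOverflowError seen with 100K inputs.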
I initially thought `Seed::deref()` was a flow protocol violation (#63, it throws). I was wrong about this: after a re-check, `deref` is allowed to throw an exception to indicate a failure.
It would be great if missionary's implementation checked what is checkable, then just stopped the process and threw. At least the "must not throw" part is very checkable (a `java.lang.VirtualMachineError` like this one, and user-caused exceptions like #81).
Even better would be to make the undefined behaviors of the task/flow spec defined in the context of missionary processes. (#81 is caused by a success continuation that throws, which stops the sleep loop.)
I'm still trying to find which part of `Ambiguous`'s handling of flow/task UB is causing this issue. But for `Seed::deref`, replacing the following line with `Thunk.cpu.execute(() -> more(ps, i))` should be enough (it breaks the recursive call chain and makes `deref` non-blocking; currently it blocks until all branches are created):
missionary/java/missionary/impl/Seed.java
Line 42 in 45fcaac
The call chain is indeed wrong: the consumer process (`ap` in this case) should guard against reentrant notifications. The correct call chain must be:
1. `seed` notifies `ap`
2. `ap` transfers `seed`
3. `seed` notifies `ap` if iterator not exhausted
4. `ap` detects reentrant notification, marks ready state and returns
5. `seed` returns value from transfer
6. `ap` processes transferred value, creates new branch, checks ready state, goto 2
7. `ap` returns from notification
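The guarded call chain above can be sketched as a small toy model. This is only an illustration under stated assumptions, not missionary's actual `Ambiguous` or `Seed` code: `GuardedChain`, `busy`, `ready`, `maxDepthFor` and `sumFor` are hypothetical names. The consumer records a reentrant notification in a flag and returns, then the outer call loops on that flag instead of recursing.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

/** Toy model (hypothetical, not missionary's implementation) of a consumer guarding against reentrant notifications. */
public class GuardedChain {
    private final Iterator<Integer> items;
    private boolean busy = false;   // true while the consumer is inside its transfer loop
    private boolean ready = false;  // set when a notification arrives while busy
    private int depth = 0;          // current nesting of notifyReady() calls
    private int maxDepth = 0;       // deepest nesting observed
    private long sum = 0;           // stands in for "processing" each value

    private GuardedChain(int n) {
        items = IntStream.range(0, n).iterator();
    }

    // Producer: notifies synchronously if more values remain, then returns the value.
    private int transfer() {
        int value = items.next();
        if (items.hasNext()) notifyReady();
        return value;
    }

    // Consumer: a reentrant notification only marks the ready state and returns;
    // the outermost call drains values in a loop, so the stack stays flat.
    private void notifyReady() {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        if (busy) {
            ready = true;
        } else {
            busy = true;
            do {
                ready = false;
                sum += transfer(); // transfer, then process the value
            } while (ready);       // check ready state, loop back to transfer
            busy = false;
        }
        depth--;
    }

    private static GuardedChain run(int n) {
        GuardedChain chain = new GuardedChain(n);
        chain.notifyReady();
        return chain;
    }

    /** Deepest nesting of notifyReady() reached while consuming n items (n >= 1). */
    public static int maxDepthFor(int n) { return run(n).maxDepth; }

    /** Sum of all consumed values, to check that nothing was dropped. */
    public static long sumFor(int n) { return run(n).sum; }

    public static void main(String[] args) {
        System.out.println(maxDepthFor(100000)); // prints 2
        System.out.println(sumFor(100000));      // prints 4999950000
    }
}
```

In this model the nesting never exceeds two notification frames regardless of input size, and every notification is still handled synchronously on the same thread.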
Running the `seed` notification in another thread: I would consider this to be a semantic change, and therefore not a valid fix. This is because operators interpret synchronous notifications as part of the same transaction, which matters e.g. for the implementation of continuous time semantics. However, you can write the same workaround in userland:
(defn seed-async [xs]
  (m/ap (try (m/?> (m/seed xs))
             (finally (m/? (m/via m/cpu))))))
The following one throws a StackOverflowError (switch on a huge input):
(import 'missionary.Cancelled)

(->> (m/ap (let [i (m/?< (m/seed (range 10000)))]
             (try (m/? (m/sleep 10 i))
                  (catch Cancelled _ (m/amb)))))
     (m/reduce conj)
     (m/?))