leonoel/missionary

behavior of `ap` with huge parallelism

Opened this issue · 3 comments

Duplicating comment from #35 as it's likely a separate issue

(require '[missionary.core :as m]) ;; namespace alias used below

(let [begin (System/currentTimeMillis)
        ;; create a flow of values generated by asynchronous tasks
        inputs (repeat 100000 (m/via m/cpu "hi")) ;; a task has no identity, it can be reused
        values (m/ap
                (let [flow (m/seed inputs)     ;; create a flow of tasks to execute
                      task (m/?> ##Inf flow)]  ;; from here, fork on every task in **parallel**
                  (m/? task)))                 ;; get a forked task value when available

        ;; drain the flow of values and count them
        n (m/? ;; tasks are executed, and the flow is consumed here!
           (m/reduce (fn [acc v]
                       (assert (= "hi" v))
                       (inc acc))
                     0 values))]
    (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

It's from the Tasks & Flows Walkthrough, but with 100K inputs instead of 1K.

lotuc commented

This one is caused by a StackOverflowError generated by the following call chain:

  • Seed::deref()
  • Seed::transfer()
  • Seed::more()
    • notifies downstream
    • calls into Ambiguous::ready (which creates a new branch & derefs a new value, since it's been notified)
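The shape of the failure can be sketched outside missionary (hypothetical names, not the actual Seed/Ambiguous classes): the producer's notification synchronously re-enters the consumer, which synchronously pulls the next value, so every element adds stack frames and a large enough seed overflows the stack.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical model of the call chain above, NOT missionary's code:
// more() notifies the consumer synchronously, the consumer's ready()
// immediately pulls the next value, which re-enters more().
public class UnguardedSeed {
    static Iterator<Integer> iterator;

    // consumer side: on notification, pull the next value
    static void ready() {
        more();
    }

    // producer side: advance the iterator, notify downstream
    static void more() {
        if (iterator.hasNext()) {
            iterator.next();
            ready(); // synchronous notification -> unbounded recursion
        }
    }

    public static void main(String[] args) {
        iterator = IntStream.range(0, 10_000_000).iterator();
        try {
            more();
            System.out.println("completed");
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError");
        }
    }
}
```

With 10M elements the mutual recursion exhausts the default JVM stack long before the iterator is drained.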

So Seed::deref() got a flow protocol violation (#63, it throws). I was wrong about this; after re-checking, deref is allowed to throw an exception to indicate a failure.

It would be great if missionary's implementation checked what is checkable and just stopped the process and threw. At least the "must not throw" part is very checkable (a java.lang.VirtualMachineError like this one, or a user-caused exception like #81).

Even better would be making behaviors left undefined by the task/flow spec defined within the context of missionary processes. (#81 is caused by the success continuation throwing, which stops the sleep loop.)

I'm still trying to find which part of Ambiguous's handling of flow/task UB is causing this issue. But for Seed::deref, replacing the following call with Thunk.cpu.execute(() -> more(ps, i)) should be enough (which breaks the recursive call chain & makes deref non-blocking; currently it blocks until all branches are created):
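The rescheduling idea can be sketched outside missionary (hypothetical names; a single-thread executor stands in for Thunk.cpu): resubmitting each notification to an executor gives every step a fresh stack, at the cost of making the notification asynchronous — which, as the reply notes, missionary considers a semantic change rather than a valid library fix.

```java
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

// Hypothetical model, NOT missionary's code: the same producer/consumer
// pair, but each notification is resubmitted to an executor instead of
// being called recursively, so stack depth stays constant.
public class AsyncSeed {
    static Iterator<Integer> iterator;
    static final ExecutorService cpu = Executors.newSingleThreadExecutor();
    static final AtomicLong count = new AtomicLong();

    static void ready() {
        more(); // consumer pulls the next value on notification
    }

    static void more() {
        if (iterator.hasNext()) {
            iterator.next();
            count.incrementAndGet();
            cpu.execute(AsyncSeed::ready); // fresh stack for each step
        } else {
            cpu.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        iterator = IntStream.range(0, 1_000_000).iterator();
        more();
        cpu.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(count.get()); // all elements consumed, no overflow
    }
}
```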

leonoel commented
The call chain is indeed wrong; the consumer process (ap in this case) should guard against reentrant notifications. The correct call chain must be:

  1. seed notifies ap
  2. ap transfers seed
  3. seed notifies ap if iterator not exhausted
  4. ap detects reentrant notification, marks ready state and returns
  5. seed returns value from transfer
  6. ap processes transferred value, creates new branch, checks ready state, goto 2
  7. ap returns from notification
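The guarded chain above can be sketched as follows (hypothetical names, not missionary's Ambiguous): the consumer keeps a busy flag, reentrant notifications only set a ready flag (step 4), and the outermost notification loops while ready is set (step 6), so stack depth stays constant regardless of input size.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical model of steps 1-7, NOT missionary's code.
public class GuardedConsumer {
    static Iterator<Integer> iterator;
    static boolean busy;   // a notification is currently being processed
    static boolean ready;  // a reentrant notification arrived meanwhile
    static long sum;

    // producer side (steps 3 and 5): notify before returning the value
    static int transfer() {
        int v = iterator.next();
        if (iterator.hasNext()) notify_(); // reentrant, only sets ready
        return v;
    }

    // consumer side (steps 2, 4, 6, 7)
    static void notify_() {
        if (busy) { ready = true; return; } // step 4: mark and return
        busy = true;
        do {
            ready = false;
            sum += transfer(); // steps 2, 6: transfer and process value
        } while (ready);       // step 6: loop instead of recursing
        busy = false;          // step 7: done with this notification
    }

    public static void main(String[] args) {
        iterator = IntStream.range(0, 10_000_000).iterator();
        notify_(); // step 1: the seed's initial notification
        System.out.println(sum); // 49999995000000, no StackOverflowError
    }
}
```

The same 10M-element input that overflows the unguarded chain is drained here in a constant-depth loop.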

Running the seed notification on another thread - I would consider this a semantic change, and therefore not a valid fix. This is because operators interpret synchronous notifications as part of the same transaction, which matters e.g. for the implementation of continuous-time semantics. However, you can write the same workaround in userland:

(defn seed-async [xs]
  (m/ap (try (m/?> (m/seed xs))
             (finally (m/? (m/via m/cpu))))))
lotuc commented

The following one throws a StackOverflowError (switch on a huge input):

(import 'missionary.Cancelled) ;; thrown into cancelled branches, needed for the catch clause

(->> (m/ap (let [i (m/?< (m/seed (range 10000)))]
             (try (m/? (m/sleep 10 i))
                  (catch Cancelled _ (m/amb)))))
     (m/reduce conj)
     (m/?))