ocaml-multicore/domainslib

sketch proposal / brainstorming for the Domainslib API

gasche opened this issue · 11 comments

In #77 we discussed the API of domainslib, and the fact that one could consider several changes, small or large. On my commute back today I tried to "brainstorm" on what another API for domainslib could look like -- same capabilities as today, but with a clearer separation of concerns between the various pieces of the implementation. The result is included below for discussion.

module Workset : sig
  (* This lower-level module represents a "job pool" where jobs can be
     added or retrieved.  It is used internally in the implementation
     of domain pools, and also in the Task runner
     (asynchronous actions are sent to the workset, and tasks are
     retrieved from the workset while waiting on a promise). This
     intermediate abstraction gives expressivity for more complex
     scheduling scenarios, for example it is possible to have two
     domain pools share their workset to "union" them. *)

  type job = (unit -> unit)

  type t (* = job Multi_channel.t *)
  (** A workset is a multi-producer multi-consumer unordered queue
      of jobs to be performed. *)

  val make : unit -> t

  val send : t -> job -> unit
  val recv : t -> job
  val recv_poll : t -> job option (** non-blocking *)
end
module Pool : sig
  (** A pool of worker domains that are dedicated to performing jobs
     received from a workset. It is only necessary to deal with Pool
     for semi-advanced Task scheduling policies, for example if you
     want to share the same domain pool between many short-running
     tasks. *)

  type t
  (** Type of domain pool. *)
  
  val spawn : num_worker_domains:int -> workset:Workset.t -> t
  (** Sets up a task execution pool with [num_worker_domains] worker domains
      that immediately start pulling jobs from the given [workset] and
      performing them, continuing until [teardown] is called.
  
      When [num_worker_domains] is 0, the new pool will have no domains
      and not perform any work.
  
      Raises {!Invalid_argument} when [num_worker_domains] is less than 0. *)
  
  val get_num_worker_domains : t -> int
  val get_workset : t -> Workset.t

  val set_workset : t -> Workset.t -> t
  
  val teardown : t -> unit
  (** Tears down the task execution pool. Each worker will finish its
      ongoing job, but jobs still in the workset after that will
      remain there. *)
end
module Task : sig
  (** A "task" abstraction for determining sub-computations to be
      performed in parallel.

      Tasks are effectful programs that use the effectful operations
      provided by the Task module ([fork_join], [parallel_for],
      [async], [await], etc.). They must run in the dynamic scope of
      a handler, one of the [run_*] functions.  *)

  type 'a task = unit -> 'a
  (** Type of task *)

  (** {2. Handlers} *)

  val run : ?num_worker_domains:int -> 'a task -> 'a
  (** [run t] runs the task [t] synchronously, using a temporary pool of worker domains
      to offload asynchronous tasks in parallel.

      By default the number of worker domains created is
        [Domain.recommended_domain_count () - 1].
  *)

  val run_with_workset : Workset.t -> 'a task -> 'a
  (** [run_with_workset ws t] is an advanced version of [run t] that
      does not spawn a temporary domain pool, but instead parks some
      asynchronous tasks in the workset [ws] and performs jobs from
      [ws] during idle times. This gives maximum flexibility to offload
      computation. Note that the calling domain may run jobs from the
      workset that are unrelated to the task.

      This handler is especially useful in the advanced
      use-case of nesting several task handlers. For example, the main
      handler may be invoked with [run], but locally some sub-part of
      the computation runs under a nested [run_with_workset] invocation
      to be offloaded to a different workset feeding a dedicated domain
      pool.
  *)

  (** {2. Effectful operations -- must run in the dynamic scope of a handler.} *)

  val fork_join : 'a task -> 'b task -> 'a * 'b

  val fork_join_u : unit task -> unit task -> unit

  val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
                     (int -> unit) -> unit

  val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
                            ('a -> 'a -> 'a) -> 'a -> (int -> 'a) -> 'a

  val parallel_scan : ('a -> 'a -> 'a) -> 'a array -> 'a array

  val parallel_find : ?chunk_size:int -> start:int -> finish:int ->
                      (int -> 'a option) -> 'a option

  type !'a promise
  (** Type of promises *)

  val async : 'a task -> 'a promise
  (** [async t] runs the task [t] asynchronously. The function returns
      a promise [r] in which the result of the task [t] will be
      stored. *)

  val await : 'a promise -> 'a
  (** [await r] waits for the promise [r] to be resolved. Other asynchronous tasks
      may be performed in the meantime. If the task associated with the promise has
      completed successfully, then the result of the task will be returned. If the
      task has raised an exception, then [await] raises the same exception. *)
end
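
For concreteness, here is a rough usage sketch of the API above (untested; the Fibonacci example and the exact invocations are just illustrative):

(* An explicit workset fed by a dedicated pool; the calling domain
   participates in the work through [run_with_workset]. *)
let () =
  let ws = Workset.make () in
  let pool =
    Pool.spawn
      ~num_worker_domains:(Domain.recommended_domain_count () - 1)
      ~workset:ws
  in
  let rec fib n =
    if n < 2 then n
    else
      let a, b =
        Task.fork_join (fun () -> fib (n - 1)) (fun () -> fib (n - 2))
      in
      a + b
  in
  let result = Task.run_with_workset ws (fun () -> fib 30) in
  Printf.printf "fib 30 = %d\n" result;
  Pool.teardown pool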

I think this is moving in a brighter direction. Question: it seems impossible for Task to work with more than two anonymous Workset.t at the same time, and that for non-anonymous ones one would have to call run_with_workset to switch? Is this intentional? If not, perhaps some suggestions in #77 (e.g., using functors) could help?

I'm not sure what your distinction is between anonymous and non-anonymous worksets, but I realized that I forgot to expose a Workset.make function, which is probably related. It is added now. In my imagination, Task.run is a convenience helper function that creates a workset, starts a fresh domain pool on that workset, and then calls Task.run_with_workset.
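
Very roughly, I imagine something like the following (an untested sketch of [run], written as if inside the Task module; the default number of worker domains matches the documentation above):

let run ?(num_worker_domains = Domain.recommended_domain_count () - 1) task =
  let ws = Workset.make () in
  let pool = Pool.spawn ~num_worker_domains ~workset:ws in
  Fun.protect
    ~finally:(fun () -> Pool.teardown pool)
    (fun () -> run_with_workset ws task)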

I think it is possible to mix distinct worksets in the same computation by nesting run_with_workset handlers, but I would not spend too much effort thinking about this now because it's a corner use-case. (I thought of this when I realized that today async being parametrized by a pool lets you offload some sub-tasks specifically to a separate pool, and that this might be involved in the serialization/crypto user scenario of using different pools for different parts of the computation.)

Got it. (By "anonymous" I meant "worksets created internally by Task.run" and by "call run to switch" I meant exactly "nesting run_with_workset". Just fixed my previous comment to replace "run" with "run_with_workset".)

Is the distinction between workset and pool necessary? What's the usecase?

Relatedly, what would be the semantics of set_workset if the pool is already servicing a workset. Specifically, what happens to the jobs that are currently being serviced by a domain in the pool when a new workset is specified?

My intuition is that separating the two concepts makes them both simpler, easier to specify and reason about. Currently it is weird that a task runner may steal jobs from a domain pool when it is blocked waiting on something else, and I think it is the root cause of the confusion that is discussed in #77. With the separated design, worksets are just a concurrent data-structure storing jobs, while pools are just worker domains that repeatedly service jobs from a workset -- plus the ability to tear them down externally, which cannot be implemented with a Quit job anymore.
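
To illustrate the separation, here is a rough sketch of what the Pool side could look like under this model (untested; the atomic stop flag and the [recv_poll]/[cpu_relax] idle loop are my assumptions, not part of the proposal):

type t = {
  workers : unit Domain.t array;
  stop : bool Atomic.t;  (* set by [teardown] instead of sending a Quit job *)
}

let spawn ~num_worker_domains ~workset =
  if num_worker_domains < 0 then invalid_arg "Pool.spawn";
  let stop = Atomic.make false in
  let worker () =
    (* Repeatedly service jobs from the workset until told to stop. *)
    while not (Atomic.get stop) do
      match Workset.recv_poll workset with
      | Some job -> job ()
      | None -> Domain.cpu_relax ()
    done
  in
  { workers = Array.init num_worker_domains (fun _ -> Domain.spawn worker);
    stop }

let teardown pool =
  Atomic.set pool.stop true;
  Array.iter Domain.join pool.workers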

In the previous API, everyone must know about pools, but worksets are a completely internal detail of the pool implementation ... except they leak in the Task.run specification. With the proposed API, simple use-cases require knowledge of neither pools nor worksets: Task.run hides them both. Advanced users can use them if the need arises, but they are mostly there as support modules to implement Task runners.

Relatedly, what would be the semantics of set_workset if the pool is already servicing a workset. Specifically, what happens to the jobs that are currently being serviced by a domain in the pool when a new workset is specified?

This is a delicate part. Below I will explain my mental model (my best guesses as to what would be going on in your case), but it is likely to be wrong if we don't experiment with an actual prototype implementation.

Mental model:

  • There is no notion of "job of a workset currently running": as soon as a domain pool receives a job from a workset (to run it), it is not part of the workset anymore. (In particular, set_workset cannot interrupt a running job or anything like that.)
  • After the call to set_workset, the worker domains are not consuming jobs from the previous workset anymore, but other code running may still send new jobs to it. (In particular, maybe the worker domains are still running some jobs coming from a task runner, that will send more asynchronous jobs to the previous workset. The runner knows about the workset, not the pool.)
  • It may be that those jobs will sit idle without anyone performing them, but note that if the only users of worksets are task runners, then the task-running domains will keep consuming jobs from their respective worksets until they return a final result. (They may leave dangling jobs corresponding to asynchronous tasks whose promises were never forced, though.)

One advanced use-case involving set_workset would be a system that has a single global pool of worker domains, but also several independent task runners (maybe even running concurrently from several domains). For example, maybe the application has a parmap helper that is implemented with Task.run_with_workset plus some workset-synchronization logic. One possible logic is to share a single workset between all task runners, which gives the same behavior as the current implementation. But another one is to have a local workset for each task runner, and protect the "active workset" of the global pool with a lock: task runners take the lock, set their workset and wait for task completion before releasing the lock. This prevents the task runners from running concurrently, but it guarantees that each task runner will only ever service jobs coming from its own task, and never jobs coming from another runner, which might be important for locality reasons. Note that in this case, set_workset is only ever called when the current workset is "quiescent": no one is interested in sending more jobs or consuming its jobs anymore. (In fact those worksets become dead and would be garbage-collected.)
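
In code, the lock-protected variant could look roughly like this (untested; [global_pool], [pool_lock] and the shape of the [parmap] helper are assumptions made for the example):

let pool_lock = Mutex.create ()

let parmap global_pool f xs =
  let ws = Workset.make () in
  Mutex.lock pool_lock;
  Fun.protect ~finally:(fun () -> Mutex.unlock pool_lock) @@ fun () ->
  (* Point the shared pool at our local workset for the duration of the call. *)
  ignore (Pool.set_workset global_pool ws);
  Task.run_with_workset ws (fun () ->
    xs
    |> List.map (fun x -> Task.async (fun () -> f x))
    |> List.map Task.await)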

I'm not sure whether set_workset is important to have, it was my latest addition to the API and it is not necessary to express the simple, common use-case. I also considered other functions like Workset.is_currently_empty and Workset.clear and decided against adding them, at least until a use-case shows up.

Thinking about it, my own "advanced example" above is a bit silly, because instead of having independent worksets and a lock on set_workset usage one may just as well lock the usage of the current pool, which can already be done with the current API. set_workset remains important to express other things, in particular the ability to join two domain pools to work for the same task runner, but again do we care about this extra expressivity?

Thinking about it again: I've grown convinced by @kayceesrk's point that separating pools and worksets is probably API over-engineering. Below is a simplified API where pools have an explicit function to steal a job.

module Pool : sig
  (** A pool of worker domains that are dedicated to performing jobs
     received from a workset. It is only necessary to deal with Pool
     for semi-advanced Task scheduling policies, for example if you
     want to share the same domain pool between many short-running
     tasks. *)

  type t

  type job = unit -> unit

  val spawn : num_worker_domains:int -> t
  val get_num_worker_domains : t -> int

  val steal_job_poll : t -> job option
  (** [steal_job_poll p] steals a job from [p] if available.
      This function is non-blocking. *)

  val teardown : t -> unit
end
module Task : sig
  (** A "task" abstraction for determining sub-computations to be
      performed in parallel.

      Tasks are effectful programs that use the effectful operations
      provided by the Task module ([fork_join], [parallel_for],
      [async], [await], etc.). They must run in the dynamic scope of
      a handler, one of the [run_*] functions.  *)

  type 'a task = unit -> 'a
  (** Type of task *)

  (** {2. Handlers} *)

  val run : ?num_worker_domains:int -> 'a task -> 'a
  (** [run t] runs the task [t] synchronously, using a temporary pool of worker domains
      to offload asynchronous tasks in parallel.

      By default the number of worker domains created is
        [Domain.recommended_domain_count () - 1].
  *)

  val run_with_pool : Pool.t -> 'a task -> 'a
  (** [run_with_pool p t] is an advanced version of [run t] that does
      not spawn a temporary domain pool, but instead parks some
      asynchronous tasks in the given pool [p].
      
      During idle times (when waiting on an asynchronous task), the
      calling domain will itself perform jobs from [p] -- note that
      those jobs may be unrelated to the task, coming from other users
      of the pool. *)

  (** {2. Effectful operations -- must run in the dynamic scope of a handler.} *)

  val fork_join : 'a task -> 'b task -> 'a * 'b

  val fork_join_u : unit task -> unit task -> unit

  val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
                     (int -> unit) -> unit

  val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
                            ('a -> 'a -> 'a) -> 'a -> (int -> 'a) -> 'a

  val parallel_scan : ('a -> 'a -> 'a) -> 'a array -> 'a array

  val parallel_find : ?chunk_size:int -> start:int -> finish:int ->
                      (int -> 'a option) -> 'a option

  type !'a promise
  val async : 'a task -> 'a promise
  val await : 'a promise -> 'a
end
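
A rough usage sketch of this simplified API (untested): one long-lived pool shared by several short task invocations.

let () =
  let pool =
    Pool.spawn ~num_worker_domains:(Domain.recommended_domain_count () - 1)
  in
  let sum =
    Task.run_with_pool pool (fun () ->
      Task.parallel_for_reduce ~start:1 ~finish:1_000_000 ( + ) 0 (fun i -> i))
  in
  Printf.printf "sum = %d\n" sum;
  Pool.teardown pool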

I like the new API. I have another idea---can we drop the parallel_ prefix?

  • iter/for_: the current parallel_for
  • reduce: the current parallel_for_reduce
  • scan: the current parallel_scan
  • find: the current parallel_find (btw, this is semantically a special case of reduce, but potentially supports early termination?)

While we are on it, I wonder if it makes sense to also have a high-level interface on arrays?

  • init: parallel initialization
  • iter: like the low-level iter/for but takes an array
  • map: parallel map
  • reduce: like the low-level reduce but takes an array
  • scan: the same as the low-level scan
  • find: like the low-level find but takes an array
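
Sketched as a signature, such an array-level interface might look like this (the names just mirror the list above; only a suggestion):

module type Array_ops = sig
  val init : int -> (int -> 'a) -> 'a array
  val iter : ('a -> unit) -> 'a array -> unit
  val map : ('a -> 'b) -> 'a array -> 'b array
  val reduce : ('a -> 'a -> 'a) -> 'a -> 'a array -> 'a
  val scan : ('a -> 'a -> 'a) -> 'a array -> 'a array
  val find : ('a -> 'b option) -> 'a array -> 'b option
end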

The new API is better. More questions:

  1. Is the type job the same as a task? See steal_job_poll.
  2. Do we know that steal_job_poll is useful? Is this an API that we're adding without a real usecase?
  3. It is conceivable that the programmer may wish to add a task to a different already existing pool. The current API does not provide this.
    • Can we make pool an optional argument to fork_*, async, await, parallel_*? When it is explicitly provided, the "task" is serviced by workers on that pool. This causes a little bit of confusion about what the domain is doing when it does fork_* on a different pool: does it run tasks from its "current" pool or the target pool? I would think that, since the domain is waiting for the result to be returned, it should service the tasks on the target pool.
    • Alternatively, we may say that the only primitive that allows you to run a task on a different pool is async: ?pool:Pool.t -> 'a task -> 'a promise. If the user wishes to offload a parallel for loop, for example, they may wrap that in a thunk, async it to the target pool, and get a promise to the result? This is a cheap operation.
  4. The named pool concept is gone. We should check whether Tezos folks are using that. Without named pools, one needs to pass the state around to the functions that may wish to run a job on a different pool than the "current" one. This needs (3) above.

The distinction between job and task is intentional. job is any (unit -> unit) function that can be run on any OCaml domain, without making assumptions about the environment. task is a closure that may perform asynchronous-computation effects, and needs to be run inside a specific handler. The Task implementation never sends tasks to the workers as-is; it always packages them with effect-handling logic.
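
To make the distinction concrete, the packaging could look roughly like this (all names here are hypothetical, not part of the proposal):

(* [wrap_with_handler] re-installs the Task effect handler on the worker
   side, and [resolve] records the outcome in the task's promise; the
   result is a plain [unit -> unit] job. *)
let job_of_task
    (wrap_with_handler : (unit -> unit) -> (unit -> unit))
    (resolve : ('a, exn) result -> unit)
    (t : 'a task) : unit -> unit =
  wrap_with_handler (fun () ->
    match t () with
    | v -> resolve (Ok v)
    | exception e -> resolve (Error e))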

Do we know that steal_job_poll is useful? Is this an API that we're adding without a real usecase?

In my imagination, steal_job_poll is used to implement Task.run: when you await a promise which is not yet available, the Task handler will steal a job from the pool and run it. Having this function available influences the mental model of users and lets them understand that no, the pool does not "own" the current domain, but yes, the current domain may run jobs from the pool.
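
Roughly, the await loop could look like this (untested sketch; the promise representation, [is_resolved] and [get_result] are assumptions):

let rec await_loop pool promise =
  if is_resolved promise then get_result promise
  else begin
    (* Instead of blocking, service a pool job if one is available. *)
    (match Pool.steal_job_poll pool with
     | Some job -> job ()
     | None -> Domain.cpu_relax ());
    await_loop pool promise
  end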

It is conceivable that the programmer may wish to add a task to a different already existing pool.

In my imagination again, Task.run_with_pool handlers can be nested, so that sub-computations could in theory be sent to a different pool. In particular, if you have a specific crypto pool, you could define, say, let async_crypto f = Task.run_with_pool crypto_pool (Task.async f) and use this helper in your program.

If this idea of nesting handlers works (I would try it first before claiming that it does), I think it is nicer than making pools an optional argument of all task constructors, because it also gives the expressivity to conveniently run larger sub-tasks on a different pool.
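
Spelled out a bit more (untested, and assuming nesting behaves as hoped; [crypto_pool] and its size are hypothetical), the helper could look like this:

let crypto_pool = Pool.spawn ~num_worker_domains:2

(* Fork a task on the current handler whose body re-enters a handler over
   the crypto pool, so the actual work is serviced by the crypto workers. *)
let async_crypto f =
  Task.async (fun () -> Task.run_with_pool crypto_pool f)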

The named pool concept is gone.

If I was in charge of designing Domainslib (fortunately I'm not), I would not include named pools, which I think of as premature. It's very easy to do on the user side, and users are likely to do it better (safer) than what is currently proposed.

On the other hand: I used to be very confident that hiding Pool creation under a Task.run helper, with a more advanced Task.run_with_pool function, was the right default choice for a convenient API. I am less sure now that I realize (thanks to other, ahem, concurrent discussions) that the only robust design performance-wise is to have a single global pool with n-k domains and never create more. Maybe the current API's way of forcing users to think explicitly about pools is better. If I was to iterate on the proposal again I would probably rename Task.run_with_pool into Task.run, get rid of the current Task.run, and get into the business of making Pool.spawn convenient again (optional argument? etc.). (Maybe also a Pool.with_ helper that takes a callback and both creates and tears down the pool?)
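
For the record, such a helper would be a one-liner on top of the simplified Pool API (untested sketch):

let with_ ~num_worker_domains f =
  let pool = Pool.spawn ~num_worker_domains in
  Fun.protect ~finally:(fun () -> Pool.teardown pool) (fun () -> f pool)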

Hello!

I've spent some time in the past few weeks experimenting with trying to write highly optimized multicore OCaml code and thinking about some of these issues.

In particular, I've experimented with various work-stealing and work-sharing implementations in my par-ml project. It basically seems to achieve an order-of-magnitude overhead reduction compared to the current implementation of domainslib for parallel workloads.

I believe that the realization that

the only robust design performance-wise is to have a single global pool with n-k domains and never create more

mentioned by @gasche has a lot of merit.

My work on the Hopac library, based on ideas from Cilk and Concurrent ML, for F# many years ago was based on that same fundamental idea. .NET and Windows provide several APIs for programming with threads, thread pools, etc. Back when I was working on Hopac I came to the conclusion (after many experiments) that it was not possible to implement super efficient schedulers on top of those existing APIs. That is why Hopac runs its own worker threads and schedules work on those worker threads. This provides very good performance when only Hopac threads are running. Unfortunately most existing applications and libraries use the existing .NET / Windows APIs or use their own worker threads, which then breaks the ideal one-worker-thread-per-core model.

I believe that we should try to come up with ways for asynchronous and parallel programming libraries to co-exist and co-operate profitably with minimal loss of performance so that applications do not have to unnecessarily choose between competing library ecosystems.

For example, an application might use Eio (ping @talex5) for asynchronous programming and use some other library to run CPU intensive work in parallel. A single application might even use multiple different libraries for asynchronous and parallel programming.

To address that goal I have put up a (work-in-progress) "proposal" for a library for the co-operative allocation of domains, the domains library.

Please note that domains is a work in progress and is mostly meant as a starting point for investigating the basic idea (co-operative allocation of domains). Ultimately similar functionality might be part of the standard Domain module. Any feedback (critique) on the idea and the (work-in-progress) API is welcome!

I have experimented with the domains library in my par-ml library. The experiment is currently in the dcyl-work-stealing branch. With the current implementation (of domains and par-ml) the overhead is just slightly higher than without the domains layer (but still an order of magnitude smaller than with domainslib). I'm positive that the added overhead can be made smaller.