sketch proposal / brainstorming for the Domainslib API
gasche opened this issue · 11 comments
In #77 we discussed the API of domainslib, and the fact that one could consider several changes, small or large. On my commute back today I tried to "brainstorm" on what another API for domainslib could look like -- same capabilities as today, but with a clearer separation of concern between the various pieces of the implementation. The result is included below for discussion.
module Workset : sig
(* This lower-level module represents a "job pool" where jobs can be
added or retrieved. It is used internally in the implementation
of domain pools, and also in the Task runner
(asynchronous actions are sent to the workset, and tasks are
retrieved from the workset while waiting on a promise). This
intermediate abstraction gives expressivity for more complex
scheduling scenarios, for example it is possible to have two
domain pools share their workset to "union" them. *)
type job = (unit -> unit)
type t (* = job Multi_channel.t *)
(** A workset is a multi-producer multi-consumer unordered queue
of jobs to be performed. *)
val make : unit -> t
val send : t -> job -> unit
val recv : t -> job
val recv_poll : t -> job option (** non-blocking *)
end
module Pool : sig
(** A pool of worker domains that are dedicated to performing jobs
received from a workset. It is only necessary to deal with Pool
for semi-advanced Task scheduling policies, for example if you
want to share the same domain pool between many short-running
tasks. *)
type t
(** Type of domain pool. *)
val spawn : num_worker_domains:int -> workset:Workset.t -> t
(** Sets up a task execution pool with [num_worker_domains] worker domains,
that immediately start pulling jobs from the given [workset] and
performing them, continuing until [teardown] is called.
When [num_worker_domains] is 0, the new pool will have no domains
and not perform any work.
Raises {!Invalid_argument} when [num_worker_domains] is less than 0. *)
val get_num_worker_domains : t -> int
val get_workset : t -> Workset.t
val set_workset : t -> Workset.t -> unit
val teardown : t -> unit
(** Tears down the task execution pool. Each worker will finish
ongoing job, but jobs still in the workset after that will
remain there. *)
end
module Task : sig
(** A "task" abstraction for determining sub-computations to be
performed in parallel.
Tasks are effectful programs that use the effectful operations
provided by the Task module ([fork_join], [parallel_for],
[async], [await], etc.). They must run in the dynamic scope of
a handler, one of the [run_*] functions. *)
type 'a task = unit -> 'a
(** Type of task *)
(** {2 Handlers} *)
val run : ?num_worker_domains:int -> 'a task -> 'a
(** [run t] runs the task [t] synchronously, using a temporary pool of worker domains
to offload asynchronous tasks in parallel.
By default the number of worker domains created is
[Domain.recommended_domain_count () - 1].
*)
val run_with_workset : Workset.t -> 'a task -> 'a
(** [run_with_workset ws t] is an advanced version of [run t] that
does not spawn a temporary domain pool, but instead parks some
asynchronous tasks in the workset [ws] and performs tasks from
[ws] during idle times. This gives maximum flexibility to offload
computation. Note that the calling domain may run jobs from the
workset that are unrelated to the task.
This advanced handler is especially useful in the advanced
use-case of nesting several task handlers. For example, the main
handler may be invoked with [run], but locally some sub-part of
the computation runs under a nested [run_with_workset] invocation
to be offloaded to a different workset feeding a dedicated domain
pool.
*)
(** {2 Effectful operations -- must run in the dynamic scope of a handler.} *)
val fork_join : 'a task -> 'b task -> 'a * 'b
val fork_join_u : unit task -> unit task -> unit
val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
(int -> unit) -> unit
val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
('a -> 'a -> 'a) -> 'a -> (int -> 'a) -> 'a
val parallel_scan : ('a -> 'a -> 'a) -> 'a array -> 'a array
val parallel_find : ?chunk_size:int -> start:int -> finish:int ->
(int -> 'a option) -> 'a option
type !'a promise
(** Type of promises *)
val async : 'a task -> 'a promise
(** [async t] runs the task [t] asynchronously. The function returns
a promise [r] in which the result of the task [t] will be
stored. *)
val await : 'a promise -> 'a
(** [await r] waits for the promise [r] to be resolved. Other asynchronous tasks
may be performed in the meantime. If the task associated with the promise has
completed successfully, then the result of the task will be returned. If the
task has raised an exception, then [await] raises the same exception. *)
end
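To make the Workset contract concrete, here is a minimal sketch, assuming a single lock-protected FIFO built from Mutex, Condition and Queue of the OCaml 5 standard library, rather than the Multi_channel mentioned in the signature. It only illustrates the blocking recv versus the non-blocking recv_poll; it is not how domainslib implements its queues.

```ocaml
module Workset : sig
  type job = unit -> unit
  type t
  val make : unit -> t
  val send : t -> job -> unit
  val recv : t -> job            (* blocks until a job is available *)
  val recv_poll : t -> job option (* never blocks *)
end = struct
  type job = unit -> unit

  (* Assumption: one mutex-protected queue; a real workset would use a
     scalable multi-producer multi-consumer structure. *)
  type t = { lock : Mutex.t; nonempty : Condition.t; jobs : job Queue.t }

  let make () =
    { lock = Mutex.create (); nonempty = Condition.create (); jobs = Queue.create () }

  let send t j =
    Mutex.lock t.lock;
    Queue.push j t.jobs;
    Condition.signal t.nonempty; (* wake one blocked [recv], if any *)
    Mutex.unlock t.lock

  let recv t =
    Mutex.lock t.lock;
    while Queue.is_empty t.jobs do
      Condition.wait t.nonempty t.lock
    done;
    let j = Queue.pop t.jobs in
    Mutex.unlock t.lock;
    j

  let recv_poll t =
    Mutex.lock t.lock;
    let j = Queue.take_opt t.jobs in
    Mutex.unlock t.lock;
    j
end
```

Since a workset is just a value, "unioning" two domain pools amounts to handing the same Workset.t to both of them.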
I think this is moving in a brighter direction. Question: it seems impossible for Task to work with more than two anonymous Workset.t at the same time, and that for non-anonymous ones one would have to call run_with_workset to switch? Is it intentional? If not, perhaps some suggestions in #77 (e.g., using functors) could help?
I'm not sure what your distinction is between anonymous and non-anonymous worksets, but I realized that I forgot to expose a Workset.make function, which is probably related. It is added now. In my imagination, Task.run is a convenience helper function that creates a workset, starts a fresh domain pool on that workset, and then calls Task.run_with_workset.
I think it is possible to mix distinct worksets in the same computation by nesting run_with_workset handlers, but I would not spend too much effort thinking about this now because it's a corner use-case. (I thought of this when I realized that today async being parametrized by a pool lets you offload some sub-tasks specifically to a separate pool, and that this might be involved in the serialization/crypto user scenario of using different pools for different parts of the computation.)
Got it. (By "anonymous" I meant "worksets created internally by Task.run" and by "call run to switch" I meant exactly "nesting run_with_workset". Just fixed my previous comment to replace "run" with "run_with_workset".)
Is the distinction between workset and pool necessary? What's the usecase?
Relatedly, what would be the semantics of set_workset if the pool is already servicing a workset? Specifically, what happens to the jobs that are currently being serviced by a domain in the pool when a new workset is specified?
My intuition is that separating the two concepts makes them both simpler, easier to specify and reason about. Currently it is weird that a task runner may steal jobs from a domain pool when it is blocked waiting on something else, and I think it is the root cause of the confusion that is discussed in #77. With the separated design, worksets are just a concurrent data-structure storing jobs, while pools are just worker domains that repeatedly service jobs from a workset -- plus the ability to tear them down externally, which cannot be implemented with a Quit job anymore.
In the previous API, everyone must know about pools, but worksets are a completely internal detail of the pool implementation ... except that they leak into the Task.run specification. With the proposed API, simple use-cases require knowledge of neither pools nor worksets: Task.run hides them both. Advanced users can use them if the need arises, but they are mostly there as support modules to implement task runners.
Relatedly, what would be the semantics of set_workset if the pool is already servicing a workset? Specifically, what happens to the jobs that are currently being serviced by a domain in the pool when a new workset is specified?
This is a delicate part. Below I will explain my mental model (my best guesses as to what would be going on in your case), but it is likely to be wrong if we don't experiment with an actual prototype implementation.
Mental model:
- There is no notion of "job of a workset currently running": as soon as a domain pool receives a job from a workset (to run it), it is not part of the workset anymore. (In particular, set_workset cannot interrupt a running job or anything like that.)
- After the call to set_workset, the worker domains are not consuming jobs from the previous workset anymore, but other running code may still send new jobs to it. (In particular, maybe the worker domains are still running some jobs coming from a task runner, that will send more asynchronous jobs to the previous workset. The runner knows about the workset, not the pool.)
- It may be that those jobs will sit idle without anyone performing them, but note that if the only users of worksets are task runners, then the task-running domains will keep consuming jobs from their respective worksets until they return a final result. (They may leave dangling jobs corresponding to asynchronous tasks whose promises were never forced, though.)
One advanced use-case involving set_workset would be a system with a single global pool of worker domains shared by several independent task runners (maybe even running concurrently from several domains). For example, maybe the application has a parmap helper that is implemented with Task.run_with_workset plus some workset-synchronization logic. One possible logic is to share a single workset between all task runners, which gives the same behavior as the current implementation. But another one is to have a local workset for each task runner, and protect the "active workset" of the global pool with a lock: task runners take the lock, set their workset and wait for task completion before releasing the lock. This prevents the task runners from running concurrently, but it guarantees that each task runner will only ever service jobs coming from its own task, and never jobs coming from another runner, which might be important for locality reasons. Note that in this case, set_workset is only ever called when the current workset is "quiescent": no one is interested in sending more jobs to it or consuming its jobs anymore. (In fact those worksets become dead and would be garbage-collected.)
I'm not sure whether set_workset is important to have; it was my latest addition to the API and it is not necessary to express the simple, common use-case. I also considered other functions like Workset.is_currently_empty and Workset.clear and decided against adding them, at least until a use-case shows up.
Thinking about it, my own "advanced example" above is a bit silly, because instead of having independent worksets and a lock on set_workset usage, one may just as well lock the usage of the current pool, which can already be done with the current API. set_workset remains important to express other things, in particular the ability to join two domain pools to work for the same task runner, but again, do we care about this extra expressivity?
Thinking about it again: I've grown convinced by @kayceesrk's point that separating pools and worksets is probably API over-engineering. Below is a simplified API where pools have an explicit function to steal a job.
module Pool : sig
(** A pool of worker domains that are dedicated to performing jobs
submitted to the pool. It is only necessary to deal with Pool
for semi-advanced Task scheduling policies, for example if you
want to share the same domain pool between many short-running
tasks. *)
type t
type job = unit -> unit
val spawn : num_worker_domains:int -> t
val get_num_worker_domains : t -> int
val steal_job_poll : t -> job option
(** [steal_job_poll p] steals a job of [p] if available.
This function is non-blocking. *)
val teardown : t -> unit
end
module Task : sig
(** A "task" abstraction for determining sub-computations to be
performed in parallel.
Tasks are effectful programs that use the effectful operations
provided by the Task module ([fork_join], [parallel_for],
[async], [await], etc.). They must run in the dynamic scope of
a handler, one of the [run_*] functions. *)
type 'a task = unit -> 'a
(** Type of task *)
(** {2 Handlers} *)
val run : ?num_worker_domains:int -> 'a task -> 'a
(** [run t] runs the task [t] synchronously, using a temporary pool of worker domains
to offload asynchronous tasks in parallel.
By default the number of worker domains created is
[Domain.recommended_domain_count () - 1].
*)
val run_with_pool : Pool.t -> 'a task -> 'a
(** [run_with_pool p t] is an advanced version of [run t] that does
not spawn a temporary domain pool, but instead parks some
asynchronous tasks in the given pool [p].
During idle times (when waiting on an asynchronous task), the
calling domain will itself perform jobs from [p] -- note that
those jobs may be unrelated to the task, coming from other users
of the pool. *)
(** {2 Effectful operations -- must run in the dynamic scope of a handler.} *)
val fork_join : 'a task -> 'b task -> 'a * 'b
val fork_join_u : unit task -> unit task -> unit
val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
(int -> unit) -> unit
val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
('a -> 'a -> 'a) -> 'a -> (int -> 'a) -> 'a
val parallel_scan : ('a -> 'a -> 'a) -> 'a array -> 'a array
val parallel_find : ?chunk_size:int -> start:int -> finish:int ->
(int -> 'a option) -> 'a option
type !'a promise
val async : 'a task -> 'a promise
val await : 'a promise -> 'a
end
I like the new API. I have another idea: can we drop the parallel_ prefix?
- iter/for_: the current parallel_for
- reduce: the current parallel_for_reduce
- scan: the current parallel_scan
- find: the current parallel_find (btw, this is semantically a special case of reduce, but potentially supports early termination?)
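The remark that find is a special case of reduce can be checked with a small sequential sketch. Here reduce is a hypothetical stand-in for the proposed reduce (today's parallel_for_reduce), assuming an inclusive finish bound as in domainslib. The combiner that keeps the leftmost Some is associative with neutral element None, so it would parallelize like any reduction, but it still visits every index, whereas a dedicated find can stop at the first hit.

```ocaml
(* Hypothetical sequential stand-in for [reduce]: folds [f start] .. [f finish]
   (inclusive) with an associative [combine] and neutral element [init]. *)
let reduce ~start ~finish combine init f =
  let acc = ref init in
  for i = start to finish do
    acc := combine !acc (f i)
  done;
  !acc

(* [find] as a reduction: keep the leftmost [Some]; every index is visited. *)
let find_via_reduce ~start ~finish f =
  reduce ~start ~finish (fun a b -> match a with Some _ -> a | None -> b) None f

(* A dedicated [find] can terminate as soon as [f] returns [Some]. *)
let find_early ~start ~finish f =
  let rec go i =
    if i > finish then None
    else match f i with Some _ as r -> r | None -> go (i + 1)
  in
  go start
```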
While we are on it, I wonder if it makes sense to also have a high-level interface on arrays?
- init: parallel initialization
- iter: like the low-level iter/for but takes an array
- map: parallel map
- reduce: like the low-level reduce but takes an array
- scan: the same as the low-level scan
- find: like the low-level find but takes an array
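As a rough sketch of how such array-level helpers could sit on top of the low-level loop, assume a hypothetical for_ that splits the inclusive range into chunks and spawns one domain per chunk; a real version would reuse the pool instead of spawning domains on every call. Only init and map are shown.

```ocaml
(* Hypothetical stand-in for the low-level [for_]: runs [body i] for every i in
   [start, finish] (inclusive), one freshly spawned domain per chunk. *)
let for_ ?(num_domains = 4) ~start ~finish body =
  let n = finish - start + 1 in
  if n > 0 then begin
    let chunk = (n + num_domains - 1) / num_domains in
    List.init num_domains (fun d ->
        let lo = start + (d * chunk) in
        let hi = min finish (lo + chunk - 1) in
        Domain.spawn (fun () -> for i = lo to hi do body i done))
    |> List.iter Domain.join
  end

(* Array-level helpers built on [for_]. Each index is written by exactly one
   domain, and [Domain.join] makes those writes visible to the caller. *)
let init n f =
  if n = 0 then [||]
  else begin
    let res = Array.make n (f 0) in
    for_ ~start:1 ~finish:(n - 1) (fun i -> res.(i) <- f i);
    res
  end

let map f a = init (Array.length a) (fun i -> f a.(i))
```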
The new API is better. More questions:
1. Is the type job the same as a task? See steal_job_poll.
2. Do we know that steal_job_poll is useful? Is this an API that we're adding without a real usecase?
3. It is conceivable that the programmer may wish to add a task to a different already existing pool. The current API does not provide this.
   - Can we make pool an optional argument to fork_*, async, await, parallel_*? When it is explicitly provided, the "task" is serviced by workers on that pool. This causes a little bit of confusion about what the domain is doing when it does fork_* on a different pool: does it run tasks from its "current" pool or from the target pool? I would think that, since the domain is waiting for the result to be returned, it should service the tasks on the target pool.
   - Alternatively, we may say that the only primitive that allows you to run a task on a different pool is async : ?pool:Pool.t -> 'a task -> 'a promise. If the user wishes to offload a parallel for loop, for example, they may wrap it in a thunk, async it to the target pool and get a promise for the result? This is a cheap operation.
4. The named pool concept is gone. We should check whether Tezos folks are using that. Without named pools, one needs to pass the state around to the functions that may wish to run a job on a different pool than the "current" one. This needs (3) above.
The distinction between job and task is intentional. job is any (unit -> unit) function that can be run on any OCaml domain, without making assumptions about the environment. task is a closure that may perform asynchronous-computation effects, and needs to be run inside a specific handler. The Task implementation never sends tasks to the workers as-is; it always packages them with effect-handling logic.
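The job/task distinction can be illustrated by packaging a task into a plain job. Everything here is hypothetical: a single Offload effect stands in for the Task effects, offloaded jobs are pushed onto a local Queue instead of a pool, and a promise is an atomic option holding the outcome. The resulting job can run on any domain because it carries its own handler, whereas performing offload outside such a handler raises Effect.Unhandled.

```ocaml
type job = unit -> unit
type 'a task = unit -> 'a
type 'a promise = ('a, exn) result option Atomic.t

(* Hypothetical effect a task may perform to offload a sub-job. *)
type _ Effect.t += Offload : job -> unit Effect.t

let offload j = Effect.perform (Offload j)

(* Package a task as a job: install a handler for [Offload] (pushing offloaded
   jobs onto [queue]) and store the task's outcome in [promise]. *)
let job_of_task (queue : job Queue.t) (t : 'a task) (promise : 'a promise) : job =
 fun () ->
  let outcome =
    Effect.Deep.try_with
      (fun () -> match t () with v -> Ok v | exception e -> Error e)
      ()
      { Effect.Deep.effc =
          (fun (type c) (eff : c Effect.t) ->
            match eff with
            | Offload j ->
                Some
                  (fun (k : (c, _) Effect.Deep.continuation) ->
                    Queue.push j queue;
                    Effect.Deep.continue k ())
            | _ -> None) }
  in
  Atomic.set promise (Some outcome)
```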
Do we know that steal_job_poll is useful? Is this an API that we're adding without a real usecase?
In my imagination, steal_job_poll is used to implement Task.run: when you await a promise which is not yet available, the Task handler will steal a job from the pool and run it. Having this function available influences the mental model of users and lets them understand that no, the pool does not "own" the current domain, but yes, the current domain may run jobs from the pool.
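A sketch of that idea, assuming a pool reduced to a mutex-protected job queue: submit is a hypothetical helper (the proposed Pool signature does not expose one), async wraps a function into a job that fills a promise, and await keeps stealing and running jobs with steal_job_poll until its own promise is resolved.

```ocaml
type job = unit -> unit
type 'a promise = ('a, exn) result option Atomic.t

(* Minimal stand-in for a pool: only its shared job queue is modelled. *)
type pool = { lock : Mutex.t; jobs : job Queue.t }

let make_pool () = { lock = Mutex.create (); jobs = Queue.create () }

(* Hypothetical: the proposed API has no public submission function. *)
let submit p j =
  Mutex.lock p.lock;
  Queue.push j p.jobs;
  Mutex.unlock p.lock

let steal_job_poll p =
  Mutex.lock p.lock;
  let j = Queue.take_opt p.jobs in
  Mutex.unlock p.lock;
  j

let async p (f : unit -> 'a) : 'a promise =
  let pr = Atomic.make None in
  submit p (fun () ->
      let r = match f () with v -> Ok v | exception e -> Error e in
      Atomic.set pr (Some r));
  pr

(* While the promise is unresolved, the awaiting domain helps the pool by
   running any job it can steal, possibly one unrelated to this promise. *)
let rec await p (pr : 'a promise) : 'a =
  match Atomic.get pr with
  | Some (Ok v) -> v
  | Some (Error e) -> raise e
  | None ->
      (match steal_job_poll p with
       | Some j -> j ()
       | None -> Domain.cpu_relax ());
      await p pr
```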
It is conceivable that the programmer may wish to add a task to a different already existing pool.
In my imagination again, Task.run_with_pool handlers can be nested, so that sub-computations could in theory be sent to a different pool. In particular, if you have a specific crypto pool, you could define, say, let async_crypto f = Task.run_with_pool crypto_pool (fun () -> Task.async f) and use this helper in your program.
If this idea of nesting handlers works (I would try it first before claiming that it does), I think it is nicer than making pools an optional argument of all task constructors, because it also gives the expressivity to conveniently run larger sub-tasks on a different pool.
The named pool concept is gone.
If I was in charge of designing Domainslib (fortunately I'm not), I would not include named pools, which I think of as premature. It's very easy to do on the user side, and users are likely to do it better (safer) than what is currently proposed.
On the other hand: I used to be very confident that hiding Pool creation under a Task.run helper, with a more advanced Task.run_with_pool function, was the right default choice for a convenient API. I am less sure now that I realize (thanks to other, ahem, concurrent discussions) that the only robust design performance-wise is to have a single global pool with n-k domains and never create more. Maybe the current API's way of forcing users to think explicitly about pools is better. If I were to iterate on the proposal again, I would probably rename Task.run_with_pool into Task.run, get rid of the current Task.run, and get into the business of making Pool.spawn convenient again (optional argument? etc.). (Maybe also a Pool.with_ helper that takes a callback and both creates and tears down the pool?)
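One possible shape for such a Pool.with_ helper, sketched with Fun.protect so that the pool is torn down even if the callback raises. The pool itself is a placeholder whose workers just spin until an atomic stop flag is set, and the default of Domain.recommended_domain_count () - 1 worker domains mirrors the Task.run documentation in the first proposal.

```ocaml
(* Placeholder pool: workers do no real work and spin until [stop] is set. *)
type pool = { stop : bool Atomic.t; domains : unit Domain.t list }

let spawn ~num_worker_domains =
  if num_worker_domains < 0 then invalid_arg "Pool.spawn: negative domain count";
  let stop = Atomic.make false in
  let worker () = while not (Atomic.get stop) do Domain.cpu_relax () done in
  { stop; domains = List.init num_worker_domains (fun _ -> Domain.spawn worker) }

let teardown p =
  Atomic.set p.stop true;
  List.iter Domain.join p.domains

(* Hypothetical [with_]: create a pool, pass it to [f], and always tear it
   down, whether [f] returns normally or raises. *)
let with_ ?(num_worker_domains = Domain.recommended_domain_count () - 1) f =
  let p = spawn ~num_worker_domains in
  Fun.protect ~finally:(fun () -> teardown p) (fun () -> f p)
```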
Hello!
I've spent some time in the past few weeks experimenting with trying to write highly optimized multicore OCaml code and thinking about some of these issues.
In particular, I've experimented with various work-stealing and work-sharing implementations in my par-ml project. It basically seems to achieve an order-of-magnitude overhead reduction compared to the current implementation of domainslib for parallel workloads.
I believe that the realization that "the only robust design performance-wise is to have a single global pool with n-k domains and never create more", mentioned by @gasche, has a lot of merit.
My work on the Hopac library, based on ideas from Cilk and Concurrent ML, for F# many years ago was based on that same fundamental idea. .NET and Windows provide several APIs for programming with threads, thread pools, etc. Back when I was working on Hopac I came to the conclusion (after many experiments) that it was not possible to implement super efficient schedulers on top of those existing APIs. That is why Hopac runs its own worker threads and schedules work on those worker threads. This provides very good performance when only Hopac threads are running. Unfortunately most existing applications and libraries use the existing .NET / Windows APIs or use their own worker threads, which then breaks the ideal one worker per thread model.
I believe that we should try to come up with ways for asynchronous and parallel programming libraries to co-exist and co-operate profitably with minimal loss of performance so that applications do not have to unnecessarily choose between competing library ecosystems.
For example, an application might use Eio (ping @talex5) for asynchronous programming and use some other library to run CPU intensive work in parallel. A single application might even use multiple different libraries for asynchronous and parallel programming.
To address that goal I have put up a (work-in-progress) "proposal" for a library for the co-operative allocation of domains: the domains library.
Please note that domains is work-in-progress and is mostly meant as a starting point for investigating the basic idea (co-operative allocation of domains). Ultimately similar functionality might be part of the standard Domain module. Any feedback (critique) on the idea and the (work-in-progress) API is welcome!
I have experimented with the domains library in my par-ml library. The experiment is currently in the dcyl-work-stealing branch. With the current implementation (of domains and par-ml) the overhead is just slightly higher than without the domains layer (but still an order of magnitude smaller than with domainslib). I'm positive that the added overhead can be made smaller.