Rework timely layers
frankmcsherry opened this issue · 1 comments
This is a proposal for a different layering of the timely dataflow, intended to tease apart different levels of functionality and allow us to compartmentalize some aspects a bit better (e.g. progress tracking).
It seems like we could have a layering that includes the following three layers:
- Communication fabric.

This is essentially the current `timely::communication` module, except that rather than baking in "what to do with messages" we allow the user to specify this as part of receiving the messages. I'm thinking of a trait replacing `Allocate` that looks something like:

    trait Fabric {
        fn index(&self) -> usize;
        fn peers(&self) -> usize;

        // This may have a hard time being generic...
        type Message;

        /// Analogous to `allocate`.
        fn channel(&mut self) -> (Vec<Box<Push<Self::Message>>>, Box<Pull<Self::Message>>);

        /// Analogous to `pre_work`, this responds to each received message.
        fn receive<F: Fn(Header, Self::Message)>(&mut self, action: F);
    }

The idea is that the work currently done by the various communicators, putting received messages into per-channel queues, gets handled in `receive` instead (or even avoiding that step if at all possible; you could in principle put `map` and `filter` operators right in `receive`).

One downside of the above is that the associated type `Message` can't easily be generic, which may mean that we need to use `Box<Any>` to transit typed allocations (or roll our own variant). This is not currently required, but if we want the `receive` logic to notice the arrival of each extra-worker message, we need to do something here. We could have two lines of communication: typed channels and a "please schedule me" message, but we could also start here and treat the other as an optimization. This choice also removes some opportunistic concurrency from the processing (a worker would only see those messages that had been received by the time `receive` was last called, and not any received while it is doing work).

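To make the `receive` idea a bit more concrete, here is a minimal single-worker sketch. Everything in it is an assumption for illustration and not existing timely API: the `LoopbackFabric` and `Pusher` types, the `channel` field on `Header`, the simplified `channel` return type, and the use of `FnMut` rather than `Fn` so that the callback can accumulate state. The payload is a `Box<dyn Any>`, standing in for the `Box<Any>` option mentioned above.

```rust
use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical header: records which channel a message belongs to.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Header {
    pub channel: usize,
}

/// Type-erased payload, standing in for the `Box<Any>` idea.
pub type Message = Box<dyn Any>;

type Queue = Rc<RefCell<VecDeque<(Header, Message)>>>;

/// A single-worker, in-process stand-in for a fabric (hypothetical).
pub struct LoopbackFabric {
    next_channel: usize,
    queue: Queue,
}

/// Push endpoint that tags each message with its channel.
pub struct Pusher {
    header: Header,
    queue: Queue,
}

impl Pusher {
    pub fn push<T: Any>(&self, value: T) {
        let boxed: Message = Box::new(value);
        self.queue.borrow_mut().push_back((self.header, boxed));
    }
}

impl LoopbackFabric {
    pub fn new() -> Self {
        LoopbackFabric { next_channel: 0, queue: Rc::new(RefCell::new(VecDeque::new())) }
    }

    pub fn index(&self) -> usize { 0 }
    pub fn peers(&self) -> usize { 1 }

    /// Allocates a new channel and returns its push endpoint.
    pub fn channel(&mut self) -> Pusher {
        let header = Header { channel: self.next_channel };
        self.next_channel += 1;
        Pusher { header, queue: Rc::clone(&self.queue) }
    }

    /// Applies `action` to each message that had arrived when the call began.
    /// Messages pushed while `action` runs wait for the next call.
    pub fn receive<F: FnMut(Header, Message)>(&mut self, mut action: F) {
        let pending: Vec<_> = self.queue.borrow_mut().drain(..).collect();
        for (header, message) in pending {
            action(header, message);
        }
    }
}

/// Usage: sums the `u64` payloads, filtering out other types inside `receive`.
pub fn sum_u64_messages() -> u64 {
    let mut fabric = LoopbackFabric::new();
    let numbers = fabric.channel();
    let text = fabric.channel();
    numbers.push(3u64);
    text.push(String::from("ignored"));
    numbers.push(4u64);

    let mut total = 0;
    fabric.receive(|_header, message| {
        if let Ok(value) = message.downcast::<u64>() {
            total += *value;
        }
    });
    total
}
```

Draining the queue before running the callback is what gives the "only sees messages received by the time `receive` was called" behavior, and the `downcast` in the callback is the cost of not having a generic `Message`.
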
- Tasklet scheduling.

The above has no clue about the relationship between messages and what you might do with them. This next layer would introduce the idea of a "process" and its "threads", for lack of better terms (things that are currently "dataflow" and "scopes/operators" respectively). Actually, it seems that "fiber" is perhaps the better term. In particular, this layer understands "addresses" in the form `&[usize]`, indicating a path along nested tasklets to something that needs to run (again, nested scopes / operators in a dataflow).

The main feature this layer adds is event-based scheduling. I'm imagining that what is currently `Worker` would have a new method

    fn activate(&mut self, address: &[usize]);

and that what is currently the `Operate` trait will have its methods `push_external_progress` and `pull_internal_progress` replaced by

    fn schedule(&mut self, activations: &mut Activations) -> bool;

where `Activations` is a tree-structured representation of which children need processing. I'm thinking something like:

    /// Tree-shaped requested activations.
    struct Activations {
        activated: Vec<usize>,
        children: Vec<Activations>,
    }

where the `activated` member indicates which of `children` need to be scheduled. This could all change if we figure out that scheduling a tasklet needs more information about what to do (right now all are scheduled, with no information).

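As a rough sketch of how an `&[usize]` address could be folded into the tree-shaped `Activations`, something like the following might work. The `activate` and `drain_paths` methods, and the choice to grow the tree on demand, are assumptions made for illustration; the proposal only fixes the two fields.

```rust
/// Tree-shaped requested activations, with the two fields proposed above.
#[derive(Debug, Default)]
pub struct Activations {
    /// Indices into `children` that need to be scheduled.
    pub activated: Vec<usize>,
    pub children: Vec<Activations>,
}

impl Activations {
    /// Marks every node along `address` as activated, growing the tree as needed.
    /// (Hypothetical helper, not part of the proposal.)
    pub fn activate(&mut self, address: &[usize]) {
        if let Some((&first, rest)) = address.split_first() {
            if self.children.len() <= first {
                self.children.resize_with(first + 1, Activations::default);
            }
            if !self.activated.contains(&first) {
                self.activated.push(first);
            }
            self.children[first].activate(rest);
        }
    }

    /// Clears the tree and returns the activated addresses that have no
    /// activated descendants, in depth-first order. (Hypothetical helper.)
    pub fn drain_paths(&mut self) -> Vec<Vec<usize>> {
        let mut paths = Vec::new();
        let mut prefix = Vec::new();
        self.drain_into(&mut prefix, &mut paths);
        paths
    }

    fn drain_into(&mut self, prefix: &mut Vec<usize>, paths: &mut Vec<Vec<usize>>) {
        let activated = std::mem::take(&mut self.activated);
        if activated.is_empty() && !prefix.is_empty() {
            paths.push(prefix.clone());
        }
        for index in activated {
            prefix.push(index);
            self.children[index].drain_into(prefix, paths);
            prefix.pop();
        }
    }
}
```

Note that this representation cannot tell "schedule child 0 itself" apart from "schedule something beneath child 0", which matches the remark above that all activated tasklets are scheduled with no further information.
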
- Timely dataflow (above + progress)

The above has no information about dataflow structure or progress information. I'm a bit torn about whether we should have a pure dataflow layer, and put progress on top of that, or have a fused layer that does both of them (certain concepts, like "termination", only result from progress tracking).
In any case, we would want to layer dataflow structure and progress tracking on top of the tasklets above. Whereas this was previously part of the scheduling (i.e. calls to `pull_internal_progress`), it is now on the side, with the intent being that a scope shares some `Rc<RefCell<ChangeBatch<T>>>` with its managed children, and atomically pulls this information out at various moments.

Previously this happened as part of scheduling because there was (and still is, I think) no way that progress information could manifest without scheduling the operator, and no reason not to harvest progress information once reported. The decoupling probably isn't mandatory, and is proposed mainly to help clarify the specific obligations surrounding progress tracking (it can be collected and updated independent of operator scheduling, even if we don't have a great reason to do that).

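A small sketch of what "progress on the side" could look like follows. It assumes a deliberately simplified stand-in for `ChangeBatch<T>` (the real timely type is richer), plus hypothetical `Operator` and `Scope` types with `u64` timestamps; none of this is the actual timely implementation.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

/// Simplified stand-in for timely's `ChangeBatch<T>`: a list of (time, delta) updates.
pub struct ChangeBatch<T> {
    updates: Vec<(T, i64)>,
}

impl<T: Ord> ChangeBatch<T> {
    pub fn new() -> Self {
        ChangeBatch { updates: Vec::new() }
    }

    pub fn update(&mut self, time: T, delta: i64) {
        self.updates.push((time, delta));
    }

    /// Removes all updates at once, summed per time, dropping zero net changes.
    pub fn drain_consolidated(&mut self) -> Vec<(T, i64)> {
        let mut totals = BTreeMap::new();
        for (time, delta) in self.updates.drain(..) {
            *totals.entry(time).or_insert(0) += delta;
        }
        totals.into_iter().filter(|(_, delta)| *delta != 0).collect()
    }
}

pub type SharedProgress<T> = Rc<RefCell<ChangeBatch<T>>>;

/// Hypothetical operator: scheduling it reports progress only as a side effect.
pub struct Operator {
    progress: SharedProgress<u64>,
}

impl Operator {
    /// Returns whether more work remains; progress goes to the shared batch.
    pub fn schedule(&mut self) -> bool {
        let mut progress = self.progress.borrow_mut();
        progress.update(0, -1); // pretend to release a capability at time 0
        progress.update(1, 1); // and to acquire one at time 1
        false
    }
}

/// Hypothetical scope holding one shared batch per managed child.
pub struct Scope {
    children: Vec<Operator>,
    progress: Vec<SharedProgress<u64>>,
}

impl Scope {
    pub fn new() -> Self {
        Scope { children: Vec::new(), progress: Vec::new() }
    }

    pub fn add_operator(&mut self) -> usize {
        let shared = Rc::new(RefCell::new(ChangeBatch::new()));
        self.progress.push(Rc::clone(&shared));
        self.children.push(Operator { progress: shared });
        self.children.len() - 1
    }

    pub fn schedule_child(&mut self, index: usize) -> bool {
        self.children[index].schedule()
    }

    /// Pulls out everything the children have reported, independent of scheduling.
    pub fn harvest(&mut self) -> Vec<(usize, u64, i64)> {
        let mut changes = Vec::new();
        for (index, shared) in self.progress.iter().enumerate() {
            for (time, delta) in shared.borrow_mut().drain_consolidated() {
                changes.push((index, time, delta));
            }
        }
        changes
    }
}
```

The point of the sketch is only that `schedule` and `harvest` are separate calls: the scope can schedule a child several times and collect the accumulated changes later, or harvest without scheduling at all.
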
I'm not exactly sure of the plan of attack on this issue, probably just starting by rewriting bits of the lowest layer and seeing what breaks, then moving up. Posting for now to solicit comments!
Addressed in #206.