TimelyDataflow/timely-dataflow

About the order of operator schedule.

Closed this issue · 6 comments

bmmcq commented

.for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));

Hi Frank, I'm reading the event-driven scheduling code. At this line, you push the active operator's index into the priority queue, so the smallest index is popped first. I have an adventurous idea: would it also work to pop the queue in reverse order? That way, the operator with the larger index would be scheduled first, and its buffered inputs would be consumed immediately. Otherwise, the inputs of the operator with the larger index linger until the upstream operators, which may produce even more inputs, have been scheduled. That may not be memory friendly.
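To make the two pop orders concrete, here is a minimal sketch using only the standard library. The function names are hypothetical and this is not the timely scheduler itself; it only shows that wrapping indices in `Reverse` turns Rust's max-heap into a smallest-first queue, while the unwrapped heap pops the largest index first.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Pops indices smallest-first: `Reverse` flips the max-heap into a min-heap.
pub fn pop_smallest_first(active: &[usize]) -> Vec<usize> {
    let mut heap: BinaryHeap<Reverse<usize>> =
        active.iter().copied().map(Reverse).collect();
    let mut order = Vec::with_capacity(heap.len());
    while let Some(Reverse(index)) = heap.pop() {
        order.push(index);
    }
    order
}

/// Pops indices largest-first: the plain max-heap behaviour.
pub fn pop_largest_first(active: &[usize]) -> Vec<usize> {
    let mut heap: BinaryHeap<usize> = active.iter().copied().collect();
    let mut order = Vec::with_capacity(heap.len());
    while let Some(index) = heap.pop() {
        order.push(index);
    }
    order
}
```

For active operators `[3, 1, 2]`, the first function yields `1, 2, 3` and the second yields `3, 2, 1`.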

It's an interesting question which order is best for processing. In the future, I'd like to make it something you can programmatically alter.

For the moment, though, processing operators in topological order has some appealing properties. Mainly, it guarantees that we do at most one pass through the operators, and we avoid what could be an unbounded amount of work (either with a loop present, or in a "complete" digraph doing an exponential amount of work).
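The "exponential amount of work" case can be illustrated with a toy model. This is a sketch under strong assumptions, not the timely scheduler: operators are numbered in topological order, the graph is a complete DAG (an edge from every operator to every larger index), and every invocation is assumed to activate all downstream operators. The function name and the `BTreeSet` representation are hypothetical.

```rust
use std::collections::BTreeSet;

/// Counts operator invocations until no operator is active, starting with
/// only operator 0 active, in a complete DAG over `n` operators.
pub fn count_invocations(n: usize, smallest_first: bool) -> usize {
    if n == 0 {
        return 0;
    }
    // A set deduplicates activations, so an operator is queued at most once at a time.
    let mut active: BTreeSet<usize> = BTreeSet::new();
    active.insert(0);
    let mut invocations = 0;
    loop {
        let next = if smallest_first {
            active.pop_first()
        } else {
            active.pop_last()
        };
        let index = match next {
            Some(index) => index,
            None => break,
        };
        invocations += 1;
        // Assumption: each invocation produces input for every downstream operator.
        for downstream in (index + 1)..n {
            active.insert(downstream);
        }
    }
    invocations
}
```

In this model, smallest-first runs each of 10 operators once (10 invocations), while largest-first revisits downstream operators repeatedly (512 invocations, doubling with each added operator).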

You are right that we might prefer to schedule immediately downstream operators especially if it looks like a pipeline locally, and that could be something a user indicates. What do you think about the possibility that you get to specify a total order on the operators, and they are processed once in that order? Would that suffice, or is the intent to return to an operator multiple times if it has multiple inputs?

Also, how does the event driven scheduling work for you folks? I tried mocking up an example that looked a bit like your test case (in your document) and performance improved, but if you have specifics to report that would be useful too.

bmmcq commented

I think it would be sufficient if I could plug in a scheduling strategy that processes operators in an arbitrary order.

Since flow control is the more urgent need in our scenarios at the moment, we haven't yet tested the performance improvement from event-driven scheduling. We read the "Faucet" paper and are trying to figure out whether it is enough for our tasks. Currently, I don't know whether the Faucet pattern is also effective for loops.

You should be able to use Faucet-style techniques in loops, in principle. It is a bit hard to say anything too precise without knowing about the computation.

Generally the Faucet work suggests putting a regulator in place (wherever "expansion" happens, like in a flatmap or a join), and this regulator should space out released records so that batches are only released as prior batches complete. Exactly what they complete is a different question, that depends on your needs.

Here is an example. Let's imagine that you are writing a loop, and you use a u64 for your loop coordinate, and you use the high 32 bits for the round of iteration. You can now use the low 32 bits for Faucet-style batch identifiers. As timestamps go around the loop you increment them by one (in the low bits) but can then delay them to effect a high-order round increment (you may not want to promise to always increment each timestamp by 2^32, as this does not allow you to reset the batch).
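A minimal sketch of that encoding, assuming plain `u64` arithmetic rather than any particular timely timestamp type. All function names are hypothetical. The feedback step adds one in the low bits, and a separate delay jumps to batch 0 of the next round, which is one reading of "delay them to effect a high-order round increment".

```rust
const BATCH_BITS: u32 = 32;

/// Packs the round into the high 32 bits and the batch into the low 32 bits.
pub fn pack(round: u32, batch: u32) -> u64 {
    ((round as u64) << BATCH_BITS) | (batch as u64)
}

/// Extracts the round of iteration (high bits).
pub fn round_of(time: u64) -> u32 {
    (time >> BATCH_BITS) as u32
}

/// Extracts the batch identifier (low bits; the cast truncates the high bits).
pub fn batch_of(time: u64) -> u32 {
    time as u32
}

/// The minimum advance the loop promises: plus one in the low bits.
pub fn feedback(time: u64) -> u64 {
    time + 1
}

/// Delays a timestamp to batch 0 of the next round, resetting the batch.
/// Overflow of the round counter is not handled in this sketch.
pub fn delay_to_next_round(time: u64) -> u64 {
    pack(round_of(time) + 1, 0)
}
```

Because the delayed time is never earlier than `feedback(time)`, the delay stays compatible with a loop that only promises the `+1` increment.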

Alternately, you have a loop whose coordinates are (round, batch) but which use the lexicographic order (probably best to stay with high / low bits for the moment).
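As a small check of why the two formulations are interchangeable: Rust tuples compare lexicographically, and that order agrees with the numeric order of the packed `u64`. The helper names below are hypothetical, and this sketch says nothing about how a timely timestamp type would order pairs.

```rust
/// Packs (round, batch) into high / low 32 bits of a u64.
pub fn pack(round: u32, batch: u32) -> u64 {
    ((round as u64) << 32) | (batch as u64)
}

/// Returns true when the lexicographic tuple order and the packed
/// integer order give the same answer for `a` versus `b`.
pub fn orders_agree(a: (u32, u32), b: (u32, u32)) -> bool {
    a.cmp(&b) == pack(a.0, a.1).cmp(&pack(b.0, b.1))
}
```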

Your regulator is able to withhold (round, batch+1) until all of (round, batch) have cleared the end of the loop. There may still be (round + 1, ??) records circulating, but by the same reasoning there should be at most one batch of them for each round of iteration.
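The withholding rule can be sketched as a small state machine. This is a hypothetical `Regulator` type, not a timely operator: it assumes something else (for example, frontier information at the end of the loop) tells it when a batch has cleared, and it tracks only batch identifiers, not the records themselves.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Releases at most one batch per round at a time.
#[derive(Default)]
pub struct Regulator {
    /// Batches waiting to be released, per round, in arrival order.
    pending: BTreeMap<u32, VecDeque<u32>>,
    /// The single batch currently circulating for each round, if any.
    in_flight: BTreeMap<u32, u32>,
}

impl Regulator {
    /// Registers a batch that is ready but not yet released.
    pub fn enqueue(&mut self, round: u32, batch: u32) {
        self.pending.entry(round).or_default().push_back(batch);
    }

    /// Releases the next batch for every round that has nothing in flight.
    pub fn release(&mut self) -> Vec<(u32, u32)> {
        let mut released = Vec::new();
        for (&round, queue) in self.pending.iter_mut() {
            if !self.in_flight.contains_key(&round) {
                if let Some(batch) = queue.pop_front() {
                    self.in_flight.insert(round, batch);
                    released.push((round, batch));
                }
            }
        }
        released
    }

    /// Records that (round, batch) has cleared the end of the loop.
    pub fn completed(&mut self, round: u32, batch: u32) {
        if self.in_flight.get(&round) == Some(&batch) {
            self.in_flight.remove(&round);
        }
    }
}
```

After enqueuing (0, 0), (0, 1) and (1, 0), the first `release` hands out (0, 0) and (1, 0); (0, 1) is withheld until `completed(0, 0)` is called.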

bmmcq commented

I do not quite understand. Suppose an input with timestamp '[0,0]' loops n times, and in each circulation the output is 100 times larger than the input. When the first circulation finishes there will be 100 * '[0,0]'; what should I do? If I hold 90 '[0,1]' and let 10 '[0,0]' re-enter the circulation, 1000 * '[1,0]' will come back (plus the 90 '[0,1]', that is 1090 records; is that right?). With more circulations, the Batcher has to hold more and more data. Although only one batch is circulating, the Batcher holds too much data. I think data with a higher round should be scheduled first so that it leaves the loop quickly, like DFS.

On the other hand, if I have a chain of flatmaps, running it as DFS seems to save memory, but with poor performance.

We think timely-dataflow uses data-driven scheduling. What do you think about demand-driven scheduling? Is it not appropriate for a distributed dataflow framework?

If there is a factor 100 increase in each iteration, then I expect that the batcher would hold roughly a number of records bounded by

100 * max_rounds * batch_size

This is approximately what you would get with a single-threaded DFS approach that has access to a method to produce the 100 outputs, and a recursive method to call for the next iteration. The factor of batch size is something you can avoid if you integrate the batcher with the operator that does the 100x expansion.
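To sanity-check the bound, here is a toy simulation of the depth-first strategy at batch granularity. It is a sketch with assumed parameters and a hypothetical function name: every record expands to `factor` records in the next round, records leave the loop once they reach `max_rounds`, and the most recently produced batch is always processed next.

```rust
/// Returns the peak number of records held while processing one initial
/// batch depth-first, releasing one batch of `batch_size` records at a time.
pub fn peak_held(factor: usize, max_rounds: usize, batch_size: usize) -> usize {
    // Each stack entry is one withheld batch: (round, number of records).
    let mut stack = vec![(0usize, batch_size)];
    let mut held = batch_size;
    let mut peak = held;
    while let Some((round, count)) = stack.pop() {
        held -= count; // the released batch is no longer held
        if round < max_rounds {
            // Expansion: every record yields `factor` records in the next round.
            let mut produced = count * factor;
            held += produced;
            while produced > 0 {
                let take = produced.min(batch_size);
                stack.push((round + 1, take));
                produced -= take;
            }
        }
        peak = peak.max(held);
    }
    peak
}
```

With `factor = 100`, `max_rounds = 3` and `batch_size = 10`, the simulated peak is 2980 records, just under the bound 100 * 3 * 10 = 3000.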

You get to choose which batches to release in the batcher. If you hold a set of capabilities for various timestamps, you are welcome to release data only for the batch with the largest round, even if multiple rounds are "ready". It is the same single operator making this decision, not different operators for each round, so you should have the ability to encode some amount of DFS vs BFS in here. It may be hard to do exactly, however, if you want subsequent invocations of the operator to withhold (round, _) timestamps as long as a (round+1,_) timestamp exists; the former could result in the latter, and its existence masks details about the presence or absence of the latter.
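The release choice can be sketched as a policy over the set of ready batches. The `Policy` enum and `choose` function are hypothetical, and capabilities are reduced to bare (round, batch) pairs for illustration; preferring the largest round approximates DFS, preferring the smallest approximates BFS.

```rust
use std::cmp::Reverse;

/// Which ready batch the batcher prefers to release next.
pub enum Policy {
    /// Prefer the largest round, so data leaves the loop sooner.
    DepthFirst,
    /// Prefer the smallest round, finishing earlier rounds first.
    BreadthFirst,
}

/// Picks one ready (round, batch) pair; ties on round go to the smaller batch.
pub fn choose(ready: &[(u32, u32)], policy: Policy) -> Option<(u32, u32)> {
    match policy {
        Policy::DepthFirst => ready
            .iter()
            .copied()
            .max_by_key(|&(round, batch)| (round, Reverse(batch))),
        Policy::BreadthFirst => ready.iter().copied().min(),
    }
}
```

As the paragraph above notes, this only chooses among batches that are already known to be ready; it cannot tell whether releasing a (round, _) batch will later produce (round+1, _) data.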

It is possible that what you actually want is a loop indexed by batch identifier, rather than by round of iteration, and to stash the round of iteration as data (or as low-order bits under the batch identifier).

Again, really hard to speculate here. If you write down a simplified program we could talk more specifically.

Timely dataflow does use data-driven scheduling, and demand-driven scheduling would be a substantial re-think. You get hard questions like: "at the exit point of a loop, what do you 'demand' from inside the loop?". It appears much easier to push work forward and see what happens than to pull work where you have to correctly guess in which rounds it will complete.

> It is possible that what you actually want is a loop indexed by batch identifier, rather than by round of iteration, and to stash the round of iteration as data (or as low-order bits under the batch identifier).

I think this may actually be the right thing here, but I'll need to ponder how to explain it best. Although we can put things like "round of iteration" in the timestamp, what we are really looking to express in timestamps is "what dependencies should our computation reflect".

In your case it sounds like the most important thing is that various batches clear the iteration, rather than that prior iterations fully complete. The latter is more important for e.g. differential dataflow, where it needs to know it has the correct answer for an outer timestamp (so it can do its differencing correctly), but the former may make more sense where your goal is to serialize the completion of batches within the iteration (because you face resource exhaustion otherwise).
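One way to picture the batch-indexed alternative is to swap the roles of the two halves of a packed `u64`, so the batch identifier is the high-order part and the round sits underneath it. This is a sketch with hypothetical names; `frontier` here stands for the least timestamp that might still appear, which is an assumption about how completion would be observed.

```rust
/// Packs the batch into the high 32 bits and the round into the low 32 bits.
pub fn batch_major(batch: u32, round: u32) -> u64 {
    ((batch as u64) << 32) | (round as u64)
}

/// True when no timestamp of `batch`, at any round, can still appear,
/// given that `frontier` is the least timestamp still outstanding.
pub fn batch_cleared(frontier: u64, batch: u32) -> bool {
    match batch.checked_add(1) {
        // Every round of `batch` sorts before round 0 of the next batch.
        Some(next) => frontier >= batch_major(next, 0),
        // The last representable batch cannot be expressed this way in the sketch.
        None => false,
    }
}
```

Under this ordering, "batch 0 has cleared the iteration" becomes a single comparison against the frontier, regardless of how many rounds its records took.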