
Incremental async reduction, e.g. value(fs, reduce = ...)


Asynchronous incremental reduction

Asynchronous incremental reduction applies a reduction function to incremental
results as they are delivered (that is, asynchronously). It is implemented in
foreach via the .combine function argument, which takes a standard reduction
function similar to the one used by base R's Reduce(). Because no order is
guaranteed, the reduction function must be commutative and associative
(however, foreach also provides an .inorder argument that caches results
locally until order can be guaranteed).
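
For reference, a minimal foreach sketch of that pattern (assuming a doParallel backend is registered; untested):

library(foreach)
library(doParallel)
registerDoParallel(cores = 2)

# Sum of square roots; partial results are combined pairwise with `+` as
# they arrive. .inorder = FALSE permits combining as soon as any result is
# ready, which is safe here because `+` is commutative and associative.
y <- foreach(i = 1:100, .combine = `+`, .inorder = FALSE) %dopar% sqrt(i)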

This can be (very) useful when the full Map result is too large to fit in
memory but the reduction function's output is small. Using incremental async
reduction in those cases lowers memory pressure on the coordinating R process.
Note that, as far as I know, async reduce is only implemented in doRedis
(edit: on CRAN, anyway; I know there were a number of forks of doMPI that had
something similar).

In principle, this might be achieved in future by adding a reduction-function
argument to the resolve() function? That could be an elegant solution.

Alternatively, the resolved() function plus looping and polling might achieve
something similar, as sketched in the documentation for resolved():

It should also be possible to use the method for
polling the future until it is resolved (without having to wait
infinitely long)

But there are no examples, and this also seems cumbersome.
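
For concreteness, here is roughly what that polling loop looks like with the current API, reducing with a commutative function as soon as any future resolves (my own sketch, not from the docs):

library(future)
plan(multisession)

fs <- lapply(1:10, function(x) future(sqrt(x)))
acc <- NULL
while (length(fs) > 0) {
  done <- vapply(fs, resolved, logical(1L))
  for (f in fs[done]) {
    v <- value(f)                     # already resolved, so this won't block
    acc <- if (is.null(acc)) v else acc + v
  }
  fs <- fs[!done]                     # drop collected futures to release memory
  if (length(fs) > 0) Sys.sleep(0.1)  # poll rather than block indefinitely
}

It works, but it is exactly the kind of boilerplate a reduce argument could hide.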

Segue: value() is a generic function. The S3 methods for lists, environments, and list environments (from the listenv package) gather the results for the input set of futures as they become ready. The results are returned in the same order as the input. They are also implemented to preserve the relaying order of stdout and conditions, re-outputting/re-signaling them as soon as they can. (There's also a mechanism to drop them from memory when no longer needed.)

I can imagine that the value() function could support an optional reduction argument for doing reduction "on the fly".

The only way I can see a reduction function working properly is if it reduces the elements in the same order as Reduce() would on the final result. That is, we need to reduce the "head" and the next element in order. (This is the same constraint under which stdout and conditions are relayed.)

Q. In the extreme, if the result of the first element comes in last, then you cannot do any reduction until the very end (the same holds for relaying of output and conditions). Is this how you anticipate this working too? Or do you think there should be support for relaxing this constraint, e.g. reducing as soon as possible?

Oh, I forgot that you already wrote "Because no order is guaranteed, the reduction function must be commutative and associative". So, yes, that would make it possible to reduce ASAP. And then you wrote "(however, foreach also provides an .inorder argument that caches results locally until order can be guaranteed)", which I think should also be supported.

So, maybe something like:

fs <- lapply(1:3, function(x) future(sqrt(x)))
y <- value(fs, reduce = `+`)

and

fs <- lapply(1:3, function(x) future(sqrt(x)))
y <- value(fs, reduce = structure(`-`, inorder = TRUE))

Maybe inorder = TRUE should be the default, to minimize the risk of silent bugs.
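
For what it's worth, a quick illustration of why in-order matters for a non-commutative function like `-`:

# In-order, i.e. what Reduce() on the final result would give:
Reduce(`-`, list(sqrt(1), sqrt(2), sqrt(3)))  # ~ -2.146
# Out of order, e.g. if element 3 happens to arrive first:
(sqrt(3) - sqrt(1)) - sqrt(2)                 # ~ -0.682, a different answer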

What's an example where multicombine/maxcombine is needed? Why wouldn't pairwise reduce be sufficient?
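
(For context on the question: in foreach, .multicombine = TRUE means .combine may be called with more than two results at a time, capped by .maxcombine; as I understand it, this is mainly a performance feature for combine functions like rbind or c. A sketch, again assuming doParallel:)

library(foreach)
library(doParallel)
registerDoParallel(cores = 2)

# rbind can take many rows at once; .multicombine avoids n - 1 pairwise calls
m <- foreach(i = 1:8, .combine = rbind, .multicombine = TRUE,
             .maxcombine = 4) %dopar% c(i = i, root = sqrt(i))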