Parallel sequence runs sequentially (non-parallel)
Szer opened this issue · 8 comments
Description
I've tried to implement some of my work with AsyncSeq.
I have a big job list which I want to process in parallel with some throttling (32 threads maximum, for example) and then feed the results as a stream to other async tasks (saving to disk etc.).
Async.Parallel doesn't work for me because it has no setting for the maximum number of threads.
I've also tried slicing the job list myself, but it was very memory-inefficient, which is when I switched to AsyncSeq.
Repro steps
Just run the code below
```fsharp
#r @"../../packages/FSharp.Control.AsyncSeq/lib/net45/FSharp.Control.AsyncSeq.dll"
open FSharp.Control

// simulating long IO work
let longWork x = async {
    do! Async.Sleep 3000
    return x * 2
}

// simulating another async task, saving to HDD for example
let log x = async {
    let! x = x
    printfn "%A" x
    return x
}

let doWorkAndSaveInParallel() =
    async {
        do! seq {1..100}                                          // original data
            |> AsyncSeq.ofSeq
            |> AsyncSeq.mapAsync (longWork >> log)                // my async workflow
            |> AsyncSeq.indexed
            |> AsyncSeq.groupBy (fun (i, _) -> i % 32L)           // slicing by 32 threads
            |> AsyncSeq.mapAsyncParallel
                (snd >> AsyncSeq.map snd >> AsyncSeq.iter ignore) // parallel
            |> AsyncSeq.iter ignore                               // iteration
    } |> Async.RunSynchronously
```
Expected behavior
Expected to see the numbers 2, 4, 6 ... 64 after 3 seconds, almost simultaneously (in any order); after 3 more seconds the numbers 66, 68, 70 ... 128, and so on.
Actual behavior
Numbers appear one by one at 3-second intervals (no parallelism at all).
Known workarounds
Doing the parallelism myself is the only workaround I know of.
Related information
Win10
FSharp.Control.AsyncSeq (2.0.16)
Thx for reporting, will take a look.
If you structure the workflow like this, the `handle` function will execute in parallel:

```fsharp
let handle i = async {
    do! Async.Sleep 3000
    printfn "%i" i
    return () }

seq {1..100}                                               // original data
|> AsyncSeq.ofSeq
|> AsyncSeq.indexed
|> AsyncSeq.groupBy (fun (i, _) -> i % 32L)                // slicing by 32 threads
|> AsyncSeq.mapAsyncParallel
    (snd >> AsyncSeq.map snd >> AsyncSeq.iterAsync handle) // parallel
|> AsyncSeq.iter ignore                                    // iteration
|> Async.RunSynchronously
```
Otherwise, the execution across groups is in parallel, but the groups are themselves generated sequentially.
I see. So it is not a bug, it is intended :) Should I close it, or will you? I don't know.
Yes, it is intended. AsyncSeq is by default "sequential" and any parallelism imposed via an operation applies to the previous sequence only. So for example, in your workflow, the mapAsyncParallel parallelizes across the groups created by groupBy, but the groups themselves are iterated sequentially, as is the upstream sequence. In other words, the parallelism does not propagate all the way upstream. In order to get the parallelism you want, you just need to re-arrange the workflow a bit. And this type of data-driven parallelism I see used quite often.
Close if you think your issue was addressed. Perhaps this can be made more clear? Or additional parallelism operations implemented?
There is one operation which I lack: `mapAsyncParallelThrottled`. Right now there is only `iterAsyncParallelThrottled`. I don't know why the `map` version of throttled parallelism was left behind (just like `Async.map` in the FSharp.Core library, by the way). Is there any issue with implementing it?
I ended up resolving a very similar set of issues as documented here, with a testbed, by implementing `mapAsyncParallelThrottled` as:

```fsharp
module AsyncSeq =

    type Async with
        static member Throttle degreeOfParallelism f =
            let s = new System.Threading.SemaphoreSlim(degreeOfParallelism)
            fun x -> async {
                let! ct = Async.CancellationToken
                do! s.WaitAsync ct |> Async.AwaitTask
                try return! f x
                finally s.Release() |> ignore }

    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        let throttle = Async.Throttle degreeOfParallelism
        AsyncSeq.mapAsyncParallel (throttle f)
```
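For reference, here is a minimal, self-contained sketch (FSharp.Core only; the concurrency counters are purely illustrative, not part of the proposal) showing that this semaphore-wrapper style of throttling does cap concurrency even when `Async.Parallel` starts everything at once:

```fsharp
open System.Threading

// Semaphore-based throttle, same shape as the Throttle helper above.
let throttle degreeOfParallelism (f: 'a -> Async<'b>) =
    let s = new SemaphoreSlim(degreeOfParallelism)
    fun x -> async {
        do! s.WaitAsync() |> Async.AwaitTask
        try return! f x
        finally s.Release() |> ignore }

// Track peak concurrency so the throttling effect is observable.
let mutable current = 0
let mutable peak = 0
let gate = obj ()

let work i = async {
    lock gate (fun () ->
        current <- current + 1
        peak <- max peak current)
    do! Async.Sleep 100
    lock gate (fun () -> current <- current - 1)
    return i * 2 }

// Apply the throttle once so all calls share one semaphore.
let throttledWork = throttle 2 work

let results =
    [1 .. 8]
    |> List.map throttledWork
    |> Async.Parallel
    |> Async.RunSynchronously
// results come back in input order; peak never exceeds 2
```

All eight computations are started by `Async.Parallel`, but each body first waits on the shared semaphore, so at most two run at a time.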
@dsyme do you perhaps have an answer, or some thoughts on @Szer's question, to close out the tangent this thread has become?

> Don't know why map version of throttled parallelism was left behind (just like async.map in FSharp.Core library by the way) Is there any issue with implementation of it?

I also have some other follow-on questions this is raising for me:

- would adding an `AsyncSeq.iterAsyncParallel` make sense?
- would an fslang suggestion for an `Async.Throttle : degreeOfParallelism : int -> ('a -> Async<'b>) -> ('a -> Async<'b>)` make any sense?
- and, if both existed, would one deprecate `AsyncSeq.iterAsyncParallelThrottled` on the basis that the throttling can then become an orthogonal concern that `AsyncSeq` can rid itself of?
- would an fslang suggestion for an `Async.ParallelThrottled : degreeOfParallelism : int -> seq<Async<'t>> -> Async<'t []>`, a pipelineable variant of `Async.Parallel(computations, maxDegreeOfParallelism)`, make sense?
I'm not sure why it was left behind - it sounds like it would make perfect sense.
1 - yes
2 - that seems much like Async.Parallel with maxDegreeOfParallelism https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Parallel
3 - I don't think so. A separate throttling is needed at each construct that introduces parallelism, it's not compositional throughout Async computations
4 - no, we don't introduce new methods just to support pipelining/currying
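For reference, a sketch of the built-in mechanism mentioned in answer 2 (assumes FSharp.Core 4.7 or later, where the optional `maxDegreeOfParallelism` parameter was added):

```fsharp
// Async.Parallel with its optional maxDegreeOfParallelism argument:
// at most 2 of the 6 computations run concurrently, and the results
// still come back in input order.
let results =
    Async.Parallel(
        [ for i in 1 .. 6 -> async { do! Async.Sleep 20; return i * i } ],
        maxDegreeOfParallelism = 2)
    |> Async.RunSynchronously
// results = [|1; 4; 9; 16; 25; 36|]
```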
Thanks for the follow-up. Some rebuttals / clarifications; apologies for the walls of text:
> 1 - Yes

Hm, mea culpa: it seems as of `FSharp.Control.AsyncSeq` v2.0.23 an `AsyncSeq.iterAsyncParallel` already exists, so consider this follow-up closed.
> 2 - that seems much like Async.Parallel with maxDegreeOfParallelism https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Parallel

It would provide a similar throttling effect by having the computation first acquire a semaphore and release it on exit. It would be true to say that it provides no useful function when compared to simply using the `maxDegreeOfParallelism` parameter of `Async.Parallel`.
However, the idea is that this operator can be applied generically over any `Async` computation, regardless of how the parallelism is introduced. In particular, this would allow one to achieve the effect of an `AsyncSeq.mapAsyncParallelThrottled` by using the existing `AsyncSeq.mapAsyncParallel` but wrapping the function you're passing in this semaphore. If such a throttle operator were supplied, it could be used in a variety of situations (see the sample impl in my preceding post). As an example, I've used this to limit the parallelism within sections of an Async workflow where one is performing IO that needs limiting - i.e. you might be happy to run the overall computation with DOP 16, but constrain the parallelism of concurrent reads to a data store to, say, 4 in order to self-govern resource consumption (e.g. to limit RU consumption during a read phase).
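To make that scenario concrete, here is a hedged, self-contained sketch (all names hypothetical; FSharp.Core only, 4.7+ for `maxDegreeOfParallelism`): the outer computation runs with DOP 4, while the simulated data-store reads are independently capped at 2 by a semaphore wrapper, regardless of how the outer parallelism was introduced:

```fsharp
open System.Threading

// Generic semaphore wrapper, as in the sample impl above.
let throttle dop (f: 'a -> Async<'b>) =
    let s = new SemaphoreSlim(dop)
    fun x -> async {
        do! s.WaitAsync() |> Async.AwaitTask
        try return! f x
        finally s.Release() |> ignore }

// Track how many simulated reads are in flight, and the peak.
let mutable readsInFlight = 0
let mutable peakReads = 0
let gate = obj ()

// Hypothetical data-store read; records its own peak concurrency.
let readFromStore key = async {
    lock gate (fun () ->
        readsInFlight <- readsInFlight + 1
        peakReads <- max peakReads readsInFlight)
    do! Async.Sleep 50
    lock gate (fun () -> readsInFlight <- readsInFlight - 1)
    return key * 10 }

// Reads are capped at 2 concurrent, independently of the outer DOP.
let throttledRead = throttle 2 readFromStore

let results =
    [ for k in 1 .. 8 -> async {
        let! v = throttledRead k   // at most 2 reads at a time
        return v + 1 } ]           // rest of the work runs at the outer DOP
    |> fun cs -> Async.Parallel(cs, maxDegreeOfParallelism = 4)
    |> Async.RunSynchronously
```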
So the question, for me, remains: would an orthogonal operator to govern the degree of parallelism, without introducing that parallelism, be acceptable in `type Async`? Or is the intention that every operator in Async, AsyncSeq etc. that offers an `XyzParallel` should provide a `?maxDegreeOfParallelism` argument which can be used to layer that effect in, without it also being offered as a separable governing function as I have illustrated in my OP?
> 3 - I don't think so. A separate throttling is needed at each construct that introduces parallelism, it's not compositional throughout Async computations

I'm not sure what you mean by this - I appreciate that you can't, and/or wouldn't want to, limit parallelism in a compositional way.
I can appreciate that each of:

a) `Async.Parallel`
b) `AsyncSeq.iterAsyncParallel`
c) `AsyncSeq.mapAsyncParallel`

introduces parallelism. For each, there are related ways of constraining that parallelism, respectively:

a) the `?maxDegreeOfParallelism` optional arg
b) using the sister method `AsyncSeq.iterAsyncParallelThrottled` (which happens to be pipelining friendly, unlike `Async.Parallel`...)
c) either:
   - via the `AsyncSeq.mapAsyncParallelThrottled` sister method as proposed by @Szer in the OP (and implemented in my previous post)
   - or by combining my suggestion no. 2 of having an `Async.Throttle` with the existing `AsyncSeq.mapAsyncParallel` (as illustrated in my previous post)
I'm happy to accept a no on decoupling the throttling aspect from the parallelizing aspect (and the general principle that anything that introduces parallelism should also offer a mechanism to optionally limit it). However, I do believe that an operator whose sole function is to throttle, in a situation where the parallelism is already present for other reasons, is useful, so I wanted to explain myself properly so it's not ruled out without the goal being thoroughly understood.
> 4 - no, we don't introduce new methods just to support pipelining/currying
I can appreciate that, in general, providing pipelining-friendly sister methods for every case would not be good design. My reason for suggesting it in this instance is that, as illustrated in my expansion of point 3, `Async.Parallel` is the only Async/AsyncSeq operator introducing parallelism that is neither a) pipelining friendly nor b) explicitly named with the term `Throttled` directly in the member / function name itself.

It is my experience that:

- `Async.Parallel` is typically applied via pipelining (as is `Async.Sequential`)
- quite frequently, constraining the degree of parallelism is something that should be considered but is not (it always comes up in code reviews)

While I can appreciate that `AsyncSeq` and `Async` have had quite separate evolutions and do not need to align to a fault, I do believe that not providing an explicit `Async.parallelThrottled` is all but an inconsistency in this single case (I am absolutely not seeking to have a raft of `Async.map` or other such pipelining affordances added).
In summary, it's my belief that an exception should be made to the general principle for an explicit `Async.ParallelThrottled` (or `parallelThrottled`) on the basis that `Async.Parallel` is frequently used via pipelining, yet the API design makes it cumbersome to do that (or even to be aware of the optional arg, which arguably one should be explicitly opting out of rather than into). (Also, that naming and design is more consistent with how AsyncSeq represents parallelism.)
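A small sketch of the ergonomic gap being described (assumes FSharp.Core 4.7+ for `maxDegreeOfParallelism`): the unthrottled form pipelines naturally, while the throttled form forces a lambda, which is exactly what a pipelineable `Async.parallelThrottled` would smooth over:

```fsharp
let jobs = [ for i in 1 .. 4 -> async { return i + 1 } ]

// pipelining-friendly, but unthrottled:
let unthrottled = jobs |> Async.Parallel |> Async.RunSynchronously

// throttled, but the optional argument breaks the pipeline,
// requiring a lambda (or a named helper) to stay point-free:
let throttled =
    jobs
    |> fun cs -> Async.Parallel(cs, maxDegreeOfParallelism = 2)
    |> Async.RunSynchronously
// both yield [|2; 3; 4; 5|]
```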
So, the same questions again with my assumed answers:

- Add `AsyncSeq.mapAsyncParallelThrottled` - assuming yes to an fslang suggestion to add it
- Add `AsyncSeq.iterAsyncParallel` - already existed, sorry
- Add `Async.Throttle` as above (governing executions of a given function as shown above) without introducing parallelism?
- Agreed; everything that introduces parallelism should have a way to limit the degree thereof (case 0 being an instance of this)
- No to adding an `Async.parallelThrottled` or similar; extra functions/operators in the box to support pipelining is a non-goal, despite my additional pleas above ;)