fsprojects/FSharp.Control.AsyncSeq

Parallel sequence runs sequentially (non-parallel)

Szer opened this issue · 8 comments

Szer commented

Description

I've tried to implement some of my work with AsyncSeq.
I have a big job list that I want to process in parallel with some throttling (32 threads maximum, for example) and then feed the results as a stream to other async tasks (saving to disk, etc.).
Async.Parallel doesn't work for me because it has no max-thread setting.
I also tried slicing the job list myself, but that was very memory-inefficient, which is when I switched to AsyncSeq.

Repro steps

Just run the code below


#r @"../../packages/FSharp.Control.AsyncSeq/lib/net45/FSharp.Control.AsyncSeq.dll"

open FSharp.Control

//simulating long IO work
let longWork x = async {    
    do! Async.Sleep 3000
    return x * 2
}

//simulating another async task, saving to HDD for example
let log x = async {
    let! x = x    
    printfn "%A" x
    return x
}

let doWorkAndSaveInParallel() =
    async {
        do! seq {1..100}                                          //original data
            |> AsyncSeq.ofSeq
            |> AsyncSeq.mapAsync (longWork >> log)                //my async workflow
            |> AsyncSeq.indexed
            |> AsyncSeq.groupBy (fun (i, _) -> i % 32L)           //slicing by 32 threads
            |> AsyncSeq.mapAsyncParallel 
                (snd >> AsyncSeq.map snd >> AsyncSeq.iter ignore) //parallel
            |> AsyncSeq.iter ignore                               //iteration
    } |> Async.RunSynchronously

Expected behavior

Expected to see numbers 2,4,6...64 after 3 seconds almost simultaneously (in any order), after 3 more seconds there should be numbers 66,68,70..128 etc...

Actual behavior

Numbers appear one by one at 3-second intervals (no parallelism at all)

Known workarounds

Implementing the parallelism myself is the only known workaround

Related information

Win10
FSharp.Control.AsyncSeq (2.0.16)

Thx for reporting, will take a look.

If you structure the workflow like this, the handle function will execute in parallel:

let handle i = async {
  do! Async.Sleep 3000
  printfn "%i" i
  return () }

seq {1..100}                                          //original data
|> AsyncSeq.ofSeq
|> AsyncSeq.indexed
|> AsyncSeq.groupBy (fun (i, _) -> i % 32L)           //slicing by 32 threads
|> AsyncSeq.mapAsyncParallel 
    (snd >> AsyncSeq.map snd >> AsyncSeq.iterAsync handle) //parallel
|> AsyncSeq.iter ignore                               //iteration
|> Async.RunSynchronously

Otherwise, the execution across groups is in parallel, but the groups are themselves generated sequentially.

Szer commented

I see. So it is not a bug, it is intended :) Should I close it, or will you? I don't know.

Yes, it is intended. AsyncSeq is by default "sequential" and any parallelism imposed via an operation applies to the previous sequence only. So for example, in your workflow, the mapAsyncParallel parallelizes across the groups created by groupBy, but the groups themselves are iterated sequentially, as is the upstream sequence. In other words, the parallelism does not propagate all the way upstream. In order to get the parallelism you want, you just need to re-arrange the workflow a bit. And this type of data-driven parallelism I see used quite often.

Close if you think your issue was addressed. Perhaps this can be made more clear? Or additional parallelism operations implemented?

Szer commented

There is one operation which I lack:
**map**AsyncParallelThrottled

Right now there is only:
iterAsyncParallelThrottled

I don't know why the map version of throttled parallelism was left behind (just like Async.map in the FSharp.Core library, by the way). Is there any issue with implementing it?

I ended up resolving a very similar set of issues (documented here, with a testbed) by implementing mapAsyncParallelThrottled as:

module AsyncSeq =
    type Async with
        static member Throttle degreeOfParallelism f =
            let s = new System.Threading.SemaphoreSlim(degreeOfParallelism)
            fun x -> async {
                let! ct = Async.CancellationToken
                do! s.WaitAsync ct |> Async.AwaitTask
                try return! f x
                finally s.Release() |> ignore }
    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        let throttle = Async.Throttle degreeOfParallelism
        AsyncSeq.mapAsyncParallel (throttle f)
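
The throttling idea above can be checked in isolation, without AsyncSeq. The following is only a minimal sketch: `throttle` is a hypothetical stand-alone copy of the `Async.Throttle` extension, and `work`, `running`, `peak` and `gate` are illustrative names used to observe how many invocations overlap.

```fsharp
open System.Threading

// Hypothetical stand-alone version of the throttle above: wraps `f` so that
// at most `degreeOfParallelism` invocations are in flight at the same time.
let throttle (degreeOfParallelism: int) (f: 'a -> Async<'b>) : 'a -> Async<'b> =
    let semaphore = new SemaphoreSlim(degreeOfParallelism)
    fun x -> async {
        let! ct = Async.CancellationToken
        do! semaphore.WaitAsync(ct) |> Async.AwaitTask
        try return! f x
        finally semaphore.Release() |> ignore }

// Bookkeeping only: tracks how many invocations of `work` overlap.
let running = ref 0
let peak = ref 0
let gate = obj ()

let work (x: int) = async {
    lock gate (fun () ->
        running.Value <- running.Value + 1
        peak.Value <- max peak.Value running.Value)
    do! Async.Sleep 50          // simulated IO
    lock gate (fun () -> running.Value <- running.Value - 1)
    return x * 2 }

// Async.Parallel starts all 20 computations, but the shared semaphore
// lets only 4 of them past the throttle at any one time.
let results =
    [ 1 .. 20 ]
    |> List.map (throttle 4 work)
    |> Async.Parallel
    |> Async.RunSynchronously
```

Note that `throttle 4 work` is evaluated once, so all 20 computations share one semaphore; calling `throttle` per item would create one semaphore each and limit nothing.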

@dsyme do you perhaps have an answer, or some thoughts for @Szer's question to close out the tangent this thread became?

> I don't know why the map version of throttled parallelism was left behind (just like Async.map in the FSharp.Core library, by the way). Is there any issue with implementing it?

I also have some other follow-on questions this is raising for me:

  1. would adding an AsyncSeq.iterAsyncParallel make sense?
  2. would having an Async.Throttle : degreeOfParallelism : int -> ('a -> Async<'b>) -> ('a -> Async<'b>) fslang suggestion make any sense?
  3. and, if both existed, would one deprecate AsyncSeq.iterAsyncParallelThrottled on the basis that the throttling can then become an orthogonal concern that AsyncSeq can rid itself of?
  4. would an fslang suggestion for an Async.ParallelThrottled : degreeOfParallelism : int -> seq<Async<'t>> -> Async<'t []> pipelineable variant of Async.Parallel(computations, maxDegreeOfParallelism) make sense?

dsyme commented

I'm not sure why it was left behind - it sounds like it would make perfect sense.

1 - yes

2 - that seems much like Async.Parallel with maxDegreeOfParallelism https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Parallel

3 - I don't think so. A separate throttling is needed at each construct that introduces parallelism, it's not compositional throughout Async computations

4 - no, we don't introduce new methods just to support pipelining/currying

Thanks for the follow-up. Some rebuttals / clarifications; apologies for the walls of text:

> 1 - yes

Hm, mea culpa: it seems as of FSharp.Control.AsyncSeq v 2.0.23 an AsyncSeq.iterAsyncParallel already exists, so consider this follow-up closed.

> 2 - that seems much like Async.Parallel with maxDegreeOfParallelism https://fsharp.github.io/fsharp-core-docs/reference/fsharp-control-fsharpasync.html#Parallel

It would provide a similar throttling effect by having the computation first acquire a semaphore, and release it on exit. It would be true to say that it would not provide any useful function when compared to simply using the maxDegreeOfParallelism parameter of Async.Parallel.

However, the idea would be that this operator can be applied generically over any Async computation, regardless of how the parallelism is introduced. In particular, this would allow one to achieve the effect of an AsyncSeq.mapAsyncParallelThrottled by using the existing AsyncSeq.mapAsyncParallel and wrapping the function you're passing in this semaphore. If such a throttle operator were supplied, it could be used in a variety of situations (see the sample impl in my preceding post). As an example, I've used this to limit the parallelism within sections of an Async workflow that perform IO you want to limit: you might be happy to run the overall computation with DOP 16, but constrain concurrent reads to a data store to, say, 4 in order to self-govern resource consumption (e.g. to limit RU consumption during a read phase).
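
The layered limits described here (overall DOP 16, reads capped at 4) might look roughly like the sketch below. It assumes an FSharp.Core version where `Async.Parallel` accepts `maxDegreeOfParallelism`; `throttle`, `readFromStore`, `processItem` and the counters are hypothetical names for illustration, not part of any library.

```fsharp
open System.Threading

// Hypothetical helper mirroring the Async.Throttle extension posted earlier in this thread.
let throttle (degreeOfParallelism: int) (f: 'a -> Async<'b>) : 'a -> Async<'b> =
    let semaphore = new SemaphoreSlim(degreeOfParallelism)
    fun x -> async {
        let! ct = Async.CancellationToken
        do! semaphore.WaitAsync(ct) |> Async.AwaitTask
        try return! f x
        finally semaphore.Release() |> ignore }

// Bookkeeping only, to observe how many reads overlap.
let activeReads = ref 0
let peakReads = ref 0
let gate = obj ()

// Stand-in for a read against a data store (illustrative name).
let readFromStore (id: int) = async {
    lock gate (fun () ->
        activeReads.Value <- activeReads.Value + 1
        peakReads.Value <- max peakReads.Value activeReads.Value)
    do! Async.Sleep 20
    lock gate (fun () -> activeReads.Value <- activeReads.Value - 1)
    return sprintf "doc-%d" id }

// One shared semaphore for all reads: at most 4 in flight.
let readThrottled = throttle 4 readFromStore

let processItem (id: int) = async {
    let! doc = readThrottled id
    do! Async.Sleep 20            // other work, not subject to the read limit
    return doc.Length }

// The overall workflow still runs with a degree of parallelism of 16.
let lengths =
    Async.Parallel(Seq.map processItem [ 1 .. 64 ], maxDegreeOfParallelism = 16)
    |> Async.RunSynchronously
```

The throttle here does not introduce any parallelism of its own; it only governs a section of a workflow whose parallelism comes from elsewhere.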

So the question, for me, remains: would an orthogonal operator that governs the degree of parallelism without introducing that parallelism be acceptable in type Async? Or is the intention that every operator in Async, AsyncSeq etc. that offers an XyzParallel provides a ?degreeOfParallelism argument to layer that effect in, without it also being offered as a separable governing function as I have illustrated in my OP?

> 3 - I don't think so. A separate throttling is needed at each construct that introduces parallelism, it's not compositional throughout Async computations

I'm not sure what you mean by this - I appreciate that you can't and/or wouldn't want to limit parallelism in a compositional way.

I can appreciate that each of:
a) Async.Parallel
b) AsyncSeq.iterAsyncParallel
c) AsyncSeq.mapAsyncParallel
introduces parallelism.

For each, there are related ways of constraining that parallelism, respectively:
a) the ?maxDegreeOfParallelism optional arg
b) using the sister method AsyncSeq.iterAsyncParallelThrottled (which happens to be pipelining friendly, unlike Async.Parallel...)
c) either:

  • via the AsyncSeq.mapAsyncParallelThrottled sister method as proposed by @Szer in the OP (and implemented in my previous post)
  • or by combining my suggestion no. 2 of having an Async.Throttle with the existing AsyncSeq.mapAsyncParallel (as illustrated in my previous post)

I'm happy to accept a no on the decoupling of the throttling aspect from the parallelizing aspect (and the general principle that anything that introduces parallelism should also offer a mechanism to optionally limit it).

However, I do believe that an operator whose sole function is to throttle in a situation where the parallelism is already present for other reasons is useful, so I wanted to explain myself properly so that it's not ruled out without the goal being thoroughly understood.

> 4 - no, we don't introduce new methods just to support pipelining/currying

I can appreciate that, in general, providing pipelining-friendly sister methods for every case would not be a good design. My reason for suggesting it in this instance is that, as illustrated in my expansion of point 3, it's the only Async/AsyncSeq operator introducing parallelism that is neither a) pipelining friendly nor b) explicitly named with the term Throttled in the member / function name itself.

It is my experience that:

  • Async.Parallel is typically applied via pipelining (as is Async.Sequential)
  • quite frequently constraining the degree of parallelism is something that should be considered but is not (it always comes up in code reviews)

While I can appreciate that AsyncSeq and Async have had quite separate evolutions and do not need to align to a fault, I do believe that not providing an explicit Async.parallelThrottled is all but an inconsistency in this single case (I am absolutely not seeking to have a raft of Async.map or other such pipelining affordances added).

In summary, it's my belief that an exception should be made to the general principle for an explicit Async.ParallelThrottled (or parallelThrottled), on the basis that Async.Parallel is frequently used via pipelining, yet the API design means it can be cumbersome to do that (or even to be aware of the optional arg, which arguably one should be explicitly opting out of rather than into). That naming and design is also more consistent with how AsyncSeq represents parallelism.
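
For reference, the pipelining gap being discussed can be closed in user code with a one-line wrapper. This is only a sketch: `parallelThrottled` is a hypothetical local helper (it is not part of FSharp.Core), and it assumes an FSharp.Core version where `Async.Parallel` accepts the optional `maxDegreeOfParallelism` argument.

```fsharp
// Hypothetical helper, not part of FSharp.Core: a curried, pipeline-friendly
// wrapper over Async.Parallel's optional maxDegreeOfParallelism argument.
let parallelThrottled (degreeOfParallelism: int) (computations: seq<Async<'t>>) : Async<'t []> =
    Async.Parallel(computations, maxDegreeOfParallelism = degreeOfParallelism)

// Illustrative workload.
let square (x: int) = async {
    do! Async.Sleep 10
    return x * x }

let squares =
    [ 1 .. 10 ]
    |> Seq.map square
    |> parallelThrottled 4      // at most 4 computations run at a time
    |> Async.RunSynchronously
```

Without such a helper, the throttled form has to be written as a method call, `Async.Parallel(computations, maxDegreeOfParallelism = 4)`, which breaks the pipeline.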


So, the same questions again with my assumed answers:

  0. Add AsyncSeq.mapAsyncParallelThrottled - assuming yes to an fslang suggestion to add
  1. Add AsyncSeq.iterAsyncParallel - already existed, sorry
  2. Add Async.Throttle as above (governing executions of a given function as shown above) without introducing parallelism?
  3. Agreed; everything that introduces parallelism should have a way to limit the degree thereof (case 0 being an instance of this)
  4. No to adding an Async.parallelThrottled or similar; extra functions/operators in the box to support pipelining is a non-goal, despite my additional pleas above ;)