Hopac/Hopac

How to conCollect but stop on first exception?


Consider the following:

open Hopac

Job.conCollect [
  job {
    printfn "start 1"
    do! timeOutMillis 1000
    printfn "end 1"
  }
  job {
    printfn "start 2"
    do! timeOutMillis 2000
    printfn "end 2"
  }
  job {
    printfn "start 3"
    failwith "foo"
  }
]
|> run
|> ignore

When run, this prints all the "start x" messages, waits for one second, prints "end 1", waits another second, prints "end 2", and then immediately throws.

I would like to run jobs in parallel (and get their results), but stop on the first exception, because (in my use-case) if one of the jobs throws an exception, the whole request fails and it makes no sense to wait for anything else, or to return more than the first exception.

Is there a simple way to do this in Hopac?

Is there no way in Hopac to stop on the first exception when running jobs concurrently?

@cmeeren You can try mapPipelinedJob

Thanks. Never seen that function, but judging from the description, it seems to limit parallelism. This is not what I'm trying to do. Also, the description even says

Note that due to the sequential nature of streams, this simply isn't an effective solution for fine grained parallel processing.

So I'm a bit wary of doing that.

Do you have an example of how it would be used? Do I need to worry about performance? Are there other ways of accomplishing this?

I've figured out a way to do this. The idea is to use an IVar like it's a cancellation token, and wrap your jobs inside alts that will commit to the IVar if a cancellation happens. Use Job.tryWith to fill the IVar if an exception is thrown.

The naive implementation of this concept seems like it might work...

open Hopac
open Hopac.Infixes

/// Builds a job composed of a list of concurrent jobs
let wontCancel () : Job<ResizeArray<unit>> =
    /// Fill this with () if we need to cancel
    let cancel : IVar<unit> = IVar()
    /// Wraps a job in an exception handler, and fills cancel if it raises
    let attempt (atomicJob: Job<unit>) : Job<unit> =
        Job.tryWith atomicJob (fun exn -> IVar.tryFill cancel ())
    /// Wraps a job in an alt that would prefer our cancellation
    /// over the result of our job
    let attemptWithCancel (atomicJob: Job<unit>) : Alt<unit> =
        // This will be available as soon as we cancel
        IVar.read cancel
        // This will be available when our job completes,
        // but our job won't even start if we already cancelled
        <|> (Alt.unit () ^=> fun () -> attempt atomicJob)
    Job.conCollect [
        attemptWithCancel <| job {
            printfn "start 1"
            do! timeOutMillis 1000
            printfn "end 1" }
        attemptWithCancel <| job {
            printfn "start 2"
            do! timeOutMillis 2000
            printfn "end 2" }
        attemptWithCancel <| job {
            printfn "start 3"
            failwith "some exception" }
    ]

Why won't these jobs cancel? Jobs are very lightweight units of work, and they are essentially atomic unless you write them otherwise: once a job is started, it won't be cancelled under normal conditions. In this example, each <|> makes its choice when Job.conCollect starts the corresponding job. At that moment cancel is almost certainly still empty, so the choice commits to Alt.unit () immediately and atomicJob then runs to completion; filling cancel later cannot undo a choice that has already been committed to.

So in order to actually respect a cancellation, your atomic jobs will need to be converted to job pipelines that check the status of the cancellation at each step. They'll also need to pipe forward some form of working state:

open System
open Hopac
open Hopac.Infixes

type WorkState<'a, 'b> =
  | Started
  | Ongoing of 'a
  | Completed of 'b
  | Cancelled

/// Builds a job composed of a list of concurrent jobs
let willCancel () : Job<ResizeArray<WorkState<unit, string>>> =
    /// Filled with Cancelled if we need to cancel
    let cancel : IVar<WorkState<unit, string>> = IVar()
    /// Wraps a job in an exception handler, and fills cancel if it raises
    let attempt (composedJob: Job<WorkState<'a, 'b>>) : Job<_> =
        Job.tryWith composedJob
            (fun exn -> IVar.tryFill cancel Cancelled >>-. Cancelled)
    /// Checks whether we cancelled, and if not, does the job
    let doIfOk (atomicJob: Job<WorkState<unit, string>>) : Alt<_> =
        // This will be available as soon as we cancel
        IVar.read cancel
        // This will be available when our job completes,
        // but our job won't even start if we already cancelled
        <|> (Alt.unit () ^=> fun () -> atomicJob)
    Job.conCollect [
        attempt (doIfOk <| job {
                printfn "start 1"
                return Started }
            >>= (fun state -> doIfOk (timeOutMillis 1000 >>-. Ongoing ()))
            >>= (fun state -> doIfOk <| job {
                    let rsp = "end 1"
                    printfn "%s" rsp
                    return Completed rsp } ))
        attempt (doIfOk <| job {
                printfn "start 2"
                return Started }
            >>= (fun state ->
                doIfOk (Job.raises (Exception()) >>-. Cancelled)))
    ]

An alternative approach might involve the Alt.withNackJob family, or related infixes like *<-=>-. These are more complicated to get started with, but will ultimately push your architecture toward using e.g. Job.iterateServer, which gives you robust internal microservers that can respond to requests and keep track of their own private state.
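To give a feel for the server style mentioned above, here is a minimal sketch of a Job.iterateServer loop that owns a private running total and answers requests over a channel. The Req type and startCounter are hypothetical names made up for illustration, the negative-acknowledgment (nack) part is left out entirely, and the snippet is written as an F# script that pulls Hopac from NuGet.

```fsharp
#r "nuget: Hopac"

open Hopac
open Hopac.Infixes

/// Hypothetical request type for a toy counter server.
type Req =
  | Add of int
  | Get of IVar<int>

/// Starts a server that owns a private running total. Only the server
/// loop touches the total, so no locks are needed.
let startCounter () : Job<Ch<Req>> = job {
  let reqs = Ch<Req>()
  // iterateServer starts the loop as a separate job and returns at once;
  // each iteration takes one request and yields the next state.
  do! Job.iterateServer 0 (fun total ->
        Ch.take reqs ^=> function
          | Add n -> Job.result (total + n)
          | Get reply -> IVar.fill reply total >>-. total)
  return reqs
}

let total = run (job {
  let! counter = startCounter ()
  // Ch.give is a rendezvous, so requests are handled one at a time, in order.
  do! Ch.give counter (Add 2)
  do! Ch.give counter (Add 3)
  let reply = IVar<int>()
  do! Ch.give counter (Get reply)
  return! IVar.read reply
})

printfn "total = %d" total
```

Because the server only takes the next request after it has produced its new state, the final Get should see both additions, giving a total of 5.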

As an aside, I think a lot of idiomatic F# doesn't really use exceptions much, preferring to catch exceptions close to the source, and then pass around union types like WorkState in this example (or the widely used Result). But of course go with what works for you! I hope this is helpful!
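A simpler variant may already cover the original question if it is acceptable for the other jobs to keep running in the background: stop waiting at the first exception instead of cancelling anything. The sketch below assumes that trade-off. conCollectFailFast is a hypothetical helper, not part of Hopac; it reuses the IVar plus Job.tryWith idea from this answer and races the collected results against the first error with <|>.

```fsharp
#r "nuget: Hopac"

open System
open Hopac
open Hopac.Infixes

/// Hypothetical helper (not part of Hopac): like Job.conCollect, but the
/// returned job fails as soon as any input job raises. Jobs that are
/// already running are NOT cancelled; their results are simply ignored.
let conCollectFailFast (jobs: seq<Job<'x>>) : Job<ResizeArray<'x>> = job {
  // Filled (at most once) with the first exception raised by any job.
  let firstError = IVar<exn>()
  // Filled with all results if every job succeeds.
  let results = IVar<ResizeArray<'x>>()
  // Record the first failure, then re-raise so the collection fails too.
  let guard (xJ: Job<'x>) : Job<'x> =
    Job.tryWith xJ (fun e -> IVar.tryFill firstError e >>=. Job.raises e)
  // Run the collection in the background. Its own failure is already
  // reported through firstError, so it is swallowed here.
  do! Job.start
        (Job.tryWith
           (Job.conCollect (Seq.map guard jobs) >>= IVar.tryFill results)
           (fun _ -> Job.unit ()))
  // Commit to whichever comes first: all results, or the first error.
  return! IVar.read results <|> (IVar.read firstError ^=> Job.raises)
}
```

With the three jobs from the question, running conCollectFailFast on them should raise "foo" almost immediately rather than after two seconds; "end 1" and "end 2" may still be printed afterwards, because those jobs are never cancelled. The background collection swallows its own exception so that it does not end as an unhandled failure.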

Thanks! I have to admit the code is far beyond my current understanding of Hopac. It certainly seems highly non-trivial, so I'm a bit wary of copy-pasting it and using it in critical infrastructure. Do you know anything about the performance implications? For example, I am concerned about the extra allocations for each job due to WorkState.

It would also be great to get a Hopac maintainer's perspective on the soundness of the proposed solution.

Yeah, it's certainly non-trivial. I edited the examples above to add some comments and type annotations, so I hope that's a little clearer. I'll also break a few things down:

  • The top-level functions do need to be functions that build concurrent jobs, so you get a fresh cancellation IVar each time.
  • An IVar is a receptacle for a variable that can only be filled once, and is then immutable. Reading the value of an IVar with IVar.read gives you an Alt (see next point). You can read it as many times as you want, but a read won't complete until the value is filled.
  • An Alt is a job alternative. Alts are in fact jobs (Alt inherits Job). When you compose Alts (like with <|>), you get a bigger Alt that chooses (or commits to) the first available result from one of the smaller Alts inside it. Then after it commits you can do more work with the result, such as with ^=> or Alt.afterJob.
  • Job.tryWith composes a job that works like try ... with exn -> ...
  • xJ >>-. y composes a job that runs xJ and then returns the value y
  • xJ >>= f composes a job that runs xJ, passes its result to the function f, and then runs the job that f returns
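The bullets above can be checked in a few lines. This sketch, written as an F# script that pulls Hopac from NuGet, exercises each combinator once on toy values; nothing in it is specific to the cancellation problem, and demo is just an illustrative name.

```fsharp
#r "nuget: Hopac"

open System
open Hopac
open Hopac.Infixes

let demo : Job<int * int * int * string * int * int> = job {
  let iv = IVar<int>()
  // An IVar can be filled only once...
  do! IVar.fill iv 42
  // ...so a second tryFill is silently ignored (a second IVar.fill is an error).
  do! IVar.tryFill iv 0
  // Reading gives an Alt and can be repeated; both reads see 42.
  let! a = IVar.read iv
  let! b = IVar.read iv
  // <|> commits to the first available alternative. Alt.never () never
  // becomes available, so the IVar branch wins and ^-> maps its value.
  let! c = Alt.never () <|> (IVar.read iv ^-> fun x -> x + 1)
  // xJ >>-. y runs xJ and then returns the constant y.
  let! d = Job.unit () >>-. "done"
  // xJ >>= f runs xJ, passes its result to f, then runs the job f returns.
  let! e = Job.result 20 >>= fun x -> Job.result (x * 2)
  // Job.tryWith behaves like try ... with.
  let! f = Job.tryWith (Job.raises (Exception "boom")) (fun _ -> Job.result (-1))
  return (a, b, c, d, e, f)
}

printfn "%A" (run demo)
```

Running it should produce the tuple (42, 42, 43, "done", 40, -1).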

In terms of performance, the F#/.NET runtime with server GC should handle the closures and reference allocations in this example easily. Running lots and lots of small jobs is where Hopac's performance excels compared to competing frameworks.
Edited to add:

  • don't block inside jobs
  • don't Hopac.run inside jobs (Job.queue is ok but would need the cancellation forwarded)
  • don't make your working state a large value type. If you're doing image processing or something like that, wrap your working data in a reference type to avoid duplicated memory
  • don't put <|>, Alt.choose, etc. (or doIfOk) inside a rec function (see #192)

Anecdotally, my main codebase passes around a lot more state than this, with much deeper pipelines, and it's still very fast after the first call (when the runtime instantiates all the types and so forth). Hopac is really well optimized in my experience, at least in the parts I've used.

But if you did want to try this approach, I'd suggest mocking something up that's more realistic to your scenario and benchmarking it before reworking your production code.

I hope that makes a bit more sense!

It also occurred to me that you could use latches instead of alts, which might lead to a more elegant design.

Thank you, that is very helpful. I will look carefully at your code and explanations when time permits.