How to conCollect but stop on first exception?
Consider the following:
    Job.conCollect [
        job {
            printfn "start 1"
            do! timeOutMillis 1000
            printfn "end 1"
        }
        job {
            printfn "start 2"
            do! timeOutMillis 2000
            printfn "end 2"
        }
        job {
            printfn "start 3"
            failwith "foo"
        }
    ]
When run, this prints all the "start x" messages, waits for one second, prints "end 1", waits another second, prints "end 2", and then immediately throws.
I would like to run jobs in parallel (and get their results), but stop on the first exception, because (in my use-case) if one of the jobs throws an exception, the whole request fails and it makes no sense to wait for anything else, or to return more than the first exception.
Is there a simple way to do this in Hopac?
Is there no way in Hopac to stop on the first exception when running jobs concurrently?
Thanks. Never seen that function, but judging from the description, it seems to limit parallelism. This is not what I'm trying to do. Also, the description even says:
"Note that due to the sequential nature of streams, this simply isn't an effective solution for fine grained parallel processing."
So I'm a bit wary of doing that.
Do you have an example of how it would be used? Do I need to worry about performance? Are there other ways of accomplishing this?
I've figured out a way to do this. The idea is to use an IVar like it's a cancellation token, and wrap your jobs inside alts that will commit to the IVar if a cancellation happens. Use Job.tryWith to fill the IVar if an exception is thrown.
The naive implementation of this concept seems like it might work...
    open Hopac
    open Hopac.Infixes
    /// Builds a job composed of a list of concurrent jobs
    let wontCancel () : Job<ResizeArray<unit>> =
        /// Fill this with () if we need to cancel
        let cancel : IVar<unit> = IVar()
        /// Wraps a job in an exception handler, and fills cancel if it raises
        let attempt (atomicJob: Job<unit>) : Job<unit> =
            Job.tryWith atomicJob (fun exn -> IVar.tryFill cancel ())
        /// Wraps a job in an alt that would prefer our cancellation
        /// over the result of our job
        let attemptWithCancel (atomicJob: Job<unit>) : Alt<unit> =
            // This will be available as soon as we cancel
            IVar.read cancel
            // This will be available when our job completes,
            // but our job won't even start if we already cancelled
            <|> (Alt.unit () ^=> fun () -> attempt atomicJob)
        Job.conCollect [
            attemptWithCancel <| job {
                printfn "start 1"
                do! timeOutMillis 1000
                printfn "end 1" }
            attemptWithCancel <| job {
                printfn "start 2"
                do! timeOutMillis 2000
                printfn "end 2" }
            attemptWithCancel <| job {
                printfn "start 3"
                failwith "some exception" }
        ]
Why won't these jobs cancel? Jobs are very lightweight units of work, which are essentially atomic unless you write them otherwise. Once a job is started, it won't be cancelled under normal conditions. So if IVar.read cancel is committed to, but attempt atomicJob has already been started by the scheduler, atomicJob will probably complete and then have its results discarded by the <|>.
So in order to actually respect a cancellation, your atomic jobs will need to be converted to job pipelines that check the status of the cancellation at each step. They'll also need to pipe forward some form of working state:
    open System
    open Hopac
    open Hopac.Infixes
    type WorkState<'a, 'b> =
        | Started
        | Ongoing of 'a
        | Completed of 'b
        | Cancelled
    /// Builds a job composed of a list of concurrent jobs
    let willCancel () : Job<ResizeArray<WorkState<unit, string>>> =
        /// Fill this with Cancelled if we need to cancel
        let cancel : IVar<WorkState<unit, string>> = IVar()
        /// Wraps a job in an exception handler, and fills cancel if it raises
        let attempt (composedJob: Job<WorkState<unit, string>>) : Job<_> =
            Job.tryWith composedJob
                (fun exn -> IVar.tryFill cancel Cancelled >>-. Cancelled)
        /// Checks whether we cancelled, and if not, does the job
        let doIfOk (atomicJob: Job<WorkState<unit, string>>) : Alt<_> =
            // This will be available as soon as we cancel
            IVar.read cancel
            // This will be available when our job completes,
            // but our job won't even start if we already cancelled
            <|> (Alt.unit () ^=> fun () -> atomicJob)
        Job.conCollect [
            attempt (doIfOk <| job {
                         printfn "start 1"
                         return Started }
                     >>= (fun state -> doIfOk (timeOutMillis 1000 >>-. Ongoing ()))
                     >>= (fun state ->
                             doIfOk <| job {
                                 let rsp = "end 1"
                                 printfn "%s" rsp
                                 return Completed rsp }))
            attempt (doIfOk <| job {
                         printfn "start 2"
                         return Started }
                     >>= (fun state ->
                             doIfOk (Job.raises (Exception()) >>-. Cancelled)))
        ]
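If it is enough to stop waiting on the first exception, and you can live with the other jobs finishing in the background, a simpler variation on the same IVar idea might look like the sketch below. Note that conCollectFailFast is a made-up helper name, not something Hopac provides, and the already-started jobs are abandoned rather than cancelled.

```fsharp
open Hopac
open Hopac.Infixes

/// Hypothetical helper (not part of Hopac): like Job.conCollect, but the
/// returned job fails as soon as any input job raises. Jobs that are
/// still running are abandoned, not cancelled.
let conCollectFailFast (jobs: seq<Job<'x>>) : Job<ResizeArray<'x>> = job {
    // Filled (as a failure) with the first exception, if any
    let firstFailure : IVar<ResizeArray<'x>> = IVar ()
    // Record the exception, then re-raise it so conCollect still sees it
    let guard (xJ: Job<'x>) : Job<'x> =
        Job.tryWith xJ (fun e ->
            IVar.tryFillFailure firstFailure e >>=. Job.raises e)
    // Run the whole collection in the background
    let! all = Promise.start (Job.conCollect (Seq.map guard jobs))
    // Whichever comes first: the first failure or the complete result
    return! IVar.read firstFailure <|> Promise.read all
}

// Usage: the second job fails immediately, so the outcome should be a
// Choice2Of2 carrying the "foo" exception without waiting two seconds.
let outcome =
    conCollectFailFast [
        job {
            do! timeOutMillis 2000
            return 1
        }
        job { return failwith "foo" }
    ]
    |> Job.catch
    |> run
```

Reading an IVar that was filled with IVar.tryFillFailure raises the stored exception, which is what lets the first failure win the <|> race here.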
An alternative approach might involve the Alt.withNackJob family, or related infixes like *<-=>-. These are more complicated to get started with, but will ultimately push your architecture toward using e.g. Job.iterateServer, which gives you robust internal microservers that can respond to requests and keep track of their own private state.
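To give a rough idea of what such a microserver looks like, here is a minimal sketch of a counter server built on Job.iterateServer. The names startCounter and ask are made up for illustration, and this shows only the request/reply shape, not cancellation.

```fsharp
open Hopac
open Hopac.Infixes

/// Hypothetical counter server: each request is an IVar that the server
/// fills with the current count before moving on to the next state.
let startCounter () : Job<Ch<IVar<int>>> = job {
    let requests : Ch<IVar<int>> = Ch ()
    // One iteration: wait for a request, reply, return the next state
    let step (count: int) : Job<int> = job {
        let! reply = Ch.take requests
        do! IVar.fill reply count
        return count + 1
    }
    // The count is private to the server loop started here
    do! Job.iterateServer 0 step
    return requests
}

/// Hypothetical client call: send a fresh IVar and wait for the reply.
let ask (requests: Ch<IVar<int>>) : Job<int> = job {
    let reply = IVar ()
    do! Ch.give requests reply
    return! IVar.read reply
}

let first, second =
    run (job {
        let! counter = startCounter ()
        let! a = ask counter
        let! b = ask counter
        return a, b
    })
// first = 0, second = 1
```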
As an aside, I think a lot of idiomatic F# doesn't really use exceptions much, preferring to catch exceptions close to the source, and then pass around union types like WorkState in this example (or the widely used Result). But of course go with what works for you! I hope this is helpful!
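As a small sketch of the "catch close to the source" style, assuming you are happy to get every outcome back as a Result: toResult is a made-up helper built on Job.catch, not part of Hopac. Note that this does not stop early; it only turns exceptions into values.

```fsharp
open Hopac
open Hopac.Infixes

/// Hypothetical helper (not part of Hopac): runs a job and turns a raised
/// exception into an Error value instead of letting it propagate.
let toResult (xJ: Job<'x>) : Job<Result<'x, exn>> =
    Job.catch xJ
    >>- (function
         | Choice1Of2 x -> Ok x
         | Choice2Of2 e -> Error e)

// Every job reports its own outcome, so conCollect itself never throws
let results =
    Job.conCollect [
        toResult (job { return 1 })
        toResult (job { return failwith "foo" })
    ]
    |> run

// Count how many of the jobs ended in an Error
let errorCount =
    results
    |> Seq.filter (function
                   | Error _ -> true
                   | Ok _ -> false)
    |> Seq.length
```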
Thanks! I have to admit the code is far beyond my current understanding of Hopac. It certainly seems highly non-trivial. So I'm a bit wary of copy-pasting it and using it in critical infrastructure. Do you know anything about the performance implications? I am for example concerned with the extra allocations for each job due to WorkState.
It would also be great to get a Hopac maintainer's perspective on the soundness of the proposed solution.
Yeah, it's certainly non-trivial. I edited the examples above to add some comments and type annotations, so I hope that's a little clearer. I'll also break a few things down:
- The top-level functions do need to be functions that build concurrent jobs, so you get a fresh cancellation IVar each time.
- An IVar is a receptacle for a variable that can only be filled once, and is then immutable. Reading the value of an IVar with IVar.read gives you an Alt (see next point). You can read it as many times as you want, but the read won't complete until the value is filled.
- An Alt is a job alternative. Alts are in fact jobs (Alt inherits Job). When you compose Alts (like with <|>), you get a bigger Alt that chooses (or commits to) the first available result from one of the smaller Alts inside it. Then after it commits you can do more work with the result, such as with ^=> or Alt.afterJob.
- Job.tryWith composes a job that works like try ... with exn -> ...
- >>-. composes a job that runs the job on its left, discards its result, and returns the value on its right.
- >>= composes a job that runs the job on its left and passes its result to the function on its right, which returns the next job to run.
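Here is a tiny self-contained sketch that exercises each of those pieces in isolation. The values are made up purely for illustration.

```fsharp
open Hopac
open Hopac.Infixes

let demo : Job<string * int * int> = job {
    // An IVar starts empty and can be filled exactly once
    let cancel : IVar<string> = IVar ()
    do! IVar.fill cancel "cancelled"
    // <|> commits to the first available alternative. The IVar is
    // already filled, so it wins over the 5 second timeout.
    let! winner = IVar.read cancel <|> (timeOutMillis 5000 ^->. "timed out")
    // >>-. drops the left job's result; >>= feeds it to a function
    let! n = (Job.unit () >>-. 20) >>= fun x -> Job.result (x + 22)
    // Job.tryWith behaves like try ... with for jobs
    let! recovered =
        Job.tryWith (Job.raises (System.Exception "boom"))
                    (fun _ -> Job.result (-1))
    return winner, n, recovered
}

let winner, n, recovered = run demo
// winner = "cancelled", n = 42, recovered = -1
```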
In terms of performance (edited to remove a bad oversimplification), the F#/.NET runtime with server GC should handle the closures and reference allocations in this example easily. Running lots and lots of small jobs is where Hopac's performance excels compared to competing frameworks.
Edited to add:
- don't block inside jobs
- don't Hopac.run inside jobs (Job.queue is ok but would need the cancellation forwarded)
- don't make your working state a large value type. If you're doing image processing or something like that, wrap your working data in a reference type to avoid duplicated memory
- don't put <|>, Alt.choose, etc. (or doIfOk) inside a rec function (see #192)
Anecdotally, my main codebase passes around a lot more state than this, with much deeper pipelines, and it's still very fast after the first call (when the F# runtime instantiates all the types and so forth). Hopac is really well optimized in my experience, at least what I've used.
But if you did want to try this approach, I'd suggest mocking something up that's more realistic to your scenario and benchmarking it before reworking your production code.
I hope that makes a bit more sense!
It also occurred to me that you could use latches instead of alts, which might lead to a more elegant design.
Thank you, that is very helpful. I will look carefully at your code and explanations when time permits.