[DPP-104] more primitives from Control.Concurrent.Async for CH
Opened this issue · 0 comments
[Imported from JIRA. Reported by davidsd @davidsd) as DPP-104 on 2014-04-03 00:42:01]
I needed a Process-based version of the Concurrently applicative from Control.Concurrent.Async. To implement it, I had to do some trivial rewriting of a few primitives to work with Process instead of IO (code below). Would it make sense to include these functions in Control.Distributed.Process.Platform.Async (given that it's already largely modeled after Control.Concurrent.Async)?
By the way, I couldn't use CH's existing Async to implement these functions because those require the inner type be Serializable, and I wanted to be able to use these in an applicative, which requires that the inner type be unrestricted.
{code:haskell}
race :: Process a -> Process b -> Process (Either a b)
race left right = concurrently' left right collect
where
collect m = do
e <- liftIO $ takeMVar m
case e of
Left ex -> liftIO $ throwIO ex
Right r -> return r
concurrently :: Process a -> Process b -> Process (a,b)
concurrently left right = concurrently' left right (collect [])
where collect [Left a, Right b] _ = return (a,b)
collect [Right b, Left a] _ = return (a,b)
collect xs m = do
e <- liftIO $ takeMVar m
case e of
Left ex -> liftIO $ throwIO ex
Right r -> collect (r:xs) m
concurrently' :: Process a -> Process b
-> (MVar (Either SomeException (Either a b)) -> Process r)
-> Process r
concurrently' left right collect = do
done <- liftIO newEmptyMVar
mask $ \restore -> do
lid <- spawnLocal $ restore (left >>= liftIO . putMVar done . Right . Left)
catch
(liftIO . putMVar done . Left)
rid <- spawnLocal $ restore (right >>= liftIO . putMVar done . Right . Right)
catch
(liftIO . putMVar done . Left)
let stop = kill lid "process died" >> kill rid "process died"
r <- restore (collect done) onException
stop
stop
return r
{code}