tanakh/process-conduit

Deadlocks are possible


Consider the following example:

{-# LANGUAGE OverloadedStrings #-}
import           Control.Concurrent           (threadDelay)
import           Control.Concurrent.MVar      (newEmptyMVar, putMVar, takeMVar)
import           Control.Monad                (forever)
import           Control.Monad.IO.Class       (liftIO)
import           Control.Monad.Trans.Resource (runResourceT)
import           Data.Conduit                 (yield, ($$), (=$))
import qualified Data.Conduit.List            as CL
import           Data.Conduit.Process         (conduitCmd)

main :: IO ()
main = do
    baton <- newEmptyMVar
    runResourceT $ source baton $$ conduitCmd cmd =$ CL.mapM_ (sink baton)
  where
    source baton = forever $ do
        liftIO $ takeMVar baton >>= print
        yield "Stepped"

    sink baton bs = liftIO $ do
        putMVar baton bs
        print bs

    cmd = "while true; do sleep 1; date; done"

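The issue is that a single conduit pipeline is always single-threaded, whereas we really need to be able to respond to two different kinds of signals (data available from the upstream source, and data coming out of the process). Here the source blocks in takeMVar until the sink has handled some process output, but once the pipeline's only thread is blocked in the source, nothing can deliver that output to the sink.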

I believe the only true solution is to model the API like Data.Conduit.Network: provide a Source and a Sink separately, and let the client fork a thread as necessary.
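
To make the proposal concrete, here is a rough sketch of the split design. It is not the process-conduit API: it assumes plain System.Process for spawning, plus sourceHandle and sinkHandle from Data.Conduit.Binary (conduit-extra) and the newer runConduit / (.|) combinators. The helper name throughProcess is made up for illustration. The sink feeding the process's stdin runs in a forked thread, while the calling thread drains the stdout source, so neither side can block the other.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Control.Concurrent      (forkFinally)
import           Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import           Data.ByteString         (ByteString)
import qualified Data.ByteString         as B
import           Data.Conduit            (runConduit, (.|))
import qualified Data.Conduit.Binary     as CB
import qualified Data.Conduit.List       as CL
import           System.IO               (hClose)
import           System.Process          (CreateProcess (..), StdStream (..),
                                          createProcess, shell, waitForProcess)

-- Hypothetical helper (not part of process-conduit): run a shell command,
-- feed it the given chunks on stdin, and collect everything it writes to
-- stdout. The stdin side and the stdout side are two separate pipelines.
throughProcess :: String -> [ByteString] -> IO ByteString
throughProcess cmd input = do
    (Just hin, Just hout, _, ph) <- createProcess (shell cmd)
        { std_in = CreatePipe, std_out = CreatePipe }
    done <- newEmptyMVar

    -- Sink side: a forked thread pushes input into the process.
    -- Closing stdin afterwards lets the child see end-of-input, and the
    -- MVar is filled even if the writer fails.
    _ <- forkFinally
        (runConduit $ CL.sourceList input .| CB.sinkHandle hin)
        (\_ -> hClose hin >> putMVar done ())

    -- Source side: the calling thread reads stdout until EOF.
    chunks <- runConduit $ CB.sourceHandle hout .| CL.consume

    takeMVar done
    hClose hout
    _ <- waitForProcess ph
    return (B.concat chunks)

main :: IO ()
main = throughProcess "cat" ["one\n", "two\n"] >>= B.putStr
```

With this shape the client decides where the second thread lives, much as with Data.Conduit.Network. A long-running command like the one in the example above would need the stdout side to stream into a consumer rather than collect with CL.consume.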

Hi! I've also noticed this problem (as have many others). I really needed this functionality for an application I am writing, so I reimplemented it using two threads in this gist: https://gist.github.com/KholdStare/0faff6b2bd6ff01db776

The idea is that there is an abstract "channel" between two threads that have two ends of a conduit. That channel happens to be a process. It's a rough idea/implementation, but I thought I'd get the ball rolling since there hasn't been much activity in this repository.

Let me know if any of the concepts exist already, or if there is a better implementation.