hreinhardt/amqp

consumeMsgs forever

Closed this issue · 5 comments

In a long running service what is the correct way to wait for the consumeMsgs function?

The examples in the documentation is using getLine or threadDelay and for my application I haven't found a better way than using forever $ threadDelay 1000000, but it seems a bit of a hack.

Might be something obvious in the use of Control.Concurrency that I'm missing.

withChannel :: Connection -> (Channel -> IO a) -> IO a
withChannel conn =
  bracket (openChannel conn) closeChannel

consumeMaterials :: Config -> Connection -> (Material -> IO ()) -> IO ()
consumeMaterials (Config {..}) conn f =
  withChannel conn $ \chan -> do
    void $ declareQueue chan newQueue { queueName = cfgMqQueue }
    void $ consumeMsgs chan cfgMqQueue Ack (upsertMessage f)
    forever $ threadDelay 1000000

upsertMessage :: (Material -> IO ()) -> (Message, Envelope) -> IO ()
upsertMessage f (msg, env) = do
  logStr $ "upsertMessage: " ++ show msg
  void $ f $ fromMessage msg
  ackEnv env

I think using threadDelay is perfectly fine.
Alternatively you could use an (initially empty) MVar (). Calling readMVar would then block until you put something inside the MVar.

Actually you may not really need to block at all. When you call consumeMsgs the subscription will live as long as the channel lives.
So if you get rid of the calls to closeChannel your subscriptions will be permanent without needing to block.

It looks like I'll have to either block on the channel or on the connection. Even without explicitly closing the channel or the connection, if nothing is blocking, the program will just terminate.

An alternative would be repeatedly calling getMsg, which runs in the foreground. Is it not possible to "listen" for a message in the main thread?

You're right that you need to somehow prevent the connection from being closed. I just wanted to point out that you don't need to do it for every channel.

There is no method for listening in the main-thread, but even if there was, it would probably have to use threadDelay internally ;).

So I'd recommend either using threadDelay or an MVar.

I'll stick with threadDelay. Thanks for your response!