hreinhardt/amqp

Using a green thread per message consume

qwbarch opened this issue · 2 comments

This isn't an actual issue with the library. I hope asking a question here is okay since discussions aren't enabled.

When consuming messages, I was expecting each message consume to be running asynchronous to one another.
However, when I tried running threadDelay on each message consume, it seems that only a single thread is used for processing each message.

Here's what I used as an example:

{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}

module Lib where

import Control.Concurrent (threadDelay)
import Control.Monad (forever, replicateM_)
import Network.AMQP (Ack (Ack), DeliveryMode (NonPersistent), Message (msgBody, msgDeliveryMode), ackEnv, consumeMsgs, newMsg, openChannel, openConnection, publishMsg, qos)

run :: IO ()
run = do
  putStrLn "Enter 0 for consumer, 1 for producer:"
  option <- getLine
  connection <- openConnection "127.0.0.1" "/" "guest" "guest"
  channel <- openChannel connection
  case option of
    "0" -> consumer channel
    "1" -> publisher channel
    _ -> run

publisher channel = do
  let sendHello = publishMsg channel "example.exchange" "example.key" (newMsg {msgBody = "Hello world!", msgDeliveryMode = Just NonPersistent})
  replicateM_ 1_000_000 sendHello

consumer channel = do
  let deliveryHandler (message, envelope) = do
        print message
        threadDelay 2_000_000
        ackEnv envelope
  qos channel 0 20_000 True
  _ <- consumeMsgs channel "example.queue" Ack deliveryHandler
  forever $ threadDelay 1_000_000

I was expecting the consumer to print the message out one after another, and 2 seconds after it ack's the message.
Instead it prints one message, waits for 2 seconds, ack's it, and then moves onto the second message, etc.

To go around this, I'm using forkIO on each message consume.

consumer channel = do
  let deliveryHandler (message, envelope) = void . forkIO $ do
        print message
        threadDelay 2_000_000
        ackEnv envelope
  qos channel 0 20_000 True
  _ <- consumeMsgs channel "example.queue" Ack deliveryHandler
  forever $ threadDelay 1_000_000

Is there any big/obvious drawbacks to spawning a new green thread per message? Is there a better way to handle this?

The consumeMsgs-callback runs in the channel-thread, so anything you do there will block the whole channel.

You have a few options:

  1. Use forkIO: the drawback is that it can be hard to make the code reliable; for example, exceptions thrown inside forkIO will not cause the program to crash and so one may forget to handle them.
  2. Use a queue, i.e. inside the consumeMsgs-callback you would put the messages into a queue and then you could have multiple separate threads reading from that queue and handling the messages.
  3. Use getMsg (in a loop) instead of consumeMsgs: You could then have multiple threads call getMsg to receive messages and handle them independently. Since this approach uses polling, this may not be a good solution if you have a very high throughput of messages.

Option 3 is probably the easiest to get right, but it may not work if you have a high volume of messages.

Options 2 looks the most appealing, I'll look into implementing that.
Thanks for the advice, I appreciate it!