Using a green thread per message consume
qwbarch opened this issue · 2 comments
This isn't an actual issue with the library. I hope asking a question here is okay since discussions aren't enabled.
When consuming messages, I expected each message to be handled asynchronously, independently of the others.
However, when I ran threadDelay in each message handler, it seems that only a single thread is used for processing all of the messages.
Here's what I used as an example:
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}

module Lib where

import Control.Concurrent (threadDelay)
import Control.Monad (forever, replicateM_)
import Network.AMQP (Ack (Ack), DeliveryMode (NonPersistent), Message (msgBody, msgDeliveryMode), ackEnv, consumeMsgs, newMsg, openChannel, openConnection, publishMsg, qos)

run :: IO ()
run = do
  putStrLn "Enter 0 for consumer, 1 for producer:"
  option <- getLine
  connection <- openConnection "127.0.0.1" "/" "guest" "guest"
  channel <- openChannel connection
  case option of
    "0" -> consumer channel
    "1" -> publisher channel
    _ -> run

publisher channel = do
  let sendHello = publishMsg channel "example.exchange" "example.key" (newMsg {msgBody = "Hello world!", msgDeliveryMode = Just NonPersistent})
  replicateM_ 1_000_000 sendHello

consumer channel = do
  let deliveryHandler (message, envelope) = do
        print message
        threadDelay 2_000_000
        ackEnv envelope
  qos channel 0 20_000 True
  _ <- consumeMsgs channel "example.queue" Ack deliveryHandler
  forever $ threadDelay 1_000_000
I expected the consumer to print the messages one right after another, and to ack each message 2 seconds after printing it.
Instead, it prints one message, waits 2 seconds, acks it, and only then moves on to the second message, and so on.
To work around this, I'm using forkIO for each message:
-- Also requires: import Control.Concurrent (forkIO) and import Control.Monad (void)
consumer channel = do
  let deliveryHandler (message, envelope) = void . forkIO $ do
        print message
        threadDelay 2_000_000
        ackEnv envelope
  qos channel 0 20_000 True
  _ <- consumeMsgs channel "example.queue" Ack deliveryHandler
  forever $ threadDelay 1_000_000
Are there any big or obvious drawbacks to spawning a new green thread per message? Is there a better way to handle this?
The consumeMsgs callback runs in the channel thread, so anything you do there will block the whole channel.
You have a few options:
- Use forkIO: the drawback is that it can be hard to make the code reliable; for example, exceptions thrown inside forkIO will not cause the program to crash, so one may forget to handle them.
- Use a queue: inside the consumeMsgs callback you put the messages into a queue, and multiple separate threads read from that queue and handle the messages.
- Use getMsg (in a loop) instead of consumeMsgs: you could then have multiple threads call getMsg to receive messages and handle them independently. Since this approach uses polling, it may not be a good solution if you have a very high throughput of messages.
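For the forkIO option, one way to avoid forgetting about exceptions is to route every forked handler through a small wrapper. The following is only a sketch using base: `forkLogged` and `logToStderr` are hypothetical helper names, not part of the amqp package, and what to do on failure (log, retry, reject the message) is left as an assumption.

```haskell
import Control.Concurrent
import Control.Exception (SomeException)
import Control.Monad (void)
import System.IO (hPutStrLn, stderr)

-- | Run an action in its own green thread. An exception in a forked thread
-- ends only that thread, so the failure is passed to `onError` explicitly
-- instead of relying on the caller to remember to handle it.
-- Hypothetical helper, not part of the amqp package.
forkLogged :: (SomeException -> IO ()) -> IO () -> IO ()
forkLogged onError action =
  void $ forkFinally action (either onError pure)

-- | A possible error reporter: write the exception to stderr.
logToStderr :: SomeException -> IO ()
logToStderr e = hPutStrLn stderr ("message handler failed: " ++ show e)
```

In the consumer from the question, the handler body could then be wrapped as `forkLogged logToStderr $ do ...` in place of `void . forkIO $ do ...`.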
Option 3 is probably the easiest to get right, but it may not work if you have a high volume of messages.
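The polling approach in option 3 could look roughly like the sketch below. It is written against a generic fetch action so that it only depends on base; `pollLoop`, `startPollers`, and the idle delay are hypothetical names and choices made for illustration.

```haskell
import Control.Concurrent
import Control.Monad (forever, replicateM_)

-- | Poll `fetch` forever: process a message when one is available,
-- otherwise sleep for `idleMicros` microseconds before asking again.
-- Hypothetical helper, not part of the amqp package.
pollLoop :: Int -> IO (Maybe a) -> (a -> IO ()) -> IO ()
pollLoop idleMicros fetch process = forever $ do
  next <- fetch
  case next of
    Just msg -> process msg
    Nothing  -> threadDelay idleMicros

-- | Start `n` independent polling threads that share one fetch action.
startPollers :: Int -> Int -> IO (Maybe a) -> (a -> IO ()) -> IO ()
startPollers n idleMicros fetch process =
  replicateM_ n (forkIO (pollLoop idleMicros fetch process))
```

With the amqp package, the fetch action would be something like `getMsg channel Ack "example.queue"`, and the handler would take the `(message, envelope)` pair and finish with `ackEnv envelope`. The idle delay trades latency against the number of empty polls sent to the broker, which is the polling cost mentioned above.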
Option 2 looks the most appealing; I'll look into implementing that.
Thanks for the advice, I appreciate it!
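For reference, the queue approach from option 2 might be sketched as follows. This is a minimal version under stated assumptions: it uses an unbounded `Chan` from base as the queue, `startWorkers` is a hypothetical helper name, and failures are only logged.

```haskell
import Control.Concurrent
import Control.Exception (SomeException, try)
import Control.Monad (forever, replicateM_)
import System.IO (hPutStrLn, stderr)

-- | Start `n` worker threads reading from a shared queue and return the
-- action that enqueues work for them.
-- Hypothetical helper, not part of the amqp package.
startWorkers :: Int -> (a -> IO ()) -> IO (a -> IO ())
startWorkers n process = do
  queue <- newChan
  replicateM_ n . forkIO . forever $ do
    item <- readChan queue
    -- Catch exceptions so that one bad message does not kill the worker.
    -- Note: SomeException also covers asynchronous exceptions, which is
    -- acceptable for a sketch but worth refining in real code.
    result <- try (process item)
    case result of
      Left e   -> hPutStrLn stderr ("worker failed: " ++ show (e :: SomeException))
      Right () -> pure ()
  -- Enqueueing never blocks, so it is safe to call from the channel thread.
  pure (writeChan queue)
```

In the consumer, this could be wired up as `enqueue <- startWorkers 8 deliveryHandler` followed by `consumeMsgs channel "example.queue" Ack enqueue`, so the callback only enqueues and returns immediately. As long as each worker acks after it finishes, the prefetch limit set with `qos` should keep the number of queued messages bounded.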