hreinhardt/amqp

openConnection/closeConnection not thread safe

Opened this issue · 1 comments

I noticed that if multiple connections to the same amqp server are opened on separate threads, one thread calling closeConnection causes all of the connections to stop working.

I don't plan to have per-thread connections in production (this arose when running tests in parallel) but it strikes me as something that would be good to fix, or at least document.

That's strange, since there's no global state in the AMQP library; therefore, connections should be totally independent.

I tried hacking together a simple test-case, but it's not displaying any erroneous behaviour:

{-# OPTIONS -XOverloadedStrings #-}
import Network.AMQP

import Control.Concurrent
import qualified Data.ByteString.Lazy.Char8 as BL

main = do
    putStrLn "(thread1) opening connection"
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    declareQueue chan newQueue {queueName = "myQueue"}
    declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}
    bindQueue chan "myQueue" "myExchange" "myKey"
    consumeMsgs chan "myQueue" Ack $ \(msg, env) -> do
        putStrLn $ "(thread1) received message: "++(BL.unpack $ msgBody msg)
        ackEnv env

    forkIO thread2
    threadDelay 500000

    putStrLn "(thread1) publishing message"
    publishMsg chan "myExchange" "myKey"
        (newMsg {msgBody = (BL.pack "hello world (from thread1)"),
                 msgDeliveryMode = Just NonPersistent}
                )

    threadDelay 500000
    closeConnection conn
    putStrLn "(thread1) connection closed"


thread2 = do
    putStrLn "(thread2) opening connection"
    conn2 <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan2 <- openChannel conn2

    putStrLn "(thread2) publishing message"
    publishMsg chan2 "myExchange" "myKey"
        (newMsg {msgBody = (BL.pack "hello world (from thread2)"),
                 msgDeliveryMode = Just NonPersistent}
                )
    threadDelay 100000
    closeConnection conn2
    putStrLn "(thread2) connection closed"

For me it prints:

(thread1) opening connection
(thread2) opening connection
(thread2) publishing message
(thread1) received message: hello world (from thread2)
(thread2) connection closed
(thread1) publishing message
(thread1) received message: hello world (from thread1)
(thread1) connection closed

So Thread1 is able to work with the connection, even after Thread2 opens and closes its own connection.

It would be great if you could produce a test-case. But I also wouldn't rule out that the problem might actually lie elsewhere, so you probably want to take a look at the RabbitMQ log-file first and see if there's anything unusual.