hreinhardt/amqp

Deadlock when calling cancelConsumer.

alaendle opened this issue · 11 comments

I'm calling cancelConsumer from another thread while consuming messages. I would expect this call to succeed, but it blocks indefinitely. Note that the call succeeds if there are no in-flight messages. After reading #84, I wonder whether this is a related problem: as far as I understand it, the use of request inside cancelConsumer could be the issue. I'll keep it short for now, but please let me know if you need further information.

I don't think this is related to issue 84. That would only be the case if you had called cancelConsumer from within the consumeMsgs-callback. So what you discovered seems like a bug.

It would be great if you could add some print statements to the code and try to figure out exactly which line it blocks on.

Thanks for your quick response. I'll try to narrow down the problem. What I can tell so far: I use the Docker image rabbitmq:3.8.1-management as the AMQP broker (I don't know if this matters), and I'm definitely not calling cancelConsumer within the callback.

What I've done so far:

cancelConsumer :: Channel -> ConsumerTag -> IO ()
cancelConsumer chan consumerTag = do
    traceIO "--- START: Calling cancelConsumer"
    (SimpleMethod (Basic_cancel_ok _)) <- request chan $ (SimpleMethod (Basic_cancel
        (ShortString consumerTag) -- consumer_tag
        False -- nowait
        ))
    traceIO "--- BETWEEN: Calling cancelConsumer"

    --unregister the consumer
    modifyMVar_ (consumers chan) $ return . M.delete consumerTag
    traceIO "--- END: Calling cancelConsumer"

I only see the START trace; BETWEEN is never reached.

Hm, could you try adding some debug statements to the request function itself?

Sure, here is the result:

Cancel rabbit consumer: input2 "1"
--- cancelConsumer --- 1
--- request --- 1
--- request --- 2
--- request --- 3
--- request --- 4
--- request --- 5
--- request --- 7

request :: Channel -> Assembly -> IO Assembly
request chan m = do
    traceIO "--- request --- 1"
    res <- newEmptyMVar
    CE.catches (do
            withMVar (chanClosed chan) $ \cc -> do
                traceIO "--- request --- 2"
                if isNothing cc
                    then do
                        traceIO "--- request --- 3"
                        modifyMVar_ (outstandingResponses chan) $ \val -> return $! val Seq.|> res
                        traceIO "--- request --- 4"
                        writeAssembly' chan m
                        traceIO "--- request --- 5"
                    else do
                        traceIO "--- request --- 6"
                        CE.throwIO $ userError "closed"

            -- res might contain an exception, so evaluate it here
            traceIO "--- request --- 7"
            !r <- takeMVar res
            traceIO "--- request --- 8"
            return r
            )
        [CE.Handler (\ (_ :: AMQPException) -> throwMostRelevantAMQPException chan),
         CE.Handler (\ (_ :: CE.ErrorCall) -> throwMostRelevantAMQPException chan),
         CE.Handler (\ (_ :: CE.IOException) -> throwMostRelevantAMQPException chan)]

So it blocks forever on this line:

!r <- takeMVar res

I also tried running against RabbitMQ 3.7.21, with the same result. However, the problem seems to be very sensitive to timing: I can reproduce it every time in my real-world application, but unfortunately I was unable to build a simple unit test that exposes the same behaviour.

@hreinhardt Thanks for your support. I think I've spotted the cause, though I'm not sure whether it's a problem in the library or in my application.

What happened? In the consumeMsgs callback I forwarded each message to a third-party system and waited for its acknowledgment in order to confirm/reject the consumed message. Unfortunately, this third-party system also raises the event that cancels the consumption, on the same thread on which it handles the acknowledgments of forwarded messages. Greatly simplified, this is what happened:

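{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (newEmptyMVar, putMVar, takeMVar, threadDelay)
import qualified Network.AMQP as AMQP

main :: IO ()
main = do
  con <- AMQP.openConnection "localhost" "/" "user" "user"
  signal <- newEmptyMVar
  c <- AMQP.openChannel con
  -- assumes the "input" queue already contains at least one message
  consumerTag <- AMQP.consumeMsgs c "input" AMQP.Ack (\(_msg, env) -> do
    takeMVar signal -- block inside the consumer callback
    AMQP.ackEnv env)

  threadDelay $ 1 * 1000 * 1000 -- wait for the consumer callback to block

  -- cancel and release the consumer; which thread this runs on doesn't matter
  AMQP.cancelConsumer c consumerTag -- blocks, since the consumer callback is blocked
  putMVar signal ()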

Are you actually calling takeMVar inside consumeMsgs? That is generally not a good idea, since as long as the takeMVar is waiting, the whole AMQP library would essentially be blocked.

Not directly, but the effect is the same. Think of sendSync (AMQP.msgBody msg) instead of takeMVar: depending on the result of this method, I acknowledged/rejected the AMQP message. Unfortunately, the third-party system also raises a deviceTwinSpecChanged callback on its receiver thread, which is the same thread on which it handles acknowledgments for the sendSync method.

I use the deviceTwinSpecChanged callback to manage my relationship with RabbitMQ (adding/removing subscriptions, channels, and so on).

So if there are no in-flight messages, everything works. But while sendSync is still waiting to return, the deviceTwinSpecChanged callback may be raised, and in it I tried to adjust RabbitMQ channels, e.g. by calling cancelConsumer. Now the third-party receiver thread is busy running deviceTwinSpecChanged, so it cannot deliver the acknowledgment that sendSync is waiting for. Meanwhile cancelConsumer waits for the broker's reply, which has to be handled by the AMQP channel's receiver thread, and that thread is stuck in the consumeMsgs callback waiting for sendSync. Deadlock!

After I understood the problem, I changed two things to mitigate it. First, I used an asynchronous method and continuations to forward the message, which immediately frees the consumer thread. Second, I added a forkIO as the first statement of the deviceTwinSpecChanged callback so that it does not delay the execution of the continuations.
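The core of the problem can be reproduced without RabbitMQ. The sketch below is a toy model, not the amqp library's code: a single hypothetical receiver thread both runs callbacks and delivers request replies, which is roughly how a channel's receiver thread runs consumeMsgs callbacks and fills in the responses that request waits for. It is the single-thread version of the cycle (the same shape as #84); in the scenario above the cycle runs through two receiver threads, but a forkIO at the start of the handler breaks it in the same way. All names (Event, receiver, modelRequest, detached, finishesWithin) are made up for illustration.

```haskell
import Control.Concurrent
import Control.Monad (forever, void)
import System.Timeout (timeout)

-- Toy model, not the amqp library: one receiver thread handles events
-- strictly in order, like a channel's receiver thread.
data Event
  = Run (IO ())        -- a callback, executed on the receiver thread itself
  | Reply (MVar ())    -- a response that only the receiver can deliver

receiver :: Chan Event -> IO ()
receiver events = forever $ do
  ev <- readChan events
  case ev of
    Run action -> action            -- while this runs, nothing else is handled
    Reply box  -> putMVar box ()

-- Like cancelConsumer: enqueue a request, then wait for the receiver's reply.
modelRequest :: Chan Event -> IO ()
modelRequest events = do
  box <- newEmptyMVar
  writeChan events (Reply box)
  takeMVar box

-- The mitigation: run the handler on a fresh thread and return immediately.
detached :: IO () -> IO ()
detached = void . forkIO

-- Runs one callback (wrapped by 'wrap') that issues a request, and reports
-- whether it completed within the given number of microseconds.
finishesWithin :: Int -> (IO () -> IO ()) -> IO Bool
finishesWithin micros wrap = do
  events <- newChan
  _ <- forkIO (receiver events)
  done <- newEmptyMVar
  writeChan events (Run (wrap (modelRequest events >> putMVar done ())))
  result <- timeout micros (takeMVar done)
  return (result == Just ())
```

With these definitions, `finishesWithin 200000 id` should return False, because the callback waits for a reply that is queued behind the callback itself, while `finishesWithin 1000000 detached` should return True, because the receiver is free to process the reply.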

This works as expected, but it led to a new question (which may not be directly related to the AMQP library). Since the cancelConsumer method runs in the receiver thread and waits for an acknowledgment from the server (at least as far as I interpret the code), I can assume no further messages will arrive, so it should be safe to close the channel. This was true for the synchronous sender, but now I also have to make sure that my continuations are carried out, because these continuations want to acknowledge their messages. For now I "solve" this with a small thread delay between cancelConsumer and closeChannel, but I would strive for a more elegant solution.

I am aware that this is more of a general concurrency question, but maybe we can close this issue with a tip on it?

Once again thanks for your support and your long-standing engagement in the community expressed inter alia in this great library.

"Since the cancelConsumer method runs in the receiver thread and waits for an acknowledgment from the server (at least as far as I interpret the code), I can assume no further messages will arrive, so it should be safe to close the channel."

It is always safe to close a channel. I don't think you even need to call cancelConsumer before, since closing the channel will implicitly cancel all consumers.

If you meant safe in the sense of "no more messages will arrive", then you're correct. But even then, losing some messages wouldn't be a big deal: if you can't acknowledge them, the server should resend them at some point in the future.

"but now I also have to make sure that my continuations are carried out, because these continuations want to acknowledge their messages."

I'm not sure I understand you fully here. If you have some continuation that will acknowledge a message, why would that have to run before cancelConsumer?


Just as an idea, an alternative approach to structure the consumer logic might be to store all the incoming messages in some kind of queue and handle them in a different thread, e.g.:

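{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan as C
import Control.Monad (forever)
import Network.AMQP

startConsumer :: Channel -> IO ConsumerTag
startConsumer chan = do
    myChan <- C.newChan

    -- in a different thread: do the slow work, then acknowledge
    _ <- forkIO $ forever $ do
        (_msg, env) <- C.readChan myChan
        ackEnv env

    -- the callback only enqueues, so it never blocks the channel's receiver thread
    consumeMsgs chan "myQueue" Ack $ \(msg, env) ->
        C.writeChan myChan (msg, env)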

This would keep the work inside consumeMsgs to a minimum and rule out any deadlocking issues.

Absolutely. However, there is no guarantee that ackEnv succeeds (i.e. that the channel is still open). So for now I added a small delay between cancelConsumer and closeChannel, hoping that in-flight messages in the other thread are processed in between. While I think a more deterministic solution without the delay is possible, e.g. using semaphores or something similar, it might add complexity and, given my implementation, lead to new problems ;-) So for now I live with the delay. The other approach would be to just close the channel and catch the exception from ackEnv. However, this could lead to a message being processed twice (QoS: at least once). That is certainly acceptable from a specification point of view, since the app could crash at any time, but I would like to avoid it in the case of a graceful shutdown, which is why I prefer the threadDelay solution.

I think if you wanted to solve this the "right way", you'd need something like an MVar Int for every channel, which would store the number of continuations that haven't completed yet. Once this drops to zero you'd be safe to close the channel. But I haven't really thought this through, maybe I'm missing something.
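A minimal sketch of that idea, assuming nothing beyond base: an in-flight counter kept in an MVar Int, plus a second MVar () that is full exactly when the counter is zero, so the shutdown path can wait for zero without polling. The names InFlight, begin, end, waitIdle and demo are hypothetical. In the setup discussed here, begin would be called in the consumeMsgs callback before handing the message to a continuation, end after the continuation's ackEnv (or reject), and waitIdle between cancelConsumer and closeChannel; that ordering relies on no further deliveries arriving once cancelConsumer has returned. The demo uses forked threads with threadDelay as stand-ins for the continuations.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Exception (finally)
import Control.Monad (forM_, when)

-- Number of continuations that still have to acknowledge a message.
-- Invariant (maintained while holding 'pending'): 'idle' is full
-- exactly when the count is zero.
data InFlight = InFlight
  { pending :: MVar Int
  , idle    :: MVar ()
  }

newInFlight :: IO InFlight
newInFlight = InFlight <$> newMVar 0 <*> newMVar ()

-- Call before handing a message to a continuation.
begin :: InFlight -> IO ()
begin t = modifyMVar_ (pending t) $ \n -> do
  when (n == 0) $ takeMVar (idle t)      -- 0 -> 1: no longer idle
  return (n + 1)

-- Call when a continuation has finished (after acknowledging).
end :: InFlight -> IO ()
end t = modifyMVar_ (pending t) $ \n -> do
  let n' = n - 1
  when (n' == 0) $ putMVar (idle t) ()   -- 1 -> 0: idle again
  return n'

-- Block until every started continuation has called 'end'.
-- readMVar does not empty 'idle', so several threads may wait at once.
waitIdle :: InFlight -> IO ()
waitIdle t = readMVar (idle t)

-- Starts k simulated continuations, waits until all are done, and
-- returns how many had finished by the time waitIdle returned.
demo :: Int -> IO Int
demo k = do
  t <- newInFlight
  finished <- newMVar (0 :: Int)
  let continuation i = do
        threadDelay (i * 10000)               -- simulated slow forwarding
        modifyMVar_ finished (return . (+ 1)) -- stands in for ackEnv
  forM_ [1 .. k] $ \i -> do
    begin t                                   -- e.g. in the consumer callback
    forkIO (continuation i `finally` end t)   -- end runs even if it throws
  waitIdle t                                  -- e.g. just before closeChannel
  readMVar finished
```

`demo 5` should return 5, since waitIdle only returns after the last continuation has run `end`. A counter in a TVar combined with `retry` would be an equally reasonable design if the stm package is available.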

Of course, your current solution will probably be fine in practice.

For sure; once again thanks for your ideas and your support!