Deadlock on calling cancelConsumer.
alaendle opened this issue · 11 comments
I'm calling cancelConsumer from another thread while consuming messages. I would expect this call to succeed, but it blocks indefinitely. Please note that the call succeeds if there are no in-flight messages. After reading #84 - is this a related problem? As far as I understand it, the use of request inside cancelConsumer could be a problem? For now I'll keep it short, but please let me know if you need further information.
I don't think this is related to issue 84. That would only be the case if you had called cancelConsumer from within the consumeMsgs callback. So what you discovered seems like a bug.
It would be great if you could try adding some print-statements to the code and try to figure out in which line of code exactly it is blocking.
Thanks for your quick response. I'll try to narrow the problem down. What I can tell for now: I use the docker image rabbitmq:3.8.1-management as the AMQP broker (I don't know if this matters), and I'm definitely not calling cancelConsumer within the callback.
What I did so far...
cancelConsumer :: Channel -> ConsumerTag -> IO ()
cancelConsumer chan consumerTag = do
    traceIO "--- START: Calling cancelConsumer"
    (SimpleMethod (Basic_cancel_ok _)) <- request chan $ (SimpleMethod (Basic_cancel
        (ShortString consumerTag) -- consumer_tag
        False -- nowait
        ))
    traceIO "--- BETWEEN: Calling cancelConsumer"
    --unregister the consumer
    modifyMVar_ (consumers chan) $ return . M.delete consumerTag
    traceIO "--- END: Calling cancelConsumer"
And I only see the START tracing - BETWEEN is never reached.
Hm, could you try adding some debug-statements in the request function itself?
For sure - the result...
Cancel rabbit consumer: input2 "1"
--- cancelConsumer --- 1
--- request --- 1
--- request --- 2
--- request --- 3
--- request --- 4
--- request --- 5
--- request --- 7
request :: Channel -> Assembly -> IO Assembly
request chan m = do
    traceIO "--- request --- 1"
    res <- newEmptyMVar
    CE.catches (do
            withMVar (chanClosed chan) $ \cc -> do
                traceIO "--- request --- 2"
                if isNothing cc
                    then do
                        traceIO "--- request --- 3"
                        modifyMVar_ (outstandingResponses chan) $ \val -> return $! val Seq.|> res
                        traceIO "--- request --- 4"
                        writeAssembly' chan m
                        traceIO "--- request --- 5"
                    else do
                        traceIO "--- request --- 6"
                        CE.throwIO $ userError "closed"
            -- res might contain an exception, so evaluate it here
            traceIO "--- request --- 7"
            !r <- takeMVar res
            traceIO "--- request --- 8"
            return r
            )
            [CE.Handler (\ (_ :: AMQPException) -> throwMostRelevantAMQPException chan),
             CE.Handler (\ (_ :: CE.ErrorCall) -> throwMostRelevantAMQPException chan),
             CE.Handler (\ (_ :: CE.IOException) -> throwMostRelevantAMQPException chan)]
So I'm blocked forever in the line after trace 7, i.e. at takeMVar res (Line 826 in 9711c97).
I also tried to run against RabbitMQ 3.7.21 - same result. However the problem seems to be very sensitive to timing - I can reproduce it in my real world application every time, but unfortunately I was unable to build a simple unit test exposing the same behaviour.
@hreinhardt Thanks for your support - I think I spotted the cause - not sure if this is a problem of the library or my application.
What happened? In the consumeMsgs callback I forwarded the message to a 3rd-party system (waiting for an acknowledgment before confirming/rejecting the consumed message). Unfortunately this 3rd-party system also raises the event to cancel the consumption, on the same thread on which it acknowledges the forwarded messages. So what basically happened, greatly simplified:
con <- AMQP.openConnection "localhost" "/" "user" "user"
signal <- newEmptyMVar
c <- AMQP.openChannel con
consumerTag <- AMQP.consumeMsgs c "input" AMQP.Ack (\(msg, env) -> do
    takeMVar signal -- block
    AMQP.ackEnv env)
threadDelay $ 1 * 1000 * 1000 -- wait for consumer to block
-- cancel and release consumer -- thread doesn't matter
AMQP.cancelConsumer c consumerTag -- blocks - since the consumer is blocked
putMVar signal ()
Are you actually calling takeMVar inside consumeMsgs? That is generally not a good idea, since as long as the takeMVar is waiting, the whole AMQP library would essentially be blocked.
Not directly, but the effect remains the same. Think of sendSync (AMQP.msgBody msg) instead of takeMVar - depending on the result of this method I acknowledged/rejected the AMQP message. Unfortunately the 3rd-party system also raises a deviceTwinSpecChanged callback on its receiver thread, which is the same thread on which it handles its acknowledgments for the sendSync method.
I use the deviceTwinSpecChanged callback to manage my relationship to RabbitMQ - add/remove subscriptions/channels and so on.
So if there are no in-flight messages everything works. But it can happen that, while sendSync is still waiting to return, the deviceTwinSpecChanged callback is raised and I try to adjust the RabbitMQ channels, e.g. by calling cancelConsumer. So now the receiver thread is basically blocked, since sendSync will not return until deviceTwinSpecChanged returns, and this callback in turn needs the receiver thread because it has called cancelConsumer -> Deadlock!
So after I understood the problem, I changed two things to mitigate it. First, I used an asynchronous method and continuations to forward the message and immediately free the consumer thread. Second, I added a forkIO as the first statement of the deviceTwinSpecChanged callback so as not to delay the execution of the continuations.
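The deadlock pattern and the forkIO mitigation described above can be reproduced without any AMQP code. The following is only a minimal sketch under stated assumptions: Receiver, startReceiver, callSync and nestedCallCompletes are hypothetical names invented for illustration, and the single-threaded dispatcher merely stands in for a receiver thread that has to deliver the reply a synchronous call is waiting for. It is not how the amqp library or the 3rd-party system is implemented.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever, void)
import Data.Maybe (isJust)
import System.Timeout (timeout)

-- Hypothetical stand-in for a receiver thread: it runs queued jobs
-- strictly one after another on a single thread.
newtype Receiver = Receiver (Chan (IO ()))

startReceiver :: IO Receiver
startReceiver = do
  jobs <- newChan
  _ <- forkIO $ forever $ do
    job <- readChan jobs
    job
  return (Receiver jobs)

-- Enqueue a job and block until the receiver thread has run it.
-- This mirrors a synchronous request whose reply only the receiver
-- thread can deliver.
callSync :: Receiver -> IO a -> IO a
callSync (Receiver jobs) act = do
  done <- newEmptyMVar
  writeChan jobs (act >>= putMVar done)
  takeMVar done

-- Schedule a callback on the receiver thread. The callback performs a
-- nested callSync, either inline (useFork = False) or in a forked
-- thread (useFork = True). The result says whether the nested call
-- completed within 0.2 seconds.
nestedCallCompletes :: Bool -> IO Bool
nestedCallCompletes useFork = do
  receiver@(Receiver jobs) <- startReceiver
  finished <- newEmptyMVar
  let nested = callSync receiver (return ()) >> putMVar finished ()
      callback
        | useFork = void (forkIO nested)
        | otherwise = nested
  writeChan jobs callback
  isJust <$> timeout 200000 (takeMVar finished)
```

In this model the inline variant never completes, because the receiver thread is waiting for a job that only it could run, while the forked variant returns the receiver thread to its loop immediately so the nested call can be served.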
This works as expected, but led to a new question (which is maybe not directly related to the AMQP library). Since the cancelConsumer method runs in the receiver thread and waits for an acknowledgment from the server (at least as far as I interpret the code), I can assume no further messages will arrive, therefore it should be safe to close the channel. This was true for the synchronous sender, but now I also have to make sure that my continuations are carried out, because these continuations want to acknowledge their messages. For now I "solve" this problem with a small thread delay between cancelConsumer and closeChannel, but I would strive for a more elegant solution.
I am aware that this is more a general concurrency question, but maybe we can close this issue with a tip to this question?
Once again thanks for your support and your long-standing engagement in the community expressed inter alia in this great library.
Since the cancelConsumer method runs in the receiver thread and waits for an acknowledgment from the server (at least as far as I interpret the code) I can assume no further messages will arrive, therefore it should be safe to close the channel.
It is always safe to close a channel. I don't think you even need to call cancelConsumer before, since closing the channel will implicitly cancel all consumers.
If you meant safe in the sense of "no more messages will arrive" then you're correct. But even then: if some messages were to be lost, that wouldn't be a big deal, since if you can't acknowledge them, the server should resend them at some point in the future.
but now I also have to make sure that my continuations are carried out because these continuations want to acknowledge their messages.
I'm not sure I understand you fully here. If you have some continuation that will acknowledge a message, why would that have to run before cancelConsumer?
Just as an idea, an alternative approach to structure the consumer logic might be to store all the incoming messages in some kind of queue and handle them in a different thread, e.g.:
import qualified Control.Concurrent.Chan as C

myChan <- C.newChan
consumeMsgs chan "myQueue" Ack $ \(msg, env) -> do
    C.writeChan myChan (msg, env)

-- in a different thread:
go = do
    (msg, env) <- C.readChan myChan
    ackEnv env
    go
This would keep the work inside consumeMsgs to a minimum and rule out any deadlocking issues.
Absolutely, however there is no guarantee that ackEnv succeeds (i.e. that the channel is still open). So for now I added a small delay between cancelConsumer and closeChannel, hoping that in-flight messages in the other thread are processed in between. While I think a more deterministic solution without the delay is possible, e.g. by using semaphores or something similar, it might lead to more complexity and, depending on my implementation, to new problems ;-) So for now I live with the delay. The other approach would be to just close the channel and catch the exception on ackEnv. However, this could lead to duplicated processing of a message (QoS: at least once). While this is certainly acceptable from a specification point of view (the app could crash at any time), I would like to avoid it in the case of a graceful shutdown (therefore I prefer the threadDelay solution).
I think if you wanted to solve this the "right way", you'd need something like an MVar Int for every channel, which would store the number of continuations that haven't completed yet. Once this drops to zero you'd be safe to close the channel. But I haven't really thought this through, maybe I'm missing something.
Of course, your current solution will probably be fine in practice.
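The MVar Int idea above could be sketched roughly as follows. This is only an illustration under assumptions: Pending, newPending, beginWork, endWork, waitUntilIdle and demo are hypothetical names, the worker threads merely simulate continuations that acknowledge a message, and waiting is done by simple polling. No amqp calls are involved.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)
import Control.Exception (finally)
import Control.Monad (forM_, unless)

-- Number of continuations that have been started but not yet completed.
newtype Pending = Pending (MVar Int)

newPending :: IO Pending
newPending = Pending <$> newMVar 0

-- Call this when a message is received, before handing it to another
-- thread, so the counter can never be observed as zero in between.
beginWork :: Pending -> IO ()
beginWork (Pending n) = modifyMVar_ n (return . (+ 1))

-- Call this once the continuation has finished (successfully or not).
endWork :: Pending -> IO ()
endWork (Pending n) = modifyMVar_ n (return . subtract 1)

-- Poll every 10 ms until no continuation is outstanding.
waitUntilIdle :: Pending -> IO ()
waitUntilIdle p@(Pending n) = do
  k <- readMVar n
  unless (k == 0) $ threadDelay 10000 >> waitUntilIdle p

-- Simulate three in-flight messages whose "acknowledgment" takes 50 ms,
-- then wait for all of them and report how many were acknowledged.
demo :: IO Int
demo = do
  pending <- newPending
  acked <- newMVar (0 :: Int)
  forM_ [1 .. 3 :: Int] $ \_ -> do
    beginWork pending
    forkIO $
      (threadDelay 50000 >> modifyMVar_ acked (return . (+ 1)))
        `finally` endWork pending
  waitUntilIdle pending
  -- A real program could close the channel at this point.
  readMVar acked
```

The polling loop keeps the sketch within base. With the stm package, a TVar Int combined with check or retry would let the waiting thread block until the counter reaches zero instead of polling.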
For sure; once again thanks for your ideas and your support!