hreinhardt/amqp

Dangerous uses of forkIO

NorfairKing opened this issue · 19 comments

Hi!

While going down a rabbit hole together with @vidocco, we found that the amqp library uses forkIO, which is dangerous.

Our issue is as follows:
We have a worker executable that should watch for certain messages and "deal" with them.
This worker executable should keep running at all times.
However, consumeMsgs is non-blocking, so we use something like takeMVar to block until a certain variable is filled by our connectionClosedHandler.
So far so good, but this has the following problem:
If the handler ever stops receiving messages without closing the connection, then our worker will hang indefinitely. (Any advice on how to do this more robustly would be appreciated.)
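A minimal, base-only sketch of the blocking pattern described above, so it runs without a broker. `simulateConnection` is a hypothetical stand-in for a real connection; with amqp, the handler would instead be registered through `addConnectionClosedHandler`. The sketch also makes the weakness visible: the main thread's only exit path is the close handler, so if the consumer stops without the connection closing, nothing ever fills the MVar.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical stand-in for a connection that eventually closes and then
-- runs the registered "connection closed" handler on another thread.
simulateConnection :: IO () -> IO ()
simulateConnection onClosed = do
  _ <- forkIO $ do
    threadDelay 100000 -- the connection stays open for 0.1 s
    onClosed           -- then it closes and runs the handler
  pure ()

main :: IO ()
main = do
  closed <- newEmptyMVar
  simulateConnection (putMVar closed ())
  -- A real worker would call consumeMsgs here; it returns immediately.
  takeMVar closed -- block until the close handler fills the MVar
  putStrLn "Connection closed, worker exiting."
```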

We recently had an incident where the worker stopped processing messages without crashing, so @vidocco and I went to investigate.
Our hypothesis, after reading the amqp code, is that the following might have happened:

The channelReceiver seems to be the function that is responsible for handling incoming messages and is started using forkFinally, which uses forkIO.
This means that if this thread ever stops looping (because it throws an exception for example), the main thread will not notice this and the connection will not be closed either.

Here is an example showing that this is how forkIO behaves:

#!/usr/bin/env stack
-- stack --resolver lts-15.15 script
{-# LANGUAGE NumericUnderscores #-}
import Control.Concurrent
main :: IO ()
main = do
  putStrLn "Starting our 'server'."
  forkIO $ do            
    putStrLn "Serving..."
    threadDelay 1_000_000
    putStrLn "Oh no, about to crash!"
    threadDelay 1_000_000
    putStrLn "Aaaargh"
    undefined
  threadDelay 5_000_000
  putStrLn "Still running, eventhough we crashed"
  threadDelay 5_000_000                 
  putStrLn "Ok that's enough of that, stopping here."

Which outputs:

$ ./test.hs
Starting our 'server'.
Serving...
Oh no, about to crash!
Aaaargh
test.hs: Prelude.undefined
CallStack (from HasCallStack):
  error, called at libraries/base/GHC/Err.hs:80:14 in base:GHC.Err
  undefined, called at /home/syd/test/test.hs:17:5 in main:Main
Still running, eventhough we crashed
Ok that's enough of that, stopping here.

We haven't found which part of the channelReceiver might have thrown the exception that may have caused the handler thread to stop, but we found some usages of error that may have been the culprit:

In short: forkIO is dangerous and should be avoided. You can use async together with link instead.
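The async package's `link` rethrows an exception from the linked thread in the thread that called `link`. As a rough illustration of that idea using only base, here is a hypothetical `forkLinked` helper applied to a variant of the crashing "server" above. It is not async's implementation; the real `link` differs in details (for example, it wraps the rethrown exception).

```haskell
import Control.Concurrent (ThreadId, forkFinally, myThreadId, threadDelay, throwTo)
import Control.Exception (SomeException, try)

-- | Hypothetical helper: fork a thread whose uncaught exception is rethrown
-- in the thread that forked it. A base-only approximation of async's 'link'.
forkLinked :: IO () -> IO ThreadId
forkLinked action = do
  parent <- myThreadId
  forkFinally action $ \result -> case result of
    Left e -> throwTo parent e -- propagate the crash to the parent
    Right () -> pure ()

main :: IO ()
main = do
  outcome <- try $ do
    putStrLn "Starting our 'server'."
    _ <- forkLinked $ do
      putStrLn "Serving..."
      threadDelay 1000000
      error "Aaaargh"
    threadDelay 5000000 -- interrupted by the linked thread's exception
    putStrLn "Still running, even though we crashed"
  case outcome of
    Left e -> putStrLn ("Main thread saw the crash: " ++ show (e :: SomeException))
    Right () -> putStrLn "No crash observed"
```

Unlike the plain forkIO version, the "Still running" line is never reached: the crash interrupts the main thread instead of being printed to stderr and forgotten.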

I'm currently working on reproducing this issue by manually sending a message that would cause the handler thread to error and produce this issue.

I was able to reproduce a problem, though not the one we were having, here: https://github.com/NorfairKing/amqp-forkio-repro

I know it's not the problem we were having, because the reproduction's worker logs show these errors, which we didn't see during our incident:

ERROR: channel not open 1
ERROR: channel not open 1
ERROR: channel not open 1

So what I've reproduced here is also an issue, but not the one we had.

Hi,

This means that if this thread ever stops looping (because it throws an exception for example), the main thread will not notice this and the connection will not be closed either.

Exceptions shouldn't be a problem, since the library catches any callback exceptions: https://github.com/hreinhardt/amqp/blob/master/Network/AMQP/Internal.hs#L593 (the subscriber variable references the user-supplied consumeMsgs-callback)

I think the more likely cause may be that your consumeMsgs callback itself is blocking. This in turn would block the whole AMQP channel. Are you maybe performing some channel-operations in the callback?

Another reason may be if you are accidentally catching ChanThreadKilledException inside the callback without rethrowing it. That would prevent the library from closing the channel itself.

So what I've reproduced here is also an issue, but not the one we had.

Interesting. I'll have to take a closer look at this but it probably is a bug.

OK I've taken a look at your repository. You have the following code:

  let putDangerousMessageOnOwnQueue = do
        writeChan
          (inQueue chan)
          HeartbeatPayload

Here you are actually using functions from AMQP.Internal which aren't really designed to be called by end-users. You're essentially tricking the library into believing that a Heartbeat came in. So it's probably not a bug unless there's a way to trigger it without using any functions from Internal.

Exceptions shouldn't be a problem, since the library catches any callback exceptions: https://github.com/hreinhardt/amqp/blob/master/Network/AMQP/Internal.hs#L593 (the subscriber variable references the user-supplied consumeMsgs-callback)

I meant exceptions from within the library code around the callbacks, not exceptions from the callbacks themselves.
The library uses error quite a lot, so there are many places where it can throw exceptions.
I also think that catching all exceptions is probably not what you want to do because then you also catch async exceptions.

I think the more likely cause may be that your consumeMsgs callback itself is blocking. This in turn would block the whole AMQP channel. Are you maybe performing some channel-operations in the callback?

That sounds like something I should definitely double-check, thanks for the pointer!
We thought that the problem would be with the library because the same problem occurred at the same time on three different services that do very different things, but there could have been some shared issue that I'm not aware of.

Here you are actually using functions from AMQP.Internal which aren't really designed to be called by end-users. You're essentially tricking the library into believing that a Heartbeat came in. So it's probably not a bug unless there's a way to trigger it without using any functions from Internal.

If I understand correctly, these things are input from bytes on the network. Any network data can come in at any time, technically speaking, so I could trigger the issue by sending bytes to the worker myself without using functions from AMQP.Internal.
It's also not just heartbeats; a heartbeat was just the easiest way to show the problem. Another way would be a Basic_return with an unknown error code, for example. Any of the error calls reachable from the handler would cause this issue.

The point is that if there is a bug in the library, I would like the whole thing to crash rather than have it stop receiving messages.
In that sense forkIO is not what I'd want the library to use.
If it so happens that the problematic scenario can really never happen, then that would be fine, but I would prefer not to hope that that's the case.

I meant exceptions from within the library code around the callbacks, not exceptions from the callbacks themselves.

Ah, I see. I think if that was the cause, wouldn't you see the exception printed on the screen? We do catch exceptions in a few places but most of the time we do print something on the screen. So getting absolutely no output makes me think that this is unlikely to be the cause. But I might be wrong.

I also think that catching all exceptions is probably not what you want to do because then you also catch async exceptions.

I agree in general, but for the channel-thread we want to ensure it is not accidentally killed by an async exception. Only the AMQP library itself should kill channel-threads.

If I understand correctly, these things are input from bytes on the network. Any network data can come in at any time, technically speaking, so I could trigger the issue by sending bytes to the worker myself without using functions from AMQP.Internal.

Yes but a well-behaved AMQP server hopefully never triggers these errors.

The point is that if there is a bug in the library, I would like the whole thing to crash rather than have it stop receiving messages.

I think nobody ever reported hitting one of these errors, so there was no strong motivation to fix it. But you're right that it should probably be handled in a better way.

In that sense forkIO is not what I'd want the library to use.

You mean we should use a lib like async as an abstraction on top of threads? I'm not opposed to it, but I'm not really familiar with async so I don't know how much refactoring that would require.

Another reason may be if you are accidentally catching ChanThreadKilledException inside the callback without rethrowing it. That would prevent the library from closing the channel itself.

This ended up being the issue. We were naively catching SomeException. Now we only catch exceptions that are neither asynchronous exceptions nor ChanThreadKilledException.
Thank you very much for this pointer, it really helped!
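A base-only sketch of that narrower exception handling. `ChanKilled` is a hypothetical stand-in so the example compiles without amqp; in real code you would test for amqp's `ChanThreadKilledException` instead. `guarded` logs ordinary failures from a message handler but rethrows asynchronous exceptions and the channel-shutdown exception, so the library can still close the channel.

```haskell
import Control.Exception
import Data.Maybe (isJust)

-- Hypothetical stand-in for amqp's ChanThreadKilledException, so that this
-- sketch compiles with base alone.
data ChanKilled = ChanKilled deriving Show

instance Exception ChanKilled

-- | Exceptions a consumer callback must let through: asynchronous exceptions
-- (e.g. ThreadKilled) and the library's channel-shutdown exception.
mustRethrow :: SomeException -> Bool
mustRethrow e =
  isJust (fromException e :: Maybe SomeAsyncException)
    || isJust (fromException e :: Maybe ChanKilled)

-- | Wrap a message handler: ordinary failures are logged and swallowed,
-- everything selected by 'mustRethrow' is rethrown unchanged.
guarded :: IO () -> IO ()
guarded handler =
  handler `catch` \e ->
    if mustRethrow e
      then throwIO e
      else putStrLn ("handler failed: " ++ show e)

main :: IO ()
main = do
  -- An ordinary error in the handler is logged, not propagated.
  guarded (throwIO (userError "bad payload"))
  -- The shutdown exception still reaches the caller.
  r <- try (guarded (throwIO ChanKilled))
  case r of
    Left ChanKilled -> putStrLn "ChanKilled propagated"
    Right () -> putStrLn "ChanKilled was swallowed"
```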

As for your other comments: You're probably right that a lot of these things that could go wrong, but don't, would signify a bug in the library.
However, when experiencing a bug it's often hard to figure out whether the bug is in our code or in the library code.
For that reason it would be great if libraries didn't use dangerous or partial functions, so that they don't look suspicious even if the library is completely bug-free.

You mean we should use a lib like async as an abstraction on top of threads? I'm not opposed to it, but I'm not really familiar with async so I don't know how much refactoring that would require.

async is full of best practices, whereas using forkIO is really easy to get wrong. In that sense it would potentially be an improvement. However, multi-threaded programming is hard to get right in the best of cases, so it's possible that bugs could be introduced by the refactor as well.
I think getting rid of partial functions altogether would be a much bigger improvement than refactoring to use async.

Some examples of partial functions to eliminate:

  • undefined
  • error
  • fromJust
  • decodeUtf8
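For illustration, here is how two of these can be replaced by total code that returns the failure as a value. The reply-code table and function names are hypothetical; `decodeUtf8'` comes from the text package (which, like bytestring, ships with GHC) and returns a `Left` on invalid UTF-8 instead of throwing.

```haskell
import qualified Data.ByteString as BS
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8')

-- Hypothetical lookup table, for illustration only.
replyCodes :: [(Int, Text)]
replyCodes = [(312, T.pack "NO_ROUTE"), (313, T.pack "NO_CONSUMERS")]

-- Partial version, which crashes on an unknown code:
--   replyName code = fromJust (lookup code replyCodes)
-- Total version: an unknown code becomes a value the caller must handle.
replyName :: Int -> Either String Text
replyName code =
  maybe (Left ("unknown reply code " ++ show code)) Right (lookup code replyCodes)

-- Total replacement for decodeUtf8: invalid UTF-8 becomes a Left
-- instead of an exception thrown from pure code.
decodeField :: BS.ByteString -> Either String Text
decodeField bytes = either (Left . show) Right (decodeUtf8' bytes)

main :: IO ()
main = do
  print (replyName 312)
  print (replyName 999)
  print (decodeField (BS.pack [104, 105]))
  print (decodeField (BS.pack [255]))
```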

I fully understand that you're probably maintaining this library for free, so I'm happy to help out where I can!

For that reason it would be great if libraries didn't use dangerous or partial functions, so that they don't look suspicious even if the library is completely bug-free.

That's an interesting argument that I haven't really considered before. But I only partially agree with it. You can have nice-looking code that contains a subtle logic bug or a race condition, for example.

And I would argue that debugging error calls shouldn't be very hard, since you get a message printed on the screen that points you to the error.

async is full of best practices, whereas using forkIO is really easy to get wrong. In that sense it would potentially be an improvement. However, multi-threaded programming is hard to get right in the best of cases, so it's possible that bugs could be introduced by the refactor as well.

I took a look at async and it feels to me that it's more suited for client-side applications, something like "run a computation that will eventually yield a result". I think the high-level abstractions it offers won't really work for the AMQP library.

The link function you mentioned is lower-level and could be used. This function internally uses forkFinally to ensure that the dying thread kills the linked thread. But we're already using forkFinally in a similar fashion in the AMQP library: When a connection-thread dies we make sure to kill all the channel-threads.
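A base-only sketch of the pattern described here: a connection thread whose `forkFinally` handler kills all of its channel threads when it dies. The thread roles, names and timings are hypothetical stand-ins, not amqp's actual code.

```haskell
import Control.Concurrent
import Control.Exception (throwIO)
import Control.Monad (forM, forever, replicateM)

-- | Start stand-in "channel" threads and a stand-in "connection" thread.
-- Returns one MVar per channel thread, filled when that thread exits.
startConnection :: Int -> IO [MVar ()]
startConnection nChannels = do
  exits <- replicateM nChannels newEmptyMVar
  channelThreads <- forM exits $ \exited ->
    -- A channel thread loops forever and signals when it stops.
    forkFinally (forever (threadDelay 10000)) (\_ -> putMVar exited ())
  -- When the connection thread ends for any reason (here it crashes),
  -- its forkFinally handler kills every channel thread it owns.
  _ <- forkFinally connectionLoop (\_ -> mapM_ killThread channelThreads)
  pure exits
  where
    connectionLoop :: IO ()
    connectionLoop = do
      threadDelay 50000
      throwIO (userError "socket closed")

main :: IO ()
main = do
  exits <- startConnection 3
  mapM_ takeMVar exits -- returns only once every channel thread has stopped
  putStrLn "all channel threads stopped together with the connection"
```

Note that this only propagates failure downward (connection to channels); a crash in a channel thread is not reported to anyone, which is the direction the issue is concerned about.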

I think getting rid of partial functions altogether would be a much bigger improvement than refactoring to use async.

I just took a quick look and I think nearly all of the partial function calls are from within the connection- or channel-thread, for which we have appropriate exception-handling. So these exceptions should not leak to the user and are basically just an implementation detail.

We could change these partial functions to return something like Either Text x instead of x. But then we would have to bubble these Left a values up as far as possible, i.e. to the event-loop of the connection-thread. And the only thing we could do there would be to close the connection. So the end result would be identical to the current code, but would require converting all intermediate functions to something like ExceptT e IO to pass the errors around.

So I'm not sure if that's worth the effort.
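To make the trade-off concrete, a small sketch with hypothetical names (`decodeFrame`, `eventLoop`) of returning `Either` instead of calling `error`. The frame-type numbers follow AMQP 0-9-1 (3 = body, 8 = heartbeat), but this is not the library's decoder, and it uses plain `Either` rather than `ExceptT` to stay within base. As argued above, the event loop's only reaction to a `Left` is to close the connection.

```haskell
-- Hypothetical frame type; the numbers follow AMQP 0-9-1 (3 = body, 8 = heartbeat).
data Frame = Heartbeat | Body String
  deriving (Show)

-- | Instead of calling 'error' on unexpected input, return a Left.
decodeFrame :: (Int, String) -> Either String Frame
decodeFrame (8, _) = Right Heartbeat
decodeFrame (3, payload) = Right (Body payload)
decodeFrame (t, _) = Left ("unexpected frame type " ++ show t)

-- | The Left has to travel up to the event loop, and the only sensible
-- reaction there is to close the connection.
eventLoop :: [(Int, String)] -> IO String
eventLoop [] = pure "connection closed normally"
eventLoop (raw : rest) =
  case decodeFrame raw of
    Left err -> pure ("closing connection: " ++ err)
    Right frame -> do
      putStrLn ("handled " ++ show frame)
      eventLoop rest

main :: IO ()
main = do
  outcome <- eventLoop [(8, ""), (3, "hello"), (9, "garbage"), (3, "never reached")]
  putStrLn outcome
```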

That's an interesting argument that I haven't really considered before. But I only partially agree with it. You can have nice-looking code that contains a subtle logic bug or a race condition, for example.
And I would argue that debugging error calls shouldn't be very hard, since you get a message printed on the screen that points you to the error.

I would agree entirely if it weren't for the fact that using forkIO completely invalidates this, because exceptions are not propagated across threads in this case. :p

I took a look at async and it feels to me that it [...] won't really work for the AMQP library.

That's fine, it was just a suggestion :)

I just took a quick look and I think nearly all of the partial function calls are from within the connection- or channel-thread, for which we have appropriate exception-handling. So these exceptions should not leak to the user and are basically just an implementation detail.

No, my reproduction case shows a counterexample.
The exception doesn't just leak, it makes the entire server stop responding and has it output these ERROR lines.

We could change these partial functions to return something like Either Text x instead of x. But then we would have to bubble these Left a values up as far as possible, i.e. to the event-loop of the connection-thread. And the only thing we could do there would be to close the connection. So the end result would be identical to the current code, but would require converting all intermediate functions to something like ExceptT e IO to pass the errors around.
So I'm not sure if that's worth the effort.

That's fair. It's entirely possible that not changing anything is the right way to move forward.
We've fixed our bug in the meantime so I'm happy to close this issue.

The only thing left to do is for me to send a PR to change the docs that say "If you use flow-control or want to perform anything more complex, it's a good idea to wrap your requests inside 'forkIO'", because that's still definitely a bad idea.

No, my reproduction case shows a counterexample. The exception doesn't just leak, it makes the entire server stop responding and has it output these ERROR lines.

Well, your test-code sends a synthetic HeartbeatPayload to channel 1, and this causes channel 1 to be closed. Any further messages to that channel will lead to the message "ERROR: channel not open 1" being printed on the screen.

You can see the channel being closed by adding something like the following code to the worker function:

    addChannelExceptionHandler chan $ \e -> do
        putStrLn $ "Channel exception: "++show e

But the AMQP connection itself should still function, e.g. other channels should still work.

Maybe you were expecting the whole connection to be closed in response to the error?


The only thing left to do is for me to send a PR to change the docs that say "If you use flow-control or want to perform anything more complex, it's a good idea to wrap your requests inside 'forkIO'", because that's still definitely a bad idea.

My thinking was that if the user wants to do something like this:

consumeMsgs chan "myQueue" Ack $ \(msg, env) -> do
    cancelConsumer chan "mytag"

that wouldn't work, since using chan inside the callback would deadlock. So the easiest solution would be:

consumeMsgs chan "myQueue" Ack $ \(msg, env) -> void $ forkIO $ do
    cancelConsumer chan "mytag"

Is there a problem that I'm not aware of?

Any further messages to that channel will lead to the message "ERROR: channel not open 1" being printed on the screen.

Indeed, but no crashes and no responses. One would have to be watching the logs to notice that something is wrong.
If a call to error in the amqp library indicates a bug in the library, then it should fail as loudly and as obviously as possible, I think.

Is there a problem that I'm not aware of?

In this case you have the same problem that forkIO always has: You won't notice when something goes wrong, the other threads will just happily go on. The most you might get is something printed to stderr.
Also, in this particular case, you're at risk of creating an unbounded number of threads and taking down your entire server.

Indeed, but no crashes and no responses. One would have to be watching the logs to notice that something is wrong. If a call to error in the amqp library indicates a bug in the library, then it should fail as loudly and as obviously as possible, I think.

You're right. I'll keep it in mind in case I ever decide to do a major refactor.

In this case you have the same problem that forkIO always has: You won't notice when something goes wrong, the other threads will just happily go on. The most you might get is something printed to stderr. Also, in this particular case, you're at risk of creating an unbounded number of threads and taking down your entire server.

But if the user wants to run a blocking operation, it definitely must run in a separate thread. So some kind of forkIO is unavoidable. But the advice in the docs is probably not optimal; it might lead people to do dangerous stuff.

Instead of just removing this line from the docs, it might be better to offer users a more sensible recommendation. I could think of something like this:

{-# LANGUAGE OverloadedStrings #-}
import qualified Control.Concurrent.Chan as C
import Control.Concurrent (forkFinally)
import Network.AMQP

subscribe :: Channel -> IO ConsumerTag
subscribe chan = do
    queue <- C.newChan
    forkFinally (go queue) $ \res -> case res of
        Left e -> putStrLn $ "Exception in handler: "++show e
        Right _ -> return ()
    consumeMsgs chan "my-queue" Ack $ \(msg, env) ->
        C.writeChan queue (msg, env)
  where
    go queue = do
        (msg, env) <- C.readChan queue
        -- TODO: Handle the message. You can safely perform
        --       blocking operations like cancelConsumer here.
        ackEnv env
        go queue

Basically just putting all incoming messages into a queue and having a single thread that will then handle them.

It still uses forkIO under the hood, but at least it would prevent an unbounded number of threads from being spawned. And by using forkFinally it forces users to at least think about exception-handling.

Basically just putting all incoming messages into a queue and having a single thread that will then handle them.

This is excellent.

Then you wouldn't use forkIO but rather concurrently with consumeMsgs and your worker, so that the other crashes too if one throws an exception.
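A base-only sketch of the semantics being suggested: run the consumer and the worker side by side, and if either throws, stop the other and rethrow in the caller. `concurrentlyBase_` is a hypothetical, simplified stand-in for async's `concurrently_`, not its implementation; with async available you would use the real function.

```haskell
import Control.Concurrent (forkIOWithUnmask, killThread, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (SomeException, bracket, throwIO, try)

-- | Run both actions at the same time and wait for both to finish.
-- If either throws, the other is killed and the exception is rethrown
-- in the caller, so a crashed worker cannot go unnoticed.
concurrentlyBase_ :: IO () -> IO () -> IO ()
concurrentlyBase_ left right = do
  results <- newChan
  let spawn act =
        forkIOWithUnmask $ \unmask -> try (unmask act) >>= writeChan results
      waitFor :: Int -> IO ()
      waitFor 0 = pure ()
      waitFor n = do
        r <- readChan results
        case r of
          Left e -> throwIO (e :: SomeException)
          Right () -> waitFor (n - 1)
  -- bracket kills both threads on exit (a no-op for threads that already
  -- finished), whether waitFor returns normally or rethrows.
  bracket (mapM spawn [left, right]) (mapM_ killThread) (\_ -> waitFor 2)

main :: IO ()
main = do
  r <- try $
    concurrentlyBase_
      (threadDelay 100000 >> throwIO (userError "worker crashed"))
      (threadDelay 5000000 >> putStrLn "consumer finished")
  case r of
    Left e -> putStrLn ("caller saw: " ++ show (e :: SomeException))
    Right () -> putStrLn "both finished"
```

The second action is killed after the first one crashes, so "consumer finished" is not printed; the failure surfaces in the caller instead.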

My understanding is that concurrently is a blocking operation, so that would make the whole function blocking, which is probably not optimal.

Typically a user will want the consume-operation to be non-blocking, so that they can subscribe to multiple channels, for example.

@hreinhardt Then you call concurrently, or something similar, as many times as necessary. What one should not do is use forkIO.

But the point of consumeMsgs is that it runs in the background. If the user wants to explicitly poll for messages they can use getMsg, which is synchronous.

Now that I think of it, the getMsg function actually seems to be exactly what you're asking for. It's a simple synchronous method that gives you a single message.

Maybe I should just recommend people to use this if the consumeMsgs function doesn't work for them.

But the point of consumeMsgs is that it runs in the background. If the user wants to explicitly poll for messages they can use getMsg, which is synchronous.

I think @NorfairKing's point was that people are capable of making threads run in the background themselves, using async. That way, they can declare trees of forked threads which have predictable clean up/exit/exception handling semantics, including cascading exit, and it's easy to use supervisors (a la Erlang) which can retry or die. When the library wants to fork threads, in principle it's more ergonomic, but often in practice it's a leaky abstraction.
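A base-only sketch of the "supervisor which can retry or die" idea. `supervise` and its restart policy are hypothetical, loosely in the spirit of Erlang supervisors rather than a port of them. In line with the earlier discussion, it never retries asynchronous exceptions, and once the restart budget is used up it rethrows so the failure reaches whoever supervises the supervisor.

```haskell
import Control.Exception
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Data.Maybe (isJust)

-- | Run a worker; if it fails, restart it up to 'maxRestarts' times, then
-- give up and rethrow so the failure propagates further up.
-- Asynchronous exceptions (e.g. killThread) are rethrown immediately.
supervise :: Int -> IO () -> IO ()
supervise maxRestarts worker = go 0
  where
    go :: Int -> IO ()
    go restarts = do
      r <- try worker
      case r of
        Right () -> pure ()
        Left e
          | isJust (fromException e :: Maybe SomeAsyncException) -> throwIO e
          | restarts < maxRestarts -> do
              putStrLn ("worker failed (" ++ show e ++ "), restarting")
              go (restarts + 1)
          | otherwise -> throwIO e

main :: IO ()
main = do
  attempts <- newIORef (0 :: Int)
  supervise 3 $ do
    n <- atomicModifyIORef' attempts (\k -> (k + 1, k + 1))
    if n < 3
      then throwIO (userError ("attempt " ++ show n ++ " failed"))
      else putStrLn ("attempt " ++ show n ++ " succeeded")
  total <- readIORef attempts
  putStrLn ("total attempts: " ++ show total)
```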