coMaxChannel, or some other way to limit the number of simultaneous threads
Closed this issue · 8 comments
I'm writing a Yesod app, and part of my architecture is that a background worker process consumes "job" messages from an AMQP queue. Jobs are pushed onto the queue at an uneven and unpredictable rate. I.e. sometimes 1000 jobs will suddenly be pushed, while often 5 or 10 minutes will pass without any being pushed.
The callback I pass to consumeQueue performs a synchronous http callout and also runs database queries before and after each callout. So each callback thread needs to own a database connection for a while (up to a few seconds).
I'm running into situations where the background worker process is spinning off a bunch of callback threads in very quick succession, and pretty soon there are more concurrent callback threads than there are database connections. (At least, that's what I think is happening.) So a bunch of the threads end up dying, each reporting the following SqlError:
SqlError {sqlState = "", sqlExecStatus = FatalError, sqlErrorMsg = "FATAL: remaining connection slots are reserved for non-replication superuser connections\n", sqlErrorDetail = "", sqlErrorHint = ""}
My sense is that the right way to solve this problem is to limit the max number of channels in use at any one time. I'm not actually sure if that's right -- in my mind, 1 channel means 1 thread consuming 1 message (and passing it to my callback) at a time. So if that's not right, please correct me. But if it is right... are you close to implementing coMaxChannel? =) Or can you think of any other way to approach this situation?
Thanks!!
It absolutely doesn't have to be 1 channel for 1 consumer using 1 thread. Most clients (Java, .NET, several Ruby ones, Erlang and, I believe, Go) maintain a pool of threads (or similar) whose size is either fixed or fixed by default.
Dynamically creating a thread per delivery is pretty wasteful unless you only have cases when processing a delivery takes a long time.
Thanks Michael. I'm pretty much a noob with this stuff in general.
Am I right in thinking I would need to create and manage a channel pool manually? Or does this library have some way of helping me with that?
To my original question, would you recommend a particular way of throttling my consumption of messages? Right now I'm thinking about using flow. That is, every time a SqlError is raised in a callback thread, I'd handle it by calling flow chan False, then sleep for, say, 2 seconds before restarting flow.
You can use basic.qos with manual acks to control how many messages are pushed to your consumer at a time. How to process the deliveries concurrently or in parallel is up to you.
There is no channel pooling in this library (or almost any RabbitMQ client) and because error handling is scoped to channels, it is not a great idea.
You can use just one channel if you only consume. It can take you a long way.
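As a minimal sketch of the basic.qos approach with this library: the snippet below opens one channel, sets a prefetch count, and only acks after the work is done, so the broker never pushes more than that many unacked deliveries at once. The connection details and the queue name "jobs" are placeholders, and the four-argument qos (with the global flag) assumes a recent version of the amqp package.

```haskell
{-# LANGUAGE OverloadedStrings #-}
-- Sketch: one channel, manual acks, basic.qos to cap in-flight deliveries.
-- Host, credentials, and the queue name "jobs" are illustrative placeholders.
import Network.AMQP
import Control.Concurrent (threadDelay)

main :: IO ()
main = do
  conn <- openConnection "127.0.0.1" "/" "guest" "guest"
  chan <- openChannel conn

  -- prefetchSize = 0 (no byte limit), prefetchCount = 10, global = False:
  -- the broker will push at most 10 unacked messages on this channel.
  qos chan 0 10 False

  _ <- consumeMsgs chan "jobs" Ack $ \(_msg, env) -> do
    processJob              -- your HTTP callout + database work goes here
    ackEnv env              -- ack only once the job finishes, freeing a slot

  _ <- getLine              -- keep the consumer alive until Enter is pressed
  closeConnection conn

-- Stand-in for the real callback work.
processJob :: IO ()
processJob = threadDelay 1000000
```

Because the ack is sent only after processJob completes, prefetchCount doubles as a concurrency ceiling: with a count of 1 you get strictly one job at a time, as discussed further down the thread.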
It seems like I'm confused about terminology and the underlying concepts. When you say "You can use just one channel if you only consume", I think that's clear enough to me. I also understand now that there's no "1 thread == 1 channel" parity.
So when you say "Dynamically creating a thread per delivery is pretty wasteful", I'm not sure what you mean. Isn't thread spawning handled by Network.AMQP's internals -- it just spawns one thread per message it consumes?
Thanks for bearing with me.
I need to take a look but if it does, it's simply unnecessary.
You can use one channel if your code is very unlikely to produce a channel exception (e.g. reference a queue that does not exist or try to redeclare it with different attributes), so you don't really care about fine grained error handling.
Perhaps this protocol concepts overview would be useful.
We do not spawn a thread for every message. All messages that a channel receives are processed one after another on the same thread.
In your situation I would probably use one channel that just pushes all incoming messages into some kind of a queue data-structure (for example Control.Concurrent.Chan). You can then create several worker threads that read from your queue and process the items.
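The suggested pattern can be sketched with nothing but base's Control.Concurrent; here plain Ints stand in for AMQP deliveries, and the runPool/worker names are made up for the example. One producer writes into a Chan, and a fixed pool of worker threads drains it, so the number of concurrent jobs (and therefore database connections) is bounded by the pool size.

```haskell
-- Runnable sketch: deliveries go into a Control.Concurrent.Chan and a
-- fixed pool of worker threads drains it. The Ints stand in for messages;
-- in the real app the AMQP callback would do the writeChan.
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newMVar, modifyMVar_, readMVar)
import Control.Monad (forM_, replicateM_)

-- Process the given jobs with `nWorkers` threads; return how many finished.
runPool :: Int -> [Int] -> IO Int
runPool nWorkers js = do
  jobs <- newChan
  done <- newMVar (0 :: Int)            -- counts finished jobs
  replicateM_ nWorkers $ forkIO $ worker jobs done
  forM_ js (writeChan jobs)             -- a sudden burst of deliveries
  waitForCount done (length js)
  readMVar done

worker :: Chan Int -> MVar Int -> IO ()
worker jobs done = loop
  where
    loop = do
      _job <- readChan jobs             -- blocks until a delivery arrives
      threadDelay 10000                 -- placeholder for HTTP callout + DB work
      modifyMVar_ done (pure . (+ 1))
      loop

-- Poll until every job is accounted for (good enough for a sketch).
waitForCount :: MVar Int -> Int -> IO ()
waitForCount done n = do
  c <- readMVar done
  if c >= n then pure () else threadDelay 5000 >> waitForCount done n

main :: IO ()
main = do
  n <- runPool 4 [1 .. 20]              -- at most 4 jobs in flight at once
  putStrLn ("processed " ++ show n ++ " jobs")
```

Sizing the pool to match the database connection pool means a burst of 1000 deliveries queues up in the Chan instead of exhausting connection slots.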
This has been extremely helpful -- thanks very much, both of you.
I've just confirmed -- had to see it with my own eyes -- that I can use qos with a prefetchCount of 1 (plus making sure to require ack, and not to send it until my job is complete) to throttle my process to 1 job at a time. I'll play with ways to throttle it up -- I see how I might be able to use a different prefetchCount value, and I also see how and why I might fetch all messages, queue them in my own data structure, and pop them off that structure whenever there's an available database connection.