CopernicaMarketingSoftware/AMQP-CPP

Connection create success depends on when I call uv_run()

Closed this issue · 16 comments

Im trying to create connection using AMQP::LibUvHandler. like this:

connection.reset(new AMQP::TcpConnection(this, address));

this function is called in thread of object that implements override functions of TcpHandler like onConnected, onClosed, etc.

Everything works when connection is made like in example:

            connection.reset(new AMQP::TcpConnection(this, address));

            AMQP::TcpChannel channel(connection.get());

            // create a temporary queue
            channel.declareQueue(AMQP::exclusive).onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
                
                // report the name of the temporary queue
                Logger::Instance()->log_warn("queue {}", name);
            });
            uv_run(uv_default_loop(), UV_RUN_DEFAULT);

AMQP::onConnected() callback is called and in RabbitMQ interface I see queue being created, but AMQP::onReady() never called...When is it called?

But when I move uv_run(uv_default_loop(), UV_RUN_DEFAULT); to other places(like before connection is created, but LibUvHandler already was initialized) or simply remove channel-queue creation:

connection.reset(new AMQP::TcpConnection(this, address));
uv_run(uv_default_loop(), UV_RUN_DEFAULT);

even AMQP::onConnected() is not called.

I never used libuv before so I dont understand how it works under the hood, but I was under impression once events to watch were added into loop by LibUvHandler instantiation I can call uv_run(). May be calling other methods like channel creation adds additional events to watch? And if I do not do it before calling uv_run() then uv cycle is finished before those events are added to watch list? So may be whenever I add a channel or queue I need to run uv_run() everytime just to be safe?

EDIT:

I am trying to implement reconnection pattern to rabbitmq via conditional variable, so firstly Im trying to create connection:
connection.reset(new AMQP::TcpConnection(this, address));
then I wait for it to report in onReady() that everything is cool so I can proceed but even onConnected() never called. And after connection would be established I would proceed to channel-queue creation and messaging. I was imagining that if connection is not created onError() would be called, so I can put thread to sleep and try again in 5 seconds. But onError() also never called, after initial single call to monitor() nothing gets called.

Can you provide a working and minimal test program that demonstrates the issue without threads?

I figured out problem, libuv default loop is quitting too fast before connection is established so AMQP has no time to add event to watchlist. As per docs loop is closed if there is no active handles http://docs.libuv.org/en/v1.x/guide/utilities.html#timers. So I added dummy timer with empty callback so loop exists throughout lifecycle of my object, works well now. Not obvious stuff for people who have no experience with event libs tbh. Previously I worked only with default event loop of Windows where you yourself run loop parsing Windows messages, this quite different as I learn.

No that is not the problem. The problem is that you run your loop in a different thread than the AMQP connection.

Right now it works in different threads. UV loop is running in one thread, and AMQP is initialized and being run in the other.

Which is wrong.

This is not informative. Why is this wrong? UV loop doesnt really care which thread it occupies, also it shouldnt care who tells him which events to watch, my understanding is this. And it works. It didnt worked in same thread, nor in different threads, unill I added entity that allows uv loop run indefinitely. So far my understanding is this, Im not amqp/libuv developer so If you can correct me, I would appreciate that.

Here is working example taken from examples:

auto *loop = uv_default_loop();

// handler for libev
MyHandler handler(loop);

// make a connection
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://"));

// we need a channel too
AMQP::TcpChannel channel(&connection);

// create a temporary queue
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
    
    // report the name of the temporary queue
    std::cout << "declared queue " << name << std::endl;
});

// run the loop
uv_run(loop, UV_RUN_DEFAULT);

queue name is printed.

Here is example I tried to implement:

auto *loop = uv_default_loop();

// handler for libev
MyHandler handler(loop);

// make a connection
AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://"));

uv_run(loop, UV_RUN_DEFAULT);

// we need a channel too
AMQP::TcpChannel channel(&connection);

// create a temporary queue
channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
    
    // report the name of the temporary queue
    std::cout << "declared queue " << name << std::endl;
});

single thing printed "connected", program gets stuck.

Where is the thread that you were talking about?

here is crude example:

void uv_TimerCallback(uv_timer_t* handle)
{

}

uv_timer_t timer_req;

void ThreadProc()
{
    uv_timer_init(uv_default_loop(), &timer_req);
    uv_timer_start(&timer_req, uv_TimerCallback, 5000, 2000);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}

int main(int argc, char *argv[])
{
    auto *loop = uv_default_loop();

    std::thread m_thread = std::thread(&ThreadProc);

    // handler for libev
    MyHandler handler(loop);
    
    // make a connection
    AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://"));
    
    // we need a channel too
    AMQP::TcpChannel channel(&connection);
    
    // create a temporary queue
    channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
        
        // report the name of the temporary queue
        std::cout << "declared queue " << name << std::endl;
    });

    if(m_thread.joinable())
				m_thread.join();
     return 0;
}

also prints queue name.

Remember: AMQP objects are not thread safe. You cannot call methods on the objects from multiple threads because you will sooner or later run into crashes, race conditions, unexpected behavior, etcetera. Just like you will run into problems if you start writing to the same file from multiple threads, or to the same memory buffer, you can also not use AMQP connections from multiple threads.

Your MyHandler class lives in the std::thread thread. Inside your handler you call methods like Connection::parse(). This is all good if you do not access the Connection class in other threads too.

And here is the problem: in the main() function of the program, which is a different thread(!), you construct a channel, you call channel.declareQueue() and so on. This is wrong. Because now you use the connection in two threads at the same time.

Maybe it works for you in a test environment, because of pure luck. But in the longer run, if you run this in production, and give it some real load, it will eventually crash or freeze or break or do other nasty things.

I see. Thank you for reply. So my setup currently this: I was planning on having one thread for AMQP where Im initializing connection, channels, queue, etc, and after everything is done just putting thread on conditional variable that wakes up thread if connection is lost to try to reconnect. Second thread is uv loop. That's it.

I've looked at the code and it appears only place where uv loop interacts with AMQP is void TcpConnection::process(int fd, int flags) which accesses AMQP::TcpState which is called as uv loop callback in uv loop thread.
Apparently this isnt thread safe?

This leaves me confused to be honest. UV loop occupies whole thread, and after initialization of AMQP connection (when onReady is called and I created a channel) I have no way of communicating with AMQP besides uv loop. For example, if outside of AMQP-UV thread I'll decide I need to create new channel/queue, how I should go about it? I will need to add new event to loop with a callback "create_new_channel"? This is intended way of communication with AMQP objects?

Don't use threads.. Don't do it. Use the event loop to run your entire application. And if you can't modify your application to have a single event loop, run all the AMQP stuff in a separate thread, and use mutexes or so to hand over work items (like publish-operations or consumed messages) from one thread to the other. But stay far away from even trying to access the AMQP objects from multiple threads at the same time. They are, like almost all C++ objects, not thread safe by design, so don't even try to access them from multiple threads.

And even when you do a deep-inspect of the AMQP-CPP code and think 'well if I do it like this it might just work because I see how it is implemented' you should still not do it. Don't break the encapsulation principle by assuming a certain behavior of AMQP-CPP. The objects are not thread safe. Period.

I obviously would wrap access to defined by me AMQP objects in mutex guard. The thing Im interested whether I should modify uv callback that uses connection to notify AMQP about change in filedescriptors readablity/writeability in mutex too if I will be accessing any other AMQP object that potentially uses connection from outside thread(like creating channel).

One more time: do not run the event loop in a different thread. If you need an event loop for AMQP, run your AMQP code and the event loop in the same thread.

I already rewrote my code to run it in single thread. This not what I am asking, don't I? All my questions still remain valid if you cannot answer them directly not a problem really.

"The thing Im interested whether I should modify uv callback that uses connection to notify AMQP about change in filedescriptors readablity/writeability in mutex too"

Answer: no this is not necessary. You don't have to use mutexes here because you will (of course) run the event loop in the same thread as the rest of the AMQP code, so there is no need to mutex-protect this.

Watch out: it is very important to not use threads!

All right, thank you.