CopernicaMarketingSoftware/AMQP-CPP

While receiving messages on the consumer, I receive a "connection lost" message sporadically.

Closed this issue · 1 comments

ygtysr commented

Describe the bug
Greetings, I am using version 4.3.26, which is the latest version of the amqp-cpp library.I use the library via a wrapper.One of my test cases is as follows; After publishing 1000 messages, it checks whether all of them are consumed by the consumer and sometimes during these tests I sporadically receive a "connection lost" message when receiving messages on consumer.I wanted to debug the library to observe the error, and in case of an error, I observed that the "_data" field in the "receivefrom" function in "tcpinbuffer.h" was NULL.Since the "_data" variable was marked protected, I did not think that another module would change the variable and the first place I focused on within the module was the "reallocate" function, but I did not encounter any problems here. I also examined the "process" method in the "tcpconnected.h" file. There is an interesting note here before the "onReceived" method call:

" // we need a local copy of the buffer - because it is possible that "this"
 // object gets destructed halfway through the call to the parse() method",

here the move assignment call will be made anyway, so could the parse method cause the _data object to be detructed for some reason?What other effect could there be that would cause the "_dataİ" field to be NULL?

Expected behavior and actual behavior
Regardless of the number of messages, all published messages can be consumed without errors.

Sample code

Here is the consumer implementation on myside:

void consume(const std::string& queue, const std::string& tag, int flags, Table table, t_SuccessCallback onSuccess,
                 t_MessageCallback onMessage, t_ErrorCallback onError)
    {
        consumeParams = {queue, tag, flags, table, onSuccess, onMessage, onError};

        if (isChannelRunning())
        {
            LOG_DEBUG("consume called");
            channel_->consume(queue, tag, flags, details::ConvertTable(table))
                .onSuccess([cb = std::move(onSuccess)]() {
                    LOG_DEBUG("onSuccess called")
                    cb();
                })
                .onReceived(
                    [cb = std::move(onMessage)](const AMQP::Message& message, uint64_t deliveryTag, bool redelivered) {
                        LOG_DEBUG("onReceived called")
                        cb(details::ConvertMessage(message), deliveryTag, redelivered);
                    })
                .onError([cb = std::move(onError)](const char* message) {
                    LOG_DEBUG("onError called")
                    cb(message);
                });
        }
        isConsumed = true;
    }

This is insufficient information. The issue could just as well. e in your code. Please provide a full working example that demonstrates the issue, without using threads.