No connection is made
I am trying to get the "TCP Connection" example to work, but the connection is never established.
I installed RabbitMQ on my device and the service is running.
sudo rabbitmq-diagnostics status
yields
Listeners
Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
For testing purposes, I added a custom user (user:pass) and a custom virtual host (coolhost).
sudo rabbitmqctl list_permissions -p coolhost
yields
Listing permissions for vhost "coolhost" ...
user configure write read
user .* .* .*
Now all I want to do is create an exchange, a queue, and a binding that connects the two, in a C++ program that uses the AMQP-CPP library. But I can't get it to work. Here is my code:
//main.cpp
#include "MyTcpHandler.h"
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
#include <iostream>
int main()
{
MyTcpHandler myHandler;
AMQP::Address address("localhost", ((uint16_t)5672), AMQP::Login("user", "pass"), "coolhost");
// create an AMQP connection object
AMQP::TcpConnection connection(&myHandler, address);
// and create a channel
AMQP::TcpChannel channel(&connection);
channel.onError([](const char* message)
{
std::cout << "error! " << message << std::endl;
});
// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout)
.onSuccess([](){
std::cout << "exchange declared" << std::endl;
})
.onError([](const char* message){
std::cout << "error! " << message << std::endl;
})
;
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");
std::cout << "Not ready yet" << std::endl;
while(!connection.ready())
{
}
std::cout << "Ready!" << std::endl;
return 0;
}
//MyTcpHandler.h
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
#include <iostream> // needed for std::cout in the callbacks below
class MyTcpHandler : public AMQP::TcpHandler
{
/**
* Method that is called by the AMQP library when a new connection
* is associated with the handler. This is the first call to your handler
* @param connection The connection that is attached to the handler
*/
virtual void onAttached(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation, for example initialize things
// to handle the connection.
std::cout << "Attached!" << std::endl;
}
/**
* Method that is called by the AMQP library when the TCP connection
* has been established. After this method has been called, the library
* still has to take care of setting up the optional TLS layer and of
* setting up the AMQP connection on top of the TCP layer. This method
* is always paired with a later call to onLost().
* @param connection The connection that can now be used
*/
virtual void onConnected(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation (probably not needed)
std::cout << "Connected!" << std::endl;
}
/**
* Method that is called when the secure TLS connection has been established.
* This is only called for amqps:// connections. It allows you to inspect
* whether the connection is secure enough for your liking (you can
* for example check the server certificate). The AMQP protocol still has
* to be started.
* @param connection The connection that has been secured
* @param ssl SSL structure from openssl library
* @return bool True if connection can be used
*/
virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
{
// @todo
// add your own implementation, for example by reading out the
// certificate and checking whether it is indeed yours
std::cout << "Secured!" << std::endl;
return true;
}
/**
* Method that is called by the AMQP library when the login attempt
* succeeded. After this the connection is ready to use.
* @param connection The connection that can now be used
*/
virtual void onReady(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation, for example by creating a channel
// instance, and start publishing or consuming
std::cout << "Ready!" << std::endl;
}
/**
* Method that is called by the AMQP library when a fatal error occurs
* on the connection, for example because data received from RabbitMQ
* could not be recognized, or the underlying connection is lost. This
* call is normally followed by a call to onLost() (if the error occurred
* after the TCP connection was established) and onDetached().
* @param connection The connection on which the error occurred
* @param message A human readable error message
*/
virtual void onError(AMQP::TcpConnection *connection, const char *message) override
{
// @todo
// add your own implementation, for example by reporting the error
// to the user of your program and logging the error
std::cout << "Error! " << message << std::endl;
}
/**
* Method that is called when the AMQP protocol is ended. This is the
* counterpart of a call to connection.close() to gracefully shut down
* the connection. Note that the TCP connection is at this time still
* active, and you will also receive calls to onLost() and onDetached().
* @param connection The connection over which the AMQP protocol ended
*/
virtual void onClosed(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation (probably not necessary, but it could
// be useful if you want to do something immediately after the
// amqp connection is over, but do not want to wait for the tcp
// connection to shut down)
std::cout << "Closed!" << std::endl;
}
/**
* Method that is called when the TCP connection was closed or lost.
* This method is always called if there was also a call to onConnected()
* @param connection The connection that was closed and that is now unusable
*/
virtual void onLost(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation (probably not necessary)
std::cout << "Lost!" << std::endl;
}
/**
* Final method that is called. This signals that no further calls to your
* handler will be made about the connection.
* @param connection The connection that can be destructed
*/
virtual void onDetached(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation, like cleanup resources or exit the application
std::cout << "Detached!" << std::endl;
}
/**
* Method that is called by the AMQP-CPP library when it wants to interact
* with the main event loop. The AMQP-CPP library is completely non-blocking,
* and only makes "write()" or "read()" system calls when it knows in advance
* that these calls will not block. To register a filedescriptor in the
* event loop, it calls this "monitor()" method with a filedescriptor and
* flags telling whether the filedescriptor should be checked for readability
* or writability.
*
* @param connection The connection that wants to interact with the event loop
* @param fd The filedescriptor that should be checked
* @param flags Bitwise or of AMQP::readable and/or AMQP::writable
*/
virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
{
// @todo
// add your own implementation, for example by adding the file
// descriptor to the main application event loop (like the select() or
// poll() loop). When the event loop reports that the descriptor becomes
// readable and/or writable, it is up to you to inform the AMQP-CPP
// library that the filedescriptor is active by calling the
// connection->process(fd, flags) method.
std::cout << "Monitor" << std::endl;
}
};
The project builds without problems using this CMake configuration:
cmake_minimum_required(VERSION 3.16)
project(Prog LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
add_executable(
Prog
main.cpp
)
target_link_libraries(Prog
amqpcpp
pthread
dl
)
When I run the program, I only get this output:
Monitor
Attached!
Not ready yet
Apart from monitor() and onAttached(), none of my handler callbacks get called. I really don't know what's going on here.
The 5672 port is open and RabbitMQ is listening for connections.
telnet localhost 5672
yields
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
12345
Connection closed by foreign host.
I looked at the RabbitMQ log file (var/log/rabbitmq/rabbit@ubuntu20.log) and found this:
2022-10-14 11:18:45.469020+02:00 [info] <0.518.0> Ready to start client connection listeners
2022-10-14 11:18:45.471168+02:00 [info] <0.536.0> started TCP listener on [::]:5672
2022-10-14 11:18:45.535473+02:00 [info] <0.518.0> Server startup complete; 0 plugins started.
2022-10-14 11:19:45.336525+02:00 [info] <0.539.0> accepting AMQP connection <0.539.0> (127.0.0.1:32942 -> 127.0.0.1:5672)
2022-10-14 11:19:45.336586+02:00 [error] <0.539.0> closing AMQP connection <0.539.0> (127.0.0.1:32942 -> 127.0.0.1:5672):
2022-10-14 11:19:45.336586+02:00 [error] <0.539.0> {handshake_timeout,handshake}
2022-10-14 11:32:39.955539+02:00 [info] <0.603.0> accepting AMQP connection <0.603.0> (127.0.0.1:52766 -> 127.0.0.1:5672)
2022-10-14 11:32:39.955619+02:00 [error] <0.603.0> closing AMQP connection <0.603.0> (127.0.0.1:52766 -> 127.0.0.1:5672):
2022-10-14 11:32:39.955619+02:00 [error] <0.603.0> {handshake_timeout,handshake}
2022-10-14 11:34:02.693603+02:00 [info] <0.610.0> accepting AMQP connection <0.610.0> (127.0.0.1:58656 -> 127.0.0.1:5672)
2022-10-14 11:34:08.167533+02:00 [error] <0.610.0> closing AMQP connection <0.610.0> (127.0.0.1:58656 -> 127.0.0.1:5672):
2022-10-14 11:34:08.167533+02:00 [error] <0.610.0> {handshake_timeout,handshake}
2022-10-14 11:35:13.942528+02:00 [info] <0.616.0> accepting AMQP connection <0.616.0> (127.0.0.1:51686 -> 127.0.0.1:5672)
2022-10-14 11:35:13.942599+02:00 [error] <0.616.0> closing AMQP connection <0.616.0> (127.0.0.1:51686 -> 127.0.0.1:5672):
2022-10-14 11:35:13.942599+02:00 [error] <0.616.0> {handshake_timeout,handshake}
It seems that RabbitMQ registers the connections but then closes them because of a "handshake timeout". What could be the problem here?
You're not running an event loop.
while(!connection.ready())
{
}
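This should be something like "myeventloop.run()", depending of course on the type of event loop mechanism that you use. Also make sure that you've implemented all the methods in your handler (here MyTcpHandler, derived from AMQP::TcpHandler) that interact with the event loop. In the code above, monitor() only prints "Monitor" and never registers the file descriptor anywhere, so connection->process(fd, flags) is never called, the AMQP handshake is never carried out, and RabbitMQ closes the connection with handshake_timeout.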
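To illustrate what "running an event loop" could look like here, the following is a minimal sketch built on POSIX poll(). It is not part of AMQP-CPP: PollLoop, kReadable, kWritable and demoWithPipe are hypothetical names made up for this example, and the flag constants are local stand-ins for AMQP::readable and AMQP::writable. It also assumes that monitor() is called with flags == 0 when a descriptor should no longer be watched. The idea is that monitor() forwards (fd, flags) to watch(), and the loop calls connection->process(fd, flags) whenever poll() reports activity. A pipe stands in for the AMQP socket so the sketch runs without a broker.

```cpp
#include <poll.h>
#include <unistd.h>
#include <functional>
#include <map>
#include <vector>

// Local stand-ins for AMQP::readable / AMQP::writable (assumption: a real
// handler would use the library's constants instead of these).
constexpr int kReadable = 1;
constexpr int kWritable = 2;

// Hypothetical helper, not part of AMQP-CPP: a tiny poll()-based event loop.
class PollLoop
{
public:
    // Invoked with (fd, flags) for every descriptor that became active.
    using Callback = std::function<void(int fd, int flags)>;

    // Intended to be called from monitor(connection, fd, flags).
    // Assumption: flags == 0 means "stop watching this descriptor".
    void watch(int fd, int flags)
    {
        if (flags == 0) _watched.erase(fd);
        else _watched[fd] = flags;
    }

    // Wait up to timeoutMs for activity and dispatch it once.
    // Returns the number of descriptors for which the callback was invoked.
    int step(int timeoutMs, const Callback &onActive)
    {
        std::vector<pollfd> fds;
        for (const auto &entry : _watched)
        {
            short events = 0;
            if (entry.second & kReadable) events |= POLLIN;
            if (entry.second & kWritable) events |= POLLOUT;
            fds.push_back(pollfd{entry.first, events, 0});
        }

        int n = poll(fds.data(), static_cast<nfds_t>(fds.size()), timeoutMs);
        if (n <= 0) return 0; // timeout or error: nothing to dispatch

        int active = 0;
        for (const auto &p : fds)
        {
            int flags = 0;
            // Report hang-ups and errors as "readable" so the reader sees them.
            if (p.revents & (POLLIN | POLLHUP | POLLERR)) flags |= kReadable;
            if (p.revents & POLLOUT) flags |= kWritable;
            if (flags == 0) continue;
            ++active;
            onActive(p.fd, flags);
        }
        return active;
    }

    bool empty() const { return _watched.empty(); }

private:
    std::map<int, int> _watched; // fd -> requested flags
};

// Demonstration with a pipe standing in for the AMQP socket. Returns how
// often the callback fired after one byte was written (-1 on setup failure).
int demoWithPipe()
{
    int fds[2];
    if (pipe(fds) != 0) return -1;

    PollLoop loop;
    loop.watch(fds[0], kReadable); // what monitor() would do

    int fired = 0;
    auto onActive = [&](int fd, int flags) {
        // A real handler would call connection->process(fd, flags) here.
        if (fd == fds[0] && (flags & kReadable)) ++fired;
    };

    loop.step(0, onActive); // nothing has been written yet
    if (write(fds[1], "x", 1) != 1) fired = -1;
    else loop.step(100, onActive); // the read end is now readable

    loop.watch(fds[0], 0); // stop watching before closing
    close(fds[0]);
    close(fds[1]);
    return fired;
}
```

In the program from the question, the busy-wait would then become a loop that repeatedly calls step() with a callback that invokes connection.process(fd, flags), and MyTcpHandler::monitor() would call watch(fd, flags) on a PollLoop that both share. For anything beyond experiments, an established event loop library is probably the more robust choice.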