RabbitMQ client (XS for librabbitmq)
Before installing this module, please build and install the RabbitMQ-C library (librabbitmq).
See https://github.com/alanxz/rabbitmq-c
Make module:
perl Makefile.PL
make
make test
make install
Simple API:
use utf8;
use strict;
use Net::RabbitMQ::Client;
produce();
consume();
# Publish one JSON message through the Simple API, declaring the exchange
# and the queue while connecting.
# Dies with the error description if connecting or publishing fails.
sub produce {
    my $simple = Net::RabbitMQ::Client->sm_new(
        host => "best.host.for.rabbitmq.net",
        login => "login", password => "password",
        exchange => "test_ex", exchange_type => "direct", exchange_declare => 1,
        queue => "test_queue", queue_declare => 1
    );

    # On failure sm_new yields a non-reference value (an error code), so
    # there is no object to call on: look the description up via the class.
    die Net::RabbitMQ::Client->sm_get_error_desc($simple) unless ref $simple;

    my $sm_status = $simple->sm_publish('{"say": "hello"}', content_type => "application/json");

    # Fetch the description while the object still exists, then release the
    # connection even when publishing failed.
    my $error = $sm_status ? $simple->sm_get_error_desc($sm_status) : undef;
    $simple->sm_destroy();

    die $error if $sm_status;
}
# Read messages from "test_queue" through the Simple API and print each one.
# Dies with the error description if connecting or the message loop fails.
sub consume {
    my $simple = Net::RabbitMQ::Client->sm_new(
        host => "best.host.for.rabbitmq.net",
        login => "login", password => "password",
        queue => "test_queue"
    );

    # On failure sm_new yields a non-reference value (an error code), so
    # there is no object to call on: look the description up via the class.
    die Net::RabbitMQ::Client->sm_get_error_desc($simple) unless ref $simple;

    my $sm_status = $simple->sm_get_messages(sub {
        my ($self, $message) = @_;

        print $message, "\n";

        return 1; # it is important to return 1 (send ack) or 0
    });

    # Fetch the description while the object still exists, then release the
    # connection even when the loop ended with an error.
    my $error = $sm_status ? $simple->sm_get_error_desc($sm_status) : undef;
    $simple->sm_destroy();

    die $error if $sm_status;
}
Base API:
use utf8;
use strict;
use Net::RabbitMQ::Client;
produce();
consume();
# Publish one persistent text/plain message through the Base API:
# connect, log in, open a channel, bind the queue, publish, then tear
# everything down. Dies if any of the setup steps fails; a failed publish
# is only reported on STDOUT.
sub produce {
    my $rmq = Net::RabbitMQ::Client->create();

    my $exchange    = "test";
    my $routingkey  = "";
    my $messagebody = "lalala";
    my $channel     = 1;

    my $conn   = $rmq->new_connection();
    my $socket = $rmq->tcp_socket_new($conn);

    my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
    die "Can't create socket" if $status;

    $status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
    die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;

    $status = $rmq->channel_open($conn, $channel);
    die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;

    # Store the bind result: without the assignment the check below would
    # re-test the status left over from channel_open.
    $status = $rmq->queue_bind($conn, $channel, "test_q", $exchange, $routingkey, 0);
    die "Can't bind queue" if $status != AMQP_RESPONSE_NORMAL;

    # The flags tell which of the property fields set below are in use.
    my $props = $rmq->type_create_basic_properties();
    $rmq->set_prop__flags($props, AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG);
    $rmq->set_prop_content_type($props, "text/plain");
    $rmq->set_prop_delivery_mode($props, AMQP_DELIVERY_PERSISTENT);

    $status = $rmq->basic_publish($conn, $channel, $exchange, $routingkey, 0, 0, $props, $messagebody);

    if ($status != AMQP_STATUS_OK) {
        print "Can't send message\n";
    }

    # Release everything in reverse order of creation.
    $rmq->type_destroy_basic_properties($props);
    $rmq->channel_close($conn, $channel, AMQP_REPLY_SUCCESS);
    $rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
    $rmq->destroy_connection($conn);
}
# Consume messages from "test_q" through the Base API and print each body.
# Dies if connecting, logging in, opening the channel or starting the
# consumer fails; the loop ends on the first non-normal consume status.
sub consume {
    my $rmq = Net::RabbitMQ::Client->create();

    my $channel = 1;

    my $conn   = $rmq->new_connection();
    my $socket = $rmq->tcp_socket_new($conn);

    my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
    die "Can't create socket" if $status;

    $status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
    die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;

    $status = $rmq->channel_open($conn, $channel);
    die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;

    # Arguments after the queue name: consumer tag, no_local, no_ack, exclusive.
    $status = $rmq->basic_consume($conn, $channel, "test_q", undef, 0, 1, 0);
    die "Consuming" if $status != AMQP_RESPONSE_NORMAL;

    my $envelope = $rmq->type_create_envelope();

    while (1) {
        $rmq->maybe_release_buffers($conn);

        $status = $rmq->consume_message($conn, $envelope, 0, 0);
        last if $status != AMQP_RESPONSE_NORMAL;

        print "New message: \n", $rmq->envelope_get_message_body($envelope), "\n";

        # Called once per received message; the envelope itself is
        # released after the loop with type_destroy_envelope.
        $rmq->destroy_envelope($envelope);
    }

    $rmq->type_destroy_envelope($envelope);
    $rmq->channel_close($conn, $channel, AMQP_REPLY_SUCCESS);
    $rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
    $rmq->destroy_connection($conn);
}
This module is glue for the RabbitMQ-C library.
my $simple = Net::RabbitMQ::Client->sm_new(
host => '', # default:
port => 5672, # default: 5672
channel => 1, # default: 1
login => '', # default:
password => '', # default:
exchange => undef, # default: undef
exchange_type => undef, # optional, if exchange_declare == 1; types: topic, fanout, direct, headers; default: undef
exchange_declare => 0, # declare exchange or not; 1 or 0; default: 0
queue => undef, # default: undef
routingkey => '', # default:
queue_declare => 0, # declare queue or not; 1 or 0; default: 0
);
Return: a Simple object if successful, otherwise a non-reference error code (pass it to sm_get_error_desc for a description)
my $sm_status = $simple->sm_publish($text,
# default args
content_type => "text/plain",
delivery_mode => AMQP_DELIVERY_PERSISTENT,
_flags => AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG
);
Return: 0 if successful, otherwise a non-zero error code (pass it to sm_get_error_desc for a description)
Loop to get messages
my $callback = sub {
    my ($simple, $message) = @_;
    return 1; # it is important to return 1 (send ack) or 0
};
my $sm_status = $simple->sm_get_messages($callback[, $timeout_sec, $timeout_usec]);
Return: 0 if successful, otherwise a non-zero error code (pass it to sm_get_error_desc for a description)
Break messages loop after next iteration
$simple->sm_get_messages_break_loop();
Get one message
my $sm_status = 0;
my $message = $simple->sm_get_message($sm_status[, $timeout_sec, $timeout_usec]);
Return: message if successful
my $rmq = $simple->sm_get_rabbitmq();
Return: RabbitMQ Base object
my $conn = $simple->sm_get_connection();
Return: Connection object (from Base API new_connection)
my $socket = $simple->sm_get_socket();
Return: Socket object (from Base API tcp_socket_new)
my $config = $simple->sm_get_config();
Return: Config when creating a Simple object
my $description = $simple->sm_get_error_desc($sm_error_code);
Return: Error description by Simple error code
Get queue metadata (sm_queue_meta)
my $meta = $simple->sm_queue_meta($queue_name);
Return: hash, {queue => ..., message_count => ..., consumer_count => ...}
Set QoS prefetch limits (sm_set_qos)
$simple->sm_set_qos($prefetch_size, $prefetch_count);
Destroy a Simple object
$simple->sm_destroy();
my $rmq = Net::RabbitMQ::Client->create();
Return: rmq
my $amqp_connection_state_t = $rmq->new_connection();
Return: amqp_connection_state_t
my $amqp_socket_t = $rmq->tcp_socket_new($conn);
Return: amqp_socket_t
my $status = $rmq->socket_open($socket, $host, $port);
Return: status
my $status = $rmq->socket_open_noblock($socket, $host, $port, $struct_timeout);
Return: status
my $status = $rmq->login($conn, $vhost, $channel_max, $frame_max, $heartbeat, $sasl_method, $login, $password);
Return: status
my $status = $rmq->channel_open($conn, $channel);
Return: status
my $res = $rmq->socket_get_sockfd($socket);
Return: variable
my $amqp_socket_t = $rmq->get_socket($conn);
Return: amqp_socket_t
my $status = $rmq->channel_close($conn, $channel, $code);
Return: status
my $status = $rmq->connection_close($conn, $code);
Return: status
my $status = $rmq->destroy_connection($conn);
Return: status
my $amqp_socket_t = $rmq->ssl_socket_new($conn);
Return: amqp_socket_t
my $status = $rmq->ssl_socket_set_key($socket, $cert, $key);
Return: status
$rmq->set_initialize_ssl_library($do_initialize);
my $status = $rmq->ssl_socket_set_cacert($socket, $cacert);
Return: status
my $status = $rmq->ssl_socket_set_key_buffer($socket, $cert, $key, $n);
Return: status
$rmq->ssl_socket_set_verify($socket, $verify);
my $status = $rmq->basic_publish($conn, $channel, $exchange, $routing_key, $mandatory, $immediate, $properties, $body);
Return: status
my $status = $rmq->basic_consume($conn, $channel, $queue, $consumer_tag, $no_local, $no_ack, $exclusive);
Return: status
my $status = $rmq->basic_get($conn, $channel, $queue, $no_ack);
Return: status
my $status = $rmq->basic_ack($conn, $channel, $delivery_tag, $multiple);
Return: status
my $status = $rmq->basic_nack($conn, $channel, $delivery_tag, $multiple, $requeue);
Return: status
my $status = $rmq->basic_reject($conn, $channel, $delivery_tag, $requeue);
Return: status
my $status = $rmq->consume_message($conn, $envelope, $struct_timeout, $flags);
Return: status
my $status = $rmq->queue_declare($conn, $channel, $queue, $passive, $durable, $exclusive, $auto_delete);
Return: status
my $status = $rmq->queue_bind($conn, $channel, $queue, $exchange, $routing_key);
Return: status
my $status = $rmq->queue_unbind($conn, $channel, $queue, $exchange, $routing_key);
Return: status
my $status = $rmq->exchange_declare($conn, $channel, $exchange, $type, $passive, $durable, $auto_delete, $internal);
Return: status
my $res = $rmq->envelope_get_redelivered($envelope);
Return: variable
my $res = $rmq->envelope_get_channel($envelope);
Return: variable
my $res = $rmq->envelope_get_exchange($envelope);
Return: variable
my $res = $rmq->envelope_get_routing_key($envelope);
Return: variable
$rmq->destroy_envelope($envelope);
my $res = $rmq->envelope_get_consumer_tag($envelope);
Return: variable
my $res = $rmq->envelope_get_delivery_tag($envelope);
Return: variable
my $res = $rmq->envelope_get_message_body($envelope);
Return: variable
my $amqp_envelope_t = $rmq->type_create_envelope();
Return: amqp_envelope_t
$rmq->type_destroy_envelope($envelope);
my $struct_timeval = $rmq->type_create_timeout($timeout_sec);
Return: struct_timeval
$rmq->type_destroy_timeout($timeout);
my $amqp_basic_properties_t = $rmq->type_create_basic_properties();
Return: amqp_basic_properties_t
my $status = $rmq->type_destroy_basic_properties($props);
Return: status
$rmq->set_prop_app_id($props, $value);
$rmq->set_prop_content_type($props, $value);
$rmq->set_prop_reply_to($props, $value);
$rmq->set_prop_priority($props, $priority);
$rmq->set_prop__flags($props, $flags);
$rmq->set_prop_user_id($props, $value);
$rmq->set_prop_delivery_mode($props, $delivery_mode);
$rmq->set_prop_message_id($props, $value);
$rmq->set_prop_timestamp($props, $timestamp);
$rmq->set_prop_cluster_id($props, $value);
$rmq->set_prop_correlation_id($props, $value);
$rmq->set_prop_expiration($props, $value);
$rmq->set_prop_type($props, $value);
$rmq->set_prop_content_encoding($props, $value);
my $amqp_boolean_t = $rmq->data_in_buffer($conn);
Return: amqp_boolean_t
$rmq->maybe_release_buffers($conn);
my $res = $rmq->error_string($error);
Return: variable
undef $rmq;
Frees the memory and destroys the object.
Alexander Borisov lex.borisov@gmail.com
This software is copyright (c) 2015 by Alexander Borisov.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.
See librabbitmq license and COPYRIGHT https://github.com/alanxz/rabbitmq-c