/tutorial-rabbitmq

Projeto de estudo para entender o funcionamento do RabbitMQ e suas aplicações práticas.

Primary LanguageC#MIT LicenseMIT

Contributors Forks Stargazers Issues MIT License


tutorial-rabbitmq

Tutoriais RabbitMQ

Projeto de estudo para entender o funcionamento do RabbitMQ e suas aplicações práticas.

Ver problemas abertos · Reportar um problema

Sumário

  1. Sobre este projeto
  2. Configurações do ambiente de desenvolvimento
  3. Introdução
  4. Tutorial 1 » "Hello World!"
  5. Tutorial 2 » Work queues
  6. Tutorial 3 » Publish/Subscribe
  7. Tutorial 4 » Routing
  8. Tutorial 5 » Topics
  9. Tutorial 6 » RPC
  10. Tutorial 7 » Publisher Confirms
  11. Estudo adicional
  12. Licença
  13. Contato
  14. Referências

Sobre este projeto

Este repositório foi elaborado como projeto de estudo para entender o funcionamento do RabbitMQ e suas aplicações práticas.

Toda a documentação aqui transcrita tem como base a documentação oficial, que pode ser encontrada no site da ferramenta.

Tecnologias utilizadas

Configurações do ambiente de desenvolvimento

Para obter uma cópia local atualizada e que possa ser executada corretamente, siga os passos abaixo.

Pré-requisitos

Os tutoriais assumem que o RabbitMQ está instalado e sendo executado em localhost na porta padrão (5672).

Para acessar o ambiente de gerenciamento, utilize as informações abaixo:

Clonando o repositório

git clone https://github.com/ahcantarim/tutorial-rabbitmq.git

Instalando as dependências

No diretório do projeto, executar o(s) comando(s):

dotnet restore

Introdução

RabbitMQ é um message broker: ele aceita e encaminha mensagens. Você pode pensar sobre isso como se fossem os Correios: quando você coloca a carta que você quer em uma caixa de postagem, você pode ter certeza de que eventualmente o carteiro irá entregar sua carta ao destinatário. Nesta analogia, o RabbitMQ é a caixa de postagem, a agência dos Correios e o carteiro.

A maior diferença entre o RabbitMQ e uma agência dos Correios é que ele não lida com papel, ao invés disso aceita, armazena e encaminha blobs binários de dados ‒ mensagens.

O RabbitMQ ‒ e outras ferramentas de mensagens no geral ‒ usa alguns jargões:

  • Producing significa nada mais do que enviando. Um programa que envia mensagens é um Producer (produtor):

Producer

  • Uma Queue (fila) é o nome para uma caixa postal que vive dentro do RabbitMQ. Apesar das mensagens fluirem entre o RabbitMQ e suas aplicações, elas apenas podem ser armazenadas dentro de uma fila. Uma fila é apenas limitada pela memória e espaço em disco do servidor, e é essencialmente um grande buffer de mensagens. Vários Producers podem enviar mensagens que vão para uma fila, e vários Consumers podem tentar receber dados de uma fila. É assim que representamos uma fila:

Queue

  • Consuming tem um significado parecido com producing. Um Consumer (consumidor) é um programa que majoritariamente espera para receber mensagens:

Queue

Note que o Producer, Consumer e o broker não precisam residir no mesmo servidor. De fato na maioria das aplicações isso não acontece. Uma aplicação pode ser ao mesmo tempo um Producer e um Consumer, também.

Conectando aplicações no servidor do RabbitMQ

Para criar uma conexão com o servidor, teremos sempre algo parecido com isso:

class Send
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            ...
        }
    }
}

A conexão abstrai a conexão socket e se encarrega de negociar a versão do protocolo e a autenticação para nós. Aqui, estamos conectando a um nó local do RabbitMQ ‒ por isso o localhost. Se nós quisermos nos conectar a um nó em um servidor diferente, simplesmente especificamos o HostName ou endereço IP aqui:

var factory = new ConnectionFactory() { HostName = "xxx.xxx.xxx.xxx", Port = 5672, UserName = "xxx", Password = "xxx" };

Declarando uma fila

Declarar uma fila é uma ação idempotente ‒ ela apenas será criada se ainda não existir. Por conta disso, é comum declararmos a fila tanto no Producer quanto no Consumer, pois queremos garantir que a fila exista antes de utilizá-la.

channel.QueueDeclare(queue: "hello",
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

Enviando mensagens

O conteúdo de mensagem é um array de bytes, então você pode codificar qualquer coisa que quiser.

string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "",
                     routingKey: "hello",
                     basicProperties: null,
                     body: body);

Console.WriteLine($" [x] Sent {message}");

Recebendo mensagens

Considerando que sempre iremos obter as mensagens de forma assíncrona de uma fila do servidor, utilizaremos um callback. O manipulador de evento EventingBasicConsumer.Received nos permite isso.

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
};

channel.BasicConsume(queue: "hello",
                     autoAck: true,
                     consumer: consumer);

Tutorial 1 » "Hello World!"

Basic Producer and Consumer

Neste tutorial, foram escritos dois programas: um produtor que envia uma mensagem única, e um consumidor que recebe mensagens e exibe-as em tela.

No diagrama abaixo, P é nosso produtor e C é nosso consumidor. A caixa no meio é uma fila.

Queue

Sending

Queue

Receiving

Queue

Executando os projetos

  • Tutorial.RabbitMQ.Console.Send: Produtor que conecta no RabbitMQ, envia uma mensagem única, e é finalizado.

  • Tutorial.RabbitMQ.Console.Receive: Consumidor que fica escutando as mensagens do RabbitMQ. Diferente do produtor que envia uma única mensagem e é finalizado, ele será executado continuamente para escutar novas mensagens e exibi-las.

Você pode executar os projetos pelo Visual Studio, pelos executáveis gerados no diretório bin, ou através da linha de comando. Para o último caso, abra dois terminais.

Execute primeiro o Consumer:

cd Tutorial.RabbitMQ.Console.Receive
dotnet run

Depois execute o Producer:

cd Tutorial.RabbitMQ.Console.Send
dotnet run

O Consumer irá exibir as mensagens que obter do Producer via RabbitMQ. O Consumer continuará sendo executado, aguardando por mensagens, então você pode tentar executar um novo Producer a partir de outro terminal.

No próximo tutorial iremos criar uma simples fila de trabalho.

Tutorial 2 » Work queues

Work Queues (aka: Task Queues)

Foi criada uma fila de trabalho que é usada para distribuir tarefas demoradas através de múltiplos Consumers.

Como não temos uma tarefa do mundo real para executar, como redimensionar imagens ou gerar arquivos PDF, simulamos algo nesse sentido através a função Thread.Sleep(). Consideramos o número de . na cadeia de caracteres enviada como sua complexidade: cada . contará como um segundo de "trabalho". Por exemplo, uma tarefa descrita por Hello... demorará 3 segundos para ser finalizada.

Queue

Manual message acknowledgments (ack)

Foi alterado o valor do parâmetro autoAck: false no canal que consome a fila, visando realizar manualmente a confirmação/rejeição da mensagem recebida.

No manipulador de eventos de mensagem recebida, foi implementado o código channel.BasicAck() para confirmar manualmente o processamento da mensagem, após o término da mesma.

var channel = ((EventingBasicConsumer)sender).Model;
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);

Usando esse código nós podemos ter certeza que mesmo que um Consumer seja finalizado no meio do processamento de uma mensagem, nada será perdido. Logo que isso ocorrer, todas as mensagens não confirmadas serão reenviadas para outros Consumers.

Message durability

Anteriormente (com o manual message ack), vimos como garantir que mesmo que o Consumer seja finalizado por algum motivo, a tarefa não seja perdida. Mas, da forma atual, as tarefas seriam perdidas se o servidor do RabbitMQ parasse.

Para que isso não aconteça, devemos marcar tanto a fila quanto as mensagens como durable.

Primeiro, foi alterado o valor do parâmetro durable: true nos canais que declaram a fila para envio (Producer NewTask) e recebimento (Consumer Worker) de mensagens.

Apesar do comando por si só estar correto, não funcionaria na configuração atual. Isso acontece pois uma fila com o nome atual já foi definida (e não como durable). O RabbitMQ não permite que uma fila existente seja redefinida com parâmetros diferentes e retornará um erro. Como alternativa, apenas declararemos a fila com um nome diferente.

Com essa alteração, temos certeza que a fila não será perdida se o RabbitMQ for reiniciado.

Agora, precisamos marcar nossas mensagens como persistentes e, para isso, utilizamos o trecho de código abaixo:

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

Adicionalmente, repassamos tais propriedades para o método channel.BasicPublish().

NOTA SOBRE PERSISTÊNCIA DE MENSAGENS

Marcar as mensagens como persistentes não garante completamente que uma mensagem não será perdida.

Apesar disso dizer ao RabbitMQ para salvar a mensagem no disco, existe uma pequena janela de tempo quando o RabbitMQ aceita uma mensagem e ainda não salvou a mesma.

As garantias de persistência não são fortes, mas são mais do que o necessário para sistemas simples de enfileramento de mensagens.

Se você precisa de uma garantia melhor, então você pode usar as confirmações de publicação (https://www.rabbitmq.com/confirms.html).

Fair Dispatch

Pode-se notar que o envio de mensagens aos Consumers, por vezes, pode não ser "justo". Por exemplo, em uma situação com dois workers, onde todas as mensagens pares tem um processamento pesado e todas as ímpares tem um processamento leve, um worker estará constantemente ocupado e o outro não fará nenhum trabalho pesado.

Isso acontece porque o RabbitMQ apenas envia a mensagem assim que ela entra na fila. Ele não olha para o número de mensagens não confirmadas de um Consumer.

Queue

Para alterar este comportamento, podemos usar o método channel.BasicQos() com um valor de prefetchCount: 1. Isso diz ao RabbitMQ para não dar mais de uma mensagem para um worker ao mesmo tempo. Ou, em outras palavras, não envie uma nova mensagem para um worker até que ele tenha processado e confirmado a anterior. Ao invés disso, ele irá enviá-la para o próximo worker que não estiver ocupado.

channel.BasicQos(0, 1, false);

NOTA SOBRE TAMANHO DA FILA

Se todos os workers estão ocupados, sua fila pode ficar cheia.

Você deve ficar de olho nisso, e talvez adicionar mais workers, ou ter alguma outra estratégia.

Executando os projetos

  • Tutorial.RabbitMQ.Console.NewTask: console para adicionar mensagens em uma fila ;

  • Tutorial.RabbitMQ.Console.Worker: console para ler mensagens de uma fila simulando um processamento para cada mensagem; pode ser executada mais de uma instância e as mensagens serão lidas alternadamente por cada uma;

Você pode executar os projetos pelo Visual Studio, pelos executáveis gerados no diretório bin, ou através da linha de comando. Para o último caso, abra dois terminais.

Execute primeiro o Consumer:

cd Tutorial.RabbitMQ.Console.Worker
dotnet run

Depois execute o Producer:

cd Tutorial.RabbitMQ.Console.NewTask
dotnet run

Você também pode executar cada projeto mais de uma vez (usando mais de um terminal), para verificar como é feita a distribuição de mensagens entre eles. As opções de durabilidade permitem que a mensagem sobreviva mesmo que o RabbitMQ seja reiniciado (ou mesmo que um Consumer seja finalizado no meio do processamento de uma tarefa - neste caso, a tarefa será entregue a outro Consumer assim que possível). Adicionalmente, na execução do Producer, você pode informar um argumento com . para simular uma carga de trabalho maior:

cd Tutorial.RabbitMQ.Console.NewTask
dotnet run "Task que demora 5 segundos....."
dotnet run "Task que demora 3 segundos..."
dotnet run "Task que demora 20 segundos...................."

No próximo tutorial iremos aprender como enviar a mesma mensagem para vários Consumers.

Tutorial 3 » Publish/Subscribe

Publish/Subscribe

No tutorial anterior foi criada uma fila de trabalho. Assume-se através de uma fila de trabalho que cada tarefa é entregue a exatamente um worker.

Agora será feito algo completamente diferente -- iremos entregar uma mesma mensagem a múltiplos Consumers.

Para ilustrar este padrão, foi criado um sistema de log simples. Consiste em dois programas -- o primeiro envia as mensagens de log e o segundo recebe e exibe as mesmas.

Nesse sistema de log, cada cópia do Consumer que estiver sendo executada irá receber as mensagens. Assim, pode-se executar um receptor e direcionar os logs para o disco rígido (arquivo); e ao mesmo tempo pode-se executar outro receptor e visualizar os logs em tela.

Essencialmente, as mensagens publicadas serão transmitidas para todos os receptores.

Exchanges

Até aqui, enviamos e recebemos mensagens de e para uma fila. Agora introduziremos o conceito do modelo completo de mensageria com RabbitMQ.

A ideia principal do modelo de mensagens no RabbitMQ é que um Producer nunca envia nenhuma mensagem diretamente para uma fila. Na verdade, geralmente um Producer sequer sabe se uma mensagem será enviada para alguma fila.

Ao invés disso, o Producer pode apenas enviar mensagens para uma exchange.

Queue

Nos tutoriais anteriores não sabíamos nada sobre exchanges, mas ainda assim fomos capazes de enviar mensagens para filas. Isso foi possível pois estávamos usando a exchange default, a qual é identificada pela cadeia de caracteres vazia ("").

Quando a exchange informada for uma cadeia de caracteres vazia (default ou nameless), as mensagens são encaminhadas para a fila com o nome especificado no parâmetro routingKey, se ela existir.

Temporary queues

Anteriormente usamos filas que tinham nomes específicos. Nomear uma fila foi crucial naquele momento -- nós precisávamos apontar os workers para a mesma fila. Dar nome à filas é importante quando você quer compartilhá-la entre Producers e Consumers.

Mas esse não é o caso da aplicação de log. Aqui, nós queremos escutar todas as mensagens, não apenas um grupo delas. Também estamos interessados apenas no fluxo atual de mensagens e não nas antigas. Por isso, precisamos de duas coisas:

Em primeiro lugar, sempre que nos conectarmos ao RabbitMQ precisamos de uma fila nova e vazia. Para fazer isso nós podemos criar uma fila com um nome aleatório ou, ainda melhor, deixar o servidor escolher um nome aleatório para nós.

Em segundo lugar, assim que desconectarmos o Consumer, a fila deve ser automaticamente deletada.

Quando nós não informamos parâmetros para o método QueueDeclare(), criamos uma fila nomeada e não durável, exclusiva e auto deletável.

var queueName = channel.QueueDeclare().QueueName;

Neste ponto, a propriedade QueueName contém um nome aleatório para a fila. Por exemplo, pode ser algo como amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Bindings

Queue

Nós já criamos a exchange que espalha as mensagens e uma fila. Agora nós precisamos dizer para a exchange para enviar mensagens para nossa fila. Essa relação entre uma exchange e uma fila é chamanda de binding.

O binding é um relacionamento entre uma exchange e uma fila. Pode ser entendido da seguinte forma: a fila está interessada nas mensagens desta exchange.

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

A partir de agora, a exchange logs irá acrescentar mensagens em nossa fila.

Queue

Executando os projetos

  • Tutorial.RabbitMQ.Console.EmitLog: console para transmitir mensagens a uma Exchange;

  • Tutorial.RabbitMQ.Console.ReceiveLogs: console para receber mensagens de uma Exchange;

Você pode executar os projetos pelo Visual Studio, pelos executáveis gerados no diretório bin, ou através da linha de comando. Para o último caso, abra dois terminais.

Execute primeiro o Consumer. Se você quer salvar os logs em um arquivo, utilize o comando abaixo:

cd Tutorial.RabbitMQ.Console.ReceiveLogs
dotnet run > logs_from_rabbit.log

Se você quer ver os logs na tela, através de um novo terminal utilize o comando abaixo:

cd Tutorial.RabbitMQ.Console.ReceiveLogs
dotnet run

E para gerar os logs utilize o comando:

cd Tutorial.RabbitMQ.Console.EmitLog
dotnet run

No próximo tutorial iremos aprender como escutar um subconjunto de mensagens.

Tutorial 4 » Routing

Routing

No tutorial anterior, criamos um sistema de log simples. Fomos capazes de transmitir mensagens para vários receptores.

Neste tutorial vamos adicionar uma funcionalidade à ele - vamos tornar possível se subscrever apenas a um subconjunto de mensagens. Por exemplo, teremos a possibilidade de direcionar apenas as mensagens de erro crítico para o arquivo em disco, enquanto ainda é possível exibir todas as mensagens de log em tela.

Bindings podem ter um parâmetro extra chamado routingKey. Para evitar a confusão com o parâmetro do método BasicPublish, iremos chamá-lo aqui de binding key. Essa é a forma que nós podemos criar um binding com uma key:

channel.QueueBind(queue: queueName,
                  exchange: "direct_logs",
                  routingKey: "black");

O significado de um binding key dependo do tipo de exchange. As exchanges do tipo Fanout, a qual usamos previamente, simplesmente ignoram este valor.

Direct exchange

Nosso sistema de log do tutorial anterior transmite todas as mensagens para todos os Consumers. Nós queremos extender isso para permitir filtrar mensagens baseadas em sua severidade. Por exemplo, nós podemos querer que o programa que está escrevendo mensagens de log no disco apenas receba mensagens de erro crítico, e não desperdice espaço em disco com mensagens de informação ou alertas.

Nós estávamos usando a exchange do tipo Fanout, a qual não nos dá muita flexibilidade - ela é apenas capaz de fazer uma transmissão "pouco esperta".

Ao invés disso, iremos usar uma exchange Direct. O algoritmo de roteamento por trás de uma exchange desse tipo é simples - uma mensagem vai para a fila onde o binding key corresponde exatamente ao routing key da mensagem.

Para ilustrar isso, considere a seguinte configuração:

Queue

Nessa configuração, podemos ver a exchange Direct chamada X com duas filas vinculadas à ela. A primeira fila está vinculada com um binding key orange, e a segunda tem dois bindings: um com o binding key black e outra com green.

Dessa forma, uma mensagem publicada para a exchange com uma routing key orange será roteada para a fila Q1. Mensagens com routing key black ou green irão para a fila Q2. Todas as outras mensagens serão descartadas.

Multiple bindings

Queue

É perfeitamente legal se vincular à múltiplas filas com o mesmo binding key. Em nosso exemplo, nós poderíamos criar vínculos entre X e Q1 com o binding key black. Neste caso, a exchange Direct se comportará como uma Fanout e irá transmitir a mensagem para todas as filas correspondentes. Uma mensagem com o routing key black será entregue tanto para Q1 quanto para Q2.

Emitting logs

Iremos usar este modelo para nosso sistema de log. Ao invés de uma Fanout iremos enviar mensagens para uma exchange Direct. Iremos fornecer a severidade do log como routing key. Desta forma, o Consumer poderá selecionar a severidade que deseja receber.

Como sempre, precisamos primeiro criar uma exchange:

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

E agora estamos prontos para enviar uma mensagem:

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

Para simplificar as coisas, iremos assumir que a severity poderá ser uma das seguintes: info, warning ou error.

Subscribing

Receber mensagens irá funcionar como no tutorial anterior, com uma diferença - iremos criar um novo vínculo para cada severidade na qual estamos interessados.

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

Executando os projetos

Queue

  • Tutorial.RabbitMQ.Console.EmitLogDirect: console para transmitir mensagens a uma Exchange com routing key especificado em forma de severidade do log;

  • Tutorial.RabbitMQ.Console.ReceiveLogsDirect: console para receber mensagens de uma Exchange, vinculado à uma ou mais routing key;

Você pode executar os projetos pelo Visual Studio, pelos executáveis gerados no diretório bin, ou através da linha de comando. Para o último caso, abra dois terminais.

Se você quer salvar em arquivo apenas as mensagens de log de warning e error, utilize o comando abaixo:

cd Tutorial.RabbitMQ.Console.ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log

Se você quer ver os logs na tela, através de um novo terminal utilize o comando abaixo:

cd Tutorial.RabbitMQ.Console.ReceiveLogsDirect
dotnet run info warning error

E, por exemplo, para gerar os logs de error utilize o comando:

cd Tutorial.RabbitMQ.Console.EmitLogDirect
dotnet run error "Run. Run. Or it will explode."

No próximo tutorial veremos como escutar por mensagens baseadas em um modelo.

Tutorial 5 » Topics

No tutorial anterior nós melhoramos nosso sistema de logs. Ao invés de utilizar uma exchange Fanout capaz de apenas espalhar as mensagens, nós utilizamos uma Directe ganhamos a possibilidade de receber os logs de forma seletiva.

Apesar disso, ainda temos limitações - nossa exchange não pode rotear as mensagens baseadas em critérios múltiplos.

Em nosso sistema de log, nós podemos querer subscrever não apenas baseado na severidade da mensagem, mas também baseado na fonte que emitiu o log. Você pode conhecer esse conceito da ferramenta unix syslog, a qual roteia os logs baseado tanto na severidade (info, warn, crit, ...) e facilidade (auth, cron, kern, ...).

Isso poderia nos dar muito mais flexibilidade - nós podemos querer escutar apenas os erros críticos vindo do cron, mas também todos os logs vindos do kern.

Para implementar isso em nosso sistema de logs, nós precisamos aprender sobre uma exchange um pouco mais complexa: Topic.

Topic exchange

Mensagens enviadas a uma exchange Topic não podem ter um routing key arbitrário - ele deve ser uma lista de palavras, separadas por ponto. As palavras podem ser qualquer coisa, mas comumente elas especificam algumas funcionalidades conectadas à mensagem. Alguns exemplos de routing key válidos: stock.usd.nyse, nyse.vmw, quick.orange.rabbit. Também podem existir quantas palavras você quiser no routing key, desde que ele seja limitado a 255 bytes.

O binding key precisa também estar no mesmo formato. A lógica por trás de uma exchange Topic é parecida com uma Direct - uma mensagem enviada com um routing key específico será entregue para todas as filas que estão vinculadas com um binding key correspondente. Entretanto há dois casos especiais para os binding keys:

  • * (asteriscos) podem ser substitudos para exatamente uma palavra.
  • # (sustenidos) podem ser substitutos para nenhuma ou mais palavras.

É mais fácil explicar isso em um exemplo:

Queue

Neste exemplo, iremos enviar mensagens onde todas elas descrevem animais. As mensagens serão enviadas com um routing key que consiste de três palavras (e dois pontos para separação). A primeira palavra no routing key irá descrever a Velocidade, a segunda a Cor e a terceira a Espécie: {velocidade}.{cor}.{espécie}.

Criamos três vínculos: Q1 está vinculada com o binding key *.laranja.* e Q2 com *.*.coelho e lento.#.

Esses vínculos podem ser resumidos como:

  • Q1 está interessada em todos os animais laranjas.
  • Q2 quer escutar tudo sobre coelhos, e tudo sobre animais lentos.

Uma mensagem com um routing key rápido.laranja.coelho será entregue para as duas filas. A mensagem lento.laranja.elefante também irá para ambas. Por outro lado, rápido.laranja.raposa irá apenas para a primeira fila, e lento.marrom.raposa irá apenas para a segunda fila. lento.rosa.coelho será entregue para a segunda fila apenas uma vez, mesmo que corresponda a dois vínculos. rápido.marrom.raposa não corresponde a nenhum vínculo então será descartada.

O que acontece se nós quebrarmos o contrato e enviar uma mensagem com uma ou quatro palavras, como laranja ou rápido.laranja.macho.coelho? Bem, essas mensagens não correspondem a nenhum vínculo e serão perdidas.

Por outro lado, lento.laranja.macho.coelho, mesmo que tenha quatro palavras, irá corresponder ao último vínculo e será entregue à segunda fila.

NOTA SOBRE TOPIC EXCHANGE

Esse tipo de exchange é poderosa e pode se comportar como outras exchanges.

Quando uma fila é vinculada com # (sustenido) no binding key ela irá receber todas as mensagens, independentemente do routing key - como uma exchange Fanout.

Quando os caracteres especiais * (asterisco) e # (sustenido) não são usados nos vínculos, a exchange Topic irá se comportar da mesma forma que uma Direct.

Executando os projetos

  • Tutorial.RabbitMQ.Console.EmitLogTopic: console para transmitir mensagens a uma Exchange com routing key assumidos como tendo duas palavras: facilidade.severidade;

  • Tutorial.RabbitMQ.Console.ReceiveLogsTopic: console para receber mensagens de uma Exchange, especificando o vínculo para o routing key;

Você pode executar os projetos pelo Visual Studio, pelos executáveis gerados no diretório bin, ou através da linha de comando. Para o último caso, abra dois terminais.

Para receber todos os logs, utilize o comando abaixo:

cd Tutorial.RabbitMQ.Console.ReceiveLogsTopic
dotnet run "#"

Para receber todos os logs da facilidade kern:

cd Tutorial.RabbitMQ.Console.ReceiveLogsTopic
dotnet run "kern.*"

Ou se você quiser ouvir apenas sobre critical logs:

cd Tutorial.RabbitMQ.Console.ReceiveLogsTopic
dotnet run "*.critical"

Você também pode criar múltiplos vínculos:

cd Tutorial.RabbitMQ.Console.ReceiveLogsTopic
dotnet run "kern.*" "*.critical"

E para gerar um log com o routing key kern.critical, por exemplo, utilize o comando:

cd Tutorial.RabbitMQ.Console.EmitLogTopic
dotnet run "kern.critical" "A critical kernel error."

Divirta-se brincando com esses programas. Note que o código não assume nada sobre os routing ou binding keys, você pode querer fazer algo com mais de dois parâmetros no routing key.

No próximo tutorial, vamos descobrir como fazer uma mensagem de ida e volta, como uma chamada de procedimento remoto (RPC).

Tutorial 6 » RPC

No segundo tutorial, nós aprendemos como usar filas de trabalho para distribuir tarefas demoradas entre vários workers.

Mas e se nós precisarmos executar uma função em um computador remoto (servidor) e aguardar o resultado? Bem, essa é uma história diferente. Este padrão é comumente conhecido como Remote Procedure Call ou RPC.

Nesse tutorial nós vamos usar o RabbitMQ para criar um sistema RPC: um Cliente e um Servidor RPC escalável. Como não temos nenhuma tarefa demorada que valha a pena distribuir, vamos criar um serviço RPC fictício que retorna os números de Fibonacci.

Client interface

Para ilustrar como um serviço RPC poderia ser usado, nós vamos criar uma simples classe cliente. Ela vai expor um método chamado Call que irá enviar uma requisição RPC e bloquear até a resposta ser recebida.

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

Callback queue

No geral, fazer RPC através do RabbitMQ é fácil. Um cliente envia uma mensagem de requisição e o servidor responde com uma mensagem de resposta. Para receber uma resposta, precisamos enviar um endereço de fila de callback com a requisição:

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

// ... then code to read a response message from the callback_queue ...

Propriedades das mensagens

O protocolo AMQP 0-9-1 pré-define um conjunto de 14 propriedades que vão junto com uma mensagem. A maioria das propriedades são raramente usadas, com exceção das seguintes:

  • Persistent: Marca uma mensagem como persistente (com o valor de true) or passageira (com qualquer outro valor). Veja o segundo tutorial.
  • DeliveryMode: aqueles familiares com o protocolo podem escolher usar essa propriedade ao invés de Persistent. Elas controlam a mesma coisa.
  • ContentType: Usado para descrever o mime-type da codificação. Por exemplo, para a codificação JSON frequentemente usada, é uma boa prática setar essa propriedade para application/json.
  • ReplyTo: Comumente usado para nomear uma fila de callback.
  • CorrelationId: Útil para correlacionar respostas RPC com as requisições.

Correlation Id

No método apresentado acima nós sugerimos criar uma fila de callback para cada requisição RPC. Isso é bem ineficiente, mas felizmente há uma maneira melhor - vamos criar uma única fila de callback por cliente.

Isso levanta um novo problema, tendo recebido uma resposta naquela fila não está claro a qual requisição a mesma pertence. É quando a propriedade CorrelationId é usada. Vamos defini-la com um valor único para cada requisição. Mais tarde, quando recebemos uma mensagem na fila de callback, nós olharemos para essa propriedade, e baseado nela poderemos corresponder a resposta com a requisição. Se nós vermos um valor de CorrelationId desconhecido, podemos seguramente descartar a mensagem - ela não perterce às nossas requisições.

Você pode perguntar, por que nós deveríamos ignorar as mensagens desconhecidas na fila de callback, ao invés de falhar com um erro? É devido a possibilidade de uma condição de corrida no lado do servidor. Embora improvável, é possível que o servidor RPC morra logo após nos enviar a resposta, mas antes de enviar uma mensagem de confirmação para a requisição. Se isso acontecer, o servidor RPC reiniciado processará a requisição novamente. É por isso que no cliente devemos lidar com as respostas duplicadas normalmente, e o RPC deve ser idealmente idempotente.

Resumo

Queue

Nosso RPC funcionará dessa forma:

  • Quando o Cliente é inicializado, ele cria uma fila anônima exclusiva de callback.
  • Para uma requisição RPC, o Cliente envia uma mensagem com duas propriedades: ReplyTo, que é definido para a fila de callback e CorrelationId, que é definido com um valor exclusivo para cada requisição.
  • A requisição é enviada para a fila rpc_queue.
  • O worker RPC (Servidor) está esperando por solicitações nessa fila. Quando uma solicitação aparece, ele realiza o trabalho e envia uma mensagem com o resultado de volta para o Cliente, usando a fila da propriedade ReplyTo.
  • O Cliente espera por dados na fila de callback. Quando uma mensagem aparece, ele verifica a propriedade CorrelationId. Se corresponder ao valor da requisição, ele retornará a resposta para a aplicação.

Executando os projetos

  • Tutorial.RabbitMQ.Console.RPCClient: console ???

  • Tutorial.RabbitMQ.Console.RPCServer: console ???

Você pode executar os projetos pelo Visual Studio, pelos executáveis gerados no diretório bin, ou através da linha de comando. Para o último caso, abra dois terminais.

Para iniciar o servidor RPC, utilize o comando abaixo:

cd Tutorial.RabbitMQ.Console.RPCServer
dotnet run

Para requisitar um número de Fibonacci, utilize o comando abaixo:

cd Tutorial.RabbitMQ.Console.RPCClient
dotnet run
  • TODO: Finalizar documentação

Tutorial 7 » Publisher Confirms

Confirmações de publicação são uma extensão do RabbitMQ para implementar uma publicação confiável. Quando as confirmações de publicação estão habilitadas em um canal, as mensagens que um cliente publica são confirmadas de forma assíncrona pelo broker, o que significa que foram atendidas no lado do servidor.

Nesse tutorial vamos usar as confirmações de publicação para ter certeza que as mensagens chegaram com segurança ao broker. Cobriremos várias estratégias para usar as confirmações e explicaremos seus prós e contras.

Enabling Publisher Confirms on a Channel

As confirmações de publicação são uma extensão do RabbitMQ ao protocolo AMQP 0.9.1, então elas não estão habilitadas por padrão. Elas são habilitadas a nível de canal com o método ConfirmSelect:

var channel = connection.CreateModel();
channel.ConfirmSelect();

Esse método deve ser chamado em cada canal que você espera que tenha confirmações de publicação. As confirmações devem ser habilitadas apenas uma vez, e não para cada mensagem publicada.

Strategy #1: Publishing Messages Individually

  • TODO: Documentar

Estudo adicional

Testes de carga básica

  • TODO: Documentar

Limite da fila

  • TODO: Documentar

Licença

Distribuído através da licença MIT. Veja LICENSE para mais informações.

Contato

André Cantarim

LinkedIn

Referências

🔝 Voltar ao topo