/RabbitMQ

RabbitMQ for Direct Exchange, Topic Exchange, Header Exchange with Fanout Exchange

Primary LanguageC#

Youtube : https://www.youtube.com/watch?v=NwVbiYet1iI&feature=youtu.be

---------

---------

AMQP (Advanced Message Queuing Protocol) • Gelişmiş bir mesajlaşma protokolüdür. • Platform bağımsız beberleşme imkanı sunuyor. AMQP destekli bir sunucu var ise, sunucu ile hemen hemen her dil ile Client Bağlantısı yazılabilir. • AMQP paketlerinde, header, properties ve message alanları bulunur. AMQP protokolünde toplam 4 adet exchange tipi var. Direct, Fanout, Topic ve Header. Direct AMQP Mesajlaşma Mimarileri ? Direct Exchange (Point-to-point)

? Bir mesaj üreticisi tarafından (Producer, Publisher) üretilen mesajın sadece bir Tüketiciye (Consumer) iletildiği yapıdır. Birden fazla aynı işi yapan tüketici (Consumer) yazılmasına rağmen, mesaj yalnızca bir tüketici (Consumer) tarafından işlenir. ? Mesajlar, mesajın “binding key” değeri ile sıranın “routing key” değerleri arasında bir birleşme vardır. Elimizde hangi mesajın hangi sıraya gideceğini belirleme seçeneği vardır. ? Eğer Direct Exchange kullanırken binding_key ve routing_key belirlenmemiş ise, Direct Exchange Fanout Exchange gibi hareket edip, mesajları ilişkide olduğu tüm sıralara gönderecktir. ? Direct Exchange kullanırken, Queue oluşturduğumuzda ki Route Key’i Producer belirtmez ise sistemdeki tüm Direct Exchange Queue lere mesaj eklenecektir. ? Bir Queue’ ye birden fazla Route Key verilir. Publish edilen Route Key’ e göre Queue’ ye mesaj gönderilir.

? Fanout Exchange (Publish-subscribe)

? Producer’ ler tarafından üretilen mesajların, Tüm Consumer’ lara gönderildiği Exchange Type’ dır. ? Fanout Exchange’ e bağlı tüm Queue lere Producer’ den gelen mesaj iletilir. ? Fanout Exchange’ de Route Key in bir önemi yoktur. ? Fanout Exchange’ de Producer tarafından bir Queue belirtilmelidir. Yoksa sistem kendisi bir Queue oluşturacaktır.

? Topic Exchange

Topic Exchange Type; Producer tarafından gönderilen bir mesajın, kurallar göz önünde bulundurularak farklı Queue lara gönderilmesi işleminde kullanılır. Kural ları *. Şeklinde belirtebiliyoruz. Aşağıdaki örnek Topik Exchange Route Key kuralları ile kullanımları belirtilmiştir. Kural Kullanımı .serif 2 kelimeden oluşan 2. Kelimesi serif olan serif. 2 kelimeden oluşan 1. Kelimesi serif olan .serif. 3 kelimeden oluşan Ortadaki kelimesi serif olan serif Sadece serif olucak

? Header Exchange

RabbitMQ ile Queue oluşturuken, Queue ların Header larına key-value şeklinde parametreler yazabiliriz. Producer bir Mesaj gönderdiğinde mesajın Properties’ inde bulunan header bilgisine göre, gerekli Queue kuyruğuna mesaj eklenecektir.

RabbitMQ RabbitMQ, bir mesaj kuyruğu sistemidir. Publish ve Subscribe mantığı ile çalışır. RabbitMQ Erlang işletim sistemi ile yazılmıştır. • RabbitMQ bir çok yazılım diline destek vermektedir. • RabbitMQ bir çok işletim sistemi ile çalışır. • RabbitMQ Open Source dur. • RabbitMQ’ yu işletim sistemine kurmadan önce Erlang dilinin o işletim sistemine kurulması gerekmektedir. • RabbitMQ içerisinde bulundurduğu Publish, Subscribe ve Routing mekanizması ile Gelişmiş Mesaj Kuyruğu Protokolü (AMQP) standardına uygun olarak çalışmaktadır • RabbitMQ Queue’ si FIFO (First in First out) ilk giren ilk çıkar, mantığında çalışmaktadır. RabbitMQ ile ölçeklenebilir (scakability) bir ortam oluşturabiliriz. Anlık yapılmayacak işlemleri asenkron şekilde gerçekleştirerek; • Uygulamalarımızı kullanan kişileri gereksiz bir response time maliyetinden arındırmış oluruz. • Server üzerindeki concurrent process maliyetini bir nebze ölçeklenebilir bir hale getirmiş oluruz. RabbitMQ installer site : http://www.rabbitmq.com/install-windows.html Erlang installer site : http://www.erlang.org/downloads RabbitMQ kullanmak için bazı özel tanımları bilmemiz gerekmektedir. • Producer (Publisher) : Kuyruğa mesaj gönderen uygulamadır. Mesajı Exchange’ e gönderir. • Consumer : Kuyruktaki mesajı dinleyecek olan uygulamadır. • Queue : Mesajların RabbitMQ tarafında eklendiği kuyruktur. Exchange’ den gelen mesajı tutar. • Exchange : Mesajı Preducer’ den alır ve Queue’ ye gönderir. Exchange mecburi değildir. Bir kaç Exchange tipi bulunmaktadır. Yaptığı işlem ise ilgili Routing Key’ e göre mesajı ilgili Queue’ ye yönlendirmektir. o Binding – Exchange içinde tanımlanan kurallardır ve hangi mesajın hangi Queue’ ye iletileceğini belirler. • Exchange Type : İlgili mesaj’ ın Routing Key’ e göre hangi Queue’ ye nasıl yönlendireceğini belirlemektir. 4 adettir. Direct, Fanout, Headers, Topic Exchange

• Producer: Queue’ya mesaj gönderen uygulamadır. Yani Publisher’ımız. • Consumer: Queue’daki mesajları dinleyecek olan uygulamamızdır. Erlang’ i kurduktan sonra, RabbitMQ yu bilgisayarımıza indirelim ve kuralım. Ardından RabbitMQ Command Prompt’ u Administrator Yönetici olarak çalıştıralım. Komut Satırı : rabbitmq-plugins enable rabbitmq_management

Yukarıdaki komut satırını RabbitMQ Command Prompt’ da çalıştırdığımızda, yukarıdaki resimde de görüldüğü üzere Bilgisayar da kurulu olan RabbitMQ servislerini çalıştırır. Erlang ve RabbitMQ kurulumunu gerçekleştirdik den sonra. Default olarak http://localhost:15672 adresinden RabbitMW yönetim ekranını açabiliriz.

Servis olarak hizmetlerde çalışmaktadır, buradan durdurup veya restart edebiliriz.

Windows başlat menüsüne yukarıdaki RabbitMQ özel komutları gelecektir. http://localhost:15672 ile RabbitMQ Dashboard’ı ını açabiliriz. Fakat girdiğimizde bizi bir Login ekranı karşılayacak default olarak aşağıdaki kullanıcı adı ve şifre girilmelidir. • Kullanıcı Adı : guest • Şifre : guest

RabbitMQ Menüleri ve İşlemleri

  1. Overview Menüsü RabbitMQ server üzerinde ki tüm analiz ve raporları buradan canlı olarak takip edebilir gerekirse müdahale edebiliriz.

  2. Connection Menüsü
    RabbitMQ Server’ a bağlı olan Consumer bilgisini canlı olarak buradan tüm detaylı analiz bilgileri ile ulaşabiliriz.

  3. Channels Menüsü

RabbitMQ Server’ a bağlı olan Consumer bilgisini canlı olarak buradan tüm detaylı analiz bilgileri ile ulaşabiliriz. Consumer’ ın • Virtual host bilgisine ulaşılabilir. Virtual host • Kaç adet mesajı okuduğu bilgisi burada mevcut. Unacked

  1. Exchanges Menüsü

• RabbitMQ’ da 4 adet Exchange tipi vardır. • Bizim de oluşturduğumuz Exchange Tipleri burada listelenir. 5. Queues Menüsü

RabbitMQ server’ da Oluşturulan tüm QUEUE listesine buradan ulaşıyoruz. Buradan Queue ne durumda kaç adet var kıçı işlendi gibi bir çok Queue özelindeki bilgiye rahatlıkla erişip yönetebiliyoruz. • Ready : İletilecek olan mesaj sayısı • Unacked : Consumer lar tafından işlenen mesaj sayısı • Total : Queue içerisindeki mesaj sayısı. • Queue ile ilgili tüm detay bilgilere buradan ulaşabiliriz.

  1. RabbitMQ Admin İşlemleri

http://localhost:15672/ adından default oluşan bir host ile RabbitMQ nun tüm süreçlerini kontrol edebiliyoruz. Şu anda kullanıcı işlemlerini inceleyeceğiz. NOT : RabbitMQ’ yu bir server da çalıştırdığımızda, farklı bir lokasyondan veya bilgisayardan guest acount’u ile giriş yapamayız. Yeni bir Acount oluşturmamız gerekmektedir. RabbitMQ’ yu farklı serverlara kurup yönetmek isteyebiliriz. Bunun için yapmamız gereken bir kaç adım var.

  1. Add a user sekmesine tıklayalım ve bir kullanıcı açalım.

  2. Yukarıdaki işlemin ardından Add user diyelim ve All users sekmesine dönelim.

  3. Açılan kullanıcıya Virtual Host eklemek için, Admin menüsündeyken Virtual hosts sekmesine tıklayalım. a. Default olarak oluşturulan “/” host un içerisine kullanıcıyı ekleyebiliriz b. Add virtual host ile yeni Virtual Host oluşturup kullanıcıyı bu host’ a set edebiliriz.

  4. Yukarıdaki işlemleri bitirdikten sonra, artık farklı lokasyonlardan ve bilgisayardan Server da bulunan RabbitMQ management uygulamasına erişebiliriz. Aynı zamanda kodlama yapmak için bir Connection oluşturmuş olduk.

  5. Policies, Limits, Cluster vb. gibi ayarlamalarıda yine Admin penceresi altından yapabiliriz.

  6. Default olarak gelen “/” Virtual host yerine kendimizde istersek bir Virtusl host oluşturabiliriz.

Artık istediğimiz takdirde Admin kullanıcısı arvato Virtusl host’ u ile sisteme bağlanabilir, ve kodlama yapabilir.

RabbitMQ Örnek Kodlar. Direct Exchange ? Producer var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(CL_ConstModel.ExchangeName, ExchangeType.Direct, true, false, null);

                //QUEUE CREATED
                channel.QueueDeclare(CL_ConstModel.queue1Name, true, false, false, null);
                channel.QueueDeclare(CL_ConstModel.queue2Name, true, false, false, null);
                channel.QueueBind(CL_ConstModel.queue1Name, CL_ConstModel.ExchangeName, CL_ConstModel.queue1RouteKey);
                channel.QueueBind(CL_ConstModel.queue2Name, CL_ConstModel.ExchangeName, CL_ConstModel.queue2RouteKey);

                //QUEUE MESSAGE SEND
                var publicationAddress = new PublicationAddress(ExchangeType.Direct, CL_ConstModel.ExchangeName, CL_ConstModel.queue1RouteKey);

                string message = JsonConvert.SerializeObject(UserService.Load());

                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(publicationAddress, null, body);
            }
        }

        Console.WriteLine("Mesaj publish edildi, Direct Exchange' e bağlı Route Key' e göre mesaj iletildi 2999.");
        Console.ReadLine();

? Consumer Console.WriteLine("Direct Consumer 1");

        var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    index++;
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);

                    User user = new User();

                    user = JsonConvert.DeserializeObject<User>(message);

                    Console.WriteLine(index + " Queue1 üzerinden mesaj alındı: {0} , {1} , {2}", user.Id, user.Name, user.Surname);
                };

                channel.BasicConsume(CL_ConstModel.queue1Name, false, consumer);
                Console.ReadLine();
            }
        }

Fanout Exchange ? Producer var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(CL_ConstModel.ExchangeName, ExchangeType.Fanout, true, false, null);

                channel.QueueDeclare(CL_ConstModel.queue1Name, true, false, false, null);
                channel.QueueDeclare(CL_ConstModel.queue2Name, true, false, false, null);

                channel.QueueBind(CL_ConstModel.queue1Name, CL_ConstModel.ExchangeName, "");
                channel.QueueBind(CL_ConstModel.queue2Name, CL_ConstModel.ExchangeName, "");

                var publicationAddress = new PublicationAddress(ExchangeType.Fanout, CL_ConstModel.ExchangeName, "");

                string message = JsonConvert.SerializeObject(UserService.Load());

                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(publicationAddress, null, body);
            }
        }

        Console.WriteLine("Mesaj publish edildi, Fanout Exchange' e bağlı tüm Queue lere mesaj iletildi.");
        Console.ReadLine();

? Consumer Console.WriteLine("Fanout Consumer 1"); var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    index++;
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    User user = JsonConvert.DeserializeObject<User>(message);

                    Console.WriteLine(index + " Queue1 üzerinden mesaj alındı: {0} , {1} , {2}", user.Id, user.Name, user.Surname);
                };

                channel.BasicConsume(CL_ConstModel.queue1Name, false, consumer);
                Console.ReadLine();
            }
        }

Header Exchange ? Producer var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(CL_ConstModel.ExchangeName, ExchangeType.Headers, true, false, null);
                channel.QueueDeclare(CL_ConstModel.queue1Name, true, false, false, null);

                Dictionary<string, object> headerOptionsWithAll = new Dictionary<string, object>();
                headerOptionsWithAll.Add("x-match", "all");
                headerOptionsWithAll.Add("category", "animal");
                headerOptionsWithAll.Add("type", "mammal");
                channel.QueueBind(CL_ConstModel.queue1Name, CL_ConstModel.ExchangeName, "", headerOptionsWithAll);

                //Dictionary<string, object> headerOptionsWithAny = new Dictionary<string, object>();
                //headerOptionsWithAny.Add("x-match", "any");
                //headerOptionsWithAny.Add("category", "plant");
                //headerOptionsWithAny.Add("type", "tree");
                //channel.QueueBind(CL_ConstModel.queue1Name, CL_ConstModel.ExchangeName, "", headerOptionsWithAny);

                Dictionary<string, object> messageHeaders = new Dictionary<string, object>();

                IBasicProperties properties = channel.CreateBasicProperties();
                messageHeaders = new Dictionary<string, object>();
                messageHeaders.Add("category", "animal");
                messageHeaders.Add("type", "mammal");
                messageHeaders.Add("x-match", "all");
                properties.Headers = messageHeaders;

                var publicationAddress = new PublicationAddress(ExchangeType.Headers, CL_ConstModel.ExchangeName, "");
                string message = JsonConvert.SerializeObject(UserService.Load());
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(publicationAddress, properties, body);
            }
        }

? Consumer Console.WriteLine("Header Consumer 2");

        var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    User user = JsonConvert.DeserializeObject<User>(message);

                    Console.WriteLine("Queue1 üzerinden mesaj alındı: {0} , {1} , {2}", user.Id, user.Name, user.Surname);
                };

                channel.BasicConsume(CL_ConstModel.queue2Name, false, consumer);
                Console.ReadLine();
            }
        }

Topix Exchange ? Producer var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(CL_ConstModel.ExchangeName, ExchangeType.Topic, true, false, null);

                channel.QueueDeclare(CL_ConstModel.queue1Name, true, false, false, null);
                channel.QueueDeclare(CL_ConstModel.queue2Name, true, false, false, null);

                channel.QueueBind(CL_ConstModel.queue1Name, CL_ConstModel.ExchangeName, "*.bmw");
                channel.QueueBind(CL_ConstModel.queue1Name, CL_ConstModel.ExchangeName, "serif.*");

                channel.QueueBind(CL_ConstModel.queue2Name, CL_ConstModel.ExchangeName, "merc.*");

                var publicationAddress = new PublicationAddress(ExchangeType.Headers, CL_ConstModel.ExchangeName, "merc");

                string message = JsonConvert.SerializeObject(UserService.Load());

                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(publicationAddress, null, body);
            }
        }

? Consumer Console.WriteLine("Topic Consumer 1");

        var rabbitMQService = new RabbitMQService();

        using (var connection = rabbitMQService.GetRabbitMQConnection())
        {
            using (var channel = connection.CreateModel())
            {
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    index++;

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    User user = JsonConvert.DeserializeObject<User>(message);

                    Console.WriteLine(index + " Queue1 üzerinden mesaj alındı: {0} , {1} , {2}", user.Id, user.Name, user.Surname);
                };

                channel.BasicConsume(CL_ConstModel.queue1Name, false, consumer);
                Console.ReadLine();
            }
        }