首頁>技術>

1.1 AMQP是什麼

AMQP(高階訊息佇列協議)是一個網路協議。它支援符合要求的客戶端應用(application)和訊息中介軟體代理(messaging middleware broker)之間進行通訊。

1.2 訊息佇列是什麼

MQ 全稱為Message Queue, 訊息佇列。是一種應用程式對應用程式的通訊方法。應用程式透過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。

2 安裝

透過docker進行安裝

首先,進入RabbitMQ官網 http://www.rabbitmq.com/download.html

然後,找到 Docker image 並進入找到你需要安裝的版本, -management 表示有管理介面的,可以瀏覽器訪問。

接著,接來下docker安裝,我這裡裝的 3-management:

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

最後,瀏覽器訪問看下:http://localhost:15672/ ,使用者名稱/密碼: guest/guest

3 使用3.1 “ Hello World!”

RabbitMQ是訊息代理:它接受並轉發訊息。您可以將其視為郵局:將您要釋出的郵件放在郵箱中時,可以確保郵遞員先生或女士最終將郵件傳遞給收件人。在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。中間的框是一個佇列

生產者程式碼:

using RabbitMQ.Client; //1. 使用名稱空間using System;using System.Text;namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection()) //2. 建立到伺服器的連線            using (var channel = connection.CreateModel()) //3. 建立一個通道            {                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); //4. 宣告要傳送到的佇列                string message = "Hello World!";                var body = Encoding.UTF8.GetBytes(message);                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);//5. 將訊息釋出到佇列                Console.WriteLine(" 傳送訊息:{0}", message);                Console.WriteLine(" Press [enter] to exit.");                Console.ReadLine();            }        }    }}

消費者程式碼:使用名稱空間,建立伺服器連線,建立通道,宣告佇列都與生產者程式碼一致,增加了將佇列中的訊息傳遞給我們。由於它將非同步地向我們傳送訊息,因此我們提供了回撥。這就是EventingBasicConsumer.Received事件處理程式所做的。

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null);                Console.WriteLine(" 等待訊息。");                var consumer = new EventingBasicConsumer(channel);                consumer.Received += (model, ea) =>                {                    var body = ea.Body.ToArray();                    var message = Encoding.UTF8.GetString(body);                    Console.WriteLine(" 接收訊息:{0}", message);                };                channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer);                Console.WriteLine(" Press [enter] to exit.");                Console.ReadLine();            }        }    }}

讓我們來看看輸出結果:

傳送端:

 傳送訊息:Hello World! Press [enter] to exit.

接收端:

 等待訊息。 Press [enter] to exit. 接收訊息:Hello World!
3.2 工作佇列

工作佇列(又稱任務佇列)的主要思想是避免立即執行資源密集型任務,然後必須等待其完成。相反,我們安排任務在以後完成。我們將任務封裝為訊息並將其傳送到佇列。工作進行在後臺執行並不斷的從佇列中取出任務然後執行。當你運行了多個工作程序時,任務佇列中的任務將會被工作程序共享執行。

生產者程式碼:

using RabbitMQ.Client;using System;using System.Text;namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);                var message = GetMessage(args);                var body = Encoding.UTF8.GetBytes(message);                var properties = channel.CreateBasicProperties();                properties.Persistent = true;                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body);                Console.WriteLine(" 傳送訊息:{0}", message);            }            Console.WriteLine(" Press [enter] to exit.");            Console.ReadLine();        }        private static string GetMessage(string[] args)        {            return args.Length > 0 ? string.Join(" ", args) : "Hello World!";        }    }}

消費者程式碼:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;using System.Threading;namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);                Console.WriteLine(" 等待訊息。");                var consumer = new EventingBasicConsumer(channel);                consumer.Received += (model, ea) =>                {                    byte[] body = ea.Body.ToArray();                    var message = Encoding.UTF8.GetString(body);                    Console.WriteLine(" 接收訊息:{0}", message);                    int dots = message.Split('.').Length - 1;                    Thread.Sleep(dots * 1000);                    Console.WriteLine(" 接收完成");                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);                };                channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer);                Console.WriteLine(" Press [enter] to exit.");                Console.ReadLine();            }        }    }}
迴圈排程

使用任務佇列的好處是能夠很容易的並行工作。如果我們積壓了很多工作,我們僅僅透過增加更多的工作者就可以解決問題,使系統的伸縮性更加容易。

讓我們來看看輸出結果:

傳送端:

\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息1 傳送訊息:訊息1 Press [enter] to exit.\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息2 傳送訊息:訊息2 Press [enter] to exit.\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息3 傳送訊息:訊息3 Press [enter] to exit.\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息4 傳送訊息:訊息4 Press [enter] to exit.

接收端1:

 等待訊息。 Press [enter] to exit. 接收訊息:訊息1 接收完成 接收訊息:訊息3 接收完成

接收端2:

 等待訊息。 Press [enter] to exit. 接收訊息:訊息2 接收完成 接收訊息:訊息4 接收完成

預設情況下,RabbitMQ將按順序將每個訊息傳送給下一個使用者。平均而言,每個消費者都會收到相同數量的訊息。這種分發訊息的方式稱為迴圈。與三個或更多的工人一起嘗試。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

使用此程式碼,我們可以確保,即使您在處理訊息時使用CTRL + C殺死工作人員,也不會丟失任何資訊。工人死亡後不久,所有未確認的訊息將重新發送。

訊息永續性

我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是,如果RabbitMQ伺服器停止,我們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,除非您告知不要這樣做,否則它將忘記佇列和訊息。要確保訊息不會丟失,需要做兩件事:我們需要將佇列和訊息都標記為持久。

首先,我們需要確保該佇列將在RabbitMQ節點重啟後繼續存在。為此,我們需要將其宣告為持久的:

channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

最後,我們需要將訊息標記為永續性-透過將IBasicProperties.SetPersistent設定為true。

var properties = channel.CreateBasicProperties();properties.Persistent = true;
公平派遣

我們可以將BasicQos方法與 prefetchCount = 1設定一起使用。這告訴RabbitMQ一次不要給工人一個以上的訊息。換句話說,在處理並確認上一條訊息之前,不要將新訊息傳送給工作人員。而是將其分派給不忙的下一個工作程式。

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
3.3 釋出/訂閱

在上一個教程中,我們建立了一個工作佇列。工作佇列背後的假設是,每個任務都恰好交付給一個工人。在這一部分中,我們將做一些完全不同的事情-我們將訊息傳達給多個消費者。這種模式稱為“釋出/訂閱”。

生產者程式碼:

using RabbitMQ.Client;using System;using System.Text;namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);                var message = GetMessage(args);                var body = Encoding.UTF8.GetBytes(message);                channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body);                Console.WriteLine(" 傳送訊息:{0}", message);            }            Console.WriteLine(" Press [enter] to exit.");            Console.ReadLine();        }        private static string GetMessage(string[] args)        {            return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";        }    }}

生產者程式碼與上一教程看起來沒有太大不同。最重要的變化是我們現在希望將訊息釋出到 Example.RabbitMQ.PublishSubscribe 交換器,而不是無名的訊息交換器。交換型別有以下幾種:direct,topic,headers 和fanout,在這裡我們採用fanout交換型別。

消費者程式碼:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);                var queueName = channel.QueueDeclare().QueueName;                channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "");                Console.WriteLine(" 等待訊息。");                var consumer = new EventingBasicConsumer(channel);                consumer.Received += (model, ea) =>                {                    byte[] body = ea.Body.ToArray();                    var message = Encoding.UTF8.GetString(body);                    Console.WriteLine(" 接收訊息:{0}", message);                };                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);                Console.WriteLine(" Press [enter] to exit.");                Console.ReadLine();            }        }    }}

如果沒有佇列繫結到交換機,則訊息將丟失,但這對我們來說是可以的。如果沒有消費者在聽,我們可以安全地丟棄該訊息。

3.4 路由

在上一個教程中,我們建立了一個釋出/訂閱。我們能夠向許多接收者廣播訊息。在本教程中,我們將向其中新增功能-將訊息分類指定給具體的訂閱者。

生產者程式碼:

using RabbitMQ.Client;using System;using System.Linq;using System.Text;namespace Example.RabbitMQ.Routing.Producer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");                var severity = (args.Length > 0) ? args[0] : "info";                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";                var body = Encoding.UTF8.GetBytes(message);                channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body);                Console.WriteLine(" 傳送訊息:'{0}':'{1}'", severity, message);            }            Console.WriteLine(" Press [enter] to exit.");            Console.ReadLine();        }    }}

消費者程式碼:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");                var queueName = channel.QueueDeclare().QueueName;                if (args.Length < 1)                {                    Console.WriteLine(" Press [enter] to exit.");                    Console.ReadLine();                    Environment.ExitCode = 1;                    return;                }                foreach (var severity in args)                {                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity);                }                Console.WriteLine(" 等待訊息。");                var consumer = new EventingBasicConsumer(channel);                consumer.Received += (model, ea) =>                {                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());                    var routingKey = ea.RoutingKey;                    Console.WriteLine(" 接收訊息:'{0}':'{1}'", routingKey, message);                };                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);                Console.WriteLine(" Press [enter] to exit.");                Console.ReadLine();            }        }    }}
3.5 話題

在上一個教程中,我們改進了訊息系統。代替使用僅能進行虛擬廣播的扇出交換機,我們使用直接交換機,並有選擇地接收訊息的可能性。

儘管使用直接交換對我們的系統進行了改進,但它仍然存在侷限性-它無法基於多個條件進行路由。

*(星號)可以代替一個單詞。#(雜湊)可以替代零個或多個單詞。

生產者程式碼:

using RabbitMQ.Client;using System;using System.Linq;using System.Text;namespace Example.RabbitMQ.Topics.Producer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");                var routingKey = (args.Length > 0) ? args[0] : "info";                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";                var body = Encoding.UTF8.GetBytes(message);                channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body);                Console.WriteLine(" 傳送訊息:'{0}':'{1}'", routingKey, message);            }        }    }}

消費者程式碼:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp{    class Program    {        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");                var queueName = channel.QueueDeclare().QueueName;                if (args.Length < 1)                {                    Console.WriteLine(" Press [enter] to exit.");                    Console.ReadLine();                    Environment.ExitCode = 1;                    return;                }                foreach (var bindingKey in args)                {                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey);                }                Console.WriteLine(" 等待訊息。 To exit press CTRL+C");                var consumer = new EventingBasicConsumer(channel);                consumer.Received += (model, ea) =>                {                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());                    var routingKey = ea.RoutingKey;                    Console.WriteLine(" 接收訊息:'{0}':'{1}'", routingKey, message);                };                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);                Console.WriteLine(" Press [enter] to exit.");                Console.ReadLine();            }        }    }}

13
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • CNN大戰驗證碼