1.1 AMQP是什麼
AMQP(高階訊息佇列協議)是一個網路協議。它支援符合要求的客戶端應用(application)和訊息中介軟體代理(messaging middleware broker)之間進行通訊。
1.2 訊息佇列是什麼MQ 全稱為Message Queue, 訊息佇列。是一種應用程式對應用程式的通訊方法。應用程式透過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。
2 安裝透過docker進行安裝
首先,進入RabbitMQ官網 http://www.rabbitmq.com/download.html
然後,找到 Docker image 並進入找到你需要安裝的版本, -management 表示有管理介面的,可以瀏覽器訪問。
接著,接來下docker安裝,我這裡裝的 3-management:
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
最後,瀏覽器訪問看下:http://localhost:15672/ ,使用者名稱/密碼: guest/guest
3 使用3.1 “ Hello World!”RabbitMQ是訊息代理:它接受並轉發訊息。您可以將其視為郵局:將您要釋出的郵件放在郵箱中時,可以確保郵遞員先生或女士最終將郵件傳遞給收件人。在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。中間的框是一個佇列
生產者程式碼:
using RabbitMQ.Client; //1. 使用名稱空間using System;using System.Text;namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) //2. 建立到伺服器的連線 using (var channel = connection.CreateModel()) //3. 建立一個通道 { channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); //4. 宣告要傳送到的佇列 string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);//5. 將訊息釋出到佇列 Console.WriteLine(" 傳送訊息:{0}", message); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }}
消費者程式碼:使用名稱空間,建立伺服器連線,建立通道,宣告佇列都與生產者程式碼一致,增加了將佇列中的訊息傳遞給我們。由於它將非同步地向我們傳送訊息,因此我們提供了回撥。這就是EventingBasicConsumer.Received事件處理程式所做的。
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine(" 等待訊息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" 接收訊息:{0}", message); }; channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }}
讓我們來看看輸出結果:
傳送端:
傳送訊息:Hello World! Press [enter] to exit.
接收端:
等待訊息。 Press [enter] to exit. 接收訊息:Hello World!
3.2 工作佇列
工作佇列(又稱任務佇列)的主要思想是避免立即執行資源密集型任務,然後必須等待其完成。相反,我們安排任務在以後完成。我們將任務封裝為訊息並將其傳送到佇列。工作進行在後臺執行並不斷的從佇列中取出任務然後執行。當你運行了多個工作程序時,任務佇列中的任務將會被工作程序共享執行。
生產者程式碼:
using RabbitMQ.Client;using System;using System.Text;namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body); Console.WriteLine(" 傳送訊息:{0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return args.Length > 0 ? string.Join(" ", args) : "Hello World!"; } }}
消費者程式碼:
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;using System.Threading;namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" 等待訊息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" 接收訊息:{0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" 接收完成"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }}
迴圈排程使用任務佇列的好處是能夠很容易的並行工作。如果我們積壓了很多工作,我們僅僅透過增加更多的工作者就可以解決問題,使系統的伸縮性更加容易。
讓我們來看看輸出結果:
傳送端:
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息1 傳送訊息:訊息1 Press [enter] to exit.\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息2 傳送訊息:訊息2 Press [enter] to exit.\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息3 傳送訊息:訊息3 Press [enter] to exit.\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 訊息4 傳送訊息:訊息4 Press [enter] to exit.
接收端1:
等待訊息。 Press [enter] to exit. 接收訊息:訊息1 接收完成 接收訊息:訊息3 接收完成
接收端2:
等待訊息。 Press [enter] to exit. 接收訊息:訊息2 接收完成 接收訊息:訊息4 接收完成
預設情況下,RabbitMQ將按順序將每個訊息傳送給下一個使用者。平均而言,每個消費者都會收到相同數量的訊息。這種分發訊息的方式稱為迴圈。與三個或更多的工人一起嘗試。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
使用此程式碼,我們可以確保,即使您在處理訊息時使用CTRL + C殺死工作人員,也不會丟失任何資訊。工人死亡後不久,所有未確認的訊息將重新發送。
訊息永續性我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是,如果RabbitMQ伺服器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,除非您告知不要這樣做,否則它將忘記佇列和訊息。要確保訊息不會丟失,需要做兩件事:我們需要將佇列和訊息都標記為持久。
首先,我們需要確保該佇列將在RabbitMQ節點重啟後繼續存在。為此,我們需要將其宣告為持久的:
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
最後,我們需要將訊息標記為永續性-透過將IBasicProperties.SetPersistent設定為true。
var properties = channel.CreateBasicProperties();properties.Persistent = true;
公平派遣
我們可以將BasicQos方法與 prefetchCount = 1設定一起使用。這告訴RabbitMQ一次不要給工人一個以上的訊息。換句話說,在處理並確認上一條訊息之前,不要將新訊息傳送給工作人員。而是將其分派給不忙的下一個工作程式。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
3.3 釋出/訂閱
在上一個教程中,我們建立了一個工作佇列。工作佇列背後的假設是,每個任務都恰好交付給一個工人。在這一部分中,我們將做一些完全不同的事情-我們將訊息傳達給多個消費者。這種模式稱為“釋出/訂閱”。
生產者程式碼:
using RabbitMQ.Client;using System;using System.Text;namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" 傳送訊息:{0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!"; } }}
生產者程式碼與上一教程看起來沒有太大不同。最重要的變化是我們現在希望將訊息釋出到 Example.RabbitMQ.PublishSubscribe 交換器,而不是無名的訊息交換器。交換型別有以下幾種:direct,topic,headers 和fanout,在這裡我們採用fanout交換型別。
消費者程式碼:
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: ""); Console.WriteLine(" 等待訊息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" 接收訊息:{0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }}
如果沒有佇列繫結到交換機,則訊息將丟失,但這對我們來說是可以的。如果沒有消費者在聽,我們可以安全地丟棄該訊息。
3.4 路由在上一個教程中,我們建立了一個釋出/訂閱。我們能夠向許多接收者廣播訊息。在本教程中,我們將向其中新增功能-將訊息分類指定給具體的訂閱者。
生產者程式碼:
using RabbitMQ.Client;using System;using System.Linq;using System.Text;namespace Example.RabbitMQ.Routing.Producer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct"); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" 傳送訊息:'{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }}
消費者程式碼:
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct"); var queueName = channel.QueueDeclare().QueueName; if (args.Length < 1) { Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach (var severity in args) { channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity); } Console.WriteLine(" 等待訊息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine(" 接收訊息:'{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }}
3.5 話題在上一個教程中,我們改進了訊息系統。代替使用僅能進行虛擬廣播的扇出交換機,我們使用直接交換機,並有選擇地接收訊息的可能性。
儘管使用直接交換對我們的系統進行了改進,但它仍然存在侷限性-它無法基於多個條件進行路由。
*(星號)可以代替一個單詞。#(雜湊)可以替代零個或多個單詞。
生產者程式碼:
using RabbitMQ.Client;using System;using System.Linq;using System.Text;namespace Example.RabbitMQ.Topics.Producer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic"); var routingKey = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" 傳送訊息:'{0}':'{1}'", routingKey, message); } } }}
消費者程式碼:
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp{ class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic"); var queueName = channel.QueueDeclare().QueueName; if (args.Length < 1) { Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach (var bindingKey in args) { channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey); } Console.WriteLine(" 等待訊息。 To exit press CTRL+C"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine(" 接收訊息:'{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }}