-
1 # 寫程式設計師的程式碼
-
2 # 網路圈
當下主流的訊息系統有RabbitMQ、RocketMQ、ActiveMQ等,而RabbitMQ是基於Erlang開發,無論是併發、延時表現都很好。
RabbitMQ訊息可靠性是靠什麼實現的?訊息可靠性是RabbitMQ的一大特點,RabbitMQ靠什麼實現訊息可靠性的呢?其實就是透過訊息持久化來實現的,這樣就避免了服務異常(重啟、宕機)下訊息和佇列丟失的風險。
訊息持久化是指RabbitMQ將記憶體中的資料(交換器Exchange、佇列Queue、訊息Message)落地到硬碟中儲存,以防止異常情況導致記憶體中的資料丟失。
RabbitMQ如何實現訊息持久化?RabbitMQ中不同資料持久化方式是不同的,主要有:
1、交換器(Exchange)的持久化
交換器Exchange若丟失會影響後續訊息的寫入,我們在建立Exchange時需要指定durable為true表示持久化。
2、佇列(Queue)的持久化
在上面第一步操作中,在建立交換器時即使設定了durable為true後,Exchange不會丟失,但是裡面的佇列依舊會丟失。如何保證佇列持久化呢?同樣是在建立佇列時指定durable為true即可。
3、訊息(Message)的持久化
上面兩步操作後,在重啟RabbitMQ後,雖然Exchange和Queue不會丟失,但是Queue裡的訊息是會丟失的,那如何保證訊息持久化不丟失呢?我們設定訊息投遞模式(deliveryMode)為2即代表訊息持久化。
訊息持久化並不能100%保證資料不丟失當我們將交換器/佇列/訊息都設定了持久化依舊不能100%保證資料不會丟失。這其實很好理解,記憶體中的資料寫入硬碟是要時間的,突然斷電、宕機重啟等情況時訊息可能沒來得及落地,那麼這些訊息就有丟失的可能。
訊息持久化會帶來效能問題我們知道訊息持久化是將記憶體中的資料寫入硬碟中,但硬碟的讀寫速度遠不如記憶體,所以開啟訊息持久化後會影響RabbitMQ的效能。
-
3 # IT實戰聯盟
1. 前言
如何保證RabbitMQ異常情況(人為重啟、異常宕機等)下,佇列和訊息不丟失?
2. 本篇概要
要解決該問題,就要用到RabbitMQ中持久化的概念,所謂持久化,就是RabbitMQ會將記憶體中的資料(Exchange 交換器,Queue 佇列,Message 訊息)固化到磁碟,以防異常情況發生時,資料丟失。
其中,RabblitMQ的持久化分為三個部分:
交換器(Exchange)的持久化
佇列(Queue)的持久化
訊息(Message)的持久化
3. 交換器(Exchange)的持久化
在上篇部落格中,我們宣告Exchange的程式碼是這樣的:
private final static String EXCHANGE_NAME = "normal-confirm-exchange";
// 建立一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
這種情況下宣告的Exchange是非持久化的,在RabbitMQ出現異常情況(重啟,宕機)時,該Exchange會丟失,會影響後續的訊息寫入該Exchange,那麼如何設定Exchange為持久化的呢?答案是設定durable引數。
durable:設定是否持久化。durable設定為true表示持久化,反之是非持久化。
持久化可以將交換器存檔,在伺服器重啟的時候不會丟失相關資訊。
設定Exchange持久化:
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此時呼叫的過載方法為:
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
}
為了能更好的理解,我們新建個生產類如下:
package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
private final static String EXCHANGE_NAME = "durable-exchange";
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 建立連線
ConnectionFactory factory = new ConnectionFactory();
// 設定 RabbitMQ 的主機名
factory.setHost("localhost");
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個通道
Channel channel = connection.createChannel();
// 建立一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 傳送訊息
String message = "durable exchange test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
// 關閉頻道和連線
channel.close();
connection.close();
}
}
示例程式碼中,我們新建了1個非持久化的Exchange,1個非持久化的Queue,並將它們做了繫結,此時執行程式碼,Exchange和Queue新建成功,訊息‘durable exchange test’也被正確地投遞到了佇列中:
此時重啟下RabbitMQ服務,會發現Exchange丟失了:
修改下程式碼,將durable引數設定為ture:
// 建立一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此時執行完程式碼,然後重啟下RabbitMQ服務,會發現Exchange不再丟失:
4. 佇列(Queue)的持久化
細心的網友可能會發現,雖然現在重啟RabbitMQ服務後,Exchange不丟失了,但是佇列和訊息丟失了,那麼如何解決佇列不丟失呢?答案也是設定durable引數。
durable:設定是否持久化。為true則設定佇列為持久化。
持久化的佇列會存檔,在伺服器重啟的時候可以保證不丟失相關資訊。
簡單修改下上面宣告Queue的程式碼,將durable引數設定為true:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
此時呼叫的過載方法如下:
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
validateQueueNameLength(queue);
return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
}
執行程式碼,然後重啟RabbitMQ服務,會發現佇列現在不丟失了:
5. 訊息(Message)的持久化
雖然現在RabbitMQ重啟後,Exchange和Queue都不丟失了,但是儲存在Queue裡的訊息卻仍然會丟失,那麼如何保證訊息不丟失呢?答案是設定訊息的投遞模式為2,即代表持久化。
修改傳送訊息的程式碼為:
// 傳送訊息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
呼叫的過載方法為:
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
this.basicPublish(exchange, routingKey, false, props, body);
}
執行程式碼,然後重啟RabbitMQ服務,發現此時Exchange,Queue,訊息都不丟失了:
至此,我們完美的解決了RabbitMQ重啟後,訊息丟失的問題。
最終的程式碼如下,你也可以透過文末的原始碼連結下載本文用到的所有原始碼:
package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
private final static String EXCHANGE_NAME = "durable-exchange";
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 建立連線
ConnectionFactory factory = new ConnectionFactory();
// 設定 RabbitMQ 的主機名
factory.setHost("localhost");
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個通道
Channel channel = connection.createChannel();
// 建立一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 傳送訊息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
// 關閉頻道和連線
channel.close();
connection.close();
}
}
6. 注意事項
1)理論上可以將所有的訊息都設定為持久化,但是這樣會嚴重影響RabbitMQ的效能。因為寫入磁碟的速度比寫入記憶體的速度慢得不止一點點。對於可靠性不是那麼高的訊息可以不採用持久化處理以提高整體的吞吐量。在選擇是否要將訊息持久化時,需要在可靠性和吞吐量之間做一個權衡。
2)將交換器、佇列、訊息都設定了持久化之後仍然不能百分之百保證資料不丟失,因為當持久化的訊息正確存入RabbitMQ之後,還需要一段時間(雖然很短,但是不可忽視)才能存入磁碟之中。如果在這段時間內RabbitMQ服務節點發生了宕機、重啟等異常情況,訊息還沒來得及落盤,那麼這些訊息將會丟失。
3)單單隻設定佇列持久化,重啟之後訊息會丟失;單單隻設定訊息的持久化,重啟之後佇列消失,繼而訊息也丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。
7. 原始碼
回覆列表
背景當有多個消費者同時收取訊息,且每個消費者在接收訊息的同時,還要處理其它的事情,且會消耗很長的時間。在此過程中可能會出現一些意外,比如訊息接收到一半的時候,一個消費者死掉了。在預設情況下,我們程式建立的訊息佇列以及存放在佇列裡面的訊息,都是非持久化的。當RabbitMQ死掉了或者重啟了,上次建立的佇列、訊息都不會儲存。
這樣就解決了,即使一個消費者出了問題,沒有同步訊息給服務端,還有其他的消費端去消費,保證了訊息不丟的case。
channel.basicAck(envelope.getDeliveryTag(), false);//手動回執訊息
2. 佇列持久化
生產者建立佇列宣告時,修改第二個引數為 true
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
生產者傳送訊息時,修改第三個引數為MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
3. 交換器的持久化
交換器的持久化是在宣告交換器的時候,將durable設定為true。如果交換器不設定持久化,那麼在RabbitMQ服務重啟之後,相關交換器的元資料會丟失,不過訊息不會丟失,只是不能將訊息傳送到這個交換器中。對於長期使用的交換器來說,建議將其置為持久化。
channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.DIRECT,true);
4. 訊息持久化
佇列的持久化能保證其本身的元資料不會因異常情況而丟失,但是並不能保證內部所儲存的訊息不會丟失。要確保訊息不會丟失,需要將其設定為持久化。在傳送訊息的時候,透過將BasicProperties中的屬性deliveryMode(投遞模式)設定為2即可實現訊息的持久化。
channel.basicPublish("exchangeName" , "routingKey", new AMQP.BasicProperties.Builder() .deliveryMode(2) .build(), "ddf".getBytes());
設定叢集映象模式需要採用HA 映象模式佇列
1)同步至所有的 2)同步最多N個機器 3)只同步至符合指定名稱的nodes
總結伺服器端和客戶端都要指定佇列的持久化和訊息的持久化,這樣可以保證RabbitMQ重啟,佇列和訊息也不會丟失。