RocketMQ 的事務訊息並不是用於解決業務分散式事務,當然可以基於MQ實現最終一致性。RocketMQ 實現的事務訊息的基本思路其實就是一次分散式事務一致性的經典實現。
RocketMQ 事務訊息的目的是確保業務與訊息傳送這兩個步驟實現一致性,即要麼都成功,要麼都失敗,請看下面一段虛擬碼:
public Map createOrder(OrderDto orderInfo) { // 開始資料庫事務
// step1:組裝業務
// step2: 呼叫dao方法插入資料庫
orderMapper.insert(order);
//傳送訊息,以便訂單系統的下游能夠根據該訂單資訊,完成其後續流程,例如通知物流系統發貨等
producer.send(msg);
} // 提交事務或回滾事務(例如資料庫發生唯一性約束等錯誤,或者訊息傳送後突然宕機,導致該事務無法提交)
那現在就有可能出現一個問題:訂單落資料庫與訊息傳送這兩個分散式操作(一個操作本地資料庫,另外一個透過網路向訊息中介軟體傳送訊息)如何保證要麼成功,要麼失敗呢?這就是 RocketMQ 引入事務訊息的目的。
RocketMQ 實現事務訊息的原理如下圖所示,其基本原理如下:
1、應用程式在事務內完成相關業務資料落庫後,需要同步呼叫 RocketMQ 訊息傳送介面,傳送狀態為 prepare 的訊息,訊息傳送成功後,RocketMQ 伺服器會回撥 RocketMQ 訊息傳送者的事件監聽程式,記錄訊息的本地事務狀態,該相關標記與本地業務操作同屬一個事務,確保訊息傳送與本地事務的原子性。
2、RocketMQ 在收到型別為 prepare 的訊息時,會首先備份訊息的原主題與原訊息消費佇列,然後將訊息儲存在主題為RMQ_SYS_TRANS_HALF_TOPIC 的訊息消費佇列中。
3、RocketMQ 訊息伺服器開啟一個定時任務,消費 RMQ_SYS_TRANS_HALF_TOPIC 的訊息,向訊息傳送端(應用程式)發起訊息事務狀態回查,應用程式根據儲存的事務狀態回饋訊息伺服器事務的狀態(提交、回滾、未知),如果是提交或回滾,則訊息伺服器提交或回滾訊息,如果是未知,待下一次回查,RocketMQ 允許設定一條訊息的回查間隔與回查次數,如果在超過回查次數後未知訊息的事務狀態,則預設回滾訊息。
RocketMQ 的事務訊息並不是用於解決業務分散式事務,當然可以基於MQ實現最終一致性。RocketMQ 實現的事務訊息的基本思路其實就是一次分散式事務一致性的經典實現。
RocketMQ 事務訊息的目的是確保業務與訊息傳送這兩個步驟實現一致性,即要麼都成功,要麼都失敗,請看下面一段虛擬碼:
public Map createOrder(OrderDto orderInfo) { // 開始資料庫事務
// step1:組裝業務
// step2: 呼叫dao方法插入資料庫
orderMapper.insert(order);
//傳送訊息,以便訂單系統的下游能夠根據該訂單資訊,完成其後續流程,例如通知物流系統發貨等
producer.send(msg);
} // 提交事務或回滾事務(例如資料庫發生唯一性約束等錯誤,或者訊息傳送後突然宕機,導致該事務無法提交)
那現在就有可能出現一個問題:訂單落資料庫與訊息傳送這兩個分散式操作(一個操作本地資料庫,另外一個透過網路向訊息中介軟體傳送訊息)如何保證要麼成功,要麼失敗呢?這就是 RocketMQ 引入事務訊息的目的。
RocketMQ 實現事務訊息的原理如下圖所示,其基本原理如下:
1、應用程式在事務內完成相關業務資料落庫後,需要同步呼叫 RocketMQ 訊息傳送介面,傳送狀態為 prepare 的訊息,訊息傳送成功後,RocketMQ 伺服器會回撥 RocketMQ 訊息傳送者的事件監聽程式,記錄訊息的本地事務狀態,該相關標記與本地業務操作同屬一個事務,確保訊息傳送與本地事務的原子性。
2、RocketMQ 在收到型別為 prepare 的訊息時,會首先備份訊息的原主題與原訊息消費佇列,然後將訊息儲存在主題為RMQ_SYS_TRANS_HALF_TOPIC 的訊息消費佇列中。
3、RocketMQ 訊息伺服器開啟一個定時任務,消費 RMQ_SYS_TRANS_HALF_TOPIC 的訊息,向訊息傳送端(應用程式)發起訊息事務狀態回查,應用程式根據儲存的事務狀態回饋訊息伺服器事務的狀態(提交、回滾、未知),如果是提交或回滾,則訊息伺服器提交或回滾訊息,如果是未知,待下一次回查,RocketMQ 允許設定一條訊息的回查間隔與回查次數,如果在超過回查次數後未知訊息的事務狀態,則預設回滾訊息。