一、分散式事務的概念1,什麼是事務
事務可以看做是一次大的活動,它由不同的小活動組成,這些活動要麼全部成功,要麼全部失敗。
2,本地事務資料庫事務的四大特性 ACID:
A(Atomic): 原子性 ,構成事務的所有操作,要麼都執行完成,要麼全部不執行,不可能出現部分成功部分失敗的情況。C(Consistency): 一致性 ,在事務執行前後,資料庫的一致性約束沒有被破壞。比如:張三向李四轉100元,轉賬前和轉賬後的資料是正確狀態這叫一致性,如果出現張三轉出100元,李四賬戶沒有增加100元這就出現了資料錯誤,就沒有達到一致性。I(Isolation): 隔離性 ,資料庫中的事務一般都是併發的,隔離性是指併發的兩個事務的執行互不干擾,一個事務不能看到其他事務執行過程的中間狀態。透過配置事務隔離級別可以避髒讀、重複讀等問題。D(Durability): 永續性 ,事務完成之後,該事務對資料的更改會被持久化到資料庫,且不會被回滾。資料庫事務在實現時會將一次事務涉及的所有操作全部納入到一個不可分割的執行單元,該執行單元中的所有操作要麼都成功,要麼都失敗,只要其中任一操作執行失敗,都將導致整個事務的回滾。
3,分散式事務分散式系統會把一個應用系統拆分為可獨立部署的多個服務,因此需要服務與服務之間遠端協作才能完成事務操作,這種分散式系統環境下 由不同的服務之間透過網路遠端協作完成事務稱之為分散式事務 ,例如使用者註冊送積分事務、建立訂單減庫存事務,銀行轉賬事務等都是分散式事務。
4,分散式事務產生的場景典型的場景就是微服務架構,微服務之間透過 遠端呼叫完成事務 操作。比如:訂單微服務和庫存微服務,下單的同時訂單微服務請求庫存微服務減庫存。 簡言之: 跨JVM程序產生分散式事務 。單體系統訪問多個數據庫例項, 跨資料庫例項產生分散式事務 。多服務訪問同一個資料庫例項 ,比如:訂單微服務和庫存微服務即使訪問同一個資料庫也會產生分散式事務,原因就是 跨JVM程序 ,兩個微服務持有了不同的資料庫連結進行資料庫操作,此時產生分散式事務。二、分散式事務基礎理論1,CAP理論a)概念CAP是 Consistency、Availability、Partition tolerance三個詞語的縮寫,分別表示一致性、可用性、分割槽容忍性。
b)組合方式在所有分散式事務場景中 不會同時具備CAP三個特性,因為在具備了P的前提下C和A是不能共存的 。
AP:放棄一致性,追求分割槽容忍性和可用性。這是 很多分散式系統設計時的選擇 。Eureka叢集就是採用的AP設計思想。CP:放棄可用性,追求一致性和分割槽容錯性。zookeeper叢集。CA:放棄分割槽容忍性,即不進行分割槽,不考慮由於網路不通或結點掛掉的問題,則可以實現一致性和可用性。那麼系統將不是一個標準的分散式系統,我們最常用的關係型資料就滿足了CA。c)總結CAP是一個已經被證實的理論:一個分散式系統最多隻能同時滿足 一致性(Consistency)、可用性(Availability)和分割槽容忍性(Partition tolerance)這三項中的兩項 。它可以作為我們進行架構設計、技術選型的考量標準。對於多數大型網際網路應用的場景,結點眾多、部署分散,而且現在的叢集規模越來越大,所以節點故障、網路故障是常態,而且要保證服務可用性達到N個9(99.99..%),並要達到良好的響應效能來提高使用者體驗,因此一般都會做出如下選擇: 保證P和A,捨棄C強一致,保證最終一致性 。
2,BASE理論a)強一致性與最終一致性強一致性:CAP中的一致性要求在任何時間查詢每個結點資料都必須一致,它強調的是強一致性。最終一致性:允許可以 在一段時間內每個結點的資料不一致 ,但是經過一段時間每個結點的資料必須一致,它強調的是 最終資料的一致性 。b)概念BASE 是 Basically Available(基本可用)、Soft state(軟狀態)和 Eventually consistent (最終一致性) 三個短語的縮寫。BASE理論是對CAP中AP的一個擴充套件,透過犧牲強一致性來獲得可用性,當出現故障允許部分不可用但要保證核心功能可用,允許資料在一段時間內是不一致的,但最終達到一致狀態。滿足BASE理論的事務,我們稱之為“ 柔性事務 ”。
基本可用 :分散式系統在出現故障時,允許損失部分可用功能, 保證核心功能可用 。如,電商網站交易付款出現問題了,商品依然可以正常瀏覽。軟狀態 :由於不要求強一致性,所以BASE允許系統中 存在中間狀態 (也叫軟狀態),這個狀態不影響系統可用性,如訂單的 “支付中”、“資料同步中” 等狀態,待資料最終一致後狀態改為“成功”狀態。最終一致性 :最終一致是指 經過一段時間後,所有節點資料都將會達到一致 。如訂單的"支付中"狀態,最終會變為“支付成功”或者"支付失敗",使訂單狀態與實際交易結果達成一致,但需要一定時間的延遲、等待。三、解決方案之2PC1,什麼是2PC2PC即兩階段提交協議,是將整個事務流程分為兩個階段,準備階段(Prepare phase)、提交階段(commit phase),2是指兩個階段,P是指準備階段,C是指提交階段。
準備階段(Prepare phase): 事務管理器給每個參與者傳送Prepare訊息 ,每個資料庫參與者在本地執行事務,並寫本地的Undo/Redo日誌,此時事務沒有提交。(Undo日誌是記錄修改前的資料,用於資料庫回滾,Redo日誌是記錄修改後的資料,用於提交事務後寫入資料檔案)提交階段(commit phase):如果事務管理器收到了參與者的執行失敗或者超時訊息時,直接給每個參與者傳送回滾(Rollback)訊息;否則,傳送提交(Commit)訊息;參與者根據事務管理器的指令執行提交或者回滾操作,並釋放事務處理過程中使用的鎖資源。注意:必須在最後階段釋放鎖資源。成功情況:
失敗情況:
2,解決方案之XA2PC的 傳統方案是在資料庫層面 實現的,如Oracle、MySQL都支援2PC協議,為了統一標準減少行業內不必要的對接成本,需要制定標準化的處理模型及介面標準,國際開放標準組織Open Group定義了分散式事務處理模型DTP(Distributed Transaction Processing Reference Model)。
整個2PC的事務流程涉及到三個角色AP、RM、TM。AP指的是使用2PC分散式事務的 應用程式;RM指的是 資源管理器 ,它控制著分支事務;TM指的是 事務管理器 ,它控制著整個全域性事務。
1)在 準備階段 RM執行實際的業務操作,但不提交事務,資源鎖定;
2)在 提交階段 TM會接受RM在準備階段的執行回覆,只要有任一個RM執行失敗,TM會通知所有RM執行回滾操作,否則,TM將會通知所有RM提交該事務。提交階段結束資源鎖釋放。
XA方案的問題 :1、需要本地資料庫支援XA協議。2、資源鎖需要等到兩個階段結束才釋放,效能較差。
3,解決方案之Seataa)seata的設計思想Seata的設計目標其一是對業務無侵入,因此從業務無侵入的2PC方案著手,在傳統2PC的基礎上演進,並解決2PC方案面臨的問題。
Seata把 一個分散式事務理解成一個包含了若干分支事務的全域性事務 。全域性事務的職責是協調其下管轄的分支事務達成一致,要麼一起成功提交,要麼一起失敗回滾。此外,通常分支事務本身就是一個關係資料庫的本地事務,下圖是全域性事務與分支事務的關係圖:
與 傳統2PC 的模型類似,Seata定義了3個元件來協議分散式事務的處理過程:
Transaction Coordinator (TC): 事務協調器 ,它是獨立的中介軟體,需要 獨立部署 執行,它維護全域性事務的執行狀態,接收TM指令發起全域性事務的提交與回滾,負責與RM通訊協調各個分支事務的提交或回滾。Transaction Manager (TM): 事務管理器 ,TM需要嵌入應用程式中工作,它負責 開啟一個全域性事務 ,並最終向TC發起全域性提交或全域性回滾的指令。Resource Manager (RM): 控制分支事務 ,負責分支註冊、狀態彙報,並接收事務協調器TC的指令,驅動分支(本地)事務的提交和回滾。b)Seata的執行流程使用者服務的 TM 向 TC 申請開啟一個全域性事務 ,全域性事務建立成功並 生成一個全域性唯一的XID。使用者服務的 RM 向 TC 註冊 分支事務 ,該分支事務在使用者服務執行新增使用者邏輯,並將其納入 XID 對應全域性事務的管轄 。使用者服務執行分支事務,向用戶表插入一條記錄。邏輯執行到遠端呼叫積分服務時( XID 在微服務呼叫鏈路的上下文中傳播 )。積分服務的RM 向 TC 註冊分支事務,該分支事務執行增加積分的邏輯,並將其納入 XID 對應全域性事務的管轄。積分服務執行分支事務,向積分記錄表插入一條記錄,執行完畢後,返回使用者服務。使用者服務分支事務執行完畢。TM 向 TC 發起針對 XID 的全域性提交或回滾決議 。TC 排程 XID 下管轄的全部分支事務 完成提交或回滾請求 。 c)Seata的具體實現詳情見: Spring Cloud Alibaba Seata
4,Seata與傳統2PC架構層次方面, 傳統2PC 方案的 RM 實際上是在 資料庫層 ,RM 本質上就是資料庫自身,透過 XA 協議實現,而 Seata的 RM 是以jar包的形式作為中介軟體層部署 在應用程式這一側的。兩階段提交方面,傳統2PC無論第二階段的決議是commit還是rollback, 事務性資源的鎖都要保持到Phase2完成才釋放 。而 Seata的做法是在Phase1 就將本地事務提交 ,這樣就可以省去Phase2持鎖的時間,整體 提高效率 。 四、解決方案之TCC1,什麼是TCCTCC是Try、Confirm、Cancel三個詞語的縮寫,TCC要求每個分支事務實現三個操作:預處理Try、確認Confirm、撤銷Cancel。Try 操作做業務檢查及資源預留 , Confirm做業務確認操作 , Cancel實現一個與Try相反的操作即回滾操作 。TM首先發起所有的分支事務的try操作,任何一個分支事務的try操作執行失敗,TM將會發起所有分支事務的Cancel操作,若try操作全部成功,TM將會發起所有分支事務的Confifirm操作,其中Confirm/Cancel操作若執行失敗,TM會進行重試。
成功情況:
失敗情況:
TCC分為三個階段:
Try 階段是做 業務檢查(一致性)及資源預留(隔離) ,此階段僅是一個初步操作,它和後續的Confirm 一起才能真正構成一個完整的業務邏輯。Confirm 階段是做 確認提交 ,Try階段所有分支事務執行成功後開始執行 Confirm。通常情況下,採用TCC則認為 Confifirm階段是不會出錯的。即: 只要Try成功,Confirm一定成功 。若Confirm階段真的出錯了,需引入重試機制或人工處理。 Cancel 階段是在業務執行錯誤 需要回滾的狀態下執行分支事務的業務取消 ,預留 資源釋放。通常情況下,採用TCC則 認為Cancel階段也是一定成功 的。若Cancel階段真的出錯了,需引入重試機制或人工處理。2,TCC解決方案
框架名稱 |
Github地址 |
tcc-transaction |
https://github.com/changmingxie/tcc-transaction |
Hmily | https://github.com/yu199195/hmily |
ByteTCC |
https://github.com/liuyangming/ByteTCC |
EasyTransaction |
https://github.com/QNJR-GROUP/EasyTransaction |
在沒有呼叫 TCC 資源 Try 方法的情況下,呼叫了二階段的 Cancel 方法,Cancel 方法需要 識別出這是一個空回滾 ,然後直接返回成功。
出現原因:是當一個分支事務所在服務宕機或網路異常,分支事務呼叫記錄為失敗,這個時候其實是沒有執行Try階段,當故障恢復後,分散式事務進行回滾則會呼叫二階段的Cancel方法,從而形成空回滾。
解決方法:識別出這個空回滾。需要知道一階段是否執行,如果執行了,那就是正常回滾;如果沒執行,那就是空回滾。前面已經說過TM在發起全域性事務時生成全域性事務記錄,全域性事務ID貫穿整個分散式事務呼叫鏈條。再額外增加一張分支事務記錄表,其中有全域性事務 ID 和分支事務 ID,第一階段 Try 方法裡會插入一條記錄,表示一階段執行了。
//在cancel中cancel空回滾處理,如果try沒有執行,cancel不允許執行if(accountInfoDao.isExistTry(transId)<=0){ log.info("bank1 空回滾處理,try沒有執行,不允許cancel執行,xid:{}",transId); return ;}
b)冪等為了保證TCC二階段提交重試機制不會引發資料不一致,要求 TCC 的二階段 Try、Confirm 和 Cancel 介面保證冪等,這樣不會重複使用或者釋放資源。如果冪等控制沒有做好,很有可能導致資料不一致等嚴重問題。
//當前是在try中進行冪等判斷 判斷local_try_log表中是否有try日誌記錄,如果有則不再執行if(accountInfoDao.isExistTry(transId)>0){ log.info("bank1 try 已經執行,無需重複執行,xid:{}",transId); return ;}
c)懸掛
懸掛就是對於一個分散式事務,其二階段 Cancel 介面比 Try 介面先執行。
出現原因:RPC 呼叫分支事務try時,先註冊分支事務,再執行RPC呼叫,如果此時 RPC 呼叫的網路發生擁堵,通常 RPC 呼叫是有超時時間的, RPC 超時 以後,TM就會通知RM 回滾 該分散式事務,可能回滾完,RPC 請求才到達參與者真正執行,而一個 Try 方法預留的業務資源。
解決思路:如果二階段執行完成,那一階段就不能再繼續執行。在執行一階段事務時判斷在該全域性事務下, “分支事務記錄”表中是否已經有二階段事務記錄 ,如果有則不執行Try。
//try懸掛處理,如果cancel、confirm有一個已經執行了,try不再執行if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){ log.info("bank1 try懸掛處理 cancel或confirm已經執行,不允許執行try,xid:{}",transId); return ;}
4,Hmily專案原始碼: cloud-dtx-tcc
a)匯入資料庫sql檔案下載地址為: dtx-tcc-sql
b)工程配置涉及到分散式事務的工程均需要的配置
org: dromara: hmily: serializer: kryo recoverDelayTime: 30 retryMax: 30 scheduledDelay: 30 scheduledThreadMax: 10 repositorySupport: db #對於發起方的時候,把此屬性設定為true。參與方為false。 started: true hmilyDbConfig: driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/hmily?useUnicode=true username: root password: 123456
注入hmily的配置Bean
@Beanpublic HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){ HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService); hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer")); hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime"))); hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax"))); hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay"))); hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax"))); hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport")); hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started"))); HmilyDbConfig hmilyDbConfig = new HmilyDbConfig(); hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName")); hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url")); hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username")); hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password")); hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig); return hmilyTransactionBootstrap;}
啟動類上添加註解
@ComponentScan({"org.dromara.hmily"})
c)呼叫方(bank1)實現
程式碼實現: AccountInfoServiceImpl
try: try冪等校驗 try懸掛處理 檢查餘額是夠扣減金額 扣減金額confirm: 空cancel cancel冪等校驗 cancel空回滾處理 增加可用餘額
注意 :遠端呼叫bank2時,在feign呼叫的介面上加註解 @Hmily
d)參與方(bank2)實現程式碼實現: AccountInfoServiceImpl
try: 空confirm: confirm冪等校驗 正式增加金額cancel: 空
e)小結
如果拿TCC事務的處理流程與2PC兩階段提交做比較, 2PC通常都是在跨庫的DB層面 ,而 TCC則在應用層面的處理 ,需要透過業務邏輯來實現。這種分散式事務的實現方式的 優勢 在於,可以讓應用自己定義資料操作的粒度,使得 降低鎖衝突、提高吞吐量 成為可能。
而 不足之處 則在於對應用的 侵入性非常強 ,業務邏輯的每個分支都需要實現try、confirm、cancel三個操作。此外,其 實現難度也比較大 ,需要按照網路狀態、系統故障等不同的失敗原因實現不同的回滾策略。
五、解決方案之可靠訊息最終一致性專案原始碼: cloud-dtx-txmsg
1,什麼是可靠訊息最終一致性可靠訊息最終一致性方案是指當 事務發起方執行完成本地事務後併發出一條訊息 , 事務參與方(訊息消費者)一定能夠接收訊息並處理事務成功 ,此方案強調的是隻要訊息發給事務參與方最終事務要達到一致。
可靠訊息需要解決的問題:
本地事務與訊息傳送的原子性問題//先發訊息如果資料庫操作錯誤,訊息已經發送 begin transaction; //1.傳送MQ //2.資料庫操作 commit transation; //如果資料庫超時,此時資料庫回滾,但是訊息可能也已經發送 begin transaction; //1.資料庫操作 //2.傳送MQ commit transation;事務參與方接受訊息的可靠性事務參與方必須能夠從訊息佇列接收到訊息,如果接收訊息失敗可以重複接收訊息。訊息重複消費的問題由於網路2的存在,若某一個消費節點超時但是消費成功,此時訊息中介軟體會重複投遞此訊息,就導致了訊息的重 復消費。 要解決訊息重複消費的問題就要實現事務參與方的方法冪等性。2,RocketMQ事務訊息方案Producer 傳送事務訊息 :Producer (MQ傳送方)傳送事務訊息至MQ Server,MQ Server將訊息狀態標記為Prepared( 預備狀態 ),注意此時這條訊息消費者(MQ訂閱方)是 無法消費 到的。 MQ Server 迴應訊息 傳送成功 :MQ Server接收到Producer 傳送給的訊息則迴應傳送成功表示MQ已接收到訊息。 Producer 執行 本地事務 :Producer 端執行業務程式碼邏輯,透過 本地資料庫事務控制 。 訊息投遞 :若Producer 本地事務 執行成功 則自動向MQServer傳送 commit 訊息,此時MQ訂閱方(積分服務)即正常消費訊息;若Producer 本地事務 執行失敗 則自動向MQServer傳送 rollback 訊息,MQ Server接收到rollback訊息後 將刪除”增加積分訊息“ 。 MQ訂閱方(積分服務)消費訊息, 消費成功則向MQ迴應ack ,否則將重複接收訊息。這裡ack預設自動迴應,即程式執行正常則自動迴應ack。 事務回查 :如果執行Producer端 本地事務過程中,執行端掛掉,或者超時 ,MQ Server將會不停的詢問 同組的其他 Producer來獲取事務執行狀態 ,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞訊息。 3,RocketMQ實現可靠訊息最終一致性事務a)SQLbank1
CREATE DATABASE `bank1` CHARACTERSET 'utf8' COLLATE 'utf8_general_ci';DROP TABLEIF EXISTS `account_info`;CREATE TABLE `account_info` ( `id` BIGINT (20) NOT NULL AUTO_INCREMENT, `account_name` VARCHAR (100) CHARACTERSET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名', `account_no` VARCHAR (100) CHARACTERSET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號', `account_password` VARCHAR (100) CHARACTERSET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼', `account_balance` DOUBLE NULL DEFAULT NULL COMMENT '帳戶餘額', PRIMARY KEY (`id`) USING BTREE) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTERSET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;INSERT INTO `account_info`VALUES ( 2, '張三的賬戶', '1', '', 10000 );DROP TABLEIF EXISTS `de_duplication`;CREATE TABLE `de_duplication` ( `tx_no` VARCHAR (64) COLLATE utf8_bin NOT NULL, `create_time` datetime (0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE = INNODB CHARACTERSET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;View Code
bank2
CREATE DATABASE `bank2` CHARACTERSET 'utf8' COLLATE 'utf8_general_ci';DROP TABLEIF EXISTS `account_info`;CREATE TABLE `account_info` ( `id` BIGINT (20) NOT NULL AUTO_INCREMENT, `account_name` VARCHAR (100) CHARACTERSET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名', `account_no` VARCHAR (100) CHARACTERSET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號', `account_password` VARCHAR (100) CHARACTERSET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼', `account_balance` DOUBLE NULL DEFAULT NULL COMMENT '帳戶餘額', PRIMARY KEY (`id`) USING BTREE) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTERSET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;INSERT INTO `account_info`VALUES ( 3, '李四的賬戶', '2', NULL, 0 );CREATE TABLE `de_duplication` ( `tx_no` VARCHAR (64) COLLATE utf8_bin NOT NULL, `create_time` datetime (0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE = INNODB CHARACTERSET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;View Code
b)安裝RocketMQc)工程配置maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version></dependency>
properties配置
rocketmq.producer.group = producer_bank2rocketmq.name‐server = 127.0.0.1:9876
d)bank1
Service: AccountInfoServiceImpl
//兩個方法//1,向mq傳送轉賬訊息//2,更新賬戶,扣減金額 (透過事務id保證冪等性)
Controller: AccountInfoController
//生成事務id,呼叫service的發訊息介面
message: ProducerTxmsgListener
//兩個方法executeLocalTransaction和checkLocalTransaction//事務訊息傳送後的回撥方法。此時保證本地事務,呼叫Service扣減金額同時將訊息改為COMMIT(可消費狀態),如果捕獲異常,將訊息改為ROLLBACK回滾//事務回查。查詢是否在呼叫方已經處理,如果已經處理需修改訊息為COMMIT可消費,否則就是UNKOWN狀態。
e)bank2Service: AccountInfoServiceImpl
//更新賬戶bank2,增加金額。(透過事務id保證冪等性)
message: TxmsgConsumer
//監聽bank1傳送的訊息topic,呼叫Service增加金額
4,總結
可靠訊息最終一致性就是 保證訊息從生產方經過訊息中介軟體傳遞到消費方 的一致性,本案例使用了RocketMQ作為訊息中介軟體,RocketMQ主要解決了兩個功能:
本地事務與訊息傳送的原子性問題。事務參與方接收訊息的可靠性。可靠訊息最終一致性事務適合 執行週期長且實時性要求不高的場景 。引入訊息機制後,同步的事務操作變為基於訊息執行的 非同步 操作, 避免了分散式事務中的同步阻塞操作的影響,並實現了兩個服務的 解耦 。
六、解決方案之最大努力通知原始碼: cloud-dtx-notify
1,什麼是最大努力通知發起通知方透過一定的機制 最大努力將業務處理結果通知到接收方 。
有一定的 訊息重複通知機制 。因為接收通知方可能沒有接收到通知,此時要有一定的機制對訊息重複通知。訊息校對機制 。如果盡最大努力也沒有通知到接收方,或者接收方消費訊息後要再次消費,此時可 由接收方主動向通知方查詢訊息 資訊來滿足需求。2,最大努力通知與可靠訊息一致性的異同思想不同:可靠訊息一致性,發起 通知方需要保證將訊息發出去 ,並且將訊息發到接收通知方,訊息的可靠性關鍵由發起通知方來保證。最大努力通知,發起通知方盡最大的努力將業務處理結果通知為接收通知方,但是可能訊息接收不到,此時需要接收通知方主動呼叫發起通知方的介面查詢業務處理結果, 通知的可靠性關鍵在接收通知方 。業務場景不同:可靠訊息一致性關注的是 交易過程的事務一致 ,以非同步的方式完成交易。最大努力通知關注的是 交易後的通知事務 ,即將交易結果可靠的通知出去。技術解決方向不同:可靠訊息一致性要解決 訊息從發出到接收的一致性 ,即訊息發出並且被接收到;最大努力通知無法保證訊息從發出到接收的一致性,只提供訊息接收的可靠性機制。可靠機制是, 最大努力的將訊息通知給接收方 ,當訊息無法被接收方接收時,由 接收方主動查詢訊息 (業務處理結果)。3,解決方案a)解決方案一:
具體流程:
發起通知方將通知發給MQ。使用普通訊息機制將通知發給MQ。接收通知方監聽 MQ。接收通知方接收訊息,業務處理完成迴應ack。 接收通知方若 沒有迴應ack則MQ會重複通知 。 MQ會按照間隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知間隔 (如果MQ採用rocketMq,在broker中可進行配置),直到達到通知要求的時間視窗上限。 接收通知方可透過訊息校對介面來校對訊息的一致性。b)解決方案二:
與方案1不同的是 應用程式向接收通知方傳送通知 ,如下圖:
具體流程:
發起通知方將通知發給MQ:使用可靠訊息一致方案中的事務訊息保證 本地事務與訊息的原子性 ,最終將通知先發給MQ。 通知程式監聽 MQ,接收MQ的訊息。 通知程式若沒有迴應ack則MQ會重複通知。 通知程式 透過網際網路介面協議(如http、webservice) 呼叫接收通知方案介面 ,完成通知。 通知程式呼叫接收通知方案介面成功就表示通知成功,即消費MQ訊息成功,MQ將不再向通知程式投遞通知訊息。接收通知方可透過訊息校對介面來校對訊息的一致性。c)兩種方案比較
方案1中接收通知方與MQ介面,即接收通知方案監聽 MQ,此方案主要 應用與內部應用之間的通知 。方案2中由通知程式與MQ介面,通知程式監聽MQ,收到MQ的訊息後由通知程式透過網際網路介面協議呼叫接收通知方。此方案主要應用於 外部應用之間的通知 ,例如支付寶、微信的支付結果通知。 4,最大努力通知實現a)sqlbank1_pay.sql
CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1_pay` /*!40100 DEFAULT CHARACTER SET utf8 */;USE `bank1_pay`;/*Table structure for table `account_pay` */DROP TABLE IF EXISTS `account_pay`;CREATE TABLE `account_pay` ( `id` varchar(64) COLLATE utf8_bin NOT NULL, `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '賬號', `pay_amount` double DEFAULT NULL COMMENT '充值餘額', `result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值結果:success,fail', PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;/*Data for the table `account_pay` */insert into `account_pay`(`id`,`account_no`,`pay_amount`,`result`) values ('5678ef0a-1ff0-4cfd-97ac-640d749d596f','1',2,'success'),('7d7d469c-f100-4066-b927-014c0c3aa010','1',2,'success'),('947fafad-c19c-46bc-b0f0-43703a124fd4','1',2,'success');View Code
bank1.sql
CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1` /*!40100 DEFAULT CHARACTER SET utf8 */;USE `bank1`;/*Table structure for table `account_info` */DROP TABLE IF EXISTS `account_info`;CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '戶主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '銀行卡號', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帳戶密碼', `account_balance` double DEFAULT NULL COMMENT '帳戶餘額', PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;/*Data for the table `account_info` */insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'張三','1',NULL,1000);/*Table structure for table `de_duplication` */DROP TABLE IF EXISTS `de_duplication`;CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;View Code
b)工程配置
基本配置同可靠訊息一致性
c)pay支付方
Service: AccountPayServiceImpl
//兩個方法//1,插入充值記錄。生成事務id,將事務id和充值資訊傳送給MQ佇列//2,查詢充值記錄。提供給呼叫方查詢。
Controller: AccountPayController
//直接呼叫Service中的方法插入充值記錄
d)bank1
Service: AccountInfoServiceImpl
//兩個方法//1,更新賬戶金額。根據事務id保證更新的冪等性。//2,遠端呼叫pay的查詢充值結果。如果發現狀態改變同時更新當前賬號情況。
message: NotifyMsgListener
//監聽訊息。呼叫Service的更新賬戶金額,冪等更新。
Controller: AccountInfoController
//呼叫Service的查詢充值結果
5,總結
最大努力通知方案是分散式事務中 對一致性要求最低的一種 ,適用於一些最終一致性時間敏感度低的業務;最大努力通知方案需要實現如下功能:
訊息重複通知機制。訊息校對機制。 主動呼叫介面查詢並修改。七、四種分散式事務對比2PC: 最大的詬病是一個 阻塞協議 。RM在執行分支事務後需要等待TM的決定,此時服務會阻塞並鎖定資源。由於其阻塞機制和最差時間複雜度高, 因此,這種設計不能適應隨著事務涉及的服務數量增加而擴充套件的需要, 很難用於併發較高以及子事務生命週期較長 (long-running transactions) 的分散式服務 中。
TCC :如果拿TCC事務的處理流程與2PC兩階段提交做比較, 2PC 通常都是在 跨庫的DB層面,而 TCC則在應用層面的處理 ,需要透過業務邏輯來實現。這種分散式事務的實現方式的優勢在於,可以讓應用自己定義資料操作的粒度,使得 降低鎖衝突、提高吞吐量 成為可能。而不足之處則在於對應用的侵入性非常強,業務邏輯的每個分支都需要實現try、confirm、cancel三個操作。此外,其 實現難度也比較大 ,需要按照網路狀態、系統故障等不同的失敗原因實現不同的回滾策略。典型的使用場景:登入送優惠券等。
可靠訊息最終一致性事務: 適合 執行週期長且實時性要求不高的場景 。引入訊息機制後,同步的事務操作變為基於訊息執行的非同步操作, 避免了分散式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。典型的使用場景: 註冊送積分,登入送優惠券 等。
最大努力通知: 是分散式事務中要求最低的一種,適用於一些最終一致性時間敏感度低的業務;允許發起通知方處理業務失敗,在接收通知方收到通知後積極進行失敗處理,無論發起通知方如何處理結果都會不影響到接收通知方的後續處理;發起通知方需提供查詢執行情況介面,用於 接收通知方校對結果 。典型的使用場景: 銀行通知、支付結果 通知等。
原文連結:http://www.cnblogs.com/bbgs-xc/p/14456917.html