說明
Java生鮮電商平臺中由於採用了微服務架構進行業務的處理,買家,賣家,配送,銷售,供應商等進行服務化,但是不可避免存在分散式事務的問題。
業界有很多的解決方案,對此我相信大家都百度一下子就有很多,但是我巨人大哥想說的是:微服務架構中應當儘量避免分散式事務。
微服務落地存在的問題雖然微服務現在如火如荼,但對其實踐其實仍處於探索階段。很多中小型網際網路公司,鑑於經驗、技術實力等問題,微服務落地比較困難。
如著名架構師Chris Richardson所言,目前存在的主要困難有如下幾方面:
對於第三個問題,隨著docker、devops技術的發展以及各公有云paas平臺自動化運維工具的推出,微服務的測試、部署與運維會變得越來越容易。
而對於第二個問題,現在還沒有通用方案很好的解決微服務產生的事務問題。分散式事務已經成為微服務落地最大的阻礙,也是最具挑戰性的一個技術難題。
ACID原子性(Atomicity): 一個事務的所有系列操作步驟被看成是一個動作,所有的步驟要麼全部完成要麼一個也不會完成,如果事務過程中任何一點失敗,將要被改變的資料庫記錄就不會被真正被改變。一致性(Consistency): 資料庫的約束 級聯和觸發機制Trigger都必須滿足事務的一致性。也就是說,透過各種途徑包括外來鍵約束等任何寫入資料庫的資料都是有效的,不能發生表與表之間存在外來鍵約束,但是有資料卻違背這種約束性。所有改變資料庫資料的動作事務必須完成,沒有事務會建立一個無效資料狀態,這是不同於CAP理論的一致性"consistency".隔離性(Isolation): 主要用於實現併發控制, 隔離能夠確保併發執行的事務能夠順序一個接一個執行,透過隔離,一個未完成事務不會影響另外一個未完成事務。永續性(Durability): 一旦一個事務被提交,它應該持久儲存,不會因為和其他操作衝突而取消這個事務。很多人認為這意味著事務是持久在磁碟上,但是規範沒有特別定義這點。一致性理論分散式事務的目的是保障分庫資料一致性,而跨庫事務會遇到各種不可控制的問題,如個別節點永久性宕機,像單機事務一樣的 ACID 是無法奢望的。
另外,業界著名的 CAP 理論也告訴我們,對分散式系統,需要將資料一致性和系統可用性、分割槽容忍性放在天平上一起考慮。
BASE 理論告訴我們:可以透過放棄系統在每個時刻的強一致性來換取系統的可擴充套件性。
CAP 理論在分散式系統中,一致性(Consistency)、可用性(Availability)和分割槽容忍性(Partition Tolerance)3 個要素最多隻能同時滿足兩個,不可兼得。其中,分割槽容忍性又是不可或缺的。
一致性:分散式環境下,多個節點的資料是否強一致。可用性:分散式服務能一直保證可用狀態。當用戶發出一個請求後,服務能在有限時間內返回結果。分割槽容忍性:特指對網路分割槽的容忍性。舉例:Cassandra、Dynamo 等,預設優先選擇 AP,弱化 C;HBase、MongoDB 等,預設優先選擇 CP,弱化 A。
BASE 理論核心思想:
基本可用( Basically Available):指分散式系統在出現故障時,允許損失部分的可用性來保證核心可用;軟狀態( Soft state):指允許分散式系統存在中間狀態,該中間狀態不會影響到系統的整體可用性;最終一致性( Eventual consistency):指分散式系統中的所有副本資料經過一定時間後,最終能夠達到一致的狀態;原子性(A)與永續性(D)必須根本保障;為了可用性、效能與降級服務的需要,只有降低一致性( C ) 與 隔離性( I ) 的要求;酸鹼平衡(ACID-BASE Balance);BASE 是對 CAP 中 AP 的一個擴充套件
一致性模型資料的一致性模型可以分成以下三類:
強一致性:資料更新成功後,任意時刻所有副本中的資料都是一致的,一般採用同步的方式實現。弱一致性:資料更新成功後,系統不承諾立即可以讀到最新寫入的值,也不承諾具體多久之後可以讀到。最終一致性:弱一致性的一種形式,資料更新成功後,系統不承諾立即可以返回最新寫入的值,但是保證最終會返回上一次更新操作的值。分散式系統資料的強一致性、弱一致性和最終一致性可以透過 Quorum NRW 演算法分析。
本地事務在單個數據庫的本地並且限制在單個程序內的事務本地事務不涉及多個數據來源分散式事務典型方案兩階段提交(2PC, Two Phase Commit)方案;本地訊息表 (eBay 事件佇列方案);TCC 補償模式;分類:
兩<typo id="typo-2498" data-origin="階段型" ignoretag="true">階段型</typo>補償型非同步確保型最大努力通知型服務模式:
可查詢操作冪等操作TCC操作可補償操作兩階段提交2PC(強一致性)基於XA協議的兩階段提交:
第一階段是表決階段,所有參與者都將本事務能否成功的資訊反饋發給協調者;第二階段是執行階段,協調者根據所有參與者的反饋,通知所有參與者,步調一致地在所有分支上提交或者回滾;缺點:
單點問題:事務管理器在整個流程中扮演的角色很關鍵,如果其宕機,比如在第一階段已經完成,在第二階段正準備提交的時候事務管理器宕機,資源管理器就會一直阻塞,導致資料庫無法使用。同步阻塞:在準備就緒之後,資源管理器中的資源一直處於阻塞,直到提交完成,釋放資源。資料不一致:兩階段提交協議雖然為分散式資料強一致性所設計,但仍然存在資料不一致性的可能。比如:在第二階段中,假設協調者發出了事務 Commit 的通知,但是因為網路問題該通知僅被一部分參與者所收到並執行了 Commit 操作,其餘的參與者則因為沒有收到通知一直處於阻塞狀態,這時候就產生了資料的不一致性。總的來說,XA 協議比較簡單,成本較低,但是其單點問題,以及不能支援高併發(由於同步阻塞)依然是其最大的弱點。
本地訊息表(最終一致性)eBay 的架構師 Dan Pritchett,曾在一篇解釋 BASE 原理的論文《Base:An Acid Alternative》中提到一個 eBay 分散式系統一致性問題的解決方案。
它的核心思想是將需要分散式處理的任務透過訊息或者日誌的方式來非同步執行,訊息或日誌可以存到本地檔案、資料庫或訊息佇列,再透過業務規則進行失敗重試,它要求各服務的介面是冪等的。
本地訊息表與業務資料表處於同一個資料庫中,這樣就能利用本地事務來保證在對這兩個表的操作滿足事務特性,並且使用了訊息佇列來保證最終一致性。
在分散式事務操作的一方完成寫業務資料的操作之後向本地訊息表傳送一個訊息,本地事務能保證這個訊息一定會被寫入本地訊息表中;之後將本地訊息表中的訊息轉發到 Kafka 等訊息佇列中,如果轉發成功則將訊息從本地訊息表中刪除,否則繼續重新轉發;訊息消費方處理這個訊息,並完成自己的業務邏輯。此時如果本地事務處理成功,表明已經處理成功了,如果處理失敗,那麼就會重試執行。如果是業務上面的失敗,可以給生產方傳送一個業務補償訊息,通知生產方進行回滾等操作;優點: 一種非常經典的實現,避免了分散式事務,實現了最終一致性。
缺點: 訊息表會耦合到業務系統中,如果沒有封裝好的解決方案,會有很多雜活需要處理。
這個方案的核心在於第二階段的重試和冪等執行。失敗後重試,這是一種補償機制,它是能保證系統最終一致的關鍵流程。
可靠訊息的最終一致性程式碼示例表結構
DROP TABLE IF EXISTS `rp_transaction_message`; CREATE TABLE `rp_transaction_message` ( `id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '主鍵ID', `version` INT (11) NOT NULL DEFAULT '0' COMMENT '版本號', `editor` VARCHAR (100) DEFAULT NULL COMMENT '修改者', `creater` VARCHAR (100) DEFAULT NULL COMMENT '建立者', `edit_time` datetime DEFAULT NULL COMMENT '最後修改時間', `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '建立時間', `message_id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '訊息ID', `message_body` LONGTEXT NOT NULL COMMENT '訊息內容', `message_data_type` VARCHAR (50) DEFAULT NULL COMMENT '訊息資料型別', `consumer_queue` VARCHAR (100) NOT NULL DEFAULT '' COMMENT '消費佇列', `message_send_times` SMALLINT (6) NOT NULL DEFAULT '0' COMMENT '訊息重發次數', `areadly_dead` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '是否死亡', `status` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '狀態', `remark` VARCHAR (200) DEFAULT NULL COMMENT '備註', `field1` VARCHAR (200) DEFAULT NULL COMMENT '擴充套件欄位1', `field2` VARCHAR (200) DEFAULT NULL COMMENT '擴充套件欄位2', `field3` VARCHAR (200) DEFAULT NULL COMMENT '擴充套件欄位3', PRIMARY KEY (`id`), KEY `AK_Key_2` (`message_id`)) ENGINE = INNODB DEFAULT CHARSET = utf8; public interface RpTransactionMessageService { /** * 預儲存訊息. */ public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 確認併發送訊息. */ public void confirmAndSendMessage(String messageId) throws MessageBizException; /** * 儲存併發送訊息. */ public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 直接傳送訊息. */ public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 重發訊息. */ public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 根據messageId重發某條訊息. */ public void reSendMessageByMessageId(String messageId) throws MessageBizException; /** * 將訊息標記為死亡訊息. */ public void setMessageToAreadlyDead(String messageId) throws MessageBizException; /** * 根據訊息ID獲取訊息 */ public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException; /** * 根據訊息ID刪除訊息 */ public void deleteMessageByMessageId(String messageId) throws MessageBizException; /** * 重發某個訊息佇列中的全部已死亡的訊息. */ public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException; /** * 獲取分頁資料 */ PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException; }@Service("rpTransactionMessageService")public class RpTransactionMessageServiceImpl implements RpTransactionMessageService { private static final Log log = LogFactory.getLog(RpTransactionMessageServiceImpl.class); @Autowired private RpTransactionMessageDao rpTransactionMessageDao; @Autowired private JmsTemplate notifyJmsTemplate; public int saveMessageWaitingConfirm(RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "儲存的訊息為空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "訊息的消費佇列不能為空 "); } message.setEditTime(new Date()); message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name()); message.setAreadlyDead(PublicEnum.NO.name()); message.setMessageSendTimes(0); return rpTransactionMessageDao.insert(message); } public void confirmAndSendMessage(String messageId) { final RpTransactionMessage message = getMessageByMessageId(messageId); if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據訊息id查詢的訊息為空"); } message.setStatus(MessageStatusEnum.SENDING.name()); message.setEditTime(new Date()); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public int saveAndSendMessage(final RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "儲存的訊息為空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "訊息的消費佇列不能為空 "); } message.setStatus(MessageStatusEnum.SENDING.name()); message.setAreadlyDead(PublicEnum.NO.name()); message.setMessageSendTimes(0); message.setEditTime(new Date()); int result = rpTransactionMessageDao.insert(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); return result; } public void directSendMessage(final RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "儲存的訊息為空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "訊息的消費佇列不能為空 "); } notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public void reSendMessage(final RpTransactionMessage message) { if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "儲存的訊息為空"); } if (StringUtil.isEmpty(message.getConsumerQueue())) { throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "訊息的消費佇列不能為空 "); } message.addSendTimes(); message.setEditTime(new Date()); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public void reSendMessageByMessageId(String messageId) { final RpTransactionMessage message = getMessageByMessageId(messageId); if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據訊息id查詢的訊息為空"); } int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times")); if (message.getMessageSendTimes() >= maxTimes) { message.setAreadlyDead(PublicEnum.YES.name()); } message.setEditTime(new Date()); message.setMessageSendTimes(message.getMessageSendTimes() + 1); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } public void setMessageToAreadlyDead(String messageId) { RpTransactionMessage message = getMessageByMessageId(messageId); if (message == null) { throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據訊息id查詢的訊息為空"); } message.setAreadlyDead(PublicEnum.YES.name()); message.setEditTime(new Date()); rpTransactionMessageDao.update(message); } public RpTransactionMessage getMessageByMessageId(String messageId) { Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("messageId", messageId); return rpTransactionMessageDao.getBy(paramMap); } public void deleteMessageByMessageId(String messageId) { Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("messageId", messageId); rpTransactionMessageDao.delete(paramMap); } @SuppressWarnings("unchecked") public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) { log.info("==>reSendAllDeadMessageByQueueName"); int numPerPage = 1000; if (batchSize > 0 && batchSize < 100) { numPerPage = 100; } else if (batchSize > 100 && batchSize < 5000) { numPerPage = batchSize; } else if (batchSize > 5000) { numPerPage = 5000; } else { numPerPage = 1000; } int pageNum = 1; Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("consumerQueue", queueName); paramMap.put("areadlyDead", PublicEnum.YES.name()); paramMap.put("listPageSortType", "ASC"); Map<String, RpTransactionMessage> messageMap = new HashMap<String, RpTransactionMessage>(); List<Object> recordList = new ArrayList<Object>(); int pageCount = 1; PageBean pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap); recordList = pageBean.getRecordList(); if (recordList == null || recordList.isEmpty()) { log.info("==>recordList is empty"); return; } pageCount = pageBean.getTotalPage(); for (final Object obj : recordList) { final RpTransactionMessage message = (RpTransactionMessage) obj; messageMap.put(message.getMessageId(), message); } for (pageNum = 2; pageNum <= pageCount; pageNum++) { pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap); recordList = pageBean.getRecordList(); if (recordList == null || recordList.isEmpty()) { break; } for (final Object obj : recordList) { final RpTransactionMessage message = (RpTransactionMessage) obj; messageMap.put(message.getMessageId(), message); } } recordList = null; pageBean = null; for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { final RpTransactionMessage message = entry.getValue(); message.setEditTime(new Date()); message.setMessageSendTimes(message.getMessageSendTimes() + 1); rpTransactionMessageDao.update(message); notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue()); notifyJmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.getMessageBody()); } }); } } @SuppressWarnings("unchecked") public PageBean<RpTransactionMessage> listPage(PageParam pageParam, Map<String, Object> paramMap) { return rpTransactionMessageDao.listPage(pageParam, paramMap); } }@Component("messageBiz")public class MessageBiz { private static final Log log = LogFactory.getLog(MessageBiz.class); @Autowired private RpTradePaymentQueryService rpTradePaymentQueryService; @Autowired private RpTransactionMessageService rpTransactionMessageService; /** * 處理[waiting_confirm]狀態的訊息 * @param messages */ public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) { log.debug("開始處理[waiting_confirm]狀態的訊息,總條數[" + messageMap.size() + "]"); // 單條訊息處理(目前該狀態的訊息,消費佇列全部是accounting,如果後期有業務擴充,需做佇列判斷,做對應的業務處理。) for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("開始處理[waiting_confirm]訊息ID為[" + message.getMessageId() + "]的訊息"); String bankOrderNo = message.getField1(); RpTradePaymentRecord record = rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo); // 如果訂單成功,把訊息改為待處理,併發送訊息 if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) { // 確認併發送訊息 rpTransactionMessageService.confirmAndSendMessage(message.getMessageId()); } else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) { // 訂單狀態是等到支付,可以直接刪除資料 log.debug("訂單沒有支付成功,刪除[waiting_confirm]訊息id[" + message.getMessageId() + "]的訊息"); rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId()); } log.debug("結束處理[waiting_confirm]訊息ID為[" + message.getMessageId() + "]的訊息"); } catch (Exception e) { log.error("處理[waiting_confirm]訊息ID為[" + message.getMessageId() + "]的訊息異常:", e); } } } /** * 處理[SENDING]狀態的訊息 * @param messages */ public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.debug("開始處理[SENDING]狀態的訊息,總條數[" + messageMap.size() + "]"); // 根據配置獲取通知間隔時間 Map<Integer, Integer> notifyParam = getSendTime(); // 單條訊息處理 for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("開始處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息"); // 判斷髮送次數 int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times")); log.debug("[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息,已經重新發送的次數[" + message.getMessageSendTimes() + "]"); // 如果超過最大發送次數直接退出 if (maxTimes < message.getMessageSendTimes()) { // 標記為死亡 rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId()); continue; } // 判斷是否達到傳送訊息的時間間隔條件 int reSendTimes = message.getMessageSendTimes(); int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes); long currentTimeInMillis = Calendar.getInstance().getTimeInMillis(); long needTime = currentTimeInMillis - times * 60 * 1000; long hasTime = message.getEditTime().getTime(); // 判斷是否達到了可以再次傳送的時間條件 if (hasTime > needTime) { log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]訊息上次傳送時間[" + sdf.format(message.getEditTime()) + "],必須過了[" + times + "]分鐘才可以再發送。"); continue; } // 重新發送訊息 rpTransactionMessageService.reSendMessage(message); log.debug("結束處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息"); } catch (Exception e) { log.error("處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息異常:", e); } } } /** * 根據配置獲取通知間隔時間 * @return */ private Map<Integer, Integer> getSendTime() { Map<Integer, Integer> notifyParam = new HashMap<Integer, Integer>(); notifyParam.put(1, Integer.valueOf(PublicConfigUtil.readConfig("message.send.1.time"))); notifyParam.put(2, Integer.valueOf(PublicConfigUtil.readConfig("message.send.2.time"))); notifyParam.put(3, Integer.valueOf(PublicConfigUtil.readConfig("message.send.3.time"))); notifyParam.put(4, Integer.valueOf(PublicConfigUtil.readConfig("message.send.4.time"))); notifyParam.put(5, Integer.valueOf(PublicConfigUtil.readConfig("message.send.5.time"))); return notifyParam; } }public class AccountingMessageListener implements SessionAwareMessageListener<Message> { private static final Log LOG = LogFactory.getLog(AccountingMessageListener.class); /** * 會計佇列模板(由Spring建立並注入進來) */ @Autowired private JmsTemplate notifyJmsTemplate; @Autowired private RpAccountingVoucherService rpAccountingVoucherService; @Autowired private RpTransactionMessageService rpTransactionMessageService; public synchronized void onMessage(Message message, Session session) { RpAccountingVoucher param = null; String strMessage = null; try { ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message; strMessage = objectMessage.getText(); LOG.info("strMessage1 accounting:" + strMessage); param = JSONObject.parseObject(strMessage, RpAccountingVoucher.class); // 這裡轉換成相應的物件還有問題 if (param == null) { LOG.info("param引數為空"); return; } int entryType = param.getEntryType(); double payerChangeAmount = param.getPayerChangeAmount(); String voucherNo = param.getVoucherNo(); String payerAccountNo = param.getPayerAccountNo(); int fromSystem = param.getFromSystem(); int payerAccountType = 0; if (param.getPayerAccountType() != null && !param.getPayerAccountType().equals("")) { payerAccountType = param.getPayerAccountType(); } double payerFee = param.getPayerFee(); String requestNo = param.getRequestNo(); double bankChangeAmount = param.getBankChangeAmount(); double receiverChangeAmount = param.getReceiverChangeAmount(); String receiverAccountNo = param.getReceiverAccountNo(); String bankAccount = param.getBankAccount(); String bankChannelCode = param.getBankChannelCode(); double profit = param.getProfit(); double income = param.getIncome(); double cost = param.getCost(); String bankOrderNo = param.getBankOrderNo(); int receiverAccountType = 0; double payAmount = param.getPayAmount(); if (param.getReceiverAccountType() != null && !param.getReceiverAccountType().equals("")) { receiverAccountType = param.getReceiverAccountType(); } double receiverFee = param.getReceiverFee(); String remark = param.getRemark(); rpAccountingVoucherService.createAccountingVoucher(entryType, voucherNo, payerAccountNo, receiverAccountNo, payerChangeAmount, receiverChangeAmount, income, cost, profit, bankChangeAmount, requestNo, bankChannelCode, bankAccount, fromSystem, remark, bankOrderNo, payerAccountType, payAmount, receiverAccountType, payerFee, receiverFee); //刪除訊息 rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId()); } catch (BizException e) { // 業務異常,不再寫會佇列 LOG.error("==>BizException", e); } catch (Exception e) { // 不明異常不再寫會佇列 LOG.error("==>Exception", e); } } public JmsTemplate getNotifyJmsTemplate() { return notifyJmsTemplate; } public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) { this.notifyJmsTemplate = notifyJmsTemplate; } public RpAccountingVoucherService getRpAccountingVoucherService() { return rpAccountingVoucherService; } public void setRpAccountingVoucherService(RpAccountingVoucherService rpAccountingVoucherService) { this.rpAccountingVoucherService = rpAccountingVoucherService; } }
與常規MQ的ACK機制對比常規MQ確認機制:
Producer生成訊息併發送給MQ(同步、非同步);MQ接收訊息並將訊息資料持久化到訊息儲存(持久化操作為可選配置);MQ向Producer返回訊息的接收結果(返回值、異常);Consumer監聽並消費MQ中的訊息;Consumer獲取到訊息後執行業務處理;Consumer對已成功消費的訊息向MQ進行ACK確認(確認後的訊息將從MQ中刪除);常規MQ佇列訊息的處理流程無法實現 訊息傳送一致性, 因此直接使用現成的MQ中介軟體產品無法實現可靠訊息最終一致性的分散式事務解決方案
訊息傳送一致性:是指產生訊息的業務動作與訊息傳送的一致。也就是說,如果業務操作成功,那麼由這個業務操作所產生的訊息一定要成功投遞出去(一般是傳送到kafka、rocketmq、rabbitmq等訊息中介軟體中),否則就丟訊息。
下面用虛擬碼進行演示訊息傳送和投遞的不可靠性:
先進行資料庫操作,再發送訊息:
public void test1(){ //1 資料庫操作 //2 傳送MQ訊息}
這種情況下無法保證資料庫操作與傳送訊息的一致性,因為可能資料庫操作成功,傳送訊息失敗。
先發送訊息,再操作資料庫:
public void test1(){ //1 傳送MQ訊息 //2 資料庫操作}
這種情況下無法保證資料庫操作與傳送訊息的一致性,因為可能傳送訊息成功,資料庫操作失敗。
在資料庫事務中,先發送訊息,後操作資料庫:
@Transactionalpublic void test1(){ //1 傳送MQ訊息 //2 資料庫操作}
這裡使用spring 的@Transactional註解,方法裡面的操作都在一個事務中。同樣無法保證一致性,因為傳送訊息成功了,資料庫操作失敗的情況下,資料庫操作是回滾了,但是MQ訊息沒法進行回滾。
在資料庫事務中,先操作資料庫,後傳送訊息:
@Transactionalpublic void test1(){ //1 資料庫操作 //2 傳送MQ訊息}
這種情況下,貌似沒有問題,如果傳送MQ訊息失敗,丟擲異常,事務一定會回滾(加上了@Transactional註解後,spring方法丟擲異常後,會自動進行回滾)。
這只是一個假象,因為傳送MQ訊息可能事實上已經成功,如果是響應超時導致的異常。這個時候,資料庫操作依然回滾,但是MQ訊息實際上已經發送成功,導致不一致。
與訊息傳送一致性流程的對比:
常規MQ佇列訊息的處理流程無法實現訊息傳送一致性;投遞訊息的流程其實就是訊息的消費流程,可細化;TCC (Try-Confirm-Cancel)補償模式(最終一致性)TCC 其實就是採用的補償機制,其核心思想是:針對每個操作,都要註冊一個與其對應的確認和補償(撤銷)操作。
它分為三個階段:
Try 階段主要是對業務系統做檢測及資源預留Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,預設 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。舉例(Bob 要向 Smith 轉賬):
首先在 Try 階段,要先呼叫遠端介面把 Smith 和 Bob 的錢<typo id="typo-25870" data-origin="給" ignoretag="true">給</typo>凍結起來。在 Confirm 階段,執行遠端呼叫的轉賬的操作,轉賬成功進行解凍。如果第2步執行成功,那麼轉賬成功,如果第二步執行失敗,則呼叫遠端凍結介面對應的解凍方法 (Cancel)。優點:跟2PC比起來,實現以及流程相對簡單了一些,但資料的一致性比2PC也要差一些
缺點:缺點還是比較明顯的,在2,3步中都有可能失敗。TCC屬於應用層的一種補償方式,所以需要程式設計師在實現的時候多寫很多補償的程式碼,在一些場景中,一些業務流程可能用TCC不太好定義及處理。
可靠訊息最終一致(常用)不要用本地的訊息表了,直接基於MQ來實現事務。比如阿里的RocketMQ就支援訊息事務。
大概流程:
A系統先發送一個prepared訊息到mq,如果這個prepared訊息傳送失敗那麼就直接取消操作別執行了如果這個訊息傳送成功過了,那麼接著執行本地事務,如果成功就告訴mq傳送確認訊息,如果失敗就告訴mq回滾訊息如果傳送了確認訊息,那麼此時B系統會接收到確認訊息,然後執行本地的事務mq會自動定時<typo id="typo-26345" data-origin="輪詢" ignoretag="true">輪詢</typo>所有prepared訊息回撥你的介面,問你,這個訊息是不是本地事務處理失敗了,所有沒傳送確認訊息?那是繼續重試還是回滾?一般來說這裡你就可以查下資料庫看之前本地事務是否執行,如果回滾了,那麼這裡也回滾吧。這個就是避免可能本地事務執行成功了,別確認訊息傳送失敗了。這個方案裡,要是系統B的事務失敗了咋辦?重試咯,自動不斷重試直到成功,如果實在是不行,要麼就是針對重要的資金類業務進行回滾,比如B系統本地回滾後,想辦法通知系統A也回滾;或者是傳送報警由人工來手工回滾和補償
目前國內網際網路公司大都是這麼玩兒的,要不你使用RocketMQ支援的,要不你就基於其他MQ中介軟體自己封裝一套類似的邏輯,總之思路就是這樣的。
最大努力通知業務發起方將協調服務的訊息傳送到MQ,下游服務接收此訊息,如果處理失敗,將進行重試,重試N次後依然失敗,將不進行重試,放棄處理,這個應用場景要求對事物性要求不高的地方。