首頁>技術>

一、介紹

在介紹訊息中介軟體 MQ 之前,我們先來簡單的瞭解一下,為何要引用訊息中介軟體。

例如,在電商平臺中,常見的使用者下單,會經歷以下幾個流程。

當用戶下單時,建立完訂單之後,會呼叫第三方支付平臺,對使用者的賬戶金額進行扣款,如果平臺支付扣款成功,會將結果通知到對應的業務系統,接著業務系統會更新訂單狀態,同時呼叫倉庫介面,進行減庫存,通知物流進行發貨!

試想一下,從訂單狀態更新、到扣減庫存、通知物流發貨都在一個方法內同步完成,假如使用者支付成功、訂單狀態更新也成功,但是在扣減庫存或者通知物流發貨步驟失敗了,那麼就會造成一個問題,使用者已經支付成功了,只是在倉庫扣減庫存方面失敗,從而導致整個交易失敗!

一單失敗,老闆可以假裝看不見,但是如果上千個單子都因此失敗,那麼因系統造成的業務損失,將是巨大的,老闆可能坐不住了!

因此,針對這種業務場景,架構師們引入了非同步通訊技術方案,從而保證服務的高可用,大體流程如下:

當訂單系統收到支付平臺傳送的扣款結果之後,會將訂單訊息傳送到 MQ 訊息中介軟體,同時也會更新訂單狀態。

在另一端,由倉庫系統來非同步監聽訂單系統傳送的訊息,當收到訂單訊息之後,再操作扣減庫存、通知物流公司發貨等服務!

在最佳化後的流程下,即使扣減庫存服務失敗,也不會影響使用者交易。

正如《人月神話》中所說的,軟體工程,沒有銀彈

當引入了 MQ 訊息中介軟體之後,同樣也會帶來另一個問題,假如 MQ 訊息中介軟體突然宕機了,導致訊息無法傳送出去,那倉庫系統就無法接受到訂單訊息,進而也無法發貨!

針對這個問題,業界主流的解決辦法是採用叢集部署,一主多從模式,從而實現服務的高可用,即使一臺機器突然宕機了,也依然能保證服務可用,在伺服器故障期間,透過運維手段,將服務重新啟動,之後服務依然能正常執行!

但是還有另一個問題,假如倉庫系統已經收到訂單訊息了,但是業務處理異常,或者伺服器異常,導致當前商品庫存並沒有扣減,也沒有發貨!

這個時候又改如何處理呢?

今天我們所要介紹的正是這種場景,假如訊息消費失敗,我們應該如何處理?

二、解決方案

針對訊息消費失敗的場景,我們一般會透過如下方式進行處理:

當訊息消費失敗時,會對訊息進行重新推送如果重試次數超過最大值,會將異常訊息儲存到資料庫,然後人工介入排查問題,進行手工重試

當訊息在客戶端消費失敗時,我們會將異常的訊息加入到一個訊息重試物件中,同時設定最大重試次數,並將訊息重新推送到 MQ 訊息中介軟體裡,當重試次數超過最大值時,會將異常的訊息儲存到 MongoDB資料庫中,方便後續查詢異常的資訊。

基於以上系統模型,我們可以編寫一個公共重試元件,話不多說,直接幹!

三、程式碼實踐

本次補償服務採用 rabbitmq 訊息中介軟體進行處理,其他訊息中介軟體處理思路也類似!

3.1、建立一個訊息重試實體類
@Data@EqualsAndHashCode(callSuper = false)@Accessors(chain = true)public class MessageRetryDTO implements Serializable {    private static final long serialVersionUID = 1L;    /**     * 原始訊息body     */    private String bodyMsg;    /**     * 訊息來源ID     */    private String sourceId;    /**     * 訊息來源描述     */    private String sourceDesc;    /**     * 交換器     */    private String exchangeName;    /**     * 路由鍵     */    private String routingKey;    /**     * 佇列     */    private String queueName;    /**     * 狀態,1:初始化,2:成功,3:失敗     */    private Integer status = 1;    /**     * 最大重試次數     */    private Integer maxTryCount = 3;    /**     * 當前重試次數     */    private Integer currentRetryCount = 0;    /**     * 重試時間間隔(毫秒)     */    private Long retryIntervalTime = 0L;    /**     * 任務失敗資訊     */    private String errorMsg;    /**     * 建立時間     */    private Date createTime;    @Override    public String toString() {        return "MessageRetryDTO{" +                "bodyMsg='" + bodyMsg + '\'' +                ", sourceId='" + sourceId + '\'' +                ", sourceDesc='" + sourceDesc + '\'' +                ", exchangeName='" + exchangeName + '\'' +                ", routingKey='" + routingKey + '\'' +                ", queueName='" + queueName + '\'' +                ", status=" + status +                ", maxTryCount=" + maxTryCount +                ", currentRetryCount=" + currentRetryCount +                ", retryIntervalTime=" + retryIntervalTime +                ", errorMsg='" + errorMsg + '\'' +                ", createTime=" + createTime +                '}';    }    /**     * 檢查重試次數是否超過最大值     *     * @return     */    public boolean checkRetryCount() {        retryCountCalculate();        //檢查重試次數是否超過最大值        if (this.currentRetryCount < this.maxTryCount) {            return true;        }        return false;    }    /**     * 重新計算重試次數     */    private void retryCountCalculate() {        this.currentRetryCount = this.currentRetryCount + 1;    }}
3.2、編寫服務重試抽象類
public abstract class CommonMessageRetryService {    private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private MongoTemplate mongoTemplate;    /**     * 初始化訊息     *     * @param message     */    public void initMessage(Message message) {        log.info("{} 收到訊息: {},業務資料:{}", this.getClass().getName(), message.toString(), new String(message.getBody()));        try {            //封裝訊息            MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);            if (log.isInfoEnabled()) {                log.info("反序列化訊息:{}", messageRetryDto.toString());            }            prepareAction(messageRetryDto);        } catch (Exception e) {            log.warn("處理訊息異常,錯誤資訊:", e);        }    }    /**     * 準備執行     *     * @param retryDto     */    protected void prepareAction(MessageRetryDTO retryDto) {        try {            execute(retryDto);            doSuccessCallBack(retryDto);        } catch (Exception e) {            log.error("當前任務執行異常,業務資料:" + retryDto.toString(), e);            //執行失敗,計算是否還需要繼續重試            if (retryDto.checkRetryCount()) {                if (log.isInfoEnabled()) {                    log.info("重試訊息:{}", retryDto.toString());                }                retrySend(retryDto);            } else {                if (log.isWarnEnabled()) {                    log.warn("當前任務重試次數已經到達最大次數,業務資料:" + retryDto.toString(), e);                }                doFailCallBack(retryDto.setErrorMsg(e.getMessage()));            }        }    }    /**     * 任務執行成功,回撥服務(根據需要進行重寫)     *     * @param messageRetryDto     */    private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {        try {            successCallback(messageRetryDto);        } catch (Exception e) {            log.warn("執行成功回撥異常,佇列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());        }    }    /**     * 任務執行失敗,回撥服務(根據需要進行重寫)     *     * @param messageRetryDto     */    private void doFailCallBack(MessageRetryDTO messageRetryDto) {        try {            saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));            failCallback(messageRetryDto);        } catch (Exception e) {            log.warn("執行失敗回撥異常,佇列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());        }    }    /**     * 執行任務     *     * @param messageRetryDto     */    protected abstract void execute(MessageRetryDTO messageRetryDto);    /**     * 成功回撥     *     * @param messageRetryDto     */    protected abstract void successCallback(MessageRetryDTO messageRetryDto);    /**     * 失敗回撥     *     * @param messageRetryDto     */    protected abstract void failCallback(MessageRetryDTO messageRetryDto);    /**     * 構建訊息補償實體     * @param message     * @return     */    private MessageRetryDTO buildMessageRetryInfo(Message message){        //如果頭部包含補償訊息實體,直接返回        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();        if(messageHeaders.containsKey("message_retry_info")){            Object retryMsg = messageHeaders.get("message_retry_info");            if(Objects.nonNull(retryMsg)){                return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);            }        }        //自動將業務訊息加入補償實體        MessageRetryDTO messageRetryDto = new MessageRetryDTO();        messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));        messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());        messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());        messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());        messageRetryDto.setCreateTime(new Date());        return messageRetryDto;    }    /**     * 異常訊息重新入庫     * @param retryDto     */    private void retrySend(MessageRetryDTO retryDto){        //將補償訊息實體放入頭部,原始訊息內容保持不變        MessageProperties messageProperties = new MessageProperties();        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);        messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));        Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);        rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);    }    /**     * 將異常訊息儲存到mongodb中     * @param retryDto     */    private void saveMessageRetryInfo(MessageRetryDTO retryDto){        try {            mongoTemplate.save(retryDto, "message_retry_info");        } catch (Exception e){            log.error("將異常訊息儲存到mongodb失敗,訊息資料:" + retryDto.toString(), e);        }    }}
3.3、編寫監聽服務類

在消費端應用的時候,也非常簡單,例如,針對扣減庫存操作,我們可以透過如下方式進行處理!

@Componentpublic class OrderServiceListener extends CommonMessageRetryService {    private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);    /**     * 監聽訂單系統下單成功訊息     * @param message     */    @RabbitListener(queues = "mq.order.add")    public void consume(Message message) {        log.info("收到訂單下單成功訊息: {}", message.toString());        super.initMessage(message);    }    @Override    protected void execute(MessageRetryDTO messageRetryDto) {        //呼叫扣減庫存服務,將業務異常丟擲來    }    @Override    protected void successCallback(MessageRetryDTO messageRetryDto) {        //業務處理成功,回撥    }    @Override    protected void failCallback(MessageRetryDTO messageRetryDto) {        //業務處理失敗,回撥    }}

當訊息消費失敗,並超過最大次數時,會將訊息儲存到 mongodb 中,然後像常規資料庫操作一樣,可以透過 web 介面查詢異常訊息,並針對具體場景進行重試!

四、小結

可能有的同學會問,為啥不將異常訊息存在資料庫?

起初的確是儲存在 MYSQL 中,但是隨著業務的快速發展,訂單訊息資料結構越來越複雜,資料量也非常的大,甚至大到 MYSQL 中的 text 型別都無法儲存,同時這種資料結構也不太適合在 MYSQL 中儲存,因此將其遷移到 mongodb!

5
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • python大資料分析離線開發環境的配置