回覆列表
  • 1 # DevOps

    冪等性,不僅對MQ有要求,對業務上下游也有要求。

    需要分不同策略設計上半場和下半場通訊.

    這裡空間有限 很難贅述.建議百度多看看成型的解決方案.

  • 2 # WeCoding

    01 冪等性如此重要

    Kafka作為分散式MQ,大量用於分散式系統中,如訊息推送系統、業務平臺系統(如結算平臺),就拿結算來說,業務方作為上游把資料打到結算平臺,如果一份資料被計算、處理了多次,產生的後果將會特別嚴重。

    02 哪些因素影響冪等性

    使用Kafka時,需要保證exactly-once語義。要知道在分散式系統中,出現網路分割槽是不可避免的,如果kafka broker 在回覆ack時,出現網路故障或者是full gc導致ack timeout,producer將會重發,如何保證producer重試時不造成重複or亂序?又或者producer 掛了,新的producer並沒有old producer的狀態資料,這個時候如何保證冪等?即使Kafka 傳送訊息滿足了冪等,consumer拉取到訊息後,把訊息交給執行緒池workers,workers執行緒對message的處理可能包含非同步操作,又會出現以下情況:

    先commit,再執行業務邏輯:提交成功,處理失敗 。造成丟失

    先執行業務邏輯,再commit:提交失敗,執行成功。造成重複執行

    先執行業務邏輯,再commit:提交成功,非同步執行fail。造成丟失

    03 Kafka保證傳送冪等性

    針對以上的問題,kafka在0.11版新增了冪等型producer和事務型producer。前者解決了單會話冪等性等問題,後者解決了多會話冪等性。

    單會話冪等性

    為解決producer重試引起的亂序和重複。Kafka增加了pid和seq。Producer中每個RecordBatch都有一個單調遞增的seq; Broker上每個tp也會維護pid-seq的對映,並且每Commit都會更新lastSeq。這樣recordBatch到來時,broker會先檢查RecordBatch再儲存資料:如果batch中 baseSeq(第一條訊息的seq)比Broker維護的序號(lastSeq)大1,則儲存資料,否則不儲存(inSequence方法)。

    引申:Kafka producer 對有序性做了哪些處理

    假設我們有5個請求,batch1、batch2、batch3、batch4、batch5;如果只有batch2 ack failed,3、4、5都儲存了,那2將會隨下次batch重發而造成重複。我們可以設定max.in.flight.requests.per.connection=1(客戶端在單個連線上能夠傳送的未響應請求的個數)來解決亂序,但降低了系統吞吐。

    新版本kafka設定enable.idempotence=true後能夠動態調整max-in-flight-request。正常情況下max.in.flight.requests.per.connection大於1。當重試請求到來且時,batch 會根據 seq重新新增到佇列的合適位置,並把max.in.flight.requests.per.connection設為1,這樣它 前面的 batch序號都比它小,只有前面的都發完了,它才能發。

    多會話冪等性

    在單會話冪等性中介紹,kafka透過引入pid和seq來實現單會話冪等性,但正是引入了pid,當應用重啟時,新的producer並沒有old producer的狀態資料。可能重複儲存。

    Kafka事務透過隔離機制來實現多會話冪等性

    kafka事務引入了transactionId 和Epoch,設定transactional.id後,一個transactionId只對應一個pid, 且Server 端會記錄最新的 Epoch 值。這樣有新的producer初始化時,會向TransactionCoordinator傳送InitPIDRequest請求, TransactionCoordinator 已經有了這個 transactionId對應的 meta,會返回之前分配的 PID,並把 Epoch 自增 1 返回,這樣當old producer恢復過來請求操作時,將被認為是無效producer丟擲異常。 如果沒有開啟事務,TransactionCoordinator會為新的producer返回new pid,這樣就起不到隔離效果,因此無法實現多會話冪等。

    04 Consumer端冪等性

    如上所述,consumer拉取到訊息後,把訊息交給執行緒池workers,workers對message的handle可能包含非同步操作,又會出現以下情況:

    先commit,再執行業務邏輯:提交成功,處理失敗 。造成丟失

    先執行業務邏輯,再commit:提交失敗,執行成功。造成重複執行

    先執行業務邏輯,再commit:提交成功,非同步執行fail。造成丟失

    對此我們常用的方法時,works取到訊息後先執行如下code:

    if(cache.contain(msgId)){

    // cache中包含msgId,已經處理過

    continue;

    }else {

    lock.lock();

    cache.put(msgId,timeout);

    commitSync();

    lock.unLock();

    }

  • 中秋節和大豐收的關聯?
  • 海蘭心皇太極是什麼電視劇?