首頁>技術>

前言

之前給小夥伴們科普ClickHouse叢集的時候,我曾經提到ClickHouse叢集幾乎是去中心化的(decentralized),亦即叢集中各個CK例項是對等的,沒有主從之分。叢集上的複製表、分散式表機制只是靠外部ZooKeeper做分散式協調工作。想了想,又補了一句:

“其實單純靠P2P互相通訊就能維護完整的叢集狀態,實現叢集自治,比如Redis Cluster。”

當然限於時間沒有展開說。這個週末休息夠了,難得有空,來隨便講兩句吧。

在官方Redis Cluster出現之前,要實現叢集化Redis都是依靠Sharding+Proxy技術,如Twemproxy和Codis(筆者之前也寫過 Codis叢集 的事兒)。而官方Redis Cluster走了去中心化的路,其通訊基礎就是Gossip協議,同時該協議還能保證一致性和可用性。本文先來介紹一下它。

Gossip協議

簡介

最近幾個月一直在看《Friends》下飯。認為自己從不gossip的Rachel一語道破了gossip的本質。

現實生活中的流言八卦傳播的機制就是“I hear something and I pass that information on”,並且其傳播速度非常快。而Gossip協議就是借鑑了這個特點產生的,在P2P網路和分散式系統中應用廣泛,它的方法論也特別簡單:

在一個處於有界網路的集群裡,如果每個節點都隨機與其他節點交換特定資訊,經過足夠長的時間後,叢集各個節點對該份資訊的認知終將收斂到一致。

這裡的“特定資訊”一般就是指叢集狀態、各節點的狀態以及其他元資料等。可見,Gossip協議是完全符合BASE理論精神的,所以它基本可以用於任何只要求最終一致性的領域,典型的例子就是區塊鏈,以及部分分散式儲存。另外,它可以很方便地實現彈性叢集(即節點可以隨時上下線),如失敗檢測與動態負載均衡等。

以下GIF圖示出Gossip協議下一種可能的訊息傳播過程。藍色節點表示對訊息無感知,紅色節點表示有感知。

Source: https://managementfromscratch.wordpress.com/2016/04/01/introduction-to-gossip/

為了使Gossip協議更易於表達和分析,一般都會借用流行病學(epidemiology)中的SIR模型進行描述,因為大流行病(pandemic,比如這次新冠肺炎)的傳播與流言八卦的傳播具有相似性,並且已經由前人總結出一套成熟的數學模型了。

流行病學SIR模型

SIR模型早在1927年就由Kermack與McKendrick提出。該模型將傳染病流行範圍內的人群分為3類:

S(易感者/susceptible) ,指未患病的人,但缺乏免疫能力,與感染者接觸之後容易受到感染。I(感染者/infective) ,指已患病的人,並且可以將病原體傳播給易感者人群;R(隔離者/removed) ,指被隔離在無傳染環境,或者因病癒獲得免疫力而不再易感的人。

如果不考慮人口的增長和減少,即s(t)+i(t)+r(t)始終為一常量的話,那麼SIR模型就可以用如下的微分方程組來表示。

其中,係數β是感染率,γ則是治癒率。為了阻止以至消滅傳染病的流行,醫學界會努力降低感染率,提高治癒率。但是在Gossip協議的語境下,計算機科學家要做的恰恰相反,即儘量高效地讓叢集內所有節點都“感染”(對資訊有感知)。由SIR模型推演出的Gossip協議傳播模型主要有兩種,即反熵(Anti-entropy)和謠言傳播(Rumor-mongering),下面分別介紹之。

反熵(Anti-entropy)

熵是物理學中體系混亂程度的度量,而反熵就是透過看似雜亂無章的通訊達到最終一致。反熵只用到SIR模型中的S和I狀態,S狀態表示節點尚未感知到資料,I狀態表示節點已感知到資料,並且正在傳播給其他節點。具體來講,反熵Gossip協議有3種實現方式:

推模式(push):處於I狀態的節點週期性地隨機選擇其他節點,並將自己持有的資料傳送出去;拉模式(pull):處於S狀態的節點週期性地隨機選擇其他節點,並請求接收其他節點持有的資料;推-拉模式(push-pull):即以上兩者的綜合。

下圖示出在有界叢集P中,以週期Δ執行反熵Gossip協議的虛擬碼描述。

如何分析其效率呢?為了簡化問題,提出以下約束:

每一輪週期每個節點都只隨機選擇一個其他節點進行通訊;起始時,只有一個節點處於I狀態,其他節點都處於S狀態。

令s(t)表示在時刻t時,S狀態的節點佔總節點數n的比例(注意是比例),那麼顯然有s(0) = 1 - 1/n,可以計算出s(t)的期望為:

推模式拉模式

由下圖可見,拉模式的資訊傳播效率比推模式高,達到了真正的指數級收斂速度。綜合了兩者的推-拉模式效率則比拉模式更高。

但是,推模式每輪只需要1次資訊交換,拉模式需要2次,推-拉模式需要3次。由於反熵Gossip協議每次都交換全量訊息,資料量可能會比較大,因此具體選擇哪種模式,還是需要考慮網路資源的開銷再決定。

謠言傳播(Rumor-mongering)

謠言傳播與反熵不同的一點是,它採用完整的SIR模型。處於R狀態的結點表示已經獲取到了資訊,但是不會將這個資訊分享給其他節點,就像“謠言止於智者”一樣。另一個不同點是,謠言傳播機制每次只會交換髮生變化的資訊,而不是全量資訊,所以它對網路資源的開銷會比反熵機制要小很多。

下圖示出在有界叢集P中,以週期Δ執行謠言傳播Gossip協議的虛擬碼描述。

圖中的blind/feedback和coin/counter是怎麼一回事呢?它們表示節點從I狀態轉移到R狀態的條件。

coin:在每輪傳播中,節點以1/k的機率從I轉移到R狀態。counter:在參與k輪傳播之後(即傳送k次資訊)之後,節點從I狀態轉移到R狀態。feedback:在發出資訊後,對位節點有反饋才可以進入R狀態。blind:在發出資訊後,不必等待對位節點有反饋,隨時都可以進入R狀態。

由上可見,謠言傳播模式的結束條件是所有節點都對謠言“免疫”,但是又有可能造成部分節點始終無法對訊息有感知(即保持S狀態)。以coin條件為例,可以寫出如下的微分方程組。其中s和i仍然表示S狀態和I狀態的節點佔總節點數的比例。

消去t,可得:

根據初始條件:i(1 - 1/n) = 1,可以推匯出:

如果我們要讓i(s*) = 0的話:

可見,s 會隨著k值的增高而指數級下降。當k = 1時,s 約為20%,而當k = 5時,s*就只有約0.24%了。也就是說,如果節點每輪以1/5的機率從I轉換為R狀態,就已經比較安全了。

在實際應用中,反熵和謠言傳播的各種方式往往結合在一起使用,因此Gossip協議非常靈活,沒有完全統一的標準。以下就看一看Redis Cluster的實現。

Redis Cluster的Gossip方案

Redis Cluster是在3.0版本加入的feature,故我們就選擇3.0版本的原始碼來簡單解說。下圖是主從架構的Redis Cluster示意圖,其中虛線表示各個節點之間的Gossip通訊。

訊息型別

Gossip協議是個鬆散的協議,沒有對資料交換的格式做特別的約束,各框架可以自由設定自己的implementation。Redis Cluster有以下9種訊息型別的定義,詳情可見註釋(註釋非我所寫,而是來自 redis-3.0-annotated 專案,致敬)。

/* Note that the PING, PONG and MEET messages are actually the same exact * kind of packet. PONG is the reply to ping, in the exact format as a PING, * while MEET is a special PING that forces the receiver to add the sender * as a node (if it is not already in the list). */// 注意,PING 、 PONG 和 MEET 實際上是同一種訊息。// PONG 是對 PING 的回覆,它的實際格式也為 PING 訊息,// 而 MEET 則是一種特殊的 PING 訊息,用於強制訊息的接收者將訊息的傳送者新增到叢集中// (如果節點尚未在節點列表中的話)// PING#define CLUSTERMSG_TYPE_PING 0          /* Ping */// PONG (回覆 PING)#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */// 請求將某個節點新增到叢集中#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */// 將某個節點標記為 FAIL#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */// 透過釋出與訂閱功能廣播訊息#define CLUSTERMSG_TYPE_PUBLISH 4       /* Pub/Sub Publish propagation */// 請求進行故障轉移操作,要求訊息的接收者透過投票來支援訊息的傳送者#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */// 訊息的接收者同意向訊息的傳送者投票#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* Yes, you have my vote */// 槽佈局已經發生變化,訊息傳送者要求訊息接收者進行相應的更新#define CLUSTERMSG_TYPE_UPDATE 7        /* Another node slots configuration */// 為了進行手動故障轉移,暫停各個客戶端#define CLUSTERMSG_TYPE_MFSTART 8       /* Pause clients for manual failover */

可見,Redis Gossip除了負責資訊交換之外,還會負責節點的上下線及failover。

訊息格式

Redis Gossip訊息分為訊息頭和訊息體,訊息體一共有4類,其中MEET、PING和PONG訊息都用clusterMsgDataGossip結構來表示。

typedef struct {    // 節點的名字    // 在剛開始的時候,節點的名字會是隨機的    // 當 MEET 資訊傳送並得到回覆之後,叢集就會為節點設定正式的名字    char nodename[REDIS_CLUSTER_NAMELEN];    // 最後一次向該節點發送 PING 訊息的時間戳    uint32_t ping_sent;    // 最後一次從該節點接收到 PONG 訊息的時間戳    uint32_t pong_received;    // 節點的 IP 地址    char ip[REDIS_IP_STR_LEN];    /* IP address last time it was seen */    // 節點的埠號    uint16_t port;  /* port last time it was seen */    // 節點的標識值    uint16_t flags;    // 對齊位元組,不使用    uint32_t notused; /* for 64 bit alignment */} clusterMsgDataGossip; typedef struct {    // 下線節點的名字    char nodename[REDIS_CLUSTER_NAMELEN];} clusterMsgDataFail; typedef struct {    // 頻道名長度    uint32_t channel_len;    // 訊息長度    uint32_t message_len;    // 訊息內容,格式為 頻道名+訊息    // bulk_data[0:channel_len-1] 為頻道名    // bulk_data[channel_len:channel_len+message_len-1] 為訊息    unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */} clusterMsgDataPublish; typedef struct {    // 節點的配置紀元    uint64_t configEpoch; /* Config epoch of the specified instance. */    // 節點的名字    char nodename[REDIS_CLUSTER_NAMELEN]; /* Name of the slots owner. */    // 節點的槽佈局    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* Slots bitmap. */} clusterMsgDataUpdate; union clusterMsgData {    /* PING, MEET and PONG */    struct {        /* Array of N clusterMsgDataGossip structures */        clusterMsgDataGossip gossip[1];    } ping;    /* FAIL */    struct {        clusterMsgDataFail about;    } fail;    /* PUBLISH */    struct {        clusterMsgDataPublish msg;    } publish;    /* UPDATE */    struct {        clusterMsgDataUpdate nodecfg;    } update;};

排程Gossip通訊

在redis.c中,有一個負責排程執行Redis server內週期性任務的函式,名為serverCron()。其中,與叢集相關的程式碼段如下。

/* Run the Redis Cluster cron. */// 如果伺服器執行在叢集模式下,那麼執行叢集操作run_with_period(100) {    if (server.cluster_enabled)     clusterCron();}

可見,在啟用叢集時,每個節點都會每隔100毫秒執行關於叢集的週期性任務clusterCron(),該函式中與Gossip有關的程式碼有多處,以下是部分節選。註釋寫得非常清楚,筆者就不再獻醜了。

節點加入叢集

// 為未建立連線的節點建立連線if (node->link == NULL) {    // .....    /* Queue a PING in the new connection ASAP: this is crucial     * to avoid false positives in failure detection.     *     * If the node is flagged as MEET, we send a MEET message instead     * of a PING one, to force the receiver to add us in its node     * table. */    // 向新連線的節點發送 PING 命令,防止節點被識進入下線    // 如果節點被標記為 MEET ,那麼傳送 MEET 命令,否則傳送 PING 命令    old_ping_sent = node->ping_sent;    clusterSendPing(link, node->flags & REDIS_NODE_MEET ?            CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);    // 這不是第一次傳送 PING 資訊,所以可以還原這個時間    // 等 clusterSendPing() 函式來更新它    if (old_ping_sent) {        /* If there was an active ping before the link was         * disconnected, we want to restore the ping time, otherwise         * replaced by the clusterSendPing() call. */        node->ping_sent = old_ping_sent;    }    /* We can clear the flag after the first packet is sent.     *     * 在傳送 MEET 資訊之後,清除節點的 MEET 標識。     *     * If we'll never receive a PONG, we'll never send new packets     * to this node. Instead after the PONG is received and we     * are no longer in meet/handshake status, we want to send     * normal PING packets.      *     * 如果當前節點(傳送者)沒能收到 MEET 資訊的回覆,     * 那麼它將不再向目標節點發送命令。     *     * 如果接收到回覆的話,那麼節點將不再處於 HANDSHAKE 狀態,     * 並繼續向目標節點發送普通 PING 命令。     */    node->flags &= ~REDIS_NODE_MEET;    redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",            node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);}

隨機週期性傳送PING訊息

/* Ping some random node 1 time every 10 iterations, so that we usually ping * one random node every second. */// clusterCron() 每執行 10 次(至少間隔一秒鐘),就向一個隨機節點發送 gossip 資訊if (!(iteration % 10)) {    int j;    /* Check a few random nodes and ping the one with the oldest     * pong_received time. */    // 隨機 5 個節點,選出其中一個    for (j = 0; j < 5; j++) {        // 隨機在叢集中挑選節點        de = dictGetRandomKey(server.cluster->nodes);        clusterNode *this = dictGetVal(de);        /* Don't ping nodes disconnected or with a ping currently active. */        // 不要 PING 連線斷開的節點,也不要 PING 最近已經 PING 過的節點        if (this->link == NULL || this->ping_sent != 0) continue;        if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))            continue;        // 選出 5 個隨機節點中最近一次接收 PONG 回覆距離現在最舊的節點        if (min_pong_node == NULL || min_pong > this->pong_received) {            min_pong_node = this;            min_pong = this->pong_received;        }    }    // 向最久沒有收到 PONG 回覆的節點發送 PING 命令    if (min_pong_node) {        redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);        clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);    }}

防止節點假超時及狀態過期

/* If we are waiting for the PONG more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */// 如果等到 PONG 到達的時間超過了 node timeout 一半的連線// 因為儘管節點依然正常,但連線可能已經出問題了if (node->link && /* is connected */    now - node->link->ctime >    server.cluster_node_timeout && /* was not already reconnected */    node->ping_sent && /* we already sent a ping */    node->pong_received < node->ping_sent && /* still waiting pong */    /* and we are waiting for the pong more than timeout/2 */    now - node->ping_sent > server.cluster_node_timeout/2){    /* Disconnect the link, it will be reconnected automatically. */    // 釋放連線,下次 clusterCron() 會自動重連    freeClusterLink(node->link);}/* If we have currently no active ping in this instance, and the * received PONG is older than half the cluster timeout, send * a new ping now, to ensure all the nodes are pinged without * a too big delay. */// 如果目前沒有在 PING 節點// 並且已經有 node timeout 一半的時間沒有從節點那裡收到 PONG 回覆// 那麼向節點發送一個 PING ,確保節點的資訊不會太舊// (因為一部分節點可能一直沒有被隨機中)if (node->link &&    node->ping_sent == 0 &&    (now - node->pong_received) > server.cluster_node_timeout/2){    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);    continue;}

處理failover和標記疑似下線

/* If we are a master and one of the slaves requested a manual * failover, ping it continuously. */// 如果這是一個主節點,並且有一個從伺服器請求進行手動故障轉移// 那麼向從伺服器傳送 PING 。if (server.cluster->mf_end &&    nodeIsMaster(myself) &&    server.cluster->mf_slave == node &&    node->link){    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);    continue;}/* Check only if we have an active ping for this instance. */// 以下程式碼只在節點發送了 PING 命令的情況下執行if (node->ping_sent == 0) continue;/* Compute the delay of the PONG. Note that if we already received * the PONG, then node->ping_sent is zero, so can't reach this * code at all. */// 計算等待 PONG 回覆的時長delay = now - node->ping_sent;// 等待 PONG 回覆的時長超過了限制值,將目標節點標記為 PFAIL (疑似下線)if (delay > server.cluster_node_timeout) {    /* Timeout reached. Set the node as possibly failing if it is     * not already in this state. */    if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {        redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",            node->name);        // 開啟疑似下線標記        node->flags |= REDIS_NODE_PFAIL;        update_state = 1;    }}

由上可知, server.cluster_node_timeout 是判斷節點狀態過期及疑似下線的標準,所以對於不同網路狀態和規模的叢集,要視實際情況設定。

實際傳送Gossip訊息

以下是前方多次呼叫過的clusterSendPing()方法的原始碼,不難理解。

/* Send a PING or PONG packet to the specified node, making sure to add enough * gossip informations. */// 向指定節點發送一條 MEET 、 PING 或者 PONG 訊息void clusterSendPing(clusterLink *link, int type) {    unsigned char buf[sizeof(clusterMsg)];    clusterMsg *hdr = (clusterMsg*) buf;    int gossipcount = 0, totlen;    /* freshnodes is the number of nodes we can still use to populate the     * gossip section of the ping packet. Basically we start with the nodes     * we have in memory minus two (ourself and the node we are sending the     * message to). Every time we add a node we decrement the counter, so when     * it will drop to <= zero we know there is no more gossip info we can     * send. */    // freshnodes 是用於傳送 gossip 資訊的計數器    // 每次傳送一條資訊時,程式將 freshnodes 的值減一    // 當 freshnodes 的數值小於等於 0 時,程式停止傳送 gossip 資訊    // freshnodes 的數量是節點目前的 nodes 表中的節點數量減去 2     // 這裡的 2 指兩個節點,一個是 myself 節點(也即是傳送資訊的這個節點)    // 另一個是接受 gossip 資訊的節點    int freshnodes = dictSize(server.cluster->nodes)-2;     // 如果傳送的資訊是 PING ,那麼更新最後一次傳送 PING 命令的時間戳    if (link->node && type == CLUSTERMSG_TYPE_PING)        link->node->ping_sent = mstime();     // 將當前節點的資訊(比如名字、地址、埠號、負責處理的槽)記錄到訊息裡面    clusterBuildMessageHdr(hdr,type);     /* Populate the gossip fields */    // 從當前節點已知的節點中隨機選出兩個節點    // 並透過這條訊息捎帶給目標節點,從而實現 gossip 協議     // 每個節點有 freshnodes 次傳送 gossip 資訊的機會    // 每次向目標節點發送 2 個被選中節點的 gossip 資訊(gossipcount 計數)    while(freshnodes > 0 && gossipcount < 3) {        // 從 nodes 字典中隨機選出一個節點(被選中節點)        dictEntry *de = dictGetRandomKey(server.cluster->nodes);        clusterNode *this = dictGetVal(de);         clusterMsgDataGossip *gossip;        int j;         /* In the gossip section don't include:         * 以下節點不能作為被選中節點:         * 1) Myself.         *    節點本身。         * 2) Nodes in HANDSHAKE state.         *    處於 HANDSHAKE 狀態的節點。         * 3) Nodes with the NOADDR flag set.         *    帶有 NOADDR 標識的節點         * 4) Disconnected nodes if they don't have configured slots.         *    因為不處理任何槽而被斷開連線的節點          */        if (this == myself ||            this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||            (this->link == NULL && this->numslots == 0))        {                freshnodes--; /* otherwise we may loop forever. */                continue;        }         /* Check if we already added this node */        // 檢查被選中節點是否已經在 hdr->data.ping.gossip 數組裡面        // 如果是的話說明這個節點之前已經被選中了        // 不要再選中它(否則就會出現重複)        for (j = 0; j < gossipcount; j++) {            if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,                    REDIS_CLUSTER_NAMELEN) == 0) break;        }        if (j != gossipcount) continue;         /* Add it */         // 這個被選中節點有效,計數器減一        freshnodes--;         // 指向 gossip 資訊結構        gossip = &(hdr->data.ping.gossip[gossipcount]);         // 將被選中節點的名字記錄到 gossip 資訊        memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);        // 將被選中節點的 PING 命令傳送時間戳記錄到 gossip 資訊        gossip->ping_sent = htonl(this->ping_sent);        // 將被選中節點的 PING 命令回覆的時間戳記錄到 gossip 資訊        gossip->pong_received = htonl(this->pong_received);        // 將被選中節點的 IP 記錄到 gossip 資訊        memcpy(gossip->ip,this->ip,sizeof(this->ip));        // 將被選中節點的埠號記錄到 gossip 資訊        gossip->port = htons(this->port);        // 將被選中節點的標識值記錄到 gossip 資訊        gossip->flags = htons(this->flags);         // 這個被選中節點有效,計數器增一        gossipcount++;    }     // 計算資訊長度    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);    // 將被選中節點的數量(gossip 資訊中包含了多少個節點的資訊)    // 記錄在 count 屬性裡面    hdr->count = htons(gossipcount);    // 將資訊的長度記錄到資訊裡面    hdr->totlen = htonl(totlen);     // 傳送資訊    clusterSendMessage(link,buf,totlen);}
The End

作者:zthinker

出處:https://zthinker.com/archives/%E6%BC%AB%E8%B0%88gossip%E5%8D%8F%E8%AE%AE%E4%B8%8E%E5%85%B6%E5%9C%A8rediscluster%E4%B8%AD%E7%9A%84%E5%AE%9E%E7%8E%B0

9
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • GeoHash原理及redis geo相關操作