首頁>技術>

出處:https://mp.weixin.qq.com/s?__biz=Mzg5NjA1MjkxNw==&mid=2247493468&idx=1&sn=95dd5eab1158ee3b6bb232b03b8f7688

前言

SuperEdge 介紹

SuperEdge 是基於原生 Kubernetes 的邊緣容器管理系統。該系統把雲原生能力擴充套件到邊緣側,很好的實現了雲端對邊緣端的管理和控制,極大簡化了應用從雲端部署到邊緣端的過程。同時SuperEdge設計了分散式健康檢查機制規避了雲邊網路不穩定造成的大量pod遷移和重建,保證了服務的穩定。

SuperEdge 分散式健康檢查

邊緣計算場景下,邊緣節點與雲端的網路環境十分複雜,連線並不可靠,在原生 Kubernetes 叢集中,會造成 apiserver 和節點連線的中斷,節點狀態的異常,最終導致pod的驅逐和 endpoint 的缺失,造成服務的中斷和波動,具體來說原生 Kubernetes 處理如下:

失聯的節點被置為 ConditionUnknown 狀態,並被新增 NoSchedule 和 NoExecute 的 taints失聯的節點上的 pod 被驅逐,並在其他節點上進行重建失聯的節點上的 pod 從 Service 的 Endpoint 列表中移除

因此,邊緣計算場景僅僅依賴邊端和 apiserver 的連線情況是不足以判斷節點是否異常的,會因為網路的不可靠造成誤判,影響正常服務。而相較於雲端和邊緣端的連線,顯然邊端節點之間的連線更為穩定,具有更高的參考價值,因此 superedge 提出了邊緣分散式健康檢查機制。該機制中節點狀態判定除了要考慮 apiserver 的因素外,還引入了節點的評估因素,進而對節點進行更為全面的狀態判斷。透過這個功能,能夠避免由於雲邊網路不可靠造成的大量的 pod 遷移和重建,保證服務的穩定

具體來說,主要透過如下三個層面增強節點狀態判斷的準確性:

每個節點定期探測其他節點健康狀態叢集內所有節點定期投票決定各節點的狀態雲端和邊端節點共同決定節點狀態

而分散式健康檢查最終的判斷處理如下:

edge-health-daemon 原始碼分析

在深入原始碼之前先介紹一下分散式健康檢查的實現原理,其架構圖如下所示:

Kubernetes 每個 node 在 kube-node-lease namespace 下會對應一個 Lease object,kubelet 每隔 node-status-update-frequency 時間(預設10s)會更新對應node的 Lease object。

node-controller 會每隔 node-monitor-period 時間(預設5s)檢查 Lease object 是否更新,如果超過 node-monitor-grace-period 時間(預設40s)沒有發生過更新,則認為這個 node 不健康,會更新 NodeStatus(ConditionUnknown)

而當節點心跳超時(ConditionUnknown)之後,node controller 會給該 node 新增如下 taints:

spec:  ...  taints:  - effect: NoSchedule    key: node.kubernetes.io/unreachable    timeAdded: "2020-07-02T03:50:47Z"  - effect: NoExecute    key: node.kubernetes.io/unreachable    timeAdded: "2020-07-02T03:50:53Z"

同時,endpoint controller 會從 endpoint backend 中踢掉該母機上的所有 pod

對於打上 NoSchedule taint 的母機,Scheduler 不會排程新的負載在該 node 上了;而對於打上 NoExecute(node.kubernetes.io/unreachable) taint 的母機,node controller 會在節點心跳超時之後一段時間(預設5mins)驅逐該節點上的 pod。

分散式健康檢查邊端的 edge-health-daemon 元件會對同區域邊緣節點執行分散式健康檢查,並向 apiserver 傳送健康狀態投票結果(給 node 打 annotation)。

此外,為了實現在雲邊斷連且分散式健康檢查狀態正常的情況下:

失聯的節點上的 pod 不會從 Service 的 Endpoint 列表中移除失聯的節點上的 pod 不會被驅逐

還需要在雲端執行 edge-health-admission( Kubernetes mutating admission webhook [1] ),不斷根據 node edge-health annotation 調整 kube-controller-manager 設定的 node taint(去掉NoExecute taint)以及 endpoints (將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現雲端和邊端共同決定節點狀態。

本章將主要介紹 edge-health-daemon 原理,如下為 edge-health-daemon 的相關資料結構:

type EdgeHealthMetadata struct {    *NodeMetadata    *CheckMetadata}type NodeMetadata struct {    NodeList []v1.Node    sync.RWMutex}type CheckMetadata struct {    CheckInfo            map[string]map[string]CheckDetail // Checker ip:{Checked ip:Check detail}    CheckPluginScoreInfo map[string]map[string]float64     // Checked ip:{Plugin name:Check score}    sync.RWMutex}type CheckDetail struct {    Normal bool    Time   time.Time}type CommunInfo struct {    SourceIP    string                 // ClientIP,Checker ip    CheckDetail map[string]CheckDetail // Checked ip:Check detail    Hmac        string}

含義如下:

NodeMetadata:為了實現分割槽域分散式健康檢查機制而維護的邊緣節點 cache,其中包含該區域內的所有邊緣節點列表 NodeListCheckMetadata:存放健康檢查的結果,具體來說包括兩個資料結構:CheckPluginScoreInfo:為 Checked ip:{Plugin name:Check score} 組織形式。第一級 key 表示:被檢查的ip;第二級 key 表示:檢查外掛的名稱;value 表示:檢查分數CheckInfo:為 Checker ip:{Checked ip:Check detail} 組織形式。第一級key表示:執行檢查的ip;第二級key表示:被檢查的ip;value表示檢查結果 CheckDetailCheckDetail:代表健康檢查的結果Normal:Normal 為 true 表示檢查結果正常;false 表示異常Time:表示得出該結果時的時間,用於結果有效性的判斷(超過一段時間沒有更新的結果將無效)CommunInfo:邊緣節點向其它節點發送健康檢查結果時使用的資料,其中包括:SourceIP:表示執行檢查的ipCheckDetail:為 Checked ip:Check detail 組織形式,包含被檢查的ip以及檢查結果Hmac:SourceIP 以及 CheckDetail 進行 hmac 得到,用於邊緣節點通訊過程中判斷傳輸資料的有效性(是否被篡改)

edge-health-daemon 主體邏輯包括四部分功能:

SyncNodeList:根據邊緣節點所在的 zone 重新整理 node cache,同時更新 CheckMetadata相關資料ExecuteCheck:對每個邊緣節點執行若干種類的健康檢查外掛(ping,kubelet等),並將各外掛檢查分數彙總,根據使用者設定的基準線得出節點是否健康的結果Commun:將本節點對其它各節點健康檢查的結果傳送給其它節點Vote:對所有節點健康檢查的結果分類,如果某個節點被大多數(>1/2)節點判定為正常,則對該節點新增 superedgehealth/node-health:true annotation,表明該節點分散式健康檢查結果為正常;否則,對該節點新增 superedgehealth/node-health:false annotation,表明該節點分散式健康檢查結果為異常

下面依次對上述功能進行原始碼分析:

1、SyncNodeList

SyncNodeList 每隔 HealthCheckPeriod 秒 (health-check-period 選項)執行一次,會按照如下情況分類重新整理 node cache:

如果 kube-system namespace 下不存在名為 edge-health-zone-config的configmap,則沒有開啟多地域探測,因此會獲取所有邊緣節點列表並重新整理 node cache否則,如果 edge-health-zone-config 的 configmap 資料部分 TaintZoneAdmission 為 false,則沒有開啟多地域探測,因此會獲取所有邊緣節點列表並重新整理 node cache如果 TaintZoneAdmission 為 true,且 node 有"superedgehealth/topology-zone"標籤(標示區域),則獲取"superedgehealth/topology-zone" label value 相同的節點列表並重新整理 node cache如果 node 沒有"superedgehealth/topology-zone" label,則只會將邊緣節點本身新增到分散式健康檢查節點列表中並重新整理 node cache(only itself)
func (ehd *EdgeHealthDaemon) SyncNodeList() {    // Only sync nodes when self-located found    var host *v1.Node    if host = ehd.metadata.GetNodeByName(ehd.cfg.Node.HostName); host == nil {        klog.Errorf("Self-hostname %s not found", ehd.cfg.Node.HostName)        return    }    // Filter cloud nodes and retain edge ones    masterRequirement, err := labels.NewRequirement(common.MasterLabel, selection.DoesNotExist, []string{})    if err != nil {        klog.Errorf("New masterRequirement failed %+v", err)        return    }    masterSelector := labels.NewSelector()    masterSelector = masterSelector.Add(*masterRequirement)    if mrc, err := ehd.cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.TaintZoneConfigMap); err != nil {        if apierrors.IsNotFound(err) { // multi-region configmap not found            if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {                klog.Errorf("Multi-region configmap not found and get nodes err %+v", err)                return            } else {                ehd.metadata.SetByNodeList(NodeList)            }        } else {            klog.Errorf("Get multi-region configmap err %+v", err)            return        }    } else { // multi-region configmap found        mrcv := mrc.Data[common.TaintZoneConfigMapKey]        klog.V(4).Infof("Multi-region value is %s", mrcv)        if mrcv == "false" { // close multi-region check            if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {                klog.Errorf("Multi-region configmap exist but disabled and get nodes err %+v", err)                return            } else {                ehd.metadata.SetByNodeList(NodeList)            }        } else { // open multi-region check            if hostZone, existed := host.Labels[common.TopologyZone]; existed {                klog.V(4).Infof("Host %s has HostZone %s", host.Name, hostZone)                zoneRequirement, err := labels.NewRequirement(common.TopologyZone, selection.Equals, []string{hostZone})                if err != nil {                    klog.Errorf("New masterZoneRequirement failed: %+v", err)                    return                }                masterZoneSelector := labels.NewSelector()                masterZoneSelector = masterZoneSelector.Add(*masterRequirement, *zoneRequirement)                if nodeList, err := ehd.nodeLister.List(masterZoneSelector); err != nil {                    klog.Errorf("TopologyZone label for hostname %s but get nodes err: %+v", host.Name, err)                    return                } else {                    ehd.metadata.SetByNodeList(nodeList)                }            } else { // Only check itself if there is no TopologyZone label                klog.V(4).Infof("Only check itself since there is no TopologyZone label for hostname %s", host.Name)                ehd.metadata.SetByNodeList([]*v1.Node{host})            }        }    }    // Init check plugin score    ipList := make(map[string]struct{})    for _, node := range ehd.metadata.Copy() {        for _, addr := range node.Status.Addresses {            if addr.Type == v1.NodeInternalIP {                ipList[addr.Address] = struct{}{}                ehd.metadata.InitCheckPluginScore(addr.Address)            }        }    }    // Delete redundant check plugin score    for _, checkedIp := range ehd.metadata.CopyCheckedIp() {        if _, existed := ipList[checkedIp]; !existed {            ehd.metadata.DeleteCheckPluginScore(checkedIp)        }    }    // Delete redundant check info    for checkerIp := range ehd.metadata.CopyAll() {        if _, existed := ipList[checkerIp]; !existed {            ehd.metadata.DeleteByIp(ehd.cfg.Node.LocalIp, checkerIp)        }    }    klog.V(4).Infof("SyncNodeList check info %+v successfully", ehd.metadata)}...func (cm *CheckMetadata) DeleteByIp(localIp, ip string) {    cm.Lock()    defer cm.Unlock()    delete(cm.CheckInfo[localIp], ip)    delete(cm.CheckInfo, ip)}

在按照如上邏輯更新node cache 之後,會初始化CheckMetadata.CheckPluginScoreInfo,將節點ip賦值給 CheckPluginScoreInfo key( Checked ip :被檢查的ip)。

2、ExecuteCheck

ExecuteCheck 也是每隔 HealthCheckPeriod 秒(health-check-period選項)執行一次,會對每個邊緣節點執行若干種類的健康檢查外掛(ping,kubelet等),並將各外掛檢查分數彙總,根據使用者設定的基準線 HealthCheckScoreLine (health-check-scoreline 選項)得出節點是否健康的結果。

func (ehd *EdgeHealthDaemon) ExecuteCheck() {    util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {        ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)    })    klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)    for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {        totalScore := 0.0        for _, score := range pluginScores {            totalScore += score        }        if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})        } else {            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})        }    }    klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)}

這裡會呼叫 ParallelizeUntil 併發執行各檢查外掛,edge-health 目前支援 ping 以及 kubelet 兩種檢查外掛,在 checkplugin 目錄(github.com/superedge/superedge/pkg/edge-health/checkplugin),透過 Register 註冊到 PluginInfo 單例(plugin列表)中,如下:

// TODO: handle flag parse errorsfunc (pcp *PingCheckPlugin) Set(s string) error {    var err error    for _, para := range strings.Split(s, ",") {        if len(para) == 0 {            continue        }        arr := strings.Split(para, "=")        trimKey := strings.TrimSpace(arr[0])        switch trimKey {        case "timeout":            timeout, _ := strconv.Atoi(strings.TrimSpace(arr[1]))            pcp.HealthCheckoutTimeOut = timeout        case "retries":            retries, _ := strconv.Atoi(strings.TrimSpace(arr[1]))            pcp.HealthCheckRetries = retries        case "weight":            weight, _ := strconv.ParseFloat(strings.TrimSpace(arr[1]), 64)            pcp.Weight = weight        case "port":            port, _ := strconv.Atoi(strings.TrimSpace(arr[1]))            pcp.Port = port        }    }    PluginInfo = NewPlugin()    PluginInfo.Register(pcp)    return err}func (p *Plugin) Register(plugin CheckPlugin) {    p.Plugins = append(p.Plugins, plugin)    klog.V(4).Info("Register check plugin: %+v", plugin)}...var (    PluginOnce sync.Once    PluginInfo Plugin)type Plugin struct {    Plugins []CheckPlugin}func NewPlugin() Plugin {    PluginOnce.Do(func() {        PluginInfo = Plugin{            Plugins: []CheckPlugin{},        }    })    return PluginInfo}

每種外掛具體執行健康檢查的邏輯封裝在 CheckExecute 中,這裡以 ping plugin 為例:

// github.com/superedge/superedge/pkg/edge-health/checkplugin/pingcheck.gofunc (pcp *PingCheckPlugin) CheckExecute(checkMetadata *metadata.CheckMetadata) {    copyCheckedIp := checkMetadata.CopyCheckedIp()    util.ParallelizeUntil(context.TODO(), 16, len(copyCheckedIp), func(index int) {        checkedIp := copyCheckedIp[index]        var err error        for i := 0; i < pcp.HealthCheckRetries; i++ {            if _, err := net.DialTimeout("tcp", checkedIp+":"+strconv.Itoa(pcp.Port), time.Duration(pcp.HealthCheckoutTimeOut)*time.Second); err == nil {                break            }        }        if err == nil {            klog.V(4).Infof("Edge ping health check plugin %s for ip %s succeed", pcp.Name(), checkedIp)            checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMax)        } else {            klog.Warning("Edge ping health check plugin %s for ip %s failed, possible reason %s", pcp.Name(), checkedIp, err.Error())            checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMin)        }    })}// CheckPluginScoreInfo relevant functionsfunc (cm *CheckMetadata) SetByPluginScore(checkedIp, pluginName string, weight float64, score int) {    cm.Lock()    defer cm.Unlock()    if _, existed := cm.CheckPluginScoreInfo[checkedIp]; !existed {        cm.CheckPluginScoreInfo[checkedIp] = make(map[string]float64)    }    cm.CheckPluginScoreInfo[checkedIp][pluginName] = float64(score) * weight}

CheckExecute 會對同區域每個節點執行 ping 探測(net.DialTimeout),如果失敗,則給該節點打 CheckScoreMin 分(0);否則,打 CheckScoreMax 分(100)

每種檢查外掛會有一個 Weight 引數,表示了該檢查外掛分數的權重值,所有權重引數之和應該為1,對應基準分數線 HealthCheckScoreLine 範圍0-100。因此這裡在設定分數時,會乘以權重。

回到 ExecuteCheck 函式,在呼叫各外掛執行健康檢查得出權重分數(CheckPluginScoreInfo)後,還需要將該分數與基準線 HealthCheckScoreLine 對比:如果高於(>=)分數線,則認為該節點本次檢查正常;否則異常。

func (ehd *EdgeHealthDaemon) ExecuteCheck() {    util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {        ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)    })    klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)    for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {        totalScore := 0.0        for _, score := range pluginScores {            totalScore += score        }        if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})        } else {            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})        }    }    klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)}

3、Commun

在對同區域各邊緣節點執行健康檢查後,需要將檢查的結果傳遞給其它各節點,這也就是 commun 模組負責的事情:

func (ehd *EdgeHealthDaemon) Run(stopCh <-chan struct{}) {    // Execute edge health prepare and check    ehd.PrepareAndCheck(stopCh)    // Execute vote    vote := vote.NewVoteEdge(&ehd.cfg.Vote)    go vote.Vote(ehd.metadata, ehd.cfg.Kubeclient, ehd.cfg.Node.LocalIp, stopCh)    // Execute communication    communEdge := commun.NewCommunEdge(&ehd.cfg.Commun)    communEdge.Commun(ehd.metadata.CheckMetadata, ehd.cmLister, ehd.cfg.Node.LocalIp, stopCh)    <-stopCh}

既然是互相傳遞結果給其它節點,則必然會有接受和傳送模組:

func (c *CommunEdge) Commun(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string, stopCh <-chan struct{}) {    go c.communReceive(checkMetadata, cmLister, stopCh)    wait.Until(func() {        c.communSend(checkMetadata, cmLister, localIp)    }, time.Duration(c.CommunPeriod)*time.Second, stopCh)}

其中 communSend 負責向其它節點發送本節點對它們的檢查結果;而 communReceive 負責接受其它邊緣節點的檢查結果。下面依次分析:

func (c *CommunEdge) communSend(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string) {    copyLocalCheckDetail := checkMetadata.CopyLocal(localIp)    var checkedIps []string    for checkedIp := range copyLocalCheckDetail {        checkedIps = append(checkedIps, checkedIp)    }    util.ParallelizeUntil(context.TODO(), 16, len(checkedIps), func(index int) {        // Only send commun information to other edge nodes(excluding itself)        dstIp := checkedIps[index]        if dstIp == localIp {            return        }        // Send commun information        communInfo := metadata.CommunInfo{SourceIP: localIp, CheckDetail: copyLocalCheckDetail}        if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {            log.Errorf("communSend: generateHmac err %+v", err)            return        } else {            communInfo.Hmac = hmac        }        commonInfoBytes, err := json.Marshal(communInfo)        if err != nil {            log.Errorf("communSend: json.Marshal commun info err %+v", err)            return        }        commonInfoReader := bytes.NewReader(commonInfoBytes)        for i := 0; i < c.CommunRetries; i++ {            req, err := http.NewRequest("PUT", "http://"+dstIp+":"+strconv.Itoa(c.CommunServerPort)+"/result", commonInfoReader)            if err != nil {                log.Errorf("communSend: NewRequest for remote edge node %s err %+v", dstIp, err)                continue            }            if err = util.DoRequestAndDiscard(c.client, req); err != nil {                log.Errorf("communSend: DoRequestAndDiscard for remote edge node %s err %+v", dstIp, err)            } else {                log.V(4).Infof("communSend: put commun info %+v to remote edge node %s successfully", communInfo, dstIp)                break            }        }    })}

傳送邏輯如下:

構建 CommunInfo 結構體,包括:SourceIP:表示執行檢查的ipCheckDetail:為 Checked ip:Check detail 組織形式,包含被檢查的ip以及檢查結果呼叫 GenerateHmac 構建 Hmac:實際上是以 kube-system 下的 hmac-config configmap hmackey 欄位為 key,對 SourceIP 以及 CheckDetail進行 hmac 得到,用於判斷傳輸資料的有效性(是否被篡改)
func GenerateHmac(communInfo metadata.CommunInfo, cmLister corelisters.ConfigMapLister) (string, error) {    addrBytes, err := json.Marshal(communInfo.SourceIP)    if err != nil {        return "", err    }    detailBytes, _ := json.Marshal(communInfo.CheckDetail)    if err != nil {        return "", err    }    hmacBefore := string(addrBytes) + string(detailBytes)    if hmacConf, err := cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.HmacConfig); err != nil {        return "", err    } else {        return GetHmacCode(hmacBefore, hmacConf.Data[common.HmacKey])    }}func GetHmacCode(s, key string) (string, error) {    h := hmac.New(sha256.New, []byte(key))    if _, err := io.WriteString(h, s); err != nil {        return "", err    }    return fmt.Sprintf("%x", h.Sum(nil)), nil}
傳送上述構建的 CommunInfo 給其它邊緣節點 (DoRequestAndDiscard)

communReceive 邏輯也很清晰:

// TODO: support changeable server listen portfunc (c *CommunEdge) communReceive(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, stopCh <-chan struct{}) {    svr := &http.Server{Addr: ":" + strconv.Itoa(c.CommunServerPort)}    svr.ReadTimeout = time.Duration(c.CommunTimeout) * time.Second    svr.WriteTimeout = time.Duration(c.CommunTimeout) * time.Second    http.HandleFunc("/debug/flags/v", pkgutil.UpdateLogLevel)    http.HandleFunc("/result", func(w http.ResponseWriter, r *http.Request) {        var communInfo metadata.CommunInfo        if r.Body == nil {            http.Error(w, "Invalid commun information", http.StatusBadRequest)            return        }        err := json.NewDecoder(r.Body).Decode(&communInfo)        if err != nil {            http.Error(w, fmt.Sprintf("Invalid commun information %+v", err), http.StatusBadRequest)            return        }        log.V(4).Infof("Received common information from %s : %+v", communInfo.SourceIP, communInfo.CheckDetail)        if _, err := io.WriteString(w, "Received!\n"); err != nil {            log.Errorf("communReceive: send response err %+v", err)            http.Error(w, fmt.Sprintf("Send response err %+v", err), http.StatusInternalServerError)            return        }        if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {            log.Errorf("communReceive: server GenerateHmac err %+v", err)            http.Error(w, fmt.Sprintf("GenerateHmac err %+v", err), http.StatusInternalServerError)            return        } else {            if hmac != communInfo.Hmac {                log.Errorf("communReceive: Hmac not equal, hmac is %s but received commun info hmac is %s", hmac, communInfo.Hmac)                http.Error(w, "Hmac not match", http.StatusForbidden)                return            }        }        log.V(4).Infof("communReceive: Hmac match")        checkMetadata.SetByCommunInfo(communInfo)        log.V(4).Infof("After communicate, check info is %+v", checkMetadata.CheckInfo)    })    go func() {        if err := svr.ListenAndServe(); err != http.ErrServerClosed {            log.Fatalf("Server: exit with error %+v", err)        }    }()    for {        select {        case <-stopCh:            ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)            defer cancel()            if err := svr.Shutdown(ctx); err != nil {                log.Errorf("Server: program exit, server exit error %+v", err)            }            return        default:        }    }}

負責接受其它邊緣節點的檢查結果,並寫入自身檢查結果 CheckInfo,流程如下:

透過 /result 路由接受請求,並將請求內容解析成 CommunInfo對 CommunInfo 執行 GenerateHmac 獲取hmac值,並與 CommunInfo.Hmac 欄位進行對比,檢查接受資料的有效性最後將 CommunInfo 檢查結果寫入 CheckInfo,注意:CheckDetail.Time 設定為寫 入時的時間
// CheckInfo relevant functionsfunc (cm *CheckMetadata) SetByCommunInfo(c CommunInfo) {  cm.Lock()  defer cm.Unlock()  if _, existed := cm.CheckInfo[c.SourceIP]; !existed {      cm.CheckInfo[c.SourceIP] = make(map[string]CheckDetail)  }  for k, detail := range c.CheckDetail {      // Update time to local timestamp since different machines have different ones      detail.Time = time.Now()      c.CheckDetail[k] = detail  }  cm.CheckInfo[c.SourceIP] = c.CheckDetail}
最後在接受到 stopCh 訊號時,透過 svr.Shutdown 平滑關閉服務。

4、Vote

在接受到其它節點的健康檢查結果後,vote 模組會對結果進行統計得出最終判決,並向 apiserver 報告:

func (v *VoteEdge) Vote(edgeHealthMetadata *metadata.EdgeHealthMetadata, kubeclient clientset.Interface,    localIp string, stopCh <-chan struct{}) {    go wait.Until(func() {        v.vote(edgeHealthMetadata, kubeclient, localIp, stopCh)    }, time.Duration(v.VotePeriod)*time.Second, stopCh)}

首先根據檢查結果統計出狀態正常以及異常的節點列表:

type votePair struct {    pros int    cons int}...var (    prosVoteIpList, consVoteIpList []string    // Init votePair since cannot assign to struct field voteCountMap[checkedIp].pros in map    vp votePair)voteCountMap := make(map[string]votePair) // {"127.0.0.1":{"pros":1,"cons":2}}copyCheckInfo := edgeHealthMetadata.CopyAll()// Note that voteThreshold should be calculated by checked instead of checker// since checked represents the total valid edge health nodes while checker may contain partly ones.voteThreshold := (edgeHealthMetadata.GetCheckedIpLen() + 1) / 2for _, checkedDetails := range copyCheckInfo {    for checkedIp, checkedDetail := range checkedDetails {        if !time.Now().After(checkedDetail.Time.Add(time.Duration(v.VoteTimeout) * time.Second)) {            if _, existed := voteCountMap[checkedIp]; !existed {                voteCountMap[checkedIp] = votePair{0, 0}            }            vp = voteCountMap[checkedIp]            if checkedDetail.Normal {                vp.pros++                if vp.pros >= voteThreshold {                    prosVoteIpList = append(prosVoteIpList, checkedIp)                }            } else {                vp.cons++                if vp.cons >= voteThreshold {                    consVoteIpList = append(consVoteIpList, checkedIp)                }            }            voteCountMap[checkedIp] = vp        }    }}log.V(4).Infof("Vote: voteCountMap is %+v", voteCountMap)...

其中狀態判斷的邏輯如下:

如果超過一半(>)的節點對該節點的檢查結果為正常,則認為該節點狀態正常(注意時間差在 VoteTimeout 內)如果超過一半(>)的節點對該節點的檢查結果為異常,則認為該節點狀態異常(注意時間差在 VoteTimeout 內)除開上述情況,認為節點狀態判斷無效,對這些節點不做任何處理(可能存在腦裂的情況)

對狀態正常的節點做如下處理:

...// Handle prosVoteIpListutil.ParallelizeUntil(context.TODO(), 16, len(prosVoteIpList), func(index int) {    if node := edgeHealthMetadata.GetNodeByAddr(prosVoteIpList[index]); node != nil {        log.V(4).Infof("Vote: vote pros to edge node %s begin ...", node.Name)        nodeCopy := node.DeepCopy()        needUpdated := false        if nodeCopy.Annotations == nil {            nodeCopy.Annotations = map[string]string{                common.NodeHealthAnnotation: common.NodeHealthAnnotationPros,            }            needUpdated = true        } else {            if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {                if healthy != common.NodeHealthAnnotationPros {                    nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros                    needUpdated = true                }            } else {                nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros                needUpdated = true            }        }        if index, existed := admissionutil.TaintExistsPosition(nodeCopy.Spec.Taints, common.UnreachableNoExecuteTaint); existed {            nodeCopy.Spec.Taints = append(nodeCopy.Spec.Taints[:index], nodeCopy.Spec.Taints[index+1:]...)            needUpdated = true        }        if needUpdated {            if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {                log.Errorf("Vote: update pros vote to edge node %s error %+v ", nodeCopy.Name, err)            } else {                log.V(2).Infof("Vote: update pros vote to edge node %s successfully", nodeCopy.Name)            }        }    } else {        log.Warningf("Vote: edge node addr %s not found", prosVoteIpList[index])    }})...
新增或者更新"superedgehealth/node-health" annotation 值為"true",表明分散式健康檢查判斷該節點狀態正常。如果node存在 NoExecute(node.kubernetes.io/unreachable) taint,則將其去掉,並更新 node.

而對狀態異常的節點會新增或者更新"superedgehealth/node-health" annotation值為"false",表明分散式健康檢查判斷該節點狀態異常:

// Handle consVoteIpListutil.ParallelizeUntil(context.TODO(), 16, len(consVoteIpList), func(index int) {    if node := edgeHealthMetadata.GetNodeByAddr(consVoteIpList[index]); node != nil {        log.V(4).Infof("Vote: vote cons to edge node %s begin ...", node.Name)        nodeCopy := node.DeepCopy()        needUpdated := false        if nodeCopy.Annotations == nil {            nodeCopy.Annotations = map[string]string{                common.NodeHealthAnnotation: common.NodeHealthAnnotationCons,            }            needUpdated = true        } else {            if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {                if healthy != common.NodeHealthAnnotationCons {                    nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons                    needUpdated = true                }            } else {                nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons                needUpdated = true            }        }        if needUpdated {            if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {                log.Errorf("Vote: update cons vote to edge node %s error %+v ", nodeCopy.Name, err)            } else {                log.V(2).Infof("Vote: update cons vote to edge node %s successfully", nodeCopy.Name)            }        }    } else {        log.Warningf("Vote: edge node addr %s not found", consVoteIpList[index])    }})

在邊端 edge-health-daemon 向 apiserver 傳送節點健康結果後,雲端執行 edge-health-admission( Kubernetes mutating admission webhook [1] ),會不斷根據 node edge-health annotation 調整 kube-controller-manager 設定的 node taint(去掉NoExecute taint) 以及 endpoints(將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現即便雲邊斷連,但是分散式健康檢查狀態正常的情況下:

失聯的節點上的 pod 不會從 Service 的 Endpoint 列表中移除失聯的節點上的 pod 不會被驅逐

總結分散式健康檢查對於雲邊斷連情況的處理區別原生Kubernetes如下:原生Kubernetes:失聯的節點被置為ConditionUnknown狀態,並被新增NoSchedule和NoExecute的taints失聯的節點上的pod被驅逐,並在其他節點上進行重建失聯的節點上的pod從Service的Endpoint列表中移除分散式健康檢查:分散式健康檢查主要透過如下三個層面增強節點狀態判斷的準確性:每個節點定期探測其他節點健康狀態叢集內所有節點定期投票決定各節點的狀態雲端和邊端節點共同決定節點狀態分散式健康檢查功能由邊端的edge-health-daemon以及雲端的edge-health-admission組成,功能分別如下:edge-health-daemon:對同區域邊緣節點執行分散式健康檢查,並向apiserver傳送健康狀態投票結果(給node打annotation),主體邏輯包括四部分功能:SyncNodeList:根據邊緣節點所在的zone重新整理node cache,同時更新CheckMetadata相關資料ExecuteCheck:對每個邊緣節點執行若干種類的健康檢查外掛(ping,kubelet等),並將各外掛檢查分數彙總,根據使用者設定的基準線得出節點是否健康的結果Commun:將本節點對其它各節點健康檢查的結果傳送給其它節點Vote:對所有節點健康檢查的結果分類,如果某個節點被大多數(>1/2)節點判定為正常,則對該節點新增superedgehealth/node-health:true annotation,表明該節點分散式健康檢查結果為正常;否則,對該節點新增superedgehealth/node-health:false annotation,表明該節點分散式健康檢查結果為異常。edge-health-admission( Kubernetes mutating admission webhook ):不斷根據node edge-health annotation調整kube-controller-manager設定的node taint(去掉NoExecute taint)以及endpoints(將失聯節點上的pods從endpoint subsets notReadyAddresses移到addresses中),從而實現雲端和邊端共同決定節點狀態。

作者:杜楊浩,騰訊雲高階工程師,熱衷於開源、容器和Kubernetes。目前主要從事映象倉庫、Kubernetes叢集高可用&備份還原,以及邊緣計算相關研發工作。

出處:https://mp.weixin.qq.com/s?__biz=Mzg5NjA1MjkxNw==&mid=2247493468&idx=1&sn=95dd5eab1158ee3b6bb232b03b8f7688

14
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Go 去找個物件吧