在 Kubernetes 中通過 Ingress 來暴露服務到叢集外部,這個已經是一個很普遍的方式了,而真正扮演請求轉發的角色是背後的 Ingress Controller,比如我們經常使用的 traefik、ingress-nginx 等就是一個 Ingress Controller。本文我們將通過 golang 來實現一個簡單的自定義的 Ingress Controller,可以加深我們對 Ingress 的理解。
概述
我們在 Kubernetes 叢集上往往會執行很多無狀態的 Web 應用,一般來說這些應用是通過一個 Deployment 和一個對應的 Service 組成,比如我們在叢集上執行一個 whoami 的應用,對應的資源清單如下所示:(whoami.yaml)
apiVersion: apps/v1kind: Deploymentmetadata: name: whoami labels: app: whoamispec: replicas: 1 selector: matchLabels: app: whoami template: metadata: labels: app: whoami spec: containers: - name: whoami image: cnych/whoami ports: - containerPort: 80---kind: ServiceapiVersion: v1metadata: name: whoamispec: selector: app: whoami ports: - protocol: TCP port: 80 targetPort: 80
可以直接使用上面的資源清單部署該應用:
$ kubectl apply -f whoami.yaml
通過部署該應用,在 Kubernetes 叢集內部我們可以通過地址 whoami.default.svc.cluster.local 來訪問該 Web 應用,但是在叢集外部的使用者應該如何來訪問呢?當然我們可以使用 NodePort 型別的 Service 來進行訪問,但是當我們應用越來越多的時候埠的管理也是一個很大的問題,所以一般情況下不採用該方式,之前我們的方法是用 DaemonSet 在每個邊緣節點上執行一個 Nginx 應用:
spec: hostNetwork: true containers: - image: nginx:1.15.3-alpine name: nginx ports: - name: http containerPort: 80 hostPort: 80
通過設定 hostNetwork:true,容器將繫結節點的80埠,而不僅僅是容器,這樣我們就可以通過節點的公共 IP 地址的 80 埠訪問到 Nginx 應用了。這種方法理論上肯定是有效的,但是有一個最大的問題就是需要建立一個 Nginx 配置檔案,如果應用有變更,還需要手動修改配置,不能自動發現和熱更新,這對於大量的應用維護的成本顯然太大。這個時候我們就可以用另外一個 Kubernetes 提供的方案了:Ingress。
Ingress 物件
Kubernetes 內建就支援通過 Ingress 物件將外部的域名對映到叢集內部服務,我們可以通過如下的 Ingress 物件來對外暴露服務:
apiVersion: extensions/v1beta1kind: Ingressmetadata: name: whoamispec: tls: - hosts: - "*.qikqiak.com" secretName: qikqiak-tls rules: - host: who.qikqiak.com http: paths: - path: / backend: serviceName: whoami servicePort: 80
該資源清單聲明了如何將 HTTP 請求路由到後端服務:
任何到域名 who.qikqiak.com 的請求都將被路由到 whoami 服務後面的 Pod 列表中去。如果是 HTTPS 請求,並且域名匹配 *.qikqiak.com,則對請求使用 qikqiak-tls 這個證書。這個配置顯然比我們取手動維護 Nginx 的配置要方便太多了,完全就是自動化的。
Ingress Controllers
上面我們宣告的 Ingress 物件,只是一個叢集的資源物件而已,並不會去真正處理我們的請求,這個時候我們還必須安裝一個 Ingress Controller,該控制器負責讀取 Ingress 物件的規則並進行真正的請求處理,簡單來說就是 Ingress 物件只是一個宣告,Ingress Controllers 就是真正的實現。
對於 Ingress Controller 有很多種選擇,比如我們前面文章大量提到的 traefik、或者 ingress-nginx 等等,我們可以根據自己的需求選擇合適的 Ingress Controller 安裝即可。
但是實際上,自定義一個 Ingress Controller 也是非常簡單的(當然要支援各種請求特性就需要大量的工作了)。
自定義 Ingress Controller
這裡我們將用 Golang 來自定義一個簡單的 Ingress Controller,自定義的控制器主要需要實現以下幾個功能:
通過 Kubernetes API 查詢和監聽 Service、Ingress 以及 Secret 這些物件載入 TLS 證書用於 HTTPS 請求根據載入的 Kubernetes 資料構造一個用於 HTTP 服務的路由,當然該路由需要非常高效,因為所有傳入的流量都將通過該路由在 80 和 443 埠上監聽傳入的 HTTP 請求,然後根據路由查詢對應的後端服務,然後代理請求和響應。443 埠將使用 TLS 證書進行安全連線。下面我們將來依次介紹上面的實現。
Kubernetes 物件查詢
我們可以通過一個 rest 配置然後呼叫 NewForConfig 來建立一個 Kubernetes 客戶端,由於我們要通過叢集內部的 Service 進行服務的訪問,所以不能在叢集外部使用,所以不能使用 kubeconfig 的方式來獲取 Config:
config, err := rest.InClusterConfig()if err != nil { log.Fatal().Err(err).Msg("get kubernetes configuration failed")}client, err := kubernetes.NewForConfig(config)if err != nil { log.Fatal().Err(er).Msg("create kubernetes client failed")}
然後我們建立一個 Watcher 和 Payload,Watcher 是來負責查詢 Kubernetes 和建立 Payloads 的,Payloads 包含了滿足 HTTP 請求所需要的所有的 Kubernetes 資料:
// 通過 Watcher 載入的 Kubernetes 的資料集合。type Payload struct { Ingresses []IngressPayload TLSCertificates map[string]*tls.Certificate}// 一個 IngressPayload 是一個 Ingress 加上他的服務埠。type IngressPayload struct { Ingress *extensionsv1beta1.Ingress ServicePorts map[string]map[string]int}
另外需要注意除了埠外,Ingress 還可以通過埠名稱來引用後端服務的埠,所以我們可以通過查詢相應的 Service 來填充該資料。
Watcher 主要用來監聽 Ingress、Service、Secret 的變化:
// 在 Kubernetes 叢集中監聽 Ingress 物件的 Watchertype Watcher struct { client kubernetes.Interface onChange func(*Payload)}
只要我們檢測到某些變化,就會呼叫 onChange 函式。為了實現上面的監聽功能,我們需要使用 k8s.io/client-go/informers 這個包,該包提供了一種型別安全、高效的機制來查詢、list 和 watch Kubernetes 物件,我們只需要為需要的每個物件建立一個 SharedInformerFactory 以及 Listers 即可:
func (w *Watcher) Run(ctx context.Context) error { factory := informers.NewSharedInformerFactory(w.client, time.Minute) secretLister := factory.Core().V1().Secrets().Lister() serviceLister := factory.Core().V1().Services().Lister() ingressLister := factory.Extensions().V1beta1().Ingresses().Lister() ...}
然後定義一個 onChange 的本地函式,該函式在檢測到變更時隨時呼叫。我們這裡在每種型別的變更時每次都從頭開始重新構建所有的內容,暫時還未考慮效能問題。因為 Watcher 和 HTTP 處理程式都在不同的 goroutine 中執行,所以我們基本上可以構建一個有效的負載,而不會影響任何正在進行的請求,當然這是一種簡單粗暴的做法。
我們可以通過從 listing ingresses 物件開始:
ingresses, err := ingressLister.List(labels.Everything())if err != nil { log.Error().Err(err).Msg("failed to list ingresses") return}
對於每個 ingress 物件,如果有 TLS 規則,則從 secrets 物件中載入證書:
for _, rec := range ingress.Spec.TLS { if rec.SecretName != "" { secret, err := secretLister.Secrets(ingress.Namespace).Get(rec.SecretName) if err != nil { log.Error().Err(err).Str("namespace", ingress.Namespace).Str("name", rec.SecretName).Msg("unknown secret") continue } cert, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"]) if err != nil { log.Error().Err(err).Str("namespace", ingress.Namespace).Str("name", rec.SecretName).Msg("invalid tls certificate") continue } payload.TLSCertificates[rec.SecretName] = &cert }}
Go 語言已經內建了一些和加密相關的包,可以很簡單的處理 TLS 證書,對於實際的 HTTP 規則,這裡我們添加了一個 addBackend 的輔助函式:
addBackend := func(ingressPayload *IngressPayload, backend extensionsv1beta1.IngressBackend) { svc, err := serviceLister.Services(ingressPayload.Ingress.Namespace).Get(backend.ServiceName) if err != nil { log.Error().Err(err).Str("namespace", ingressPayload.Ingress.Namespace).Str("name", backend.ServiceName).Msg("unknown service") } else { m := make(map[string]int) for _, port := range svc.Spec.Ports { m[port.Name] = int(port.Port) } ingressPayload.ServicePorts[svc.Name] = m }}
每個 HTTP 規則和可選的預設規則都會呼叫該方法:
if ingress.Spec.Backend != nil { addBackend(&ingressPayload, *ingress.Spec.Backend)}for _, rule := range ingress.Spec.Rules { if rule.HTTP != nil { continue } for _, path := range rule.HTTP.Paths { addBackend(&ingressPayload, path.Backend) }}
然後呼叫 onChange 回撥:
w.onChange(payload)
每當發生更改時,都會呼叫本地 onChange 函式,最後一步就是啟動我們的 informers:
var wg sync.WaitGroupwg.Add(1)go func() { informer := factory.Core().V1().Secrets().Informer() informer.AddEventHandler(handler) informer.Run(ctx.Done()) wg.Done()}()wg.Add(1)go func() { informer := factory.Extensions().V1beta1().Ingresses().Informer() informer.AddEventHandler(handler) informer.Run(ctx.Done()) wg.Done()}()wg.Add(1)go func() { informer := factory.Core().V1().Services().Informer() informer.AddEventHandler(handler) informer.Run(ctx.Done()) wg.Done()}()wg.Wait()
我們這裡每個 informer 都使用同一個 handler:
debounced := debounce.New(time.Second)handler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { debounced(onChange) }, UpdateFunc: func(oldObj, newObj interface{}) { debounced(onChange) }, DeleteFunc: func(obj interface{}) { debounced(onChange) },}
Debouncing(防抖動) 是一種避免事件重複的方法,我們設定一個小的延遲,如果在達到延遲之前發生了其他事件,則重啟計時器。
路由表
路由表的目標是通過預先計算大部分查詢相關資訊來提高查詢效率,這裡我們就需要使用一些高效的資料結構來進行儲存,由於在叢集中有大量的路由規則,所以要實現對映查詢既高效又容易理解的最簡單的方法我們能想到的就是使用 Map,Map 可以為我們提供 O(1) 效率的查詢,我們這裡使用 Map 進行初始化查詢,如果在後面找到了多個規則,則使用切片來儲存這些規則。
一個路由表由兩個 Map 構成,一個是根據域名對映的證書,一個就是根據域名對映的後端路由表:
type RoutingTable struct { certificatesByHost map[string]map[string]*tls.Certificate backendsByHost map[string][]routingTableBackend}// NewRoutingTable 建立一個新的路由表func NewRoutingTable(payload *watcher.Payload) *RoutingTable { rt := &RoutingTable{ certificatesByHost: make(map[string]map[string]*tls.Certificate), backendsByHost: make(map[string][]routingTableBackend), } rt.init(payload) return rt}
此外路由表下面還有兩個主要的方法:
// GetCertificate 獲得一個證書func (rt *RoutingTable) GetCertificate(sni string) (*tls.Certificate, error) { hostCerts, ok := rt.certificatesByHost[sni] if ok { for h, cert := range hostCerts { if rt.matches(sni, h) { return cert, nil } } } return nil, errors.New("certificate not found")}// GetBackend 通過給定的 host 和 path 獲取後端程式func (rt *RoutingTable) GetBackend(host, path string) (*url.URL, error) { // strip the port if idx := strings.IndexByte(host, ':'); idx > 0 { host = host[:idx] } backends := rt.backendsByHost[host] for _, backend := range backends { if backend.matches(path) { return backend.url, nil } } return nil, errors.New("backend not found")}
其中 GetCertificate 來獲取用於安全連線的 TLS 證書。HTTP 處理程式使用 GetBackend 將請求代理到後端,對於 TLS 證書,我們還有一個 matches 方法來處理萬用字元證書:
func (rt *RoutingTable) matches(sni string, certHost string) bool { for strings.HasPrefix(certHost, "*.") { if idx := strings.IndexByte(sni, '.'); idx >= 0 { sni = sni[idx+1:] } else { return false } certHost = certHost[2:] } return sni == certHost}
其實對於後端應用來說,matches 方法實際上就是一個正則表示式匹配(因為 Ingress 物件的 path 欄位定義的是一個正則表示式):
type routingTableBackend struct { pathRE *regexp.Regexp url *url.URL}func (rtb routingTableBackend) matches(path string) bool { if rtb.pathRE == nil { return true } return rtb.pathRE.MatchString(path)}
HTTP Server
最後我們需要來實現一個 HTTP Server,用來接收網路入口的請求。首先定義一個私有的 config 結構體:
type config struct { host string port int tlsPort int}
定義一個 Option 型別:
// config 的修改器type Option func(*config)
定義一個設定 Option 的函式:
// WithHost 設定 host 繫結到 config 上。func WithHost(host string) Option { return func(cfg *config) { cfg.host = host }}
服務的結構體和構造器如下所示:
// 代理 HTTP 請求type Server struct { cfg *config routingTable atomic.Value ready *Event}// New 建立一個新的服務func New(options ...Option) *Server { cfg := defaultConfig() for _, o := range options { o(cfg) } s := &Server{ cfg: cfg, ready: NewEvent(), } s.routingTable.Store(NewRoutingTable(nil)) return s}
通過使用一個合適的預設值,上面的初始化方法可以使大多數客戶端使用變得非常容易,同時還可以根據需要進行靈活的更改,這種 API 方法在 Go 語言中是非常普遍的,有很多實際示例,比如 gRPC 的 Dail 方法。
除了配置之外,我們的伺服器還有指向路由表的指標和一個就緒的事件,用於在第一次設定 payload 時發出訊號。但是需要注意的是,我們這裡使用的是 atomic.Value 來儲存路由表,這是為什麼呢?
由於這裡我們的應用不是執行緒安全的,如果在 HTTP 處理程式嘗試讀取路由表的同時對其進行了修改,則可能導致狀態錯亂或者程式崩潰。所以,我們需要防止同時讀取和寫入這個共享的資料結構,當然有多種方法可以實現該需求:
第一種就是我們這裡使用的 atomic.Value,該型別提供了一個 Load 和 Store 的方法,可以允許我們自動讀取/寫入該值。由於我們在每次更改時都會重新構建路由表,所以我們可以在一次操作中安全地交換新舊路由表,這和文件中的 ReadMostly 示例非常相似:不過這種方法的一個缺點是必須在執行時宣告儲存的值型別:
s.routingTable.Load().(*RoutingTable).GetBackend(r.Host, r.URL.Path)另外我們也可以使用 Mutext 或 RWMutex 來控制對關鍵區域程式碼的訪問:
// 讀s.mu.RLock()backendURL, err := s.routingTable.GetBackend(r.Host, r.URL.Path)s.mu.RUnlock()// 寫rt := NewRoutingTable(payload)s.mu.Lock()s.routingTable = rts.mu.Unlock()還有一種方法就是讓路由表本身成為執行緒安全的,使用 sync.Map 來代替 Map 並新增方法來動態更新路由表。一般來說,我會避免使用這種方法,它使程式碼更難於理解和維護了,而且如果你實際上最終沒有多個 goroutine 訪問資料結構的話,就會增加不必要的開銷了。
真正的處理服務的 ServeHTTP 方法如下所示:
// ServeHTTP 處理 HTTP 請求func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // 根據請求的域名和 Path 路徑獲取背後真實的後端地址 backendURL, err := s.routingTable.Load().(*RoutingTable).GetBackend(r.Host, r.URL.Path) if err != nil { http.Error(w, "upstream server not found", http.StatusNotFound) return } log.Info().Str("host", r.Host).Str("path", r.URL.Path).Str("backend", backendURL.String()).Msg("proxying request") // 對後端真實 URL 發起代理請求 p := httputil.NewSingleHostReverseProxy(backendURL) p.ErrorLog = stdlog.New(log.Logger, "", 0) p.ServeHTTP(w, r)}
這裡我們使用了 httputil 這個包,該包具有反向代理的一些實現方法,我們可以將其用於 HTTP 服務,它可以將請求轉發到指定的 URL 上,然後將響應傳送回客戶端。
Main 函式
將所有元件組合在一起,然後通過 main 方法提供入口,我們這裡使用 flag 包來提供一些命令列引數:
func main() { flag.StringVar(&host, "host", "0.0.0.0", "the host to bind") flag.IntVar(&port, "port", 80, "the insecure http port") flag.IntVar(&tlsPort, "tls-port", 443, "the secure https port") flag.Parse() client, err := kubernetes.NewForConfig(getKubernetesConfig()) if err != nil { log.Fatal().Err(err).Msg("failed to create kubernetes client") } s := server.New(server.WithHost(host), server.WithPort(port), server.WithTLSPort(tlsPort)) w := watcher.New(client, func(payload *watcher.Payload) { s.Update(payload) }) var eg errgroup.Group eg.Go(func() error { return s.Run(context.TODO()) }) eg.Go(func() error { return w.Run(context.TODO()) }) if err := eg.Wait(); err != nil { log.Fatal().Err(err).Send() }}
Kubernetes 配置
有了伺服器程式碼,現在我們就可以在 Kubernetes 上用 DaemonSet 控制器來執行我們的 Ingress Controller:(k8s-ingress-controller.yaml)
apiVersion: v1kind: ServiceAccountmetadata: name: k8s-simple-ingress-controller namespace: default---kind: ClusterRoleapiVersion: rbac.authorization.k8s.io/v1beta1metadata: name: k8s-simple-ingress-controllerrules: - apiGroups: - "" resources: - services - endpoints - secrets verbs: - get - list - watch - apiGroups: - extensions resources: - ingresses verbs: - get - list - watch---kind: ClusterRoleBindingapiVersion: rbac.authorization.k8s.io/v1beta1metadata: name: k8s-simple-ingress-controllerroleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: k8s-simple-ingress-controllersubjects:- kind: ServiceAccount name: k8s-simple-ingress-controller namespace: default---apiVersion: extensions/v1beta1kind: DaemonSetmetadata: name: k8s-simple-ingress-controller labels: app: ingress-controllerspec: selector: matchLabels: app: ingress-controller template: metadata: labels: app: ingress-controller spec: hostNetwork: true dnsPolicy: ClusterFirstWithHostNet serviceAccountName: k8s-simple-ingress-controller containers: - name: k8s-simple-ingress-controller image: cnych/k8s-simple-ingress-controller:v0.1 ports: - name: http containerPort: 80 - name: https containerPort: 443
由於我們要在應用中監聽 Ingress、Service、Secret 這些資源物件,所以需要宣告對應的 RBAC 許可權,這樣當我們的請求到達 Ingress Controller 的節點後,然後根據 Ingress 物件的規則,將請求轉發到對應的 Service 上就完成了服務暴露的整個過程。
直接建立上面我們自定義的 Ingress Controller 的資源清單:
$ kubectl apply -f k8s-ingress-controller.yaml$ kubectl get pods -l app=ingress-controllerNAME READY STATUS RESTARTS AGEk8s-simple-ingress-controller-694df987c7-h2qlc 1/1 Running 0 7m59s
然後為我們最開始的 whoami 服務建立一個 Ingress 物件:(whoami-ingress.yaml)
apiVersion: extensions/v1beta1kind: Ingressmetadata: name: whoamispec: rules: - host: who.qikqiak.com http: paths: - path: / backend: serviceName: whoami servicePort: 80
$ kubectl apply -f whoami-ingress.yaml
後將域名 who.qikqiak.com 解析到我們部署的 Ingress Controller 的 Pod 節點上,就可以直接訪問了:
k8s simple ingress controller demo
$ kubectl logs -f k8s-simple-ingress-controller-694df987c7-h2qlc5:37AM INF starting secure HTTP server addr=0.0.0.0:4435:37AM INF starting insecure HTTP server addr=0.0.0.0:805:39AM INF proxying request backend=http://whoami:80 host=who.qikqiak.com path=/
到這裡我們就完成了自定義一個簡單的 Ingress Controller,當然這只是一個最基礎的功能,在實際使用中還會有更多的需求,比如 TCP 的支援、對請求進行一些修改之類的,這就需要花更多的時間去實現了。
本文相關程式碼都整理到了 GitHub 上,地址:https://github.com/cnych/kubernetes-simple-ingress-controller。
參考連結
https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/http://www.doxsey.net/blog/how-to-build-a-custom-kubernetes-ingress-controller-in-go