首頁>技術>

一. Nacos介紹

再講Nacos之前,先來講一下服務註冊和發現。我們知道,現在微服務架構是目前開發的一個趨勢。服務消費者要去呼叫多個服務提供者組成的叢集。這裡需要做到以下幾點:

服務消費者需要在本地配置檔案中維護服務提供者叢集的每個節點的請求地址。服務提供者叢集中如果某個節點宕機,服務消費者的本地配置中需要同步刪除這個節點的請求地址,防止請求傳送到已經宕機的節點上造成請求失敗。

因此需要引入服務註冊中心,它具有以下幾個功能:

服務地址的管理。服務註冊。服務動態感知。

而Nacos致力於解決微服務中的統一配置,服務註冊和發現等問題。Nacos集成了註冊中心和配置中心。其相關特性包括:

服務發現和服務健康監測。

Nacos支援基於DNS和RPC的服務發現,即服務消費者可以使用DNS或者HTTP的方式來查詢和發現服務。Nacos提供對服務的實時的健康檢查,阻止向不健康的主機或者服務例項傳送請求。Nacos支援傳輸層(Ping/TCP)、應用層(HTTP、Mysql)的健康檢查。

動態配置服務。

動態配置服務可以以中心化、外部化和動態化的方式管理所有環境的應用配置和服務配置。

動態DNS服務。

支援權重路由,讓開發者更容易的實現中間層的負載均衡、更靈活的路由策略、流量控制以及DNS解析服務。

服務和元資料管理。

Nacos允許開發者從微服務平臺建設的視角來管理資料中心的所有服務和元資料。如:服務的生命週期、靜態依賴分析、服務的健康狀態、服務的流量管理、路由和安全策略等。

二. Nacos註冊中心實現原理分析2.1 Nacos架構圖

以下是Nacos的架構圖:

其中分為這麼幾個模組:

Provider APP:服務提供者。Consumer APP:服務消費者。Name Server:透過Virtual IP或者DNS的方式實現Nacos高可用叢集的服務路由。Nacos Server:Nacos服務提供者。

其中包含:OpenAPI:功能訪問入口。Config Service、Naming Service:Nacos提供的配置服務、名字服務模組。Consistency Protocol:一致性協議,用來實現Nacos叢集節點的資料同步,使用Raft演算法實現。

Nacos Console:Nacos控制檯。

小總結:

服務提供者透過VIP(Virtual IP)訪問Nacos Server高可用叢集,基於OpenAPI完成服務的註冊和服務的查詢。Nacos Server的底層則透過資料一致性演算法(Raft)來完成節點的資料同步。2.2 註冊中心的原理

這裡對其原理做一個大致的介紹,在後文則從原始碼角度進行分析。

首先,服務註冊的功能體現在:

服務例項啟動時註冊到服務登錄檔、關閉時則登出(服務註冊)。服務消費者可以透過查詢服務登錄檔來獲得可用的例項(服務發現)。服務註冊中心需要呼叫服務例項的健康檢查API來驗證其是否可以正確的處理請求(健康檢查)。

Nacos服務註冊和發現的實現原理的圖如下:

三. Nacos原始碼分析

前提(在本地或者虛機上先啟動好Nacos)這一部分從2個角度來講Nacos是如何實現的:

服務註冊。服務發現3.1 Nacos服務註冊

首先看下一個包:spring-cloud-commons

這個ServiceRegistry介面是SpringCloud提供的服務註冊的標準,整合到SpringCloud中實現服務註冊的元件,都需要實現這個介面。 來看下它的結構:

public interface ServiceRegistry<R extends Registration> {    void register(R registration);    void deregister(R registration);    void close();    void setStatus(R registration, String status);    <T> T getStatus(R registration);}1234567891011

那麼對於Nacos而言,該介面的實現類是NacosServiceRegistry,該類在這個pom包下:

再回過頭來看spring-cloud-commons包:

spring.factories主要是包含了自動裝配的配置資訊,如圖:

在我之前的文章裡我有提到過,在spring.factories中配置EnableAutoConfiguration的內容後,專案在啟動的時候,會匯入相應的自動配置類,那麼也就允許對該類的相關屬性進行一個自動裝配。那麼顯然,在這裡匯入了AutoServiceRegistrationAutoConfiguration這個類,而這個類顧名思義是服務註冊相關的配置類

該類的完整程式碼如下:

@Configuration(    proxyBeanMethods = false)@Import({AutoServiceRegistrationConfiguration.class})@ConditionalOnProperty(    value = {"spring.cloud.service-registry.auto-registration.enabled"},    matchIfMissing = true)public class AutoServiceRegistrationAutoConfiguration {    @Autowired(        required = false    )    private AutoServiceRegistration autoServiceRegistration;    @Autowired    private AutoServiceRegistrationProperties properties;    public AutoServiceRegistrationAutoConfiguration() {    }    @PostConstruct    protected void init() {        if (this.autoServiceRegistration == null && this.properties.isFailFast()) {            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");        }    }}1234567891011121314151617181920212223242526

這裡做一個分析,AutoServiceRegistrationAutoConfiguration中注入了AutoServiceRegistration例項,該類的關係圖如下:

我們先來看一下這個抽象類AbstractAutoServiceRegistration:

public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {	public void onApplicationEvent(WebServerInitializedEvent event) {	    this.bind(event);	}}1234567

這裡實現了ApplicationListener介面,並且傳入了WebServerInitializedEvent作為泛型,啥意思嘞,意思是:

NacosAutoServiceRegistration監聽WebServerInitializedEvent事件。也就是WebServer初始化完成後,會呼叫對應的事件繫結方法,呼叫onApplicationEvent(),該方法最終呼叫NacosServiceRegistry的register()方法(NacosServiceRegistry實現了Spring的一個服務註冊標準介面)。

對於register()方法,主要呼叫的是Nacos Client SDK中的NamingService下的registerInstance()方法完成服務的註冊

public void register(Registration registration) {    if (StringUtils.isEmpty(registration.getServiceId())) {        log.warn("No service to register for nacos client...");    } else {        String serviceId = registration.getServiceId();        String group = this.nacosDiscoveryProperties.getGroup();        Instance instance = this.getNacosInstanceFromRegistration(registration);        try {            this.namingService.registerInstance(serviceId, group, instance);            log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});        } catch (Exception var6) {            log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var6});            ReflectionUtils.rethrowRuntimeException(var6);        }    }}public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {    if (instance.isEphemeral()) {        BeatInfo beatInfo = new BeatInfo();        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));        beatInfo.setIp(instance.getIp());        beatInfo.setPort(instance.getPort());        beatInfo.setCluster(instance.getClusterName());        beatInfo.setWeight(instance.getWeight());        beatInfo.setMetadata(instance.getMetadata());        beatInfo.setScheduled(false);        long instanceInterval = instance.getInstanceHeartBeatInterval();        beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);        // 1.addBeatInfo()負責建立心跳資訊實現健康監測。因為Nacos Server必須要確保註冊的服務例項是健康的。        // 而心跳監測就是服務健康監測的一種手段。        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);    }	// 2.registerService()實現服務的註冊    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);}1234567891011121314151617181920212223242526272829303132333435363738

再來看一下心跳監測的方法addBeatInfo():

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);    String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());    BeatInfo existBeat = null;    if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {        existBeat.setStopped(true);    }    this.dom2Beat.put(key, beatInfo);    // 透過schedule()方法,定時的向服務端傳送一個數據包,然後啟動一個執行緒不斷地檢測服務端的迴應。    // 如果在指定的時間內沒有收到服務端的迴應,那麼認為伺服器出現了故障。    // 引數1:可以說是這個例項的相關資訊。    // 引數2:一個long型別的時間,代表從現在開始推遲執行的時間,預設是5000    // 引數3:時間的單位,預設是毫秒,結合5000即代表每5秒傳送一次心跳資料包    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);    MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());}1234567891011121314151617

心跳檢查如果正常,即代表這個需要註冊的服務是健康的,那麼執行下面的註冊方法registerInstance():

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {    LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});    Map<String, String> params = new HashMap(9);    params.put("namespaceId", this.namespaceId);    params.put("serviceName", serviceName);    params.put("groupName", groupName);    params.put("clusterName", instance.getClusterName());    params.put("ip", instance.getIp());    params.put("port", String.valueOf(instance.getPort()));    params.put("weight", String.valueOf(instance.getWeight()));    params.put("enable", String.valueOf(instance.isEnabled()));    params.put("healthy", String.valueOf(instance.isHealthy()));    params.put("ephemeral", String.valueOf(instance.isEphemeral()));    params.put("metadata", JSON.toJSONString(instance.getMetadata()));    // 這裡可以看出來,把上述服務例項的一些必要引數儲存到一個Map中,透過OpenAPI的方式傳送註冊請求    this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");}1234567891011121314151617

下面直接Debug走一遍。兩個前提(這裡不再展開):

啟動一個Nacos服務。搞一個Maven專案,整合Nacos。案例1:用Debug來理解Nacos服務註冊流程

1.專案初始化後,根據上文說法,會執行抽象類AbstractAutoServiceRegistration下面的onApplicationEvent()方法,即事件被監聽到。

2.作為抽象類的子類實現NacosAutoServiceRegistration,監聽到Web服務啟動後, 開始執行super.register()方法。

3.執行NacosServiceRegistry下的register()方法(super),前面說過,整合到SpringCloud中實現服務註冊的元件,都需要實現ServiceRegistry這個介面,而對於Nacos而言,NacosServiceRegistry就是具體的實現子類。執行註冊方法需要傳入的三個引數:

例項名稱serviceId。例項歸屬的組。具體例項

而registerInstance()主要做兩件事:

檢查服務的健康(this.beatReactor.addBeatInfo())。執行服務的註冊(this.serverProxy.registerService())。

服務健康的檢查:

檢查通過後,傳送OpenAPI進行服務的註冊:

服務註冊小總結☆:

這裡來做一個大框架式的梳理(也許前面寫的有點亂,這裡通過幾個問答的形式來進行總結)

問題1:Nacos的服務註冊為什麼和spring-cloud-commons這個包扯上關係?

回答:1.首先,Nacos的服務註冊肯定少不了pom包:spring-cloud-starter-alibaba-nacos-discovery吧。2.這個包下面包括了spring-cloud-commons包,那麼這個包有什麼用?3.spring-cloud-commons中有一個介面叫做ServiceRegistry,而整合到SpringCloud中實現服務註冊的元件,都需要實現這個介面。4.因此對於需要註冊到Nacos上的服務,也需要實現這個介面,那麼具體的實現子類為NacosServiceRegistry。

問題2:為什麼我的專案加了這幾個依賴,服務啟動時依舊沒有註冊到Nacos中?

回答:1.本文提到過,進行Nacos服務註冊的時候,會有一個事件的監聽過程,而監聽的物件是WebServer,因此,這個專案需要是一個Web專案!2.因此檢視你的pom檔案中是否有依賴:spring-boot-starter-web。

問題3:除此之外,spring-cloud-commons這個包還有什麼作用?

回答:1.這個包下的spring.factories檔案中,配置了相關的服務註冊的置類,即支援其自動裝配。2.這個配置類叫做AutoServiceRegistrationAutoConfiguration。其注入了類AutoServiceRegistration,而NacosAutoServiceRegistration是該類的一個具體實現。3.當WebServer初始化的時候,透過繫結的事件監聽器,會實現監聽,執行服務的註冊邏輯。

說白了:

第一件事情:引入一個Spring監聽器,當容器初始化後,執行Nacos服務的註冊。第二件事情:而Nacos服務註冊的方法的實現,其需要實現的介面來自於該包下的ServiceRegistry介面。

接下來就對Nacos註冊的流程進行一個總結:

服務(專案)啟動時,根據spring-cloud-commons中spring.factories的配置,自動裝配了類AutoServiceRegistrationAutoConfiguration。AutoServiceRegistrationAutoConfiguration類中注入了類AutoServiceRegistration,其最終實現子類實現了Spring的監聽器。根據監聽器,執行了服務註冊方法。而這個服務註冊方法則是呼叫了NacosServiceRegistry的register()方法。該方法主要呼叫的是Nacos Client SDK中的NamingService下的registerInstance()方法完成服務的註冊。registerInstance()方法主要做兩件事:服務例項的健康監測和例項的註冊。透過schedule()方法定時的傳送資料包,檢測例項的健康。若健康監測透過,呼叫registerService()方法,透過OpenAPI方式執行服務註冊,其中將例項Instance的相關資訊儲存到HashMap中。3.2 Nacos服務發現

有一點我們需要清楚:Nacos服務的發現發生在什麼時候。例如:微服務發生遠端介面呼叫的時候。一般我們在使用OpenFeign進行遠端介面呼叫時,都需要用到對應的微服務名稱,而這個名稱就是用來進行服務發現的。

舉個例子:

@FeignClient("test-application")public interface MyFeignService {    @RequestMapping("getInfoById")    R info(@PathVariable("id") Long id);}12345

接下來直接開始講重點,Nacos在進行服務發現的時候,會呼叫NacosServerList類下的getServers()方法:

public class NacosServerList extends AbstractServerList<NacosServer> {	private List<NacosServer> getServers() {        try {            String group = this.discoveryProperties.getGroup();            // 1.透過唯一的serviceId(一般是服務名稱)和組來獲得對應的所有例項。            List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(this.serviceId, group, true);            // 2.將List<Instance>轉換成List<NacosServer>資料,然後返回。            return this.instancesToServerList(instances);        } catch (Exception var3) {            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + this.serviceId, var3);        }    }}12345678910111213

接下來來看一下NacosNamingService.selectInstances()方法:

public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {   return this.selectInstances(serviceName, groupName, healthy, true);}123

該方法最終會呼叫到其過載方法

public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, 		boolean healthy, boolean subscribe) throws NacosException {	// 儲存服務例項資訊的物件    ServiceInfo serviceInfo;    // 如果該消費者訂閱了這個服務,那麼會在本地維護一個服務列表,服務從本地獲取    if (subscribe) {        serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));    } else {    // 否則例項會從服務中心進行獲取。        serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));    }    return this.selectInstances(serviceInfo, healthy);}1234567891011121314

這裡應該重點關注this.hostReactor這個物件,它裡面比較重要的是幾個Map型別的儲存結構:

public class HostReactor {    private static final long DEFAULT_DELAY = 1000L;    private static final long UPDATE_HOLD_INTERVAL = 5000L;    // 存放執行緒非同步呼叫的一個回撥結果    private final Map<String, ScheduledFuture<?>> futureMap;    // 本地已存在的服務列表,key是服務名稱,value是ServiceInfo    private Map<String, ServiceInfo> serviceInfoMap;    // 待更新的例項列表    private Map<String, Object> updatingMap;    // 定時任務(負責服務列表的實時更新)    private ScheduledExecutorService executor;    ....}12345678910111213

再看一看它的getServiceInfo()方法:

public ServiceInfo getServiceInfo(String serviceName, String clusters) {    LogUtils.NAMING_LOGGER.debug("failover-mode: " + this.failoverReactor.isFailoverSwitch());    String key = ServiceInfo.getKey(serviceName, clusters);    if (this.failoverReactor.isFailoverSwitch()) {        return this.failoverReactor.getService(key);    } else {    	// 1.先透過serverName即服務名獲得一個serviceInfo        ServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);        // 如果沒有serviceInfo,則透過傳進來的引數new出一個新的serviceInfo物件,並且同時維護到本地Map和更新Map        // 這裡是serviceInfoMap和updatingMap        if (null == serviceObj) {            serviceObj = new ServiceInfo(serviceName, clusters);            this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);            this.updatingMap.put(serviceName, new Object());            // 2.updateServiceNow(),立刻去Nacos服務端拉去資料。            this.updateServiceNow(serviceName, clusters);            this.updatingMap.remove(serviceName);        } else if (this.updatingMap.containsKey(serviceName)) {            synchronized(serviceObj) {                try {                    serviceObj.wait(5000L);                } catch (InterruptedException var8) {                    LogUtils.NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, var8);                }            }        }		// 3.定時更新例項資訊        this.scheduleUpdateIfAbsent(serviceName, clusters);        // 最後返回服務例項資料(前面已經進行了更新)        return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());    }}1234567891011121314151617181920212223242526272829303132

來看下scheduleUpdateIfAbsent()方法:

// 透過心跳的方式,每10秒去更新一次資料,並不是只有在呼叫服務的時候才會進行更新,而是透過定時任務來非同步進行。public void scheduleUpdateIfAbsent(String serviceName, String clusters) {    if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {        synchronized(this.futureMap) {            if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {            	// 建立一個UpdateTask的更新執行緒任務,每10秒去非同步更新集合資料                ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters));                this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);            }        }    }}123456789101112
案例2:用Debug來理解Nacos服務發現流程

1.進行遠端介面呼叫,觸發服務的發現,呼叫NacosServerList的getServers()方法。傳入的serviceId和對應Feign介面上的介面@FeignClient中的名稱一致。

例如,我這裡呼叫的Feign介面是:

@FeignClient("gulimall-member")public interface MemberFeignService {    @RequestMapping("/member/member/info/{id}")    R info(@PathVariable("id") Long id);}12345

這裡可以看出來,返回的是一個Instance型別的List,對應的服務也發現並返回了。

2.這裡則呼叫了NacosNamingService的selectInstances()方法,我這裡的subscribe值是true,即代表我這個消費者直接訂閱了這個服務,因此最終的資訊是從本地Map中獲取,即Nacos維護了一個註冊列表。

3.再看下HostReactor的getServiceInfo()方法:最終所需要的結果是從serviceInfoMap中獲取,並且透過多個Map進行維護服務例項,若存在資料的變化,還會透過強制睡眠5秒鐘的方式來等待資料的更新。

4.無論怎樣都會呼叫this.scheduleUpdateIfAbsent(serviceName, clusters)方法:

5.透過scheduleUpdateIfAbsent()方法定時的獲取實時的例項資料,並且負責維護本地的服務註冊列表,若服務發生更新,則更新本地的服務資料。

服務發現小總結☆:

經常有人說過,Nacos有個好處,就是當一個服務掛了之後,短時間內不會造成影響,因為有個本地註冊列表,在服務不更新的情況下,服務還能夠正常的運轉,其原因如下:

Nacos的服務發現,一般是透過訂閱的形式來獲取服務資料。而透過訂閱的方式,則是從本地的服務註冊列表中獲取(可以理解為快取)。相反,如果不訂閱,那麼服務的資訊將會從Nacos服務端獲取,這時候就需要對應的服務是健康的。(宕機就不能使用了)在程式碼設計上,透過Map來存放例項資料,key為例項名稱,value為例項的相關資訊資料(ServiceInfo物件)。

最後,服務發現的流程就是:

以呼叫遠端介面(OpenFeign)為例,當執行遠端呼叫時,需要經過服務發現的過程。服務發現先執行NacosServerList類中的getServers()方法,根據遠端呼叫介面上@FeignClient中的屬性作為serviceId傳入NacosNamingService.selectInstances()方法中進行呼叫。根據subscribe的值來決定服務是從本地註冊列表中獲取還是從Nacos服務端中獲取。以本地註冊列表為例,透過呼叫HostReactor.getServiceInfo()來獲取服務的資訊(serviceInfo),Nacos本地註冊列表由3個Map來共同維護。

本地Map–>serviceInfoMap,更新Map–>updatingMap非同步更新結果Map–>futureMap,最終的結果從serviceInfoMap當中獲取。

HostReactor類中的getServiceInfo()方法透過this.scheduleUpdateIfAbsent() 方法和updateServiceNow()方法實現服務的定時更新和立刻更新。而對於scheduleUpdateIfAbsent()方法,則透過執行緒池來進行非同步的更新,將回調的結果(Future)儲存到futureMap中,並且發生提交執行緒任務時,還負責更新本地註冊列表中的資料。看完三件事❤️

如果你覺得這篇內容對你還蠻有幫助,我想邀請你幫我三個小忙:

11
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 每次10分鐘跟我學Python(第二十八次課)