首頁>技術>
前言當配置中心的內容發生變更時,客戶端是如何獲取到最新內容的?監聽資料變更的 Long-Polling 長輪詢是如何實現的?在客戶端叢集模式中,如何做到只更改某一臺客戶端的配置內容?當 Nacos 掛掉後,客戶端還可以獲取資料嗎?簡介

動態配置服務是 Nacos 其中的關鍵特性之一,動態配置服務可以讓您以中心化、外部化和動態化的方式管理所有環境的應用配置和服務配置。動態配置消除了配置變更時重新部署應用和服務的需要,讓配置管理變得更加高效和敏捷。配置中心化管理讓實現無狀態服務變得更簡單,讓服務按需彈性擴充套件變得更容易。

我們可以透過 Nacos 控制檯來發布配置,也可以使用 Nacos 提供的 REST 介面來發布配置。

釋出配置

curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test&content=HelloWorld"

獲取配置

curl -X GET "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test"

Nacos 配置中心分為服務端和客戶端,服務端提供 REST 介面查詢、更改配置,客戶端SDK透過封裝了服務端的 REST 介面來獲取配置

客戶端實現原理

使用SDK獲取配置demo

String serverAddr = "{serverAddr}";String dataId = "{dataId}";String group = "{group}";Properties properties = new Properties();properties.put("serverAddr", serverAddr);//建立ConfigServiceConfigService configService = NacosFactory.createConfigService(properties);//獲取dataId的配置String content = configService.getConfig(dataId, group, 5000);System.out.println(content);//動態監聽配置,當dataId資料發生變更時會呼叫receiveConfigInfo方法configService.addListener(dataId, group, new Listener() {    @Override    public void receiveConfigInfo(String configInfo) {        System.out.println("recieve1:" + configInfo);    }    @Override    public Executor getExecutor() {        return null;    }});

demo 中首先獲取了 dataId 的配置內容,並且為該內容添加了監聽器,當dataId內容發生變化時會回撥 receiveConfigInfo 方法獲取最新的內容

獲取配置原理解析

獲取配置的主要方法是 NacosConfigService 類的 getConfigInner 方法,該方法優先從本地檔案中獲取配置,如果沒有本地檔案,則透過 HTTP REST 介面從服務端獲取配置並將配置儲存到本地快照檔案中,如果從服務端獲取配置失敗,則會從快照檔案中獲取配置。

@Overridepublic String getConfig(String dataId, String group, long timeoutMs) throws NacosException {    return getConfigInner(namespace, dataId, group, timeoutMs);}

ConfigService 的 getConfig 方法呼叫的是 getConfigInner 方法

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {    group = null2defaultGroup(group);    ParamUtils.checkKeyParam(dataId, group);    ConfigResponse cr = new ConfigResponse();    cr.setDataId(dataId);    cr.setTenant(tenant);    cr.setGroup(group);    // 1 優先使用本地配置    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);    if (content != null) {        LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),                    dataId, group, tenant, ContentUtils.truncateContent(content));        cr.setContent(content);        configFilterChainManager.doFilter(null, cr);        content = cr.getContent();        return content;    }    //2 如果沒有本地配置,則獲取伺服器中的配置    try {        String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);        cr.setContent(ct[0]);        configFilterChainManager.doFilter(null, cr);        content = cr.getContent();        return content;    } catch (NacosException ioe) {        if (NacosException.NO_RIGHT == ioe.getErrCode()) {            throw ioe;        }        LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",                    agent.getName(), dataId, group, tenant, ioe.toString());    }    //3 如果獲取伺服器配置失敗,則獲取本地的快照資料    LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),                dataId, group, tenant, ContentUtils.truncateContent(content));    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);    cr.setContent(content);    configFilterChainManager.doFilter(null, cr);    content = cr.getContent();    return content;}

getConfigInner 方法優先從本地檔案獲取配置,本地檔案預設是不存在的,因此如果想用本地配置覆蓋遠端配置只需要在本地新建配置檔案即可,Nacos 會優先使用本地檔案,本地檔案配置的路徑為:

/{user.home}/nacos/config/fixed-{serverName}/data/config-data/{group}/{dataId}

如果沒有本地配置,則呼叫 REST 介面從伺服器中獲取配置,呼叫介面為:

/v1/cs/configs?dataId={dataId}&group={group}

如果從伺服器獲取配置失敗,則從本地快照資料中獲取,每次從伺服器獲取資料時都會更新本地快照資料,快照檔案的路徑為:

/{user.home}/nacos/config/fixed-{serverName}/snapshot/{group}/{dataId}

getConfig 方法只是獲取一次配置檔案內容,當配置發生變更後還需要透過上面新增的監聽器來獲得最新的配置

監聽配置原理解析

當透過 addListener 註冊了監聽器後,NacosConfigService 類會使用 ClientWorker 類的 checkConfigInfo 方法建立 LongPollingRunnable 長輪詢執行緒去監聽服務端的配置,預設3000個數據為一組建立一個 LongPollingRunnable 執行緒,長輪詢連線預設超時時間為30秒,在30秒內如果監聽的資料有任何變化會立即返回最新的資料,如果30秒內資料沒有任何變化,則會結束當前的監聽並開啟下一輪監聽。

在 NacosConfigService 類初始化時建立了 ClientWorke r物件,ClientWorker 負責獲取 Nacos 的配置以及建立長輪詢連線監聽Client中所有使用過的配置。

this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties)

NacosConfigService 中初始化 ClientWorker 物件,在 ClientWorker 構造方法中開啟了以10毫秒的間隔去建立一個預設超時事件為30秒的長輪詢連線去監聽本地資料的變化,當資料發生變化時則更新本地資料,否則繼續監聽。

 this.executor.scheduleWithFixedDelay(new Runnable() {     public void run() {         try {             ClientWorker.this.checkConfigInfo();         } catch (Throwable var2) {             ClientWorker.LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", var2);         }     } }, 1L, 10L, TimeUnit.MILLISECONDS);

建立監聽執行緒監聽配置變化

 public void checkConfigInfo() {     // Dispatch taskes.     int listenerSize = cacheMap.get().size();     //把本地資料進行分組,預設每3000個數據為一組,每組會開啟一個長輪詢監聽執行緒     int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());     if (longingTaskCount > currentLongingTaskCount) {         for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {             // 開啟長輪詢執行緒             executorService.execute(new LongPollingRunnable(i));         }         currentLongingTaskCount = longingTaskCount;     } }

LongPollingRunnable 是長輪詢執行緒

 // 使用長輪詢獲取變更的資料列表List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);for (String groupKey : changedGroupKeys) {    //…………    //透過變更的資料key呼叫REST介面獲取最新的資料內容    String[] ct = getServerConfig(dataId, group, tenant, 3000L);    CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));    cache.setContent(ct[0]);    //…………}//開啟下一輪30秒長輪詢監聽executorService.execute(this);

LongPollingRunnable 執行緒首先透過 checkUpdateDataIds 中的長輪詢連線監聽資料,如果資料有變更則更新本地資料,否則開啟下一輪的監聽 checkUpdateDataIds方 法呼叫的是 checkUpdateConfigStr 開啟長輪詢監聽

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {    Map<String, String> params = new HashMap<String, String>(2);    //監聽配置的    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);    Map<String, String> headers = new HashMap<String, String>(2);    //長輪詢超時時間 預設30秒    headers.put("Long-Pulling-Timeout", "" + timeout);   //………………    try {        //呼叫 Nacos服務端的 /listener 介面開始監聽配置變化        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);        HttpRestResult<String> result = agent            .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),                      readTimeoutMs);        //返回監聽的內容,如果配置發生了變化那麼result就是最新的資料,如果配置沒有發生變化那麼result=null        if (result.ok()) {            setHealthServer(true);            return parseUpdateDataIdResponse(result.getData());        } else {            setHealthServer(false);            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),                         result.getCode());        }    } catch (Exception e) {        setHealthServer(false);        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);        throw e;    }    return Collections.emptyList();}

checkUpdateConfigStr 方法發起長輪詢連線來監聽 Nacos 的配置是否有變化,如果在30秒內配置發生了變化則會立即返回新的資料,如果在30秒內沒有任何資料變化,則會返回 NULL,同時會開啟下一輪30秒的監聽。

服務端實現原理

ConfigController 類提供了釋出&獲取配置的 REST 介面,我們分別看下釋出配置獲取配置的實現原理。

釋出配置原理解析

釋出配置時會先把資料持久化到儲存引擎上,一般是mysql或者是Nacos內建的derby資料庫,完成資料持久化之後會將資料變更包裝成 ConfigDataChangeEvent 事件,透過 NotifyCenter.publishEvent 向外廣播資料變更事件,所有訂閱了 ConfigDataChangeEvent 事件的消費方會收到資料變更事件。

ConfigDataChangeEvent 事件訂閱者在收到事件訊息後,會先透過 HTTP REST 介面通知 Nacos 叢集中的所有機器,叢集在接收到通知後會先更新本地的記憶體資料,然後將資料變更事件包裝成 LocalDataChangeEvent 事件透過 NotifyCenter.publishEvent 向外廣播本地資料變更事件,所有訂閱了 LocalDataChangeEvent 事件的消費方會收到資料變更事件。

LocalDataChangeEvent 事件訂閱者在收到事件訊息後,會建立一個執行緒來遍歷所有的客戶端長輪詢連線監聽的資料是否包含此次事件中的變更資料,如果變更的資料有客戶端正在監聽,則直接透過長連線把資料返回給客戶端。

釋出配置的 REST 介面為 ConfigController 中的 publishConfig 方法

@PostMapping@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,                             @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,                             @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,                             @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,                             @RequestParam(value = "appName", required = false) String appName,                             @RequestParam(value = "src_user", required = false) String srcUser,                             @RequestParam(value = "config_tags", required = false) String configTags,                             @RequestParam(value = "desc", required = false) String desc,                             @RequestParam(value = "use", required = false) String use,                             @RequestParam(value = "effect", required = false) String effect,                             @RequestParam(value = "type", required = false) String type,                             @RequestParam(value = "schema", required = false) String schema) throws NacosException {    /*    * 省略    */    final String srcIp = RequestUtil.getRemoteIp(request);    final String requestIpApp = RequestUtil.getAppName(request);    /*    * 省略    */    final Timestamp time = TimeUtils.getCurrentTime();    String betaIps = request.getHeader("betaIps");    ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);    configInfo.setType(type);        //更新持久化資料    persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);    //廣播資料變更事件    ConfigChangePublisher        .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));        return true;}

publishConfig 方法先將資料進行持久化,然後將資料變更包裝成 ConfigDataChangeEvent 事件透過 NotifyCenter.publishEvent 向外廣播,ConfigDataChangeEvent 事件的訊息訂閱類是 AsyncNotifyService,它在構造方法中呼叫 NotifyCenter.registerSubscriber 註冊了 Subscriber 事件處理類,因此onEvent呼叫的是 AsyncNotifyService.onEvent 方法

public AsyncNotifyService(ServerMemberManager memberManager) {    this.memberManager = memberManager;    // 訂閱事件型別    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);    // 訂閱事件處理類    NotifyCenter.registerSubscriber(new Subscriber() {        @Override        public void onEvent(Event event) {            // 只處理事件為ConfigDataChangeEvent的類            if (event instanceof ConfigDataChangeEvent) {                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;                long dumpTs = evt.lastModifiedTs;                String dataId = evt.dataId;                String group = evt.group;                String tenant = evt.tenant;                String tag = evt.tag;                //Nacos叢集列表,因為我是單機執行模式,所以ipList是本機節點                Collection<Member> ipList = memberManager.allMembers();                // 將ConfigDataChangeEvent事件轉換為NotifySingleTask任務並將任務放入到佇列中                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();                // 每個叢集都建立一個NotifySingleTask任務                for (Member member : ipList) {                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),                                                   evt.isBeta));                }                // 將佇列資料保證成AsyncTask物件並使用執行緒池執行AsyncTask的run方法                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));            }        }    });}

AsyncNotifyService 的 onEvent 方法在接收到事件資料後將資料包裝成 AsyncTask 任務,並使用執行緒池處理 AsyncTask,如果 Nacos 叢集存在多個N個節點,則相應建立N個 AsyncTask 任務

class AsyncTask implements Runnable {    @Override    public void run() {        executeAsyncInvoke();    }    private void executeAsyncInvoke() {        while (!queue.isEmpty()) {            NotifySingleTask task = queue.poll();            String targetIp = task.getTargetIP();            if (memberManager.hasMember(targetIp)) {                // 檢查叢集中當前服務是否健康,如果服務是下線狀態則延時執行任務                boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);                if (unHealthNeedDelay) {                    ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,                                                      task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,                                                      0, task.target);                    // 後臺延時執行任務                    asyncTaskExecute(task);                } else {                    Header header = Header.newInstance();                    header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));                    header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());                    if (task.isBeta) {                        header.addParam("isBeta", "true");                    }                    //呼叫叢集的dataChange REST介面                    //task.url = http://127.0.0.1:8848/nacos/v1/cs/communication/dataChange?dataId=nacos.cfg.dataId.1&group=test                    restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));                }            }        }    }}

AsyncTask 會根據當前叢集節點的健康狀態來延時或者直接呼叫叢集節點的 /nacos/v1/cs/communication/dataChange REST介面來更新每個叢集中的記憶體資料

@GetMapping("/dataChange")public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,                                @RequestParam("group") String group,                                @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,                                @RequestParam(value = "tag", required = false) String tag) {    dataId = dataId.trim();    group = group.trim();    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);    String isBetaStr = request.getHeader("isBeta");    //呼叫dump方法更新叢集節點的    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);    } else {        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);    }    return true;}

dataChange REST 介面中呼叫了 dumpService.dump 方法來更新節點的記憶體資料

public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {    String groupKey = GroupKey2.getKey(dataId, group, tenant);    //將資料包裝成DumpTask任務並加入到dumpTaskMgr任務列表中    dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));}

dump 方法將資料包裝成 DumpTask 任務並加入到 dumpTaskMgr 任務列表中,由父類 NacosDelayTaskExecuteEngine 的 processingExecutor 執行緒池按照100毫秒的間隔執行 processTasks 方法

protected void processTasks() {    Collection<Object> keys = getAllTaskKeys();    for (Object taskKey : keys) {        //刪除task        AbstractDelayTask task = removeTask(taskKey);        if (null == task) {            continue;        }        //獲取task物件應的Processor        NacosTaskProcessor processor = getProcessor(taskKey);        if (null == processor) {            getEngineLog().error("processor not found for task, so discarded. " + task);            continue;        }        try {            //如果執行失敗 加入重試任務            if (!processor.process(task)) {                retryFailedTask(taskKey, task);            }        } catch (Throwable e) {            getEngineLog().error("Nacos task execute error : " + e.toString(), e);            retryFailedTask(taskKey, task);        }    }}

processTasks 每100毫秒會執行一次,獲取 Task 對應的 Processor 處理類並執行其 process 方法,如果執行失敗則重新加入到任務佇列中,其中預設的 Processor 為 DumpProcessor,因此 processor.process 呼叫的是 DumpProcessor.process 方法

public class DumpProcessor implements NacosTaskProcessor {        public DumpProcessor(DumpService dumpService) {        this.dumpService = dumpService;    }        @Override    public boolean process(NacosTask task) {        final PersistService persistService = dumpService.getPersistService();        DumpTask dumpTask = (DumpTask) task;        String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());        String dataId = pair[0];        String group = pair[1];        String tenant = pair[2];        long lastModified = dumpTask.getLastModified();        String handleIp = dumpTask.getHandleIp();        boolean isBeta = dumpTask.isBeta();        String tag = dumpTask.getTag();        ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)            .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);           /*           *  省略           */                    //根據dataId、group、tenant從儲存上查詢最新資料            ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);            build.remove(Objects.isNull(cf));            //最新的Content資料            build.content(Objects.isNull(cf) ? null : cf.getContent());            build.type(Objects.isNull(cf) ? null : cf.getType());            return DumpConfigHandler.configDump(build.build());                    }    }    final DumpService dumpService;}

在 DumpProcessor 的 process 方法會從儲存上(mysql、derby)中查詢最新的資料,然後呼叫 dump 方法更新記憶體中的md5值

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,            String type) {    String groupKey = GroupKey2.getKey(dataId, group, tenant);    //獲取key對應的快取物件    CacheItem ci = makeSure(groupKey);    ci.setType(type);    //寫鎖    final int lockResult = tryWriteLock(groupKey);    assert (lockResult != 0);    //加鎖失敗直接返回    if (lockResult < 0) {        DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);        return false;    }    try {        //更新後配置內容的md5值        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);        //新md5和記憶體中的舊md5值如果一致 則不執行saveToDisk的判斷        if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {            DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "                          + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),                          lastModifiedTs);        }         //如果儲存在本地檔案,則儲存到磁碟上        else if (!PropertyUtil.isDirectRead()) {            DiskUtil.saveToDisk(dataId, group, tenant, content);        }        //更新md5值        updateMd5(groupKey, md5, lastModifiedTs);        return true;    } catch (IOException ioe) {        DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);        if (ioe.getMessage() != null) {            String errMsg = ioe.getMessage();            if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg                .contains(DISK_QUATA_EN)) {                // Protect from disk full.                FATAL_LOG.error("磁碟滿自殺退出", ioe);                System.exit(0);            }        }        return false;    } finally {        releaseWriteLock(groupKey);    }}

dump 方法將更新配置的 md5 值

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {    CacheItem cache = makeSure(groupKey);    if (cache.md5 == null || !cache.md5.equals(md5)) {        cache.md5 = md5;        cache.lastModifiedTs = lastModifiedTs;        NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));    }}

updateMd5 方法將更新的資料包裝成 LocalDataChangeEvent 事件並向事件訂閱者廣播,LocalDataChangeEvent 事件的訂閱者是 LongPollingService,因此會呼叫 LongPollingService.onEvent 方法

public LongPollingService() {    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);    // 註冊訂閱事件型別    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);    // 註冊訂閱事件處理類    NotifyCenter.registerSubscriber(new Subscriber() {        @Override        public void onEvent(Event event) {            if (isFixedPolling()) {                // Ignore.            } else {                if (event instanceof LocalDataChangeEvent) {                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;                    //將事件資料包裝成DataChangeTask任務並使用執行緒池執行                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));                }            }        }    });}

LongPollingService 的構造方法中註冊了 LocalDataChangeEvent 事件,並將事件包裝成 DataChangeTask 交給 LongPolling 執行緒池處理

/**  * allSubs儲存了與client端的所有長連結列表  */final Queue<ClientLongPolling> allSubs;class DataChangeTask implements Runnable {            @Override    public void run() {        try {            ConfigCacheService.getContentBetaMd5(groupKey);            //迴圈所有client長連線            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {                ClientLongPolling clientSub = iter.next();                //判斷是監聽列表中是否存在當前groupKey                if (clientSub.clientMd5Map.containsKey(groupKey)) {                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {                        continue;                    }                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {                        continue;                    }                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());                                        //刪除監聽關係                    iter.remove();                     LogUtil.CLIENT_LOG                        .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",                              RequestUtil                              .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),                              "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);                                        //向被監聽的client長連線傳送結果                    clientSub.sendResponse(Arrays.asList(groupKey));                }            }        } catch (Throwable t) {            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));        }    }}void sendResponse(List<String> changedGroups) {     // Cancel time out task.    if (null != asyncTimeoutFuture) {        asyncTimeoutFuture.cancel(false);    }    //生成響應內容    generateResponse(changedGroups);}void generateResponse(List<String> changedGroups) {    //如果沒有變更資料 結束Tomcat的非同步請求    if (null == changedGroups) { // Tell web container to send http response.        asyncContext.complete();        return;    }    HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();    try {        final String respString = MD5Util.compareMd5ResultString(changedGroups);        // 禁用快取        response.setHeader("Pragma", "no-cache");        response.setDateHeader("Expires", 0);        response.setHeader("Cache-Control", "no-cache,no-store");        response.setStatus(HttpServletResponse.SC_OK);        //返回變更的groupKey        response.getWriter().println(respString);        //結束Tomcat的非同步請求        asyncContext.complete();    } catch (Exception ex) {        PULL_LOG.error(ex.toString(), ex);        asyncContext.complete();    }}

DataChangeTask 每次執行時會在所有的 Queue<ClientLongPolling> allSubs 長連線列表中查詢有監聽當前資料變更的 Client,並將變更資料推送給 Client,同時結束 Client 長連線輪詢連線。

以上是整個釋出配置的流程,程式碼比較長,需要仔細閱讀,Long-Polling 長輪詢也是 Nacos 比較核心的特性。

獲取配置原理解析

Nacos 獲取配置有2種方法,客戶端可以透過 HTTP GET 獲取一次配置內容,另外一種是客戶端透過 HTTP GET /listener 長輪詢的方式監聽某個配置,當服務端配置發生變化時會將最新的配置推送給客戶端。

1 短連接獲取一次資料

根據 dataId、group 等引數獲取 Nacos 中的內容,在 Nacos 中每個配置項都是一個 CacheItem 物件,每個 CacheItem 物件中都包含一把讀寫鎖,當客戶端來讀取資料時,先根據 dataId 等引數獲取 CacheItem,如果 CacheItem 不存在,則返回404,如果 CacheItem 存在會對其加read鎖,如果加鎖失敗,則會重試,超過最大重試次數10次後仍然失敗的,則返回409,如果加鎖成功,則從資料庫中讀取資料返回給客戶端,最後釋放鎖。

HTTP GET 獲取一次配置的REST介面為 getConfig 方法

@GetMapping@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void getConfig(HttpServletRequest request, HttpServletResponse response,                      @RequestParam("dataId") String dataId, @RequestParam("group") String group,                      @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,                      @RequestParam(value = "tag", required = false) String tag)    throws IOException, ServletException, NacosException {    // 檢查多租戶引數    ParamUtils.checkTenant(tenant);    tenant = NamespaceUtil.processNamespaceParameter(tenant);    // 驗證引數    ParamUtils.checkParam(dataId, group, "datumId", "content");    ParamUtils.checkParam(tag);    final String clientIp = RequestUtil.getRemoteIp(request);    //呼叫doGetConfig獲取配置    inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);}

getConfig 方法是獲取配置的 REST 介面,呼叫了 doGetConfig 方法

public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,            String tenant, String tag, String clientIp) throws IOException, ServletException {        //將引數拼接成組合成groupKey字串        final String groupKey = GroupKey2.getKey(dataId, group, tenant);        String autoTag = request.getHeader("Vipserver-Tag");        String requestIpApp = RequestUtil.getAppName(request);            //對groupKey加讀鎖,Nacos會把所有的配置資料dump到記憶體中做快取,每個快取資料物件中都會包含一把讀寫鎖        //lockResult = 0 記憶體中不存在groupKey對應的資料        //lockResult = 1 加鎖成功        //lockResult = -1 加鎖失敗        int lockResult = tryConfigReadLock(groupKey);        final String requestIp = RequestUtil.getRemoteIp(request);        boolean isBeta = false;        //加鎖成功        if (lockResult > 0) {                       /**            *程式碼太長,省略非核心邏輯            */                        String md5 = Constants.NULL;            long lastModified = 0L;            //從快取中獲取CacheItem物件            CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);            //配置內容的md5            md5 = cacheItem.getMd5();            //配置內容最後修改時間            lastModified = cacheItem.getLastModifiedTs();            //如果是單機模式,直接從持久化的資料來源讀取資料,如mysql、derby,否則從檔案系統讀取資料            if (PropertyUtil.isDirectRead()) {                configInfoBase = persistService.findConfigInfo(dataId, group, tenant);            } else {                file = DiskUtil.targetFile(dataId, group, tenant);            }            //容錯處理            //如果持久化資料來源和檔案都不存在資料返回資料不存在            if (configInfoBase == null && fileNotExist(file)) {                // FIXME CacheItem                // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.                ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,                                                ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp);                response.setStatus(HttpServletResponse.SC_NOT_FOUND);                response.getWriter().println("config data not exist");                return HttpServletResponse.SC_NOT_FOUND + "";            }                      /**            *程式碼太長,省略非核心邏輯            */            // 禁用快取.            response.setHeader("Pragma", "no-cache");            response.setDateHeader("Expires", 0);            response.setHeader("Cache-Control", "no-cache,no-store");            if (PropertyUtil.isDirectRead()) {                response.setDateHeader("Last-Modified", lastModified);            } else {                fis = new FileInputStream(file);                response.setDateHeader("Last-Modified", file.lastModified());            }            //返回資料            if (PropertyUtil.isDirectRead()) {                out = response.getWriter();                out.print(configInfoBase.getContent());                out.flush();                out.close();            } else {                fis.getChannel()                    .transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));            }                        }         //加鎖資料 返回資料不存在        else if (lockResult == 0) {            // FIXME CacheItem No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.            ConfigTraceService                    .logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1,                            requestIp);            response.setStatus(HttpServletResponse.SC_NOT_FOUND);            response.getWriter().println("config data not exist");            return HttpServletResponse.SC_NOT_FOUND + "";                    }         //在嘗試了10次加鎖後失敗,返回資源衝突         else {            PULL_LOG.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);            response.setStatus(HttpServletResponse.SC_CONFLICT);            response.getWriter().println("requested file is being modified, please try later.");            return HttpServletResponse.SC_CONFLICT + "";        }        return HttpServletResponse.SC_OK + "";}

doGetConfig 方法對 CacheItem 加讀鎖成功後從持久化資料層或者檔案中讀取配置內容,doGetConfig 獲取配置內容的邏輯比較簡單

2 長連線監聽資料變更

根據客戶端傳入的 probeModify 監聽的資料列表,先判斷客戶端是否支援長輪詢,如果客戶端支援長輪詢,則開啟長輪詢連線,如果客戶端不支援,則檢測被監聽資料 probeModify 中的資料在服務端是否存在變更,如果有直接返回最新的資料。

服務端長輪詢是使用的一個延時執行緒 ClientLongPolling 實現的,用以阻塞客戶端的連線,並且將執行緒 ClientLongPolling 加入到 Queue<ClientLongPolling> allSubs 儲存了起來,執行緒預設延時時間為客戶端傳入的 Long-Pulling-Timeout 減去0.5秒,因此延時時間一般是29.5秒,在29.5秒後延時執行緒會直接返回 NULL,由客戶端發起下一次長輪詢請求,直接返回 NULL 的原因是因為如果在這29.5秒中如果被監聽的 probeModify 資料發生了變化,會在釋出配置時建立的 DataChangeTask 執行緒中會從 Queue<ClientLongPolling> allSubs 延時執行緒列表中找到響應的 ClientLongPolling 執行緒,將執行緒取消,同時將最新的資料透過 ClientLongPolling 中儲存的 AsyncContext 物件將資料推送給客戶端,因為如果 ClientLongPolling 執行緒在29.5秒後執行了,說明在這期間沒有資料變更,因此直接返回NULL。

HTTP Long-Polling 長輪詢監聽介面為 listener 方法

@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response)    throws ServletException, IOException {    //設定非同步引數    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);    //獲取監聽的dataId    String probeModify = request.getParameter("Listening-Configs");    if (StringUtils.isBlank(probeModify)) {        throw new IllegalArgumentException("invalid probeModify");    }    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);    Map<String, String> clientMd5Map;    try {        clientMd5Map = MD5Util.getClientMd5Map(probeModify);    } catch (Throwable e) {        throw new IllegalArgumentException("invalid probeModify");    }    // 開始長輪詢監聽    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}

org.apache.catalina.ASYNC_SUPPORTED 是 Servlet3.0 的新特性,支援非同步處理請求

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,            Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {            // 如果客戶端支援長輪詢,將開啟長輪詢監聽資料變更    if (LongPollingService.isSupportLongPolling(request)) {        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);        return HttpServletResponse.SC_OK + "";    }    // 如果客戶端不支援長輪詢,則直接查詢probeModify中修改的資料並返回結果    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);    String oldResult = MD5Util.compareMd5OldResult(changedGroups);    String newResult = MD5Util.compareMd5ResultString(changedGroups);    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);    if (version == null) {        version = "2.0.0";    }    int versionNum = Protocol.getVersionNumber(version);    // 2.0.4之前的版本將新老MD5值放入header中    if (versionNum < START_LONG_POLLING_VERSION_NUM) {        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);    } else {        request.setAttribute("content", newResult);    }    Loggers.AUTH.info("new content:" + newResult);    // 禁用快取.    response.setHeader("Pragma", "no-cache");    response.setDateHeader("Expires", 0);    response.setHeader("Cache-Control", "no-cache,no-store");    response.setStatus(HttpServletResponse.SC_OK);    return HttpServletResponse.SC_OK + "";}

doPollingConfig 先判斷客戶端是否支援長輪詢如果支援則使用長輪詢監聽,否則直接返回有變更資料的 md5 值

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,            int probeRequestSize) {    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);    String tag = req.getHeader("Vipserver-Tag");    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);    // 獲取長輪詢超時時間,為避免客戶端超時這裡的超時時間減去了500毫秒    //timeout 一般為 30 - 0.5 = 29.5秒    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);    if (isFixedPolling()) {        timeout = Math.max(10000, getFixedPollingInterval());    } else {        long start = System.currentTimeMillis();        //查詢有變化的資料        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);        //資料發生了變化        if (changedGroups.size() > 0) {            //直接返回變化的資料            generateResponse(req, rsp, changedGroups);            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",                                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                                    changedGroups.size());            return;        }         //如果監聽的資料沒有變化並且header中有Long-Pulling-Timeout-No-Hangup標示則直接結束        else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",                                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                                    changedGroups.size());            return;        }    }        /**    * 注意!!!    * 當資料沒有變化時,程式碼會執行到這裡,開始使用長輪詢阻塞請求    */        String ip = RequestUtil.getRemoteIp(req);    // 開啟非同步支援    final AsyncContext asyncContext = req.startAsync();    // Servlet的非同步超時時間不正確,超時時間由自己來控制    asyncContext.setTimeout(0L);    // 建立ClientLongPolling執行緒並交給ConfigExecutor執行    ConfigExecutor.executeLongPolling(        new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}//透過Response返回變化的資料void generateResponse(HttpServletRequest request, HttpServletResponse response, List<String> changedGroups) {    if (null == changedGroups) {        return;    }    try {        final String respString = MD5Util.compareMd5ResultString(changedGroups);        // 禁用快取.        response.setHeader("Pragma", "no-cache");        response.setDateHeader("Expires", 0);        response.setHeader("Cache-Control", "no-cache,no-store");        response.setStatus(HttpServletResponse.SC_OK);        response.getWriter().println(respString);    } catch (Exception ex) {        PULL_LOG.error(ex.toString(), ex);    }}

addLongPollingClient 首先會檢測一次資料是否有變化,如果有則透過 generateResponse 方法直接返回響應結果,否則建立 ClientLongPolling 執行緒開啟長輪詢連線,長輪詢連線使用一個延時執行緒實現,延時時間從客戶端的 header 中獲取,預設為30s,實際上是29.5秒防止客戶端超時。

/**  * allSubs儲存了與client端的所有長連結列表  */final Queue<ClientLongPolling> allSubs;class ClientLongPolling implements Runnable {            @Override    public void run() {        //run 方法建立了一個延時執行緒,延時時間為長輪詢的超時時間 30 - 0.5 = 29.5秒        //也就是說在29.5秒之後會執行下面的run方法        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {            @Override            public void run() {                try {                    /**                    * 29.5秒之後開始執行                    */                                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());                    // 刪除訂閱關係                    allSubs.remove(ClientLongPolling.this);                    //不會走這個分支                    if (isFixedPolling()) {                        LogUtil.CLIENT_LOG                            .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",                                  RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),                                  "polling", clientMd5Map.size(), probeRequestSize);                        List<String> changedGroups = MD5Util                            .compareMd5((HttpServletRequest) asyncContext.getRequest(),                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);                        if (changedGroups.size() > 0) {                            sendResponse(changedGroups);                        } else {                            sendResponse(null);                        }                    }                     //因為過了29.5秒到了長輪詢的超時時間,說明在29.5秒內沒有資料發生過變化                    //因此傳送空資料給client,由client開啟下一次長輪詢                    else {                        LogUtil.CLIENT_LOG                            .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",                                  RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),                                  "polling", clientMd5Map.size(), probeRequestSize);                        //傳送空的資料給client                        sendResponse(null);                    }                } catch (Throwable t) {                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());                }            }        }, timeoutTime, TimeUnit.MILLISECONDS);        //注意!!!        //在建立完延時執行緒後,就將當前物件加入allSubs佇列中了,allSubs儲存了與client端的所有長連結列表        allSubs.add(this);    }}

ClientLongPolling 會建立一個29.5秒的延時執行緒,並將當前長輪詢物件加入到 allSubs 佇列中,在29.5之內如果監聽的資料發生了變化會由釋出配置的 DataChangeTask 執行緒將變更資料傳送給 Client 同時取消 asyncTimeoutFuture 這個延時執行緒,如果在29.5內監聽的資料沒有傳送變化則傳送空資料給 Client,由 Client 開啟下一次長輪詢。

總結Nacos 客戶端SDK在獲取配置時會優先從本地檔案中讀取配置,也就是說如果不想從 Nacos 服務端中獲取資料,可以在本地新建檔案,這樣就可以單獨更改叢集中某個機器的配置Nacos 客戶端SDK在從 Nacos 伺服器獲取配置失敗時,會從快照資料中讀取配置,而快照資料是儲存的上一次從伺服器拉取的資料,當 Nacos 伺服器掛掉後,從本地快照依然可以獲取資料Nacos 服務端支援短連接獲取配置和長輪詢監聽配置方式,長輪詢監聽是基於 Servlet3.0 的非同步特性實現的,由客戶端發起長輪詢請求,服務端使用延時執行緒阻塞請求,阻塞超時時間為30秒,在30秒內監聽資料發生變化服務端將最新資料推送給客戶端,在30秒外服務端返回NULL給客戶端,由客戶端發起下一次長輪詢請求,繼續監聽。

8
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 普通運維人員就是秋後的螞蚱