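0x00 Abstract
SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture. Across multiple articles and from several angles, it works backward from the code to summarize the implementation mechanisms and architectural ideas behind DataServer and SOFARegistry, so that readers can learn how Alibaba approaches design.
This is the seventeenth article. It covers how SOFARegistry handles configuration information in its network operations.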
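0x01 Business scope
1.1 What the configuration is for
Some system-level services need to be set from a console. Meta Server therefore exposes interfaces to the console; when Meta Server receives a console request, it interacts with Data Server and Session Server. For example, Meta Server provides the following interfaces:

@Bean
@ConditionalOnMissingBean
public StopPushDataResource stopPushDataResource() {
    return new StopPushDataResource();
}

@Bean
public BlacklistDataResource blacklistDataResource() {
    return new BlacklistDataResource();
}

@Bean
public RenewSwitchResource renewSwitchResource() {
    return new RenewSwitchResource();
}

These interfaces are exposed over HTTP because they are routine administrative operations. Communication between servers, however, still uses the Bolt protocol.
1.2 What to study
The question to work through here: on the DataServer side, how is configuration information separated out and handled on its own?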
0x02 Data structures
2.1 Directory layout
In DataServer, the configuration-related directories are shown below. They contain handlers, services, tasks, and provideData.

│   ├── metaserver
│   │   ├── DefaultMetaServiceImpl.java
│   │   ├── IMetaServerService.java
│   │   ├── MetaServerConnectionFactory.java
│   │   ├── handler
│   │   │   ├── NotifyProvideDataChangeHandler.java
│   │   │   ├── ServerChangeHandler.java
│   │   │   └── StatusConfirmHandler.java
│   │   ├── provideData
│   │   │   ├── ProvideDataProcessor.java
│   │   │   ├── ProvideDataProcessorManager.java
│   │   │   └── processor
│   │   │       └── DatumExpireProvideDataProcessor.java
│   │   └── task
2.2 Data structure definitions
The configuration-related data structures are as follows.
ProvideData is the externally facing exchange object. It carries a version number and the service identifier dataInfoId, together with the payload.

public class ProvideData implements Serializable {
    private ServerDataBox provideData;
    private String        dataInfoId;
    private Long          version;
}

ServerDataBox holds the actual business payload. It is defined as follows:

public class ServerDataBox implements Serializable {
    /** Null for locally instantiated, otherwise for internalized */
    private byte[] bytes;
    /** Only available if bytes != null */
    private int    serialization;
    /** Actual object, lazy deserialized */
    private Object object;
}

ServerDataBox is currently used in only one place in Data Server, where the payload is a boolean, that is, a switch-style configuration.

public void changeDataProcess(ProvideData provideData) {
    // The payload object is a String that is parsed into the boolean switch
    boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
        .getObject());
    datumLeaseManager.setRenewEnable(enableDataDatumExpire);
}
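Because the switch arrives as a String and is converted with Boolean.parseBoolean, it helps to see how that conversion treats unexpected values. The following is a minimal sketch only; the class SwitchParseSketch and its parseSwitch method are hypothetical and are not part of SOFARegistry. Unlike the excerpt above, the sketch checks the payload type before casting, which is an assumption made for illustration.

```java
// Hypothetical helper, not part of SOFARegistry: shows how a String payload
// becomes a boolean switch via Boolean.parseBoolean.
final class SwitchParseSketch {

    // Assumption for this sketch: a payload that is not a String counts as "off".
    static boolean parseSwitch(Object payload) {
        if (!(payload instanceof String)) {
            return false;
        }
        // parseBoolean yields true only for "true", ignoring case.
        return Boolean.parseBoolean((String) payload);
    }

    public static void main(String[] args) {
        System.out.println(parseSwitch("true"));
        System.out.println(parseSwitch("TRUE"));
        System.out.println(parseSwitch("1"));
        System.out.println(parseSwitch(null));
    }
}
```

Values such as "1" or "yes" do not raise an error; they simply evaluate to false, so a mistyped console value would switch the feature off rather than fail loudly.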
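0x03 Flow inside Meta Server
To trace the flow end to end, we first need to look at the Meta Server internals related to metaServerService.fetchData(dataInfoId).
For decoupling, Meta Server splits certain business functions into four layers. The basic chain is:
Http Resource ---> TaskListener ---> Task ---> Service
The overall flow is given first; the following sections walk through it step by step: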
(1) Admin ---> BlacklistDataResource (which contains fireDataChangeNotify)
(2) BlacklistDataResource ---update---> PersistenceDataDBService
(3) fireDataChangeNotify ---NotifyProvideDataChange---> DefaultTaskListenerManager
    listeners: persistenceDataChangeNotifyTaskListener, receiveStatusConfirmNotifyTaskListener,
               dataNodeChangePushTaskListener, sessionNodeChangePushTaskListener
(4) persistenceDataChangeNotifyTaskListener ---> PersistenceDataChangeNotifyTask
(5) PersistenceDataChangeNotifyTask ---notifyProvideDataChange---> DataNodeServiceImpl
(6) DataNodeServiceImpl ---dataNodeExchanger.request (NotifyProvideDataChange)---> Data Server
(7) Data Server: metaClientHandlers ---> notifyProvideDataChangeHandler
3.1 Handling Admin requests
As mentioned earlier, Meta Server exposes some control interfaces to Admin over HTTP. We use BlacklistDataResource as the example.
The blacklistPush method first stores the data in persistenceDataDBService and then calls fireDataChangeNotify, which indirectly sends a NotifyProvideDataChange.

@Path("blacklist")
public class BlacklistDataResource {

    @RaftReference
    private DBService           persistenceDataDBService;

    @Autowired
    private TaskListenerManager taskListenerManager;

    /**
     * update blacklist
     * e.g. curl -d '{"FORBIDDEN_PUB":{"IP_FULL":["1.1.1.1","10.15.233.150"]},"FORBIDDEN_SUB_BY_PREFIX":{"IP_FULL":["1.1.1.1"]}}' -H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update
     */
    @POST
    @Path("update")
    @Produces(MediaType.APPLICATION_JSON)
    public Result blacklistPush(String config) {
        PersistenceData persistenceData = createDataInfo();
        persistenceData.setData(config);

        // Persist the new blacklist first
        boolean ret = persistenceDataDBService.update(ValueConstants.BLACK_LIST_DATA_ID,
            persistenceData);

        // Then announce the change to the task listeners
        fireDataChangeNotify(persistenceData.getVersion(), ValueConstants.BLACK_LIST_DATA_ID,
            DataOperator.UPDATE);

        Result result = new Result();
        result.setSuccess(true);
        return result;
    }

    private PersistenceData createDataInfo() {
        DataInfo dataInfo = DataInfo.valueOf(ValueConstants.BLACK_LIST_DATA_ID);
        PersistenceData persistenceData = new PersistenceData();
        persistenceData.setDataId(dataInfo.getDataId());
        persistenceData.setGroup(dataInfo.getDataType());
        persistenceData.setInstanceId(dataInfo.getInstanceId());
        // The version is the current timestamp in milliseconds
        persistenceData.setVersion(System.currentTimeMillis());
        return persistenceData;
    }

    private void fireDataChangeNotify(Long version, String dataInfoId, DataOperator dataOperator) {
        NotifyProvideDataChange notifyProvideDataChange = new NotifyProvideDataChange(dataInfoId,
            version, dataOperator);
        TaskEvent taskEvent = new TaskEvent(notifyProvideDataChange,
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}

This corresponds to the following part of the overall flow:
(1) Admin ---> BlacklistDataResource (which contains fireDataChangeNotify)
(2) BlacklistDataResource ---update---> PersistenceDataDBService
3.2 DBService
DBService is also Raft-based, as the @RaftReference annotation shows. This indicates that the MetaServer cluster maintains consistency internally.

@RaftReference
private DBService persistenceDataDBService;

A condensed version of the PersistenceDataDBService class:

@RaftService
public class PersistenceDataDBService extends AbstractSnapshotProcess implements DBService {

    private ConcurrentHashMap<String, Object> serviceMap = new ConcurrentHashMap<>();

    @Override
    public boolean put(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }

    @Override
    public DBResponse get(String key) {
        Object ret = serviceMap.get(key);
        return ret != null ? DBResponse.ok(ret).build() : DBResponse.notfound().build();
    }

    @Override
    public boolean update(String key, Object value) {
        Object ret = serviceMap.put(key, value);
        return true;
    }

    @Override
    public Set<String> getSnapshotFileNames() {
        if (!snapShotFileNames.isEmpty()) {
            return snapShotFileNames;
        }
        snapShotFileNames.add(this.getClass().getSimpleName());
        return snapShotFileNames;
    }
}

Storage is mainly a ConcurrentHashMap, while the Raft mechanism uses the file system for snapshot backups.
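The map-backed storage idea can be shown in isolation. This is a minimal sketch under stated assumptions: InMemoryStoreSketch is a hypothetical class, it leaves out Raft replication and snapshots entirely, and it uses java.util.Optional in place of the DBResponse type seen in the excerpt.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a map-backed store. It is not the
// SOFARegistry DBService and has no replication or snapshot support.
final class InMemoryStoreSketch {

    private final ConcurrentHashMap<String, Object> serviceMap = new ConcurrentHashMap<>();

    // put and update both overwrite the key, as in the condensed class.
    boolean put(String key, Object value) {
        serviceMap.put(key, value);
        return true;
    }

    boolean update(String key, Object value) {
        serviceMap.put(key, value);
        return true;
    }

    // An empty Optional plays the role of a "not found" response here.
    Optional<Object> get(String key) {
        return Optional.ofNullable(serviceMap.get(key));
    }
}
```

Note that ConcurrentHashMap rejects null keys and values, so a store built this way cannot record "present but null"; absence is the only way to express a missing entry.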
3.3 Bean
As noted above, for decoupling Meta Server wraps message handling and forwarding into TaskListeners, which TaskListenerManager logically runs in one place. Taking the ProvideData-related functionality as the example, the corresponding beans are:

@Configuration
public static class MetaServerTaskConfiguration {

    ......

    @Bean
    public TaskListener persistenceDataChangeNotifyTaskListener(TaskListenerManager taskListenerManager) {
        TaskListener taskListener = new PersistenceDataChangeNotifyTaskListener(
            sessionNodeSingleTaskProcessor());
        taskListenerManager.addTaskListener(taskListener);
        return taskListener;
    }

    @Bean
    public TaskListenerManager taskListenerManager() {
        return new DefaultTaskListenerManager();
    }
}

3.4 Listener
The listener execution engine is shown below. Listeners are registered under the TaskType they support; when an event arrives, the engine looks up the listeners for that event's TaskType and invokes each one.

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        // Key each listener by the task type it supports
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        // Only listeners registered for this task type receive the event
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}
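The type-keyed dispatch can be sketched without any framework. The example below is illustrative only: ListenerDispatchSketch and all names inside it are hypothetical, and it swaps Guava's Multimap for a plain EnumMap of lists so that it runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of type-keyed listener dispatch. An EnumMap of lists
// replaces Guava's Multimap; none of these names come from SOFARegistry.
final class ListenerDispatchSketch {

    enum TaskType { PERSISTENCE_DATA_CHANGE_NOTIFY, DATA_NODE_CHANGE_PUSH }

    interface Listener {
        TaskType support();
        void handleEvent(String event);
    }

    private final Map<TaskType, List<Listener>> listeners = new EnumMap<>(TaskType.class);

    // Register the listener under the task type it declares.
    void addListener(Listener listener) {
        listeners.computeIfAbsent(listener.support(), k -> new ArrayList<>()).add(listener);
    }

    // Deliver the event only to listeners registered for that type and
    // return how many listeners were invoked.
    int sendEvent(TaskType type, String event) {
        List<Listener> matched = listeners.getOrDefault(type, Collections.emptyList());
        for (Listener listener : matched) {
            listener.handleEvent(event);
        }
        return matched.size();
    }
}
```

Keying by type means the sender never needs to know which listeners exist; an event whose type has no registered listener is simply dropped.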
The corresponding business listener:

public class PersistenceDataChangeNotifyTaskListener implements TaskListener {

    @Autowired
    private MetaServerConfig                       metaServerConfig;

    private TaskDispatcher<String, MetaServerTask> singleTaskDispatcher;

    public PersistenceDataChangeNotifyTaskListener(TaskProcessor sessionNodeSingleTaskProcessor) {
        singleTaskDispatcher = TaskDispatchers.createDefaultSingleTaskDispatcher(
            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK.getName(), sessionNodeSingleTaskProcessor);
    }

    @Override
    public TaskType support() {
        return TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        // Wrap the event in a task and hand it to the dispatcher
        MetaServerTask persistenceDataChangeNotifyTask = new PersistenceDataChangeNotifyTask(
            metaServerConfig);
        persistenceDataChangeNotifyTask.setTaskEvent(event);
        singleTaskDispatcher.dispatch(persistenceDataChangeNotifyTask.getTaskId(),
            persistenceDataChangeNotifyTask, persistenceDataChangeNotifyTask.getExpiryTime());
    }
}

This corresponds to:
(1) Admin ---> BlacklistDataResource (which contains fireDataChangeNotify)
(2) BlacklistDataResource ---update---> PersistenceDataDBService
(3) fireDataChangeNotify ---NotifyProvideDataChange---> DefaultTaskListenerManager
    listeners: persistenceDataChangeNotifyTaskListener, receiveStatusConfirmNotifyTaskListener,
               dataNodeChangePushTaskListener, sessionNodeChangePushTaskListener
3.5 Task
The listener calls into the Task.
The task is shown below. It calls different services depending on the NodeType:

public class PersistenceDataChangeNotifyTask extends AbstractMetaServerTask {

    private final SessionNodeService sessionNodeService;
    private final DataNodeService    dataNodeService;
    private final MetaServerConfig   metaServerConfig;
    private NotifyProvideDataChange  notifyProvideDataChange;

    // Constructor and other members are omitted in this excerpt.

    @Override
    public void execute() {
        Set<NodeType> nodeTypes = notifyProvideDataChange.getNodeTypes();
        if (nodeTypes.contains(NodeType.DATA)) {
            dataNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
        if (nodeTypes.contains(NodeType.SESSION)) {
            sessionNodeService.notifyProvideDataChange(notifyProvideDataChange);
        }
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        Object obj = taskEvent.getEventObj();
        if (obj instanceof NotifyProvideDataChange) {
            this.notifyProvideDataChange = (NotifyProvideDataChange) obj;
        }
    }
}

This corresponds to:
(1) Admin ---> BlacklistDataResource (which contains fireDataChangeNotify)
(2) BlacklistDataResource ---update---> PersistenceDataDBService
(3) fireDataChangeNotify ---NotifyProvideDataChange---> DefaultTaskListenerManager
    listeners: persistenceDataChangeNotifyTaskListener, receiveStatusConfirmNotifyTaskListener,
               dataNodeChangePushTaskListener, sessionNodeChangePushTaskListener
(4) persistenceDataChangeNotifyTaskListener ---> PersistenceDataChangeNotifyTask
3.6 Service
The task calls a service to do the actual work. The service is shown below; this is where the push is sent to DataServer or SessionServer.

public class DataNodeServiceImpl implements DataNodeService {

    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private StoreService          dataStoreService;

    @Autowired
    private AbstractServerHandler dataConnectionHandler;

    @Override
    public NodeType getNodeType() {
        return NodeType.DATA;
    }

    @Override
    public void notifyProvideDataChange(NotifyProvideDataChange notifyProvideDataChange) {
        NodeConnectManager nodeConnectManager = getNodeConnectManager();
        Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);

        // add register confirm
        StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
        Map<String, DataNode> dataNodes = storeService.getNodes();

        for (InetSocketAddress connection : connections) {
            // Skip connections that do not belong to a registered data node
            if (!dataNodes.keySet().contains(connection.getAddress().getHostAddress())) {
                continue;
            }
            try {
                Request<NotifyProvideDataChange> request = new Request<NotifyProvideDataChange>() {
                    @Override
                    public NotifyProvideDataChange getRequestBody() {
                        return notifyProvideDataChange;
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection);
                    }
                };
                dataNodeExchanger.request(request);
            } catch (Exception e) {
                // Error handling is omitted in this excerpt.
            }
        }
    }
}

This corresponds to:
(1) Admin ---> BlacklistDataResource (which contains fireDataChangeNotify)
(2) BlacklistDataResource ---update---> PersistenceDataDBService
(3) fireDataChangeNotify ---NotifyProvideDataChange---> DefaultTaskListenerManager
    listeners: persistenceDataChangeNotifyTaskListener, receiveStatusConfirmNotifyTaskListener,
               dataNodeChangePushTaskListener, sessionNodeChangePushTaskListener
(4) persistenceDataChangeNotifyTaskListener ---> PersistenceDataChangeNotifyTask
(5) PersistenceDataChangeNotifyTask ---notifyProvideDataChange---> DataNodeServiceImpl
After the request is sent, the picture is:

(1) Admin ---> BlacklistDataResource (which contains fireDataChangeNotify)
(2) BlacklistDataResource ---update---> PersistenceDataDBService
(3) fireDataChangeNotify ---NotifyProvideDataChange---> DefaultTaskListenerManager
    listeners: persistenceDataChangeNotifyTaskListener, receiveStatusConfirmNotifyTaskListener,
               dataNodeChangePushTaskListener, sessionNodeChangePushTaskListener
(4) persistenceDataChangeNotifyTaskListener ---> PersistenceDataChangeNotifyTask
(5) PersistenceDataChangeNotifyTask ---notifyProvideDataChange---> DataNodeServiceImpl
(6) DataNodeServiceImpl ---dataNodeExchanger.request (NotifyProvideDataChange)---> Data Server
(7) Data Server: metaClientHandlers ---> notifyProvideDataChangeHandler

We now know that in Meta Server, DataNodeServiceImpl.notifyProvideDataChange notifies Data Server that a NotifyProvideDataChange message has arrived.
0x04 Call path in Data Server
Execution now moves to DataServer. Some groundwork comes first.
4.1 Bean
The metaClientHandlers bean holds the response handlers for MetaNodeExchanger, and notifyProvideDataChangeHandler is one of them.

@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(serverChangeHandler());
    list.add(statusConfirmHandler());
    list.add(notifyProvideDataChangeHandler());
    return list;
}

4.2 Network interaction
When DefaultMetaServiceImpl.getMetaServerMap calls metaNodeExchanger.connect, MetaNodeExchanger registers these metaClientHandlers. That ties notifyProvideDataChangeHandler to MetaServer over Bolt.

public class DefaultMetaServiceImpl implements IMetaServerService {

    @Override
    public Map<String, Set<String>> getMetaServerMap() {
        // Excerpt: only the connect call is shown; the rest of the method is omitted.
        Connection connection = null;
        connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
            .next(), dataServerConfig.getMetaServerPort()))).getConnection();
    }
}

MetaNodeExchanger is defined as follows. It centralizes DataServer's interactions with MetaServer.

public class MetaNodeExchanger implements NodeExchanger {

    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private IMetaServerService                metaServerService;

    @Resource(name = "metaClientHandlers")
    private Collection<AbstractClientHandler> metaClientHandlers;

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                // Re-check under the lock so only one thread creates the client
                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                }
            }
        }
        //try to connect data
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }
        return channel;
    }
}
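The connect method follows a check, lock, re-check shape so that the client is created only once. A generic sketch of that pattern is given below; LazyClientSketch, its Client type, and the creation counter are all hypothetical, and the sketch keeps the instance in a volatile field, whereas the excerpt above looks it up through boltExchange.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the check / lock / re-check pattern. The Client type
// and the counter are illustrative and are not SOFARegistry classes.
final class LazyClientSketch {

    static final class Client { }

    // volatile so that a fully constructed Client is visible to other threads.
    private volatile Client client;
    private final AtomicInteger created = new AtomicInteger();

    Client getOrCreate() {
        Client local = client;
        if (local == null) {
            synchronized (this) {
                local = client; // re-check under the lock
                if (local == null) {
                    local = new Client();
                    created.incrementAndGet();
                    client = local;
                }
            }
        }
        return local;
    }

    // Number of Client instances this sketch has constructed.
    int createdCount() {
        return created.get();
    }
}
```

The first unsynchronized check keeps the common path lock-free once the client exists; the second check inside the lock prevents two threads that both saw null from each creating an instance.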
4.3 Handler definition
In its interest method, NotifyProvideDataChangeHandler declares that it handles NotifyProvideDataChange messages. When MetaServer sends a NotifyProvideDataChange, the handler calls metaServerService.fetchData(dataInfoId) to obtain the ProvideData and then processes it.

public class NotifyProvideDataChangeHandler extends AbstractClientHandler {

    @Autowired
    private IMetaServerService   metaServerService;

    @Autowired
    private ProvideDataProcessor provideDataProcessorManager;

    @Override
    public Object doHandle(Channel channel, Object request) {
        NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;
        String dataInfoId = notifyProvideDataChange.getDataInfoId();
        // Anything other than REMOVE triggers a fetch of the latest value
        if (notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {
            ProvideData provideData = metaServerService.fetchData(dataInfoId);
            provideDataProcessorManager.changeDataProcess(provideData);
        }
        return null;
    }

    @Override
    public Class interest() {
        return NotifyProvideDataChange.class;
    }
}
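How an interest declaration can drive message routing is sketched below. This is an assumption about the general technique rather than a description of Bolt's internals: InterestDispatchSketch, its Handler interface, and the NotifyChange message are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of routing a message to the handler that declared
// interest in its class. The names are illustrative, not the Bolt API.
final class InterestDispatchSketch {

    interface Handler {
        Class<?> interest();
        Object doHandle(Object request);
    }

    static final class NotifyChange {
        final String dataInfoId;

        NotifyChange(String dataInfoId) {
            this.dataInfoId = dataInfoId;
        }
    }

    private final Map<Class<?>, Handler> handlers = new HashMap<>();

    // Index each handler by the message class it is interested in.
    void register(Handler handler) {
        handlers.put(handler.interest(), handler);
    }

    // Look up the handler by the request's exact class; null when nobody is interested.
    Object dispatch(Object request) {
        Handler handler = handlers.get(request.getClass());
        return handler == null ? null : handler.doHandle(request);
    }
}
```

A lookup by exact class is the simplest choice; a handler registered for a superclass would not receive subclass messages in this sketch.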
4.4 Invoking the handler
In Meta Server, DataNodeServiceImpl.notifyProvideDataChange notifies Data Server that a NotifyProvideDataChange message has arrived.
NotifyProvideDataChangeHandler then responds.
4.5 Fetching ProvideData
NotifyProvideDataChangeHandler contains the following line:
ProvideData provideData = metaServerService.fetchData(dataInfoId);
This calls fetchData in DefaultMetaServiceImpl, which asks the Meta Server leader for the ProvideData.

@Override
public ProvideData fetchData(String dataInfoId) {
    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    String leader = getLeader().getIp();
    if (connectionMap.containsKey(leader)) {
        Connection connection = connectionMap.get(leader);
        if (connection.isFine()) {
            try {
                Request<FetchProvideDataRequest> request = new Request<FetchProvideDataRequest>() {
                    @Override
                    public FetchProvideDataRequest getRequestBody() {
                        return new FetchProvideDataRequest(dataInfoId);
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(connection.getRemoteIP(), connection.getRemotePort());
                    }
                };
                Response response = metaNodeExchanger.request(request);
                Object result = response.getResult();
                if (result instanceof ProvideData) {
                    return (ProvideData) result;
                }
            } catch (Exception e) {
                // Error handling is omitted in this excerpt.
            }
        }
    }
    // No usable leader connection or no ProvideData in the reply: refresh the leader and return null
    String newip = refreshLeader().getIp();
    return null;
}

The flow now looks like this:
(1) DefaultMetaServiceImpl.getMetaServerMap ---> MetaNodeExchanger: boltExchange.connect(metaClientHandlers.toArray) ---> Meta Server
    metaClientHandlers: serverChangeHandler, statusConfirmHandler, notifyProvideDataChangeHandler
(2) Meta Server: DataNodeServiceImpl.notifyProvideDataChange ---NotifyProvideDataChange---> notifyProvideDataChangeHandler
(3) notifyProvideDataChangeHandler: ProvideData provideData = metaServerService.fetchData ---FetchProvideDataRequest---> Meta Server (get ProvideData from Meta Server)
    followed by changeDataProcess(provideData)
0x05 Back in MetaServer
Execution returns to MetaServer, which has received the FetchProvideDataRequest.
5.1 Handling the Data Server request
FetchProvideDataRequestHandler is the response handler. Its logic is fairly simple: look up the data in DBService by DataInfoId and return it to the caller.

public class FetchProvideDataRequestHandler extends AbstractServerHandler<FetchProvideDataRequest> {

    @RaftReference
    private DBService persistenceDataDBService;

    @Override
    public Object reply(Channel channel, FetchProvideDataRequest fetchProvideDataRequest) {
        DBResponse ret = persistenceDataDBService.get(fetchProvideDataRequest.getDataInfoId());
        if (ret.getOperationStatus() == OperationStatus.SUCCESS) {
            // Found: wrap the stored data and its version
            PersistenceData data = (PersistenceData) ret.getEntity();
            ProvideData provideData = new ProvideData(new ServerDataBox(data.getData()),
                fetchProvideDataRequest.getDataInfoId(), data.getVersion());
            return provideData;
        } else if (ret.getOperationStatus() == OperationStatus.NOTFOUND) {
            // Not found: return an empty ProvideData for the same dataInfoId
            ProvideData provideData = new ProvideData(null,
                fetchProvideDataRequest.getDataInfoId(), null);
            return provideData;
        }
        // Other statuses and error handling are omitted in this excerpt;
        // this return is a placeholder that keeps the excerpt well-formed.
        return null;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return FetchProvideDataRequest.class;
    }
}

The key piece here is therefore DBService.
From MetaServer's point of view, the flow is:
(1) Data Server: NotifyProvideDataChangeHandler, metaServerService.fetchData(dataInfoId) ---FetchProvideDataRequest---> Meta Server: FetchProvideDataRequestHandler
(2) FetchProvideDataRequestHandler ---get(DataInfoId)---> PersistenceDataDBService
(3) PersistenceDataDBService ---DBResponse---> FetchProvideDataRequestHandler
(4) FetchProvideDataRequestHandler ---ProvideData---> Data Server
5.2 The Session Server counterpart
Session Server also issues FetchProvideDataRequest. SessionServerBootstrap contains the following methods, which request configuration information.

private void fetchStopPushSwitch(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.STOP_PUSH_DATA_SWITCH_DATA_ID);
    Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (ret instanceof ProvideData) {
        ProvideData provideData = (ProvideData) ret;
        provideDataProcessorManager.fetchDataProcess(provideData);
    }
}

private void fetchEnableDataRenewSnapshot(URL leaderUrl) {
    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(
        ValueConstants.ENABLE_DATA_RENEW_SNAPSHOT);
    Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
    if (data instanceof ProvideData) {
        ProvideData provideData = (ProvideData) data;
        provideDataProcessorManager.fetchDataProcess(provideData);
    }
}

private void fetchBlackList() {
    blacklistManager.load();
}
0x06 DataServer
6.1 Processing ProvideData
In NotifyProvideDataChangeHandler, the ProvideData comes back from fetchData.
Inside fetchData, the response is handled as follows:

Response response = metaNodeExchanger.request(request);
Object result = response.getResult();
if (result instanceof ProvideData) {
    return (ProvideData) result;
}

That is:
(1) DefaultMetaServiceImpl.getMetaServerMap ---> MetaNodeExchanger: boltExchange.connect(metaClientHandlers.toArray) ---> Meta Server
    metaClientHandlers: serverChangeHandler, statusConfirmHandler, notifyProvideDataChangeHandler
(2) Meta Server: DataNodeServiceImpl.notifyProvideDataChange ---NotifyProvideDataChange---> notifyProvideDataChangeHandler
(3) notifyProvideDataChangeHandler: ProvideData provideData = metaServerService.fetchData ---> Meta Server (get ProvideData from Meta Server)
(4) Meta Server ---ProvideData---> metaServerService.fetchData
    followed by changeDataProcess(provideData)
Processing then continues with:
provideDataProcessorManager.changeDataProcess(provideData);
This brings us to how the processing engine works.
6.1.1 Bean
This configuration creates the processing engine, ProvideDataProcessorManager, and registers one processor with it, DatumExpireProvideDataProcessor.

@Configuration
public static class DataProvideDataConfiguration {

    @Bean
    public ProvideDataProcessor provideDataProcessorManager() {
        return new ProvideDataProcessorManager();
    }

    @Bean
    public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
        ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();
        ((ProvideDataProcessorManager) provideDataProcessorManager)
            .addProvideDataProcessor(datumExpireProvideDataProcessor);
        return datumExpireProvideDataProcessor;
    }
}

6.1.2 The processing engine ProvideDataProcessorManager
The pattern is a familiar one. The engine, ProvideDataProcessorManager, itself implements ProvideDataProcessor, but its support method returns false, so the engine never runs itself when it iterates over the processors.

public class ProvideDataProcessorManager implements ProvideDataProcessor {

    private Collection<ProvideDataProcessor> provideDataProcessors = new ArrayList<>();

    public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {
        provideDataProcessors.add(provideDataProcessor);
    }

    @Override
    public void changeDataProcess(ProvideData provideData) {
        // Delegate to every registered processor that supports this data
        for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {
            if (provideDataProcessor.support(provideData)) {
                provideDataProcessor.changeDataProcess(provideData);
            }
        }
    }

    @Override
    public boolean support(ProvideData provideData) {
        return false;
    }
}
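The effect of returning false from support can be demonstrated with a small composite. The sketch below is illustrative: CompositeProcessorSketch and its nested types are hypothetical, and String keys and values stand in for ProvideData.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a composite "manager" that implements the same
// interface as the processors it drives. Names are illustrative only.
final class CompositeProcessorSketch {

    interface Processor {
        boolean support(String dataInfoId);
        void process(String dataInfoId, String value);
    }

    static final class Manager implements Processor {
        private final List<Processor> processors = new ArrayList<>();

        void add(Processor processor) {
            processors.add(processor);
        }

        // The manager never claims a key itself, so even if it ends up in
        // its own list it is skipped during iteration.
        @Override
        public boolean support(String dataInfoId) {
            return false;
        }

        // Fan out to every registered processor that supports the key.
        @Override
        public void process(String dataInfoId, String value) {
            for (Processor processor : processors) {
                if (processor.support(dataInfoId)) {
                    processor.process(dataInfoId, value);
                }
            }
        }
    }
}
```

Because the manager shares the processors' interface, callers can hold it as a plain Processor, which is how the handler in section 4.3 refers to provideDataProcessorManager.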
6.1.3 The handler
The DatumLeaseManager here corresponds to the AfterWorkingProcess discussed earlier.
The handler calls DatumLeaseManager to apply the configuration data.

public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {

    @Autowired
    private DatumLeaseManager datumLeaseManager;

    @Override
    public void changeDataProcess(ProvideData provideData) {
        if (checkInvalid(provideData)) {
            return;
        }
        boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
            .getObject());
        datumLeaseManager.setRenewEnable(enableDataDatumExpire);
    }

    // Invalid when the ProvideData, its data box, or the boxed object is null
    private boolean checkInvalid(ProvideData provideData) {
        boolean invalid = provideData == null || provideData.getProvideData() == null
                          || provideData.getProvideData().getObject() == null;
        return invalid;
    }

    @Override
    public boolean support(ProvideData provideData) {
        return ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId());
    }
}

The final flow:
(1) DefaultMetaServiceImpl.getMetaServerMap ---> MetaNodeExchanger: boltExchange.connect(metaClientHandlers.toArray) ---> Meta Server
    metaClientHandlers: serverChangeHandler, statusConfirmHandler, notifyProvideDataChangeHandler
(2) Meta Server: DataNodeServiceImpl.notifyProvideDataChange ---NotifyProvideDataChange---> notifyProvideDataChangeHandler
(3) notifyProvideDataChangeHandler: ProvideData provideData = metaServerService.fetchData ---> Meta Server (get ProvideData from Meta Server)
(4) Meta Server ---ProvideData---> metaServerService.fetchData
(5) changeDataProcess(provideData) ---> ProvideDataProcessor: changeDataProcess(ProvideData provideData)
Link: https://www.cnblogs.com/rossiXYZ/p/14289209.html