首頁>技術>

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓大家藉以學習阿里如何設計。

本文為第十七篇,介紹SOFARegistry網路操作之配置資訊如何處理。

0x01 業務範疇1.1 配置作用

比如某些系統相關的服務,需要由控制檯來設定。所以Meta Server對控制檯提供了介面,當Meta Server 接受到控制檯請求後,會和 Data Server,Session Server 進行互動,比如Meta Server 提供瞭如下介面:

@Bean@ConditionalOnMissingBeanpublic StopPushDataResource stopPushDataResource() {    return new StopPushDataResource();}@Beanpublic BlacklistDataResource blacklistDataResource() {    return new BlacklistDataResource();}@Beanpublic RenewSwitchResource renewSwitchResource() {    return new RenewSwitchResource();}

對外提供http介面,是因為這是正常基本操作。但是Server之間依然是Bolt協議操作。

1.2 學習方向

此處推導如下:在DataServer端,如何把配置資訊單獨摘出來。

0x02 資料結構2.1 目錄結構

DataServer之中,配置相關目錄如下,可以看到有Handler,服務,task以及provideData。

│   ├── metaserver│   │   ├── DefaultMetaServiceImpl.java│   │   ├── IMetaServerService.java│   │   ├── MetaServerConnectionFactory.java│   │   ├── handler│   │   │   ├── NotifyProvideDataChangeHandler.java│   │   │   ├── ServerChangeHandler.java│   │   │   └── StatusConfirmHandler.java│   │   ├── provideData│   │   │   ├── ProvideDataProcessor.java│   │   │   ├── ProvideDataProcessorManager.java│   │   │   └── processor│   │   │       └── DatumExpireProvideDataProcessor.java│   │   └── task
2.2 資料結構定義

配置相關資料結構如下:

ProvideData是對外的互動介面,裡面是版本號和服務標示dataInfoId。

public class ProvideData implements Serializable {    private ServerDataBox provideData;    private String        dataInfoId;    private Long          version;}

ServerDataBox是具體業務,其定義如下

public class ServerDataBox implements Serializable {    /** Null for locally instantiated, otherwise for internalized */    private byte[]            bytes;    /** Only available if bytes != null */    private int               serialization;    /** Actual object, lazy deserialized */    private Object            object;}  

關於ServerDataBox,目前在Data Server只有一處使用。使用的是boolean型別,也就是控制開關配置。

public void changeDataProcess(ProvideData provideData) {    boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()        .getObject());    datumLeaseManager.setRenewEnable(enableDataDatumExpire);}
0x03 Meta Server 內流程

這裡為了打通流程,需要先提一下 meta server 內部與 metaServerService.fetchData(dataInfoId) 相關的流程。

處於解耦的目的,Meta Server 把某些業務功能分割成四個層次,基本邏輯是:

Http Resource ———>  TaskListener ———>   Task  ————>   Service   

首先給出流程圖如下,下文會逐步介紹流程:

                +------------------------+                |                        |   2    +-------------------------+                | BlacklistDataResource  +------>-+PersistenceDataDBService |                |                        | update +-------------------------+                                                      7+-------+   1   |                        |                                                                       +---------------------------------------+| Admin | +---> | +--------------------- |                                                                       |    Data Server                        |+-------+       | |fireDataChangeNotify| |                                                                       |                                       |                | +--------------------+ |                                                     6                 | +-----------------------------------+ |                +------------------------+                                                                       | |    metaClientHandlers             | |                    |                                     +---------------------+    dataNodeExchanger.request   | | +-------------------------------+ | |                    |        3                            | DataNodeServiceImpl | +----------------------------->+ | | notifyProvideDataChangeHandler| | |                    |                                     +----------+----------+     NotifyProvideDataChange    | | +-------------------------------+ | |                    |   NotifyProvideDataChange                      ^                                           | |                                   | |                    |                                                |                                           | +-----------------------------------+ |                    |                                             5  | notifyProvideDataChange                   +---------------------------------------+                    v                                                |          +---------+-----------------------------------+            |          |       DefaultTaskListenerManager            |            |          |                                             |       +----+----------------------------+          | +-----------------------------------------+ | 4     |                                 |          | | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask |          | |                                         | |       |                                 |          | | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+          | |                                         | |          | | dataNodeChangePushTaskListener          | |          | |                                         | |          | | sessionNodeChangePushTaskListener       | |          | +-----------------------------------------+ |          +---------------------------------------------+

手機圖示如下 :

3.1 Admin請求響應

前面提到,Meta Server 透過 Http協議給Admin提供了一些控制介面,下面我們就以 BlacklistDataResource 為例研究下。

可以看到,blacklistPush 函式中會先儲存在 persistenceDataDBService 中,然後 fireDataChangeNotify 間接傳送 NotifyProvideDataChange。

@Path("blacklist")public class BlacklistDataResource {    @RaftReference    private DBService           persistenceDataDBService;    @Autowired    private TaskListenerManager taskListenerManager;    /**     * update blacklist     * e.g. curl -d '{"FORBIDDEN_PUB":{"IP_FULL":["1.1.1.1","10.15.233.150"]},"FORBIDDEN_SUB_BY_PREFIX":{"IP_FULL":["1.1.1.1"]}}' -H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update     */    @POST    @Path("update")    @Produces(MediaType.APPLICATION_JSON)    public Result blacklistPush(String config) {        PersistenceData persistenceData = createDataInfo();        persistenceData.setData(config);        boolean ret = persistenceDataDBService.update(ValueConstants.BLACK_LIST_DATA_ID,                persistenceData);        fireDataChangeNotify(persistenceData.getVersion(), ValueConstants.BLACK_LIST_DATA_ID,            DataOperator.UPDATE);        Result result = new Result();        result.setSuccess(true);        return result;    }    private PersistenceData createDataInfo() {        DataInfo dataInfo = DataInfo.valueOf(ValueConstants.BLACK_LIST_DATA_ID);        PersistenceData persistenceData = new PersistenceData();        persistenceData.setDataId(dataInfo.getDataId());        persistenceData.setGroup(dataInfo.getDataType());        persistenceData.setInstanceId(dataInfo.getInstanceId());        persistenceData.setVersion(System.currentTimeMillis());        return persistenceData;    }    private void fireDataChangeNotify(Long version, String dataInfoId, DataOperator dataOperator) {        NotifyProvideDataChange notifyProvideDataChange = new NotifyProvideDataChange(dataInfoId,            version, dataOperator);        TaskEvent taskEvent = new TaskEvent(notifyProvideDataChange,            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK);        taskListenerManager.sendTaskEvent(taskEvent);    }}

這裡對應上圖的:

                +------------------------+                |                        |   2    +-------------------------+                | BlacklistDataResource  +------>-+PersistenceDataDBService |                |                        | update +-------------------------++-------+   1   |                        || Admin | +---> | +--------------------+ |+-------+       | |fireDataChangeNotify| |                | +--------------------+ |                +------------------------+
3.2 DBService

可以看到,DBService也是基於 Raft,這說明在MetaServer叢集內部自己維護了一致性。

@RaftReferenceprivate DBService           persistenceDataDBService;

PersistenceDataDBService 類精簡版定義如下:

@RaftServicepublic class PersistenceDataDBService extends AbstractSnapshotProcess implements DBService {    private ConcurrentHashMap<String, Object> serviceMap        = new ConcurrentHashMap<>();    @Override    public boolean put(String key, Object value) {        Object ret = serviceMap.put(key, value);        return true;    }    @Override    public DBResponse get(String key) {        Object ret = serviceMap.get(key);        return ret != null ? DBResponse.ok(ret).build() : DBResponse.notfound().build();    }    @Override    public boolean update(String key, Object value) {        Object ret = serviceMap.put(key, value);        return true;    }          @Override    public Set<String> getSnapshotFileNames() {        if (!snapShotFileNames.isEmpty()) {            return snapShotFileNames;        }        snapShotFileNames.add(this.getClass().getSimpleName());        return snapShotFileNames;    }      }

可以看出來,主要採用了ConcurrentHashMap來進行儲存,Raft機制則用檔案系統完成快照備份。

3.3 Bean

如前所述,為了解耦,Meta Server 把一些訊息處理轉發等功能封裝為TaskListener,由 TaskListenerManager 在邏輯上負責統一執行。這裡就以ProvideData相關功能為例,對應的Bean是。

@Configurationpublic static class MetaServerTaskConfiguration {      ......    @Bean    public TaskListener persistenceDataChangeNotifyTaskListener(TaskListenerManager taskListenerManager) {        TaskListener taskListener = new PersistenceDataChangeNotifyTaskListener(            sessionNodeSingleTaskProcessor());        taskListenerManager.addTaskListener(taskListener);        return taskListener;    }    @Bean    public TaskListenerManager taskListenerManager() {        return new DefaultTaskListenerManager();    }}
3.4 Listener

Listener的執行引擎如下,可以看出來是遍歷listener列表進行處理,如果某listener可以處理,就執行。

public class DefaultTaskListenerManager implements TaskListenerManager {    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();    @Override    public Multimap<TaskType, TaskListener> getTaskListeners() {        return taskListeners;    }    @Override    public void addTaskListener(TaskListener taskListener) {        taskListeners.put(taskListener.support(), taskListener);    }    @Override    public void sendTaskEvent(TaskEvent taskEvent) {        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());        for (TaskListener taskListener : taskListeners) {            taskListener.handleEvent(taskEvent);        }    }}

對應業務Listener如下:

public class PersistenceDataChangeNotifyTaskListener implements TaskListener {    @Autowired    private MetaServerConfig                       metaServerConfig;    private TaskDispatcher<String, MetaServerTask> singleTaskDispatcher;    public PersistenceDataChangeNotifyTaskListener(TaskProcessor sessionNodeSingleTaskProcessor) {        singleTaskDispatcher = TaskDispatchers.createDefaultSingleTaskDispatcher(            TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK.getName(), sessionNodeSingleTaskProcessor);    }    @Override    public TaskType support() {        return TaskType.PERSISTENCE_DATA_CHANGE_NOTIFY_TASK;    }    @Override    public void handleEvent(TaskEvent event) {        MetaServerTask persistenceDataChangeNotifyTask = new PersistenceDataChangeNotifyTask(            metaServerConfig);        persistenceDataChangeNotifyTask.setTaskEvent(event);        singleTaskDispatcher.dispatch(persistenceDataChangeNotifyTask.getTaskId(),            persistenceDataChangeNotifyTask, persistenceDataChangeNotifyTask.getExpiryTime());    }}

這裡對應瞭如下:

                +------------------------+                |                        |   2    +-------------------------+                | BlacklistDataResource  +------>-+PersistenceDataDBService |                |                        | update +-------------------------++-------+   1   |                        || Admin | +---> | +--------------------+ |+-------+       | |fireDataChangeNotify| |                | +--------------------+ |                +------------------------+                    |                    |        3                    |                    |   NotifyProvideDataChange                    |                    |                    v          +---------+-----------------------------------+          |       DefaultTaskListenerManager            |          |                                             |          | +-----------------------------------------+ |          | | persistenceDataChangeNotifyTaskListener | |          | |                                         | |          | | receiveStatusConfirmNotifyTaskListener  | |          | |                                         | |          | | dataNodeChangePushTaskListener          | |          | |                                         | |          | | sessionNodeChangePushTaskListener       | |          | +-----------------------------------------+ |          +---------------------------------------------+
3.5 Task

Listener會呼叫到Task。

處理Task如下,需要區分根據NoteType不同,來呼叫不同的服務:

public class PersistenceDataChangeNotifyTask extends AbstractMetaServerTask {    private final SessionNodeService sessionNodeService;    private final DataNodeService    dataNodeService;    final private MetaServerConfig   metaServerConfig;    private NotifyProvideDataChange  notifyProvideDataChange;    @Override    public void execute() {        Set<NodeType> nodeTypes = notifyProvideDataChange.getNodeTypes();        if (nodeTypes.contains(NodeType.DATA)) {            dataNodeService.notifyProvideDataChange(notifyProvideDataChange);        }        if (nodeTypes.contains(NodeType.SESSION)) {            sessionNodeService.notifyProvideDataChange(notifyProvideDataChange);        }    }    @Override    public void setTaskEvent(TaskEvent taskEvent) {        Object obj = taskEvent.getEventObj();        if (obj instanceof NotifyProvideDataChange) {            this.notifyProvideDataChange = (NotifyProvideDataChange) obj;        }     }}

這裡對應

                +------------------------+                |                        |   2    +-------------------------+                | BlacklistDataResource  +------>-+PersistenceDataDBService |                |                        | update +-------------------------++-------+   1   |                        || Admin | +---> | +--------------------+ |+-------+       | |fireDataChangeNotify| |                | +--------------------+ |                +------------------------+                    |                    |        3                    |                    |   NotifyProvideDataChange                    |                    |                    v+-------------------+-------------------------+|       DefaultTaskListenerManager            ||                                             |       +---------------------------------+| +-----------------------------------------+ | 4     |                                 || | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask || |                                         | |       |                                 || | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+| |                                         | || | dataNodeChangePushTaskListener          | || |                                         | || | sessionNodeChangePushTaskListener       | || +-----------------------------------------+ |+---------------------------------------------+
3.6 服務

task會呼叫服務來執行具體業務,具體業務服務如下,這裡會向DataServer或者SessionServer傳送推送。

public class DataNodeServiceImpl implements DataNodeService {    @Autowired    private NodeExchanger         dataNodeExchanger;    @Autowired    private StoreService          dataStoreService;    @Autowired    private AbstractServerHandler dataConnectionHandler;    @Override    public NodeType getNodeType() {        return NodeType.DATA;    }    @Override    public void notifyProvideDataChange(NotifyProvideDataChange notifyProvideDataChange) {        NodeConnectManager nodeConnectManager = getNodeConnectManager();        Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);        // add register confirm        StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);        Map<String, DataNode> dataNodes = storeService.getNodes();        for (InetSocketAddress connection : connections) {            if (!dataNodes.keySet().contains(connection.getAddress().getHostAddress())) {                continue;            }            try {                Request<NotifyProvideDataChange> request = new Request<NotifyProvideDataChange>() {                    @Override                    public NotifyProvideDataChange getRequestBody() {                        return notifyProvideDataChange;                    }                    @Override                    public URL getRequestUrl() {                        return new URL(connection);                    }                };                dataNodeExchanger.request(request);            }         }    }}

這裡對應

                +------------------------+                |                        |   2    +-------------------------+                | BlacklistDataResource  +------>-+PersistenceDataDBService |                |                        | update +-------------------------++-------+   1   |                        || Admin | +---> | +--------------------+ |+-------+       | |fireDataChangeNotify| |                | +--------------------+ |                +------------------------+          |                                     +---------------------+          |        3                            | DataNodeServiceImpl |          |                                     +----------+----------+          |   NotifyProvideDataChange                      ^          |                                                |          |                                             5  | notifyProvideDataChange          v                                                |+---------+-----------------------------------+            ||       DefaultTaskListenerManager            |            ||                                             |       +----+----------------------------+| +-----------------------------------------+ | 4     |                                 || | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask || |                                         | |       |                                 || | receiveStatusConfirmNotifyTaskListener  | |       +---------------------------------+| |                                         | || | dataNodeChangePushTaskListener          | || |                                         | || | sessionNodeChangePushTaskListener       | || +-----------------------------------------+ |+---------------------------------------------+

傳送之後,就是

                +------------------------+                |                        |   2    +-------------------------+                | BlacklistDataResource  +------>-+PersistenceDataDBService |                |                        | update +-------------------------++-------+   1   |                        || Admin | +---> | +--------------------+ |+-------+       | |fireDataChangeNotify| |                | +--------------------+ |                +------------------------+                    |       3                    |   NotifyProvideDataChange                    v+-------------------+-------------------------+|       DefaultTaskListenerManager            ||                                             |       +---------------------------------+| +-----------------------------------------+ | 4     |                                 || | persistenceDataChangeNotifyTaskListener | +------>+ PersistenceDataChangeNotifyTask || | receiveStatusConfirmNotifyTaskListener  | |       |                                 || | dataNodeChangePushTaskListener          | |       +----+----------------------------+| | sessionNodeChangePushTaskListener       | |            || +-----------------------------------------+ |            |+---------------------------------------------+         5  | notifyProvideDataChange                                                           |               +-------------------------------------------+               |               v +---v---------+-------+ | DataNodeServiceImpl |                         +---------------------------------------+ +-------------+-------+                         |    Data Server                      7 |               |                       6         |                                       |               |     dataNodeExchanger.request   | +-----------------------------------+ |               +->------------------------------>+ |    metaClientHandlers             | |                      NotifyProvideDataChange    | | +-------------------------------+ | |                                                 | | | notifyPro|ideDataChangeHandler| | |                                                 | | +-------------------------------+ | |                                                 | +-----------------------------------+ |                                                 +---------------------------------------+

現在我們知道了,在Meta Server 之中,DataNodeServiceImpl.notifyProvideDataChange 函式會通知 Data Server,現在有一個NotifyProvideDataChange 訊息。

0x04 呼叫路徑 in Data Server

執行序列來到了DataServer。我們先要做一些前提準備。

4.1 Bean

Bean metaClientHandlers是 MetaNodeExchanger 的響應函式。而 notifyProvideDataChangeHandler 是 metaClientHandlers 的一部分。

@Bean(name = "metaClientHandlers")public Collection<AbstractClientHandler> metaClientHandlers() {    Collection<AbstractClientHandler> list = new ArrayList<>();    list.add(serverChangeHandler());    list.add(statusConfirmHandler());    list.add(notifyProvideDataChangeHandler());    return list;}
4.2 網路互動

MetaNodeExchanger 在 DefaultMetaServiceImpl.getMetaServerMap 呼叫 metaNodeExchanger.connect 的時候,會設定這個 metaClientHandlers。這樣就把notifyProvideDataChangeHandler同MetaServer以Bolt方式聯絡了起來。

public class DefaultMetaServiceImpl implements IMetaServerService {  @Override  public Map<String, Set<String>> getMetaServerMap() {      Connection connection = null;              connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()                  .next(), dataServerConfig.getMetaServerPort()))).getConnection();  }}

MetaNodeExchanger定義如下,其作用是統一處理DataServer內部關於MetaServer的互動。

public class MetaNodeExchanger implements NodeExchanger {    @Autowired    private Exchange                          boltExchange;    @Autowired    private IMetaServerService                metaServerService;    @Resource(name = "metaClientHandlers")    private Collection<AbstractClientHandler> metaClientHandlers;    public Channel connect(URL url) {        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);        if (client == null) {            synchronized (this) {                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);                if (client == null) {                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));                }            }        }        //try to connect data        Channel channel = client.getChannel(url);        if (channel == null) {            synchronized (this) {                channel = client.getChannel(url);                if (channel == null) {                    channel = client.connect(url);                }            }        }        return channel;    }}
4.3 Handler 定義

NotifyProvideDataChangeHandler 在 interest 函式中,設定了自己可以處理 NotifyProvideDataChange 型別訊息。這樣當 MetaServer 通知有 NotifyProvideDataChange 的時候,就會呼叫 metaServerService.fetchData(dataInfoId); 獲取 ProvideData,進行後續處理。

public class NotifyProvideDataChangeHandler extends AbstractClientHandler {    @Autowired    private IMetaServerService   metaServerService;    @Autowired    private ProvideDataProcessor provideDataProcessorManager;    @Override    public Object doHandle(Channel channel, Object request) {        NotifyProvideDataChange notifyProvideDataChange = (NotifyProvideDataChange) request;        String dataInfoId = notifyProvideDataChange.getDataInfoId();        if (notifyProvideDataChange.getDataOperator() != DataOperator.REMOVE) {            ProvideData provideData = metaServerService.fetchData(dataInfoId);            provideDataProcessorManager.changeDataProcess(provideData);        }        return null;    }      @Override    public Class interest() {        return NotifyProvideDataChange.class;    }  }
4.4 呼叫 Handler

在Meta Server 之中,DataNodeServiceImpl.notifyProvideDataChange 函式會通知 Data Server,現在有一個NotifyProvideDataChange 訊息。

於是NotifyProvideDataChangeHandler將作出響應。

4.5 獲取 ProvideData

在 NotifyProvideDataChangeHandler 之中 ,有如下

ProvideData provideData = metaServerService.fetchData(dataInfoId);

然後呼叫 DefaultMetaServiceImpl 中 fetchData 來去 Meta Server 獲取 ProvideData。

@Overridepublic ProvideData fetchData(String dataInfoId) {    Map<String, Connection> connectionMap = metaServerConnectionFactory        .getConnections(dataServerConfig.getLocalDataCenter());    String leader = getLeader().getIp();    if (connectionMap.containsKey(leader)) {        Connection connection = connectionMap.get(leader);        if (connection.isFine()) {            try {                Request<FetchProvideDataRequest> request = new Request<FetchProvideDataRequest>() {                    @Override                    public FetchProvideDataRequest getRequestBody() {                        return new FetchProvideDataRequest(dataInfoId);                    }                    @Override                    public URL getRequestUrl() {                        return new URL(connection.getRemoteIP(), connection.getRemotePort());                    }                };                Response response = metaNodeExchanger.request(request);                Object result = response.getResult();                if (result instanceof ProvideData) {                    return (ProvideData) result;                }             }         }    }    String newip = refreshLeader().getIp();    return null;}

現在圖示如下:

+---------------------------+     +--------------------------------------------------+         +---------------------------------+|  DefaultMetaServiceImpl   |     | MetaNodeExchanger                                |         |      Meta Server                ||                           |     |                                                  |  1      | +-----------------------------+ ||    getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | |   DataNodeServiceImpl       | ||                           |     +-----------------------------------+--------------+         | |                             | |+---------------------------+                                         ^                        | |                             | |                                                                      |                        | |  notifyProvideDataChange    | |                                                                      |                        | |              +              | |+------------------------------------------+                          |                        | |              |              | ||     metaClientHandlers                   +--------------------------+                        | +-----------------------------+ ||                                          |                                                   +---------------------------------+| +------------------------------------+   |                                                                    |       ^| |  serverChangeHandler               |   |                                                                    |       || |                                    |   |                                                                    |       || |  statusConfirmHandler              |   |           NotifyProvideDataChange                                  |       || |                                    |   |                                                                    |       || | +--------------------------------+ |   |                                     2                              |     3 || | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+       || | |                                | |   |                                                                            || | |                                | |   |                                                                            || | |                                | |   |                                                                            || | |  ProvideData provideData = +--------------------------------------------------------------------------------------+| | |                                | |   |         get ProvideData from Meta Server          FetchProvideDataRequest| | |  metaServerService.fetchData   | |   || | |                                | |   || | |                                | |   || | |                                | |   || | | changeDataProcess(provideData) | |   || | |                                | |   || | +--------------------------------+ |   || +------------------------------------+   |+------------------------------------------+

手機上如下:

0x05 回到了MetaServer

執行序列回到了MetaServer,它收到了FetchProvideDataRequest。

5.1 ata Server請求響應

FetchProvideDataRequestHandler是響應函式。函式邏輯相對簡單,就是從DBService之中根據DataInfoId獲取資料,返回給呼叫者。

public class FetchProvideDataRequestHandler extends AbstractServerHandler<FetchProvideDataRequest> {    @RaftReference    private DBService           persistenceDataDBService;    @Override    public Object reply(Channel channel, FetchProvideDataRequest fetchProvideDataRequest) {            DBResponse ret = persistenceDataDBService.get(fetchProvideDataRequest.getDataInfoId());            if (ret.getOperationStatus() == OperationStatus.SUCCESS) {                PersistenceData data = (PersistenceData) ret.getEntity();                ProvideData provideData = new ProvideData(new ServerDataBox(data.getData()),                    fetchProvideDataRequest.getDataInfoId(), data.getVersion());                return provideData;            } else if (ret.getOperationStatus() == OperationStatus.NOTFOUND) {                ProvideData provideData = new ProvideData(null,                    fetchProvideDataRequest.getDataInfoId(), null);                return provideData;            }         }    }    @Override    public HandlerType getType() {        return HandlerType.PROCESSER;    }    @Override    public Class interest() {        return FetchProvideDataRequest.class;    }}

由此可見,這裡的關鍵是 DBService。

於是從MetaServer角度看,流程如下:

+----------------------------------------------+|                 Data Server                  ||                                              || +---------------------------------------+    || |    NotifyProvideDataChangeHandler     |    || |                                       |    || |                                       |    || |metaSer^erSer^ice.fetchData(dataInfoId)|    || +---------------------------------------+    |+----------------------------------------------+                         |   ^                         |   |                    1    |   | FetchProvideDataRequest |   | ProvideData                         |   |                         |   |  4 +-----------------------------------------+ |    Meta Server        |   |             | |                       |   |             | |  +--------------------v---+-------+     | |  | FetchProvideDataRequestHandler |     | |  +--------------+---+-------------+     | |              2  |   ^                   | |                 |   | DBResponse        | | get(DataInfoId) |   |  3                | |                 v   |                   | |       +---------+---+------------+      | |       | PersistenceDataDBService |      | |       +--------------------------+      | +-----------------------------------------+
5.2 Session Server對應處理

Session Server 也會發起 FetchProvideDataRequest。在 SessionServerBootstrap 中有如下函式,都會發起請求,獲取配置資訊。

private void fetchStopPushSwitch(URL leaderUrl) {    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(        ValueConstants.STOP_PUSH_DATA_SWITCH_DATA_ID);    Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);    if (ret instanceof ProvideData) {        ProvideData provideData = (ProvideData) ret;        provideDataProcessorManager.fetchDataProcess(provideData);    } }private void fetchEnableDataRenewSnapshot(URL leaderUrl) {    FetchProvideDataRequest fetchProvideDataRequest = new FetchProvideDataRequest(        ValueConstants.ENABLE_DATA_RENEW_SNAPSHOT);    Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);    if (data instanceof ProvideData) {        ProvideData provideData = (ProvideData) data;        provideDataProcessorManager.fetchDataProcess(provideData);    }}private void fetchBlackList() {    blacklistManager.load();}
0x06 DataServer6.1 處理 ProvideData

在 NotifyProvideDataChangeHandler 之中,如下語句用來處理ProvideData。就是在fetchData之中。

在請求響應處理中

            Response response = metaNodeExchanger.request(request);            Object result = response.getResult();            if (result instanceof ProvideData) {                return (ProvideData) result;            } 

就是如下:

+---------------------------+     +--------------------------------------------------+         +---------------------------------+|  DefaultMetaServiceImpl   |     | MetaNodeExchanger                                |         |      Meta Server                ||                           |     |                                                  |  1      | +-----------------------------+ ||    getMetaServerMap +---------->-+boltExchange.connect(metaClientHandlers.toArray) +-------> | |   DataNodeServiceImpl       | ||                           |     +-----------------------------------+--------------+         | |                             | |+---------------------------+                                         ^                        | |                             | |                                                                      |                        | |  notifyProvideDataChange    | |                                                                      |                        | |              +              | |+------------------------------------------+                          |                        | |              |              | ||     metaClientHandlers                   +--------------------------+                        | +-----------------------------+ ||                                          |                                                   +---------------------------------+| +------------------------------------+   |                                                                    |       ^    || |  serverChangeHandler               |   |                                                                    |       |    || |                                    |   |                                                                    |       |    || |  statusConfirmHandler              |   |           NotifyProvideDataChange                                  |       |    || |                                    |   |                                                                    |       |    || | +--------------------------------+ |   |                                     2                              |     3 |    | 4| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+       |    || | |                                | |   |                                                                            |    || | |                                | |   |                     get ProvideData from Meta Server                       |    || | |                                | |   |                                                                            |    || | |  ProvideData provideData = +--------------------------------------------------------------------------------------+    || | |                                | |   |                                                                                 || | |  metaServerService.fetchData <-----------------------------------------------------------------------------------------+| | |                                | |   |                          ProvideData| | |                                | |   || | |                                | |   || | | changeDataProcess(provideData) | |   || | |                                | |   || | +--------------------------------+ |   || +------------------------------------+   |+------------------------------------------+

手機如下:

繼續處理是如下:

provideDataProcessorManager.changeDataProcess(provideData);

這就牽扯瞭如何用引擎處理。

6.1.1 Bean

這裡生成了處理引擎 ProvideDataProcessorManager,添加了一個處理handler DatumExpireProvideDataProcessor。

@Configurationpublic static class DataProvideDataConfiguration {    @Bean    public ProvideDataProcessor provideDataProcessorManager() {        return new ProvideDataProcessorManager();    }    @Bean    public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {        ProvideDataProcessor datumExpireProvideDataProcessor = new DatumExpireProvideDataProcessor();        ((ProvideDataProcessorManager) provideDataProcessorManager)            .addProvideDataProcessor(datumExpireProvideDataProcessor);        return datumExpireProvideDataProcessor;    }}
6.1.2 處理引擎 ProvideDataProcessorManager

這裡的套路依然很熟悉,即ProvideDataProcessor引擎,也就是ProvideDataProcessorManager也繼承了ProvideDataProcessor,但是在support之中設定了 return false,這樣引擎遍歷執行時候,就不會執行自己了。

public class ProvideDataProcessorManager implements ProvideDataProcessor {    private Collection<ProvideDataProcessor> provideDataProcessors = new ArrayList<>();    public void addProvideDataProcessor(ProvideDataProcessor provideDataProcessor) {        provideDataProcessors.add(provideDataProcessor);    }    @Override    public void changeDataProcess(ProvideData provideData) {        for (ProvideDataProcessor provideDataProcessor : provideDataProcessors) {            if (provideDataProcessor.support(provideData)) {                provideDataProcessor.changeDataProcess(provideData);            }        }    }    @Override    public boolean support(ProvideData provideData) {        return false;    }}
6.1.3 處理Handler

這裡的 DatumLeaseManager 就可以對應到前面講的 AfterWorkingProcess。

Handler之中呼叫DatumLeaseManager完成配置資料的部署。

public class DatumExpireProvideDataProcessor implements ProvideDataProcessor {    @Autowired    private DatumLeaseManager   datumLeaseManager;    @Override    public void changeDataProcess(ProvideData provideData) {        if (checkInvalid(provideData)) {            return;        }        boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()            .getObject());        datumLeaseManager.setRenewEnable(enableDataDatumExpire);    }    private boolean checkInvalid(ProvideData provideData) {        boolean invalid = provideData == null || provideData.getProvideData() == null                          || provideData.getProvideData().getObject() == null;        return invalid;    }    @Override    public boolean support(ProvideData provideData) {        return ValueConstants.ENABLE_DATA_DATUM_EXPIRE.equals(provideData.getDataInfoId());    }}

最終,圖示如下:

+---------------------------+     +--------------------------------------------------+         +---------------------------------+|  DefaultMetaServiceImpl   |     | MetaNodeExchanger                                |         |      Meta Server                ||                           |     |                                                  |  1      | +-----------------------------+ ||    getMetaServerMap +---------->--boltExchange.connect(metaClientHandlers.toArray) +-------> | |   DataNodeServiceImpl       | ||                           |     +-----------------------------------+--------------+         | |                             | |+---------------------------+                                         ^                        | |                             | |                                                                      |                        | |  notifyProvideDataChange    | |                                                                      |                        | |              +              | |+------------------------------------------+                          |                        | |              |              | ||     metaClientHandlers                   +--------------------------+                        | +-----------------------------+ ||                                          |                                                   +---------------------------------+| +------------------------------------+   |                                                                    |       ^    || |  serverChangeHandler               |   |                                                                    |       |    || |                                    |   |                                                                    |       |    || |  statusConfirmHandler              |   |           NotifyProvideDataChange                                  |       |    || |                                    |   |                                                                    |       |    || | +--------------------------------+ |   |                                     2                              |     3 |    | 4| | |notifyProvideDataChangeHandler<-------------------------------------<--------------------------------------+       |    || | |                                | |   |                                                                            |    || | |                                | |   |                     get ProvideData from Meta Server                       |    || | |                                | |   |                                                                            |    || | |  ProvideData provideData = +--------------------------------------------------------------------------------------+    || | |                                | |   |                                                                                 || | |  metaServerService.fetchData <-----------------------------------------------------------------------------------------+| | |                                | |   |                          ProvideData| | |                                | |   || | |                                | |   |     5           +---------------------------------------------+| | | changeDataProcess(provideData)+--------------+         |   ProvideDataProcessor                      || | |                                | |   |       |         |                                             || | +--------------------------------+ |   |       +-------> | changeDataProcess(ProvideData provideData)  || +------------------------------------+   |                 |                                             |+------------------------------------------+                 +---------------------------------------------+

手機圖例如下:

連結:https://www.cnblogs.com/rossiXYZ/p/14289209.html

15
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • leetcode460_go_LFU快取