一、啟動一個服務端Provider1. 定義一個介面和實現本篇主要講Dubbo服務是如何註冊,匯出並接受服務請求。
public interface UserService { void say(String message);}public class UserServiceImpl implements UserService { public void say(String message) { System.out.println("say:" + message); }}
2. 本地服務註冊到zkpublic class Tester { @Test public void providerTest() { //1. 服務方要把UserService方法提供給外面呼叫 UserService userService = new UserServiceImpl(); //2. 應用配置 ApplicationConfig app = new ApplicationConfig(); app.setName("providerTest"); //3. 指定一個註冊中心 RegistryConfig registry = new RegistryConfig(); registry.setAddress("zookeeper://127.0.0.1:2181"); //4. 指定協議型別 ProtocolConfig protocol = new ProtocolConfig(); protocol.setName("dubbo"); protocol.setPort(8012); protocol.setThreads(200); // 服務提供者暴露服務配置 ServiceConfig<UserService> service = new ServiceConfig<UserService>(); // 此例項很重,封裝了與註冊中心的連線,請自行快取,否則可能造成記憶體和連線洩漏 service.setApplication(app); service.setRegistry(registry); // 多個註冊中心可以用setRegistries() service.setProtocol(protocol); // 多個協議可以用setProtocols() service.setInterface(UserService.class); service.setRef(userService); service.setVersion("1.0.0"); // 暴露及註冊服務 //dubbo://192.168.1.9:8012/code.UserService?anyhost=true&application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200×tamp=1597048727957&version=1.0.0 service.export(); }}
3. 分析原理這裡只是分析下大概原理,給各位同學先帶來帶你感受,實際步驟後面分析原始碼時候再細說
在進行分析之前我們思考一下,當我們不使用RPC框架和SpringCloud的時候,如果我們要呼叫其他第三方的服務時候,我們會怎麼處理呢?
通過下面這種方式每次呼叫時候構建一個HTTP的請求。
public class Tester{ public static void sayRequest(String message){ OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url("http://第三方服務的介面地址?message"+message) .get().addHeader("Cache-Control","no-cache") .build(); client.newCall(request).execute(); } public static void main(String[]args){ sayRequest("你好") }}
使用後我們就可以像呼叫本地方法一樣來呼叫遠端介面了? 那麼Dubbo是如何實現的呢? 其實就是在底層幫我們做了類似於http的通訊 而通過api的方式遮蔽了底層。讓我們直接將呼叫本地方法一樣呼叫遠端方法。
關鍵詞一:通訊協議dubbo預設不是基於HTTP,而是基於dubbo自定義的協議。因為jdk自帶的socket api不太友好,所以dubbo底層是使用netty類做通訊的 說白了這個協議和http類似都是基於tcp協議從而進行封裝,不同點就是資料格式不同。 如下我們自定義了一個協議來讀取tcp連線中資料。
下面程式碼不是重點,重點知道協議就是,約定從tcp連線中讀取資料的方式和方法。比如約定閱讀的第一個位元組是協議型別,第二個是序列化型別,第三個是報文資料長度,第四個就是具體的報文資料。
二、提供服務流程這裡我們只先分析dubbo的原始碼,後面再說dubbo整合spring的原理。
1. 要提供服務的物件public interface UserService { void say(String message);}public class UserServiceImpl implements UserService { public void say(String message) { System.out.println("say:" + message); }}
2. 建立一個應用ApplicationConfig app = new ApplicationConfig();app.setName("providerTest");
3. 指定註冊中心
這裡我們使用zookeeper作為註冊中心
RegistryConfig registry = new RegistryConfig(); registry.setAddress("zookeeper://127.0.0.1:2181");
4. 指定通訊協議
ProtocolConfig protocol = new ProtocolConfig(); protocol.setName("dubbo"); protocol.setPort(8012); protocol.setThreads(200);
5. 匯出服務到zk // 服務提供者暴露服務配置 ServiceConfig<UserService> service = new ServiceConfig<UserService>(); // 此例項很重,封裝了與註冊中心的連線,請自行快取,否則可能造成記憶體和連線洩漏 service.setApplication(app); service.setRegistry(registry); // 多個註冊中心可以用setRegistries() service.setProtocol(protocol); // 多個協議可以用setProtocols() service.setInterface(UserService.class); service.setRef(userService); service.setVersion("1.0.0"); // 暴露及註冊服務 //dubbo://192.168.1.9:8012/code.UserService?anyhost=true&application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200×tamp=1597048727957&version=1.0.0 service.export();
當這一步進行完後,我們會在zookeeper的控制檯找到自己的服務地址。
通過url解碼之後就是
dubbo://192.168.1.9:8012/code.UserService?anyhost=true&application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200×tamp=1597048727957&version=1.0.0
三、原始碼分析1. 服務註冊我們在看二流程中,可以看到前面的1234建立的步驟都是在5中使用的,說明1234其實都是資料的載體,具體如何使用是在5中來使用的。而5的物件是ServiceConfig。所以說看原始碼的入口就從ServiceConfig.export()開始。
ServiceConfig的export最終後呼叫doExport();
doExport方法會先檢查然後在註冊服務到Netty伺服器和註冊到zk
protected synchronized void doExport() { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); //檢查介面方法是否存在在介面中,這種是使用的方法級別的執行時候 checkInterfaceAndMethods(interfaceClass, methods); //檢查介面例項是否存在,必須存在否則無法執行反射 checkRef(); //檢查應該配置,如果沒有配置自動建立一個,應用名是dubbo.application.name的值 checkApplication(); //檢查註冊中心 checkRegistry(); //檢查協議,預設是dubbo協議 checkProtocol(); appendProperties(this); checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } //真正匯出服務 doExportUrls(); }
這一步會將服務在本地啟動一個服務,同時將服務註冊到註冊中心中。
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=providerTest&dubbo=2.5.3&pid=48656®istry=zookeeper×tamp=1597052329640 for (ProtocolConfig protocolConfig : protocols) { //核心邏輯在這裡 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
根據doExportUrlsFor1Protocol的原始碼。我們發現dubbo中的所有模型都向Invoker來靠攏。
先建立一個Socket來驗證下能不能連線上註冊中心然後根據協議資訊,找到實現類。埠如果指定了就用指定的,沒有指定就隨機生成。預設是20880獲取服務版本號,首先查詢MANIFEST.MF規範中的版本號。沒有就用指定的版本號通過反射生成Invoker物件匯出Invoker啟動一個Netty服務DubboProtocol.openServer註冊到zk中RegistryProtocol.export2. Netty服務接受服務前面註冊時候,我們說了在DubboProtocol中去建立服務的。那我們直接看這部分程式碼。
public class DubboProtocol extends AbstractProtocol { //邏輯處理器 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); //如果是callback 需要處理高版本呼叫低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true; break; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv ); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } //建立服務 private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也可以暴露一個只有server可以呼叫的服務。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { //server支援reset,配合override功能使用 server.reset(url); } } } //建立服務 private ExchangeServer createServer(URL url) { //預設開啟server關閉時傳送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //預設開啟heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; } }
我們主要看服務端要的3個方法
ExchangeHandler邏輯處理器建立服務openServer和createServer。 底層實現NettyTransporterExchangeHandler#ExchangeHandler既然我們說了底層是Netty來實現的,那麼又知道Netty是通訊框架。那麼我們來看下服務端的處理邏輯吧。
class DubboProtocol{ private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); //如果是callback 需要處理高版本呼叫低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true; break; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv ); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } }}
過濾器請看註釋
3. 編碼器和解碼器這裡稍微說一點編碼器,dubbo協議的編碼器。
public class NettyServer extends AbstractServer implements Server { @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ //解碼器 pipeline.addLast("decoder", adapter.getDecoder()); //編碼器 pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
主要看這個類Codec2
我們主要看服務端如何將tcp二進位制資料轉成dubbo裡面的模型。
客戶端: 資料DecodeableRpcInvocation -> 通過編碼器轉換成 -> 二進位制資料
服務端: 二進位制資料 -> 解碼器 -> DecodeableRpcInvocation -> DubboProtocol#requestHandler處理
4. 序列化協議java模型如何轉二進位制,就是序列化協議。我們所說的hession2協議就在這裡用的。這裡追求的是速度快,資料小。
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
四、Invoker前面說了dubbo中的模型都向Invoker靠攏。其實說白了就是反射。
1. 生成Invoker物件可以看到dubbo裡面已經提供了,構建方法。我們先熟悉如何使用其API。然後把這些小的知識點慢慢的串起來就好了。
public class Tester { @Test public void buildInvokerTest() { JavassistProxyFactory factory = new JavassistProxyFactory(); UserService userService = new UserServiceImpl(); URL dubboUrl = URL.valueOf("test://"); final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl); }2. 建立執行引數Invoker是執行體,Invocation是執行引數
public class Tester { public void buildInvokerTest() { final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl); Invocation invocation = new Invocation() { public String getMethodName() { return "say"; } public Class<?>[] getParameterTypes() { return new Class[]{String.class}; } public Object[] getArguments() { return new Object[]{"hello"}; } public Map<String, String> getAttachments() { return null; } public String getAttachment(String key) { return null; } public String getAttachment(String key, String defaultValue) { return null; } public Invoker<?> getInvoker() { return null; } }; System.out.println(invoker.invoke(invocation)); } }3. 構建帶有過濾器的Invoker@Test public void linkedInvokerTest() { JavassistProxyFactory factory = new JavassistProxyFactory(); UserService userService = new UserServiceImpl(); URL dubboUrl = URL.valueOf("test://"); final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl); List<Filter> filters = new ArrayList(); Filter filter = new Filter() { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { System.out.println("-------執行過濾器-------"); return invoker.invoke(invocation); } }; filters.add(filter); Invoker<UserService> userServiceInvoker = buildInvokerChain(invoker, filters); userServiceInvoker.invoke(new Invocation() { public String getMethodName() { return "say"; } public Class<?>[] getParameterTypes() { return new Class[]{String.class}; } public Object[] getArguments() { return new Object[]{"hello 過濾器"}; } public Map<String, String> getAttachments() { return null; } public String getAttachment(String key) { return null; } public String getAttachment(String key, String defaultValue) { return null; } public Invoker<?> getInvoker() { return null; } }); }五、總結知識點回顧
如何將物件方法生成Invoker如何將Invoker註冊到註冊地中心如何處理客戶端的請求二進位制資料轉java資料協議協議中包含的序列化知識最後求關注,求訂閱,謝謝你的閱讀!
下一篇會講,dubbo如何與Spring進行整合。