首頁>技術>

本篇主要講Dubbo服務是如何註冊,匯出並接受服務請求。

一、啟動一個服務端Provider1. 定義一個介面和實現
public interface UserService {    void say(String message);}public class UserServiceImpl implements UserService {    public void say(String message) {        System.out.println("say:" + message);    }}
2. 本地服務註冊到zk
public class Tester {    @Test    public void providerTest() {        //1. 服務方要把UserService方法提供給外面呼叫        UserService userService = new UserServiceImpl();        //2. 應用配置        ApplicationConfig app = new ApplicationConfig();        app.setName("providerTest");        //3. 指定一個註冊中心        RegistryConfig registry = new RegistryConfig();        registry.setAddress("zookeeper://127.0.0.1:2181");        //4. 指定協議型別        ProtocolConfig protocol = new ProtocolConfig();        protocol.setName("dubbo");        protocol.setPort(8012);        protocol.setThreads(200);        // 服務提供者暴露服務配置        ServiceConfig<UserService> service = new ServiceConfig<UserService>(); // 此例項很重,封裝了與註冊中心的連線,請自行快取,否則可能造成記憶體和連線洩漏        service.setApplication(app);        service.setRegistry(registry); // 多個註冊中心可以用setRegistries()        service.setProtocol(protocol); // 多個協議可以用setProtocols()        service.setInterface(UserService.class);        service.setRef(userService);        service.setVersion("1.0.0");        // 暴露及註冊服務        //dubbo://192.168.1.9:8012/code.UserService?anyhost=true&application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200×tamp=1597048727957&version=1.0.0        service.export();    }}
3. 分析原理

這裡只是分析下大概原理,給各位同學先帶來帶你感受,實際步驟後面分析原始碼時候再細說

在進行分析之前我們思考一下,當我們不使用RPC框架和SpringCloud的時候,如果我們要呼叫其他第三方的服務時候,我們會怎麼處理呢?

通過下面這種方式每次呼叫時候構建一個HTTP的請求。

public class Tester{    public static void sayRequest(String message){        OkHttpClient client = new OkHttpClient();        Request request = new Request.Builder()            .url("http://第三方服務的介面地址?message"+message)            .get().addHeader("Cache-Control","no-cache")            .build();        client.newCall(request).execute();        }    public static void main(String[]args){        sayRequest("你好")    }}

使用後我們就可以像呼叫本地方法一樣來呼叫遠端介面了? 那麼Dubbo是如何實現的呢? 其實就是在底層幫我們做了類似於http的通訊 而通過api的方式遮蔽了底層。讓我們直接將呼叫本地方法一樣呼叫遠端方法。

關鍵詞一:通訊協議

dubbo預設不是基於HTTP,而是基於dubbo自定義的協議。因為jdk自帶的socket api不太友好,所以dubbo底層是使用netty類做通訊的 說白了這個協議和http類似都是基於tcp協議從而進行封裝,不同點就是資料格式不同。 如下我們自定義了一個協議來讀取tcp連線中資料。

下面程式碼不是重點,重點知道協議就是,約定從tcp連線中讀取資料的方式和方法。比如約定閱讀的第一個位元組是協議型別,第二個是序列化型別,第三個是報文資料長度,第四個就是具體的報文資料。

二、提供服務流程

這裡我們只先分析dubbo的原始碼,後面再說dubbo整合spring的原理。

1. 要提供服務的物件
public interface UserService {    void say(String message);}public class UserServiceImpl implements UserService {    public void say(String message) {        System.out.println("say:" + message);    }}
2. 建立一個應用
ApplicationConfig app = new ApplicationConfig();app.setName("providerTest");
3. 指定註冊中心

這裡我們使用zookeeper作為註冊中心

 RegistryConfig registry = new RegistryConfig(); registry.setAddress("zookeeper://127.0.0.1:2181");
4. 指定通訊協議
        ProtocolConfig protocol = new ProtocolConfig();        protocol.setName("dubbo");        protocol.setPort(8012);        protocol.setThreads(200);
5. 匯出服務到zk
       // 服務提供者暴露服務配置        ServiceConfig<UserService> service = new ServiceConfig<UserService>(); // 此例項很重,封裝了與註冊中心的連線,請自行快取,否則可能造成記憶體和連線洩漏        service.setApplication(app);        service.setRegistry(registry); // 多個註冊中心可以用setRegistries()        service.setProtocol(protocol); // 多個協議可以用setProtocols()        service.setInterface(UserService.class);        service.setRef(userService);        service.setVersion("1.0.0");        // 暴露及註冊服務        //dubbo://192.168.1.9:8012/code.UserService?anyhost=true&application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200×tamp=1597048727957&version=1.0.0        service.export();

當這一步進行完後,我們會在zookeeper的控制檯找到自己的服務地址。

通過url解碼之後就是

dubbo://192.168.1.9:8012/code.UserService?anyhost=true&application=providerTest&dubbo=2.5.3&interface=code.UserService&methods=say&pid=46787&revision=1.0.0&side=provider&threads=200&timestamp=1597048727957&version=1.0.0

三、原始碼分析1. 服務註冊

我們在看二流程中,可以看到前面的1234建立的步驟都是在5中使用的,說明1234其實都是資料的載體,具體如何使用是在5中來使用的。而5的物件是ServiceConfig。所以說看原始碼的入口就從ServiceConfig.export()開始。

ServiceConfig的export最終後呼叫doExport();

doExport方法會先檢查然後在註冊服務到Netty伺服器和註冊到zk

    protected synchronized void doExport() {        interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()                        .getContextClassLoader());        //檢查介面方法是否存在在介面中,這種是使用的方法級別的執行時候        checkInterfaceAndMethods(interfaceClass, methods);        //檢查介面例項是否存在,必須存在否則無法執行反射        checkRef();        //檢查應該配置,如果沒有配置自動建立一個,應用名是dubbo.application.name的值        checkApplication();        //檢查註冊中心        checkRegistry();        //檢查協議,預設是dubbo協議        checkProtocol();        appendProperties(this);        checkStubAndMock(interfaceClass);        if (path == null || path.length() == 0) {            path = interfaceName;        }        //真正匯出服務        doExportUrls();    }

這一步會將服務在本地啟動一個服務,同時將服務註冊到註冊中心中。

    private void doExportUrls() {        List<URL> registryURLs = loadRegistries(true);        //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=providerTest&dubbo=2.5.3&pid=48656®istry=zookeeper×tamp=1597052329640        for (ProtocolConfig protocolConfig : protocols) {            //核心邏輯在這裡            doExportUrlsFor1Protocol(protocolConfig, registryURLs);        }    }

根據doExportUrlsFor1Protocol的原始碼。我們發現dubbo中的所有模型都向Invoker來靠攏。

先建立一個Socket來驗證下能不能連線上註冊中心然後根據協議資訊,找到實現類。埠如果指定了就用指定的,沒有指定就隨機生成。預設是20880獲取服務版本號,首先查詢MANIFEST.MF規範中的版本號。沒有就用指定的版本號通過反射生成Invoker物件匯出Invoker啟動一個Netty服務DubboProtocol.openServer註冊到zk中RegistryProtocol.export2. Netty服務接受服務

前面註冊時候,我們說了在DubboProtocol中去建立服務的。那我們直接看這部分程式碼。

public class DubboProtocol extends AbstractProtocol {    //邏輯處理器    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {                public Object reply(ExchangeChannel channel, Object message) throws RemotingException {            if (message instanceof Invocation) {                Invocation inv = (Invocation) message;                Invoker<?> invoker = getInvoker(channel, inv);                //如果是callback 需要處理高版本呼叫低版本的問題                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){                    String methodsStr = invoker.getUrl().getParameters().get("methods");                    boolean hasMethod = false;                    if (methodsStr == null || methodsStr.indexOf(",") == -1){                        hasMethod = inv.getMethodName().equals(methodsStr);                    } else {                        String[] methods = methodsStr.split(",");                        for (String method : methods){                            if (inv.getMethodName().equals(method)){                                hasMethod = true;                                break;                            }                        }                    }                    if (!hasMethod){                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );                        return null;                    }                }                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());                return invoker.invoke(inv);            }            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());        }    //建立服務        private void openServer(URL url) {        // find server.        String key = url.getAddress();        //client 也可以暴露一個只有server可以呼叫的服務。        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);        if (isServer) {         ExchangeServer server = serverMap.get(key);         if (server == null) {          serverMap.put(key, createServer(url));         } else {          //server支援reset,配合override功能使用          server.reset(url);         }        }    }    //建立服務    private ExchangeServer createServer(URL url) {        //預設開啟server關閉時傳送readonly事件        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());        //預設開啟heartbeat        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))            throw new RpcException("Unsupported server type: " + str + ", url: " + url);        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);        ExchangeServer server;        try {            server = Exchangers.bind(url, requestHandler);        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);        }        str = url.getParameter(Constants.CLIENT_KEY);        if (str != null && str.length() > 0) {            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();            if (!supportedTypes.contains(str)) {                throw new RpcException("Unsupported client type: " + str);            }        }        return server;    }    }

我們主要看服務端要的3個方法

ExchangeHandler邏輯處理器建立服務openServer和createServer。 底層實現NettyTransporterExchangeHandler#ExchangeHandler

既然我們說了底層是Netty來實現的,那麼又知道Netty是通訊框架。那麼我們來看下服務端的處理邏輯吧。

class DubboProtocol{        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {            if (message instanceof Invocation) {                Invocation inv = (Invocation) message;                Invoker<?> invoker = getInvoker(channel, inv);                //如果是callback 需要處理高版本呼叫低版本的問題                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){                    String methodsStr = invoker.getUrl().getParameters().get("methods");                    boolean hasMethod = false;                    if (methodsStr == null || methodsStr.indexOf(",") == -1){                        hasMethod = inv.getMethodName().equals(methodsStr);                    } else {                        String[] methods = methodsStr.split(",");                        for (String method : methods){                            if (inv.getMethodName().equals(method)){                                hasMethod = true;                                break;                            }                        }                    }                    if (!hasMethod){                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );                        return null;                    }                }                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());                return invoker.invoke(inv);            }            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());        }        }}
過濾器

請看註釋

3. 編碼器和解碼器

這裡稍微說一點編碼器,dubbo協議的編碼器。

public class NettyServer extends AbstractServer implements Server {        @Override    protected void doOpen() throws Throwable {        NettyHelper.setNettyLoggerFactory();        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));        bootstrap = new ServerBootstrap(channelFactory);                final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);        channels = nettyHandler.getChannels();        // https://issues.jboss.org/browse/NETTY-365        // https://issues.jboss.org/browse/NETTY-379        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            public ChannelPipeline getPipeline() {                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);                ChannelPipeline pipeline = Channels.pipeline();                /*int idleTimeout = getIdleTimeout();                if (idleTimeout > 10000) {                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));                }*/                //解碼器                pipeline.addLast("decoder", adapter.getDecoder());                //編碼器                pipeline.addLast("encoder", adapter.getEncoder());                pipeline.addLast("handler", nettyHandler);                return pipeline;            }        });        // bind        channel = bootstrap.bind(getBindAddress());    }}    

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);

主要看這個類Codec2

我們主要看服務端如何將tcp二進位制資料轉成dubbo裡面的模型。

客戶端: 資料DecodeableRpcInvocation -> 通過編碼器轉換成 -> 二進位制資料

服務端: 二進位制資料 -> 解碼器 -> DecodeableRpcInvocation -> DubboProtocol#requestHandler處理

4. 序列化協議

java模型如何轉二進位制,就是序列化協議。我們所說的hession2協議就在這裡用的。這裡追求的是速度快,資料小。

Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);

四、Invoker

前面說了dubbo中的模型都向Invoker靠攏。其實說白了就是反射。

1. 生成Invoker物件

可以看到dubbo裡面已經提供了,構建方法。我們先熟悉如何使用其API。然後把這些小的知識點慢慢的串起來就好了。

public class Tester {    @Test    public void buildInvokerTest() {        JavassistProxyFactory factory = new JavassistProxyFactory();        UserService userService = new UserServiceImpl();        URL dubboUrl = URL.valueOf("test://");        final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl);    }2. 建立執行引數

Invoker是執行體,Invocation是執行引數

public class Tester {     public void buildInvokerTest() {      final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl);        Invocation invocation = new Invocation() {            public String getMethodName() {                return "say";            }            public Class<?>[] getParameterTypes() {                return new Class[]{String.class};            }            public Object[] getArguments() {                return new Object[]{"hello"};            }            public Map<String, String> getAttachments() {                return null;            }            public String getAttachment(String key) {                return null;            }            public String getAttachment(String key, String defaultValue) {                return null;            }            public Invoker<?> getInvoker() {                return null;            }        };        System.out.println(invoker.invoke(invocation));      } }3. 構建帶有過濾器的Invoker@Test    public void linkedInvokerTest() {        JavassistProxyFactory factory = new JavassistProxyFactory();        UserService userService = new UserServiceImpl();        URL dubboUrl = URL.valueOf("test://");        final Invoker<UserService> invoker = factory.getInvoker(userService, UserService.class, dubboUrl);        List<Filter> filters = new ArrayList();        Filter filter = new Filter() {            public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {                System.out.println("-------執行過濾器-------");                return invoker.invoke(invocation);            }        };        filters.add(filter);        Invoker<UserService> userServiceInvoker = buildInvokerChain(invoker, filters);        userServiceInvoker.invoke(new Invocation() {            public String getMethodName() {                return "say";            }            public Class<?>[] getParameterTypes() {                return new Class[]{String.class};            }            public Object[] getArguments() {                return new Object[]{"hello 過濾器"};            }            public Map<String, String> getAttachments() {                return null;            }            public String getAttachment(String key) {                return null;            }            public String getAttachment(String key, String defaultValue) {                return null;            }            public Invoker<?> getInvoker() {                return null;            }        });    }五、總結

知識點回顧

如何將物件方法生成Invoker如何將Invoker註冊到註冊地中心如何處理客戶端的請求二進位制資料轉java資料協議協議中包含的序列化知識

最後求關注,求訂閱,謝謝你的閱讀!

下一篇會講,dubbo如何與Spring進行整合。

136
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Spring Boot 應用程式啟動流程分析