首頁>技術>

寫在前面

單機應用中的方法呼叫很簡單,直接呼叫就行,像這樣

因為呼叫方與被呼叫方在一個程序內

隨著業務的發展,單機應用會越來越力不從心,勢必會引入分散式來解決單機的問題,那麼呼叫方如何呼叫另一臺機器上的方法呢 ?

這就涉及到分散式通訊方式,從單機走向分散式,產生了很多通訊方式

而 RPC 就是實現遠端方法呼叫的方式之一;說 RPC 不是協議,可能很多小夥伴難以置信,以為我在騙你們

看著你們這一身腱子肉,我哪敢騙你們;只要你們把下面的看完,騙沒騙你們,你們自己說了算

RPC 的演進過程

先說明一下,下文中的示例雖然是 Java 程式碼實現的,但原理是通用的,重點是理解其中的原理

第一版

兩臺機器之間進行互動,那麼肯定離不開網路通訊協議,TCP / IP 也就成了繞不開的點,所以先輩們最初想到的方法就是透過 TCP / IP 來實現遠端方法的呼叫

而作業系統是沒有直接暴露 TCP / IP 介面的,而是透過 Socket 抽象了 TCP / IP 介面,所以我們可以透過 Socket 來實現最初版的遠端方法呼叫

完整示例程式碼:rpc-01,核心程式碼如下

package com.qsl.rpc;import com.qsl.rpc.entity.User;import com.qsl.rpc.server.UserServiceImpl;import com.qsl.rpc.service.IUserService;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.InputStream;import java.io.OutputStream;import java.net.ServerSocket;import java.net.Socket;/** * @author 青石路 * @date 2021/1/16 19:49 */public class Server {    private static boolean is_running = true;    public static void main(String[] args) throws Exception {        ServerSocket serverSocket = new ServerSocket(8888);        while (is_running) {            System.out.println("等待 client 連線");            Socket client = serverSocket.accept();            System.out.println("獲取到 client...");            handle(client);            client.close();        }        serverSocket.close();    }    private static void handle(Socket client) throws Exception {        InputStream in = client.getInputStream();        OutputStream out = client.getOutputStream();        DataInputStream dis = new DataInputStream(in);        DataOutputStream dos = new DataOutputStream(out);        // 從 socket 讀取引數        int id = dis.readInt();        System.out.println("id = " + id);        // 查詢本地資料        IUserService userService = new UserServiceImpl();        User user = userService.getUserById(id);        // 往 socket 寫響應值        dos.writeInt(user.getId());        dos.writeUTF(user.getName());        dos.flush();        dis.close();        dos.close();    }}

Client:

package com.qsl.rpc;import com.qsl.rpc.entity.User;import java.io.DataInputStream;import java.io.DataOutputStream;import java.net.Socket;/** * @author 青石路 * @date 2021/1/16 19:49 */public class Client {    public static void main(String[] args) throws Exception {        Socket s = new Socket("127.0.0.1", 8888);        // 網路傳輸資料        // 往 socket 寫請求引數        DataOutputStream dos = new DataOutputStream(s.getOutputStream());        dos.writeInt(18);        // 從 socket 讀響應值        DataInputStream  dis = new DataInputStream(s.getInputStream());        int id = dis.readInt();        String name = dis.readUTF();        // 將響應值封裝成 User 物件        User user = new User(id, name);        dos.close();        dis.close();        s.close();        // 進行業務處理        System.out.println(user);    }}

程式碼很簡單,就是一個簡單的 Socket 通訊;如果看不懂,那就需要去補充下 Socket 和 IO 的知識

測試結果如下

可以看到 Client 與 Server 之間是可以進行通訊的;但是,這種方式非常麻煩,有太多缺點,最明顯的一個就是

Client 端業務程式碼 與 網路傳輸程式碼 混合在一起,沒有明確的模組劃分

如果有多個開發者同時進行 Client 開發,那麼他們都需要知道 Socket、IO

第二版

針對第一版的缺點,演進出了這一版,引進 Stub (早期的叫法,不用深究,理解成代理就行)實現 Client 端網路傳輸程式碼的封裝

完整示例程式碼:rpc-02,改動部分如下

Stub:

package com.qsl.rpc;import com.qsl.rpc.entity.User;import java.io.DataInputStream;import java.io.DataOutputStream;import java.net.Socket;/** * 相當於一個靜態代理,封裝了網路資料傳輸 * @author 青石路 * @date 2021/1/17 9:38 */public class Stub {    public User getUserById(Integer id) throws Exception {        Socket s = new Socket("127.0.0.1", 8888);        // 網路傳輸資料        // 往 socket 寫請求引數        DataOutputStream dos = new DataOutputStream(s.getOutputStream());        dos.writeInt(id);        // 從 socket 讀響應值        DataInputStream dis = new DataInputStream(s.getInputStream());        int userId = dis.readInt();        String name = dis.readUTF();        // 將響應值封裝成 User 物件        User user = new User(userId, name);        dos.close();        dis.close();        s.close();        return user;    }}

Client:

package com.qsl.rpc;import com.qsl.rpc.entity.User;/** * @author 青石路 * @date 2021/1/16 19:49 */public class Client {    public static void main(String[] args) throws Exception {        // 不再關注網路傳輸        Stub stub = new Stub();        User user = stub.getUserById(18);        // 進行業務處理        System.out.println(user);    }}

Client 不再關注網路資料傳輸,一心關注業務程式碼就好

有小夥伴可能就槓上了:這不就是把網路傳輸程式碼移了個位置嘛,這也算改進?

迭代開發是一個逐步完善的過程,而這也算是一個改進哦

但這一版還是有很多缺點,最明顯的一個就是

Stub 只能代理 IUserService 的一個方法 getUserById ,侷限性太大,不夠通用

如果想在 IUserService 新增一個方法: getUserByName ,那麼需要在 Stub 中新增對應的方法,Server 端也需要做對應的修改來支援

第三版

第二版中的 Stub 代理功能太弱了,那有沒有什麼方式可以增強 Stub 的代理功能了?

前面的 Stub 相當於是一個靜態代理,所以功能有限,那靜態代理的增強版是什麼了,沒錯,就是:動態代理

不熟悉動態代理的小夥伴,一定要先弄懂動態代理:設計模式之代理,手動實現動態代理,揭秘原理實現

JDK 有動態代理的 API,我們就用它來實現

完整示例程式碼:rpc-03,相較於第二版,改動比較大,大家需要仔細看

Server:

package com.qsl.rpc;import com.qsl.rpc.entity.User;import com.qsl.rpc.server.UserServiceImpl;import com.qsl.rpc.service.IUserService;import java.io.InputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.io.OutputStream;import java.lang.reflect.Method;import java.net.ServerSocket;import java.net.Socket;/** * @author 青石路 * @date 2021/1/16 19:49 */public class Server {    private static boolean is_running = true;    public static void main(String[] args) throws Exception {        ServerSocket serverSocket = new ServerSocket(8888);        while (is_running) {            System.out.println("等待 client 連線");            Socket client = serverSocket.accept();            System.out.println("獲取到 client...");            handle(client);            client.close();        }        serverSocket.close();    }    private static void handle(Socket client) throws Exception {        InputStream in = client.getInputStream();        OutputStream out = client.getOutputStream();        ObjectInputStream ois = new ObjectInputStream(in);        ObjectOutputStream oos = new ObjectOutputStream(out);        // 獲取方法名、方法的引數型別、方法的引數值        String methodName = ois.readUTF();        Class[] parameterTypes = (Class[]) ois.readObject();        Object[] args = (Object[]) ois.readObject();        IUserService userService = new UserServiceImpl();        Method method = userService.getClass().getMethod(methodName, parameterTypes);        User user = (User) method.invoke(userService, args);        // 往 socket 寫響應值;直接寫可序列化物件(實現 Serializable 介面)        oos.writeObject(user);        oos.flush();        ois.close();        oos.close();    }}

Stub:

package com.qsl.rpc;import com.qsl.rpc.entity.User;import com.qsl.rpc.service.IUserService;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.Socket;/** * 動態代理,封裝了網路資料傳輸 * @author 青石路 * @date 2021/1/17 9:38 */public class Stub {    public static IUserService getStub() {        Object obj = Proxy.newProxyInstance(IUserService.class.getClassLoader(),                new Class[]{IUserService.class}, new NetInvocationHandler());        return (IUserService)obj;    }    static class NetInvocationHandler implements InvocationHandler {        /**         *         * @param proxy         * @param method         * @param args         * @return         * @throws Throwable         */        @Override        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {            Socket s = new Socket("127.0.0.1", 8888);            // 網路傳輸資料            ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());            // 傳輸方法名、方法引數型別、方法引數值;可能會有方法過載,所以要傳引數列表            oos.writeUTF(method.getName());            Class[] parameterTypes = method.getParameterTypes();            oos.writeObject(parameterTypes);            oos.writeObject(args);            // 從 socket 讀響應值            ObjectInputStream ois = new ObjectInputStream(s.getInputStream());            User user = (User) ois.readObject();            oos.close();            ois.close();            s.close();            return user;        }    }}

Client:

package com.qsl.rpc;import com.qsl.rpc.entity.User;import com.qsl.rpc.service.IUserService;/** * @author 青石路 * @date 2021/1/16 19:49 */public class Client {    public static void main(String[] args) throws Exception {        IUserService userService = Stub.getStub();        //User user = userService.getUserById(23);        User user = userService.getUserByName("李小龍");        // 進行業務處理        System.out.println(user);    }}

我們來看下效果

此時, IUserService 介面的方法都能被代理了,即使它新增介面, Stub 不用做任何修改也能代理上

另外, Server 端的響應值改成了物件,而不是單個屬性逐一返回,那麼無論 User 是新增屬性,還是刪減屬性,Client 和 Server 都不受影響了

這一版的改進是非常大的進步;但還是存在比較明顯的缺點

只支援 IUserService ,通用性還是不夠完美

如果新引進了一個 IPersonService ,那怎麼辦 ?

第四版

第三版相當於 Client 與 Server 端約定好了,只進行 User 服務的互動,所以 User 之外的服務,兩邊是通訊不上的

如果還需要進行其他服務的互動,那麼 Client 就需要將請求的服務名作為引數傳遞給 Server,告訴 Server 我需要和哪個服務進行互動

所以,Client 和 Server 都需要進行改造

完整示例程式碼:rpc-04,相較於第三版,改動比較小,相信大家都能看懂

Server:

package com.qsl.rpc;import com.qsl.rpc.server.PersonServiceImpl;import com.qsl.rpc.server.UserServiceImpl;import java.io.InputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.io.OutputStream;import java.lang.reflect.Method;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;/** * @author 青石路 * @date 2021/1/16 19:49 */public class Server {    private static boolean is_running = true;    private static final HashMap<String, Object> REGISTRY_MAP = new HashMap();    public static void main(String[] args) throws Exception {        // 向註冊中心註冊服務        REGISTRY_MAP.put("com.qsl.rpc.service.IUserService", new UserServiceImpl());        REGISTRY_MAP.put("com.qsl.rpc.service.IPersonService", new PersonServiceImpl());        ServerSocket serverSocket = new ServerSocket(8888);        while (is_running) {            System.out.println("等待 client 連線");            Socket client = serverSocket.accept();            System.out.println("獲取到 client...");            handle(client);            client.close();        }        serverSocket.close();    }    private static void handle(Socket client) throws Exception {        InputStream in = client.getInputStream();        OutputStream out = client.getOutputStream();        ObjectInputStream ois = new ObjectInputStream(in);        ObjectOutputStream oos = new ObjectOutputStream(out);        // 獲取服務名        String serviceName = ois.readUTF();        System.out.println("serviceName = " + serviceName);        // 獲取方法名、方法的引數型別、方法的引數值        String methodName = ois.readUTF();        Class[] parameterTypes = (Class[]) ois.readObject();        Object[] args = (Object[]) ois.readObject();        // 獲取服務;從服務註冊中心獲取服務        Object serverObject = REGISTRY_MAP.get(serviceName);        // 透過反射呼叫服務的方法        Method method = serverObject.getClass().getMethod(methodName, parameterTypes);        Object resp = method.invoke(serverObject, args);        // 往 socket 寫響應值;直接寫可序列化物件(實現 Serializable 介面)        oos.writeObject(resp);        oos.flush();        ois.close();        oos.close();    }}

Stub:

package com.qsl.rpc;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.Socket;/** * 動態代理,封裝了網路資料傳輸 * @author 青石路 * @date 2021/1/17 9:38 */public class Stub {    public static Object getStub(Class clazz) {        Object obj = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new NetInvocationHandler(clazz));        return obj;    }    static class NetInvocationHandler implements InvocationHandler {        private Class clazz;        NetInvocationHandler(Class clazz){            this.clazz = clazz;        }        @Override        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {            Socket s = new Socket("127.0.0.1", 8888);            // 網路傳輸資料            ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());            // 傳輸介面名,告訴服務端,我要哪個服務            oos.writeUTF(clazz.getName());            // 傳輸方法名、方法引數型別、方法引數值;可能會有方法過載,所以要傳引數列表            oos.writeUTF(method.getName());            Class[] parameterTypes = method.getParameterTypes();            oos.writeObject(parameterTypes);            oos.writeObject(args);            // 從 socket 讀響應值            ObjectInputStream ois = new ObjectInputStream(s.getInputStream());            Object resp = ois.readObject();            oos.close();            ois.close();            s.close();            return resp;        }    }}

Client:

package com.qsl.rpc;import com.qsl.rpc.entity.Person;import com.qsl.rpc.entity.User;import com.qsl.rpc.service.IPersonService;import com.qsl.rpc.service.IUserService;/** * @author 青石路 * @date 2021/1/16 19:49 */public class Client {    public static void main(String[] args) throws Exception {        /*IUserService userService = (IUserService)Stub.getStub(IUserService.class);        User user = userService.getUserByName("青石路");        System.out.println(user);*/        IPersonService personService = (IPersonService)Stub.getStub(IPersonService.class);        Person person = personService.getPersonByPhoneNumber("123");        System.out.println(person);    }}

此版本抽象的比較好了,遮蔽了底層細節,支援任何服務的任意方法,算是一個比較完美的版本了

至此,一個最基礎的 RPC 就已經實現了

但是,還是有大量的細節可以改善,序列化與反序列化就是其中之一

網路中資料的傳輸都是二進位制,所以請求引數需要序列化成二進位制,響應引數需要反序列化成物件

而 JDK 自帶的序列化與反序列化,具有語言侷限性、效率慢、序列化後的長度太長等缺點

序列化與反序列化協議非常多,常見的有

這些協議孰好孰壞,本文不做過多闡述,這裡提出來只是想告訴大家:序列化與反序列化協議是 RPC 中的重要一環

總結

1、RPC 的演進過程

2、RPC 的組成要素

三要素:動態代理、序列化與反序列化協議、網路通訊協議

網路通訊協議可以是 TCP、UDP,也可以是 HTTP 1.x、HTTP 2,甚至有能力可以是自定義協議

3、RPC 框架

RPC 不等同於 RPC 框架,RPC 是一個概念,是一個分散式通訊方式

基於 RPC 產生了很多 RPC 框架:Dubbo、Netty、gRPC、BRPC、Thrift、JSON-RPC 等等

RPC 框架對 RPC 進行了功能豐富,包括:服務註冊、服務發現、服務治理、服務監控、服務負載均衡等功能

17
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 用 VS Code 寫 Python,這幾個外掛是必裝的