Original post: https://www.cnblogs.com/2YSP/p/13545217.html
1. Foreword

A while ago I came across a good article, 《看了這篇你就會手寫RPC框架了》 ("Read this and you'll be able to hand-write an RPC framework"), and got interested enough to implement it step by step. Afterwards I saw plenty of room for optimization, so I made the following improvements.
The main changes:
- Besides the Java serialization protocol, added protobuf and kryo serialization, ready to use via configuration.
- Added multiple load-balancing algorithms (random, round-robin, weighted round-robin, smooth weighted round-robin), ready to use via configuration.
- Added a local service-list cache on the client side to improve performance.
- Fixed a Netty-related memory leak under high concurrency.
- Switched from opening a connection per request to long-lived TCP connections reused across requests.
- Added a thread pool on the server side to improve message-processing throughput.

2. Introduction

RPC, or Remote Procedure Call, means invoking a service on a remote machine as if it were a local call. RPC decouples systems well; WebService, for instance, is an RPC based on the HTTP protocol.
(Call flow diagram)
Overall, there are just a few steps:
1. To execute a remote method, the client (ServerA) calls the client stub, passing the class name, method name, arguments, and so on.
2. The client stub serializes that information into a binary stream and sends it to the server (ServerB) over a socket.
3. When the packet arrives, the server stub deserializes it back into the class name, method name, and arguments.
4. The server stub invokes the corresponding local method and returns the result to the client.
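To make the stub idea concrete, here is a minimal, generic sketch (not this framework's actual code) of a client stub built on JDK dynamic proxies. The helpers serialize, doSend, and deserialize are hypothetical placeholders for the serialize-and-socket steps described above:

```java
import java.lang.reflect.Proxy;

// Generic client-stub sketch: the proxy captures class name, method name and
// arguments, hands them to the transport layer, and unpacks the reply.
public class ClientStub {

    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> serviceInterface) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                (proxy, method, args) -> {
                    // 1. pack class name, method name and arguments (hypothetical helper)
                    byte[] request = serialize(serviceInterface.getName(), method.getName(), args);
                    // 2. send over a socket and block for the reply (hypothetical helper)
                    byte[] response = doSend(request);
                    // 3. turn the reply back into the method's return type (hypothetical helper)
                    return deserialize(response, method.getReturnType());
                });
    }

    private static byte[] serialize(String className, String methodName, Object[] args) {
        throw new UnsupportedOperationException("transport elided in this sketch");
    }

    private static byte[] doSend(byte[] request) {
        throw new UnsupportedOperationException("transport elided in this sketch");
    }

    private static Object deserialize(byte[] response, Class<?> returnType) {
        throw new UnsupportedOperationException("transport elided in this sketch");
    }
}
```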
An RPC framework therefore has the following roles:

Service consumer: the caller of remote methods, i.e., the client. A service can be both a consumer and a provider.
Service provider: the side that provides the remote service, i.e., the server.
Registry: stores the service providers' addresses and related metadata, typically implemented with ZooKeeper, Redis, etc.
Monitoring/operations (optional): tracks interface response times, counts requests, detects problems early, and sends alerts.
3. Implementation

The tech stack of this RPC framework, rpc-spring-boot-starter, is as follows:
- ZooKeeper as the registry
- Netty as the communication framework
- Message codecs: protostuff, kryo, Java
- Spring
- SPI to select the load-balancing algorithm (among other things) dynamically from configuration

Since there is a lot of code, only a few of the changes are covered here.
3.1 Dynamic load-balancing algorithms

1. Write the implementation classes of LoadBalance.
(Load-balancing implementation classes)
2. Define a custom annotation @LoadBalanceAno.
/**
 * Load-balancing annotation.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LoadBalanceAno {

    String value() default "";
}

/**
 * Round-robin algorithm.
 */
@LoadBalanceAno(RpcConstant.BALANCE_ROUND)
public class FullRoundBalance implements LoadBalance {

    private static Logger logger = LoggerFactory.getLogger(FullRoundBalance.class);

    private volatile int index;

    @Override
    public synchronized Service chooseOne(List<Service> services) {
        // synchronized so index cannot run past services.size() under concurrency;
        // >= (rather than ==) also guards against the list shrinking between calls
        if (index >= services.size()) {
            index = 0;
        }
        return services.get(index++);
    }
}
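The weighted variants are not shown above. For reference, here is a minimal sketch of smooth weighted round-robin (the algorithm Nginx uses). It assumes Service exposes a getWeight() accessor and that a constant named BALANCE_SMOOTH_WEIGHT_ROUND exists; both names are assumptions, not necessarily what rpc-spring-boot-starter uses:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of smooth weighted round-robin. Assumptions: Service#getWeight()
 * exists and RpcConstant.BALANCE_SMOOTH_WEIGHT_ROUND is the configured name.
 */
@LoadBalanceAno(RpcConstant.BALANCE_SMOOTH_WEIGHT_ROUND)
public class SmoothWeightRoundBalance implements LoadBalance {

    /** key: service identity -> its current (mutable) weight */
    private final Map<String, Integer> currentWeights = new HashMap<>();

    @Override
    public synchronized Service chooseOne(List<Service> services) {
        if (services == null || services.isEmpty()) {
            return null;
        }
        int totalWeight = 0;
        Service selected = null;
        for (Service service : services) {
            totalWeight += service.getWeight();
            // each round, every instance's current weight grows by its configured weight
            int current = currentWeights.merge(service.toString(), service.getWeight(), Integer::sum);
            if (selected == null || current > currentWeights.get(selected.toString())) {
                selected = service;
            }
        }
        // the winner is penalized by the total weight, which smooths the pick sequence
        currentWeights.merge(selected.toString(), -totalWeight, Integer::sum);
        return selected;
    }
}
```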
3. Create the META-INF/services folder under the resources directory and add the provider-configuration file.
(Screenshot of the SPI configuration file)
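The SPI file itself is just a UTF-8 text file named after the fully qualified interface, listing one implementation class per line. Assuming the interface lives in a cn.sp.rpc.client.balance package (the package and class names below are assumptions), it would look roughly like this:

```
# resources/META-INF/services/cn.sp.rpc.client.balance.LoadBalance
cn.sp.rpc.client.balance.RandomBalance
cn.sp.rpc.client.balance.FullRoundBalance
cn.sp.rpc.client.balance.WeightRoundBalance
cn.sp.rpc.client.balance.SmoothWeightRoundBalance
```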
4. Add a loadBalance option to RpcConfig.
/**
 * @author 2YSP
 * @date 2020/7/26 15:13
 */
@ConfigurationProperties(prefix = "sp.rpc")
public class RpcConfig {

    /**
     * Registry address.
     */
    private String registerAddress = "127.0.0.1:2181";

    /**
     * Port the service is exposed on.
     */
    private Integer serverPort = 9999;

    /**
     * Serialization protocol.
     */
    private String protocol = "java";

    /**
     * Load-balancing algorithm.
     */
    private String loadBalance = "random";

    /**
     * Weight, defaults to 1.
     */
    private Integer weight = 1;

    // getters and setters omitted
}
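With the sp.rpc prefix above, a consumer's application.properties could look like the following. The defaults come from the class above; the non-default values here, and the exact algorithm names accepted, are assumptions tied to the RpcConstant constants:

```
# application.properties
sp.rpc.register-address=127.0.0.1:2181
sp.rpc.server-port=9999
# serialization protocol: java / protobuf / kryo
sp.rpc.protocol=kryo
# load-balancing algorithm; "random" is the default
sp.rpc.load-balance=random
# provider weight used by the weighted algorithms
sp.rpc.weight=2
```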
5. In the auto-configuration class RpcAutoConfiguration, pick the matching algorithm implementation based on the configuration.
/**
 * Use SPI to find the load-balancing algorithm matching the configuration.
 *
 * @param name
 * @return
 */
private LoadBalance getLoadBalance(String name) {
    ServiceLoader<LoadBalance> loader = ServiceLoader.load(LoadBalance.class);
    Iterator<LoadBalance> iterator = loader.iterator();
    while (iterator.hasNext()) {
        LoadBalance loadBalance = iterator.next();
        LoadBalanceAno ano = loadBalance.getClass().getAnnotation(LoadBalanceAno.class);
        Assert.notNull(ano, "load balance name can not be empty!");
        if (name.equals(ano.value())) {
            return loadBalance;
        }
    }
    throw new RpcException("invalid load balance config");
}

@Bean
public ClientProxyFactory proxyFactory(@Autowired RpcConfig rpcConfig) {
    ClientProxyFactory clientProxyFactory = new ClientProxyFactory();
    // set the service discovery implementation
    clientProxyFactory.setServerDiscovery(new ZookeeperServerDiscovery(rpcConfig.getRegisterAddress()));
    // set the supported message protocols
    Map<String, MessageProtocol> supportMessageProtocols = buildSupportMessageProtocols();
    clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
    // set the load-balancing algorithm
    LoadBalance loadBalance = getLoadBalance(rpcConfig.getLoadBalance());
    clientProxyFactory.setLoadBalance(loadBalance);
    // set the network layer implementation
    clientProxyFactory.setNetClient(new NettyNetClient());
    return clientProxyFactory;
}
3.2 Local service-list cache

A Map is used to cache the data:
/**
 * Local cache for service discovery.
 */
public class ServerDiscoveryCache {

    /**
     * key: serviceName
     */
    private static final Map<String, List<Service>> SERVER_MAP = new ConcurrentHashMap<>();

    /**
     * Class names of the remote services injected on the client side.
     */
    public static final List<String> SERVICE_CLASS_NAMES = new ArrayList<>();

    public static void put(String serviceName, List<Service> serviceList) {
        SERVER_MAP.put(serviceName, serviceList);
    }

    /**
     * Remove a specific entry.
     *
     * @param serviceName
     * @param service
     */
    public static void remove(String serviceName, Service service) {
        SERVER_MAP.computeIfPresent(serviceName, (key, value) ->
                value.stream()
                        .filter(o -> !o.toString().equals(service.toString()))
                        .collect(Collectors.toList())
        );
    }

    public static void removeAll(String serviceName) {
        SERVER_MAP.remove(serviceName);
    }

    public static boolean isEmpty(String serviceName) {
        return SERVER_MAP.get(serviceName) == null || SERVER_MAP.get(serviceName).size() == 0;
    }

    public static List<Service> get(String serviceName) {
        return SERVER_MAP.get(serviceName);
    }
}
ClientProxyFactory checks the local cache first and only queries ZooKeeper on a cache miss.
/**
 * Get the list of available service addresses for a service name.
 *
 * @param serviceName
 * @return
 */
private List<Service> getServiceList(String serviceName) {
    List<Service> services;
    // intern() so all threads lock the same String instance for a given name
    synchronized (serviceName.intern()) {
        if (ServerDiscoveryCache.isEmpty(serviceName)) {
            services = serverDiscovery.findServiceList(serviceName);
            if (services == null || services.size() == 0) {
                throw new RpcException("No provider available!");
            }
            ServerDiscoveryCache.put(serviceName, services);
        } else {
            services = ServerDiscoveryCache.get(serviceName);
        }
    }
    return services;
}
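A caveat on synchronized (serviceName.intern()) (the raw parameter is locked in the original): locking a String only excludes threads holding the same instance, so it relies on interning. An explicit lock object per key is a more self-documenting alternative; the helper below is an illustrative sketch, not code from the framework:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: a dedicated lock object per service name, so mutual exclusion does
// not depend on String identity or the interned-string pool.
public final class ServiceLocks {

    private static final ConcurrentMap<String, Object> LOCKS = new ConcurrentHashMap<>();

    private ServiceLocks() {
    }

    public static Object lockFor(String serviceName) {
        return LOCKS.computeIfAbsent(serviceName, key -> new Object());
    }
}

// usage inside getServiceList:
// synchronized (ServiceLocks.lockFor(serviceName)) { ... }
```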
Problem: if a provider goes offline because of a crash or a network issue while the cache still lists it, clients keep calling an unavailable server and the failure rate rises.

Solution: providers register ephemeral nodes, so when a provider goes offline its node is removed automatically. We only need to watch the ZooKeeper child nodes and clear the local cache whenever a child is added or removed.
DefaultRpcProcessor
/**
 * RPC processor: exposes services on startup and auto-injects Service proxies.
 *
 * @author 2YSP
 * @date 2020/7/26 14:46
 */
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Spring publishes this event once the context has finished starting
        if (Objects.isNull(event.getApplicationContext().getParent())) {
            ApplicationContext context = event.getApplicationContext();
            // start the server
            startServer(context);
            // inject Service proxies
            injectService(context);
        }
    }

    private void injectService(ApplicationContext context) {
        String[] names = context.getBeanDefinitionNames();
        for (String name : names) {
            Class<?> clazz = context.getType(name);
            if (Objects.isNull(clazz)) {
                continue;
            }
            Field[] declaredFields = clazz.getDeclaredFields();
            for (Field field : declaredFields) {
                // find fields annotated with @InjectService
                InjectService injectService = field.getAnnotation(InjectService.class);
                if (injectService == null) {
                    continue;
                }
                Class<?> fieldClass = field.getType();
                Object object = context.getBean(name);
                field.setAccessible(true);
                try {
                    field.set(object, clientProxyFactory.getProxy(fieldClass));
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
                // record the service for the local cache
                ServerDiscoveryCache.SERVICE_CLASS_NAMES.add(fieldClass.getName());
            }
        }
        // subscribe to child-node changes
        if (clientProxyFactory.getServerDiscovery() instanceof ZookeeperServerDiscovery) {
            ZookeeperServerDiscovery serverDiscovery = (ZookeeperServerDiscovery) clientProxyFactory.getServerDiscovery();
            ZkClient zkClient = serverDiscovery.getZkClient();
            ServerDiscoveryCache.SERVICE_CLASS_NAMES.forEach(name -> {
                String servicePath = RpcConstant.ZK_SERVICE_PATH + RpcConstant.PATH_DELIMITER + name + "/service";
                zkClient.subscribeChildChanges(servicePath, new ZkChildListenerImpl());
            });
            logger.info("subscribe service zk node successfully");
        }
    }

    private void startServer(ApplicationContext context) {
        ...
    }
}
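For completeness, consuming a remote service then looks roughly like this. UserService and its method are made-up names; the only framework contract involved is the @InjectService annotation scanned above:

```java
import org.springframework.stereotype.Component;

// Hypothetical remote contract; only @InjectService is this framework's API.
interface UserService {
    String getNameById(Long id);
}

@Component
public class UserController {

    // DefaultRpcProcessor swaps this field for a client proxy after startup
    @InjectService
    private UserService userService;

    public String findUserName(Long id) {
        // reads like a local call, but goes through the RPC client underneath
        return userService.getNameById(id);
    }
}
```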
ZkChildListenerImpl
/**
 * Listener for child-node events.
 */
public class ZkChildListenerImpl implements IZkChildListener {

    private static Logger logger = LoggerFactory.getLogger(ZkChildListenerImpl.class);

    /**
     * Handles child-node add and delete events.
     *
     * @param parentPath /rpc/serviceName/service
     * @param childList
     * @throws Exception
     */
    @Override
    public void handleChildChange(String parentPath, List<String> childList) throws Exception {
        logger.debug("Child change parentPath:[{}] -- childList:[{}]", parentPath, childList);
        // any change to the children clears the whole cached list
        String[] arr = parentPath.split("/");
        ServerDiscoveryCache.removeAll(arr[2]);
    }
}
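Given the path convention above (/rpc/serviceName/service), the ZooKeeper tree looks like the sketch below, which is why arr[2] recovers the service name after splitting on "/". The leaf node names are illustrative:

```
/rpc                                    <- RpcConstant.ZK_SERVICE_PATH
└── cn.sp.rpc.example.UserService       <- arr[2], the service (class) name
    └── service                         <- the parent path being watched
        ├── <ephemeral node of provider 1, e.g. 192.168.0.10:9999>
        └── <ephemeral node of provider 2>
```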
3.3 Long-lived TCP connections in the Netty client

This part changed the most. First, a new sendRequest method is added to the interface.
(New interface method)
The implementation class NettyNetClient:
/**
 * @author 2YSP
 * @date 2020/7/25 20:12
 */
public class NettyNetClient implements NetClient {

    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

    private static ExecutorService threadPool = new ThreadPoolExecutor(4, 10, 200,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder()
                    .setNameFormat("rpcClient-%d")
                    .build());

    private EventLoopGroup loopGroup = new NioEventLoopGroup(4);

    /**
     * Cache of connected servers.
     * key: service address, in the form ip:port
     */
    public static Map<String, SendHandlerV2> connectedServerNodes = new ConcurrentHashMap<>();

    @Override
    public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
        ....
        return respData;
    }

    @Override
    public RpcResponse sendRequest(RpcRequest rpcRequest, Service service, MessageProtocol messageProtocol) {
        String address = service.getAddress();
        // intern() so all threads lock the same String instance per address
        synchronized (address.intern()) {
            if (connectedServerNodes.containsKey(address)) {
                SendHandlerV2 handler = connectedServerNodes.get(address);
                logger.info("Reusing existing connection");
                return handler.sendRequest(rpcRequest);
            }

            String[] addrInfo = address.split(":");
            final String serverAddress = addrInfo[0];
            final String serverPort = addrInfo[1];

            final SendHandlerV2 handler = new SendHandlerV2(messageProtocol, address);
            threadPool.submit(() -> {
                // configure the client
                Bootstrap b = new Bootstrap();
                b.group(loopGroup).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(handler);
                            }
                        });
                // connect to the server
                ChannelFuture channelFuture = b.connect(serverAddress, Integer.parseInt(serverPort));
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        // cache the handler once the connect attempt completes
                        connectedServerNodes.put(address, handler);
                    }
                });
            });
            logger.info("Opening a new connection...");
            return handler.sendRequest(rpcRequest);
        }
    }
}
Every request goes through sendRequest(): a pooled thread asynchronously establishes the long-lived TCP connection, and once it succeeds the SendHandlerV2 is cached in a ConcurrentHashMap for reuse. If a later request's target address (ip:port) is already in connectedServerNodes, the cached handler handles it and no new connection is opened. Note that the handler is returned before the connect completes; the CountDownLatch inside SendHandlerV2 (described below) bridges that window.
SendHandlerV2
/**
 * @author 2YSP
 * @date 2020/8/19 20:06
 */
public class SendHandlerV2 extends ChannelInboundHandlerAdapter {

    private static Logger logger = LoggerFactory.getLogger(SendHandlerV2.class);

    /**
     * Maximum time to wait for the channel to be established (seconds).
     */
    static final int CHANNEL_WAIT_TIME = 4;

    /**
     * Maximum time to wait for a response (seconds).
     */
    static final int RESPONSE_WAIT_TIME = 8;

    private volatile Channel channel;

    private String remoteAddress;

    private static Map<String, RpcFuture<RpcResponse>> requestMap = new ConcurrentHashMap<>();

    private MessageProtocol messageProtocol;

    private CountDownLatch latch = new CountDownLatch(1);

    public SendHandlerV2(MessageProtocol messageProtocol, String remoteAddress) {
        this.messageProtocol = messageProtocol;
        this.remoteAddress = remoteAddress;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.channel = ctx.channel();
        latch.countDown();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("Connect to server successfully:{}", ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.debug("Client reads message:{}", msg);
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] resp = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(resp);
        // release the buffer manually
        ReferenceCountUtil.release(byteBuf);
        RpcResponse response = messageProtocol.unmarshallingResponse(resp);
        RpcFuture<RpcResponse> future = requestMap.get(response.getRequestId());
        future.setResponse(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        logger.error("Exception occurred:{}", cause.getMessage());
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        logger.error("channel inactive with remoteAddress:[{}]", remoteAddress);
        NettyNetClient.connectedServerNodes.remove(remoteAddress);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    public RpcResponse sendRequest(RpcRequest request) {
        RpcResponse response;
        RpcFuture<RpcResponse> future = new RpcFuture<>();
        requestMap.put(request.getRequestId(), future);
        try {
            byte[] data = messageProtocol.marshallingRequest(request);
            ByteBuf reqBuf = Unpooled.buffer(data.length);
            reqBuf.writeBytes(data);
            if (latch.await(CHANNEL_WAIT_TIME, TimeUnit.SECONDS)) {
                channel.writeAndFlush(reqBuf);
                // wait for the response
                response = future.get(RESPONSE_WAIT_TIME, TimeUnit.SECONDS);
            } else {
                throw new RpcException("establish channel time out");
            }
        } catch (Exception e) {
            throw new RpcException(e.getMessage());
        } finally {
            requestMap.remove(request.getRequestId());
        }
        return response;
    }
}
RpcFuture
package cn.sp.rpc.client.net;

import java.util.concurrent.*;

/**
 * @author 2YSP
 * @date 2020/8/19 22:31
 */
public class RpcFuture<T> implements Future<T> {

    private T response;

    /**
     * Requests and responses map one to one, hence a count of 1.
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Creation time of the future, used to decide whether it has timed out.
     */
    private long beginTime = System.currentTimeMillis();

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return response != null;
    }

    /**
     * Blocks until the response arrives.
     *
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return response;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (countDownLatch.await(timeout, unit)) {
            return response;
        }
        return null;
    }

    public void setResponse(T response) {
        this.response = response;
        countDownLatch.countDown();
    }

    public long getBeginTime() {
        return beginTime;
    }
}
The logic here: the first time SendHandlerV2#sendRequest() runs, the channel may not be established yet, so a CountDownLatch makes the caller wait until the channel is ready before writing the request. A custom Future plus the requestMap cache implements request/response matching on top of Netty: each RpcRequest is given a unique requestId at creation time; before sending, the RpcFuture is cached in requestMap keyed by requestId, and when the server's response is read (in channelRead) the result is placed into the matching RpcFuture. In SendHandlerV2#channelInactive(), if the server drops the connection unexpectedly, the corresponding server node is promptly evicted from the cache.
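This correlation only works because every RpcRequest carries a unique requestId. The post does not show RpcRequest's fields, so the following is only a sketch of the minimal shape it needs, with a UUID assigned at construction time; the field names besides requestId are assumptions:

```java
import java.io.Serializable;
import java.util.UUID;

/**
 * Sketch of the request envelope: the requestId is what channelRead() uses to
 * find the matching RpcFuture in requestMap.
 */
public class RpcRequest implements Serializable {

    private final String requestId = UUID.randomUUID().toString();

    private String serviceName;
    private String method;
    private Object[] parameters;

    public String getRequestId() {
        return requestId;
    }

    // getters and setters for the remaining fields omitted
}
```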
4. Load testing

Test environment:
- Intel(R) Core(TM) i5-6300HQ CPU @ 2.30GHz, 4 cores
- Windows 10 Home (64-bit)
- 16 GB RAM

Steps:

1. Start ZooKeeper locally.
2. Start one consumer and two providers locally, using the round-robin algorithm.
3. Load-test with ab: concurrency 4, 10,000 requests in total.
ab -c 4 -n 10000 http://localhost:8080/test/user?id=1
Results:
(Test results)
As the screenshot shows, the 10,000 requests took only 11 s, more than ten times faster than the previous 130+ s.
Code:
https://github.com/2YSP/rpc-spring-boot-starter
https://github.com/2YSP/rpc-example