推送業務思路
訊息推送一般的思路就是:
1.輪詢(Pull)客戶端定時的去詢問伺服器是否有新訊息需要下發;缺點很明顯Android後臺不停的訪問網路費電還浪費流量。
2.推送(Push)服務端有新訊息立即傳送給客戶端,這就沒有時間的延遲,訊息及時到達。
當時需求過來之後就首先考慮的這兩個,開發的角度Pull實現起來簡單省事,但從使用者來說省電和省流量才是主要的,所以最後選用Push。
客戶端與服務端使用長連線,客戶端定時向服務端傳送心跳包維持長連線。
心跳週期那麼這裡會有一個問題,心跳包的週期多少才合理?
程式碼實現公共程式碼PushMsg.java定義訊息結構體:
public class PushMsg implements Serializable { private static final long serialVersionUID = 4631960168572447268L; public static PushMsg newInstance() { return new PushMsg(); } /** * 推送型別 */ private PushTypeEnum pushType; /** * 推送訊息 */ private String info; public PushTypeEnum pushType() { return pushType; } public PushMsg pushType(PushTypeEnum pushType) { this.pushType = pushType; return this; } public String info() { return info; } public PushMsg info(String info) { this.info = info; return this; } @Override public String toString() { return "PushMsg{" + "pushType=" + pushType + ", info='" + info + '\'' + '}'; }}
PushTypeEnum.java定義各種訊息推送的型別列舉
public enum PushTypeEnum { /** * 請求連線 */ CONNECT_REQ, /** * 連線成功 */ CONNECT_SUCCESS, /** * 連線失敗 */ CONNECT_FAIL, /** * 心跳請求 */ HEARTBEAT_REQ, /** * 心跳響應 */ HEARTBEAT_RESP, /** * 訊息推送 */ MSG_PUSH;}
ChannelMaps.java用於存放 channel 資訊,此處使用 channel.id() 對應的長文字,作為 key。
因為 id 長文字是唯一的。
public class ChannelMaps { private ChannelMaps(){} private static final Map<String, Channel> CHANNEL_MAP = new HashMap<>(); public static void addChannel(Channel channel) { CHANNEL_MAP.put(channel.id().asLongText(), channel); } public static void removeChannel(Channel channel) { CHANNEL_MAP.remove(channel.id().asLongText()); } public static Collection<Channel> getAllChannel() { return CHANNEL_MAP.values(); }}
服務端PushServer.java服務端核心啟動程式碼。
push() 負責向客戶端推送訊息。
public class PushServer { public static void main(String[] args) { EventLoopGroup workerGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ObjectEncoder(), new ObjectDecoder(ClassResolvers.cacheDisabled(null)), new ReadTimeoutHandler(100), new ConnectServerHandler(), new HeartBeatServerHandler() ); } }) .bind(8888) .syncUninterruptibly(); System.out.println("server start on 8888..."); // 這裡可以推送資訊 // 需要將 channel 有效資訊儲存在 map 中。 push(); channelFuture.channel().closeFuture().syncUninterruptibly(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } /** * 推送到所有客戶端 */ private static void push() { try { while (ChannelMaps.getAllChannel().isEmpty()) { TimeUnit.SECONDS.sleep(5); // 迴圈等待,直到有客戶端加入。 } System.out.println("Start push..."); PushMsg pushMsg = new PushMsg(); pushMsg.pushType(PushTypeEnum.MSG_PUSH).info("hello client"); // 迴圈所有的客戶端,傳送訊息 for(Channel channel : ChannelMaps.getAllChannel()) { System.out.println("start push client channel : " + channel.id().asLongText()); channel.writeAndFlush(pushMsg); } System.out.println("End push..."); } catch (InterruptedException e) { e.printStackTrace(); } }}
ConnectServerHandler.java這裡是對連線相關的處理。
我們這裡模擬了 auth 驗證,如果使用者輸入的資訊為 "天王蓋地虎",我們才會認為認證成功。
public class ConnectServerHandler extends SimpleChannelInboundHandler<PushMsg> { @Override protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception { //如果是連線資訊,判斷是否是黑名單ip if(PushTypeEnum.CONNECT_REQ.equals(msg.pushType())){ PushMsg response = new PushMsg(); // 口號判斷 if("天王蓋地虎".equals(msg.info())) { response.pushType(PushTypeEnum.CONNECT_SUCCESS).info("寶塔鎮河妖"); } else { response.pushType(PushTypeEnum.CONNECT_FAIL).info("有內鬼,終止交易"); } ctx.writeAndFlush(response); }else{ ctx.fireChannelRead(msg); } }}
HeartBeatServerHandler.java服務端的心跳包處理。
如果接收到心跳包資訊,則直接響應 pong。
並且將傳送 ping 的客戶端,認為是活著的,放在 channel 列表中
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<PushMsg> { @Override protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception { //如果是心跳包ping,則返回pong if(PushTypeEnum.HEARTBEAT_REQ.equals(msg.pushType())){ System.out.println("Received client heart beat : " + msg.info()); // 如果接收到心跳,則認為連線成功。 ChannelMaps.addChannel(ctx.channel()); // 返回響應 PushMsg response = new PushMsg(); response.pushType(PushTypeEnum.HEARTBEAT_RESP).info("pong"); ctx.writeAndFlush(response); }else{ ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 如果接收到心跳異常,則直接移除當前連線 ChannelMaps.removeChannel(ctx.channel()); ctx.close(); }}
客戶端PushClient.java客戶端啟動程式碼如下:
public class PushClient { public static void main(String[] args) { EventLoopGroup workGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); ChannelFuture channelFuture = bootstrap .group(workGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new IdleStateHandler(20, 10, 0)); p.addLast(new ObjectEncoder()); p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); p.addLast(new ReadTimeoutHandler(100)); p.addLast(new ConnectClientHandler()); p.addLast(new HeatBeatClientHandler()); p.addLast(new PushClientHandler()); } }) .connect("localhost", 8888) .syncUninterruptibly(); final String id = channelFuture.channel().id().asLongText(); System.out.println("client started: " + id); channelFuture.channel().closeFuture().syncUninterruptibly(); System.out.println("client closed: " + id); } finally { workGroup.shutdownGracefully(); // 這裡可以進行重登嘗試 } }}
ConnectClientHandler.java連線處理類資訊。
會在 channel 連線上服務端之後,傳送一個請求資訊。
並且輸出客戶端對應的響應資訊。
public class ConnectClientHandler extends SimpleChannelInboundHandler<PushMsg> { // 三次握手完成,傳送連線請求 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { PushMsg pushMsg = PushMsg.newInstance().pushType(PushTypeEnum.CONNECT_REQ).info("天王蓋地虎"); ctx.writeAndFlush(pushMsg); } @Override protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception { // 判斷是否驗證成功 if(PushTypeEnum.CONNECT_SUCCESS.equals(msg.pushType())) { // 傳送心跳 System.out.println("連線成功,服務端資訊: " + msg.info()); ctx.fireChannelRead(msg); } else if(PushTypeEnum.CONNECT_FAIL.equals(msg.pushType())) { System.err.println("連線失敗"); ctx.close(); } else { ctx.fireChannelRead(msg); } }}
HeatBeatClientHandler.java心跳包處理資訊。
登入成功的 client 端,會定時 30s 傳送一次心跳請求到服務端。
此處也會處理心跳的響應資訊。
public class HeatBeatClientHandler extends SimpleChannelInboundHandler<PushMsg> { @Override protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception { // 判斷是否驗證成功,則迴圈傳送心跳包 if(PushTypeEnum.CONNECT_SUCCESS.equals(msg.pushType())) { System.out.println("連線成功,開始傳送心跳包。。。"); //30s 傳送一次心跳 ctx.executor().scheduleAtFixedRate( new HeatBeatClientTask(ctx), 0, 30, TimeUnit.SECONDS); } else if(PushTypeEnum.HEARTBEAT_RESP.equals(msg.pushType())) { // 處理響應資訊 System.out.println("接收到 server 響應: " + msg.info()); } else { ctx.fireChannelRead(msg); } } private class HeatBeatClientTask implements Runnable { private ChannelHandlerContext ctx; public HeatBeatClientTask(ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { ctx.writeAndFlush(PushMsg.newInstance().pushType(PushTypeEnum.HEARTBEAT_REQ).info("hello client")); } }}
PushClientHandler.java對於服務端的訊息,進行處理。
此處非常簡單,直接做一個輸出即可,實際業務可以根據自己的需求進行處理。
public class PushClientHandler extends SimpleChannelInboundHandler<PushMsg> { @Override protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception { //TODO: 你可以在這裡實現更加複雜的邏輯。 System.out.println("Received info from server: " + msg); }}
測試驗證啟動服務端服務端日誌九月 29, 2019 4:57:08 下午 io.netty.handler.logging.LoggingHandler channelRegistered資訊: [id: 0xe899a8b1] REGISTERED九月 29, 2019 4:57:08 下午 io.netty.handler.logging.LoggingHandler bind資訊: [id: 0xe899a8b1] BIND: 0.0.0.0/0.0.0.0:8888server start on 8888...九月 29, 2019 4:57:08 下午 io.netty.handler.logging.LoggingHandler channelActive資訊: [id: 0xe899a8b1, L:/0:0:0:0:0:0:0:0:8888] ACTIVE九月 29, 2019 4:57:17 下午 io.netty.handler.logging.LoggingHandler channelRead資訊: [id: 0xe899a8b1, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xcd6512eb, L:/127.0.0.1:8888 - R:/127.0.0.1:57451]九月 29, 2019 4:57:17 下午 io.netty.handler.logging.LoggingHandler channelReadComplete資訊: [id: 0xe899a8b1, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
啟動客戶端服務端日誌Received client heart beat : hello clientStart push...start push client channel : 00155dfffe2b601d-00004658-00000001-2d40d35bac46078d-cd6512ebEnd push...
客戶端日誌
client started: 00155dfffe2b601d-0000124c-00000000-d9cc2eabac460700-140980b9連線成功,服務端資訊: 寶塔鎮河妖連線成功,開始傳送心跳包。。。接收到 server 響應: pongReceived info from server: PushMsg{pushType=MSG_PUSH, info='hello client'}
後續心跳資訊服務端
Received client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello client
客戶端接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong
小結
實際上掌握這個能力,就可以實現很多想要實現的功能。
比如最常見的配置中心,就是這種推拉結合的策略。也可以自己從零寫一個 RPC 框架之類的。
我是老馬,期待與你的下次相遇。
最新評論