首頁>技術>

推送業務思路

訊息推送一般的思路就是:

1.輪詢(Pull)客戶端定時的去詢問伺服器是否有新訊息需要下發;缺點很明顯Android後臺不停的訪問網路費電還浪費流量。

2.推送(Push)服務端有新訊息立即傳送給客戶端,這就沒有時間的延遲,訊息及時到達。

當時需求過來之後就首先考慮的這兩個,開發的角度Pull實現起來簡單省事,但從使用者來說省電和省流量才是主要的,所以最後選用Push。

客戶端與服務端使用長連線,客戶端定時向服務端傳送心跳包維持長連線。

心跳週期

那麼這裡會有一個問題,心跳包的週期多少才合理?

程式碼實現公共程式碼PushMsg.java

定義訊息結構體:

public class PushMsg implements Serializable {    private static final long serialVersionUID = 4631960168572447268L;    public static PushMsg newInstance() {        return new PushMsg();    }    /**     * 推送型別     */    private PushTypeEnum pushType;    /**     * 推送訊息     */    private String info;    public PushTypeEnum pushType() {        return pushType;    }    public PushMsg pushType(PushTypeEnum pushType) {        this.pushType = pushType;        return this;    }    public String info() {        return info;    }    public PushMsg info(String info) {        this.info = info;        return this;    }    @Override    public String toString() {        return "PushMsg{" +                "pushType=" + pushType +                ", info='" + info + '\'' +                '}';    }}
PushTypeEnum.java

定義各種訊息推送的型別列舉

public enum PushTypeEnum {    /**     * 請求連線     */    CONNECT_REQ,    /**     * 連線成功     */    CONNECT_SUCCESS,    /**     * 連線失敗     */    CONNECT_FAIL,    /**     * 心跳請求     */    HEARTBEAT_REQ,    /**     * 心跳響應     */    HEARTBEAT_RESP,    /**     * 訊息推送     */    MSG_PUSH;}
ChannelMaps.java

用於存放 channel 資訊,此處使用 channel.id() 對應的長文字,作為 key。

因為 id 長文字是唯一的。

public class ChannelMaps {    private ChannelMaps(){}    private static final Map<String, Channel> CHANNEL_MAP = new HashMap<>();    public static void addChannel(Channel channel) {        CHANNEL_MAP.put(channel.id().asLongText(), channel);    }    public static void removeChannel(Channel channel) {        CHANNEL_MAP.remove(channel.id().asLongText());    }    public static Collection<Channel> getAllChannel() {        return CHANNEL_MAP.values();    }}
服務端PushServer.java

服務端核心啟動程式碼。

push() 負責向客戶端推送訊息。

public class PushServer {    public static void main(String[] args) {        EventLoopGroup workerGroup = new NioEventLoopGroup();        EventLoopGroup bossGroup = new NioEventLoopGroup();        try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 100)                    .handler(new LoggingHandler(LogLevel.INFO))                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(                                new ObjectEncoder(),                                new ObjectDecoder(ClassResolvers.cacheDisabled(null)),                                new ReadTimeoutHandler(100),                                new ConnectServerHandler(),                                new HeartBeatServerHandler()                            );                        }                    })                    .bind(8888)                    .syncUninterruptibly();            System.out.println("server start on 8888...");            // 這裡可以推送資訊            // 需要將 channel 有效資訊儲存在 map 中。            push();            channelFuture.channel().closeFuture().syncUninterruptibly();        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }    /**     * 推送到所有客戶端     */    private static void push() {        try {            while (ChannelMaps.getAllChannel().isEmpty()) {                TimeUnit.SECONDS.sleep(5);                // 迴圈等待,直到有客戶端加入。            }            System.out.println("Start push...");            PushMsg pushMsg = new PushMsg();            pushMsg.pushType(PushTypeEnum.MSG_PUSH).info("hello client");            // 迴圈所有的客戶端,傳送訊息            for(Channel channel : ChannelMaps.getAllChannel()) {                System.out.println("start push client channel : " + channel.id().asLongText());                channel.writeAndFlush(pushMsg);            }            System.out.println("End push...");        } catch (InterruptedException e) {            e.printStackTrace();        }    }}
ConnectServerHandler.java

這裡是對連線相關的處理。

我們這裡模擬了 auth 驗證,如果使用者輸入的資訊為 "天王蓋地虎",我們才會認為認證成功。

public class ConnectServerHandler extends SimpleChannelInboundHandler<PushMsg> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception {        //如果是連線資訊,判斷是否是黑名單ip        if(PushTypeEnum.CONNECT_REQ.equals(msg.pushType())){            PushMsg response = new PushMsg();            // 口號判斷            if("天王蓋地虎".equals(msg.info())) {                response.pushType(PushTypeEnum.CONNECT_SUCCESS).info("寶塔鎮河妖");            } else {                response.pushType(PushTypeEnum.CONNECT_FAIL).info("有內鬼,終止交易");            }            ctx.writeAndFlush(response);        }else{            ctx.fireChannelRead(msg);        }    }}
HeartBeatServerHandler.java

服務端的心跳包處理。

如果接收到心跳包資訊,則直接響應 pong。

並且將傳送 ping 的客戶端,認為是活著的,放在 channel 列表中

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<PushMsg> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception {        //如果是心跳包ping,則返回pong        if(PushTypeEnum.HEARTBEAT_REQ.equals(msg.pushType())){            System.out.println("Received client heart beat : " + msg.info());            // 如果接收到心跳,則認為連線成功。            ChannelMaps.addChannel(ctx.channel());            // 返回響應            PushMsg response = new PushMsg();            response.pushType(PushTypeEnum.HEARTBEAT_RESP).info("pong");            ctx.writeAndFlush(response);        }else{            ctx.fireChannelRead(msg);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        // 如果接收到心跳異常,則直接移除當前連線        ChannelMaps.removeChannel(ctx.channel());        ctx.close();    }}
客戶端PushClient.java

客戶端啟動程式碼如下:

public class PushClient {    public static void main(String[] args) {        EventLoopGroup workGroup = new NioEventLoopGroup();        try {            Bootstrap bootstrap = new Bootstrap();            ChannelFuture channelFuture = bootstrap                    .group(workGroup)                    .channel(NioSocketChannel.class)                    .option(ChannelOption.SO_KEEPALIVE, true)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline p = ch.pipeline();                            p.addLast(new IdleStateHandler(20, 10, 0));                            p.addLast(new ObjectEncoder());                            p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));                            p.addLast(new ReadTimeoutHandler(100));                            p.addLast(new ConnectClientHandler());                            p.addLast(new HeatBeatClientHandler());                            p.addLast(new PushClientHandler());                        }                    })                    .connect("localhost", 8888)                    .syncUninterruptibly();            final String id = channelFuture.channel().id().asLongText();            System.out.println("client started: " + id);            channelFuture.channel().closeFuture().syncUninterruptibly();            System.out.println("client closed: " + id);        } finally {            workGroup.shutdownGracefully();            // 這裡可以進行重登嘗試        }    }}
ConnectClientHandler.java

連線處理類資訊。

會在 channel 連線上服務端之後,傳送一個請求資訊。

並且輸出客戶端對應的響應資訊。

public class ConnectClientHandler extends SimpleChannelInboundHandler<PushMsg> {    // 三次握手完成,傳送連線請求    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        PushMsg pushMsg = PushMsg.newInstance().pushType(PushTypeEnum.CONNECT_REQ).info("天王蓋地虎");        ctx.writeAndFlush(pushMsg);    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception {        // 判斷是否驗證成功        if(PushTypeEnum.CONNECT_SUCCESS.equals(msg.pushType())) {            // 傳送心跳            System.out.println("連線成功,服務端資訊: " + msg.info());            ctx.fireChannelRead(msg);        } else if(PushTypeEnum.CONNECT_FAIL.equals(msg.pushType())) {            System.err.println("連線失敗");            ctx.close();        } else {            ctx.fireChannelRead(msg);        }    }}
HeatBeatClientHandler.java

心跳包處理資訊。

登入成功的 client 端,會定時 30s 傳送一次心跳請求到服務端。

此處也會處理心跳的響應資訊。

public class HeatBeatClientHandler extends SimpleChannelInboundHandler<PushMsg> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception {        // 判斷是否驗證成功,則迴圈傳送心跳包        if(PushTypeEnum.CONNECT_SUCCESS.equals(msg.pushType())) {            System.out.println("連線成功,開始傳送心跳包。。。");            //30s 傳送一次心跳            ctx.executor().scheduleAtFixedRate(                    new HeatBeatClientTask(ctx), 0, 30, TimeUnit.SECONDS);        } else if(PushTypeEnum.HEARTBEAT_RESP.equals(msg.pushType())) {            // 處理響應資訊            System.out.println("接收到 server 響應: " + msg.info());        } else {            ctx.fireChannelRead(msg);        }    }    private class HeatBeatClientTask implements Runnable {        private ChannelHandlerContext ctx;        public HeatBeatClientTask(ChannelHandlerContext ctx) {            this.ctx = ctx;        }        @Override        public void run() {            ctx.writeAndFlush(PushMsg.newInstance().pushType(PushTypeEnum.HEARTBEAT_REQ).info("hello client"));        }    }}
PushClientHandler.java

對於服務端的訊息,進行處理。

此處非常簡單,直接做一個輸出即可,實際業務可以根據自己的需求進行處理。

public class PushClientHandler extends SimpleChannelInboundHandler<PushMsg> {    @Override    protected void channelRead0(ChannelHandlerContext ctx, PushMsg msg) throws Exception {        //TODO: 你可以在這裡實現更加複雜的邏輯。        System.out.println("Received info from server: " + msg);    }}
測試驗證啟動服務端服務端日誌
九月 29, 2019 4:57:08 下午 io.netty.handler.logging.LoggingHandler channelRegistered資訊: [id: 0xe899a8b1] REGISTERED九月 29, 2019 4:57:08 下午 io.netty.handler.logging.LoggingHandler bind資訊: [id: 0xe899a8b1] BIND: 0.0.0.0/0.0.0.0:8888server start on 8888...九月 29, 2019 4:57:08 下午 io.netty.handler.logging.LoggingHandler channelActive資訊: [id: 0xe899a8b1, L:/0:0:0:0:0:0:0:0:8888] ACTIVE九月 29, 2019 4:57:17 下午 io.netty.handler.logging.LoggingHandler channelRead資訊: [id: 0xe899a8b1, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xcd6512eb, L:/127.0.0.1:8888 - R:/127.0.0.1:57451]九月 29, 2019 4:57:17 下午 io.netty.handler.logging.LoggingHandler channelReadComplete資訊: [id: 0xe899a8b1, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
啟動客戶端服務端日誌
Received client heart beat : hello clientStart push...start push client channel : 00155dfffe2b601d-00004658-00000001-2d40d35bac46078d-cd6512ebEnd push...
客戶端日誌
client started: 00155dfffe2b601d-0000124c-00000000-d9cc2eabac460700-140980b9連線成功,服務端資訊: 寶塔鎮河妖連線成功,開始傳送心跳包。。。接收到 server 響應: pongReceived info from server: PushMsg{pushType=MSG_PUSH, info='hello client'}
後續心跳資訊服務端
Received client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello clientReceived client heart beat : hello client
客戶端
接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong接收到 server 響應: pong
小結

實際上掌握這個能力,就可以實現很多想要實現的功能。

比如最常見的配置中心,就是這種推拉結合的策略。也可以自己從零寫一個 RPC 框架之類的。

我是老馬,期待與你的下次相遇。

15
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Netty 實戰:如何實現檔案伺服器?