
What is IoT?

IoT is short for the Internet of Things.

Designing an IoT Push System

For example: a smart device needs to receive a command, sent from an app or a WeChat mini program, telling it to download or play music. What do we need to build to make that happen?

First, a push server. It is responsible only for distributing messages and does not handle business logic. Devices connect to the push server; the app sends a command to the push server, which then forwards it to the target device.

But as more and more devices are sold, the load on the push server grows, and at some point it has to be clustered: if one machine is not enough, run ten. That raises a new question: with multiple push servers, how does a device find the right server to connect to? A registry solves this. Every push server registers itself with the registry; a device queries the registry, gets back the address of a push server, and then connects to that server.

There is also a Redis cluster that records the topics each device subscribes to, along with device metadata. When the app sends a command to a device, it is really just sending a piece of data, so a push API exposes interfaces through which that data is submitted. The push API does not connect to the push servers directly: an MQ cluster sits in between, mainly for message storage. The push API publishes messages to the MQ, and the push servers subscribe to messages from the MQ. That is the design of a simple IoT push system.
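To make the flow concrete, here is a minimal, library-free sketch of the path one command takes. Everything here is an illustrative stand-in rather than a real API: a BlockingQueue plays the role of the MQ cluster, a map plays the role of one push server's device connections, and the names (sendCommand, pushServerLoop, connectedDevices) are invented for this sketch.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class PushFlowSketch {

    // Stands in for the MQ cluster between the push API and the push servers.
    static final BlockingQueue<String[]> mq = new LinkedBlockingQueue<>();

    // Stands in for the long-lived device connections held by one push server.
    static final Map<String, Consumer<String>> connectedDevices = new ConcurrentHashMap<>();

    // Push API: never talks to a push server directly, it only publishes to the MQ.
    static void sendCommand(String deviceId, String command) throws InterruptedException {
        mq.put(new String[] {deviceId, command});
    }

    // Push server: subscribes to the MQ and forwards each command over the
    // device's connection (in the real system, a Netty channel writeAndFlush).
    static void pushServerLoop() throws InterruptedException {
        while (true) {
            String[] msg = mq.take();
            Consumer<String> device = connectedDevices.get(msg[0]);
            if (device != null) {
                device.accept(msg[1]);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A device connects and is registered (the real system would also
        // record its topic subscriptions in Redis).
        connectedDevices.put("speaker-1", cmd -> System.out.println("speaker-1 received: " + cmd));
        Thread pushServer = new Thread(() -> {
            try {
                pushServerLoop();
            } catch (InterruptedException ignored) {
            }
        });
        pushServer.setDaemon(true);
        pushServer.start();
        // The app asks the push API to make the device play music.
        sendCommand("speaker-1", "PLAY_MUSIC");
        Thread.sleep(200); // give the daemon consumer a moment to forward it
    }
}
```

In the real system the forwarding step would be a writeAndFlush on the device's Netty channel, and the device-to-topic mapping consulted here would live in the Redis cluster.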

Here is the structure diagram:

Note: the connection from a device to the registry is a short-lived connection, while the connection between a device and a push server is a long-lived one.

The Heartbeat Detection Mechanism

Heartbeat detection in brief

Heartbeat detection determines whether the peer is still alive. Typically one side periodically sends small, simple packets; if no response arrives within a given window, the peer is judged dead.

Netty provides the IdleStateHandler class for implementing heartbeats. Basic usage:

```java
pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));
```

Here is the IdleStateHandler constructor:

```java
public IdleStateHandler(
        long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
```

The four parameters:

- readerIdleTime: read timeout
- writerIdleTime: write timeout
- allIdleTime: all-events timeout (no reads and no writes)
- unit: the TimeUnit of the three timeouts
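As a quick illustration of these parameters (a hedged sketch; the handler name is invented for this example): passing 0 disables that particular check, so the configuration in the comments below fires a READER_IDLE event when nothing has been read for 60 seconds, which a later handler can turn into a disconnect.

```java
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class IdleDisconnectHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent
                && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
            // Nothing was read from this channel for 60 seconds: assume the peer is dead.
            ctx.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

// In a ChannelInitializer:
// pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); // reader, writer, all
// pipeline.addLast(new IdleDisconnectHandler());
```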

Heartbeat detection code example

A simple example. Server:

```java
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import static io.netty.handler.timeout.IdleState.ALL_IDLE;

public class PingServer {

    static final int BEGIN_PORT = 8088;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new PingServer().start(BEGIN_PORT, N_PORT);
    }

    public void start(int beginPort, int nPort) {
        System.out.println("Starting server....");
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Fire an IdleStateEvent after 1 second with no reads and no writes.
                pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));
                pipeline.addLast(new PingHandler());
                // Every connection gets a ConnectionCountHandler, which increments the connection count.
                pipeline.addLast(new ConnectionCountHandler());
            }
        });
        bootstrap.bind(beginPort).addListener((ChannelFutureListener) future -> {
            System.out.println("Port bound successfully: " + beginPort);
        });
        System.out.println("Server started!");
    }
}

// PingHandler.java
public class PingHandler extends SimpleUserEventChannelHandler<IdleStateEvent> {

    private static final ByteBuf PING_BUF =
            Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("ping".getBytes()));

    // Number of pings sent that have not yet been answered with a pong.
    private int count;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        String str = new String(data);
        if ("pong".equals(str)) {
            System.out.println(ctx + " ---- " + str);
            count--; // a pong arrived, so one outstanding ping is accounted for
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    protected void eventReceived(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        if (evt.state() == ALL_IDLE) {
            if (count >= 3) {
                // Three pings in a row went unanswered: treat the client as dead.
                System.out.println("Client connection unresponsive, closing: " + ctx.channel());
                ctx.close();
                return;
            }
            count++;
            System.out.println(ctx.channel() + " ---- ping");
            ctx.writeAndFlush(PING_BUF.duplicate());
        }
        ctx.fireUserEventTriggered(evt);
    }
}
```

Client:

//服務端的IP private static final String SERVER_HOST = "localhost"; static final int BEGIN_PORT = 8088; static final int N_PORT = 100; public static void main(String[] args) { new PoneClient().start(BEGIN_PORT, N_PORT); } public void start(final int beginPort, int nPort) { System.out.println("客戶端啟動...."); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new PongHandler()); } }); int index = 0; int port; String serverHost = System.getProperty("server.host", SERVER_HOST); ChannelFuture channelFuture = bootstrap.connect(serverHost, beginPort); channelFuture.addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { System.out.println("連線失敗,退出!"); System.exit(0); } }); try { channelFuture.get(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }public class PongHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final ByteBuf PONG_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("pong".getBytes())); @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String str = new String(data); if ("ping".equals(str)) { ctx.writeAndFlush(PONG_BUF.duplicate()); } }}

Server output:

Optimizing for a Million Long-Lived Connections

Connection optimization code example

Server:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {

    static final int BEGIN_PORT = 11000;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new Server().start(BEGIN_PORT, N_PORT);
    }

    public void start(int beginPort, int nPort) {
        System.out.println("Starting server....");
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Every connection gets a ConnectionCountHandler, which increments the connection count.
                pipeline.addLast(new ConnectionCountHandler());
            }
        });
        // Open the 100 ports from 11000 through 11099.
        for (int i = 0; i < nPort; i++) {
            int port = beginPort + i;
            bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
                System.out.println("Port bound successfully: " + port);
            });
        }
        System.out.println("Server started!");
    }
}
```

Client:

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

    // The server's IP
    private static final String SERVER_HOST = "192.168.231.129";

    static final int BEGIN_PORT = 11000;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new Client().start(BEGIN_PORT, N_PORT);
    }

    public void start(final int beginPort, int nPort) {
        System.out.println("Starting client....");
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        // The original listing sets no handler, but Bootstrap requires one:
        // an empty initializer is enough, since these connections never exchange data.
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
            }
        });
        int index = 0;
        int port;
        String serverHost = System.getProperty("server.host", SERVER_HOST);
        // Starting at port 11000, connect to the server's ports in round-robin order.
        while (!Thread.interrupted()) {
            port = beginPort + index;
            try {
                ChannelFuture channelFuture = bootstrap.connect(serverHost, port);
                channelFuture.addListener((ChannelFutureListener) future -> {
                    if (!future.isSuccess()) {
                        System.out.println("Connection failed, exiting!");
                        System.exit(0);
                    }
                });
                channelFuture.get();
            } catch (Exception e) {
                // Ignore and keep connecting.
            }
            if (++index == nPort) {
                index = 0;
            }
        }
    }
}
```

The ConnectionCountHandler class:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {

    // Counts live connections and prints the total to the console every two seconds.
    private static final AtomicInteger nConnection = new AtomicInteger();

    static {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            System.out.println("Connections: " + nConnection.get());
        }, 0, 2, TimeUnit.SECONDS);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        nConnection.incrementAndGet();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        nConnection.decrementAndGet();
    }
}
```

The code above is packaged as a jar and run on Linux. For this optimization we leave the program itself alone for now; the tuning below happens at the operating-system level, so that the machine can sustain a million connections.

The TCP Connection Four-Tuple

Before optimizing, a bit of networking background. A TCP connection is identified by a four-tuple: server IP + server port + client IP + client port.

Port numbers generally range from 1 to 65535:
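A quick worked example of why the four-tuple matters for the test below: with one client machine and one server machine, the two IPs in the tuple are fixed, so distinct connections can differ only in the two port fields. Against a single listening port, the client is limited to its local port range, roughly 64,000 usable ports (1024 to 65535, i.e. 64,512). That is why the optimized server binds 100 listening ports: the ceiling becomes about 100 × 64,512 ≈ 6.45 million distinct tuples, comfortably above the one-million-connection target.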

Configuration Tuning

Two Linux systems are installed in virtual machines, with the following specs:

| Address | CPU | Memory | JDK | Role |
| --- | --- | --- | --- | --- |
| 192.168.15.130 | VM, 4 cores | 8G | 1.8 | Client |
| 192.168.15.128 | VM, 4 cores | 8G | 1.8 | Server |

Start the server:

```
java -Xmx4g -Xms4g -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Server > out.log 2>&1 &
```

Start the client:

```
java -Xmx4g -Xms4g -Dserver.host=192.168.15.128 -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Client
```

Once the server is running, use tail -f to watch the log in out.log:

If the client reports the following error on startup, you need to raise both the system-wide and per-process maximum file handle limits:

```
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.FileDispatcherImpl.init(Native Method)
    at sun.nio.ch.FileDispatcherImpl.<clinit>(FileDispatcherImpl.java:35)
    ... 8 more
```

Raising the system-wide handle limit: check the operating system's maximum file handle count with `cat /proc/sys/fs/file-max`. If it does not meet your needs, edit /etc/sysctl.conf (e.g. with vim) and add the following (then apply it with `sysctl -p`):

```
fs.file-max = 1000000
```

Next, set the per-process maximum. Check whether the current setting is sufficient with `ulimit -a`:

```
[root@test-server2 download]# ulimit -a | grep "open files"
open files  (-n) 1024
```

When the number of concurrently accepted TCP connections exceeds this cap, "Too many open files" is raised and all new client connections fail. Edit /etc/security/limits.conf and set:

```
* soft nofile 1000000
* hard nofile 1000000
```

The new limits take effect after logging out and back in.

If the program gets interrupted, or throws an exception like the following (the message is the localized form of "No space left on device"):

```
java.io.IOException: No space left on device
    at sun.nio.ch.EPollArrayWrapper.epollCtl(Native Method)
    at sun.nio.ch.EPollArrayWrapper.updateRegistrations(EPollArrayWrapper.java:299)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:268)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    at sun.nio.ch.SelectorImpl.selectNow(SelectorImpl.java:105)
    at io.netty.channel.nio.SelectedSelectionKeySetSelector.selectNow(SelectedSelectionKeySetSelector.java:56)
    at io.netty.channel.nio.NioEventLoop.selectNow(NioEventLoop.java:750)
    at io.netty.channel.nio.NioEventLoop$1.get(NioEventLoop.java:71)
    at io.netty.channel.DefaultSelectStrategy.calculateStrategy(DefaultSelectStrategy.java:30)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:426)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```

check the operating system log with more /var/log/messages, or run tail -f /var/log/messages while the program starts to monitor the log live. If entries like the following appear, the TCP/IP parameters need tuning:

```
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets
```

==Tuning the TCP/IP parameters:==

Check the client's local port range limit:

```
cat /proc/sys/net/ipv4/ip_local_port_range
```

Modify the network parameters via vim /etc/sysctl.conf. On the client, widen the port range limit:

```
net.ipv4.ip_local_port_range = 1024 65535
```

Optimize the TCP parameters:

```
net.ipv4.tcp_mem = 786432 2097152 3145728
net.ipv4.tcp_wmem = 4096 4096 16777216
net.ipv4.tcp_rmem = 4096 4096 16777216
net.ipv4.tcp_keepalive_time = 1800
net.ipv4.tcp_keepalive_intvl = 20
net.ipv4.tcp_keepalive_probes = 5
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30
```

==Parameter notes:==

net.ipv4.tcp_mem: the memory the kernel may devote to TCP, measured in pages (a page is usually 4KB; check with getconf PAGESIZE). The three values are the low, pressure, and high thresholds. With the high value above, the maximum memory given to TCP is 3145728 × 4KB / 1024 / 1024 = 12GB. A TCP connection takes roughly 7.5KB, so a million connections need about 7.5 × 1,000,000 / 4 = 1,875,000 pages; 3,145,728 pages is more than enough for this test.

net.ipv4.tcp_wmem: the write buffer allocated to each TCP connection, in bytes. The three values are the minimum, default, and maximum.

net.ipv4.tcp_rmem: the read buffer allocated to each TCP connection, in bytes. The three values are the minimum, default, and maximum.

net.ipv4.tcp_keepalive_time: the time between the last data packet and the first keepalive probe, used to confirm whether a TCP connection is still valid.

net.ipv4.tcp_keepalive_intvl: the interval between successive keepalive probes while no response has been received.

net.ipv4.tcp_keepalive_probes: the number of consecutive unanswered probes after which the connection is judged dead.
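As a worked example, with the values configured above a dead, idle peer is declared down after roughly tcp_keepalive_time + tcp_keepalive_intvl × tcp_keepalive_probes = 1800 + 20 × 5 = 1900 seconds.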

net.ipv4.tcp_tw_reuse: whether a socket in TIME_WAIT may be reused for a new TCP connection. Defaults to 0 (off).

net.ipv4.tcp_tw_recycle: whether fast recycling of TIME_WAIT sockets is enabled. Defaults to 0 (off). (Note: this option is known to break clients behind NAT and was removed entirely in Linux 4.12.)

net.ipv4.tcp_fin_timeout: how long a socket closed on our side stays in the FIN_WAIT_2 state. Defaults to 60.

Reposted from: https://juejin.im/post/6861560765200105486 (author: 狐言不胡言, Juejin)
