TCP為什麼會粘包/拆包
我們知道,TCP是以一種流的方式來進行網路轉播的,當tcp三次握手建立通訊後,客戶端服務端之間就建立了一種通訊管道,我們可以想象成自來水管道,流出來的水是連城一片的,是沒有分界線的。
TCP底層並不瞭解上層的業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分。
所以對於我們應用層而言。
我們直觀是傳送一個個連續完整TCP資料包的,而在底層就可能會出現將一個完整的TCP拆分成多個包傳送或者將多個包封裝成一個大的資料包傳送。
這就是所謂的TCP粘包和拆包。
當發生TCP粘包/拆包會發生什麼情況我們舉一個簡單例子說明:
客戶端向服務端傳送兩個資料包:第一個內容為 123;第二個內容為456。
服務端接受一個數據並做相應的業務處理(這裡就是列印接受資料加一個逗號)。
那麼服務端輸出結果將會出現下面四種情況
拆包
如何解決主流的協議解決方案可以歸納如下:
(1) 訊息定長,例如每個報文的大小固定為20個位元組,如果不夠,空位補空格;
(2) 在包尾增加回車換行符進行切割;
(3) 將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度的欄位;
(4) 更復雜的應用層協議。
對於之前描述的案例,在這裡我們就可以採取方案1和方案3。
以方案1為例:我們每次傳送的TCP包只有三個數字,那麼我將報文設定為3個位元組大小的,此時,伺服器就會以三個位元組為基準來接受包,以此來解決站包拆包問題。
未考慮拆包/粘包的案例服務端PackTimeServerHandler.javaimport java.nio.charset.StandardCharsets;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class PackTimeServerHandler extends ChannelInboundHandlerAdapter { private static final String NEW_LINE = System.getProperty("line.separator"); private int count = 0; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 讀取收到的資訊 ByteBuf byteBuf = (ByteBuf)msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); // 獲取內容,移除掉換行符號 String body = new String(bytes, 0, bytes.length-NEW_LINE.length(), StandardCharsets.UTF_8); count++; System.out.println("Server revice body from client : " + body + ", count is " +count); // 回寫到 client 端時間。 long currentTime = System.currentTimeMillis(); String currentTimeStr = currentTime+""+NEW_LINE; ByteBuf timeBuffer = Unpooled.copiedBuffer(currentTimeStr.getBytes()); ctx.writeAndFlush(timeBuffer); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
DefaultServer.javaimport io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;public class DefaultServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new PackTimeServerHandler()) .bind(8888) .syncUninterruptibly(); // 優雅關閉 channelFuture.channel().closeFuture().syncUninterruptibly(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}
客戶端PackTimeClientHandler.javaimport java.nio.charset.StandardCharsets;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class PackTimeClientHandler extends ChannelInboundHandlerAdapter { private static final String NEW_LINE = System.getProperty("line.separator"); private static final String QUERY_INFO = "query time " + NEW_LINE; private int count = 0; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 寫入到 channel 中 ByteBuf byteBuf; for(int i = 0 ; i < 100; i++) { byteBuf = Unpooled.copiedBuffer(QUERY_INFO.getBytes()); ctx.writeAndFlush(byteBuf); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); String info = new String(bytes, StandardCharsets.UTF_8); System.out.println("Client received from server : " + info + "count " + count++); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
DefaultClient.javaimport io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class DefaultClient { public static void main(String[] args) { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap serverBootstrap = new Bootstrap(); ChannelFuture channelFuture = serverBootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new PackTimeClientHandler()) .connect("localhost", 8888) .syncUninterruptibly(); // 優雅關閉 channelFuture.channel().closeFuture().syncUninterruptibly(); } finally { workerGroup.shutdownGracefully(); } }}
測試驗證(1) 啟動服務端
(2) 啟動客戶端
客戶端日誌Client received from server : 15689040615561568904061565count 0
服務端
Server revice body from client : query time query time ... query time que, count is 1Server revice body from client : y time query time query time query time query time query time query time query time query time query time query time query time query time query time query time , count is 2
雖然收到了客戶端的 100 次 query time,但是對於客戶端卻只得到了一次反饋。
利用LineBasedFrameDecoder解決TCP粘包問題服務端FixDefaultServer.java核心調整。
.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 新增解碼器 .addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder()) .addLast(new FixPackTimeServerHandler()); } })
FixPackTimeServerHandler.javapublic class FixPackTimeServerHandler extends ChannelInboundHandlerAdapter { private static final String NEW_LINE = System.getProperty("line.separator"); private int count = 0; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 讀取收到的資訊 String body = (String)msg; count++; System.out.println("Server revice body from client : " + body + ", count is " +count); // 回寫到 client 端時間。 long currentTime = System.currentTimeMillis(); String currentTimeStr = currentTime+""+NEW_LINE; ByteBuf timeBuffer = Unpooled.copiedBuffer(currentTimeStr.getBytes()); ctx.writeAndFlush(timeBuffer); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
客戶端FixDefaultClient核心調整
.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder()) .addLast(new FixPackTimeClientHandler()); } })
FixPackTimeClientHandler.javapublic class FixPackTimeClientHandler extends ChannelInboundHandlerAdapter { private static final String NEW_LINE = System.getProperty("line.separator"); private static final String QUERY_INFO = "query time " + NEW_LINE; private int count = 0; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 寫入到 channel 中 ByteBuf message; for(int i = 0 ; i < 100; i++) { message = Unpooled.copiedBuffer(QUERY_INFO.getBytes()); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String)msg; System.out.println("Client received from server : " + body + "count " + count++); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
測試驗證順序和原來一樣。
客戶端Client received from server : 1568905436714count 0Client received from server : 1568905436723count 1Client received from server : 1568905436723count 2Client received from server : 1568905436723count 3Client received from server : 1568905436724count 4Client received from server : 1568905436724count 5....Client received from server : 1568905436748count 99
服務端
Server revice body from client : query time , count is 1Server revice body from client : query time , count is 2Server revice body from client : query time , count is 3Server revice body from client : query time , count is 4Server revice body from client : query time , count is 5Server revice body from client : query time , count is 6Server revice body from client : query time , count is 7Server revice body from client : query time , count is 8....Server revice body from client : query time , count is 100
原理簡單分析LineBasedFrameDecoder 的原理分析LineBasedFrameDecoder 的工作原理是依次遍歷ByteBuf中的可讀位元組,判斷是否有\n或者\r\n,如果有,就在此位置為結束位置,從可讀索引到結束位置區間的位元組就組成了一行。
它是以換行符為結束標誌的解碼器,支援攜帶結束符和不攜帶結束符2種解碼方式,同時支援配置單行位元組的最大長度。
如果連續讀取到最大長度後仍然沒有發現換行符,就會丟擲異常,同時忽略掉之前的讀到的異常碼流。
StringDecoderStringDecoder 就是將接收到的物件轉換成字串,然後繼續呼叫後面的Handler。
指定分隔符方案透過約定的分隔符進行拆分,也是一種很常見的解決方案。
我們直接指定以 $ 符號作為分隔符號。
服務端程式碼.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("$".getBytes()))) .addLast(new StringDecoder()) .addLast(new DelimiterTimeServerHandler()); }})
其中:
DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("$".getBytes())
我們這裡指定使用分隔符的方式,去處理我們的拆包/黏包問題。
這裡客戶端呼叫的時候,也是以 $ 分隔符作為結尾。
為了整理日誌簡單。本次將呼叫次數降低為 20 次。
客戶端啟動程式碼 handler 也做如下的初始化:
.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("$".getBytes()))) .addLast(new StringDecoder()) .addLast(new DelimiterTimeClientHandler()); }})
測試驗證(1)啟動服務端
(2)啟動客戶端
服務端日誌Server receive from client ask for time , count 1Server receive from client ask for time , count 2Server receive from client ask for time , count 3Server receive from client ask for time , count 4Server receive from client ask for time , count 5Server receive from client ask for time , count 6Server receive from client ask for time , count 7Server receive from client ask for time , count 8Server receive from client ask for time , count 9Server receive from client ask for time , count 10Server receive from client ask for time , count 11Server receive from client ask for time , count 12Server receive from client ask for time , count 13Server receive from client ask for time , count 14Server receive from client ask for time , count 15Server receive from client ask for time , count 16Server receive from client ask for time , count 17Server receive from client ask for time , count 18Server receive from client ask for time , count 19Server receive from client ask for time , count 20
客戶端日誌Client receive from server 1568961677739, count 1Client receive from server 1568961677741, count 2Client receive from server 1568961677742, count 3Client receive from server 1568961677742, count 4Client receive from server 1568961677743, count 5Client receive from server 1568961677743, count 6Client receive from server 1568961677743, count 7Client receive from server 1568961677743, count 8Client receive from server 1568961677744, count 9Client receive from server 1568961677744, count 10Client receive from server 1568961677744, count 11Client receive from server 1568961677745, count 12Client receive from server 1568961677745, count 13Client receive from server 1568961677745, count 14Client receive from server 1568961677745, count 15Client receive from server 1568961677746, count 16Client receive from server 1568961677746, count 17Client receive from server 1568961677746, count 18Client receive from server 1568961677746, count 19Client receive from server 1568961677747, count 20
定長解決方案方案說明有時候直接指定長度,根據長度進行擷取也是一種常見的方式。
Netty 解決方案Netty 中提供了類 FixedLengthFrameDecoder
netty 設計的有優點netty 的這種泳道式設計,使得後期的拓展變得非常簡單。
而且提供了大量豐富而強大的類庫,極大的降低了重複開發的成本。
服務端FixedLengthServerHandler.java非常簡單,直接輸出。
public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 直接列印資訊 String info = (String)msg; System.out.println(info); }}
伺服器 handler 指定.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new FixedLengthFrameDecoder(20)) .addLast(new StringDecoder()) .addLast(new FixedLengthServerHandler()); }})
測試
直接使用命令列 telnet localhost 8888
然後輸入資訊,列印得到
123asdfasdfasdfasdfasdfasdf12a
小結
本節的內容相對比較簡單,但是確實非常的使用。
這一節主要是參考《Netty 權威指南》中的個人學習筆記,下一節將分析一下 netty 服務端的啟動原始碼。
我是老馬,期待與你的下次相遇。