下面我們一起來學習下如何使用 java 實現 BIO/NIO/AIO 這 3 種不同的網路 IO 模型程式設計。
BIO 程式設計BIO 作為最基礎的 IO 版本,實現起來比較簡單。
比如我的實現是直接阻塞當前執行緒的,這當然非常的不友好。
可以使用線執行緒池的方式進行最佳化改進。
執行緒版本public class TimeThreadServer { public static void main(String[] args) throws IOException { final int port = 8088; ServerSocket serverSocket = new ServerSocket(port); System.out.println("server started at port " + port); // 迴圈監聽 while (true) { Socket socket = serverSocket.accept(); System.out.println("客戶端連線成功"); new ServerHandler(socket).start(); } } static class ServerHandler extends Thread { private final Socket socket; ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { try { // 讀取客戶端的資訊 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println("Server Recevie: " + bufferedReader.readLine()); // 讀取客戶端的資訊 PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); String currentTime = System.currentTimeMillis()+""; printWriter.println(currentTime); } catch (IOException e) { e.printStackTrace(); } } }}
執行緒池版本public static void main(String[] args) throws IOException { final int port = 8088; ServerSocket serverSocket = new ServerSocket(port); System.out.println("server started at port " + port); ExecutorService executorService = Executors.newFixedThreadPool(2); // 迴圈監聽 while (true) { Socket socket = serverSocket.accept(); System.out.println("客戶端連線成功"); // 執行緒池處理 executorService.submit(new ServerHandler(socket)); }}
其他程式碼保持不變。
優缺點執行緒池版本的 BIO 又被稱作偽非同步 IO。
屬於在 NIO 還沒有流行之前的一種實戰解決方案。
這種方式的效能和 BIO 想比較提升了很多,實現起來也比較簡單,但是可靠性相對較差。
NIO 基本概念BufferJava NIO Buffers用於和NIO Channel互動。正如你已經知道的,我們從channel中讀取資料到buffers裡,從buffer把資料寫入到channels.
buffer 本質上就是一塊記憶體區,可以用來寫入資料,並在稍後讀取出來。
這塊記憶體被NIO Buffer包裹起來,對外提供一系列的讀寫方便開發的介面。
ChannelJava NIO Channel通道和流非常相似,主要有以下幾點區別:
通道可以讀也可以寫,流一般來說是單向的(只能讀或者寫)。通道可以非同步讀寫。通道總是基於緩衝區Buffer來讀寫。Selector用單執行緒處理多個channels的好處是我需要更少的執行緒來處理channel。
實際上,你甚至可以用一個執行緒來處理所有的channels。
從作業系統的角度來看,切換執行緒開銷是比較昂貴的,並且每個執行緒都需要佔用系統資源,因此暫用執行緒越少越好。
需要留意的是,現代作業系統和CPU在多工處理上已經變得越來越好,所以多執行緒帶來的影響也越來越小。
簡而言之,透過Selector我們可以實現單執行緒操作多個channel。
NIO 實現方式NIO 採取通道(Channel)和緩衝區(Buffer)來傳輸和儲存資料,它是非阻塞式的 I/O,即在等待連線、讀寫資料(這些都是在一執行緒以客戶端的程式中會阻塞執行緒的操作)的時候,程式也可以做其他事情,以實現執行緒的非同步操作。
考慮一個即時訊息伺服器,可能有上千個客戶端同時連線到伺服器,但是在任何時刻只有非常少量的訊息需要讀取和分發(如果採用執行緒池或者一執行緒一客戶端方式,則會非常浪費資源),這就需要一種方法能阻塞等待,直到有一個通道可以進行 I/O 操作。
NIO 的 Selector 選擇器就實現了這樣的功能,一個 Selector 例項可以同時檢查一組通道的 I/O 狀態,它就類似一個觀察者,只要我們把需要探知的 SocketChannel 告訴 Selector,我們接著做別的事情,當有事件(比如,連線開啟、資料到達等)發生時,它會通知我們,傳回一組 SelectionKey,我們讀取這些 Key,就會獲得我們剛剛註冊過的 SocketChannel,然後,我們從這個 Channel 中讀取資料,接著我們可以處理這些資料。
Selector 內部原理實際是在做一個對所註冊的 Channel 的輪詢訪問,不斷的輪詢(目前就這一個演算法),一旦輪詢到一個 Channel 有所註冊的事情發生,比如資料來了,它就會讀取 Channel 中的資料,並對其進行處理。
要使用選擇器,需要建立一個 Selector 例項,並將其註冊到想要監控的通道上(透過 Channel 的方法實現)。
最後呼叫選擇器的 select()方法,該方法會阻塞等待,直到有一個或多個通道準備好了 I/O 操作或等待超時,或另一個執行緒呼叫了該選擇器的 wakeup()方法。
現在,在一個單獨的執行緒中,透過呼叫 select()方法,就能檢查多個通道是否準備好進行 I/O 操作,由於非阻塞 I/O 的非同步特性,在檢查的同時,我們也可以執行其他任務。
服務端步驟(1)建立一個 Selector 例項;
(2)將其註冊到各種通道,並指定每個通道上感興趣的I/O操作;
(3)重複執行:
呼叫一種 select() 方法;獲取選取的鍵列表;對於已選鍵集中的每個鍵: 獲取通道,並從鍵中獲取附件(如果為通道及其相關的 key 添加了附件的話); 確定準備就緒的操縱並執行,如果是 accept 操作,將接收的通道設定為非阻塞模式,並註冊到選擇器; 如果需要,修改鍵的興趣操作集; 從已選鍵集中移除鍵。
程式碼實現import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.concurrent.TimeUnit;/** * @author binbin.hou * @since 1.0.0 */public class NioTcpServer { /** * 緩衝區的長度 */ private static final int BUFSIZE = 256; /** * select方法等待通道準備好的最長時間 */ private static final int TIMEOUT = 3000; /** * 監聽的埠號 */ private static final int PORT = 18888; public static void main(String[] args) throws IOException, InterruptedException { // 1. 例項化一個通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 設定為非阻塞模式 serverSocketChannel.configureBlocking(false); // 繫結監聽的埠 serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); System.out.println("Server started listen on: " + PORT); // 2. 構建一個 Selector,用於監聽 Channel 的狀態 Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //3. 不斷迴圈等待 while (true) { //3.1 迴圈等待直到有通道已經準備好 if(selector.select(TIMEOUT) == 0) { System.out.println("."); TimeUnit.SECONDS.sleep(1); continue; } //3.2 遍歷多有的 key Iterator<SelectionKey> selectionKeySetIter = selector.selectedKeys().iterator(); while(selectionKeySetIter.hasNext()) { SelectionKey selectionKey = selectionKeySetIter.next(); // accept I/O形式 if(selectionKey.isAcceptable()) { ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel(); // 獲取客戶端 channel SocketChannel socketChannel = serverSocketChannel1.accept(); socketChannel.configureBlocking(false); // 選擇器註冊監聽的事件,同時制定關聯的附件 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE, ByteBuffer.allocate(BUFSIZE)); } // 客戶端通道已經準備好了讀取資料到 buffer if(selectionKey.isReadable()) { // 讀取程式碼 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 獲取對應的附件資訊 ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); long bufferRead = socketChannel.read(byteBuffer); //客戶端關閉的連結。可以安全關閉 if(bufferRead == -1) { socketChannel.close(); } else { // 緩衝區讀取到了資料,將其感興趣的操作設定為可讀可寫。 selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 列印讀取的內容 System.out.println("Server read: " + new String(byteBuffer.array())); } } // 寫入處理 if(selectionKey.isValid() && selectionKey.isWritable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 獲取附件 ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); // 重置緩衝區,準備將資料寫入到通道 byteBuffer.flip(); socketChannel.write(byteBuffer); //Tells whether there are any elements between the current position and the limit. // 如果已經全部寫入到通道,則將該通道感興趣的操作標識為讀 if(!byteBuffer.hasRemaining()) { selectionKey.interestOps(SelectionKey.OP_READ); } // 為讀取更多的資料騰出空間 byteBuffer.compact(); } // 手動刪除 selectionKeySetIter.remove(); } } }}
客戶端程式碼實現import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.util.concurrent.TimeUnit;/** * @author binbin.hou * @since 1.0.0 */public class NioTcpClient { /** * 監聽的埠號 */ private static final int PORT = 18888; public static void main(String[] args) throws IOException, InterruptedException { //1. 設定為非阻塞 SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress(PORT)); //2. 連線中... while (!socketChannel.finishConnect()) { System.out.println("."); TimeUnit.SECONDS.sleep(1); } System.out.println("\n"); //3. 寫入/讀取資訊 String info = "hello nio test"; ByteBuffer readBuffer = ByteBuffer.allocate(info.length()); ByteBuffer writeBuffer = ByteBuffer.wrap(info.getBytes()); int totalReceivedBytes = 0; int receivedBytes = 0; while (totalReceivedBytes < info.length()) { // 迴圈寫入 while (writeBuffer.hasRemaining()) { socketChannel.write(writeBuffer); } receivedBytes = socketChannel.read(readBuffer); // 說明服務端中斷 if(receivedBytes == -1) { throw new RuntimeException("Server has been shut done."); } totalReceivedBytes += receivedBytes; } System.out.println("Client received from server: " + new String(readBuffer.array())); socketChannel.close(); }}
測試執行服務端服務端Server started listen on: 18888
執行客戶端客戶端
Client received from server: hello nio test
服務端
...Server read: hello nio test ...
JDK AIOjdk7中新增了一些與檔案(網路)I/O相關的一些api。這些API被稱為NIO.2,或稱為AIO(Asynchronous I/O)。
AIO最大的一個特性就是非同步能力,這種能力對socket與檔案I/O都起作用。
實現方式Future 方式即提交一個 I/O 操作請求(accept/read/write),返回一個 Future。
然後您可以對 Future 進行檢查(呼叫get(timeout)),確定它是否完成,或者阻塞 IO 操作直到操作正常完成或者超時異常。
使用 Future 方式很簡單,需要注意的是,因為Future.get()是同步的,所以如果不仔細考慮使用場合,使用 Future 方式可能很容易進入完全同步的程式設計模式,從而使得非同步操作成為一個擺設。
如果這樣,那麼原來舊版本的 Socket API 便可以完全勝任,大可不必使用非同步 I/O.
Callback 方式即提交一個 I/O 操作請求,並且指定一個 CompletionHandler。
當非同步 I/O 操作完成時,便傳送一個通知,此時這個 CompletionHandler 物件的 completed 或者 failed 方法將會被呼叫。
效能因為AIO的實施需充分呼叫OS參與,IO需要作業系統支援、併發也同樣需要作業系統的支援,所以效能方面不同作業系統差異會比較明顯。
Future 實現方式Serverimport java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class AioFutureServer { private static final int DEFAULT_PORT = 12345; private AsynchronousServerSocketChannel serverSocketChannel; public AioFutureServer() throws IOException { serverSocketChannel = AsynchronousServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(DEFAULT_PORT)); System.out.println("Server listen on port: " + DEFAULT_PORT); } public void startWithFuture() throws InterruptedException, ExecutionException, TimeoutException { while (true) { // 迴圈接收客戶端請求 Future<AsynchronousSocketChannel> future = serverSocketChannel.accept(); // get() 是為了確保 accept 到一個連線 AsynchronousSocketChannel socket = future.get(); handleWithFuture(socket); } } /** * 處理未來的資訊 * @param channel 非同步客戶端 */ private void handleWithFuture(AsynchronousSocketChannel channel) throws InterruptedException, ExecutionException, TimeoutException { ByteBuffer readBuf = ByteBuffer.allocate(8); readBuf.clear(); // 一次可能讀不完 while (true) { //get 是為了確保 read 完成,超時時間可以有效避免DOS攻擊,如果客戶端一直不傳送資料,則進行超時處理 Integer integer = channel.read(readBuf).get(10, TimeUnit.SECONDS); System.out.println("read: " + integer); if (integer == -1) { break; } readBuf.flip(); System.out.println("received: " + Charset.forName("UTF-8").decode(readBuf)); readBuf.clear(); } } public static void main(String[] args) throws IOException, InterruptedException, ExecutionException, TimeoutException { new AioFutureServer().startWithFuture(); }}
客戶端import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class AioClient { private static final int DEFAULT_PORT = 12345; public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); client.connect(new InetSocketAddress("localhost", DEFAULT_PORT)).get(); client.write(ByteBuffer.wrap("123456789".getBytes())); }}
測試啟動服務端Server listen on port: 12345
啟動客戶端
服務端日誌
read: 8received: 12345678read: 1received: 9Exception in thread "main" java.util.concurrent.ExecutionException: java.io.IOException: 指定的網路名不再可用。
Callback 模式服務端
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;import java.util.concurrent.TimeUnit;public class AioCompletionServer { private static final int DEFAULT_PORT = 12345; private AsynchronousServerSocketChannel serverSocketChannel; public AioCompletionServer() throws IOException { serverSocketChannel = AsynchronousServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(DEFAULT_PORT)); System.out.println("Server listen on port: " + DEFAULT_PORT); } /** * 使用回撥的方式 */ public void startWithCompletionHandler() { serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(AsynchronousSocketChannel result, Object attachment) { // 再此接收客戶端連線 serverSocketChannel.accept(null, this); // 處理結果 handleWithCompletionHandler(result); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); } /** * 處理非同步的結果 * @param channel 客戶端通道 */ private void handleWithCompletionHandler(final AsynchronousSocketChannel channel) { try { final long timeout = 10L; final ByteBuffer buffer = ByteBuffer.allocate(8); // 再次讀取,還是一種回撥的方式。 channel.read(buffer, timeout, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { System.out.println("read:" + result); if (result == -1) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } return; } buffer.flip(); System.out.println("received message:" + Charset.forName("UTF-8").decode(buffer)); buffer.clear(); // 遞迴呼叫,直到結束為止。 channel.read(buffer, timeout, TimeUnit.SECONDS, null, this); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, InterruptedException { new AioCompletionServer().startWithCompletionHandler(); // 沉睡等待處理。 TimeUnit.SECONDS.sleep(100); }}
客戶端同上
小結本文講述了 jdk 實現的 bio/nio/aio 的方式,你是否會感覺 jdk 中的 api 設計過於複雜呢?
下一節我們將透過 netty 框架實現上述功能,並講述我們為什麼要選擇 netty 作為網路開發的基本工具。
我是老馬,期待與你的下次相遇。