首頁>技術>

下面我們一起來學習下如何使用 java 實現 BIO/NIO/AIO 這 3 種不同的網路 IO 模型程式設計。

BIO 程式設計

BIO 作為最基礎的 IO 版本,實現起來比較簡單。

比如我的實現是直接阻塞當前執行緒的,這當然非常的不友好。

可以使用線執行緒池的方式進行最佳化改進。

執行緒版本
public class TimeThreadServer {    public static void main(String[] args) throws IOException {        final int port = 8088;        ServerSocket serverSocket = new ServerSocket(port);        System.out.println("server started at port " + port);        // 迴圈監聽        while (true) {            Socket socket = serverSocket.accept();            System.out.println("客戶端連線成功");            new ServerHandler(socket).start();        }    }    static class ServerHandler extends Thread {        private final Socket socket;        ServerHandler(Socket socket) {            this.socket = socket;        }        @Override        public void run() {            try {                // 讀取客戶端的資訊                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));                System.out.println("Server Recevie: " + bufferedReader.readLine());                // 讀取客戶端的資訊                PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);                String currentTime = System.currentTimeMillis()+"";                printWriter.println(currentTime);            } catch (IOException e) {                e.printStackTrace();            }        }    }}
執行緒池版本
public static void main(String[] args) throws IOException {    final int port = 8088;    ServerSocket serverSocket = new ServerSocket(port);    System.out.println("server started at port " + port);    ExecutorService executorService = Executors.newFixedThreadPool(2);    // 迴圈監聽    while (true) {        Socket socket = serverSocket.accept();        System.out.println("客戶端連線成功");        // 執行緒池處理        executorService.submit(new ServerHandler(socket));    }}

其他程式碼保持不變。

優缺點

執行緒池版本的 BIO 又被稱作偽非同步 IO。

屬於在 NIO 還沒有流行之前的一種實戰解決方案。

這種方式的效能和 BIO 想比較提升了很多,實現起來也比較簡單,但是可靠性相對較差。

NIO 基本概念Buffer

Java NIO Buffers用於和NIO Channel互動。正如你已經知道的,我們從channel中讀取資料到buffers裡,從buffer把資料寫入到channels.

buffer 本質上就是一塊記憶體區,可以用來寫入資料,並在稍後讀取出來。

這塊記憶體被NIO Buffer包裹起來,對外提供一系列的讀寫方便開發的介面。

Channel

Java NIO Channel通道和流非常相似,主要有以下幾點區別:

通道可以讀也可以寫,流一般來說是單向的(只能讀或者寫)。通道可以非同步讀寫。通道總是基於緩衝區Buffer來讀寫。Selector

用單執行緒處理多個channels的好處是我需要更少的執行緒來處理channel。

實際上,你甚至可以用一個執行緒來處理所有的channels。

從作業系統的角度來看,切換執行緒開銷是比較昂貴的,並且每個執行緒都需要佔用系統資源,因此暫用執行緒越少越好。

需要留意的是,現代作業系統和CPU在多工處理上已經變得越來越好,所以多執行緒帶來的影響也越來越小。

簡而言之,透過Selector我們可以實現單執行緒操作多個channel。

NIO 實現方式

NIO 採取通道(Channel)和緩衝區(Buffer)來傳輸和儲存資料,它是非阻塞式的 I/O,即在等待連線、讀寫資料(這些都是在一執行緒以客戶端的程式中會阻塞執行緒的操作)的時候,程式也可以做其他事情,以實現執行緒的非同步操作。

考慮一個即時訊息伺服器,可能有上千個客戶端同時連線到伺服器,但是在任何時刻只有非常少量的訊息需要讀取和分發(如果採用執行緒池或者一執行緒一客戶端方式,則會非常浪費資源),這就需要一種方法能阻塞等待,直到有一個通道可以進行 I/O 操作。

NIO 的 Selector 選擇器就實現了這樣的功能,一個 Selector 例項可以同時檢查一組通道的 I/O 狀態,它就類似一個觀察者,只要我們把需要探知的 SocketChannel 告訴 Selector,我們接著做別的事情,當有事件(比如,連線開啟、資料到達等)發生時,它會通知我們,傳回一組 SelectionKey,我們讀取這些 Key,就會獲得我們剛剛註冊過的 SocketChannel,然後,我們從這個 Channel 中讀取資料,接著我們可以處理這些資料。

Selector 內部原理實際是在做一個對所註冊的 Channel 的輪詢訪問,不斷的輪詢(目前就這一個演算法),一旦輪詢到一個 Channel 有所註冊的事情發生,比如資料來了,它就會讀取 Channel 中的資料,並對其進行處理。

要使用選擇器,需要建立一個 Selector 例項,並將其註冊到想要監控的通道上(透過 Channel 的方法實現)。

最後呼叫選擇器的 select()方法,該方法會阻塞等待,直到有一個或多個通道準備好了 I/O 操作或等待超時,或另一個執行緒呼叫了該選擇器的 wakeup()方法。

現在,在一個單獨的執行緒中,透過呼叫 select()方法,就能檢查多個通道是否準備好進行 I/O 操作,由於非阻塞 I/O 的非同步特性,在檢查的同時,我們也可以執行其他任務。

服務端步驟

(1)建立一個 Selector 例項;

(2)將其註冊到各種通道,並指定每個通道上感興趣的I/O操作;

(3)重複執行:

呼叫一種 select() 方法;獲取選取的鍵列表;對於已選鍵集中的每個鍵:    獲取通道,並從鍵中獲取附件(如果為通道及其相關的 key 添加了附件的話);    確定準備就緒的操縱並執行,如果是 accept 操作,將接收的通道設定為非阻塞模式,並註冊到選擇器;    如果需要,修改鍵的興趣操作集;    從已選鍵集中移除鍵。
程式碼實現
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.concurrent.TimeUnit;/** * @author binbin.hou * @since 1.0.0 */public class NioTcpServer {    /**     * 緩衝區的長度     */    private static final int BUFSIZE = 256;    /**     * select方法等待通道準備好的最長時間     */    private static final int TIMEOUT = 3000;    /**     * 監聽的埠號     */    private static final int PORT = 18888;    public static void main(String[] args) throws IOException, InterruptedException {        // 1. 例項化一個通道        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();        // 設定為非阻塞模式        serverSocketChannel.configureBlocking(false);        // 繫結監聽的埠        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));        System.out.println("Server started listen on: " + PORT);        // 2. 構建一個 Selector,用於監聽 Channel 的狀態        Selector selector = Selector.open();        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        //3. 不斷迴圈等待        while (true) {            //3.1 迴圈等待直到有通道已經準備好            if(selector.select(TIMEOUT) == 0) {                System.out.println(".");                TimeUnit.SECONDS.sleep(1);                continue;            }            //3.2 遍歷多有的 key            Iterator<SelectionKey> selectionKeySetIter = selector.selectedKeys().iterator();            while(selectionKeySetIter.hasNext()) {                SelectionKey selectionKey = selectionKeySetIter.next();                // accept I/O形式                if(selectionKey.isAcceptable()) {                    ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();                    // 獲取客戶端 channel                    SocketChannel socketChannel = serverSocketChannel1.accept();                    socketChannel.configureBlocking(false);                    // 選擇器註冊監聽的事件,同時制定關聯的附件                    socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE,                            ByteBuffer.allocate(BUFSIZE));                }                // 客戶端通道已經準備好了讀取資料到 buffer                if(selectionKey.isReadable()) {                    // 讀取程式碼                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();                    // 獲取對應的附件資訊                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();                    long bufferRead = socketChannel.read(byteBuffer);                    //客戶端關閉的連結。可以安全關閉                    if(bufferRead == -1) {                        socketChannel.close();                    } else {                        // 緩衝區讀取到了資料,將其感興趣的操作設定為可讀可寫。                        selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);                        // 列印讀取的內容                        System.out.println("Server read: " + new String(byteBuffer.array()));                    }                }                // 寫入處理                if(selectionKey.isValid()                    && selectionKey.isWritable()) {                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();                    // 獲取附件                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();                    // 重置緩衝區,準備將資料寫入到通道                    byteBuffer.flip();                    socketChannel.write(byteBuffer);                    //Tells whether there are any elements between the current position and the limit.                    // 如果已經全部寫入到通道,則將該通道感興趣的操作標識為讀                    if(!byteBuffer.hasRemaining()) {                        selectionKey.interestOps(SelectionKey.OP_READ);                    }                    // 為讀取更多的資料騰出空間                    byteBuffer.compact();                }                // 手動刪除                selectionKeySetIter.remove();            }        }    }}
客戶端程式碼實現
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.util.concurrent.TimeUnit;/** * @author binbin.hou * @since 1.0.0 */public class NioTcpClient {    /**     * 監聽的埠號     */    private static final int PORT = 18888;    public static void main(String[] args) throws IOException, InterruptedException {        //1. 設定為非阻塞        SocketChannel socketChannel = SocketChannel.open();        socketChannel.configureBlocking(false);        socketChannel.connect(new InetSocketAddress(PORT));        //2. 連線中...        while (!socketChannel.finishConnect()) {            System.out.println(".");            TimeUnit.SECONDS.sleep(1);        }        System.out.println("\n");        //3. 寫入/讀取資訊        String info = "hello nio test";        ByteBuffer readBuffer = ByteBuffer.allocate(info.length());        ByteBuffer writeBuffer = ByteBuffer.wrap(info.getBytes());        int totalReceivedBytes = 0;        int receivedBytes = 0;        while (totalReceivedBytes < info.length()) {            // 迴圈寫入            while (writeBuffer.hasRemaining()) {                socketChannel.write(writeBuffer);            }            receivedBytes = socketChannel.read(readBuffer);            // 說明服務端中斷            if(receivedBytes == -1) {                throw new RuntimeException("Server has been shut done.");            }            totalReceivedBytes += receivedBytes;        }        System.out.println("Client received from server: " + new String(readBuffer.array()));        socketChannel.close();    }}
測試執行服務端服務端
Server started listen on: 18888
執行客戶端客戶端
Client received from server: hello nio test
服務端
...Server read: hello nio test                                                                                                               ...
JDK AIO

jdk7中新增了一些與檔案(網路)I/O相關的一些api。這些API被稱為NIO.2,或稱為AIO(Asynchronous I/O)。

AIO最大的一個特性就是非同步能力,這種能力對socket與檔案I/O都起作用。

實現方式Future 方式

即提交一個 I/O 操作請求(accept/read/write),返回一個 Future。

然後您可以對 Future 進行檢查(呼叫get(timeout)),確定它是否完成,或者阻塞 IO 操作直到操作正常完成或者超時異常。

使用 Future 方式很簡單,需要注意的是,因為Future.get()是同步的,所以如果不仔細考慮使用場合,使用 Future 方式可能很容易進入完全同步的程式設計模式,從而使得非同步操作成為一個擺設。

如果這樣,那麼原來舊版本的 Socket API 便可以完全勝任,大可不必使用非同步 I/O.

Callback 方式

即提交一個 I/O 操作請求,並且指定一個 CompletionHandler。

當非同步 I/O 操作完成時,便傳送一個通知,此時這個 CompletionHandler 物件的 completed 或者 failed 方法將會被呼叫。

效能

因為AIO的實施需充分呼叫OS參與,IO需要作業系統支援、併發也同樣需要作業系統的支援,所以效能方面不同作業系統差異會比較明顯。

Future 實現方式Server
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class AioFutureServer {    private static final int DEFAULT_PORT = 12345;    private AsynchronousServerSocketChannel serverSocketChannel;    public AioFutureServer() throws IOException {        serverSocketChannel = AsynchronousServerSocketChannel.open();        serverSocketChannel.bind(new InetSocketAddress(DEFAULT_PORT));        System.out.println("Server listen on port: " + DEFAULT_PORT);    }    public void startWithFuture() throws InterruptedException,            ExecutionException, TimeoutException {        while (true) {            // 迴圈接收客戶端請求            Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();            // get() 是為了確保 accept 到一個連線            AsynchronousSocketChannel socket = future.get();            handleWithFuture(socket);        }    }    /**     * 處理未來的資訊     * @param channel 非同步客戶端     */    private void handleWithFuture(AsynchronousSocketChannel channel) throws InterruptedException, ExecutionException, TimeoutException {        ByteBuffer readBuf = ByteBuffer.allocate(8);        readBuf.clear();        // 一次可能讀不完        while (true) {            //get 是為了確保 read 完成,超時時間可以有效避免DOS攻擊,如果客戶端一直不傳送資料,則進行超時處理            Integer integer = channel.read(readBuf).get(10, TimeUnit.SECONDS);            System.out.println("read: " + integer);            if (integer == -1) {                break;            }            readBuf.flip();            System.out.println("received: " + Charset.forName("UTF-8").decode(readBuf));            readBuf.clear();        }    }    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException, TimeoutException {        new AioFutureServer().startWithFuture();    }}
客戶端
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class AioClient {    private static final int DEFAULT_PORT = 12345;    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();        client.connect(new InetSocketAddress("localhost", DEFAULT_PORT)).get();        client.write(ByteBuffer.wrap("123456789".getBytes()));    }}
測試啟動服務端
Server listen on port: 12345
啟動客戶端

服務端日誌

read: 8received: 12345678read: 1received: 9Exception in thread "main" java.util.concurrent.ExecutionException: java.io.IOException: 指定的網路名不再可用。
Callback 模式服務端
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;import java.util.concurrent.TimeUnit;public class AioCompletionServer {    private static final int DEFAULT_PORT = 12345;    private AsynchronousServerSocketChannel serverSocketChannel;    public AioCompletionServer() throws IOException {        serverSocketChannel = AsynchronousServerSocketChannel.open();        serverSocketChannel.bind(new InetSocketAddress(DEFAULT_PORT));        System.out.println("Server listen on port: " + DEFAULT_PORT);    }    /**     * 使用回撥的方式     */    public void startWithCompletionHandler() {        serverSocketChannel.accept(null,                new CompletionHandler<AsynchronousSocketChannel, Object>() {                    @Override                    public void completed(AsynchronousSocketChannel result, Object attachment) {                        // 再此接收客戶端連線                        serverSocketChannel.accept(null, this);                        // 處理結果                        handleWithCompletionHandler(result);                    }                    @Override                    public void failed(Throwable exc, Object attachment) {                        exc.printStackTrace();                    }                });    }    /**     * 處理非同步的結果     * @param channel 客戶端通道     */    private void handleWithCompletionHandler(final AsynchronousSocketChannel channel) {        try {            final long timeout = 10L;            final ByteBuffer buffer = ByteBuffer.allocate(8);            // 再次讀取,還是一種回撥的方式。            channel.read(buffer, timeout, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {                @Override                public void completed(Integer result, Object attachment) {                    System.out.println("read:" + result);                    if (result == -1) {                        try {                            channel.close();                        } catch (IOException e) {                            e.printStackTrace();                        }                        return;                    }                    buffer.flip();                    System.out.println("received message:" + Charset.forName("UTF-8").decode(buffer));                    buffer.clear();                    // 遞迴呼叫,直到結束為止。                    channel.read(buffer, timeout, TimeUnit.SECONDS, null, this);                }                @Override                public void failed(Throwable exc, Object attachment) {                    exc.printStackTrace();                }            });        } catch (Exception e) {            e.printStackTrace();        }    }    public static void main(String[] args) throws IOException, InterruptedException {        new AioCompletionServer().startWithCompletionHandler();        // 沉睡等待處理。        TimeUnit.SECONDS.sleep(100);    }}
客戶端

同上

小結

本文講述了 jdk 實現的 bio/nio/aio 的方式,你是否會感覺 jdk 中的 api 設計過於複雜呢?

下一節我們將透過 netty 框架實現上述功能,並講述我們為什麼要選擇 netty 作為網路開發的基本工具。

我是老馬,期待與你的下次相遇。

30
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 為什麼選擇 Netty?Netty 入門教程