首頁>技術>

實戰:在Java中實現常用網路I/O模型

Java從初創之日起,就是為網路而生的。隨著網際網路應用的發展,Java也被越來越多的企業所採用。本節演示瞭如何基於Java實現常用網路I/O模型。

Java OIO

早期的Java提供java.net包用於開發網路應用,這類API也被稱為Java OIO(Old-blocking I/O,阻塞I/O)。以下演示使用java.net包及java.io來開發Echo協議的客戶端及伺服器的過程。

Echo協議是指把接收到的訊息按照原樣返回,其主要用於檢測和除錯網路。這個協議可以基於TCP/UDP用於伺服器檢測埠7有無訊息。

1.實戰:開發Echo協議的伺服器

以下是使用原生java.net包來開發Echo協議的伺服器的示例。

package com.waylau.java.demo.net;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.ServerSocket;import java.net.Socket;public class BlockingEchoServer {public static int DEFAULT_PORT = 7;/*** @param args*/public static void main(String[] args) {int port;try {port = Integer.parseInt(args[0]);} catch (RuntimeException ex) {port = DEFAULT_PORT;}ServerSocket serverSocket = null;try {// 伺服器監聽serverSocket = new ServerSocket(port);System.out.println("BlockingEchoServer已啟動,埠:" + port);} catch (IOException e) {System.out.println("BlockingEchoServer啟動異常,埠:" + port);System.out.println(e.getMessage());}// Java 7 try-with-resource語句try (// 接受客戶端建立連結,生成Socket例項Socket clientSocket = serverSocket.accept();PrintWriter out =new PrintWriter(clientSocket.getOutputStream(), true);// 接收客戶端的訊息BufferedReader in =new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) {String inputLine;while ((inputLine = in.readLine()) != null) {// 傳送訊息給客戶端out.println(inputLine);System.out.println("BlockingEchoServer -> "+ clientSocket.getRemoteSocketAddress() + ":" + inputLine);}} catch (IOException e) {System.out.println("BlockingEchoServer異常!" + e.getMessage());}}}

上述例子BlockingEchoServer實現了Echo協議。BlockingEchoServer使用了java.net包中的Socket和ServerSocket類庫,這兩個類庫主要用於開發基於TCP的應用。如果想要開發UDP的應用,則需要使用DatagramSocket類。

ServerSocket用於伺服器端,而Socket是建立網路連線時使用的。在客戶端連線伺服器成功時,客戶端和伺服器端都會產生一個Socket例項,透過操作這個例項,來完成所需的會話。對於一個網路連線來說,Socket是平等的,並沒有差別,不因為在伺服器端或在客戶端而產生不同的級別,不管是Socket還是ServerSocket,它們的工作都是透過Socket類和其子類來完成的。

執行BlockingEchoServer,可以看到控制檯輸出內容如下。

BlockingEchoServer已啟動,埠:7

2.實戰:開發Echo協議的客戶端

以下是使用原生java.net包來開發Echo協議的客戶端的示例。

package com.waylau.java.demo.net;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;import java.net.UnknownHostException;public class BlockingEchoClient {/*** @param args*/public static void main(String[] args) {if (args.length != 2) {System.err.println("用法: java BlockingEchoClient <host name> <port number>");System.exit(1);}String hostName = args[0];int portNumber = Integer.parseInt(args[1]);try (Socket echoSocket = new Socket(hostName, portNumber);PrintWriter out =new PrintWriter(echoSocket.getOutputStream(), true);BufferedReader in =new BufferedReader(new InputStreamReader(echoSocket.getInputStream()));BufferedReader stdIn =new BufferedReader(new InputStreamReader(System.in))) {String userInput;while ((userInput = stdIn.readLine()) != null) {out.println(userInput);System.out.println("echo: " + in.readLine());}} catch (UnknownHostException e) {System.err.println("不明主機,主機名為: " + hostName);System.exit(1);} catch (IOException e) {System.err.println("不能從主機中獲取I/O,主機名為:" +hostName);System.exit(1);}}}

BlockingEchoClient的Socket的使用與BlockingEchoServer的Socket的使用基本類似。如果你本地的JDK版本是11以上,則可以跳過編譯階段直接執行原始碼,命令如下。

$ java BlockingEchoClient.java localhost 7

從JDK 11開始,可以直接執行啟動Java原始碼檔案。有關Java的最新特性,可見筆者所著的《Java核心程式設計》。

當BlockingEchoClient客戶端與BlockingEchoServer伺服器建立了連線之後,客戶端就可以與伺服器進行互動了。

當我們在客戶端輸入“a”字元時,伺服器也會將“a”傳送回客戶端,客戶端輸入的任何內容,伺服器也會原樣返回。

BlockingEchoServer控制檯輸出內容如下。

BlockingEchoServer已啟動,埠:7BlockingEchoServer -> /127.0.0.1:52831:aBlockingEchoServer -> /127.0.0.1:52831:hello waylau

3.java.net包API的缺點

BlockingEchoClient和BlockingEchoServer程式碼只是一個簡單的示例,如果要建立一個複雜的客戶端/伺服器協議,仍然需要大量的樣板程式碼,並且要求開發人員必須掌握相當多的底層技術細節才能使它整個流暢地執行起來。Socket和ServerSocket類庫的API只支援由本地系統套接字型檔提供的所謂的阻塞函式,因此客戶端與伺服器的通訊是阻塞的,並且要求每個新加入的連線,必須在伺服器中建立一個新的Socket例項。這極大消耗了伺服器的效能,並且也使得連線數受到了限制。

BlockingEchoClient客戶端與BlockingEchoServer伺服器所實現的方式是阻塞的。

那麼Java是否可以實現非阻塞的I/O程式呢?答案是肯定的。

Java NIO

從Java 1.4開始,Java提供了NIO(New I/O),用來替代標準JavaI/O API(3.5.1小節所描述的早期的Java網路程式設計API)。Java NIO也被稱為“Non-blocking I/O”,提供了非阻塞I/O的方式,用法與標準I/O有非常大的差異。

Java NIO提供了以下3個核心概念。

·通道(Channel)和緩衝區(Buffer):標準的I/O是基於位元組流和字元流進行操作的,而NIO是基於通道和緩衝區進行操作的,資料總是從通道讀取到緩衝區,或者從緩衝區寫入通道。

·非阻塞I/O(Non-blocking I/O):Java NIO可以讓你非阻塞地使用I/O,例如,當執行緒從通道讀取資料到緩衝區時,執行緒還可以進行其他事情。當資料被寫入緩衝區時,執行緒可以繼續處理它。從緩衝區寫入通道也類似。

·選擇器(Selector):Java NIO引入了選擇器的概念,選擇器用於監聽多個通道的事件(比如連線開啟、資料到達)。因此,單個的執行緒可以監聽多個數據通道,這極大提升了單機的併發能力。

Java NIO API位於java.nio包下。下面介紹Java NIO版本實現的支援Echo協議的客戶端及伺服器。

1.實戰:開發NIO版本的Echo伺服器

下面是使用原生Java NIO API來開發Echo協議的伺服器的示例。package com.waylau.java.demo.nio;

import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class NonBlockingEchoServer {public static int DEFAULT_PORT = 7;/*** @param args*/public static void main(String[] args) {int port;try {port = Integer.parseInt(args[0]);} catch (RuntimeException ex) {port = DEFAULT_PORT;}ServerSocketChannel serverChannel;Selector selector;try {serverChannel = ServerSocketChannel.open();InetSocketAddress address = new InetSocketAddress(port);serverChannel.bind(address);serverChannel.configureBlocking(false);selector = Selector.open();serverChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("NonBlockingEchoServer已啟動,埠:" + port);} catch (IOException ex) {ex.printStackTrace();return;}while (true) {try {selector.select();} catch (IOException e) {System.out.println("NonBlockingEchoServer異常!" + e.getMessage());}Set<SelectionKey> readyKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = readyKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();try {// 可連線if (key.isAcceptable()) {ServerSocketChannel server =(ServerSocketChannel) key.channel();SocketChannel client = server.accept();System.out.println("NonBlockingEchoServer接受客戶端的連線:"+ client);// 設定為非阻塞client.configureBlocking(false);// 客戶端註冊到SelectorSelectionKey clientKey = client.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ);// 分配快取區ByteBuffer buffer = ByteBuffer.allocate(100);clientKey.attach(buffer);}// 可讀if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();ByteBuffer output = (ByteBuffer) key.attachment();client.read(output);System.out.println(client.getRemoteAddress()+ " -> NonBlockingEchoServer:" + output.toString());key.interestOps(SelectionKey.OP_WRITE);}// 可寫if (key.isWritable()) {SocketChannel client = (SocketChannel) key.channel();ByteBuffer output = (ByteBuffer) key.attachment();output.flip();client.write(output);System.out.println("NonBlockingEchoServer -> "+ client.getRemoteAddress() + ":" + output.toString());output.compact();key.interestOps(SelectionKey.OP_READ);}}ca tch (IOException ex) {key.cancel();try {key.channel().close();} catch (IOException cex) {}}}}}}

上述例子NonBlockingEchoServer實現了Echo協議,ServerSocketChannel與ServerSocket的職責類似。相比較而言,ServerSocket讀和寫操作都是同步阻塞的,在面對高併發的場景時,需要消耗大量的執行緒來維持連線。CPU在大量的執行緒之間頻繁切換,效能損耗很大。一旦單機的連線超過1萬,甚至達到幾萬的時候,伺服器的效能會急劇下降。

NIO的Selector卻很好地解決了這個問題,用主執行緒(一個執行緒或者是CPU個數的執行緒)保持所有的連線,管理和讀取客戶端連線的資料,將讀取的資料交給後面的執行緒處理,後續執行緒處理完業務邏輯後,將結果交給主執行緒傳送響應給客戶端,這樣少量的執行緒就可以處理大量連線的請求。

上述NonBlockingEchoServer例子,使用Selector註冊Channel,然後呼叫它的select方法。這個select方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,執行緒就可以處理這些事件。事件包括例如有新連線進來(OP_ACCEPT)、資料接收(OP_READ)等。

執行,可以看到控制檯輸出內容如下。

NonBlockingEchoServer已啟動,埠:7

2.實戰:開發NIO版本的Echo客戶端

下面是使用原生NIO API來開發Echo協議的客戶端的示例。

package com.waylau.java.demo.nio;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;public class NonBlockingEchoClient {/*** @param args*/public static void main(String[] args) {if (args.length != 2) {System.err.println("用法: java NonBlockingEchoClient <host name> <portnumber>");System.exit(1);}String hostName = args[0];int portNumber = Integer.parseInt(args[1]);SocketChannel socketChannel = null;try {socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress(hostName, portNumber));} catch (IOException e) {System.err.println("NonBlockingEchoClient異常: " + e.getMessage());System.exit(1);}ByteBuffer writeBuffer = ByteBuffer.allocate(32);ByteBuffer readBuffer = ByteBuffer.allocate(32);try (BufferedReader stdIn =new BufferedReader(new InputStreamReader(System.in))) {String userInput;while ((userInput = stdIn.readLine()) != null) {writeBuffer.put(userInput.getBytes());writeBuffer.flip();writeBuffer.rewind();// 寫訊息到管道socketChannel.write(writeBuffer);// 管道讀訊息socketChannel.read(readBuffer);// 清理緩衝區writeBuffer.clear();readBuffer.clear();System.out.println("echo: " + userInput);}} catch (UnknownHostException e) {System.err.println("不明主機,主機名為: " + hostName);System.exit(1);} catch (IOException e) {System.err.println("不能從主機中獲取I/O,主機名為:"+ hostName);System.exit(1);}}}

NonBlockingEchoClient的SocketChannel的使用與NonBlockingEchoServer的SocketChannel的使用基本類似。啟動客戶端,命令如下。

$ java NonBlockingEchoClient.java localhost 7

當NonBlockingEchoClient客戶端與NonBlockingEchoServer伺服器建立連線之後,客戶端就可以與伺服器進行互動了。

當我們在客戶端輸入“a”字元時,伺服器也會將“a”傳送回客戶端,客戶端輸入的任務內容,伺服器也會原樣返回。

NonBlockingEchoServer控制檯輸出內容如下。

NonBlockingEchoServer已啟動,埠:7

NonBlockingEchoServer接受客戶端的連線:

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7 remote=/127.0.0.1:56515]NonBlockingEchoServer -> /127.0.0.1:56515:java.nio.HeapByteBuffer[pos=0 lim=0 cap=100]/127.0.0.1:56515 -> NonBlockingEchoServer:java.nio.HeapByteBuffer[pos=1 lim=100 cap=100]NonBlockingEchoServer -> /127.0.0.1:56515:java.nio.HeapByteBuffer[pos=1 lim=1 cap=100]/127.0.0.1:56515 -> NonBlockingEchoServer:java.nio.HeapByteBuffer[pos=12 lim=100 cap=100]NonBlockingEchoServer -> /127.0.0.1:56515:java.nio.HeapByteBuffer[pos=12 lim=12 cap=100]

Java AIO

從Java 1.7開始,Java提供了AIO(非同步I/O)。Java AIO也被稱為“NIO.2”,提供了非同步I/O的方式,用法與標準I/O有非常大的差異。

Java AIO採用“釋出/訂閱”模式,即應用程式向作業系統註冊I/O監聽,然後繼續做自己的事情。當作業系統發生I/O事件,並且準備好資料後,再主動通知應用程式,觸發相應的函式。

與同步I/O一樣,Java的AIO也是由作業系統進行支援的。微軟的Windows系統提供了一種非同步I/O技術——I/O完成埠(I/O CompletionPort,IOCP),而在Linux平臺下並沒有這種非同步I/O技術,所以使用的是epoll對非同步I/O進行模擬。

Java AIO API同Java NIO一樣,都是位於java.nio包下。下面介紹Java AIO版本實現的支援Echo協議的客戶端及伺服器。

1.實戰:開發AIO版本的Echo伺服器

下面是使用原生Java AIO API來開發Echo協議的伺服器的示例。

package com.waylau.java.demo.aio;import java.io.IOException;import java.net.InetSocketAddress;import java.net.StandardSocketOptions;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class AsyncEchoServer {public static int DEFAULT_PORT = 7;/*** @param args*/public static void main(String[] args) {int port;try {port = Integer.parseInt(args[0]);} catch (RuntimeException ex) {port = DEFAULT_PORT;}AsynchronousServerSocketChannel serverChannel;try {serverChannel = AsynchronousServerSocketChannel.open();InetSocketAddress address = new InetSocketAddress(port);serverChannel.bind(address);// 設定闡述serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);System.out.println("AsyncEchoServer已啟動,埠:" + port);} catch (IOException ex) {ex.printStackTrace();return;}while (true) {// 可連線Future<AsynchronousSocketChannel> future = serverChannel.accept();AsynchronousSocketChannel socketChannel = null;try {socketChannel = future.get();} catch (InterruptedException | ExecutionException e) {System.out.println("AsyncEchoServer異常!" + e.getMessage());}System.out.println("AsyncEchoServer接受客戶端的連線:" + socketChannel);// 分配快取區ByteBuffer buffer = ByteBuffer.allocate(100);try {while (socketChannel.read(buffer).get() != -1) {buffer.flip();socketChannel.write(buffer).get();System.out.println("AsyncEchoServer -> " +socketChannel.getRemoteAddress() +":" + buffer.toString());if (buffer.hasRemaining()) {buffer.compact();} else {buffer.clear();}}socketChannel.close();} catch (InterruptedException| ExecutionException| IOException e) {System.out.println("AsyncEchoServer異常!"+ e.getMessage());}}}}

上述例子AsyncEchoServer實現了Echo協議,AsynchronousServerSocketChannel與Server SocketChannel的職責類似。

相比較而言,AsynchronousServerSocketChannel實現了非同步的I/O,而無須再使用Selector,因此整體程式碼比ServerSocketChannel要簡化很多。

執行程式碼後可以看到控制檯輸出內容如下。

AsyncEchoServer已啟動,埠:7

2.實戰:開發AIO版本的Echo客戶端

下面是使用原生AIO API來開發Echo協議的客戶端的示例。

package com.waylau.java.demo.aio;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;public class AsyncEchoClient {/*** @param args*/public static void main(String[] args) {if (args.length != 2) {System.err.println("用法: java AsyncEchoClient <host name> <port number>");System.exit(1);}String hostName = args[0];int portNumber = Integer.parseInt(args[1]);AsynchronousSocketChannel socketChannel = null;try {socketChannel = AsynchronousSocketChannel.open();socketChannel.connect(new InetSocketAddress(hostName, portNumber));} catch (IOException e) {System.err.println("AsyncEchoClient異常: "+ e.getMessage());System.exit(1);}ByteBuffer writeBuffer = ByteBuffer.allocate(32);ByteBuffer readBuffer = ByteBuffer.allocate(32);try (BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {String userInput;while ((userInput = stdIn.readLine()) != null) {writeBuffer.put(userInput.getBytes());writeBuffer.flip();writeBuffer.rewind();// 寫訊息到管道socketChannel.write(writeBuffer);// 管道讀訊息socketChannel.read(readBuffer);// 清理緩衝區writeBuffer.clear();readBuffer.clear();System.out.println("echo: " + userInput);}} catch (UnknownHostException e) {System.err.println("不明主機,主機名為: " + hostName);System.exit(1);} catch (IOException e) {System.err.println("不能從主機中獲取I/O,主機名為:"+ hostName);System.exit(1);}}}

AsyncEchoClient的AsynchronousSocketChannel的使用與NonBlockingEchoClient的SocketChannel的使用基本類似。啟動客戶端,命令如下。

$ java AsyncEchoClient.java localhost 7

當AsyncEchoClient客戶端與AsyncEchoServer伺服器建立連線之後,客戶端就可以與伺服器進行互動了。

當我們在客戶端輸入“a”字元時,伺服器也會將“a”傳送回客戶端,客戶端輸入的任務內容,伺服器也會原樣返回。

AsyncEchoServer控制檯輸出內容如下。

AsyncEchoServer已啟動,埠:7

AsyncEchoServer接受客戶端的連線:

10
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 學完大佬的分散式系統核心:程序間的通訊,事件驅動後,我頓悟了