作者:DavidDing來源:https://zhuanlan.zhihu.com/p/56135195
一、前言
最近公司在預研裝置app端與服務端的互動方案,主要方案有:
服務端和app端通過阿里iot套件實現訊息的收發;服務端通過極光推送主動給app端推訊息,app通過rest介面與服務端進行互動;服務端與app通過mqtt訊息佇列來實現彼此的訊息互動;服務端與app通過原生socket長連線互動。雖然上面的一些成熟方案肯定更利於上生產環境,但它們通訊基礎也都是socket長連線,所以本人主要是預研了一下socket長連線的互動,寫了個簡單demo,採用了BIO的多執行緒方案,集成了springboot,實現了自定義簡單協議,心跳機制,socket客戶端身份強制驗證,socket客戶端斷線獲知等功能,並暴露了一些介面,可通過介面簡單實現客戶端與服務端的socket互動。
Github原始碼:
https://github.com/DavidDingXu/springboot-socket-demo二、IO通訊模型
1. IO通訊模型簡介
IO通訊模型主要包括阻塞式同步IO(BIO),非阻塞式同步IO,多路複用IO以及非同步IO。
該部分內容總結自專欄文章:
https://blog.csdn.net/yinwenjie/column/info/sys-communication/31.1 阻塞式同步IO
BIO就是:blocking IO。最容易理解、最容易實現的IO工作方式,應用程式向作業系統請求網路IO操作,這時應用程式會一直等待;另一方面,作業系統收到請求後,也會等待,直到網路上有資料傳到監聽埠;作業系統在收集資料後,會把資料傳送給應用程式;最後應用程式受到資料,並解除等待狀態。
BIO通訊示意圖
1.2 非阻塞式同步IO
這種模式下,應用程式的執行緒不再一直等待作業系統的IO狀態,而是在等待一段時間後,就解除阻塞。如果沒有得到想要的結果,則再次進行相同的操作。這樣的工作方式,暴增了應用程式的執行緒可以不會一直阻塞,而是可以進行一些其他工作。
非阻塞式IO示意圖
1.3 多路複用IO(阻塞+非阻塞)
多路複用IO示意圖
目前流程的多路複用IO實現主要包括四種:select、poll、epoll、kqueue。下表是他們的一些重要特性的比較:
1.4 非同步IO
非同步IO則是採用“訂閱-通知”模式:即應用程式向作業系統註冊IO監聽,然後繼續做自己的事情。當作業系統發生IO事件,並且準備好資料後,在主動通知應用程式,觸發相應的函式。
非同步IO示意圖
和同步IO一樣,非同步IO也是由作業系統進行支援的。微軟的windows系統提供了一種非同步IO技術:IOCP(I/O Completion Port,I/O完成埠);
Linux下由於沒有這種非同步IO技術,所以使用的是epoll(上文介紹過的一種多路複用IO技術的實現)對非同步IO進行模擬。
2. Java對IO模型的支援
Java對阻塞式同步IO的支援主要是java.net包中的Socket套接字實現;Java中非阻塞同步IO模式通過設定serverSocket.setSoTimeout(100);即可實現;Java 1.4中引入了NIO框架(java.nio包)可以構建多路複用、同步非阻塞IO程式;Java 7中對NIO進行了進一步改進,即NIO2,引入了非同步非阻塞IO方式。由於是要實現socket長連線的demo,主要關注其一些實現注意點及方案,所以本demo採用了BIO的多執行緒方案,該方案程式碼比較簡單、直觀,引入了多執行緒技術後,IO的處理吞吐量也大大提高了。下面是BIO多執行緒方案server端的簡單實現:
public static void main(String[] args) throws Exception{ ServerSocket serverSocket = new ServerSocket(83); try { while(true) { Socket socket = null; socket = serverSocket.accept(); //這邊獲得socket連線後開啟一個執行緒監聽處理資料 SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { log.error("Socket accept failed. Exception:{}", e.getMessage()); } finally { if(serverSocket != null) { serverSocket.close(); } } }}@slf4jclass SocketServerThread implements Runnable { private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen; StringBuffer message = new StringBuffer(); BIORead:while(true) { try { while((realLen = in.read(contextBytes, 0, maxLen)) != -1) { message.append(new String(contextBytes , 0 , realLen)); /* * 我們假設讀取到“over”關鍵字, * 表示客戶端的所有資訊在經過若干次傳送後,完成 * */ if(message.indexOf("over") != -1) { break BIORead; } } } //下面列印資訊 log.info("伺服器(收到來自於埠:" + sourcePort + "的資訊:" + message); //下面開始傳送資訊 out.write("回發響應資訊!".getBytes()); //關閉 out.close(); in.close(); this.socket.close(); } catch(Exception e) { log.error("Socket read failed. Exception:{}", e.getMessage()); } }}
三、注意點及實現方案
1. TCP粘包/拆包
1.1 問題說明
假設客戶端分別傳送了兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,故可能存在以下4種情況。 1. 服務端分兩次讀取到了兩個獨立的資料包,分別是D1和D2,沒有粘包和拆包; 2. 服務端一次接收到了兩個資料包,D1和D2粘合在一起,被稱為TCP粘包; 3. 服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩餘內容,這被稱為TCP拆包; 4. 服務端分兩次讀取到了兩個資料包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩餘內容D1_2和D2包的整包。如果此時服務端TCP接收滑窗非常小,而資料包D1和D2比較大,很有可能會發生第五種可能,即服務端分多次才能將D1和D2包接收完全,期間發生多次拆包。
1.2 解決思路
由於底層的TCP無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下: 1. 訊息定長,例如每個報文的大小為固定長度200位元組,如果不夠,空位補空格; 2. 在包尾增加回車換行符進行分割,例如FTP協議; 3. 將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度(或者訊息體長度)的欄位,通常設計思路為訊息頭的第一個欄位使用int32來表示訊息的總長度; 4. 更復雜的應用層協議。
1.3 demo方案
作為socket長連線的demo,使用了上述的解決思路2,即在包尾增加回車換行符進行資料的分割,同時整體資料使用約定的Json體進行作為訊息的傳輸格式。
使用換行符進行資料分割,可如下進行資料的單行讀取:
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));String message;while ((message = reader.readLine()) != null) {//....}
可如下進行資料的單行寫入:
PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);writer.println(message);
Json訊息格式如下:
(1) 服務端接收訊息實體類
@Datapublic class ServerReceiveDto implements Serializable { private static final long serialVersionUID = 6600253865619639317L; /** * 功能碼 0 心跳 1 登陸 2 登出 3 傳送訊息 */ private Integer functionCode; /** * 使用者id */ private String userId; /** * 這邊假設是string的訊息體 */ private String message;}
(2) 服務端傳送訊息實體類
@Datapublic class ServerSendDto implements Serializable { private static final long serialVersionUID = -7453297551797390215L; /** * 狀態碼 20000 成功,否則有errorMessage */ private Integer statusCode; private String message; /** * 功能碼 */ private Integer functionCode; /** * 錯誤訊息 */ private String errorMessage;}
(3) 客戶端傳送訊息實體類
@Datapublic class ClientSendDto implements Serializable { private static final long serialVersionUID = 97085384412852967L; /** * 功能碼 0 心跳 1 登陸 2 登出 3 傳送訊息 */ private Integer functionCode; /** * 使用者id */ private String userId; /** * 這邊假設是string的訊息體 */ private String message;}
2. 客戶端或服務端掉線檢測功能
2.1 實現思路
通過自定義心跳包來實現掉線檢測功能,具體思路如下:
客戶端連線上服務端後,在服務端會維護一個線上客戶端列表。客戶端每隔一段時間,向服務端傳送一個心跳包,服務端受收到包以後,會更新客戶端最近一次線上時間。一旦服務端超過規定時間沒有接收到客戶端發來的包,則視為掉線。
2.2 程式碼實現
維護一個客戶端map,其中key代表使用者的唯一id(使用者唯一id的身份驗證下面會說明),value代表使用者對應的一個實體
/** * 儲存當前由使用者資訊活躍的的socket執行緒 */private ConcurrentMap<String, Connection> existSocketMap = new ConcurrentHashMap<>();
其中Connection物件包含的資訊如下:
@Slf4j@Datapublic class Connection { /** * 當前的socket連線例項 */ private Socket socket; /** * 當前連線執行緒 */ private ConnectionThread connectionThread; /** * 當前連線是否登陸 */ private boolean isLogin; /** * 儲存當前的user資訊 */ private String userId; /** * 建立時間 */ private Date createTime; /** * 最後一次更新時間,用於判斷心跳 */ private Date lastOnTime;}
主要關注其中的lastOnTime欄位,每次服務端接收到標識是心跳資料,會更新當前的lastOnTime欄位,程式碼如下:
if (functionCode.equals(FunctionCodeEnum.HEART.getValue())) { //心跳型別 connection.setLastOnTime(new Date()); //傳送同樣的心跳資料給客戶端 ServerSendDto dto = new ServerSendDto(); dto.setFunctionCode(FunctionCodeEnum.HEART.getValue()); connection.println(JSONObject.toJSONString(dto));}
額外會有一個監測程序,以一定頻率來監測上述維護的map中的每一個Connection物件,如果當前時間與lastOnTime的時間間隔超過自定義的長度,則自動將其對應的socket連線關閉,程式碼如下:
Date now = new Date();Date lastOnTime = connectionThread.getConnection().getLastOnTime();long heartDuration = now.getTime() - lastOnTime.getTime();if (heartDuration > SocketConstant.HEART_RATE) { //心跳超時,關閉當前執行緒 log.error("心跳超時"); connectionThread.stopRunning();}
在上面程式碼中,服務端收到標識是心跳資料的時候,除了更新該socket對應的lastOnTime,還會同樣同樣心跳型別的資料給客戶端,客戶端收到標識是心跳資料的時候也會更新自己的lastOnTime欄位,同時也有一個心跳監測執行緒在監測當前的socket連線心跳是否超時
3. 客戶端身份獲知、強制身份驗證
3.1 實現思路
通過程式碼socket = serverSocket.accept()獲得的一個socket連線我們僅僅只能知道其客戶端的ip以及埠號,並不能獲知這個socket連線對應的到底是哪一個客戶端,因此必須得先獲得客戶端的身份並且驗證通過其身份才能讓其正常連線。
具體的實現思路是:
自定義一個登陸處理介面,當server端受到標識是使用者登陸的時候(此時會攜帶使用者資訊或者token,此處簡化為使用者id),呼叫使用者的登陸驗證,驗證通過的話則將該socket連線與使用者資訊繫結,設定其為已登入,並且封裝對應的物件放入前面提的客戶端map中,由此可獲得具體使用者對應的哪一個socket連線。
為了實現socket連線的強制驗證,在監測執行緒中,也會判斷當前使用者多長時間內沒有實現登入態,若超時則認為該socket連線為非法連線,主動關閉該socket連線。
3.2 程式碼實現
自定義登陸處理介面,這邊簡單以userId來判斷是否允許登陸:
public interface LoginHandler { /** * client登陸的處理函式 * * @param userId 使用者id * * @return 是否驗證通過 */ boolean canLogin(String userId);}
收到客戶端發來的資料時候的處理:
if (functionCode.equals(FunctionCodeEnum.LOGIN.getValue())) { //登陸,身份驗證 String userId = receiveDto.getUserId(); if (socketServer.getLoginHandler().canLogin(userId)) { //設定使用者物件已登入狀態 connection.setLogin(true); connection.setUserId(userId); if (socketServer.getExistSocketMap().containsKey(userId)) { //存在已登入的使用者,傳送登出指令並主動關閉該socket Connection existConnection = socketServer.getExistSocketMap().get(userId); ServerSendDto dto = new ServerSendDto(); dto.setStatusCode(999); dto.setFunctionCode(FunctionCodeEnum.MESSAGE.getValue()); dto.setErrorMessage("force logout"); existConnection.println(JSONObject.toJSONString(dto)); existConnection.getConnectionThread().stopRunning(); log.error("使用者被客戶端重入踢出,userId:{}", userId); } //新增到已登入map中 socketServer.getExistSocketMap().put(userId, connection);}
監測執行緒判斷使用者是否完成身份驗證:
if (!connectionThread.getConnection().isLogin()) { //還沒有使用者登陸成功 Date createTime = connectionThread.getConnection().getCreateTime(); long loginDuration = now.getTime() - createTime.getTime(); if (loginDuration > SocketConstant.LOGIN_DELAY) { //身份驗證超時 log.error("身份驗證超時"); connectionThread.stopRunning(); }}
4. socket異常處理與垃圾執行緒回收
4.1 實現思路
socket在讀取資料或者傳送資料的時候會出現各種異常,比如客戶端的socket已斷開連線(正常斷開或物理連線斷開等),但是服務端還在傳送資料或者還在接受資料的過程中,此時socket會丟擲相關異常,對於該異常的處理需要將自身的socket連線關閉,避免資源的浪費,同時由於是多執行緒方案,還需將該socket對應的執行緒正常清理。
4.2 程式碼實現
下面以server端傳送資料為例,該程式碼中加入了重試機制:
public void println(String message) { int count = 0; PrintWriter writer; do { try { writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true); writer.println(message); break; } catch (IOException e) { count++; if (count >= RETRY_COUNT) { //重試多次失敗,說明client端socket異常 this.connectionThread.stopRunning(); } } try { Thread.sleep(2 * 1000); } catch (InterruptedException e1) { log.error("Connection.println.IOException interrupt,userId:{}", userId); } } while (count < 3);}
上述呼叫的this.connectionThread.stopRunning()程式碼如下:
public void stopRunning() { //設定執行緒物件狀態,便於執行緒清理 isRunning = false; try { //異常情況需要將該socket資源釋放 socket.close(); } catch (IOException e) { log.error("ConnectionThread.stopRunning failed.exception:{}", e); }}
上述程式碼中設定了執行緒物件的狀態,下述程式碼在監測執行緒中執行,將沒有執行的執行緒給清理掉
/** * 儲存只要有socket處理的執行緒 */private List<ConnectionThread> existConnectionThreadList = Collections.synchronizedList(new ArrayList<>());/** * 中間list,用於遍歷的時候刪除 */private List<ConnectionThread> noConnectionThreadList = Collections.synchronizedList(new ArrayList<>());//...//刪除list中沒有用的thread引用existConnectionThreadList.forEach(connectionThread -> { if (!connectionThread.isRunning()) { noConnectionThreadList.add(connectionThread); }});noConnectionThreadList.forEach(connectionThread -> { existConnectionThreadList.remove(connectionThread); if (connectionThread.getConnection().isLogin()) { //說明使用者已經身份驗證成功了,需要刪除map this.existSocketMap.remove(connectionThread.getConnection().getUserId()); }});noConnectionThreadList.clear();
四、專案結構
由於使用了springboot框架來實現該demo,所以專案結構如下:
整體專案結構圖
socket工具包目錄如下:
socket工具包目錄
pom檔案主要添加了springboot的相關依賴,以及json工具和lombok工具等,依賴如下:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.3.RELEASE</version> <relativePath/></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.36</version> </dependency></dependencies>
自己寫的socket工具包的使用方式如下:
@Configuration@Slf4jpublic class SocketServerConfig {@Beanpublic SocketServer socketServer() { SocketServer socketServer = new SocketServer(60000); socketServer.setLoginHandler(userId -> { log.info("處理socket使用者身份驗證,userId:{}", userId); //使用者名稱中包含了dingxu則允許登陸 return userId.contains("dingxu"); }); socketServer.setMessageHandler((connection, receiveDto) -> log .info("處理socket訊息,userId:{},receiveDto:{}", connection.getUserId(), JSONObject.toJSONString(receiveDto))); socketServer.start(); return socketServer;}}
該demo中主要提供了以下幾個介面進行測試:
服務端:獲得當前使用者列表,傳送一個訊息客戶端:開始一個socket客戶端,傳送一個訊息,關閉一個socket客戶端,檢視已開啟的客戶端具體的postman檔案也放已在專案中,具體可點此連結獲得
demo中還提供了一個簡單壓測函式,如下:
@Slf4jpublic class SocketClientTest { public static void main(String[] args) { ExecutorService clientService = Executors.newCachedThreadPool(); String userId = "dingxu"; for (int i = 0; i < 1000; i++) { int index = i; clientService.execute(() -> { try { SocketClient client; client = new SocketClient(InetAddress.getByName("127.0.0.1"), 60000); //登陸 ClientSendDto dto = new ClientSendDto(); dto.setFunctionCode(FunctionCodeEnum.LOGIN.getValue()); dto.setUserId(userId + index); client.println(JSONObject.toJSONString(dto)); ScheduledExecutorService clientHeartExecutor = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r, "socket_client+heart_" + r.hashCode())); clientHeartExecutor.scheduleWithFixedDelay(() -> { try { ClientSendDto heartDto = new ClientSendDto(); heartDto.setFunctionCode(FunctionCodeEnum.HEART.getValue()); client.println(JSONObject.toJSONString(heartDto)); } catch (Exception e) { log.error("客戶端異常,userId:{},exception:{}", userId, e.getMessage()); client.close(); } }, 0, 5, TimeUnit.SECONDS); while (true){ } } catch (Exception e) { log.error(e.getMessage()); } }); } }}
原始碼地址如下,僅供學習參考
github.com/DavidDingXu/springboot-socket-demo
五、參考
https://blog.csdn.net/yinwenjie/column/info/sys-communication/3https://blog.csdn.net/m0_37739193/article/details/78738253https://www.cnblogs.com/snaildev/p/7724867.html