前言
在上一篇文章中,我講了網路 IO 的基礎知識,本篇文章將從原始碼角度具體講解 DBLE 的網路模組:包括 DBLE 是如何處理 MySQL 包的,多路複用在 DBLE 中是如何實現的,以及請求的非同步化處理相關邏輯。
DBLE 是如何處理 MySQL 包的?我們將以客戶端連線 DBLE 為例,從原始碼角度講解 DBLE 的相關處理流程。
客戶端與 DBLE 建立連線的流程如下圖所示(因為 DBLE 實現了 MySQL 協議,所以與客戶端連線 MySQL 的流程一樣):
客戶端與 DBLE 建立連線的流程
主要包括以下四個步驟:
1、 客戶端發起 connect 連線;
2、服務端傳送握手包;
3、客戶端回覆握手包;
4、服務端返回 OK 包,表示連線建立完成,進入命令階段。
我們直接看原始碼:
1、DBLE 處理客戶端 connect
DBLE 處理客戶端 connect 的程式碼在 NIOAcceptor#run 方法中:
public void run() { //這裡的selector即IO多路複用選擇器,一個selector可以處理多個客戶端連線請求 final Selector tSelector = this.selector; for (; ; ) { try { tSelector.select(1000L); Set<SelectionKey> keys = tSelector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { //當連線有效且可接受時,處理客戶端連線 accept(); } else { key.cancel(); } } } catch (final Throwable e) { LOGGER.warn("caught Throwable err: ", e); } finally { keys.clear(); } } catch (Exception e) { LOGGER.info(getName(), e); } } }
可以看出上述方法呼叫了 accept 來接受客戶端發起的 TCP 連線,繼續看該類的 accept 方法:
private void accept() { SocketChannel channel = null; try { //與客戶端建立TCP連線 channel = serverChannel.accept(); channel.configureBlocking(false); NIOSocketWR socketWR = new NIOSocketWR(); FrontendConnection c = factory.make(channel, socketWR); socketWR.initFromConnection(c); c.setId(ID_GENERATOR.getId()); IOProcessor processor = DbleServer.getInstance().nextFrontProcessor(); c.setProcessor(processor); NIOReactor reactor = reactorPool.getNextReactor(); //將已建立好的連線註冊給NIOReactor reactor.postRegister(c); } catch (Exception e) { LOGGER.info(getName(), e); closeChannel(channel); } }
可以看出上述程式碼將客戶端發起建立的 TCP 連線註冊給了 NIOReactor 來管理。
到這裡 DBLE 對客戶端的 connect 已經處理完成了,他們之間已經完成了 TCP 連線的建立,同時 DBLE 將客戶端請求的連線註冊給了 NIOReactor。
2、服務端傳送握手包
接著上面我們繼續看 NIOReactor#postRegister 方法:
//該方法將連線放入佇列,並喚醒reactorR的selector,其中reactorR為NIOReactor的內部類RWvoid postRegister(AbstractConnection c) { reactorR.registerQueue.offer(c); reactorR.selector.wakeup(); }
我們直接看 RW 處理註冊佇列裡客戶端連線的程式碼,在 RW#register 方法中:
private void register(Selector finalSelector) { …… while ((c = registerQueue.poll()) != null) { try { //下面這行程式碼需要注意,該程式碼將連線註冊到了RW的selector多路複用選擇器中,使得該選擇器後續能夠讀取該連線傳送過來的資料 ((NIOSocketWR) c.getSocketWR()).register(finalSelector); //服務端傳送握手包的邏輯在下面這個方法中 c.register(); } …… }
AbstractConnection#register 方法中又呼叫了 AbstractService#register 方法,對於客戶端連線請求來講,這裡最終呼叫的是 MySQLFrontAuthService#register 方法:
public void register() throws IOException { //終於看到了greeting,該方法向客戶端傳送了握手包 greeting(); this.connection.getSocketWR().asyncRead(); }
MySQLFrontAuthService#greeting 方法實現了拼裝握手包,並將握手包傳送給客戶端的邏輯:
private void greeting() { // generate auth data byte[] rand1 = RandomUtil.randomBytes(8); byte[] rand2 = RandomUtil.randomBytes(12); // save auth data byte[] rand = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, rand, 0, rand1.length); System.arraycopy(rand2, 0, rand, rand1.length, rand2.length); this.seed = rand; HandshakeV10Packet hs = new HandshakeV10Packet(); hs.setPacketId(0); hs.setProtocolVersion(Versions.PROTOCOL_VERSION); // [0a] protocol version V10 hs.setServerVersion(Versions.getServerVersion()); hs.setThreadId(connection.getId()); hs.setSeed(rand1); hs.setServerCapabilities(getServerCapabilities()); int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemConfig.getInstance().getCharset()); hs.setServerCharsetIndex((byte) (charsetIndex & 0xff)); hs.setServerStatus(2); hs.setRestOfScrambleBuff(rand2); hs.setAuthPluginName(pluginName.name().getBytes()); //這裡的呼叫即傳送握手包到客戶端 hs.write(connection); }
到這裡就完成了服務端傳送握手包的邏輯。
3、DBLE 處理客戶端的握手回覆包
服務端傳送了握手包給客戶端,客戶端收到後需要傳送握手回覆包過來了,一般該握手回覆包中會包含使用者相關資訊。
那麼 DBLE 如何讀取並處理客戶端傳送過來的握手回覆包呢?
相應的程式碼在 RW#run 方法中,那為什麼在這個方法中能處理客戶端傳送過來的資料呢?因為之前在處理客戶端連線的時候,已經把相應的連線註冊給了 RW 的多路複用選擇器,所以它當然能處理相應連線的資料了,記不得的同學可以看前面 RW#register 方法中的註釋。
RW#run 方法中處理客戶端傳送資料的主要程式碼如下:
public void run() { final Selector finalSelector = this.selector; Set<SelectionKey> keys = null; for (; ; ) { …… //當連線中有資料的時候,這裡會返回相應的selection keys keys = finalSelector.selectedKeys(); if (keys.size() == 0) { continue; } //對有相應事件的連線進行處理 executeKeys(keys); } …… }
我們繼續看 RW#executeKeys 方法(對程式碼做了一些精簡,但不影響理解):
private void executeKeys(Set<SelectionKey> keys) { for (SelectionKey key : keys) { AbstractConnection con = null; Object att = key.attachment(); con = (AbstractConnection) att; if (key.isValid() && key.isReadable()) { //這裡即為讀取客戶端傳送過來的資料 con.asyncRead(); } } }
跟著程式碼走,相應的處理邏輯在 NIOSocketWR#asyncRead 方法中:
public void asyncRead() throws IOException { ByteBuffer theBuffer = con.findReadBuffer(); //讀取客戶端傳送過來的資料到快取theBuffer中 int got = channel.read(theBuffer); //處理相應的資料 con.onReadData(got); }
AbstractConnection#onReadData 方法中又進一步呼叫了 AbstractService#handle 方法來處理資料,所以我們直接看 AbstractService#handle 方法:
public void handle(ByteBuffer dataBuffer) { this.sessionStart(); boolean hasReming = true; int offset = 0; while (hasReming) { //下面這行程式碼實際處理了客戶端傳過來的資料包,裡面包含計算包總長度、判斷讀取的資料包是否完整等邏輯 ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress); switch (result.getCode()) { //客戶端首次發來的握手包,所以是完整的資料包,進入這裡的處理邏輯,這裡將讀取的資料封裝成task任務,提交到佇列中,然後透過執行緒非同步處理 case REACH_END_BUFFER: connection.readReachEnd(); byte[] packetData = result.getPacketData(); if (packetData != null) { taskCreate(packetData); } dataBuffer.clear(); hasReming = false; break; case BUFFER_PACKET_UNCOMPLETE: connection.compactReadBuffer(dataBuffer, result.getOffset()); hasReming = false; break; case BUFFER_NOT_BIG_ENOUGH: connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength()); hasReming = false; break; case STLL_DATA_REMING: byte[] partData = result.getPacketData(); if (partData != null) { taskCreate(partData); } offset = result.getOffset(); continue; default: throw new RuntimeException("unknown error when read data"); } } }
到這裡 DBLE 完成了讀取客戶端傳送過來的握手包,並將它封裝成了非同步任務以備下一步處理。
4、DBLE 非同步處理任務並返回 OK 包
非同步是高效能的秘訣之一。上面DBLE將讀取到的資料封裝成了任務,然後交由執行緒非同步處理。
我們直接來看任務處理的相關程式碼,在 AbstractService#execute 方法中:
public void execute(ServiceTask task) { task.increasePriority(); handleData(task); }
對於客戶端握手回覆包的處理,最後呼叫的程式碼在 MySQLFrontAuthService#handleAuthPacket 方法中,所以對於該場景,我們直接看該方法的相關程式碼:
private void handleAuthPacket(byte[] data) { //將讀取到的資料轉換為AuthPacket AuthPacket auth = new AuthPacket(); auth.read(data); this.authPacket = auth; …… //檢查使用者名稱和密碼 auth(); …… }
上面的 auth 方法裡又呼叫了 MySQLFrontAuthService#checkForResult 方法,所以我們直接看該方法:
private void checkForResult(AuthResultInfo info) { …… AbstractService service = BusinessServiceFactory.getBusinessService(info, connection); connection.setService(service); //驗證通過後,拼裝併發送OK包給客戶端 MySQLPacket packet = new OkPacket(); packet.setPacketId(needAuthSwitched ? 4 : 2); packet.write(connection); …… }
到這裡,DBLE 已經處理完了握手回覆包,並返回 OK 包給客戶端,整個客戶端與 DBLE 的連線建立過程就結束了。
該過程結束後,將進入 MySQL 協議的 Command 階段,如果你透過命令列連線 DBLE 的話,即進入了下面的介面:
mysql>
是不是沒想到在進入這個命令介面前發生了這麼多……
多路複用在 DBLE 中是如何實現的其實這個問題的答案,如果仔細看前面程式碼章節的話就已經能夠知道了。DBLE 的多路複用其實就是透過 JAVA 的多路複用選擇器 Selector 來實現的,透過將連線註冊給 Selector,這樣只是在連線中有資料時候才進行讀取,能夠實現一個執行緒監聽多個連線。
請求的非同步化處理DBLE 在讀取完資料後,並沒有在當前執行緒中處理這些資料,而是將資料封裝成任務提交到佇列中去,然後透過另外的執行緒來進行處理,這即是請求非同步化處理,能夠極大的提高效能,這在上面的原始碼解讀章節裡也進行了說明。