首頁>技術>

前言

在上一篇文章中,我講了網路 IO 的基礎知識,本篇文章將從原始碼角度具體講解 DBLE 的網路模組:包括 DBLE 是如何處理 MySQL 包的,多路複用在 DBLE 中是如何實現的,以及請求的非同步化處理相關邏輯。

DBLE 是如何處理 MySQL 包的?

我們將以客戶端連線 DBLE 為例,從原始碼角度講解 DBLE 的相關處理流程。

客戶端與 DBLE 建立連線的流程如下圖所示(因為 DBLE 實現了 MySQL 協議,所以與客戶端連線 MySQL 的流程一樣):

客戶端與 DBLE 建立連線的流程

主要包括以下四個步驟:

1、 客戶端發起 connect 連線;

2、服務端傳送握手包;

3、客戶端回覆握手包;

4、服務端返回 OK 包,表示連線建立完成,進入命令階段。

我們直接看原始碼:

1、DBLE 處理客戶端 connect

DBLE 處理客戶端 connect 的程式碼在 NIOAcceptor#run 方法中:

public void run() {        //這裡的selector即IO多路複用選擇器,一個selector可以處理多個客戶端連線請求        final Selector tSelector = this.selector;        for (; ; ) {            try {                tSelector.select(1000L);                Set<SelectionKey> keys = tSelector.selectedKeys();                try {                    for (SelectionKey key : keys) {                        if (key.isValid() && key.isAcceptable()) {                            //當連線有效且可接受時,處理客戶端連線                            accept();                        } else {                            key.cancel();                        }                    }                } catch (final Throwable e) {                    LOGGER.warn("caught Throwable err: ", e);                } finally {                    keys.clear();                }            } catch (Exception e) {                LOGGER.info(getName(), e);            }        }    }

可以看出上述方法呼叫了 accept 來接受客戶端發起的 TCP 連線,繼續看該類的 accept 方法:

private void accept() {        SocketChannel channel = null;        try {            //與客戶端建立TCP連線            channel = serverChannel.accept();            channel.configureBlocking(false);            NIOSocketWR socketWR = new NIOSocketWR();            FrontendConnection c = factory.make(channel, socketWR);            socketWR.initFromConnection(c);            c.setId(ID_GENERATOR.getId());            IOProcessor processor = DbleServer.getInstance().nextFrontProcessor();            c.setProcessor(processor);            NIOReactor reactor = reactorPool.getNextReactor();            //將已建立好的連線註冊給NIOReactor            reactor.postRegister(c);        } catch (Exception e) {            LOGGER.info(getName(), e);            closeChannel(channel);        }    }

可以看出上述程式碼將客戶端發起建立的 TCP 連線註冊給了 NIOReactor 來管理。

到這裡 DBLE 對客戶端的 connect 已經處理完成了,他們之間已經完成了 TCP 連線的建立,同時 DBLE 將客戶端請求的連線註冊給了 NIOReactor

2、服務端傳送握手包

接著上面我們繼續看 NIOReactor#postRegister 方法:

//該方法將連線放入佇列,並喚醒reactorR的selector,其中reactorR為NIOReactor的內部類RWvoid postRegister(AbstractConnection c) {        reactorR.registerQueue.offer(c);        reactorR.selector.wakeup();    }

我們直接看 RW 處理註冊佇列裡客戶端連線的程式碼,在 RW#register 方法中:

private void register(Selector finalSelector) {            ……            while ((c = registerQueue.poll()) != null) {                try {                     //下面這行程式碼需要注意,該程式碼將連線註冊到了RW的selector多路複用選擇器中,使得該選擇器後續能夠讀取該連線傳送過來的資料                    ((NIOSocketWR) c.getSocketWR()).register(finalSelector);                    //服務端傳送握手包的邏輯在下面這個方法中                    c.register();                }             ……        }

AbstractConnection#register 方法中又呼叫了 AbstractService#register 方法,對於客戶端連線請求來講,這裡最終呼叫的是 MySQLFrontAuthService#register 方法:

public void register() throws IOException {        //終於看到了greeting,該方法向客戶端傳送了握手包        greeting();        this.connection.getSocketWR().asyncRead();    }

MySQLFrontAuthService#greeting 方法實現了拼裝握手包,並將握手包傳送給客戶端的邏輯:

private void greeting() {        // generate auth data        byte[] rand1 = RandomUtil.randomBytes(8);        byte[] rand2 = RandomUtil.randomBytes(12);        // save auth data        byte[] rand = new byte[rand1.length + rand2.length];        System.arraycopy(rand1, 0, rand, 0, rand1.length);        System.arraycopy(rand2, 0, rand, rand1.length, rand2.length);        this.seed = rand;        HandshakeV10Packet hs = new HandshakeV10Packet();        hs.setPacketId(0);        hs.setProtocolVersion(Versions.PROTOCOL_VERSION);  // [0a] protocol version   V10        hs.setServerVersion(Versions.getServerVersion());        hs.setThreadId(connection.getId());        hs.setSeed(rand1);        hs.setServerCapabilities(getServerCapabilities());        int charsetIndex = CharsetUtil.getCharsetDefaultIndex(SystemConfig.getInstance().getCharset());        hs.setServerCharsetIndex((byte) (charsetIndex & 0xff));        hs.setServerStatus(2);        hs.setRestOfScrambleBuff(rand2);        hs.setAuthPluginName(pluginName.name().getBytes());        //這裡的呼叫即傳送握手包到客戶端        hs.write(connection);    }

到這裡就完成了服務端傳送握手包的邏輯。

3、DBLE 處理客戶端的握手回覆包

服務端傳送了握手包給客戶端,客戶端收到後需要傳送握手回覆包過來了,一般該握手回覆包中會包含使用者相關資訊。

那麼 DBLE 如何讀取並處理客戶端傳送過來的握手回覆包呢?

相應的程式碼在 RW#run 方法中,那為什麼在這個方法中能處理客戶端傳送過來的資料呢?因為之前在處理客戶端連線的時候,已經把相應的連線註冊給了 RW 的多路複用選擇器,所以它當然能處理相應連線的資料了,記不得的同學可以看前面 RW#register 方法中的註釋。

RW#run 方法中處理客戶端傳送資料的主要程式碼如下:

public void run() {            final Selector finalSelector = this.selector;            Set<SelectionKey> keys = null;            for (; ; ) {                    ……                                    //當連線中有資料的時候,這裡會返回相應的selection keys                    keys = finalSelector.selectedKeys();                    if (keys.size() == 0) {                        continue;                    }                    //對有相應事件的連線進行處理                    executeKeys(keys);             }                    ……     }

我們繼續看 RW#executeKeys 方法(對程式碼做了一些精簡,但不影響理解):

private void executeKeys(Set<SelectionKey> keys) {            for (SelectionKey key : keys) {                AbstractConnection con = null;                Object att = key.attachment();                con = (AbstractConnection) att;                if (key.isValid() && key.isReadable()) {                    //這裡即為讀取客戶端傳送過來的資料                    con.asyncRead();                }            }        }

跟著程式碼走,相應的處理邏輯在 NIOSocketWR#asyncRead 方法中:

public void asyncRead() throws IOException {        ByteBuffer theBuffer = con.findReadBuffer();        //讀取客戶端傳送過來的資料到快取theBuffer中        int got = channel.read(theBuffer);        //處理相應的資料        con.onReadData(got);    }

AbstractConnection#onReadData 方法中又進一步呼叫了 AbstractService#handle 方法來處理資料,所以我們直接看 AbstractService#handle 方法:

public void handle(ByteBuffer dataBuffer) {        this.sessionStart();        boolean hasReming = true;        int offset = 0;        while (hasReming) {            //下面這行程式碼實際處理了客戶端傳過來的資料包,裡面包含計算包總長度、判斷讀取的資料包是否完整等邏輯            ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);            switch (result.getCode()) {                //客戶端首次發來的握手包,所以是完整的資料包,進入這裡的處理邏輯,這裡將讀取的資料封裝成task任務,提交到佇列中,然後透過執行緒非同步處理                case REACH_END_BUFFER:                    connection.readReachEnd();                    byte[] packetData = result.getPacketData();                    if (packetData != null) {                        taskCreate(packetData);                    }                    dataBuffer.clear();                    hasReming = false;                    break;                case BUFFER_PACKET_UNCOMPLETE:                    connection.compactReadBuffer(dataBuffer, result.getOffset());                    hasReming = false;                    break;                case BUFFER_NOT_BIG_ENOUGH:                    connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());                    hasReming = false;                    break;                case STLL_DATA_REMING:                    byte[] partData = result.getPacketData();                    if (partData != null) {                        taskCreate(partData);                    }                    offset = result.getOffset();                    continue;                default:                    throw new RuntimeException("unknown error when read data");            }        }    }

到這裡 DBLE 完成了讀取客戶端傳送過來的握手包,並將它封裝成了非同步任務以備下一步處理。

4、DBLE 非同步處理任務並返回 OK 包

非同步是高效能的秘訣之一。上面DBLE將讀取到的資料封裝成了任務,然後交由執行緒非同步處理。

我們直接來看任務處理的相關程式碼,在 AbstractService#execute 方法中:

public void execute(ServiceTask task) {        task.increasePriority();        handleData(task);    }

對於客戶端握手回覆包的處理,最後呼叫的程式碼在 MySQLFrontAuthService#handleAuthPacket 方法中,所以對於該場景,我們直接看該方法的相關程式碼:

private void handleAuthPacket(byte[] data) {        //將讀取到的資料轉換為AuthPacket        AuthPacket auth = new AuthPacket();        auth.read(data);        this.authPacket = auth;        ……        //檢查使用者名稱和密碼        auth();        ……    }

上面的 auth 方法裡又呼叫了 MySQLFrontAuthService#checkForResult 方法,所以我們直接看該方法:

private void checkForResult(AuthResultInfo info) {            ……            AbstractService service = BusinessServiceFactory.getBusinessService(info, connection);            connection.setService(service);            //驗證通過後,拼裝併發送OK包給客戶端            MySQLPacket packet = new OkPacket();            packet.setPacketId(needAuthSwitched ? 4 : 2);            packet.write(connection);            ……    }

到這裡,DBLE 已經處理完了握手回覆包,並返回 OK 包給客戶端,整個客戶端與 DBLE 的連線建立過程就結束了。

該過程結束後,將進入 MySQL 協議的 Command 階段,如果你透過命令列連線 DBLE 的話,即進入了下面的介面:

mysql>

是不是沒想到在進入這個命令介面前發生了這麼多……

多路複用在 DBLE 中是如何實現的

其實這個問題的答案,如果仔細看前面程式碼章節的話就已經能夠知道了。DBLE 的多路複用其實就是透過 JAVA 的多路複用選擇器 Selector 來實現的,透過將連線註冊給 Selector,這樣只是在連線中有資料時候才進行讀取,能夠實現一個執行緒監聽多個連線。

請求的非同步化處理

DBLE 在讀取完資料後,並沒有在當前執行緒中處理這些資料,而是將資料封裝成任務提交到佇列中去,然後透過另外的執行緒來進行處理,這即是請求非同步化處理,能夠極大的提高效能,這在上面的原始碼解讀章節裡也進行了說明。

13
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Mars:加速資料科學的新方式