首頁>技術>

ServerBoostrap

使用者可以透過 netty 的 ServerBoostrap 啟動服務端,時序圖如下:

輸入圖片說明

入門例子

為了便於大家理解,我們把服務端啟動的程式碼放在下面:

public void run() throws Exception {    /**     * EventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器     * bossGroup: 用來接收進來的連線     * workerGroup: 用來處理已經被接收的連線     * 一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。     */    EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)    EventLoopGroup workerGroup = new NioEventLoopGroup();    try {        /**         * ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。         * 你可以在這個服務中直接使用 Channel,但是這會是一個複雜的處理過程,在很多情況下你並不需要這樣做。         */        ServerBootstrap b = new ServerBootstrap(); // (2)        b.group(bossGroup, workerGroup)                //指定使用 NioServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連線。                .channel(NioServerSocketChannel.class) // (3)                /**                 * 這裡的事件處理類經常會被用來處理一個最近的已經接收的 Channel。                 * ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。                 * 也許你想透過增加一些處理類比如DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網路程式。                 * 當你的程式變的複雜時,可能你會增加更多的處理類到 pipeline 上,然後提取這些匿名類到最頂層的類上。                 */                .childHandler(new ChannelInitializer<SocketChannel>() { // (4)                    @Override                    public void initChannel(SocketChannel ch) throws Exception {                        ch.pipeline().addLast(new DiscardServerHandler());                    }                })                /**                 * 你可以設定這裡指定的 Channel 實現的配置引數。                 * 我們正在寫一個TCP/IP 的服務端,因此我們被允許設定 socket 的引數選項比如tcpNoDelay 和 keepAlive。                 * 請參考 ChannelOption 和詳細的 ChannelConfig 實現的介面文件以此可以對ChannelOption 的有一個大概的認識。                 *                 * option() 是提供給 NioServerSocketChannel 用來接收進來的連線。                 * childOption() 是提供給由父管道 ServerChannel 接收到的連線,在這個例子中也是 NioServerSocketChannel。                 */                .option(ChannelOption.SO_BACKLOG, 128)          // (5)                .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)        /**         * 剩下的就是繫結埠然後啟動服務。這裡我們在機器上綁定了機器所有網絡卡上的 8080 埠。         * 當然現在你可以多次呼叫 bind() 方法(基於不同繫結地址)。         */        // 繫結埠,開始接收進來的連線        ChannelFuture f = b.bind(port).sync(); // (7)        System.out.println("DiscardServer start...");        // 等待伺服器  socket 關閉 。        // 在這個例子中,這不會發生,但你可以優雅地關閉你的伺服器。        f.channel().closeFuture().sync();    } finally {        workerGroup.shutdownGracefully();        bossGroup.shutdownGracefully();    }}
淺析

(1)Builder 構建者模式

為了解決引數較多的問題,這裡 netty ServerBootstrap 使用了 builder 模式,可以大大降低我們的配置量,也可以靈活指定配置。

(2)EventLoopGroup 執行緒池

為了提升效能,執行緒池是一個自然的選擇。

(3)設定並且繫結 channel

可以發現我們原始碼中只有一句話 channel(NioServerSocketChannel.class)

這個方法原始碼如下:

public B channel(Class<? extends C> channelClass) {    if (channelClass == null) {        throw new NullPointerException("channelClass");    }    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}

很顯然,這裡使用了工廠模式。

我們只需要簡單的指定 class 資訊,netty 會自動透過反射建立對應的實現類。

(4)初始化 ChannelPipeline

流水線使用了責任鏈模式,用於處理一系列的 ChannelHandler。

(5)新增 ChannelHandler

netty 這裡的 initChannel 方法,可以讓我們非常方便的新增 ChannelHandler。

對個人的影響也比較大,我寫的很多工具方法也會採用類似的模式。

(6)繫結並且啟動監聽埠

我們使用時只有非常優雅的一句話 ChannelFuture f = b.bind(port).sync();,實際上 netty 為我們做了一些封裝:

雖然我們只指定了 port,本質上肯定還是 socket 地址,只不過預設 ip 為本地而已。

public ChannelFuture bind(SocketAddress localAddress) {    validate();    if (localAddress == null) {        throw new NullPointerException("localAddress");    }    return doBind(localAddress);}

doBind 的實現還是比較多的,暫時不做展開。

(7)selector 輪訓 & 觸發

寫過 java nio 的小夥伴們肯定知道,需要透過 selector 輪訓獲取訊息。

實際上 netty 將這些細節封裝了起來。輪訓準備就緒 Channel 之後,將由 Reactor 執行緒 NioEventLoop 執行 ChannelPipeline 的響應方法,最終呼叫到 ChannelHandler。

ChannelHandler 中包含了系統內建的處理類,和使用者自定義的處理類。

原始碼解析

每一個版本的原始碼可能有差異,這裡老馬的版本是 4.1.17.Final。

EventLoopGroup

EventLoopGroup 就是 Reactor 執行緒池。

group 方法如下:

/** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {    super.group(parentGroup);    if (childGroup == null) {        throw new NullPointerException("childGroup");    }    if (this.childGroup != null) {        throw new IllegalStateException("childGroup set already");    }    this.childGroup = childGroup;    return this;}

這裡呼叫了父類的方法 super.group(parentGroup);,實現如下:

/** * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created * {@link Channel} */public B group(EventLoopGroup group) {    if (group == null) {        throw new NullPointerException("group");    }    if (this.group != null) {        throw new IllegalStateException("group set already");    }    this.group = group;    return self();}

這個方法主要用於設定 IO 執行緒,執行和排程網路事件的讀寫。

channel

channel 的方法如下:

public B channel(Class<? extends C> channelClass) {    if (channelClass == null) {        throw new NullPointerException("channelClass");    }    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}

ReflectiveChannelFactory 反射的核心實現如下:

public ReflectiveChannelFactory(Class<? extends T> clazz) {    if (clazz == null) {        throw new NullPointerException("clazz");    }    this.clazz = clazz;}@Overridepublic T newChannel() {    try {        return clazz.getConstructor().newInstance();    } catch (Throwable t) {        throw new ChannelException("Unable to create Channel from class " + clazz, t);    }}

實際上就是透過 NioServerSocketChannel 建立了 Channel 物件。

啟動類設定 Handler

啟動類可以為啟動服務類和父類,分別設定 Handler。

這個也是一開始老馬學習 netty 比較迷惑的地方,這兩個有啥區別呢?

輸入圖片說明

本質區別:

(1)ServerBoostrap 中的 Handler 是 NioServerSocketChannel 使用的,所有連線這個監聽埠的客戶端都會執行。

(2)父類 AbstractServerBoostrap 中的 Handler 是一個工廠類,會為每一個接入的客戶端都建立一個新的 Handler。

埠繫結

最後,還有服務端的埠繫結。我們上面只是簡單的過了一下,這裡做一下展開:

private ChannelFuture doBind(final SocketAddress localAddress) {    //1. 建立 channel 並註冊    final ChannelFuture regFuture = initAndRegister();    final Channel channel = regFuture.channel();    if (regFuture.cause() != null) {        return regFuture;    }    //2. 建立完成後,設定對應的附加屬性    if (regFuture.isDone()) {        // At this point we know that the registration was complete and successful.        ChannelPromise promise = channel.newPromise();        doBind0(regFuture, channel, localAddress, promise);        return promise;    } else {        // Registration future is almost always fulfilled already, but just in case it's not.        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);        regFuture.addListener(new ChannelFutureListener() {            //3. 新增監聽器            @Override            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if (cause != null) {                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                    // IllegalStateException once we try to access the EventLoop of the Channel.                    promise.setFailure(cause);                } else {                    // Registration was successful, so set the correct executor to use.                    // See https://github.com/netty/netty/issues/2586                    promise.registered();                    doBind0(regFuture, channel, localAddress, promise);                }            }        });        return promise;    }}
建立 channel 並註冊

initAndRegister() 完整實現如下:

final ChannelFuture initAndRegister() {    Channel channel = null;    try {        //1. 透過 channelFactory 建立新的 channel        channel = channelFactory.newChannel();        //2. 初始化相關屬性        init(channel);    } catch (Throwable t) {        // 省略    }    // 省略    return regFuture;}

init 是一個抽象方法,服務端實現如下:

@Overridevoid init(Channel channel) throws Exception {    final Map<ChannelOption<?>, Object> options = options0();    //1. 設定 Socket 引數和 NioserverSocketChannel 的 附加屬性    synchronized (options) {        setChannelOptions(channel, options, logger);    }    final Map<AttributeKey<?>, Object> attrs = attrs0();    synchronized (attrs) {        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {            @SuppressWarnings("unchecked")            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();            channel.attr(key).set(e.getValue());        }    }    // 屬性省略    p.addLast(new ChannelInitializer<Channel>() {        @Override        public void initChannel(final Channel ch) throws Exception {            final ChannelPipeline pipeline = ch.pipeline();            //2. 將 AbstractBoostrap 的 Handler 新增到 NioserverSocketChannel 的 pipeline 中            ChannelHandler handler = config.handler();            if (handler != null) {                pipeline.addLast(handler);            }            //3. 將用於服務端註冊的 ServerBootstrapAcceptor 新增到 pipeline 中            ch.eventLoop().execute(new Runnable() {                @Override                public void run() {                    pipeline.addLast(new ServerBootstrapAcceptor(                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                }            });        }    });}

到這裡,服務端的監聽相關資源已經初始化完畢。

接下來,需要把 NioserverSocketChannel 註冊到 Reactor 執行緒的多路複用選擇器上,然後輪訓客戶端事件。

NioserverSocketChannel 簡介

構造器如下:

public NioServerSocketChannel() {    this(newSocket(DEFAULT_SELECTOR_PROVIDER));}

這個就是預設 channel 初始化的構造器,實際呼叫的是:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();private static ServerSocketChannel newSocket(SelectorProvider provider) {    return provider.openServerSocketChannel();}

歸根到底,預設的 SelectorProvider 應該是 jdk nio 的 DefaultSelectorProvider。

實際上,還是根據初始化 ServerSocketChannel:

public NioServerSocketChannel(ServerSocketChannel channel) {    super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}

可以看到,這裡預設註冊監聽了 SelectionKey.OP_ACCEPT 事件。

其中 SelectionKey 只有 4 種:

public static final int OP_ACCEPT = 1 << 4;public static final int OP_CONNECT = 1 << 3;public static final int OP_WRITE = 1 << 2;public static final int OP_READ = 1 << 0;
NioserverSocketChannel 註冊

註冊的原始碼比較多,看得人云裡霧裡的。

可以理解,就是首先註冊自己感興趣的事件,發生的時候通知你即可。

註冊的方法如下我們主要看 NioEventLoop 即可,這個類繼承自 SingleThreadEventLoop 類。

實現了 SingleThreadEventExecutor 類的 run 方法,如下:

@Overrideprotected void run() {    for (;;) {        try {            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                case SelectStrategy.CONTINUE:                    continue;                case SelectStrategy.SELECT:                    select(wakenUp.getAndSet(false));                    if (wakenUp.get()) {                        selector.wakeup();                    }                    // fall through                default:            }            // 省略            processSelectedKeys();            // 省略        } catch (Throwable t) {            handleLoopException(t);        }        // Always handle shutdown even if the loop processing threw an exception.        // 省略    }}

我們只看核心的部分,這裡實際上就是一個死迴圈,註冊的部分核心如下:

public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {    // 省略    ch.register(selector, interestOps, task);    // 省略}

在 AbstractSelectableChannel 中的實現如下:

public final SelectionKey register(Selector sel, int ops,                                   Object att)    throws ClosedChannelException{    synchronized (regLock) {        // 省略        SelectionKey k = findKey(sel);        if (k != null) {            k.interestOps(ops);            k.attach(att);        }        if (k == null) {            // New registration            synchronized (keyLock) {                if (!isOpen())                    throw new ClosedChannelException();                k = ((AbstractSelector)sel).register(this, ops, att);                addKey(k);            }        }        return k;    }}

這裡實際上是註冊感興趣的事件,服務端到這裡基本上已經告一段落了。

客戶端接入原始碼分析

下面我們看一下 NioEventLoop 是如何處理客戶端請求的。

當多路複用器就緒時,預設執行 processSelectedKeysOptimized() 方法:

private void processSelectedKeysOptimized() {    for (int i = 0; i < selectedKeys.size; ++i) {        final SelectionKey k = selectedKeys.keys[i];        selectedKeys.keys[i] = null;        final Object a = k.attachment();        // 這裡處理的 attachment 是 AbstractNioChannel        if (a instanceof AbstractNioChannel) {            processSelectedKey(k, (AbstractNioChannel) a);        } else {            @SuppressWarnings("unchecked")            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;            processSelectedKey(k, task);        }        if (needsToSelectAgain) {            // null out entries in the array to allow to have it GC'ed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.reset(i + 1);            selectAgain();            i = -1;        }    }}

這裡實際上是根據不同的的型別,執行不同的操作:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        // 省略        try {            int readyOps = k.readyOps();            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);                unsafe.finishConnect();            }            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write                ch.unsafe().forceFlush();            }            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead            // to a spin loop            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }

我們來重點關注下 read 方法。

NioUnsafe 是一個介面,有兩個子類:NioByteUnsafe 和 NioMessageUnsafe。

NioServerSocketChannel 繼承自 AbstractNioMessageChannel,使用的是 NioMessageUnsafe 類。

read 方法

實現如下:

@Overridepublic void read() {    assert eventLoop().inEventLoop();    final ChannelConfig config = config();    final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.reset(config);    boolean closed = false;    Throwable exception = null;    try {        try {            do {                // 核心方法                int localRead = doReadMessages(readBuf);                if (localRead == 0) {                    break;                }                if (localRead < 0) {                    closed = true;                    break;                }                allocHandle.incMessagesRead(localRead);            } while (allocHandle.continueReading());        } catch (Throwable t) {            exception = t;        }        // 處理讀取的資訊        int size = readBuf.size();        for (int i = 0; i < size; i ++) {            readPending = false;            // 觸發 channel read             pipeline.fireChannelRead(readBuf.get(i));        }        readBuf.clear();        allocHandle.readComplete();        // 觸發 read complete        pipeline.fireChannelReadComplete();    } finally {        // 省略    }}

doReadMessages 在 NioServerSocketChannel 類中實現如下:

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {    SocketChannel ch = SocketUtils.accept(javaChannel());    try {        if (ch != null) {            buf.add(new NioSocketChannel(this, ch));            return 1;        }    } catch (Throwable t) {        // 省略    }    return 0;}

這裡就是 jdk nio 中的接收到一個新的客戶端請求的方法實現。

讀取完成之後,觸發 fireChannelRead,如下:

@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {    AbstractChannelHandlerContext.invokeChannelRead(head, msg);    return this;}

如下:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

invokeChannelRead 如下:

private void invokeChannelRead(Object msg) {    if (invokeHandler()) {        try {            ((ChannelInboundHandler) handler()).channelRead(this, msg);        } catch (Throwable t) {            notifyHandlerException(t);        }    } else {        fireChannelRead(msg);    }}

實際上最後就是一個責任鏈去呼叫各種 ChannelInboundHandler 類。

到此,客戶端接入完成。

可以進行網路讀寫等 IO 操作。

小結

讀到這裡的小夥伴們肯定會發現,netty 使用起來簡單,實際上背後做了很多的封裝。

這一切封裝的背後,都需要紮實的 java 基礎和網路程式設計知識作為支撐。

下一節我們將一起學習下客戶端啟動的原始碼。

我是老馬,期待與你的下次相遇。

4
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 你會嗎?基於NodeJS的高效能分散式遊戲日誌系統