ServerBoostrap
使用者可以透過 netty 的 ServerBoostrap 啟動服務端,時序圖如下:
輸入圖片說明
入門例子為了便於大家理解,我們把服務端啟動的程式碼放在下面:
public void run() throws Exception { /** * EventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器 * bossGroup: 用來接收進來的連線 * workerGroup: 用來處理已經被接收的連線 * 一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。 */ EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { /** * ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。 * 你可以在這個服務中直接使用 Channel,但是這會是一個複雜的處理過程,在很多情況下你並不需要這樣做。 */ ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) //指定使用 NioServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連線。 .channel(NioServerSocketChannel.class) // (3) /** * 這裡的事件處理類經常會被用來處理一個最近的已經接收的 Channel。 * ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。 * 也許你想透過增加一些處理類比如DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網路程式。 * 當你的程式變的複雜時,可能你會增加更多的處理類到 pipeline 上,然後提取這些匿名類到最頂層的類上。 */ .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) /** * 你可以設定這裡指定的 Channel 實現的配置引數。 * 我們正在寫一個TCP/IP 的服務端,因此我們被允許設定 socket 的引數選項比如tcpNoDelay 和 keepAlive。 * 請參考 ChannelOption 和詳細的 ChannelConfig 實現的介面文件以此可以對ChannelOption 的有一個大概的認識。 * * option() 是提供給 NioServerSocketChannel 用來接收進來的連線。 * childOption() 是提供給由父管道 ServerChannel 接收到的連線,在這個例子中也是 NioServerSocketChannel。 */ .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) /** * 剩下的就是繫結埠然後啟動服務。這裡我們在機器上綁定了機器所有網絡卡上的 8080 埠。 * 當然現在你可以多次呼叫 bind() 方法(基於不同繫結地址)。 */ // 繫結埠,開始接收進來的連線 ChannelFuture f = b.bind(port).sync(); // (7) System.out.println("DiscardServer start..."); // 等待伺服器 socket 關閉 。 // 在這個例子中,這不會發生,但你可以優雅地關閉你的伺服器。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }}
淺析(1)Builder 構建者模式
為了解決引數較多的問題,這裡 netty ServerBootstrap 使用了 builder 模式,可以大大降低我們的配置量,也可以靈活指定配置。
(2)EventLoopGroup 執行緒池
為了提升效能,執行緒池是一個自然的選擇。
(3)設定並且繫結 channel
可以發現我們原始碼中只有一句話 channel(NioServerSocketChannel.class)
這個方法原始碼如下:
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}
很顯然,這裡使用了工廠模式。
我們只需要簡單的指定 class 資訊,netty 會自動透過反射建立對應的實現類。
(4)初始化 ChannelPipeline
流水線使用了責任鏈模式,用於處理一系列的 ChannelHandler。
(5)新增 ChannelHandler
netty 這裡的 initChannel 方法,可以讓我們非常方便的新增 ChannelHandler。
對個人的影響也比較大,我寫的很多工具方法也會採用類似的模式。
(6)繫結並且啟動監聽埠
我們使用時只有非常優雅的一句話 ChannelFuture f = b.bind(port).sync();,實際上 netty 為我們做了一些封裝:
雖然我們只指定了 port,本質上肯定還是 socket 地址,只不過預設 ip 為本地而已。
public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress);}
doBind 的實現還是比較多的,暫時不做展開。
(7)selector 輪訓 & 觸發
寫過 java nio 的小夥伴們肯定知道,需要透過 selector 輪訓獲取訊息。
實際上 netty 將這些細節封裝了起來。輪訓準備就緒 Channel 之後,將由 Reactor 執行緒 NioEventLoop 執行 ChannelPipeline 的響應方法,最終呼叫到 ChannelHandler。
ChannelHandler 中包含了系統內建的處理類,和使用者自定義的處理類。
原始碼解析每一個版本的原始碼可能有差異,這裡老馬的版本是 4.1.17.Final。
EventLoopGroupEventLoopGroup 就是 Reactor 執行緒池。
group 方法如下:
/** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this;}
這裡呼叫了父類的方法 super.group(parentGroup);,實現如下:
/** * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created * {@link Channel} */public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self();}
這個方法主要用於設定 IO 執行緒,執行和排程網路事件的讀寫。
channelchannel 的方法如下:
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}
ReflectiveChannelFactory 反射的核心實現如下:
public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz;}@Overridepublic T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); }}
實際上就是透過 NioServerSocketChannel 建立了 Channel 物件。
啟動類設定 Handler啟動類可以為啟動服務類和父類,分別設定 Handler。
這個也是一開始老馬學習 netty 比較迷惑的地方,這兩個有啥區別呢?
輸入圖片說明
本質區別:
(1)ServerBoostrap 中的 Handler 是 NioServerSocketChannel 使用的,所有連線這個監聽埠的客戶端都會執行。
(2)父類 AbstractServerBoostrap 中的 Handler 是一個工廠類,會為每一個接入的客戶端都建立一個新的 Handler。
埠繫結最後,還有服務端的埠繫結。我們上面只是簡單的過了一下,這裡做一下展開:
private ChannelFuture doBind(final SocketAddress localAddress) { //1. 建立 channel 並註冊 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //2. 建立完成後,設定對應的附加屬性 if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { //3. 新增監聽器 @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }}
建立 channel 並註冊initAndRegister() 完整實現如下:
final ChannelFuture initAndRegister() { Channel channel = null; try { //1. 透過 channelFactory 建立新的 channel channel = channelFactory.newChannel(); //2. 初始化相關屬性 init(channel); } catch (Throwable t) { // 省略 } // 省略 return regFuture;}
init 是一個抽象方法,服務端實現如下:
@Overridevoid init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); //1. 設定 Socket 引數和 NioserverSocketChannel 的 附加屬性 synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // 屬性省略 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); //2. 將 AbstractBoostrap 的 Handler 新增到 NioserverSocketChannel 的 pipeline 中 ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } //3. 將用於服務端註冊的 ServerBootstrapAcceptor 新增到 pipeline 中 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}
到這裡,服務端的監聽相關資源已經初始化完畢。
接下來,需要把 NioserverSocketChannel 註冊到 Reactor 執行緒的多路複用選擇器上,然後輪訓客戶端事件。
NioserverSocketChannel 簡介構造器如下:
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER));}
這個就是預設 channel 初始化的構造器,實際呼叫的是:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();private static ServerSocketChannel newSocket(SelectorProvider provider) { return provider.openServerSocketChannel();}
歸根到底,預設的 SelectorProvider 應該是 jdk nio 的 DefaultSelectorProvider。
實際上,還是根據初始化 ServerSocketChannel:
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
可以看到,這裡預設註冊監聽了 SelectionKey.OP_ACCEPT 事件。
其中 SelectionKey 只有 4 種:
public static final int OP_ACCEPT = 1 << 4;public static final int OP_CONNECT = 1 << 3;public static final int OP_WRITE = 1 << 2;public static final int OP_READ = 1 << 0;
NioserverSocketChannel 註冊註冊的原始碼比較多,看得人云裡霧裡的。
可以理解,就是首先註冊自己感興趣的事件,發生的時候通知你即可。
註冊的方法如下我們主要看 NioEventLoop 即可,這個類繼承自 SingleThreadEventLoop 類。
實現了 SingleThreadEventExecutor 類的 run 方法,如下:
@Overrideprotected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // fall through default: } // 省略 processSelectedKeys(); // 省略 } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. // 省略 }}
我們只看核心的部分,這裡實際上就是一個死迴圈,註冊的部分核心如下:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) { // 省略 ch.register(selector, interestOps, task); // 省略}
在 AbstractSelectableChannel 中的實現如下:
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{ synchronized (regLock) { // 省略 SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; }}
這裡實際上是註冊感興趣的事件,服務端到這裡基本上已經告一段落了。
客戶端接入原始碼分析下面我們看一下 NioEventLoop 是如何處理客戶端請求的。
當多路複用器就緒時,預設執行 processSelectedKeysOptimized() 方法:
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; final Object a = k.attachment(); // 這裡處理的 attachment 是 AbstractNioChannel if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } }}
這裡實際上是根據不同的的型別,執行不同的操作:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 省略 try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
我們來重點關注下 read 方法。
NioUnsafe 是一個介面,有兩個子類:NioByteUnsafe 和 NioMessageUnsafe。
NioServerSocketChannel 繼承自 AbstractNioMessageChannel,使用的是 NioMessageUnsafe 類。
read 方法實現如下:
@Overridepublic void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // 核心方法 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } // 處理讀取的資訊 int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 觸發 channel read pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); // 觸發 read complete pipeline.fireChannelReadComplete(); } finally { // 省略 }}
doReadMessages 在 NioServerSocketChannel 類中實現如下:
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { // 省略 } return 0;}
這裡就是 jdk nio 中的接收到一個新的客戶端請求的方法實現。
讀取完成之後,觸發 fireChannelRead,如下:
@Overridepublic final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this;}
如下:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
invokeChannelRead 如下:
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); }}
實際上最後就是一個責任鏈去呼叫各種 ChannelInboundHandler 類。
到此,客戶端接入完成。
可以進行網路讀寫等 IO 操作。
小結讀到這裡的小夥伴們肯定會發現,netty 使用起來簡單,實際上背後做了很多的封裝。
這一切封裝的背後,都需要紮實的 java 基礎和網路程式設計知識作為支撐。
下一節我們將一起學習下客戶端啟動的原始碼。
我是老馬,期待與你的下次相遇。