首頁>技術>

一、為什麼需要 io.netty.util.concurrent.Promise ?

如果你有一個阻塞的方法,比如 Thread.sleep(1000),而又不想阻塞當前執行緒 A,只需要把該方法包裝成一個任務由另一個執行緒 B 執行即可。

ExecutorService pool = Executors.newFixedThreadPool(3);Future<Integer> future = pool.submit(() -> {    Thread.sleep(1000);    return 1;});

如果你需要在任務結束之後執行其他邏輯,一種方式是 A 執行緒先透過呼叫 future.get() 獲取值,然後執行其他程式碼;但是 get 方法本身也是一個阻塞方法,在這期間 A 執行緒阻塞。

另外一種方法是 B 執行緒執行完任務後,繼續執行後續邏輯。Netty 中的 Future,io.netty.util.concurrent.Future,透過回撥方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); 實現了該功能。

Promise 介面繼承了 Future 介面,在增加了 listener 的情況下,提供了 Promise<V> setSuccess(V result) 方法,可以在任務中手動設定返回值,並立即通知 listeners。

二、例項程式
private static NioEventLoopGroup loopGroup = new NioEventLoopGroup(8);public void methodA() {    Promise promise = methodA("ceee...eeeb");    promise.addListener(future -> {		// 1        Object ret = future.get();      // 4. 此時可以直接拿到結果      	// 後續邏輯由 B 執行緒執行        System.out.println(ret);    });  	// A 執行緒不阻塞,繼續執行其他程式碼...}public Promise<ResponsePacket> methodB(String name) {    Promise<ResponsePacket> promise = new DefaultPromise<>(loopGroup.next());    loopGroup.schedule(() -> {		// 2        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("scheduler thread: " + Thread.currentThread().getName());        promise.setSuccess("hello " + name);	// 3    }, 0, TimeUnit.SECONDS);    return promise;}

簡單的使用 Promise 包括:

給 promise 增加 listener,promise.addListener();分配執行任務的執行緒,loopGroup.schedule();在任務執行過程中,設定結果,promise.setSuccess();三、原始碼分析1. addListener
// class: DefaultPromisepublic Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {    checkNotNull(listener, "listener");    synchronized (this) {  // 1. 增加 listener        addListener0(listener);	    }    if (isDone()) {  // 2. 如果任務執行完了,通知所有 listener        notifyListeners();    }    return this;}

繼續看 addListener0:

private Object listeners;private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { 	// 1. 新增第 1 個 listener 時,直接賦值即可    if (listeners == null) {        listeners = listener;    }   	// 3. 新增第 3 個以及更多 listener 時,直接加入陣列即可  	else if (listeners instanceof DefaultFutureListeners) {        ((DefaultFutureListeners) listeners).add(listener);    }   	// 2. 新增第 2 個 listener 時,listeners 型別更改為 DefaultFutureListeners,內部實現為一個數組  	else {        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);    }}

由於可以新增多個 listener,很容易想到透過一個數組儲存所有 listener。而實現類裡面 listeners 型別為 Object,可能是考慮到大部分都只有 1 個 listener,節省記憶體空間。

2. schedule

將任務加入佇列,由執行緒池執行。

3. setSuccess
// class: DefaultPromisepublic Promise<V> setSuccess(V result) {    if (setSuccess0(result)) {	// 如果設定成功,返回;否則拋異常        return this;    }    throw new IllegalStateException("complete already: " + this);}private boolean setSuccess0(V result) {  	// 設定 result    return setValue0(result == null ? SUCCESS : result);}private boolean setValue0(Object objResult) {    // cas 操作    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {        if (checkNotifyWaiters()) {            notifyListeners();        }        return true;    }    return false;}private synchronized boolean checkNotifyWaiters() {      /**       * 有些執行緒不是透過增加 listener 的方式獲取結果,而是透過 promise.get() 方法獲取,       * 那麼這些執行緒為阻塞狀態;當設定了 result 後,需要喚醒這些執行緒       */    if (waiters > 0) {        notifyAll();    }    return listeners != null;  // 只要存在 listener,就返回 true}

繼續檢視 notifyListeners:

// class: DefaultPromiseprivate void notifyListeners() {    EventExecutor executor = executor();    if (executor.inEventLoop()) {        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();        final int stackDepth = threadLocals.futureListenerStackDepth();      	// TODO 巢狀監聽        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {            threadLocals.setFutureListenerStackDepth(stackDepth + 1);            try {              	// 1. 如果是 promise 繫結的執行緒,直接執行                notifyListenersNow();            } finally {                threadLocals.setFutureListenerStackDepth(stackDepth);            }            return;        }    }    // 2. 否則,加入任務排程, 因此 listener 方法最終還是由 promise 繫結的執行緒執行的    safeExecute(executor, new Runnable() {        @Override        public void run() {            notifyListenersNow();        }    });}private void notifyListenersNow() {    Object listeners;    synchronized (this) {        if (notifyingListeners || this.listeners == null) {            return;        }        notifyingListeners = true;        listeners = this.listeners;        this.listeners = null;    }    for (;;) {      	// 依次通知所有 listener        if (listeners instanceof DefaultFutureListeners) {            notifyListeners0((DefaultFutureListeners) listeners);        } else {            notifyListener0(this, (GenericFutureListener<?>) listeners);        }        synchronized (this) {            if (this.listeners == null) {                notifyingListeners = false;                return;            }            // 通知原先的 listeners 時,有可能有新的 listener 在此期間註冊, 也需要通知到            listeners = this.listeners;            this.listeners = null;        }    }}private static void notifyListener0(Future future, GenericFutureListener l) {    try {        l.operationComplete(future);	// 執行 listener 中的方法    } catch (Throwable t) {        if (logger.isWarnEnabled()) {            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);        }    }}

作者:pggsnap連結:https://juejin.cn/post/6844903987020300302來源:掘金

12
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 我們如何設法將延遲減少了3倍