首頁>技術>

本文章資訊量較大,從 IO 多路複用,到生成器的使用,再到 async、await 背後的實現原理,深入淺出,剖析得非常透徹,非常硬核!

原文連結:https://zhuanlan.zhihu.com/p/330549526

這兩天因為一點個人原因寫了點好久沒碰的 Python ,其中涉及到「協程」程式設計,上次搞的時候,它還是 Web 框架 tornado 特有的 feature,現在已經有 async、await 關鍵字支援了。思考了一下其實現,回顧了下這些年的演變,覺得還有點意思。

都是單執行緒,為什麼原來低效率的程式碼用了 async、await 加一些非同步庫就變得效率高了?

如果在做基於 Python 的網路或者 Web 開發時,對於這個問題曾感到疑惑,這篇文章嘗試給一個答案。

⚠️ 本文僅是提供了一個獨立的思考方向,並未遵循歷史和現有實際具體的實現細節。

其次,閱讀這篇文章需要你對 Python 比較熟悉,至少了解 Python 中的生成器 generator 的概念。

0x01 IO 多路複用

這是效能的關鍵。但我們這裡只解釋概念,其實現細節不是重點,這對我們理解 Python 的協程已經足夠了,如已足夠了解,前進到 0x02。

首先,你要知道所有的網路服務程式都是一個巨大的死迴圈,你的業務邏輯都在這個迴圈的某個時刻被呼叫:

def handler(request):    # 處理請求    pass# 你的 handler 執行在 while 迴圈中while True:    # 獲取一個新請求    request = accept()    # 根據路由對映獲取到使用者寫的業務邏輯函式    handler = get_handler(request)    # 執行使用者的handler,處理請求    handler(request)

設想你的 Web 服務的某個 handler,在接收到請求後需要一個 API 呼叫才能響應結果。

對於最傳統的網路應用,你的 API 請求發出去後在等待響應,此時程式停止執行,甚至新的請求也得在響應結束後才進得來。如果你依賴的 API 請求網路丟包嚴重,響應特別慢呢?那應用的吞吐量將非常低。

很多傳統 Web 伺服器使用多執行緒技術解決這個問題:把 handler 的執行放到其他執行緒上,每個執行緒處理一個請求,本執行緒阻塞不影響新請求進入。這能一定程度上解決問題,但對於併發比較大的系統,過多執行緒排程會帶來很大的效能開銷。

IO 多路複用可以做到不使用執行緒解決問題,它是由作業系統核心提供的功能,可以說專門為這類場景而生。簡單來講,你的程式遇到網路IO時,告訴作業系統幫你盯著,同時作業系統提供給你一個方法,讓你可以隨時獲取到有哪些 IO 操作已經完成。就像這樣:

# 作業系統的IO複用示例虛擬碼# 向作業系統IO註冊自己關注的IO操作的id和型別io_register(io_id, io_type)io_register(io_id, io_type)# 獲取完成的IO操作events = io_get_finished()for (io_id, io_type) in events:    if io_type == READ:        data = read_data(io_id)     elif io_type == WRITE:        write_data(io_id,data)

把 IO 複用邏輯融合到我們的伺服器中,大概會像這樣:

call_backs = {}def handler(req):    # do jobs here    io_register(io_id, io_type)    def call_back(result):        # 使用返回的result完成剩餘工作...    call_backs[io_id] = call_back# 新的迴圈while True:    # 獲取已經完成的io事件    events = io_get_finished()    for (io_id, io_type) in events:        if io_type == READ: # 讀取            data = read(io_id)             call_back = call_backs[io_id]            call_back(data)        else:            # 其他型別io事件的處理            pass    # 獲取一個新請求    request = accept()    # 根據路由對映獲取到使用者寫的業務邏輯函式    handler = get_handler(request)    # 執行使用者的handler,處理請求    handler(request)

我們的 handler 對於 IO 操作,註冊了回撥就立刻返回,同時每次迭代都會對已完成的 IO 執行回撥,網路請求不再阻塞整個伺服器。

上面的虛擬碼僅便於理解,具體實現細節更復雜。而且就連線受新請求也是在從作業系統得到監聽埠的 IO 事件後進行的。

我們如果把迴圈部分還有 call_backs 字典拆分到單獨模組,就能得到一個 EventLoop,也就是 Python 標準庫 asyncio 包中提供的 ioloop。

0x02 用生成器消除 callback

著重看下我們業務中經常寫的 handler 函式,在有獨立的 ioloop 後,它現在變成類似這樣:

def handler(request):    # 業務邏輯程式碼...    # 需要執行一次 API 請求    def call_back(result):        # 使用 API 返回的result完成剩餘工作        print(result)    # 沒有io_call這個方法,這裡只是示意,表示註冊一個IO操作    asyncio.get_event_loop().io_call(api, call_back)

到這裡,效能問題已經解決了:我們不再需要多執行緒就能源源不斷接受新請求,而且不用care依賴的 API 響應有多慢。

但是我們也引入了一個新問題,原來流暢的業務邏輯程式碼現在被拆成了兩部分,請求 API 之前的程式碼還正常,請求 API 之後的程式碼只能寫在回撥函數里面了。

這裡我們業務邏輯只有一個 API 呼叫,如果有多個 API ,再加上對 Redis 或者 MySQL 的呼叫(它們本質也是網路請求),整個邏輯會被拆分的更散,這對業務開發是一筆負擔。

對於有匿名函式的一些語言(沒錯就是JavaScript),還可能會引發所謂的「回撥地獄」。

接下來我們想辦法解決這個問題。

我們很容易會想到:如果函式在執行到網路 IO 操作處後能夠暫停,完成後又能在斷點處喚醒就好了。

如果你對 Python 的「生成器」熟悉,你應該會發現,它恰好具有這個功能:

def example():    value = yield 2    print("get", value)    return valueg = example()# 啟動生成器,我們會得到 2got = g.send(None)print(got)  # 2try:    # 再次啟動 會顯示 "get 4", 就是我們傳入的值    got = g.send(got*2)except StopIteration as e:    # 生成器執行完成,將會print(4),e.value 是生成器return的值    print(e.value)

函式中有 yield 關鍵字,呼叫函式將會得到一個生成器,生成器一個關鍵的方法 send() 可以跟生成器互動。

g.send(None) 會執行生成器內程式碼直到遇到 yield,並返回其後的物件,也就是 2,生成器程式碼就停在這裡了,直到我們再次執行 g.send(got*2),會把 2*2 也就是 4 賦值給yield 前面的變數 value,然後繼續執行生成器程式碼。

yield 在這裡就像一扇門,可以把一件東西從這裡送出去,也可以把另一件東西拿進來。

如果 send 讓生成器執行到下一個 yield 前就結束了,send 呼叫會引發一個特殊的異常StopIteration,這個異常自帶一個屬性 value,為生成器 return 的值。

如果我們把我們的 handler 用 yield 關鍵字轉換成一個生成器,執行它來把 IO 操作的具體內容返回,IO 完成後的回撥函式中把 IO 結果放回並恢復生成器執行,那就解決了業務程式碼不流暢的問題了:

def handler(request):    # 業務邏輯程式碼...    # 需要執行一次 API 請求,直接把 IO 請求資訊yield出去    result = yield io_info    # 使用 API 返回的result完成剩餘工作    print(result)# 這個函式註冊到ioloop中,用來當有新請求的時候回撥def on_request(request):    # 根據路由對映獲取到使用者寫的業務邏輯函式    handler = get_handler(request)    g = handler(request)    # 首次啟動獲得io_info    io_info = g.send(None)    # io完成回撥函式    def call_back(result):        # 重新啟動生成器        g.send(result)    asyncio.get_event_loop().io_call(io_info, call_back)

上面的例子,使用者寫的 handler 程式碼已經不會被打散到 callback 中,on_request 函式使用 callback 和 ioloop 互動,但它會被實現在 Web 框架中,對使用者不可見。

上面程式碼足以給我們提供用生成器消滅的 callback 的啟發,但侷限性有兩點:

業務邏輯中僅發起一次網路 IO,但實際中往往更多業務邏輯沒有呼叫其他非同步函式(協程),但實際中我們往往會呼叫其他協程0x03 解決完整呼叫鏈

我們來看一個更復雜的例子:

其中 request 執行真正的 IO,func1、func2 僅呼叫。顯然我們的程式碼只能寫成這樣:

def func1():    ret = yield request("http://test.com/foo")    ret = yield func2(ret)    return retdef func2(data):    result = yield request("http://test.com/"+data)    return resultdef request(url):    # 這裡模擬返回一個io操作,包含io操作的所有資訊,這裡用字串簡化代替    result = yield "iojob of %s" % url    return result

對於 request,我們把 IO 操作透過 yield 暴露給框架。

對於 func1 和 func2,呼叫 request 顯然也要加 yield 關鍵字,否則 request 呼叫返回一個生成器後不會暫停,繼續執行後續邏輯顯然會出錯。

這基本就是我們在沒有 yield from、aysnc、await 時代,在 tornado 框架中寫非同步程式碼的樣子。

要執行整個呼叫棧,大概流程如下:

呼叫 func1() 得到生成器呼叫 send(None) 啟動它得到會得到 request("http://test.com/foo") 的結果,還是生成器物件send(None) 啟動由 request() 產生的生成器,會得到 IO 操作,由框架註冊到 ioloop並指定回撥IO 完成後的回撥函式內喚醒 request 生成器,生成器會走到 return 語句結束捕獲異常得到 request 生成器的返回值,將上一層 func1 喚醒,同時又得到 func2()生成器繼續執行...

對演算法和資料結構熟悉的朋友遇到這種前進後退的遍歷邏輯,可以遞迴也可以用棧,因為遞迴使用生成器還做不到,我們可以使用棧,其實這就是「呼叫棧」一詞的由來。

藉助棧,我們可以把整個呼叫鏈上串聯的所有生成器對錶現為一個生成器,對其不斷 send就能不斷得到所有 IO 操作資訊並推動呼叫鏈前進,實現方法如下:

第一個生成器入棧呼叫 send,如果得到生成器就入棧並進入下一輪迭代遇到到 IO 請求 yield 出來,讓框架註冊到 ioloopIO 操作完成後被喚醒,快取結果並出棧,進入下一輪迭代,目的讓上層函式使用 IO 結果恢復執行如果一個生成器執行完畢,也需要和4一樣讓上層函式恢復執行

如果實現出來,程式碼不長但資訊量比較大。

它把整個呼叫鏈對外變成一個生成器,對其呼叫 send,就能整個呼叫鏈中的 IO,完成這些 IO,繼續推動呼叫鏈內的邏輯執行,直到整體邏輯結束:

def wrapper(gen):    # 第一層呼叫 入棧    stack = Stack()    stack.push(gen)    # 開始逐層呼叫    while True:        # 獲取棧頂元素        item = stack.peak()        result = None        # 生成器        if isgenerator(item):            try:                # 嘗試獲取下層呼叫併入棧                child = item.send(result)                stack.push(child)                # result 使用過後就還原為None                result = None                # 入棧後直接進入下次迴圈,繼續向下探索                continue            except StopIteration as e:                # 如果自己執行結束了,就暫存result,下一步讓自己出棧                result = e.value        else:  # IO 操作            # 遇到了 IO 操作,yield 出去,IO 完成後會被用 IO 結果喚醒並暫存到 result            result = yield item        # 走到這裡則本層已經執行完畢,出棧,下次迭代將是呼叫鏈上一層        stack.pop()        # 沒有上一層的話,那整個呼叫鏈都執行完成了,return                if stack.empty():            print("finished")            return result

這可能是最複雜的部分,如果看起來吃力的話,其實只要明白,對於上面示例中的呼叫鏈,它可以實現的效果如下就好了:

w = wrapper(func1())# 將會得到 "iojob of http://test.com/foo"w.send(None)# 上個iojob foo 完成後的結果"bar"傳入,繼續執行,得到 "iojob of http://test.com/bar"w.send("bar")# 上個iojob bar 完成後的結構"barz"傳入,繼續執行,結束。w.send("barz")

有了這部分以後框架再加上配套的程式碼:

# 維護一個就緒列表,存放所有完成的IO事件,格式為(wrapper,result) ready = []def on_request(request):    handler = get_handler(request)    # 使用 wrapper 包裝後,可以只通過 send 處理 IO 了    g = wrapper(func1())    # 把開始狀態直接視為結果為None的就緒狀態    ready.append((g, None))# 讓ioloop每輪迴圈都執行此函式,用來處理的就緒的IOdef process_ready(self):    def call_back(g, result):        ready.append((g, result))     # 遍歷所有已經就緒生成器,將其向下推進    for g, result in self.ready:          # 用result喚醒生成器,並得到下一個io操作        io_job = g.send(result)        # 註冊io操作 完成後把生成器加入就緒列表,等待下一輪處理        asyncio.get_event_loop().io_call(            io_job, lambda result: ready.append((g, result)

這裡核心思想是維護一個就緒列表,ioloop 每輪迭代都來掃一遍,推動就緒的狀態的生成器向下執行,並把新的 IO 操作註冊,IO 完成後再次加入就緒,經過幾輪 ioloop 的迭代一個 handler 最終會被執行完成。

至此,我們使用生成器寫法寫業務邏輯已經可以正常執行。

0x04 提高擴充套件性

如果到這裡能讀懂, Python 的協程原理基本就明白了。

我們已經實現了一個微型的協程框架,標準庫的實現細節跟這裡看起來大不一樣,但具體的思想是一致的。

我們的協程框架有一個限制,我們只能把 IO 操作非同步化,雖然在網路程式設計和 Web 程式設計的世界裡,阻塞的基本只有 IO 操作,但也有一些例外,比如我想讓當前操作 sleep 幾秒,用 time.sleep() 又會讓整個執行緒阻塞住,就需要特殊實現。再比如,可以把一些 CPU 密集的操作透過多執行緒非同步化,讓另一個執行緒通知事件已經完成後再執行後續。

所以,協程最好能與網路解耦開,讓等待網路IO只是其中一種場景,提高擴充套件性。

Python 官方的解決方案是讓使用者自己處理阻塞程式碼,至於是向 ioloop 來註冊 IO 事件還是開一個執行緒完全由你自己,並提供了一個標準「佔位符」Future,表示他的結果等到未來才會有,其部分原型如下:

class Future:    # 設定結果    def set_result(result): pass    # 獲取結果    def result():  pass    #  表示這個future物件是不是已經被設定過結果了    def done(): pass    # 設定在他被設定結果時應該執行的回撥函式,可以設定多個    def add_done_callback(callback):  pass

我們的稍加改動就能支援 Future,讓擴充套件性變得更強。對於使用者程式碼的中的網路請求函式 request:

# 現在 request 函式,不是生成器,它返回futuredef request(url):    # future 理解為佔位符    fut = Future()    def callback(result):        # 當網路IO完成回撥的時候給佔位符賦值        fut.set_result(result)    asyncio.get_event_loop().io_call(url, callback)    # 返回佔位符    return future

現在,request 不再是一個生成器,而是直接返回 future。

而對於位於框架中處理就緒列表的函式:

def process_ready(self):    def callback(fut):        # future被設定結果會被放入就緒列表        ready.append((g, fut.result()))    # 遍歷所有已經就緒生成器,將其向下推進    for g, result in self.ready:          # 用result喚醒生成器,得到的不再是io操作,而是future        fut = g.send(result)        # future被設定結果的時候會呼叫callback        fut.add_done_callback(callback)
0x05 發展和變革

許多年前用 tornado 的時候,大概只有一個 yield 關鍵字可用,協程要想實現,就是這麼個思路,甚至 yield 關鍵字和 return 關鍵字不能一個函數里面出現,你要想在生成器執行完後返回一個值,需要手動 raise 一個異常,雖然效果跟現在 return 一樣,但寫起來還是很彆扭,不優雅。

後來有了 yield from 表示式。它可以做什麼?

通俗地說,它就是做了上面那個生成器 wrapper 所做的事:透過棧實現呼叫鏈遍歷的 ,它是 wrapper 邏輯的語法糖。

有了它,同一個例子你可以這麼寫:

def func1():    # 注意 yield from    ret = yield from request("http://test.com/foo")    # 注意 yield from    ret = yield from func2(ret)     return retdef func2(data):    # 注意 yield from    result = yield from request("http://test.com/"+data)    return result# 現在 request 函式,不是生成器,它返回futuredef request(url):    # 同上基於future實現的request

然後你就不再需要那個燒腦的 wrapper 函數了:

g = func1()# 返回第一個請求的 future g.send(None)# 繼續執行,自動進入func2 並得到第它裡面的那個futureg.send("bar")# 繼續執行,完成呼叫鏈剩餘邏輯,丟擲StopIteration異常g.send("barz")

yield from 直接打通了整個呼叫鏈,已經是很大的進步了,但是用來非同步程式設計看著還是彆扭,其他語言都有專門的協程 async、await 關鍵字了,直到再後來的版本把這些內容用專用的 async、await 關鍵字包裝,才成為今天比較優雅的樣子。

0x06 總結和比較

總的來說, Python 的原生的協程從兩方面實現:

基於 IO 多路複用技術,讓整個應用在 IO 上非阻塞,實現高效率透過生成器讓分散的 callback 程式碼變成同步程式碼,減少業務編寫困難

有生成器這種物件的語言,其 IO 協程實現大抵如此,JavaScript 協程的演進基本一模一樣,關鍵字相同,Future 類比 Promise 本質相同。

但是對於以協程出名的 Go 的協程實現跟這個就不同了,並不顯式地基於生成器。

如果類比的話,可以 Python 的 gevent 算作一類,都是自己實現 runtime,並 patch 掉系統呼叫接入自己的 runtime,自己來排程協程,gevent 專注於網路相關,基於網路 IO 排程,比較簡單,而 Go 實現了完善的多核支援,排程更加複雜和完善,而且創造了基於 channel 新程式設計正規化

10
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Linux的程序、執行緒、檔案描述符是什麼