基本概念
網工在自學Python的時候肯定或多或少聽說過同步(Synchronous)、非同步(Asynchronous)、單執行緒(Single Threaded)、多執行緒(Multi Threaded)、多程序(Multiprocessing)、多工(Multitasking) 、併發(Concurrent)、並行(Parallesim)、協程(Coroutine)、I/O密集型(I/O-bound)、CPU密集型(CPU-bound)等術語,如何區分它們對學習Python的網工來說是一個難點,開篇講concurrent.futures之前先把上述這些術語之間的關係和區別給大家大致捋一下:
1. 同步(Synchronous) VS 非同步(Asynchronous)所謂同步,可以理解為每當系統執行完一段程式碼或者函式後,系統將一直等待該段程式碼或函式返回的值或訊息,直到系統接收到返回的值或訊息後才繼續往下執行下一段程式碼或者函式,在等待返回值或訊息的期間,程式處於阻塞狀態,系統將不做任何事情。而非同步則恰恰相反,系統在執行完一段程式碼或者函式後,不用阻塞性地等待返回的值或訊息,而是繼續執行下一段程式碼或函式,在同一時間段裡執行多個任務(而不是傻傻地等著一件事情做完並且直到結果出來了以後才去做下件事情),將多個任務併發(注意不是並行),從而提高程式的執行效率。如果你有讀過數學家華羅庚的《統籌方法》,一定不會對其中所舉的例子感到陌生:同樣是沏茶的步驟,因為燒水需要一段時間,你不用等水煮沸了過後才來洗茶杯、倒茶葉(類似“同步”),而是在等待燒水的過程中就把茶杯洗好,把茶葉倒好,等水燒開了就能直接泡茶喝了,這裡燒水、洗茶杯、倒茶葉三個任務是在同一個時間段內併發完成的,這就是一種典型的“非同步”。對我們網工來說,paramiko, netmiko, telnetlib, pexpect, ciscolib等第三方模組預設都是基於同步的,基於非同步的模組有asyncio, asyncping, netdev等等(pexpect也支援非同步,但是必須手動調,預設狀態下是同步)。
2. 執行緒(Thread) VS 程序(Process)所謂執行緒是指作業系統能夠進行運算排程的最小單位。執行緒依託於程序存在,是程序中的實際運作單位,一個程序可以有多個執行緒,每條執行緒可以併發執行不同的任務。
3. 單執行緒(Single Threaded) VS 多執行緒 (Multi Threaded)我們也可以引用同樣的例子來說明單執行緒和多執行緒的區別。在上面講到的華羅庚《統籌方法》裡沏茶的這個例子中,如果只有一個人來完成燒水、洗茶杯、倒茶葉三項任務的話,因為此時只有一個勞動力,我們就可以把它看成是單執行緒(同步、非同步IO都是基於單執行緒的)。假設我們能找來三個人分別負責燒水、洗茶杯、倒茶葉,那我們就可以把它看成是多執行緒,每一個勞動力代表一個執行緒,但是由於多執行緒的Global Interpreter Lock機制(俗稱的GIL全域性鎖)的存在,實際上這三個勞動力並不是同時開工的,從併發的效能和效率的角度來看,多執行緒實際上是弱於基於單執行緒的非同步IO的,這點我們已經在之前的兩篇文章裡透過實驗驗證了。
講到單執行緒和多執行緒,還需要講下非同步IO和多執行緒之間的區別:
非同步IO是單執行緒,而多執行緒顧名思義就是多執行緒。非同步IO和多執行緒的區別在於它們的機制不一樣,多執行緒使用的是搶佔式多工處理(Pre-emptive Multitasking) 。在這種搶佔式環境下,作業系統本身具有掌控所有任務(也就是程式)的能力,能隨心所欲地剝奪每個任務的時間片來提供給其他任務,也就是有一個幕後大boss掌控一切。而非同步IO的機制為協作式多工處理(Cooperative Multitasking), 這種機制沒有幕後大boss,在協作式環境下,每個任務被排程的前提是當前任務主動放棄時間片。非同步IO的核心是協程(Coroutine),這個是多執行緒不具備的。協程是一種輕量級執行緒,它是一種特殊的生成器函式,它可以在return語句被執行前停止該函式當前正在執行的任務,並且能在一段時間內間接地將執行權交給另外一個協程函式。協程強調的是合作,而不是多執行緒強調的搶佔,asyncio是Python中唯一支援協程的標準庫。4. 併發(Concurrent) VS 並行 (Parallesim)併發是一個籠統的概念,在Python裡,在邏輯上同時發生的任務有多種稱謂:多執行緒,非同步IO(多工),多程序,它們都是併發的一種。深入地說,只有呼叫多核CPU的多程序(Multiprocessing)是用來處理在物理上同時發生的任務的,這個叫並行。基於單核CPU的多執行緒和非同步IO(多工)同一時間內只能處理一件事件(但是它們有自己獨特的機制來加快處理不同事件的能力),這個叫做併發。
借用某知乎網友舉的例子來說明同步、併發、並行三者之間的區別。
當你吃飯的時候突然有人給你打電話,如果此時你:
不接聽電話,繼續吃飯,等把飯吃完過後再來回電話,這個叫做同步。接聽電話後放下筷子停止進食,等通話完畢後再接著吃,這個叫做併發。接聽電話的同時繼續進食,這個叫做並行。綜上,並行是併發的一種,但是併發並不等於並行。
5. I/O密集型(I/O bound) VS CPU密集型(CPU bound)I/O密集型(I/O bound) 是指不會特別消耗 CPU 資源,但是I/O比較頻繁的任務和操作,比如檔案的讀寫、網路通訊、資料庫訪問等等。
CPU密集型(CPU bound)是指需要大量耗費CPU資源的任務和操作,比如計算、解壓縮、加密解密等等。
非同步和多執行緒適合I/O密集型場景, 多程序適合CPU密集型場景。
上述內容可以歸納總結成下表:併發型別切換機制CPU數量適用場景代表Python庫多執行緒(搶佔式多工處理)作業系統決定何時切換任務1個I/O密集型_thread(已淘汰), threading, cocurrent.futures, nornir非同步(協作式多工處理)任務本身決定何時切換1個I/O密集型asyncio, netdev, aiohttp, aioping, gevent,tornado, twisted多程序 (並行)所有任務同時執行多個CPU密集型multiprocessing
好了,說了那麼多下面進入本篇正文:concurrent.futures。
什麼是Concurrent.futuresConcurrent.futures是Python中的一個標準庫,顧名思義它是併發程式設計的一種,根據Python官方的定義,concurrent.futures是一種高階介面,它同時融合了多執行緒和多程序的特點,並將兩者簡化。Concurrent.futures從Python3.2中被引入,它的誕生時間晚於threading和multiprocessing兩個標準庫,但是早於誕生於Python3.4的asyncio標準庫。
Future物件在concurrent.futures中引入了future這個物件,關於future的中文翻譯目前為止我聽說過未來、期程等,但還沒有一個統一的說法(Python中文官方文件上也沒有說明),所以這裡我們還是用future來講。
主執行緒(或程序)可以透過future物件獲取某一個執行緒(程序)執行的狀態或者某一個任務執行的狀態及返回值。
執行器物件Concurrent.futures中還有一個重要的物件叫做執行器(Executor),分為ThreadPoolExecutor和ProcessPoolExecutor兩種,你基本可以把它倆看成是multiprocessing庫中的執行緒池和程序池(支援多程序的multiprocessing標準庫以前沒講過,我準備下篇文章中再講),前面提到了,concurrent.futures相較於multiprocessing以及threading兩個庫來說它的優勢在於其語法更簡單,學習成本更低。
理論的東西先講到這裡,接下來直接做實驗說明concurrent.futures怎麼用,為了做對比,我會用單執行緒同步、threading、concurrent.futures分別舉三個例子。首先來看最原始的單執行緒同步:
1. 單執行緒同步實驗:import timedef do_something(): print ('休眠1秒') time.sleep(1)start_time = time.perf_counter()do_something()do_something()end_time = time.perf_counter()-start_timeprint (f'總共耗時{round(end_time, 2)}秒')
這裡我們自定義一個叫做do_something()的函式,它的任務很簡單,就是打印出內容“休眠1秒”,然後使用time.sleep(1)來讓程式休眠1秒。然後我們呼叫兩次do_something()函式,打印出耗時,因為是單執行緒同步,所以兩次執行do_something()的總耗時為2.01秒。
2. Threading實驗import threadingimport timedef do_something(): time.sleep(1)start_time = time.perf_counter()threads = []for i in range(1,11): t = threading.Thread(target=do_something, name=f'執行緒{str(i)}') print (f'{t.name}開始執行') print ('休眠1秒') t.start() threads.append(t)for thread in threads: thread.join()end_time = time.perf_counter()-start_timeprint (f'總共耗時{round(end_time, 2)}秒')
這裡我們用threading來總共執行10次do_something(),如果按單執行緒同步的方法的話,總計會耗費10秒+才能完成,而透過threading模組我們使用多執行緒讓這10次do_something()併發執行,所以僅僅只用到了1.05秒便宣告完成。
3. Concurrent.futures實驗(分為三種程式碼)因為涉及到不同的知識點,Concurrent.futures實驗的程式碼我將分三種來寫,首先來看第一段程式碼:
from concurrent.futures import ThreadPoolExecutorimport timedef do_something(seconds): print (f'休眠{seconds}秒') time.sleep(seconds) return '休眠完畢'start_time = time.perf_counter()executor = ThreadPoolExecutor()f1 = executor.submit(do_something, 1) f2 = executor.submit(do_something, 1)print (f1.result()) print (f2.result()) print (f'task1是否完成: {f1.done()}')print (f'task2是否完成: {f1.done()}')end_time = time.perf_counter()-start_timeprint (f'總共耗時{round(end_time,2)}秒')
程式碼講解(只講和concurrent.futures有關的知識點):
這裡我們使用from concurrent.futures import ThreadPoolExecutor來呼叫concurrent.futures的執行緒池處理器物件from concurrent.futures import ThreadPoolExecutor
這裡注意我們在do_something()函式後面加了引數seconds,並在最後面加了一個return '休眠完畢',它們的作用等會兒會講到:
def do_something(seconds): print (f'休眠{seconds}秒') time.sleep(seconds) return '休眠完畢'
在concurent.futures中,ThreadPoolExecutor是Executor下面的兩個子類之一(另一個是ProcessPoolExecutor),它使用執行緒池來執行非同步呼叫,這裡我們將ThreadPoolExecutor()賦值給一個叫做executor的變數。
executor = ThreadPoolExecutor()
然後我們使用ThreadPoolExecutor下面的submit()函式來建立執行緒,submit()函式中包含了要呼叫的任務,即do_something(),以及該函式要呼叫的引數(也就是dosmeting()裡面的seconds),這裡我們放1,表示休眠一秒鐘,所以寫成submit(do_something, 1),因為submit()函式返回的值為future型別的物件,所以這裡我們把future簡寫為f, 分別賦值給f1和f2兩個變數,表示併發執行兩次do_something()函式。f1 = executor.submit(do_something, 1) f2 = executor.submit(do_something, 1)
前面講到了,future物件的作用是幫助主執行緒(或程序)獲取某一個執行緒(程序)執行的狀態或者某一個任務執行的狀態及返回值,為了向大家演示,這裡我對f1和f2兩個future物件分別呼叫了result()和done()兩個函式並將它們的結果打印出來。
print (f1.result()) print (f2.result()) print (f'task1是否完成: {f1.done()}')print (f'task2是否完成: {f1.done()}')
在future中,result()的作用是告知你任務走到了哪一步,是否有異常,如果任務沒有異常正常完成的話,那麼result()會返回自定義函式下面return的內容(也就是我們do_someting()最下面的return'休眠完畢'),如果任務執行過程中遇到異常 ,那麼result()則會返回異常的具體內容。 done()則返回一個布林值,來告訴你任務是否完成,如果完成,則返回True,反之則返回False。
接下來看指令碼執行效果:
可以看到同步需要2秒+完成的兩次任務透過concurrent.futures縮短為1.02秒完成(這個時間不定,如果你多跑指令碼幾次,你會看到1.01秒,1.02秒,1.03秒,1.04秒等幾種,這個和當前電腦的效能有關係)。注意這裡的兩個“休眠完畢”是print (f1.result()) 和print (f2.result())打印出來的, “task1是否完成: True”和“task2是否完成: True”是 print (f'task1是否完成: {f1.done()}')和print (f'task2是否完成: {f1.done()}')打印出來的。
接下來我們再看concurrent.futures的第二段實驗程式碼:from concurrent.futures import ThreadPoolExecutor, as_completedimport timedef do_something(seconds): print (f'休眠{seconds}秒') time.sleep(seconds) return '休眠完畢'start_time = time.perf_counter()executor=ThreadPoolExecutor()results = [executor.submit(do_something, 1) for i in range(10)]for f in as_completed(results): print (f.result())end_time = time.perf_counter()-start_timeprint (f'總共耗時{round(end_time,2)}秒')
程式碼講解(只講和concurrent.futures有關的知識點):
這裡我們從concurrent.futures中新匯入了一個函式叫做as_completed,它的作用後面會講到。from concurrent.futures import ThreadPoolExecutor, as_completed
第一段程式碼缺乏靈活性,因為我們是透過手動的方式建立了f1和f2兩個執行緒,如果我們要併發執行do_something()這個任務100次,顯然我們不可能去手動建立f1, f2, f3......f100這100個變數。這裡我們可以用list comprehension的方式建立一個列表,讓do_something()這個函式併發執行10次。results = [executor.submit(do_something, 1) for i in range(10)]
在concurrent.futures中,as_completed(fs)函式的作用是針對給定的 future 迭代器 fs,在其完成後,返回完成後的迭代器(型別仍然為future)。這裡的fs即為我們建立的列表results。因為concurrent.futures.as_completed(results)返回的值是迭代器,因此我們可以使用for迴圈來遍歷它,然後對其中的元素(均為future型別)呼叫前面講到的result()函式並列印
for f in as_completed(results): print (f.result())
執行程式碼看效果,可以看到10次do_something()任務1.06秒便完成了。
concurrent.futures的第三段實驗程式碼:from concurrent.futures import ThreadPoolExecutorimport timedef do_something(seconds): print (f'休眠{seconds}秒') time.sleep(seconds) return '休眠完畢'start_time = time.perf_counter()executor=ThreadPoolExecutor()sec = [5,4,3,2,1]results = executor.map(do_something, sec)for result in results: print (result)end_time = time.perf_counter()-start_timeprint (f'總共耗時{round(end_time,2)}秒')
程式碼講解(只講和concurrent.futures有關的知識點):
除了透過list comprehension來指定N次併發執行do_something(seconds)外,我們還可以透過concurrent.futures.ThreadPoolExecutor()下面的map()函式來達到目的,map()函式和submit()函式的用法類似,都可以用來建立執行緒,然後併發執行任務並返回future物件,但是它比submit()函式更靈活。它們的區別是:map()函式傳入的第二個引數為一個可遍歷的物件,這個可遍歷的物件裡的元素可以用來作為函式的引數。比如說這裡我們定義了sec = [5,4,3,2,1]這個列表,該列表作為map()函式的第二個引數被傳入(executor.map(do_something, sec)),因為該列表總共有5個元素,因此我們這裡建立並且併發了5個執行緒來分5次執行do_something(seconds),第一次列表中的元素5作為引數被傳入do_something(seconds), 也就是第一個執行緒執行後將休眠5秒,第二次列表中的元素4作為引數被傳入do_something(seconds), 也就是第二個執行緒執行後將休眠4秒,以此類推。executor=ThreadPoolExecutor()sec = [5,4,3,2,1]results = executor.map(do_something, sec)
接下來看指令碼執行效果:因為5次任務是併發執行的,所以程式消耗了5秒,4秒,3秒,2秒,1秒中的最大值,總共耗時5.03秒完成。
編輯於 2 小時前