Python 之路
Changelog:
2019/9/6: 增加一些測試的描述2019/11/10: 增加 ABC2019/12/7: 增加 sharedmemory
一、概述本文起源於我在 Twitter 上釋出的關於 Python 經歷的一系列話題。
出於某些原因,想記錄一下我過去數年使用 Python 的經驗和一些感悟。 畢竟算是一門把我帶入網際網路行業的語言,而我近期已經幾乎不再寫 Py 程式碼, 做一個記錄,也許會對他人起到些微的幫助,也算是紀念與感恩了。
二、摘錄推文地址:https://twitter.com/ppcelery/status/1159620182089728000
最早接觸 py 是 2010 年左右,那之前主要是使用 c、fortran 和 matlab 做數值運算。當時在做一些檔案文字處理時覺得很麻煩,後來看到 NASA 說要用 py 取代 matlab,就去接觸了 py。
python 那極為簡潔與優美的語法給了當時的我極大的震撼,時至今日,寫 py 程式碼對我而言依然是一種帶有藝術意味的享受。
首先開宗明義的說一句:python 並不慢,至少不夠慢。拿一個 web 後端來說,一臺垃圾 4 核虛機,跑 4 個同步阻塞的 django,假設 django 上合理利用執行緒分擔了阻塞操作,假設每節點每秒可以處理 50 個請求(超低估),在白天的 10 小時內就可以處理 720 萬請求。而這種機器跑一天僅需要 20 塊錢。
在學習 Python 以前需要強調的是:基礎語法非常重要。雖然我們都不推崇過多的死記硬背,但是少量必要的死背是以後所有複雜思維活動的基礎,就像五十音對於日語,通假字和常用動名詞對於文言文,你不會就是不行。
一般認為,這包括資料型別(值/引用)、作用域(scope)、keyword、builtin 函式等
關於 Python 版本的選擇,很多公司老專案依然在用 2.6、2.7,新專案的話建議至少選擇 3.6(擁有穩定的 asyncio)。
從 2.7 到 3.4 https://blog.laisky.com/p/whats-new-in-python3-4/從 3.4 到 3.5 https://blog.laisky.com/p/whats-new-in-python3-5/從 3.5 到 3.6 https://blog.laisky.com/p/whats-new-in-python3-6/從 3.6 到 3.7 https://docs.python.org/zh-cn/3/whatsnew/3.7.html關於版本最後在說幾點,建議在本地和伺服器上都透過 pyenv 來管理版本,而不要去動系統自帶的 python(以免引起額外的麻煩) https://blog.laisky.com/p/pyenv/
另外一點就是,如果你想寫一個相容 2、3 的工具包,你可以考慮使用 future http://python-future.org/compatible_idioms.html
最後提醒一下,2to3 這個指令碼是有可能出錯的。
學完基礎就可以開始動手寫程式碼了,這時候應該謹記遵守一些“通行規範”,幾年前給公司內分享時做過一個摘要:
風格指引 https://laisky.github.io/style-guide-cn/style-guides/source-code-style-guides/一些注意事項 https://laisky.github.io/style-guide-cn/style-guides/consensuses/有了一定的實踐經驗後,你應該學習更多的包來提高自己的程式碼水平。
值得學習的內建包 https://pymotw.com/3/值得了解的第三方包 https://github.com/vinta/awesome-python因為 py 的哲學(import this)建議應該有且僅有一個完美的方式做一件事, 所以建議優先採用且完善既有專案而不建議過多的造輪子。
一個小插曲,寫這段的 Tim Peters 就是發明 timsort 的那位。
https://en.wikipedia.org/wiki/Tim_Peters_(software_engineer)
有空時候,建議儘可能的完整讀教材和文件,建立系統性的知識體系,這可以極大的提升你的眼界和思維能力。 我自己讀過且覺得值得推薦的針對 py 的書籍有:
https://docs.python.org/3/learning python核心程式設計改進Python的91個建議Python高手之路Python原始碼剖析資料結構與演算法:Python語言描述如果你真的很喜歡 Python 的話,那我覺得你應該也會喜歡閱讀 PEP,記得幾年前我只要有空就會去翻閱 PEP,這相當於是 Py 的 RFC,裡面記錄了幾乎每一項語法的設計理念與目的。我特別喜歡的 PEP 有:
83148380484 & 3107492: async4403132495 你甚至能學到歷史知識以前聽別人講過一個比喻,靜態語言是吃冒菜,一次性燙好。而動態語言是涮火鍋,吃一點涮一點。
那麼我覺得,GIL 就是僅有一雙筷子的火鍋,即使你菜很多,一次也只能涮一個。
但是,對於 I/O bound 的操作,你不必一直夾著菜,而是可以夾一些扔到鍋裡,這樣就可以同時涮很多,提高並行效率。
GIL 在一個程序內,直譯器僅能同時解釋執行一條指令,這為 py 提供了指令級的執行緒安全, 從很多意義上說,這都極大的簡化了並行程式設計的難度。 對於 I/O 型應用,多執行緒並不會受到多大影響。對於 CPU 型應用,編寫一個基於 Queue 的多程序 worker 其實也就是幾行的事。
需要注意的是,GIL 保證的是指令級執行緒安全,而不是語句級執行緒安全, 也就是說,pyhton 裡的一條語句、一個操作並不一定是執行緒安全的,後文會有程式碼驗證。
from time import sleepfrom concurrent.futures import ProcessPoolExecutor, waitfrom multiprocessing import Manager, QueueN_PARALLEL = 5def worker(i: int, q: Queue) -> None: print(f'worker {i} start') while 1: data = q.get() if data is None: # 採用毒丸(poison pill)方式來結束程序池 q.put(data) print(f'worker {i} exit') return print(f'dealing with data {data}...') sleep(1)def main(): executor = ProcessPoolExecutor(max_workers=N_PARALLEL) # 控制併發量 with Manager() as manager: queue = manager.Queue(maxsize=50) # 控制快取量 workers = [executor.submit(worker, i, queue) for i in range(N_PARALLEL)] for i in range(50): queue.put(i) print('all task data submitted') queue.put(None) wait(workers) print('all done')main()
我經常給新人講,是否能謹慎的對待並行程式設計,是一個區分初級和資深後端開發的分水嶺。業界有一句老話:“沒有正確的並行程式,只有不夠量的並行度”,由此可見並行開發的複雜程度。
我個人認為思考並行時主要是在考慮兩個問題:同步控制和資源用量。
對於同步控制,你在 thread, multiprocessing, asyncio 幾個包裡都會發現一系列的工具:
Lock 互斥鎖RLock 可重入鎖Queue 佇列Condition 條件鎖Event 事件鎖Semaphore 訊號量這個就不展開細談了,屬於另一個語言無關的大領域。(以前寫過一個很簡略的簡介:並行程式設計中的各種鎖)
前文提到了 python 的執行緒安全是偽指令級的,我們可以做一個簡單的驗證, 將多執行緒時的資源競態導致的資料衝突表現出來:
from concurrent.futures import ThreadPoolExecutor, waitfrom typing import Dictfrom threading import EventV_KEY = "key"def worker(evt: Event, data: Dict[str, int]): evt.wait() for _ in range(100000): data[V_KEY] += 1def main(): data = {V_KEY: 0} evt = Event() executor = ThreadPoolExecutor(max_workers=10) fs = [executor.submit(worker, evt, data) for _ in range(10)] evt.set() wait(fs) print(data) # not equal to 10*100000if __name__ == "__main__": main()
對於資源控制,一般來說主要就是兩個地方:
快取區有多大(Queue 長度)併發量有多大(workers 數量)一般來說,前者直接確定了你記憶體的消耗量,最好選擇一個恰好或略高於消費量的數。後者一般直接決定了你的 CPU 使用率,過高的併發量會增加切換開銷,得不償失。
既然提到了 workers,稍微簡單展開一下“池”這個概念。我們經常提到執行緒池、程序池、連線池。說白了就是對於一些可重用的資源,不必每次都建立新的,而是使用完畢後回收留待下一個資料繼續使用。比如你可以選擇不斷地開子執行緒,也可以選擇預先開好一批執行緒,然後透過 queue 來不斷的獲取和處理資料。
所以說使用“池”的主要目的就是減少資源的消耗。另一個優點是,使用池可以非常方便的控制併發度(很多新人以為 Queue 是用來控制併發度的,這是錯誤的,Queue 控制的是快取量)。
對於連線池,還有另一層好處,那就是埠資源是有限的,而且回收埠的速度很慢,你不斷的建立連線會導致埠迅速耗盡。
這裡做一個用語的訂正。Queue 控制的應該是緩衝量(buffer),而不是快取量(cache)。一般來說,我們習慣上將寫入佇列稱為緩衝,將讀取佇列稱為快取(有源)。
對前面介紹的 python 中程序/執行緒做一個小結,執行緒池可以用來解決 I/O 的阻塞,而程序可以用來解決 GIL 對 CPU 的限制(因為每一個程序內都有一個 GIL)。所以你可以開 N 個(小於等於核數)程序池,然後在每一個程序中啟動一個執行緒池,所有的執行緒池都可以訂閱同一個 Queue,來實現真正的多核並行。
非常簡單的描述一下程序/執行緒,對於作業系統而言,可以認為程序是資源的最小單位(在 PCB 內儲存如圖 1 的資料)。而執行緒是排程的最小單位。同一個程序內的執行緒共享除棧和暫存器外的所有資料。
所以在開發時候,要小心程序內多執行緒資料的衝突,也要注意多程序資料間的隔離(需要特別使用程序間通訊)
作業系統筆記:程序作業系統筆記:排程再簡單的補充一下,程序間通訊的手段有:管道、訊號、訊息佇列、訊號量、共享記憶體和套接字。不過在 Py 裡,單機上最常用的程序間通訊就是 multiprocessing 裡的 Queue 和 sharedctypes。
順帶一提,因為 CPython 的 refcnt 機制,所以 COW(copy on write)並不可靠。
人們在見到別人的“錯誤寫法”時,傾向於無視或吐槽諷刺。但是這個行為除了讓自己爽一下外沒有任何意義,不懂的還是不懂,最後真正發揮影響的還是那些能夠描繪一整條學習路徑的方法。
關於程序間的記憶體隔離,補充一個簡單直觀的例子。可以看到普通變數 normal_v 在兩個子程序內變成了兩個獨立的變數(都輸出 1),而共享記憶體的 shared_v 仍然是同一個變數,分別輸出了 1 和 2。
from time import sleepfrom concurrent.futures import ProcessPoolExecutor, waitfrom multiprocessing import Manager, Queuefrom ctypes import c_int64def worker(i, normal_v, shared_v): normal_v += 1 # 因為程序間記憶體隔離,所以每個程序都會得到 1 shared_v.value += 1 # 因為使用了共享記憶體,所以會分別得到 1 和 2 print(f'worker[{i}] got normal_v {normal_v}, shared_v {shared_v.value}')def main(): executor = ProcessPoolExecutor(max_workers=2) with Manager() as manager: lock = manager.Lock() shared_v = manager.Value(c_int64, 0, lock=lock) normal_v = 0 workers = [executor.submit(worker, i, normal_v, shared_v) for i in range(2)] wait(workers) print('all done')main()
順帶一提,在 3.8 裡有了 sharedmemory:
"""shared memory=============Output::: worker[0] got normal_v 1, shared_v 1 worker[2] got normal_v 1, shared_v 2 worker[3] got normal_v 1, shared_v 3 worker[1] got normal_v 1, shared_v 4 worker[4] got normal_v 1, shared_v 5 worker[5] got normal_v 1, shared_v 6 worker[6] got normal_v 1, shared_v 7 worker[8] got normal_v 1, shared_v 8 worker[7] got normal_v 1, shared_v 9 worker[9] got normal_v 1, shared_v 10 all done"""from traceback import print_excfrom time import sleepfrom concurrent.futures import ProcessPoolExecutor, waitfrom multiprocessing import Event, RLockfrom multiprocessing.shared_memory import ShareableListfrom multiprocessing.managers import SharedMemoryManager, SyncManagerfrom ctypes import c_int64def worker(l: RLock, evt: Event, i: int, normal_v: int, shared_v: ShareableList): try: evt.wait() # 確保任務同時開始 normal_v += 1 # 因為程序間記憶體隔離,所以每個程序都會得到 1 with RLock(): # 需要自行處理鎖 shared_v[0] += 1 # 因為使用了共享記憶體,所以會得到連續累加的值 print(f"worker[{i}] got normal_v {normal_v}, shared_v {shared_v[0]}") except Exception: print_exc() raisedef main(): executor = ProcessPoolExecutor(max_workers=10) with SharedMemoryManager() as smm, SyncManager() as sm: evt = sm.Event() shared_v = smm.ShareableList([0]) normal_v = 0 workers = [ executor.submit(worker, sm.RLock(), evt, i, normal_v, shared_v) for i in range(10) ] evt.set() wait(workers) [f.result() for f in workers] print("all done")if __name__ == "__main__": main()
從過去的工作經驗中,我總結了一個簡單粗暴的規矩:如果你要使用多程序,那麼在程式啟動的時候就把程序池啟動起來,然後需要任何資源都請在程序內自行建立使用。如果有資料需要共享,一定要顯式的採用共享記憶體或 queue 的方式進行傳遞。
見過太多在程序間共享不該共享的東西而導致的極為詭異的資料行為。
最早,一臺機器從頭到尾只能幹一件事情。
後來,有了分時系統,我們可以開很多程序,同時幹很多事。
但是程序的上下文切換開銷太大,所以又有了執行緒,這樣一個核可以一直跑一個程序,而僅需要切換程序內子執行緒的棧和暫存器。
直到遇到了 C10K 問題,人們發覺切換幾萬個執行緒還是挺重的,是否能更輕?
這裡簡單的展開一下,記憶體在作業系統中會被劃分為核心態和使用者態兩部分,核心態供核心執行,使用者態供普通的程式用。
應用程式透過系統 API(俗稱 syscall)和核心發生互動。拿常見的 HTTP 請求來說,其實就是一次同步阻塞的 socket 呼叫,每次呼叫都會導致執行緒阻塞等待核心響應(核心陷入)。
而被阻塞的執行緒就會導致切換的發生。所以自然會問,能不能減少這種切換開銷?換句話說,能不能在一個地方把事情做完,而不要切來切去的。
這個問題有兩個解決思路,一是把所有的工作放進核心去做(如 BPF)。
另一個思路就是把儘可能多的工作放到使用者態來做。這需要核心介面提供額外的支援:非同步系統呼叫。
如 socket 這樣的呼叫就支援非阻塞呼叫,呼叫後會拿到一個未就緒的 fd,將這個 fd 交給負責管理 I/O 多路複用的 selector, 再註冊好需要監聽的事件和回撥函式(或者像 tornado 一樣採用定時 poll), 就可以在事件就緒(如 HTTP 請求的返回已就緒)時執行相關函式。
https://github.com/tornadoweb/tornado/blob/f1824029db933d822f5b0d02583e4e6137f2bfd2/tornado/ioloop.py#L746
這樣就可以實現在一個執行緒內,啟動多個曾經會導致執行緒被切換的系統呼叫, 然後在一個執行緒內監聽這些呼叫的事件,誰先就緒就處理誰,將切換的開銷降到了最小。
有一個需要特別注意的要點,你會發現主執行緒其實就是一個死迴圈, 所有的呼叫都發生在這個迴圈之內。所以,你寫的程式碼一定要避免任何阻塞。
聽上去很美好,這是個萬能方案嗎?
很可惜不是的,最直接的一個問題是,並不是所有的 syscall 都提供了非同步方法,對於這種呼叫,可以用執行緒池進行封裝。對於 CPU 密集型呼叫,可以用程序池進行封裝,asyncio 裡提供了 executor 和協程進行聯動的方法,這裡提供一個執行緒池的簡單例子,程序池其實同理。
from time import sleepfrom asyncio import get_event_loop, sleep as asleep, gather, ensure_futurefrom concurrent.futures import ThreadPoolExecutor, wait, Futurefrom functools import wrapsexecutor = ThreadPoolExecutor(max_workers=10)ioloop = get_event_loop()def nonblocking(func) -> Future: @wraps(func) def wrapper(*args): return ioloop.run_in_executor(executor, func, *args) return wrapper@nonblocking # 用執行緒池封裝沒法協程化的普通阻塞程式def foo(n: int): """假裝我是個很耗時的阻塞呼叫""" print('start blocking task...') sleep(n) print('end blocking task')async def coroutine_demo(n: int): """我就是個普通的協程""" # 協程內不能出現任何的阻塞呼叫,所謂一朝協程,永世協程 # 那我偏要調一個普通的阻塞函式怎麼辦? # 最簡單的辦法,套一個執行緒池… await foo(n)async def coroutine_demo_2(): print('start coroutine task...') await asleep(1) print('end coroutine task')async def coroutine_main(): """一般我們會寫一個 coroutine 的 main 函式,專門負責管理協程""" await gather( coroutine_demo(1), coroutine_demo_2() )def main(): ioloop.run_until_complete(coroutine_main()) print('all done')main()
Python3 asyncio 簡介上面的例子全部都基於 3.7,如果你還在使用 Py2,那麼你也可以透過 gevent、tornado 用上協程。
我個人傾向於 tornado,因為更為白盒,而且寫法和 3 接近,如果你也贊同,那麼可以試試我以前給公司寫的 kipp 庫,基於 tornado 封裝了更多的工具。
https://github.com/Laisky/kipp/blob/2bc5bda6e7f593f89be662f46fed350c9daabded/kipp/aio/__init__.py
Gevent Demo:
#!/usr/bin/env python# -*- coding: utf-8 -*-"""Gevent Pool & Child Tasks=========================You can use gevent.pool.Pool to limit the concurrency of coroutines.And you can create unlimit subtasks in each coroutine.Benchmark========= cost 2.675039052963257s for url http://httpbin.org/ cost 2.66813588142395s for url http://httpbin.org/ip cost 2.674264907836914s for url http://httpbin.org/user-agent cost 2.6776888370513916s for url http://httpbin.org/get cost 3.97711181640625s for url http://httpbin.org/headers total cost 3.9886841773986816s"""import timeimport geventfrom gevent.pool import Poolimport gevent.monkeypool = Pool(10) # set the concurrency limitgevent.monkey.patch_socket()try: import urllib2except ImportError: import urllib.request as urllib2TARGET_URLS = ( 'http://httpbin.org/', 'http://httpbin.org/ip', 'http://httpbin.org/user-agent', 'http://httpbin.org/headers', 'http://httpbin.org/get',)def demo_child_task(): """Sub coroutine task""" gevent.sleep(2)def demo_task(url): """Main coroutine You should wrap your each task into one entry coroutine, then spawn its own sub coroutine tasks. """ start_ts = time.time() r = urllib2.urlopen(url) demo_child_task() print('cost {}s for url {}'.format(time.time() - start_ts, url))def main(): start_ts = time.time() pool.map(demo_task, TARGET_URLS) print('total cost {}s'.format(time.time() - start_ts))if __name__ == '__main__': main()
tornado demo:
#!/usr/bin/env python# -*- coding: utf-8 -*-"""cost 0.5578329563140869s, get http://httpbin.org/getcost 0.5621621608734131s, get http://httpbin.org/ipcost 0.5613000392913818s, get http://httpbin.org/user-agentcost 0.5709919929504395s, get http://httpbin.org/cost 0.572376012802124s, get http://httpbin.org/headerstotal cost 0.5809519290924072s"""import timeimport tornadoimport tornado.webimport tornado.httpclientTARGET_URLS = [ 'http://httpbin.org/', 'http://httpbin.org/ip', 'http://httpbin.org/user-agent', 'http://httpbin.org/headers', 'http://httpbin.org/get',]@tornado.gen.coroutinedef demo_hanlder(ioloop): for i, url in enumerate(TARGET_URLS): demo_task(url, ioloop=ioloop)@tornado.gen.coroutinedef demo_task(url, ioloop=None): start_ts = time.time() http_client = tornado.httpclient.AsyncHTTPClient() r = yield http_client.fetch(url) # r is the response object end_ts = time.time() print('cost {}s, get {}'.format(end_ts - start_ts, url)) TARGET_URLS.remove(url) if not TARGET_URLS: ioloop.stop()def main(): start_ts = time.time() ioloop = tornado.ioloop.IOLoop.instance() ioloop.add_future(demo_hanlder(ioloop), lambda f: None) ioloop.start() # total cost will equal to the longest task print('total cost {}s'.format(time.time() - start_ts))if __name__ == '__main__': main()
kipp demo:
from time import sleepfrom kipp.aio import coroutine2, run_until_complete, sleep, return_in_coroutinefrom kipp.utils import ThreadPoolExecutor, get_loggerexecutor = ThreadPoolExecutor(10)logger = get_logger()@coroutine2def coroutine_demo(): logger.info('start coroutine_demo') yield sleep(1) logger.info('coroutine_demo done') yield executor.submit(blocking_func) return_in_coroutine('yeo')def blocking_func(): logger.info('start blocking task...') sleep(1) logger.info('blocking task return') return 'hard'@coroutine2def coroutine_main(): logger.info('start coroutine_main') r = yield coroutine_demo() logger.info('coroutine_demo return: {}'.format(r)) yield sleep(1) return_in_coroutine('coroutine_main yo')def main(): f = coroutine_main() run_until_complete(f) logger.info('coroutine_main return: {}'.format(f.result()))if __name__ == '__main__': main()
使用 tornado 時需要注意,因為它依賴 generator 來模擬協程,所以函式無法返回,只能用 raise gen.Return 來模擬。3.4 裡引入了 yield from 到 3.6 的 async/await 才算徹底解決了這個問題。還有就是小心 tornado 裡的 Future 不是執行緒安全的。
至於 gevent,容我吐個槽,求別再提 monkey_patch 了…
https://docs.python.org/3/library/asyncio-task.html 官方文件對於 asyncio 的描述很清晰易懂,推薦一讀。 一個小提示,async 函式被呼叫後會建立一個 coroutine,這時候該協程並不會執行,需要透過 ensure_future 或 create_task 方法生成 Task 後才會被排程執行。
另外,一個程序內不要建立多個 ioloop。
做一個小結,一個簡單的做法是,啟動程式後,分別建立一個程序池(程序數小於等於可用核數)、執行緒池和 ioloop,ioloop 負責排程一切的協程,遇到阻塞的呼叫時,I/O 型的扔進執行緒池,CPU 型的扔進程序池,這樣程式碼邏輯簡單,還能儘可能的利用機器效能。 一個簡單的完整示例:
"""✗ python process_thread_coroutine.py[2019-08-11 09:09:37,670Z - INFO - kipp] - main running...[2019-08-11 09:09:37,671Z - INFO - kipp] - coroutine_main running...[2019-08-11 09:09:37,671Z - INFO - kipp] - io_blocking_task running...[2019-08-11 09:09:37,690Z - INFO - kipp] - coroutine_task running...[2019-08-11 09:09:37,691Z - INFO - kipp] - coroutine_error running...[2019-08-11 09:09:37,691Z - INFO - kipp] - coroutine_error end, cost 0.00s[2019-08-11 09:09:37,693Z - INFO - kipp] - cpu_blocking_task running...[2019-08-11 09:09:38,674Z - INFO - kipp] - io_blocking_task end, cost 1.00s[2019-08-11 09:09:38,695Z - INFO - kipp] - coroutine_task end, cost 1.00s[2019-08-11 09:09:39,580Z - INFO - kipp] - cpu_blocking_task end, cost 1.89s[2019-08-11 09:09:39,582Z - INFO - kipp] - coroutine_main got [None, AttributeError('yo'), None, None][2019-08-11 09:09:39,582Z - INFO - kipp] - coroutine_main end, cost 1.91s[2019-08-11 09:09:39,582Z - INFO - kipp] - main end, cost 1.91s"""from time import sleep, timefrom asyncio import get_event_loop, sleep as asleep, gather, ensure_future, iscoroutinefrom concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, waitfrom functools import wrapsfrom kipp.utils import get_loggerlogger = get_logger()N_FORK = 4N_THREADS = 10thread_executor = ThreadPoolExecutor(max_workers=N_THREADS)process_executor = ProcessPoolExecutor(max_workers=N_FORK)ioloop = get_event_loop()def timer(func): @wraps(func) def wrapper(*args, **kw): logger.info(f"{func.__name__} running...") start_at = time() try: r = func(*args, **kw) finally: logger.info(f"{func.__name__} end, cost {time() - start_at:.2f}s") return wrapperdef async_timer(func): @wraps(func) async def wrapper(*args, **kw): logger.info(f"{func.__name__} running...") start_at = time() try: return await func(*args, **kw) finally: logger.info(f"{func.__name__} end, cost {time() - start_at:.2f}s") return wrapper@timerdef io_blocking_task(): """I/O 型阻塞呼叫""" sleep(1)@timerdef cpu_blocking_task(): """CPU 型阻塞呼叫""" for _ in range(1 << 26): pass@async_timerasync def coroutine_task(): """非同步協程呼叫""" await asleep(1)@async_timerasync def coroutine_error(): """會丟擲異常的協程呼叫""" raise AttributeError("yo")@async_timerasync def coroutine_main(): ioloop = get_event_loop() r = await gather( coroutine_task(), coroutine_error(), ioloop.run_in_executor(thread_executor, io_blocking_task), ioloop.run_in_executor(process_executor, cpu_blocking_task), return_exceptions=True, ) logger.info(f"coroutine_main got {r}")@timerdef main(): get_event_loop().run_until_complete(coroutine_main())if __name__ == "__main__": main()
學到這一步,你已經能夠熟練的運用協程、執行緒、程序處理不同型別的任務。接著拿上面提到的垃圾 4 核虛機舉例,你現在應該可以比較輕鬆的實現達到 1k QPS 的服務,在白天十小時裡可以處理超過一億請求,費用依然僅 20元/天。你還有什麼藉口說是因為 Python 慢呢?
人們在聊到語言/框架/工具效能時,考慮的是“當程式設計師儘可能的最佳化後,工具效能會成為最終的瓶頸,所以我們一定要選一個最快的”。
但事實上是,程式設計師本身才是效能的最大瓶頸,而工具真正體現出來的價值,是在程式設計師很爛時,所能提供的兜底效能。
如果你覺得自己並不是那個瓶頸,那也沒必要來聽我講了
在效能最佳化上有兩句老話:
一定要針對瓶頸做最佳化過早最佳化是萬惡之源所以我覺得要開放、冷靜地看待工具的效能。在一套完整的業務系統中,框架工具往往是耗時佔比最低的那個,在擴容、快取技術如此發達的今天,你已經很難說出工具效能不夠這樣的話了。
成長的空間很大,多在自己身上找原因。
一個經驗觀察,即使在工作中不斷的實際練習,對於非同步協程這種全新的思維模式,從學會到能在工作中熟練運用且不犯大錯,比較聰明的人也需要一個月。
換成 go 也不會好很多,await 也能實現同步寫法,而且你依然需要面對我前文提到過的同步控制和資源用量兩個核心問題。
簡單提一下效能分析,py 可以利用 cProfile、line_profiler、memory_profiler、vprof、objgraph 等工具生成耗時、記憶體佔用、呼叫關係圖、火焰圖等。
關於效能分析領域的更多方法論和理念,推薦閱讀《效能之巔》(過去做的關於效能之巔的部分摘抄 https://twitter.com/ppcelery/status/1051832271001382912)。
必須強調:最佳化必須要有足夠的資料支撐,包括最佳化前和最佳化後。
效能最佳化其實是一個非常複雜的領域,雖然上面提到的工具可以生成各式各樣的看上去就很厲害的圖,但是最佳化不是簡單的你看哪慢就去改哪,而是需要有極其紮實的基礎知識和全域性思維的。
而且,上述工具得出的指標,在效能尚未逼近極限時,可能會有相當大的誤導性,使用的時候也要小心。
有一些較為普適的經驗:
I/O 越少越好,儘量在記憶體裡完成記憶體分配越少越好,儘量複用變數儘可能少,gc 友好儘量提高區域性性儘量用內建函式,不要輕率造輪子下列方法如非瓶頸不要輕易用:
迴圈展開記憶體對齊zero copy(mmap、sendfile)測試是開發人員很容易忽視的一個環節,很多人認為交給 QA 即可,但其實測試也是開發過程中的一個重要組成部分,不但可以提高軟體的交付質量,還可以增進你的程式碼組織能力。
最常見的劃分可以稱之為黑盒 & 白盒,前者是隻針對介面行為的測試,後者是深入瞭解實現細節,針對實現方式進行的針對性測試。
對 Py 開發者而言,最簡單實用的工具就是 unitest.TestCase 和 pytest,在包內任何以 test*.py 命名的檔案,內含 TestCase 類的以 test* 命名的方法都會被執行。
測試方法也很簡單,你給定入參,然後呼叫想要測試的函式,然後檢查其返回是否符合需求,不符合就丟擲異常。
https://docs.pytest.org/en/latest/
"""test_demo.py"""from unittest import TestCasefrom typing import Listdef demo(l: List[int]) -> int: return l[0]class DemoTestCase(TestCase): def setUp(self): print("first run") def tearDown(self): print("last run") def test_demo(self): data = [] self.assertRaises(IndexError, demo, data)
開始寫測試後,你才會意識到你的很多函式非常難以測試。因為它們可能有巢狀呼叫,可能有內含狀態,可能有外部依賴等等。
但是需要強調的是,這不但不是不寫測試的理由,這其實正是寫測試的目的!
透過努力地寫測試,會強迫你開始編寫精簡、功能單一、無狀態、依賴注入、避免鏈式呼叫的函式。
一個簡單直觀的“好壞對比”,鏈式呼叫的函式很難測試,它內含了太多其他函式的呼叫,一旦測試就變成了一個“整合測試”。而將其按照步驟一一拆分後,就可以對其進行精細化的“單元測試”,這可以契合你開發的步伐,步步為營穩步推進。
"""這是很糟糕的鏈式呼叫"""def main(): func1()def func1(): return func2()def func2(): return func3()def func3(): return "shit""""這樣寫會好很多"""def step1(): return "yoo"def step2(v): return f"hello, {v}"def step3(v): return f"you know nothing, {v}"def main(): r1 = step1() r2 = step2(r1) step3(r2)
順帶一提,對於一些無法繞開的外部呼叫,如網路請求、資料庫請求。單元測試的準則之一就是“排除一切外部因素”,你不應該發起任何真正的外部呼叫的,因為這會引入不可控的資料。 正確做法是透過依賴注入 Mock 物件,或者透過 patch 去改寫呼叫的介面物件。
以前寫過一篇簡介:https://blog.laisky.com/p/unittest-mock/
單元測試應該兼顧黑盒、白盒。你既應該編寫面對介面的案例,也應該儘可能的試探內部的實現路徑(增加覆蓋率)。
你還可以逐漸地把線上遇到的各種 bug 都編寫為案例,這些案例會成為專案寶貴的財富,為迴歸測試提供強有力的支援。而且有這麼多測試案例提供保護,coding 的時候也會安心很多。
在單元測試的基礎上,人們發展出了 TDD,但是在實踐的過程中,發現有些“狡猾的”開發會針對案例的特例進行程式設計。 為此,人們決定應該拋棄形式,迴歸本源,從方法論的高度來探尋測試的道路。其中光明一方,就是 PBT,試圖透過描述問題的實質,來自動生成測試案例。
一篇簡介:https://blog.laisky.com/p/pbt-hypothesis/
另一個黑暗的方向就是 Fuzzing,它乾脆完全忽略函式的實現,貫徹黑盒到底,透過遺傳演算法,隨機的生成入參,以測試到宇宙盡頭的決心,對函式進行死纏爛打,發掘出正常人根本想不到猜不著的犄角旮旯裡的 bug。
py 是一門動態解釋型語言,使得你幾乎可以寫出各種想得到的寫法,但是能夠寫和應該寫是兩回事。雖然 py 支援多樣化的寫法,但是你還是應該有意識的限制自己的行為,按照一定的規範進行編碼,以儘可能的在條件允許的情況下,提高程式碼的穩健型和可維護性。
一些常見的規範不用多講,比如:
不要寫 magic value,多使用常量(如列舉、或 XXX_VAR_NAME 這種寫法)不同型別的引數或返回不要放在 list 裡儘可能多用 key-value 型別,而不是到處都在用下標取值儘可能多用不可變型別,函式儘可能做到冪等此外,“靜態化”是一種提高程式可讀性和可維護性的重要手段,比如在函式定義時指明 type-hints,寫清楚引數和返回值的型別。 以及對於 OOP,也可以寫出定義明確的的“介面-實現”型程式碼,比如按照 abc -> BaseClass -> Class -> Instance 的形式進行定義,就會規範很多。
from abc import ABC, abstractmethod, abstractpropertyclass ThingsABC(ABC): @abstractproperty def etable(self): passclass BaseFood(ThingsABC): etable = Trueclass BirdABC(ABC): """ 在抽象類中定義抽象方法和屬性, 例項化的時候會自動檢查這些抽象方法和方法必須已被實現,否則會丟擲一場。 具體實現的方法多種多樣,比如直接在類裡定義,或者多繼承等等 """ @abstractmethod def fly(self): pass @abstractmethod def eat(self, food: BaseFood): passclass BaseBird(BirdABC): """ 可以定義一些鳥類都應該有的通用屬性和方法 """ passclass Robin(BaseBird): """ 定義一些知更鳥特有的屬性和方法 """# def fly(self):# pass# def eat(self, foold: BaseFood):# passr = Robin() # 會報錯,因為沒有實現抽象方法# ---------------------------------------------------------------------------# TypeError Traceback (most recent call last)# <ipython-input-1-a4984ec6275b> in <module># 45# 46# ---> 47 r = Robin() # 會報錯,因為沒有實現抽象方法# TypeError: Can't instantiate abstract class Robin with abstract methods eat, fly