Python并發(fā)和并行方案
在Python世界有3種并發(fā)和并行方案,如下:
多線程(threading)
多進程(multiprocessing)
異步IO(asyncio)
注: 并發(fā)和并行的區(qū)別先不提,最后會借著例子更好的解釋,另外稍后也會提到 concurrent.futures,不過它不是一種獨立的方案,所以在這里沒有列出來。
這些方案是為了解決不同特點的性能瓶頸。性能問題主要有2種:
CPU密集型(CPU-bound)。這也就是指計算密集型任務(wù),它的特點是需要要進行大量的計算。例如Python內(nèi)置對象的各種方法的執(zhí)行,科學(xué)計算,視頻轉(zhuǎn)碼等等。
I/O密集型(I/O-bound)。凡是涉及到網(wǎng)絡(luò)、內(nèi)存訪問、磁盤I/O等的任務(wù)都是IO密集型任務(wù),這類任務(wù)的特點是CPU消耗很少,任務(wù)的大部分時間都在等待I/O操作完成。例如數(shù)據(jù)庫連接、Web服務(wù)、文件讀寫等等。
如果你不知道一個任務(wù)哪種類型,我的經(jīng)驗是你問問自己,如果給你一個更好更快的CPU它可以更快,那么這就是一個CPU密集的任務(wù),否則就是I/O密集的任務(wù)。
這三個方案中對于CPU密集型的任務(wù),優(yōu)化方案只有一種,就是使用多進程充分利用多核CPU一起完成任務(wù),達到提速的目的。而對于I/O密集型的任務(wù),則這三種方案都可以。
接著借著一個抓取網(wǎng)頁并寫入本地(典型的I/O密集型任務(wù))小例子來挨個拆解對比一下這些方案。先看例子:
import requests
url?
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36' # noqa
}
def fetch(session, page):
with (session.get(f'{url}{page*25}', headers=headers) as r,
open(f'top250-{page}.html', 'w') as f):
f.write(r.text)
def main():
with requests.Session() as session:
for p in range(25):
fetch(session, p)
if __name__ == '__main__':
main()
在這個例子中會抓取豆瓣電影Top250的25個頁面(每頁顯示10個電影),使用requests庫,不同頁面按順序請求,一共花了3.9秒:
? time python io_non_concurrent.py
python io_non_concurrent.py 0.23s user 0.05s system 7% cpu 3.911 total
這個速度雖然看起來還是很好的,一方面是豆瓣做了很好的優(yōu)化,一方面我家的帶寬網(wǎng)速也比較好。接著用上面三種方案優(yōu)化看看效果。
多進程版本
Python解釋器使用單進程,如果服務(wù)器或者你的電腦是多核的,這么用其實是很浪費的,所以可以通過多進程提速:
from multiprocessing import Pool
def main():
with (Pool() as pool,
requests.Session() as session):
pool.starmap(fetch, [(session, p) for p in range(25)])
注: 這里省略到了那些上面已經(jīng)出現(xiàn)的了代碼,只展示改變了的那部分。
使用多進程池,但沒指定進程數(shù)量,所以會按著Macbook的核數(shù)啟動10個進程一起工作,耗時如下:
? time python use_multiprocessing.py
python use_multiprocessing.py 2.15s user 0.30s system 232% cpu 1.023 total
多進程理論上可以有十倍效率的提升,因為10個進程在一起執(zhí)行任務(wù)。當(dāng)然由于任務(wù)數(shù)量是25,不是整數(shù)倍,是無法達到10倍的降低耗時,而且由于抓取太快了,沒有充分顯示多進程方案下的效率提升,所以用時1秒,也就是大約4倍的效率提升。
多進程方案下沒有明顯的缺點,只要機器夠強悍,就可以更快。
多線程版本
Python解釋器不是線程安全的,為此Python設(shè)計了GIL: 獲得GIL鎖才可以訪問線程中的Python對象。所以在任何一個時間,只有一個線程可以執(zhí)行代碼,這樣就不會引發(fā)競態(tài)條件(Race Condition) ,雖然GIL的問題很多,但是GIL卻是還有它存在的優(yōu)點,例如簡化了內(nèi)存管理等等,這些不是本文重點所以就不展開了,有興趣的可以專門去了解。
那么有同學(xué)會問,既然同一時間永遠(yuǎn)只有一個線程在工作,那么多線程可以提高并發(fā)效率的原因是什么呢?
解釋這個問題還是要提GIL。延伸閱讀鏈接1《Understanding the Python GIL》中做了很好的解釋(這里要注意,我們提的方案是Python 3.2新的GIL,而不是Python2的舊版GIL,現(xiàn)在網(wǎng)上有很多針對舊的GIL的描述,其實是過時的,這部分也可以看看延伸閱讀鏈接2的文章幫助理解它們的區(qū)別),我截其中幾張PPT來說明:
在上圖里,本來只有1個線程,所以不需要釋放或者獲得GIL,但是接著出現(xiàn)了第二個線程,這樣就是多個線程,一開始線程2是掛起狀態(tài),因為它沒有GIL。
線程1在一個 cv_wait周期內(nèi)會自愿的放棄GIL,例如出現(xiàn)了I/O阻塞,或者超時了(線程不能一直拿著不放,即便在一個周期內(nèi)沒有出現(xiàn)I/O阻塞也要強制釋放執(zhí)行權(quán),這個默認(rèn)時間是5毫秒,可以通過 sys.setswitchinterval設(shè)置,當(dāng)然設(shè)置前你得知道你在做什么)都會觸發(fā)這個釋放GIL的操作。
這里演示了常規(guī)的例子(非超時被迫釋放),在 cv_wait階段,線程1由于遇到了I/O阻塞,會發(fā)送信號給線程2,此時線程1讓出GIL并掛起,而線程2獲得GIL,如此循環(huán),之后線程2會釋放GIL給線程1。這個PPT在業(yè)界非常知名,建議大家多看看。之后的PPT還列舉了超時的處理,由于和我們這篇文章關(guān)系稍遠(yuǎn)也不展開了,有興趣的接著看。btw,我第一次看這個PPT覺得這個超時時間好可怕,也就是說1秒鐘要最少切換200次,這也太浪費了,所以你可以嘗試在代碼中調(diào)大這個超時時間喲。
通過上面的內(nèi)容,多線程通過GIL的控制,每個線程都得到了更好的執(zhí)行時機,所以不會出現(xiàn)被某個線程任務(wù)一直阻塞,因為如果線程遇到阻塞會自愿讓出GIL讓自己掛起,把機會讓給其他線程,這樣就提高了執(zhí)行任務(wù)總體的效率。多線程模式下最完美的場景就是任何時間點對應(yīng)的線程都在做事,而不是有的線程其實等著被執(zhí)行,但是實際上卻被阻塞著。
我們看一下多線程的方案:
from multiprocessing.pool import ThreadPool
def main():
with (ThreadPool(processes=5) as pool,
requests.Session() as session):
pool.starmap(fetch, [(session, p) for p in range(25)])
這里說明2點:
多進程和多線程例子中我都使用了【池】,這是一個好的習(xí)慣,因為線(進)程過多會帶來額外的開銷,其中包括創(chuàng)建銷毀的開銷、調(diào)度開銷等等,同時也降低了計算機的整體性能。使用線(進)程池維護多個線(進)程,等待監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這樣一方面避免了處理任務(wù)時創(chuàng)建銷毀線程開銷的代價,另一方面避免了線程數(shù)量膨脹導(dǎo)致的過分調(diào)度問題,保證了對內(nèi)核的充分利用。另外用標(biāo)準(zhǔn)庫里的進程池和線程池的實現(xiàn)寫額外代碼極少,而且代碼結(jié)構(gòu)還很像,特別適合寫對比的例子。
processes如果不指定也是和CPU核數(shù)一致的10,但是并不是線程越多越好,因為線程多了,反而出現(xiàn)本來正常有效的執(zhí)行卻被GIL強制釋放,這就造成多余上下文切換反而是一個負(fù)擔(dān)了。
在這個例子中,線程數(shù)為5,這個其實一方面是經(jīng)驗,一方面是多次調(diào)試值的結(jié)果,所以這也暴露了多線程編程中如果稍有不慎會讓優(yōu)化變差,也會存在沒有找到最優(yōu)值得問題,因為GIL控制線程是一個黑盒操作,開發(fā)者無法直接控制,這哪怕對一些相對有經(jīng)驗的Python開發(fā)也非常不友好。
我們看一下時間:
? time python use_threading.py
python use_threading.py 0.62s user 0.24s system 74% cpu 1.157 total
可以看到,多線程方案下比原始方案速度快了一倍以上,但是比多進程方案差一點(事實上我認(rèn)為在真實的例子中會差很多)。這是因為在多進程方案下多核CPU都在獨立工作,而多線程方案一方面由于效率問題下不能使用那么多數(shù)量的線程,而且由于GIL的限制,在不需要被釋放GIL的時候依然被強制釋放,就這么不斷的切換的過程中反而降低了效率,讓效果大打折扣。
concurrent.futures版本
這里也順便提一下 concurrent.futures的方案。其實它不是一個全新的方案,這是在其他語言(例如Java)里早就出現(xiàn)的一種框架,可以通過它控制線(進)程的啟動、執(zhí)行和關(guān)閉。我把它理解為抽象了多進程池和多線程池的代碼,讓開發(fā)者不需要關(guān)注多線程和多進程模塊的具體細(xì)節(jié)和用法。其實理解起來也不難,你可以這么拆解:
其實理解起來也不難,例如ThreadPoolExecutor可以這么拆解: ThreadPoolExecutor=Thread+Pool+Executor,其實就是線程+池+執(zhí)行器。就是預(yù)先創(chuàng)建一個線程池用來被重復(fù)使用,Executor將任務(wù)提交和任務(wù)執(zhí)行進行解耦,它完成線程的調(diào)配(如何以及何時)和任務(wù)的執(zhí)行部分。
如果你想了解它的細(xì)節(jié),我推薦直接看它的源碼文件頭部的注釋,里面對于數(shù)據(jù)流有非常詳細(xì)的說明,可以說比任何技術(shù)文章寫的都要深入準(zhǔn)確了。
這里只演示一下ThreadPoolExecutor的用法:
from functools import partial
from concurrent.futures import ThreadPoolExecutor
def main():
with (ThreadPoolExecutor(max_workers=5) as pool,
requests.Session() as session):
list(pool.map(partial(fetch, session), range(25)))
是不是很熟悉的配方?接口和上面用的進程池線程池都很像,但是要注意 max_workers如果不指定的話數(shù)量是CPU個數(shù)+4,最大為32。它和多線程的用法問題一樣,這個 max_workers需要調(diào)優(yōu)(這里為了對比,所以用了相同的數(shù)值)。
? time python use_executor.py
python use_executor.py 0.63s user 0.32s system 82% cpu 1.153 total
雖然 concurrent.futures是現(xiàn)在更主流的方案,但是在我使用的體驗里,它的效率要略低于直接使用進程池或者線程池的代碼,因為它高度抽象,卻把事情搞得復(fù)雜了,例如用到了對應(yīng)的queue(queue模塊)和信號量(Semaphore),這些反而限制了性能的提升。所以我的建議是,Python初學(xué)者可以用它,但高級開發(fā)者應(yīng)該自己控制并發(fā)實現(xiàn)。
asyncio版本
前面的多線程相關(guān)的方案中,需要開發(fā)者根據(jù)經(jīng)驗或者去實驗,找到一個(或者多個)最優(yōu)的線程數(shù)量,不同的場景這個值區(qū)別是很大的,這對于初學(xué)者很不友好,非常容易陷入【在用多線程,但是用錯了或者用的不夠好】這么一種境地。
后來Python引入了新的并發(fā)模型: aysncio,本小節(jié)給大家解釋下最新的asyncio方案為什么是一個更優(yōu)的選擇。首先還是看《Understanding the Python GIL》里面的一頁PPT:
我們回憶一下,它提到當(dāng)只有單個線程時,實際上不會觸發(fā)GIL,這個獨立的線程可以一直執(zhí)行下去。這也是asyncio找到的切入點: 因為是單進程單線程的,所以理論上不受GIL的限制。在事件驅(qū)動的機制下,可以更好的利用單線程的性能,尤其是通過await關(guān)鍵詞可以讓開發(fā)者自己決定調(diào)度方案,而不是多線程那種由GIL來控制。
那設(shè)想一下,在最美好的情況下,所有await的地方都是可能的I/O阻塞的。那么在執(zhí)行時,遇到I/O阻塞就可以切換協(xié)程,執(zhí)行其他可以繼續(xù)執(zhí)行的任務(wù),所以,這個線程一直都在工作而不會阻塞,可以說利用率達到100%!這是多線程方案下永遠(yuǎn)不可及的。
講到這個,我們再回去重新整理和理解一遍,先出基本理論開始。
協(xié)程
協(xié)程是一種特殊函數(shù),這個函數(shù)在本來的def關(guān)鍵字前面加了async關(guān)鍵字,本質(zhì)上它是生成器函數(shù),可以生成值或者接收外面發(fā)送(通過send方法)來的值,但是它最重要的特點是它可以在需要時保存上下文(或者說狀態(tài)),掛起自己并將控制權(quán)交給調(diào)用者,由于它保存了掛起時的上下文,在未來可以接著被執(zhí)行。
其實在調(diào)用協(xié)程是,它并不會立刻執(zhí)行:
In : async def a():
...:? ? ?print('Called')
...:
In : a()? # 并未執(zhí)行,只是返回了協(xié)程對象
Out:
In : await a()? # 使用await才會真的執(zhí)行
Called
異步和并發(fā)
異步(asynchronous)、非阻塞(non-blocking)、并發(fā)(concurrent)是很容易讓人產(chǎn)生迷惑的詞。結(jié)合asyncio場景,我的理解是:
協(xié)程是異步執(zhí)行的,在asyncio中,協(xié)程可以在等待執(zhí)行結(jié)果時把自己【暫停】,以便讓其他協(xié)程同時運行。
異步讓執(zhí)行不需要等待阻塞的邏輯完成就可以先讓其他代碼同時運行,所以這樣就不會【阻塞】其他代碼,那么這就是【非阻塞】的代碼
使用異步代碼編寫的程序執(zhí)行時,看起來其中的任務(wù)都在同時執(zhí)行和完成(因為會在等待中切換),所以看起來是【并發(fā)】的
事件循環(huán)(EventLoop)
Event Loop這個概念其實我理解了很多年,從Twisted時代開始。我一直覺得它非常神秘復(fù)雜,現(xiàn)在看來其實想多了。對于初學(xué)者,不如換個思路,它的重點就是事件+循環(huán): Loop是一個環(huán),每個任務(wù)作為一個事件放到這個環(huán)上,事件會不斷地循環(huán),在符合條件的情況下觸發(fā)執(zhí)行事件。它的特點如下:
一個事件循環(huán)運行在一個線程中
Awaitables對象(協(xié)程、Task、Future下面都會提到)都可以注冊到事件循環(huán)上
如果協(xié)程中調(diào)用了另外一個協(xié)程(通過await),這個協(xié)程會掛起,發(fā)生上下文切換轉(zhuǎn)而去執(zhí)行另外這個協(xié)程,如此循環(huán)
如果協(xié)程執(zhí)行時遇到I/O阻塞,這個協(xié)程會帶著上下文掛起,然后把控制權(quán)交還給EventLoop
既然是loop。注冊的全部事件執(zhí)行完畢后,循環(huán)會重新開始
Future/Task
asyncio.Future我覺得像Javascript里面的 Promise, 它是一個占位對象,代表一件還沒有做完的事情,在未來才會實現(xiàn)或者完成(當(dāng)然還可能由于內(nèi)部出錯而拋出異常)。它和上面提的 concurrent.futures方案中實現(xiàn)的 concurrent.futures.Futures很像,但是針對asyncio的事件循環(huán)做了很多定制。asyncio.Future它僅僅是一個數(shù)據(jù)的容器。
asyncio.Task是 asyncio.Future的子類,它用于在事件循環(huán)中運行協(xié)程。
在官方文檔中提到了一個非常直觀的例子,我這里改寫它在IPython里面執(zhí)行并說明:
In : async def set_after(fut):? # 創(chuàng)建一個協(xié)程,他會異步的sleep3秒,然后給future對象設(shè)置結(jié)果
...:? ? ?await asyncio.sleep(3)
...:? ? ?fut.set_result('Done')
...:
In : loop = asyncio.get_event_loop()? # 獲取當(dāng)前的事件循環(huán)
In : fut = loop.create_future()? # 在事件循環(huán)中創(chuàng)建一個Future
In : fut? # 此時它還是默認(rèn)的pending狀態(tài),因為沒有調(diào)用它
Out:
In : task = loop.create_task(set_after(fut))? # 在事件循環(huán)中創(chuàng)建(或者說注冊)了一個任務(wù)
In : task? # 馬上輸入它,此時剛創(chuàng)建任務(wù),還在執(zhí)行中
Out:
In : fut? # 馬上輸入它,此時剛創(chuàng)建任務(wù),還沒有執(zhí)行完所以future沒有變化
Out:
In : task? # 過了三秒,任務(wù)執(zhí)行完成了
Out:
In : fut? # Future也已經(jīng)設(shè)置了結(jié)果,所以狀態(tài)是finished
Out:
可以感受到:
Future對象不是任務(wù),就是存放狀態(tài)的一個容器
create_task會讓事件循環(huán)調(diào)度協(xié)程的執(zhí)行
創(chuàng)建任務(wù)可以用 ensure_future和 create_task, ensure_future是一個更高級封裝的函數(shù),但是Python3.7以上版本應(yīng)該使用 create_task
接著是了解await的作用。如果協(xié)程中await一個Future對象,Task會暫停協(xié)程的執(zhí)行并等待Future的完成。而當(dāng)Future完成后,包裝協(xié)程的執(zhí)行將繼續(xù):
In : async def a():
...:? ? ?print('IN a')
...:? ? ?await b()
...:? ? ?await c()
...:? ? ?print('OUT a')
...:
In : async def b():
...:? ? ?print('IN b')
...:? ? ?await d()
...:? ? ?print('OUT b')
...:
...:
In : async def c():
...:? ? ?print('IN c')
...:? ? ?await asyncio.sleep(1)
...:? ? ?print('OUT c')
...:
...:
In : async def d():
...:? ? ?print('IN d')
...:? ? ?await asyncio.sleep(1)
...:? ? ?print('OUT d')
...:
In : asyncio.run(a())
IN a
IN b
IN d
OUT d
OUT b
IN c
OUT c
OUT a
這個例子中,a的入口函數(shù),其中調(diào)用b和c,b又會調(diào)用d。await會讓對應(yīng)的協(xié)程獲取執(zhí)行權(quán)限,協(xié)程內(nèi)await的其他協(xié)程都執(zhí)行完畢才會釋放權(quán)限,所以注意這個更像DFS(深度優(yōu)先搜索),所以執(zhí)行順序是a->b->d->c。
所以這里就得出結(jié)論:
事件循環(huán)負(fù)責(zé)協(xié)程的協(xié)作調(diào)度:事件循環(huán)一次運行一個任務(wù)。當(dāng)一個任務(wù)等待一個Awaitables對象完成時,事件循環(huán)會運行其他任務(wù)、回調(diào)或執(zhí)行 IO 操作。
asyncio方案
在asyncio方案里,凡是涉及I/O阻塞操作的庫都要使用aio生態(tài)中的庫,所以已經(jīng)不能再使用requests庫,而是需要使用aiohttp,另外文件操作需要使用aiofiles。最終代碼如下(這個2個包需要下載再使用):
import aiofiles
import asyncio
import aiohttp
async def fetch(session, page):
? ? r = await session.get(f'{url}{page*25}', headers=headers)
? ? async with aiofiles.open(f'top250-{page}.html', mode='w') as f:
? ? ? ? await f.write(await r.text())
async def main():
? ? loop = asyncio.get_event_loop()
? ? async with aiohttp.ClientSession(loop=loop) as session:
? ? ? ? tasks = [asyncio.ensure_future(fetch(session, p)) for p in range(25)]
? ? ? ? await asyncio.gather(*tasks)
if __name__ == '__main__':
? ? asyncio.run(main())
看一下效率:
? time python use_asyncio.py
python use_asyncio.py? 0.20s user 0.04s system 34% cpu 0.684 total
所以asyncio的優(yōu)點如下:
asyncio用好了,是這些并發(fā)方案中最快的
它支持?jǐn)?shù)千級別的活動連接,這對于websockets和MQTT之類的場景下性能可以表現(xiàn)的很好,而多線程方案中在這個規(guī)模的線程數(shù)量下會出現(xiàn)嚴(yán)重的性能問題。
多線程方案下線程切換是隱式的,我們無法確認(rèn)它何時會切換線程的執(zhí)行權(quán),所以非常容易出現(xiàn)競態(tài)條件(Race Condition)。而asyncio方案里協(xié)程的切換是顯式、明確的,開發(fā)者可以明確地獲知或者指定執(zhí)行的順序
并發(fā)和并行
我之前翻到了一個對比這些方案的說法(延伸鏈接4),其中也提到了并發(fā)和并行,說的特別形象,我加以說明:
多進程。10個廚房,10個廚子,10道菜。也是1個廚房1廚子做1道菜。
多線程。1個廚房,10個廚子,10道菜。因為廚房比較小,只能大家一起擠在里面,事實上是輪著做,而且一個廚師在做的時候其他人只能等著輪到自己。
asyncio。1個廚房,1個廚子,10道菜。聽起來好像這就是一個順序執(zhí)行,但事實上,當(dāng)某道菜需要燉或者其他什么耗時的烹飪方法時,可以同時做其他的菜或者做準(zhǔn)備,最美好的場景是這個廚師一直在忙著做。
對于并發(fā)和并行我推薦看一下延伸閱讀連接3的文章。并發(fā)(Concurrency)允許同時執(zhí)行多個任務(wù),這些任務(wù)可能訪問相同的共享資源,例如硬盤、網(wǎng)絡(luò)以及對應(yīng)的那個單核CPU。既然會出現(xiàn)訪問共享資源,就可能出現(xiàn)競態(tài)條件,所以某個時間點事實上只有一個任務(wù)在執(zhí)行,在本質(zhì)上目標(biāo)是當(dāng)一個任務(wù)被迫等待外部資源時,通過在它們之間切換來防止任務(wù)相互阻塞,系統(tǒng)會有機制保證這些任務(wù)都在推進。并行(Parallelism)是指多個任務(wù)在獨立分區(qū)的資源(如多個CPU內(nèi)核)上并行運行,這樣可以最大限度地利用硬件資源。
審核編輯:劉清
?
評論
查看更多