了解完協(xié)程的誕生背景以及基本概念之后,,我們來(lái)探究一下 Python 的協(xié)程是如何實(shí)現(xiàn)的,。首先 Python 的協(xié)程和生成器之間有著非常緊密的聯(lián)系,因此在看協(xié)程之前,,強(qiáng)烈推薦你先看本系列的第 65 和 第 66 篇文章,。 另外,關(guān)于協(xié)程的基礎(chǔ)知識(shí),,以及 asyncio 模塊的一些基本用法這里就不再贅述了,,我們重點(diǎn)看協(xié)程的實(shí)現(xiàn)原理。 使用 async def 定義出來(lái)的函數(shù)叫協(xié)程函數(shù),,當(dāng)然協(xié)程函數(shù)本質(zhì)上也是一個(gè)函數(shù),。記得我們說(shuō)過(guò),函數(shù)的 __code__ 里面有一個(gè) co_flags 字段,,它可以用來(lái)判斷函數(shù)的種類,。
def func(): pass
async def coro_func(): pass
print(func.__class__) # <class 'function'> print(coro_func.__class__) # <class 'function'>
# 兩者都是 <class 'function'> 類型 print(func.__code__.co_flags & 0x80) # 0 print(coro_func.__code__.co_flags & 0x80) # 128
# 但如果是協(xié)程函數(shù),那么 co_flags & 0x80 為真
普通函數(shù)調(diào)用之后,,會(huì)將內(nèi)部的字節(jié)碼全部執(zhí)行完畢,;協(xié)程函數(shù)調(diào)用之后,不會(huì)執(zhí)行內(nèi)部的字節(jié)碼,,而是返回一個(gè)協(xié)程對(duì)象,,簡(jiǎn)稱協(xié)程,協(xié)程需要扔到事件循環(huán)里面執(zhí)行,。 那么下面看看協(xié)程的底層結(jié)構(gòu),,開(kāi)頭說(shuō)協(xié)程和生成器有著很緊密的聯(lián)系,看完之后你就知道這個(gè)聯(lián)系有多緊密了,。協(xié)程對(duì)應(yīng)的結(jié)構(gòu)體定義在 genobject.h 中,,咦,這不是生成器相關(guān)的頭文件嗎,? 紅色框框內(nèi)的部分是生成器的底層結(jié)構(gòu),,不要眨眼,再來(lái)看看協(xié)程的底層結(jié)構(gòu),。 所以我們看到所謂協(xié)程,,本質(zhì)上還是基于生成器實(shí)現(xiàn)的,只是字段名不一樣,。比如生成器有 gi_frame, gi_running, gi_code 等字段,,協(xié)程則是 cr_frame, cr_running, cr_code。
而且協(xié)程也有 send,、close,、throw 等方法,我們來(lái)對(duì)比一下生成器和協(xié)程。 def gen_func(): yield return "gen result"
async def coro_func(): return f"coroutine result"
# 先來(lái)看看生成器 gen = gen_func() gen.__next__() try: gen.__next__() except StopIteration as e: print(e.value) # gen result # 當(dāng)生成器 return 時(shí),,會(huì)拋出一個(gè) StopIteration # 并將返回值設(shè)置在里面,,因此 return 只是一個(gè)語(yǔ)法糖 # 對(duì)于協(xié)程亦是如此 coro = coro_func() try: coro.__await__().__next__() except StopIteration as e: print(e.value) # coroutine result
生成器就不說(shuō)了,直接看協(xié)程,。首先基于 async def 定義的協(xié)程函數(shù),,內(nèi)部不可以出現(xiàn) yield from,否則會(huì)出現(xiàn)語(yǔ)法錯(cuò)誤,,這是一個(gè)編譯階段就能檢測(cè)出來(lái)的錯(cuò)誤,。但是出現(xiàn) yield 是可以的,只不過(guò)此時(shí)就不叫協(xié)程函數(shù)了,,而是叫異步生成器函數(shù),。 而對(duì)于協(xié)程而言,只要運(yùn)行一次 __next__,,那么就會(huì)將內(nèi)部的邏輯全部執(zhí)行完,。但需要注意,協(xié)程本身是沒(méi)有 __next__ 方法的,,它需要先調(diào)用 __await__,。那么問(wèn)題來(lái)了,這個(gè) __await__ 又是什么呢,?下面來(lái)解釋一下,。 我們知道,如果想在一個(gè)協(xié)程內(nèi)部驅(qū)動(dòng)另一個(gè)協(xié)程執(zhí)行,,那么可以使用 await 關(guān)鍵字,。
async def coro_func(): return f"coroutine result"
async def main(): result = await coro_func() return f"來(lái)自 coro_func 的返回值: {result}"
try: main().__await__().__next__() except StopIteration as e: print(e) # 來(lái)自 coro_func 的返回值: coroutine result
await 后面可以跟協(xié)程、asyncio.Task 對(duì)象,、asyncio.Future 對(duì)象,,而 await obj 本質(zhì)上會(huì)調(diào)用 obj 的 __await__ 方法,然后通過(guò) __next__ 驅(qū)動(dòng)執(zhí)行,。
為了更好地理解 await,,我們對(duì)比一下 yield from,之前說(shuō)過(guò) yield from 可以用來(lái)實(shí)現(xiàn)委托生成器,。
def gen_func(): yield 123 return "result"
def middle(): res = yield (yield from gen_func()) print(res)
# 我們說(shuō)委托生成器有一個(gè)重要的作用 # 它會(huì)在子生成器和調(diào)用方之間建立一個(gè)雙向通道 g = middle() # 此時(shí)是調(diào)用方和子生成器之間直接通信 print(g.__next__()) # 123 # 顯然此時(shí)要拋異常了,那么委托生成器要負(fù)責(zé)兜底 # 會(huì)拿到子生成器的返回值,,在自身內(nèi)部尋找下一個(gè) yield print(g.__next__()) # result
而對(duì)于 await 也是同樣的道理,,它的作用和 yield from 類似。
async def coro_func(): return f"coroutine result"
async def main(): result = await coro_func() return f"來(lái)自 coro_func 的返回值: {result}"
# main() 里面 await 一個(gè)子協(xié)程 # 此時(shí) await 同樣會(huì)建立一個(gè)雙向通道 # 子協(xié)程會(huì)和調(diào)用方直接通信 # 另外這里的調(diào)用方可以是我們手動(dòng)調(diào)用,,也可以是事件循環(huán) try: # 子協(xié)程要拋異常了,,那么 main() 要兜底 # await 會(huì)捕獲異常,然后將返回值取出來(lái) main().__await__().__next__() except StopIteration as e: print(e.value) # 來(lái)自 coro_func 的返回值: coroutine result
我們?cè)趨f(xié)程里面,可以 await 另一個(gè)協(xié)程,,然后驅(qū)動(dòng)它執(zhí)行,。等到執(zhí)行完畢之后,再拿到它的返回值,?;蛘呶覀冞€有一種寫(xiě)法:
async def coro_func(): return f"coroutine result"
async def main(): try: coro_func().__await__().__next__() except StopIteration as e: return f"來(lái)自 coro_func 的返回值: {e.value}"
try: main().__await__().__next__() except StopIteration as e: print(e.value) # 來(lái)自 coro_func 的返回值: coroutine result
此時(shí)兩者的做法是等價(jià)的,但很明顯使用 await 要方便的多,。 另外,,我們上面運(yùn)行協(xié)程的時(shí)候并沒(méi)有依賴事件循環(huán),原因是協(xié)程本質(zhì)上就是個(gè)生成器,,即使不依賴事件循環(huán)也可以運(yùn)行,。當(dāng)然啦,在工作中肯定還是要交給事件循環(huán)進(jìn)行調(diào)度的,。
接下來(lái),,我們通過(guò)字節(jié)碼來(lái)進(jìn)一步觀察這背后的細(xì)節(jié)。
s = """ async def coro_func(): return f"coroutine result"
async def main(): result = await coro_func() return f"來(lái)自 coro_func 的返回值: {result}" """ if __name__ == '__main__': import dis dis.dis(compile(s, "<file>", "exec"))
先來(lái)看看模塊對(duì)應(yīng)的字節(jié)碼:
模塊對(duì)應(yīng)的字節(jié)碼沒(méi)什么好說(shuō)的,,我們重點(diǎn)看一下函數(shù)的字節(jié)碼,。
以上是兩個(gè)函數(shù)的字節(jié)碼,coro_func 也沒(méi)什么可說(shuō)的,,重點(diǎn)是 main,。 里面出現(xiàn)了一個(gè)指令 GET_AWAITABLE,它所做的事情就是調(diào)用協(xié)程對(duì)象的 __await__ 方法,。 case TARGET(GET_AWAITABLE): { PREDICTED(GET_AWAITABLE); //上一步的 CALL_FUNCTION 指令會(huì)構(gòu)建一個(gè)協(xié)程 //并且將協(xié)程壓入到棧頂,,也就是這里的iterable PyObject *iterable = TOP(); //調(diào)用協(xié)程的 __await__ PyObject *iter = _PyCoro_GetAwaitableIter(iterable);
//... //將 iter 設(shè)置為新的棧頂元素 SET_TOP(iter); /* Even if it's NULL */
if (iter == NULL) { goto error; }
PREDICT(LOAD_CONST); DISPATCH(); }
所以重點(diǎn)是 _PyCoro_GetAwaitableIter 這個(gè)函數(shù),我們看看協(xié)程的 __await__ 究竟返回了個(gè)啥,? PyObject * _PyCoro_GetAwaitableIter(PyObject *o) { unaryfunc getter = NULL; PyTypeObject *ot; //如果是一個(gè)使用 async def 定義的協(xié)程 //或者是一個(gè)被 @asyncio.coroutine 裝飾的協(xié)程(不推薦) //那么直接返回 if (PyCoro_CheckExact(o) || gen_is_coroutine(o)) { /* 'o' is a coroutine. */ Py_INCREF(o); return o; }
//如果不是一個(gè)協(xié)程,,那么它必須實(shí)現(xiàn) __await__ //對(duì)應(yīng) tp_as_async 的 am_await 成員 ot = Py_TYPE(o); if (ot->tp_as_async != NULL) { getter = ot->tp_as_async->am_await; } //如果實(shí)現(xiàn)了 __await__ if (getter != NULL) { //那么進(jìn)行調(diào)用 PyObject *res = (*getter)(o); if (res != NULL) { //如果一個(gè)對(duì)象不是協(xié)程,但又實(shí)現(xiàn)了 __await__ //那么 __await__ 必須返回迭代器 if (PyCoro_CheckExact(res) || gen_is_coroutine(res)) { /* __await__ must return an *iterator*, not a coroutine or another awaitable (see PEP 492) */ PyErr_SetString(PyExc_TypeError, "__await__() returned a coroutine"); Py_CLEAR(res); } else if (!PyIter_Check(res)) { PyErr_Format(PyExc_TypeError, "__await__() returned non-iterator " "of type '%.100s'", Py_TYPE(res)->tp_name); Py_CLEAR(res); } } return res; } //否則就說(shuō)明,,該對(duì)象不可以被 await PyErr_Format(PyExc_TypeError, "object %.100s can't be used in 'await' expression", ot->tp_name); return NULL; }
關(guān)于 __await__ 必須返回迭代器,,我們舉個(gè)栗子: class A:
def __await__(self): return coro_func().__await__()
async def coro_func(): return f"coroutine result"
async def main(): # A() 不是一個(gè)協(xié)程,那么它必須實(shí)現(xiàn) __await__ # 并且 __await__ 里面必須返回一個(gè)迭代器 # 只有這樣,,才能調(diào)用 __next__ 方法 result = await A() return f"來(lái)自 A().__await__ 的返回值: {result}"
try: main().__await__().__next__() except StopIteration as e: print(e.value) # 來(lái)自 A().__await__ 的返回值: coroutine result
然后重點(diǎn)來(lái)了,,后面又調(diào)用了 YIELD_FROM。因?yàn)?await obj 實(shí)際上分為兩步,,第一步是調(diào)用 obj 的 __await__ 返回一個(gè)迭代器,,由指令 GET_AWAITABLE 完成;第二步是通過(guò) __next__ 驅(qū)動(dòng)執(zhí)行,,由指令 YIELD_FROM 完成,。
也就是說(shuō),,obj.__await__().__next__() 再加一個(gè) StopIteration異常的捕獲邏輯,等價(jià)于 await obj,,因?yàn)?YIELD_FROM 會(huì)捕獲子協(xié)程 raise 的 StopIteration,。
因此所謂的協(xié)程無(wú)非就是基于生成器進(jìn)行的一個(gè)封裝罷了,只是生成器既可以用作生成器本身,,也可以用作協(xié)程,,那么這就會(huì)出現(xiàn)混亂。于是 Python 在 3.5 的時(shí)候引入了 async 和 await 兩個(gè)關(guān)鍵字,,專門用于協(xié)程,,但我們知道它們本質(zhì)上還是基于生成器實(shí)現(xiàn)的即可。 通過(guò)事件循環(huán)運(yùn)行協(xié)程 雖然我們可以直接驅(qū)動(dòng)協(xié)程執(zhí)行,,但這不是一個(gè)好的方式,,協(xié)程應(yīng)該放到事件循環(huán)中,由事件循環(huán)驅(qū)動(dòng)執(zhí)行,。 import asyncio
async def coro_func(n): await asyncio.sleep(n) print(f"我睡了 {n} 秒") return f"coroutine result"
async def main(): # 基于協(xié)程創(chuàng)建 asyncio.Task 對(duì)象 task1 = asyncio.create_task(coro_func(3)) task2 = asyncio.create_task(coro_func(1)) await task1
loop = asyncio.get_event_loop() loop.run_until_complete(main()) """ 我睡了 1 秒 我睡了 3 秒 """
這里有一個(gè)比較神奇的地方,,我們明明只 await task1,但是 task2 居然也被執(zhí)行了,,這是什么情況,。原因就在于事件循環(huán)的運(yùn)行單元是 asyncio.Task 對(duì)象,當(dāng)我們調(diào)用 create_task 創(chuàng)建 Task 對(duì)象的時(shí)候,,這個(gè) Task 對(duì)象就已經(jīng)被加入到事件循環(huán)中了,。 所以此時(shí)事件循環(huán)里面有 task1 和 task2 兩個(gè)任務(wù),當(dāng)我們 await task1 時(shí),,事件循環(huán)就會(huì)驅(qū)動(dòng) task1 執(zhí)行,。而一旦事件循環(huán)啟動(dòng),那么不單單會(huì)執(zhí)行 task1,,而是會(huì)執(zhí)行所有注冊(cè)進(jìn)來(lái)的任務(wù),。 所以先打印了 "我睡了 1 秒",因?yàn)樵搮f(xié)程 sleep 的時(shí)間更短,,盡管它后執(zhí)行,,說(shuō)明在 task1 的 sleep 時(shí)發(fā)生了協(xié)程切換。 import asyncio
async def coro_func(n): await asyncio.sleep(n) print(f"我睡了 {n} 秒") return f"coroutine result"
async def main(): task1 = asyncio.create_task(coro_func(3)) task2 = asyncio.create_task(coro_func(1)) task3 = coro_func(1)
await task1
loop = asyncio.get_event_loop() loop.run_until_complete(main()) """ 我睡了 1 秒 我睡了 3 秒 """
還是之前的代碼,,但此時(shí) task3 就沒(méi)有執(zhí)行,,原因是它沒(méi)有注冊(cè)到事件循環(huán)里面。而且此時(shí)還拋出了警告,,告訴我們沒(méi)有 await,。因?yàn)閰f(xié)程一旦創(chuàng)建了,就肯定要執(zhí)行,,而執(zhí)行有兩種方式,,一種是封裝成 Task 對(duì)象交給事件循環(huán)執(zhí)行,另一種是直接 await,。那么這兩種方式有什么區(qū)別呢,? import asyncio import time
async def coro_func(n): await asyncio.sleep(n) print(f"我睡了 {n} 秒")
async def main1(): task1 = asyncio.create_task(coro_func(3)) task2 = asyncio.create_task(coro_func(3))
await task1 await task2
async def main2(): await coro_func(3) await coro_func(3)
loop = asyncio.get_event_loop() start = time.perf_counter() loop.run_until_complete(main1()) print(time.perf_counter() - start) """ 我睡了 3 秒 我睡了 3 秒 3.0026131 """
start = time.perf_counter() loop.run_until_complete(main2()) print(time.perf_counter() - start) """ 我睡了 3 秒 我睡了 3 秒 6.0027813000000005 """
可以看到,執(zhí)行 main1 花了 3 秒鐘,,但是執(zhí)行 main2 花了 6 秒鐘,。 對(duì)于 main1 來(lái)講,里面的協(xié)程在被包裝成任務(wù)的時(shí)候,,就已經(jīng)注冊(cè)到事件循環(huán)里面去了,。驅(qū)動(dòng)任何一個(gè)任務(wù)執(zhí)行,均會(huì)使得事件循環(huán)里的所有任務(wù)都被執(zhí)行,。即使里面只有 await task1,,task2 同樣會(huì)被執(zhí)行,只不過(guò)加上 await task2 會(huì)使得邏輯必須在 task2 完成之后才能往下走,。 對(duì)于 main2 來(lái)講,,await 后面不是 Task 對(duì)象,而是一個(gè)協(xié)程,,所以此時(shí)不會(huì)進(jìn)入事件循環(huán),。而是我們之前說(shuō)的,像驅(qū)動(dòng)生成器一樣,,但此時(shí)無(wú)法實(shí)現(xiàn)切換,,兩個(gè)協(xié)程串行執(zhí)行。
所以實(shí)際工作中,,我們需要通過(guò)事件循環(huán)來(lái)驅(qū)動(dòng)協(xié)程執(zhí)行,,并且好的習(xí)慣是不要直接 await 一個(gè)協(xié)程,而是要把協(xié)程封裝成 Task 對(duì)象之后再 await,。在 Python 里面一個(gè)協(xié)程就是調(diào)用一個(gè)原生可以掛起的函數(shù),,任務(wù)則是對(duì)協(xié)程的進(jìn)一步封裝,里面包含了協(xié)程在執(zhí)行時(shí)的各種狀態(tài),。 而正是基于 Task 對(duì)象,,我們才能夠?qū)崿F(xiàn)協(xié)程間的切換,因?yàn)樗S護(hù)著協(xié)程的執(zhí)行狀態(tài),。而除了 Task 對(duì)象,,還有一個(gè) Future 對(duì)象,負(fù)責(zé)保存返回值,。當(dāng)然,,由于 Task 是 Future 的子類,所以 Task 對(duì)象包含了 Future 對(duì)象的所有功能,。 Task 和 Future 都定義在 _asynciomodule.c 中,。 asyncio 模塊依賴于 _asyncio,,這是一個(gè)內(nèi)嵌在解釋器里的模塊,感興趣的話可以去看一下,。 這里我們就介紹了協(xié)程的實(shí)現(xiàn)原理,,它和生成器具有高度的相似性。當(dāng)然了,,我們不僅要理解協(xié)程,,還要知道如何使用 asyncio 這個(gè)協(xié)程庫(kù),關(guān)于 asyncio 這里就不多說(shuō)了,,可以參考官方文檔,。 至于 _asynciomodule.c 有興趣的話可以讀一讀,因?yàn)槔锩娴膬?nèi)容讀起來(lái)難度還是蠻大的,。當(dāng)然,,如果能從基本概念上理解 asyncio、并且會(huì)用,,也已經(jīng)足夠了,。
|