前言Python 在 3.5 版本中引入了關(guān)于協(xié)程的語法糖 async 和 await, 在 python3.7 版本可以通過 asyncio.run() 運(yùn)行一個協(xié)程,。 所以建議大家學(xué)習(xí)協(xié)程的時候使用 python3.7+ 版本,,本文示例代碼在 python3.8 上運(yùn)行的。
什么是協(xié)程,?網(wǎng)上有個關(guān)于洗衣機(jī)的例子,,寫的挺好的,借用下 假設(shè)有1個洗衣房,,里面有10臺洗衣機(jī),,有一個洗衣工在負(fù)責(zé)這10臺洗衣機(jī)。那么洗衣房就相當(dāng)于1個進(jìn)程,,洗衣工就相當(dāng)1個線程,。如果有10個洗衣工,就相當(dāng)于10個線程,,1個進(jìn)程是可以開多線程的,。這就是多線程,!**那么協(xié)程呢,?** 先不急。大家都知道,,洗衣機(jī)洗衣服是需要等待時間的,,如果10個洗衣工,1人負(fù)責(zé)1臺洗衣機(jī),,這樣效率肯定會提高,,但是不覺得浪費(fèi)資源嗎?明明1 個人能做的事,,卻要10個人來做,。只是把衣服放進(jìn)去,打開開關(guān),就沒事做了,,等衣服洗好再拿出來就可以了,。就算很多人來洗衣服,1個人也足以應(yīng)付了,,開好第一臺洗衣機(jī),,在等待的時候去開第二臺洗衣機(jī),再開第三臺,,……直到有衣服洗好了,,就回來把衣服取出來, 接著再取另一臺的(哪臺洗好先就取哪臺,,所以協(xié)程是無序的),。這就是計算機(jī)的協(xié)程!洗衣機(jī)就是執(zhí)行的方法,?!?/code>
協(xié)程,又稱微線程,。 協(xié)程的作用是在執(zhí)行函數(shù)A時可以隨時中斷去執(zhí)行函數(shù)B,,然后中斷函數(shù)B繼續(xù)執(zhí)行函數(shù)A(可以自由切換)。 但這一過程并不是函數(shù)調(diào)用,,這一整個過程看似像多線程,,然而協(xié)程只有一個線程執(zhí)行。 協(xié)程很適合處理IO密集型程序的效率問題,。協(xié)程的本質(zhì)是個單線程,,它不能同時將 單個CPU 的多個核用上,因此對于CPU密集型程序協(xié)程需要和多進(jìn)程配合。 進(jìn)程與線程 進(jìn)程(Process)是計算機(jī)中的程序關(guān)于某數(shù)據(jù)集合上的一次運(yùn)行活動,,是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位,,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ)。 在早期面向進(jìn)程設(shè)計的計算機(jī)結(jié)構(gòu)中,,進(jìn)程是程序的基本執(zhí)行實體,;
在當(dāng)代面向線程設(shè)計的計算機(jī)結(jié)構(gòu)中,進(jìn)程是線程的容器,。 程序是指令,、數(shù)據(jù)及其組織形式的描述,進(jìn)程是程序的實體,。 線程(thread)是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位,。 它被包含在進(jìn)程之中,是進(jìn)程中的實際運(yùn)作單位,。 一條線程指的是進(jìn)程中一個單一順序的控制流,,一個進(jìn)程中可以并發(fā)多個線程,,每條線程并行執(zhí)行不同的任務(wù)。
在Unix System V及SunOS中也被稱為輕量進(jìn)程(lightweight processes),,但輕量進(jìn)程更多指內(nèi)核線程(kernel thread),,而把用戶線程(user thread)稱為線程。
一個進(jìn)程可以有很多線程,,每條線程并行執(zhí)行不同的任務(wù),。
洗衣機(jī)例子假設(shè)有3臺洗衣機(jī)在工作,每個洗衣機(jī)工作的時長不一樣 import time
def washing1(): time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
def washing2(): time.sleep(8) print('washer2 finished')
def washing3(): time.sleep(5) print('washer3 finished')
if __name__ == '__main__': start_time = time.time() washing1() washing2() washing3() end_time = time.time() print('總共耗時:{}'.format(end_time-start_time))
我們通過函數(shù)的方式調(diào)用,,3個函數(shù),,總共耗時16 秒! 這里函數(shù)的執(zhí)行方式是同步運(yùn)行的,,于是這里需要知道一個概念: 同步/異步 同步: 在發(fā)出一個同步調(diào)用時,,在沒有得到結(jié)果之前,該調(diào)用就不返回,。 異步: 在發(fā)出一個異步調(diào)用后,,調(diào)用者不會立刻得到結(jié)果,該調(diào)用就返回了,。
再舉個小學(xué)生在學(xué)校學(xué)習(xí)的一個案例: 小明同學(xué)的媽媽給他早上安排了三件事: 1.洗衣機(jī)洗衣服需要花 15 分鐘,, 2.電飯煲做飯需要花 20 分鐘, 3.做作業(yè)需要花 25 分鐘 那么請問:小明同學(xué)早上完成以上三件事需要花多久,?,?? 這個大家肯定都知道是25分鐘,因為在做作業(yè)的時候,,可以先按下洗衣機(jī)和電飯煲的按鈕,,不用等它完成,洗衣機(jī)和電飯煲做好了會發(fā)出'滴滴滴’的聲音通知你,。 所以這三件事是可以異步完成的,,這就是異步的魅力! 協(xié)程(異步)協(xié)程(coroutines)通過 async/await 語法進(jìn)行聲明,,是編寫 asyncio 應(yīng)用的推薦方式,。 這里我們需要學(xué)一個新的語法糖 async , 例如,以下代碼段(需要 Python 3.7+) import time
async def washing1(): time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
async def washing2(): time.sleep(8) print('washer2 finished')
async def washing3(): time.sleep(5) print('washer3 finished')
if __name__ == '__main__': start_time = time.time() washing1() washing2() washing3() end_time = time.time() print('總共耗時:{}'.format(end_time-start_time))
如果我們直接當(dāng)函數(shù)運(yùn)行,會出現(xiàn)警告,,并且并沒有只需函數(shù)里面的print內(nèi)容 untimeWarning: coroutine 'washing1' was never awaited washing1() RuntimeWarning: Enable tracemalloc to get the object allocation traceback
先看下async 定義的異步函數(shù)到底返回的是什么 import time
async def fun(): time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
res = fun() print(res) # <coroutine object fun at 0x000001FA1882B9C0>
返回的是coroutine object 也就是協(xié)程對象,,并沒直接執(zhí)行 執(zhí)行協(xié)程 coroutine 函數(shù)執(zhí)行協(xié)程函數(shù),,必須使用事件循環(huán)get_event_loop() ,。 import time import asyncio
async def fun(): time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
coroutine_1 = fun() # 協(xié)程是一個對象,不能直接運(yùn)行 loop = asyncio.get_event_loop() # 創(chuàng)建一個事件循環(huán) result = loop.run_until_complete(coroutine_1) # 將協(xié)程對象加入到事件循環(huán)中,,并執(zhí)行
在python3.7+以后的版本,,可以直接asyncio.run() 去執(zhí)行一個協(xié)程函數(shù) import time import asyncio
async def fun(): time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
coroutine_1 = fun() # 協(xié)程是一個對象,,不能直接運(yùn)行 asyncio.run(coroutine_1)
多個任務(wù)執(zhí)行 asyncio.create_task()當(dāng)我們需要3臺洗衣機(jī)一起來工作,這時候需要創(chuàng)建多個任務(wù),,也就是會用到asyncio.create_task() import time import asyncio
async def washing1(): time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
async def washing2(): time.sleep(8) print('washer2 finished')
async def washing3(): time.sleep(5) print('washer3 finished')
async def main(): print('start main:') start_time = time.time() task1 = asyncio.create_task(washing1()) task2 = asyncio.create_task(washing2()) task3 = asyncio.create_task(washing3()) await task1 await task2 await task3 end_time = time.time() print('-----------end main----------') print('總共耗時:{}'.format(end_time-start_time))
if __name__ == '__main__': # asyncio.run(main()) loop = asyncio.get_event_loop() # 創(chuàng)建一個事件循環(huán) result = loop.run_until_complete(main()) # 將協(xié)程對象加入到事件循環(huán)中,,并執(zhí)行
運(yùn)行結(jié)果: start main: washer1 finished washer2 finished washer3 finished -----------end main---------- 總共耗時:16.000632286071777
我們發(fā)現(xiàn)運(yùn)行的總耗時還是16秒,并沒有達(dá)到我們想要的結(jié)果,,最大耗時8秒,,這個問題稍后再講,先解決如果有很多個任務(wù),,那我們總不會一直寫 task1 = asyncio.create_task(washing1()) task2 = asyncio.create_task(washing2()) task3 = asyncio.create_task(washing3()) await task1 await task2 await task3
這里可以優(yōu)化下 import time import asyncio
async def washing1(): time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
async def washing2(): time.sleep(8) print('washer2 finished')
async def washing3(): time.sleep(5) print('washer3 finished')
if __name__ == '__main__': print('start main:') start_time = time.time() # step1 創(chuàng)建一個事件循環(huán) loop = asyncio.get_event_loop() # step2 將異步函數(shù)(協(xié)程)加入事件隊列 tasks = [ washing1(), washing2(), washing3() ] # step3 執(zhí)行事件隊列 直到最晚的一個事件被處理完畢后結(jié)束 loop.run_until_complete(asyncio.wait(tasks)) end_time = time.time() print('-----------end main----------') print('總共耗時:{}'.format(end_time-start_time))
優(yōu)化后,,運(yùn)行結(jié)果 start main: washer3 finished washer1 finished washer2 finished -----------end main---------- 總共耗時:16.00227665901184
await 使用上面雖然異步執(zhí)行了三個任務(wù),但是時間并沒減少,,主要是因為 time.sleep() 是阻塞的,,需換成異步的 import time import asyncio
async def washing1(): await asyncio.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
async def washing2(): await asyncio.sleep(8) print('washer2 finished')
async def washing3(): await asyncio.sleep(5) print('washer3 finished')
if __name__ == '__main__': print('start main:') start_time = time.time() # step1 創(chuàng)建一個事件循環(huán) loop = asyncio.get_event_loop() # step2 將異步函數(shù)(協(xié)程)加入事件隊列 tasks = [ washing1(), washing2(), washing3() ] # step3 執(zhí)行事件隊列 直到最晚的一個事件被處理完畢后結(jié)束 loop.run_until_complete(asyncio.wait(tasks)) end_time = time.time() print('-----------end main----------') print('總共耗時:{}'.format(end_time-start_time))
這樣就可以達(dá)到我們的預(yù)期,總共耗時是8秒了 start main: washer1 finished washer3 finished washer2 finished -----------end main---------- 總共耗時:8.002010822296143
接著我們在看下 await 如何使用, 當(dāng)我們直接 await time.sleep(3) 時 import time import asyncio
async def washing1(): await time.sleep(3) # 第一臺洗衣機(jī), print('washer1 finished') # 洗完了
coroutine_1 = washing1() # 協(xié)程是一個對象,,不能直接運(yùn)行 loop = asyncio.get_event_loop() # 創(chuàng)建一個事件循環(huán) result = loop.run_until_complete(coroutine_1) # 將協(xié)程對象加入到事件循環(huán)中,,并執(zhí)行
運(yùn)行會報錯:TypeError: object NoneType can’t be used in 'await’ expression Traceback (most recent call last): File "D:/demo/a6.py", line 11, in <module> result = loop.run_until_complete(coroutine_1) # 將協(xié)程對象加入到事件循環(huán)中,并執(zhí)行 File "D:\python3.8\lib\asyncio\base_events.py", line 616, in run_until_complete return future.result() File "D:/demo/a6.py", line 6, in washing1 await time.sleep(3) # 第一臺洗衣機(jī), TypeError: object NoneType can't be used in 'await' expression
因為 await 后面必須要是一個可等待對象 await + 可等待對象(協(xié)程對象,,F(xiàn)uture,,Task對象(IO等待)) 等待到對象的返回結(jié)果,才會繼續(xù)執(zhí)行后續(xù)代碼
可等待對象 await 的使用可等待對象:如果一個對象可以在 await 語句中使用,,那么它就是 可等待 對象,。許多 asyncio API 都被設(shè)計為接受可等待對象。 可等待 對象有三種主要類型: 協(xié)程, 任務(wù) 和 Future . 協(xié)程:python中的協(xié)程屬于 可等待 對象,,所以可以在其他協(xié)程中被等待 接著我們再把洗衣機(jī)工作的場景分2個步驟實現(xiàn),,第一個步驟是放衣服,第二個步驟是洗衣機(jī)工作 import time import asyncio
async def add_clothes(): print('往洗衣機(jī)添加衣服....') await asyncio.sleep(2) # 模擬這個任務(wù)耗時2秒
async def washing1(): print('洗衣機(jī)工作之前,,需加衣服進(jìn)去') await add_clothes() # 等待這個事情完成 print('衣服加進(jìn)去,,可以開始工作了。,。,。。') await asyncio.sleep(3) # 模擬洗衣機(jī)工作的耗時 print('washer1 finished') # 洗完了
print('start washing:') start_time = time.time() coroutine_1 = washing1() # 協(xié)程是一個對象,,不能直接運(yùn)行 loop = asyncio.get_event_loop() # 創(chuàng)建一個事件循環(huán) result = loop.run_until_complete(coroutine_1) # 將協(xié)程對象加入到事件循環(huán)中,,并執(zhí)行 end_time = time.time() print('-----------end washing----------') print('總共耗時:{}'.format(end_time-start_time))
運(yùn)行結(jié)果 start washing: 洗衣機(jī)工作之前,需加衣服進(jìn)去 往洗衣機(jī)添加衣服.... 衣服加進(jìn)去,,可以開始工作了,。。,。,。washer1 finished -----------end washing---------- 總共耗時:5.001740217208862
往洗衣機(jī)加衣服和洗衣機(jī)工作這2個事情,,它是需要等第一件事完成才能執(zhí)行,所以這2個任務(wù)是需要等待完成才能做下一步的,。 2個洗衣機(jī)工作,,是互不影響的,所以不需要等第一個洗衣機(jī)工作完成,,2個洗衣機(jī)工作的任務(wù)是異步的,。
|