久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Python編程學(xué)習(xí):這會(huì)是你見(jiàn)過(guò)講得最清楚的「異步爬蟲(chóng)指南」

 千鋒Python學(xué)堂 2019-07-16

前面關(guān)于Python編程學(xué)習(xí)教程中有跟大家提到過(guò)異步爬蟲(chóng),,這幾天想了想,還是得跟大家出一篇詳細(xì)的異步爬蟲(chóng)教程,畢竟一直后臺(tái)留言異步爬蟲(chóng)的伙伴也還不少!有需要的伙伴可得認(rèn)真閱讀完哦,!后期關(guān)于異步爬蟲(chóng)的Python編程學(xué)習(xí)教程應(yīng)該不會(huì)再像今天這么詳細(xì)了!且看且珍惜哈你們!

在執(zhí)行一些 IO 密集型任務(wù)的時(shí)候,,程序常常會(huì)因?yàn)榈却?IO 而阻塞。比如在網(wǎng)絡(luò)爬蟲(chóng)中,,如果我們使用 requests 庫(kù)來(lái)進(jìn)行請(qǐng)求的話,,如果網(wǎng)站響應(yīng)速度過(guò)慢,程序一直在等待網(wǎng)站響應(yīng),,最后導(dǎo)致其爬取效率是非常非常低的,。

為了解決這類問(wèn)題,本文就來(lái)探討一下 Python 中異步協(xié)程來(lái)加速的方法,,此種方法對(duì)于 IO 密集型任務(wù)非常有效,。如將其應(yīng)用到網(wǎng)絡(luò)爬蟲(chóng)中,爬取效率甚至可以成百倍地提升,。

【注】:本文協(xié)程使用 async/await 來(lái)實(shí)現(xiàn),,需要 Python 3.5 及以上版本。

基本了解

在了解異步協(xié)程之前,,我們首先得了解一些基礎(chǔ)概念,,如阻塞和非阻塞、同步和異步,、多進(jìn)程和協(xié)程,。

阻塞

阻塞狀態(tài)指程序未得到所需計(jì)算資源時(shí)被掛起的狀態(tài)。程序在等待某個(gè)操作完成期間,,自身無(wú)法繼續(xù)干別的事情,,則稱該程序在該操作上是阻塞的。

常見(jiàn)的阻塞形式有:網(wǎng)絡(luò) I/O 阻塞,、磁盤 I/O 阻塞,、用戶輸入阻塞等。阻塞是無(wú)處不在的,,包括 CPU 切換上下文時(shí),,所有的進(jìn)程都無(wú)法真正干事情,它們也會(huì)被阻塞,。如果是多核 CPU 則正在執(zhí)行上下文切換操作的核不可被利用,。

非阻塞

程序在等待某操作過(guò)程中,自身不被阻塞,,可以繼續(xù)運(yùn)行干別的事情,,則稱該程序在該操作上是非阻塞的。

非阻塞并不是在任何程序級(jí)別,、任何情況下都可以存在的,。

僅當(dāng)程序封裝的級(jí)別可以囊括獨(dú)立的子程序單元時(shí),,它才可能存在非阻塞狀態(tài)。

非阻塞的存在是因?yàn)樽枞嬖?,正因?yàn)槟硞€(gè)操作阻塞導(dǎo)致的耗時(shí)與效率低下,,我們才要把它變成非阻塞的。

同步

不同程序單元為了完成某個(gè)任務(wù),,在執(zhí)行過(guò)程中需靠某種通信方式以協(xié)調(diào)一致,,稱這些程序單元是同步執(zhí)行的。

例如購(gòu)物系統(tǒng)中更新商品庫(kù)存,,需要用“行鎖”作為通信信號(hào),,讓不同的更新請(qǐng)求強(qiáng)制排隊(duì)順序執(zhí)行,那更新庫(kù)存的操作是同步的,。

簡(jiǎn)言之,,同步意味著有序。

異步

為完成某個(gè)任務(wù),,不同程序單元之間過(guò)程中無(wú)需通信協(xié)調(diào),,也能完成任務(wù)的方式,不相關(guān)的程序單元之間可以是異步的,。

例如,,爬蟲(chóng)下載網(wǎng)頁(yè)。調(diào)度程序調(diào)用下載程序后,,即可調(diào)度其他任務(wù),,而無(wú)需與該下載任務(wù)保持通信以協(xié)調(diào)行為。不同網(wǎng)頁(yè)的下載,、保存等操作都是無(wú)關(guān)的,,也無(wú)需相互通知協(xié)調(diào)。這些異步操作的完成時(shí)刻并不確定,。

簡(jiǎn)言之,,異步意味著無(wú)序。

多進(jìn)程

多進(jìn)程就是利用 CPU 的多核優(yōu)勢(shì),,在同一時(shí)間并行地執(zhí)行多個(gè)任務(wù),,可以大大提高執(zhí)行效率。

協(xié)程

協(xié)程,,英文叫做 Coroutine,,又稱微線程,纖程,,協(xié)程是一種用戶態(tài)的輕量級(jí)線程,。

協(xié)程擁有自己的寄存器上下文和棧。協(xié)程調(diào)度切換時(shí),,將寄存器上下文和棧保存到其他地方,,在切回來(lái)的時(shí)候,,恢復(fù)先前保存的寄存器上下文和棧。因此協(xié)程能保留上一次調(diào)用時(shí)的狀態(tài),,即所有局部狀態(tài)的一個(gè)特定組合,,每次過(guò)程重入時(shí),就相當(dāng)于進(jìn)入上一次調(diào)用的狀態(tài),。

協(xié)程本質(zhì)上是個(gè)單進(jìn)程,協(xié)程相對(duì)于多進(jìn)程來(lái)說(shuō),,無(wú)需線程上下文切換的開(kāi)銷,,無(wú)需原子操作鎖定及同步的開(kāi)銷,編程模型也非常簡(jiǎn)單,。

我們可以使用協(xié)程來(lái)實(shí)現(xiàn)異步操作,,比如在網(wǎng)絡(luò)爬蟲(chóng)場(chǎng)景下,我們發(fā)出一個(gè)請(qǐng)求之后,,需要等待一定的時(shí)間才能得到響應(yīng),,但其實(shí)在這個(gè)等待過(guò)程中,程序可以干許多其他的事情,,等到響應(yīng)得到之后才切換回來(lái)繼續(xù)處理,,這樣可以充分利用 CPU 和其他資源,這就是異步協(xié)程的優(yōu)勢(shì),。

異步協(xié)程用法

接下來(lái)讓我們來(lái)了解下協(xié)程的實(shí)現(xiàn),,從 Python 3.4 開(kāi)始,Python 中加入了協(xié)程的概念,,但這個(gè)版本的協(xié)程還是以生成器對(duì)象為基礎(chǔ)的,,在 Python 3.5 則增加了 async/await,使得協(xié)程的實(shí)現(xiàn)更加方便,。

Python 中使用協(xié)程最常用的庫(kù)莫過(guò)于 asyncio,,所以本文會(huì)以 asyncio 為基礎(chǔ)來(lái)介紹協(xié)程的使用。

首先我們需要了解下面幾個(gè)概念:

  • event_loop:事件循環(huán),,相當(dāng)于一個(gè)無(wú)限循環(huán),,我們可以把一些函數(shù)注冊(cè)到這個(gè)事件循環(huán)上,當(dāng)滿足條件發(fā)生的時(shí)候,,就會(huì)調(diào)用對(duì)應(yīng)的處理方法,。

  • coroutine:中文翻譯叫協(xié)程,在 Python 中常指代為協(xié)程對(duì)象類型,,我們可以將協(xié)程對(duì)象注冊(cè)到時(shí)間循環(huán)中,,它會(huì)被事件循環(huán)調(diào)用。我們可以使用 async 關(guān)鍵字來(lái)定義一個(gè)方法,,這個(gè)方法在調(diào)用時(shí)不會(huì)立即被執(zhí)行,,而是返回一個(gè)協(xié)程對(duì)象,。

  • task:任務(wù),它是對(duì)協(xié)程對(duì)象的進(jìn)一步封裝,,包含了任務(wù)的各個(gè)狀態(tài),。

  • future:代表將來(lái)執(zhí)行或沒(méi)有執(zhí)行的任務(wù)的結(jié)果,實(shí)際上和 task 沒(méi)有本質(zhì)區(qū)別,。

另外我們還需要了解 async/await 關(guān)鍵字,,它是從 Python 3.5 才出現(xiàn)的,專門用于定義協(xié)程,。其中,,async 定義一個(gè)協(xié)程,await 用來(lái)掛起阻塞方法的執(zhí)行,。

定義協(xié)程

首先我們來(lái)定義一個(gè)協(xié)程,,體驗(yàn)一下它和普通進(jìn)程在實(shí)現(xiàn)上的不同之處,代碼如下:

import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

運(yùn)行結(jié)果:

Coroutine: <coroutine object execute at 0x1034cf830>
After calling execute
Number: 1
After calling loop

首先我們引入了 asyncio 這個(gè)包,,這樣我們才可以使用 async 和 await,,然后我們使用 async 定義了一個(gè) execute() 方法,方法接收一個(gè)數(shù)字參數(shù),,方法執(zhí)行之后會(huì)打印這個(gè)數(shù)字,。

隨后我們直接調(diào)用了這個(gè)方法,然而這個(gè)方法并沒(méi)有執(zhí)行,,而是返回了一個(gè) coroutine 協(xié)程對(duì)象,。隨后我們使用 get_event_loop() 方法創(chuàng)建了一個(gè)事件循環(huán) loop,并調(diào)用了 loop 對(duì)象的 run_until_complete() 方法將協(xié)程注冊(cè)到事件循環(huán) loop 中,,然后啟動(dòng),。最后我們才看到了 execute() 方法打印了輸出結(jié)果。

可見(jiàn),,async 定義的方法就會(huì)變成一個(gè)無(wú)法直接執(zhí)行的 coroutine 對(duì)象,,必須將其注冊(cè)到事件循環(huán)中才可以執(zhí)行。

上文我們還提到了 task,,它是對(duì) coroutine 對(duì)象的進(jìn)一步封裝,,它里面相比 coroutine 對(duì)象多了運(yùn)行狀態(tài),比如 running,、finished 等,,我們可以用這些狀態(tài)來(lái)獲取協(xié)程對(duì)象的執(zhí)行情況。

在上面的例子中,,當(dāng)我們將 coroutine 對(duì)象傳遞給 run_until_complete() 方法的時(shí)候,,實(shí)際上它進(jìn)行了一個(gè)操作就是將 coroutine 封裝成了 task 對(duì)象,我們也可以顯式地進(jìn)行聲明,如下所示:

import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

運(yùn)行結(jié)果:

Coroutine: <coroutine object execute at 0x10e0f7830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop

這里我們定義了 loop 對(duì)象之后,,接著調(diào)用了它的 create_task() 方法將 coroutine 對(duì)象轉(zhuǎn)化為了 task 對(duì)象,,隨后我們打印輸出一下,發(fā)現(xiàn)它是 pending 狀態(tài),。接著我們將 task 對(duì)象添加到事件循環(huán)中得到執(zhí)行,,隨后我們?cè)俅蛴≥敵鲆幌?task 對(duì)象,發(fā)現(xiàn)它的狀態(tài)就變成了 finished,,同時(shí)還可以看到其 result 變成了 1,,也就是我們定義的 execute() 方法的返回結(jié)果。

另外定義 task 對(duì)象還有一種方式,,就是直接通過(guò) asyncio 的 ensure_future() 方法,,返回結(jié)果也是 task 對(duì)象,這樣的話我們就可以不借助于 loop 來(lái)定義,,即使我們還沒(méi)有聲明 loop 也可以提前定義好 task 對(duì)象,寫法如下:

import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

運(yùn)行結(jié)果:

Coroutine: <coroutine object execute at 0x10aa33830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop

發(fā)現(xiàn)其效果都是一樣的,。

綁定回調(diào)

另外我們也可以為某個(gè) task 綁定一個(gè)回調(diào)方法,,來(lái)看下面的例子:

import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
def callback(task):
print('Status:', task.result())
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

在這里我們定義了一個(gè) request() 方法,請(qǐng)求了百度,,返回狀態(tài)碼,,但是這個(gè)方法里面我們沒(méi)有任何 print() 語(yǔ)句。隨后我們定義了一個(gè) callback() 方法,,這個(gè)方法接收一個(gè)參數(shù),,是 task 對(duì)象,然后調(diào)用 print() 方法打印了 task 對(duì)象的結(jié)果,。這樣我們就定義好了一個(gè) coroutine 對(duì)象和一個(gè)回調(diào)方法,,我們現(xiàn)在希望的效果是,當(dāng) coroutine 對(duì)象執(zhí)行完畢之后,,就去執(zhí)行聲明的 callback() 方法,。

那么它們二者怎樣關(guān)聯(lián)起來(lái)呢?很簡(jiǎn)單,,只需要調(diào)用 add_done_callback() 方法即可,,我們將 callback() 方法傳遞給了封裝好的 task 對(duì)象,這樣當(dāng) task 執(zhí)行完畢之后就可以調(diào)用 callback() 方法了,,同時(shí) task 對(duì)象還會(huì)作為參數(shù)傳遞給 callback() 方法,,調(diào)用 task 對(duì)象的 result() 方法就可以獲取返回結(jié)果了。

運(yùn)行結(jié)果:

Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>

實(shí)際上不用回調(diào)方法,,直接在 task 運(yùn)行完畢之后也可以直接調(diào)用 result() 方法獲取結(jié)果,,如下所示:

import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

運(yùn)行結(jié)果是一樣的:

Task: <Task pending coro=<request() running at demo.py:4>>
Task: <Task finished coro=<request() done, defined at demo.py:4> result=<Response [200]>>
Task Result: <Response [200]>

多任務(wù)協(xié)程

上面的例子我們只執(zhí)行了一次請(qǐng)求,如果我們想執(zhí)行多次請(qǐng)求應(yīng)該怎么辦呢?我們可以定義一個(gè) task 列表,,然后使用 asyncio 的 wait() 方法即可執(zhí)行,,看下面的例子:

import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task Result:', task.result())

這里我們使用一個(gè) for 循環(huán)創(chuàng)建了五個(gè) task,組成了一個(gè)列表,,然后把這個(gè)列表首先傳遞給了 asyncio 的 wait() 方法,,然后再將其注冊(cè)到時(shí)間循環(huán)中,就可以發(fā)起五個(gè)任務(wù)了,。最后我們?cè)賹⑷蝿?wù)的運(yùn)行結(jié)果輸出出來(lái),,運(yùn)行結(jié)果如下:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

可以看到五個(gè)任務(wù)被順次執(zhí)行了,并得到了運(yùn)行結(jié)果,。

協(xié)程實(shí)現(xiàn)

前面說(shuō)了這么一通,,又是 async,又是 coroutine,,又是 task,,又是 callback,但似乎并沒(méi)有看出協(xié)程的優(yōu)勢(shì)???反而寫法上更加奇怪和麻煩了,別急,,上面的案例只是為后面的使用作鋪墊,,接下來(lái)我們正式來(lái)看下協(xié)程在解決 IO 密集型任務(wù)上有怎樣的優(yōu)勢(shì)吧!

上面的代碼中,,我們用一個(gè)網(wǎng)絡(luò)請(qǐng)求作為示例,,這就是一個(gè)耗時(shí)等待的操作,因?yàn)槲覀冋?qǐng)求網(wǎng)頁(yè)之后需要等待頁(yè)面響應(yīng)并返回結(jié)果,。耗時(shí)等待的操作一般都是 IO 操作,,比如文件讀取、網(wǎng)絡(luò)請(qǐng)求等等,。協(xié)程對(duì)于處理這種操作是有很大優(yōu)勢(shì)的,,當(dāng)遇到需要等待的情況的時(shí)候,程序可以暫時(shí)掛起,,轉(zhuǎn)而去執(zhí)行其他的操作,,從而避免一直等待一個(gè)程序而耗費(fèi)過(guò)多的時(shí)間,充分利用資源,。

為了表現(xiàn)出協(xié)程的優(yōu)勢(shì),,我們需要先創(chuàng)建一個(gè)合適的實(shí)驗(yàn)環(huán)境,最好的方法就是模擬一個(gè)需要等待一定時(shí)間才可以獲取返回結(jié)果的網(wǎng)頁(yè),,上面的代碼中使用了百度,,但百度的響應(yīng)太快了,,而且響應(yīng)速度也會(huì)受本機(jī)網(wǎng)速影響,所以最好的方式是自己在本地模擬一個(gè)慢速服務(wù)器,,這里我們選用 Flask,。

如果沒(méi)有安裝 Flask 的話可以執(zhí)行如下命令安裝:

pip3 install flask

然后編寫服務(wù)器代碼如下:

from flask import Flask
import time
app = Flask(__name__)
@app.route('/')
def index():
time.sleep(3)
return 'Hello!'
if __name__ == '__main__':
app.run(threaded=True)

這里我們定義了一個(gè) Flask 服務(wù),主入口是 index() 方法,,方法里面先調(diào)用了 sleep() 方法休眠 3 秒,,然后接著再返回結(jié)果,也就是說(shuō),,每次請(qǐng)求這個(gè)接口至少要耗時(shí) 3 秒,,這樣我們就模擬了一個(gè)慢速的服務(wù)接口。

注意這里服務(wù)啟動(dòng)的時(shí)候,,run() 方法加了一個(gè)參數(shù) threaded,,這表明 Flask 啟動(dòng)了多線程模式,不然默認(rèn)是只有一個(gè)線程的,。如果不開(kāi)啟多線程模式,,同一時(shí)刻遇到多個(gè)請(qǐng)求的時(shí)候,只能順次處理,,這樣即使我們使用協(xié)程異步請(qǐng)求了這個(gè)服務(wù),,也只能一個(gè)一個(gè)排隊(duì)等待,瓶頸就會(huì)出現(xiàn)在服務(wù)端,。所以,多線程模式是有必要打開(kāi)的,。

啟動(dòng)之后,,F(xiàn)lask 應(yīng)該默認(rèn)會(huì)在 127.0.0.1:5000 上運(yùn)行,運(yùn)行之后控制臺(tái)輸出結(jié)果如下:

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

接下來(lái)我們?cè)僦匦率褂蒙厦娴姆椒ㄕ?qǐng)求一遍:

import asyncio
import requests
import time
start = time.time()
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = requests.get(url)
print('Get response from', url, 'Result:', response.text)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)

在這里我們還是創(chuàng)建了五個(gè) task,,然后將 task 列表傳給 wait() 方法并注冊(cè)到時(shí)間循環(huán)中執(zhí)行,。

運(yùn)行結(jié)果如下:

Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.049368143081665

可以發(fā)現(xiàn)和正常的請(qǐng)求并沒(méi)有什么兩樣,依然還是順次執(zhí)行的,,耗時(shí) 15 秒,,平均一個(gè)請(qǐng)求耗時(shí) 3 秒,說(shuō)好的異步處理呢,?

其實(shí),,要實(shí)現(xiàn)異步處理,我們得先要有掛起的操作,,當(dāng)一個(gè)任務(wù)需要等待 IO 結(jié)果的時(shí)候,,可以掛起當(dāng)前任務(wù),轉(zhuǎn)而去執(zhí)行其他任務(wù),,這樣我們才能充分利用好資源,,上面方法都是一本正經(jīng)的串行走下來(lái),連個(gè)掛起都沒(méi)有,怎么可能實(shí)現(xiàn)異步,?想太多了,。

要實(shí)現(xiàn)異步,接下來(lái)我們?cè)倭私庖幌?await 的用法,,使用 await 可以將耗時(shí)等待的操作掛起,,讓出控制權(quán)。當(dāng)協(xié)程執(zhí)行的時(shí)候遇到 await,,時(shí)間循環(huán)就會(huì)將本協(xié)程掛起,,轉(zhuǎn)而去執(zhí)行別的協(xié)程,直到其他的協(xié)程掛起或執(zhí)行完畢,。

所以,,我們可能會(huì)將代碼中的 request() 方法改成如下的樣子:

async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = await requests.get(url)
print('Get response from', url, 'Result:', response.text)

僅僅是在 requests 前面加了一個(gè) await,然而執(zhí)行以下代碼,,會(huì)得到如下報(bào)錯(cuò):

Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Cost time: 15.048935890197754
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at demo.py:7> exception=TypeError("object Response can't be used in 'await' expression",)>
Traceback (most recent call last):
File "demo.py", line 10, in request
status = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

這次它遇到 await 方法確實(shí)掛起了,,也等待了,但是最后卻報(bào)了這么個(gè)錯(cuò),,這個(gè)錯(cuò)誤的意思是 requests 返回的 Response 對(duì)象不能和 await 一起使用,,為什么呢?因?yàn)楦鶕?jù)官方文檔說(shuō)明,,await 后面的對(duì)象必須是如下格式之一:

  • A native coroutine object returned from a native coroutine function,,一個(gè)原生 coroutine 對(duì)象。

  • A generator-based coroutine object returned from a function decorated with types.coroutine(),,一個(gè)由 types.coroutine() 修飾的生成器,,這個(gè)生成器可以返回 coroutine 對(duì)象。

  • An object with an await__ method returning an iterator,,一個(gè)包含 __await 方法的對(duì)象返回的一個(gè)迭代器,。

reqeusts 返回的 Response 不符合上面任一條件,因此就會(huì)報(bào)上面的錯(cuò)誤了,。

那么有的小伙伴就發(fā)現(xiàn)了,,既然 await 后面可以跟一個(gè) coroutine 對(duì)象,那么我用 async 把請(qǐng)求的方法改成 coroutine 對(duì)象不就可以了嗎,?所以就改寫成如下的樣子:

import asyncio
import requests
import time
start = time.time()
async def get(url):
return requests.get(url)
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'Result:', response.text)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)

這里我們將請(qǐng)求頁(yè)面的方法獨(dú)立出來(lái),,并用 async 修飾,這樣就得到了一個(gè) coroutine 對(duì)象,,我們運(yùn)行一下看看:

Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447

還是不行,,它還不是異步執(zhí)行,也就是說(shuō)我們僅僅將涉及 IO 操作的代碼封裝到 async 修飾的方法里面是不可行的,!我們必須要使用支持異步操作的請(qǐng)求方式才可以實(shí)現(xiàn)真正的異步,,所以這里就需要 aiohttp 派上用場(chǎng)了,。

使用 aiohttp

aiohttp 是一個(gè)支持異步請(qǐng)求的庫(kù),利用它和 asyncio 配合我們可以非常方便地實(shí)現(xiàn)異步請(qǐng)求操作,。

安裝方式如下:

pip3 install aiohttp

官方文檔鏈接為:https://aiohttp./,,它分為兩部分,一部分是 Client,,一部分是 Server,,詳細(xì)的內(nèi)容可以參考官方文檔。

下面我們將 aiohttp 用上來(lái),,將代碼改成如下樣子:

import asyncio
import aiohttp
import time
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
session.close()
return result
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
result = await get(url)
print('Get response from', url, 'Result:', result)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)

在這里我們將請(qǐng)求庫(kù)由 requests 改成了 aiohttp,,通過(guò) aiohttp 的 ClientSession 類的 get() 方法進(jìn)行請(qǐng)求,結(jié)果如下:

Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 3.0199508666992188

成功了,!我們發(fā)現(xiàn)這次請(qǐng)求的耗時(shí)由 15 秒變成了 3 秒,,耗時(shí)直接變成了原來(lái)的 1/5。

代碼里面我們使用了 await,,后面跟了 get() 方法,,在執(zhí)行這五個(gè)協(xié)程的時(shí)候,如果遇到了 await,,那么就會(huì)將當(dāng)前協(xié)程掛起,,轉(zhuǎn)而去執(zhí)行其他的協(xié)程,直到其他的協(xié)程也掛起或執(zhí)行完畢,,再進(jìn)行下一個(gè)協(xié)程的執(zhí)行,。

開(kāi)始運(yùn)行時(shí),時(shí)間循環(huán)會(huì)運(yùn)行第一個(gè) task,,針對(duì)第一個(gè) task 來(lái)說(shuō),,當(dāng)執(zhí)行到第一個(gè) await 跟著的 get() 方法時(shí),它被掛起,,但這個(gè) get() 方法第一步的執(zhí)行是非阻塞的,掛起之后立馬被喚醒,,所以立即又進(jìn)入執(zhí)行,,創(chuàng)建了 ClientSession 對(duì)象,接著遇到了第二個(gè) await,,調(diào)用了 session.get() 請(qǐng)求方法,,然后就被掛起了,由于請(qǐng)求需要耗時(shí)很久,,所以一直沒(méi)有被喚醒,,好第一個(gè) task 被掛起了,那接下來(lái)該怎么辦呢,?事件循環(huán)會(huì)尋找當(dāng)前未被掛起的協(xié)程繼續(xù)執(zhí)行,,于是就轉(zhuǎn)而執(zhí)行第二個(gè) task 了,,也是一樣的流程操作,直到執(zhí)行了第五個(gè) task 的 session.get() 方法之后,,全部的 task 都被掛起了,。所有 task 都已經(jīng)處于掛起狀態(tài),那咋辦,?只好等待了,。3 秒之后,幾個(gè)請(qǐng)求幾乎同時(shí)都有了響應(yīng),,然后幾個(gè) task 也被喚醒接著執(zhí)行,,輸出請(qǐng)求結(jié)果,最后耗時(shí),,3 秒,!

怎么樣?這就是異步操作的便捷之處,,當(dāng)遇到阻塞式操作時(shí),,任務(wù)被掛起,程序接著去執(zhí)行其他的任務(wù),,而不是傻傻地等著,,這樣可以充分利用 CPU 時(shí)間,而不必把時(shí)間浪費(fèi)在等待 IO 上,。

有人就會(huì)說(shuō)了,,既然這樣的話,在上面的例子中,,在發(fā)出網(wǎng)絡(luò)請(qǐng)求后,,既然接下來(lái)的 3 秒都是在等待的,在 3 秒之內(nèi),,CPU 可以處理的 task 數(shù)量遠(yuǎn)不止這些,,那么豈不是我們放 10 個(gè)、20 個(gè),、50 個(gè),、100 個(gè)、1000 個(gè) task 一起執(zhí)行,,最后得到所有結(jié)果的耗時(shí)不都是 3 秒左右嗎,?因?yàn)檫@幾個(gè)任務(wù)被掛起后都是一起等待的。

理論來(lái)說(shuō)確實(shí)是這樣的,,不過(guò)有個(gè)前提,,那就是服務(wù)器在同一時(shí)刻接受無(wú)限次請(qǐng)求都能保證正常返回結(jié)果,也就是服務(wù)器無(wú)限抗壓,,另外還要忽略 IO 傳輸時(shí)延,,確實(shí)可以做到無(wú)限 task 一起執(zhí)行且在預(yù)想時(shí)間內(nèi)得到結(jié)果,。

我們這里將 task 數(shù)量設(shè)置成 100,再試一下:

tasks = [asyncio.ensure_future(request()) for _ in range(100)]

耗時(shí)結(jié)果如下:

Cost time: 3.106252670288086

最后運(yùn)行時(shí)間也是在 3 秒左右,,當(dāng)然多出來(lái)的時(shí)間就是 IO 時(shí)延了,。

可見(jiàn),使用了異步協(xié)程之后,,我們幾乎可以在相同的時(shí)間內(nèi)實(shí)現(xiàn)成百上千倍次的網(wǎng)絡(luò)請(qǐng)求,,把這個(gè)運(yùn)用在爬蟲(chóng)中,速度提升可謂是非??捎^了,。

與單進(jìn)程、多進(jìn)程對(duì)比

可能有的小伙伴非常想知道上面的例子中,,如果 100 次請(qǐng)求,,不是用異步協(xié)程的話,使用單進(jìn)程和多進(jìn)程會(huì)耗費(fèi)多少時(shí)間,,我們來(lái)測(cè)試一下:

首先來(lái)測(cè)試一下單進(jìn)程的時(shí)間:

import requests
import time
start = time.time()
def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
result = requests.get(url).text
print('Get response from', url, 'Result:', result)
for _ in range(100):
request()
end = time.time()
print('Cost time:', end - start)

最后耗時(shí):

Cost time: 305.16639709472656

接下來(lái)我們使用多進(jìn)程來(lái)測(cè)試下,,使用 multiprocessing 庫(kù):

import requests
import time
import multiprocessing
start = time.time()
def request(_):
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
result = requests.get(url).text
print('Get response from', url, 'Result:', result)
cpu_count = multiprocessing.cpu_count()
print('Cpu count:', cpu_count)
pool = multiprocessing.Pool(cpu_count)
pool.map(request, range(100))
end = time.time()
print('Cost time:', end - start)

這里我使用了multiprocessing 里面的 Pool 類,即進(jìn)程池,。我的電腦的 CPU 個(gè)數(shù)是 8 個(gè),,這里的進(jìn)程池的大小就是 8。

運(yùn)行時(shí)間:

Cost time: 48.17306900024414

可見(jiàn) multiprocessing 相比單線程來(lái)說(shuō),,還是可以大大提高效率的,。

與多進(jìn)程的結(jié)合

既然異步協(xié)程和多進(jìn)程對(duì)網(wǎng)絡(luò)請(qǐng)求都有提升,那么為什么不把二者結(jié)合起來(lái)呢,?在最新的 PyCon 2018 上,,來(lái)自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點(diǎn),并開(kāi)發(fā)了一個(gè)新的庫(kù),,叫做 aiomultiprocess,,感興趣的可以了解下

這個(gè)庫(kù)的安裝方式是:

pip3 install aiomultiprocess

需要 Python 3.6 及更高版本才可使用。

使用這個(gè)庫(kù),,我們可以將上面的例子改寫如下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
session.close()
return result
async def request():
url = 'http://127.0.0.1:5000'
urls = [url for _ in range(100)]
async with Pool() as pool:
result = await pool.map(get, urls)
return result
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
end = time.time()
print('Cost time:', end - start)

這樣就會(huì)同時(shí)使用多進(jìn)程和異步協(xié)程進(jìn)行請(qǐng)求,,當(dāng)然最后的結(jié)果其實(shí)和異步是差不多的:

Cost time: 3.1156570434570312

因?yàn)槲业臏y(cè)試接口的原因,最快的響應(yīng)也是 3 秒,,所以這部分多余的時(shí)間基本都是 IO 傳輸時(shí)延,。但在真實(shí)情況下,,我們?cè)谧雠廊〉臅r(shí)候遇到的情況千變?nèi)f化,,一方面我們使用異步協(xié)程來(lái)防止阻塞,另一方面我們使用 multiprocessing 來(lái)利用多核成倍加速,,節(jié)省時(shí)間其實(shí)還是非??捎^的,。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點(diǎn),。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多