本文希望達(dá)到的目標(biāo):
- 學(xué)習(xí)Queue模塊
- 將Queue模塊與多線程編程相結(jié)合
- 通過Queue和threading模塊, 重構(gòu)爬蟲, 實(shí)現(xiàn)多線程爬蟲,
- 通過以上學(xué)習(xí)希望總結(jié)出一個通用的多線程爬蟲小模版
1. Queue模塊
Queue 模塊實(shí)現(xiàn)了多生產(chǎn)者多消費(fèi)者隊列, 尤其適合多線程編程.Queue 類中實(shí)現(xiàn)了所有需要的鎖原語 (這句話非常重要), Queue模塊實(shí)現(xiàn)了三種類型隊列:
- FIFO(先進(jìn)先出)隊列, 第一加入隊列的任務(wù), 被第一個取出
- LIFO(后進(jìn)先出)隊列,最后加入隊列的任務(wù), 被第一個取出(操作類似與棧, 總是從棧頂取出, 這個隊列還不清楚內(nèi)部的實(shí)現(xiàn))
- PriorityQueue(優(yōu)先級)隊列, 保持隊列數(shù)據(jù)有序, 最小值被先取出(在C++中我記得優(yōu)先級隊列是可以自己重寫排序規(guī)則的, Python不知道可以嗎)
1.1. 類和異常
import Queue
#類
Queue.Queue(maxsize = 0) #構(gòu)造一個FIFO隊列,maxsize設(shè)置隊列大小的上界, 如果插入數(shù)據(jù)時, 達(dá)到上界會發(fā)生阻塞, 直到隊列可以放入數(shù)據(jù). 當(dāng)maxsize小于或者等于0, 表示不限制隊列的大小(默認(rèn))
Queue.LifoQueue(maxsize = 0) #構(gòu)造一LIFO隊列,maxsize設(shè)置隊列大小的上界, 如果插入數(shù)據(jù)時, 達(dá)到上界會發(fā)生阻塞, 直到隊列可以放入數(shù)據(jù). 當(dāng)maxsize小于或者等于0, 表示不限制隊列的大小(默認(rèn))
Queue.PriorityQueue(maxsize = 0) #構(gòu)造一個優(yōu)先級隊列,,maxsize設(shè)置隊列大小的上界, 如果插入數(shù)據(jù)時, 達(dá)到上界會發(fā)生阻塞, 直到隊列可以放入數(shù)據(jù). 當(dāng)maxsize小于或者等于0, 表示不限制隊列的大小(默認(rèn)). 優(yōu)先級隊列中, 最小值被最先取出
#異常
Queue.Empty #當(dāng)調(diào)用非阻塞的get()獲取空隊列的元素時, 引發(fā)異常
Queue.Full #當(dāng)調(diào)用非阻塞的put()向滿隊列中添加元素時, 引發(fā)異常
1.2. Queue對象
三種隊列對象提供公共的方法
Queue.empty() #如果隊列為空, 返回True(注意隊列為空時, 并不能保證調(diào)用put()不會阻塞); 隊列不空返回False(不空時, 不能保證調(diào)用get()不會阻塞)
Queue.full() #如果隊列為滿, 返回True(不能保證調(diào)用get()不會阻塞), 如果隊列不滿, 返回False(并不能保證調(diào)用put()不會阻塞)
Queue.put(item[, block[, timeout]]) #向隊列中放入元素, 如果可選參數(shù)block為True并且timeout參數(shù)為None(默認(rèn)), 為阻塞型put(). 如果timeout是正數(shù), 會阻塞timeout時間并引發(fā)Queue.Full異常. 如果block為False為非阻塞put
Queue.put_nowait(item) #等價于put(itme, False)
Queue.get([block[, timeout]]) #移除列隊元素并將元素返回, block = True為阻塞函數(shù), block = False為非阻塞函數(shù). 可能返回Queue.Empty異常
Queue.get_nowait() #等價于get(False)
Queue.task_done() #在完成一項工作之后,,Queue.task_done()函數(shù)向任務(wù)已經(jīng)完成的隊列發(fā)送一個信號
Queue.join() #實(shí)際上意味著等到隊列為空,,再執(zhí)行別的操作
下面是官方文檔給多出的多線程模型:
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
2. Queue模塊與線程相結(jié)合
簡單寫了一個Queue和線程結(jié)合的小程序
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
import Queue
SHARE_Q = Queue.Queue() #構(gòu)造一個不限制大小的的隊列
_WORKER_THREAD_NUM = 3 #設(shè)置線程個數(shù)
class MyThread(threading.Thread) :
def __init__(self, func) :
super(MyThread, self).__init__()
self.func = func
def run(self) :
self.func()
def worker() :
global SHARE_Q
while not SHARE_Q.empty():
item = SHARE_Q.get() #獲得任務(wù)
print "Processing : ", item
time.sleep(1)
def main() :
global SHARE_Q
threads = []
for task in xrange(5) : #向隊列中放入任務(wù)
SHARE_Q.put(task)
for i in xrange(_WORKER_THREAD_NUM) :
thread = MyThread(worker)
thread.start()
threads.append(thread)
for thread in threads :
thread.join()
if __name__ == '__main__':
main()
3. 重構(gòu)爬蟲
主要針對之間寫過的豆瓣爬蟲進(jìn)行重構(gòu):
3.1. 豆瓣電影爬蟲重構(gòu)
通過對Queue和線程模型進(jìn)行改寫, 可以寫出下面的爬蟲程序 :
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# 多線程爬取豆瓣Top250的電影名稱
import urllib2, re, string
import threading, Queue, time
import sys
reload(sys)
sys.setdefaultencoding('utf8')
_DATA = []
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue() #構(gòu)造一個不限制大小的的隊列
_WORKER_THREAD_NUM = 3 #設(shè)置線程的個數(shù)
class MyThread(threading.Thread) :
def __init__(self, func) :
super(MyThread, self).__init__() #調(diào)用父類的構(gòu)造函數(shù)
self.func = func #傳入線程函數(shù)邏輯
def run(self) :
self.func()
def worker() :
global SHARE_Q
while not SHARE_Q.empty():
url = SHARE_Q.get() #獲得任務(wù)
my_page = get_page(url) #爬取整個網(wǎng)頁的HTML代碼
find_title(my_page) #獲得當(dāng)前頁面的電影名
time.sleep(1)
SHARE_Q.task_done()
完整代碼請查看Github豆瓣多線程爬蟲 完成這個程序后, 又出現(xiàn)了新的問題:
無法保證數(shù)據(jù)的順序性, 因?yàn)榫€程是并發(fā)的, 思考的方法是: 設(shè)置一個主線程進(jìn)行管理, 然后他們的線程工作
4. 通用的多線程爬蟲小模版
下面是根據(jù)上面的爬蟲做了點(diǎn)小改動后形成的模板
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
import Queue
SHARE_Q = Queue.Queue() #構(gòu)造一個不限制大小的的隊列
_WORKER_THREAD_NUM = 3 #設(shè)置線程的個數(shù)
class MyThread(threading.Thread) :
"""
doc of class
Attributess:
func: 線程函數(shù)邏輯
"""
def __init__(self, func) :
super(MyThread, self).__init__() #調(diào)用父類的構(gòu)造函數(shù)
self.func = func #傳入線程函數(shù)邏輯
def run(self) :
"""
重寫基類的run方法
"""
self.func()
def do_something(item) :
"""
運(yùn)行邏輯, 比如抓站
"""
print item
def worker() :
"""
主要用來寫工作邏輯, 只要隊列不空持續(xù)處理
隊列為空時, 檢查隊列, 由于Queue中已經(jīng)包含了wait,
notify和鎖, 所以不需要在取任務(wù)或者放任務(wù)的時候加鎖解鎖
"""
global SHARE_Q
while True :
if not SHARE_Q.empty():
item = SHARE_Q.get() #獲得任務(wù)
do_something(item)
time.sleep(1)
SHARE_Q.task_done()
def main() :
global SHARE_Q
threads = []
#向隊列中放入任務(wù), 真正使用時, 應(yīng)該設(shè)置為可持續(xù)的放入任務(wù)
for task in xrange(5) :
SHARE_Q.put(task)
#開啟_WORKER_THREAD_NUM個線程
for i in xrange(_WORKER_THREAD_NUM) :
thread = MyThread(worker)
thread.start() #線程開始處理任務(wù)
threads.append(thread)
for thread in threads :
thread.join()
#等待所有任務(wù)完成
SHARE_Q.join()
if __name__ == '__main__':
main()
我感覺其實(shí)這個多線程挺凌亂的, 希望以后自己能重構(gòu)
5. 思考更高效的爬蟲方法
- 使用twisted進(jìn)行異步IO抓取
- 使用
Scrapy 框架(Scrapy 使用了 Twisted 異步網(wǎng)絡(luò)庫來處理網(wǎng)絡(luò)通訊)
6. 參考鏈接
Queue官方文檔 Twisted英文入門指南 Twisted中文入門指南
|