來源:伯樂在線專欄作者-人世間 鏈接:http://python./85177/
線程和進(jìn)程
計(jì)算機(jī),用于計(jì)算的機(jī)器,。計(jì)算機(jī)的核心是CPU,,在現(xiàn)在多核心的電腦很常見了,。為了充分利用cpu核心做計(jì)算任務(wù),程序?qū)崿F(xiàn)了多線程模型,。通過多線程實(shí)現(xiàn)多任務(wù)的并行執(zhí)行,。
現(xiàn)在的操作系統(tǒng)多是多任務(wù)操作系統(tǒng)。每個(gè)應(yīng)用程序都有一個(gè)自己的進(jìn)程,。操作系統(tǒng)會(huì)為這些進(jìn)程分配一些執(zhí)行資源,,例如內(nèi)存空間等。在進(jìn)程中,,又可以創(chuàng)建一些線程,,他們共享這些內(nèi)存空間,并由操作系統(tǒng)調(diào)用,,以便并行計(jì)算,。
線程狀態(tài)
創(chuàng)建線程之后,線程并不是始終保持一個(gè)狀態(tài),。其狀態(tài)大概如下:
這些狀態(tài)之間是可以相互轉(zhuǎn)換的,一圖勝千顏色:
threading_state (圖片引用 內(nèi)心求法博客)
線程中執(zhí)行到阻塞,,可能有3種情況:
同步:線程中獲取同步鎖,,但是資源已經(jīng)被其他線程鎖定時(shí),進(jìn)入Locked狀態(tài),,直到該資源可獲?。ǐ@取的順序由Lock隊(duì)列控制) 睡眠:線程運(yùn)行sleep()或join()方法后,線程進(jìn)入Sleeping狀態(tài),。區(qū)別在于sleep等待固定的時(shí)間,,而join是等待子線程執(zhí)行完。當(dāng)然join也可以指定一個(gè)“超時(shí)時(shí)間”,。從語義上來說,,如果兩個(gè)線程a,b, 在a中調(diào)用b.join(),相當(dāng)于合并(join)成一個(gè)線程,。最常見的情況是在主線程中join所有的子線程,。 等待:線程中執(zhí)行wait()方法后,線程進(jìn)入Waiting狀態(tài),,等待其他線程的通知(notify),。
線程類型
線程有著不同的狀態(tài),也有不同的類型,。大致可分為:
Python線程與GIL
相比進(jìn)程,,線程更加輕量,,可以實(shí)現(xiàn)并發(fā)??墒窃趐ython的世界里,,對于線程,就不得不說一句GIL(全局解釋器鎖),。GIL的存在讓python的多線程多少有點(diǎn)雞肋了,。Cpython的線程是操作系統(tǒng)原生的線程在解釋器解釋執(zhí)行任何Python代碼時(shí),都需要先獲得這把鎖才行,,在遇到 I/O 操作時(shí)會(huì)釋放這把鎖,。因?yàn)閜ython的進(jìn)程做為一個(gè)整體,解釋器進(jìn)程內(nèi)只有一個(gè)線程在執(zhí)行,,其它的線程都處于等待狀態(tài)等著GIL的釋放,。
關(guān)于GIL可以有更多的趣事,一時(shí)半會(huì)都說不完,??傊畃ython想用多線程并發(fā),效果可能還不如單線程(線程切換耗時(shí)間),。想要利用多核,,可以考慮使用多進(jìn)程,。
線程的創(chuàng)建
雖然python線程比較雞肋,可是也并發(fā)一無是處,。多了解還是有理由對并發(fā)模型的理解,。
Python提供兩個(gè)模塊進(jìn)行多線程的操作,分別是thread和threading,,前者是比較低級的模塊,,用于更底層的操作,,一般應(yīng)用級別的開發(fā)不常用。后者則封裝了更多高級的接口,,類似java的多線程風(fēng)格,,提供run方法和start調(diào)用。
import time import threading class MyThread(threading.Thread): def run(self): for i in range(5): print 'thread {}, @number: {}'.format(self.name, i) time.sleep(1) def main(): print 'Start main threading' # 創(chuàng)建三個(gè)線程 threads = [MyThread() for i in range(3)] # 啟動(dòng)三個(gè)線程 for t in threads: t.start() print 'End Main threading' if __name__ == '__main__': main()
輸入如下:(不同的環(huán)境不一樣)
Start main threading thread Thread-1, @number: 0 thread Thread-2, @number: 0 thread Thread-3, @number: 0 End Main threading thread Thread-1, @number: 1 thread Thread-3, @number: 1 thread Thread-2, @number: 1 thread Thread-3, @number: 2 thread Thread-1, @number: 2 thread Thread-2, @number: 2 thread Thread-2, @number: 3 thread Thread-1, @number: 3 thread Thread-3, @number: 3
每個(gè)線程都依次打印 0 – 3 三個(gè)數(shù)字,可是從輸出的結(jié)果觀察,線程并不是順序的執(zhí)行,而是三個(gè)線程之間相互交替執(zhí)行。此外,我們的主線程執(zhí)行結(jié)束,將會(huì)打印 End Main threading。從輸出結(jié)果可以知道,主線程結(jié)束后,,新建的線程還在運(yùn)行,。
線程合并(join方法)
上述的例子中,主線程結(jié)束了,子線程還在運(yùn)行,。如果需要主線程等待子線程執(zhí)行完畢再退出,,可是使用線程的join方法,。join方法官網(wǎng)文檔大概是
join(timeout)方法將會(huì)等待直到線程結(jié)束。這將阻塞正在調(diào)用的線程,,直到被調(diào)用join()方法的線程結(jié)束,。
主線程或者某個(gè)函數(shù)如果創(chuàng)建了子線程,只要調(diào)用了子線程的join方法,,那么主線程就會(huì)被子線程所阻塞,,直到子線程執(zhí)行完畢再輪到主線程執(zhí)行,。其結(jié)果就是所有子線程執(zhí)行完畢,,才打印 End Main threading。只需要修改上面的main函數(shù)
def main(): print 'Start main threading' threads = [MyThread() for i in range(3)] for t in threads: t.start() # 一次讓新創(chuàng)建的線程執(zhí)行 join for t in threads: t.join() print 'End Main threading'
輸入如下:
Start main threading thread Thread-1, @number: 0 thread Thread-2, @number: 0 thread Thread-3, @number: 0 thread Thread-2, @number: 1 .... thread Thread-3, @number: 4 End Main threading Process finished with exit code 0
所有子線程結(jié)束了才會(huì)執(zhí)行也行print 'End Main threading',。有人會(huì)這么想,,如果在 t.start()之后join會(huì)怎么樣,?結(jié)果也能阻塞主線程,,但是每個(gè)線程都是依次執(zhí)行,變得有順序了。其實(shí)join很好理解,,就字面上的意思就是子線程 “加入”(join)主線程嘛,。在CPU執(zhí)行時(shí)間片段上“等于”主線程的一部分。在start之后join,,也就是每個(gè)子線程由被后來新建的子線程給阻塞了,,因此線程之間變得有順序了。
借用moxie的總結(jié):
1 join方法的作用是阻塞主進(jìn)程(擋住,,無法執(zhí)行join以后的語句),,專注執(zhí)行多線程。
2 多線程多join的情況下,,依次執(zhí)行各線程的join方法,,前頭一個(gè)結(jié)束了才能執(zhí)行后面一個(gè)。
3 無參數(shù),,則等待到該線程結(jié)束,,才開始執(zhí)行下一個(gè)線程的join。
4 設(shè)置參數(shù)后,,則等待該線程這么長時(shí)間就不管它了(而該線程并沒有結(jié)束),。不管的意思就是可以執(zhí)行后面的主進(jìn)程了
線程同步與互斥鎖
線程之所以比進(jìn)程輕量,其中一個(gè)原因就是他們共享內(nèi)存,。也就是各個(gè)線程可以平等的訪問內(nèi)存的數(shù)據(jù),,如果在短時(shí)間“同時(shí)并行”讀取修改內(nèi)存的數(shù)據(jù),很可能造成數(shù)據(jù)不同步,。例如下面的例子:
count = 0 class MyThread(threading.Thread): def run(self): global count time.sleep(1) for i in range(100): count += 1 print 'thread {} add 1, count is {}'.format(self.name, count) def main(): print 'Start main threading' for i in range(10): MyThread().start() print 'End Main threading'
輸出結(jié)果如下,,十個(gè)線程,,每個(gè)線程增加100,運(yùn)算結(jié)果應(yīng)該是1000:
Start main threading End Main threading thread Thread-6 add 1, count is 161thread Thread-1 add 1, count is 433 thread Thread-7 add 1, count is 482 thread Thread-2 add 1, count is 100 thread Thread-9 add 1, count is 125 thread Thread-8 add 1, count is 335 thread Thread-5 add 1, count is 533thread Thread-3 add 1, count is 533 thread Thread-10 add 1, count is 261 thread Thread-4 add 1, count is 308
為了避免線程不同步造成是數(shù)據(jù)不同步,,可以對資源進(jìn)行加鎖,。也就是訪問資源的線程需要獲得鎖,才能訪問,。threading模塊正好提供了一個(gè)Lock功能,,修改代碼如下:
# 創(chuàng)建鎖 mutex = threading.Lock() class MyThread(threading.Thread): def run(self): global count time.sleep(1) # 獲取鎖,修改資源 if mutex.acquire(): for i in range(100): count += 1 print 'thread {} add 1, count is {}'.format(self.name, count) # 釋放鎖 mutex.release()
死鎖
有鎖就可以方便處理線程同步問題,,可是多線程的復(fù)雜度和難以調(diào)試的根源也來自于線程的鎖,。利用不當(dāng),甚至?xí)砀鄦栴},。比如死鎖就是需要避免的問題,。
mutex_a = threading.Lock() mutex_b = threading.Lock() class MyThread(threading.Thread): def task_a(self): if mutex_a.acquire(): print 'thread {} get mutex a '.format(self.name) time.sleep(1) if mutex_b.acquire(): print 'thread {} get mutex b '.format(self.name) mutex_b.release() mutex_a.release() def task_b(self): if mutex_b.acquire(): print 'thread {} get mutex a '.format(self.name) time.sleep(1) if mutex_a.acquire(): print 'thread {} get mutex b '.format(self.name) mutex_a.release() mutex_b.release() def run(self): self.task_a() self.task_b() def main(): print 'Start main threading' threads = [MyThread() for i in range(2)] for t in threads: t.start() print 'End Main threading'
線程需要執(zhí)行兩個(gè)任務(wù),兩個(gè)任務(wù)都需要獲取鎖,,然而兩個(gè)任務(wù)先得到鎖后,,就需要等另外鎖釋放。
可重入鎖
為了支持在同一線程中多次請求同一資源,,python提供了可重入鎖(RLock),。RLock內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),,從而使得資源可以被多次require,。直到一個(gè)線程所有的acquire都被release,其他的線程才能獲得資源,。
mutex = threading.RLock() class MyThread(threading.Thread): def run(self): if mutex.acquire(1): print 'thread {} get mutex'.format(self.name) time.sleep(1) mutex.acquire() mutex.release() mutex.release() def main(): print 'Start main threading' threads = [MyThread() for i in range(2)] for t in threads: t.start() print 'End Main threading'
條件變量
實(shí)用鎖可以達(dá)到線程同步,,前面的互斥鎖就是這種機(jī)制。更復(fù)雜的環(huán)境,,需要針對鎖進(jìn)行一些條件判斷,。Python提供了Condition對象。它除了具有acquire和release方法之外,,還提供了wait和notify方法,。線程首先acquire一個(gè)條件變量鎖。如果條件不足,,則該線程wait,,如果滿足就執(zhí)行線程,甚至可以notify其他線程,。其他處于wait狀態(tài)的線程接到通知后會(huì)重新判斷條件,。
條件變量可以看成不同的線程先后acquire獲得鎖,如果不滿足條件,,可以理解為被扔到一個(gè)(Lock或RLock)的waiting池,。直達(dá)其他線程notify之后再重新判斷條件,。該模式常用于生成消費(fèi)者模式:
queue = [] con = threading.Condition() class Producer(threading.Thread): def run(self): while True: if con.acquire(): if len(queue) > 100: con.wait() else: elem = random.randrange(100) queue.append(elem) print 'Producer a elem {}, Now size is {}'.format(elem, len(queue)) time.sleep(random.random()) con.notify() con.release() class Consumer(threading.Thread): def run(self): while True: if con.acquire(): if len(queue) <>0: con.wait() else: elem = queue.pop() print 'Consumer a elem {}. Now size is {}'.format(elem, len(queue)) time.sleep(random.random()) con.notify() con.release() def main(): for i in range(3): Producer().start() for i in range(2): Consumer().start()
上述就是一個(gè)簡單的生產(chǎn)者消費(fèi)模型,先看生產(chǎn)者,,生產(chǎn)者條件變量鎖之后就檢查條件,如果不符合條件則wait,,wait的時(shí)候會(huì)釋放鎖,。如果條件符合,則往隊(duì)列添加元素,,然后會(huì)notify其他線程,。注意生產(chǎn)者調(diào)用了condition的notify()方法后,消費(fèi)者被喚醒,,但喚醒不意味著它可以開始運(yùn)行,,notify()并不釋放lock,調(diào)用notify()后,,lock依然被生產(chǎn)者所持有,。生產(chǎn)者通過con.release()顯式釋放lock。消費(fèi)者再次開始運(yùn)行,,獲得條件鎖然后判斷條件執(zhí)行,。
隊(duì)列
生產(chǎn)消費(fèi)者模型主要是對隊(duì)列進(jìn)程操作,貼心的Python為我們實(shí)現(xiàn)了一個(gè)隊(duì)列結(jié)構(gòu),,隊(duì)列內(nèi)部實(shí)現(xiàn)了鎖的相關(guān)設(shè)置,。可以用隊(duì)列重寫生產(chǎn)消費(fèi)者模型,。
import Queue queue = Queue.Queue(10) class Producer(threading.Thread): def run(self): while True: elem = random.randrange(100) queue.put(elem) print 'Producer a elem {}, Now size is {}'.format(elem, queue.qsize()) time.sleep(random.random()) class Consumer(threading.Thread): def run(self): while True: elem = queue.get() queue.task_done() print 'Consumer a elem {}. Now size is {}'.format(elem, queue.qsize()) time.sleep(random.random()) def main(): for i in range(3): Producer().start() for i in range(2): Consumer().start()
queue內(nèi)部實(shí)現(xiàn)了相關(guān)的鎖,,如果queue的為空,則get元素的時(shí)候會(huì)被阻塞,,知道隊(duì)列里面被其他線程寫入數(shù)據(jù),。同理,當(dāng)寫入數(shù)據(jù)的時(shí)候,,如果元素個(gè)數(shù)大于隊(duì)列的長度,,也會(huì)被阻塞。也就是在 put 或 get的時(shí)候都會(huì)獲得Lock,。
線程通信
線程可以讀取共享的內(nèi)存,,通過內(nèi)存做一些數(shù)據(jù)處理。這就是線程通信的一種,,python還提供了更加高級的線程通信接口,。Event對象可以用來進(jìn)行線程通信,調(diào)用event對象的wait方法,,線程則會(huì)阻塞等待,,直到別的線程set之后,,才會(huì)被喚醒。
class MyThread(threading.Thread): def __init__(self, event): super(MyThread, self).__init__() self.event = event def run(self): print 'thread {} is ready '.format(self.name) self.event.wait() print 'thread {} run'.format(self.name) signal = threading.Event() def main(): start = time.time() for i in range(3): t = MyThread(signal) t.start() time.sleep(3) print 'after {}s'.format(time.time() - start) signal.set()
上面的例子創(chuàng)建了3個(gè)線程,,調(diào)用線程之后,,線程將會(huì)被阻塞,sleep 3秒后,,才會(huì)被喚醒執(zhí)行,,大概輸出如下:
thread Thread-1 is ready thread Thread-2 is ready thread Thread-3 is ready after 3.00441598892s thread Thread-2 run thread Thread-3 run thread Thread-1 run
后臺線程
默認(rèn)情況下,主線程退出之后,,即使子線程沒有join,。那么主線程結(jié)束后,子線程也依然會(huì)繼續(xù)執(zhí)行,。如果希望主線程退出后,,其子線程也退出而不再執(zhí)行,則需要設(shè)置子線程為后臺線程,。python提供了seDeamon方法:
class MyThread(threading.Thread): def run(self): wait_time = random.randrange(1, 10) print 'thread {} will wait {}s'.format(self.name, wait_time) time.sleep(wait_time) print 'thread {} finished'.format(self.name) def main(): print 'Start main threading' for i in range(5): t = MyThread() t.setDaemon(True) t.start() print 'End Main threading'
輸出結(jié)果如下:
Start main threading thread Thread-1 will wait 3s thread Thread-2 will wait 6s thread Thread-3 will wait 4s thread Thread-4 will wait 6s thread Thread-5 will wait 2sEnd Main threading
每個(gè)線程都應(yīng)該等待sleep幾秒,,可是主線程很快就執(zhí)行完了,子線程因?yàn)樵O(shè)置了后臺線程,,所以也跟著主線程退出了,。
關(guān)于Python多線程的介紹暫且就這些,多線程用于并發(fā)任務(wù),。對于并發(fā)模型,,Python還有比線程更好的方法。同樣設(shè)計(jì)任務(wù)的時(shí)候,,也需要考慮是計(jì)算密集型還是IO密集型,。針對不同的場景,設(shè)計(jì)不同的程式系統(tǒng),。
文中的代碼 learn-threading(https://github.com/rsj217/flask--scaffold/tree/master/learn-threading)
參考資料:
1 http://www.cnblogs.com/holbrook/tag/%E5%A4%9A%E7%BA%BF%E7%A8%8B/ 2 http:///python-thread-gil-and-ctypes.html
|