Python多線程學(xué)習(xí)資料Python中使用線程有兩種方式:函數(shù)或者用類來包裝線程對象
- 一、Python中的線程使用:
Python中使用線程有兩種方式:函數(shù)或者用類來包裝線程對象。 1,、 函數(shù)式:調(diào)用thread模塊中的start_new_thread()函數(shù)來產(chǎn)生新線程。如下例: 復(fù)制代碼 代碼如下:
import time import thread def timer(no, interval): cnt = 0 while cnt<10: print 'Thread:(%d) Time:%s\n'%(no, time.ctime()) time.sleep(interval) cnt+=1 thread.exit_thread() def test(): #Use thread.start_new_thread() to create 2 new threads thread.start_new_thread(timer, (1,1)) thread.start_new_thread(timer, (2,2)) if __name__=='__main__': test() 上面的例子定義了一個(gè)線程函數(shù)timer,它打印出10條時(shí)間記錄后退出,,每次打印的間隔由interval參數(shù)決定,。thread.start_new_thread(function, args[, kwargs])的第一個(gè)參數(shù)是線程函數(shù)(本例中的timer方法),第二個(gè)參數(shù)是傳遞給線程函數(shù)的參數(shù),,它必須是tuple類型,,kwargs是可選參數(shù)。 線程的結(jié)束可以等待線程自然結(jié)束,,也可以在線程函數(shù)中調(diào)用thread.exit()或thread.exit_thread()方法,。 2、 創(chuàng)建threading.Thread的子類來包裝一個(gè)線程對象,,如下例: 復(fù)制代碼 代碼如下:
import threading import time class timer(threading.Thread): #The timer class is derived from the class threading.Thread def __init__(self, num, interval): threading.Thread.__init__(self) self.thread_num = num self.interval = interval self.thread_stop = False def run(self): #Overwrite run() method, put what you want the thread do here while not self.thread_stop: print 'Thread Object(%d), Time:%s\n' %(self.thread_num, time.ctime()) time.sleep(self.interval) def stop(self): self.thread_stop = True def test(): thread1 = timer(1, 1) thread2 = timer(2, 2) thread1.start() thread2.start() time.sleep(10) thread1.stop() thread2.stop() return if __name__ == '__main__': test() 就我個(gè)人而言,,比較喜歡第二種方式,即創(chuàng)建自己的線程類,,必要時(shí)重寫threading.Thread類的方法,,線程的控制可以由自己定制。 threading.Thread類的使用: 1,,在自己的線程類的__init__里調(diào)用threading.Thread.__init__(self, name = threadname) Threadname為線程的名字 2,, run(),通常需要重寫,,編寫代碼實(shí)現(xiàn)做需要的功能,。 3,getName(),獲得線程對象名稱 4,,setName(),,設(shè)置線程對象名稱 5,start(),,啟動線程 6,,jion([timeout]),等待另一線程結(jié)束后再運(yùn)行,。 7,,setDaemon(bool),設(shè)置子線程是否隨主線程一起結(jié)束,,必須在start()之前調(diào)用,。默認(rèn)為False。 8,,isDaemon(),,判斷線程是否隨主線程一起結(jié)束。 9,,isAlive(),,檢查線程是否在運(yùn)行中。 此外threading模塊本身也提供了很多方法和其他的類,,可以幫助我們更好的使用和管理線程,。可以參看http://www./doc/2.5.2/lib/module-threading.html,。 假設(shè)兩個(gè)線程對象t1和t2都要對num=0進(jìn)行增1運(yùn)算,,t1和t2都各對num修改10次,num的最終的結(jié)果應(yīng)該為20,。但是由于是多線程訪問,,有可能出現(xiàn)下面情況:在num=0時(shí),t1取得num=0,。系統(tǒng)此時(shí)把t1調(diào)度為”sleeping”狀態(tài),,把t2轉(zhuǎn)換為”running”狀態(tài),t2頁獲得num=0,。然后t2對得到的值進(jìn)行加1并賦給num,,使得num=1。然后系統(tǒng)又把t2調(diào)度為”sleeping”,,把t1轉(zhuǎn)為”running”,。線程t1又把它之前得到的0加1后賦值給num。這樣,,明明t1和t2都完成了1次加1工作,但結(jié)果仍然是num=1。 上面的case描述了多線程情況下最常見的問題之一:數(shù)據(jù)共享,。當(dāng)多個(gè)線程都要去修改某一個(gè)共享數(shù)據(jù)的時(shí)候,,我們需要對數(shù)據(jù)訪問進(jìn)行同步。 1,、 簡單的同步 最簡單的同步機(jī)制就是“鎖”,。鎖對象由threading.RLock類創(chuàng)建。線程可以使用鎖的acquire()方法獲得鎖,,這樣鎖就進(jìn)入“l(fā)ocked”狀態(tài),。每次只有一個(gè)線程可以獲得鎖。如果當(dāng)另一個(gè)線程試圖獲得這個(gè)鎖的時(shí)候,,就會被系統(tǒng)變?yōu)椤癰locked”狀態(tài),,直到那個(gè)擁有鎖的線程調(diào)用鎖的release()方法來釋放鎖,這樣鎖就會進(jìn)入“unlocked”狀態(tài),?!癰locked”狀態(tài)的線程就會收到一個(gè)通知,并有權(quán)利獲得鎖,。如果多個(gè)線程處于“blocked”狀態(tài),,所有線程都會先解除“blocked”狀態(tài),然后系統(tǒng)選擇一個(gè)線程來獲得鎖,,其他的線程繼續(xù)沉默(“blocked”),。 Python中的thread模塊和Lock對象是Python提供的低級線程控制工具,使用起來非常簡單,。如下例所示: 復(fù)制代碼 代碼如下:
import thread import time mylock = thread.allocate_lock() #Allocate a lock num=0 #Shared resource def add_num(name): global num while True: mylock.acquire() #Get the lock # Do something to the shared resource print 'Thread %s locked! num=%s'%(name,str(num)) if num >= 5: print 'Thread %s released! num=%s'%(name,str(num)) mylock.release() thread.exit_thread() num+=1 print 'Thread %s released! num=%s'%(name,str(num)) mylock.release() #Release the lock. def test(): thread.start_new_thread(add_num, ('A',)) thread.start_new_thread(add_num, ('B',)) if __name__== '__main__': test() Python 在thread的基礎(chǔ)上還提供了一個(gè)高級的線程控制庫,就是之前提到過的threading,。Python的threading module是在建立在thread module基礎(chǔ)之上的一個(gè)module,,在threading module中,暴露了許多thread module中的屬性,。在thread module中,,python提供了用戶級的線程同步工具“Lock”對象。而在threading module中,,python又提供了Lock對象的變種: RLock對象,。RLock對象內(nèi)部維護(hù)著一個(gè)Lock對象,它是一種可重入的對象,。對于Lock對象而言,,如果一個(gè)線程連續(xù)兩次進(jìn)行acquire操作,那么由于第一次acquire之后沒有release,,第二次acquire將掛起線程,。這會導(dǎo)致Lock對象永遠(yuǎn)不會release,,使得線程死鎖。RLock對象允許一個(gè)線程多次對其進(jìn)行acquire操作,,因?yàn)樵谄鋬?nèi)部通過一個(gè)counter變量維護(hù)著線程acquire的次數(shù),。而且每一次的acquire操作必須有一個(gè)release操作與之對應(yīng),在所有的release操作完成之后,,別的線程才能申請?jiān)揜Lock對象,。 下面來看看如何使用threading的RLock對象實(shí)現(xiàn)同步。 復(fù)制代碼 代碼如下:
import threading mylock = threading.RLock() num=0 class myThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self) self.t_name = name def run(self): global num while True: mylock.acquire() print '\nThread(%s) locked, Number: %d'%(self.t_name, num) if num>=4: mylock.release() print '\nThread(%s) released, Number: %d'%(self.t_name, num) break num+=1 print '\nThread(%s) released, Number: %d'%(self.t_name, num) mylock.release() def test(): thread1 = myThread('A') thread2 = myThread('B') thread1.start() thread2.start() if __name__== '__main__': test() 我們把修改共享數(shù)據(jù)的代碼成為“臨界區(qū)”,。必須將所有“臨界區(qū)”都封閉在同一個(gè)鎖對象的acquire和release之間。 2,、 條件同步 鎖只能提供最基本的同步,。假如只在發(fā)生某些事件時(shí)才訪問一個(gè)“臨界區(qū)”,這時(shí)需要使用條件變量Condition,。 Condition對象是對Lock對象的包裝,,在創(chuàng)建Condition對象時(shí),其構(gòu)造函數(shù)需要一個(gè)Lock對象作為參數(shù),,如果沒有這個(gè)Lock對象參數(shù),,Condition將在內(nèi)部自行創(chuàng)建一個(gè)Rlock對象。在Condition對象上,,當(dāng)然也可以調(diào)用acquire和release操作,,因?yàn)閮?nèi)部的Lock對象本身就支持這些操作。但是Condition的價(jià)值在于其提供的wait和notify的語義,。 條件變量是如何工作的呢,?首先一個(gè)線程成功獲得一個(gè)條件變量后,調(diào)用此條件變量的wait()方法會導(dǎo)致這個(gè)線程釋放這個(gè)鎖,,并進(jìn)入“blocked”狀態(tài),,直到另一個(gè)線程調(diào)用同一個(gè)條件變量的notify()方法來喚醒那個(gè)進(jìn)入“blocked”狀態(tài)的線程。如果調(diào)用這個(gè)條件變量的notifyAll()方法的話就會喚醒所有的在等待的線程,。 如果程序或者線程永遠(yuǎn)處于“blocked”狀態(tài)的話,,就會發(fā)生死鎖。所以如果使用了鎖,、條件變量等同步機(jī)制的話,,一定要注意仔細(xì)檢查,防止死鎖情況的發(fā)生,。對于可能產(chǎn)生異常的臨界區(qū)要使用異常處理機(jī)制中的finally子句來保證釋放鎖,。等待一個(gè)條件變量的線程必須用notify()方法顯式的喚醒,否則就永遠(yuǎn)沉默,。保證每一個(gè)wait()方法調(diào)用都有一個(gè)相對應(yīng)的notify()調(diào)用,,當(dāng)然也可以調(diào)用notifyAll()方法以防萬一,。 生產(chǎn)者與消費(fèi)者問題是典型的同步問題。這里簡單介紹兩種不同的實(shí)現(xiàn)方法,。 1,, 條件變量 復(fù)制代碼 代碼如下:
import threading import time class Producer(threading.Thread): def __init__(self, t_name): threading.Thread.__init__(self, name=t_name) def run(self): global x con.acquire() if x > 0: con.wait() else: for i in range(5): x=x+1 print "producing..." + str(x) con.notify() print x con.release() class Consumer(threading.Thread): def __init__(self, t_name): threading.Thread.__init__(self, name=t_name) def run(self): global x con.acquire() if x == 0: print 'consumer wait1' con.wait() else: for i in range(5): x=x-1 print "consuming..." + str(x) con.notify() print x con.release() con = threading.Condition() x=0 print 'start consumer' c=Consumer('consumer') print 'start producer' p=Producer('producer') p.start() c.start() p.join() c.join() print x 上面的例子中,在初始狀態(tài)下,,Consumer處于wait狀態(tài),,Producer連續(xù)生產(chǎn)(對x執(zhí)行增1操作)5次后,notify正在等待的Consumer,。Consumer被喚醒開始消費(fèi)(對x執(zhí)行減1操作) 2,, 同步隊(duì)列 Python中的Queue對象也提供了對線程同步的支持。使用Queue對象可以實(shí)現(xiàn)多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者形成的FIFO的隊(duì)列,。 生產(chǎn)者將數(shù)據(jù)依次存入隊(duì)列,,消費(fèi)者依次從隊(duì)列中取出數(shù)據(jù)。 復(fù)制代碼 代碼如下:
# producer_consumer_queue from Queue import Queue import random import threading import time #Producer thread class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(5): print "%s: %s is producing %d to the queue!\n" %(time.ctime(), self.getName(), i) self.data.put(i) time.sleep(random.randrange(10)/5) print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): for i in range(5): val = self.data.get() print "%s: %s is consuming. %d in the queue is consumed!\n" %(time.ctime(), self.getName(), val) time.sleep(random.randrange(10)) print "%s: %s finished!" %(time.ctime(), self.getName()) #Main thread def main(): queue = Queue() producer = Producer('Pro.', queue) consumer = Consumer('Con.', queue) producer.start() consumer.start() producer.join() consumer.join() print 'All threads terminate!' if __name__ == '__main__': main() 在上面的例子中,,Producer在隨機(jī)的時(shí)間內(nèi)生產(chǎn)一個(gè)“產(chǎn)品”,,放入隊(duì)列中。Consumer發(fā)現(xiàn)隊(duì)列中有了“產(chǎn)品”,,就去消費(fèi)它,。本例中,由于Producer生產(chǎn)的速度快于Consumer消費(fèi)的速度,,所以往往Producer生產(chǎn)好幾個(gè)“產(chǎn)品”后,,Consumer才消費(fèi)一個(gè)產(chǎn)品。 Queue模塊實(shí)現(xiàn)了一個(gè)支持多producer和多consumer的FIFO隊(duì)列,。當(dāng)共享信息需要安全的在多線程之間交換時(shí),,Queue非常有用。Queue的默認(rèn)長度是無限的,,但是可以設(shè)置其構(gòu)造函數(shù)的maxsize參數(shù)來設(shè)定其長度,。Queue的put方法在隊(duì)尾插入,該方法的原型是: put( item[, block[, timeout]]) 如果可選參數(shù)block為true并且timeout為None(缺省值),,線程被block,,直到隊(duì)列空出一個(gè)數(shù)據(jù)單元,。如果timeout大于0,,在timeout的時(shí)間內(nèi),,仍然沒有可用的數(shù)據(jù)單元,,F(xiàn)ull exception被拋出,。反之,,如果block參數(shù)為false(忽略timeout參數(shù)),item被立即加入到空閑數(shù)據(jù)單元中,,如果沒有空閑數(shù)據(jù)單元,F(xiàn)ull exception被拋出,。 Queue的get方法是從隊(duì)首取數(shù)據(jù),其參數(shù)和put方法一樣,。如果block參數(shù)為true且timeout為None(缺省值),,線程被block,直到隊(duì)列中有數(shù)據(jù),。如果timeout大于0,,在timeout時(shí)間內(nèi),仍然沒有可取數(shù)據(jù),,Empty exception被拋出,。反之,,如果block參數(shù)為false(忽略timeout參數(shù)),,隊(duì)列中的數(shù)據(jù)被立即取出,。如果此時(shí)沒有可取數(shù)據(jù),,Empty exception也會被拋出,。 |
|