久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Python 多線程

 River_LaLaLa 2016-08-21


來源:伯樂在線專欄作者-人世間 

鏈接:http://python./85177/


線程和進(jìn)程


計(jì)算機(jī),用于計(jì)算的機(jī)器,。計(jì)算機(jī)的核心是CPU,,在現(xiàn)在多核心的電腦很常見了,。為了充分利用cpu核心做計(jì)算任務(wù),程序?qū)崿F(xiàn)了多線程模型,。通過多線程實(shí)現(xiàn)多任務(wù)的并行執(zhí)行,。


現(xiàn)在的操作系統(tǒng)多是多任務(wù)操作系統(tǒng)。每個(gè)應(yīng)用程序都有一個(gè)自己的進(jìn)程,。操作系統(tǒng)會(huì)為這些進(jìn)程分配一些執(zhí)行資源,,例如內(nèi)存空間等。在進(jìn)程中,,又可以創(chuàng)建一些線程,,他們共享這些內(nèi)存空間,并由操作系統(tǒng)調(diào)用,,以便并行計(jì)算,。


線程狀態(tài)


創(chuàng)建線程之后,線程并不是始終保持一個(gè)狀態(tài),。其狀態(tài)大概如下:


  • New 創(chuàng)建,。

  • Runnable 就緒。等待調(diào)度

  • Running 運(yùn)行,。

  • Blocked 阻塞,。阻塞可能在 Wait Locked Sleeping

  • Dead 消亡


這些狀態(tài)之間是可以相互轉(zhuǎn)換的,一圖勝千顏色:



threading_state

(圖片引用 內(nèi)心求法博客)


線程中執(zhí)行到阻塞,,可能有3種情況:


  • 同步:線程中獲取同步鎖,,但是資源已經(jīng)被其他線程鎖定時(shí),進(jìn)入Locked狀態(tài),,直到該資源可獲?。ǐ@取的順序由Lock隊(duì)列控制)

  • 睡眠:線程運(yùn)行sleep()或join()方法后,線程進(jìn)入Sleeping狀態(tài),。區(qū)別在于sleep等待固定的時(shí)間,,而join是等待子線程執(zhí)行完。當(dāng)然join也可以指定一個(gè)“超時(shí)時(shí)間”,。從語義上來說,,如果兩個(gè)線程a,b, 在a中調(diào)用b.join(),相當(dāng)于合并(join)成一個(gè)線程,。最常見的情況是在主線程中join所有的子線程,。

  • 等待:線程中執(zhí)行wait()方法后,線程進(jìn)入Waiting狀態(tài),,等待其他線程的通知(notify),。


線程類型


線程有著不同的狀態(tài),也有不同的類型,。大致可分為:


  • 主線程

  • 子線程

  • 守護(hù)線程(后臺線程)

  • 前臺線程


Python線程與GIL


相比進(jìn)程,,線程更加輕量,,可以實(shí)現(xiàn)并發(fā)??墒窃趐ython的世界里,,對于線程,就不得不說一句GIL(全局解釋器鎖),。GIL的存在讓python的多線程多少有點(diǎn)雞肋了,。Cpython的線程是操作系統(tǒng)原生的線程在解釋器解釋執(zhí)行任何Python代碼時(shí),都需要先獲得這把鎖才行,,在遇到 I/O 操作時(shí)會(huì)釋放這把鎖,。因?yàn)閜ython的進(jìn)程做為一個(gè)整體,解釋器進(jìn)程內(nèi)只有一個(gè)線程在執(zhí)行,,其它的線程都處于等待狀態(tài)等著GIL的釋放,。


關(guān)于GIL可以有更多的趣事,一時(shí)半會(huì)都說不完,??傊畃ython想用多線程并發(fā),效果可能還不如單線程(線程切換耗時(shí)間),。想要利用多核,,可以考慮使用多進(jìn)程,。


線程的創(chuàng)建


雖然python線程比較雞肋,可是也并發(fā)一無是處,。多了解還是有理由對并發(fā)模型的理解,。


Python提供兩個(gè)模塊進(jìn)行多線程的操作,分別是thread和threading,,前者是比較低級的模塊,,用于更底層的操作,,一般應(yīng)用級別的開發(fā)不常用。后者則封裝了更多高級的接口,,類似java的多線程風(fēng)格,,提供run方法和start調(diào)用。


import time

import threading

 

class MyThread(threading.Thread):

    def run(self):

        for i in range(5):

            print 'thread {}, @number: {}'.format(self.name, i)

            time.sleep(1)

 

def main():

    print 'Start main threading'

    # 創(chuàng)建三個(gè)線程

    threads = [MyThread() for i in range(3)]

    # 啟動(dòng)三個(gè)線程

    for t in threads:

        t.start()

 

    print 'End Main threading'

 

 

if __name__ == '__main__':

    main()


輸入如下:(不同的環(huán)境不一樣)


Start main threading

thread Thread-1, @number: 0

thread Thread-2, @number: 0

thread Thread-3, @number: 0

End Main threading

thread Thread-1, @number: 1

thread Thread-3, @number: 1

thread Thread-2, @number: 1

thread Thread-3, @number: 2

thread Thread-1, @number: 2

thread Thread-2, @number: 2

thread Thread-2, @number: 3

thread Thread-1, @number: 3

thread Thread-3, @number: 3


每個(gè)線程都依次打印 0 – 3 三個(gè)數(shù)字,可是從輸出的結(jié)果觀察,線程并不是順序的執(zhí)行,而是三個(gè)線程之間相互交替執(zhí)行。此外,我們的主線程執(zhí)行結(jié)束,將會(huì)打印 End Main threading。從輸出結(jié)果可以知道,主線程結(jié)束后,,新建的線程還在運(yùn)行,。


線程合并(join方法)


上述的例子中,主線程結(jié)束了,子線程還在運(yùn)行,。如果需要主線程等待子線程執(zhí)行完畢再退出,,可是使用線程的join方法,。join方法官網(wǎng)文檔大概是


join(timeout)方法將會(huì)等待直到線程結(jié)束。這將阻塞正在調(diào)用的線程,,直到被調(diào)用join()方法的線程結(jié)束,。


主線程或者某個(gè)函數(shù)如果創(chuàng)建了子線程,只要調(diào)用了子線程的join方法,,那么主線程就會(huì)被子線程所阻塞,,直到子線程執(zhí)行完畢再輪到主線程執(zhí)行,。其結(jié)果就是所有子線程執(zhí)行完畢,,才打印 End Main threading。只需要修改上面的main函數(shù)


def main():

    print 'Start main threading'

 

    threads = [MyThread() for i in range(3)]

 

    for t in threads:

        t.start()

 

    # 一次讓新創(chuàng)建的線程執(zhí)行 join

    for t in threads:

        t.join()

 

    print 'End Main threading'


輸入如下:


Start main threading

thread Thread-1, @number: 0

thread Thread-2, @number: 0

thread Thread-3, @number: 0

thread Thread-2, @number: 1

....

thread Thread-3, @number: 4

End Main threading

 

Process finished with exit code 0


所有子線程結(jié)束了才會(huì)執(zhí)行也行print 'End Main threading',。有人會(huì)這么想,,如果在 t.start()之后join會(huì)怎么樣,?結(jié)果也能阻塞主線程,,但是每個(gè)線程都是依次執(zhí)行,變得有順序了。其實(shí)join很好理解,,就字面上的意思就是子線程 “加入”(join)主線程嘛,。在CPU執(zhí)行時(shí)間片段上“等于”主線程的一部分。在start之后join,,也就是每個(gè)子線程由被后來新建的子線程給阻塞了,,因此線程之間變得有順序了。


借用moxie的總結(jié):


1 join方法的作用是阻塞主進(jìn)程(擋住,,無法執(zhí)行join以后的語句),,專注執(zhí)行多線程。


2 多線程多join的情況下,,依次執(zhí)行各線程的join方法,,前頭一個(gè)結(jié)束了才能執(zhí)行后面一個(gè)。


3 無參數(shù),,則等待到該線程結(jié)束,,才開始執(zhí)行下一個(gè)線程的join。


4 設(shè)置參數(shù)后,,則等待該線程這么長時(shí)間就不管它了(而該線程并沒有結(jié)束),。不管的意思就是可以執(zhí)行后面的主進(jìn)程了


線程同步與互斥鎖


線程之所以比進(jìn)程輕量,其中一個(gè)原因就是他們共享內(nèi)存,。也就是各個(gè)線程可以平等的訪問內(nèi)存的數(shù)據(jù),,如果在短時(shí)間“同時(shí)并行”讀取修改內(nèi)存的數(shù)據(jù),很可能造成數(shù)據(jù)不同步,。例如下面的例子:


count = 0

class MyThread(threading.Thread):

    def run(self):

        global count

        time.sleep(1)

        for i in range(100):

            count += 1

        print 'thread {} add 1, count is {}'.format(self.name, count)

 

 

def main():

    print 'Start main threading'

    for i in range(10):

        MyThread().start()

 

    print 'End Main threading'


輸出結(jié)果如下,,十個(gè)線程,,每個(gè)線程增加100,運(yùn)算結(jié)果應(yīng)該是1000:


Start main threading

End Main threading

thread Thread-6 add 1, count is 161thread Thread-1 add 1, count is 433

thread Thread-7 add 1, count is 482

thread Thread-2 add 1, count is 100

thread Thread-9 add 1, count is 125

 

thread Thread-8 add 1, count is 335

thread Thread-5 add 1, count is 533thread Thread-3 add 1, count is 533

thread Thread-10 add 1, count is 261

 

thread Thread-4 add 1, count is 308


為了避免線程不同步造成是數(shù)據(jù)不同步,,可以對資源進(jìn)行加鎖,。也就是訪問資源的線程需要獲得鎖,才能訪問,。threading模塊正好提供了一個(gè)Lock功能,,修改代碼如下:


# 創(chuàng)建鎖

mutex = threading.Lock()

 

class MyThread(threading.Thread):

    def run(self):

        global count

        time.sleep(1)

        # 獲取鎖,修改資源

        if mutex.acquire():

            for i in range(100):

                count += 1

            print 'thread {} add 1, count is {}'.format(self.name, count)

            # 釋放鎖

            mutex.release()


死鎖


有鎖就可以方便處理線程同步問題,,可是多線程的復(fù)雜度和難以調(diào)試的根源也來自于線程的鎖,。利用不當(dāng),甚至?xí)砀鄦栴},。比如死鎖就是需要避免的問題,。


mutex_a = threading.Lock()

mutex_b = threading.Lock()

 

class MyThread(threading.Thread):

 

    def task_a(self):

        if mutex_a.acquire():

            print 'thread {} get mutex a '.format(self.name)

            time.sleep(1)

            if mutex_b.acquire():

                print 'thread {} get mutex b '.format(self.name)

                mutex_b.release()

            mutex_a.release()

 

    def task_b(self):

 

        if mutex_b.acquire():

            print 'thread {} get mutex a '.format(self.name)

            time.sleep(1)

            if mutex_a.acquire():

                print 'thread {} get mutex b '.format(self.name)

                mutex_a.release()

            mutex_b.release()

 

    def run(self):

        self.task_a()

        self.task_b()

 

def main():

    print 'Start main threading'

 

    threads = [MyThread() for i in range(2)]

 

    for t in threads:

        t.start()

 

    print 'End Main threading'


線程需要執(zhí)行兩個(gè)任務(wù),兩個(gè)任務(wù)都需要獲取鎖,,然而兩個(gè)任務(wù)先得到鎖后,,就需要等另外鎖釋放。


可重入鎖


為了支持在同一線程中多次請求同一資源,,python提供了可重入鎖(RLock),。RLock內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),,從而使得資源可以被多次require,。直到一個(gè)線程所有的acquire都被release,其他的線程才能獲得資源,。


mutex = threading.RLock()

 

class MyThread(threading.Thread):

 

    def run(self):

        if mutex.acquire(1):

            print 'thread {} get mutex'.format(self.name)

            time.sleep(1)

            mutex.acquire()

            mutex.release()

            mutex.release()

 

def main():

    print 'Start main threading'

 

    threads = [MyThread() for i in range(2)]

    for t in threads:

        t.start()

 

    print 'End Main threading'


條件變量


實(shí)用鎖可以達(dá)到線程同步,,前面的互斥鎖就是這種機(jī)制。更復(fù)雜的環(huán)境,,需要針對鎖進(jìn)行一些條件判斷,。Python提供了Condition對象。它除了具有acquire和release方法之外,,還提供了wait和notify方法,。線程首先acquire一個(gè)條件變量鎖。如果條件不足,,則該線程wait,,如果滿足就執(zhí)行線程,甚至可以notify其他線程,。其他處于wait狀態(tài)的線程接到通知后會(huì)重新判斷條件,。


條件變量可以看成不同的線程先后acquire獲得鎖,如果不滿足條件,,可以理解為被扔到一個(gè)(Lock或RLock)的waiting池,。直達(dá)其他線程notify之后再重新判斷條件,。該模式常用于生成消費(fèi)者模式:


queue = []

 

con = threading.Condition()

 

class Producer(threading.Thread):

    def run(self):

        while True:

            if con.acquire():

                if len(queue) > 100:

                    con.wait()

                else:

                    elem = random.randrange(100)

                    queue.append(elem)

                    print 'Producer a elem {}, Now size is {}'.format(elem, len(queue))

                    time.sleep(random.random())

                    con.notify()

                con.release()

 

class Consumer(threading.Thread):

    def run(self):

        while True:

            if con.acquire():

                if len(queue) <>0:

                    con.wait()

                else:

                    elem = queue.pop()

                    print 'Consumer a elem {}. Now size is {}'.format(elem, len(queue))

                    time.sleep(random.random())

                    con.notify()

                con.release()

 

def main():

    for i in range(3):

        Producer().start()

 

    for i in range(2):

        Consumer().start()


上述就是一個(gè)簡單的生產(chǎn)者消費(fèi)模型,先看生產(chǎn)者,,生產(chǎn)者條件變量鎖之后就檢查條件,如果不符合條件則wait,,wait的時(shí)候會(huì)釋放鎖,。如果條件符合,則往隊(duì)列添加元素,,然后會(huì)notify其他線程,。注意生產(chǎn)者調(diào)用了condition的notify()方法后,消費(fèi)者被喚醒,,但喚醒不意味著它可以開始運(yùn)行,,notify()并不釋放lock,調(diào)用notify()后,,lock依然被生產(chǎn)者所持有,。生產(chǎn)者通過con.release()顯式釋放lock。消費(fèi)者再次開始運(yùn)行,,獲得條件鎖然后判斷條件執(zhí)行,。


隊(duì)列


生產(chǎn)消費(fèi)者模型主要是對隊(duì)列進(jìn)程操作,貼心的Python為我們實(shí)現(xiàn)了一個(gè)隊(duì)列結(jié)構(gòu),,隊(duì)列內(nèi)部實(shí)現(xiàn)了鎖的相關(guān)設(shè)置,。可以用隊(duì)列重寫生產(chǎn)消費(fèi)者模型,。


import Queue

 

queue = Queue.Queue(10)

 

class Producer(threading.Thread):

 

    def run(self):

        while True:

            elem = random.randrange(100)

            queue.put(elem)

            print 'Producer a elem {}, Now size is {}'.format(elem, queue.qsize())

            time.sleep(random.random())

 

class Consumer(threading.Thread):

 

    def run(self):

        while True:

            elem = queue.get()

            queue.task_done()

            print 'Consumer a elem {}. Now size is {}'.format(elem, queue.qsize())

            time.sleep(random.random())

 

def main():

 

    for i in range(3):

        Producer().start()

 

    for i in range(2):

        Consumer().start()


queue內(nèi)部實(shí)現(xiàn)了相關(guān)的鎖,,如果queue的為空,則get元素的時(shí)候會(huì)被阻塞,,知道隊(duì)列里面被其他線程寫入數(shù)據(jù),。同理,當(dāng)寫入數(shù)據(jù)的時(shí)候,,如果元素個(gè)數(shù)大于隊(duì)列的長度,,也會(huì)被阻塞。也就是在 put 或 get的時(shí)候都會(huì)獲得Lock,。


線程通信


線程可以讀取共享的內(nèi)存,,通過內(nèi)存做一些數(shù)據(jù)處理。這就是線程通信的一種,,python還提供了更加高級的線程通信接口,。Event對象可以用來進(jìn)行線程通信,調(diào)用event對象的wait方法,,線程則會(huì)阻塞等待,,直到別的線程set之后,,才會(huì)被喚醒。


class MyThread(threading.Thread):

    def __init__(self, event):

        super(MyThread, self).__init__()

        self.event = event

 

    def run(self):

        print 'thread {} is ready '.format(self.name)

        self.event.wait()

        print 'thread {} run'.format(self.name)

 

signal = threading.Event()

 

def main():

    start = time.time()

    for i in range(3):

        t = MyThread(signal)

        t.start()

    time.sleep(3)

    print 'after {}s'.format(time.time() - start)

    signal.set()


上面的例子創(chuàng)建了3個(gè)線程,,調(diào)用線程之后,,線程將會(huì)被阻塞,sleep 3秒后,,才會(huì)被喚醒執(zhí)行,,大概輸出如下:


thread Thread-1 is ready

thread Thread-2 is ready

thread Thread-3 is ready

after 3.00441598892s

thread Thread-2 run

thread Thread-3 run

thread Thread-1 run


后臺線程


默認(rèn)情況下,主線程退出之后,,即使子線程沒有join,。那么主線程結(jié)束后,子線程也依然會(huì)繼續(xù)執(zhí)行,。如果希望主線程退出后,,其子線程也退出而不再執(zhí)行,則需要設(shè)置子線程為后臺線程,。python提供了seDeamon方法:


class MyThread(threading.Thread):

 

    def run(self):

        wait_time = random.randrange(1, 10)

        print 'thread {} will wait {}s'.format(self.name, wait_time)

        time.sleep(wait_time)

        print 'thread {} finished'.format(self.name)

 

def main():

    print 'Start main threading'

    for i in range(5):

        t = MyThread()

        t.setDaemon(True)

        t.start()

 

    print 'End Main threading'


輸出結(jié)果如下:


Start main threading

thread Thread-1 will wait 3s

thread Thread-2 will wait 6s

thread Thread-3 will wait 4s

thread Thread-4 will wait 6s

thread Thread-5 will wait 2sEnd Main threading


每個(gè)線程都應(yīng)該等待sleep幾秒,,可是主線程很快就執(zhí)行完了,子線程因?yàn)樵O(shè)置了后臺線程,,所以也跟著主線程退出了,。


關(guān)于Python多線程的介紹暫且就這些,多線程用于并發(fā)任務(wù),。對于并發(fā)模型,,Python還有比線程更好的方法。同樣設(shè)計(jì)任務(wù)的時(shí)候,,也需要考慮是計(jì)算密集型還是IO密集型,。針對不同的場景,設(shè)計(jì)不同的程式系統(tǒng),。


文中的代碼 learn-threading(https://github.com/rsj217/flask--scaffold/tree/master/learn-threading)


參考資料:


1 http://www.cnblogs.com/holbrook/tag/%E5%A4%9A%E7%BA%BF%E7%A8%8B/

2 http:///python-thread-gil-and-ctypes.html


    本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn),。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多