Python學(xué)習(xí)教程(Python學(xué)習(xí)路線):進(jìn)程和線程今天我們使用的計算機(jī)早已進(jìn)入多CPU或多核時代,而我們使用的操作系統(tǒng)都是支持“多任務(wù)”的操作系統(tǒng),,這使得我們可以同時運(yùn)行多個程序,,也可以將一個程序分解為若干個相對獨(dú)立的子任務(wù),讓多個子任務(wù)并發(fā)的執(zhí)行,,從而縮短程序的執(zhí)行時間,,同時也讓用戶獲得更好的體驗。因此在當(dāng)下不管是用什么編程語言進(jìn)行開發(fā),,實現(xiàn)讓程序同時執(zhí)行多個任務(wù)也就是常說的“并發(fā)編程”,,應(yīng)該是程序員必備技能之一。為此,,我們需要先討論兩個概念,,一個叫進(jìn)程,一個叫線程,。 概念進(jìn)程就是操作系統(tǒng)中執(zhí)行的一個程序,,操作系統(tǒng)以進(jìn)程為單位分配存儲空間,每個進(jìn)程都有自己的地址空間,、數(shù)據(jù)棧以及其他用于跟蹤進(jìn)程執(zhí)行的輔助數(shù)據(jù),,操作系統(tǒng)管理所有進(jìn)程的執(zhí)行,為它們合理的分配資源,。進(jìn)程可以通過fork或spawn的方式來創(chuàng)建新的進(jìn)程來執(zhí)行其他的任務(wù),,不過新的進(jìn)程也有自己獨(dú)立的內(nèi)存空間,因此必須通過進(jìn)程間通信機(jī)制(IPC,,Inter-Process Communication)來實現(xiàn)數(shù)據(jù)共享,,具體的方式包括管道、信號,、套接字,、共享內(nèi)存區(qū)等。 一個進(jìn)程還可以擁有多個并發(fā)的執(zhí)行線索,,簡單的說就是擁有多個可以獲得CPU調(diào)度的執(zhí)行單元,,這就是所謂的線程。由于線程在同一個進(jìn)程下,,它們可以共享相同的上下文,,因此相對于進(jìn)程而言,線程間的信息共享和通信更加容易,。當(dāng)然在單核CPU系統(tǒng)中,,真正的并發(fā)是不可能的,,因為在某個時刻能夠獲得CPU的只有唯一的一個線程,多個線程共享了CPU的執(zhí)行時間,。使用多線程實現(xiàn)并發(fā)編程為程序帶來的好處是不言而喻的,,最主要的體現(xiàn)在提升程序的性能和改善用戶體驗,今天我們使用的軟件幾乎都用到了多線程技術(shù),,這一點可以利用系統(tǒng)自帶的進(jìn)程監(jiān)控工具(如macOS中的“活動監(jiān)視器”,、Windows中的“任務(wù)管理器”)來證實,如下圖所示,。 當(dāng)然多線程也并不是沒有壞處,,站在其他進(jìn)程的角度,多線程的程序?qū)ζ渌绦虿⒉挥押?,因為它占用了更多的CPU執(zhí)行時間,,導(dǎo)致其他程序無法獲得足夠的CPU執(zhí)行時間;另一方面,,站在開發(fā)者的角度,,編寫和調(diào)試多線程的程序都對開發(fā)者有較高的要求,對于初學(xué)者來說更加困難,。 Python既支持多進(jìn)程又支持多線程,,因此使用Python實現(xiàn)并發(fā)編程主要有3種方式:多進(jìn)程、多線程,、多進(jìn)程+多線程,。 Python中的多進(jìn)程Unix和Linux操作系統(tǒng)上提供了 下面用一個下載文件的例子來說明使用多進(jìn)程和不使用多進(jìn)程到底有什么差別,先看看下面的代碼。 from random import randintfrom time import time, sleepdef download_task(filename): print('開始下載%s...' % filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費(fèi)了%d秒' % (filename, time_to_download))def main(): start = time() download_task('Python從入門到住院.pdf') download_task('Peking Hot.avi') end = time() print('總共耗費(fèi)了%.2f秒.' % (end - start))if __name__ == '__main__': main() 下面是運(yùn)行程序得到的一次運(yùn)行結(jié)果,。 開始下載Python從入門到住院.pdf... Python從入門到住院.pdf下載完成! 耗費(fèi)了6秒 開始下載Peking Hot.avi... Peking Hot.avi下載完成! 耗費(fèi)了7秒 總共耗費(fèi)了13.01秒. 從上面的例子可以看出,,如果程序中的代碼只能按順序一點點的往下執(zhí)行,那么即使執(zhí)行兩個毫不相關(guān)的下載任務(wù),,也需要先等待一個文件下載完成后才能開始下一個下載任務(wù),,很顯然這并不合理也沒有效率。接下來我們使用多進(jìn)程的方式將兩個下載任務(wù)放到不同的進(jìn)程中,,代碼如下所示,。 from multiprocessing import Processfrom os import getpidfrom random import randintfrom time import time, sleepdef download_task(filename): print('啟動下載進(jìn)程,進(jìn)程號[%d].' % getpid()) print('開始下載%s...' % filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費(fèi)了%d秒' % (filename, time_to_download))def main(): start = time() p1 = Process(target=download_task, args=('Python從入門到住院.pdf', )) p1.start() p2 = Process(target=download_task, args=('Peking Hot.avi', )) p2.start() p1.join() p2.join() end = time() print('總共耗費(fèi)了%.2f秒.' % (end - start))if __name__ == '__main__': main() 在上面的代碼中,,我們通過 啟動下載進(jìn)程,,進(jìn)程號[1530]. 開始下載Python從入門到住院.pdf... 啟動下載進(jìn)程,,進(jìn)程號[1531]. 開始下載Peking Hot.avi... Peking Hot.avi下載完成! 耗費(fèi)了7秒 Python從入門到住院.pdf下載完成! 耗費(fèi)了10秒 總共耗費(fèi)了10.01秒. 我們也可以使用subprocess模塊中的類和函數(shù)來創(chuàng)建和啟動子進(jìn)程,然后通過管道來和子進(jìn)程通信,,這些內(nèi)容我們不在此進(jìn)行講解,,有興趣的讀者可以自己了解這些知識。接下來我們將重點放在如何實現(xiàn)兩個進(jìn)程間的通信,。我們啟動兩個進(jìn)程,,一個輸出Ping,一個輸出Pong,,兩個進(jìn)程輸出的Ping和Pong加起來一共10個,。聽起來很簡單吧,但是如果這樣寫可是錯的哦,。 from multiprocessing import Processfrom time import sleep counter = 0def sub_task(string): global counter while counter < 10: print(string, end='', flush=True) counter += 1sleep(0.01) def main(): Process(target=sub_task, args=('Ping', )).start() Process(target=sub_task, args=('Pong', )).start()if __name__ == '__main__': main() 看起來沒毛病,,但是最后的結(jié)果是Ping和Pong各輸出了10個,Why,?當(dāng)我們在程序中創(chuàng)建進(jìn)程的時候,,子進(jìn)程復(fù)制了父進(jìn)程及其所有的數(shù)據(jù)結(jié)構(gòu),每個子進(jìn)程有自己獨(dú)立的內(nèi)存空間,這也就意味著兩個子進(jìn)程中各有一個 Python中的多線程在Python早期的版本中就引入了thread模塊(現(xiàn)在名為_thread)來實現(xiàn)多線程編程,,然而該模塊過于底層,,而且很多功能都沒有提供,因此目前的多線程開發(fā)我們推薦使用threading模塊,,該模塊對多線程編程提供了更好的面向?qū)ο蟮姆庋b,。我們把剛才下載文件的例子用多線程的方式來實現(xiàn)一遍。 from random import randintfrom threading import Threadfrom time import time, sleepdef download(filename): print('開始下載%s...' % filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費(fèi)了%d秒' % (filename, time_to_download))def main(): start = time() t1 = Thread(target=download, args=('Python從入門到住院.pdf',)) t1.start() t2 = Thread(target=download, args=('Peking Hot.avi',)) t2.start() t1.join() t2.join() end = time() print('總共耗費(fèi)了%.3f秒' % (end - start))if __name__ == '__main__': main() 我們可以直接使用threading模塊的 from random import randintfrom threading import Threadfrom time import time, sleepclass DownloadTask(Thread): def __init__(self, filename): super().__init__() self._filename = filename def run(self): print('開始下載%s...' % self._filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費(fèi)了%d秒' % (self._filename, time_to_download))def main(): start = time() t1 = DownloadTask('Python從入門到住院.pdf') t1.start() t2 = DownloadTask('Peking Hot.avi') t2.start() t1.join() t2.join() end = time() print('總共耗費(fèi)了%.2f秒.' % (end - start))if __name__ == '__main__': main() 因為多個線程可以共享進(jìn)程的內(nèi)存空間,,因此要實現(xiàn)多個線程間的通信相對簡單,,大家能想到的最直接的辦法就是設(shè)置一個全局變量,多個線程共享這個全局變量即可,。但是當(dāng)多個線程共享同一個變量(我們通常稱之為“資源”)的時候,,很有可能產(chǎn)生不可控的結(jié)果從而導(dǎo)致程序失效甚至崩潰。如果一個資源被多個線程競爭使用,,那么我們通常稱之為“臨界資源”,,對“臨界資源”的訪問需要加上保護(hù),否則資源會處于“混亂”的狀態(tài),。下面的例子演示了100個線程向同一個銀行賬戶轉(zhuǎn)賬(轉(zhuǎn)入1元錢)的場景,,在這個例子中,銀行賬戶就是一個臨界資源,,在沒有保護(hù)的情況下我們很有可能會得到錯誤的結(jié)果,。 from time import sleepfrom threading import Threadclass Account(object): def __init__(self): self._balance = 0def deposit(self, money): # 計算存款后的余額new_balance = self._balance + money # 模擬受理存款業(yè)務(wù)需要0.01秒的時間sleep(0.01) # 修改賬戶余額self._balance = new_balance @propertydef balance(self): return self._balanceclass AddMoneyThread(Thread): def __init__(self, account, money): super().__init__() self._account = account self._money = money def run(self): self._account.deposit(self._money)def main(): account = Account() threads = [] # 創(chuàng)建100個存款的線程向同一個賬戶中存錢for _ in range(100): t = AddMoneyThread(account, 1) threads.append(t) t.start() # 等所有存款的線程都執(zhí)行完畢for t in threads: t.join() print('賬戶余額為: ¥%d元' % account.balance)if __name__ == '__main__': main() 運(yùn)行上面的程序,結(jié)果讓人大跌眼鏡,,100個線程分別向賬戶中轉(zhuǎn)入1元錢,,結(jié)果居然遠(yuǎn)遠(yuǎn)小于100元。之所以出現(xiàn)這種情況是因為我們沒有對銀行賬戶這個“臨界資源”加以保護(hù),,多個線程同時向賬戶中存錢時,,會一起執(zhí)行到 from time import sleepfrom threading import Thread, Lockclass Account(object): def __init__(self): self._balance = 0self._lock = Lock() def deposit(self, money): # 先獲取鎖才能執(zhí)行后續(xù)的代碼self._lock.acquire() try: new_balance = self._balance + money sleep(0.01) self._balance = new_balance finally: # 在finally中執(zhí)行釋放鎖的操作保證正常異常鎖都能釋放self._lock.release() @propertydef balance(self): return self._balanceclass AddMoneyThread(Thread): def __init__(self, account, money): super().__init__() self._account = account self._money = money def run(self): self._account.deposit(self._money)def main(): account = Account() threads = [] for _ in range(100): t = AddMoneyThread(account, 1) threads.append(t) t.start() for t in threads: t.join() print('賬戶余額為: ¥%d元' % account.balance)if __name__ == '__main__': main() 比較遺憾的一件事情是Python的多線程并不能發(fā)揮CPU的多核特性,,這一點只要啟動幾個執(zhí)行死循環(huán)的線程就可以得到證實了,。之所以如此,是因為Python的解釋器有一個“全局解釋器鎖”(GIL)的東西,,任何線程執(zhí)行前必須先獲得GIL鎖,然后每執(zhí)行100條字節(jié)碼,,解釋器就自動釋放GIL鎖,,讓別的線程有機(jī)會執(zhí)行,這是一個歷史遺留問題,,但是即便如此,,就如我們之前舉的例子,使用多線程在提升執(zhí)行效率和改善用戶體驗方面仍然是有積極意義的,。 多進(jìn)程還是多線程無論是多進(jìn)程還是多線程,,只要數(shù)量一多,效率肯定上不去,,為什么呢,?我們打個比方,假設(shè)你不幸正在準(zhǔn)備中考,,每天晚上需要做語文,、數(shù)學(xué)、英語,、物理,、化學(xué)這5科的作業(yè),每項作業(yè)耗時1小時,。如果你先花1小時做語文作業(yè),,做完了,,再花1小時做數(shù)學(xué)作業(yè),這樣,,依次全部做完,,一共花5小時,這種方式稱為單任務(wù)模型,。如果你打算切換到多任務(wù)模型,,可以先做1分鐘語文,再切換到數(shù)學(xué)作業(yè),,做1分鐘,,再切換到英語,以此類推,,只要切換速度足夠快,,這種方式就和單核CPU執(zhí)行多任務(wù)是一樣的了,以旁觀者的角度來看,,你就正在同時寫5科作業(yè),。 但是,切換作業(yè)是有代價的,,比如從語文切到數(shù)學(xué),,要先收拾桌子上的語文書本、鋼筆(這叫保存現(xiàn)場),,然后,,打開數(shù)學(xué)課本、找出圓規(guī)直尺(這叫準(zhǔn)備新環(huán)境),,才能開始做數(shù)學(xué)作業(yè),。操作系統(tǒng)在切換進(jìn)程或者線程時也是一樣的,它需要先保存當(dāng)前執(zhí)行的現(xiàn)場環(huán)境(CPU寄存器狀態(tài),、內(nèi)存頁等),,然后,把新任務(wù)的執(zhí)行環(huán)境準(zhǔn)備好(恢復(fù)上次的寄存器狀態(tài),,切換內(nèi)存頁等),,才能開始執(zhí)行。這個切換過程雖然很快,,但是也需要耗費(fèi)時間,。如果有幾千個任務(wù)同時進(jìn)行,操作系統(tǒng)可能就主要忙著切換任務(wù),,根本沒有多少時間去執(zhí)行任務(wù)了,,這種情況最常見的就是硬盤狂響,點窗口無反應(yīng),,系統(tǒng)處于假死狀態(tài),。所以,,多任務(wù)一旦多到一個限度,反而會使得系統(tǒng)性能急劇下降,,最終導(dǎo)致所有任務(wù)都做不好,。 是否采用多任務(wù)的第二個考慮是任務(wù)的類型,可以把任務(wù)分為計算密集型和I/O密集型,。計算密集型任務(wù)的特點是要進(jìn)行大量的計算,,消耗CPU資源,比如對視頻進(jìn)行編碼解碼或者格式轉(zhuǎn)換等等,,這種任務(wù)全靠CPU的運(yùn)算能力,,雖然也可以用多任務(wù)完成,但是任務(wù)越多,,花在任務(wù)切換的時間就越多,,CPU執(zhí)行任務(wù)的效率就越低。計算密集型任務(wù)由于主要消耗CPU資源,,這類任務(wù)用Python這樣的腳本語言去執(zhí)行效率通常很低,,最能勝任這類任務(wù)的是C語言,我們之前提到了Python中有嵌入C/C++代碼的機(jī)制,。 除了計算密集型任務(wù),其他的涉及到網(wǎng)絡(luò),、存儲介質(zhì)I/O的任務(wù)都可以視為I/O密集型任務(wù),,這類任務(wù)的特點是CPU消耗很少,任務(wù)的大部分時間都在等待I/O操作完成(因為I/O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度),。對于I/O密集型任務(wù),如果啟動多任務(wù),,就可以減少I/O等待時間從而讓CPU高效率的運(yùn)轉(zhuǎn),。有一大類的任務(wù)都屬于I/O密集型任務(wù),這其中包括了我們很快會涉及到的網(wǎng)絡(luò)應(yīng)用和Web應(yīng)用,。
單線程+異步I/O現(xiàn)代操作系統(tǒng)對I/O操作的改進(jìn)中最為重要的就是支持異步I/O,。如果充分利用操作系統(tǒng)提供的異步I/O支持,,就可以用單進(jìn)程單線程模型來執(zhí)行多任務(wù),這種全新的模型稱為事件驅(qū)動模型,。Nginx就是支持異步I/O的Web服務(wù)器,,它在單核CPU上采用單進(jìn)程模型就可以高效地支持多任務(wù),。在多核CPU上,,可以運(yùn)行多個進(jìn)程(數(shù)量與CPU核心數(shù)相同),,充分利用多核CPU。用Node.js開發(fā)的服務(wù)器端程序也使用了這種工作模式,,這也是當(dāng)下實現(xiàn)多任務(wù)編程的一種趨勢,。 在Python語言中,單線程+異步I/O的編程模型稱為協(xié)程,,有了協(xié)程的支持,,就可以基于事件驅(qū)動編寫高效的多任務(wù)程序。協(xié)程最大的優(yōu)勢就是極高的執(zhí)行效率,,因為子程序切換不是線程切換,,而是由程序自身控制,因此,,沒有線程切換的開銷,。協(xié)程的第二個優(yōu)勢就是不需要多線程的鎖機(jī)制,因為只有一個線程,,也不存在同時寫變量沖突,,在協(xié)程中控制共享資源不用加鎖,,只需要判斷狀態(tài)就好了,,所以執(zhí)行效率比多線程高很多。如果想要充分利用CPU的多核特性,,最簡單的方法是多進(jìn)程+協(xié)程,,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,,可獲得極高的性能,。關(guān)于這方面的內(nèi)容,我稍后會做一個專題來進(jìn)行講解,。 應(yīng)用案例例子1:將耗時間的任務(wù)放到線程中以獲得更好的用戶體驗,。如下所示的界面中,有“下載”和“關(guān)于”兩個按鈕,,用休眠的方式模擬點擊“下載”按鈕會聯(lián)網(wǎng)下載文件需要耗費(fèi)10秒的時間,,如果不使用“多線程”,我們會發(fā)現(xiàn),,當(dāng)點擊“下載”按鈕后整個程序的其他部分都被這個耗時間的任務(wù)阻塞而無法執(zhí)行了,,這顯然是非常糟糕的用戶體驗,,代碼如下所示。 import timeimport tkinterimport tkinter.messageboxdef download(): # 模擬下載任務(wù)需要花費(fèi)10秒鐘時間time.sleep(10) tkinter.messagebox.showinfo('提示', '下載完成!')def show_about(): tkinter.messagebox.showinfo('關(guān)于', '作者: 駱昊(v1.0)')def main(): top = tkinter.Tk() top.title('單線程') top.geometry('200x150') top.wm_attributes('-topmost', True) panel = tkinter.Frame(top) button1 = tkinter.Button(panel, text='下載', command=download) button1.pack(side='left') button2 = tkinter.Button(panel, text='關(guān)于', command=show_about) button2.pack(side='right') panel.pack(side='bottom') tkinter.mainloop()if __name__ == '__main__': main() 如果使用多線程將耗時間的任務(wù)放到一個獨(dú)立的線程中執(zhí)行,,這樣就不會因為執(zhí)行耗時間的任務(wù)而阻塞了主線程,,修改后的代碼如下所示。 import timeimport tkinterimport tkinter.messageboxfrom threading import Threaddef main(): class DownloadTaskHandler(Thread): def run(self): time.sleep(10) tkinter.messagebox.showinfo('提示', '下載完成!') # 啟用下載按鈕button1.config(state=tkinter.NORMAL) def download(): # 禁用下載按鈕button1.config(state=tkinter.DISABLED) # 通過daemon參數(shù)將線程設(shè)置為守護(hù)線程(主程序退出就不再保留執(zhí)行)# 在線程中處理耗時間的下載任務(wù)DownloadTaskHandler(daemon=True).start() def show_about(): tkinter.messagebox.showinfo('關(guān)于', '作者: 駱昊(v1.0)') top = tkinter.Tk() top.title('單線程') top.geometry('200x150') top.wm_attributes('-topmost', 1) panel = tkinter.Frame(top) button1 = tkinter.Button(panel, text='下載', command=download) button1.pack(side='left') button2 = tkinter.Button(panel, text='關(guān)于', command=show_about) button2.pack(side='right') panel.pack(side='bottom') tkinter.mainloop()if __name__ == '__main__': main() 例子2:使用多進(jìn)程對復(fù)雜任務(wù)進(jìn)行“分而治之”,。我們來完成1~100000000求和的計算密集型任務(wù),,這個問題本身非常簡單,有點循環(huán)的知識就能解決,,代碼如下所示,。 from time import timedef main(): total = 0number_list = [x for x in range(1, 100000001)] start = time() for number in number_list: total += number print(total) end = time() print('Execution time: %.3fs' % (end - start))if __name__ == '__main__': main() 在上面的代碼中,我故意先去創(chuàng)建了一個列表容器然后填入了100000000個數(shù),,這一步其實是比較耗時間的,,所以為了公平起見,當(dāng)我們將這個任務(wù)分解到8個進(jìn)程中去執(zhí)行的時候,,我們暫時也不考慮列表切片操作花費(fèi)的時間,,只是把做運(yùn)算和合并運(yùn)算結(jié)果的時間統(tǒng)計出來,代碼如下所示,。 from multiprocessing import Process, Queuefrom random import randintfrom time import timedef task_handler(curr_list, result_queue): total = 0for number in curr_list: total += number result_queue.put(total)def main(): processes = [] number_list = [x for x in range(1, 100000001)] result_queue = Queue() index = 0# 啟動8個進(jìn)程將數(shù)據(jù)切片后進(jìn)行運(yùn)算for _ in range(8): p = Process(target=task_handler, args=(number_list[index:index + 12500000], result_queue)) index += 12500000processes.append(p) p.start() # 開始記錄所有進(jìn)程執(zhí)行完成花費(fèi)的時間start = time() for p in processes: p.join() # 合并執(zhí)行結(jié)果total = 0while not result_queue.empty(): total += result_queue.get() print(total) end = time() print('Execution time: ', (end - start), 's', sep='')if __name__ == '__main__': main() 比較兩段代碼的執(zhí)行結(jié)果(在我目前使用的MacBook上,,上面的代碼需要大概6秒左右的時間,而下面的代碼只需要不到1秒的時間,,再強(qiáng)調(diào)一次我們只是比較了運(yùn)算的時間,,不考慮列表創(chuàng)建及切片操作花費(fèi)的時間),,使用多進(jìn)程后由于獲得了更多的CPU執(zhí)行時間以及更好的利用了CPU的多核特性,,明顯的減少了程序的執(zhí)行時間,而且計算量越大效果越明顯,。當(dāng)然,,如果愿意還可以將多個進(jìn)程部署在不同的計算機(jī)上,,做成分布式進(jìn)程,具體的做法就是通過multiprocessing.managers模塊中提供的管理器將 |
|