Java SE 5.0中引入了任務(wù)執(zhí)行框架,,這是簡(jiǎn)化多線(xiàn)程程序設(shè)計(jì)開(kāi)發(fā)的一大進(jìn)步。使用這個(gè)框架可以方便地管理任務(wù):管理任務(wù)的生命周期以及執(zhí)行策略,。 在這篇文章中,,我們通過(guò)一個(gè)簡(jiǎn)單的例子來(lái)展現(xiàn)這個(gè)框架所帶來(lái)的靈活與簡(jiǎn)單。 基礎(chǔ)執(zhí)行框架引入了Executor接口來(lái)管理任務(wù)的執(zhí)行,。Executor是一個(gè)用來(lái)提交Runnable任務(wù)的接口,。這個(gè)接口將任務(wù)提交與任務(wù)執(zhí)行隔離起來(lái):擁有不同執(zhí)行策略的executor都實(shí)現(xiàn)了同一個(gè)提交接口。改變執(zhí)行策略不會(huì)影響任務(wù)的提交邏輯,。 如果你要提交一個(gè)Runnable對(duì)象來(lái)執(zhí)行,,很簡(jiǎn)單: Executor exec = …;exec.execute(runnable); 線(xiàn)程池如前所述,executor如何去執(zhí)行提交的runnable任務(wù)并沒(méi)有在Executor接口中規(guī)定,,這取決于你所用的executor的具體類(lèi)型,。這個(gè)框架提供了幾種不同的executor,執(zhí)行策略針對(duì)不同的場(chǎng)景而不同,。 你可能會(huì)用到的最常見(jiàn)的executor類(lèi)型就是線(xiàn)程池executor,,也就是ThreadPoolExecutor類(lèi)(及其子類(lèi))的實(shí)例。ThreadPoolExecutor管理著一個(gè)線(xiàn)程池和一個(gè)工作隊(duì)列,,線(xiàn)程池存放著用于執(zhí)行任務(wù)的工作線(xiàn)程,。 你肯定在其他技術(shù)中也了解過(guò)“池”的概念。使用“池”的一個(gè)最大的好處就是減少資源創(chuàng)建的開(kāi)銷(xiāo),,用過(guò)并釋放后,,還可以重用,。另一個(gè)間接的好處是你可以控制使用資源的多少。比如,,你可以調(diào)整線(xiàn)程池的大小達(dá)到你想要的負(fù)載,,而不損害系統(tǒng)的資源。 這個(gè)框架提供了一個(gè)工廠(chǎng)類(lèi),,叫Executors,,來(lái)創(chuàng)建線(xiàn)程池。使用這個(gè)工程類(lèi)你可以創(chuàng)建不同特性的線(xiàn)程池,。盡管底層的實(shí)現(xiàn)常常是一樣的(ThreadPoolExecutor),,但工廠(chǎng)類(lèi)可以使你不必使用復(fù)雜的構(gòu)造函數(shù)就可以快速地設(shè)置一個(gè)線(xiàn)程池。工程類(lèi)的工廠(chǎng)方法有:
這僅僅是一個(gè)開(kāi)端,。Executor還有一些其他用法已超出了這篇文章的范圍,我強(qiáng)烈推薦你研究以下內(nèi)容:
ExecutorService接口特別重要,因?yàn)樗峁┝岁P(guān)閉線(xiàn)程池的方法,,并確保清理了不再使用的資源,。令人欣慰的是,ExecutorService接口相當(dāng)簡(jiǎn)單,、一目了然,,我建議全面地學(xué)習(xí)下它的文檔,。 大致來(lái)說(shuō),,當(dāng)你向ExecutorService發(fā)送了一個(gè)shutdown()消息后,,它就不會(huì)接收新提交的任務(wù),但是仍在隊(duì)列中的任務(wù)會(huì)被繼續(xù)處理完,。你可以使用isTerminated()來(lái)查詢(xún)ExecutorService終止?fàn)顟B(tài),,或使用awaitTermination(…)方法來(lái)等待ExecutorService終止。如果傳入一個(gè)最大超時(shí)時(shí)間作為參數(shù),,awaitTermination方法就不會(huì)永遠(yuǎn)等待,。 警告: 對(duì)JVM進(jìn)程永遠(yuǎn)不會(huì)退出的理解上,存在著一些錯(cuò)誤和迷惑,。如果你不關(guān)閉executorService,,只是銷(xiāo)毀了底層的線(xiàn)程,JVM就不會(huì)退出,。當(dāng)最后一個(gè)普通線(xiàn)程(非守護(hù)線(xiàn)程)退出后,,JVM也會(huì)退出。 配置ThreadPoolExecutor如果你決定不使用Executor的工廠(chǎng)類(lèi),,而是手動(dòng)創(chuàng)建一個(gè) ThreadPoolExecutor,,你需要使用構(gòu)造函數(shù)來(lái)創(chuàng)建并配置。下面是這個(gè)類(lèi)使用最廣泛的一個(gè)構(gòu)造函數(shù): public ThreadPoolExecutor( int corePoolSize, int maxPoolSize, long keepAlive, TimeUnit unit, BlockingQueue 如你所見(jiàn),,你可以配置以下內(nèi)容:
限制隊(duì)列中任務(wù)數(shù)限制執(zhí)行任務(wù)的并發(fā)數(shù),、限制線(xiàn)程池大小對(duì)應(yīng)用程序以及程序執(zhí)行結(jié)果的可預(yù)期性與穩(wěn)定性有很大的好處。無(wú)盡地創(chuàng)建線(xiàn)程,,最終會(huì)耗盡運(yùn)行時(shí)資源,。你的應(yīng)用程序因此會(huì)產(chǎn)生嚴(yán)重的性能問(wèn)題,甚至導(dǎo)致程序不穩(wěn)定,。 這只解決了部分問(wèn)題:限制了并發(fā)任務(wù)數(shù),,但并沒(méi)有限制提交到等待隊(duì)列的任務(wù)數(shù)。如果任務(wù)提交的速率一直高于任務(wù)執(zhí)行的速率,,那么應(yīng)用程序最終會(huì)出現(xiàn)資源短缺的狀況,。 解決方法是:
默認(rèn)的拒絕策略可以讓executor拋出一個(gè)RejectedExecutionException異常,。然而,,還有其他的內(nèi)建策略:
什么時(shí)候以及為什么我們才會(huì)這樣配置線(xiàn)程池?讓我們看一個(gè)例子,。 示例:并行執(zhí)行獨(dú)立的單線(xiàn)程任務(wù)最近,,我被叫去解決一個(gè)很久以前的任務(wù)的問(wèn)題,我的客戶(hù)之前就運(yùn)行過(guò)這個(gè)任務(wù),。大致來(lái)說(shuō),,這個(gè)任務(wù)包含一個(gè)組件,這個(gè)組件監(jiān)聽(tīng)目錄樹(shù)所產(chǎn)生的文件系統(tǒng)事件,。每當(dāng)一個(gè)事件被觸發(fā),,必須處理一個(gè)文件。一個(gè)專(zhuān)門(mén)的單線(xiàn)程執(zhí)行文件處理,。說(shuō)真的,,根據(jù)任務(wù)的特點(diǎn),即使我能把它并行化,,我也不想那么做,。一天的某些時(shí)候,事件到達(dá)率才很高,,文件也沒(méi)必要實(shí)時(shí)處理,,在第二天之前處理完即可。 當(dāng)前的實(shí)現(xiàn)采用了一些混合且匹配的技術(shù),,包括使用UNIX SHELL腳本掃描目錄結(jié)構(gòu),,并檢測(cè)是否發(fā)生改變。實(shí)現(xiàn)完成后,,我們采用了雙核的執(zhí)行環(huán)境,。同樣,事件的到達(dá)率相當(dāng)?shù)停耗壳盀橹?,事件?shù)以百萬(wàn)計(jì),,總共要處理1~2T字節(jié)的原始數(shù)據(jù)。 運(yùn)行處理程序的主機(jī)是12核的機(jī)器:很好機(jī)會(huì)去并行化這些舊的單線(xiàn)程任務(wù),?;旧希覀冇辛耸匙V的所有原料,,我們需要做的僅僅是把程序建立起來(lái)并調(diào)節(jié),。在寫(xiě)代碼前,我們必須了解下程序的負(fù)載,。我列一下我檢測(cè)到的內(nèi)容:
我需要這樣一個(gè)線(xiàn)程池,它的大小在程序運(yùn)行的時(shí)候通過(guò)負(fù)載配置來(lái)設(shè)置,。我傾向于根據(jù)負(fù)載策略創(chuàng)建一個(gè)固定大小的線(xiàn)程池,。由于線(xiàn)程的性能瓶頸在CPU,,它的核心使用率是100%,,不會(huì)等待其他資源,那么負(fù)載策略就很好計(jì)算了:用執(zhí)行環(huán)境的CPU核心數(shù)乘以一個(gè)負(fù)載因子(保證計(jì)算的結(jié)果在峰值時(shí)至少有一個(gè)核心): int cpus = Runtime.getRuntime().availableProcessors();int maxThreads = cpus * scaleFactor;maxThreads = (maxThreads > 0 ? maxThreads : 1); 然后我需要使用阻塞隊(duì)列創(chuàng)建一個(gè)ThreadPoolExecutor,,可以限制提交的任務(wù)數(shù),。為什么?是這樣,,掃描算法執(zhí)行很快,,很快就產(chǎn)生龐大數(shù)量需要處理的文件。數(shù)量有多龐大呢,?很難預(yù)測(cè),,因?yàn)樽儎?dòng)太大了。我不想讓executor內(nèi)部的隊(duì)列不加選擇地填滿(mǎn)了要執(zhí)行的任務(wù)實(shí)例(這些實(shí)例包含了龐大的文件描述符),。我寧愿在隊(duì)列填滿(mǎn)時(shí),,拒絕這些文件。 而且,,我將使用ThreadPoolExecutor.CallerRunsPolicy作為拒絕策略,。為什么?因?yàn)楫?dāng)隊(duì)列已滿(mǎn)時(shí),,線(xiàn)程池的線(xiàn)程忙于處理文件,,我讓提交任務(wù)的線(xiàn)程去執(zhí)行它(被拒絕的任務(wù))。這樣,,掃面會(huì)停止,,轉(zhuǎn)而去處理一個(gè)文件,處理結(jié)束后馬上又會(huì)掃描目錄,。 下面是創(chuàng)建executor的代碼: ExecutorService executorService = new ThreadPoolExecutor( maxThreads, // core thread pool size maxThreads, // maximum thread pool size 1, // time to wait before resizing pool TimeUnit.MINUTES, new ArrayBlockingQueue 下面是程序的框架(極其簡(jiǎn)化版): // scanning loop: fake scanningwhile (!dirsToProcess.isEmpty()) { File currentDir = dirsToProcess.pop(); // listing children File[] children = currentDir.listFiles(); // processing children for (final File currentFile : children) { // if it's a directory, defer processing if (currentFile.isDirectory()) { dirsToProcess.add(currentFile); continue; } executorService.submit(new Runnable() { @Override public void run() { try { // if it's a file, process it new ConvertTask(currentFile).perform(); } catch (Exception ex) { // error management logic } } }); }}// ...// wait for all of the executor threads to finishexecutorService.shutdown();try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { // pool didn't terminate after the first try executorService.shutdownNow(); } if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { // pool didn't terminate after the second try }} catch (InterruptedException ex) { executorService.shutdownNow(); Thread.currentThread().interrupt();} 總結(jié)看到了吧,,Java并發(fā)API非常簡(jiǎn)單易用,十分靈活,,也很強(qiáng)大,。真希望我多年前可以多花點(diǎn)功夫?qū)懸粋€(gè)這樣簡(jiǎn)單的程序。這樣我就可以在幾小時(shí)內(nèi)解決由傳統(tǒng)單線(xiàn)程組件所引發(fā)的擴(kuò)展性問(wèn)題,。 |
|