久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Java中使用ThreadPoolExecutor并行執(zhí)行獨(dú)立的單線(xiàn)程任務(wù)

 spzproot 2016-08-08

Java SE 5.0中引入了任務(wù)執(zhí)行框架,,這是簡(jiǎn)化多線(xiàn)程程序設(shè)計(jì)開(kāi)發(fā)的一大進(jìn)步。使用這個(gè)框架可以方便地管理任務(wù):管理任務(wù)的生命周期以及執(zhí)行策略,。

在這篇文章中,,我們通過(guò)一個(gè)簡(jiǎn)單的例子來(lái)展現(xiàn)這個(gè)框架所帶來(lái)的靈活與簡(jiǎn)單。

基礎(chǔ)

執(zhí)行框架引入了Executor接口來(lái)管理任務(wù)的執(zhí)行,。Executor是一個(gè)用來(lái)提交Runnable任務(wù)的接口,。這個(gè)接口將任務(wù)提交與任務(wù)執(zhí)行隔離起來(lái):擁有不同執(zhí)行策略的executor都實(shí)現(xiàn)了同一個(gè)提交接口。改變執(zhí)行策略不會(huì)影響任務(wù)的提交邏輯,。

如果你要提交一個(gè)Runnable對(duì)象來(lái)執(zhí)行,,很簡(jiǎn)單:

Executor exec = …;exec.execute(runnable);

線(xiàn)程池

如前所述,executor如何去執(zhí)行提交的runnable任務(wù)并沒(méi)有在Executor接口中規(guī)定,,這取決于你所用的executor的具體類(lèi)型,。這個(gè)框架提供了幾種不同的executor,執(zhí)行策略針對(duì)不同的場(chǎng)景而不同,。

你可能會(huì)用到的最常見(jiàn)的executor類(lèi)型就是線(xiàn)程池executor,,也就是ThreadPoolExecutor類(lèi)(及其子類(lèi))的實(shí)例。ThreadPoolExecutor管理著一個(gè)線(xiàn)程池和一個(gè)工作隊(duì)列,,線(xiàn)程池存放著用于執(zhí)行任務(wù)的工作線(xiàn)程,。

你肯定在其他技術(shù)中也了解過(guò)“池”的概念。使用“池”的一個(gè)最大的好處就是減少資源創(chuàng)建的開(kāi)銷(xiāo),,用過(guò)并釋放后,,還可以重用,。另一個(gè)間接的好處是你可以控制使用資源的多少。比如,,你可以調(diào)整線(xiàn)程池的大小達(dá)到你想要的負(fù)載,,而不損害系統(tǒng)的資源。

這個(gè)框架提供了一個(gè)工廠(chǎng)類(lèi),,叫Executors,,來(lái)創(chuàng)建線(xiàn)程池。使用這個(gè)工程類(lèi)你可以創(chuàng)建不同特性的線(xiàn)程池,。盡管底層的實(shí)現(xiàn)常常是一樣的(ThreadPoolExecutor),,但工廠(chǎng)類(lèi)可以使你不必使用復(fù)雜的構(gòu)造函數(shù)就可以快速地設(shè)置一個(gè)線(xiàn)程池。工程類(lèi)的工廠(chǎng)方法有:

  • newFixedThreadPool:該方法返回一個(gè)最大容量固定的線(xiàn)程池,。它會(huì)按需創(chuàng)建新線(xiàn)程,,線(xiàn)程數(shù)量不大于配置的數(shù)量大小。當(dāng)線(xiàn)程數(shù)達(dá)到最大以后,,線(xiàn)程池會(huì)一直維持這么多不變,。
  • newCachedThreadPool:該方法返回一個(gè)無(wú)界的線(xiàn)程池,也就是沒(méi)有最大數(shù)量限制,。但當(dāng)工作量減小時(shí),,這類(lèi)線(xiàn)程池會(huì)銷(xiāo)毀沒(méi)用的線(xiàn)程。
  • newSingleThreadedExecutor:該方法返回一個(gè)executor,,它可以保證所有的任務(wù)都在一個(gè)單線(xiàn)程中執(zhí)行,。
  • newScheduledThreadPool:該方法返回一個(gè)固定大小的線(xiàn)程池,它支持延時(shí)和定時(shí)任務(wù)的執(zhí)行,。

這僅僅是一個(gè)開(kāi)端,。Executor還有一些其他用法已超出了這篇文章的范圍,我強(qiáng)烈推薦你研究以下內(nèi)容:

  • 生命周期管理的方法,,這些方法由ExecutorService接口聲明(比如shutdown()和awaitTermination()),。
  • 使用CompletionService來(lái)查詢(xún)?nèi)蝿?wù)狀態(tài)、獲取返回值,,如果有返回值的話(huà),。

ExecutorService接口特別重要,因?yàn)樗峁┝岁P(guān)閉線(xiàn)程池的方法,,并確保清理了不再使用的資源,。令人欣慰的是,ExecutorService接口相當(dāng)簡(jiǎn)單,、一目了然,,我建議全面地學(xué)習(xí)下它的文檔,。

大致來(lái)說(shuō),,當(dāng)你向ExecutorService發(fā)送了一個(gè)shutdown()消息后,,它就不會(huì)接收新提交的任務(wù),但是仍在隊(duì)列中的任務(wù)會(huì)被繼續(xù)處理完,。你可以使用isTerminated()來(lái)查詢(xún)ExecutorService終止?fàn)顟B(tài),,或使用awaitTermination(…)方法來(lái)等待ExecutorService終止。如果傳入一個(gè)最大超時(shí)時(shí)間作為參數(shù),,awaitTermination方法就不會(huì)永遠(yuǎn)等待,。

警告: 對(duì)JVM進(jìn)程永遠(yuǎn)不會(huì)退出的理解上,存在著一些錯(cuò)誤和迷惑,。如果你不關(guān)閉executorService,,只是銷(xiāo)毀了底層的線(xiàn)程,JVM就不會(huì)退出,。當(dāng)最后一個(gè)普通線(xiàn)程(非守護(hù)線(xiàn)程)退出后,,JVM也會(huì)退出。

配置ThreadPoolExecutor

如果你決定不使用Executor的工廠(chǎng)類(lèi),,而是手動(dòng)創(chuàng)建一個(gè) ThreadPoolExecutor,,你需要使用構(gòu)造函數(shù)來(lái)創(chuàng)建并配置。下面是這個(gè)類(lèi)使用最廣泛的一個(gè)構(gòu)造函數(shù):

public ThreadPoolExecutor( int corePoolSize, int maxPoolSize, long keepAlive, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler);

如你所見(jiàn),,你可以配置以下內(nèi)容:

  • 核心池的大?。ň€(xiàn)程池將會(huì)使用的大小)
  • 最大池大小
  • 存活時(shí)間,,空閑線(xiàn)程在這個(gè)時(shí)間后被銷(xiāo)毀
  • 存放任務(wù)的工作隊(duì)列
  • 任務(wù)提交拒絕后要執(zhí)行的策略

限制隊(duì)列中任務(wù)數(shù)

限制執(zhí)行任務(wù)的并發(fā)數(shù),、限制線(xiàn)程池大小對(duì)應(yīng)用程序以及程序執(zhí)行結(jié)果的可預(yù)期性與穩(wěn)定性有很大的好處。無(wú)盡地創(chuàng)建線(xiàn)程,,最終會(huì)耗盡運(yùn)行時(shí)資源,。你的應(yīng)用程序因此會(huì)產(chǎn)生嚴(yán)重的性能問(wèn)題,甚至導(dǎo)致程序不穩(wěn)定,。

這只解決了部分問(wèn)題:限制了并發(fā)任務(wù)數(shù),,但并沒(méi)有限制提交到等待隊(duì)列的任務(wù)數(shù)。如果任務(wù)提交的速率一直高于任務(wù)執(zhí)行的速率,,那么應(yīng)用程序最終會(huì)出現(xiàn)資源短缺的狀況,。

解決方法是:

  • 為Executor提供一個(gè)存放待執(zhí)行任務(wù)的阻塞隊(duì)列。如果隊(duì)列填滿(mǎn),,以后提交的任務(wù)會(huì)被“拒絕”,。
  • 當(dāng)任務(wù)提交被拒絕時(shí)會(huì)觸發(fā)RejectedExecutionHandler,這也是為什么這個(gè)類(lèi)名中引用動(dòng)詞“rejected”,。你可以實(shí)現(xiàn)自己的拒絕策略,,或者使用框架內(nèi)置的策略。

默認(rèn)的拒絕策略可以讓executor拋出一個(gè)RejectedExecutionException異常,。然而,,還有其他的內(nèi)建策略:

  • 悄悄地丟棄一個(gè)任務(wù)
  • 丟棄最舊的任務(wù),,重新提交最新的
  • 在調(diào)用者的線(xiàn)程中執(zhí)行被拒絕的任務(wù)

什么時(shí)候以及為什么我們才會(huì)這樣配置線(xiàn)程池?讓我們看一個(gè)例子,。

示例:并行執(zhí)行獨(dú)立的單線(xiàn)程任務(wù)

最近,,我被叫去解決一個(gè)很久以前的任務(wù)的問(wèn)題,我的客戶(hù)之前就運(yùn)行過(guò)這個(gè)任務(wù),。大致來(lái)說(shuō),,這個(gè)任務(wù)包含一個(gè)組件,這個(gè)組件監(jiān)聽(tīng)目錄樹(shù)所產(chǎn)生的文件系統(tǒng)事件,。每當(dāng)一個(gè)事件被觸發(fā),,必須處理一個(gè)文件。一個(gè)專(zhuān)門(mén)的單線(xiàn)程執(zhí)行文件處理,。說(shuō)真的,,根據(jù)任務(wù)的特點(diǎn),即使我能把它并行化,,我也不想那么做,。一天的某些時(shí)候,事件到達(dá)率才很高,,文件也沒(méi)必要實(shí)時(shí)處理,,在第二天之前處理完即可。

當(dāng)前的實(shí)現(xiàn)采用了一些混合且匹配的技術(shù),,包括使用UNIX SHELL腳本掃描目錄結(jié)構(gòu),,并檢測(cè)是否發(fā)生改變。實(shí)現(xiàn)完成后,,我們采用了雙核的執(zhí)行環(huán)境,。同樣,事件的到達(dá)率相當(dāng)?shù)停耗壳盀橹?,事件?shù)以百萬(wàn)計(jì),,總共要處理1~2T字節(jié)的原始數(shù)據(jù)。

運(yùn)行處理程序的主機(jī)是12核的機(jī)器:很好機(jī)會(huì)去并行化這些舊的單線(xiàn)程任務(wù),?;旧希覀冇辛耸匙V的所有原料,,我們需要做的僅僅是把程序建立起來(lái)并調(diào)節(jié),。在寫(xiě)代碼前,我們必須了解下程序的負(fù)載,。我列一下我檢測(cè)到的內(nèi)容:

  • 有非常多的文件需要被周期性地掃描:每個(gè)目錄包含1~2百萬(wàn)個(gè)文件
  • 掃描算法很快,,可以并行化
  • 處理一個(gè)文件至少需要1s,甚至上升到2s或3s
  • 處理文件時(shí),,性能瓶頸主要是CPU
  • CPU利用率必須可調(diào),,根據(jù)一天時(shí)間的不同而使用不同的負(fù)載配置,。

我需要這樣一個(gè)線(xiàn)程池,它的大小在程序運(yùn)行的時(shí)候通過(guò)負(fù)載配置來(lái)設(shè)置,。我傾向于根據(jù)負(fù)載策略創(chuàng)建一個(gè)固定大小的線(xiàn)程池,。由于線(xiàn)程的性能瓶頸在CPU,,它的核心使用率是100%,,不會(huì)等待其他資源,那么負(fù)載策略就很好計(jì)算了:用執(zhí)行環(huán)境的CPU核心數(shù)乘以一個(gè)負(fù)載因子(保證計(jì)算的結(jié)果在峰值時(shí)至少有一個(gè)核心):

int cpus = Runtime.getRuntime().availableProcessors();int maxThreads = cpus * scaleFactor;maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后我需要使用阻塞隊(duì)列創(chuàng)建一個(gè)ThreadPoolExecutor,,可以限制提交的任務(wù)數(shù),。為什么?是這樣,,掃描算法執(zhí)行很快,,很快就產(chǎn)生龐大數(shù)量需要處理的文件。數(shù)量有多龐大呢,?很難預(yù)測(cè),,因?yàn)樽儎?dòng)太大了。我不想讓executor內(nèi)部的隊(duì)列不加選擇地填滿(mǎn)了要執(zhí)行的任務(wù)實(shí)例(這些實(shí)例包含了龐大的文件描述符),。我寧愿在隊(duì)列填滿(mǎn)時(shí),,拒絕這些文件。

而且,,我將使用ThreadPoolExecutor.CallerRunsPolicy作為拒絕策略,。為什么?因?yàn)楫?dāng)隊(duì)列已滿(mǎn)時(shí),,線(xiàn)程池的線(xiàn)程忙于處理文件,,我讓提交任務(wù)的線(xiàn)程去執(zhí)行它(被拒絕的任務(wù))。這樣,,掃面會(huì)停止,,轉(zhuǎn)而去處理一個(gè)文件,處理結(jié)束后馬上又會(huì)掃描目錄,。

下面是創(chuàng)建executor的代碼:

ExecutorService executorService = new ThreadPoolExecutor( maxThreads, // core thread pool size maxThreads, // maximum thread pool size 1, // time to wait before resizing pool TimeUnit.MINUTES, new ArrayBlockingQueue(maxThreads, true), new ThreadPoolExecutor.CallerRunsPolicy());
 下面是程序的框架(極其簡(jiǎn)化版):

// scanning loop: fake scanningwhile (!dirsToProcess.isEmpty()) { File currentDir = dirsToProcess.pop(); // listing children File[] children = currentDir.listFiles(); // processing children for (final File currentFile : children) { // if it's a directory, defer processing if (currentFile.isDirectory()) { dirsToProcess.add(currentFile); continue; } executorService.submit(new Runnable() { @Override public void run() { try { // if it's a file, process it new ConvertTask(currentFile).perform(); } catch (Exception ex) { // error management logic } } }); }}// ...// wait for all of the executor threads to finishexecutorService.shutdown();try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { // pool didn't terminate after the first try executorService.shutdownNow(); } if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { // pool didn't terminate after the second try }} catch (InterruptedException ex) { executorService.shutdownNow(); Thread.currentThread().interrupt();}

總結(jié)

看到了吧,,Java并發(fā)API非常簡(jiǎn)單易用,十分靈活,,也很強(qiáng)大,。真希望我多年前可以多花點(diǎn)功夫?qū)懸粋€(gè)這樣簡(jiǎn)單的程序。這樣我就可以在幾小時(shí)內(nèi)解決由傳統(tǒng)單線(xiàn)程組件所引發(fā)的擴(kuò)展性問(wèn)題,。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀(guān)點(diǎn),。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購(gòu)買(mǎi)等信息,,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請(qǐng)點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多