java5線程框架Executor的用法舉例
Executor 是 java5 下的一個(gè)多任務(wù)并發(fā)執(zhí)行框架(Doug Lea),可以建立一個(gè)類似數(shù)據(jù)庫(kù)連接池的線程池來執(zhí)行任務(wù),。這個(gè)框架主要由三個(gè)接口和其相應(yīng)的具體類組成,。Executor、 ExecutorService 和 ScheduledExecutorService ,。 1,、 Executor 接口:是用來執(zhí)行 Runnable 任務(wù)的;它只定義一個(gè)方法- execute(Runnable command),;執(zhí)行 Ruannable 類型的任務(wù),。 2、 ExecutorService 接口: 繼承Executor接口,,提供了執(zhí)行Callable任務(wù)和中止任務(wù)執(zhí)行的服務(wù),。 3、 ScheduledExecutorService 接口:繼承 ExecutorService 接口,,提供了按排程執(zhí)行任務(wù)的服務(wù),。 4、 Executors 類:為了方便使用, 建議使用 Executors的工具類來得到 Executor 接口的具體對(duì)象,。 Executors 類有幾個(gè)重要的方法,,在這里簡(jiǎn)明一下: 1、 callable(Runnable task): 將 Runnable 的任務(wù)轉(zhuǎn)化成 Callable 的任務(wù) 2,、 newSingleThreadExecutor(): 產(chǎn)生一個(gè) ExecutorService 對(duì)象,,這個(gè)對(duì)象只有一個(gè)線程可用來執(zhí)行任務(wù),若任務(wù)多于一個(gè),,任務(wù)將按先后順序執(zhí)行,。 3,、 newCachedThreadPool(): 產(chǎn)生一個(gè) ExecutorService 對(duì)象,這個(gè)對(duì)象帶有一個(gè)線程池,,線程池的大小會(huì)根據(jù)需要調(diào)整,,線程執(zhí)行完任務(wù)后返回線程池,供執(zhí)行下一次任務(wù)使用,。 4,、 newFixedThreadPool(int poolSize): 產(chǎn)生一個(gè) ExecutorService 對(duì)象,這個(gè)對(duì)象帶有一個(gè)大小為 poolSize 的線程池,,若任務(wù)數(shù)量大于 poolSize ,,任務(wù)會(huì)被放在一個(gè) queue 里順序執(zhí)行。 5,、 newSingleThreadScheduledExecutor(): 產(chǎn)生一個(gè) ScheduledExecutorService 對(duì)象,,這個(gè)對(duì)象的線程池大小為 1 ,若任務(wù)多于一個(gè),,任務(wù)將按先后順序執(zhí)行,。 6、 newScheduledThreadPool(int poolSize): 產(chǎn)生一個(gè) ScheduledExecutorService 對(duì)象,,這個(gè)對(duì)象的線程池大小為 poolSize ,,若任務(wù)數(shù)量大于 poolSize ,任務(wù)會(huì)在一個(gè) queue 里等待執(zhí)行,。 有關(guān)Executor框架其它類的說明請(qǐng)參看JAVA 5 的 API文檔 下面是幾個(gè)簡(jiǎn)單的例子,,用以示例Executors中幾個(gè)主要方法的使用。 1,、 Task.java 任務(wù) 2,、 SingleThreadExecutorTest.java 單線程執(zhí)行程序的測(cè)試 3、 CachedThreadPoolTest.java 線程池線程執(zhí)行程序的測(cè)試 4,、 FixedThreadPoolTest.java 線程池線程執(zhí)行程序的測(cè)試(線程數(shù)固定) 5,、 DaemonThreadFactory.java 守護(hù)線程生成工廠 6、 MaxPriorityThreadFactory.java 大優(yōu)先級(jí)線程生成工廠 7,、 MinPriorityThreadFactory.java 小優(yōu)先級(jí)線程生成工廠 8,、 ThreadFactoryExecutorTest.java 在自定義線程生成工廠下的測(cè)試 =============== 1、 Task.java package Executor; //可執(zhí)行任務(wù) public class Task implements Runnable { // 中斷信號(hào) volatile boolean stop = false; // 該任務(wù)執(zhí)行的次數(shù) private int runCount = 0; // 任務(wù)標(biāo)識(shí) private int taskId; public Task(int taskId) { this.taskId = taskId; System.out.println("Create Task-" + taskId); } // 執(zhí)行任務(wù) public void run() { while (!stop) { try { Thread.sleep(10); } catch (InterruptedException e) { System.out.println("Task interrupted..."); } // 線程運(yùn)行3次后,中斷信號(hào)置為true if (++runCount == 3) stop = true; // 輸出一些語句 System.out.println("" + Thread.currentThread().toString() + "tttt execute Task-" + taskId + "'s " + runCount + "th run. "); } } } =============== 1 end =============== 2,、 SingleThreadExecutorTest.java package Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SingleThreadExecutorTest { public static void main(String[] args) { try { // 創(chuàng)建一個(gè)單線程執(zhí)行程序 ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i =1; i <= 3; i++) { executorService.execute(new Task(i)); } executorService.shutdown(); } catch (Exception e) {} } } =============== 2 end =============== 3,、 CachedThreadPoolTest.java package Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CachedThreadPoolTest { public static void main(String[] args) { try { // 建新線程的線程池,如果之前構(gòu)造的線程可用則重用它們 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i =1; i <= 4; i++) { executorService.execute(new Task(i)); } executorService.shutdown(); } catch (Exception e) {} } } =============== 3 end =============== 4,、 FixedThreadPoolTest.java package Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FixedThreadPoolTest { public static void main(String[] args) { try { // 創(chuàng)建固定線程數(shù)的線程池,,以共享的無界隊(duì)列方式來運(yùn)行這些線程 ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i =1; i <= 5; i++) { executorService.execute(new Task(i)); } executorService.shutdown(); } catch (Exception e) {} } } =============== 4 end =============== 5、廣州軟件開發(fā)培訓(xùn) DaemonThreadFactory.java package Executor; import java.util.concurrent.ThreadFactory; public class DaemonThreadFactory implements ThreadFactory { //創(chuàng)建一個(gè)守護(hù)線程 public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } } =============== 5 end =============== 6,、 MaxPriorityThreadFactory.java package Executor; import java.util.concurrent.ThreadFactory; public class MaxPriorityThreadFactory implements ThreadFactory { //創(chuàng)建一個(gè)最大優(yōu)先級(jí)的線程 public Thread newThread(Runnable r) { Thread t = new Thread(r); //優(yōu)先級(jí)最大,、意思是切換到這個(gè)線程的概率比其它的低一些 t.setPriority(Thread.MAX_PRIORITY); return t; } } =============== 6 end =============== 7,、 MinPriorityThreadFactory.java package Executor; import java.util.concurrent.ThreadFactory; public class MinPriorityThreadFactory implements ThreadFactory { //創(chuàng)建一個(gè)最小優(yōu)先級(jí)的線程 public Thread newThread(Runnable r) { Thread t = new Thread(r); //優(yōu)先級(jí)最小、意思是切換到這個(gè)線程的概率比其它的低一些 t.setPriority(Thread.MIN_PRIORITY); return t; } } =============== 7 end =============== 8,、 ThreadFactoryExecutorTest.java package Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadFactoryExecutorTest { public static void main(String[] args) { try { // 創(chuàng)建一個(gè)單線程執(zhí)行程序 ExecutorService defaultExecutor = Executors.newCachedThreadPool(); ExecutorService daemonExec = Executors .newCachedThreadPool(new DaemonThreadFactory()); ExecutorService maxPriorityExecutor = Executors .newCachedThreadPool(new MaxPriorityThreadFactory()); ExecutorService minPriorityExecutor = Executors .newCachedThreadPool(new MinPriorityThreadFactory()); //用守護(hù)線程執(zhí)行任務(wù) for (int i = 1; i < 10; i++){ daemonExec.execute(new Task(i)); } //用其它線程執(zhí)行任務(wù) for (int j = 10; j <= 20; j++){ if (j == 10) maxPriorityExecutor.execute(new Task(j)); else if (j == 11) minPriorityExecutor.execute(new Task(j)); else defaultExecutor.execute(new Task(j)); } } catch (Exception e) {} } } 每問題每線程:在于它沒有對(duì)已創(chuàng)建線程的數(shù)量進(jìn)行任何限制,,除非對(duì)客戶端能夠拋出的請(qǐng)求速率進(jìn)行限制。 無限制創(chuàng)建線程的缺點(diǎn): 1.線程生命周期的開銷:線程的創(chuàng)建和關(guān)閉并不是“免費(fèi)的”,。 2.資源消耗量:活動(dòng)線程會(huì)消耗系統(tǒng)資源,,尤其是內(nèi)存。 3.穩(wěn)定性,。 1 線程池(Thread Pool) 在java中,,任務(wù)執(zhí)行的首要抽象不是Thread,而是Executor,。 public interface Executor { /** * Executor只是一個(gè)簡(jiǎn)單的接口,,但是他卻為一個(gè)靈活而且強(qiáng)大的框架創(chuàng)造了基礎(chǔ)。 */ void execute(Runnable command); } 這個(gè)框架可以用于異步任務(wù)執(zhí)行,,而且支持很多不同類型的任務(wù)執(zhí)行策略。它還為任務(wù)提交和任務(wù)執(zhí)行之間的解耦提供了標(biāo)準(zhǔn)的方法,。另外,,還提供了對(duì)生命周期的支持以及鉤子函數(shù),可以添加諸如統(tǒng)計(jì)收集,、應(yīng)用程序管理機(jī)制和監(jiān)視器等擴(kuò)展,。 1.1 創(chuàng)建/關(guān)閉 1.newCachedThreadPool 創(chuàng)建可緩存的線程池,多的話回收,,少的話增加,,沒有限制 N2.ewFixedThreadPool(int nThreads) 創(chuàng)建定長(zhǎng)的線程池,每提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,,直到最大,。如果某個(gè)以外終止,會(huì)補(bǔ)充一個(gè)新的 3.newScheduledThreadPool(int corePoolSize) 定長(zhǎng)線程池,,而且支持定時(shí)的以及周期性的任務(wù)執(zhí)行,。相當(dāng)于timer。 4.newSingleThreadExecutor() 單線程化的executor,,只創(chuàng)建唯一的工作線程來之心吧任務(wù),。如果它意外結(jié)束,會(huì)有另一個(gè)取代它,。它會(huì)保證任務(wù)隊(duì)列所規(guī)定的順序執(zhí)行,。 5.newSingleThreadScheduledExecutor() 創(chuàng)建一個(gè)單線程執(zhí)行程序,它可安排在給定延遲后運(yùn)行命令或者定期地執(zhí)行,。 private static final Executor exec=Executors.newFixedThreadPool(100); ServerSocket socket=new ServerSocket(80); while(true){ final Socket connection=socket.accept(); Runnable task=new Runnable(){ public void run(){ handleRequest(connection); } }; exec.execute(exec); } 1.2 ExecutorService 線程如果無法正常關(guān)閉http://,,則會(huì)阻止JVM的結(jié)束,。線程池中的任務(wù),可能已經(jīng)完成,,可能正在運(yùn)行,,其他還有在隊(duì)列中等待執(zhí)行。關(guān)閉的時(shí)候可能平緩的關(guān)閉,,到唐突的關(guān)閉(拔掉電源),。為了解決生命周期的問題,ExecutorService擴(kuò)展了Executor,,并且添加了一些用于生命周期管理的方法,。 public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; 。,。,。。,。 } 可以關(guān)閉 ExecutorService,,這將導(dǎo)致其拒絕新任務(wù)。提供兩個(gè)方法來關(guān)閉 ExecutorService,。shutdown() 方法在終止前允許執(zhí)行以前提交的任務(wù),,而 shutdownNow() 方法阻止等待任務(wù)啟動(dòng)并試圖停止當(dāng)前正在執(zhí)行的任務(wù)。在終止時(shí),,執(zhí)行程序沒有任務(wù)在執(zhí)行,,也沒有任務(wù)在等待執(zhí)行,并且無法提交新任務(wù),。應(yīng)該關(guān)閉未使用的 ExecutorService 以允許回收其資源,。 exec.shutdown(); ExecutorService線程池 java并發(fā)編程-Executor框架 1.3 延遲,并且周期性的任務(wù),,newScheduledThreadPool Timer的問題: 1.只能創(chuàng)建唯一的線程來執(zhí)行所有timer任務(wù),。 2.如果一個(gè)timerTask很耗時(shí),會(huì)導(dǎo)致其他TimerTask的時(shí)效準(zhǔn)確性出問題 3.如果TimerTask拋出未檢查的異常,,Timer將會(huì)產(chǎn)生無法預(yù)料的行為,。Timer不會(huì)重新恢復(fù)。另外一個(gè)Timer中的Task出現(xiàn)異常以后,,后面再給這個(gè)Timer的任務(wù)也將會(huì)無法執(zhí)行,。 1.4 結(jié)合DelayQueue 如果要自己構(gòu)建調(diào)度服務(wù),那還可以考慮使用DelayQueue,,它里面的每個(gè)對(duì)象低耦合一個(gè)延遲時(shí)間有關(guān)聯(lián),,只有過期以后,DelayQueue才能讓你執(zhí)行take操作獲取元素。那么當(dāng)它里面的對(duì)象是FutureTask的時(shí)候,,就可以構(gòu)成一個(gè)簡(jiǎn)單的調(diào)度隊(duì)列,。 2 Runnable和Future、CallBack 2.1 Runnable Executor框架讓定制一個(gè)執(zhí)行策略變得簡(jiǎn)單,,不過想要使用它,,你的任務(wù)還必須實(shí)現(xiàn)Runnable接口。在許多服務(wù)器請(qǐng)求中,,都存在一個(gè)情況,,那就是:?jiǎn)我坏目蛻粽?qǐng)求。它能執(zhí)行一些簡(jiǎn)單的任務(wù),,但是他不能返回一個(gè)值或者拋出受檢查的異常,。 2.2 Callback 很多任務(wù)都會(huì)引起計(jì)算延遲,包括執(zhí)行數(shù)據(jù)庫(kù)查詢,、從網(wǎng)絡(luò)上獲取資源,、進(jìn)行復(fù)雜的計(jì)算。這些任務(wù)Callback抽象更好,。它也可以被Executor框架執(zhí)行 Executors包含很多靜態(tài)方法,,可以吧Runnable和PrivilegedAction封裝為Callable。 2.3 FutureTask Runnable和Callable描述的是抽象的計(jì)算性任務(wù),,這些任務(wù)通常是有限的,,他們有開始,而且最終會(huì)結(jié)束,。 一個(gè)Executor執(zhí)行的任務(wù)有4個(gè)周期,創(chuàng)建,、提交,、開始、完成,。由于任務(wù)的執(zhí)行會(huì)花很長(zhǎng)時(shí)間,,我們也希望可以取消任務(wù)。 Future描述了任務(wù)的生命周期,,并提供了相關(guān)的方法來獲得任務(wù)的結(jié)果,、取消任務(wù)以及檢驗(yàn)任務(wù)是否完成還是取消。對(duì)應(yīng)的isDone,、isCancelled() 方法,,它不能后退,一旦完成,,就永遠(yuǎn)停在完成狀態(tài)上,。用get(等待)獲得結(jié)果。 /** * @param args * @throws InterruptedException * @throws ExecutionException */ public static void main(String[] args) throws InterruptedException { int threadCounts = 19;// 使用的線程數(shù) long sum = 0; ExecutorService exec = Executors.newFixedThreadPool(threadCounts); List<Callable<Long>> callList = new ArrayList<Callable<Long>>(); // 生成很大的List List<Integer> list = new ArrayList<Integer>(); for (int i = 0; i <= ; i++) { list.add(i); } int len = list.size() / threadCounts;// 平均分割List // List中的數(shù)量沒有線程數(shù)多(很少存在) if (len == 0) { threadCounts = list.size();// 采用一個(gè)線程處理List中的一個(gè)元素 len = list.size() / threadCounts;// 重新平均分割List } for (int i = 0; i < threadCounts; i++) { final List<Integer> subList; if (i == threadCounts - 1) { subList = list.subList(i * len, list.size()); } else { subList = list.subList(i * len, len * (i + 1) > list.size() ? list.size() : len * (i + 1)); } // 采用匿名內(nèi)部類實(shí)現(xiàn) callList.add(new Callable<Long>() { public Long call() throws Exception { long subSum = 0L; for (Integer i : subList) { subSum += i; } System.out.println("分配給線程:" + Thread.currentThread().getName() + "那一部分List的整數(shù)和為:tSubSum:" + subSum); return subSum; } }); } List<Future<Long>> futureList = exec.invokeAll(callList); for (Future<Long> future : futureList) { sum += future.get(); } exec.shutdown(); System.out.println(sum); } 2.4 和Service的結(jié)合 Runnable和Callable類都可以通過Service的submit方法提交,并且返回一個(gè)Future,,它表示這個(gè)任務(wù),,可以獲得該任務(wù)的執(zhí)行結(jié)果或者取消它。 3 CompletionService 向Executor提交一個(gè)批處理任務(wù),,并且希望獲得結(jié)果,,那么你將會(huì)使用Future,然后不斷的調(diào)用isDone來檢驗(yàn)是否完成,,這樣太麻煩,,還有更好的方法,那就是完成服務(wù),,CompletionService,。poll方法不會(huì)等待,返回null,。take方法會(huì)等待,。 它整合了Executor和BlockingQueue的功能,你可以將Callable任務(wù)交給他執(zhí)行,,然后使用類似于隊(duì)列中的take何poll方法,,在結(jié)果完整時(shí)可用時(shí)獲得這個(gè)結(jié)果。ExecutorCompletionService是它的實(shí)現(xiàn)類,。 它的實(shí)現(xiàn)也比較簡(jiǎn)單,,在構(gòu)造函數(shù)中創(chuàng)建一個(gè)BlockingQueue,用它保存結(jié)果: private final BlockingQueue<Future<V>> completionQueue; 提交的任務(wù)被包裝成QueueFuture: private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } 覆寫done方法,,將結(jié)果置入BlockingQueue,。 它與上面獲得一堆FutureTask,然后遍歷的去get等返回還不一樣,。它只能一個(gè)個(gè)獲取,,代表有一個(gè)拿一個(gè)。FutureTask的get可能后面的FutureTask都已經(jīng)好了,,可是有一個(gè)還沒好,,那就卡在中間了。
|
|