久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

java 線程框架Executor的用法舉例

 昵稱9230487 2012-03-21
java5線程框架Executor的用法舉例

Executor 是 java5 下的一個(gè)多任務(wù)并發(fā)執(zhí)行框架(Doug Lea),可以建立一個(gè)類似數(shù)據(jù)庫(kù)連接池的線程池來執(zhí)行任務(wù),。這個(gè)框架主要由三個(gè)接口和其相應(yīng)的具體類組成,。Executor、 ExecutorService 和 ScheduledExecutorService ,。

1,、 Executor 接口:是用來執(zhí)行 Runnable 任務(wù)的;它只定義一個(gè)方法- execute(Runnable command),;執(zhí)行 Ruannable 類型的任務(wù),。

2、 ExecutorService 接口: 繼承Executor接口,,提供了執(zhí)行Callable任務(wù)和中止任務(wù)執(zhí)行的服務(wù),。

3、 ScheduledExecutorService 接口:繼承 ExecutorService 接口,,提供了按排程執(zhí)行任務(wù)的服務(wù),。

4、 Executors 類:為了方便使用, 建議使用 Executors的工具類來得到 Executor 接口的具體對(duì)象,。

Executors 類有幾個(gè)重要的方法,,在這里簡(jiǎn)明一下:

1、 callable(Runnable task): 將 Runnable 的任務(wù)轉(zhuǎn)化成 Callable 的任務(wù)

2,、 newSingleThreadExecutor(): 產(chǎn)生一個(gè) ExecutorService 對(duì)象,,這個(gè)對(duì)象只有一個(gè)線程可用來執(zhí)行任務(wù),若任務(wù)多于一個(gè),,任務(wù)將按先后順序執(zhí)行,。

3,、 newCachedThreadPool(): 產(chǎn)生一個(gè) ExecutorService 對(duì)象,這個(gè)對(duì)象帶有一個(gè)線程池,,線程池的大小會(huì)根據(jù)需要調(diào)整,,線程執(zhí)行完任務(wù)后返回線程池,供執(zhí)行下一次任務(wù)使用,。

4,、 newFixedThreadPool(int poolSize): 產(chǎn)生一個(gè) ExecutorService 對(duì)象,這個(gè)對(duì)象帶有一個(gè)大小為 poolSize 的線程池,,若任務(wù)數(shù)量大于 poolSize ,,任務(wù)會(huì)被放在一個(gè) queue 里順序執(zhí)行。

5,、 newSingleThreadScheduledExecutor(): 產(chǎn)生一個(gè) ScheduledExecutorService 對(duì)象,,這個(gè)對(duì)象的線程池大小為 1 ,若任務(wù)多于一個(gè),,任務(wù)將按先后順序執(zhí)行,。

6、 newScheduledThreadPool(int poolSize): 產(chǎn)生一個(gè) ScheduledExecutorService 對(duì)象,,這個(gè)對(duì)象的線程池大小為 poolSize ,,若任務(wù)數(shù)量大于 poolSize ,任務(wù)會(huì)在一個(gè) queue 里等待執(zhí)行,。

有關(guān)Executor框架其它類的說明請(qǐng)參看JAVA 5 的 API文檔

下面是幾個(gè)簡(jiǎn)單的例子,,用以示例Executors中幾個(gè)主要方法的使用。

1,、 Task.java 任務(wù)

2,、 SingleThreadExecutorTest.java 單線程執(zhí)行程序的測(cè)試

3、 CachedThreadPoolTest.java 線程池線程執(zhí)行程序的測(cè)試

4,、 FixedThreadPoolTest.java 線程池線程執(zhí)行程序的測(cè)試(線程數(shù)固定)

5,、 DaemonThreadFactory.java 守護(hù)線程生成工廠

6、 MaxPriorityThreadFactory.java 大優(yōu)先級(jí)線程生成工廠

7,、 MinPriorityThreadFactory.java 小優(yōu)先級(jí)線程生成工廠

8,、 ThreadFactoryExecutorTest.java 在自定義線程生成工廠下的測(cè)試

=============== 1、 Task.java

package Executor;

//可執(zhí)行任務(wù)

public class Task implements Runnable {

// 中斷信號(hào)

volatile boolean stop = false;

// 該任務(wù)執(zhí)行的次數(shù)

private int runCount = 0;

// 任務(wù)標(biāo)識(shí)

private int taskId;

public Task(int taskId) {

this.taskId = taskId;

System.out.println("Create Task-" + taskId);

}

// 執(zhí)行任務(wù)

public void run() {

while (!stop) {

try {

Thread.sleep(10);

} catch (InterruptedException e) {

System.out.println("Task interrupted...");

}

// 線程運(yùn)行3次后,中斷信號(hào)置為true

if (++runCount == 3)

stop = true;

// 輸出一些語句

System.out.println("" + Thread.currentThread().toString() + "tttt execute Task-" + taskId + "'s " + runCount

+ "th run. ");

}

}

}

=============== 1 end

=============== 2,、 SingleThreadExecutorTest.java

package Executor;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class SingleThreadExecutorTest {

public static void main(String[] args) {

try {

// 創(chuàng)建一個(gè)單線程執(zhí)行程序

ExecutorService executorService = Executors.newSingleThreadExecutor();

for (int i =1; i <= 3; i++) {

executorService.execute(new Task(i));

}

executorService.shutdown();

} catch (Exception e) {}

}

}

=============== 2 end

=============== 3,、 CachedThreadPoolTest.java

package Executor;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class CachedThreadPoolTest {

public static void main(String[] args) {

try {

// 建新線程的線程池,如果之前構(gòu)造的線程可用則重用它們

ExecutorService executorService = Executors.newCachedThreadPool();

for (int i =1; i <= 4; i++) {

executorService.execute(new Task(i));

}

executorService.shutdown();

} catch (Exception e) {}

}

}

=============== 3 end

=============== 4,、 FixedThreadPoolTest.java

package Executor;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class FixedThreadPoolTest {

public static void main(String[] args) {

try {

// 創(chuàng)建固定線程數(shù)的線程池,,以共享的無界隊(duì)列方式來運(yùn)行這些線程

ExecutorService executorService = Executors.newFixedThreadPool(2);

for (int i =1; i <= 5; i++) {

executorService.execute(new Task(i));

}

executorService.shutdown();

} catch (Exception e) {}

}

}

=============== 4 end

=============== 5、廣州軟件開發(fā)培訓(xùn) DaemonThreadFactory.java

package Executor;

import java.util.concurrent.ThreadFactory;

public class DaemonThreadFactory implements ThreadFactory {

//創(chuàng)建一個(gè)守護(hù)線程

public Thread newThread(Runnable r) {

Thread t = new Thread(r);

t.setDaemon(true);

return t;

}

}

=============== 5 end

=============== 6,、 MaxPriorityThreadFactory.java

package Executor;

import java.util.concurrent.ThreadFactory;

public class MaxPriorityThreadFactory implements ThreadFactory {

//創(chuàng)建一個(gè)最大優(yōu)先級(jí)的線程

public Thread newThread(Runnable r) {

Thread t = new Thread(r);

//優(yōu)先級(jí)最大,、意思是切換到這個(gè)線程的概率比其它的低一些

t.setPriority(Thread.MAX_PRIORITY);

return t;

}

}

=============== 6 end

=============== 7,、 MinPriorityThreadFactory.java

package Executor;

import java.util.concurrent.ThreadFactory;

public class MinPriorityThreadFactory implements ThreadFactory {

//創(chuàng)建一個(gè)最小優(yōu)先級(jí)的線程

public Thread newThread(Runnable r) {

Thread t = new Thread(r);

//優(yōu)先級(jí)最小、意思是切換到這個(gè)線程的概率比其它的低一些

t.setPriority(Thread.MIN_PRIORITY);

return t;

}

}

=============== 7 end

=============== 8,、 ThreadFactoryExecutorTest.java

package Executor;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class ThreadFactoryExecutorTest {

public static void main(String[] args) {

try {

// 創(chuàng)建一個(gè)單線程執(zhí)行程序

ExecutorService defaultExecutor = Executors.newCachedThreadPool();

ExecutorService daemonExec = Executors

.newCachedThreadPool(new DaemonThreadFactory());

ExecutorService maxPriorityExecutor = Executors

.newCachedThreadPool(new MaxPriorityThreadFactory());

ExecutorService minPriorityExecutor = Executors

.newCachedThreadPool(new MinPriorityThreadFactory());

//用守護(hù)線程執(zhí)行任務(wù)

for (int i = 1; i < 10; i++){

daemonExec.execute(new Task(i));

}

//用其它線程執(zhí)行任務(wù)

for (int j = 10; j <= 20; j++){

if (j == 10)

maxPriorityExecutor.execute(new Task(j));

else if (j == 11)

minPriorityExecutor.execute(new Task(j));

else

defaultExecutor.execute(new Task(j));

}

} catch (Exception e) {}

}

}

每問題每線程:在于它沒有對(duì)已創(chuàng)建線程的數(shù)量進(jìn)行任何限制,,除非對(duì)客戶端能夠拋出的請(qǐng)求速率進(jìn)行限制。

無限制創(chuàng)建線程的缺點(diǎn):

1.線程生命周期的開銷:線程的創(chuàng)建和關(guān)閉并不是“免費(fèi)的”,。

2.資源消耗量:活動(dòng)線程會(huì)消耗系統(tǒng)資源,,尤其是內(nèi)存。

3.穩(wěn)定性,。

1 線程池(Thread Pool)

在java中,,任務(wù)執(zhí)行的首要抽象不是Thread,而是Executor,。

public interface Executor {

/**

* Executor只是一個(gè)簡(jiǎn)單的接口,,但是他卻為一個(gè)靈活而且強(qiáng)大的框架創(chuàng)造了基礎(chǔ)。

*/

void execute(Runnable command);

}

這個(gè)框架可以用于異步任務(wù)執(zhí)行,,而且支持很多不同類型的任務(wù)執(zhí)行策略。它還為任務(wù)提交和任務(wù)執(zhí)行之間的解耦提供了標(biāo)準(zhǔn)的方法,。另外,,還提供了對(duì)生命周期的支持以及鉤子函數(shù),可以添加諸如統(tǒng)計(jì)收集,、應(yīng)用程序管理機(jī)制和監(jiān)視器等擴(kuò)展,。

1.1 創(chuàng)建/關(guān)閉

1.newCachedThreadPool

創(chuàng)建可緩存的線程池,多的話回收,,少的話增加,,沒有限制

N2.ewFixedThreadPool(int nThreads)

創(chuàng)建定長(zhǎng)的線程池,每提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,,直到最大,。如果某個(gè)以外終止,會(huì)補(bǔ)充一個(gè)新的

3.newScheduledThreadPool(int corePoolSize)

定長(zhǎng)線程池,,而且支持定時(shí)的以及周期性的任務(wù)執(zhí)行,。相當(dāng)于timer。

4.newSingleThreadExecutor()

單線程化的executor,,只創(chuàng)建唯一的工作線程來之心吧任務(wù),。如果它意外結(jié)束,會(huì)有另一個(gè)取代它,。它會(huì)保證任務(wù)隊(duì)列所規(guī)定的順序執(zhí)行,。

5.newSingleThreadScheduledExecutor()

創(chuàng)建一個(gè)單線程執(zhí)行程序,它可安排在給定延遲后運(yùn)行命令或者定期地執(zhí)行,。

private static final Executor

exec=Executors.newFixedThreadPool(100);

ServerSocket socket=new ServerSocket(80);

while(true){

final Socket connection=socket.accept();

Runnable task=new Runnable(){

public void run(){

handleRequest(connection);

}

};

exec.execute(exec);

}

1.2 ExecutorService

線程如果無法正常關(guān)閉http://,,則會(huì)阻止JVM的結(jié)束,。線程池中的任務(wù),可能已經(jīng)完成,,可能正在運(yùn)行,,其他還有在隊(duì)列中等待執(zhí)行。關(guān)閉的時(shí)候可能平緩的關(guān)閉,,到唐突的關(guān)閉(拔掉電源),。為了解決生命周期的問題,ExecutorService擴(kuò)展了Executor,,并且添加了一些用于生命周期管理的方法,。

public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)

throws InterruptedException;

。,。,。。,。

}

可以關(guān)閉 ExecutorService,,這將導(dǎo)致其拒絕新任務(wù)。提供兩個(gè)方法來關(guān)閉 ExecutorService,。shutdown() 方法在終止前允許執(zhí)行以前提交的任務(wù),,而 shutdownNow() 方法阻止等待任務(wù)啟動(dòng)并試圖停止當(dāng)前正在執(zhí)行的任務(wù)。在終止時(shí),,執(zhí)行程序沒有任務(wù)在執(zhí)行,,也沒有任務(wù)在等待執(zhí)行,并且無法提交新任務(wù),。應(yīng)該關(guān)閉未使用的 ExecutorService 以允許回收其資源,。

exec.shutdown();

ExecutorService線程池

java并發(fā)編程-Executor框架

1.3 延遲,并且周期性的任務(wù),,newScheduledThreadPool

Timer的問題:

1.只能創(chuàng)建唯一的線程來執(zhí)行所有timer任務(wù),。

2.如果一個(gè)timerTask很耗時(shí),會(huì)導(dǎo)致其他TimerTask的時(shí)效準(zhǔn)確性出問題

3.如果TimerTask拋出未檢查的異常,,Timer將會(huì)產(chǎn)生無法預(yù)料的行為,。Timer不會(huì)重新恢復(fù)。另外一個(gè)Timer中的Task出現(xiàn)異常以后,,后面再給這個(gè)Timer的任務(wù)也將會(huì)無法執(zhí)行,。

1.4 結(jié)合DelayQueue

如果要自己構(gòu)建調(diào)度服務(wù),那還可以考慮使用DelayQueue,,它里面的每個(gè)對(duì)象低耦合一個(gè)延遲時(shí)間有關(guān)聯(lián),,只有過期以后,DelayQueue才能讓你執(zhí)行take操作獲取元素。那么當(dāng)它里面的對(duì)象是FutureTask的時(shí)候,,就可以構(gòu)成一個(gè)簡(jiǎn)單的調(diào)度隊(duì)列,。

2 Runnable和Future、CallBack

2.1 Runnable

Executor框架讓定制一個(gè)執(zhí)行策略變得簡(jiǎn)單,,不過想要使用它,,你的任務(wù)還必須實(shí)現(xiàn)Runnable接口。在許多服務(wù)器請(qǐng)求中,,都存在一個(gè)情況,,那就是:?jiǎn)我坏目蛻粽?qǐng)求。它能執(zhí)行一些簡(jiǎn)單的任務(wù),,但是他不能返回一個(gè)值或者拋出受檢查的異常,。

2.2 Callback

很多任務(wù)都會(huì)引起計(jì)算延遲,包括執(zhí)行數(shù)據(jù)庫(kù)查詢,、從網(wǎng)絡(luò)上獲取資源,、進(jìn)行復(fù)雜的計(jì)算。這些任務(wù)Callback抽象更好,。它也可以被Executor框架執(zhí)行

Executors包含很多靜態(tài)方法,,可以吧Runnable和PrivilegedAction封裝為Callable。

2.3 FutureTask

Runnable和Callable描述的是抽象的計(jì)算性任務(wù),,這些任務(wù)通常是有限的,,他們有開始,而且最終會(huì)結(jié)束,。

一個(gè)Executor執(zhí)行的任務(wù)有4個(gè)周期,創(chuàng)建,、提交,、開始、完成,。由于任務(wù)的執(zhí)行會(huì)花很長(zhǎng)時(shí)間,,我們也希望可以取消任務(wù)。

Future描述了任務(wù)的生命周期,,并提供了相關(guān)的方法來獲得任務(wù)的結(jié)果,、取消任務(wù)以及檢驗(yàn)任務(wù)是否完成還是取消。對(duì)應(yīng)的isDone,、isCancelled() 方法,,它不能后退,一旦完成,,就永遠(yuǎn)停在完成狀態(tài)上,。用get(等待)獲得結(jié)果。

/**

* @param args

* @throws InterruptedException

* @throws ExecutionException

*/

public static void main(String[] args) throws

InterruptedException {

int threadCounts = 19;// 使用的線程數(shù)

long sum = 0;

ExecutorService exec = Executors.newFixedThreadPool(threadCounts);

List<Callable<Long>> callList = new ArrayList<Callable<Long>>();

// 生成很大的List

List<Integer> list = new ArrayList<Integer>();

for (int i = 0; i <= ; i++) {

list.add(i);

}

int len = list.size() / threadCounts;// 平均分割List

// List中的數(shù)量沒有線程數(shù)多(很少存在)

if (len == 0) {

threadCounts = list.size();// 采用一個(gè)線程處理List中的一個(gè)元素

len = list.size() / threadCounts;// 重新平均分割List

}

for (int i = 0; i < threadCounts; i++) {

final List<Integer> subList;

if (i == threadCounts - 1) {

subList = list.subList(i * len, list.size());

} else {

subList = list.subList(i * len,

len * (i + 1) > list.size() ? list.size() : len

* (i + 1));

}

// 采用匿名內(nèi)部類實(shí)現(xiàn)

callList.add(new Callable<Long>() {

public Long call() throws Exception {

long subSum = 0L;

for (Integer i : subList) {

subSum += i;

}

System.out.println("分配給線程:"

+ Thread.currentThread().getName()

+ "那一部分List的整數(shù)和為:tSubSum:" + subSum);

return subSum;

}

});

}

List<Future<Long>> futureList = exec.invokeAll(callList);

for (Future<Long> future : futureList) {

sum += future.get();

}

exec.shutdown();

System.out.println(sum);

}

2.4 和Service的結(jié)合

Runnable和Callable類都可以通過Service的submit方法提交,并且返回一個(gè)Future,,它表示這個(gè)任務(wù),,可以獲得該任務(wù)的執(zhí)行結(jié)果或者取消它。

3 CompletionService

向Executor提交一個(gè)批處理任務(wù),,并且希望獲得結(jié)果,,那么你將會(huì)使用Future,然后不斷的調(diào)用isDone來檢驗(yàn)是否完成,,這樣太麻煩,,還有更好的方法,那就是完成服務(wù),,CompletionService,。poll方法不會(huì)等待,返回null,。take方法會(huì)等待,。

它整合了Executor和BlockingQueue的功能,你可以將Callable任務(wù)交給他執(zhí)行,,然后使用類似于隊(duì)列中的take何poll方法,,在結(jié)果完整時(shí)可用時(shí)獲得這個(gè)結(jié)果。ExecutorCompletionService是它的實(shí)現(xiàn)類,。

它的實(shí)現(xiàn)也比較簡(jiǎn)單,,在構(gòu)造函數(shù)中創(chuàng)建一個(gè)BlockingQueue,用它保存結(jié)果:

private final BlockingQueue<Future<V>> completionQueue;

提交的任務(wù)被包裝成QueueFuture:

private class QueueingFuture extends FutureTask<Void> {

QueueingFuture(RunnableFuture<V> task) {

super(task, null);

this.task = task;

}

protected void done() { completionQueue.add(task); }

private final Future<V> task;

}

覆寫done方法,,將結(jié)果置入BlockingQueue,。

它與上面獲得一堆FutureTask,然后遍歷的去get等返回還不一樣,。它只能一個(gè)個(gè)獲取,,代表有一個(gè)拿一個(gè)。FutureTask的get可能后面的FutureTask都已經(jīng)好了,,可是有一個(gè)還沒好,,那就卡在中間了。

 


    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購(gòu)買等信息,,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請(qǐng)點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多