久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Java 線程池的理論與實(shí)踐(二)

 太原中軟 2017-05-15

四、如果線程死掉了怎么辦

  幾乎所有Executors中生成線程池的方法的注釋上,,都有代表相同意思的一句話,,表示如果線程池中的某個線程死掉了,線程池會生成一個新的線程代替它,。下面是方法java.util.concurrent.Executors.newFixedThreadPool(int)上的注釋,。

  If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.

線程死亡的原因

  我們都知道守護(hù)線程(daemon)會在所有的非守護(hù)線程都死掉之后也死掉,除此之外導(dǎo)致一個非守護(hù)線程死掉有以下幾種可能:

  1. 自然死亡,,Runnable.run()方法執(zhí)行完后返回,。
  2. 執(zhí)行過程中有未捕獲異常,被拋到了Runnable.run()之外,,導(dǎo)致線程死亡,。
  3. 其宿主死亡,進(jìn)程關(guān)閉或者機(jī)器死機(jī),。在Java中通常是System.exit()方法被調(diào)用
  4. 其他硬件問題,。

  線程池要保證其高可用性,就必須保證線程的可用,。如一個固定容量的線程池,其中一個線程死掉了,,它必須要能監(jiān)控到線程的死亡并生成一個新的線程來代替它,。ThreadPoolExecutor中與線程相關(guān)的有這樣幾個概念:

  1. java.util.concurrent.ThreadFactory,在Executors中有兩種ThreadFactory,,但其提供的線程池只使用了一種java.util.concurrent.Executors.DefaultThreadFactory,,它是簡單的使用ThreadGroup來實(shí)現(xiàn)。
  2. java.lang.ThreadGroup,,從Java1開始就存在的類,,用來建立一個線程的樹形結(jié)構(gòu),可以用它來組織線程間的關(guān)系,,但其并沒有對其包含的子線程的監(jiān)控,。
  3. java.util.concurrent.ThreadPoolExecutor.Worker,ThreadPoolExecutor對線程的封裝,,其中還包含了一些統(tǒng)計功能,。
ThreadPoolExecutor中如何保障線程的可用

  在ThreadPoolExecutor中使用了一個很巧妙的方法實(shí)現(xiàn)了對線程池中線程健康狀況的監(jiān)控,代碼2是從ThreadPoolExecutor類源碼中截取的一段代碼,,它們在一起說明了其對線程的監(jiān)控,。

  可以看到,在ThreadPoolExecutor中的線程被封裝成一個對象Worker,,而將其中的run()代理到ThreadPoolExecutor中的runWorker(),,在runWorker()方法中是一個獲取任務(wù)并執(zhí)行的死循環(huán),。如果任務(wù)的運(yùn)行出了什么問題(如拋出未捕獲異常),processWorkerExit()方法會被執(zhí)行,,同時傳入的completedAbruptly參數(shù)為true,,會重新添加一個初始任務(wù)為null的Worker,并隨之啟動一個新的線程,。

//代碼2//ThreadPoolExecutor的動態(tài)內(nèi)部類privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{ /** 對象中封裝的線程 */final Thread thread; /** 第一個要運(yùn)行的任務(wù),,可能為null. */ Runnable firstTask; /** 任務(wù)計數(shù)器 */volatilelong completedTasks; //省略其他代碼 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */publicvoidrun(){ runWorker(this); } }finalvoidrunWorker(Worker w){ Thread wt = Thread.currentThread(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); try { beforeExecute(wt, task); try { task.run(); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly){ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }privatebooleanaddWorker(Runnable firstTask, boolean core){ retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary.if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startablethrownew IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 五、回到我的問題

  由于各種各樣的原因,,我們并沒有使用數(shù)據(jù)庫自帶的主從機(jī)制來做數(shù)據(jù)的復(fù)制,,而是將主庫的所有DML語句作為消息發(fā)送到讀庫(DTS),同時自己實(shí)現(xiàn)了數(shù)據(jù)的重放,。第一版的數(shù)據(jù)同步服務(wù)十分簡單,,對于主庫的DML消息處理和消費(fèi)(寫入讀庫)都是在一個線程內(nèi)完成的.這么實(shí)現(xiàn)的優(yōu)點(diǎn)是簡單,但缺點(diǎn)是直接導(dǎo)致了表與表之間的數(shù)據(jù)同步會受到影響,,如果有一個表A忽然來了很多的消息(往往是批量修改數(shù)據(jù)造成的),,則會占住消息處理通道,影響其他業(yè)務(wù)數(shù)據(jù)的及時同步,,同時單線程寫庫吞吐太小,。

  上文說到,首先想到的是使用線程池來做消息的消費(fèi),,但是不能直接套用上邊說的Executor框架,,由于以下幾個原因:

  1. ThreadPoolExecutor中默認(rèn)所有的任務(wù)之間是不互相影響的,然而對于數(shù)據(jù)庫的DML來說,,消息的順序不能被打亂,,至少單表的消息順序必須有序,不然會影響最終的數(shù)據(jù)一致,。
  2. ThreadPoolExecutor中所有的線程共享一個等待隊(duì)列,,然而為了防止表與表之間的影響,每個線程應(yīng)該有自己的任務(wù)等待隊(duì)列,。
  3. 寫庫操作的吞吐直接受到提交事務(wù)數(shù)的影響,,所以此多線程框架要可以支持任務(wù)的合并。

  重復(fù)造輪子是沒有意義的,,但是在我們這種場景下JDK中現(xiàn)有的Executor框架不符合要求,,只能自己造輪子。

我的實(shí)現(xiàn)

  首先把線程抽象成「DML語句的執(zhí)行器(Executor)」,。其中包含了一個Thread的實(shí)例,,維護(hù)了自己的等待隊(duì)列(限定容量的阻塞隊(duì)列),和對應(yīng)的消息執(zhí)行邏輯,。

  除此之外還包含了一些簡單的統(tǒng)計,、線程健康監(jiān)控,、合并事務(wù)等處理。

  Executor的對象實(shí)現(xiàn)了Thread.UncaughtExceptionHandler接口,,并綁定到其工作線程上,。同時ExecutorGroup也會再生成一個守護(hù)線程專門來守護(hù)池內(nèi)所有線程,作為額外的保險措施,。

  把線程池的概念抽象成執(zhí)行器組(ExecutorGroup),,其中維護(hù)了執(zhí)行器的數(shù)組,并維護(hù)了目標(biāo)表到特定執(zhí)行器的映射關(guān)系,,并對外提供執(zhí)行消息的接口,,其主要代碼如下:

//代碼3publicclassExecutorGroup { Executor[] group = new Executor[NUM]; Thread boss = null; Map<String, Integer> registeredTables = new HashMap<>(32);// AtomicInteger cursor = new AtomicInteger();volatileint cursor = 0; publicExecutorGroup(String name) { //init groupfor(int i = 0; i < NUM; i++) { logger.debug("啟動線程{},{}", name, i); group[i] = new Executor(this, String.format("sync-executor-%s-%d", name, i), i / NUM_OF_FIRST_CLASS); } startDaemonBoss(String.format("sync-executor-%s-boss", name)); } //額外的保險privatevoidstartDaemonBoss(String name) { if (boss != null) { boss.interrupt(); } boss = new Thread(() -> { while(true) { //休息一分鐘。,。,。if (this.group != null) { for (int i = 0; i < group.length; i++) { Executor executor = group[i]; if (executor != null) { executor.checkThread(); } } } } }); boss.setName(name); boss.setDaemon(true); boss.start(); } publicvoidexecute(Message message){ logger.debug("執(zhí)行消息"); //省略消息合法性驗(yàn)證if (!registeredTables.containsKey(taskKey)) { //已注冊// registeredTables.put(taskKey, cursor.getAndIncrement()); registeredTables.put(taskKey, cursor++ % NUM); } int index = registeredTables.get(taskKey); logger.debug("執(zhí)行消息{},注冊索引{}", taskKey, index); try { group[index].schedule(message); } catch (InterruptedException e) { logger.error("準(zhǔn)備消息出錯", e); } } }

  完成后整體的線程模型如下圖所示:

  新的線程模型

Java1.7新加入的TransferQueue

  Java1.7中提供了新的隊(duì)列類型TransferQueue,但只提供了一個它的實(shí)現(xiàn)java.util.concurrent.LinkedTransferQueue<E>,,它有更好的性能表現(xiàn),,可它是一個無容量限制的隊(duì)列,而在我們的這個場景下必須要限制隊(duì)列的容量,,所以要自己實(shí)現(xiàn)一個有容量限制的隊(duì)列,。

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點(diǎn),。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點(diǎn)擊一鍵舉報,。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多