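4. What happens if a thread dies

The Javadoc of almost every pool-creating factory method in Executors contains a sentence with the same meaning: if a thread in the pool dies, the pool creates a new thread to replace it. The comment on java.util.concurrent.Executors.newFixedThreadPool(int) is one example.

Why threads die

We all know that daemon threads are terminated once all non-daemon threads have died. Apart from that, a non-daemon thread can die for the following reasons: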
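To stay highly available, a thread pool has to keep its threads available. In a fixed-size pool, for example, when one thread dies the pool must be able to detect the death and create a new thread to replace it. ThreadPoolExecutor has the following thread-related concepts: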
ThreadPoolExecutor uses a neat technique to monitor the health of the threads in the pool. Code 2 is a set of excerpts from the ThreadPoolExecutor source; taken together they show how the monitoring works.

As the code shows, each thread in ThreadPoolExecutor is wrapped in a Worker object whose run() delegates to runWorker() in ThreadPoolExecutor. runWorker() is a loop that keeps fetching and running tasks until getTask() returns null. processWorkerExit() is called from the finally block whenever that loop ends. If a task fails (for example by throwing an uncaught exception), the completedAbruptly argument is still true, so, as long as the pool has not reached the STOP state, a new Worker with a null initial task is added and a new thread is started with it.

    // Code 2
    // Inner (non-static) class of ThreadPoolExecutor
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /** The thread wrapped by this object */
        final Thread thread;
        /** First task to run; may be null. */
        Runnable firstTask;
        /** Task counter */
        volatile long completedTasks;

        // other code omitted

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }
    }

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // the declarations of task and thrown are omitted in this excerpt
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // reached only when the loop ends normally
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // the code that reads c from ctl is omitted in this excerpt
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

5. Back to my problem

For various reasons we do not use the database's built-in master-slave mechanism to replicate data. Instead, every DML statement executed on the primary is sent as a message to the read replica (DTS), and we replay the data ourselves. The first version of the data sync service was very simple: processing the DML messages from the primary and consuming them (writing to the read replica) both happened in a single thread. The advantage of this design is its simplicity. The drawback is that synchronization of one table interferes with the others: if table A suddenly receives a large number of messages (usually caused by a batch update), it occupies the message processing channel and delays the timely synchronization of other business data. On top of that, the throughput of single-threaded writes is too low.

As mentioned earlier, the first idea was to consume the messages with a thread pool, but the Executor framework described above cannot be used as is, for the following reasons:
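Reinventing the wheel is pointless, but in our scenario the Executor framework that ships with the JDK does not meet the requirements, so we had to build our own.

My implementation first abstracts a thread as an "executor of DML statements" (Executor). An executor holds a Thread instance, maintains its own waiting queue (a blocking queue with bounded capacity), and contains the corresponding message execution logic.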
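As an illustration only, an executor of this kind might look roughly like the sketch below. The article does not show the author's real Executor class, so everything here is an assumption: the class name DmlExecutorSketch, the String message type, the Consumer-based handler and the isAlive() helper are invented for the example, and schedule() and checkThread() only mirror the method names that Code 3 calls.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch: one worker thread plus one bounded waiting queue. */
class DmlExecutorSketch {
    private final String name;
    private final BlockingQueue<String> queue;  // bounded waiting queue
    private final Consumer<String> handler;     // stands in for "replay on the read replica"
    private volatile Thread thread;             // the worker thread owned by this executor

    DmlExecutorSketch(String name, int capacity, Consumer<String> handler) {
        this.name = name;
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.handler = handler;
        startThread();
    }

    /** Blocks the caller while the queue is full, which throttles the producer. */
    void schedule(String message) throws InterruptedException {
        queue.put(message);
    }

    /** Meant to be called periodically by a supervisor; restarts a dead worker. */
    synchronized void checkThread() {
        Thread t = thread;
        if (t == null || !t.isAlive()) {
            startThread();
        }
    }

    boolean isAlive() {
        Thread t = thread;
        return t != null && t.isAlive();
    }

    private void startThread() {
        Thread t = new Thread(this::loop, name);
        t.setDaemon(true);
        thread = t;
        t.start();
    }

    private void loop() {
        try {
            while (true) {
                String message = queue.take();
                // An unchecked exception thrown here terminates this thread;
                // checkThread() is what brings a replacement up.
                handler.accept(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // leave the loop quietly on interrupt
        }
    }
}
```

Because each executor owns its queue, a burst of messages for one table only fills the queue of the executor that table is mapped to. A production version would more likely catch and log failures inside the loop than let the thread die; the sketch lets it die only to show what checkThread() is for.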
The thread pool concept is abstracted as an executor group (ExecutorGroup). It maintains an array of executors and the mapping from each target table to a specific executor, and it exposes the interface for executing messages. Its main code is as follows:

    // Code 3
    public class ExecutorGroup {
        Executor[] group = new Executor[NUM];
        Thread boss = null;
        Map<String, Integer> registeredTables = new HashMap<>(32);
        // AtomicInteger cursor = new AtomicInteger();
        volatile int cursor = 0;

        public ExecutorGroup(String name) {
            // init group
            for (int i = 0; i < NUM; i++) {
                logger.debug("starting thread {},{}", name, i);
                group[i] = new Executor(this, String.format("sync-executor-%s-%d", name, i), i / NUM_OF_FIRST_CLASS);
            }
            startDaemonBoss(String.format("sync-executor-%s-boss", name));
        }

        // extra insurance
        private void startDaemonBoss(String name) {
            if (boss != null) {
                boss.interrupt();
            }
            boss = new Thread(() -> {
                while (true) {
                    // sleep for one minute (code omitted in the original)
                    if (this.group != null) {
                        for (int i = 0; i < group.length; i++) {
                            Executor executor = group[i];
                            if (executor != null) {
                                executor.checkThread();
                            }
                        }
                    }
                }
            });
            boss.setName(name);
            boss.setDaemon(true);
            boss.start();
        }

        public void execute(Message message) {
            logger.debug("executing message");
            // message validation omitted (taskKey presumably comes from the omitted part)
            if (!registeredTables.containsKey(taskKey)) {
                // not registered yet: assign the table to the next executor, round-robin
                // registeredTables.put(taskKey, cursor.getAndIncrement());
                registeredTables.put(taskKey, cursor++ % NUM);
            }
            int index = registeredTables.get(taskKey);
            logger.debug("executing message {}, registered index {}", taskKey, index);
            try {
                group[index].schedule(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status visible to callers
                logger.error("failed to prepare message", e);
            }
        }
    }

The overall thread model after this change is shown in the figure below:

[Figure: the new thread model]

TransferQueue, new in Java 1.7

Java 1.7 introduced a new queue type, TransferQueue, but ships only one implementation of it, java.util.concurrent.LinkedTransferQueue<E>. It offers better performance, but it is an unbounded queue, and our scenario requires the queue capacity to be limited, so we have to implement a bounded queue ourselves.
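The article does not show how the author's bounded queue is built. One possible approach, sketched here purely as an assumption, is to guard a LinkedTransferQueue with a Semaphore that holds one permit per free slot. The class name BoundedTransferQueueSketch is hypothetical, and the sketch exposes only put, offer, take, poll and size rather than the full TransferQueue interface.

```java
import java.util.Objects;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a capacity limit layered on top of LinkedTransferQueue. */
class BoundedTransferQueueSketch<E> {
    private final LinkedTransferQueue<E> queue = new LinkedTransferQueue<>();
    private final Semaphore permits; // one permit per free slot

    BoundedTransferQueueSketch(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    /** Waits for a free slot, then enqueues. */
    void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        permits.acquire();
        queue.put(e); // never blocks: the underlying queue is unbounded
    }

    /** Enqueues only if a slot is free right now. */
    boolean offer(E e) {
        Objects.requireNonNull(e);
        if (!permits.tryAcquire()) {
            return false;
        }
        queue.offer(e);
        return true;
    }

    /** Waits for an element and frees its slot. */
    E take() throws InterruptedException {
        E e = queue.take();
        permits.release();
        return e;
    }

    /** Waits up to the timeout; returns null if nothing arrived. */
    E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = queue.poll(timeout, unit);
        if (e != null) {
            permits.release();
        }
        return e;
    }

    int size() {
        return queue.size();
    }
}
```

The semaphore adds a second synchronization point on every operation, so part of the performance advantage of LinkedTransferQueue is likely given up. Whether that trade-off is acceptable would need to be measured for the actual workload.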