寫(xiě)在前面
祝大家兒童節(jié)快樂(lè)??,,保持童心,這篇文章作為兒童節(jié)禮物??送給大家,。進(jìn)入源碼階段了,寫(xiě)了十幾篇的 并發(fā)系列 知識(shí)鋪墊終于要派上用場(chǎng)了,。相信很多人已經(jīng)忘了其中的一些理論知識(shí),,別擔(dān)心,我會(huì)在源碼環(huán)節(jié)帶入相應(yīng)的理論知識(shí)點(diǎn)幫助大家回憶,,做到理論與實(shí)踐相結(jié)合,,另外這是超長(zhǎng)圖文,建議收藏,,如果對(duì)你有用還請(qǐng)點(diǎn)贊讓更多人看到
Java SDK 為什么要設(shè)計(jì) Lock
曾幾何時(shí)幻想過(guò),,如果 Java 并發(fā)控制只有 synchronized 多好,只有下面三種使用方式,,簡(jiǎn)單方便
public class ThreeSync {
private static final Object object = new Object();
public synchronized void normalSyncMethod(){
//臨界區(qū)
}
public static synchronized void staticSyncMethod(){
//臨界區(qū)
}
public void syncBlockMethod(){
synchronized (object){
//臨界區(qū)
}
}
}
如果在 Java 1.5之前,,確實(shí)是這樣,自從 1.5 版本 Doug Lea 大師就重新造了一個(gè)輪子 Lock
我們常說(shuō):“避免重復(fù)造輪子”,,如果有了輪子還是要堅(jiān)持再造個(gè)輪子,,那么肯定傳統(tǒng)的輪子在某些應(yīng)用場(chǎng)景中不能很好的解決問(wèn)題
不知你是否還記得 Coffman 總結(jié)的四個(gè)可以發(fā)生死鎖的情形 ,其中【不可剝奪條件】是指:
線程已經(jīng)獲得資源,在未使用完之前,,不能被剝奪,,只能在使用完時(shí)自己釋放
要想破壞這個(gè)條件,就需要具有申請(qǐng)不到進(jìn)一步資源就釋放已有資源的能力
很顯然,,這個(gè)能力是 synchronized 不具備的,,使用 synchronized ,如果線程申請(qǐng)不到資源就會(huì)進(jìn)入阻塞狀態(tài),,我們做什么也改變不了它的狀態(tài),,這是 synchronized 輪子的致命弱點(diǎn),這就強(qiáng)有力的給了重造輪子 Lock 的理由
顯式鎖 Lock
舊輪子有弱點(diǎn),,新輪子就要解決這些問(wèn)題,,所以要具備不會(huì)阻塞的功能,下面的三個(gè)方案都是解決這個(gè)問(wèn)題的好辦法(看下面表格描述你就明白三個(gè)方案的含義了)
特性 | 描述 | API |
---|
能響應(yīng)中斷 | 如果不能自己釋放,,那可以響應(yīng)中斷也是很好的,。Java多線程中斷機(jī)制 專門(mén)描述了中斷過(guò)程,目的是通過(guò)中斷信號(hào)來(lái)跳出某種狀態(tài),,比如阻塞 | lockInterruptbly() |
非阻塞式的獲取鎖 | 嘗試獲取,,獲取不到不會(huì)阻塞,直接返回 | tryLock() |
支持超時(shí) | 給定一個(gè)時(shí)間限制,,如果一段時(shí)間內(nèi)沒(méi)獲取到,,不是進(jìn)入阻塞狀態(tài),同樣直接返回 | tryLock(long time, timeUnit) |
好的方案有了,,但魚(yú)和熊掌不可兼得,,Lock 多了 synchronized 不具備的特性,自然不會(huì)像 synchronized 那樣一個(gè)關(guān)鍵字三個(gè)玩法走遍全天下,,在使用上也相對(duì)復(fù)雜了一丟丟
Lock 使用范式
synchronized 有標(biāo)準(zhǔn)用法,,這樣的優(yōu)良傳統(tǒng)咱 Lock 也得有,相信很多人都知道使用 Lock 的一個(gè)范式
Lock lock = new ReentrantLock();
lock.lock();
try{
...
}finally{
lock.unlock();
}
既然是范式(沒(méi)事不要挑戰(zhàn)更改寫(xiě)法的那種),,肯定有其理由,,我們來(lái)看一下
標(biāo)準(zhǔn)1—finally 中釋放鎖
這個(gè)大家應(yīng)該都會(huì)明白,在 finally 中釋放鎖,,目的是保證在獲取到鎖之后,,最終能被釋放
標(biāo)準(zhǔn)2—在 try{} 外面獲取鎖
不知道你有沒(méi)有想過(guò),為什么會(huì)有標(biāo)準(zhǔn) 2 的存在,,我們通常是“喜歡” try 住所有內(nèi)容,,生怕發(fā)生異常不能捕獲的
在 try{}
外獲取鎖主要考慮兩個(gè)方面:
- 如果沒(méi)有獲取到鎖就拋出異常,最終釋放鎖肯定是有問(wèn)題的,,因?yàn)檫€未曾擁有鎖談何釋放鎖呢
- 如果在獲取鎖時(shí)拋出了異常,,也就是當(dāng)前線程并未獲取到鎖,,但執(zhí)行到 finally 代碼時(shí),如果恰巧別的線程獲取到了鎖,,則會(huì)被釋放掉(無(wú)故釋放)
不同鎖的實(shí)現(xiàn)方式略有不同,,范式的存在就是要避免一切問(wèn)題的出現(xiàn),所以大家盡量遵守范式
Lock 是怎樣起到鎖的作用呢,?
如果你熟悉 synchronized,,你知道程序編譯成 CPU 指令后,在臨界區(qū)會(huì)有 moniterenter
和 moniterexit
指令的出現(xiàn),,可以理解成進(jìn)出臨界區(qū)的標(biāo)識(shí)
從范式上來(lái)看:
lock.lock()
獲取鎖,,“等同于” synchronized 的 moniterenter指令
lock.unlock()
釋放鎖,“等同于” synchronized 的 moniterexit 指令
那 Lock 是怎么做到的呢,?
這里先簡(jiǎn)單說(shuō)明一下,,這樣一會(huì)到源碼分析時(shí),,你可以遠(yuǎn)觀設(shè)計(jì)輪廓,,近觀實(shí)現(xiàn)細(xì)節(jié),會(huì)變得越發(fā)輕松
其實(shí)很簡(jiǎn)單,,比如在 ReentrantLock 內(nèi)部維護(hù)了一個(gè) volatile 修飾的變量 state,,通過(guò) CAS 來(lái)進(jìn)行讀寫(xiě)(最底層還是交給硬件來(lái)保證原子性和可見(jiàn)性),如果CAS更改成功,,即獲取到鎖,,線程進(jìn)入到 try 代碼塊繼續(xù)執(zhí)行;如果沒(méi)有更改成功,,線程會(huì)被【掛起】,,不會(huì)向下執(zhí)行
但 Lock 是一個(gè)接口,里面根本沒(méi)有 state 這個(gè)變量的存在:
它怎么處理這個(gè) state 呢,?很顯然需要一點(diǎn)設(shè)計(jì)的加成了,,接口定義行為,具體都是需要實(shí)現(xiàn)類(lèi)的
Lock 接口的實(shí)現(xiàn)類(lèi)基本都是通過(guò)【聚合】了一個(gè)【隊(duì)列同步器】的子類(lèi)完成線程訪問(wèn)控制的
那什么是隊(duì)列同步器呢,?(這應(yīng)該是你見(jiàn)過(guò)的最強(qiáng)標(biāo)題黨,,聊了半個(gè)世紀(jì)才入正題,評(píng)論區(qū)留言罵我)
隊(duì)列同步器 AQS
隊(duì)列同步器 (AbstractQueuedSynchronizer),,簡(jiǎn)稱同步器或AQS,,就是我們今天的主人公
**問(wèn):**為什么你分析 JUC 源碼,要從 AQS 說(shuō)起呢,?
**答:**看下圖
相信看到這個(gè)截圖你就明白一二了,,你聽(tīng)過(guò)的,面試常被問(wèn)起的,,工作中常用的
ThreadPoolExecutor
(關(guān)于線程池的理解,,可以查看 為什么要使用線程池? )
都和 AQS 有直接關(guān)系,所以了解 AQS 的抽象實(shí)現(xiàn),在此基礎(chǔ)上再稍稍查看上述各類(lèi)的實(shí)現(xiàn)細(xì)節(jié),,很快就可以全部搞定,,不至于查看源碼時(shí)一頭霧水,丟失主線
上面提到,,在鎖的實(shí)現(xiàn)類(lèi)中會(huì)聚合同步器,,然后利同步器實(shí)現(xiàn)鎖的語(yǔ)義,那么問(wèn)題來(lái)了:
為什么要用聚合模式,,怎么進(jìn)一步理解鎖和同步器的關(guān)系呢,?
我們絕大多數(shù)都是在使用鎖,實(shí)現(xiàn)鎖之后,,其核心就是要使用方便
從 AQS 的類(lèi)名稱和修飾上來(lái)看,,這是一個(gè)抽象類(lèi),所以從設(shè)計(jì)模式的角度來(lái)看同步器一定是基于【模版模式】來(lái)設(shè)計(jì)的,,使用者需要繼承同步器,,實(shí)現(xiàn)自定義同步器,并重寫(xiě)指定方法,,隨后將同步器組合在自定義的同步組件中,,并調(diào)用同步器的模版方法,而這些模版方法又回調(diào)用使用者重寫(xiě)的方法
我不想將上面的解釋說(shuō)的這么抽象,,其實(shí)想理解上面這句話,,我們只需要知道下面兩個(gè)問(wèn)題就好了
同步器可重寫(xiě)的方法
同步器提供的可重寫(xiě)方法只有5個(gè),,這大大方便了鎖的使用者:
按理說(shuō),需要重寫(xiě)的方法也應(yīng)該有 abstract 來(lái)修飾的,,為什么這里沒(méi)有,?原因其實(shí)很簡(jiǎn)單,上面的方法我已經(jīng)用顏色區(qū)分成了兩類(lèi):
自定義的同步組件或者鎖不可能既是獨(dú)占式又是共享式,,為了避免強(qiáng)制重寫(xiě)不相干方法,,所以就沒(méi)有 abstract 來(lái)修飾了,但要拋出異常告知不能直接使用該方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
暖暖的很貼心(如果你有類(lèi)似的需求也可以仿照這樣的設(shè)計(jì))
表格方法描述中所說(shuō)的同步狀態(tài)
就是上文提到的有 volatile 修飾的 state,,所以我們?cè)?code>重寫(xiě)上面幾個(gè)方法時(shí),,還要通過(guò)同步器提供的下面三個(gè)方法(AQS 提供的)來(lái)獲取或修改同步狀態(tài):
而獨(dú)占式和共享式操作 state 變量的區(qū)別也就很簡(jiǎn)單了
所以你看到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信號(hào)量)
CountDownLatch
這幾個(gè)類(lèi)其實(shí)僅僅是在實(shí)現(xiàn)以上幾個(gè)方法上略有差別,其他的實(shí)現(xiàn)都是通過(guò)同步器的模版方法來(lái)實(shí)現(xiàn)的,,到這里是不是心情放松了許多呢,?我們來(lái)看一看模版方法:
同步器提供的模版方法
上面我們將同步器的實(shí)現(xiàn)方法分為獨(dú)占式和共享式兩類(lèi),模版方法其實(shí)除了提供以上兩類(lèi)模版方法之外,,只是多了響應(yīng)中斷
和超時(shí)限制
的模版方法供 Lock 使用,,來(lái)看一下
先不用記上述方法的功能,,目前你只需要了解個(gè)大概功能就好。另外,,相信你也注意到了:
上面的方法都有 final 關(guān)鍵字修飾,,說(shuō)明子類(lèi)不能重寫(xiě)這個(gè)方法
看到這你也許有點(diǎn)亂了,我們稍微歸納一下:
程序員還是看代碼心里踏實(shí)一點(diǎn),,我們?cè)賮?lái)用代碼說(shuō)明一下上面的關(guān)系(注意代碼中的注釋,,以下的代碼并不是很?chē)?yán)謹(jǐn),只是為了簡(jiǎn)單說(shuō)明上圖的代碼實(shí)現(xiàn)):
package top.dayarch.myjuc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自定義互斥鎖
*
* @author tanrgyb
* @date 2020/5/23 9:33 PM
*/
public class MyMutex implements Lock {
// 靜態(tài)內(nèi)部類(lèi)-自定義同步器
private static class MySync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
// 調(diào)用AQS提供的方法,,通過(guò)CAS保證原子性
if (compareAndSetState(0, arg)){
// 我們實(shí)現(xiàn)的是互斥鎖,,所以標(biāo)記獲取到同步狀態(tài)(更新state成功)的線程,
// 主要為了判斷是否可重入(一會(huì)兒會(huì)說(shuō)明)
setExclusiveOwnerThread(Thread.currentThread());
//獲取同步狀態(tài)成功,,返回 true
return true;
}
// 獲取同步狀態(tài)失敗,,返回 false
return false;
}
@Override
protected boolean tryRelease(int arg) {
// 未擁有鎖卻讓釋放,會(huì)拋出IMSE
if (getState() == 0){
throw new IllegalMonitorStateException();
}
// 可以釋放,,清空排它線程標(biāo)記
setExclusiveOwnerThread(null);
// 設(shè)置同步狀態(tài)為0,,表示釋放鎖
setState(0);
return true;
}
// 是否獨(dú)占式持有
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 后續(xù)會(huì)用到,主要用于等待/通知機(jī)制,,每個(gè)condition都有一個(gè)與之對(duì)應(yīng)的條件等待隊(duì)列,,在鎖模型中說(shuō)明過(guò)
Condition newCondition() {
return new ConditionObject();
}
}
// 聚合自定義同步器
private final MySync sync = new MySync();
@Override
public void lock() {
// 阻塞式的獲取鎖,調(diào)用同步器模版方法獨(dú)占式,,獲取同步狀態(tài)
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 調(diào)用同步器模版方法可中斷式獲取同步狀態(tài)
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
// 調(diào)用自己重寫(xiě)的方法,非阻塞式的獲取同步狀態(tài)
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 調(diào)用同步器模版方法,,可響應(yīng)中斷和超時(shí)時(shí)間限制
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
// 釋放鎖
sync.release(1);
}
@Override
public Condition newCondition() {
// 使用自定義的條件
return sync.newCondition();
}
}
如果你現(xiàn)在打開(kāi) IDE,, 你會(huì)發(fā)現(xiàn)上文提到的 ReentrantLock
ReentrantReadWriteLock
Semaphore(信號(hào)量)
CountDownLatch
都是按照這個(gè)結(jié)構(gòu)實(shí)現(xiàn),所以我們就來(lái)看一看 AQS 的模版方法到底是怎么實(shí)現(xiàn)鎖
AQS實(shí)現(xiàn)分析
從上面的代碼中,,你應(yīng)該理解了lock.tryLock()
非阻塞式獲取鎖就是調(diào)用自定義同步器重寫(xiě)的 tryAcquire()
方法,,通過(guò) CAS 設(shè)置state 狀態(tài),不管成功與否都會(huì)馬上返回,;那么 lock.lock() 這種阻塞式的鎖是如何實(shí)現(xiàn)的呢,?
有阻塞就需要排隊(duì),實(shí)現(xiàn)排隊(duì)必然需要隊(duì)列
CLH:Craig,、Landin and Hagersten 隊(duì)列,,是一個(gè)單向鏈表,AQS中的隊(duì)列是CLH變體的虛擬雙向隊(duì)列(FIFO)——概念了解就好,,不要記
隊(duì)列中每個(gè)排隊(duì)的個(gè)體就是一個(gè) Node,,所以我們來(lái)看一下 Node 的結(jié)構(gòu)
Node 節(jié)點(diǎn)
AQS 內(nèi)部維護(hù)了一個(gè)同步隊(duì)列,用于管理同步狀態(tài),。
- 當(dāng)線程獲取同步狀態(tài)失敗時(shí),,就會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成一個(gè) Node 節(jié)點(diǎn),,將其加入到同步隊(duì)列中尾部,阻塞該線程
- 當(dāng)同步狀態(tài)被釋放時(shí),,會(huì)喚醒同步隊(duì)列中“首節(jié)點(diǎn)”的線程獲取同步狀態(tài)
為了將上述步驟弄清楚,,我們需要來(lái)看一看 Node 結(jié)構(gòu) (如果你能打開(kāi) IDE 一起看那是極好的)
乍一看有點(diǎn)雜亂,我們還是將其歸類(lèi)說(shuō)明一下:
上面這幾個(gè)狀態(tài)說(shuō)明有個(gè)印象就好,,有了Node 的結(jié)構(gòu)說(shuō)明鋪墊,,你也就能想象同步隊(duì)列的接本結(jié)構(gòu)了:
前置知識(shí)基本鋪墊完畢,我們來(lái)看一看獨(dú)占式獲取同步狀態(tài)的整個(gè)過(guò)程
獨(dú)占式獲取同步狀態(tài)
故事要從范式lock.lock() 開(kāi)始
public void lock() {
// 阻塞式的獲取鎖,,調(diào)用同步器模版方法,,獲取同步狀態(tài)
sync.acquire(1);
}
進(jìn)入AQS的模版方法 acquire()
public final void acquire(int arg) {
// 調(diào)用自定義同步器重寫(xiě)的 tryAcquire 方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先,也會(huì)嘗試非阻塞的獲取同步狀態(tài),,如果獲取失?。╰ryAcquire返回false),則會(huì)調(diào)用 addWaiter
方法構(gòu)造 Node 節(jié)點(diǎn)(Node.EXCLUSIVE 獨(dú)占式)并安全的(CAS)加入到同步隊(duì)列【尾部】
private Node addWaiter(Node mode) {
// 構(gòu)造Node節(jié)點(diǎn),,包含當(dāng)前線程信息以及節(jié)點(diǎn)模式【獨(dú)占/共享】
Node node = new Node(Thread.currentThread(), mode);
// 新建變量 pred 將指針指向tail指向的節(jié)點(diǎn)
Node pred = tail;
// 如果尾節(jié)點(diǎn)不為空
if (pred != null) {
// 新加入的節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)指向尾節(jié)點(diǎn)
node.prev = pred;
// 因?yàn)槿绻鄠€(gè)線程同時(shí)獲取同步狀態(tài)失敗都會(huì)執(zhí)行這段代碼
// 所以,,通過(guò) CAS 方式確保安全的設(shè)置當(dāng)前節(jié)點(diǎn)為最新的尾節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
// 曾經(jīng)的尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)
pred.next = node;
// 返回新構(gòu)建的節(jié)點(diǎn)
return node;
}
}
// 尾節(jié)點(diǎn)為空,說(shuō)明當(dāng)前節(jié)點(diǎn)是第一個(gè)被加入到同步隊(duì)列中的節(jié)點(diǎn)
// 需要一個(gè)入隊(duì)操作
enq(node);
return node;
}
private Node enq(final Node node) {
// 通過(guò)“死循環(huán)”確保節(jié)點(diǎn)被正確添加,,最終將其設(shè)置為尾節(jié)點(diǎn)之后才會(huì)返回,,這里使用 CAS 的理由和上面一樣
for (;;) {
Node t = tail;
// 第一次循環(huán),如果尾節(jié)點(diǎn)為 null
if (t == null) { // Must initialize
// 構(gòu)建一個(gè)哨兵節(jié)點(diǎn),,并將頭部指針指向它
if (compareAndSetHead(new Node()))
// 尾部指針同樣指向哨兵節(jié)點(diǎn)
tail = head;
} else {
// 第二次循環(huán),,將新節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)指向t
node.prev = t;
// 將新節(jié)點(diǎn)加入到隊(duì)列尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
// 前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向當(dāng)前新節(jié)點(diǎn),完成雙向隊(duì)列
t.next = node;
return t;
}
}
}
}
你可能比較迷惑 enq() 的處理方式,,進(jìn)入該方法就是一個(gè)“死循環(huán)”,,我們就用圖來(lái)描述它是怎樣跳出循環(huán)的
有些同學(xué)可能會(huì)有疑問(wèn),為什么會(huì)有哨兵節(jié)點(diǎn),?
哨兵,,顧名思義,是用來(lái)解決國(guó)家之間邊界問(wèn)題的,,不直接參與生產(chǎn)活動(dòng),。同樣,計(jì)算機(jī)科學(xué)中提到的哨兵,,也用來(lái)解決邊界問(wèn)題,,如果沒(méi)有邊界,指定環(huán)節(jié),,按照同樣算法可能會(huì)在邊界處發(fā)生異常,,比如要繼續(xù)向下分析的 acquireQueued()
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// '死循環(huán)',嘗試獲取鎖,,或者掛起
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 只有當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),,才會(huì)嘗試獲取鎖
// 看到這你應(yīng)該理解添加哨兵節(jié)點(diǎn)的含義了吧
if (p == head && tryAcquire(arg)) {
// 獲取同步狀態(tài)成功,,將自己設(shè)置為頭
setHead(node);
// 將哨兵節(jié)點(diǎn)的后繼節(jié)點(diǎn)置為空,方便GC
p.next = null; // help GC
failed = false;
// 返回中斷標(biāo)識(shí)
return interrupted;
}
// 當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)
//【或者】當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)但獲取同步狀態(tài)失敗
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
獲取同步狀態(tài)成功會(huì)返回可以理解了,,但是如果失敗就會(huì)一直陷入到“死循環(huán)”中浪費(fèi)資源嗎,?很顯然不是,shouldParkAfterFailedAcquire(p, node)
和 parkAndCheckInterrupt()
就會(huì)將線程獲取同步狀態(tài)失敗的線程掛起,,我們繼續(xù)向下看
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅(qū)節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
// 如果是 SIGNAL 狀態(tài),,即等待被占用的資源釋放,直接返回 true
// 準(zhǔn)備繼續(xù)調(diào)用 parkAndCheckInterrupt 方法
if (ws == Node.SIGNAL)
return true;
// ws 大于0說(shuō)明是CANCELLED狀態(tài),,
if (ws > 0) {
// 循環(huán)判斷前驅(qū)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否也為CANCELLED狀態(tài),,忽略該狀態(tài)的節(jié)點(diǎn),重新連接隊(duì)列
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)設(shè)置為設(shè)置為 SIGNAL 狀態(tài),,用于后續(xù)喚醒操作
// 程序第一次執(zhí)行到這返回為false,,還會(huì)進(jìn)行外層第二次循環(huán),最終從代碼第7行返回
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
到這里你也許有個(gè)問(wèn)題:
這個(gè)地方設(shè)置前驅(qū)節(jié)點(diǎn)為 SIGNAL 狀態(tài)到底有什么作用,?
保留這個(gè)問(wèn)題,,我們陸續(xù)揭曉
如果前驅(qū)節(jié)點(diǎn)的 waitStatus 是 SIGNAL狀態(tài),即 shouldParkAfterFailedAcquire 方法會(huì)返回 true ,,程序會(huì)繼續(xù)向下執(zhí)行 parkAndCheckInterrupt
方法,,用于將當(dāng)前線程掛起
private final boolean parkAndCheckInterrupt() {
// 線程掛起,程序不會(huì)繼續(xù)向下執(zhí)行
LockSupport.park(this);
// 根據(jù) park 方法 API描述,,程序在下述三種情況會(huì)繼續(xù)向下執(zhí)行
// 1. 被 unpark
// 2. 被中斷(interrupt)
// 3. 其他不合邏輯的返回才會(huì)繼續(xù)向下執(zhí)行
// 因上述三種情況程序執(zhí)行至此,,返回當(dāng)前線程的中斷狀態(tài),并清空中斷狀態(tài)
// 如果由于被中斷,,該方法會(huì)返回 true
return Thread.interrupted();
}
被喚醒的程序會(huì)繼續(xù)執(zhí)行 acquireQueued
方法里的循環(huán),,如果獲取同步狀態(tài)成功,則會(huì)返回 interrupted = true
的結(jié)果
程序繼續(xù)向調(diào)用棧上層返回,,最終回到 AQS 的模版方法 acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
你也許會(huì)有疑惑:
程序已經(jīng)成功獲取到同步狀態(tài)并返回了,怎么會(huì)有個(gè)自我中斷呢,?
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
如果你不能理解中斷,,強(qiáng)烈建議你回看 Java多線程中斷機(jī)制
到這里關(guān)于獲取同步狀態(tài)我們還遺漏了一條線,acquireQueued 的 finally 代碼塊如果你仔細(xì)看你也許馬上就會(huì)有疑惑:
到底什么情況才會(huì)執(zhí)行 if(failed) 里面的代碼 ,?
if (failed)
cancelAcquire(node);
這段代碼被執(zhí)行的條件是 failed 為 true,,正常情況下,如果跳出循環(huán),,failed 的值為false,,如果不能跳出循環(huán)貌似怎么也不能執(zhí)行到這里,所以只有不正常的情況才會(huì)執(zhí)行到這里,,也就是會(huì)發(fā)生異常,,才會(huì)執(zhí)行到此處
查看 try 代碼塊,,只有兩個(gè)方法會(huì)拋出異常:
自己重寫(xiě)的 tryAcquire()
方法
先看前者:
很顯然,這里拋出的異常不是重點(diǎn),,那就以 ReentrantLock 重寫(xiě)的 tryAcquire() 方法為例
另外,,上面分析 shouldParkAfterFailedAcquire
方法還對(duì) CANCELLED 的狀態(tài)進(jìn)行了判斷,那么
什么時(shí)候會(huì)生成取消狀態(tài)的節(jié)點(diǎn)呢,?
答案就在 cancelAcquire
方法中,, 我們來(lái)看看 cancelAcquire到底怎么設(shè)置/處理 CANNELLED 的
private void cancelAcquire(Node node) {
// 忽略無(wú)效節(jié)點(diǎn)
if (node == null)
return;
// 將關(guān)聯(lián)的線程信息清空
node.thread = null;
// 跳過(guò)同樣是取消狀態(tài)的前驅(qū)節(jié)點(diǎn)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 跳出上面循環(huán)后找到前驅(qū)有效節(jié)點(diǎn),并獲取該有效節(jié)點(diǎn)的后繼節(jié)點(diǎn)
Node predNext = pred.next;
// 將當(dāng)前節(jié)點(diǎn)的狀態(tài)置為 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果當(dāng)前節(jié)點(diǎn)處在尾節(jié)點(diǎn),,直接從隊(duì)列中刪除自己就好
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 1. 如果當(dāng)前節(jié)點(diǎn)的有效前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),,也就是說(shuō)當(dāng)前節(jié)點(diǎn)不是頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (pred != head &&
// 2. 判斷當(dāng)前節(jié)點(diǎn)有效前驅(qū)節(jié)點(diǎn)的狀態(tài)是否為 SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL ||
// 3. 如果不是,嘗試將前驅(qū)節(jié)點(diǎn)的狀態(tài)置為 SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 判斷當(dāng)前節(jié)點(diǎn)有效前驅(qū)節(jié)點(diǎn)的線程信息是否為空
pred.thread != null) {
// 上述條件滿足
Node next = node.next;
// 將當(dāng)前節(jié)點(diǎn)有效前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn)指針指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),,或者上述其他條件不滿足,,就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
unparkSuccessor(node);
}
node.next = node; // help GC
}
看到這個(gè)注釋你可能有些亂了,其核心目的就是從等待隊(duì)列中移除 CANCELLED 的節(jié)點(diǎn),,并重新拼接整個(gè)隊(duì)列,,總結(jié)來(lái)看,其實(shí)設(shè)置 CANCELLED 狀態(tài)節(jié)點(diǎn)只是有三種情況,,我們通過(guò)畫(huà)圖來(lái)分析一下:
至此,,獲取同步狀態(tài)的過(guò)程就結(jié)束了,我們簡(jiǎn)單的用流程圖說(shuō)明一下整個(gè)過(guò)程
獲取鎖的過(guò)程就這樣的結(jié)束了,,先暫停幾分鐘整理一下自己的思路,。我們上面還沒(méi)有說(shuō)明 SIGNAL 的作用, SIGNAL 狀態(tài)信號(hào)到底是干什么用的,?這就涉及到鎖的釋放了,,我們來(lái)繼續(xù)了解,整體思路和鎖的獲取是一樣的,, 但是釋放過(guò)程就相對(duì)簡(jiǎn)單很多了
獨(dú)占式釋放同步狀態(tài)
故事要從 unlock() 方法說(shuō)起
public void unlock() {
// 釋放鎖
sync.release(1);
}
調(diào)用 AQS 模版方法 release,,進(jìn)入該方法
public final boolean release(int arg) {
// 調(diào)用自定義同步器重寫(xiě)的 tryRelease 方法嘗試釋放同步狀態(tài)
if (tryRelease(arg)) {
// 釋放成功,獲取頭節(jié)點(diǎn)
Node h = head;
// 存在頭節(jié)點(diǎn),,并且waitStatus不是初始狀態(tài)
// 通過(guò)獲取的過(guò)程我們已經(jīng)分析了,,在獲取的過(guò)程中會(huì)將 waitStatus的值從初始狀態(tài)更新成 SIGNAL 狀態(tài)
if (h != null && h.waitStatus != 0)
// 解除線程掛起狀態(tài)
unparkSuccessor(h);
return true;
}
return false;
}
查看 unparkSuccessor 方法,實(shí)際是要喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
// 獲取頭節(jié)點(diǎn)的waitStatus
int ws = node.waitStatus;
if (ws < 0)
// 清空頭節(jié)點(diǎn)的waitStatus值,,即置為0
compareAndSetWaitStatus(node, ws, 0);
// 獲取頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
Node s = node.next;
// 判斷當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)是否是取消狀態(tài),,如果是,需要移除,,重新連接隊(duì)列
if (s == null || s.waitStatus > 0) {
s = null;
// 從尾節(jié)點(diǎn)向前查找,,找到隊(duì)列第一個(gè)waitStatus狀態(tài)小于0的節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
// 如果是獨(dú)占式,這里小于0,其實(shí)就是 SIGNAL
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 解除線程掛起狀態(tài)
LockSupport.unpark(s.thread);
}
有同學(xué)可能有疑問(wèn):
為什么這個(gè)地方是從隊(duì)列尾部向前查找不是 CANCELLED 的節(jié)點(diǎn),?
原因有兩個(gè):
第一,,先回看節(jié)點(diǎn)加入隊(duì)列的情景:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
節(jié)點(diǎn)入隊(duì)并不是原子操作,代碼第6,、7行
node.prev = pred;
compareAndSetTail(pred, node)
這兩個(gè)地方可以看作是尾節(jié)點(diǎn)入隊(duì)的原子操作,,如果此時(shí)代碼還沒(méi)執(zhí)行到 pred.next = node; 這時(shí)又恰巧執(zhí)行了unparkSuccessor方法,就沒(méi)辦法從前往后找了,,因?yàn)楹罄^指針還沒(méi)有連接起來(lái),,所以需要從后往前找
第二點(diǎn)原因,在上面圖解產(chǎn)生 CANCELLED 狀態(tài)節(jié)點(diǎn)的時(shí)候,,先斷開(kāi)的是 Next 指針,,Prev指針并未斷開(kāi),因此這也是必須要從后往前遍歷才能夠遍歷完全部的Node
同步狀態(tài)至此就已經(jīng)成功釋放了,,之前獲取同步狀態(tài)被掛起的線程就會(huì)被喚醒,,繼續(xù)從下面代碼第 3 行返回執(zhí)行:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
繼續(xù)返回上層調(diào)用棧, 從下面代碼15行開(kāi)始執(zhí)行,重新執(zhí)行循環(huán),,再次嘗試獲取同步狀態(tài)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
到這里,,關(guān)于獨(dú)占式獲取/釋放鎖的流程已經(jīng)閉環(huán)了,但是關(guān)于 AQS 的另外兩個(gè)模版方法還沒(méi)有介紹
獨(dú)占式響應(yīng)中斷獲取同步狀態(tài)
故事要從lock.lockInterruptibly() 方法說(shuō)起
public void lockInterruptibly() throws InterruptedException {
// 調(diào)用同步器模版方法可中斷式獲取同步狀態(tài)
sync.acquireInterruptibly(1);
}
有了前面的理解,,理解獨(dú)占式可響應(yīng)中斷的獲取同步狀態(tài)方式,,真是一眼就能明白了:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試非阻塞式獲取同步狀態(tài)失敗,如果沒(méi)有獲取到同步狀態(tài),,執(zhí)行代碼7行
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
繼續(xù)查看 doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 獲取中斷信號(hào)后,,不再返回 interrupted = true 的值,而是直接拋出 InterruptedException
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
沒(méi)想到 JDK 內(nèi)部也有如此相近的代碼,,可響應(yīng)中斷獲取鎖沒(méi)什么深?yuàn)W的,,就是被中斷拋出 InterruptedException 異常(代碼第17行),這樣就逐層返回上層調(diào)用棧捕獲該異常進(jìn)行下一步操作了
趁熱打鐵,,來(lái)看看另外一個(gè)模版方法:
獨(dú)占式超時(shí)限制獲取同步狀態(tài)
這個(gè)很好理解,,就是給定一個(gè)時(shí)限,在該時(shí)間段內(nèi)獲取到同步狀態(tài),,就返回 true,, 否則,返回 false,。好比線程給自己定了一個(gè)鬧鐘,鬧鈴一響,,線程就自己返回了,,這就不會(huì)使自己是阻塞狀態(tài)了
既然涉及到超時(shí)限制,其核心邏輯肯定是計(jì)算時(shí)間間隔,,因?yàn)樵诔瑫r(shí)時(shí)間內(nèi),,肯定是多次嘗試獲取鎖的,,每次獲取鎖肯定有時(shí)間消耗,所以計(jì)算時(shí)間間隔的邏輯就像我們?cè)诔绦虼蛴〕绦蚝臅r(shí) log 那么簡(jiǎn)單
nanosTimeout = deadline - System.nanoTime()
故事要從 lock.tryLock(time, unit)
方法說(shuō)起
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 調(diào)用同步器模版方法,,可響應(yīng)中斷和超時(shí)時(shí)間限制
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
來(lái)看 tryAcquireNanos 方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
是不是和上面 acquireInterruptibly
方法長(zhǎng)相很詳細(xì)了,,繼續(xù)查看來(lái) doAcquireNanos 方法,看程序, 該方法也是 throws InterruptedException,,我們?cè)谥袛辔恼轮姓f(shuō)過(guò),,方法標(biāo)記上有 throws InterruptedException
說(shuō)明該方法也是可以響應(yīng)中斷的,所以你可以理解超時(shí)限制是 acquireInterruptibly
方法的加強(qiáng)版,,具有超時(shí)和非阻塞控制的雙保險(xiǎn)
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 超時(shí)時(shí)間內(nèi),,為獲取到同步狀態(tài),直接返回false
if (nanosTimeout <= 0L)
return false;
// 計(jì)算超時(shí)截止時(shí)間
final long deadline = System.nanoTime() + nanosTimeout;
// 以獨(dú)占方式加入到同步隊(duì)列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 計(jì)算新的超時(shí)時(shí)間
nanosTimeout = deadline - System.nanoTime();
// 如果超時(shí),,直接返回 false
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 判斷是最新超時(shí)時(shí)間是否大于閾值 1000
nanosTimeout > spinForTimeoutThreshold)
// 掛起線程 nanosTimeout 長(zhǎng)時(shí)間,,時(shí)間到,自動(dòng)返回
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上面的方法應(yīng)該不是很難懂,,但是又同學(xué)可能在第 27 行上有所困惑
為什么 nanosTimeout 和 自旋超時(shí)閾值1000進(jìn)行比較,?
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
其實(shí) doc 說(shuō)的很清楚,說(shuō)白了,,1000 nanoseconds 時(shí)間已經(jīng)非常非常短暫了,,沒(méi)必要再執(zhí)行掛起和喚醒操作了,不如直接當(dāng)前線程直接進(jìn)入下一次循環(huán)
到這里,,我們自定義的 MyMutex 只差 Condition 沒(méi)有說(shuō)明了,,不知道你累了嗎?我還在堅(jiān)持
Condition
如果你看過(guò)之前寫(xiě)的 并發(fā)編程之等待通知機(jī)制 ,,你應(yīng)該對(duì)下面這個(gè)圖是有印象的:
如果當(dāng)時(shí)你理解了這個(gè)模型,,再看 Condition 的實(shí)現(xiàn),根本就不是問(wèn)題了,,首先 Condition 還是一個(gè)接口,,肯定也是需要有實(shí)現(xiàn)類(lèi)的
那故事就從 lock.newnewCondition
說(shuō)起吧
public Condition newCondition() {
// 使用自定義的條件
return sync.newCondition();
}
自定義同步器重封裝了該方法:
Condition newCondition() {
return new ConditionObject();
}
ConditionObject 就是 Condition 的實(shí)現(xiàn)類(lèi),該類(lèi)就定義在了 AQS 中,,只有兩個(gè)成員變量:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
所以,,我們只需要來(lái)看一下 ConditionObject 實(shí)現(xiàn)的 await / signal 方法來(lái)使用這兩個(gè)成員變量就可以了
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 同樣構(gòu)建 Node 節(jié)點(diǎn),并加入到等待隊(duì)列中
Node node = addConditionWaiter();
// 釋放同步狀態(tài)
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 掛起當(dāng)前線程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
這里注意用詞,,在介紹獲取同步狀態(tài)時(shí),,addWaiter 是加入到【同步隊(duì)列】,就是上圖說(shuō)的入口等待隊(duì)列,,這里說(shuō)的是【等待隊(duì)列】,,所以 addConditionWaiter 肯定是構(gòu)建了一個(gè)自己的隊(duì)列:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新構(gòu)建的節(jié)點(diǎn)的 waitStatus 是 CONDITION,注意不是 0 或 SIGNAL 了
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 構(gòu)建單向同步隊(duì)列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
這里有朋友可能會(huì)有疑問(wèn):
為什么這里是單向隊(duì)列,也沒(méi)有使用CAS 來(lái)保證加入隊(duì)列的安全性呢,?
因?yàn)?await 是 Lock 范式 try 中使用的,,說(shuō)明已經(jīng)獲取到鎖了,所以就沒(méi)必要使用 CAS 了,,至于是單向,,因?yàn)檫@里還不涉及到競(jìng)爭(zhēng)鎖,只是做一個(gè)條件等待隊(duì)列
在 Lock 中可以定義多個(gè)條件,,每個(gè)條件都會(huì)對(duì)應(yīng)一個(gè) 條件等待隊(duì)列,,所以將上圖豐富說(shuō)明一下就變成了這個(gè)樣子:
線程已經(jīng)按相應(yīng)的條件加入到了條件等待隊(duì)列中,那如何再嘗試獲取鎖呢,?signal / signalAll 方法就已經(jīng)排上用場(chǎng)了
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
Signal 方法通過(guò)調(diào)用 doSignal 方法,,只喚醒條件等待隊(duì)列中的第一個(gè)節(jié)點(diǎn)
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 調(diào)用該方法,將條件等待隊(duì)列的線程節(jié)點(diǎn)移動(dòng)到同步隊(duì)列中
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
繼續(xù)看 transferForSignal
方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 重新進(jìn)行入隊(duì)操作
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 喚醒同步隊(duì)列中該線程
LockSupport.unpark(node.thread);
return true;
}
所以我們?cè)儆脠D解一下喚醒的整個(gè)過(guò)程
到這里,,理解 signalAll 就非常簡(jiǎn)單了,,只不過(guò)循環(huán)判斷是否還有 nextWaiter,如果有就像 signal 操作一樣,,將其從條件等待隊(duì)列中移到同步隊(duì)列中
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
不知你還是否記得,,我在并發(fā)編程之等待通知機(jī)制 中還說(shuō)過(guò)一句話
沒(méi)有特殊原因盡量用 signalAll 方法
什么時(shí)候可以用 signal 方法也在其中做了說(shuō)明,請(qǐng)大家自行查看吧
這里我還要多說(shuō)一個(gè)細(xì)節(jié),,從條件等待隊(duì)列移到同步隊(duì)列是有時(shí)間差的,,所以使用 await() 方法也是范式的, 同樣在該文章中做了解釋
有時(shí)間差,,就會(huì)有公平和不公平的問(wèn)題,,想要全面了解這個(gè)問(wèn)題,我們就要走近 ReentrantLock 中來(lái)看了,,除了了解公平/不公平問(wèn)題,,查看 ReentrantLock 的應(yīng)用還是要反過(guò)來(lái)驗(yàn)證它使用的AQS的,我們繼續(xù)吧
ReentrantLock 是如何應(yīng)用的AQS
獨(dú)占式的典型應(yīng)用就是 ReentrantLock 了,,我們來(lái)看看它是如何重寫(xiě)這個(gè)方法的
乍一看挺奇怪的,,怎么里面自定義了三個(gè)同步器:其實(shí) NonfairSync,F(xiàn)airSync 只是對(duì) Sync 做了進(jìn)一步劃分:
從名稱上你應(yīng)該也知道了,,這就是你聽(tīng)到過(guò)的 公平鎖/非公平鎖
了
何為公平鎖/非公平鎖,?
生活中,排隊(duì)講求先來(lái)后到視為公平,。程序中的公平性也是符合請(qǐng)求鎖的絕對(duì)時(shí)間的,,其實(shí)就是 FIFO,否則視為不公平
我們來(lái)對(duì)比一下 ReentrantLock 是如何實(shí)現(xiàn)公平鎖和非公平鎖的
其實(shí)沒(méi)什么大不了,,公平鎖就是判斷同步隊(duì)列是否還有先驅(qū)節(jié)點(diǎn)的存在,,只有沒(méi)有先驅(qū)節(jié)點(diǎn)才能獲取鎖,;而非公平鎖是不管這個(gè)事的,能獲取到同步狀態(tài)就可以,,就這么簡(jiǎn)單,那問(wèn)題來(lái)了:
為什么會(huì)有公平鎖/非公平鎖的設(shè)計(jì),?
考慮這個(gè)問(wèn)題,,我們需重新回憶上面的鎖獲取實(shí)現(xiàn)圖了,其實(shí)上面我已經(jīng)透露了一點(diǎn)
主要有兩點(diǎn)原因:
原因一:
恢復(fù)掛起的線程到真正鎖的獲取還是有時(shí)間差的,,從人類(lèi)的角度來(lái)看這個(gè)時(shí)間微乎其微,,但是從CPU的角度來(lái)看,這個(gè)時(shí)間差存在的還是很明顯的,。所以非公平鎖能更充分的利用 CPU 的時(shí)間片,,盡量減少 CPU 空閑狀態(tài)時(shí)間
原因二:
不知你是否還記得我在 面試問(wèn),創(chuàng)建多少個(gè)線程合適,?文章中反復(fù)提到過(guò),,使用多線程很重要的考量點(diǎn)是線程切換的開(kāi)銷(xiāo),想象一下,,如果采用非公平鎖,,當(dāng)一個(gè)線程請(qǐng)求鎖獲取同步狀態(tài),然后釋放同步狀態(tài),,因?yàn)椴恍枰紤]是否還有前驅(qū)節(jié)點(diǎn),,所以剛釋放鎖的線程在此刻再次獲取同步狀態(tài)的幾率就變得非常大,所以就減少了線程的開(kāi)銷(xiāo)
相信到這里,,你也就明白了,,為什么 ReentrantLock 默認(rèn)構(gòu)造器用的是非公平鎖同步器
public ReentrantLock() {
sync = new NonfairSync();
}
看到這里,感覺(jué)非公平鎖 perfect,,非也,,有得必有失
使用公平鎖會(huì)有什么問(wèn)題?
公平鎖保證了排隊(duì)的公平性,,非公平鎖霸氣的忽視這個(gè)規(guī)則,,所以就有可能導(dǎo)致排隊(duì)的長(zhǎng)時(shí)間在排隊(duì),也沒(méi)有機(jī)會(huì)獲取到鎖,,這就是傳說(shuō)中的 “饑餓”
如何選擇公平鎖/非公平鎖,?
相信到這里,答案已經(jīng)在你心中了,,如果為了更高的吞吐量,,很顯然非公平鎖是比較合適的,因?yàn)楣?jié)省很多線程切換時(shí)間,,吞吐量自然就上去了,,否則那就用公平鎖還大家一個(gè)公平
我們還差最后一個(gè)環(huán)節(jié),,真的要挺住
可重入鎖
到這里,我們還沒(méi)分析 ReentrantLock 的名字,,JDK 起名這么有講究,,肯定有其含義,直譯過(guò)來(lái)【可重入鎖】
為什么要支持鎖的重入,?
試想,,如果是一個(gè)有 synchronized 修飾的遞歸調(diào)用方法,程序第二次進(jìn)入被自己阻塞了豈不是很大的笑話,,所以 synchronized 是支持鎖的重入的
Lock 是新輪子,,自然也要支持這個(gè)功能,其實(shí)現(xiàn)也很簡(jiǎn)單,,請(qǐng)查看公平鎖和非公平鎖對(duì)比圖,,其中有一段代碼:
// 判斷當(dāng)前線程是否和已占用鎖的線程是同一個(gè)
else if (current == getExclusiveOwnerThread())
仔細(xì)看代碼, 你也許發(fā)現(xiàn),,我前面的一個(gè)說(shuō)明是錯(cuò)誤的,,我要重新解釋一下
重入的線程會(huì)一直將 state + 1, 釋放鎖會(huì) state - 1直至等于0,,上面這樣寫(xiě)也是想幫助大家快速的區(qū)分
總結(jié)
本文是一個(gè)長(zhǎng)文,,說(shuō)明了為什么要造 Lock 新輪子,如何標(biāo)準(zhǔn)的使用 Lock,,AQS 是什么,,是如何實(shí)現(xiàn)鎖的,結(jié)合 ReentrantLock 反推 AQS 中的一些應(yīng)用以及其獨(dú)有的一些特性
獨(dú)占式獲取鎖就這樣介紹完了,,我們還差 AQS 共享式 xxxShared
沒(méi)有分析,,結(jié)合共享式,接下來(lái)我們來(lái)閱讀一下 Semaphore,,ReentrantReadWriteLock 和 CountLatch 等