談到并發(fā),,我們不得不說 AQS(AbstractQueuedSynchronizer)
,所謂的 AQS
即是抽象的隊列式的同步器,,內部定義了很多鎖相關的方法,,我們熟知的 ReentrantLock
、 ReentrantReadWriteLock
,、 CountDownLatch
,、 Semaphore
等都是基于 AQS
來實現(xiàn)的。
我們先看下 AQS
相關的 UML
圖:
AQS
中 維護了一個 volatile int state
(代表共享資源)和一個 FIFO
線程等待隊列(多線程爭用資源被阻塞時會進入此隊列),。
這里 volatile
能夠保證多線程下的可見性,,當 state=1
則代表當前對象鎖已經被占有,,其他線程來加鎖時則會失敗,,加鎖失敗的線程會被放入一個 FIFO
的等待隊列中,比列會被 UNSAFE.park()
操作掛起,,等待其他獲取鎖的線程釋放鎖才能夠被喚醒,。
另外 state
的操作都是通過 CAS
來保證其并發(fā)修改的安全性。
具體原理我們可以用一張圖來簡單概括:
AQS
中提供了很多關于鎖的實現(xiàn)方法,,
tryAcquire(int):獨占方式獲取鎖,。嘗試獲取資源,成功則返回true,,失敗則返回false,。 tryRelease(int):獨占方式釋放鎖。嘗試釋放資源,,成功則返回true,,失敗則返回false。 這里還有一些方法并沒有列出來,,接下來我們以 ReentrantLock
作為突破點通過源碼和畫圖的形式一步步了解 AQS
內部實現(xiàn)原理,。
文章準備模擬多線程競爭鎖、釋放鎖的場景來進行分析 AQS
源碼:
三個線程(線程一,、線程二,、線程三)同時來加鎖/釋放鎖
目錄如下:
線程二/三 加鎖失敗時AQS
中等待隊列的數(shù)據(jù)模型 通過線程場景來講解Condition中a wait()
和 signal()
實現(xiàn)原理 這里會通過畫圖來分析每個線程加鎖、釋放鎖后 AQS
內部的數(shù)據(jù)結構和實現(xiàn)原理
線程一加鎖成功 如果同時有三個線程 并發(fā)搶占鎖,,此時線程一 搶占鎖成功,,線程二 和線程三 搶占鎖失敗,具體執(zhí)行流程如下:
此時 AQS
內部數(shù)據(jù)為:
線程二 ,、線程三 加鎖失?。?/span>
有圖可以看出,,等待隊列中的節(jié)點 Node
是一個雙向鏈表,這里 SIGNAL
是 Node
中 waitStatus
屬性,, Node
中還有一個 nextWaiter
屬性,,這個并未在圖中畫出來,這個到后面 Condition
會具體講解的,。
具體看下?lián)屨兼i代碼實現(xiàn):
java.util.concurrent.locks.ReentrantLock .NonfairSync:
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
這里使用的ReentrantLock非公平鎖 ,,線程進來直接利用 CAS
嘗試搶占鎖,如果搶占成功 state
值回被改為1,,且設置對象獨占鎖線程為當前線程,。如下所示:
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
線程二搶占鎖失敗 我們按照真實場景來分析,線程一 搶占鎖成功后,, state
變?yōu)?,,線程二 通過 CAS
修改 state
變量必然會失敗。此時 AQS
中 FIFO
(First In First Out 先進先出)隊列中數(shù)據(jù)如圖所示:
我們將線程二 執(zhí)行的邏輯一步步拆解來看:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先看看 tryAcquire()
的具體實現(xiàn): java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire()
:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error('Maximum lock count exceeded' ); setState(nextc); return true ; } return false ; }
nonfairTryAcquire()
方法中首先會獲取 state
的值,,如果不為0則說明當前對象的鎖已經被其他線程所占有,接著判斷占有鎖的線程是否為當前線程,,如果是則累加 state
值,,這就是可重入鎖的具體實現(xiàn),累加 state
值,,釋放鎖的時候也要依次遞減 state
值,。
如果 state
為0,,則執(zhí)行 CAS
操作,,嘗試更新 state
值為1,如果更新成功則代表當前線程加鎖成功,。
以線程二 為例,,因為線程一 已經將 state
修改為1,所以線程二 通過 CAS
修改 state
的值不會成功,。加鎖失敗,。
線程二 執(zhí)行tryAcquire()
后會返回false,,接著執(zhí)行 addWaiter(Node.EXCLUSIVE)
邏輯,將自己加入到一個 FIFO
等待隊列中,代碼實現(xiàn)如下:
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter()
:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
這段代碼首先會創(chuàng)建一個和當前線程綁定的 Node
節(jié)點,, Node
為雙向鏈表,。此時等待對內中的 tail
指針為空,直接調用 enq(node)
方法將當前線程加入等待隊列尾部:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
第一遍循環(huán)時 tail
指針為空,,進入if邏輯,,使用 CAS
操作設置 head
指針,將 head
指向一個新創(chuàng)建的 Node
節(jié)點,。此時 AQS
中數(shù)據(jù):
執(zhí)行完成之后,, head
、 tail
,、 t
都指向第一個 Node
元素,。
接著執(zhí)行第二遍循環(huán),進入 else
邏輯,,此時已經有了 head
節(jié)點,,這里要操作的就是將線程二 對應的 Node
節(jié)點掛到 head
節(jié)點后面。此時隊列中就有了兩個 Node
節(jié)點:
addWaiter()
方法執(zhí)行完后,,會返回當前線程創(chuàng)建的節(jié)點信息,。繼續(xù)往后執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
邏輯,此時傳入的參數(shù)為線程二 對應的 Node
節(jié)點信息:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued()
:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndChecknIterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }private final boolean parkAndCheckInterrupt() { LockSupport.park(this ); return Thread.interrupted(); }
acquireQueued()
這個方法會先判斷當前傳入的 Node
對應的前置節(jié)點是否為 head
,,如果是則嘗試加鎖,。加鎖成功過則將當前節(jié)點設置為 head
節(jié)點,,然后空置之前的 head
節(jié)點,方便后續(xù)被垃圾回收掉,。
如果加鎖失敗或者 Node
的前置節(jié)點不是 head
節(jié)點,,就會通過 shouldParkAfterFailedAcquire
方法將 head
節(jié)點的 waitStatus
變?yōu)榱?/span>SIGNAL=-1
,最后執(zhí)行 parkAndChecknIterrupt
方法,,調用 LockSupport.park()
掛起當前線程,。
此時 AQS
中的數(shù)據(jù)如下圖:
此時線程二 就靜靜的待在 AQS
的等待隊列里面了,等著其他線程釋放鎖來喚醒它,。
線程三搶占鎖失敗 看完了線程二 搶占鎖失敗的分析,,那么再來分析線程三 搶占鎖失敗就很簡單了,先看看 addWaiter(Node mode)
方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
此時等待隊列的 tail
節(jié)點指向線程二 ,,進入 if
邏輯后,,通過 CAS
指令將 tail
節(jié)點重新指向線程三 。接著線程三 調用 enq()
方法執(zhí)行入隊操作,,和上面線程二 執(zhí)行方式是一致的,,入隊后會修改線程二 對應的 Node
中的 waitStatus=SIGNAL
。最后線程三 也會被掛起。此時等待隊列的數(shù)據(jù)如圖:
線程一釋放鎖 現(xiàn)在來分析下釋放鎖的過程,,首先是線程一 釋放鎖,,釋放鎖后會喚醒 head
節(jié)點的后置節(jié)點,也就是我們現(xiàn)在的線程二 ,,具體操作流程如下:
執(zhí)行完后等待隊列數(shù)據(jù)如下:
此時線程二 已經被喚醒,,繼續(xù)嘗試獲取鎖,如果獲取鎖失敗,,則會繼續(xù)被掛起,。如果獲取鎖成功,則 AQS
中數(shù)據(jù)如圖:
接著還是一步步拆解來看,,先看看線程一 釋放鎖的代碼:
java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
這里首先會執(zhí)行 tryRelease()
方法,,這個方法具體實現(xiàn)在 ReentrantLock
中,如果 tryRelease
執(zhí)行成功,,則繼續(xù)判斷 head
節(jié)點的 waitStatus
是否為0,,前面我們已經看到過, head
的 waitStatue
為 SIGNAL(-1)
,,這里就會執(zhí)行 unparkSuccessor()
方法來喚醒 head
的后置節(jié)點,,也就是我們上面圖中線程二 對應的 Node
節(jié)點。
此時看 ReentrantLock.tryRelease()
中的具體實現(xiàn):
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
執(zhí)行完 ReentrantLock.tryRelease()
后,, state
被設置成0,,Lock對象的獨占鎖被設置為null。此時看下 AQS
中的數(shù)據(jù):
接著執(zhí)行 java.util.concurrent.locks.AbstractQueuedSynchronizer.unparkSuccessor()
方法,,喚醒 head
的后置節(jié)點:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
這里主要是將 head
節(jié)點的 waitStatus
設置為0,,然后解除 head
節(jié)點 next
的指向,使 head
節(jié)點空置,,等待著被垃圾回收,。
此時重新將 head
指針指向線程二 對應的 Node
節(jié)點,且使用 LockSupport.unpark
方法來喚醒線程二 ,。
被喚醒的線程二 會接著嘗試獲取鎖,,用 CAS
指令修改 state
數(shù)據(jù)。執(zhí)行完成后可以查看 AQS
中數(shù)據(jù):
此時線程二 被喚醒,,線程二 接著之前被 park
的地方繼續(xù)執(zhí)行,,繼續(xù)執(zhí)行 acquireQueued()
方法。
線程二喚醒繼續(xù)加鎖 final boolean acquireQueued(final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
此時線程二 被喚醒,,繼續(xù)執(zhí)行 for
循環(huán),,判斷線程二 的前置節(jié)點是否為 head
,如果是則繼續(xù)使用 tryAcquire()
方法來嘗試獲取鎖,,其實就是使用 CAS
操作來修改 state
值,,如果修改成功則代表獲取鎖成功。接著將線程二 設置為 head
節(jié)點,然后空置之前的 head
節(jié)點數(shù)據(jù),,被空置的節(jié)點數(shù)據(jù)等著被垃圾回收 ,。
此時線程三獲取鎖成功, AQS
中隊列數(shù)據(jù)如下:
等待隊列中的數(shù)據(jù)都等待著被垃圾回收,。
線程二釋放鎖/線程三加鎖 當線程二 釋放鎖時,,會喚醒被掛起的線程三 ,流程和上面大致相同,,被喚醒的線程三 會再次嘗試加鎖,,具體代碼可以參考上面內容。具體流程圖如下:
此時 AQS
中隊列數(shù)據(jù)如圖:
上面所有的加鎖場景都是基于非公平鎖 來實現(xiàn)的,,非公平鎖 是 ReentrantLock
的默認實現(xiàn),,那我們接著來看一下公平鎖 的實現(xiàn)原理,這里先用一張圖來解釋公平鎖 和非公平鎖 的區(qū)別:
非公平鎖 執(zhí)行流程:
這里我們還是用之前的線程模型來舉例子,,當線程二 釋放鎖的時候,,喚醒被掛起的線程三 ,線程三 執(zhí)行 tryAcquire()
方法使用 CAS
操作來嘗試修改 state
值,,如果此時又來了一個線程四 也來執(zhí)行加鎖操作,,同樣會執(zhí)行 tryAcquire()
方法。
這種情況就會出現(xiàn)競爭,,線程四 如果獲取鎖成功,,線程三 仍然需要待在等待隊列中被掛起。這就是所謂的非公平鎖 ,,線程三 辛辛苦苦排隊等到自己獲取鎖,,卻眼巴巴的看到線程四 插隊獲取到了鎖。
公平鎖 執(zhí)行流程:
公平鎖在加鎖的時候,,會先判斷 AQS
等待隊列中是存在節(jié)點,,如果存在節(jié)點則會直接入隊等待,具體代碼如下.
公平鎖在獲取鎖是也是首先會執(zhí)行 acquire()
方法,,只不過公平鎖單獨實現(xiàn)了 tryAcquire()
方法:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這里會執(zhí)行 ReentrantLock
中公平鎖的 tryAcquire()
方法
#java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire()
:
static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error('Maximum lock count exceeded' ); setState(nextc); return true ; } return false ; } }
這里會先判斷 state
值,如果不為0且獲取鎖的線程不是當前線程,,直接返回false代表獲取鎖失敗,,被加入等待隊列。如果是當前線程則可重入獲取鎖,。
如果 state=0
則代表此時沒有線程持有鎖,,執(zhí)行 hasQueuedPredecessors()
判斷 AQS
等待隊列中是否有元素存在,如果存在其他等待線程,,那么自己也會加入到等待隊列尾部,,做到真正的先來后到,有序加鎖。具體代碼如下:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors()
:
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
這段代碼很有意思,,返回 false
代表隊列中沒有節(jié)點或者僅有一個節(jié)點是當前線程創(chuàng)建的節(jié)點,。返回 true
則代表隊列中存在等待節(jié)點,當前線程需要入隊等待,。
先判斷 head
是否等于 tail
,,如果隊列中只有一個 Node
節(jié)點,那么 head
會等于 tail
,,接著判斷 head
的后置節(jié)點,,這里肯定會是 null
,如果此 Node
節(jié)點對應的線程和當前的線程是同一個線程,,那么則會返回 false
,,代表沒有等待節(jié)點或者等待節(jié)點就是當前線程創(chuàng)建的 Node
節(jié)點。此時當前線程會嘗試獲取鎖,。
如果 head
和 tail
不相等,,說明隊列中有等待線程創(chuàng)建的節(jié)點,此時直接返回 true
,,如果只有一個節(jié)點,,而此節(jié)點的線程和當前線程不一致,也會返回 true
非公平鎖 和公平鎖 的區(qū)別:非公平鎖 性能高于公平鎖 性能,。非公平鎖 可以減少CPU
喚醒線程的開銷,,整體的吞吐效率會高點, CPU
也不必取喚醒所有線程,,會減少喚起線程的數(shù)量
非公平鎖 性能雖然優(yōu)于公平鎖 ,,但是會存在導致線程饑餓 的情況。在最壞的情況下,,可能存在某個線程一直獲取不到鎖 ,。不過相比性能而言,饑餓問題可以暫時忽略,,這可能就是ReentrantLock
默認創(chuàng)建非公平鎖的原因之一了,。
Condition 簡介
上面已經介紹了 AQS
所提供的核心功能,當然它還有很多其他的特性,,這里我們來繼續(xù)說下 Condition
這個組件,。
Condition
是在 java 1.5
中才出現(xiàn)的,它用來替代傳統(tǒng)的 Object
的 wait()
,、 notify()
實現(xiàn)線程間的協(xié)作,,相比使用 Object
的 wait()
、 notify()
,,使用 Condition
中的 await()
,、 signal()
這種方式實現(xiàn)線程間協(xié)作更加安全和高效,。因此通常來說比較推薦使用 Condition
其中 AbstractQueueSynchronizer
中實現(xiàn)了 Condition
中的方法,主要對外提供 awaite(Object.wait())
和 signal(Object.notify())
調用,。
Condition Demo示例 使用示例代碼:
/** * ReentrantLock 實現(xiàn)源碼學習 * @author 一枝花算不算浪漫 * @date 2020/4/28 7:20 */ public class ReentrantLockDemo { static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) { Condition condition = lock.newCondition(); new Thread(() -> { lock.lock(); try { System.out.println('線程一加鎖成功' ); System.out.println('線程一執(zhí)行await被掛起' ); condition.await(); System.out.println('線程一被喚醒成功' ); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println('線程一釋放鎖成功' ); } }).start(); new Thread(() -> { lock.lock(); try { System.out.println('線程二加鎖成功' ); condition.signal(); System.out.println('線程二喚醒線程一' ); } finally { lock.unlock(); System.out.println('線程二釋放鎖成功' ); } }).start(); } }
執(zhí)行結果如下圖:
這里線程一 先獲取鎖,,然后使用 await()
方法掛起當前線程并釋放鎖 ,線程二 獲取鎖后使用 signal
喚醒線程一 ,。
Condition實現(xiàn)原理圖解 我們還是用上面的 demo
作為實例,,執(zhí)行的流程如下:
線程一 執(zhí)行await()
方法:
先看下具體的代碼實現(xiàn), #java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject.await()
:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
await()
方法中首先調用 addConditionWaiter()
將當前線程加入到 Condition
隊列中,。
執(zhí)行完后我們可以看下 Condition
隊列中的數(shù)據(jù):
具體實現(xiàn)代碼為:
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
這里會用當前線程創(chuàng)建一個 Node
節(jié)點,, waitStatus
為 CONDITION
。接著會釋放該節(jié)點的鎖,,調用之前解析過的 release()
方法,,釋放鎖后此時會喚醒被掛起的線程二 ,線程二 會繼續(xù)嘗試獲取鎖,。
接著調用 isOnSyncQueue()
方法判斷當前節(jié)點是否為 Condition
隊列中的頭部節(jié)點,,如果是則調用 LockSupport.park(this)
掛起 Condition
中當前線程。此時線程一 被掛起,,線程二 獲取鎖成功,。
具體流程如下圖:
線程二 執(zhí)行signal()
方法:
首先我們考慮下線程二 已經獲取到鎖,此時 AQS
等待隊列中已經沒有了數(shù)據(jù),。
接著就來看看線程二 喚醒線程一 的具體執(zhí)行流程:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); }
先判斷當前線程是否為獲取鎖的線程,,如果不是則直接拋出異常。接著調用 doSignal()
方法來喚醒線程,。
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); }final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null ) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這里先從 transferForSignal()
方法來看,,通過上面的分析我們知道 Condition
隊列中只有線程一創(chuàng)建的一個 Node
節(jié)點,且 waitStatue
為 CONDITION
,,先通過 CAS
修改當前節(jié)點 waitStatus
為0,,然后執(zhí)行 enq()
方法將當前線程加入到等待隊列中,并返回當前線程的前置節(jié)點,。
加入等待隊列的代碼在上面也已經分析過,,此時等待隊列中數(shù)據(jù)如下圖:
接著開始通過 CAS
修改當前節(jié)點的前置節(jié)點 waitStatus
為 SIGNAL
,并且喚醒當前線程,。此時 AQS
中等待隊列數(shù)據(jù)為:
線程一 被喚醒后,,繼續(xù)執(zhí)行await()
方法中的 while 循環(huán)。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
因為此時線程一的 waitStatus
已經被修改為0,,所以執(zhí)行 isOnSyncQueue()
方法會返回 false
。跳出 while
循環(huán),。
接著執(zhí)行 acquireQueued()
方法,,這里之前也有講過,,嘗試重新獲取鎖,如果獲取鎖失敗繼續(xù)會被掛起,。直到另外線程釋放鎖才被喚醒,。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; // help GC failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
此時線程一 的流程都已經分析完了,等線程二 釋放鎖后,,線程一 會繼續(xù)重試獲取鎖,,流程到此終結。
Condition總結 我們總結下 Condition 和 wait/notify 的比較:
Condition 可以精準的對多個不同條件進行控制,,wait/notify 只能和 synchronized 關鍵字一起使用,,并且只能喚醒一個或者全部的等待隊列;
Condition 需要使用 Lock 進行控制,,使用的時候要注意 lock() 后及時的 unlock(),,Condition 有類似于 await 的機制,因此不會產生加鎖方式而產生的死鎖出現(xiàn),,同時底層實現(xiàn)的是 park/unpark 的機制,,因此也不會產生先喚醒再掛起的死鎖,一句話就是不會產生死鎖,,但是 wait/notify 會產生先喚醒再掛起的死鎖,。
這里用了一步一圖的方式結合三個線程依次加鎖/釋放鎖來展示了 ReentrantLock
的實現(xiàn)方式和實現(xiàn)原理,而 ReentrantLock
底層就是基于 AQS
實現(xiàn)的,,所以我們也對 AQS
有了深刻的理解,。
另外還介紹了公平鎖 與非公平鎖 的實現(xiàn)原理, Condition
的實現(xiàn)原理,,基本上都是使用源碼+繪圖 的講解方式,,盡量讓大家更容易去理解。