集合是編程中最常用的數(shù)據(jù)結(jié)構(gòu),。而談到并發(fā),幾乎總是離不開(kāi)集合這類高級(jí)數(shù)據(jù)結(jié)構(gòu)的支持,。比如兩個(gè)線程需要同時(shí)訪問(wèn)一個(gè)中間臨界區(qū)(Queue),,比如常會(huì)用緩存作為外部文件的副本(HashMap)。這篇文章主要分析jdk1.5的3種并發(fā)集合類型(concurrent,,copyonright,,queue)中的ConcurrentHashMap,讓我們從原理上細(xì)致的了解它們,,能夠讓我們?cè)谏疃软?xiàng)目開(kāi)發(fā)中獲益非淺,。 通過(guò)分析Hashtable就知道,,synchronized是針對(duì)整張Hash表的,即每次鎖住整張表讓線程獨(dú)占,,ConcurrentHashMap允許多個(gè)修改操作并發(fā)進(jìn)行,,其關(guān)鍵在于使用了鎖分離技術(shù)。它使用了多個(gè)鎖來(lái)控制對(duì)hash表的不同部分進(jìn)行的修改,。ConcurrentHashMap內(nèi)部使用段(Segment)來(lái)表示這些不同的部分,,每個(gè)段其實(shí)就是一個(gè)小的hash table,它們有自己的鎖,。只要多個(gè)修改操作發(fā)生在不同的段上,,它們就可以并發(fā)進(jìn)行。 一,、結(jié)構(gòu)解析 ConcurrentHashMap和Hashtable主要區(qū)別就是圍繞著鎖的粒度以及如何鎖,可以簡(jiǎn)單理解成把一個(gè)大的HashTable分解成多個(gè),形成了鎖分離,。如圖:
而Hashtable的實(shí)現(xiàn)方式是---鎖整個(gè)hash表 二,、應(yīng)用場(chǎng)景 當(dāng)有一個(gè)大數(shù)組時(shí)需要在多個(gè)線程共享時(shí)就可以考慮是否把它給分層多個(gè)節(jié)點(diǎn)了,避免大鎖,。并可以考慮通過(guò)hash算法進(jìn)行一些模塊定位,。 其實(shí)不止用于線程,當(dāng)設(shè)計(jì)數(shù)據(jù)表的事務(wù)時(shí)(事務(wù)某種意義上也是同步機(jī)制的體現(xiàn)),,可以把一個(gè)表看成一個(gè)需要同步的數(shù)組,,如果操作的表數(shù)據(jù)太多時(shí)就可以考慮事務(wù)分離了(這也是為什么要避免大表的出現(xiàn)),比如把數(shù)據(jù)進(jìn)行字段拆分,,水平分表等. 三,、源碼解讀 ConcurrentHashMap中主要實(shí)體類就是三個(gè):ConcurrentHashMap(整個(gè)Hash表),Segment(桶),,HashEntry(節(jié)點(diǎn)),對(duì)應(yīng)上面的圖可以看出之間的關(guān)系 /**
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments; 不變(Immutable)和易變(Volatile)
1. static final class HashEntry<K,V> {
2. final K key;
3. final int hash;
4. volatile V value;
5. final HashEntry<K,V> next;
6. }
可以看到除了value不是final的,其它值都是final的,,這意味著不能從hash鏈的中間或尾部添加或刪除節(jié)點(diǎn),,因?yàn)檫@需要修改next 引用值,所有的節(jié)點(diǎn)的修改只能從頭部開(kāi)始,。對(duì)于put操作,,可以一律添加到Hash鏈的頭部。但是對(duì)于remove操作,,可能需要從中間刪除一個(gè)節(jié)點(diǎn),,這就需要將要?jiǎng)h除節(jié)點(diǎn)的前面所有節(jié)點(diǎn)整個(gè)復(fù)制一遍,最后一個(gè)節(jié)點(diǎn)指向要?jiǎng)h除結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn),。這在講解刪除操作時(shí)還會(huì)詳述,。為了確保讀操作能夠看到最新的值,將value設(shè)置成volatile,,這避免了加鎖,。
這是定位段的方法: 1. final Segment<K,V> segmentFor(int hash) {
2. return segments[(hash >>> segmentShift) & segmentMask];
3. } 數(shù)據(jù)結(jié)構(gòu)
1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
2. implements ConcurrentMap<K, V>, Serializable {
3. /**
4. * Mask value for indexing into segments. The upper bits of a
5. * key's hash code are used to choose the segment.
6. */
7. final int segmentMask;
8.
9. /**
10. * Shift value for indexing within segments.
11. */
12. final int segmentShift;
13.
14. /**
15. * The segments, each of which is a specialized hash table
16. */
17. final Segment<K,V>[] segments;
18. }
所有的成員都是final的,,其中segmentMask和segmentShift主要是為了定位段,參見(jiàn)上面的segmentFor方法,。
1. static final class Segment<K,V> extends ReentrantLock implements Serializable {
2. private static final long serialVersionUID = 2249069246763182397L;
3. /**
4. * The number of elements in this segment's region.
5. */
6. transient volatile int count;
7.
8. /**
9. * Number of updates that alter the size of the table. This is
10. * used during bulk-read methods to make sure they see a
11. * consistent snapshot: If modCounts change during a traversal
12. * of segments computing size or checking containsValue, then
13. * we might have an inconsistent view of state so (usually)
14. * must retry.
15. */
16. transient int modCount;
17.
18. /**
19. * The table is rehashed when its size exceeds this threshold.
20. * (The value of this field is always <tt>(int)(capacity *
21. * loadFactor)</tt>.)
22. */
23. transient int threshold;
24.
25. /**
26. * The per-segment table.
27. */
28. transient volatile HashEntry<K,V>[] table;
29.
30. /**
31. * The load factor for the hash table. Even though this value
32. * is same for all segments, it is replicated to avoid needing
33. * links to outer object.
34. * @serial
35. */
36. final float loadFactor;
37. }
count用來(lái)統(tǒng)計(jì)該段數(shù)據(jù)的個(gè)數(shù),它是volatile(volatile 變量使用指南),,它用來(lái)協(xié)調(diào)修改和讀取操作,,以保證讀取操作能夠讀取到幾乎最新的修改。協(xié)調(diào)方式是這樣的,,每次修改操作做了結(jié)構(gòu)上的改變,,如增加/刪除節(jié)點(diǎn)(修改節(jié)點(diǎn)的值不算結(jié)構(gòu)上的改變),都要寫count值,,每次讀取操作開(kāi)始都要讀取count的值,。這利用了 Java 5中對(duì)volatile語(yǔ)義的增強(qiáng),對(duì)同一個(gè)volatile變量的寫和讀存在happens-before關(guān)系,。modCount統(tǒng)計(jì)段結(jié)構(gòu)改變的次數(shù),,主要是為了檢測(cè)對(duì)多個(gè)段進(jìn)行遍歷過(guò)程中某個(gè)段是否發(fā)生改變,在講述跨段操作時(shí)會(huì)還會(huì)詳述,。threashold用來(lái)表示需要進(jìn)行rehash的界限值,。table數(shù)組存儲(chǔ)段中節(jié)點(diǎn),每個(gè)數(shù)組元素是個(gè)hash鏈,,用HashEntry表示,。table也是volatile,這使得能夠讀取到最新的 table值而不需要同步,。loadFactor表示負(fù)載因子,。 先來(lái)看下刪除操作remove(key)。
1. public V remove(Object key) {
2. hash = hash(key.hashCode());
3. return segmentFor(hash).remove(key, hash, null);
4. }
整個(gè)操作是先定位到段,,然后委托給段的remove操作,。當(dāng)多個(gè)刪除操作并發(fā)進(jìn)行時(shí),只要它們所在的段不相同,,它們就可以同時(shí)進(jìn)行,。下面是Segment的remove方法實(shí)現(xiàn):
1. V remove(Object key, int hash, Object value) {
2. lock();
3. try {
4. int c = count - 1;
5. HashEntry<K,V>[] tab = table;
6. int index = hash & (tab.length - 1);
7. HashEntry<K,V> first = tab[index];
8. HashEntry<K,V> e = first;
9. while (e != null && (e.hash != hash || !key.equals(e.key)))
10. e = e.next;
11.
12. V oldValue = null;
13. if (e != null) {
14. V v = e.value;
15. if (value == null || value.equals(v)) {
16. oldValue = v;
17. // All entries following removed node can stay
18. // in list, but all preceding ones need to be
19. // cloned.
20. ++modCount;
21. HashEntry<K,V> newFirst = e.next;
22. *for (HashEntry<K,V> p = first; p != e; p = p.next)
23. *newFirst = new HashEntry<K,V>(p.key, p.hash,
24. newFirst, p.value);
25. tab[index] = newFirst;
26. count = c; // write-volatile
27. }
28. }
29. return oldValue;
30. } finally {
31. unlock();
32. }
33. }
整個(gè)操作是在持有段鎖的情況下執(zhí)行的,空白行之前的行主要是定位到要?jiǎng)h除的節(jié)點(diǎn)e,。接下來(lái),,如果不存在這個(gè)節(jié)點(diǎn)就直接返回null,否則就要將e前面的結(jié)點(diǎn)復(fù)制一遍,,尾結(jié)點(diǎn)指向e的下一個(gè)結(jié)點(diǎn),。e后面的結(jié)點(diǎn)不需要復(fù)制,,它們可以重用。 中間那個(gè)for循環(huán)是做什么用的呢,?(*號(hào)標(biāo)記)從代碼來(lái)看,,就是將定位之后的所有entry克隆并拼回前面去,但有必要嗎,?每次刪除一個(gè)元素就要將那之前的元素克隆一遍,?這點(diǎn)其實(shí)是由entry的不變性來(lái)決定的,仔細(xì)觀察entry定義,,發(fā)現(xiàn)除了value,,其他所有屬性都是用final來(lái)修飾的,這意味著在第一次設(shè)置了next域之后便不能再改變它,,取而代之的是將它之前的節(jié)點(diǎn)全都克隆一次,。至于entry為什么要設(shè)置為不變性,這跟不變性的訪問(wèn)不需要同步從而節(jié)省時(shí)間有關(guān) 下面是個(gè)示意圖 刪除元素之前:
刪除元素3之后:
第二個(gè)圖其實(shí)有點(diǎn)問(wèn)題,,復(fù)制的結(jié)點(diǎn)中應(yīng)該是值為2的結(jié)點(diǎn)在前面,,值為1的結(jié)點(diǎn)在后面,也就是剛好和原來(lái)結(jié)點(diǎn)順序相反,,還好這不影響我們的討論,。
1. V put(K key, int hash, V value, boolean onlyIfAbsent) {
2. lock();
3. try {
4. int c = count;
5. if (c++ > threshold) // ensure capacity
6. rehash();
7. HashEntry<K,V>[] tab = table;
8. int index = hash & (tab.length - 1);
9. HashEntry<K,V> first = tab[index];
10. HashEntry<K,V> e = first;
11. while (e != null && (e.hash != hash || !key.equals(e.key)))
12. e = e.next;
13.
14. V oldValue;
15. if (e != null) {
16. oldValue = e.value;
17. if (!onlyIfAbsent)
18. e.value = value;
19. }
20. else {
21. oldValue = null;
22. ++modCount;
23. tab[index] = new HashEntry<K,V>(key, hash, first, value);
24. count = c; // write-volatile
25. }
26. return oldValue;
27. } finally {
28. unlock();
29. }
30. }
該方法也是在持有段鎖(鎖定整個(gè)segment)的情況下執(zhí)行的,,這當(dāng)然是為了并發(fā)的安全,,修改數(shù)據(jù)是不能并發(fā)進(jìn)行的,必須得有個(gè)判斷是否超限的語(yǔ)句以確保容量不足時(shí)能夠rehash。接著是找是否存在同樣一個(gè)key的結(jié)點(diǎn),,如果存在就直接替換這個(gè)結(jié)點(diǎn)的值,。否則創(chuàng)建一個(gè)新的結(jié)點(diǎn)并添加到hash鏈的頭部,這時(shí)一定要修改modCount和count的值,,同樣修改count的值一定要放在最后一步。put方法調(diào)用了rehash方法,,reash方法實(shí)現(xiàn)得也很精巧,,主要利用了table的大小為2^n,這里就不介紹了,。而比較難懂的是這句int index = hash & (tab.length - 1),,原來(lái)segment里面才是真正的hashtable,即每個(gè)segment是一個(gè)傳統(tǒng)意義上的hashtable,如上圖,,從兩者的結(jié)構(gòu)就可以看出區(qū)別,,這里就是找出需要的entry在table的哪一個(gè)位置,之后得到的entry就是這個(gè)鏈的第一個(gè)節(jié)點(diǎn),,如果e!=null,,說(shuō)明找到了,這是就要替換節(jié)點(diǎn)的值(onlyIfAbsent == false),,否則,,我們需要new一個(gè)entry,它的后繼是first,,而讓tab[index]指向它,,什么意思呢?實(shí)際上就是將這個(gè)新entry插入到鏈頭,,剩下的就非常容易理解了
1. V get(Object key, int hash) {
2. if (count != 0) { // read-volatile 當(dāng)前桶的數(shù)據(jù)個(gè)數(shù)是否為0
3. HashEntry<K,V> e = getFirst(hash); 得到頭節(jié)點(diǎn)
4. while (e != null) {
5. if (e.hash == hash && key.equals(e.key)) {
6. V v = e.value;
7. if (v != null)
8. return v;
9. return readValueUnderLock(e); // recheck
10. }
11. e = e.next;
12. }
13. }
14. return null;
15. }
get操作不需要鎖。第一步是訪問(wèn)count變量,,這是一個(gè)volatile變量,,由于所有的修改操作在進(jìn)行結(jié)構(gòu)修改時(shí)都會(huì)在最后一步寫count 變量,通過(guò)這種機(jī)制保證get操作能夠得到幾乎最新的結(jié)構(gòu)更新。對(duì)于非結(jié)構(gòu)更新,,也就是結(jié)點(diǎn)值的改變,,由于HashEntry的value變量是 volatile的,也能保證讀取到最新的值,。接下來(lái)就是根據(jù)hash和key對(duì)hash鏈進(jìn)行遍歷找到要獲取的結(jié)點(diǎn),,如果沒(méi)有找到,直接訪回null,。對(duì)hash鏈進(jìn)行遍歷不需要加鎖的原因在于鏈指針next是final的,。但是頭指針卻不是final的,這是通過(guò)getFirst(hash)方法返回,,也就是存在 table數(shù)組中的值,。這使得getFirst(hash)可能返回過(guò)時(shí)的頭結(jié)點(diǎn),例如,,當(dāng)執(zhí)行g(shù)et方法時(shí),,剛執(zhí)行完getFirst(hash)之后,另一個(gè)線程執(zhí)行了刪除操作并更新頭結(jié)點(diǎn),,這就導(dǎo)致get方法中返回的頭結(jié)點(diǎn)不是最新的,。這是可以允許,通過(guò)對(duì)count變量的協(xié)調(diào)機(jī)制,,get能讀取到幾乎最新的數(shù)據(jù),,雖然可能不是最新的。要得到最新的數(shù)據(jù),,只有采用完全的同步,。 1. V readValueUnderLock(HashEntry<K,V> e) {
2. lock();
3. try {
4. return e.value;
5. } finally {
6. unlock();
7. }
8. }
另一個(gè)操作是containsKey,,這個(gè)實(shí)現(xiàn)就要簡(jiǎn)單得多了,,因?yàn)樗恍枰x取值:
1. boolean containsKey(Object key, int hash) {
2. if (count != 0) { // read-volatile
3. HashEntry<K,V> e = getFirst(hash);
4. while (e != null) {
5. if (e.hash == hash && key.equals(e.key))
6. return true;
7. e = e.next;
8. }
9. }
10. return false;
11. }
優(yōu)秀博文:
|
|
來(lái)自: dtl樂(lè)學(xué)館 > 《多線程》