Fork me on GitHub

2019-7-22 jdk源码分析(6)-ConcurrentHashMap

jdk源码分析(6)-ConcurrentHashMap

本文将主要讲述 JDK1.8 版本 的 ConcurrentHashMap,其内部结构和很多的哈希优化算法,都是和 JDK1.8 版本的 HashMap是一样的,所以在阅读本文之前,一定要先了解 HashMap。

一、ConcurrentHashMap 结构概述

CHM 的源码有 6k 多行,包含的内容多,精巧,不容易理解;建议在查看源码的时候,可以首先把握整体结构脉络,对于一些精巧的优化,哈希技巧可以先了解目的就可以了,不用深究;对整体把握比较清楚后,在逐步分析,可以比较快速的看懂;

JDK1.8 版本中的 CHM,和 JDK1.7 版本的差别非常大,在查看资料的时候要注意区分,1.7 中主要是使用 Segment 分段锁 来解决并发问题的;而在 1.8 中则完全没有这些稍显臃肿的结构,其结构基本和 HashMap 是一样的,都是 数组 + 链表 + 红黑树,如图所示

其主要区别就在 CHM 支持并发:

  • 使用 Unsafe 方法操作数组内部元素,保证可见性;(U.getObjectVolatile、U.compareAndSwapObject、U.putObjectVolatile);
  • 在更新和移动节点的时候,直接锁住对应的哈希桶,锁粒度更小,且动态扩展;
  • 针对扩容慢操作进行优化,
    • 首先扩容过程的中,节点首先移动到过度表 nextTable ,所有节点移动完毕时替换散列表 table
    • 移动时先将散列表定长等分,然后逆序依次领取任务扩容,设置 sizeCtl 标记正在扩容;
    • 移动完成一个哈希桶或者遇到空桶时,将其标记为 ForwardingNode 节点,并指向 nextTable
    • 后有其他线程在操作哈希表时,遇到 ForwardingNode 节点,则先帮助扩容(继续领取分段任务),扩容完成后再继续之前的操作;
  • 优化哈希表计数器,采用 LongAdder、Striped64 类似思想;
  • 以及大量的哈希算法优化和状态变量优化;

以上讲的这些不太清楚也没有关系,主要是有一个印象,大致清楚 CHM 的实现方向,具体细节后面还会结合源码详细讲解;

2. 类定义和成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ConcurrentHashMap<K,V> 
extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
private static final int MAXIMUM_CAPACITY = 1 << 30; // 最大容量
private static final int DEFAULT_CAPACITY = 16; // 默认初始化容量
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 并发级别,为兼容1.7,实际未用
private static final float LOAD_FACTOR = 0.75f; // 固定负载系数,n - (n >>> 2)
static final int TREEIFY_THRESHOLD = 8; // 链表超过8时,转为红黑树
static final int UNTREEIFY_THRESHOLD = 6; // 红黑树低于6时,转为链表
static final int MIN_TREEIFY_CAPACITY = 64; // 树化最小容量,容量小于64时,先扩容
private static final int MIN_TRANSFER_STRIDE = 16; // 扩容时拆分散列表,最小步长
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 可参与扩容的最大线程

static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU 数
transient volatile Node<K,V>[] table; // 散列表
private transient volatile Node<K,V>[] nextTable; // 扩容时的过度表

private transient volatile int sizeCtl; // 最重要的状态变量,下面详讲
private transient volatile int transferIndex; // 扩容进度指示

private transient volatile long baseCount; // 计数器,基础基数
private transient volatile int cellsBusy; // 计数器,并发标记
private transient volatile CounterCell[] counterCells; // 计数器,并发累计

public ConcurrentHashMap() { }

public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // 注意这里不是0.75,后面介绍
this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor); // 注意这里的初始化
int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
...
}

上面有几个重要的地方这里单独讲:

LOAD_FACTOR:

这里的负载系数,同 HashMap 等其他 Map 的系数有明显区别:

sizeCtl:

sizeCtl 是 CHM 中最重要的状态变量,其中包括很多中状态,这里先整体介绍帮助后面源码理解;

  • sizeCtl = 0 :初始值,还未指定初始容量;
  • sizeCtl > 0 :
    • table 未初始化,表示初始化容量;
    • table 已初始化,表示扩容阈值(0.75n);
  • sizeCtl = -1 :表示正在初始化;
  • sizeCtl < -1 :表示正在扩容,具体结构如图所示:

1
2
3
4
5
6
7
8
/*
* n=64
* Integer.numberOfLeadingZeros(n)=26
* resizeStamp(64) = 0001 1010 | 1000 0000 0000 0000 = 1000 0000 0001 1010
*/
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

所以 resizeStamp(64) << RESIZE_STAMP_SHIFT) + 2 ,表示扩容目标为 64,有一个线程正在扩容;

3. Node 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static class Node<K,V> implements Map.Entry<K,V> {  // 哈希表普通节点
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node<K,V> find(int h, Object k) {} // 主要在扩容时,利用多态查询已转移节点
}

static final class ForwardingNode<K,V> extends Node<K,V> { // 标识扩容节点
final Node<K,V>[] nextTable; // 指向成员变量 ConcurrentHashMap.nextTable

ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null); // hash = -1,快速确定 ForwardingNode 节点
this.nextTable = tab;
}

Node<K,V> find(int h, Object k) {}
}

static final class TreeBin<K,V> extends Node<K,V> { // 红黑树根节点
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null); // hash = -2,快速确定红黑树,
...
}
}
static final class TreeNode<K,V> extends Node<K,V> { } // 红黑树普通节点,其 hash 同 Node 普通节点 > 0;

4. 哈希计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static final int MOVED     = -1;          // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

// 让高位16位,参与哈希桶定位运算的同时,保证 hash 为正
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
除此之外还有,

tableSizeFor : 将容量转为大于n,且最小的2的幂;
除留余数法 :hash % length = hash & (length-1) ;
扩容后哈希桶定位:(e.hash & oldCap),0 - 位置不变,1 - 原来的位置 + oldCap;

5. 哈希桶可见性

我们都知道一个数组即使声明为 volatile,也只能保证这个数组引用本身的可见性,其内部元素的可见性是无法保证的,如果每次都加锁,则效率必然大大降低,在 CHM 中则使用 Unsafe 方法来保证:

1
2
3
4
5
6
7
8
9
10
11
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

6.initTable 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) Thread.yield(); // 有其他线程在初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 设置状态 -1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 注意此时的 sizeCtl 表示初始容量,完毕后表示扩容阈值
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); // 同 0.75n
}
} finally {
sizeCtl = sc; // 注意这里没有 CAS 更新,这就是状态变量的高明了,因为前面设置了 -1,此时这里没有竞争
}
break;
}
}
return tab;
}

7.get 方法

get 方法可能看代码不是很长,但是他却能 保证无锁状态下的内存一致性 ,他的每一句代码都要仔细理解,多设想一下如果发生竞争会怎样,如此才能有所得;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 计算 hash
if ((tab = table) != null && (n = tab.length) > 0 && // 确保 table 已经初始化

// 确保对应的哈希桶不为空,注意这里是 Volatile 语义获取;因为扩容的时候,是完全拷贝,所以只要不为空,则链表必然完整
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}

// hash < 0,则必然在扩容,原来位置的节点可能全部移动到 i + oldCap 位置,所以利用多态到 nextTable 中查找
else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null;

while ((e = e.next) != null) { // 遍历链表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

8.putVal 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // hash 计算
int binCount = 0; // 状态变量,主要表示查找链表节点数,最后判断是否转为红黑树
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) tab = initTable(); // 初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // cas 获取哈希桶
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // cas 更新,失败时继续循环更新
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 正在扩容的时候,先帮助扩容
else {
V oldVal = null;
synchronized (f) { // 注意这里只锁定了一个哈希桶,所以比 1.7 中的 Segment 分段锁 粒度更低
if (tabAt(tab, i) == f) { // 确认该哈希桶是否已经移动
if (fh >= 0) { // hash >=0 则必然是普通节点,直接遍历链表即可
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if(e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 查找成功
oldVal = e.val;
if (!onlyIfAbsent) e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 查找失败时,直接在末尾添加新节点
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 树根节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 红黑树查找
oldVal = p.val;
if (!onlyIfAbsent) p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果链表长度大于8,转为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 计数加一,注意这里使用的是计数器,普通的 Atomic 变量仍然可能称为性能瓶颈;
return null;
}

其具体流程如图所示

9.扩容

扩容操作一直都是比较慢的操作,而 CHM 中巧妙的利用任务划分,使得多个线程可能同时参与扩容;另外扩容条件也有两个:

  • 有链表长度超过 8,但是容量小于 64 的时候,发生扩容;
  • 节点数超过阈值的时候,发生扩容;

其扩容的过程可描述为:

  • 首先扩容过程的中,节点首先移动到过度表 nextTable ,所有节点移动完毕时替换散列表 table
  • 移动时先将散列表定长等分,然后逆序依次领取任务扩容,设置 sizeCtl 标记正在扩容;
  • 移动完成一个哈希桶或者遇到空桶时,将其标记为 ForwardingNode 节点,并指向 nextTable
  • 后有其他线程在操作哈希表时,遇到 ForwardingNode 节点,则先帮助扩容(继续领取分段任务),扩容完成后再继续之前的操作;

图形化表示如下:

源码分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // 根据 CPU 数量计算任务步长
if (nextTab == null) { // 初始化 nextTab
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 扩容一倍
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE; // 发生 OOM 时,不再扩容
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 标记空桶,或已经转移完毕的桶
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) { // 逆向遍历扩容
Node<K,V> f; int fh;
while (advance) { // 向前获取哈希桶
int nextIndex, nextBound;
if (--i >= bound || finishing) // 已经取到哈希桶,或已完成时退出
advance = false;
else if ((nextIndex = transferIndex) <= 0) { // 遍历到达头节点,已经没有待迁移的桶,线程准备退出
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // 当前任务完成,领取下一批哈希桶
bound = nextBound;
i = nextIndex - 1; // 索引指向下一批哈希桶
advance = false;
}
}

// i < 0 :表示扩容结束,已经没有待移动的哈希桶
// i >= n :扩容结束,再次检查确认
// i + n >= nextn : 在使用 nextTable 替换 table 时,有线程进入扩容就会出现
if (i < 0 || i >= n || i + n >= nextn) { // 完成扩容准备退出
int sc;
if (finishing) { // 两次检查,只有最后一个扩容线程退出时,才更新变量
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); // 0.75*2*n
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 扩容线程减一
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 不是最后一个线程,直接退出
finishing = advance = true; // 最后一个线程,再次检查
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null) // 当前节点为空,直接标记为 ForwardingNode,然后继续获取下一个桶
advance = casTabAt(tab, i, null, fwd);

// 之前的线程已经完成该桶的移动,直接跳过,正常情况下自己的任务区间,不会出现 ForwardingNode 节点,
else if ((fh = f.hash) == MOVED) // 此处为极端条件下的健壮性检查
advance = true; // already processed

// 开始处理链表
else {
// 注意在 get 的时候,可以无锁获取,是因为扩容是全拷贝节点,完成后最后在更新哈希桶
// 而在 put 的时候,是直接将节点加入尾部,获取修改其中的值,此时如果允许 put 操作,最后就会发生脏读,
// 所以 put 和 transfer,需要竞争同一把锁,也就是对应的哈希桶,以保证内存一致性效果
synchronized (f) {
if (tabAt(tab, i) == f) { // 确认锁定的是同一个桶
Node<K,V> ln, hn;
if (fh >= 0) { // 正常节点
int runBit = fh & n; // hash & n,判断扩容后的索引
Node<K,V> lastRun = f;

// 此处找到链表最后扩容后处于同一位置的连续节点,这样最后一节就不用再一次复制了
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}

// 依次将链表拆分成,lo、hi 两条链表,即位置不变的链表,和位置 + oldCap 的链表
// 注意最后一节链表没有new,而是直接使用原来的节点
// 同时链表的顺序也被打乱了,lastRun 到最后为正序,前面一节为逆序
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // 插入 lo 链表
setTabAt(nextTab, i + n, hn); // 插入 hi 链表
setTabAt(tab, i, fwd); // 哈希桶移动完成,标记为 ForwardingNode 节点
advance = true; // 继续获取下一个桶
}
else if (f instanceof TreeBin) { // 拆分红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null; // 为避免最后在反向遍历,先留头结点的引用,
TreeNode<K,V> hi = null, hiTail = null; // 因为顺序的链表,可以加速红黑树构造
int lc = 0, hc = 0; // 同样记录 lo,hi 链表的长度
for (Node<K,V> e = t.first; e != null; e = e.next) { // 中序遍历红黑树
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null); // 构造红黑树节点
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}

// 判断是否需要将其转化为红黑树,同时如果只有一条链,那么就可以不用在构造
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

10.计数器

当获取 Map.size 的时候,如果使用 Atomic 变量,很容易导致过度竞争,产生性能瓶颈,所以 CHM 中使用了,计数器的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
private transient volatile CounterCell[] counterCells; // 计数器

@sun.misc.Contended static final class CounterCell { // @sun.misc.Contended 避免伪缓存
volatile long value;
CounterCell(long x) { value = x; }
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) { // 累计计数
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

11.总结

  • 首先 JDK1.8 的 CHM,没有使用 Segment 分段锁,而是直接锁定单个哈希桶
  • 对数组中的哈希桶使用 CAS 操作,保证其可见性
  • 对扩容是用,任务拆分,多线程同时扩容的方式,加速扩容
  • 对 size 使用计数器思想
  • CHM 中对状态变量的应用,使得很多操作都得以无所化进行
-------------本文结束感谢您的阅读-------------
坚持原创技术分享,您的支持将鼓励我继续创作!