计数操作,并发情况下好复杂。 JDK8ConcurrentHashMap在插入和删除的情况下都会调用addCount方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } if (check >= 0 ) { Node<K, V>[] tab, nt; int n, sc; while (s >= (long ) (sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } }
其中baseCount是volatile修饰的变量,通过CAS修改其值:
1 2 3 4 5 6 7 8 9 10 private transient volatile long baseCount; private static final long BASECOUNT = sun.misc.Unsafe.getUnsafe().objectFieldOffset (ConcurrentHashMap.class.getDeclaredField("baseCount" ));
CounterCell是一个简单的内部静态类,每个CounterCell都是一个用于记录数量的单元:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private transient volatile CounterCell[] counterCells; @sun .misc.Contendedstatic final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
ThreadLocalRandom.getProbe(): ThreadLocalRandom是一个线程私有的伪随机数生成器, 每个线程的probe都是不同的 (这点基于ThreadLocalRandom的内部实现, 它在内部维护了一个probeGenerator, 这是一个类型为AtomicInteger的静态常量, 每当初始化一个ThreadLocalRandom时probeGenerator都会先自增一个常量然后返回的整数即为当前线程的probe, probe变量被维护在Thread对象中),可以认为每个线程的probe就是它在CounterCell数组中的hash code。
fullAddCount()函数根据当前线程的probe寻找对应的CounterCell进行计数,如果CounterCell数组未被初始化,则初始化CounterCell数组和CounterCell。 这个方法太他妈复杂了,全程懵逼。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 private final void fullAddCount (long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0 ) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); wasUncontended = true ; } boolean collide = false ; for (; ; ) { CounterCell[] as; CounterCell a; int n; long v; if ((as = counterCells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { CounterCell r = new CounterCell (x); if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean created = false ; try { CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break ; else if (counterCells != as || n >= NCPU) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { try { if (counterCells == as) { CounterCell[] rs = new CounterCell [n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean init = false ; try { if (counterCells == as) { CounterCell[] rs = new CounterCell [2 ]; rs[h & 1 ] = new CounterCell (x); counterCells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (U.compareAndSwapLong(this , BASECOUNT, v = baseCount, v + x)) break ; } }
CounterCell数组的大小永远是一个2的n次方,初始容量为2,每次扩容的新容量都是之前容量乘以二,处于性能考虑,它的最大容量上限是机器的CPU数量。 所以说CounterCell数组的碰撞冲突是很严重的,因为它的bucket基数太小了。 而发生碰撞就代表着一个CounterCell会被多个线程竞争, 为了解决这个问题,Doug Lea使用无限循环加上CAS来模拟出一个自旋锁来保证线程安全, 自旋锁的实现基于一个被volatile修饰的整数变量,该变量只会有两种状态:0和1,当它被设置为0时表示没有加锁,当它被设置为1时表示已被其他线程加锁。 这个自旋锁用于保护初始化CounterCell、初始化CounterCell数组以及对CounterCell数组进行扩容时的安全。
1 2 3 4 5 6 7 private transient volatile int cellsBusy;
统计总和就比较简单了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
参考文档(拷贝):http://www.importnew.com/29832.html