// Defaults to 0
// -1 while the table is being initialized
// -(1 + number of resizing threads) while a resize is in progress
// After initialization or resizing completes, holds the threshold for the next resize
private transient volatile int sizeCtl;

// The whole ConcurrentHashMap is just a Node[]
static class Node<K,V> implements Map.Entry<K,V> {}

// The hash table
transient volatile Node<K,V>[] table;

// The new hash table used while resizing
private transient volatile Node<K,V>[] nextTable;

// During a resize, once a bin has been migrated, a ForwardingNode is installed as the
// head of the old bin; its hash is MOVED (-1)
static final class ForwardingNode<K,V> extends Node<K,V> {}

// Used by compute and computeIfAbsent as a placeholder; replaced by a normal Node
// once the computation finishes
static final class ReservationNode<K,V> extends Node<K,V> {}

// Head node of a red-black tree bin, storing root and first; its children are TreeNodes
static final class TreeBin<K,V> extends Node<K,V> {}

// Node of a TreeBin, storing parent, left and right
static final class TreeNode<K,V> extends Node<K,V> {}
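As a quick aid to reading these states, here is a made-up helper (decodeSizeCtl is not a JDK method) that classifies a raw sizeCtl value according to the simplified rules listed above:

// Hypothetical helper, for illustration only: interprets a sizeCtl value
// according to the simplified conventions described in these notes.
static String decodeSizeCtl(int sc) {
    if (sc == 0)  return "table not yet initialized (default)";
    if (sc == -1) return "a thread is initializing the table";
    if (sc < -1)  return "resize in progress, roughly " + (-sc - 1) + " helper thread(s)";
    return "next resize threshold = " + sc;   // positive value
}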
2.3.2、Important methods
// Get the i-th Node in Node[]
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)

// CAS the i-th Node in Node[]; c is the expected old value, v the new value
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)

// Directly set the i-th Node in Node[] to the new value v
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
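These three helpers use Unsafe to get volatile and CAS semantics on individual array slots (reading an element of a plain Node[] is not volatile even if the field holding the array is). Outside the JDK the same effect can be sketched with AtomicReferenceArray; the TabOps class below is a made-up illustration, not the map's real implementation.

import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch: per-slot volatile read / CAS / volatile write, the same contract the three
// helpers above provide via Unsafe. N stands in for the bin head node type.
class TabOps<N> {
    final AtomicReferenceArray<N> tab;
    TabOps(int len) { tab = new AtomicReferenceArray<>(len); }

    N tabAt(int i)                    { return tab.get(i); }                 // volatile read
    boolean casTabAt(int i, N c, N v) { return tab.compareAndSet(i, c, v); } // CAS
    void setTabAt(int i, N v)         { tab.set(i, v); }                     // volatile write
}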
2.3.3、constructor

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // tableSizeFor still guarantees the computed size is a power of two, i.e. 16, 32, 64 ...
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}
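A short usage sketch of the consequence: because the requested capacity is divided by the load factor before rounding up, the map over-allocates compared with HashMap, and the table itself is only created lazily on the first insertion.

import java.util.concurrent.ConcurrentHashMap;

public class CtorDemo {
    public static void main(String[] args) {
        // size = (long)(1.0 + 32 / 0.75) = 43, tableSizeFor(43) = 64,
        // so the eventual table has 64 bins (a HashMap built with 32 would get 32).
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(32, 0.75f, 1);

        // The table array is created lazily: nothing is allocated until the first put.
        map.put("first", 1);
    }
}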
2.3.4、get
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // spread ensures the result is non-negative
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // The head node is already the key we are looking for
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // A negative hash means the bin is being migrated (-1, ForwardingNode) or is a
        // TreeBin (-2); in that case delegate to the node's find method
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // Otherwise traverse the linked list normally, comparing keys with equals
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
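For reference, the spread step mixes the high 16 bits of the key's hashCode into the low bits (so small table masks still see them) and clears the sign bit, which is why user keys always get a non-negative hash while negative hashes stay reserved for ForwardingNode (-1) and TreeBin (-2). A minimal sketch matching the JDK 8 source:

static final int HASH_BITS = 0x7fffffff;   // usable bits of a normal node hash

// Fold the high bits into the low bits, then mask off the sign bit.
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}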
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    // Add the baseCount and every cell's count together
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
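The counting scheme is the same striped idea as java.util.concurrent.atomic.LongAdder: a base counter plus CounterCells summed without locking, so under concurrent updates the value returned by size is only a moment-in-time estimate. A small illustrative demo using LongAdder itself (not the map's internal code):

import java.util.concurrent.atomic.LongAdder;

public class StripedCountDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder count = new LongAdder();

        // Each increment may land on the base counter or on a per-thread cell,
        // avoiding CAS contention on a single field.
        Runnable adder = () -> { for (int i = 0; i < 100_000; i++) count.increment(); };
        Thread t1 = new Thread(adder), t2 = new Thread(adder);
        t1.start(); t2.start(); t1.join(); t2.join();

        // sum() walks base + cells, just like sumCount() above; while updates are
        // still in flight the result is only an estimate.
        System.out.println(count.sum());   // 200000 once both threads have finished
    }
}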
2.4.2、put

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // Compute the segment index
    int j = (hash >>> segmentShift) & segmentMask;
    // Fetch the segment object; if it is null, create it
    if ((s = (Segment<K,V>)UNSAFE.getObject
         (segments, (j << SSHIFT) + SBASE)) == null) {
        // It may not really still be null: another thread may also have seen the null
        // segment, so ensureSegment creates it with cas to keep this safe
        s = ensureSegment(j);
    }
    // Enter the segment's put flow
    return s.put(key, hash, value, false);
}
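To make the index computation concrete, here is an illustrative sketch (the standalone segmentParams method is made up) of how JDK 7 derives segmentShift and segmentMask from concurrencyLevel; the segment index comes from the high bits of the hash, while the bucket index inside a segment uses the low bits.

// Illustrative: round the segment count up to a power of two and take the segment
// index from the HIGH bits of the hash, exactly as the put method above does.
static void segmentParams(int concurrencyLevel, int hash) {
    int sshift = 0, ssize = 1;           // ssize = number of segments (power of two)
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    int segmentShift = 32 - sshift;      // e.g. 28 when ssize = 16
    int segmentMask  = ssize - 1;        // e.g. 0b1111
    int j = (hash >>> segmentShift) & segmentMask;   // segment index
    System.out.println("segments=" + ssize + ", segment index=" + j);
}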
Segment extends the reentrant lock ReentrantLock; its put method is as follows:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // Try to acquire the lock
    HashEntry<K,V> node = tryLock() ? null :
        // If that fails, enter scanAndLockForPut:
        // on a multi-core cpu it retries tryLock up to 64 times before falling back to lock();
        // while spinning it also checks whether the key is already in the list and,
        // if not, creates the node in advance
        scanAndLockForPut(key, hash, value);
    // At this point the segment lock is held, so the code below runs safely
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                // Update
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // Insert
                // 1) node may already have been created while waiting for the lock;
                //    point its next at the current list head
                if (node != null)
                    node.setNext(first);
                else
                    // 2) otherwise create a new node
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 3) resize if the threshold is exceeded
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // Install node as the new list head
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
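The tryLock-then-scanAndLockForPut strategy is a spin-then-block pattern on a ReentrantLock. A minimal standalone sketch of that pattern, assuming the 64-retry bound mentioned in the comment above (the SpinThenLock class and withLock method are made up for illustration):

import java.util.concurrent.locks.ReentrantLock;

// Sketch: try the lock a bounded number of times, doing useful preparation work
// while spinning (as scanAndLockForPut pre-creates the node), then block.
class SpinThenLock {
    private final ReentrantLock lock = new ReentrantLock();
    private static final int MAX_SCAN_RETRIES = 64;   // assumption, from the comment above

    void withLock(Runnable prepare, Runnable critical) {
        int retries = 0;
        while (!lock.tryLock()) {              // optimistic spin
            prepare.run();                     // e.g. pre-create the node to insert
            if (++retries >= MAX_SCAN_RETRIES) {
                lock.lock();                   // give up spinning and block
                break;
            }
        }
        try {
            critical.run();                    // lock is held here either way
        } finally {
            lock.unlock();
        }
    }
}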
2.4.3、rehash
rehash happens inside put; since the segment lock is already held at that point, rehash does not need to worry about thread safety.
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)   // Single node on list
                newTable[idx] = e;
            else {              // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // Walk the list once so that the trailing run of nodes that all map to
                // the same new slot can be reused as-is
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // The remaining nodes have to be cloned
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // Only after the resize is complete is the new node linked in
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    // Replace the table with the new HashEntry[]
    table = newTable;
}
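To see what lastRun buys, here is a small made-up worked example on plain int hashes: the scan finds the start of the trailing run whose nodes all map to the same new slot, so that run is moved as one sub-list and only the nodes before it are cloned.

// Illustrative: nodes with hashes 13, 5, 21, 37 all sit in old bin 5 (capacity 8).
// After doubling (mask 15) they map to slots 13, 5, 5, 5.
public class LastRunDemo {
    public static void main(String[] args) {
        int[] hashes = {13, 5, 21, 37};
        int sizeMask = 15;                    // newCapacity - 1 after doubling 8 -> 16

        int lastRun = 0, lastIdx = hashes[0] & sizeMask;
        for (int i = 1; i < hashes.length; i++) {
            int k = hashes[i] & sizeMask;
            if (k != lastIdx) { lastIdx = k; lastRun = i; }
        }
        // lastRun == 1: the tail 5 -> 21 -> 37 all lands in new slot 5 and is reused
        // as-is; only the node with hash 13 has to be cloned into new slot 13.
        System.out.println("lastRun index = " + lastRun + ", new slot = " + lastIdx);
    }
}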
2.4.4、get
get takes no lock and relies on UNSAFE methods for visibility. During a resize, a get that happens before the table reference is replaced reads from the old table, while one that happens afterwards reads from the new table.
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // u is the offset of the segment object within the segments array
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // s is the segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
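A usage-level sketch of the point above: readers never block and never throw ConcurrentModificationException; a get that races with a resize simply reads whichever table is current, so it is only weakly consistent with concurrent writes.

import java.util.concurrent.ConcurrentHashMap;

public class LockFreeReadDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(2);

        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) map.put(i, i);   // forces many resizes
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) map.get(i);      // never blocks
        });
        writer.start(); reader.start();
        writer.join(); reader.join();
        System.out.println(map.size());   // 100000 once both threads are done
    }
}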
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                // Retry limit exceeded: create all segments and lock them
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
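The loop above is an optimistic "retry, then lock" summation: take lock-free passes and accept the result once the total modCount stops changing between passes, otherwise lock every segment for a consistent snapshot. A standalone sketch of that pattern (StripedSum and Stripe are made up for illustration; only the RETRIES_BEFORE_LOCK name and value are taken from the source):

import java.util.concurrent.locks.ReentrantLock;

// Sketch: sum striped counters without locking; if the modification counts keep
// changing between passes, fall back to locking every stripe.
class StripedSum {
    static final int RETRIES_BEFORE_LOCK = 2;

    static class Stripe {
        final ReentrantLock lock = new ReentrantLock();
        volatile int count;
        volatile int modCount;
    }

    long size(Stripe[] stripes) {
        long last = -1L;
        int retries = -1;
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK)
                    for (Stripe s : stripes) s.lock.lock();   // give up on optimism
                long sum = 0, mods = 0;
                for (Stripe s : stripes) { sum += s.count; mods += s.modCount; }
                if (mods == last || retries > RETRIES_BEFORE_LOCK)
                    return sum;                               // stable (or locked) result
                last = mods;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK)
                for (Stripe s : stripes) s.lock.unlock();
        }
    }
}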