final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
The code is long, so let's analyze it in sections. First, an overview of what each part does.
Part one: the code before the `for` loop.
It mainly obtains the thread's hash value (the probe). If that value is 0, initialization is forced.
Part two: the `for` loop.
The first `if` statement in the loop accumulates into the `Cell` array and expands the array when necessary.
The first `else if` statement in the loop creates the `Cell` array and initializes it.
The second `else if` statement in the loop tries to accumulate on `base` when competition for the `Cell` array is fierce.
// Prelude of longAccumulate: make sure this thread has a non-zero probe
// before entering the retry loop.
int h;
if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();              // re-read the probe after initialization
    wasUncontended = true;       // true means there was no contention
}
boolean collide = false;         // True if last slot nonempty; can be read as "expansion may be needed"
The `getProbe` method obtains the thread's hash value, which is later used with a bit operation to locate a position in the `Cell` array. If the value is 0, initialization is forced.
Initialize the Cell array
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // 省略... } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 获取锁 boolean init = false; // 初始化标志 try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; // 创建Cell数组 rs[h & 1] = new Cell(x); // 索引1位置创建Cell元素,值为x=1 cells = rs; // cells指向新数组 init = true; // 初始化完成 } } finally { cellsBusy = 0; // 释放锁 } if (init) break; // 跳出循环 } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
At first the `cells` array is `null`, so execution enters the first `else if` statement. No other thread is operating on it, so `cellsBusy == 0` holds and `cells == as` is also `true`. `casCellsBusy()` then tries to perform a CAS operation on `cellsBusy` to change it to `1`, and this also returns `true`.
First, a `Cell` array of length 2 is created. A `Cell` whose `value` is `x` (`1` in this example) is placed at index `h & 1` (index `1` in this example). The new array is then assigned to `cells`, the initialization is marked as successful, and `cellsBusy` is set back to `0`, which releases the lock. Finally, execution jumps out of the loop and the initialization is complete.
Accumulate base
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // 省略... } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 省略... } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
When competition for the `Cell` array is fierce, the thread simply tries to accumulate on `base`, which can be understood as the final fallback.
After the Cell array is initialized
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // as初始化之后满足条件 if ((a = as[(n - 1) & h]) == null) { // as中某个位置的值为null if (cellsBusy == 0) { // Try to attach new Cell 是否加锁 Cell r = new Cell(x); // Optimistically create 创建新Cell if (cellsBusy == 0 && casCellsBusy()) { // 双重检查是否有锁,并尝试加锁 boolean created = false; // try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 重新检查该位置是否为null rs[j] = r; // 该位置添加Cell元素 created = true; // 新Cell创建成功 } } finally { cellsBusy = 0; // 释放锁 } if (created) break; // 创建成功,跳出循环 continue; // Slot is now non-empty } } collide = false; // 扩容标志 } else if (!wasUncontended) // 上面定位到的索引位置的值不为null wasUncontended = true; // 重新计算hash,重新定位其他索引位置重试 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 尝试在该索引位置进行累加 break; else if (n >= NCPU || cells != as) // 如果数组长度大于等于CPU核心数,就不能在扩容 collide = false; // At max size or stale else if (!collide) // 数组长度没有达到最大值,修改扩容标志可以扩容 collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁 try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; // 创建一个原来长度2倍的数组 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 把原来的元素拷贝到新数组中 cells = rs; // cells指向新数组 } } finally { cellsBusy = 0; // 释放锁 } collide = false; // 已经扩容完成,修改标志不用再扩容 continue; // Retry with expanded table } h = advanceProbe(h); // 重新获取hash值 } // 省略... }
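If the located slot is `null`, create a new `Cell`, initialize its value to `1` (the value of `x` in this example), and place it at this position. If that fails, recalculate the `hash` value and try again.
If the located slot is not `null` and `wasUncontended` is `false`, change it to `true`, recalculate the `hash`, and try again.
If the located slot is not `null` and `wasUncontended` is `true`, try to accumulate at that position.
If the accumulation fails and `collide` is `false`, change it to `true`, indicating that expansion can be performed, and then rehash and try again.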
The above is the detailed content of What is the execution status of LongAdder in Java concurrent programming?. For more information, please follow other related articles on the PHP Chinese website!