final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
コードは長いので、セクションに分けて分析しましょう。最初に各部分の内容を紹介します
パート 1:for
ループ前のコードは主にスレッドのハッシュ値を取得するためのもので、0の場合は強制的に初期化されます
後半部分: for
ループの最初の if
ステートメント、Cell
配列を蓄積および展開します
パート 3: for
ループの最初の else if
ステートメント、この部分の機能は、Cell
配列を作成し、
パート 4 を初期化することです。 for
ループ 2 番目の else if
ステートメントでは、Cell
配列が激しい競合状態にあるときに、base
int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; // true表示没有竞争 } boolean collide = false; // True if last slot nonempty 可以理解为是否需要扩容
この部分のコア コードは getProbe
メソッドです。このメソッドの機能は hash# を取得することです。 ## スレッドの値 (後でビット操作を通じて # を見つけるのに便利です) ##Cell
配列内の特定の位置が 0
の場合、強制的に初期化されます
Cell 配列の初期化
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // 省略... } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 获取锁 boolean init = false; // 初始化标志 try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; // 创建Cell数组 rs[h & 1] = new Cell(x); // 索引1位置创建Cell元素,值为x=1 cells = rs; // cells指向新数组 init = true; // 初始化完成 } } finally { cellsBusy = 0; // 释放锁 } if (init) break; // 跳出循环 } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
配列は null
であるため、最初の else if
が入力されます。ステートメントが実行され、他のスレッドは動作しないため、 cellsBusy==0
, cells==as
も true
, casCellsBusy()
cellsBusy
で cas
操作を実行して、それを 1
も true
に変更してみてください。 最初に 2 つの要素を持つ
配列を作成し、次にインデックス 1
の位置でスレッド h & 1
のビット演算を使用します。 value
が 1
である Cell
を設定し、それを cells
に再割り当てし、初期化が成功したことをマークし、cellsBusy を変更します。
0
はロックの解放を意味し、最終的にループから抜け出して初期化操作が完了します。 Accumulatebase
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // 省略... } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 省略... } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
ステートメントは、Cell
配列内のすべての位置の競争が激しい場合に、次のことを試みることを意味します。 base
に蓄積します。これは最終的な保証として理解できますCell 配列が初期化された後、
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // as初始化之后满足条件 if ((a = as[(n - 1) & h]) == null) { // as中某个位置的值为null if (cellsBusy == 0) { // Try to attach new Cell 是否加锁 Cell r = new Cell(x); // Optimistically create 创建新Cell if (cellsBusy == 0 && casCellsBusy()) { // 双重检查是否有锁,并尝试加锁 boolean created = false; // try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 重新检查该位置是否为null rs[j] = r; // 该位置添加Cell元素 created = true; // 新Cell创建成功 } } finally { cellsBusy = 0; // 释放锁 } if (created) break; // 创建成功,跳出循环 continue; // Slot is now non-empty } } collide = false; // 扩容标志 } else if (!wasUncontended) // 上面定位到的索引位置的值不为null wasUncontended = true; // 重新计算hash,重新定位其他索引位置重试 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 尝试在该索引位置进行累加 break; else if (n >= NCPU || cells != as) // 如果数组长度大于等于CPU核心数,就不能在扩容 collide = false; // At max size or stale else if (!collide) // 数组长度没有达到最大值,修改扩容标志可以扩容 collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁 try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; // 创建一个原来长度2倍的数组 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 把原来的元素拷贝到新数组中 cells = rs; // cells指向新数组 } } finally { cellsBusy = 0; // 释放锁 } collide = false; // 已经扩容完成,修改标志不用再扩容 continue; // Retry with expanded table } h = advanceProbe(h); // 重新获取hash值 } // 省略... }
である場合、これはその位置で操作を実行できることを意味し、新しい ## を作成します#Cell を指定し、値を初期化します この位置に
1 を配置します。失敗した場合は、
hash 値を再計算して、再試行してください。 #配置された位置にはすでに値があります。これは、スレッド間で競合があることを示します。
wasUncontended が
false
を再計算します。 hashtry again
配置された位置には値があり、
wasUncontended はすでに
true
#蓄積が失敗した場合は、アレイの容量が最大値に達しているかどうかを確認し、最大値に達している場合は拡張できず、再ハッシュ
して再試行するしかありません
が false
としてマークされている場合は、それを
try again最初にロックを試行します。成功した場合、拡張操作が実行されます。各拡張長は次のとおりです。
2
を前の配列の倍数にしてから、元の配列の内容を新しい配列にコピーします。拡張操作は完了です。
以上がJava並行プログラミングにおけるLongAdderの実行ステータスは何ですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。