Java同時実行コレクションの詳細な説明

零下一度
リリース: 2017-06-17 11:35:55
オリジナル
1438 人が閲覧しました

この記事では主に Java 同時実行コレクション ConcurrentLinkedQueue を紹介します。必要な方は、

ConcurrentLinkedQueue の紹介を参照してください。

ConcurrentLinkedQueue は、「高同時実行」に適したスレッドセーフなキューです。シナリオ。

これは、FIFO (先入れ先出し) 原則に従って要素を並べ替える、リンク ノードに基づく無制限のスレッドセーフ キューです。 Null 要素をキュー要素に配置することはできません (内部的に実装された特別なノードを除く)。

ConcurrentLinkedQueueの原理とデータ構造

ConcurrentLinkedQueueのデータ構造は以下の図に示すとおりです。

説明:

1. ConcurrentLinkedQueueはAbstractQueueを継承します。

2. ConcurrentLinkedQueue はリンク リストを通じて内部的に実装されます。これには、リンクされたリストの先頭ノード head と末尾ノード tail の両方が含まれます。 ConcurrentLinkedQueue は、FIFO (先入れ先出し) 原則に従って要素を並べ替えます。要素はリンク リストの末尾から挿入され、先頭から返されます。

3. ConcurrentLinkedQueue のリンク リスト ノードの次の型は揮発性であり、リンク リスト データ項目の型も揮発性です。 volatile に関しては、そのセマンティクスに次のようなことがわかっています。「つまり、volatile 変数を読み取ると、(どのスレッドでも) この volatile 変数への最後の書き込みを常に確認できます。」 ConcurrentLinkedQueue は volatile を使用して、複数のスレッドによる競合リソースへの相互排他的アクセスを実現します。

ConcurrentLinkedQueue関数一覧


// 创建一个最初为空的 ConcurrentLinkedQueue。 ConcurrentLinkedQueue() // 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。 ConcurrentLinkedQueue(Collection c) // 将指定元素插入此队列的尾部。 boolean add(E e) // 如果此队列包含指定元素,则返回 true。 boolean contains(Object o) // 如果此队列不包含任何元素,则返回 true。 boolean isEmpty() // 返回在此队列元素上以恰当顺序进行迭代的迭代器。 Iterator iterator() // 将指定元素插入此队列的尾部。 boolean offer(E e) // 获取但不移除此队列的头;如果此队列为空,则返回 null。 E peek() // 获取并移除此队列的头,如果此队列为空,则返回 null。 E poll() // 从队列中移除指定元素的单个实例(如果存在)。 boolean remove(Object o) // 返回此队列中的元素数量。 int size() // 返回以恰当顺序包含此队列所有元素的数组。 Object[] toArray() // 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。  T[] toArray(T[] a)
ログイン後にコピー

以下では、ConcurrentLinkedQueueを作成、追加、削除の側面から分析します。

1

の作成 以下はConcurrentLinkedQueue()を使って説明します。


public ConcurrentLinkedQueue() { head = tail = new Node(null); }
ログイン後にコピー

説明:コンストラクターでは、新しい「nullコンテンツを持つノード」が作成され、headとtailの値が新しいノードとして設定されます。

head と tail は次のように定義されます:


private transient volatile Node head; private transient volatile Node tail;
ログイン後にコピー

head と tail はどちらも volatile 型であり、volatile によって与えられる意味を持ちます: 「つまり、volatile 変数を読み取ると、常に (任意のスレッドを) "最後に見ることができます"揮発性変数に書き込みます。」

Node の宣言は次のとおりです:


private static class Node { volatile E item; volatile Node next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
ログイン後にコピー

説明:

Node は一方向のリンク リスト ノードであり、next は次のノードを指すために使用され、item はデータを格納するために使用されます。 Node でノード データを操作するための API はすべて、Unsafe メカニズムの CAS 関数を通じて実装されます。たとえば、casNext() は CAS 関数を通じて「ノードの次のノードを比較して設定します」。

2. 追加

以下では、例として add(E e) を使用して ConcurrentLinkedQueue での追加を説明します。


public boolean add(E e) { return offer(e); }
ログイン後にコピー

説明: add() は実際に Offer() を呼び出して、追加操作を完了します。

offer() のソースコードは次のとおりです:


public boolean offer(E e) { // 检查e是不是null,是的话抛出NullPointerException异常。 checkNotNull(e); // 创建新的节点 final Node newNode = new Node(e); // 将“新的节点”添加到链表的末尾。 for (Node t = tail, p = t;;) { Node q = p.next; // 情况1:q为空 if (q == null) { // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。 // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。 // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。 if (p.casNext(null, newNode)) { if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } } // 情况2:p和q相等 else if (p == q) p = (t != (t = tail)) ? t : head; // 情况3:其它 else p = (p != t && t != (t = tail)) ? t : q; } }
ログイン後にコピー

説明: offer(E e) の機能は、要素 e をリンクされたリストの末尾に追加することです。 Offer() の比較は、for ループを理解するために、3 つの状況を区別して分析してみましょう。

ケース 1 -- q が空です。これは、q が末尾ノードの次のノードであることを意味します。このとき、p.casNext(null, newNode) で「p の次のノードを newNode」に設定します。設定が成功したら、「p と t」を比較します (p が t に等しくない場合は、newNode を新しい末尾ノードに設定します)。 )、その後 true を返します。それ以外の場合 (「他のスレッドが末尾ノードを変更した」ことを意味します)、何もせず、for ループを続行します。

p.casNext(null, newNode) は、p を操作するために CAS を呼び出します。 「p の次のノードが null に等しい」場合は、「p の次のノードが newNode に等しい」を設定します。設定が成功した場合は true を返し、失敗した場合は false を返します。

ケース 2 -- p と q が等しい。これはいつ起こりますか? 「ケース 3」を通じて、「ケース 3」の処理後、p の値が q に等しくなる可能性があることがわかります。

この時点で、末尾ノードが変化していない場合は、ヘッド ノードが変化しているはずです。p をヘッド ノードとして設定し、リンクされたリストを再度走査します。そうでない場合は (末尾ノードが変化した場合)、p を次のように設定します。テールノード。

ケース 3 -- その他。

p = (p != t && t != (t = tail)) ? t : q; を次のコードに変換します。


if (p==t) { p = q; } else { Node tmp=t; t = tail; if (tmp==t) { p=q; } else { p=t; } }
ログイン後にコピー

p と t が等しい場合、p を q に設定します。それ以外の場合は、「末尾ノードが変化したかどうか」を判断し、変化がない場合は p を q に設定し、変化がない場合は p を末尾ノードに設定します。

checkNotNull() のソース コードは次のとおりです。


private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
ログイン後にコピー

3. 削除

以下では、ConcurrentLinkedQueue での削除を示す例として、poll() を使用します。


public E poll() { // 设置“标记” restartFromHead: for (;;) { for (Node h = head, p = h, q;;) { E item = p.item; // 情况1 // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话; // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。 if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 情况2 // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。 else if ((q = p.next) == null) { updateHead(h, p); return null; } // 情况3 // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。 else if (p == q) continue restartFromHead; // 情况4 // 设置p为q else p = q; } } }
ログイン後にコピー

说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。

情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。

p.casItem(item, null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。

在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。

updateHead()的源码如下:


final void updateHead(Node h, Node p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
ログイン後にコピー

说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。

casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。

lazySetNext()的源码如下:


void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
ログイン後にコピー

putOrderedObject()函数,我们在前面一章“TODO”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。

情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。

则调用updateHead(h, p),将表头更新p;然后返回null。

情况3:p=q

在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。

情况4:其它情况。

设置p=q。

ConcurrentLinkedQueue示例


import java.util.*; import java.util.concurrent.*; /* * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。 * * 下面是“多个线程同时操作并且遍历queue”的示例 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。 * * */ public class ConcurrentLinkedQueueDemo1 { // TODO: queue是LinkedList对象时,程序会出错。 //private static Queue queue = new LinkedList(); private static Queue queue = new ConcurrentLinkedQueue(); public static void main(String[] args) { // 同时启动两个线程对queue进行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 6) { // “线程名” + "-" + "序号" String val = Thread.currentThread().getName()+i; queue.add(val); // 通过“Iterator”遍历queue。 printAll(); } } } }
ログイン後にコピー

(某一次)运行结果:


ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, ta3, ta1, tb3, tb1, ta4, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, tb4, ta5, tb5, ta6, tb6,
ログイン後にコピー

结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。

以上がJava同時実行コレクションの詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!