1. DelayQueue 遅延無制限ブロッキング キュー
DelayQueue の使用法と原理について話すときは、まず DelayQueue について説明します。遅延が経過した場合にのみ要素を抽出できるブロッキング キュー。キューの先頭は、遅延の期限が切れた後、最も長く保存される Delayed 要素です。 (推奨される調査: Java インタビューの質問 )
DelayQueue ブロッキング キューは、システム開発でもよく使用されます。たとえば、キャッシュ システムの設計、キャッシュ内のオブジェクトは、アイドル時間はキャッシュから移動する必要があるため、タスク スケジューリング システムはタスクの実行時間を正確に把握できます。タイムクリティカルな大量のデータをスレッド経由で処理する必要がある場合があります。
通常のスレッドを使用する場合、すべてのオブジェクトを走査し、データの有効期限が切れているかどうかなどを 1 つずつ確認する必要があります。第一に、実行効率はあまり高くありません。第二に、デザインスタイルもデータの精度に大きく影響します。 12:00 に実行する必要があるタスクは 12:01 まで実行されない可能性があり、これはデータ要件が高いシステムにとっては大きなデメリットとなります。これから DelayQueue を使用することができます。
以下では、DelayQueue について説明し、次に例を示します。また、Delayed インターフェイスの実装とサンプル コードを提供します。 DelayQueue は、特殊なパラメータが Delayed である BlockingQueue です。
(BlockingQueue を知らない学生は、BlockingQueue について学んでからこの記事を読んでください) Delayed は Comparable インターフェイスを拡張します 比較ベンチマークは遅延時間の値です 実装クラス getDelay の戻り値遅延インターフェイスの値は固定値である必要があります (最終)。 DelayQueue は、PriorityQueue を使用して内部的に実装されます。
DelayQueue=BlockingQueue+PriorityQueue+Delayed
DelayQueue の主要な要素は、BlockingQueue、PriorityQueue、および Delayed です。 DelayQueueは、プライオリティキュー(PriorityQueue)を用いて実装されたBlockingQueueと言え、プライオリティキューの比較基準値は時間である。
それらの基本的な定義は次のとおりです。
public interface Comparable<T> { public int compareTo(T o); } public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); } public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { private final PriorityQueue<E> q = new PriorityQueue<E>(); }
DelayQueue の内部実装では、優先キューが使用されます。 DelayQueueのofferメソッドが呼び出されると、Delayedオブジェクトがプライオリティキューqに追加されます。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } }
DelayQueueのtakeメソッドは優先キューqの先頭を取り出し(ピーク)、遅延閾値に達していない場合は待機処理を行います。次のとおりです:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); //wake up other takers return x; } } } } finally { lock.unlock(); } }
DelayQueue インスタンス アプリケーション
Ps: 呼び出し動作を行うには、DelayDeque に格納されている要素は Delayed インターフェイスを継承する必要があります。 Delayed インターフェイスはオブジェクトを遅延オブジェクトにし、DelayQueue クラスに格納されているオブジェクトにアクティブ化日を与えます。このインターフェイスは、次の 2 つのメソッドを強制します。
以下では、Delay を使用してキャッシュを実装します。これには、Pair、DelayItem、および Cache の合計 3 つのクラスが含まれています。
Pair クラス:
public class Pair<K, V> { public K first; public V second; public Pair() { } public Pair(K first, V second) { this.first = first; this.second = second; } }
次は、Delay の実装です。インターフェイス:
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class DelayItem<T> implements Delayed { /** * Base of nanosecond timings, to avoid wrapping */ private static final long NANO_ORIGIN = System.nanoTime(); /** * Returns nanosecond time offset by origin */ final static long now() { return System.nanoTime() - NANO_ORIGIN; } /** * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied * entries. */ private static final AtomicLong sequencer = new AtomicLong(0); /** * Sequence number to break ties FIFO */ private final long sequenceNumber; /** * The time the task is enabled to execute in nanoTime units */ private final long time; private final T item; public DelayItem(T submit, long timeout) { this.time = now() + timeout; this.item = submit; this.sequenceNumber = sequencer.getAndIncrement(); } public T getItem() { return this.item; } public long getDelay(TimeUnit unit) { long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); return d; } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof DelayItem) { DelayItem x = (DelayItem) other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ?0 :((d < 0) ?-1 :1); } }
以下は、put メソッドと get メソッドを含む Cache の実装です。
import javafx.util.Pair; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; public class Cache<K, V> { private static final Logger LOG = Logger.getLogger(Cache.class.getName()); private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>(); private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>(); private Thread daemonThread; public Cache() { Runnable daemonTask = new Runnable() { public void run() { daemonCheck(); } }; daemonThread = new Thread(daemonTask); daemonThread.setDaemon(true); daemonThread.setName("Cache Daemon"); daemonThread.start(); } private void daemonCheck() { if (LOG.isLoggable(Level.INFO)) LOG.info("cache service started."); for (; ; ) { try { DelayItem<Pair<K, V>> delayItem = q.take(); if (delayItem != null) { // 超时对象处理 Pair<K, V> pair = delayItem.getItem(); cacheObjMap.remove(pair.first, pair.second); // compare and remove } } catch (InterruptedException e) { if (LOG.isLoggable(Level.SEVERE)) LOG.log(Level.SEVERE, e.getMessage(), e); break; } } if (LOG.isLoggable(Level.INFO)) LOG.info("cache service stopped."); } // 添加缓存对象 public void put(K key, V value, long time, TimeUnit unit) { V oldValue = cacheObjMap.put(key, value); if (oldValue != null) q.remove(key); long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit); q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime)); } public V get(K key) { return cacheObjMap.get(key); } }
メイン メソッドをテストします:
// 测试入口函数 public static void main(String[] args) throws Exception { Cache<Integer, String> cache = new Cache<Integer, String>(); cache.put(1, "aaaa", 3, TimeUnit.SECONDS); Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } }
出力結果は:
aaaa null
上記の結果が表示されますが、遅延時間を超えるとキャッシュ内のデータは自動的に失われ、結果はnullになります。
2. 同時 (コレクション) キュー - ノンブロッキング キュー
ノンブロッキング キュー
まず、次のことを行う必要があります。次に、ノンブロッキング キューとは何ですか:
ブロッキング キューとは対照的に、ノンブロッキング キューの実行は、コンシューマのデキューであっても、プロデューサーの参加であっても、ブロックされません。内部では、ノンブロッキング キューは CAS (比較およびスワップ) を使用して、ノンブロッキング スレッドの実行を実現します。
ノンブロッキング キューの簡単な操作
ノンブロッキング キューの一般的な方法は、ブロッキング キューと同じように、デキューとエンキューです。
offer(): キュー インターフェイスから継承されたメソッドで、スレッドの実行を妨げることなくキュー エントリ操作を実装します。挿入が成功した場合は true を返します。デキュー メソッド:
poll(): ヘッド ノード ポインターを移動し、ヘッド ノード要素を返し、ヘッド ノード要素をデキューします。キューが空の場合は、null を返します。
peek(): ヘッド ノード ポインターを移動し、戻りますヘッド ノード要素 、ヘッド ノード要素はデキューされません; キューが空の場合、null が返されます;
3. ノンブロッキング アルゴリズム CAS
まず悲観的ロックと楽観的ロックを理解する必要があります
悲観的ロック: 同時実行環境が悲観的であると仮定します。同時実行の競合が発生すると一貫性が破壊されるため、競合は次の方法で完全に禁止する必要があります。排他的なロック。 「ドアに鍵をかけないと、トラブルメーカーが侵入して混乱を招く」という古典的な比喩があります。そのため、「ドアを開けて中に入れるのは一度に 1 人だけなので、安全な状態を保つことができます」彼に注目してください。」
オプティミスティック ロック: 同時実行環境が楽観的であると仮定します。つまり、同時実行の競合は発生しますが、その競合は発見でき、損害は発生しないため、保護を追加することはできず、あきらめることを決定します。操作を実行するか、同時実行性の競合を検出した後で再試行してください。たとえて言えば、「ドアに鍵をかけなければ、問題を起こす人たちが侵入してきますが、彼らはあなたを破滅させたいかどうか知っているでしょう。」 したがって、「全員を中に入れて、彼らがそれを知るまで待つことができます」壊したい。「決断せよ」。
一般に、特に一部の複雑なシナリオでは、楽観的ロックのパフォーマンスが悲観的ロックのパフォーマンスよりも高いと考えられています。これは主に、悲観的ロックはロック中に損傷を引き起こさない特定の操作も保護するためですが、楽観的ロックの競合は最小の同時実行競合でのみ発生します。悲観的ロックを使用して理解すると、それは「ロックの粒度が最も小さい」です。 ”。ただし、楽観的ロックの設計はより複雑であることが多いため、悲観的ロックは複雑なシナリオで使用されることがよくあります。まず正確性を確保し、必要に応じてパフォーマンスを追求します。
オプティミスティック ロックの実装には、多くの場合ハードウェア サポートが必要です。ほとんどのプロセッサは、「比較と交換」のセマンティクスを実装する CAS 命令を実装しています (ここでのスワップは「スワップ イン」、つまりセットを意味します)。基本的な楽観的ロックを形成します。 CAS には 3 つのオペランドが含まれます。
読み書きされるメモリ位置 V
比較される値 A
書き込まれる新しい値 B
位置 V の値が A に等しい場合に限り、CAS は位置 V の値を新しい値 B でアトミックに更新します; それ以外の場合は操作は実行されません。位置 V の値が A に等しいかどうかに関係なく、V の元の値が返されます。興味深い事実は、「CAS を使用して同時実行性を制御する」ことは、「楽観的ロックを使用する」ことと同等ではないということです。 CAS は、楽観的ロックと悲観的ロックの両方を実現する手段にすぎません。楽観主義と悲観主義は、単なる同時実行制御戦略にすぎません。
以上がJava マルチスレッドおよび同時実行に関する面接の質問 (1 ~ 3 つの質問と回答付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。