ホームページ >Java >&#&チュートリアル >Java並行プログラミングにおけるスレッドプールの使い方を詳しく解説

Java並行プログラミングにおけるスレッドプールの使い方を詳しく解説

黄舟
黄舟オリジナル
2017-05-28 09:25:011560ブラウズ

以下のエディターは、Java同時実行プログラミング_スレッドプールの使い方(詳細な説明)に関する記事をお届けします。編集者はこれがとても良いと思ったので、参考として共有します。エディターをフォローして見てみましょう

1. タスクと実行戦略の間の暗黙的な結合

エグゼキューターはタスクの送信戦略とタスク実行戦略を分離できます

タスクが同じタイプである場合のみ、および存在する場合のみです。実行時間にほとんど差がない場合、最大のパフォーマンスを達成できます。 そうしないと、長時間消費するタスクと短時間消費するタスクが同じスレッド プールに配置されると、スレッド プールがよほど大きくない限り、デッドロックなどの問題が発生します

1 .スレッド スターベーション デッドロック

は、2 つのタスクを単一スレッド プールに送信し、2 つのタスクが相互に依存し、一方のタスクがもう一方のタスクを待機すると、デッドロックが発生します。プールが十分ではないこと

定義: タスクはプール内の他のタスクの実行結果を待つ必要があり、飢餓デッドロックが発生する可能性があります

2. スレッドプールのサイズ

注: サイズスレッド プールのサイズには、他のリソース プールなどの制限も適用されます: データベース接続プール

各タスクが接続の場合、スレッド プールのサイズはデータベース接続プールのサイズによって制限されます

。 3. ThreadPoolExecutor スレッド プールを構成します

インスタンス:

1. Executor のファクトリ メソッドを通じていくつかのデフォルト実装を返します

2. ThreadPoolExecutor(....) をインスタンス化して実装をカスタマイズします。タスクが到着し、スレッド プールがいっぱいになると、タスクはキュー内で待機します。タスクが無限に達すると、キューは無限に拡張されます

例: これは、シングルトンと固定サイズのスレッド プールに使用されるものです2境界付きキュー:

新しいタスクが到着し、キューがいっぱいの場合は、 飽和戦略を使用します

3. 同期ハンドオーバー:

スレッド プールが大きい場合、配置後のハンドオーバーに遅延が発生します。タスクプロデューサーがすぐにタスクをキューに入れる場合、SynchronousQueueはタスクをワーカースレッドに直接渡しますメカニズム: タスクを入れると、受け入れを待機しているスレッドが存在する必要があります。そうでない場合は、

新しい

スレッド、スレッドが飽和している場合はタスクを拒否します例: CacheThreadPoolが使用される戦略です

飽和戦略:

setRejectedExecutionHan

dl

er飽和戦略を変更します

1. 中止中止 (デフォルト):

例外をスローします呼び出し元によって処理されます

2. DiscardOldest: 最も古いタスクを破棄します。注意: 優先度キューは優先度が最も高いタスクを破棄します

4.CallerRuns:ロールバックタスク、呼び出し元のスレッドがそれ自体を処理します

4.スレッドファクトリーThreadFactoy

いつでもスレッドが作成されます: 実際には、と呼ばれます スレッド ファクトリは、完了するために使用されます

カスタム スレッド ファクトリ: ThreadFactory を実装します

スレッド ファクトリの

動作 をカスタマイズできます: UncaughtException などハンドラーなど

public class MyAppThread extends Thread {
  public static final String DEFAULT_NAME = "MyAppThread";
  private static volatile boolean debugLifecycle = false;
  private static final AtomicInteger created = new AtomicInteger();
  private static final AtomicInteger alive = new AtomicInteger();
  private static final Logger log = Logger.getAnonymousLogger();

  public MyAppThread(Runnable r) {
    this(r, DEFAULT_NAME);
  }

  public MyAppThread(Runnable runnable, String name) {
    super(runnable, name + "-" + created.incrementAndGet());
    //设置该线程工厂创建的线程的 未捕获异常的行为
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      public void uncaughtException(Thread t,
                     Throwable e) {
        log.log(Level.SEVERE,
            "UNCAUGHT in thread " + t.getName(), e);
      }
    });
  }

  public void run() {
    // Copy debug flag to ensure consistent value throughout.
    boolean debug = debugLifecycle;
    if (debug) log.log(Level.FINE, "Created " + getName());
    try {
      alive.incrementAndGet();
      super.run();
    } finally {
      alive.decrementAndGet();
      if (debug) log.log(Level.FINE, "Exiting " + getName());
    }
  }

  public static int getThreadsCreated() {
    return created.get();
  }

  public static int getThreadsAlive() {
    return alive.get();
  }

  public static boolean getDebug() {
    return debugLifecycle;
  }

  public static void setDebug(boolean b) {
    debugLifecycle = b;
  }
}

5. ThreadPoolExecutorの拡張

カスタムサブクラスでオーバーライドできるメソッド:

1.afterExecute: 終了後、RuntimeExceptionがスローされた場合、メソッドは実行されません 2.beforeExecute: 開始前に、RuntimeExceptionがスローされた場合、タスクは実行されません3.terminated: スレッドプールが閉じられると、リソースの解放などに使用できます

2. 再帰アルゴリズムの並列化

1.循环

在循环中,每次循环操作都是独立的

//串行化
  void processSequentially(List<Element> elements) {
    for (Element e : elements)
      process(e);
  }
  //并行化
  void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
      exec.execute(new Runnable() {
        public void run() {
          process(e);
        }
      });
  }

2.迭代

如果每个迭代操作是彼此独立的,则可以串行执行

如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

//串行 计算compute 和串行迭代
  public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
    for (Node<T> n : nodes) {
      results.add(n.compute());
      sequentialRecursive(n.getChildren(), results);
    }
  }
  //并行 计算compute 和串行迭代
  public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
    for (final Node<T> n : nodes) {
      exec.execute(() -> results.add(n.compute()));
      parallelRecursive(exec, n.getChildren(), results);
    }
  }
  //调用并行方法的操作
  public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
      throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
  }

实例:

public class ConcurrentPuzzleSolver <P, M> {
  private final Puzzle<P, M> puzzle;
  private final ExecutorService exec;
  private final ConcurrentMap<P, Boolean> seen;
  protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

  public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
    this.puzzle = puzzle;
    this.exec = initThreadPool();
    this.seen = new ConcurrentHashMap<P, Boolean>();
    if (exec instanceof ThreadPoolExecutor) {
      ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
      tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    }
  }

  private ExecutorService initThreadPool() {
    return Executors.newCachedThreadPool();
  }

  public List<M> solve() throws InterruptedException {
    try {
      P p = puzzle.initialPosition();
      exec.execute(newTask(p, null, null));
      // 等待ValueLatch中闭锁解开,则表示已经找到答案
      PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
      return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
    } finally {
      exec.shutdown();//最终主线程关闭线程池
    }
  }

  protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
    return new SolverTask(p, m, n);
  }

  protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
    SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
      super(pos, move, prev);
    }
    public void run() {
      //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
      //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
      if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
        return; // already solved or seen this position
      }
      if (puzzle.isGoal(pos)) {
        solution.setValue(this);
      } else {
        for (M m : puzzle.legalMoves(pos))
          exec.execute(newTask(puzzle.move(pos, m), m, this));
      }
    }
  }
}

以上がJava並行プログラミングにおけるスレッドプールの使い方を詳しく解説の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。