>Java >java지도 시간 >Java 동시 프로그래밍에서 스레드 풀을 사용하는 방법에 대한 자세한 설명

Java 동시 프로그래밍에서 스레드 풀을 사용하는 방법에 대한 자세한 설명

黄舟
黄舟원래의
2017-05-28 09:25:011555검색

다음 편집기에서는 Java 동시성프로그래밍_스레드 풀 사용 방법(자세한 설명)에 대한 기사를 제공합니다. 에디터가 꽤 좋다고 생각해서 지금 공유해서 참고용으로 올려보겠습니다. 에디터를 따라 살펴보겠습니다

1. 작업과 실행 전략의 암묵적 결합

Executor는 작업 제출과 작업 실행 전략을 분리할 수 있습니다

작업이 동일한 유형이고 있을 때만 가능합니다. 실행 시간의 차이가 거의 없어도 최대 성능을 얻을 수 있습니다. 그렇지 않으면 오래 걸리는 작업과 짧은 작업이 동일한 스레드 풀에 배치되면 스레드 풀이 매우 크지 않으면 교착 상태와 같은 문제가 발생합니다

1 .Thread starvation 교착 상태

는 다음과 유사합니다. 두 작업을 단일 스레드 풀에 제출하고 두 작업이 서로 의존하며 한 작업이 다른 작업을 기다리면 성능이 저하됩니다. 풀이 충분하지 않다는 것

정의: 작업은 풀에 있는 다른 작업의 실행 결과를 기다려야 하며 기아 교착 상태가 발생할 수 있습니다

2. 스레드 풀 크기

참고: 크기 스레드 풀에는 다른 리소스 풀과 같은 다른 제한 사항도 적용됩니다: 데이터베이스 연결 풀

각 작업이 연결인 경우 스레드 풀의 크기는 데이터베이스 연결 풀의 크기에 따라 제한됩니다

3. ThreadPoolExecutor 스레드 풀을 구성합니다

인스턴스:

1. Executor의 팩토리 메서드를 통해 일부 기본 구현을 반환합니다.

2 ThreadPoolExecutor(....)를 인스턴스화하여 구현을 사용자 정의합니다. 작업이 도착하고 스레드 풀이 가득 차면 작업은 대기열에서 대기합니다. 작업이 무한히 도달하면 대기열은 무한히 확장됩니다

예: 이는 싱글톤 및 고정 크기 스레드 풀에 사용되는 것입니다2 .바운드 큐:

새 작업이 도착하고 큐가 가득 찬 경우 포화 전략

을 사용합니다. 3. 동기식 핸드오버:

스레드 풀이 큰 경우 넣기 후 핸드오버가 지연됩니다. 작업 생성자가 곧 작업을 대기열에 추가하게 되면 SynchronousQueue가 작업을 작업자 스레드에 직접 전달합니다메커니즘: 작업을 넣으면 수락 대기 중인 스레드가 있어야 합니다. 그렇지 않다면

New

스레드, 스레드가 포화 상태이면 작업을 거부합니다예: CacheThreadPool이 사용된 전략입니다

포화 전략:

setRejectedExecutionHan

dl

er 포화 전략 수정

1. 중단(기본값):

예외 발생발신자에 의해 처리됨

2. DiscardOldest: 가장 오래된 작업을 삭제합니다. priority대기열은 가장 높은 작업

4.CallerRuns: 롤백 작업, 호출자 스레드가 자체적으로 처리합니다

4 Thread Factory ThreadFactoy

언제든지. 스레드가 생성됩니다: 실제로는 스레드 팩토리가

완료하는 데 사용됩니다. 사용자 정의 스레드 팩토리: implements ThreadFactory

스레드 팩토리의

동작을 사용자 정의할 수 있습니다. UncaughtException과 같은 Handler 등

public class MyAppThread extends Thread {
  public static final String DEFAULT_NAME = "MyAppThread";
  private static volatile boolean debugLifecycle = false;
  private static final AtomicInteger created = new AtomicInteger();
  private static final AtomicInteger alive = new AtomicInteger();
  private static final Logger log = Logger.getAnonymousLogger();

  public MyAppThread(Runnable r) {
    this(r, DEFAULT_NAME);
  }

  public MyAppThread(Runnable runnable, String name) {
    super(runnable, name + "-" + created.incrementAndGet());
    //设置该线程工厂创建的线程的 未捕获异常的行为
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      public void uncaughtException(Thread t,
                     Throwable e) {
        log.log(Level.SEVERE,
            "UNCAUGHT in thread " + t.getName(), e);
      }
    });
  }

  public void run() {
    // Copy debug flag to ensure consistent value throughout.
    boolean debug = debugLifecycle;
    if (debug) log.log(Level.FINE, "Created " + getName());
    try {
      alive.incrementAndGet();
      super.run();
    } finally {
      alive.decrementAndGet();
      if (debug) log.log(Level.FINE, "Exiting " + getName());
    }
  }

  public static int getThreadsCreated() {
    return created.get();
  }

  public static int getThreadsAlive() {
    return alive.get();
  }

  public static boolean getDebug() {
    return debugLifecycle;
  }

  public static void setDebug(boolean b) {
    debugLifecycle = b;
  }
}

5. ThreadPoolExecutor 확장

사용자 정의 하위 클래스로 재정의할 수 있는 메서드:

1.afterExecute: 종료 후 RuntimeException이 발생하면 메서드가 실행되지 않습니다. 2.beforeExecute: 시작하기 전에 RuntimeException이 발생하면 작업이 실행되지 않습니다.3.terminating: 스레드 풀이 닫히면 리소스 해제 등에 사용할 수 있습니다.

2. recursive 알고리즘

의 병렬화

1.循环

在循环中,每次循环操作都是独立的

//串行化
  void processSequentially(List<Element> elements) {
    for (Element e : elements)
      process(e);
  }
  //并行化
  void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
      exec.execute(new Runnable() {
        public void run() {
          process(e);
        }
      });
  }

2.迭代

如果每个迭代操作是彼此独立的,则可以串行执行

如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

//串行 计算compute 和串行迭代
  public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
    for (Node<T> n : nodes) {
      results.add(n.compute());
      sequentialRecursive(n.getChildren(), results);
    }
  }
  //并行 计算compute 和串行迭代
  public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
    for (final Node<T> n : nodes) {
      exec.execute(() -> results.add(n.compute()));
      parallelRecursive(exec, n.getChildren(), results);
    }
  }
  //调用并行方法的操作
  public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
      throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
  }

实例:

public class ConcurrentPuzzleSolver <P, M> {
  private final Puzzle<P, M> puzzle;
  private final ExecutorService exec;
  private final ConcurrentMap<P, Boolean> seen;
  protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

  public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
    this.puzzle = puzzle;
    this.exec = initThreadPool();
    this.seen = new ConcurrentHashMap<P, Boolean>();
    if (exec instanceof ThreadPoolExecutor) {
      ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
      tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    }
  }

  private ExecutorService initThreadPool() {
    return Executors.newCachedThreadPool();
  }

  public List<M> solve() throws InterruptedException {
    try {
      P p = puzzle.initialPosition();
      exec.execute(newTask(p, null, null));
      // 等待ValueLatch中闭锁解开,则表示已经找到答案
      PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
      return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
    } finally {
      exec.shutdown();//最终主线程关闭线程池
    }
  }

  protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
    return new SolverTask(p, m, n);
  }

  protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
    SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
      super(pos, move, prev);
    }
    public void run() {
      //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
      //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
      if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
        return; // already solved or seen this position
      }
      if (puzzle.isGoal(pos)) {
        solution.setValue(this);
      } else {
        for (M m : puzzle.legalMoves(pos))
          exec.execute(newTask(puzzle.move(pos, m), m, this));
      }
    }
  }
}

위 내용은 Java 동시 프로그래밍에서 스레드 풀을 사용하는 방법에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.