• 技术文章 >Java >java教程

    java并发编程之线程池的使用方法详解

    黄舟黄舟2017-05-28 09:25:01原创843
    下面小编就为大家带来一篇java并发编程_线程池的使用方法(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    一、任务和执行策略之间的隐性耦合

    Executor可以将任务的提交和任务的执行策略解耦

    只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题

    1.线程饥饿死锁

    类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁;表现为池不够

    定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁

    2.线程池大小

    注意:线程池的大小还受其他的限制,如其他资源池:数据库连接池

    如果每个任务都是一个连接,那么线程池的大小就受制于数据库连接池的大小

    3.配置ThreadPoolExecutor线程池

    实例:

    1.通过Executors的工厂方法返回默认的一些实现

    2.通过实例化ThreadPoolExecutor(.....)自定义实现

    线程池的队列

    1.无界队列:任务到达,线程池饱满,则任务在队列中等待,如果任务无限达到,则队列会无限扩张

    如:单例和固定大小的线程池用的就是此种

    2.有界队列:如果新任务到达,队列满则使用饱和策略

    3.同步移交:如果线程池很大,将任务放入队列后在移交就会产生延时,如果任务生产者很快也会导致任务排队

    SynchronousQueue直接将任务移交给工作线程

    机制:将一个任务放入,必须有一个线程等待接受,如果没有,则新增线程,如果线程饱和,则拒绝任务

    如:CacheThreadPool就是使用的这种策略

    饱和策略:

    setRejectedExecutionHandler来修改饱和策略

    1.终止Abort(默认):抛出异常由调用者处理

    2.抛弃Discard

    3.抛弃DiscardOldest:抛弃最旧的任务,注意:如果是优先级队列将抛弃优先级最高的任务

    4.CallerRuns:回退任务,有调用者线程自行处理

    4.线程工厂ThreadFactoy

    每当创建线程时:其实是调用了线程工厂来完成

    自定义线程工厂:implements ThreadFactory

    可以定制该线程工厂的行为如UncaughtExceptionHandler等

    public class MyAppThread extends Thread {
      public static final String DEFAULT_NAME = "MyAppThread";
      private static volatile boolean debugLifecycle = false;
      private static final AtomicInteger created = new AtomicInteger();
      private static final AtomicInteger alive = new AtomicInteger();
      private static final Logger log = Logger.getAnonymousLogger();
    
      public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
      }
    
      public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        //设置该线程工厂创建的线程的 未捕获异常的行为
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
          public void uncaughtException(Thread t,
                         Throwable e) {
            log.log(Level.SEVERE,
                "UNCAUGHT in thread " + t.getName(), e);
          }
        });
      }
    
      public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
          alive.incrementAndGet();
          super.run();
        } finally {
          alive.decrementAndGet();
          if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
      }
    
      public static int getThreadsCreated() {
        return created.get();
      }
    
      public static int getThreadsAlive() {
        return alive.get();
      }
    
      public static boolean getDebug() {
        return debugLifecycle;
      }
    
      public static void setDebug(boolean b) {
        debugLifecycle = b;
      }
    }

    5.扩展ThreadPoolExecutor

    可以被自定义子类覆盖的方法:

    1.afterExecute:结束后,如果抛出RuntimeException则方法不会执行

    2.beforeExecute:开始前,如果抛出RuntimeException则任务不会执行

    3.terminated:在线程池关闭时,可以用来释放资源等

    二、递归算法的并行化

    1.循环

    在循环中,每次循环操作都是独立的

    //串行化
      void processSequentially(List<Element> elements) {
        for (Element e : elements)
          process(e);
      }
      //并行化
      void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
          exec.execute(new Runnable() {
            public void run() {
              process(e);
            }
          });
      }

    2.迭代

    如果每个迭代操作是彼此独立的,则可以串行执行

    如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

    //串行 计算compute 和串行迭代
      public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
        for (Node<T> n : nodes) {
          results.add(n.compute());
          sequentialRecursive(n.getChildren(), results);
        }
      }
      //并行 计算compute 和串行迭代
      public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
        for (final Node<T> n : nodes) {
          exec.execute(() -> results.add(n.compute()));
          parallelRecursive(exec, n.getChildren(), results);
        }
      }
      //调用并行方法的操作
      public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
          throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
      }

    实例:

    public class ConcurrentPuzzleSolver <P, M> {
      private final Puzzle<P, M> puzzle;
      private final ExecutorService exec;
      private final ConcurrentMap<P, Boolean> seen;
      protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();
    
      public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
        this.exec = initThreadPool();
        this.seen = new ConcurrentHashMap<P, Boolean>();
        if (exec instanceof ThreadPoolExecutor) {
          ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
          tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
      }
    
      private ExecutorService initThreadPool() {
        return Executors.newCachedThreadPool();
      }
    
      public List<M> solve() throws InterruptedException {
        try {
          P p = puzzle.initialPosition();
          exec.execute(newTask(p, null, null));
          // 等待ValueLatch中闭锁解开,则表示已经找到答案
          PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
          return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
          exec.shutdown();//最终主线程关闭线程池
        }
      }
    
      protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
        return new SolverTask(p, m, n);
      }
    
      protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
          super(pos, move, prev);
        }
        public void run() {
          //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
          //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
          if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
            return; // already solved or seen this position
          }
          if (puzzle.isGoal(pos)) {
            solution.setValue(this);
          } else {
            for (M m : puzzle.legalMoves(pos))
              exec.execute(newTask(puzzle.move(pos, m), m, this));
          }
        }
      }
    }

    以上就是java并发编程之线程池的使用方法详解的详细内容,更多请关注php中文网其它相关文章!

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    上一篇:简单介绍Java成员变量与属性的区别 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • Java数据结构之单链表与OJ题• 详细介绍Java正则表达式之单字符匹配和预定义字符• Java总结分享之反射、枚举、Lambda表达式• 一起来分析java设计模式之单例• 一文搞懂Java线程池实现原理
    1/1

    PHP中文网