• 技术文章 >Java >java教程

    详解java中ThreadPoolExecutor的原理分析(附代码)

    黄舟黄舟2017-03-29 10:31:22原创841
    这篇文章主要介绍了java 中ThreadPoolExecutor原理分析的相关资料,需要的朋友可以参考下

    java 中ThreadPoolExecutor原理分析

    线程池简介

    Java线程池是开发中常用的工具,当我们有异步、并行的任务要处理时,经常会用到线程池,或者在实现一个服务器时,也需要使用线程池来接收连接处理请求。

    线程池使用

    JDK中提供的线程池实现位于java.util.concurrent.ThreadPoolExecutor。在使用时,通常使用ExecutorService接口,它提供了submit,invokeAll,shutdown等通用的方法。

    在线程池配置方面,Executors类中提供了一些静态方法能够提供一些常用场景的线程池,如newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor等,这些方法最终都是调用到了ThreadPoolExecutor构造函数

    ThreadPoolExecutor的包含所有参数的构造函数是

    /**
       * @param corePoolSize the number of threads to keep in the pool, even
       *    if they are idle, unless {@code allowCoreThreadTimeOut} is set
       * @param maximumPoolSize the maximum number of threads to allow in the
       *    pool
       * @param keepAliveTime when the number of threads is greater than
       *    the core, this is the maximum time that excess idle threads
       *    will wait for new tasks before terminating.
       * @param unit the time unit for the {@code keepAliveTime} argument
       * @param workQueue the queue to use for holding tasks before they are
       *    executed. This queue will hold only the {@code Runnable}
       *    tasks submitted by the {@code execute} method.
       * @param threadFactory the factory to use when the executor
       *    creates a new thread
       * @param handler the handler to use when execution is blocked
       *    because the thread bounds and queue capacities are reached
      public ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
          maximumPoolSize <= 0 ||
          maximumPoolSize < corePoolSize ||
          keepAliveTime < 0)
          throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
          throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
      }

    实现(基于JDK1.8)

    ThreadPoolExecutor中保存的状态有

    当前线程池状态, 包括RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。

    当前有效的运行线程的数量。

    将这两个状态放到一个int变量中,前三位作为线程池状态,后29位作为线程数量。

    例如0b11100000000000000000000000000001, 表示RUNNING, 一个线程。

    通过HashSet来存储工作者集合,访问该HashSet前必须先获取保护状态的mainLock:ReentrantLock

    submit、execute

    execute的执行方式为,首先检查当前worker数量,如果小于corePoolSize,则尝试add一个core Worker。线程池在维护线程数量以及状态检查上做了大量检测。

    public void execute(Runnable command) {
        int c = ctl.get();
        // 如果当期数量小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
          // 尝试增加worker
          if (addWorker(command, true))
            return;
          c = ctl.get();
        }
        // 如果线程池正在运行并且成功添加到工作队列中
        if (isRunning(c) && workQueue.offer(command)) {
          // 再次检查状态,如果已经关闭则执行拒绝处理
          int recheck = ctl.get();
          if (! isRunning(recheck) && remove(command))
            reject(command);
          // 如果工作线程都down了
          else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
        }
        else if (!addWorker(command, false))
          reject(command);
      }

    addWorker方法实现

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
          int c = ctl.get();
          int rs = runStateOf(c);
          // Check if queue empty only if necessary.
          if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
              firstTask == null &&
              ! workQueue.isEmpty()))
            return false;
          for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
              wc >= (core ? corePoolSize : maximumPoolSize))
              return false;
            if (compareAndIncrementWorkerCount(c))
              break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
              continue retry;
            // else CAS failed due to workerCount change; retry inner loop
          }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          w = new Worker(firstTask);
          final Thread t = w.thread;
          if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              // Recheck while holding lock.
              // Back out on ThreadFactory failure or if
              // shut down before lock acquired.
              int rs = runStateOf(ctl.get());
              if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                  throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                  largestPoolSize = s;
                workerAdded = true;
              }
            } finally {
              mainLock.unlock();
            }
            if (workerAdded) {
              // 如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker)
              t.start();
              workerStarted = true;
            }
          }
        } finally {
          if (! workerStarted)
            addWorkerFailed(w);
        }
        return workerStarted;
      }

    Worker类继承了AbstractQueuedSynchronizer获得了同步等待这样的功能。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
      {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
          setState(-1); // inhibit interrupts until runWorker
          this.firstTask = firstTask;
          this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker */
        public void run() {
          runWorker(this);
        }
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
          return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
          if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
          }
          return false;
        }
        protected boolean tryRelease(int unused) {
          setExclusiveOwnerThread(null);
          setState(0);
          return true;
        }
        public void lock()    { acquire(1); }
        public boolean tryLock() { return tryAcquire(1); }
        public void unlock()   { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
          Thread t;
          if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
              t.interrupt();
            } catch (SecurityException ignore) {
            }
          }
        }

    runWorker(Worker)是Worker的轮询执行逻辑,不断地从工作队列中获取任务并执行它们。Worker每次执行任务前需要进行lock,防止在执行任务时被interrupt。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
               (Thread.interrupted() &&
               runStateAtLeast(ctl.get(), STOP))) &&
              !wt.isInterrupted())
              wt.interrupt();
            try {
              beforeExecute(wt, task);
              Throwable thrown = null;
              try {
                task.run();
              } catch (RuntimeException x) {
                thrown = x; throw x;
              } catch (Error x) {
                thrown = x; throw x;
              } catch (Throwable x) {
                thrown = x; throw new Error(x);
              } finally {
                afterExecute(task, thrown);
              }
            } finally {
              task = null;
              w.completedTasks++;
              w.unlock();
            }
          }
          completedAbruptly = false;
        } finally {
          processWorkerExit(w, completedAbruptly);
        }
      }

    ThreadPoolExecutor的submit方法中将Callable包装成FutureTask后交给execute方法。

    FutureTask

    FutureTask继承于Runnable和Future,FutureTask定义的几个状态为
    NEW, 尚未执行
    COMPLETING, 正在执行
    NORMAL, 正常执行完成得到结果
    EXCEPTIONAL, 执行抛出异常
    CANCELLED, 执行被取消
    INTERRUPTING,执行正在被中断
    INTERRUPTED, 已经中断。

    其中关键的get方法

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
          s = awaitDone(false, 0L);
        return report(s);
      }

    先获取当前状态,如果还未执行完成并且正常,则进入等待结果流程。在awaitDone不断循环获取当前状态,如果没有结果,则将自己通过CAS的方式添加到等待链表的头部,如果设置了超时,则LockSupport.parkNanos到指定的时间。

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
      }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
          if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
          }
          int s = state;
          if (s > COMPLETING) {
            if (q != null)
              q.thread = null;
            return s;
          }
          else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
          else if (q == null)
            q = new WaitNode();
          else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                               q.next = waiters, q);
          else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
              removeWaiter(q);
              return state;
            }
            LockSupport.parkNanos(this, nanos);
          }
          else
            LockSupport.park(this);
        }
      }

    FutureTask的run方法是执行任务并设置结果的位置,首先判断当前状态是否为NEW并且将当前线程设置为执行线程,然后调用Callable的call获取结果后设置结果修改FutureTask状态。

    public void run() {
        if (state != NEW ||
          !UNSAFE.compareAndSwapObject(this, runnerOffset,
                         null, Thread.currentThread()))
          return;
        try {
          Callable<V> c = callable;
          if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
              result = c.call();
              ran = true;
            } catch (Throwable ex) {
              result = null;
              ran = false;
              setException(ex);
            }
            if (ran)
              set(result);
          }
        } finally {
          // runner must be non-null until state is settled to
          // prevent concurrent calls to run()
          runner = null;
          // state must be re-read after nulling runner to prevent
          // leaked interrupts
          int s = state;
          if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
        }
      }

    以上就是详解java中ThreadPoolExecutor的原理分析(附代码)的详细内容,更多请关注php中文网其它相关文章!

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    上一篇:详细介绍java中volatile和lock的原理 下一篇:Java中hashCode作用的详细介绍
    PHP编程就业班

    相关文章推荐

    • 详细解析Java反射机制原理和几种Class获取方式• 图文详解!什么是Java内存模型• 图文详解Java数据结构与算法• 带你搞懂JAVA反射机制(总结分享)• 深入解析JAVA中字符串常量池和缓冲池理解与作用

    全部评论我要评论

  • 取消发布评论发送
  • 1/1

    PHP中文网