Home  >  Article  >  Java  >  Source code analysis of Java thread pool implementation principle

Source code analysis of Java thread pool implementation principle

WBOY
WBOYforward
2023-05-09 14:16:071439browse

The origin of the thread pool

Background: With the upgrading of computer hardware, our software has the ability to perform multi-threaded tasks. When we are doing multi-thread programming, we need to create threads. If the program concurrency is very high, we will create a large number of threads, and each thread will perform a short task and then end. Creating threads frequently will cause It greatly reduces system performance and increases server overhead, because creating and destroying threads requires additional consumption.

At this time we can use pooling technology to optimize this defect, and the thread pool was born.

The essence of pooling technology is to achieve resource reuse and reduce the overhead of resource creation and destruction in high concurrency scenarios. If the number of concurrencies is small, there is no obvious advantage (the resources always occupy system memory and have no chance to be used. ).

Introduction to pooling technology: When is pooling technology? Pooling technology is a programming technique that can significantly optimize the program when there is high concurrency in the program and reduce additional overhead such as frequent creation and destruction of connections in the system. Pooling technologies that we often come into contact with include database connection pools, thread pools, object pools, etc. The characteristic of pooling technology is to maintain some high-cost resources in a specific pool (memory), and specify its minimum number of connections, maximum number of connections, blocking queue, overflow rules and other configurations to facilitate unified management. Under normal circumstances, it will also come with some supporting functions such as monitoring and forced recycling.

Pooling technology is a resource usage technology. Typical usage scenarios are:

  • When the cost of obtaining resources is high

  • When the frequency of resource requests is high and the total number of resources used is low

  • When faced with performance problems and processing time delays

Pooling technology resource classification:

  • System resources called by the system, such as threads, processes, memory allocation, etc.

  • Remote resources for network communication, such as database connections, socket connections, etc.

Definition and use of thread pool

Threads The pool was born to avoid the additional overhead of creating threads and destroying threads. Therefore, after we define and create the thread pool, we do not need to create threads ourselves, but use thread pool calls to perform our tasks. Let's take a look at how to define and create a thread pool.

Option 1: Executors (for understanding only, it is recommended to use option 2)

To create a thread pool, you can use Executors, which provides a series of factory methods for creating threads The pool and the returned thread pool all implement the ExecutorService interface.

ExecutorService The interface is a subclass interface of the Executor interface and is more widely used. It provides a thread pool life cycle management method and returns a Future object.

That is to say, we create a thread pool through Executors, get ExecutorService, and execute asynchronous tasks (implementing the Runnable interface) through ExecutorService

Executors can create several types of thread pools:

  • newCachedThreadPool Create a cacheable thread pool. If the number of thread pool threads is excessive, it will be in 60 Excess thread resources will be recycled after seconds. When the task book increases and there are not enough threads, a new thread will be created.

  • newFixedThreadPool creates a fixed-length thread pool that can control the maximum number of concurrent threads. Exceeding threads will wait in the queue.

  • newScheduledThreadPool Create a fixed-length thread pool to support scheduled and periodic task execution.

  • newSingleThreadExecutor Create a single-threaded thread pool and only use the only thread to execute tasks, which ensures that tasks are completed in the order they are submitted.

Option 2: ThreadPoolExecutor

In Alibaba development specifications, it is stipulated that thread pools are not allowed to be created through Executors, but are created through ThreadPoolExecutor.

Benefits: Students who write can be more clear about the running rules of the thread pool and avoid the risk of resource exhaustion.

The seven parameters of ThreadPoolExecutor:

(1)corePoolSize The number of core threads, core threads will always be retained and will not be destroyed.

(2)maximumPoolSize The maximum number of threads. When the core threads cannot meet the task needs, the system will create new threads to execute the tasks.

(3)keepAliveTime The survival time, how long the threads other than the core thread are idle will be destroyed.

(4)timeUnit Represents the time unit for thread survival.

(5)BlockingQueue Blocking Queue

  • If the task being executed exceeds the maximum number of threads, it can be stored in the queue. When the thread pool If there are free resources, the task can be removed from the queue and continued execution.

  • Queue types include the following types: LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue TransferQueue.

(6)threadFactory Thread factory is used to create threads. You can customize threads. For example, we can define thread group names, which is very helpful when troubleshooting jstack problems. .

(7)rejectedExecutionHandler Rejection strategy,

When all threads (maximum number of threads) are busy and the task queue is full of tasks, rejection will be executed Strategy.

JDK provides us with four rejection strategies, and we must all be familiar with them

  • AbortPolicy: Drop tasks, And throw exception RejectedExecutionException. Default

  • DiscardPolicy: Discard the latest task without throwing an exception.

  • DiscardOldestPolicy: Discard the task with the longest queue time, that is, the oldest task.

  • CallerRuns: The task is processed by the caller (the thread that submitted the asynchronous task).

The implementation principle of thread pool

If we want to implement a thread pool, we need to care about the ThreadPoolExecutor class, because Executors also create thread pools through the new ThreadPoolExecutor object.

Looking at the class inheritance relationship of ThreadPoolExecutor, we can see why the thread pool created by Executors returns the result ExecutorService, because ThreadPoolExecutor is the implementation class of the ExecutorService interface. The essence of the thread pool created by Executors is also the ThreadPoolExecutor object created.

Source code analysis of Java thread pool implementation principle

Let’s take a look at the source code of ThreadPoolExecutor. First, the variables and constants defined in ThreadPoolExecutor are:

    // 复合类型变量 是一个原子整数  控制状态(运行状态|线程池活跃线程数量)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位 
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 容量
    // 运行状态存储在高位3位
    private static final int RUNNING    = -1 << COUNT_BITS;  // 接受新任务,并处理队列任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 不接受新任务,但会处理队列任务
    private static final int STOP       =  1 << COUNT_BITS;  // 不接受新任务,不会处理队列任务,中断正在处理的任务
    private static final int TIDYING    =  2 << COUNT_BITS;  // 所有的任务已结束,活跃线程为0,线程过渡到TIDYING状       态,将会执行terminated()钩子方法
    private static final int TERMINATED =  3 << COUNT_BITS;  // terminated()方法已经完成
    // 设置 ctl 参数方法
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    /**
     * 阻塞队列
     */
    private final BlockingQueue<Runnable> workQueue;
    /**
     * Lock 锁.
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    /**
     * 工人们
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    /**
     * 等待条件支持等待终止
     */
    private final Condition termination = mainLock.newCondition();
    /**
     * 最大的池大小.
     */
    private int largestPoolSize;
    /**
     * 完成任务数
     */
    private long completedTaskCount;
    /**
     * 线程工厂
     */
    private volatile ThreadFactory threadFactory;
    /**
     * 拒绝策略
     */
    private volatile RejectedExecutionHandler handler;
    /**
     * 存活时间
     */
    private volatile long keepAliveTime;
    /**
     * 允许核心线程数
     */
    private volatile boolean allowCoreThreadTimeOut;
    /**
     * 核心线程数
     */
    private volatile int corePoolSize;
    /**
     * 最大线程数
     */
    private volatile int maximumPoolSize;
    /**
     * 默认拒绝策略
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    /**
     * shutdown and shutdownNow权限
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

Constructor, supports four constructors with at least five parameters and a maximum of seven parameters:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Workers, the thread pool executes tasks through these Workers perform work, including core employees (core threads) and temporary workers (temporarily created when there are not enough people, and will be laid off if they are idle).

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 工人的本质就是个线程
        final Thread thread;
        // 第一件工作任务
        Runnable firstTask;
      volatile long completedTasks;
        /**
         * 构造器
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** 工作  */
        public void run() {
            runWorker(this);
        }
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Core methods, through The thread pool executes the task (this is also the operating principle of the thread pool):

  • Check the task

  • Get the current thread pool status

  • Judge whether the number of working workers is less than the number of core employees

  • If it is less, recruit people and arrange work

  • If not less than, determine whether the waiting area tasks are full

  • If not, the tasks will be placed in the waiting area

  • If the waiting area is full, see Whether to allow recruitment, if recruitment is allowed, temporary workers will be recruited

  • If it fails, the thread pool cannot receive new tasks, and starts to execute the rejection policy according to the rejection policy agreed by the boss

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

submit()The method is defined by its abstract parent class. Here we can clearly see the difference between submit and execute. By calling submit, we will create RunnableFuture, and will return Future. Here we can tell the submit method the return value type, and it will return the value through generic constraints.

public abstract class AbstractExecutorService implements ExecutorService {
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    ...
}

addWorker() is a method to recruit people:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 判断状态,及任务列表
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Method to obtain tasks:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

How to let employees work, assign tasks, and run tasks:

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

The above is the detailed content of Source code analysis of Java thread pool implementation principle. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete