• 技术文章 >Java >java教程

    一文搞懂Java线程池实现原理

    WBOYWBOY2022-11-03 16:13:34转载90
    本篇文章给大家带来了关于java的相关知识,其中主要介绍了关于线程池实现原理的相关内容,包括了为什么要使用线程池以及线程池使用的相关内容,下面一起来看一下,希望对大家有帮助。

    php入门到就业线上直播课:进入学习

    推荐学习:《java视频教程

    1. 为什么要使用线程池

    使用线程池通常由以下两个原因:

    1. 频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程。

    2. 使用线程池可以更容易管理线程,线程池可以动态管理线程个数、具有阻塞队列、定时周期执行任务、环境隔离等。

    2. 线程池的使用

    /**
     * @author 一灯架构
     * @apiNote 线程池示例
     **/
    public class ThreadPoolDemo {
    
        public static void main(String[] args) {
            // 1. 创建线程池
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    3,
                    3,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
          
            // 2. 往线程池中提交3个任务
            for (int i = 0; i < 3; i++) {
                threadPoolExecutor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 关注公众号:一灯架构");
                });
            }
          
            // 3. 关闭线程池
            threadPoolExecutor.shutdown();
        }
    }

    输出结果:

    pool-1-thread-2 关注公众号:一灯架构
    pool-1-thread-1 关注公众号:一灯架构
    pool-1-thread-3 关注公众号:一灯架构

    线程池的使用非常简单:

    再看一下线程池构造方法中核心参数的作用。

    3. 线程池核心参数

    线程池共有七大核心参数:

    参数名称参数含义
    int corePoolSize核心线程数
    int maximumPoolSize最大线程数
    long keepAliveTime线程存活时间
    TimeUnit unit时间单位
    BlockingQueue workQueue阻塞队列
    ThreadFactory threadFactory线程创建工厂
    RejectedExecutionHandler handler拒绝策略

    4. 线程池工作原理

    线程池的工作原理,简单理解如下:

    流程.jpg

    5. 线程池源码剖析

    5.1 线程池的属性

    public class ThreadPoolExecutor extends AbstractExecutorService {
    
        // 线程池的控制状态,Integer长度是32位,前3位用来存储线程池状态,后29位用来存储线程数量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // 线程个数所占的位数
        private static final int COUNT_BITS = Integer.SIZE - 3;
        // 线程池的最大容量,2^29-1,约5亿个线程
        private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    
        // 独占锁,用来控制多线程下的并发操作
        private final ReentrantLock mainLock = new ReentrantLock();
        // 工作线程的集合
        private final HashSet<Worker> workers = new HashSet<>();
        // 等待条件,用来响应中断
        private final Condition termination = mainLock.newCondition();
        // 是否允许回收核心线程
        private volatile boolean allowCoreThreadTimeOut;
        // 线程数的历史峰值
        private int largestPoolSize;
    
        /**
         * 以下是线程池的七大核心参数
         */
        private volatile int corePoolSize;
        private volatile int maximumPoolSize;
        private volatile long keepAliveTime;
        private final BlockingQueue<Runnable> workQueue;
        private volatile ThreadFactory threadFactory;
        private volatile RejectedExecutionHandler handler;
    
    }

    线程池的控制状态ctl用来存储线程池状态和线程个数,前3位用来存储线程池状态,后29位用来存储线程数量。

    设计者多聪明,用一个变量存储了两块内容。

    5.2 线程池状态

    线程池共有5种状态:

    状态名称状态含义状态作用
    RUNNING运行中线程池创建后默认状态,接收新任务,并处理阻塞队列中的任务。
    SHUTDOWN已关闭调用shutdown方法后处于该状态,不再接收新任务,处理阻塞队列中任务。
    STOP已停止调用shutdownNow方法后处于该状态,不再新任务,并中断所有线程,丢弃阻塞队列中所有任务。
    TIDYING处理中所有任务已完成,所有工作线程都已回收,等待调用terminated方法。
    TERMINATED已终止调用terminated方法后处于该状态,线程池的最终状态。

    image-20221031192041121.png

    5.3 execute源码

    看一下往线程池中提交任务的源码,这是线程池的核心逻辑:

    // 往线程池中提交任务
    public void execute(Runnable command) {
        // 1. 判断提交的任务是否为null
        if (command == null)
            throw new NullPointerException();
    
        int c = ctl.get();
        // 2. 判断线程数是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 3. 把任务包装成worker,添加到worker集合中
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 4. 判断如果线程数不小于corePoolSize,并且可以添加到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            // 5. 重新检查线程池状态,如果线程池不是运行状态,就移除刚才添加的任务,并执行拒绝策略
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // 6. 判断如果线程数是0,就创建非核心线程(任务是null,会从阻塞队列中拉取任务)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 7. 如果添加阻塞队列失败,就创建一个Worker
        else if (!addWorker(command, false))
            // 8. 如果创建Worker失败说明已经达到最大线程数了,则执行拒绝策略
            reject(command);
    }

    execute方法的逻辑也很简单,最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:

    // 添加worker
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 1. 检查是否允许提交任务
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            firstTask == null &&
                            !workQueue.isEmpty()))
                return false;
            // 2. 使用死循环保证添加线程成功
            for (; ; ) {
                int wc = workerCountOf(c);
                // 3. 校验线程数是否超过容量限制
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 4. 使用CAS修改线程数
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                // 5. 如果线程池状态变了,则从头再来
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 6. 把任务和新线程包装成一个worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 7. 加锁,控制并发
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 8. 再次校验线程池状态是否异常
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        // 9. 如果线程已经启动,就抛出异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 10. 添加到worker集合中
                        workers.add(w);
                        int s = workers.size();
                        // 11. 记录线程数历史峰值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 12. 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    方法虽然很长,但是逻辑很清晰。就是把任务和线程包装成worker,添加到worker集合,并启动线程。

    5.4 worker源码

    再看一下worker类的结构:

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable {
        // 工作线程
        final Thread thread;
        // 任务
        Runnable firstTask;
    
        // 创建worker,并创建一个新线程(用来执行任务)
        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    }

    5.5 runWorker源码

    再看一下run方法的源码:

    // 线程执行入口
    public void run() {
        runWorker(this);
    }
    
    // 线程运行核心方法
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 1. 如果当前worker中任务是null,就从阻塞队列中获取任务
            while (task != null || (task = getTask()) != null) {
                // 加锁,保证thread不被其他线程中断(除非线程池被中断)
                w.lock();
                // 2. 校验线程池状态,是否需要中断当前线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 3. 执行run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    // 解锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 4. 从worker集合删除当前worker
            processWorkerExit(w, completedAbruptly);
        }
    }

    runWorker方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。

    再看一下从阻塞队列中拉取任务的逻辑:

    // 从阻塞队列中拉取任务
    private Runnable getTask() {
        boolean timedOut = false;
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // 2. 再次判断是否需要回收线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                // 3. 从阻塞队列中拉取任务
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    推荐学习:《java视频教程

    以上就是一文搞懂Java线程池实现原理的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:掘金,如有侵犯,请联系admin@php.cn删除

    前端(VUE)零基础到就业课程:点击学习

    清晰的学习路线+老师随时辅导答疑

    快捷开发Web应用及小程序:点击使用

    支持亿级表,高并发,自动生成可视化后台。

    专题推荐:后端 Java
    上一篇:SpringCloud Feign超详细讲解 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • ❤️‍🔥共22门课程,总价3725元,会员免费学• ❤️‍🔥接口自动化测试不想写代码?• JavaScript怎么输入N个数据求平均数• javascript怎么求总分和平均值• 一文详解 JavaScript 中展开运算符的不同使用方式• 一文带你详细了解JavaScript中的深拷贝• JavaScript更新到了es几
    1/1

    PHP中文网