Rumah> Java> javaTutorial> teks badan

SpringBoot线程池和Java线程池怎么使用

PHPz
Lepaskan: 2023-05-18 12:46:36
ke hadapan
903 orang telah melayarinya

    SpringBoot线程池和Java线程池的用法和实现原理

    使用默认的线程池

    方式一:通过@Async注解调用

    public class AsyncTest { @Async public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); } }
    Salin selepas log masuk

    启动类上需要添加@EnableAsync注解,否则不会生效。

    @SpringBootApplication //@EnableAsync public class Test1Application { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args); AsyncTest bean = run.getBean(AsyncTest.class); for(int index = 0; index <= 10; ++index){ bean.async(String.valueOf(index)); } } }
    Salin selepas log masuk

    方式二:直接注入 ThreadPoolTaskExecutor

    此时可不加@EnableAsync注解

    @SpringBootTest class Test1ApplicationTests { @Resource ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test void contextLoads() { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName()); }; for(int index = 0; index <= 10; ++index){ threadPoolTaskExecutor.submit(runnable); } } }
    Salin selepas log masuk

    线程池默认配置信息

    SpringBoot线程池的常见配置:

    spring: task: execution: pool: core-size: 8 max-size: 16 # 默认是 Integer.MAX_VALUE keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止 allow-core-thread-timeout: true # 是否允许核心线程超时,默认true queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE shutdown: await-termination: false # 线程关闭等待 thread-name-prefix: task- # 线程名称的前缀
    Salin selepas log masuk

    SpringBoot 线程池的实现原理

    TaskExecutionAutoConfiguration类中定义了ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中RejectedExecutionHandler被定义为了private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

    注意在TaskExecutionAutoConfiguration类中,ThreadPoolTaskExecutor类的bean的名称为:applicationTaskExecutortaskExecutor

    // TaskExecutionAutoConfiguration#applicationTaskExecutor() @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAUL T_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }
    Salin selepas log masuk
    // ThreadPoolTaskExecutor#initializeExecutor() @Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
    Salin selepas log masuk
    // ExecutorConfigurationSupport#initialize() public void initialize() { if (logger.isInfoEnabled()) { logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }
    Salin selepas log masuk

    覆盖默认的线程池

    覆盖默认的taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

    @Configuration public class ThreadPoolConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
    Salin selepas log masuk

    管理多个线程池

    如果出现了多个线程池,例如再定义一个线程池taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

    @Bean("taskExecutor2") public ThreadPoolTaskExecutor taskExecutor2() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor2--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; }
    Salin selepas log masuk

    引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

    @Resource ThreadPoolTaskExecutor taskExecutor2;
    Salin selepas log masuk

    对于使用@Async注解的多线程则在注解中指定bean的名字即可。

    @Async("taskExecutor2") public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); }
    Salin selepas log masuk

    线程池的四种拒绝策略

    JAVA常用的四种线程池

    ThreadPoolExecutor类的构造函数如下:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
    Salin selepas log masuk

    newCachedThreadPool

    不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

    new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    Salin selepas log masuk

    newFixedThreadPool

    定长线程池,超出线程数的任务会在队列中等待。

    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    Salin selepas log masuk

    newScheduledThreadPool

    类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

    public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
    Salin selepas log masuk

    周期执行:

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(()->{ System.out.println("rate"); }, 1, 1, TimeUnit.SECONDS);
    Salin selepas log masuk

    延时执行:

    scheduledThreadPool.schedule(()->{ System.out.println("delay 3 seconds"); }, 3, TimeUnit.SECONDS);
    Salin selepas log masuk

    newSingleThreadExecutor

    单线程线程池,可以实现线程的顺序执行。

    public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
    Salin selepas log masuk

    Java 线程池中的四种拒绝策略

    • CallerRunsPolicy:线程池让调用者去执行。

    • AbortPolicy:如果线程池拒绝了任务,直接报错。

    • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

    • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

    CallerRunsPolicy

    直接在主线程中执行了run方法。

    public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
    Salin selepas log masuk

    效果类似于:

    Runnable thread = ()->{ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(0); } catch (InterruptedException e) { throw new RuntimeException(e); } }; thread.run();
    Salin selepas log masuk

    AbortPolicy

    直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

    public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
    Salin selepas log masuk

    DiscardPolicy

    什么也不做。

    public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
    Salin selepas log masuk

    DiscardOldestPolicy

    • e.getQueue().poll(): 取出队列最旧的任务。

    • e.execute(r): 当前任务入队。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
    Salin selepas log masuk

    Java 线程复用的原理

    java的线程池中保存的是java.util.concurrent.ThreadPoolExecutor.Worker对象,该对象在 被维护在private final HashSet workers = new HashSet();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
    Salin selepas log masuk

    work对象的执行依赖于runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

    final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
    Salin selepas log masuk

    Atas ialah kandungan terperinci SpringBoot线程池和Java线程池怎么使用. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Label berkaitan:
    sumber:yisu.com
    Kenyataan Laman Web ini
    Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
    Muat turun terkini
    Lagi>
    kesan web
    Kod sumber laman web
    Bahan laman web
    Templat hujung hadapan
    Tentang kita Penafian Sitemap
    Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!