public class AsyncTest { @Async public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); } }
에 @EnableAsync</code를 추가해야 합니다. 시작 클래스 >Annotation, 그렇지 않으면 적용되지 않습니다. <code>@EnableAsync
注解,否则不会生效。
@SpringBootApplication //@EnableAsync public class Test1Application { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args); AsyncTest bean = run.getBean(AsyncTest.class); for(int index = 0; index <= 10; ++index){ bean.async(String.valueOf(index)); } } }
此时可不加 @EnableAsync
注解
@SpringBootTest class Test1ApplicationTests { @Resource ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test void contextLoads() { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName()); }; for(int index = 0; index <= 10; ++index){ threadPoolTaskExecutor.submit(runnable); } } }
SpringBoot线程池的常见配置:
spring: task: execution: pool: core-size: 8 max-size: 16 # 默认是 Integer.MAX_VALUE keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止 allow-core-thread-timeout: true # 是否允许核心线程超时,默认true queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE shutdown: await-termination: false # 线程关闭等待 thread-name-prefix: task- # 线程名称的前缀
TaskExecutionAutoConfiguration
类中定义了 ThreadPoolTaskExecutor
,该类的内部实现也是基于java原生的 ThreadPoolExecutor
类。initializeExecutor()
方法在其父类中被调用,但是在父类中 RejectedExecutionHandler
被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
,并通过initialize()
方法将AbortPolicy
传入initializeExecutor()
中。
注意在TaskExecutionAutoConfiguration
类中,ThreadPoolTaskExecutor
类的bean的名称为: applicationTaskExecutor
和 taskExecutor
。
// TaskExecutionAutoConfiguration#applicationTaskExecutor() @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAUL T_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }
// ThreadPoolTaskExecutor#initializeExecutor() @Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
// ExecutorConfigurationSupport#initialize() public void initialize() { if (logger.isInfoEnabled()) { logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }
覆盖默认的 taskExecutor
对象,bean的返回类型可以是ThreadPoolTaskExecutor
也可以是Executor
。
@Configuration public class ThreadPoolConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
如果出现了多个线程池,例如再定义一个线程池 taskExecutor2
,则直接执行会报错。此时需要指定bean的名称即可。
@Bean("taskExecutor2") public ThreadPoolTaskExecutor taskExecutor2() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor2--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; }
引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。
@Resource ThreadPoolTaskExecutor taskExecutor2;
对于使用@Async
注解的多线程则在注解中指定bean的名字即可。
@Async("taskExecutor2") public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); }
线程池的四种拒绝策略
ThreadPoolExecutor
类的构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE
),如果有空闲的线程超过需要,则回收,否则重用已有的线程。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
定长线程池,超出线程数的任务会在队列中等待。
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
类似于newCachedThreadPool
,线程数无上限,但是可以指定corePoolSize
。可实现延迟执行、周期执行。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
周期执行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(()->{ System.out.println("rate"); }, 1, 1, TimeUnit.SECONDS);
延时执行:
scheduledThreadPool.schedule(()->{ System.out.println("delay 3 seconds"); }, 3, TimeUnit.SECONDS);
单线程线程池,可以实现线程的顺序执行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
CallerRunsPolicy
:线程池让调用者去执行。
AbortPolicy
:如果线程池拒绝了任务,直接报错。
DiscardPolicy
:如果线程池拒绝了任务,直接丢弃。
DiscardOldestPolicy
:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。
直接在主线程中执行了run方法。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
效果类似于:
Runnable thread = ()->{ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(0); } catch (InterruptedException e) { throw new RuntimeException(e); } }; thread.run();
直接抛出RejectedExecutionException
异常,并指示任务的信息,线程池的信息。、
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy
什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy
e.getQueue().poll()
: 取出队列最旧的任务。
e.execute(r)
: 当前任务入队。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
java
的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker
对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();
。workQueue
是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue
队列中。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
work对象的执行依赖于 runWorker()
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
@EnableAsync
주석을 추가할 필요가 없습니다🎜rrreee🎜스레드 풀 기본 구성 정보🎜🎜SpringBoot 스레드 풀의 일반적인 구성:🎜 rrreee🎜SpringBoot 스레드 풀 원리 구현 🎜🎜TaskExecutionAutoConfiguration
클래스는 ThreadPoolTaskExecutor
를 정의하고 이 클래스의 내부 구현도 Java의 기본 ThreadPoolExecutor
를 기반으로 합니다. 수업. initializeExecutor()
메서드는 상위 클래스에서 호출되지만 상위 클래스에서는 RejectedExecutionHandler
가 private RejectedExecutionHandler 거부된ExecutionHandler = new ThreadPoolExecutor.AbortPolicy();, <code>initialize()
메서드를 통해 AbortPolicy
를 initializeExecutor()
에 전달합니다. 🎜🎜TaskExecutionAutoConfiguration
클래스에서 ThreadPoolTaskExecutor
클래스의 Bean 이름은 applicationTaskExecutor
및 taskExecutor
입니다. 🎜rrreeerrreeerrreee🎜기본 스레드 풀 재정의🎜🎜기본 taskExecutor
개체를 재정의하세요. Bean의 반환 유형은 ThreadPoolTaskExecutor
또는 Executor
일 수 있습니다. 🎜rrreee🎜여러 스레드 풀 관리🎜🎜여러 스레드 풀이 있는 경우(예: 다른 스레드 풀 taskExecutor2
정의) 직접 실행하면 오류가 보고됩니다. 이때 Bean의 이름을 지정해 주어야 합니다. 🎜rrreee🎜스레드풀 참조시 변수명을 해당 빈의 이름으로 변경해야 이름으로 검색이 됩니다. 🎜rrreee🎜@Async
주석을 사용하는 멀티스레딩의 경우 주석에 Bean 이름을 지정하기만 하면 됩니다. 🎜rrreee🎜스레드 풀의 네 가지 거부 전략🎜🎜JAVA에서 일반적으로 사용되는 네 가지 스레드 풀🎜🎜ThreadPoolExecutor
클래스 생성자는 다음과 같습니다. 🎜rrreee🎜newCachedThreadPool🎜🎜최대 개수에는 제한이 없습니다. 스레드( maximumPoolSize=Integer.MAX_VALUE
), 필요한 것보다 더 많은 유휴 스레드가 있으면 재활용되고, 그렇지 않으면 기존 스레드가 재사용됩니다. 🎜rrreee🎜newFixedThreadPool🎜🎜고정 길이 스레드 풀, 스레드 수를 초과하는 작업은 대기열에서 대기합니다. 🎜rrreee🎜newScheduledThreadPool🎜🎜 newCachedThreadPool
과 유사하며 스레드 수에는 상한이 없지만 corePoolSize
를 지정할 수 있습니다. 지연 실행과 주기적 실행이 가능합니다. 🎜rrreee🎜정기적 실행: 🎜rrreee🎜지연 실행: 🎜rrreee🎜newSingleThreadExecutor🎜🎜스레드의 순차적 실행을 실현할 수 있는 단일 스레드 스레드 풀입니다. 🎜rrreee🎜Java 스레드 풀의 네 가지 거부 전략🎜CallerRunsPolicy
: 스레드 풀을 통해 호출자가 실행될 수 있습니다. 🎜AbortPolicy
: 스레드 풀이 작업을 거부하면 오류가 직접 보고됩니다. 🎜DiscardPolicy
: 스레드 풀이 작업을 거부하면 해당 작업이 바로 삭제됩니다. 🎜DiscardOldestPolicy
: 스레드 풀이 작업을 거부하면 스레드 풀에서 가장 오래되고 실행되지 않은 작업이 바로 삭제되고 새 작업이 대기열에 추가됩니다. 🎜RejectedExecutionException
예외를 직접 발생시키고 작업 정보와 스레드 풀 정보를 나타냅니다. , 🎜rrreee🎜DiscardPolicy🎜🎜는 아무 작업도 수행하지 않습니다. 🎜rrreee🎜DiscardOldestPolicy🎜e.getQueue().poll()
: 가장 오래된 정책을 제거합니다. 큐 태스크. 🎜e.execute(r)
: 현재 작업이 대기열에 추가됩니다. 🎜java
의 스레드 풀은 java.util.concurrent.ThreadPoolExecutor.Worker
객체를 저장합니다. private final HashSet<worker> Workers = new HashSet<worker>();</worker></worker>
에서 유지됩니다. workQueue
는 실행할 작업을 저장하는 대기열입니다. 스레드 풀에 새 작업이 추가되면 해당 작업이 workQueue
대기열에 추가됩니다. 🎜rrreee🎜작업 개체의 실행은 runWorker()
에 의존합니다. 우리가 일반적으로 작성하는 스레드와 달리 이 스레드는 루프에 있으며 실행을 위해 대기열에서 지속적으로 새 작업을 얻습니다. 따라서 우리가 일반적으로 사용하는 스레드처럼 실행 후 종료되는 대신 스레드 풀에 있는 스레드를 재사용할 수 있습니다. 🎜아아아아위 내용은 SpringBoot 스레드 풀과 Java 스레드 풀을 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!