public class AsyncTest { @Async public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); } }
は、@EnableAsync
アノテーションを使用してスタートアップ クラスに追加する必要があります。追加しないと有効になりません。
@SpringBootApplication //@EnableAsync public class Test1Application { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args); AsyncTest bean = run.getBean(AsyncTest.class); for(int index = 0; index <= 10; ++index){ bean.async(String.valueOf(index)); } } }
追加する必要はありません @EnableAsync
Annotation
@SpringBootTest class Test1ApplicationTests { @Resource ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test void contextLoads() { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName()); }; for(int index = 0; index <= 10; ++index){ threadPoolTaskExecutor.submit(runnable); } } }
SpringBoot スレッド プールの一般的な構成:
spring: task: execution: pool: core-size: 8 max-size: 16 # 默认是 Integer.MAX_VALUE keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止 allow-core-thread-timeout: true # 是否允许核心线程超时,默认true queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE shutdown: await-termination: false # 线程关闭等待 thread-name-prefix: task- # 线程名称的前缀
TaskExecutionAutoConfiguration
の実装原則は、内部実装であるクラス ThreadPoolTaskExecutor
で定義されています。このクラスのこれも、Java のネイティブ ThreadPoolExecutor
クラスに基づいています。 initializeExecutor()
メソッドは親クラスで呼び出されますが、親クラスでは RejectedExecutionHandler
が private RejectedExecutionHandler requestedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
として定義されています。そして、initialize()
メソッドを通じて AbortPolicy
を initializeExecutor()
に渡します。
TaskExecutionAutoConfiguration
クラスでは、ThreadPoolTaskExecutor
クラスの Bean 名は、applicationTaskExecutor
および taskExecutor
であることに注意してください。
// TaskExecutionAutoConfiguration#applicationTaskExecutor() @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAUL T_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }
// ThreadPoolTaskExecutor#initializeExecutor() @Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
// ExecutorConfigurationSupport#initialize() public void initialize() { if (logger.isInfoEnabled()) { logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }
デフォルトの taskExecutor
オブジェクトをオーバーライドします。Bean の戻り値の型は、ThreadPoolTaskExecutor
または Executor です。
。
@Configuration public class ThreadPoolConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }
複数のスレッド プールがある場合、たとえば、別のスレッド プール taskExecutor2
を定義すると、直接実行するとエラーが報告されます。このとき、Bean の名前を指定する必要があります。
@Bean("taskExecutor2") public ThreadPoolTaskExecutor taskExecutor2() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor2--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; }
スレッド プールを参照する場合、名前で検索されるように、変数名を Bean の名前に変更する必要があります。
@Resource ThreadPoolTaskExecutor taskExecutor2;
@Async
アノテーションを使用したマルチスレッドの場合は、アノテーションで Bean 名を指定するだけです。
@Async("taskExecutor2") public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); }
スレッド プールの 4 つの拒否戦略
ThreadPoolExecutor
クラスのコンストラクターは次のとおりです。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
スレッドの最大数を制限しません (maximumPoolSize=Integer.MAX_VALUE
)。アイドル状態のスレッドが必要以上にある場合はリサイクルされ、そうでない場合は既存のスレッドになります。スレッドは再利用されます。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
固定長のスレッド プール。スレッド数を超えるタスクはキューで待機します。
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newCachedThreadPool
と同様に、スレッド数に上限はありませんが、corePoolSize
を指定できます。遅延実行と定期実行が可能です。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
定期実行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(()->{ System.out.println("rate"); }, 1, 1, TimeUnit.SECONDS);
遅延実行:
scheduledThreadPool.schedule(()->{ System.out.println("delay 3 seconds"); }, 3, TimeUnit.SECONDS);
シングルスレッド スレッド プールは、スレッドの順次実行を実現できます。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
CallerRunsPolicy
: スレッド プールは呼び出し元の実行を許可します。
AbortPolicy
: スレッド プールがタスクを拒否した場合、エラーが直接報告されます。
DiscardPolicy
: スレッド プールがタスクを拒否した場合、そのタスクは直接破棄されます。
DiscardOldestPolicy
: スレッド プールがタスクを拒否した場合、スレッド プール内の最も古い未実行タスクが直接破棄され、新しいタスクがキューに入れられます。
run メソッドはメイン スレッドで直接実行されます。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
効果は次のようになります。
Runnable thread = ()->{ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(0); } catch (InterruptedException e) { throw new RuntimeException(e); } }; thread.run();
RejectedExecutionException
例外を直接スローし、タスク情報とスレッド プール情報を示します。 ,
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy
は何も行いません。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy
##e.getQueue().poll(): キューから最も古いタスクを削除します。
e.execute(r) : 現在のタスクがキューに入れられます。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
javaスレッド プールは
java.util.concurrent.ThreadPoolExecutor.Worker# に保存されます## オブジェクト。private Final HashSet<Worker> works = new HashSet<Worker>();
で維持されます。 workQueue
は実行するタスクを格納するキューで、スレッドプールに新しいタスクが追加されると、そのタスクは workQueue
キューに追加されます。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}</pre><div class="contentsignin">ログイン後にコピー</div></div>作業オブジェクトの実行は、runWorker()<p> に依存します。これは、通常作成するスレッドとは異なります。このスレッドはループ内にあり、キューから新しいタスクの実行を継続的に取得します。 . .したがって、通常使用するスレッドのように実行後に終了するのではなく、スレッド プール内のスレッドを再利用できます。 <code>rree
以上がSpringBootスレッドプールとJavaスレッドプールの使い方の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。