Hintergrund: Durch die Aktualisierung der Computerhardware ist unsere Software in der Lage, Multithread-Aufgaben auszuführen. Wenn wir Multithread-Programmierung durchführen, müssen wir Threads erstellen. Wenn die Programmparallelität sehr hoch ist, erstellen wir eine große Anzahl von Threads, und jeder Thread führt eine kurze Aufgabe aus und wird dann häufig beendet Reduziert die Systemleistung erheblich und erhöht den Server-Overhead, da das Erstellen und Zerstören von Threads zusätzlichen Verbrauch erfordert.
Zu diesem Zeitpunkt können wir die Pooling-Technologie verwenden, um diesen Fehler zu optimieren, und der Thread-Pool war geboren.
Der Kern der Pooling-Technologie besteht darin, die Wiederverwendung von Ressourcen zu erreichen und den Aufwand für die Erstellung und Zerstörung von Ressourcen in Szenarien mit hoher Parallelität zu reduzieren. Wenn die Anzahl der Parallelitäten gering ist, gibt es keinen offensichtlichen Vorteil (Ressourcen belegen immer den Systemspeicher und haben keine Chance darauf). gebraucht).
Einführung in die Pooling-Technologie: Wann kommt die Pooling-Technologie zum Einsatz? Bei der Pooling-Technologie handelt es sich um eine Programmiertechnik, die das Programm bei hoher Parallelität im Programm erheblich optimieren und zusätzlichen Overhead wie häufiges Erstellen und Zerstören von Verbindungen im System reduzieren kann. Zu den Pooling-Technologien, mit denen wir häufig in Kontakt kommen, gehören Datenbankverbindungspools, Thread-Pools, Objektpools usw. Das Merkmal der Pooling-Technologie besteht darin, einige kostenintensive Ressourcen in einem bestimmten Pool (Speicher) zu verwalten und dessen minimale Anzahl von Verbindungen, maximale Anzahl von Verbindungen, Blockierungswarteschlangen, Überlaufregeln und andere Konfigurationen festzulegen, um eine einheitliche Verwaltung zu ermöglichen. Unter normalen Umständen verfügt es auch über einige unterstützende Funktionen wie Überwachung und erzwungenes Recycling.
Pooling-Technologie ist eine Ressourcennutzungstechnologie:
Wenn die Kosten für die Beschaffung von Ressourcen hoch sind
Wenn die Häufigkeit von Ressourcenanforderungen hoch und die Gesamtzahl der verwendeten Ressourcen niedrig ist
... Remote-Ressourcen für die Netzwerkkommunikation, wie Datenbankverbindungen, Socket-Verbindungen usw.Option 1: Executors (nur zum Verständnis, Option 2 wird empfohlen)
Um einen Thread-Pool zu erstellen, können SieExecutorService
-Schnittstelle. DieExecutorService
-Schnittstelle ist eine Unterklassenschnittstelle der Executor-Schnittstelle und wird häufiger verwendet. Sie stellt Thread-Pool-Lebenszyklusverwaltungsmethoden bereit und gibt einExecutorService
und führen asynchrone Aufgaben aus (implementieren Sie die Runnable-Schnittstelle) über ExecutorService
newCachedThreadPool
erstellt einen zwischenspeicherbaren Thread-Pool. Wenn sich zu viele Threads im Thread-Pool befinden, werden die überschüssigen Thread-Ressourcen nach 60 Sekunden recycelt , es sind nicht genügend Threads vorhanden. Es wird ein neuer Thread erstellt.
newFixedThreadPool erstellt einen Thread-Pool mit fester Länge, der die maximale Anzahl gleichzeitiger Threads steuern kann, die in der Warteschlange warten.
newScheduledThreadPool
Erstellt einen Thread-Pool fester Länge, um die geplante und periodische Aufgabenausführung zu unterstützen. ExecutorService
,通过ExecutorService
执行异步任务(实现Runnable接口)
Executors 可以创建一下几种类型的线程池:
newCachedThreadPool
创建一个可缓存线程池,如果线程池线程数量过剩,会在60秒后回收掉多余线程资源,当任务书增加,线程不够用,则会新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor
创建一个单线程的线程池,只使用唯一的线程来执行任务,可以保证任务按照提交顺序来完成。
在阿里巴巴开发规范中,规定线程池不允许通过Executors创建,而是通过ThreadPoolExecutor创建。
好处:让写的同学可以更加明确线程池的运行规则,规避资源耗尽的风险。
ThreadPoolExecutor的七大参数:
(1)corePoolSize
核心线程数量,核心线程会一直保留,不会被销毁。
(2)maximumPoolSize
最大线程数,当核心线程不能满足任务需要时,系统就会创建新的线程来执行任务。
(3)keepAliveTime
存活时间,核心线程之外的线程空闲多长时间就会被销毁。
(4)timeUnit
代表线程存活的时间单位。
(5)BlockingQueue
newSingleThreadExecutor
Erstellen Sie einen Single-Threaded-Thread-Pool und verwenden Sie nur den einzigen Thread zum Ausführen von Aufgaben, um sicherzustellen, dass Aufgaben in der Reihenfolge abgeschlossen werden, in der sie übermittelt werden. In den Alibaba-Entwicklungsspezifikationen ist festgelegt, dass Thread-Pools nicht über Executoren erstellt werden dürfen, sondern über ThreadPoolExecutor. Vorteile: Studierende, die schreiben, können die Betriebsregeln des Thread-Pools besser verstehen und das Risiko einer Ressourcenerschöpfung vermeiden. Die sieben Parameter von
🎜🎜ThreadPoolExecutor: 🎜🎜🎜(1)corePoolSize
Die Anzahl der Kernthreads bleibt immer erhalten und wird nicht zerstört. 🎜🎜(2)maximumPoolSize
Die maximale Anzahl von Threads. Wenn die Kernthreads die Aufgabenanforderungen nicht erfüllen können, erstellt das System neue Threads, um die Aufgaben auszuführen. 🎜🎜(3)keepAliveTime
Überlebenszeit, wie lange andere Threads als Kern-Threads im Leerlauf sind, wird zerstört. 🎜🎜(4)timeUnit
stellt die Zeiteinheit für das Thread-Überleben dar. 🎜🎜(5)BlockingQueue
Blocking queue🎜🎜🎜🎜Wenn die ausgeführte Aufgabe die maximale Anzahl von Threads überschreitet, kann sie in der Warteschlange gespeichert werden Die Aufgabe kann aus der Warteschlange entfernt werden. Ausführung fortsetzen. 🎜🎜🎜🎜🎜Die Warteschlangentypen sind wie folgt: 🎜LinkedBlockingQueue ArrayBlockingQueue SynchronousQueue TransferQueue. 🎜(6)threadFactory
Thread-Factory wird zum Erstellen von Threads verwendet. Sie können Thread-Gruppennamen definieren, was bei der Fehlerbehebung bei Jstack-Problemen sehr hilfreich ist. threadFactory
线程工厂,用来创建线程的,可以自定义线程,比如我们可以定义线程组名称,在jstack问题排查时,非常有帮助。
(7)rejectedExecutionHandler
拒绝策略,
当所有线程(最大线程数)都在忙,并且任务队列处于满任务的状态,则会执行拒绝策略。
JDK为我们提供了四种拒绝策略,我们必须都得熟悉
AbortPolicy: 丢弃任务,并抛出异常RejectedExecutionException。 默认
DiscardPolicy: 丢弃最新的任务,不抛异常。
DiscardOldestPolicy: 扔掉排队时间最久的任务,也就是最旧的任务。
CallerRuns: 由调用者(提交异步任务的线程)处理任务。
想要实现一个线程池我们就需要关心ThreadPoolExecutor类,因为Executors创建线程池也是通过new ThreadPoolExecutor对象。
看一下ThreadPoolExecutor
的类继承关系,可以看出为什么通过Executors
创建的线程池返回结果是ExecutorService,因为ThreadPoolExecutor是ExecutorService接口的实现类,而Executors创建线程池本质也是创建的ThreadPoolExecutor 对象。
下面我们一起看一下ThreadPoolExecutor
的源码,首先是ThreadPoolExecutor
内定义的变量,常量:
// 复合类型变量 是一个原子整数 控制状态(运行状态|线程池活跃线程数量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 容量 // 运行状态存储在高位3位 private static final int RUNNING = -1 << COUNT_BITS; // 接受新任务,并处理队列任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务,但会处理队列任务 private static final int STOP = 1 << COUNT_BITS; // 不接受新任务,不会处理队列任务,中断正在处理的任务 private static final int TIDYING = 2 << COUNT_BITS; // 所有的任务已结束,活跃线程为0,线程过渡到TIDYING状 态,将会执行terminated()钩子方法 private static final int TERMINATED = 3 << COUNT_BITS; // terminated()方法已经完成 // 设置 ctl 参数方法 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } /** * 阻塞队列 */ private final BlockingQueue<Runnable> workQueue; /** * Lock 锁. */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 工人们 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * 等待条件支持等待终止 */ private final Condition termination = mainLock.newCondition(); /** * 最大的池大小. */ private int largestPoolSize; /** * 完成任务数 */ private long completedTaskCount; /** * 线程工厂 */ private volatile ThreadFactory threadFactory; /** * 拒绝策略 */ private volatile RejectedExecutionHandler handler; /** * 存活时间 */ private volatile long keepAliveTime; /** * 允许核心线程数 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程数 */ private volatile int corePoolSize; /** * 最大线程数 */ private volatile int maximumPoolSize; /** * 默认拒绝策略 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** * shutdown and shutdownNow权限 */ private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
构造器,,支持最少五种参数,最大七中参数的四种构造器:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
工人,线程池中执行任务的,线程池就是通过这些工人进行工作的,有核心员工(核心线程)和临时工(人手不够的时候,临时创建的,如果空闲时间厂,就会被裁员),
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 工人的本质就是个线程 final Thread thread; // 第一件工作任务 Runnable firstTask; volatile long completedTasks; /** * 构造器 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 工作 */ public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
核心方法,通过线程池执行任务(这也是线程池的运行原理):
检验任务
获取当前线程池状态
判断上班工人数量是否小于核心员工数
如果小于则招人,安排工作
不小于则判断等候区任务是否排满
如果没有排满则任务排入等候区
如果排满,看是否允许招人,允许招人则招临时工
如果都不行,该线程池无法接收新任务,开始按老板约定的拒绝策略,执行拒绝策略
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
submit()
方法是其抽象父类定义的,这里我们就可以明显看到submit与execute的区别,通过submit调用,我们会创建RunnableFuture
rejectedExecutionHandler
Ablehnungsrichtlinie, Wenn alle Threads (maximale Anzahl von Threads) beschäftigt sind und die Aufgabenwarteschlange voller Aufgaben ist, wird die Ablehnungsrichtlinie ausgeführt.
JDK bietet uns vier Ablehnungsstrategien, mit denen wir alle vertraut sein müssen
AbortPolicy: Brechen Sie die Aufgabe ab und werfen Sie die Ausnahme RejectedExecutionException aus. Default
🎜🎜ThreadPoolExecutor
ansehen, können Sie erkennen, warum der von Executors
erstellte Thread-Pool das Ergebnis ExecutorService zurückgibt, da ThreadPoolExecutor die Implementierungsklasse der ExecutorService-Schnittstelle ist , und Executors Der Kern der Erstellung eines Thread-Pools besteht darin, auch ein ThreadPoolExecutor-Objekt zu erstellen. 🎜🎜🎜🎜Lassen Sie uns zusammenarbeiten Schauen Sie sich den Quellcode von ThreadPoolExecutor
an. Der erste sind die im ThreadPoolExecutor
definierten Variablen und Konstanten: 🎜public abstract class AbstractExecutorService implements ExecutorService { public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } ... }
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断状态,及任务列表 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
submit()
wird durch ihre abstrakte übergeordnete Klasse definiert. Hier können Sie den Unterschied zwischen deutlich erkennen Durch den Aufruf von „Submit“ erstellen wir RunnableFuture
und geben Future zurück. Hier können wir der Submit-Methode den Rückgabewerttyp mitteilen und sie übergibt den generischen Constraint-Rückgabewert. 🎜rrreee🎜🎜addWorker() ist eine Möglichkeit, Leute zu rekrutieren: 🎜🎜rrreee🎜🎜So erhalten Sie Aufgaben: 🎜🎜rrreee🎜🎜Eine Möglichkeit, Mitarbeiter arbeiten zu lassen, Aufgaben zuzuweisen und Aufgaben auszuführen: 🎜🎜rrreeeDas obige ist der detaillierte Inhalt vonQuellcode-Analyse des Java-Thread-Pool-Implementierungsprinzips. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!