Maison >Java >javaDidacticiel >Explication détaillée de la façon d'utiliser le pool de threads dans la programmation simultanée Java
L'éditeur suivant vous proposera un article sur la concurrence JavaProgrammation_Comment utiliser le pool de threads (explication détaillée). L'éditeur pense que c'est plutôt bien, alors je vais le partager avec vous maintenant et le donner comme référence. Suivons l'éditeur pour y jeter un œil
1. Le couplage implicite entre les tâches et les stratégies d'exécution
L'exécuteur peut soumettre des tâches découplées de l'exécution stratégie de la tâche
Seules les tâches du même type avec peu de différence dans le temps d'exécution peuvent atteindre des performances maximales, sinon, par exemple, mettre des tâches longues et courtes dans le même pool, à moins que le pool de threads ne soit très volumineux, cela entraînera des problèmes tels qu'un blocage
1. Un blocage par manque de threads
Similaire à : Combinez les deux. Une tâche est soumise à un pool monothread, et les deux tâches dépendent l'une de l'autre. Si une tâche attend une autre tâche, un blocage se produira ; la performance est que le pool n'est pas suffisant
Définition. : une tâche doit attendre le pool Suite à l'exécution d'autres tâches, une impasse de famine peut se produire
2. Taille du pool de threads
Remarque : La taille du pool de threads est également soumise à d'autres restrictions, telles que d'autres pools de ressources : pool de connexions à la base de données
Si chaque tâche est une connexion, alors la taille du le pool de threads est limité par la taille du pool de connexions à la base de données
3. Configurez le pool de threads ThreadPoolExecutor
Instance :
1. Retour via la méthode d'usine des exécuteurs Quelques implémentations par défaut
2. Personnalisez l'implémentation en instanciant ThreadPoolExecutor(.....) >
. 1. File d'attente illimitée : Lorsque la tâche arrive et que le pool de threads est plein, la tâche attend dans la file d'attente. Si la tâche atteint l'infini, la file d'attente s'étendra à l'infini
Par exemple : ceci. est utilisé pour les singletons et les pools de threads de taille fixe2 File d'attente limitée :
Si une nouvelle tâche arrive et que la file d'attente est pleine, utilisezStratégie de saturation.
3. Transfert synchrone : Si le pool de threads est grand, il y aura un délai de transfert après avoir mis la tâche dans la file d'attente. le producteur de tâches provoque également bientôt la mise en file d'attente de la tâche
SynchronousQueue remet directement la tâche au thread de travailMécanisme : pour mettre une tâche dedans, il doit y avoir a Le thread attend l'acceptation. Sinon, ajoute un
Si le thread est saturé, rejette la tâchePar exemple : Cache
ThreadPool est la stratégie. utilisé RejectedExecutionHandler pour modifier la stratégie de saturation
1. Terminer l'abandon (par défaut) : Lève une exception Gérée par l'appelant
2. Rejeter Rejeter 3. Supprimer la plus ancienne : Abandonnez la tâche la plus ancienne. Remarque : Si elle est
prioritaire, la file d'attente supprimera la tâche la plus prioritaire
4.Ctoutes. erRuns : tâche de restauration, le thread appelant la gère tout seul
4. Thread factory ThreadFactoy
Chaque fois qu'un thread est créé Quand : appelle réellement la fabrique de threads pour terminer Fabrique de threads personnalisée :
implémente ThreadFactoryVous pouvez personnaliser la fabrique de threads
Comportement:tels que UncaughtException
Gestionnaire, etc.public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //设置该线程工厂创建的线程的 未捕获异常的行为 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }Méthodes qui peuvent être remplacées par des sous-classes personnalisées :
1.afterExecute : Après la fin, si une exception Runtimeest levée, la méthode ne sera pas exécutée
2.before
Execute : Avant de démarrer, si une RuntimeException est levée, la tâche ne sera pas exécutée2. Parallélisation de l'algorithme
récursif1.循环
在循环中,每次循环操作都是独立的
//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
2.迭代
如果每个迭代操作是彼此独立的,则可以串行执行
如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的
//串行 计算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 计算compute 和串行迭代 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //调用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
实例:
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中闭锁解开,则表示已经找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!