1. Classe de tâches d'exécution multi-thread
package com.visy.threadpool;

import com.visy.executor.ExecutorFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Configuration;

/**
 * Owns two thread pools obtained from {@link ExecutorFactory} (a main pool and a "child" pool)
 * and offers helpers that run batches of {@link Callable} tasks on them.
 */
@Configuration
public class ThreadPoolConfig {
    private TheadPoolProperties theadPoolProperties;
    private ThreadPoolExecutor executor;
    private ThreadPoolExecutor executorChild;

    /**
     * Creates both pools with the queue size, core thread count and maximum pool size
     * read from the given properties.
     *
     * @param theadPoolProperties pool sizing and timeout settings
     */
    public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) {
        this.theadPoolProperties = theadPoolProperties;
        this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor(
                "ThreadPoolConfig-service",
                theadPoolProperties.getQueueSize(),
                theadPoolProperties.getCoreThreadNum(),
                theadPoolProperties.getMaxPoolSize());
        this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor(
                "ThreadPoolConfig-service-child",
                theadPoolProperties.getQueueSize(),
                theadPoolProperties.getCoreThreadNum(),
                theadPoolProperties.getMaxPoolSize());
    }

    /**
     * Runs all tasks concurrently and collects their results.
     *
     * <p>When a non-null executor is passed as the first vararg, the tasks run on it without a
     * timeout; otherwise they run on the main pool with the configured timeout (seconds).
     * Results of tasks that failed or were cancelled are left out of the returned list.
     *
     * @param taskList       tasks to run
     * @param executorChilds optional override executor; only the first element is used
     * @param <T>            task result type
     * @return the collected results, or {@code null} when {@code taskList} is null or empty
     * @throws RuntimeException when the main pool's queue already holds at least the configured
     *                          number of entries
     */
    public <T> List<T> doConcurrentTask(List<Callable<T>> taskList, ThreadPoolExecutor... executorChilds) {
        if (taskList == null || taskList.isEmpty()) {
            return null;
        }

        int queueLimit = this.theadPoolProperties.getQueueSize();
        int queued = this.executor.getQueue().size();
        if (queued >= queueLimit) {
            // Report the configured limit rather than a hard-coded number.
            throw new RuntimeException("queue size bigger than " + queueLimit + ", now size is " + queued);
        }

        List<T> resultList = new ArrayList<>();
        List<Future<T>> futureList = null;
        try {
            boolean useOverride = executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null;
            if (useOverride) {
                futureList = executorChilds[0].invokeAll(taskList);
            } else {
                futureList = this.executor.invokeAll(
                        taskList, (long) this.theadPoolProperties.getTimeOut(), TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        this.doFutureList(resultList, futureList);
        return resultList;
    }

    /**
     * Appends the result of every future to {@code resultList}. Futures that failed, were
     * cancelled or were interrupted are reported on stderr and skipped.
     *
     * @param resultList destination list
     * @param futureList futures to drain; {@code null} is treated as "nothing to do"
     * @param <T>        result type
     */
    <T> void doFutureList(List<? super T> resultList, List<? extends Future<? extends T>> futureList) {
        if (futureList == null) {
            return;
        }
        for (Future<? extends T> future : futureList) {
            try {
                resultList.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException | CancellationException e) {
                // invokeAll with a timeout cancels unfinished tasks; get() then throws the
                // unchecked CancellationException, which must not abort the whole collection.
                e.printStackTrace();
            }
        }
    }

    /**
     * Submits every task to the main pool without waiting for or collecting results.
     *
     * @param taskList tasks to submit; null or empty is a no-op
     * @param <T>      task result type
     */
    public <T> void doVoidConcurrentTask(List<Callable<T>> taskList) {
        if (taskList == null || taskList.isEmpty()) {
            return;
        }
        for (Callable<T> call : taskList) {
            this.executor.submit(call);
        }
    }

    public TheadPoolProperties getTheadPoolProperties() {
        return this.theadPoolProperties;
    }

    public ThreadPoolExecutor getExecutor() {
        return this.executor;
    }

    public ThreadPoolExecutor getExecutorChild() {
        return this.executorChild;
    }

    public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) {
        this.theadPoolProperties = theadPoolProperties;
    }

    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    public void setExecutorChild(ThreadPoolExecutor executorChild) {
        this.executorChild = executorChild;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof ThreadPoolConfig)) {
            return false;
        }
        ThreadPoolConfig other = (ThreadPoolConfig) o;
        return other.canEqual(this)
                && Objects.equals(this.getTheadPoolProperties(), other.getTheadPoolProperties())
                && Objects.equals(this.getExecutor(), other.getExecutor())
                && Objects.equals(this.getExecutorChild(), other.getExecutorChild());
    }

    /** Lets subclasses veto equality with instances of this class. */
    protected boolean canEqual(Object other) {
        return other instanceof ThreadPoolConfig;
    }

    @Override
    public int hashCode() {
        // Same 59/43 scheme as the original generated code, so hash values are unchanged.
        int result = 1;
        Object properties = this.getTheadPoolProperties();
        result = result * 59 + (properties == null ? 43 : properties.hashCode());
        Object mainExecutor = this.getExecutor();
        result = result * 59 + (mainExecutor == null ? 43 : mainExecutor.hashCode());
        Object childExecutor = this.getExecutorChild();
        result = result * 59 + (childExecutor == null ? 43 : childExecutor.hashCode());
        return result;
    }

    @Override
    public String toString() {
        return "ThreadPoolConfig(theadPoolProperties=" + this.getTheadPoolProperties()
                + ", executor=" + this.getExecutor()
                + ", executorChild=" + this.getExecutorChild() + ")";
    }
}
2. Classe fabrique d'exécuteurs (ExecutorFactory)
package com.visy.executor; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ExecutorFactory { private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class); private static final MapthreadPoolExecutorMap = new ConcurrentHashMap(); private static final int DEFAULT_QUEUE_SIZE = 1000; private static final String DEFAULT_EXECUTOR_NAME = "default-executor"; private static final int MAX_THREAD_NUM = 100; private static final int CORE_THREAD_NUM = 1; private static volatile ExecutorFactory instance; private ExecutorFactory() { } public static ExecutorFactory getInstance() { if (instance == null) { Class var0 = ExecutorFactory.class; synchronized(ExecutorFactory.class) { if (instance == null) { instance = new ExecutorFactory(); } } } return instance; } public ThreadPoolExecutor getThreadPoolExecutorByName(String name) { return (ThreadPoolExecutor)threadPoolExecutorMap.get(name); } public static Map getThreadPoolExecutorMap() { return threadPoolExecutorMap; } public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) { if (StringUtils.isBlank(threadPoolExecutorName)) { throw new IllegalArgumentException("thread name empty"); } else { if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { Class var5 = ExecutorFactory.class; synchronized(ExecutorFactory.class) { if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { ThreadPoolExecutor executor = (new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, threadPoolExecutorName)).getExecutor(); threadPoolExecutorMap.put(threadPoolExecutorName, executor); logger.info("thread name: {} executor created", threadPoolExecutorName); } } } return 
(ThreadPoolExecutor)threadPoolExecutorMap.get(threadPoolExecutorName); } } public void submit(T t) { ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor(); defaultExecutor.submit(t); } public void submit(String poolName, T t) { ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName); if (executor == null) { logger.error("thread name: {} executor not exist.", poolName); throw new IllegalArgumentException("thread name:" + poolName + " executor not exist."); } else { executor.submit(t); } } public > Future
3. Classe de configuration multi-thread
package com.visy.threadpool;

import java.util.Objects;
import javax.validation.constraints.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;

/**
 * Thread-pool settings bound from configuration keys under the {@code visy.threadpool} prefix.
 */
@Validated
@Configuration
@ConfigurationProperties(prefix = "visy.threadpool")
public class TheadPoolProperties {
    // How long to wait before timing out when running parallel tasks (unit: seconds)
    @NotNull
    private Integer timeOut;

    // Queue size
    @NotNull
    private Integer queueSize;

    // Number of core threads
    @NotNull
    private Integer coreThreadNum;

    // Maximum number of threads in the pool
    @NotNull
    private Integer maxPoolSize;

    // Size of each group when executing in parallel
    private Integer groupSize = 20;

    public TheadPoolProperties() {
    }

    public Integer getTimeOut() {
        return this.timeOut;
    }

    public Integer getQueueSize() {
        return this.queueSize;
    }

    public Integer getCoreThreadNum() {
        return this.coreThreadNum;
    }

    public Integer getMaxPoolSize() {
        return this.maxPoolSize;
    }

    public Integer getGroupSize() {
        return this.groupSize;
    }

    public void setTimeOut(Integer timeOut) {
        this.timeOut = timeOut;
    }

    public void setQueueSize(Integer queueSize) {
        this.queueSize = queueSize;
    }

    public void setCoreThreadNum(Integer coreThreadNum) {
        this.coreThreadNum = coreThreadNum;
    }

    public void setMaxPoolSize(Integer maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public void setGroupSize(Integer groupSize) {
        this.groupSize = groupSize;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof TheadPoolProperties)) {
            return false;
        }
        TheadPoolProperties other = (TheadPoolProperties) o;
        return other.canEqual(this)
                && Objects.equals(this.getTimeOut(), other.getTimeOut())
                && Objects.equals(this.getQueueSize(), other.getQueueSize())
                && Objects.equals(this.getCoreThreadNum(), other.getCoreThreadNum())
                && Objects.equals(this.getMaxPoolSize(), other.getMaxPoolSize())
                && Objects.equals(this.getGroupSize(), other.getGroupSize());
    }

    /** Lets subclasses veto equality with instances of this class. */
    protected boolean canEqual(Object other) {
        return other instanceof TheadPoolProperties;
    }

    @Override
    public int hashCode() {
        // Same 59/43 scheme as the original generated code, so hash values are unchanged.
        int result = 1;
        result = result * 59 + hashOrDefault(this.getTimeOut());
        result = result * 59 + hashOrDefault(this.getQueueSize());
        result = result * 59 + hashOrDefault(this.getCoreThreadNum());
        result = result * 59 + hashOrDefault(this.getMaxPoolSize());
        result = result * 59 + hashOrDefault(this.getGroupSize());
        return result;
    }

    /** Hash of {@code value}, or 43 when it is null. */
    private static int hashOrDefault(Object value) {
        return value == null ? 43 : value.hashCode();
    }

    @Override
    public String toString() {
        return "TheadPoolProperties(timeOut=" + this.getTimeOut()
                + ", queueSize=" + this.getQueueSize()
                + ", coreThreadNum=" + this.getCoreThreadNum()
                + ", maxPoolSize=" + this.getMaxPoolSize()
                + ", groupSize=" + this.getGroupSize() + ")";
    }
}
4. Classe utilitaire de découpage de listes (ArraySplitUtil)
package com.visy.utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.Attributes;

/**
 * Splits a list or an array into consecutive groups of a given size, so that a large data set
 * can be processed one batch at a time.
 *
 * @param <T> element type
 */
public class ArraySplitUtil<T> {

    /**
     * Splits a list into consecutive groups of at most {@code splitSize} elements.
     *
     * <p>When the list is shorter than {@code splitSize} the list itself is the only group;
     * otherwise the groups are {@link List#subList} views of the input.
     *
     * @param list      list to split
     * @param splitSize maximum size of each group; must be positive
     * @return the groups in order, or {@code null} when {@code list} is null or empty
     * @throws IllegalArgumentException when {@code splitSize} is not positive
     */
    public List<List<T>> splistList(List<T> list, int splitSize) {
        if (null == list || list.isEmpty()) {
            return null;
        }
        requirePositive(splitSize);

        int total = list.size();
        List<List<T>> groups = new ArrayList<>();
        if (total < splitSize) {
            groups.add(list);
            return groups;
        }
        for (int start = 0; start < total; start += splitSize) {
            // The last group takes whatever is left over.
            int end = start + Math.min(splitSize, total - start);
            groups.add(list.subList(start, end));
        }
        return groups;
    }

    /**
     * Splits an array into consecutive groups of at most {@code splitSize} elements.
     *
     * <p>When the array is shorter than {@code splitSize} the array itself is the only group;
     * otherwise each group is a fresh copy of its range.
     *
     * @param array     array to split
     * @param splitSize maximum size of each group; must be positive
     * @return the groups in order, or {@code null} when {@code array} is null
     * @throws IllegalArgumentException when {@code splitSize} is not positive
     */
    public List<T[]> splistArray(T[] array, int splitSize) {
        if (null == array) {
            return null;
        }
        requirePositive(splitSize);

        int total = array.length;
        List<T[]> groups = new ArrayList<>();
        if (total < splitSize) {
            groups.add(array);
            return groups;
        }
        for (int start = 0; start < total; start += splitSize) {
            int end = start + Math.min(splitSize, total - start);
            groups.add(Arrays.copyOfRange(array, start, end));
        }
        return groups;
    }

    /**
     * Creates a new splitter.
     *
     * @param <T> element type
     * @return a new instance
     */
    public static <T> ArraySplitUtil<T> build() {
        return new ArraySplitUtil<>();
    }

    /** Rejects group sizes that would divide by zero or silently produce no groups. */
    private static void requirePositive(int splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive, got " + splitSize);
        }
    }
}
5. Classe de l'assistant multitâche (MultiTaskHelper) et son utilisation :
package com.visy.helper;

import com.baomidou.mybatisplus.toolkit.CollectionUtils;
import com.google.common.collect.Lists;
import com.visy.utils.ArraySplitUtil;
import com.visy.threadpool.ThreadPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Multi-task helper: splits an input list into groups and processes the groups concurrently
 * through {@link ThreadPoolConfig#doConcurrentTask}.
 *
 * @author visy.wang
 * @date 2022/5/9 14:38
 */
@Service
public class MultiTaskHelper {
    @Autowired
    private ThreadPoolConfig threadPoolConfig;

    /**
     * Applies {@code handler} to every element, processing the groups concurrently.
     *
     * @param list    input data
     * @param handler per-element transformation
     * @param <T>     input element type
     * @param <R>     output element type
     * @return one result list per group; empty when {@code list} is null or empty
     */
    public <T, R> List<List<R>> createAndRunListTask(List<T> list, Function<T, R> handler) {
        return createAndRunListTask(list, null, handler);
    }

    /**
     * Applies {@code handler} to every group as a whole, processing the groups concurrently.
     *
     * @param list    input data
     * @param handler per-group transformation
     * @param <T>     input element type
     * @param <R>     output element type
     * @return one result list per group; empty when {@code list} is null or empty
     */
    public <T, R> List<List<R>> createAndRunListTaskV2(List<T> list, Function<List<T>, List<R>> handler) {
        return createAndRunListTask(list, handler, null);
    }

    /**
     * Feeds every element to {@code handler}, processing the groups concurrently.
     *
     * @param list    input data
     * @param handler per-element action
     * @param <T>     input element type
     */
    public <T> void createAndRunListTaskWithoutReturn(List<T> list, Consumer<T> handler) {
        createAndRunListTaskWithoutReturn(list, null, handler);
    }

    /**
     * Feeds every group as a whole to {@code handler}, processing the groups concurrently.
     *
     * @param list    input data
     * @param handler per-group action
     * @param <T>     input element type
     */
    public <T> void createAndRunListTaskWithoutReturnV2(List<T> list, Consumer<List<T>> handler) {
        createAndRunListTaskWithoutReturn(list, handler, null);
    }

    /**
     * Splits the list into groups of the configured group size.
     *
     * @param list list to split
     * @return the groups (as returned by {@code ArraySplitUtil#splistList})
     */
    @SuppressWarnings("unchecked")
    private <T> List<List<T>> listSplit(List<T> list) {
        int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize();
        // The splitter holds no state, so a fresh instance replaces the former per-class cache,
        // which needed list.get(0) and therefore failed on a null first element.
        ArraySplitUtil<T> splitter = ArraySplitUtil.build();
        return splitter.splistList(list, groupSize);
    }

    /**
     * Creates and runs one task per group.
     *
     * @param list     input data
     * @param handler1 per-group handler (takes precedence)
     * @param handler2 per-element handler
     * @param <T>      input element type
     * @param <R>      output element type
     * @return one result list per group; empty when {@code list} is null or empty
     */
    private <T, R> List<List<R>> createAndRunListTask(
            List<T> list, Function<List<T>, List<R>> handler1, Function<T, R> handler2) {
        if (list == null || list.isEmpty()) {
            // Nothing to split or schedule.
            return Lists.newArrayList();
        }
        List<Callable<List<R>>> taskList = listSplit(list).stream()
                .filter(CollectionUtils::isNotEmpty)
                .map(subList -> groupTask(subList, handler1, handler2))
                .collect(Collectors.toList());
        return threadPoolConfig.doConcurrentTask(taskList);
    }

    /** Builds the task for one group: handler1 if present, else handler2 per element, else null. */
    private static <T, R> Callable<List<R>> groupTask(
            List<T> subList, Function<List<T>, List<R>> handler1, Function<T, R> handler2) {
        return () -> {
            if (Objects.nonNull(handler1)) {
                return handler1.apply(subList);
            }
            if (Objects.nonNull(handler2)) {
                return subList.stream().map(handler2).collect(Collectors.toList());
            }
            return null;
        };
    }

    /**
     * Creates and runs one task per group, discarding results.
     *
     * @param list     input data
     * @param handler1 per-group handler (takes precedence)
     * @param handler2 per-element handler
     * @param <T>      input element type
     */
    private <T> void createAndRunListTaskWithoutReturn(
            List<T> list, Consumer<List<T>> handler1, Consumer<T> handler2) {
        if (list == null || list.isEmpty()) {
            return;
        }
        List<Callable<Void>> taskList = listSplit(list).stream()
                .filter(CollectionUtils::isNotEmpty)
                .map(subList -> groupAction(subList, handler1, handler2))
                .collect(Collectors.toList());
        threadPoolConfig.doConcurrentTask(taskList);
    }

    /** Builds the result-less task for one group: handler1 if present, else handler2 per element. */
    private static <T> Callable<Void> groupAction(
            List<T> subList, Consumer<List<T>> handler1, Consumer<T> handler2) {
        return () -> {
            if (Objects.nonNull(handler1)) {
                handler1.accept(subList);
            } else if (Objects.nonNull(handler2)) {
                subList.forEach(handler2);
            }
            return null;
        };
    }
}
Ce qui précède est le contenu détaillé de cet article sur l'exécution de tâches multi-thread en Java. Pour plus d'informations, consultez les autres articles connexes sur le site PHP chinois !