JAVA线程池监控以及动态调解线程池
关于线程池的底子知识和源码见 Java线程池源码分析
1 配景
Java线程池源码分析 里固然先容了线程池的焦点设置(焦点线程数、最大线程数和队列巨细)该怎样设置,但是现实上业界也没有一个同一的标准。固然有些所谓的"公式",但是差别的业务场景复杂多变,设置原则也不尽雷同。从现实履向来看,IO麋集型、CPU麋集型应用在线程设置上就比力悬殊,因此没有一个通用的得当全部场景的公式。
那么我们换一种思绪,就是既然不能明确设置,那么能不能支持动态设置呢?答案是肯定的,由于线程池本身就支持焦点线程数和最大线程数的修改,而且是及时生效的。 通常在生产环境中,我们可以及时监控线程池的运行状态,随时把握应用服务的性能状态,以便在系统资源告急时及时告警,动态调解线程设置,须要时进行人工到场,排查题目,线上修复。
也就是说,通过及时监控,然后动态修改。
2 监控
我们知道,线程池使用不妥也会使服务器资源枯竭,导致非常环境的发生,比如固定线程池的壅闭队列使命数量过多、缓存线程池创建的线程过多导致内存溢出、系统假死等题目。因此,我们须要一种简单的监控方案来监控线程池的使用环境,比如完成使命数量、未完成使命数量、线程巨细等信息。
线程池的监控分为2种范例,一种是在实行使命前后全量统计使命列队时间和实行时间,别的一种是通过定时使命,定时获取生动线程数,队列中的使命数,焦点线程数,最大线程数等数据。
2.1 MonitoredThreadPoolStatisticsExecutor全量统计
参数名称分析poolName线程池的名称timeout预设的使命超时时间阈值taskTimeoutFlag是否记录使命超时次数execTimeout使命实行超时时间阈值taskExecTimeoutFlag是否记录使命实行超时次数waitInQueueTimeout使命在队列中等候的时间阈值taskWaitInQueueTimeoutFlag是否记录使命等候时间超时次数queueSizeWarningPercent使命队列使用率告警阈值queueSizeWarningFlag是否进行队列容量告警queueSizeHasWarningFlag是否须要队列容量告警(队列是否曾经到达过预警值)taskTotalTime使命总时长,以使命提交时间进行计时,单元 mstaskTotalExecTime使命总实行时长,以使命开始实行进行计时,单元 msminTaskTime最短使命时长,以提交时间计时,单元 msmaxTaskTime最长使命时长,以提交时间计时,单元 mstaskTimeoutCount使命超时次数,以使命提交进行计时taskExecTimeoutCount使命实行超时次数,以使命开始实行时间进行计时taskWaitInQueueTimeoutCount使命等候时间高出设定的阈值的次数minTaskExecTime最短使命时长,以实行时间计时,单元 msmaxTaskExecTime最长使命时长,以实行时间计时,单元 msactiveCount线程池中正在实行使命的线程数量completedTaskCount线程池已完成的使命数量,该值小于即是taskCountcorePoolSize线程池的焦点线程数量largestPoolSize线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是到达了maximumPoolSizemaximumPoolSize线程池的最大线程数量poolSize线程池当前的线程数量taskCount线程池已经实行的和未实行的使命总数为了简化,代码中的监控数据都是通过日记打印,现实中是通过kafka网络,然后做出可视化监控。
/** * 自界说可监控的线程池 */ public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean { /** * 线程池的名称 */ private String poolName; /** * 预设的使命超时时间阈值,用于统计功能。 * 以使命提交时间进行计时,单元 ms,大于0则记录超时次数。 */ private long timeout = 120000l; /** * 是否记录使命超时次数 */ private boolean taskTimeoutFlag = false; /** * 使命实行超时时间阈值,用于统计功能。 * 以使命开始实行进行计时,单元 ms,大于 0 则记录使命实行超时次数。 */ private long execTimeout = 120000l; /** * 是否记录使命实行超时次数 */ private boolean taskExecTimeoutFlag = false; /** * 使命在队列中等候的时间阈值,用于统计功能。 * 以使命提交时间开始计时到开始实举动止,单元 ms。 */ private long waitInQueueTimeout = 60000l; /** * 是否记录使命等候时间超时次数 */ private boolean taskWaitInQueueTimeoutFlag = false; /** * 使命队列使用率告警阈值 */ private int queueSizeWarningPercent = 80; /** * 是否进行队列容量告警 */ private boolean queueSizeWarningFlag = false; /** * 是否须要队列容量告警(队列是否曾经到达过预警值) */ private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false); /** * 使命总时长,用于统计功能。以使命提交时间进行计时,单元 ms */ private AtomicLong taskTotalTime = new AtomicLong(0); /** * 使命总实行时长,用于统计功能。以使命开始实行进行计时,单元 ms */ private AtomicLong taskTotalExecTime = new AtomicLong(0); /** * 最短使命时长,以提交时间计时,单元 ms */ private long minTaskTime = Long.MAX_VALUE; /** * 最长使命时长,以提交时间计时,单元 ms */ private long maxTaskTime = 0; /** * 使命超时次数,以使命提交进行计时 */ private AtomicLong taskTimeoutCount = new AtomicLong(0); /** * 使命实行超时次数,以使命开始实行时间进行计时 */ private AtomicLong taskExecTimeoutCount = new AtomicLong(0); /** * 使命等候时间高出设定的阈值的次数 */ private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0); /** * 最短使命时长,以实行时间计时,单元 ms */ private long minTaskExecTime = Long.MAX_VALUE; /** * 最长使命时长,以实行时间计时,单元 ms */ private long maxTaskExecTime = 0; /** * 生存使命信息 */ private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>(); public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler); this.poolName = poolName; this.timeout = timeout; this.execTimeout = execTimeout; this.waitInQueueTimeout = waitInQueueTimeout; this.queueSizeWarningPercent = queueSizeWarningPercent; if (this.timeout > 0) { this.taskTimeoutFlag = true; } if (this.execTimeout > 0) { this.taskExecTimeoutFlag = true; } if (this.waitInQueueTimeout > 0) { this.taskWaitInQueueTimeoutFlag = true; } if (this.queueSizeWarningPercent > 0) { this.queueSizeWarningFlag = true; } ThreadPoolMonitor.monitor(this); } public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName)); this.poolName = poolName; this.timeout = timeout; this.execTimeout = execTimeout; this.waitInQueueTimeout = waitInQueueTimeout; this.queueSizeWarningPercent = queueSizeWarningPercent; if (this.timeout > 0) { this.taskTimeoutFlag = true; } if (this.execTimeout > 0) { this.taskExecTimeoutFlag = true; } if (this.waitInQueueTimeout > 0) { this.taskWaitInQueueTimeoutFlag = true; } if (this.queueSizeWarningPercent > 0) { this.queueSizeWarningFlag = true; } ThreadPoolMonitor.monitor(this); } @Override public void execute(Runnable command) { this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics()); if (this.queueSizeWarningFlag) { float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity()); BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP); int usedPercent = bd.multiply(new BigDecimal(100)).intValue(); if (usedPercent > this.queueSizeWarningPercent) { this.queueSizeHasWarningFlag.set(true); System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity()); } } super.execute(command); } @Override protected void beforeExecute(Thread t, Runnable r) { TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode())); if (null != taskStatistics) { taskStatistics.setStartExecTime(System.currentTimeMillis()); } super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { //重写此方法做一些统计功能 long endTime = System.currentTimeMillis(); TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode())); if (null != taskStatistics) { long taskTotalTime = endTime - taskStatistics.getCommitTime(); long taskExecTime = endTime - taskStatistics.getStartExecTime(); long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime(); this.taskTotalTime.addAndGet(taskTotalTime); this.taskTotalExecTime.addAndGet(taskExecTime); if (this.minTaskTime > taskTotalTime) { this.minTaskTime = taskTotalTime; } if (this.maxTaskTime < taskTotalTime) { this.maxTaskTime = taskTotalTime; } if (this.taskTimeoutFlag && taskTotalTime > this.timeout) { this.taskTimeoutCount.incrementAndGet(); } if (this.minTaskExecTime > taskExecTime) { this.minTaskExecTime = taskExecTime; } if (this.maxTaskExecTime < taskExecTime) { this.maxTaskExecTime = taskExecTime; } if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) { this.taskExecTimeoutCount.incrementAndGet(); } if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) { this.taskWaitInQueueTimeoutCount.incrementAndGet(); } System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]"); // 初始线程数、焦点线程数、正在实行的使命数量、 // 已完成使命数量、使命总数、队列里缓存的使命数量、池中存在的最大线程数、 // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否停止 LOGGER.info("{}-pool-monitor: " + " PoolSize: {}, CorePoolSize: {}, Active: {}, " + "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " + "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); } super.afterExecute(r, t); } /** * Spring容器管理线程池的生命周期,线程池Bean烧毁之前先关闭 * @throws Exception */ @Override public void destroy() throws Exception { shutdown(); } /** * 线程池耽误关闭时(等候线程池里的使命都实行完毕),统计线程池环境 */ @Override public void shutdown() { // 统计已实行使命、正在实行使命、未实行使命数量 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); } /** * 线程池立即关闭时,统计线程池环境 */ @Override public List<Runnable> shutdownNow() { // 统计已实行使命、正在实行使命、未实行使命数量 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); } /** * 使命匀称时长,无已完成使命时,返回 0 */ public long getTaskAvgTime() { if (this.getCompletedTaskCount() > 0) { return this.getTaskTotalTime().get() / this.getCompletedTaskCount(); } return 0; } /** * 使命匀称实行时长,无已完成使命时,返回 0 */ public long getTaskAvgExecTime() { if (this.getCompletedTaskCount() > 0) { return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount(); } return 0; } //省略setter/getter方法} public class TaskStatistics { /** * 使命提交时间 */ private long commitTime; /** * 使命开始实行时间 */ private long startExecTime; public TaskStatistics() { this.commitTime = System.currentTimeMillis(); }}方法寄义shutdown()线程池耽误关闭时(等候线程池里的使命都实行完毕),统计已实行使命、正在实行使命、未实行使命数量shutdownNow()线程池立即关闭时,统计已实行使命、正在实行使命、未实行使命数量beforeExecute(Thread t, Runnable r)使命实行之前,记录使命开始时间,startTimes这个HashMap以使命的hashCode为key,开始时间为值afterExecute(Runnable r, Throwable t)使命实行之后,盘算使命竣事时间。统计使命耗时、初始线程数、焦点线程数、正在实行的使命数量、已完成使命数量、使命总数、队列里缓存的使命数量、池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否停止信息留意事项:
- 在 afterExecute 方法中须要留意,须要调用 ConcurrentHashMap 的 remove 方法移除并返回使命的开始时间信息,而不是调用 get 方法,由于在高并发环境下,线程池里要实行的使命很多,假如只获取值不移除的话,会使 ConcurrentHashMap 越来越大,引发内存走漏或溢出题目。
2.2 定时收罗
public class ThreadPoolMonitor { private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>(); private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor")); private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L; public ThreadPoolMonitor() { } public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) { if (threadPoolExecutor instanceof MonitoredThreadPoolStatisticsExecutor) { throw new IllegalArgumentException("MonitoredThreadPoolStatisticsExecutor is already monitored."); } else { monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS); } } public static void remove(String name) { ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name); if (futureWrapper != null) { futureWrapper.future.cancel(false); } } public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) { ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name); if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) { POOL_TASK_FUTURE_MAP.remove(name, futureWrapper); futureWrapper.future.cancel(false); } } static void monitor(MonitoredThreadPoolStatisticsExecutor threadPoolExecutor) { monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS); } private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) { PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name); POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> { if (v == null) { return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor); } else { throw new IllegalStateException("duplicate pool name: " + name); } }); } static { Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown)); } static class FutureWrapper { private final Future<?> future; private final ThreadPoolExecutor threadPoolExecutor; public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) { this.future = future; this.threadPoolExecutor = threadPoolExecutor; } }}public class PoolMonitorTask implements Runnable { private final ThreadPoolExecutor monitoredThreadPool; private final String poolName; private volatile long lastTaskCount = 0L; public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) { this.monitoredThreadPool = monitoredThreadPool; this.poolName = poolName; } @Override public void run() { int activeCount = this.monitoredThreadPool.getActiveCount(); int corePoolSize = this.monitoredThreadPool.getCorePoolSize(); int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize(); int queueTaskSize = this.monitoredThreadPool.getQueue().size(); long taskCount = this.monitoredThreadPool.getTaskCount(); int executedTask = (int) (taskCount - this.lastTaskCount); log.info("线程池名称 = {}, 生动线程数峰值 = {}, 队列使命数峰值 = {}, 焦点线程数 = {}, 最大线程数 = {}, 实行的使命总数 = {}", this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask); this.lastTaskCount = taskCount; if (this.monitoredThreadPool.isTerminated()) { ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool); } }}2.3 可视化
通过Kafka获取到监控数据后,可以做一个可视化页面,比如可以展示下面这这些数据
- active/coreSize :运动线程数和焦点线程数的比值, 此中active = executor.getActiveCount(),表现全部运行中的工作线程的数量,这个比值反应线程池的线程生动状态,假如不停维持在一个很低的程度,则分析线程池须要进行缩容;假如长时间维持一个很大的数值,分析生动度好,线程池使用率高。
- active/maxSize :运动线程数和最大线程数的比值,这个值可以配合上面的 active/coreSize 来看,当active/coreSize大于100%的时间,假如active/maxSize维持在一个较低的值,则分析当前线程池的负载偏低,假如大于60%大概更高,则分析线程池过载,须要及时调解线程池容量设置。
- completedTaskCount:实行完毕的工作线程的总数,包含汗青全部。
- largestPoolSize:汗青上线程池容量触达过的最大值
- rejectCount:被拒绝的线程的数量,假如大量线程被拒绝,则分析当前线程池已经溢出了,须要及时调解线程池设置
- queueSize:队列中工作线程的数量,假如大量的线程池在列队,分析coreSize已经不敷用了,可以根据现实环境来调解,对于实行时间要求很严格的业务场景,大概须要通过提升coreSize来淘汰列队环境。
3 动态调解线程池
设置线程池的巨细可根据以下几个维度进行分析来设置公道的线程数:
使命性质可分为:CPU麋集型使命,IO麋集型使命,肴杂型使命,使命的实行时长,使命是否有依靠——依靠其他系统资源,如数据库毗连等。
1、CPU麋集型使命 只管使用较小的线程池,一样寻常为CPU核数+1。 由于CPU麋集型使命使得CPU使用率很高,若开过多的线程数,只能增长上下文切换的次数,因此会带来额外的开销。
2、IO麋集型使命 可以使用稍大的线程池,一样寻常为2*CPU核数+1。 由于IO使用不占用CPU,不要让CPU闲下来,应加大线程数量,因此可以让CPU在等候IO的时间去处理处罚别的使命,充实使用CPU时间。
3、肴杂型使命 可以将使命分成IO麋集型和CPU麋集型使命,然后分别用差别的线程池去处理处罚。 只要分完之后两个使命的实行时间相差不大,那么就会比串行实行来的高效。 由于假如分别之后两个使命实行时间相差甚远,那么先实行完的使命就要等后实行完的使命,终极的时间仍然取决于后实行完的使命,而且还要加上使命拆分与归并的开销,得不偿失
4、依靠其他资源 如某个使命依靠数据库的毗连返回的效果,这时间等候的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才华更好的使用CPU。
总之,线程等候时间所占比例越高,须要越多线程。线程CPU时间所占比例越高,须要越少线程。
但是实践发现,只管我们颠末审慎的评估,仍然不可以或许包管一次盘算出来符合的参数,那么我们是否可以将修改线程池参数的资源降下来,如许至少可以发生故障的时间可以快速调解从而紧缩故障规复的时间呢? 基于这个思考,我们是否可以将线程池的参数从代码中迁徙到分布式设置中心上,实现线程池参数可动态设置和即时生效,线程池参数动态化前后的参数修改流程对比如下:
3.1 修改参数
现实应用中重要有下列参数可以支持动态修改。
线程池参数分析corePoolSize焦点线程数maximumPoolSize最大线程数queueCapacity等候队列巨细timeout使命超时时间告警阈值execTimeout使命实行超时时间告警阈值queuedTaskWarningSize等候队列列队数量告警阈值checkInterval线程池定时监控时间隔断autoExtend是否主动扩容此中的corePoolSize、maximumPoolSize都可以使用ThreadPoolExecutor提供的api实现: public void setCorePoolSize(int corePoolSize) public void setMaximumPoolSize(int maximumPoolSize)
从ThreadPoolExecutor源码中可知,
设置新的焦点线程数时, 假如设置的新值小于当前值,多余的现有线程将在下一次空闲时停止,假如新设置的corePoolSize值更大,将在须要时启动新线程来实行任何列队的使命;
设置新的最大线程数时,假如新值小于当前值,多余的现有线程将在下一次空闲时停止。
ThreadPoolExecutor没有提供直接修改等候队列巨细的api。这就须要我们自界说一个可以修改容量的队列。其实很简单,只要把jdk原生的队列中的容量设置为可以修改,并提供修改方法即可。 比如把jdk中的LinkedBlockingQueue拷贝一份,定名为CapacityResizableLinkedBlockingQueue。 将其capacity的属性变为可变的,并提供set方法:
/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;将上述原生代码改为:private volatile int capacity;public void setCapacity(int capacity) { this.capacity = capacity;}3.2 设置监听
可以通过设置中心的动态加载来处理处罚,以Apollo为例,我们可以使用Apollo的ChangeListener来实现对设置变更的监听,(假如是MySQL,可以修改完设置后直接同过HTTP接口关照客户端进行设置革新),代码片断如下:
public class ThreadPoolConfigUpdateListener { @Value("${apollo.bootstrap.namespaces:application}") private String namespace; @Autowired private DynamicThreadPoolFacade dynamicThreadPoolManager; @Autowired private DynamicThreadPoolProperties poolProperties; @PostConstruct public void init() { initConfigUpdateListener(); } public void initConfigUpdateListener() { String apolloNamespace = namespace; if (StringUtils.hasText(poolProperties.getApolloNamespace())) { apolloNamespace = poolProperties.getApolloNamespace(); } String finalApolloNamespace = apolloNamespace; Config config = ConfigService.getConfig(finalApolloNamespace); config.addChangeListener(changeEvent -> { try { Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000); } catch (InterruptedException e) { log.error("设置革新非常",e); } dynamicThreadPoolManager.refreshThreadPoolExecutor(); log.info("线程池设置有变革,革新完成"); }); }}线程池设置的革新的逻辑如下:
public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) { try { dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> { ThreadPoolMonitor threadPoolExecutor =ThreadPoolMonitor.POOL_TASK_FUTURE_MAP.get(poolProperties.getThreadPoolName()).getThreadPoolExecutor(); executor.setCorePoolSize(poolProperties.getCorePoolSize()); executor.setMaxPoolSize(poolProperties.getMaximumPoolSize()); executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime()); }); }catch(Exception e){ log.error("Executor 参数设置非常",e); } }3.4 配景管理
固然可以通过管理配景来动态修改,如下图,参数可生存在mysql
末了,一个简单的线程池监控以及动态修改架构如下图:
4 线程池怎样保持有序/串行
线程池固然是无序的,但是假如我们须要像kafka那样分区有序怎么办呢?
思绪:创建多个只有一个线程的线程池,然后按照差别key 在 差别线程池上实行,如许key就相当于kafka的分区,只要key雷同,那么提交的使命就会有序实行。
demo如下
类似的功能在git有大神开源了,可以去参考一下 https://github.com/PhantomThief/more-lambdas-java |