我是靠谱客的博主 贪玩香氛,这篇文章主要介绍xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool,现在分享给大家,希望可以做个参考。

xxl-job源码解读:触发器线程池TriggerPool

本文基于xxl-job的2.3.1版本

基本说明

作为触发器调用的统一入口,为触发器的调用提供线程池异步处理,并根据触发时间进行线程池的区分。

在不进行源码改动的情况下,共有四个地方会调用触发器JobTriggerPoolHelper.trigger

  1. 调度器触发执行:由定时任务的触发器正常调度
  2. 页面手动触发执行:从任务信息页面,点击执行一次,手动触发执行
  3. 失败监听器触发执行:如果任务执行失败,并且任务设置了失败重试次数,会根据重试次数再次调用触发器执行
  4. 父任务成功触发执行:设置了父子任务的情况下,父任务成功后,会由XxlJobCompleter 触发调用子任务的触发器执行

触发器源码解读

JobTriggerPoolHelper 负责分配触发器线程池,并作为触发器调用的统一入口 。

类名全路径:com.xxl.job.admin.core.thread.JobTriggerPoolHelper

代码逻辑流程图

在这里插入图片描述

源码解读

调用入口方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/** * 为任务添加一个触发器 * <p>可用于立即执行一次</p> * * @param jobId 触发的任务ID * @param triggerType 添加的触发器类型 * @param failRetryCount >=0: use this param * <0: use param from job info config * @param executorShardingParam 分片执行参数 * @param executorParam null: use job param * not null: cover job param */ public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); }

线程池区分+超时判断

快慢线程池定义,区别在队列大小,可以通过配置修改最大线程数

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null; public void start() { fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode())); slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode())); }

分配线程池进行执行。一分钟为周期,进行超时任务统计,高频调用的并且触发执行时间长的会被转移到慢线程池。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// job timeout count private volatile long minTim = System.currentTimeMillis() / 60000; // ms > min private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>(); /** * add trigger */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // choose thread pool ThreadPoolExecutor triggerPool = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); // 同一个任务 一分钟内任务触发超500ms 十次,转入慢线程池处理 if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool = slowTriggerPool; } // trigger triggerPool.execute(() -> { long start = System.currentTimeMillis(); try { // do trigger XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // 检查时间循环,每整分钟清空一次超时触发集合 long minTimNow = System.currentTimeMillis() / 60000; if (minTim != minTimNow) { minTim = minTimNow; jobTimeoutCountMap.clear(); } // incr timeout-count-map long cost = System.currentTimeMillis() - start; if (cost > 500) { // ob-timeout threshold 500ms // 根据JobId进行统计, 任务触发超过500ms认为超时, 统计超时次数 AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } }); }

最后

以上就是贪玩香氛最近收集整理的关于xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool的全部内容,更多相关xxl-job源码解读内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(92)

评论列表共有 0 条评论

立即
投稿
返回
顶部