编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

xxl job-时间轮

wxchong 2024-06-11 09:59:35 开源技术 24 ℃ 0 评论

背景:

项目中原先使用spring task来做定时任务,不知为啥,任务有好几次没有执行,但是没有日志也没有报警,当时没发现。。。后来还是其他同事发现的(大罪过[捂脸])

考虑到集群部署、日志、预警、手动触发、部署简单等原因,xxl-job映入了眼帘,一番操作后,成功上线。部署步骤,网上一大堆,这里就不再絮叨了。本着刨根问底的精神,翻了一遍源代码,发现许大大的代码中有太多精彩之处,今天我们着重看下调度器。

调度策略

  • JDK自带的java.util.Timer(TaskQueue)可实现简单的定时任务,它是一个基于最小堆实现的优先级队列。
  • SpringTask底层是基于 JDK 的 ScheduledThreadPoolExecutor 线程池(单线程)来实现的。
  • 时间轮

时间轮

时间轮出自Netty中的HashedWheelTimer,是一个环形结构,可以用时钟来类比,钟面上有很多bucket,每一个bucket上可以存放多个任务,使用一个List保存该时刻到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应bucket上所有到期的任务。任务通过取模决定应该放入哪个bucket。和HashMap的原理类似,newTask对应put,使用List来解决 Hash 冲突。

以上图为例,假设一个bucket是1秒,则指针转动一轮表示的时间段为8s,假设当前指针指向 0,此时需要调度一个3s后执行的任务,显然应该加入到(0+3=3)的方格中,指针再走3s次就可以执行了;如果任务要在10s后执行,应该等指针走完一轮零2格再执行,因此应放入2,同时将round(1)保存到任务中。检查到期任务时只执行round为0的,bucket上其他任务的round减1。

Netty 、Dubbo、ZooKeeper、Kafka、Caffeine 、Akka 中都有对时间轮的实现。

xxljob中的时间轮


xxljob 从v2.1.0版本之后策略由quartz改为时间轮,本文基于v2.1.0。

  • 存入时间轮
// 1、pre read
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
    // 2、push time-ring
    for (XxlJobInfo jobInfo: scheduleList) {

        // time-ring jump
        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
            // 2.1、trigger-expire > 5s:pass && make next-trigger-time
            logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

            // 1、misfire match
            MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
            if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                // FIRE_ONCE_NOW 》 trigger
                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
            }

            // 2、fresh next
            refreshNextValidTime(jobInfo, new Date());

        } else if (nowTime > jobInfo.getTriggerNextTime()) {
            // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

            // 1、trigger
            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

            // 2、fresh next
            refreshNextValidTime(jobInfo, new Date());

            // next-trigger-time in 5s, pre-read again
            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                // 1、make ring second
                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                // 2、push time ring加入时间轮
                pushTimeRing(ringSecond, jobInfo.getId());

                // 3、fresh next
                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
            }
        } else {
            // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

            // 1、make ring second
            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

            // 2、push time ring加入时间轮
            pushTimeRing(ringSecond, jobInfo.getId());

            // 3、fresh next
            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

        }
    }
  • 执行时间轮
public void run() {

    while (!ringThreadToStop) {

        // align second
        try {
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
        } catch (InterruptedException e) {
            if (!ringThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }

        try {
            // second data
            List<Integer> ringItemData = new ArrayList<>();
           // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);  
            for (int i = 0; i < 2; i++) {
                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                if (tmpData != null) {
                    ringItemData.addAll(tmpData);
                }
            }

            // ring trigger
            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
            if (ringItemData.size() > 0) {
                // do trigger
                for (int jobId: ringItemData) {
                    // do trigger
                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                }
                // clear
                ringItemData.clear();
            }
        } catch (Exception e) {
            if (!ringThreadToStop) {
                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
            }
        }
    }
    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
    }
});

总结

如图所言,xxljob利用线程每隔5s左右,将A、B、C区域触发的定时任务取出,A区域的定时任务根据过期策略判断是否执行,B区域的定时任务立即执行,C区域的任务根据key值(0-59)存入ringData

其实xxljob不算真正的时间轮,因为没有轮子。。哈哈

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表