xxl-job Executor Source Code Analysis (1. What the Executor Does at Startup)

wxchong 2024-06-11 09:59:05

1. Overview

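The previous article covered deploying the xxl-job scheduling platform and how an application system (a settlement system) integrates with it; see "Integrating the distributed task scheduling system xxl-job with Spring Boot (scheduler and executor)". So why is a small amount of configuration enough for an application to join and use the xxl-job scheduling platform?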

2. Main Text

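The application adds the xxl-job dependency, the xxl-job configuration properties, and a configuration class, XxlJobConfig.java. The main job of that class is to register the XxlJobSpringExecutor bean, so let's look at what this bean actually does: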

@Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

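XxlJobSpringExecutor source code

The class implements the SmartInitializingSingleton interface, so once all non-lazy singleton beans have been instantiated, Spring calls back its afterSingletonsInstantiated() method.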

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);

    @Override
    public void afterSingletonsInstantiated() {
        // 1. Scan all methods annotated with @XxlJob and register them in jobHandlerRepository (ConcurrentMap<String, IJobHandler>)
        initJobHandlerMethodRepository(applicationContext);
        // 2. type = 1 selects SpringGlueFactory (the Spring-aware glue factory)
        GlueFactory.refreshInstance(1);
        try {
            // 3. Call the parent class's start method: XxlJobExecutor.start()
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void destroy() {
        super.destroy();
    }

    // 1. Scan all methods annotated with @XxlJob and register them in jobHandlerRepository (ConcurrentMap<String, IJobHandler>)
    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        // Get the names of all beans
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {
            // Fetch each bean in turn
            Object bean = applicationContext.getBean(beanDefinitionName);
            Map<Method, XxlJob> annotatedMethods = null;
            try {
                // Inspect each bean and collect its methods annotated with @XxlJob
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods == null || annotatedMethods.isEmpty()) {
                continue;
            }
            // Iterate over all methods annotated with @XxlJob
            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                // Register each method annotated with @XxlJob
                registJobHandler(xxlJob, bean, executeMethod);
            }
        }
    }

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        XxlJobSpringExecutor.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
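
The scanning step can be illustrated without Spring. The following is a minimal sketch using plain JDK reflection; `DemoJob`, `DemoJobBean` and `JobScanSketch` are hypothetical names made up for this illustration (they stand in for `@XxlJob`, a job bean and the repository), and the real implementation relies on Spring's MethodIntrospector as shown above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @XxlJob; not the real annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoJob {
    String value();
}

// Hypothetical bean with one annotated job method and one plain method.
class DemoJobBean {
    @DemoJob("settlementJob")
    public String settle() {
        return "settled";
    }

    public String helper() {
        return "not a job";
    }
}

class JobScanSketch {
    // Inspect every declared method of the bean and collect the annotated ones
    // into a name -> method map (the role jobHandlerRepository plays above).
    static Map<String, Method> scan(Object bean) {
        Map<String, Method> repository = new ConcurrentHashMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue; // not a job method
            }
            if (repository.putIfAbsent(job.value(), method) != null) {
                throw new IllegalStateException("job name conflict: " + job.value());
            }
        }
        return repository;
    }

    // Invoke a registered job method by name, wrapping reflection failures.
    static Object invoke(Map<String, Method> repository, String name, Object bean) {
        try {
            return repository.get(name).invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoJobBean bean = new DemoJobBean();
        Map<String, Method> repository = scan(bean);
        System.out.println(invoke(repository, "settlementJob", bean)); // prints "settled"
    }
}
```

Only `settle()` ends up in the map; `helper()` is skipped because it carries no annotation.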

The XxlJobExecutor#registJobHandler method

protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod) {
        if (xxlJob == null) {
            return;
        }
        // Job handler name
        String name = xxlJob.value();
        Class<?> clazz = bean.getClass();
        String methodName = executeMethod.getName();
        if (name.trim().length() == 0) {
            throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
        }
        if (loadJobHandler(name) != null) {
            throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
        }
        executeMethod.setAccessible(true);
        // Look up the optional init and destroy methods named in the annotation
        Method initMethod = null;
        Method destroyMethod = null;
        if (xxlJob.init().trim().length() > 0) {
            try {
                initMethod = clazz.getDeclaredMethod(xxlJob.init());
                initMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
            }
        }
        if (xxlJob.destroy().trim().length() > 0) {
            try {
                destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
                destroyMethod.setAccessible(true);
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
            }
        }
        // Register the job handler, which simply stores each job in ConcurrentMap<String, IJobHandler> jobHandlerRepository
        registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
    }


    private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    // Register the job handler, which simply stores each job in jobHandlerRepository
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }
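
To make the init/destroy lookup more concrete, here is a rough sketch of a handler that resolves its methods by name through `getDeclaredMethod` and `setAccessible(true)`, as the code above does. `LifecycleJobBean` and `MethodHandlerSketch` are hypothetical names. `runOnce()` calls init, execute and destroy back to back purely for demonstration; when xxl-job actually invokes init and destroy is not covered in this article.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical job bean; the method names are illustrative only.
class LifecycleJobBean {
    final List<String> calls = new ArrayList<>();

    private void setUp() { calls.add("init"); }

    public void run() { calls.add("execute"); }

    private void tearDown() { calls.add("destroy"); }
}

class MethodHandlerSketch {
    private final Object target;
    private final Method execute;
    private final Method init;    // null when no init method is configured
    private final Method destroy; // null when no destroy method is configured

    MethodHandlerSketch(Object target, String executeName, String initName, String destroyName) {
        this.target = target;
        this.execute = lookup(target.getClass(), executeName);
        this.init = optional(target.getClass(), initName);
        this.destroy = optional(target.getClass(), destroyName);
    }

    // A blank name means "not configured", mirroring the trim().length() > 0 checks.
    private static Method optional(Class<?> clazz, String name) {
        return (name == null || name.trim().isEmpty()) ? null : lookup(clazz, name);
    }

    private static Method lookup(Class<?> clazz, String name) {
        try {
            Method method = clazz.getDeclaredMethod(name);
            method.setAccessible(true); // allows private init/destroy methods
            return method;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("method not found: " + clazz.getName() + "#" + name, e);
        }
    }

    // Demonstration only: run the whole lifecycle once, in order.
    void runOnce() {
        try {
            if (init != null) {
                init.invoke(target);
            }
            execute.invoke(target);
            if (destroy != null) {
                destroy.invoke(target);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Failing fast in the constructor matches the behaviour above: a misspelled init or destroy name surfaces at startup instead of at the first trigger.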

XxlJobExecutor.start()

public void start() throws Exception {

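        // Initialize the log path; in our setup it is configured as D:\\tmp\\log
        XxlJobFileAppender.initLogPath(logPath);

        // Initialize the AdminBizClient instances and store them in adminBizList
        initAdminBizList(adminAddresses, accessToken);

        // Start the log cleanup thread, which cleans up periodically
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // Start the callback and retry threads
        TriggerCallbackThread.getInstance().start();

        // Open a port with Netty to wait for calls from the admin server, and start the registry thread (heartbeat interval: 30 seconds)
        initEmbedServer(address, ip, port, appname, accessToken);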
    }

XxlJobFileAppender.initLogPath (initialize the log path)

public static void initLogPath(String logPath) {
		if (logPath != null && logPath.trim().length() > 0) {
			logBasePath = logPath;
		}
		File logPathDir = new File(logBasePath);
		// Create the directory if it does not exist
		if (!logPathDir.exists()) {
			logPathDir.mkdirs();
		}
		logBasePath = logPathDir.getPath();
		File glueBaseDir = new File(logPathDir, "gluesource");
		if (!glueBaseDir.exists()) {
			glueBaseDir.mkdirs();
		}
		glueSrcPath = glueBaseDir.getPath();
	}
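
For comparison, the same directory layout (a base log directory with a `gluesource` subdirectory) can be created with java.nio. This is only an alternative sketch, not how xxl-job does it; `LogPathSketch` is a hypothetical name. One difference worth noting: `File.mkdirs()` reports failure through a return value, which the code above ignores, whereas `Files.createDirectories` throws on failure.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class LogPathSketch {
    // Creates <base> and <base>/gluesource if they are missing and
    // returns the glue source directory.
    static Path initLogPath(Path base) {
        try {
            Path glueDir = base.resolve("gluesource");
            // Creates missing parents too; does nothing if the directories exist.
            Files.createDirectories(glueDir);
            return glueDir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```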

initAdminBizList (initialize the AdminBizClient instances and store them in adminBizList)

private static List<AdminBiz> adminBizList;

private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
  // Multiple admin addresses are separated by commas; for each one, create an AdminBizClient and add it to adminBizList
  if (adminAddresses != null && adminAddresses.trim().length() > 0) {
            for (String address : adminAddresses.trim().split(",")) {
                if (address != null && address.trim().length() > 0) {
                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }
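
The address handling boils down to "split on commas, trim, skip blanks". A small sketch of just that parsing step follows; `AdminAddressSketch` is a hypothetical name and the sketch collects plain strings instead of AdminBizClient objects. Unlike the code above, which leaves adminBizList as null when no address is configured, this sketch returns an empty list.

```java
import java.util.ArrayList;
import java.util.List;

class AdminAddressSketch {
    // Split a comma-separated address string, dropping blanks and surrounding spaces.
    static List<String> parse(String adminAddresses) {
        List<String> result = new ArrayList<>();
        if (adminAddresses == null || adminAddresses.trim().isEmpty()) {
            return result;
        }
        for (String address : adminAddresses.trim().split(",")) {
            if (!address.trim().isEmpty()) {
                result.add(address.trim());
            }
        }
        return result;
    }
}
```

The null-versus-empty distinction matters in the original code, because the callback and registry threads check `getAdminBizList() == null` before starting.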

JobLogFileCleanThread.getInstance().start(logRetentionDays) (start the log cleanup thread, which cleans up periodically)

public void start(final long logRetentionDays) {
        // Logs are kept for at least 3 days; if the value is below 3, the cleanup thread is not started
        if (logRetentionDays < 3) {
            return;
        }
        localThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // List all entries under the log path
                        File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                        if (childDirs != null && childDirs.length > 0) {
                            // Get today's date, truncated to midnight
                            Calendar todayCal = Calendar.getInstance();
                            todayCal.set(Calendar.HOUR_OF_DAY, 0);
                            todayCal.set(Calendar.MINUTE, 0);
                            todayCal.set(Calendar.SECOND, 0);
                            todayCal.set(Calendar.MILLISECOND, 0);
                            Date todayDate = todayCal.getTime();
                            // Iterate over the subdirectories
                            for (File childFile : childDirs) {
                                if (!childFile.isDirectory()) {
                                    continue;
                                }
                                // Skip directories whose name contains no '-'; log directories are named by date
                                if (childFile.getName().indexOf("-") == -1) {
                                    continue;
                                }
                                Date logFileCreateDate = null;
                                try {
                                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                    logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                                } catch (ParseException e) {
                                    logger.error(e.getMessage(), e);
                                }
                                if (logFileCreateDate == null) {
                                    continue;
                                }
                                // Delete the directory once it has reached the retention period
                                if ((todayDate.getTime() - logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)) {
                                    FileUtil.deleteRecursively(childFile);
                                }
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        // Sleep for 1 day
                        TimeUnit.DAYS.sleep(1);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");
            }
        });
        localThread.setDaemon(true);
        localThread.setName("xxl-job, executor JobLogFileCleanThread");
        localThread.start();
    }
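
The retention rule itself (a date-named directory is deleted once it is at least logRetentionDays old) can be expressed compactly with java.time. This is a sketch of the decision only, under the assumption that directory names follow yyyy-MM-dd; `LogRetentionSketch` is a hypothetical name, and `LocalDate.parse` is stricter about the format than the `SimpleDateFormat` used above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

class LogRetentionSketch {
    // Returns true when a directory named like "2024-06-01" has reached the retention period.
    static boolean isExpired(String dirName, LocalDate today, int retentionDays) {
        LocalDate created;
        try {
            created = LocalDate.parse(dirName); // ISO format, i.e. yyyy-MM-dd
        } catch (DateTimeParseException e) {
            return false; // not a date-named directory (e.g. "gluesource"), keep it
        }
        return ChronoUnit.DAYS.between(created, today) >= retentionDays;
    }
}
```

For example, with a retention of 7 days and today being 2024-06-11, a directory named 2024-06-04 is exactly 7 days old and is therefore treated as expired, matching the `>=` comparison in the original.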

TriggerCallbackThread.getInstance().start() (start the callback and retry threads)

public void start() {
        // Skip if no admin server address has been configured
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
            return;
        }
        triggerCallbackThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // normal callback
                while (!toStop) {
                    try {
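                        // take() removes and returns the head of the queue, blocking while the queue is empty
                        // Combining take() with drainTo() keeps the efficiency of draining in batches while adding blocking behavior
                        // With no data the loop does not spin, which avoids high CPU usage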
                        HandleCallbackParam callback = getInstance().callBackQueue.take();
                        if (callback != null) {
                            // Assemble the callback parameters
                            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                            // drainTo moves all remaining elements into the list in one call; drainTo does not block
                            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                            callbackParamList.add(callback);
                            // Perform the callback; a failed callback is retried later
                            if (callbackParamList != null && callbackParamList.size() > 0) {
                                doCallback(callbackParamList);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                try {
                    // Reaching this point means the while loop above has exited, i.e. toStop is true
                    // The queue may still hold unprocessed items, so drain everything with drainTo and process it one last time
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    if (callbackParamList != null && callbackParamList.size() > 0) {
                        doCallback(callbackParamList);
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");
            }
        });
        triggerCallbackThread.setDaemon(true);
        triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
        triggerCallbackThread.start();

        // Retry thread
        triggerRetryCallbackThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        retryFailCallbackFile();
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        // The retry thread for failed callbacks runs once every 30s
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
            }
        });
        triggerRetryCallbackThread.setDaemon(true);
        triggerRetryCallbackThread.start();
    }
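
The take() + drainTo() idiom used by the callback thread is worth isolating. Below is a minimal sketch with a generic helper; `BatchDrainSketch` and `nextBatch` are hypothetical names. One deliberate difference: the original appends the element returned by take() after draining, so it ends up last in the list, while this sketch adds it first to preserve queue order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BatchDrainSketch {
    // Block until at least one element is available, then grab everything
    // else that is already queued without blocking again.
    static <T> List<T> nextBatch(BlockingQueue<T> queue) {
        try {
            T first = queue.take();    // blocks while the queue is empty
            List<T> batch = new ArrayList<>();
            batch.add(first);
            queue.drainTo(batch);      // non-blocking, moves all remaining elements
            return batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting for a batch", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        System.out.println(nextBatch(queue)); // prints "[a, b, c]"
    }
}
```

A consumer loop built on `nextBatch` sleeps inside take() when idle and processes bursts in a single call, which is the effect the comments above describe.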

doCallback (the callback method)

private void doCallback(List<HandleCallbackParam> callbackParamList) {
        boolean callbackRet = false;
        // Iterate over all admin servers: try the first one, and fall back to the next one if it fails
        for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
                if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                    // Success flag
                    callbackRet = true;
                    break;
                } else {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
                }
            } catch (Exception e) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
            }
        }
        // If every admin server failed, retry later
        if (!callbackRet) {
            // The parameters are written to a file under callbacklog, where the retry thread picks them up
            appendFailCallbackFile(callbackParamList);
        }
    }

private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList) {
        if (callbackParamList == null || callbackParamList.size() == 0) {
            return;
        }
        // JDK serialization: Object --> byte[]
        byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
        // Stored under the log path as: /callbacklog/xxl-job-callback-{x}.log
        File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
        if (callbackLogFile.exists()) {
            for (int i = 0; i < 100; i++) {
                callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i))));
                if (!callbackLogFile.exists()) {
                    break;
                }
            }
        }
        FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
    }
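
The control flow of doCallback is a simple failover: try each admin in order, stop at the first success, and park the batch for retry if none succeeds. A rough sketch follows, with each admin modelled as a hypothetical `Predicate` and the failure files replaced by an in-memory list; `CallbackFailoverSketch` and all of its names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class CallbackFailoverSketch {
    // Try every admin in order; the first success wins. If all fail, the batch
    // is stored in retryStore (standing in for the failed-callback file).
    static boolean callback(List<Predicate<List<String>>> admins,
                            List<String> batch,
                            List<List<String>> retryStore) {
        for (Predicate<List<String>> admin : admins) {
            try {
                if (admin.test(batch)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // An unreachable admin counts as a failure; move on to the next one.
            }
        }
        retryStore.add(batch);
        return false;
    }

    public static void main(String[] args) {
        List<Predicate<List<String>>> admins = new ArrayList<>();
        admins.add(batch -> { throw new IllegalStateException("admin-1 is down"); });
        admins.add(batch -> true);
        List<List<String>> retryStore = new ArrayList<>();
        System.out.println(callback(admins, Arrays.asList("log-1"), retryStore)); // prints "true"
    }
}
```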

retryFailCallbackFile (the retry method for failed callbacks)

private void retryFailCallbackFile() {
        File callbackLogPath = new File(failCallbackFilePath);
        if (!callbackLogPath.exists()) {
            return;
        }
        if (callbackLogPath.isFile()) {
            callbackLogPath.delete();
        }
        if (!(callbackLogPath.isDirectory() && callbackLogPath.list() != null && callbackLogPath.list().length > 0)) {
            return;
        }
        for (File callbaclLogFile : callbackLogPath.listFiles()) {
            // Read the content of the failed-callback file
            byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
            if (callbackParamList_bytes == null || callbackParamList_bytes.length < 1) {
                callbaclLogFile.delete();
                continue;
            }
            // Deserialize into callbackParamList
            List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
            callbaclLogFile.delete();
            // Run the callback again, using the same doCallback method as above
            doCallback(callbackParamList);
        }
    }
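
The persist-then-retry round trip can be sketched with plain JDK serialization. `FailCallbackFileSketch` is a hypothetical name and strings stand in for HandleCallbackParam; the helpers JdkSerializeTool and FileUtil are replaced here by object streams from java.io. As in the code above, the file is deleted once it has been read, so a retry that fails again has to write a fresh file.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class FailCallbackFileSketch {
    // Serialize the batch into the given file (created or overwritten).
    static void write(Path file, ArrayList<String> batch) {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(batch);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the batch back and delete the file, as the retry method does.
    @SuppressWarnings("unchecked")
    static List<String> readAndDelete(Path file) {
        try {
            List<String> batch;
            try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
                batch = (List<String>) in.readObject();
            }
            Files.delete(file);
            return batch;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```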

initEmbedServer(address, ip, port, appname, accessToken) (initialize the executor server)

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
        // Use the configured port if it is positive; otherwise look for an available port, starting from 9999
        port = port > 0 ? port : NetUtil.findAvailablePort(9999);
        ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
        // generate address
        if (address == null || address.trim().length() == 0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }
        // accessToken
        if (accessToken == null || accessToken.trim().length() == 0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }
        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }
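
The address fallback is easy to miss in the code above: an explicitly configured address wins, and only when it is blank is a URL built from ip and port. A tiny sketch of that rule follows; `ExecutorAddressSketch` is a hypothetical name, and joining ip and port with a colon is an assumption about what IpUtil.getIpPort returns.

```java
class ExecutorAddressSketch {
    // Prefer the configured address; otherwise derive http://ip:port/.
    static String resolve(String address, String ip, int port) {
        if (address != null && !address.trim().isEmpty()) {
            return address;
        }
        // Assumption: ip and port are joined as "ip:port".
        return "http://" + ip + ":" + port + "/";
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "10.0.0.1", 9999)); // prints "http://10.0.0.1:9999/"
    }
}
```

The resolved address is what the executor later registers with the admin server, so in containerized or NAT setups the explicit address setting is the one to reach for.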

EmbedServer#start

public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // Create two event loop groups: one accepts client connections, the other handles client reads and writes
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                // Create the business thread pool
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                            }
                        },
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                            }
                        });


                try {
                    // Create the Netty bootstrap and wire up the components (thread model, channel type, client handler, bound port)
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
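                                    // Configure the handler pipeline
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))
                                            // Decode incoming bytes into HTTP request objects (and encode responses)
                                            .addLast(new HttpServerCodec())
                                            // Aggregate into a FullHttpRequest (up to 5 MB)
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))
                                            // Add the custom handler (HTTP protocol)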
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // Bind the port
                    ChannelFuture future = bootstrap.bind(port).sync();
                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                    // Start registration (create and start the executor registry thread: ExecutorRegistryThread)
                    startRegistry(appname, address);
                    // wait until stop
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                    } else {
                        logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                    }
                } finally {
                    // stop
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }
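
The business thread pool above combines corePoolSize 0, a bounded queue, and a rejection handler that throws. With a standard ThreadPoolExecutor, extra threads beyond the first are only created once the queue is full, and a task is rejected only when both the queue and the thread limit are exhausted. The sketch below shrinks the numbers (1 thread, queue of 1) to make the rejection observable; `BoundedPoolSketch` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Same shape as the pool above, with configurable limits.
    static ThreadPoolExecutor newPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                (task, pool) -> {
                    throw new IllegalStateException("bizThreadPool is EXHAUSTED!");
                });
    }

    // Occupies the single worker, fills the queue, then submits one more task.
    // Returns true if that last task was rejected.
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = newPool(1, 1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await(); // keep the only worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();           // the worker is now running task 1
            pool.execute(() -> { });   // task 2 waits in the queue
            try {
                pool.execute(() -> { }); // task 3: queue full, no thread left
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

With the real settings (200 threads, queue of 2000), the same rule means a rejection needs 2000 queued requests plus 200 busy threads at the same moment.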

startRegistry (create and start the executor registry thread: ExecutorRegistryThread)

public void startRegistry(final String appname, final String address) {
        // Create and start the executor registry thread: ExecutorRegistryThread
        ExecutorRegistryThread.getInstance().start(appname, address);
}

public void start(final String appname, final String address) {
        // appname must not be empty
        if (appname == null || appname.trim().length() == 0) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
            return;
        }
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
            return;
        }
        registryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        logger.info(">>>>>>>>>>> executor registration started........................");
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
                            try {
                                // Call AdminBizClient's registry method to register
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        if (!toStop) {
                            // Register again every 30s (heartbeat)
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                        }
                    }
                }

                try {
                    // Reaching this point means toStop = true, so the registration must be removed
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
                        try {
                            // Call AdminBizClient's registryRemove method to remove the registration
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                            }
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
            }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();
    }
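
Stripped of logging and failover, the registry thread is a heartbeat loop: register, sleep, repeat until a stop flag is set, then deregister once. A minimal sketch of that shape follows; `HeartbeatSketch` is a hypothetical name, counters stand in for the adminBiz.registry and adminBiz.registryRemove calls, and the `stopAndJoin()` method (set the flag, interrupt, join) is an assumption, since the stop path is not shown in this article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {
    private volatile boolean toStop = false;
    private final Thread thread;
    private final CountDownLatch firstBeat = new CountDownLatch(1);
    final AtomicInteger beats = new AtomicInteger();                // number of "registry" calls
    final AtomicBoolean removed = new AtomicBoolean(false);         // set by the "registryRemove" step

    HeartbeatSketch(long intervalMillis) {
        thread = new Thread(() -> {
            while (!toStop) {
                beats.incrementAndGet();   // stands in for adminBiz.registry(...)
                firstBeat.countDown();
                try {
                    TimeUnit.MILLISECONDS.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Woken up by stopAndJoin(); the loop condition decides what happens next.
                }
            }
            removed.set(true);             // stands in for adminBiz.registryRemove(...)
        }, "heartbeat-sketch");
        thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    // Wait until the first heartbeat has been sent, or the timeout elapses.
    boolean awaitFirstBeat(long timeoutMillis) {
        try {
            return firstBeat.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Assumed stop path: set the flag, cut the sleep short, wait for deregistration.
    void stopAndJoin() {
        toStop = true;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Interrupting the thread matters here: without it, a stop request could wait up to a full heartbeat interval (30 seconds in the real code) before the deregistration runs.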

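3. Summary

When the executor starts, it:

  • Scans the project for all methods annotated with @XxlJob and registers them in jobHandlerRepository (ConcurrentMap<String, IJobHandler>)
  • Initializes the log path
  • Initializes the AdminBizClient instances and stores them in adminBizList
  • Starts the log cleanup thread, which cleans up periodically
  • Starts the callback and retry threads
  • Opens a port with Netty to wait for calls from the admin server, and starts the registry thread (heartbeat interval: 30 seconds)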
