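I. Overview
The previous article covered deploying the xxl-job scheduling platform and connecting an application (a settlement system) to it; see: "Integrating the distributed task scheduling system xxl-job with Spring Boot (scheduler and executor)". Why is a little simple configuration enough for an application to join and use the xxl-job scheduling platform?
II. Main Text
The application adds the xxl-job dependency, the xxl-job configuration properties, and one configuration class, XxlJobConfig.java. That class mainly registers a single bean, XxlJobSpringExecutor, so let's look at what this bean actually does: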
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
XxlJobSpringExecutor source code
The class implements the SmartInitializingSingleton interface, so once the singleton beans have been instantiated, Spring calls back afterSingletonsInstantiated():
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
@Override
public void afterSingletonsInstantiated() {
// 1. Scan every method annotated with @XxlJob and register it in jobHandlerRepository (a ConcurrentMap<String, IJobHandler>)
initJobHandlerMethodRepository(applicationContext);
// 2. type = 1 means SpringGlueFactory is used (the Spring framework)
GlueFactory.refreshInstance(1);
try {
// 3. Call the parent class's start method: XxlJobExecutor.start()
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void destroy() {
super.destroy();
}
// 1. Scan every method annotated with @XxlJob and register it in jobHandlerRepository (a ConcurrentMap<String, IJobHandler>)
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// Get the names of all beans
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
// Fetch each bean in turn
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null;
try {
// Inspect each bean for methods annotated with @XxlJob
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods == null || annotatedMethods.isEmpty()) {
continue;
}
// Iterate over every method annotated with @XxlJob
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
// Register each method annotated with @XxlJob
registJobHandler(xxlJob, bean, executeMethod);
}
}
}
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
XxlJobSpringExecutor.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}
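To make the scanning step concrete, here is a minimal sketch of the same idea using plain reflection. It is not the xxl-job implementation: the annotation `DemoJob`, the bean `SettlementJobs`, and the map `REPOSITORY` are hypothetical stand-ins for `@XxlJob`, an application bean, and `jobHandlerRepository`. The real code goes through Spring's `MethodIntrospector` and `AnnotatedElementUtils`, whereas this sketch only looks at methods declared directly on the class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HandlerScanSketch {

    /** Hypothetical stand-in for @XxlJob; not the real annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DemoJob {
        String value();
    }

    /** Hypothetical bean: one job method and one ordinary method. */
    public static class SettlementJobs {
        @DemoJob("settleJob")
        public void settle() { }

        public void helper() { }
    }

    /** Handler name -> method, playing the role of jobHandlerRepository. */
    public static final Map<String, Method> REPOSITORY = new ConcurrentHashMap<>();

    public static void scan(List<?> beans) {
        for (Object bean : beans) {
            for (Method method : bean.getClass().getDeclaredMethods()) {
                DemoJob job = method.getAnnotation(DemoJob.class);
                if (job == null) {
                    continue; // not a job method
                }
                String name = job.value().trim();
                if (name.isEmpty()) {
                    throw new IllegalStateException("job name invalid for " + method);
                }
                // putIfAbsent returns the previous mapping, so non-null means a duplicate name
                if (REPOSITORY.putIfAbsent(name, method) != null) {
                    throw new IllegalStateException("job name conflict: " + name);
                }
            }
        }
    }

    public static void main(String[] args) {
        scan(Arrays.asList(new SettlementJobs()));
        System.out.println(REPOSITORY.keySet());
    }
}
```

Only `settle()` ends up in the map; `helper()` is ignored because it carries no annotation.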
The XxlJobExecutor#registJobHandler method
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod) {
if (xxlJob == null) {
return;
}
// Name of the job handler
String name = xxlJob.value();
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
executeMethod.setAccessible(true);
// Resolve the optional init method and destroy method
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
// Register the job: each job handler is simply stored in ConcurrentMap<String, IJobHandler> jobHandlerRepository
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
// Register the job: each job handler is simply stored in jobHandlerRepository
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
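The registration code above wraps the bean and its reflected methods in a handler object. The sketch below illustrates that wrapping under simplifying assumptions: `ReflectiveHandler`, `DemoBean`, and `build` are hypothetical names, the methods take no arguments, and reflection failures are rethrown as unchecked exceptions. It is meant only to show how optional init and destroy methods can be resolved by name and invoked later.

```java
import java.lang.reflect.Method;

public class MethodHandlerSketch {

    /** Hypothetical bean that records the order in which its methods run. */
    public static class DemoBean {
        public final StringBuilder trace = new StringBuilder();
        public void setUp() { trace.append("init;"); }
        public void run() { trace.append("execute;"); }
        public void tearDown() { trace.append("destroy;"); }
    }

    /** Simplified stand-in for a method-based job handler. */
    public static class ReflectiveHandler {
        private final Object target;
        private final Method execute;
        private final Method init;    // may be null
        private final Method destroy; // may be null

        public ReflectiveHandler(Object target, Method execute, Method init, Method destroy) {
            this.target = target;
            this.execute = execute;
            this.init = init;
            this.destroy = destroy;
        }

        public void init() { invoke(init); }
        public void execute() { invoke(execute); }
        public void destroy() { invoke(destroy); }

        private void invoke(Method method) {
            if (method == null) {
                return; // optional lifecycle method was not configured
            }
            try {
                method.invoke(target);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static ReflectiveHandler build(Object bean, String executeName, String initName, String destroyName) {
        Class<?> clazz = bean.getClass();
        Method execute = lookup(clazz, executeName);
        if (execute == null) {
            throw new IllegalArgumentException("execute method name is required");
        }
        return new ReflectiveHandler(bean, execute, lookup(clazz, initName), lookup(clazz, destroyName));
    }

    /** Returns null for a blank name, mirroring the "only if configured" checks above. */
    private static Method lookup(Class<?> clazz, String name) {
        if (name == null || name.trim().isEmpty()) {
            return null;
        }
        try {
            Method method = clazz.getDeclaredMethod(name.trim());
            method.setAccessible(true);
            return method;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + clazz.getName() + "#" + name, e);
        }
    }
}
```

Calling `build(bean, "run", "setUp", "tearDown")` and then `init()`, `execute()`, `destroy()` invokes the three bean methods in that order; passing a blank init or destroy name simply makes the corresponding call a no-op.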
XxlJobExecutor.start()
public void start() throws Exception {
// Initialize the log path; in our setup it is configured as D:\\tmp\\log
XxlJobFileAppender.initLogPath(logPath);
// Create the AdminBizClient instances and store them in adminBizList
initAdminBizList(adminAddresses, accessToken);
// Start the log-cleanup thread, which purges old logs periodically
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// Start the callback and retry threads
TriggerCallbackThread.getInstance().start();
// Open a port with netty to wait for calls from the admin server, and start the registry thread (heartbeat interval: 30 seconds)
initEmbedServer(address, ip, port, appname, accessToken);
}
XxlJobFileAppender.initLogPath (initializing the log path)
public static void initLogPath(String logPath) {
if (logPath != null && logPath.trim().length() > 0) {
logBasePath = logPath;
}
File logPathDir = new File(logBasePath);
// Create the directory if it does not exist
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}
initAdminBizList (creating the AdminBizClient instances and storing them in adminBizList)
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
// Multiple admin addresses are separated by commas; create one AdminBizClient per address and add it to adminBizList
if (adminAddresses != null && adminAddresses.trim().length() > 0) {
for (String address : adminAddresses.trim().split(",")) {
if (address != null && address.trim().length() > 0) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
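The parsing rule in this method (trim, split on commas, skip blank entries) can be sketched on its own. The class and method names below are hypothetical and the sketch returns plain strings instead of `AdminBizClient` objects. Note one deliberate difference: the excerpt above leaves `adminBizList` as `null` when no address is configured, which is why the later threads check `getAdminBizList() == null`, while this sketch returns an empty list.

```java
import java.util.ArrayList;
import java.util.List;

public class AdminAddressSketch {

    /** Splits a comma-separated address string, dropping blank entries and surrounding whitespace. */
    public static List<String> parse(String adminAddresses) {
        List<String> result = new ArrayList<>();
        if (adminAddresses == null || adminAddresses.trim().isEmpty()) {
            return result;
        }
        for (String address : adminAddresses.trim().split(",")) {
            if (!address.trim().isEmpty()) {
                result.add(address.trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example addresses are made up for illustration.
        System.out.println(parse(" http://admin-a:8080/xxl-job-admin, ,http://admin-b:8080/xxl-job-admin "));
    }
}
```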
JobLogFileCleanThread.getInstance().start(logRetentionDays) (starting the log-cleanup thread, which purges old logs periodically)
public void start(final long logRetentionDays) {
// Logs are kept for at least 3 days; if the value is below 3, the cleanup thread is not started
if (logRetentionDays < 3) {
return;
}
localThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// List all entries under the log path
File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
if (childDirs != null && childDirs.length > 0) {
// Get today's date (truncated to midnight)
Calendar todayCal = Calendar.getInstance();
todayCal.set(Calendar.HOUR_OF_DAY, 0);
todayCal.set(Calendar.MINUTE, 0);
todayCal.set(Calendar.SECOND, 0);
todayCal.set(Calendar.MILLISECOND, 0);
Date todayDate = todayCal.getTime();
// Iterate over the subdirectories
for (File childFile : childDirs) {
if (!childFile.isDirectory()) {
continue;
}
// Skip directories whose name contains no '-'; log directories are named by date
if (childFile.getName().indexOf("-") == -1) {
continue;
}
Date logFileCreateDate = null;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
logFileCreateDate = simpleDateFormat.parse(childFile.getName());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}
// Delete the directory once it has reached the retention limit
if ((todayDate.getTime() - logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)) {
FileUtil.deleteRecursively(childFile);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
// Sleep for one day
TimeUnit.DAYS.sleep(1);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");
}
});
localThread.setDaemon(true);
localThread.setName("xxl-job, executor JobLogFileCleanThread");
localThread.start();
}
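The retention decision inside the loop can be isolated into a small pure function. The sketch below is an illustration using `java.time` rather than the `Calendar` and `SimpleDateFormat` calls shown above, so it is a swapped-in technique and not the library's code. The class and method names are hypothetical. One behavioural difference to be aware of: `LocalDate.parse` is strict about the `yyyy-MM-dd` form, while `SimpleDateFormat` is lenient by default.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

public class LogRetentionSketch {

    /**
     * Decides whether a date-named log directory has reached the retention limit.
     * Names without '-' or names that are not valid ISO dates are never treated as expired.
     */
    public static boolean isExpired(String dirName, LocalDate today, long retentionDays) {
        if (dirName.indexOf('-') == -1) {
            return false;
        }
        LocalDate created;
        try {
            created = LocalDate.parse(dirName); // strict ISO format: yyyy-MM-dd
        } catch (DateTimeParseException e) {
            return false;
        }
        return ChronoUnit.DAYS.between(created, today) >= retentionDays;
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2024, 1, 31);
        System.out.println(isExpired("2024-01-01", today, 30));
        System.out.println(isExpired("2024-01-02", today, 30));
    }
}
```

Keeping the check free of file-system access makes the boundary case easy to reason about: a directory exactly `retentionDays` old is already deleted, matching the `>=` comparison in the excerpt.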
TriggerCallbackThread.getInstance().start() (starting the callback and retry threads)
public void start() {
// Skip if no admin server address is configured
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
triggerCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
// normal callback
while (!toStop) {
try {
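// take() removes and returns the head of the queue, blocking while the queue is empty
// Combining take() with drainTo() keeps the efficiency of drainTo's batch processing while still blocking when idle,
// so the loop does not spin when there is no data, which avoids high CPU usage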
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// Assemble the callback parameters
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
// drainTo() moves all remaining elements into the list in one call and never blocks
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// Perform the callback; if it fails, it will be retried
if (callbackParamList != null && callbackParamList.size() > 0) {
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
try {
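// Reaching this point means the while loop above has exited, i.e. toStop is true
// The queue may still hold unprocessed items, so take them all out with drainTo() and process them one last time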
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList != null && callbackParamList.size() > 0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");
}
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();
// Retry thread
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
retryFailCallbackFile();
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
// The failed-callback retry thread runs once every 30 seconds
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
}
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();
}
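The take() plus drainTo() idiom used by the callback thread is worth seeing in isolation. The following is a minimal sketch with a hypothetical helper `nextBatch`, not the xxl-job code. Unlike the excerpt, which appends the head element after draining, this version puts it first so the batch keeps queue order; the interrupt is rethrown as an unchecked exception purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchDrainSketch {

    /** Blocks until at least one element is available, then returns it together with everything else queued. */
    public static <T> List<T> nextBatch(BlockingQueue<T> queue) {
        try {
            T first = queue.take();          // blocks while the queue is empty, so no busy loop
            List<T> batch = new ArrayList<>();
            batch.add(first);
            queue.drainTo(batch);            // non-blocking: grabs whatever else is already queued
            return batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for work", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("callback-1");
        queue.add("callback-2");
        queue.add("callback-3");
        System.out.println(nextBatch(queue));
    }
}
```

A consumer loop built on `nextBatch` sleeps inside `take()` when idle and handles bursts in a single batch when busy, which is the trade-off the comments in the excerpt describe.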
doCallback (the callback method)
private void doCallback(List<HandleCallbackParam> callbackParamList) {
boolean callbackRet = false;
// Iterate over all admin servers: the first one is tried first, and the next one is used only if it fails
for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
// Success flag
callbackRet = true;
break;
} else {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
}
}
// Retry on failure
if (!callbackRet) {
// On failure the parameters are written to a callbacklog file, where they wait to be retried
appendFailCallbackFile(callbackParamList);
}
}
private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList) {
if (callbackParamList == null || callbackParamList.size() == 0) {
return;
}
// JDK serialization: Object --> byte[]
byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
// The file is stored under the log path as /callbacklog/xxl-job-callback-{x}.log
File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
if (callbackLogFile.exists()) {
for (int i = 0; i < 100; i++) {
callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i))));
if (!callbackLogFile.exists()) {
break;
}
}
}
FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
}
retryFailCallbackFile (the retry method for failed callbacks)
private void retryFailCallbackFile() {
File callbackLogPath = new File(failCallbackFilePath);
if (!callbackLogPath.exists()) {
return;
}
if (callbackLogPath.isFile()) {
callbackLogPath.delete();
}
if (!(callbackLogPath.isDirectory() && callbackLogPath.list() != null && callbackLogPath.list().length > 0)) {
return;
}
for (File callbaclLogFile : callbackLogPath.listFiles()) {
// Read the contents of the failed-callback file
byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
if (callbackParamList_bytes == null || callbackParamList_bytes.length < 1) {
callbaclLogFile.delete();
continue;
}
// Deserialize back into callbackParamList
List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
callbaclLogFile.delete();
// Run the callback method, the same one as above
doCallback(callbackParamList);
}
}
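The persist-then-replay mechanism shown in appendFailCallbackFile and retryFailCallbackFile can be sketched with nothing but the JDK. This is an illustration under assumptions: the class name, the file naming, and the use of `List<String>` as the payload are hypothetical, and `ObjectOutputStream`/`ObjectInputStream` are used directly in place of the `JdkSerializeTool` and `FileUtil` helpers.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class FailedCallbackFileSketch {

    /** Serializes one failed batch into its own file inside dir and returns that file. */
    public static Path save(Path dir, List<String> batch) {
        try {
            Files.createDirectories(dir);
            Path file = dir.resolve("callback-" + System.nanoTime() + ".log");
            try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
                out.writeObject(new ArrayList<>(batch)); // ArrayList is Serializable
            }
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads every stored batch back, deleting each file once it has been read. */
    @SuppressWarnings("unchecked")
    public static List<List<String>> drain(Path dir) {
        List<List<String>> batches = new ArrayList<>();
        if (!Files.isDirectory(dir)) {
            return batches;
        }
        try {
            // Collect the paths first so files are not deleted while the directory is being iterated
            List<Path> files = new ArrayList<>();
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
                for (Path file : stream) {
                    files.add(file);
                }
            }
            for (Path file : files) {
                List<String> batch;
                try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
                    batch = (List<String>) in.readObject();
                }
                Files.delete(file);
                batches.add(batch);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return batches;
    }
}
```

As in the excerpt, the file is deleted before the batch is handed back. In the original flow the batch then goes through doCallback again, which writes a fresh file if the callback still fails, so nothing is lost between attempts.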
initEmbedServer(address, ip, port, appname, accessToken) (initializing the executor server)
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// If no port is configured, look for an available one, starting from the default 9999
port = port > 0 ? port : NetUtil.findAvailablePort(9999);
ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
// generate address
if (address == null || address.trim().length() == 0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (accessToken == null || accessToken.trim().length() == 0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
EmbedServer#start
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// Create two event loop groups: one accepts client connections, the other handles client reads and writes
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// Create the business thread pool
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// Create the netty bootstrap and wire the components together (thread model, channel type, child handler, bound port)
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
// Configure the handler pipeline
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))
// Decode into HttpRequest
.addLast(new HttpServerCodec())
// Aggregate into FullHttpRequest
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
// Add the custom handler (HTTP protocol)
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind the port
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// Start registration (create and start the executor registry thread: ExecutorRegistryThread)
startRegistry(appname, address);
// wait until stop
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
} finally {
// stop
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
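The business thread pool above uses a core size of 0, a bounded queue, and a rejection handler that throws. A consequence of how `ThreadPoolExecutor` works is that extra threads are created only once the queue is full, and a task is rejected only when both the queue and the thread limit are exhausted. The sketch below demonstrates that with tiny limits (one thread, one queue slot) in place of the 200 threads and 2000 slots in the excerpt. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {

    /** Same shape as the pool in the excerpt, with configurable limits. */
    public static ThreadPoolExecutor newPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                r -> new Thread(r, "sketch-bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RuntimeException("sketch bizThreadPool is EXHAUSTED!");
                });
    }

    /** Returns true if a third task is rejected while one task is running and one is queued. */
    public static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = newPool(1, 1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                await(release);      // keep the only worker thread busy
            });
            await(started);          // the worker has taken task 1, so the queue is empty again
            pool.execute(() -> { }); // task 2 fills the single queue slot
            try {
                pool.execute(() -> { }); // task 3: queue full and thread limit reached
                return false;
            } catch (RuntimeException e) {
                return true;         // thrown by the rejection handler
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```

With the real limits, this means the executor tolerates a sizeable backlog before it starts refusing trigger requests, and the exception then surfaces on the thread that submitted the task.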
startRegistry (creating and starting the executor registry thread: ExecutorRegistryThread)
public void startRegistry(final String appname, final String address) {
// Create and start the executor registry thread: ExecutorRegistryThread
ExecutorRegistryThread.getInstance().start(appname, address);
}
public void start(final String appname, final String address) {
// Validate that appname is not empty
if (appname == null || appname.trim().length() == 0) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
return;
}
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
logger.info(">>>>>>>>>>> executor registry start........................");
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
try {
// Register by calling AdminBizClient's registry method
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
if (!toStop) {
// Register again every 30 seconds (heartbeat)
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
try {
// Reaching here means toStop = true, so the registration must now be removed
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
try {
// Remove the registration by calling AdminBizClient's registryRemove method
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
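Both the registry thread and doCallback follow the same pattern: walk the list of admin clients in order and stop at the first one that answers successfully. A generic sketch of that loop is shown below. The class name, the method name, and the use of a `Predicate` to represent a single remote call are illustrative assumptions and not part of xxl-job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FirstSuccessSketch {

    /**
     * Tries each target in order and returns the index of the first one for which
     * the call reports success, or -1 if every target fails or throws.
     */
    public static <T> int callUntilSuccess(List<T> targets, Predicate<T> call) {
        for (int i = 0; i < targets.size(); i++) {
            try {
                if (call.test(targets.get(i))) {
                    return i; // first success wins; later targets are not contacted
                }
            } catch (RuntimeException e) {
                // treated like a failed call: fall through to the next target
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Hypothetical admin names; "admin-a" simulates a node that is down.
        List<String> admins = Arrays.asList("admin-a", "admin-b", "admin-c");
        int used = callUntilSuccess(admins, admin -> {
            if (admin.equals("admin-a")) {
                throw new IllegalStateException("connection refused");
            }
            return true;
        });
        System.out.println("registered via index " + used);
    }
}
```

The `-1` result corresponds to the case where no admin could be reached: the registry thread just waits for the next heartbeat, whereas doCallback persists the batch to a file for the retry thread.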
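III. Summary
When the executor starts, it:
- Scans the project for every method annotated with @XxlJob and registers it in jobHandlerRepository (a ConcurrentMap<String, IJobHandler>)
- Initializes the log path
- Creates the AdminBizClient instances and stores them in adminBizList
- Starts the log-cleanup thread, which purges old logs periodically
- Starts the callback and retry threads
- Opens a port with netty to wait for calls from the admin server, and starts the registry thread (heartbeat interval: 30 seconds)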