编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

自研简单实用的任务队列系统 -- 小虎任务队列

wxchong 2024-07-20 08:21:19 开源技术 34 ℃ 0 评论


小虎任务队列:

直接上代码

文件结构:

data
lib
-- api.js
-- base.js
-- Router.js
-- runjs.js
log
index.js

index.js

var env = process.env.NODE_ENV || 'production';
env = env.toLowerCase();
global.ONLINE = (env=='development'?false:true);

global.express = require('express');
global.app = express();
global.ipport = ONLINE?'172.10.1.66:11300':'192.168.1.88:11300';
global.tube = 'Queue';
global.clientIsConn_pop = false;
global.clientIsConn_push = false;
global.MsgArray = [];

global.signKey = 'eae6331dd14gf10c4f4d034578cf0f08';
global.fileDir = './data/';
global.Base = require('./lib/base');
global.bodyParser = require('body-parser');

var favicon = require('serve-favicon');
var http = require('http');
var path = require('path');
var Router = require('./lib/Router');
var fs = require("fs");
var readline = require("readline");

/*app.disable('x-powered-by');*/
app.all('*', function(req, res, next) {
	res.header("X-Powered-By",'ThinkPHP 6.0');//自定义,迷惑用户
	next();
});
app.use('/favicon.ico', express.static(__dirname+'/favicon.ico'));
app.use(bodyParser.urlencoded({ extended: true }));
app.use('/', Router);

app.use(function(req, res, next) {
	global.Base.api_error(res, 'error');
});
app.use(function(err, req, res, next) {
	global.Base.api_error(res, 'error');
});

process.on('uncaughtException',function(err){});


global.bs_pop = require('nodestalker');
global.bs_push = require('nodestalker');

function pop(){
	
	var client = global.bs_pop.Client(global.ipport);
	
	function reconn(c){
		c.disconnect();
		global.clientIsConn_pop = false;
		if( !global.clientIsConn_pop ){
			setTimeout(function(){
				console.log('pop ReConnectting...');
				pop();
			},2000);
		}
	}
	
	client.on('connect',function(){
		global.clientIsConn_pop = true;
		console.log('pop Client OK');
	});
	
	client.on('close',function(){
		console.log('pop Client Close');
		reconn(client);
	});
	
	client.on('end',function(){
		console.log('pop Client End');
		reconn(client);
	});
	
	client.on('error',function(){
		console.log('pop Client Error');
		reconn(client);
	});
	
	/*消费消息*/
	client.watch(global.tube).onSuccess(function(data) {
		
		/*任务处理*/
		function taskProcess(job,fn){
			
			var dataJson = job.data;
			var dataArray = {};
			if( global.Base.isJSON(dataJson) ){
				dataArray = JSON.parse(dataJson);
				
				/*应该执行特定动作*/
				let runjs = require('./runjs');
				runjs.run(dataArray);
				
				fn.call();
			}else{
				fn.call();
			}
			
		}
		
		function resJob() {
			client.reserve().onSuccess(function(job) {
				taskProcess(job, function(){
					client.deleteJob(job.id).onSuccess(function(del_msg) {
						resJob();
					});
				});
				
			});
		}

		resJob();
	});
}

function push(){
	
	var client = global.bs_push.Client(global.ipport);
	
	function reconn(c){
		c.disconnect();
		global.clientIsConn_push = false;
		if( !global.clientIsConn_push ){
			setTimeout(function(){
				console.log('push ReConnectting...');
				push();
			},2000);
		}
	}
		
	/*生产消息*/
	function push_data(c){
		if( global.clientIsConn_push && global.MsgArray.length > 0 ){
			
			c.use(global.tube).onSuccess(function(data) {
				while(global.MsgArray.length>0){
					var one = global.MsgArray.shift();
					c.put(one);
				}
				
				setTimeout(function(){
					push_data(c);
				},1);
			});
		}else{
			setTimeout(function(){
				push_data(c);
			},1000);
		}
	}
	
	client.use(global.tube).onSuccess(function(data) {
		if(global.MsgArray.length>0){
			var one = global.MsgArray.shift();
			client.put(one);
		}		
	});
	
	client.on('connect',function(){
		global.clientIsConn_push = true;
		console.log('push Client OK');
		push_data(client);
	});
	
	client.on('close',function(){
		console.log('push Client Close');
		reconn(client);
	});
	
	client.on('end',function(){
		console.log('push Client End');
		reconn(client);
	});
	
	client.on('error',function(){
		console.log('push Client Error');
		reconn(client);
	});
}


app.listen(5000, function () {
	console.log('消费服务启动!');
	pop();
	push();
});


/*文件队列*/
global.fileList = [];
global.isRunning = 0;
global.oriFileList = {};

/*读取文件内容*/
function readFileToVar(){
	if( global.fileList.length > 0 ){
		
		global.isRunning = 1;
		let oneDir = global.fileList.shift();
		
		const readliner = readline.createInterface({
		????input: fs.createReadStream( oneDir )
		});
		
		readliner.on('line', function(dataChunk) {
			global.MsgArray.push(dataChunk);
		});
		readliner.on('close', function() {
			global.isRunning = 0;
			fs.unlinkSync(oneDir);
		});
		readliner.on('error', function() {
			global.isRunning = 0;
			fs.unlinkSync(oneDir);
		});
		readliner.on('end', function() {
			global.isRunning = 0;
			fs.unlinkSync(oneDir);
		});
		
	}
}

let timer;
//定时读取文件
timer = setInterval( ()=>{
	if(global.isRunning == 1){
		return;
	}else{
		readFileToVar();
	}
}, 1000 );

let timer_o;
//定时修改原文件列表
timer_o = setInterval( ()=>{
	for(let i in global.oriFileList){
		let guid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
			var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);
			return v.toString(16);
		});
		fs.rename(global.fileDir + i, global.fileDir + 'd-'+guid+'.txt',function(err){
			//console.log(i+' rename success');
			delete global.oriFileList[i];
		});
		break;
	}
}, 1000 );

/*监视文件变化*/
var watcher = fs.watch( global.fileDir );
watcher.on('change',function(event,filename){	
	var reg = new RegExp(/^d-[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}\.txt$/);
	if (reg.test(filename)) {
		let stats = fs.statSync( global.fileDir + filename );
		if ( stats.isFile()) {
			global.fileList.push( global.fileDir + filename );
		}
	}else{
		if( !global.oriFileList[filename] ){
			global.oriFileList[filename] = filename;
		}
	}
});
/*首次运行 检测文件夹*/
var dirList = fs.readdirSync( global.fileDir );
dirList.forEach(function(filename){
	var reg = new RegExp(/^d-[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}\.txt$/);
	let stats = fs.statSync( global.fileDir + filename );
	if ( stats.isFile()) {
		if( reg.test(filename) ){
			global.fileList.push( global.fileDir + filename );
		}else{
			if( !global.oriFileList[filename] ){
				global.oriFileList[filename] = filename;
			}
		}
	}
});

base.js

function Base(){}

Base.prototype.api_success = function (res,obj,jsoncallback) {
    res.writeHead(200, { 'Content-Type': 'text/plain; charset=UTF-8' });
	var objectData = {
		"status" : 1,
		"data" : obj?obj:''
	};
	if(jsoncallback){
		res.end(jsoncallback+'('+JSON.stringify(objectData)+')');
	}
    res.end(JSON.stringify(objectData));
};

Base.prototype.api_error = function (res,msg,jsoncallback) {
    res.writeHead(200, { 'Content-Type': 'text/plain; charset=UTF-8' });
	var objectData = {
		"status" : 0,
		"msg" : msg?msg:''
	};
	if(jsoncallback){
		res.end(jsoncallback+'('+JSON.stringify(objectData)+')');
	}
    res.end(JSON.stringify(objectData));
};
Base.prototype.isJSON = function(res) {
	try {
		if ( !isNaN(res) ) return false;
		JSON.parse(res);
		return true;
	}
	catch(e) {
		return false;
	}
};

module.exports = new Base();

Router.js

var api = require('../lib/api');
var multipart = require('connect-multiparty');
var multipartMiddleware = multipart();

var router = global.express.Router();

var bodyData = global.bodyParser.json();

router.post('/api/public_interface',		bodyData,				api.public_interface);// 默认方式
router.post('/api/public_interface/',		bodyData,				api.public_interface);// 默认方式
router.post('/api/public_interface/post',	bodyData,				api.public_interface);// post方式
router.post('/api/public_interface/json',	bodyData,				api.public_interface);// application/json方式
router.post('/api/public_interface/form',	multipartMiddleware,	api.public_interface);// form方式
router.get('/api/public_interface?',		api.public_interface						);// get方式
router.get('/', api.index); // 首页
router.all('/*',api.index);

module.exports = router;

api.js

var crypto = require('crypto');

/*默认主页*/
exports.index = function (req, res) {
	global.Base.api_error('error');
};

/*外部接口*/
exports.public_interface = function (req, res, next) {
	var thatres = res;
	var $_POST = req.body;
	var $_GET = req.query;
	var getData = $_POST.hasOwnProperty('data') ? $_POST : $_GET;
	
	var data = getData.data;
	var sign = getData.sign;
	
	try {
		var isSignOk = 1;
		//检测sign
		var reg = /^[a-zA-Z0-9]{40}$/;
		if( !reg.test(sign) ){
			isSignOk = 0;
		}
		var sha1 = crypto.createHash('sha1');
		sha1.update( global.signKey + data );
		if( sha1.digest('hex') != sign ){
			isSignOk = 0;
		}
		
		if( isSignOk == 0 ){
			global.Base.api_error(thatres, 'sign error');
			return;
		}
		
		var nowtime = parseInt(new Date().getTime() / 1000);
		
		var dataArray = Buffer.from(data,'base64').toString();
				
		/*数据处理 开始*/
		global.MsgArray.push(dataArray);
		/*数据处理 结束*/
		
		global.Base.api_success(thatres);
	} catch (err) {
		global.Base.api_error(thatres, 'SystemException:' + err.toString());
	}
};

runjs.js

var fs = require("fs");
var runjs = {
	
	run: function(data){
		if(data.type){
			switch(data.type){
				case 'callback'://回调类型
					this.CallBack(data.data);
					return;
				case 'InsertData'://入库类型
					this.InsertData(data.data);
					return;
				default://类型自己扩展
					return;
			}
		}
	},
	
	CallBack: function(data){
		/*应该执行特定动作,这里以写入文件进行测试*/
		fs.appendFile('./log/log_test.txt',data+"\r\n","utf8",(err) => {});
	},
	
	InsertData: function(data){
		/*应该执行特定动作,这里以写入文件进行测试*/
		fs.appendFile('./log/log_test.txt',data+"\r\n","utf8",(err) => {});
	}
	
};

module.exports = runjs;

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表