小虎任务队列:
直接上代码
文件结构:
data
lib
-- api.js
-- base.js
-- Router.js
-- runjs.js
log
index.js
index.js
/*
 * index.js bootstrap: publishes shared configuration on `global`, builds the
 * Express app and loads the beanstalkd client library.
 */

// Any NODE_ENV other than "development" (case-insensitive) counts as online.
var env = (process.env.NODE_ENV || 'production').toLowerCase();
global.ONLINE = env !== 'development';

global.express = require('express');
global.app = express();

// beanstalkd address (host:port), chosen by environment.
global.ipport = ONLINE ? '172.10.1.66:11300' : '192.168.1.88:11300';
// Tube used by both the producer and the consumer connection.
global.tube = 'Queue';
// Connection flags maintained by pop() and push().
global.clientIsConn_pop = false;
global.clientIsConn_push = false;
// In-memory buffer of messages waiting to be put into beanstalkd.
global.MsgArray = [];
// Shared secret prepended to the payload when the request signature is computed.
// NOTE(review): the secret is committed in source; consider loading it from the environment.
global.signKey = 'eae6331dd14gf10c4f4d034578cf0f08';
// Directory scanned and watched for queue files.
global.fileDir = './data/';
global.Base = require('./lib/base');
global.bodyParser = require('body-parser');

var favicon = require('serve-favicon');
var http = require('http');
var path = require('path');
var Router = require('./lib/Router');
var fs = require("fs");
var readline = require("readline");

/*app.disable('x-powered-by');*/
app.all('*', function (req, res, next) {
    // Custom value, meant to mislead anyone fingerprinting the server.
    res.header("X-Powered-By", 'ThinkPHP 6.0');
    next();
});
app.use('/favicon.ico', express.static(__dirname + '/favicon.ico'));
app.use(bodyParser.urlencoded({ extended: true }));
app.use('/', Router);

// Fallback for requests no route answered.
app.use(function (req, res, next) {
    global.Base.api_error(res, 'error');
});
// Error-handling middleware (four arguments): answer with the generic error payload.
app.use(function (err, req, res, next) {
    global.Base.api_error(res, 'error');
});

// Keep the process alive on unexpected exceptions, but log them: an empty
// handler hides every failure in the queue workers.
process.on('uncaughtException', function (err) {
    console.error('uncaughtException:', err);
});

global.bs_pop = require('nodestalker');
global.bs_push = require('nodestalker');
/**
 * Consumer side: opens a beanstalkd connection, watches `global.tube` and
 * processes reserved jobs one at a time (reserve -> run -> delete -> reserve).
 * When the connection reports close/end/error, a fresh pop() is started after
 * two seconds.
 */
function pop() {
    var client = global.bs_pop.Client(global.ipport);
    // 'close', 'end' and 'error' may all fire for one broken connection;
    // without this flag each of them would spawn its own replacement client.
    var reconnectScheduled = false;

    /* Tear down this client and schedule exactly one replacement. */
    function reconn(c) {
        if (reconnectScheduled) {
            return;
        }
        reconnectScheduled = true;
        global.clientIsConn_pop = false;
        try {
            c.disconnect();
        } catch (err) {
            console.log('pop Client disconnect failed: ' + err);
        }
        setTimeout(function () {
            console.log('pop ReConnectting...');
            pop();
        }, 2000);
    }

    client.on('connect', function () {
        global.clientIsConn_pop = true;
        console.log('pop Client OK');
    });
    client.on('close', function () {
        console.log('pop Client Close');
        reconn(client);
    });
    client.on('end', function () {
        console.log('pop Client End');
        reconn(client);
    });
    client.on('error', function () {
        console.log('pop Client Error');
        reconn(client);
    });

    /* Consume messages */
    client.watch(global.tube).onSuccess(function (data) {
        /*
         * Handle one job, then invoke fn. fn is always invoked, even when the
         * payload is not JSON or the handler throws, so the reserve loop
         * cannot stall on a bad job.
         */
        function taskProcess(job, fn) {
            var dataJson = job.data;
            if (global.Base.isJSON(dataJson)) {
                try {
                    // runjs.js lives in lib/ next to base.js and Router.js.
                    var runjs = require('./lib/runjs');
                    runjs.run(JSON.parse(dataJson));
                } catch (err) {
                    console.log('pop task failed: ' + err);
                }
            }
            fn.call();
        }

        /* Reserve the next job, process it, delete it, and repeat. */
        function resJob() {
            client.reserve().onSuccess(function (job) {
                taskProcess(job, function () {
                    client.deleteJob(job.id).onSuccess(function (del_msg) {
                        resJob();
                    });
                });
            });
        }
        resJob();
    });
}
/**
 * Producer side: opens a beanstalkd connection and keeps moving the messages
 * buffered in `global.MsgArray` into `global.tube`. When the connection
 * reports close/end/error, a fresh push() is started after two seconds.
 */
function push() {
    var client = global.bs_push.Client(global.ipport);
    // Set once this client is being replaced. It prevents duplicate
    // reconnects and stops this client's push_data loop.
    var reconnectScheduled = false;

    /* Tear down this client and schedule exactly one replacement. */
    function reconn(c) {
        if (reconnectScheduled) {
            return;
        }
        reconnectScheduled = true;
        global.clientIsConn_push = false;
        try {
            c.disconnect();
        } catch (err) {
            console.log('push Client disconnect failed: ' + err);
        }
        setTimeout(function () {
            console.log('push ReConnectting...');
            push();
        }, 2000);
    }

    /* Produce messages: drain the buffer, then poll again. */
    function push_data(c) {
        // A replaced client must not keep draining the shared buffer into a
        // dead connection; the new push() call runs its own loop.
        if (reconnectScheduled) {
            return;
        }
        if (global.clientIsConn_push && global.MsgArray.length > 0) {
            c.use(global.tube).onSuccess(function (data) {
                while (global.MsgArray.length > 0) {
                    var one = global.MsgArray.shift();
                    c.put(one);
                }
                setTimeout(function () {
                    push_data(c);
                }, 1);
            });
        } else {
            // Nothing buffered (or not connected): look again in a second.
            setTimeout(function () {
                push_data(c);
            }, 1000);
        }
    }

    // Select the tube right away and forward one buffered message if present.
    client.use(global.tube).onSuccess(function (data) {
        if (global.MsgArray.length > 0) {
            var one = global.MsgArray.shift();
            client.put(one);
        }
    });
    client.on('connect', function () {
        global.clientIsConn_push = true;
        console.log('push Client OK');
        push_data(client);
    });
    client.on('close', function () {
        console.log('push Client Close');
        reconn(client);
    });
    client.on('end', function () {
        console.log('push Client End');
        reconn(client);
    });
    client.on('error', function () {
        console.log('push Client Error');
        reconn(client);
    });
}
// Listen on port 5000; once the server is up, start the consumer and the
// producer connections.
const onServerReady = () => {
    console.log('消费服务启动!');
    pop();
    push();
};
app.listen(5000, onServerReady);
/*
 * File queue state shared through `global`:
 *   fileList    - paths of files waiting to be read
 *   isRunning   - 1 while a file is being read, otherwise 0
 *   oriFileList - names of files waiting to be renamed, keyed by name
 */
Object.assign(global, {
    fileList: [],
    isRunning: 0,
    oriFileList: {},
});
/* Read file contents */
/**
 * Takes the next path from `global.fileList`, appends each of its lines to
 * `global.MsgArray`, then removes the file. `global.isRunning` is 1 while the
 * read is in progress and goes back to 0 when it ends, including when the
 * file cannot be read. Does nothing when the list is empty.
 */
function readFileToVar() {
    if (global.fileList.length === 0) {
        return;
    }
    global.isRunning = 1;
    const oneDir = global.fileList.shift();
    let finished = false;

    // Several terminal events can fire for one file; clean up only once.
    function finish() {
        if (finished) {
            return;
        }
        finished = true;
        try {
            fs.unlinkSync(oneDir);
        } catch (err) {
            // A file that is already gone needs no cleanup; report anything else.
            if (err.code !== 'ENOENT') {
                console.log('readFileToVar: cannot remove ' + oneDir + ': ' + err);
            }
        }
        global.isRunning = 0;
    }

    const input = fs.createReadStream(oneDir);
    const readliner = readline.createInterface({
        input: input
    });
    // Without a listener here, an unreadable file would leave isRunning stuck at 1.
    input.on('error', function (err) {
        console.log('readFileToVar: cannot read ' + oneDir + ': ' + err);
        readliner.close();
        finish();
    });
    readliner.on('line', function (dataChunk) {
        global.MsgArray.push(dataChunk);
    });
    readliner.on('close', finish);
    readliner.on('error', finish);
    readliner.on('end', finish);
}
// Once per second: hand the next queued file to readFileToVar() unless a
// read is still marked as running.
let timer = setInterval(() => {
    if (global.isRunning == 1) {
        return;
    }
    readFileToVar();
}, 1000);

// Once per second: rename at most one file recorded in global.oriFileList to
// d-<guid>.txt inside global.fileDir, then drop its entry from the list.
let timer_o = setInterval(() => {
    for (const originalName in global.oriFileList) {
        const guid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (placeholder) => {
            const nibble = Math.random() * 16 | 0;
            const digit = placeholder == 'x' ? nibble : (nibble & 0x3 | 0x8);
            return digit.toString(16);
        });
        const fromPath = global.fileDir + originalName;
        const toPath = global.fileDir + 'd-' + guid + '.txt';
        fs.rename(fromPath, toPath, (err) => {
            delete global.oriFileList[originalName];
        });
        // Only the first entry is handled on each tick.
        return;
    }
}, 1000);
/*
 * Watch the data directory. Files named d-<8>-<4>-<4>-<4>-<12>.txt (groups of
 * [0-9a-z]) are queued for reading; any other regular file is recorded in
 * global.oriFileList so the rename timer can give it such a name.
 */
// Paths this watcher has already queued. Several events can arrive for one
// file, and queuing it twice would make a later read hit a deleted file.
var queuedByWatcher = new Set();
var watcher = fs.watch( global.fileDir );
watcher.on('change', function (event, filename) {
    // The watcher does not always supply a file name.
    if (!filename) {
        return;
    }
    var fullPath = global.fileDir + filename;
    var stats;
    try {
        stats = fs.statSync(fullPath);
    } catch (err) {
        // The event refers to a file that is gone (renamed away, or removed
        // after it was read). Forget it instead of throwing.
        queuedByWatcher.delete(fullPath);
        return;
    }
    // Same rule as the startup scan: only regular files are considered.
    if (!stats.isFile()) {
        return;
    }
    var reg = /^d-[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}\.txt$/;
    if (reg.test(filename)) {
        if (!queuedByWatcher.has(fullPath) && global.fileList.indexOf(fullPath) === -1) {
            queuedByWatcher.add(fullPath);
            global.fileList.push(fullPath);
        }
    } else if (!global.oriFileList[filename]) {
        global.oriFileList[filename] = filename;
    }
});
/*
 * First run: classify what is already in the data directory. Regular files
 * named d-<8>-<4>-<4>-<4>-<12>.txt go straight into global.fileList; other
 * regular files are recorded in global.oriFileList.
 */
var dirList = fs.readdirSync( global.fileDir );
for (const entryName of dirList) {
    const queuedName = /^d-[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}\.txt$/;
    const entryPath = global.fileDir + entryName;
    if (!fs.statSync(entryPath).isFile()) {
        continue;
    }
    if (queuedName.test(entryName)) {
        global.fileList.push(entryPath);
        continue;
    }
    if (!global.oriFileList[entryName]) {
        global.oriFileList[entryName] = entryName;
    }
}
base.js
/**
 * Response helpers shared by the HTTP handlers.
 */
function Base(){}

/*
 * Writes `payload` as JSON with status 200 and a text/plain content type.
 * When `jsoncallback` is given the JSON is wrapped as `jsoncallback(...)`.
 * The response is ended exactly once.
 */
function sendPayload(res, payload, jsoncallback) {
    res.writeHead(200, { 'Content-Type': 'text/plain; charset=UTF-8' });
    var json = JSON.stringify(payload);
    res.end(jsoncallback ? jsoncallback + '(' + json + ')' : json);
}

/**
 * Sends `{status: 1, data: obj}`; `data` is '' when obj is falsy.
 * @param {object} res - response object providing writeHead() and end()
 * @param {*} [obj] - payload returned under `data`
 * @param {string} [jsoncallback] - optional callback name for a wrapped reply
 */
Base.prototype.api_success = function (res,obj,jsoncallback) {
    sendPayload(res, {
        "status" : 1,
        "data" : obj?obj:''
    }, jsoncallback);
};

/**
 * Sends `{status: 0, msg: msg}`; `msg` is '' when msg is falsy.
 * @param {object} res - response object providing writeHead() and end()
 * @param {string} [msg] - error text returned under `msg`
 * @param {string} [jsoncallback] - optional callback name for a wrapped reply
 */
Base.prototype.api_error = function (res,msg,jsoncallback) {
    sendPayload(res, {
        "status" : 0,
        "msg" : msg?msg:''
    }, jsoncallback);
};

/**
 * Tells whether `res` parses as JSON. Anything isNaN() does not report as
 * NaN (numbers, numeric strings, '' and similar) is rejected before parsing.
 * @param {*} res - value to test
 * @returns {boolean} true when JSON.parse accepts the value
 */
Base.prototype.isJSON = function(res) {
    try {
        if ( !isNaN(res) ) return false;
        JSON.parse(res);
        return true;
    }
    catch(e) {
        return false;
    }
};
module.exports = new Base();
Router.js
var api = require('../lib/api');
var multipart = require('connect-multiparty');
var multipartMiddleware = multipart();
/*
 * Route table. Every /api/public_interface variant ends in
 * api.public_interface; the variants differ only in the body parser placed
 * in front of it. Everything else is answered by api.index.
 */
var router = global.express.Router();
var bodyData = global.bodyParser.json();
router.post('/api/public_interface', bodyData, api.public_interface);// default
router.post('/api/public_interface/', bodyData, api.public_interface);// default
router.post('/api/public_interface/post', bodyData, api.public_interface);// POST
router.post('/api/public_interface/json', bodyData, api.public_interface);// application/json
router.post('/api/public_interface/form', multipartMiddleware, api.public_interface);// multipart form
// The query string is not part of the route path. A trailing "?" in the
// pattern would instead make the final "e" optional.
router.get('/api/public_interface', api.public_interface );// GET
router.get('/', api.index); // home page
router.all('/*',api.index);
module.exports = router;
api.js
var crypto = require('crypto');
/*默认主页*/
exports.index = function (req, res) {
global.Base.api_error('error');
};
/*外部接口*/
exports.public_interface = function (req, res, next) {
var thatres = res;
var $_POST = req.body;
var $_GET = req.query;
var getData = $_POST.hasOwnProperty('data') ? $_POST : $_GET;
var data = getData.data;
var sign = getData.sign;
try {
var isSignOk = 1;
//检测sign
var reg = /^[a-zA-Z0-9]{40}$/;
if( !reg.test(sign) ){
isSignOk = 0;
}
var sha1 = crypto.createHash('sha1');
sha1.update( global.signKey + data );
if( sha1.digest('hex') != sign ){
isSignOk = 0;
}
if( isSignOk == 0 ){
global.Base.api_error(thatres, 'sign error');
return;
}
var nowtime = parseInt(new Date().getTime() / 1000);
var dataArray = Buffer.from(data,'base64').toString();
/*数据处理 开始*/
global.MsgArray.push(dataArray);
/*数据处理 结束*/
global.Base.api_success(thatres);
} catch (err) {
global.Base.api_error(thatres, 'SystemException:' + err.toString());
}
};
runjs.js
var fs = require("fs");
/* Appends one line to the test log and reports a failed write. */
function appendTestLog(data) {
    fs.appendFile('./log/log_test.txt', data + "\r\n", "utf8", (err) => {
        if (err) {
            console.error('runjs: cannot write ./log/log_test.txt: ' + err);
        }
    });
}

/**
 * Task dispatcher: run() routes a decoded message to the handler selected by
 * its `type` field and passes the message's `data` field along.
 */
var runjs = {
    /**
     * @param {object} data - decoded message; messages that are not objects
     *   or carry no `type` are ignored
     */
    run: function(data){
        // A message may decode to null or another non-object; ignore it.
        if (!data || !data.type) {
            return;
        }
        switch(data.type){
            case 'callback':// callback type
                this.CallBack(data.data);
                return;
            case 'InsertData':// database insert type
                this.InsertData(data.data);
                return;
            default:// extend with further types as needed
                return;
        }
    },
    /* Placeholder action: writes the payload to the test log. */
    CallBack: function(data){
        appendTestLog(data);
    },
    /* Placeholder action: writes the payload to the test log. */
    InsertData: function(data){
        appendTestLog(data);
    }
};
module.exports = runjs;
本文暂时没有评论,来添加一个吧(●'◡'●)