小虎任務佇列:
直接上程式碼
檔案結構:
datalib-- api.js-- base.js-- Router.js-- runjs.jslogindex.js
index.js
var env = process.env.NODE_ENV || 'production';env = env.toLowerCase();global.ONLINE = (env=='development'?false:true);global.express = require('express');global.app = express();global.ipport = ONLINE?'172.10.1.66:11300':'192.168.1.88:11300';global.tube = 'Queue';global.clientIsConn_pop = false;global.clientIsConn_push = false;global.MsgArray = [];global.signKey = 'eae6331dd14gf10c4f4d034578cf0f08';global.fileDir = './data/';global.Base = require('./lib/base');global.bodyParser = require('body-parser');var favicon = require('serve-favicon');var http = require('http');var path = require('path');var Router = require('./lib/Router');var fs = require("fs");var readline = require("readline");/*app.disable('x-powered-by');*/app.all('*', function(req, res, next) {res.header("X-Powered-By",'ThinkPHP 6.0');//自定義,迷惑使用者next();});app.use('/favicon.ico', express.static(__dirname+'/favicon.ico'));app.use(bodyParser.urlencoded({ extended: true }));app.use('/', Router);app.use(function(req, res, next) {global.Base.api_error(res, 'error');});app.use(function(err, req, res, next) {global.Base.api_error(res, 'error');});process.on('uncaughtException',function(err){});global.bs_pop = require('nodestalker');global.bs_push = require('nodestalker');function pop(){var client = global.bs_pop.Client(global.ipport);function reconn(c){c.disconnect();global.clientIsConn_pop = false;if( !global.clientIsConn_pop ){setTimeout(function(){console.log('pop ReConnectting...');pop();},2000);}}client.on('connect',function(){global.clientIsConn_pop = true;console.log('pop Client OK');});client.on('close',function(){console.log('pop Client Close');reconn(client);});client.on('end',function(){console.log('pop Client End');reconn(client);});client.on('error',function(){console.log('pop Client Error');reconn(client);});/*消費訊息*/client.watch(global.tube).onSuccess(function(data) {/*任務處理*/function taskProcess(job,fn){var dataJson = job.data;var dataArray = {};if( global.Base.isJSON(dataJson) ){dataArray = JSON.parse(dataJson);/*應該執行特定動作*/let runjs = require('./runjs');runjs.run(dataArray);fn.call();}else{fn.call();}}function resJob() {client.reserve().onSuccess(function(job) {taskProcess(job, function(){client.deleteJob(job.id).onSuccess(function(del_msg) {resJob();});});});}resJob();});}function push(){var client = global.bs_push.Client(global.ipport);function reconn(c){c.disconnect();global.clientIsConn_push = false;if( !global.clientIsConn_push ){setTimeout(function(){console.log('push ReConnectting...');push();},2000);}}/*生產訊息*/function push_data(c){if( global.clientIsConn_push && global.MsgArray.length > 0 ){c.use(global.tube).onSuccess(function(data) {while(global.MsgArray.length>0){var one = global.MsgArray.shift();c.put(one);}setTimeout(function(){push_data(c);},1);});}else{setTimeout(function(){push_data(c);},1000);}}client.use(global.tube).onSuccess(function(data) {if(global.MsgArray.length>0){var one = global.MsgArray.shift();client.put(one);}});client.on('connect',function(){global.clientIsConn_push = true;console.log('push Client OK');push_data(client);});client.on('close',function(){console.log('push Client Close');reconn(client);});client.on('end',function(){console.log('push Client End');reconn(client);});client.on('error',function(){console.log('push Client Error');reconn(client);});}app.listen(5000, function () {console.log('消費服務啟動!');pop();push();});/*檔案佇列*/global.fileList = [];global.isRunning = 0;global.oriFileList = {};/*讀取檔案內容*/function readFileToVar(){if( global.fileList.length > 0 ){global.isRunning = 1;let oneDir = global.fileList.shift();const readliner = readline.createInterface({input: fs.createReadStream( oneDir )});readliner.on('line', function(dataChunk) {global.MsgArray.push(dataChunk);});readliner.on('close', function() {global.isRunning = 0;fs.unlinkSync(oneDir);});readliner.on('error', function() {global.isRunning = 0;fs.unlinkSync(oneDir);});readliner.on('end', function() {global.isRunning = 0;fs.unlinkSync(oneDir);});}}let timer;//定時讀取檔案timer = setInterval( ()=>{if(global.isRunning == 1){return;}else{readFileToVar();}}, 1000 );let timer_o;//定時修改原檔案列表timer_o = setInterval( ()=>{for(let i in global.oriFileList){let guid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);return v.toString(16);});fs.rename(global.fileDir + i, global.fileDir + 'd-'+guid+'.txt',function(err){//console.log(i+' rename success');delete global.oriFileList[i];});break;}}, 1000 );/*監視檔案變化*/var watcher = fs.watch( global.fileDir );watcher.on('change',function(event,filename){var reg = new RegExp(/^d-[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}\\.txt$/);if (reg.test(filename)) {let stats = fs.statSync( global.fileDir + filename );if ( stats.isFile()) {global.fileList.push( global.fileDir + filename );}}else{if( !global.oriFileList[filename] ){global.oriFileList[filename] = filename;}}});/*首次執行 檢測資料夾*/var dirList = fs.readdirSync( global.fileDir );dirList.forEach(function(filename){var reg = new RegExp(/^d-[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}\\.txt$/);let stats = fs.statSync( global.fileDir + filename );if ( stats.isFile()) {if( reg.test(filename) ){global.fileList.push( global.fileDir + filename );}else{if( !global.oriFileList[filename] ){global.oriFileList[filename] = filename;}}}});
base.js
function Base(){}Base.prototype.api_success = function (res,obj,jsoncallback) { res.writeHead(200, { 'Content-Type': 'text/plain; charset=UTF-8' });var objectData = {"status" : 1,"data" : obj?obj:''};if(jsoncallback){res.end(jsoncallback+'('+JSON.stringify(objectData)+')');} res.end(JSON.stringify(objectData));};Base.prototype.api_error = function (res,msg,jsoncallback) { res.writeHead(200, { 'Content-Type': 'text/plain; charset=UTF-8' });var objectData = {"status" : 0,"msg" : msg?msg:''};if(jsoncallback){res.end(jsoncallback+'('+JSON.stringify(objectData)+')');} res.end(JSON.stringify(objectData));};Base.prototype.isJSON = function(res) {try {if ( !isNaN(res) ) return false;JSON.parse(res);return true;}catch(e) {return false;}};module.exports = new Base();
Router.js
var api = require('../lib/api');var multipart = require('connect-multiparty');var multipartMiddleware = multipart();var router = global.express.Router();var bodyData = global.bodyParser.json();router.post('/api/public_interface',bodyData,api.public_interface);// 預設方式router.post('/api/public_interface/',bodyData,api.public_interface);// 預設方式router.post('/api/public_interface/post',bodyData,api.public_interface);// post方式router.post('/api/public_interface/json',bodyData,api.public_interface);// application/json方式router.post('/api/public_interface/form',multipartMiddleware,api.public_interface);// form方式router.get('/api/public_interface?',api.public_interface);// get方式router.get('/', api.index); // 首頁router.all('/*',api.index);module.exports = router;
api.js
var crypto = require('crypto');/*預設主頁*/exports.index = function (req, res) {global.Base.api_error('error');};/*外部介面*/exports.public_interface = function (req, res, next) {var thatres = res;var $_POST = req.body;var $_GET = req.query;var getData = $_POST.hasOwnProperty('data') ? $_POST : $_GET;var data = getData.data;var sign = getData.sign;try {var isSignOk = 1;//檢測signvar reg = /^[a-zA-Z0-9]{40}$/;if( !reg.test(sign) ){isSignOk = 0;}var sha1 = crypto.createHash('sha1');sha1.update( global.signKey + data );if( sha1.digest('hex') != sign ){isSignOk = 0;}if( isSignOk == 0 ){global.Base.api_error(thatres, 'sign error');return;}var nowtime = parseInt(new Date().getTime() / 1000);var dataArray = Buffer.from(data,'base64').toString();/*資料處理 開始*/global.MsgArray.push(dataArray);/*資料處理 結束*/global.Base.api_success(thatres);} catch (err) {global.Base.api_error(thatres, 'SystemException:' + err.toString());}};
runjs.js
var fs = require("fs");var runjs = {run: function(data){if(data.type){switch(data.type){case 'callback'://回撥型別this.CallBack(data.data);return;case 'InsertData'://入庫型別this.InsertData(data.data);return;default://型別自己擴充套件return;}}},CallBack: function(data){/*應該執行特定動作,這裡以寫入檔案進行測試*/fs.appendFile('./log/log_test.txt',data+"\\r\\n","utf8",(err) => {});},InsertData: function(data){/*應該執行特定動作,這裡以寫入檔案進行測試*/fs.appendFile('./log/log_test.txt',data+"\\r\\n","utf8",(err) => {});}};module.exports = runjs;
最新評論