-
Notifications
You must be signed in to change notification settings - Fork 371
Expand file tree
/
Copy pathprocess_queue.js
More file actions
120 lines (107 loc) · 3.68 KB
/
process_queue.js
File metadata and controls
120 lines (107 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
const rp = require('request-promise-native');
const Queue = require('./taskqueue');
const _ = require('lodash');
const processUrl = 'http://localhost:5000/process/';
const processBasicFields = ['overwrite', 'from', 'actions'];
const processFields = ['timestamp'].concat(processBasicFields);
const debug = false;
// Queue of scans to be processed
// Persistent storage using sqlite
const store = Queue.getCachedSqlStore({
type: 'sql',
dialect: 'sqlite',
path: './data/process-queue.db'
});
const queue = new Queue(function (task, cb) {
var url = processUrl + task.id;
if (!debug) {
var options = {
uri: url,
qs: _.pick(task, processFields)
};
console.log('Processing %s, connecting to url %s', task.id, url);
rp(options)
.then(function (resp) {
// TODO: Process response...
console.log('Response to %s: ', task.id, resp);
cb(null, resp);
})
.catch(function (err) {
// Processing failed...
console.log('Processing %s FAILED: ', task.id, err);
cb(err, null);
});
} else {
console.log('Processing %s - debug mode', task.id);
}
}, {
stateFile: './data/process-queue.state.json',
store: store,
priority: function (task, cb) {
cb(null, task.priority || 1);
}
}
);
module.exports = function() {
const app = this;
app.process_queue = queue;
// custom population of scans from csv
app.get('/queues/process/add', (req, res, next) => {
// Add scan to process queue
//var logger = app.logger.getContext('', { path: '/queues/process/add'});
var opts = { id: req.query.scanId, timestamp: Date.now() };
_.defaults(opts, _.pick(req.query, processBasicFields));
if (req.query.priority != undefined) {
var priority = req.query.priority;
if (priority == 'max') {
var p = queue.maxPriority();
priority = p? p+1 : undefined;
} else if (priority == 'min') {
var p = queue.minPriority();
priority = p? p-1 : undefined;
}
opts.priority = priority;
}
console.log(opts);
var ticket = queue.push(opts);
res.json({ status: 'ok', size: queue.size(), ticket: ticket });
});
app.get('/queues/process/remove', (req, res, next) => {
// Remove scan from process queue
//var logger = app.logger.getContext('', { path: '/queues/process/remove'});
queue.cancel(req.query.scanId);
res.json({ status: 'ok', size: queue.size() });
});
app.get('/queues/process/list', (req, res, next) => {
// List scans in process queue
//var logger = app.logger.getContext('', { path: '/queues/process/list'});
var queued = queue.list();
res.json({ status: 'ok', queue: queued });
});
app.get('/queues/process/clear', (req, res, next) => {
// List scans in process queue
//var logger = app.logger.getContext('', { path: '/queues/process/clear'});
queue.clear();
res.json({ status: 'ok', size: queue.size() });
});
app.get('/queues/process/status', (req, res, next) => {
// Returns basic queue status
res.json({ status: 'ok', status: queue.status() });
});
app.get('/queues/process/stats', (req, res, next) => {
// Returns basic queue stats
res.json({ status: 'ok', stats: queue.getStats(), size: queue.size(), isPaused: queue.isPaused() });
});
app.get('/queues/process/pause', (req, res, next) => {
// Pauses process queue
queue.pause();
queue.saveState();
res.json({ status: 'ok', size: queue.size(), isPaused: queue.isPaused() });
});
app.get('/queues/process/resume', (req, res, next) => {
// Resumes process queue
queue.resume();
queue.saveState();
res.json({ status: 'ok', size: queue.size(), isPaused: queue.isPaused() });
});
};