-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapp.js
More file actions
93 lines (80 loc) · 3.05 KB
/
app.js
File metadata and controls
93 lines (80 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
var config = require("./config.js");
var __ = require("underscore");
var app = require('http').createServer();
var io = require('socket.io')(app);
var queue = require('amqplib');
var connections = [];
function bail(err) {
console.error(err);
process.exit(1);
}
// Publisher helper function
function publisher(ch, clientId, queue_name, data, type, other_options, callback) {
ch.assertQueue(queue_name);
ch.sendToQueue(
queue_name,
new Buffer(data),
__(config.rabbit.publish_options)
.extend(other_options ? other_options : {}, {
headers: {
web_client_id: clientId
},
type: type
}),
callback ? callback : function(err, ok) {
if (err) {
console.warn("Warning! %s message nacked %s", data, err);
}
}
);
}
queue.connect('amqp://' + config.rabbit.address)
.then(
function(conn) {
// Create/Assert the queues
console.log("Creating queues:");
var channel;
conn.createConfirmChannel()
.then(
function(ch) {
channel = ch;
var promises = [];
__(config.rabbit.queues).each(function(q){
console.log(" -> " + q + " ... created");
promises.push(ch.assertQueue(q));
});
return Promise.all(promises);
})
.then(
function(){
io.on('connection', function(client){
connections.push(client);
console.log('User connected: ', client.id, " -> " + connections.length + " clients connected");
/* Interaction event */
client.on('interaction', function(data){
if (data !== undefined)
if (data.name !== undefined && config.interactions[data.name] !== undefined)
if (config.interactions[data.name].queues !== undefined)
__(config.interactions[data.name].queues).each(function(q) {
if (__(config.rabbit.queues).contains(q))
publisher(channel, client.id, q, data.data ? data.data : "", "interaction_data");
else
console.warn("WARNING: %s is not defined in config.rabbit.queues", q);
});
});
/* Client disconnected event */
client.on('disconnect', function() {
connections = __(connections).without(client);
console.log('User disconnected: ', client.id, " -> " + connections.length + " clients connected");
__(config.rabbit.queues).each(function(q){
publisher(channel, client.id, q, "", "client_disconnected");
});
});
});
});
});
app.listen(config.web.port, function() {
console.log("*********************************************************");
console.log("*** Sendero Interaction Server listening on port %s ***", config.web.port);
console.log("*********************************************************");
});