forked from rajaraodv/rabbitpubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.js
136 lines (113 loc) · 4.62 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
var express = require('express')
, routes = require('./routes')
, http = require('http')
, path = require('path')
, redis = require('redis')
, amqp = require('amqp');
/*
 Connect to RabbitMQ. An empty options object is passed, so whatever defaults the amqp
 module applies are used (NOTE(review): presumably a local broker - confirm for deployment).
 */
var rabbitConn = amqp.createConnection({});

// Fanout exchange used for all chat traffic; stays undefined until the connection is 'ready'.
var chatExchange;

// Without an 'error' listener an emitted 'error' event is thrown as an uncaught exception
// and takes the whole server down (e.g. when the broker is unreachable), so log it instead.
rabbitConn.on('error', function (err) {
    console.error('RabbitMQ connection error: ' + err);
});

// Declare the exchange once the connection is usable.
rabbitConn.on('ready', function () {
    chatExchange = rabbitConn.exchange('chatExchange', {'type': 'fanout'});
});
/*
 Express application, the HTTP server wrapping it, and Socket.io attached to that server.
 */
var app = express();
var server = http.createServer(app);
var socketio = require('socket.io');
var io = socketio.listen(server);

// Restrict Socket.io to xhr-polling because WebSocket is not supported by CF.
io.set('transports', ['xhr-polling']);

// Lower Socket.io's log verbosity to level 1 (the default is 3, debugging).
// NOTE(review): the original comment called level 1 "info" - confirm against the socket.io docs.
io.set('log level', 1);

/*
 Redis-backed session store: all Express sessions are kept in Redis.
 */
var RedisStore = require('connect-redis')(express);
var rClient = redis.createClient();
var sessionStore = new RedisStore({client: rClient});

// Shared cookie parser instance; it is also handed to SessionSockets further down the file.
// NOTE(review): the secret is a hard-coded placeholder and must match the session secret.
var cookieParser = express.cookieParser('your secret here');
// Settings and middleware stack applied in every environment.
app.configure(function () {
    var viewsDir = __dirname + '/views';
    var publicDir = path.join(__dirname, 'public');
    var sessionOptions = {
        store: sessionStore,
        key: 'jsessionid',
        secret: 'your secret here'
    };

    // Application settings.
    app.set('port', process.env.PORT || 3000);
    app.set('views', viewsDir);
    app.set('view engine', 'ejs');

    // Generic request middleware.
    app.use(express.favicon());
    app.use(express.logger('dev'));
    app.use(express.bodyParser());
    app.use(express.methodOverride());

    /*
     cookieParser and session must be used together.
     Express/Connect names its session cookie 'connect.sid' by default. To scale a Socket.io app
     on Cloud Foundry the cookie has to be called 'jsessionid' instead, so that Cloud Foundry's
     'Sticky Session' feature applies; without it Socket.io won't work with more than 1 instance.
     Outside Cloud Foundry the name 'jsessionid' is harmless - it's just a cookie name.
     */
    app.use(cookieParser);
    app.use(express.session(sessionOptions));

    // Routes are consulted before static files under ./public.
    app.use(app.router);
    app.use(express.static(publicDir));
});

// Development only: install Express's error handler middleware.
app.configure('development', function () {
    app.use(express.errorHandler());
});
// Landing page is handled by the routes module.
app.get('/', routes.index);

// Logging out destroys the session and sends the browser back to the landing page.
app.get('/logout', function handleLogout(request, response) {
    request.session.destroy();
    response.redirect('/');
});

/*
 Login: the client does an http POST w/ a user name. Keep that name in the
 Express session (which in turn is stored in Redis) and reply with an empty error.
 */
app.post('/user', function handleLogin(request, response) {
    var currentSession = request.session;
    currentSession.user = request.body.user;
    response.json({error: ''});
});
/*
 Use SessionSockets so that we can exchange (set/get) user data b/w sockets and http sessions.
 Pass 'jsessionid' (custom) cookie name that we are using to make use of Sticky sessions.
 */
var SessionSockets = require('session.socket.io');
var sessionSockets = new SessionSockets(io, sessionStore, cookieParser, 'jsessionid');

/**
 * Per-connection wiring between one browser socket and RabbitMQ.
 *
 * @param err     Error reported while loading the session for this socket, if any.
 * @param socket  The Socket.io socket of the connecting browser.
 * @param session The Express session belonging to the socket; may be missing when `err` is set.
 */
sessionSockets.on('connection', function (err, socket, session) {
    // Queue declared for this socket; stays null until RabbitMQ has answered.
    var subscriberQueue = null;
    // Becomes true when the browser goes away, possibly before the queue callback has fired.
    var disconnected = false;

    if (err) {
        // Keep the socket connected, but remember that there may be no session to read from.
        console.error('Could not load session for socket: ' + err);
    }

    /**
     * Publish a message to chatExchange w/o a Routing Key (Routing Key doesn't matter
     * because chatExchange is a 'fanout'). The user's name is taken from the session.
     *
     * Nothing is published when there is no session or when the exchange has not been
     * created yet, instead of throwing and taking the whole server down.
     */
    function publish(action, text) {
        if (!session) {
            console.error('Dropping "' + action + '" message: socket has no session');
            return;
        }
        if (!chatExchange) {
            console.error('Dropping "' + action + '" message: chatExchange is not ready');
            return;
        }
        chatExchange.publish('', {action: action, user: session.user, msg: text});
    }

    /**
     * When a user sends a chat message, publish it to chatExchange.
     * The payload comes from the browser, so malformed JSON must not crash the process.
     */
    socket.on('chat', function (data) {
        var parsed;
        try {
            parsed = JSON.parse(data);
        } catch (parseErr) {
            console.error('Ignoring malformed chat payload: ' + parseErr.message);
            return;
        }
        // JSON.parse('null') yields null, which has no .msg to read.
        if (parsed === null) {
            return;
        }
        publish('message', parsed.msg);
    });

    /**
     * When a user joins, publish a control message to chatExchange.
     */
    socket.on('join', function () {
        publish('control', ' joined the channel');
    });

    /**
     * Initialize subscriber queue.
     * 1. First create a queue w/o any name. This forces RabbitMQ to create new queue for every
     *    socket.io connection w/ a new random queue name.
     * 2. Then bind the queue to chatExchange w/ "#" or "" 'Binding key' and listen to ALL messages.
     * 3. Lastly, create a consumer (via .subscribe) that waits for messages from RabbitMQ. And when
     *    a message comes, send it to the browser.
     *
     * Note: this lives inside sessionSockets.on('connection'..) to create a NEW queue for every connection.
     */
    rabbitConn.queue('', {exclusive: true}, function (q) {
        // The browser may already be gone by the time the queue exists; don't keep it around.
        if (disconnected) {
            q.destroy();
            return;
        }
        subscriberQueue = q;

        // Bind to chatExchange w/ "#" or "" binding key to listen to all messages.
        q.bind('chatExchange', "");

        // When a message comes, send it back to the browser.
        q.subscribe(function (message) {
            socket.emit('chat', JSON.stringify(message));
        });
    });

    /**
     * All sockets share one RabbitMQ connection, so an exclusive queue is not cleaned up just
     * because its browser left. Destroy it explicitly to avoid piling up queues and consumers.
     */
    socket.on('disconnect', function () {
        disconnected = true;
        if (subscriberQueue) {
            subscriberQueue.destroy();
            subscriberQueue = null;
        }
    });
});
// Start accepting HTTP (and Socket.io polling) traffic on the configured port.
var listenPort = app.get('port');
server.listen(listenPort, function onListening() {
    console.log("Express server listening on port " + listenPort);
});