Skip to content

Commit

Permalink
better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
NourAlharithi committed Nov 7, 2023
1 parent 3e112f4 commit cbecd51
Showing 1 changed file with 42 additions and 8 deletions.
50 changes: 42 additions & 8 deletions src/wsConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ const REDIS_PASSWORD = process.env.REDIS_PASSWORD;

async function main() {
const redisClient = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
const lastMessageRetriever = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
const lastMessageRetriever = new RedisClient(
REDIS_HOST,
REDIS_PORT,
REDIS_PASSWORD
);

await redisClient.connect();
await lastMessageRetriever.connect();
Expand All @@ -39,20 +43,34 @@ async function main() {
});

// Save and persist last message
lastMessageRetriever.client.set(`last_update_${subscribedChannel}`, message);
lastMessageRetriever.client.set(
`last_update_${subscribedChannel}`,
message
);
});

redisClient.client.on('error', (error) => {
console.error('Redis client error:', error);
});

wss.on('connection', (ws: WebSocket) => {
console.log('Client connected');

ws.on('message', async (msg) => {
const parsedMessage = JSON.parse(msg.toString());
let parsedMessage: any;
let messageType: string;
try {
parsedMessage = JSON.parse(msg.toString());
messageType = parsedMessage.type.toLowerCase();
} catch (e) {
return;
}

switch (parsedMessage.type.toLowerCase()) {
switch (messageType) {
case 'subscribe': {
const channel = parsedMessage.channel;
if (!subscribedChannels.has(channel)) {
console.log('Subscribing to channel', channel);
console.log('Trying to subscribe to channel', channel);
redisClient.client
.subscribe(channel)
.then(() => {
Expand All @@ -62,7 +80,7 @@ async function main() {
ws.send(
JSON.stringify({
channel,
error: `Invalid channel: ${channel}`,
error: `Error subscribing to channel: ${channel}`,
})
);
return;
Expand All @@ -76,9 +94,10 @@ async function main() {
channelSubscribers.get(channel).add(ws);

// Fetch and send last message
const lastMessage = await lastMessageRetriever.client.get(`last_update_${channel}`);
const lastMessage = await lastMessageRetriever.client.get(
`last_update_${channel}`
);
if (lastMessage !== null) {
console.log('sending last message on new subscribe');
ws.send(JSON.stringify({ channel, data: lastMessage }));
}
break;
Expand All @@ -91,6 +110,7 @@ async function main() {
}
break;
}
case undefined:
default:
break;
}
Expand Down Expand Up @@ -122,6 +142,12 @@ async function main() {
// Clear any existing intervals and timeouts
clearInterval(pingIntervalId);
clearTimeout(pongTimeoutId);
channelSubscribers.forEach((subscribers, channel) => {
if (subscribers.delete(ws) && subscribers.size === 0) {
redisClient.client.unsubscribe(channel);
channelSubscribers.delete(channel);
}
});
});

ws.on('disconnect', () => {
Expand All @@ -136,8 +162,16 @@ async function main() {
server.listen(WS_PORT, () => {
console.log(`connection manager running on ${WS_PORT}`);
});

server.on('error', (error) => {
console.error('Server error:', error);
});
}

process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
});

async function recursiveTryCatch(f: () => void) {
try {
await f();
Expand Down

0 comments on commit cbecd51

Please sign in to comment.