Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 122 additions & 63 deletions src/hooks/useMessaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ export const useSendMessage = () => {
});
};

// Connection pool for message channels
const messageChannels = new Map<number, any>();
const channelSubscribers = new Map<number, Set<() => void>>();

// Hook for real-time message updates
export const useRealtimeMessages = (conversationId: number) => {
const queryClient = useQueryClient();
Expand All @@ -166,41 +170,73 @@ export const useRealtimeMessages = (conversationId: number) => {
useEffect(() => {
if (!user || !conversationId) return;

const channel = supabase
.channel(`messages:${conversationId}`)
.on(
'postgres_changes',
{
event: 'INSERT',
schema: 'public',
table: 'Messages',
filter: `conversation_id=eq.${conversationId}`,
},
() => {
queryClient.invalidateQueries({ queryKey: ['messages', conversationId] });
queryClient.invalidateQueries({ queryKey: ['conversations'] });
}
)
.on(
'postgres_changes',
{
event: 'UPDATE',
schema: 'public',
table: 'Messages',
filter: `conversation_id=eq.${conversationId}`,
},
() => {
queryClient.invalidateQueries({ queryKey: ['messages', conversationId] });
}
)
.subscribe();
const invalidateQueries = () => {
queryClient.invalidateQueries({ queryKey: ['messages', conversationId] });
queryClient.invalidateQueries({ queryKey: ['conversations'] });
};

// Use connection pooling
if (!messageChannels.has(conversationId)) {
const channel = supabase
.channel(`messages:${conversationId}`)
.on(
'postgres_changes',
{
event: 'INSERT',
schema: 'public',
table: 'Messages',
filter: `conversation_id=eq.${conversationId}`,
},
() => {
// Notify all subscribers for this conversation
const subscribers = channelSubscribers.get(conversationId);
subscribers?.forEach(callback => callback());
}
)
.on(
'postgres_changes',
{
event: 'UPDATE',
schema: 'public',
table: 'Messages',
filter: `conversation_id=eq.${conversationId}`,
},
() => {
const subscribers = channelSubscribers.get(conversationId);
subscribers?.forEach(callback => callback());
}
)
.subscribe();

messageChannels.set(conversationId, channel);
channelSubscribers.set(conversationId, new Set());
}

// Add this component as subscriber
channelSubscribers.get(conversationId)?.add(invalidateQueries);

return () => {
supabase.removeChannel(channel);
// Remove subscriber
const subscribers = channelSubscribers.get(conversationId);
subscribers?.delete(invalidateQueries);

// Clean up channel if no subscribers
if (subscribers?.size === 0) {
const channel = messageChannels.get(conversationId);
if (channel) {
supabase.removeChannel(channel);
messageChannels.delete(conversationId);
channelSubscribers.delete(conversationId);
}
}
};
}, [conversationId, user, queryClient]);
};

// Shared presence channel for connection pooling
let sharedPresenceChannel: any = null;
let presenceSubscribers = new Set<(users: UserPresence[]) => void>();

// Hook for user presence
export const useUserPresence = () => {
const { user } = useAuth();
Expand All @@ -211,56 +247,79 @@ export const useUserPresence = () => {

// Update user status to online
const updatePresence = async () => {
await supabase
.from('UserPresence')
.upsert({
user_id: user.id,
status: 'online',
last_seen: new Date().toISOString(),
});
try {
await supabase
.from('UserPresence')
.upsert({
user_id: user.id,
status: 'online',
last_seen: new Date().toISOString(),
});
} catch (error) {
console.warn('Failed to update presence:', error);
}
};

updatePresence();

// Set up real-time subscription for presence updates
const channel = supabase
.channel('user-presence')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'UserPresence',
},
() => {
// Refetch presence data
fetchPresence();
}
)
.subscribe();
// Use shared channel for connection pooling
if (!sharedPresenceChannel) {
sharedPresenceChannel = supabase
.channel('shared-user-presence')
.on(
'postgres_changes',
{
event: '*',
schema: 'public',
table: 'UserPresence',
},
() => {
fetchPresence();
}
)
.subscribe();
}

const fetchPresence = async () => {
const { data } = await supabase
.from('UserPresence')
.select('*')
.eq('status', 'online');

if (data) setOnlineUsers(data);
try {
const { data } = await supabase
.from('UserPresence')
.select('*')
.eq('status', 'online');

if (data) {
// Notify all subscribers
presenceSubscribers.forEach(callback => callback(data));
}
} catch (error) {
console.warn('Failed to fetch presence:', error);
}
};

// Add this component as subscriber
presenceSubscribers.add(setOnlineUsers);
fetchPresence();

// Update presence every 30 seconds
const interval = setInterval(updatePresence, 30000);
// Reduced frequency: Update presence every 2 minutes
const interval = setInterval(updatePresence, 120000);

// Set status to offline on unmount
// Cleanup on unmount
return () => {
clearInterval(interval);
supabase.removeChannel(channel);
presenceSubscribers.delete(setOnlineUsers);

// Clean up shared channel if no subscribers
if (presenceSubscribers.size === 0 && sharedPresenceChannel) {
supabase.removeChannel(sharedPresenceChannel);
sharedPresenceChannel = null;
}

// Set status to offline
supabase
.from('UserPresence')
.update({ status: 'offline', last_seen: new Date().toISOString() })
.eq('user_id', user.id);
.eq('user_id', user.id)
.then(() => {});
};
}, [user]);

Expand Down