Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 42 additions & 36 deletions src/hooks/use-learning-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,23 @@ export function useLearningChat(): UseLearningChatReturn {
const streamingThreadId = streamingThreadIds[streamingThreadIds.length - 1] ?? null;

// Track threads where we've just started a job (before storage reflects isStreaming)
const [pendingStreamThreadIds, setPendingStreamThreadIds] = useState<Set<string>>(new Set());
// Map of threadId -> expected message count after response completes
const [pendingStreamThreads, setPendingStreamThreads] = useState<Map<string, number>>(new Map());

// Combine storage-derived streaming IDs with pending ones
const pendingStreamThreadIds = useMemo(() =>
Array.from(pendingStreamThreads.keys()),
[pendingStreamThreads]
);

const allStreamingThreadIds = useMemo(() => {
const combined = new Set([...streamingThreadIds, ...pendingStreamThreadIds]);
return Array.from(combined);
}, [streamingThreadIds, pendingStreamThreadIds]);

// Check if active thread is streaming (from storage OR pending)
const isStreaming = activeThread?.isStreaming === true ||
(activeThreadId ? pendingStreamThreadIds.has(activeThreadId) : false);
(activeThreadId ? pendingStreamThreads.has(activeThreadId) : false);

// Streaming content comes from the thread messages in storage
// The streaming message has cursor ` ▊` which gives the typing effect
Expand All @@ -227,37 +233,31 @@ export function useLearningChat(): UseLearningChatReturn {
};
}, [allStreamingThreadIds.length, refreshThreads]);

// Clean up pending stream IDs once storage reflects the streaming state
// Clean up pending stream threads once storage reflects the completed response
useEffect(() => {
if (pendingStreamThreadIds.size === 0) return;
if (pendingStreamThreads.size === 0) return;

// Check if any pending threads now have isStreaming in storage
const stillPending = new Set<string>();
for (const threadId of pendingStreamThreadIds) {
// Check if any pending threads now have the expected number of messages
const stillPending = new Map<string, number>();
for (const [threadId, expectedMessageCount] of pendingStreamThreads) {
const thread = threads.find(t => t.id === threadId);
// Keep as pending if thread not found OR doesn't have isStreaming yet
// AND doesn't have a completed message (no cursor)
if (!thread) {
stillPending.add(threadId);
// Thread not found yet - keep pending
stillPending.set(threadId, expectedMessageCount);
} else if (thread.isStreaming) {
// Storage has caught up - no longer pending
} else {
// Check if there's a streaming message or completed assistant message
const hasStreamingMsg = thread.messages.some(m => m.id.startsWith('streaming-'));
const hasCompletedResponse = thread.messages.some(m =>
m.role === 'assistant' && !m.id.startsWith('streaming-')
);
if (!hasStreamingMsg && !hasCompletedResponse) {
// Still waiting for job to write first content
stillPending.add(threadId);
}
// Still streaming - keep pending (storage has caught up, streaming in progress)
stillPending.set(threadId, expectedMessageCount);
} else if (thread.messages.length < expectedMessageCount) {
// Haven't received the response yet - keep pending
stillPending.set(threadId, expectedMessageCount);
}
// Otherwise: thread has expected messages and is not streaming - remove from pending
}

if (stillPending.size !== pendingStreamThreadIds.size) {
setPendingStreamThreadIds(stillPending);
if (stillPending.size !== pendingStreamThreads.size) {
setPendingStreamThreads(stillPending);
}
}, [threads, pendingStreamThreadIds]);
}, [threads, pendingStreamThreads]);

useEffect(() => {
if (isThreadsLoading) return;
Expand Down Expand Up @@ -331,8 +331,8 @@ export function useLearningChat(): UseLearningChatReturn {
log.debug('Stopping stream for thread:', threadId);

// Remove from pending (stops showing as streaming immediately)
setPendingStreamThreadIds(prev => {
const next = new Set(prev);
setPendingStreamThreads(prev => {
const next = new Map(prev);
next.delete(threadId);
return next;
});
Expand Down Expand Up @@ -460,8 +460,11 @@ export function useLearningChat(): UseLearningChatReturn {
}
}

// Check if THIS thread is already streaming (from storage)
const threadFromStorage = threads.find(t => t.id === targetThreadId);
// Check if THIS thread is already streaming (query storage directly to avoid race condition)
// We must check the actual storage, not React state, because the state may not have
// refreshed yet after the previous message completed. This was causing follow-up messages
// to be blocked when sent quickly after the first message.
const threadFromStorage = await threadStore.getById(targetThreadId);
if (threadFromStorage?.isStreaming) {
log.warn(`Thread ${targetThreadId} is already streaming`);
return;
Expand All @@ -482,13 +485,17 @@ export function useLearningChat(): UseLearningChatReturn {
messages: [...thread.messages, userMessage],
}, targetThreadId);

// Calculate expected message count: current messages + user message + assistant response
// Current thread messages count + 1 (user) + 1 (assistant) = expected total
const expectedMessageCount = thread.messages.length + 2;

// Start background job via POST /api/jobs
// The job writes to storage, and our polling effect refreshes the UI
log.debug('Starting background job for chat response', { threadId: targetThreadId });
log.debug('Starting background job for chat response', { threadId: targetThreadId, expectedMessageCount });

// Mark this thread as pending streaming IMMEDIATELY
// Mark this thread as pending streaming with expected message count
// This triggers polling before storage has isStreaming: true
setPendingStreamThreadIds(prev => new Set([...prev, targetThreadId]));
setPendingStreamThreads(prev => new Map(prev).set(targetThreadId, expectedMessageCount));

try {
const jobRes = await fetch('/api/jobs', {
Expand All @@ -509,8 +516,8 @@ export function useLearningChat(): UseLearningChatReturn {
if (!jobRes.ok) {
const err = await jobRes.json();
// Remove from pending on error
setPendingStreamThreadIds(prev => {
const next = new Set(prev);
setPendingStreamThreads(prev => {
const next = new Map(prev);
next.delete(targetThreadId);
return next;
});
Expand All @@ -525,8 +532,8 @@ export function useLearningChat(): UseLearningChatReturn {
} catch (err) {
log.error('Failed to start chat response job:', err);
// Remove from pending on error
setPendingStreamThreadIds(prev => {
const next = new Set(prev);
setPendingStreamThreads(prev => {
const next = new Map(prev);
next.delete(targetThreadId);
return next;
});
Expand All @@ -537,7 +544,6 @@ export function useLearningChat(): UseLearningChatReturn {
createThread,
updateActiveThread,
refreshThreads,
threads,
]
);

Expand Down