From abd804b43711949180f23e132cb708d40553481d Mon Sep 17 00:00:00 2001 From: johnny Date: Sun, 13 Jul 2025 11:25:29 +0800 Subject: [PATCH 1/5] checkpoint: impl. --- backend/app.ts | 15 +- backend/handlers/chat.ts | 67 +++++- backend/handlers/resume.ts | 62 ++++++ backend/handlers/status.ts | 43 ++++ backend/pathUtils.test.ts | 6 +- backend/runtime/deno.ts | 20 +- backend/runtime/types.ts | 8 +- backend/streaming/streamingFileManager.ts | 223 +++++++++++++++++++ frontend/src/components/ChatPage.tsx | 90 +++++++- frontend/src/hooks/useNetworkRecovery.ts | 258 ++++++++++++++++++++++ frontend/src/types.ts | 9 + shared/types.ts | 27 +++ 12 files changed, 805 insertions(+), 23 deletions(-) create mode 100644 backend/handlers/resume.ts create mode 100644 backend/handlers/status.ts create mode 100644 backend/streaming/streamingFileManager.ts create mode 100644 frontend/src/hooks/useNetworkRecovery.ts diff --git a/backend/app.ts b/backend/app.ts index b2bd402e..3f4097c9 100644 --- a/backend/app.ts +++ b/backend/app.ts @@ -17,6 +17,9 @@ import { handleHistoriesRequest } from "./handlers/histories.ts"; import { handleConversationRequest } from "./handlers/conversations.ts"; import { handleChatRequest } from "./handlers/chat.ts"; import { handleAbortRequest } from "./handlers/abort.ts"; +import { handleResumeRequest } from "./handlers/resume.ts"; +import { handleStatusRequest } from "./handlers/status.ts"; +import { startCleanupInterval } from "./streaming/streamingFileManager.ts"; export interface AppConfig { debugMode: boolean; @@ -33,6 +36,9 @@ export function createApp( // Store AbortControllers for each request (shared with chat handler) const requestAbortControllers = new Map(); + // Start cleanup interval for streaming files + startCleanupInterval(runtime); + // CORS middleware app.use( "*", @@ -71,10 +77,11 @@ export function createApp( (c) => handleAbortRequest(c, requestAbortControllers), ); - app.post( - "/api/chat", - (c) => handleChatRequest(c, requestAbortControllers), - ); + app.post("/api/chat", (c) => handleChatRequest(c, requestAbortControllers)); + + app.get("/api/resume/:requestId", (c) => handleResumeRequest(c)); + + app.get("/api/status/:requestId", (c) => handleStatusRequest(c)); // Static file serving with SPA fallback // Serve static assets (CSS, JS, images, etc.) diff --git a/backend/handlers/chat.ts b/backend/handlers/chat.ts index c2754e31..d9ff8ed6 100644 --- a/backend/handlers/chat.ts +++ b/backend/handlers/chat.ts @@ -2,6 +2,10 @@ import { Context } from "hono"; import { AbortError, query } from "@anthropic-ai/claude-code"; import type { ChatRequest, StreamResponse } from "../../shared/types.ts"; import type { Runtime } from "../runtime/types.ts"; +import { + appendMessage, + initializeStreaming, +} from "../streaming/streamingFileManager.ts"; /** * Automatically determines Claude Code execution configuration @@ -48,6 +52,16 @@ function getClaudeExecutionConfig(claudePath: string, runtime: Runtime) { return createNodeConfig(actualPath); } +/** + * Get encoded project name from working directory + */ +function getEncodedProjectName(workingDirectory?: string): string | null { + if (!workingDirectory) return null; + + // Encode the directory path to match Claude's project naming convention + return encodeURIComponent(workingDirectory.replace(/\//g, "_")); +} + /** * Executes a Claude command and yields streaming responses * @param message - User message or command @@ -73,6 +87,7 @@ async function* executeClaudeCommand( debugMode?: boolean, ): AsyncGenerator { let abortController: AbortController; + const encodedProjectName = getEncodedProjectName(workingDirectory); try { // Process commands that start with '/' @@ -86,6 +101,11 @@ async function* executeClaudeCommand( abortController = new AbortController(); requestAbortControllers.set(requestId, abortController); + // Initialize streaming file if we have a project + if (encodedProjectName) { + await initializeStreaming(encodedProjectName, requestId, runtime); + } + // Use the validated Claude path from startup configuration (passed as parameter) // Get Claude Code execution configuration for migrate-installer compatibility @@ -110,25 +130,64 @@ async function* executeClaudeCommand( console.debug("---"); } - yield { + const response: StreamResponse = { type: "claude_json", data: sdkMessage, }; + + // Write to streaming file if we have a project + if (encodedProjectName) { + await appendMessage(encodedProjectName, requestId, response, runtime); + } + + yield response; + } + + const doneResponse: StreamResponse = { type: "done" }; + + // Write done message to streaming file + if (encodedProjectName) { + await appendMessage(encodedProjectName, requestId, doneResponse, runtime); } - yield { type: "done" }; + yield doneResponse; } catch (error) { // Check if error is due to abort if (error instanceof AbortError) { - yield { type: "aborted" }; + const abortedResponse: StreamResponse = { type: "aborted" }; + + // Write aborted message to streaming file + if (encodedProjectName) { + await appendMessage( + encodedProjectName, + requestId, + abortedResponse, + runtime, + ); + } + + yield abortedResponse; } else { if (debugMode) { console.error("Claude Code execution failed:", error); } - yield { + + const errorResponse: StreamResponse = { type: "error", error: error instanceof Error ? error.message : String(error), }; + + // Write error message to streaming file + if (encodedProjectName) { + await appendMessage( + encodedProjectName, + requestId, + errorResponse, + runtime, + ); + } + + yield errorResponse; } } finally { // Clean up AbortController from map diff --git a/backend/handlers/resume.ts b/backend/handlers/resume.ts new file mode 100644 index 00000000..d7119ef8 --- /dev/null +++ b/backend/handlers/resume.ts @@ -0,0 +1,62 @@ +/** + * Resume API handler + * Handles requests to resume streaming from a specific point after network interruption + */ + +import { Context } from "hono"; +import type { ResumeResponse } from "../../shared/types.ts"; +import { + getRequestStatus, + readStreamingFile, +} from "../streaming/streamingFileManager.ts"; + +/** + * Handles GET /api/resume/:requestId requests + * Returns messages from a specific index for resuming interrupted streams + */ +export async function handleResumeRequest(c: Context): Promise { + const requestId = c.req.param("requestId"); + const fromIndex = parseInt(c.req.query("fromIndex") || "0", 10); + const { runtime } = c.var.config; + + // Validate requestId + if (!requestId) { + return c.json({ error: "Request ID is required" }, 400); + } + + // Get request status + const status = getRequestStatus(requestId); + if (!status) { + return c.json({ error: "Request not found" }, 404); + } + + // Extract encoded project name from the file path + // Format: /home/user/.claude/projects/{encodedProjectName}/streaming/{requestId}.jsonl + const pathParts = status.filePath.split("/"); + const projectsIndex = pathParts.indexOf("projects"); + if (projectsIndex === -1 || projectsIndex + 2 >= pathParts.length) { + return c.json({ error: "Invalid file path structure" }, 500); + } + const encodedProjectName = pathParts[projectsIndex + 1]; + + try { + // Read messages from the streaming file + const messages = await readStreamingFile( + encodedProjectName, + requestId, + fromIndex, + runtime, + ); + + const response: ResumeResponse = { + messages, + totalMessages: status.totalMessages, + isComplete: status.status !== "in_progress", + }; + + return c.json(response); + } catch (error) { + console.error("Failed to read streaming file:", error); + return c.json({ error: "Failed to read streaming data" }, 500); + } +} diff --git a/backend/handlers/status.ts b/backend/handlers/status.ts new file mode 100644 index 00000000..37bc491d --- /dev/null +++ b/backend/handlers/status.ts @@ -0,0 +1,43 @@ +/** + * Status API handler + * Handles requests to check the status of a streaming request + */ + +import { Context } from "hono"; +import type { RequestStatus, StatusResponse } from "../../shared/types.ts"; +import { getRequestStatus } from "../streaming/streamingFileManager.ts"; + +/** + * Handles GET /api/status/:requestId requests + * Returns the current status of a streaming request + */ +export async function handleStatusRequest(c: Context): Promise { + const requestId = c.req.param("requestId"); + + // Validate requestId + if (!requestId) { + return c.json({ error: "Request ID is required" }, 400); + } + + // Get request status + const status = getRequestStatus(requestId); + + if (!status) { + const response: StatusResponse = { + requestId, + status: RequestStatus.NOT_FOUND, + totalMessages: 0, + lastUpdated: new Date().toISOString(), + }; + return c.json(response); + } + + const response: StatusResponse = { + requestId, + status: status.status, + totalMessages: status.totalMessages, + lastUpdated: status.lastUpdated.toISOString(), + }; + + return c.json(response); +} diff --git a/backend/pathUtils.test.ts b/backend/pathUtils.test.ts index 43d45286..dc8b14c1 100644 --- a/backend/pathUtils.test.ts +++ b/backend/pathUtils.test.ts @@ -5,7 +5,7 @@ import type { MiddlewareHandler } from "hono"; // Create a mock runtime for testing const mockRuntime: Runtime = { - getEnv: (key: string) => key === "HOME" ? "/mock/home" : undefined, + getEnv: (key: string) => (key === "HOME" ? "/mock/home" : undefined), async *readDir(_path: string) { // Mock empty directory - no entries // This async generator yields nothing, representing an empty directory @@ -47,6 +47,10 @@ const mockRuntime: Runtime = { serve: () => {}, createStaticFileMiddleware: (): MiddlewareHandler => () => Promise.resolve(new Response()), + appendTextFile: () => Promise.resolve(), + ensureDir: () => Promise.resolve(), + remove: () => Promise.resolve(), + removeDir: () => Promise.resolve(), }; describe("pathUtils", () => { diff --git a/backend/runtime/deno.ts b/backend/runtime/deno.ts index ef772216..f9259313 100644 --- a/backend/runtime/deno.ts +++ b/backend/runtime/deno.ts @@ -79,6 +79,22 @@ export class DenoRuntime implements Runtime { } } + async appendTextFile(path: string, content: string): Promise { + await Deno.writeTextFile(path, content, { append: true }); + } + + async ensureDir(path: string): Promise { + await Deno.mkdir(path, { recursive: true }); + } + + async remove(path: string): Promise { + await Deno.remove(path); + } + + async removeDir(path: string): Promise { + await Deno.remove(path, { recursive: true }); + } + getEnv(key: string): string | undefined { return Deno.env.get(key); } @@ -116,9 +132,7 @@ export class DenoRuntime implements Runtime { Deno.serve({ port, hostname }, handler); } - createStaticFileMiddleware( - options: { root: string }, - ): MiddlewareHandler { + createStaticFileMiddleware(options: { root: string }): MiddlewareHandler { return serveStatic(options); } } diff --git a/backend/runtime/types.ts b/backend/runtime/types.ts index 11a0da8d..7018629e 100644 --- a/backend/runtime/types.ts +++ b/backend/runtime/types.ts @@ -43,6 +43,10 @@ export interface Runtime { lstat(path: string): Promise; lstatSync(path: string): FileStats; readDir(path: string): AsyncIterable; + appendTextFile(path: string, content: string): Promise; + ensureDir(path: string): Promise; + remove(path: string): Promise; + removeDir(path: string): Promise; // Environment access getEnv(key: string): string | undefined; @@ -60,7 +64,5 @@ export interface Runtime { ): void; // Static file serving - createStaticFileMiddleware( - options: { root: string }, - ): MiddlewareHandler; + createStaticFileMiddleware(options: { root: string }): MiddlewareHandler; } diff --git a/backend/streaming/streamingFileManager.ts b/backend/streaming/streamingFileManager.ts new file mode 100644 index 00000000..ef5bac47 --- /dev/null +++ b/backend/streaming/streamingFileManager.ts @@ -0,0 +1,223 @@ +/** + * Streaming file management utilities + * Handles writing, reading, and cleaning up streaming response files + */ + +import type { Runtime } from "../runtime/types.ts"; +import type { RequestStatus, StreamResponse } from "../../shared/types.ts"; + +export interface StreamingFileInfo { + requestId: string; + filePath: string; + status: RequestStatus; + totalMessages: number; + lastUpdated: Date; +} + +// In-memory store for request statuses +const requestStatuses = new Map(); + +// Cleanup interval (5 minutes) +const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; + +// Start periodic cleanup +let cleanupInterval: number | null = null; + +export function startCleanupInterval(runtime: Runtime) { + if (cleanupInterval) return; + + cleanupInterval = setInterval(() => { + cleanupExpiredFiles(runtime); + }, CLEANUP_INTERVAL_MS); +} + +export function stopCleanupInterval() { + if (cleanupInterval) { + clearInterval(cleanupInterval); + cleanupInterval = null; + } +} + +/** + * Get the streaming directory path for a project + */ +export function getStreamingDir( + encodedProjectName: string, + runtime: Runtime, +): string { + const homeDir = runtime.getEnv("HOME"); + if (!homeDir) { + throw new Error("HOME environment variable not found"); + } + return `${homeDir}/.claude/projects/${encodedProjectName}/streaming`; +} + +/** + * Get the file path for a specific request + */ +export function getStreamingFilePath( + encodedProjectName: string, + requestId: string, + runtime: Runtime, +): string { + const streamingDir = getStreamingDir(encodedProjectName, runtime); + return `${streamingDir}/${requestId}.jsonl`; +} + +/** + * Initialize streaming for a request + */ +export async function initializeStreaming( + encodedProjectName: string, + requestId: string, + runtime: Runtime, +): Promise { + const streamingDir = getStreamingDir(encodedProjectName, runtime); + + // Ensure streaming directory exists + await runtime.ensureDir(streamingDir); + + const filePath = getStreamingFilePath(encodedProjectName, requestId, runtime); + + // Initialize request status + const info: StreamingFileInfo = { + requestId, + filePath, + status: RequestStatus.IN_PROGRESS, + totalMessages: 0, + lastUpdated: new Date(), + }; + + requestStatuses.set(requestId, info); +} + +/** + * Append a message to the streaming file + */ +export async function appendMessage( + encodedProjectName: string, + requestId: string, + message: StreamResponse, + runtime: Runtime, +): Promise { + const filePath = getStreamingFilePath(encodedProjectName, requestId, runtime); + const info = requestStatuses.get(requestId); + + if (!info) { + throw new Error(`Request ${requestId} not initialized`); + } + + // Append message to file + const line = JSON.stringify(message) + "\n"; + await runtime.appendTextFile(filePath, line); + + // Update status + info.totalMessages++; + info.lastUpdated = new Date(); + + // Update status based on message type + if (message.type === "done") { + info.status = RequestStatus.COMPLETED; + } else if (message.type === "error") { + info.status = RequestStatus.FAILED; + } else if (message.type === "aborted") { + info.status = RequestStatus.ABORTED; + } +} + +/** + * Read messages from a streaming file + */ +export async function readStreamingFile( + encodedProjectName: string, + requestId: string, + fromIndex: number = 0, + runtime: Runtime, +): Promise { + const filePath = getStreamingFilePath(encodedProjectName, requestId, runtime); + + try { + const content = await runtime.readTextFile(filePath); + const lines = content + .trim() + .split("\n") + .filter((line) => line.trim()); + + const messages: StreamResponse[] = []; + for (let i = fromIndex; i < lines.length; i++) { + try { + const message = JSON.parse(lines[i]) as StreamResponse; + messages.push(message); + } catch (error) { + console.error(`Failed to parse line ${i} in ${filePath}:`, error); + } + } + + return messages; + } catch (error) { + if (error instanceof Deno.errors.NotFound) { + return []; + } + throw error; + } +} + +/** + * Get request status + */ +export function getRequestStatus(requestId: string): StreamingFileInfo | null { + return requestStatuses.get(requestId) || null; +} + +/** + * Clean up expired files + */ +async function cleanupExpiredFiles(runtime: Runtime): Promise { + const now = new Date(); + const expiredRequests: string[] = []; + + // Find expired requests + for (const [requestId, info] of requestStatuses.entries()) { + const age = now.getTime() - info.lastUpdated.getTime(); + if ( + age > CLEANUP_INTERVAL_MS && + info.status !== RequestStatus.IN_PROGRESS + ) { + expiredRequests.push(requestId); + } + } + + // Delete expired files and remove from status map + for (const requestId of expiredRequests) { + const info = requestStatuses.get(requestId); + if (info) { + try { + await runtime.remove(info.filePath); + } catch (error) { + console.error(`Failed to delete file ${info.filePath}:`, error); + } + requestStatuses.delete(requestId); + } + } + + if (expiredRequests.length > 0) { + console.log(`Cleaned up ${expiredRequests.length} expired streaming files`); + } +} + +/** + * Clean up all streaming files on shutdown + */ +export async function cleanupAllStreamingFiles( + encodedProjectName: string, + runtime: Runtime, +): Promise { + const streamingDir = getStreamingDir(encodedProjectName, runtime); + + try { + await runtime.removeDir(streamingDir); + requestStatuses.clear(); + } catch (error) { + console.error(`Failed to clean up streaming directory:`, error); + } +} diff --git a/frontend/src/components/ChatPage.tsx b/frontend/src/components/ChatPage.tsx index 37e6f495..52a5fffb 100644 --- a/frontend/src/components/ChatPage.tsx +++ b/frontend/src/components/ChatPage.tsx @@ -1,13 +1,14 @@ import { useEffect, useCallback, useState } from "react"; import { useLocation, useNavigate, useSearchParams } from "react-router-dom"; import { ChevronLeftIcon } from "@heroicons/react/24/outline"; -import type { ChatRequest, ChatMessage, ProjectInfo } from "../types"; +import type { ChatRequest, ChatMessage, ProjectInfo, StreamResponse } from "../types"; import { useTheme } from "../hooks/useTheme"; import { useClaudeStreaming } from "../hooks/useClaudeStreaming"; import { useChatState } from "../hooks/chat/useChatState"; import { usePermissions } from "../hooks/chat/usePermissions"; import { useAbortController } from "../hooks/chat/useAbortController"; import { useAutoHistoryLoader } from "../hooks/useHistoryLoader"; +import { useNetworkRecovery } from "../hooks/useNetworkRecovery"; import { ThemeToggle } from "./chat/ThemeToggle"; import { HistoryButton } from "./chat/HistoryButton"; import { ChatInput } from "./chat/ChatInput"; @@ -91,6 +92,39 @@ export function ChatPage() { initialSessionId: loadedSessionId || undefined, }); + const { + isRecovering, + retryCount, + trackMessage, + resetTracking, + handleNetworkError, + cleanup: cleanupNetworkRecovery + } = useNetworkRecovery({ + onNetworkError: () => { + console.log("Network error detected, attempting recovery..."); + }, + onRecoverySuccess: () => { + console.log("Successfully recovered from network error"); + }, + onRecoveryFailed: () => { + console.error("Failed to recover from network error"); + addMessage({ + type: "chat", + role: "assistant", + content: "Network connection lost and could not be recovered. Please check your connection and try again.", + timestamp: Date.now(), + }); + resetRequestState(); + }, + }); + + // Cleanup network recovery on unmount + useEffect(() => { + return () => { + cleanupNetworkRecovery(); + }; + }, [cleanupNetworkRecovery]); + const { allowedTools, permissionDialog, @@ -131,6 +165,7 @@ export function ChatPage() { if (!messageContent) clearInput(); startRequest(); + resetTracking(); // Reset message tracking for new request try { const response = await fetch(getChatUrl(), { @@ -185,6 +220,15 @@ export function ChatPage() { for (const line of lines) { if (shouldAbort) break; + + // Track message for recovery + try { + const parsed = JSON.parse(line) as StreamResponse; + trackMessage(parsed); + } catch { + // Ignore parse errors for tracking + } + processStreamLine(line, streamingContext); } @@ -192,14 +236,33 @@ export function ChatPage() { } } catch (error) { console.error("Failed to send message:", error); - addMessage({ - type: "chat", - role: "assistant", - content: "Error: Failed to get response", - timestamp: Date.now(), - }); + + // Try network recovery if applicable + const recovered = await handleNetworkError( + error, + requestId, + (resumedMessages: StreamResponse[]) => { + // Process resumed messages + for (const message of resumedMessages) { + const line = JSON.stringify(message); + processStreamLine(line, streamingContext); + } + } + ); + + if (!recovered) { + // Non-network error or recovery failed + addMessage({ + type: "chat", + role: "assistant", + content: "Error: Failed to get response", + timestamp: Date.now(), + }); + } } finally { - resetRequestState(); + if (!isRecovering) { + resetRequestState(); + } } }, [ @@ -223,6 +286,10 @@ export function ChatPage() { processStreamLine, handlePermissionError, createAbortHandler, + trackMessage, + resetTracking, + handleNetworkError, + isRecovering, ], ); @@ -470,6 +537,13 @@ export function ChatPage() { {/* Chat Messages */} + {/* Network Recovery Status */} + {isRecovering && ( +
+ Reconnecting... (Attempt {retryCount}) +
+ )} + {/* Input */} void; + onRecoveryStart?: () => void; + onRecoverySuccess?: () => void; + onRecoveryFailed?: () => void; +} + +interface NetworkRecoveryState { + isRecovering: boolean; + retryCount: number; + lastError: Error | null; +} + +export function useNetworkRecovery(options: NetworkRecoveryOptions = {}) { + const { + maxRetries = 5, + initialRetryDelay = 1000, + maxRetryDelay = 30000, + onNetworkError, + onRecoveryStart, + onRecoverySuccess, + onRecoveryFailed, + } = options; + + const [state, setState] = useState({ + isRecovering: false, + retryCount: 0, + lastError: null, + }); + + const retryTimeoutRef = useRef(null); + const messageIndexRef = useRef(0); + + /** + * Check if an error is a network error + */ + const isNetworkError = useCallback((error: unknown): boolean => { + if (error instanceof TypeError) { + // Common network error messages + const networkErrorMessages = [ + "Failed to fetch", + "NetworkError", + "Network request failed", + "ERR_INTERNET_DISCONNECTED", + "ERR_NETWORK_CHANGED", + ]; + return networkErrorMessages.some((msg) => error.message.includes(msg)); + } + return false; + }, []); + + /** + * Calculate retry delay with exponential backoff + */ + const calculateRetryDelay = useCallback( + (retryCount: number): number => { + const delay = Math.min( + initialRetryDelay * Math.pow(2, retryCount), + maxRetryDelay, + ); + // Add jitter to prevent thundering herd + return delay + Math.random() * 1000; + }, + [initialRetryDelay, maxRetryDelay], + ); + + /** + * Check request status + */ + const checkRequestStatus = useCallback( + async (requestId: string): Promise => { + const response = await fetch(`/api/status/${requestId}`); + if (!response.ok) { + throw new Error(`Status check failed: ${response.status}`); + } + return response.json(); + }, + [], + ); + + /** + * Resume streaming from a specific message index + */ + const resumeStreaming = useCallback( + async (requestId: string, fromIndex: number): Promise => { + const response = await fetch( + `/api/resume/${requestId}?fromIndex=${fromIndex}`, + ); + if (!response.ok) { + throw new Error(`Resume failed: ${response.status}`); + } + return response.json(); + }, + [], + ); + + /** + * Track processed messages + */ + const trackMessage = useCallback(() => { + messageIndexRef.current++; + }, []); + + /** + * Reset message tracking + */ + const resetTracking = useCallback(() => { + messageIndexRef.current = 0; + }, []); + + /** + * Handle network error and attempt recovery + */ + const handleNetworkError = useCallback( + async ( + error: unknown, + requestId: string, + onResumeMessages: (messages: StreamResponse[]) => void, + ): Promise => { + if (!isNetworkError(error)) { + return false; + } + + setState((prev) => ({ + ...prev, + lastError: error as Error, + })); + + onNetworkError?.(); + + // Clear any existing retry timeout + if (retryTimeoutRef.current) { + clearTimeout(retryTimeoutRef.current); + } + + // Attempt recovery with exponential backoff + let retryCount = 0; + + const attemptRecovery = async (): Promise => { + if (retryCount >= maxRetries) { + setState((prev) => ({ + ...prev, + isRecovering: false, + retryCount: 0, + })); + onRecoveryFailed?.(); + return false; + } + + setState((prev) => ({ + ...prev, + isRecovering: true, + retryCount: retryCount + 1, + })); + + if (retryCount === 0) { + onRecoveryStart?.(); + } + + try { + // Check if request is still in progress or completed + const status = await checkRequestStatus(requestId); + + if (status.status === "not_found") { + // Request was lost, cannot recover + throw new Error("Request not found"); + } + + // Resume from last processed message + const resumeResponse = await resumeStreaming( + requestId, + messageIndexRef.current, + ); + + // Process resumed messages + if (resumeResponse.messages.length > 0) { + onResumeMessages(resumeResponse.messages); + } + + setState((prev) => ({ + ...prev, + isRecovering: false, + retryCount: 0, + lastError: null, + })); + + onRecoverySuccess?.(); + return true; + } catch (recoveryError) { + if (!isNetworkError(recoveryError)) { + // Non-network error, stop retrying + setState((prev) => ({ + ...prev, + isRecovering: false, + retryCount: 0, + })); + onRecoveryFailed?.(); + return false; + } + + // Network still down, retry with backoff + retryCount++; + const delay = calculateRetryDelay(retryCount); + + return new Promise((resolve) => { + retryTimeoutRef.current = window.setTimeout(async () => { + const success = await attemptRecovery(); + resolve(success); + }, delay); + }); + } + }; + + return attemptRecovery(); + }, + [ + isNetworkError, + maxRetries, + onNetworkError, + onRecoveryStart, + onRecoverySuccess, + onRecoveryFailed, + checkRequestStatus, + resumeStreaming, + calculateRetryDelay, + ], + ); + + /** + * Cleanup function + */ + const cleanup = useCallback(() => { + if (retryTimeoutRef.current) { + clearTimeout(retryTimeoutRef.current); + retryTimeoutRef.current = null; + } + resetTracking(); + }, [resetTracking]); + + return { + ...state, + trackMessage, + resetTracking, + handleNetworkError, + cleanup, + }; +} diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 882fb7e1..8ff0cf15 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -5,6 +5,15 @@ import type { SDKResultMessage, } from "@anthropic-ai/claude-code"; +// Re-export shared types for frontend use +export type { + StreamResponse, + ChatRequest, + ResumeResponse, + StatusResponse, + RequestStatus, +} from "../../../shared/types"; + // Chat message for user/assistant interactions (not part of SDKMessage) export interface ChatMessage { type: "chat"; diff --git a/shared/types.ts b/shared/types.ts index 71f64e75..ca7e1290 100644 --- a/shared/types.ts +++ b/shared/types.ts @@ -51,3 +51,30 @@ export interface ConversationHistory { }; } +// Resume API types +export interface ResumeRequest { + requestId: string; + fromIndex?: number; +} + +export interface ResumeResponse { + messages: StreamResponse[]; + totalMessages: number; + isComplete: boolean; +} + +// Status API types +export enum RequestStatus { + IN_PROGRESS = "in_progress", + COMPLETED = "completed", + FAILED = "failed", + ABORTED = "aborted", + NOT_FOUND = "not_found", +} + +export interface StatusResponse { + requestId: string; + status: RequestStatus; + totalMessages: number; + lastUpdated: string; +} From 05030e9325193eec669383716ee17310aaade6c4 Mon Sep 17 00:00:00 2001 From: johnny Date: Sun, 13 Jul 2025 12:07:46 +0800 Subject: [PATCH 2/5] chechpoint: fix errors of make check --- backend/handlers/status.ts | 5 +-- backend/streaming/streamingFileManager.ts | 5 +-- frontend/src/components/ChatPage.tsx | 32 +++++++++++-------- .../DemoPermissionDialogWrapper.tsx | 19 +++++------ frontend/src/hooks/useDemoAutomation.ts | 22 ++++++++----- frontend/src/hooks/useNetworkRecovery.ts | 11 +++++-- 6 files changed, 58 insertions(+), 36 deletions(-) diff --git a/backend/handlers/status.ts b/backend/handlers/status.ts index 37bc491d..4930c83c 100644 --- a/backend/handlers/status.ts +++ b/backend/handlers/status.ts @@ -4,14 +4,15 @@ */ import { Context } from "hono"; -import type { RequestStatus, StatusResponse } from "../../shared/types.ts"; +import type { StatusResponse } from "../../shared/types.ts"; +import { RequestStatus } from "../../shared/types.ts"; import { getRequestStatus } from "../streaming/streamingFileManager.ts"; /** * Handles GET /api/status/:requestId requests * Returns the current status of a streaming request */ -export async function handleStatusRequest(c: Context): Promise { +export function handleStatusRequest(c: Context): Response { const requestId = c.req.param("requestId"); // Validate requestId diff --git a/backend/streaming/streamingFileManager.ts b/backend/streaming/streamingFileManager.ts index ef5bac47..0b48f019 100644 --- a/backend/streaming/streamingFileManager.ts +++ b/backend/streaming/streamingFileManager.ts @@ -4,7 +4,8 @@ */ import type { Runtime } from "../runtime/types.ts"; -import type { RequestStatus, StreamResponse } from "../../shared/types.ts"; +import type { StreamResponse } from "../../shared/types.ts"; +import { RequestStatus } from "../../shared/types.ts"; export interface StreamingFileInfo { requestId: string; @@ -21,7 +22,7 @@ const requestStatuses = new Map(); const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // Start periodic cleanup -let cleanupInterval: number | null = null; +let cleanupInterval: ReturnType | null = null; export function startCleanupInterval(runtime: Runtime) { if (cleanupInterval) return; diff --git a/frontend/src/components/ChatPage.tsx b/frontend/src/components/ChatPage.tsx index 52a5fffb..1c05cf20 100644 --- a/frontend/src/components/ChatPage.tsx +++ b/frontend/src/components/ChatPage.tsx @@ -1,7 +1,12 @@ import { useEffect, useCallback, useState } from "react"; import { useLocation, useNavigate, useSearchParams } from "react-router-dom"; import { ChevronLeftIcon } from "@heroicons/react/24/outline"; -import type { ChatRequest, ChatMessage, ProjectInfo, StreamResponse } from "../types"; +import type { + ChatRequest, + ChatMessage, + ProjectInfo, + StreamResponse, +} from "../types"; import { useTheme } from "../hooks/useTheme"; import { useClaudeStreaming } from "../hooks/useClaudeStreaming"; import { useChatState } from "../hooks/chat/useChatState"; @@ -92,13 +97,13 @@ export function ChatPage() { initialSessionId: loadedSessionId || undefined, }); - const { - isRecovering, - retryCount, - trackMessage, - resetTracking, + const { + isRecovering, + retryCount, + trackMessage, + resetTracking, handleNetworkError, - cleanup: cleanupNetworkRecovery + cleanup: cleanupNetworkRecovery, } = useNetworkRecovery({ onNetworkError: () => { console.log("Network error detected, attempting recovery..."); @@ -111,7 +116,8 @@ export function ChatPage() { addMessage({ type: "chat", role: "assistant", - content: "Network connection lost and could not be recovered. Please check your connection and try again.", + content: + "Network connection lost and could not be recovered. Please check your connection and try again.", timestamp: Date.now(), }); resetRequestState(); @@ -220,7 +226,7 @@ export function ChatPage() { for (const line of lines) { if (shouldAbort) break; - + // Track message for recovery try { const parsed = JSON.parse(line) as StreamResponse; @@ -228,7 +234,7 @@ export function ChatPage() { } catch { // Ignore parse errors for tracking } - + processStreamLine(line, streamingContext); } @@ -236,7 +242,7 @@ export function ChatPage() { } } catch (error) { console.error("Failed to send message:", error); - + // Try network recovery if applicable const recovered = await handleNetworkError( error, @@ -247,9 +253,9 @@ export function ChatPage() { const line = JSON.stringify(message); processStreamLine(line, streamingContext); } - } + }, ); - + if (!recovered) { // Non-network error or recovery failed addMessage({ diff --git a/frontend/src/components/DemoPermissionDialogWrapper.tsx b/frontend/src/components/DemoPermissionDialogWrapper.tsx index e382db8c..5536069e 100644 --- a/frontend/src/components/DemoPermissionDialogWrapper.tsx +++ b/frontend/src/components/DemoPermissionDialogWrapper.tsx @@ -38,17 +38,19 @@ export function DemoPermissionDialogWrapper({ // Auto-click effect with focus sequence animation useEffect(() => { if (autoClickButton && permissionDialogProps.isOpen) { + const timers = timersRef.current; + if (autoClickButton === "allowPermanent") { // For allowPermanent: sequence 1st → 2nd button setActiveButton("allow"); - timersRef.current.focus = setTimeout(() => { + timers.focus = setTimeout(() => { setActiveButton("allowPermanent"); }, 500); - timersRef.current.action = setTimeout(() => { + timers.action = setTimeout(() => { setClickedButton("allowPermanent"); - timersRef.current.clickEffect = setTimeout(() => { + timers.clickEffect = setTimeout(() => { onAllowPermanent(); }, 200); }, 1200); @@ -56,19 +58,18 @@ export function DemoPermissionDialogWrapper({ // For allow: direct focus on button setActiveButton("allow"); - timersRef.current.action = setTimeout(() => { + timers.action = setTimeout(() => { setClickedButton("allow"); - timersRef.current.clickEffect = setTimeout(() => { + timers.clickEffect = setTimeout(() => { onAllow(); }, 200); }, 700); } return () => { - if (timersRef.current.focus) clearTimeout(timersRef.current.focus); - if (timersRef.current.action) clearTimeout(timersRef.current.action); - if (timersRef.current.clickEffect) - clearTimeout(timersRef.current.clickEffect); + if (timers.focus) clearTimeout(timers.focus); + if (timers.action) clearTimeout(timers.action); + if (timers.clickEffect) clearTimeout(timers.clickEffect); }; } }, [ diff --git a/frontend/src/hooks/useDemoAutomation.ts b/frontend/src/hooks/useDemoAutomation.ts index f087026b..db168a1f 100644 --- a/frontend/src/hooks/useDemoAutomation.ts +++ b/frontend/src/hooks/useDemoAutomation.ts @@ -461,14 +461,19 @@ export function useDemoAutomation( // Cleanup on unmount useEffect(() => { return () => { - if (stepTimeoutRef.current) { - clearTimeout(stepTimeoutRef.current); + const stepTimeout = stepTimeoutRef.current; + // eslint-disable-next-line react-hooks/exhaustive-deps + const typingTimeout = typingTimeoutRef.current; + const typingInterval = typingIntervalRef.current; + + if (stepTimeout) { + clearTimeout(stepTimeout); } - if (typingTimeoutRef.current) { - clearTimeout(typingTimeoutRef.current); + if (typingTimeout) { + clearTimeout(typingTimeout); } - if (typingIntervalRef.current) { - clearTimeout(typingIntervalRef.current); + if (typingInterval) { + clearTimeout(typingInterval); } }; }, []); @@ -540,8 +545,9 @@ export function useTypingAnimation( useEffect(() => { return () => { - if (intervalRef.current) { - clearTimeout(intervalRef.current); + const interval = intervalRef.current; + if (interval) { + clearTimeout(interval); } }; }, []); diff --git a/frontend/src/hooks/useNetworkRecovery.ts b/frontend/src/hooks/useNetworkRecovery.ts index c0bb0262..a134f3b2 100644 --- a/frontend/src/hooks/useNetworkRecovery.ts +++ b/frontend/src/hooks/useNetworkRecovery.ts @@ -41,6 +41,7 @@ export function useNetworkRecovery(options: NetworkRecoveryOptions = {}) { const retryTimeoutRef = useRef(null); const messageIndexRef = useRef(0); + const messageHistoryRef = useRef>([]); /** * Check if an error is a network error @@ -108,8 +109,14 @@ export function useNetworkRecovery(options: NetworkRecoveryOptions = {}) { /** * Track processed messages */ - const trackMessage = useCallback(() => { - messageIndexRef.current++; + const trackMessage = useCallback((message: string) => { + const id = messageIndexRef.current++; + messageHistoryRef.current.push({ id, message }); + // Optional: limit history size + if (messageHistoryRef.current.length > 100) { + messageHistoryRef.current.shift(); + } + return id; // Return the message ID for reference }, []); /** From 1f0622b426903b51ad8fa90483487109303ed5c9 Mon Sep 17 00:00:00 2001 From: johnny Date: Sun, 13 Jul 2025 12:21:32 +0800 Subject: [PATCH 3/5] checkpoint: fix all errors of make. --- backend/streaming/streamingFileManager.ts | 40 ++++++++------- frontend/src/components/ChatPage.tsx | 59 +++++++++++------------ frontend/src/types.ts | 4 +- shared/types.ts | 16 +++--- 4 files changed, 56 insertions(+), 63 deletions(-) diff --git a/backend/streaming/streamingFileManager.ts b/backend/streaming/streamingFileManager.ts index 0b48f019..1aab0da4 100644 --- a/backend/streaming/streamingFileManager.ts +++ b/backend/streaming/streamingFileManager.ts @@ -137,30 +137,28 @@ export async function readStreamingFile( ): Promise { const filePath = getStreamingFilePath(encodedProjectName, requestId, runtime); - try { - const content = await runtime.readTextFile(filePath); - const lines = content - .trim() - .split("\n") - .filter((line) => line.trim()); - - const messages: StreamResponse[] = []; - for (let i = fromIndex; i < lines.length; i++) { - try { - const message = JSON.parse(lines[i]) as StreamResponse; - messages.push(message); - } catch (error) { - console.error(`Failed to parse line ${i} in ${filePath}:`, error); - } - } + // Check if file exists before reading + if (!(await runtime.exists(filePath))) { + return []; + } - return messages; - } catch (error) { - if (error instanceof Deno.errors.NotFound) { - return []; + const content = await runtime.readTextFile(filePath); + const lines = content + .trim() + .split("\n") + .filter((line) => line.trim()); + + const messages: StreamResponse[] = []; + for (let i = fromIndex; i < lines.length; i++) { + try { + const message = JSON.parse(lines[i]) as StreamResponse; + messages.push(message); + } catch (error) { + console.error(`Failed to parse line ${i} in ${filePath}:`, error); } - throw error; } + + return messages; } /** diff --git a/frontend/src/components/ChatPage.tsx b/frontend/src/components/ChatPage.tsx index 1c05cf20..bc770d1d 100644 --- a/frontend/src/components/ChatPage.tsx +++ b/frontend/src/components/ChatPage.tsx @@ -173,6 +173,32 @@ export function ChatPage() { startRequest(); resetTracking(); // Reset message tracking for new request + // Local state for this streaming session + let localHasReceivedInit = false; + let shouldAbort = false; + + const streamingContext: StreamingContext = { + currentAssistantMessage, + setCurrentAssistantMessage, + addMessage, + updateLastMessage, + onSessionId: setCurrentSessionId, + shouldShowInitMessage: () => !hasShownInitMessage, + onInitMessageShown: () => setHasShownInitMessage(true), + get hasReceivedInit() { + return localHasReceivedInit; + }, + setHasReceivedInit: (received: boolean) => { + localHasReceivedInit = received; + setHasReceivedInit(received); + }, + onPermissionError: handlePermissionError, + onAbortRequest: async () => { + shouldAbort = true; + await createAbortHandler(requestId)(); + }, + }; + try { const response = await fetch(getChatUrl(), { method: "POST", @@ -191,32 +217,6 @@ export function ChatPage() { const reader = response.body.getReader(); const decoder = new TextDecoder(); - // Local state for this streaming session - let localHasReceivedInit = false; - let shouldAbort = false; - - const streamingContext: StreamingContext = { - currentAssistantMessage, - setCurrentAssistantMessage, - addMessage, - updateLastMessage, - onSessionId: setCurrentSessionId, - shouldShowInitMessage: () => !hasShownInitMessage, - onInitMessageShown: () => setHasShownInitMessage(true), - get hasReceivedInit() { - return localHasReceivedInit; - }, - setHasReceivedInit: (received: boolean) => { - localHasReceivedInit = received; - setHasReceivedInit(received); - }, - onPermissionError: handlePermissionError, - onAbortRequest: async () => { - shouldAbort = true; - await createAbortHandler(requestId)(); - }, - }; - while (true) { const { done, value } = await reader.read(); if (done || shouldAbort) break; @@ -228,12 +228,7 @@ export function ChatPage() { if (shouldAbort) break; // Track message for recovery - try { - const parsed = JSON.parse(line) as StreamResponse; - trackMessage(parsed); - } catch { - // Ignore parse errors for tracking - } + trackMessage(line); processStreamLine(line, streamingContext); } diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 8ff0cf15..7ce75ea6 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -7,12 +7,10 @@ import type { // Re-export shared types for frontend use export type { - StreamResponse, - ChatRequest, ResumeResponse, StatusResponse, RequestStatus, -} from "../../../shared/types"; +} from "../../shared/types"; // Chat message for user/assistant interactions (not part of SDKMessage) export interface ChatMessage { diff --git a/shared/types.ts b/shared/types.ts index ca7e1290..aee0b01d 100644 --- a/shared/types.ts +++ b/shared/types.ts @@ -64,13 +64,15 @@ export interface ResumeResponse { } // Status API types -export enum RequestStatus { - IN_PROGRESS = "in_progress", - COMPLETED = "completed", - FAILED = "failed", - ABORTED = "aborted", - NOT_FOUND = "not_found", -} +export const RequestStatus = { + IN_PROGRESS: "in_progress", + COMPLETED: "completed", + FAILED: "failed", + ABORTED: "aborted", + NOT_FOUND: "not_found", +} as const; + +export type RequestStatus = (typeof RequestStatus)[keyof typeof RequestStatus]; export interface StatusResponse { requestId: string; From 4c7d3b6c6ace23b5f514feca6d7193dc9cbd15d1 Mon Sep 17 00:00:00 2001 From: johnny Date: Sun, 13 Jul 2025 13:21:45 +0800 Subject: [PATCH 4/5] use relative path for local streaming file. --- .gitignore | 4 +++- backend/deno.json | 4 ++-- backend/streaming/streamingFileManager.ts | 15 ++++++++++----- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 06896b04..e7233dc5 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,6 @@ backend/dist/ frontend/demo-recordings/ frontend/test-results/ frontend/playwright-report/ -frontend/playwright/.cache/ \ No newline at end of file +frontend/playwright/.cache/ + +backend/.streaming/ diff --git a/backend/deno.json b/backend/deno.json index 1cccfb85..785019f3 100644 --- a/backend/deno.json +++ b/backend/deno.json @@ -11,8 +11,8 @@ "tasks": { "generate-version": "node scripts/generate-version.js", "copy-frontend": "node scripts/copy-frontend.js", - "dev": "deno task generate-version && dotenvx run --env-file=../.env -- deno run --allow-net --allow-run --allow-read --allow-env --watch cli/deno.ts --debug", - "build": "deno task generate-version && deno task copy-frontend && deno compile --allow-net --allow-run --allow-read --allow-env --include ./dist/static --output ../dist/claude-code-webui cli/deno.ts", + "dev": "deno task generate-version && dotenvx run --env-file=../.env -- deno run --allow-net --allow-run --allow-read --allow-write=./.streaming --allow-env --watch cli/deno.ts --debug", + "build": "deno task generate-version && deno task copy-frontend && deno compile --allow-net --allow-run --allow-read --allow-write=./.streaming --allow-env --include ./dist/static --output ../dist/claude-code-webui cli/deno.ts", "format": "deno fmt", "lint": "deno lint", "check": "deno check", diff --git a/backend/streaming/streamingFileManager.ts b/backend/streaming/streamingFileManager.ts index 1aab0da4..9ad7572a 100644 --- a/backend/streaming/streamingFileManager.ts +++ b/backend/streaming/streamingFileManager.ts @@ -3,6 +3,7 @@ * Handles writing, reading, and cleaning up streaming response files */ +import process from "node:process"; import type { Runtime } from "../runtime/types.ts"; import type { StreamResponse } from "../../shared/types.ts"; import { RequestStatus } from "../../shared/types.ts"; @@ -40,17 +41,21 @@ export function stopCleanupInterval() { } /** - * Get the streaming directory path for a project + * Get the streaming directory path for a project relative to current working directory */ export function getStreamingDir( encodedProjectName: string, runtime: Runtime, ): string { - const homeDir = runtime.getEnv("HOME"); - if (!homeDir) { - throw new Error("HOME environment variable not found"); + // Check if there's a custom streaming base directory set in environment + const customDir = runtime.getEnv("CLAUDE_STREAMING_DIR"); + if (customDir) { + return `${customDir}/${encodedProjectName}`; } - return `${homeDir}/.claude/projects/${encodedProjectName}/streaming`; + + // Otherwise, use current working directory + const cwd = process.cwd(); + return `${cwd}/.streaming/${encodedProjectName}`; } /** From 52a81dec0ee447680c23659cc956042f241691d7 Mon Sep 17 00:00:00 2001 From: johnny Date: Sun, 13 Jul 2025 14:36:50 +0800 Subject: [PATCH 5/5] feat: Implement streaming message integration in conversation loader - Added functionality to load additional messages from streaming files based on session ID. - Merged non-duplicate streaming messages with existing conversation history. - Updated conversation metadata, including message count and end time. - Introduced a new utility function `getStreamingMessages` to retrieve messages from streaming files. This enhancement improves the conversation loading process by ensuring that the latest messages are included, providing a more comprehensive chat history. --- backend/history/conversationLoader.ts | 66 +++++++++++++++++- backend/streaming/streamingFileManager.ts | 82 +++++++++++++++++++++++ 2 files changed, 147 insertions(+), 1 deletion(-) diff --git a/backend/history/conversationLoader.ts b/backend/history/conversationLoader.ts index 3ab46843..3417a8e5 100644 --- a/backend/history/conversationLoader.ts +++ b/backend/history/conversationLoader.ts @@ -8,6 +8,7 @@ import type { ConversationHistory } from "../../shared/types.ts"; import type { Runtime } from "../runtime/types.ts"; import { processConversationMessages } from "./timestampRestore.ts"; import { validateEncodedProjectName } from "./pathUtils.ts"; +import { getStreamingMessages } from "../streaming/streamingFileManager.ts"; /** * Load a specific conversation by session ID @@ -47,6 +48,66 @@ export async function loadConversation( sessionId, runtime, ); + + // Check for additional messages in streaming files + const streamingMessages = await getStreamingMessages( + encodedProjectName, + sessionId, + runtime, + ); + + if (streamingMessages.length > 0) { + // Merge streaming messages with conversation history + // The streaming messages might contain duplicates or newer messages + const existingMessageIds = new Set(); + + // Build a set of existing message IDs/timestamps for deduplication + for (const msg of conversationHistory.messages) { + if (typeof msg === "object" && msg !== null && "timestamp" in msg) { + existingMessageIds.add(JSON.stringify(msg)); + } + } + + // Add non-duplicate streaming messages + let newMessagesAdded = 0; + for (const streamingMsg of streamingMessages) { + const msgKey = JSON.stringify(streamingMsg); + if (!existingMessageIds.has(msgKey)) { + conversationHistory.messages.push(streamingMsg); + newMessagesAdded++; + } + } + + if (newMessagesAdded > 0) { + // Re-sort messages by timestamp if new messages were added + conversationHistory.messages.sort( + (a: unknown, b: unknown) => { + const timeA = (a as { timestamp?: number }).timestamp || 0; + const timeB = (b as { timestamp?: number }).timestamp || 0; + return timeA - timeB; + }, + ); + + // Update metadata + conversationHistory.metadata.messageCount = + conversationHistory.messages.length; + if (conversationHistory.messages.length > 0) { + const lastMsg = conversationHistory.messages[ + conversationHistory.messages.length - 1 + ] as { timestamp?: number }; + if (lastMsg.timestamp) { + conversationHistory.metadata.endTime = new Date( + lastMsg.timestamp, + ).toISOString(); + } + } + + console.log( + `[ConversationLoader] Merged ${newMessagesAdded} additional messages from streaming files for session ${sessionId}`, + ); + } + } + return conversationHistory; } catch (error) { throw error; // Re-throw any parsing errors @@ -63,7 +124,10 @@ async function parseConversationFile( runtime: Runtime, ): Promise { const content = await runtime.readTextFile(filePath); - const lines = content.trim().split("\n").filter((line) => line.trim()); + const lines = content + .trim() + .split("\n") + .filter((line) => line.trim()); if (lines.length === 0) { throw new Error("Empty conversation file"); diff --git a/backend/streaming/streamingFileManager.ts b/backend/streaming/streamingFileManager.ts index 9ad7572a..f63897b6 100644 --- a/backend/streaming/streamingFileManager.ts +++ b/backend/streaming/streamingFileManager.ts @@ -225,3 +225,85 @@ export async function cleanupAllStreamingFiles( console.error(`Failed to clean up streaming directory:`, error); } } + +/** + * Get streaming messages for a specific sessionId + * This searches through streaming files to find messages with matching sessionId + */ +export async function getStreamingMessages( + encodedProjectName: string, + sessionId: string, + runtime: Runtime, +): Promise { + const streamingDir = getStreamingDir(encodedProjectName, runtime); + + // Check if streaming directory exists + if (!(await runtime.exists(streamingDir))) { + return []; + } + + const messages: unknown[] = []; + + try { + // List all files in the streaming directory + const entries: string[] = []; + for await (const entry of runtime.readDir(streamingDir)) { + if (entry.isFile && entry.name.endsWith(".jsonl")) { + entries.push(entry.name); + } + } + + // Process each streaming file + for (const file of entries) { + const filePath = `${streamingDir}/${file}`; + const content = await runtime.readTextFile(filePath); + const lines = content + .trim() + .split("\n") + .filter((line) => line.trim()); + + // Check each message for matching sessionId + for (const line of lines) { + try { + const parsed = JSON.parse(line); + + // Check if this is a claude_json message with SDK content + if ( + parsed.type === "claude_json" && + parsed.content?.type === "sdk" + ) { + const sdkMessage = parsed.content.message; + + // Extract sessionId from different message types + let messageSessionId: string | undefined; + + if (sdkMessage.type === "system" && sdkMessage.session_id) { + messageSessionId = sdkMessage.session_id; + } else if ( + sdkMessage.type === "assistant" && + sdkMessage.session_id + ) { + messageSessionId = sdkMessage.session_id; + } else if ( + sdkMessage.type === "result" && + sdkMessage.session_id + ) { + messageSessionId = sdkMessage.session_id; + } + + // If sessionId matches, add the SDK message + if (messageSessionId === sessionId) { + messages.push(sdkMessage); + } + } + } catch (error) { + console.error(`Failed to parse streaming message:`, error); + } + } + } + } catch (error) { + console.error(`Failed to read streaming directory:`, error); + } + + return messages; +}