-
Notifications
You must be signed in to change notification settings - Fork 2
feat: create internal/stream/filestore.go for FileStore-backed strea... #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
thruflo
wants to merge
27
commits into
main
Choose a base branch
from
durable-streams-refactor
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Create the foundation for durable stream communication between wisp-sprite and clients. Defines event types (session, task, claude_event, input_request), command types (kill, background, input_response), and acknowledgment messages. Includes: - types.go: Event struct with JSON serialization, typed data accessors, command and ack creation helpers - types_test.go: Comprehensive unit tests for serialization round-trips 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add FileStore struct that provides file-based persistent storage for stream events on the Sprite VM. Events are stored as newline-delimited JSON (NDJSON) with automatic sequence number assignment. Features: - Append() writes events with assigned sequence numbers and fsync - Read(fromSeq) reads events from a given sequence number - Subscribe() provides polling-based event streaming via channels - Thread-safe with mutex protection for concurrent access - Handles malformed lines gracefully (skips them) - Creates directories as needed 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add StreamClient that connects to the stream server on a Sprite: - Connect() tests connection to server via /state endpoint - Subscribe() returns channel of events via SSE with auto-reconnect - SendCommand() POSTs commands and returns acknowledgments - GetState() fetches current state snapshot - Automatic reconnection with configurable retry interval and max attempts - Support for auth tokens and custom HTTP clients Includes comprehensive tests with mock HTTP server. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Create internal/spriteloop package with iteration loop logic designed to run directly on Sprite VMs. This package extracts and adapts the core loop from internal/loop but for local execution: - loop.go: Main iteration loop with exit conditions (done, blocked, max iterations, max duration, stuck detection, user commands) - executor.go: ClaudeExecutor interface for Claude command execution with LocalExecutor implementation and MockExecutor for testing - doc.go: Package documentation Key features: - Direct file access (no SSH) for state files in /var/local/wisp/session/ - FileStore integration for publishing events to durable stream - Command handling (kill, background, input_response) via channel - Stuck detection based on progress history - Comprehensive test coverage 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add types and functions for parsing Claude Code stream-json output. This enables monitoring and extracting information from Claude's execution during the iteration loop: - StreamEvent types for system/init, assistant, user, and result events - ContentBlock parsing for text, tool_use, and tool_result content - StreamState for tracking accumulated progress (tool calls, turns, cost) - StreamParser for convenient line-by-line processing with callbacks - ToolInput parsing for extracting common tool parameters The parsing separates Claude output format knowledge from loop/executor logic, enabling clients to understand execution progress in real-time. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add CommandProcessor to handle incoming commands from the stream. Implements kill, background, and input_response command handlers with proper acknowledgment publishing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add Server struct that provides HTTP endpoints for communication between TUI/web clients and the Sprite VM: - GET /stream: SSE endpoint for real-time event streaming - POST /command: Receive commands (kill, background, input_response) - GET /state: Current state snapshot with session, tasks, pending input - GET /health: Health check with last sequence number Features: - Bearer token authentication (optional) with query param fallback - Graceful shutdown support - Configurable polling interval - Keepalive SSE comments to prevent connection timeouts - from_seq parameter for catching up on missed events 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Create cmd/wisp-sprite/main.go that runs on the Sprite VM to execute the Claude Code iteration loop. The binary provides: - Command-line flags for port, session-dir, work-dir, template-dir, token - FileStore initialization for durable event persistence - HTTP server for stream and command endpoints - CommandProcessor for handling kill/background/input commands - Signal handling (SIGINT/SIGTERM) for graceful shutdown - Main loop execution until completion or exit condition 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add Makefile target to cross-compile wisp-sprite binary for Linux/amd64: - Sets CGO_ENABLED=0 for static linking (no libc dependencies) - Sets GOOS=linux GOARCH=amd64 for Sprite VM compatibility - Outputs to bin/wisp-sprite directory - Updates clean target to remove bin directory 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add SpriteRunner functions to internal/cli/start.go: - UploadSpriteRunner: uploads wisp-sprite binary to /var/local/wisp/bin/wisp-sprite - StartSpriteRunner: starts wisp-sprite with nohup to survive disconnect - WaitForSpriteRunner: polls /health endpoint until server is ready - ConnectToSpriteStream: creates StreamClient connected to Sprite's stream server Update SetupSprite to upload the wisp-sprite binary during Sprite setup. The binary is uploaded but not started in SetupSprite - it will be started by the caller after task generation. Add constants for SpriteRunner paths and port: - SpriteRunnerPort: 8374 - SpriteRunnerBinaryPath: /var/local/wisp/bin/wisp-sprite - SpriteRunnerPIDPath: /var/local/wisp/wisp-sprite.pid - SpriteRunnerLogPath: /var/local/wisp/wisp-sprite.log - LocalSpriteRunnerPath: bin/wisp-sprite Add unit test for constants. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add IsSpriteRunnerRunning function to check if the wisp-sprite process is running on the Sprite by checking the PID file and verifying the process is alive. Add ConnectOrRestartSpriteRunner function that: - Checks if wisp-sprite is running on the Sprite - If running, connects to the existing stream server - If not running, uploads the binary (if needed) and starts it - Returns a stream client connected to the process These functions support the resume flow where the TUI reconnects to a running wisp-sprite process after a disconnect, or restarts the process if it has stopped. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ication Add StreamClient field and stream event handling to TUI for durable streams: - Add SetStreamClient/GetStreamClient methods for configuring stream client - Add HandleStreamEvent method to process session, task, claude, and input events - Add UpdateFromSnapshot method for initial state sync and reconnection - Create stream.go with StreamRunner for running TUI with stream subscription - Convert user actions (kill, background, input response) to stream commands - Maintain full backward compatibility with existing loop integration 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…clients This change adapts internal/server/streams.go to work with the stream client, enabling the web server to act as a relay between the Sprite and web clients. Key changes: - Add NewRelayStreamManager() for creating a manager in relay mode - Add StartRelay()/StopRelay() to manage the event forwarding loop - Add methods to forward commands to Sprite (SendCommandToSprite, etc.) - Update Server config to support SpriteURL and SpriteAuthToken - Update handleInput to forward input responses to Sprite in relay mode - Add conversion functions between stream.* types and server.* types - Add comprehensive tests for relay mode functionality The server now supports two modes: 1. Local mode: Events stored locally (for testing or single-machine setup) 2. Relay mode: Events relayed from remote Sprite via StreamClient 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The loop package now coordinates with wisp-sprite rather than running iterations directly. The iteration logic has been moved to spriteloop. Changes: - Remove runIteration, buildClaudeArgs, parseStreamJSON, streamOutput - Remove duration/budget/stuck checking (now in spriteloop) - Remove handleNeedsInput, updateTUIState (now via stream events) - Add stream client integration for wisp-sprite communication - Add handlers for stream events (session, task, claude, input_request) - Add handlers for TUI actions (kill, background, input) - Update tests for simplified interface 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The internal/loop package has been refactored to coordinate with wisp-sprite rather than run iterations directly. Updated doc.go to accurately describe the package's new responsibilities: stream processing, TUI coordination, event broadcasting, and state syncing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Added comprehensive tests for: - loop.go: LimitsFromConfig, CommandCh, writeResponse, handleNeedsInput (with response, kill, background, input response commands, context cancel), checkCommands, handleCommand, buildClaudeArgs edge cases, readTasks error handling, allTasksComplete, publishEvent/publishClaudeEvent edge cases - server.go: Start/Stop lifecycle, health endpoint integration test, command endpoint without processor - executor.go: NewLocalExecutor, Execute with no args, simple command execution, context cancellation, command callback, MockExecutor Coverage increased from 65.5% to 84.7% (>80% target achieved). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add integration tests verifying the durable stream architecture handles network disconnections gracefully. These tests demonstrate: - Events persist in FileStore during client disconnect - Clients can reconnect and catch up using stored lastSeq - No Claude output is lost during disconnect/reconnect cycles - Sequence numbers remain contiguous without gaps 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Update AGENTS.md to document the new package structure and durable stream architecture: - Add cmd/wisp-sprite and internal packages (spriteloop, stream, loop, server) - Document durable stream architecture with event flow diagram - Add message types documentation - Add wisp-sprite cross-compilation instructions - Update integration test references 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace the custom NDJSON file-based storage implementation in internal/stream/filestore.go with a wrapper around the durable-streams FileStore package. This brings several benefits: - Uses battle-tested durable-streams file storage with proper metadata tracking via bbolt - Adds long-poll notification for efficient subscriber updates - Maintains the same public API (Append, Read, Subscribe, LastSeq, Close, Path) - Idempotent Close() operation prevents double-close panics The implementation stores events in a .stream-data subdirectory using durable-streams' internal segment format, while maintaining the same sequence-numbered Event struct interface for consumers. Test updates: - Remove tests that relied on direct file system access to internal storage format (durable-streams manages its own format) - Update persistence test to verify through API rather than reading raw files - Update directory creation test to verify data is stored 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Changed Event struct from {Seq, Type, Timestamp, Data} to
{Seq, Type, Key, Value, Headers} per State Protocol spec.
Key changes:
- Headers now contain Operation (insert/update/delete), TxID, and Timestamp
- NewEvent takes 3 args (msgType, key, value) instead of 2
- Separated InputRequest and InputResponse into distinct event types
- InputResponse is now its own event type, not a command type
- Added typed event creators: NewSessionEvent, NewTaskEvent, etc.
- Updated all consumers: spriteloop, server, tui, loop packages
This aligns the Go types with the frontend @durable-streams/state
schema for seamless sync.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace the custom HTTP protocol for stream communication with the
standard durable-streams HTTP protocol. This change makes the client
compatible with any durable-streams server.
Key changes:
- Connect() now uses HEAD request to check stream existence
- Subscribe() uses GET /{path}?offset=X&live=sse for SSE streaming
- SendCommand/SendInputResponse use POST /{path} to append events
- GetState() reads all events and reconstructs state snapshot
- Track offsets (durable-streams format) alongside synthetic seq numbers
- Parse durable-streams SSE format (event: data/control events)
The client maintains backward compatibility with consumers (TUI/server)
by keeping the same interface, while internally using durable-streams
protocol for communication.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace the 100ms polling loop in handleNeedsInput() with event-driven waiting using the State Protocol bidirectional sync pattern. Changes: - Add TxID field to InputResponse for transaction confirmation - Add await.go with AwaitInputResponse and InputResponseWatcher helpers - Update handleNeedsInput to use stream-based event watching - Remove in-memory pendingInputs map from CommandProcessor - Update server.go to use StreamManager for input state tracking - Add IsInputResponded, GetInputResponse methods to StreamManager - Update HandleInputResponse to return bool (first-response-wins) The new approach: 1. Input requests are published as durable events to the stream 2. Clients append input_response events to the stream 3. The loop watches for matching input_response events 4. Transaction confirmation via TxID header pattern This eliminates the 100ms polling interval and provides: - Event-driven responsiveness - Durable state that survives disconnections - First-response-wins conflict resolution via stream 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace the custom SSE streaming implementation in spriteloop/server.go with a durable-streams compliant HTTP server: - Unify endpoints under /wisp/events path following durable-streams protocol - GET: Read events with offset, live=sse, and live=long-poll modes - POST: Append events (commands, input responses) to stream - HEAD: Return stream metadata with current offset - Remove deprecated /command, /state, /stream endpoints - Add StreamPath configuration option for customizable endpoint The server now implements the standard durable-streams protocol: - Offset format: "readseq_byteoffset" (e.g., "42_42") - Stream-Next-Offset and Stream-Up-To-Date response headers - SSE events with "data" and "control" event types - JSON array format for batch event responses 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Create internal/cli/sprite.go with parameterized SetupSpriteWithConfig() function that handles both start and resume Sprite setup modes. This extracts ~80% duplicated code from start.go and resume.go. Changes: - Add SpriteSetupMode enum (Start/Resume) for mode-specific behavior - Add SpriteSetupConfig struct with all setup parameters - Move HandleServerPassword to sprite.go (shared between start/resume) - Move checkoutBranch helper to sprite.go - Update start.go, resume.go, review.go, update.go to use new shared code - Update tests to use HandleServerPassword instead of handleResumeServerPassword Net reduction of ~68 lines while consolidating duplicated logic. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace wildcard CORS origin with a secure configurable approach: - Add CORSOrigins field to ServerConfig and server.Config - Default to localhost:3000/5173 and 127.0.0.1:3000/5173 for dev - Implement origin validation with fast map lookup - Add CORS preflight (OPTIONS) request handling - Reject cross-origin requests from non-allowed origins - Add withCORS middleware for consistent CORS handling - Include comprehensive tests for CORS behavior Production deployments can configure allowed origins via config. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add sliding window rate limiting to the authentication endpoint to prevent brute force attacks: - 5 requests per minute per IP (configurable) - Exponential backoff blocking after 10 failed attempts - Block duration doubles with each consecutive block (capped at 24h) - X-Forwarded-For support for reverse proxy scenarios - Logged rate-limited requests for security monitoring - Retry-After header in 429 responses The rate limiter tracks both rate limits (sliding window) and failure counts separately. Successful authentication resets the failure counter. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add internal/logging package with structured logging support: - Logger with debug/info/warn/error levels - Context fields via With() and WithFields() methods - Key-value logging for structured output - Default minimum level set to warn Update error handling throughout the codebase: - spriteloop/loop.go: Log publish function failures with context - server/streams.go: Log relay loop and event handler errors - loop/loop.go: Log TUI action and broadcast failures - tui/stream.go: Log action handling failures - CLI commands (done, stop, sprite, resume, review, update, start, abandon): Add structured logging alongside user-facing warnings The logging adds visibility into non-fatal errors that were previously silently swallowed, improving debugging and monitoring capabilities while maintaining user-facing fmt.Printf warnings for interactive use. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
This PR implements a durable stream architecture that moves the Claude iteration loop from the developer's laptop to the Sprite VM. The core problem: if the laptop disconnects or sleeps, Claude stops iterating despite the Sprite VM remaining alive.
Solution: A new wisp-sprite binary runs directly on the Sprite, executing Claude iterations locally and persisting all events to a FileStore. TUI and web clients become equivalent stream subscribers that can disconnect and reconnect without losing output.
Architecture
Key Changes
Session Lifecycle
Tasks
This PR implements the following tasks:
🤖 Generated with wisp