From 32ec8cadeb039fca8430520bed8f45b9f4f3f1b0 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Tue, 24 Feb 2026 22:21:29 +0800 Subject: [PATCH 01/10] feat(memory): define Store interface for session persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a backend-agnostic Store interface in pkg/memory/ that maps one-to-one with the current SessionManager API. Each method is atomic — no separate Save() call needed. Refs #711 --- pkg/memory/store.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 pkg/memory/store.go diff --git a/pkg/memory/store.go b/pkg/memory/store.go new file mode 100644 index 000000000..6887ec26e --- /dev/null +++ b/pkg/memory/store.go @@ -0,0 +1,38 @@ +package memory + +import ( + "context" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +// Store defines an interface for persistent session storage. +// Each method is an atomic operation — there is no separate Save() call. +type Store interface { + // AddMessage appends a simple text message to a session. + AddMessage(ctx context.Context, sessionKey, role, content string) error + + // AddFullMessage appends a complete message (with tool calls, etc.) to a session. + AddFullMessage(ctx context.Context, sessionKey string, msg providers.Message) error + + // GetHistory returns all messages for a session in insertion order. + // Returns an empty slice (not nil) if the session does not exist. + GetHistory(ctx context.Context, sessionKey string) ([]providers.Message, error) + + // GetSummary returns the conversation summary for a session. + // Returns an empty string if no summary exists. + GetSummary(ctx context.Context, sessionKey string) (string, error) + + // SetSummary updates the conversation summary for a session. + SetSummary(ctx context.Context, sessionKey, summary string) error + + // TruncateHistory removes all but the last keepLast messages from a session. + // If keepLast <= 0, all messages are removed. + TruncateHistory(ctx context.Context, sessionKey string, keepLast int) error + + // SetHistory replaces all messages in a session with the provided history. + SetHistory(ctx context.Context, sessionKey string, history []providers.Message) error + + // Close releases any resources held by the store. + Close() error +} From 9f36e50807093181d13619b2f21d988f98553276 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Tue, 24 Feb 2026 22:21:42 +0800 Subject: [PATCH 02/10] feat(memory): implement append-only JSONL session store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add JSONLStore that persists sessions as .jsonl files (one message per line) plus .meta.json for summary and truncation offset. Key design decisions: - Append-only writes — no full-file rewrites on AddMessage - Logical truncation via skip offset instead of physical deletion - Per-session mutex for safe concurrent access - Crash recovery: malformed trailing lines are silently skipped - Atomic metadata writes using temp+rename Zero new dependencies — pure stdlib. Refs #711 --- pkg/memory/jsonl.go | 386 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 386 insertions(+) create mode 100644 pkg/memory/jsonl.go diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go new file mode 100644 index 000000000..266f453d9 --- /dev/null +++ b/pkg/memory/jsonl.go @@ -0,0 +1,386 @@ +package memory + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +// sessionMeta holds per-session metadata stored in a .meta.json file. +type sessionMeta struct { + Key string `json:"key"` + Summary string `json:"summary"` + Skip int `json:"skip"` + Count int `json:"count"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// JSONLStore implements Store using append-only JSONL files. +// +// Each session is stored as two files: +// +// {sanitized_key}.jsonl — one JSON-encoded message per line, append-only +// {sanitized_key}.meta.json — session metadata (summary, logical truncation offset) +// +// Messages are never physically deleted from the JSONL file. Instead, +// TruncateHistory records a "skip" offset in the metadata file and +// GetHistory ignores lines before that offset. This keeps all writes +// append-only, which is both fast and crash-safe. +type JSONLStore struct { + dir string + + mu sync.Mutex + locks map[string]*sync.Mutex +} + +// NewJSONLStore creates a new JSONL-backed store rooted at dir. +func NewJSONLStore(dir string) (*JSONLStore, error) { + err := os.MkdirAll(dir, 0o755) + if err != nil { + return nil, fmt.Errorf("memory: create directory: %w", err) + } + return &JSONLStore{ + dir: dir, + locks: make(map[string]*sync.Mutex), + }, nil +} + +// sessionLock returns (or creates) a per-session mutex. +func (s *JSONLStore) sessionLock(key string) *sync.Mutex { + s.mu.Lock() + defer s.mu.Unlock() + + l, ok := s.locks[key] + if !ok { + l = &sync.Mutex{} + s.locks[key] = l + } + return l +} + +func (s *JSONLStore) jsonlPath(key string) string { + return filepath.Join(s.dir, sanitizeKey(key)+".jsonl") +} + +func (s *JSONLStore) metaPath(key string) string { + return filepath.Join(s.dir, sanitizeKey(key)+".meta.json") +} + +// sanitizeKey converts a session key to a safe filename component. +// Mirrors pkg/session.sanitizeFilename so that migration paths match. +func sanitizeKey(key string) string { + return strings.ReplaceAll(key, ":", "_") +} + +// readMeta loads the metadata file for a session. +// Returns a zero-value sessionMeta if the file does not exist. +func (s *JSONLStore) readMeta(key string) (sessionMeta, error) { + data, err := os.ReadFile(s.metaPath(key)) + if os.IsNotExist(err) { + return sessionMeta{Key: key}, nil + } + if err != nil { + return sessionMeta{}, fmt.Errorf("memory: read meta: %w", err) + } + var meta sessionMeta + err = json.Unmarshal(data, &meta) + if err != nil { + return sessionMeta{}, fmt.Errorf("memory: decode meta: %w", err) + } + return meta, nil +} + +// writeMeta atomically writes the metadata file (temp + rename). +func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error { + data, err := json.MarshalIndent(meta, "", " ") + if err != nil { + return fmt.Errorf("memory: encode meta: %w", err) + } + + target := s.metaPath(key) + tmp := target + ".tmp" + + err = os.WriteFile(tmp, data, 0o644) + if err != nil { + return fmt.Errorf("memory: write meta tmp: %w", err) + } + + err = os.Rename(tmp, target) + if err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("memory: rename meta: %w", err) + } + return nil +} + +// readMessages reads all valid JSON lines from a .jsonl file. +// Malformed trailing lines (e.g. from a crash) are silently skipped. +func readMessages(path string) ([]providers.Message, error) { + f, err := os.Open(path) + if os.IsNotExist(err) { + return []providers.Message{}, nil + } + if err != nil { + return nil, fmt.Errorf("memory: open jsonl: %w", err) + } + defer f.Close() + + var msgs []providers.Message + scanner := bufio.NewScanner(f) + // Allow up to 1 MB per line for messages with large content. + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var msg providers.Message + if json.Unmarshal(line, &msg) != nil { + // Corrupt line — likely a partial write from a crash. + // Skip it; this is the standard JSONL recovery pattern. + continue + } + msgs = append(msgs, msg) + } + if scanner.Err() != nil { + return nil, fmt.Errorf("memory: scan jsonl: %w", scanner.Err()) + } + + if msgs == nil { + msgs = []providers.Message{} + } + return msgs, nil +} + +func (s *JSONLStore) AddMessage( + _ context.Context, sessionKey, role, content string, +) error { + return s.addMsg(sessionKey, providers.Message{ + Role: role, + Content: content, + }) +} + +func (s *JSONLStore) AddFullMessage( + _ context.Context, sessionKey string, msg providers.Message, +) error { + return s.addMsg(sessionKey, msg) +} + +// addMsg is the shared implementation for AddMessage and AddFullMessage. +func (s *JSONLStore) addMsg(sessionKey string, msg providers.Message) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + // Append the message as a single JSON line. + line, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("memory: marshal message: %w", err) + } + line = append(line, '\n') + + f, err := os.OpenFile( + s.jsonlPath(sessionKey), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0o644, + ) + if err != nil { + return fmt.Errorf("memory: open jsonl for append: %w", err) + } + _, writeErr := f.Write(line) + closeErr := f.Close() + if writeErr != nil { + return fmt.Errorf("memory: append message: %w", writeErr) + } + if closeErr != nil { + return fmt.Errorf("memory: close jsonl: %w", closeErr) + } + + // Update metadata. + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.Count == 0 && meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Count++ + meta.UpdatedAt = now + + return s.writeMeta(sessionKey, meta) +} + +func (s *JSONLStore) GetHistory( + _ context.Context, sessionKey string, +) ([]providers.Message, error) { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return nil, err + } + + msgs, err := readMessages(s.jsonlPath(sessionKey)) + if err != nil { + return nil, err + } + + // Apply logical truncation: skip the first meta.Skip messages. + if meta.Skip > 0 && meta.Skip < len(msgs) { + msgs = msgs[meta.Skip:] + } else if meta.Skip >= len(msgs) { + msgs = []providers.Message{} + } + + return msgs, nil +} + +func (s *JSONLStore) GetSummary( + _ context.Context, sessionKey string, +) (string, error) { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return "", err + } + return meta.Summary, nil +} + +func (s *JSONLStore) SetSummary( + _ context.Context, sessionKey, summary string, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Summary = summary + meta.UpdatedAt = now + + return s.writeMeta(sessionKey, meta) +} + +func (s *JSONLStore) TruncateHistory( + _ context.Context, sessionKey string, keepLast int, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + + // If the meta count might be stale (e.g. after a crash during + // addMsg), reconcile with the actual line count on disk. + if meta.Count == 0 { + msgs, readErr := readMessages(s.jsonlPath(sessionKey)) + if readErr != nil { + return readErr + } + meta.Count = len(msgs) + } + + if keepLast <= 0 { + meta.Skip = meta.Count + } else { + effective := meta.Count - meta.Skip + if keepLast < effective { + meta.Skip = meta.Count - keepLast + } + } + meta.UpdatedAt = time.Now() + + return s.writeMeta(sessionKey, meta) +} + +func (s *JSONLStore) SetHistory( + _ context.Context, + sessionKey string, + history []providers.Message, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + // Rewrite the JSONL file atomically (temp + rename). + target := s.jsonlPath(sessionKey) + tmp := target + ".tmp" + + f, err := os.Create(tmp) + if err != nil { + return fmt.Errorf("memory: create jsonl tmp: %w", err) + } + + for i, msg := range history { + line, marshalErr := json.Marshal(msg) + if marshalErr != nil { + f.Close() + _ = os.Remove(tmp) + return fmt.Errorf("memory: marshal message %d: %w", i, marshalErr) + } + line = append(line, '\n') + _, writeErr := f.Write(line) + if writeErr != nil { + f.Close() + _ = os.Remove(tmp) + return fmt.Errorf("memory: write message %d: %w", i, writeErr) + } + } + + err = f.Close() + if err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("memory: close jsonl tmp: %w", err) + } + + err = os.Rename(tmp, target) + if err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("memory: rename jsonl: %w", err) + } + + // Reset metadata: skip=0, count=len(history). + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Skip = 0 + meta.Count = len(history) + meta.UpdatedAt = now + + return s.writeMeta(sessionKey, meta) +} + +func (s *JSONLStore) Close() error { + return nil +} From 529622b7d3d49d068f332a3a1ecef7eee2848bf1 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Tue, 24 Feb 2026 22:22:46 +0800 Subject: [PATCH 03/10] test(memory): add unit, concurrency, and benchmark tests Cover all Store interface methods plus edge cases: - Basic roundtrip, ordering, empty session, tool calls - Logical truncation (keep last N, keep zero, keep more than exist) - SetHistory replacing all + resetting skip offset - Crash recovery with partial JSON lines - Persistence across store instances - Concurrent add+read (10 goroutines x 20 msgs) - Simulated #704 race (summarizer vs main loop) - Benchmarks for AddMessage and GetHistory (100/1000 msgs) --- pkg/memory/jsonl_test.go | 663 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 663 insertions(+) create mode 100644 pkg/memory/jsonl_test.go diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go new file mode 100644 index 000000000..57675504d --- /dev/null +++ b/pkg/memory/jsonl_test.go @@ -0,0 +1,663 @@ +package memory + +import ( + "context" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +func newTestStore(t *testing.T) *JSONLStore { + t.Helper() + store, err := NewJSONLStore(t.TempDir()) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + return store +} + +func TestNewJSONLStore_CreatesDirectory(t *testing.T) { + dir := filepath.Join(t.TempDir(), "nested", "sessions") + store, err := NewJSONLStore(dir) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + + info, err := os.Stat(dir) + if err != nil { + t.Fatalf("Stat: %v", err) + } + if !info.IsDir() { + t.Errorf("expected directory, got file") + } +} + +func TestAddMessage_BasicRoundtrip(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + err := store.AddMessage(ctx, "s1", "user", "hello") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + err = store.AddMessage(ctx, "s1", "assistant", "hi there") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "s1") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 messages, got %d", len(history)) + } + if history[0].Role != "user" || history[0].Content != "hello" { + t.Errorf("msg[0] = %+v", history[0]) + } + if history[1].Role != "assistant" || history[1].Content != "hi there" { + t.Errorf("msg[1] = %+v", history[1]) + } +} + +func TestAddMessage_AutoCreatesSession(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Adding a message to a non-existent session should work. + err := store.AddMessage(ctx, "new-session", "user", "first message") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "new-session") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1 message, got %d", len(history)) + } +} + +func TestAddFullMessage_WithToolCalls(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + msg := providers.Message{ + Role: "assistant", + Content: "Let me search that.", + ToolCalls: []providers.ToolCall{ + { + ID: "call_abc", + Type: "function", + Function: &providers.FunctionCall{ + Name: "web_search", + Arguments: `{"q":"golang jsonl"}`, + }, + }, + }, + } + + err := store.AddFullMessage(ctx, "tc", msg) + if err != nil { + t.Fatalf("AddFullMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "tc") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + if len(history[0].ToolCalls) != 1 { + t.Fatalf("expected 1 tool call, got %d", len(history[0].ToolCalls)) + } + tc := history[0].ToolCalls[0] + if tc.ID != "call_abc" { + t.Errorf("tool call ID = %q", tc.ID) + } + if tc.Function == nil || tc.Function.Name != "web_search" { + t.Errorf("tool call function = %+v", tc.Function) + } +} + +func TestAddFullMessage_ToolCallID(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + msg := providers.Message{ + Role: "tool", + Content: "search results here", + ToolCallID: "call_abc", + } + + err := store.AddFullMessage(ctx, "tr", msg) + if err != nil { + t.Fatalf("AddFullMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "tr") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + if history[0].ToolCallID != "call_abc" { + t.Errorf("ToolCallID = %q", history[0].ToolCallID) + } +} + +func TestGetHistory_EmptySession(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + history, err := store.GetHistory(ctx, "nonexistent") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if history == nil { + t.Fatal("expected non-nil empty slice") + } + if len(history) != 0 { + t.Errorf("expected 0 messages, got %d", len(history)) + } +} + +func TestGetHistory_Ordering(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 5; i++ { + err := store.AddMessage( + ctx, "order", + "user", + string(rune('a'+i)), + ) + if err != nil { + t.Fatalf("AddMessage(%d): %v", i, err) + } + } + + history, err := store.GetHistory(ctx, "order") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 5 { + t.Fatalf("expected 5, got %d", len(history)) + } + for i := 0; i < 5; i++ { + expected := string(rune('a' + i)) + if history[i].Content != expected { + t.Errorf("msg[%d].Content = %q, want %q", i, history[i].Content, expected) + } + } +} + +func TestSetSummary_GetSummary(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // No summary yet. + summary, err := store.GetSummary(ctx, "s1") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "" { + t.Errorf("expected empty, got %q", summary) + } + + // Set a summary. + err = store.SetSummary(ctx, "s1", "talked about Go") + if err != nil { + t.Fatalf("SetSummary: %v", err) + } + + summary, err = store.GetSummary(ctx, "s1") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "talked about Go" { + t.Errorf("summary = %q", summary) + } + + // Update summary. + err = store.SetSummary(ctx, "s1", "updated summary") + if err != nil { + t.Fatalf("SetSummary: %v", err) + } + + summary, err = store.GetSummary(ctx, "s1") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "updated summary" { + t.Errorf("summary = %q", summary) + } +} + +func TestTruncateHistory_KeepLast(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 10; i++ { + err := store.AddMessage( + ctx, "trunc", + "user", + string(rune('a'+i)), + ) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + err := store.TruncateHistory(ctx, "trunc", 4) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "trunc") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 4 { + t.Fatalf("expected 4, got %d", len(history)) + } + // Should be the last 4: g, h, i, j + if history[0].Content != "g" { + t.Errorf("first kept = %q, want 'g'", history[0].Content) + } + if history[3].Content != "j" { + t.Errorf("last kept = %q, want 'j'", history[3].Content) + } +} + +func TestTruncateHistory_KeepZero(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 5; i++ { + err := store.AddMessage(ctx, "empty", "user", "msg") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + err := store.TruncateHistory(ctx, "empty", 0) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "empty") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 0 { + t.Errorf("expected 0, got %d", len(history)) + } +} + +func TestTruncateHistory_KeepMoreThanExists(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 3; i++ { + err := store.AddMessage(ctx, "few", "user", "msg") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Keep 100, but only 3 exist — should keep all. + err := store.TruncateHistory(ctx, "few", 100) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "few") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Errorf("expected 3, got %d", len(history)) + } +} + +func TestSetHistory_ReplacesAll(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Add some initial messages. + for i := 0; i < 5; i++ { + err := store.AddMessage(ctx, "replace", "user", "old") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Replace with new history. + newHistory := []providers.Message{ + {Role: "user", Content: "new1"}, + {Role: "assistant", Content: "new2"}, + } + err := store.SetHistory(ctx, "replace", newHistory) + if err != nil { + t.Fatalf("SetHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "replace") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2, got %d", len(history)) + } + if history[0].Content != "new1" || history[1].Content != "new2" { + t.Errorf("history = %+v", history) + } +} + +func TestSetHistory_ResetsSkip(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Add messages and truncate. + for i := 0; i < 10; i++ { + err := store.AddMessage(ctx, "skip-reset", "user", "old") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + err := store.TruncateHistory(ctx, "skip-reset", 3) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + // SetHistory should reset skip to 0. + newHistory := []providers.Message{ + {Role: "user", Content: "fresh"}, + } + err = store.SetHistory(ctx, "skip-reset", newHistory) + if err != nil { + t.Fatalf("SetHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "skip-reset") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + if history[0].Content != "fresh" { + t.Errorf("content = %q", history[0].Content) + } +} + +func TestColonInKey(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + err := store.AddMessage(ctx, "telegram:123", "user", "hi") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "telegram:123") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + + // Verify the file is named with underscore. + jsonlFile := filepath.Join(store.dir, "telegram_123.jsonl") + if _, statErr := os.Stat(jsonlFile); statErr != nil { + t.Errorf("expected file %s to exist: %v", jsonlFile, statErr) + } +} + +func TestCrashRecovery_PartialLine(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Write a valid message first. + err := store.AddMessage(ctx, "crash", "user", "valid") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + // Simulate a crash by appending a partial JSON line directly. + jsonlPath := store.jsonlPath("crash") + f, err := os.OpenFile(jsonlPath, os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + t.Fatalf("open for append: %v", err) + } + _, err = f.WriteString(`{"role":"user","content":"incomple`) + if err != nil { + t.Fatalf("write partial: %v", err) + } + f.Close() + + // GetHistory should return only the valid message. + history, err := store.GetHistory(ctx, "crash") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1 valid message, got %d", len(history)) + } + if history[0].Content != "valid" { + t.Errorf("content = %q", history[0].Content) + } +} + +func TestPersistence_AcrossInstances(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + // Write with first instance. + store1, err := NewJSONLStore(dir) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + err = store1.AddMessage(ctx, "persist", "user", "remember me") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + err = store1.SetSummary(ctx, "persist", "a test session") + if err != nil { + t.Fatalf("SetSummary: %v", err) + } + store1.Close() + + // Read with second instance. + store2, err := NewJSONLStore(dir) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + defer store2.Close() + + history, err := store2.GetHistory(ctx, "persist") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 || history[0].Content != "remember me" { + t.Errorf("history = %+v", history) + } + + summary, err := store2.GetSummary(ctx, "persist") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "a test session" { + t.Errorf("summary = %q", summary) + } +} + +func TestConcurrent_AddAndRead(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + var wg sync.WaitGroup + const goroutines = 10 + const msgsPerGoroutine = 20 + + // Concurrent writes. + for g := 0; g < goroutines; g++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < msgsPerGoroutine; i++ { + _ = store.AddMessage(ctx, "concurrent", "user", "msg") + } + }() + } + wg.Wait() + + history, err := store.GetHistory(ctx, "concurrent") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + expected := goroutines * msgsPerGoroutine + if len(history) != expected { + t.Errorf("expected %d messages, got %d", expected, len(history)) + } +} + +func TestConcurrent_SummarizeRace(t *testing.T) { + // Simulates the #704 race: one goroutine adds messages while + // another truncates + sets summary — like summarizeSession(). + store := newTestStore(t) + ctx := context.Background() + + // Seed with some messages. + for i := 0; i < 20; i++ { + err := store.AddMessage(ctx, "race", "user", "seed") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + var wg sync.WaitGroup + + // Writer goroutine (main agent loop). + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + _ = store.AddMessage(ctx, "race", "user", "new") + } + }() + + // Summarizer goroutine (background task). + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + _ = store.SetSummary(ctx, "race", "summary") + _ = store.TruncateHistory(ctx, "race", 5) + } + }() + + wg.Wait() + + // Verify the store is still in a consistent state. + _, err := store.GetHistory(ctx, "race") + if err != nil { + t.Fatalf("GetHistory after race: %v", err) + } + _, err = store.GetSummary(ctx, "race") + if err != nil { + t.Fatalf("GetSummary after race: %v", err) + } +} + +func TestMultipleSessions_Isolation(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + err := store.AddMessage(ctx, "s1", "user", "msg for s1") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + err = store.AddMessage(ctx, "s2", "user", "msg for s2") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + h1, err := store.GetHistory(ctx, "s1") + if err != nil { + t.Fatalf("GetHistory s1: %v", err) + } + h2, err := store.GetHistory(ctx, "s2") + if err != nil { + t.Fatalf("GetHistory s2: %v", err) + } + + if len(h1) != 1 || h1[0].Content != "msg for s1" { + t.Errorf("s1 history = %+v", h1) + } + if len(h2) != 1 || h2[0].Content != "msg for s2" { + t.Errorf("s2 history = %+v", h2) + } +} + +func BenchmarkAddMessage(b *testing.B) { + dir := b.TempDir() + store, err := NewJSONLStore(dir) + if err != nil { + b.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + ctx := context.Background() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = store.AddMessage(ctx, "bench", "user", "benchmark message content") + } +} + +func BenchmarkGetHistory_100(b *testing.B) { + dir := b.TempDir() + store, err := NewJSONLStore(dir) + if err != nil { + b.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + ctx := context.Background() + + for i := 0; i < 100; i++ { + _ = store.AddMessage(ctx, "bench", "user", "message content") + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = store.GetHistory(ctx, "bench") + } +} + +func BenchmarkGetHistory_1000(b *testing.B) { + dir := b.TempDir() + store, err := NewJSONLStore(dir) + if err != nil { + b.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + ctx := context.Background() + + for i := 0; i < 1000; i++ { + _ = store.AddMessage(ctx, "bench", "user", "message content") + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = store.GetHistory(ctx, "bench") + } +} From 903681207ba3b423ee04d6a7056738dacbf75f08 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Tue, 24 Feb 2026 22:22:58 +0800 Subject: [PATCH 04/10] feat(memory): support migration from legacy JSON sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Read existing sessions/*.json files, convert to JSONL format, and rename originals to .json.migrated as backup. The migration is idempotent — second runs skip already-migrated files. Session keys are read from JSON content (not filenames) so that sanitized names like telegram_123 correctly map back to telegram:123. --- pkg/memory/migration.go | 107 ++++++++++++ pkg/memory/migration_test.go | 328 +++++++++++++++++++++++++++++++++++ 2 files changed, 435 insertions(+) create mode 100644 pkg/memory/migration.go create mode 100644 pkg/memory/migration_test.go diff --git a/pkg/memory/migration.go b/pkg/memory/migration.go new file mode 100644 index 000000000..5b2f69ab3 --- /dev/null +++ b/pkg/memory/migration.go @@ -0,0 +1,107 @@ +package memory + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +// jsonSession mirrors pkg/session.Session for migration purposes. +type jsonSession struct { + Key string `json:"key"` + Messages []providers.Message `json:"messages"` + Summary string `json:"summary,omitempty"` + Created time.Time `json:"created"` + Updated time.Time `json:"updated"` +} + +// MigrateFromJSON reads legacy sessions/*.json files from sessionsDir, +// writes them into the Store, and renames each migrated file to +// .json.migrated as a backup. Returns the number of sessions migrated. +// +// Files that fail to parse are logged and skipped. Already-migrated +// files (.json.migrated) are ignored, making the function idempotent. +func MigrateFromJSON( + ctx context.Context, sessionsDir string, store Store, +) (int, error) { + entries, err := os.ReadDir(sessionsDir) + if os.IsNotExist(err) { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("memory: read sessions dir: %w", err) + } + + migrated := 0 + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if !strings.HasSuffix(name, ".json") { + continue + } + // Skip already-migrated files. + if strings.HasSuffix(name, ".migrated") { + continue + } + + srcPath := filepath.Join(sessionsDir, name) + + data, readErr := os.ReadFile(srcPath) + if readErr != nil { + log.Printf("memory: migrate: skip %s: %v", name, readErr) + continue + } + + var sess jsonSession + if parseErr := json.Unmarshal(data, &sess); parseErr != nil { + log.Printf("memory: migrate: skip %s: %v", name, parseErr) + continue + } + + // Use the key from the JSON content, not the filename. + // Filenames are sanitized (":" → "_") but keys are not. + key := sess.Key + if key == "" { + key = strings.TrimSuffix(name, ".json") + } + + for _, msg := range sess.Messages { + addErr := store.AddFullMessage(ctx, key, msg) + if addErr != nil { + return migrated, fmt.Errorf( + "memory: migrate %s: add message: %w", + name, addErr, + ) + } + } + + if sess.Summary != "" { + sumErr := store.SetSummary(ctx, key, sess.Summary) + if sumErr != nil { + return migrated, fmt.Errorf( + "memory: migrate %s: set summary: %w", + name, sumErr, + ) + } + } + + // Rename to .migrated as backup (not delete). + renameErr := os.Rename(srcPath, srcPath+".migrated") + if renameErr != nil { + log.Printf("memory: migrate: rename %s: %v", name, renameErr) + } + + migrated++ + } + + return migrated, nil +} diff --git a/pkg/memory/migration_test.go b/pkg/memory/migration_test.go new file mode 100644 index 000000000..bf16c32f8 --- /dev/null +++ b/pkg/memory/migration_test.go @@ -0,0 +1,328 @@ +package memory + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +func writeJSONSession( + t *testing.T, dir string, filename string, sess jsonSession, +) { + t.Helper() + data, err := json.MarshalIndent(sess, "", " ") + if err != nil { + t.Fatalf("marshal session: %v", err) + } + err = os.WriteFile(filepath.Join(dir, filename), data, 0o644) + if err != nil { + t.Fatalf("write session file: %v", err) + } +} + +func TestMigrateFromJSON_Basic(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "test.json", jsonSession{ + Key: "test", + Messages: []providers.Message{ + {Role: "user", Content: "hello"}, + {Role: "assistant", Content: "hi"}, + }, + Summary: "A greeting.", + Created: time.Now(), + Updated: time.Now(), + }) + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1 migrated, got %d", count) + } + + history, err := store.GetHistory(ctx, "test") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 messages, got %d", len(history)) + } + if history[0].Content != "hello" || history[1].Content != "hi" { + t.Errorf("unexpected messages: %+v", history) + } + + summary, err := store.GetSummary(ctx, "test") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "A greeting." { + t.Errorf("summary = %q", summary) + } +} + +func TestMigrateFromJSON_WithToolCalls(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "tools.json", jsonSession{ + Key: "tools", + Messages: []providers.Message{ + { + Role: "assistant", + Content: "Searching...", + ToolCalls: []providers.ToolCall{ + { + ID: "call_1", + Type: "function", + Function: &providers.FunctionCall{ + Name: "web_search", + Arguments: `{"q":"test"}`, + }, + }, + }, + }, + { + Role: "tool", + Content: "result", + ToolCallID: "call_1", + }, + }, + Created: time.Now(), + Updated: time.Now(), + }) + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1, got %d", count) + } + + history, err := store.GetHistory(ctx, "tools") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 messages, got %d", len(history)) + } + if len(history[0].ToolCalls) != 1 { + t.Fatalf("expected 1 tool call, got %d", len(history[0].ToolCalls)) + } + if history[0].ToolCalls[0].Function.Name != "web_search" { + t.Errorf("function = %q", history[0].ToolCalls[0].Function.Name) + } + if history[1].ToolCallID != "call_1" { + t.Errorf("ToolCallID = %q", history[1].ToolCallID) + } +} + +func TestMigrateFromJSON_MultipleFiles(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 3; i++ { + key := string(rune('a' + i)) + writeJSONSession(t, sessionsDir, key+".json", jsonSession{ + Key: key, + Messages: []providers.Message{{Role: "user", Content: "msg " + key}}, + Created: time.Now(), + Updated: time.Now(), + }) + } + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 3 { + t.Errorf("expected 3, got %d", count) + } + + for i := 0; i < 3; i++ { + key := string(rune('a' + i)) + history, histErr := store.GetHistory(ctx, key) + if histErr != nil { + t.Fatalf("GetHistory(%q): %v", key, histErr) + } + if len(history) != 1 { + t.Errorf("session %q: expected 1 msg, got %d", key, len(history)) + } + } +} + +func TestMigrateFromJSON_InvalidJSON(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + // One valid, one invalid. + writeJSONSession(t, sessionsDir, "good.json", jsonSession{ + Key: "good", + Messages: []providers.Message{{Role: "user", Content: "ok"}}, + Created: time.Now(), + Updated: time.Now(), + }) + err := os.WriteFile( + filepath.Join(sessionsDir, "bad.json"), + []byte("{invalid json"), + 0o644, + ) + if err != nil { + t.Fatalf("write bad file: %v", err) + } + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1 (bad file skipped), got %d", count) + } + + history, err := store.GetHistory(ctx, "good") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Errorf("expected 1 message, got %d", len(history)) + } +} + +func TestMigrateFromJSON_RenamesFiles(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "rename.json", jsonSession{ + Key: "rename", + Messages: []providers.Message{{Role: "user", Content: "hi"}}, + Created: time.Now(), + Updated: time.Now(), + }) + + _, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + + // Original .json should not exist. + _, statErr := os.Stat(filepath.Join(sessionsDir, "rename.json")) + if !os.IsNotExist(statErr) { + t.Error("rename.json should have been renamed") + } + // .json.migrated should exist. + _, statErr = os.Stat( + filepath.Join(sessionsDir, "rename.json.migrated"), + ) + if statErr != nil { + t.Errorf("rename.json.migrated should exist: %v", statErr) + } +} + +func TestMigrateFromJSON_Idempotent(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "idem.json", jsonSession{ + Key: "idem", + Messages: []providers.Message{{Role: "user", Content: "once"}}, + Created: time.Now(), + Updated: time.Now(), + }) + + count1, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("first migration: %v", err) + } + if count1 != 1 { + t.Errorf("first run: expected 1, got %d", count1) + } + + // Second run should find only .migrated files, skip them. + count2, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("second migration: %v", err) + } + if count2 != 0 { + t.Errorf("second run: expected 0, got %d", count2) + } + + history, err := store.GetHistory(ctx, "idem") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Errorf("expected 1 message, got %d", len(history)) + } +} + +func TestMigrateFromJSON_ColonInKey(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + // File is named telegram_123 (sanitized), but the key inside is telegram:123. + writeJSONSession(t, sessionsDir, "telegram_123.json", jsonSession{ + Key: "telegram:123", + Messages: []providers.Message{{Role: "user", Content: "from telegram"}}, + Created: time.Now(), + Updated: time.Now(), + }) + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1, got %d", count) + } + + // Accessible via the original key "telegram:123". + history, err := store.GetHistory(ctx, "telegram:123") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1 message, got %d", len(history)) + } + if history[0].Content != "from telegram" { + t.Errorf("content = %q", history[0].Content) + } + + // In the file-based store, "telegram:123" and "telegram_123" both + // sanitize to the same filename, so they share storage. This is + // expected — the colon-to-underscore mapping is a one-way function. + history2, err := store.GetHistory(ctx, "telegram_123") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history2) != 1 { + t.Errorf("expected 1 (same file), got %d", len(history2)) + } +} + +func TestMigrateFromJSON_NonexistentDir(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + count, err := MigrateFromJSON(ctx, "/nonexistent/path", store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 0 { + t.Errorf("expected 0, got %d", count) + } +} From b464687e2fc0578eb23f3e9be4aa20849d5bcaa4 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Thu, 26 Feb 2026 08:42:35 +0800 Subject: [PATCH 05/10] feat(memory): add Compact method for physical JSONL compaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address file growth concern from #711 review: logical truncation via skip offset is fast but leaves dead lines on disk indefinitely. Compact() rewrites the JSONL file keeping only active messages, using the same temp+rename pattern for crash safety. No-op when skip == 0. The caller (lifecycle manager or agent loop) decides when to trigger compaction — e.g. when skipped lines exceed active lines. --- pkg/memory/jsonl.go | 87 ++++++++++++++++++++++------ pkg/memory/jsonl_test.go | 121 +++++++++++++++++++++++++++++++++++++++ pkg/memory/store.go | 4 ++ 3 files changed, 195 insertions(+), 17 deletions(-) diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index 266f453d9..be71396ca 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -328,7 +328,74 @@ func (s *JSONLStore) SetHistory( l.Lock() defer l.Unlock() - // Rewrite the JSONL file atomically (temp + rename). + err := s.rewriteJSONL(sessionKey, history) + if err != nil { + return err + } + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Skip = 0 + meta.Count = len(history) + meta.UpdatedAt = now + + return s.writeMeta(sessionKey, meta) +} + +// Compact physically rewrites the JSONL file, dropping all logically +// skipped lines. This reclaims disk space that accumulates after +// repeated TruncateHistory calls. +// +// It is safe to call at any time; if there is nothing to compact +// (skip == 0) the method returns immediately. +func (s *JSONLStore) Compact( + _ context.Context, sessionKey string, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + if meta.Skip == 0 { + return nil + } + + all, err := readMessages(s.jsonlPath(sessionKey)) + if err != nil { + return err + } + + // Keep only the active (non-skipped) messages. + var active []providers.Message + if meta.Skip < len(all) { + active = all[meta.Skip:] + } + + err = s.rewriteJSONL(sessionKey, active) + if err != nil { + return err + } + + meta.Skip = 0 + meta.Count = len(active) + meta.UpdatedAt = time.Now() + + return s.writeMeta(sessionKey, meta) +} + +// rewriteJSONL atomically replaces the JSONL file with the given messages. +func (s *JSONLStore) rewriteJSONL( + sessionKey string, msgs []providers.Message, +) error { target := s.jsonlPath(sessionKey) tmp := target + ".tmp" @@ -337,7 +404,7 @@ func (s *JSONLStore) SetHistory( return fmt.Errorf("memory: create jsonl tmp: %w", err) } - for i, msg := range history { + for i, msg := range msgs { line, marshalErr := json.Marshal(msg) if marshalErr != nil { f.Close() @@ -364,21 +431,7 @@ func (s *JSONLStore) SetHistory( _ = os.Remove(tmp) return fmt.Errorf("memory: rename jsonl: %w", err) } - - // Reset metadata: skip=0, count=len(history). - meta, err := s.readMeta(sessionKey) - if err != nil { - return err - } - now := time.Now() - if meta.CreatedAt.IsZero() { - meta.CreatedAt = now - } - meta.Skip = 0 - meta.Count = len(history) - meta.UpdatedAt = now - - return s.writeMeta(sessionKey, meta) + return nil } func (s *JSONLStore) Close() error { diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go index 57675504d..e3b53bfde 100644 --- a/pkg/memory/jsonl_test.go +++ b/pkg/memory/jsonl_test.go @@ -423,6 +423,127 @@ func TestColonInKey(t *testing.T) { } } +func TestCompact_RemovesSkippedMessages(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Write 10 messages, then truncate to keep last 3. + for i := 0; i < 10; i++ { + err := store.AddMessage(ctx, "compact", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + err := store.TruncateHistory(ctx, "compact", 3) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + // Before compact: file still has 10 lines. + allOnDisk, err := readMessages(store.jsonlPath("compact")) + if err != nil { + t.Fatalf("readMessages: %v", err) + } + if len(allOnDisk) != 10 { + t.Fatalf("before compact: expected 10 on disk, got %d", len(allOnDisk)) + } + + // Compact. + err = store.Compact(ctx, "compact") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + // After compact: file should have only 3 lines. + allOnDisk, err = readMessages(store.jsonlPath("compact")) + if err != nil { + t.Fatalf("readMessages: %v", err) + } + if len(allOnDisk) != 3 { + t.Fatalf("after compact: expected 3 on disk, got %d", len(allOnDisk)) + } + + // GetHistory should still return the same 3 messages. + history, err := store.GetHistory(ctx, "compact") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Fatalf("expected 3, got %d", len(history)) + } + if history[0].Content != "h" || history[2].Content != "j" { + t.Errorf("wrong content: %+v", history) + } +} + +func TestCompact_NoOpWhenNoSkip(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 5; i++ { + err := store.AddMessage(ctx, "noop", "user", "msg") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Compact without prior truncation — should be a no-op. + err := store.Compact(ctx, "noop") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + history, err := store.GetHistory(ctx, "noop") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 5 { + t.Errorf("expected 5, got %d", len(history)) + } +} + +func TestCompact_ThenAppend(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 8; i++ { + err := store.AddMessage(ctx, "cap", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + err := store.TruncateHistory(ctx, "cap", 2) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + err = store.Compact(ctx, "cap") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + // Append after compaction should work correctly. + err = store.AddMessage(ctx, "cap", "user", "new") + if err != nil { + t.Fatalf("AddMessage after compact: %v", err) + } + + history, err := store.GetHistory(ctx, "cap") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Fatalf("expected 3, got %d", len(history)) + } + // g, h (kept from truncation), new (appended after compaction). + if history[0].Content != "g" { + t.Errorf("first = %q, want 'g'", history[0].Content) + } + if history[2].Content != "new" { + t.Errorf("last = %q, want 'new'", history[2].Content) + } +} + func TestCrashRecovery_PartialLine(t *testing.T) { store := newTestStore(t) ctx := context.Background() diff --git a/pkg/memory/store.go b/pkg/memory/store.go index 6887ec26e..b6e11707d 100644 --- a/pkg/memory/store.go +++ b/pkg/memory/store.go @@ -33,6 +33,10 @@ type Store interface { // SetHistory replaces all messages in a session with the provided history. SetHistory(ctx context.Context, sessionKey string, history []providers.Message) error + // Compact reclaims storage by physically removing logically truncated + // data. Backends that do not accumulate dead data may return nil. + Compact(ctx context.Context, sessionKey string) error + // Close releases any resources held by the store. Close() error } From 5d73ee2d9a72c27233cbb27a305d128130228944 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Thu, 26 Feb 2026 14:31:02 +0800 Subject: [PATCH 06/10] refactor(memory): use sync.Map for session locks and skip-scan in readMessages Address review feedback from @Zhaoyikaiii: - Replace map[string]*sync.Mutex + separate mu with sync.Map.LoadOrStore for simpler, lock-free session lock management. - Add skip parameter to readMessages so callers (GetHistory, Compact) can skip truncated lines without paying the json.Unmarshal cost. - Add countLines helper for TruncateHistory's count reconciliation, avoiding full deserialization when only the line count is needed. --- pkg/memory/jsonl.go | 86 ++++++++++++++++++++++------------------ pkg/memory/jsonl_test.go | 4 +- 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index be71396ca..eda9563fe 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -36,10 +36,8 @@ type sessionMeta struct { // GetHistory ignores lines before that offset. This keeps all writes // append-only, which is both fast and crash-safe. type JSONLStore struct { - dir string - - mu sync.Mutex - locks map[string]*sync.Mutex + dir string + locks sync.Map // map[string]*sync.Mutex, one per session } // NewJSONLStore creates a new JSONL-backed store rooted at dir. @@ -48,23 +46,13 @@ func NewJSONLStore(dir string) (*JSONLStore, error) { if err != nil { return nil, fmt.Errorf("memory: create directory: %w", err) } - return &JSONLStore{ - dir: dir, - locks: make(map[string]*sync.Mutex), - }, nil + return &JSONLStore{dir: dir}, nil } // sessionLock returns (or creates) a per-session mutex. func (s *JSONLStore) sessionLock(key string) *sync.Mutex { - s.mu.Lock() - defer s.mu.Unlock() - - l, ok := s.locks[key] - if !ok { - l = &sync.Mutex{} - s.locks[key] = l - } - return l + v, _ := s.locks.LoadOrStore(key, &sync.Mutex{}) + return v.(*sync.Mutex) } func (s *JSONLStore) jsonlPath(key string) string { @@ -122,9 +110,11 @@ func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error { return nil } -// readMessages reads all valid JSON lines from a .jsonl file. +// readMessages reads valid JSON lines from a .jsonl file, skipping +// the first `skip` lines without unmarshaling them. This avoids the +// cost of json.Unmarshal on logically truncated messages. // Malformed trailing lines (e.g. from a crash) are silently skipped. -func readMessages(path string) ([]providers.Message, error) { +func readMessages(path string, skip int) ([]providers.Message, error) { f, err := os.Open(path) if os.IsNotExist(err) { return []providers.Message{}, nil @@ -139,11 +129,16 @@ func readMessages(path string) ([]providers.Message, error) { // Allow up to 1 MB per line for messages with large content. scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + lineNum := 0 for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } + lineNum++ + if lineNum <= skip { + continue + } var msg providers.Message if json.Unmarshal(line, &msg) != nil { // Corrupt line — likely a partial write from a crash. @@ -162,6 +157,30 @@ func readMessages(path string) ([]providers.Message, error) { return msgs, nil } +// countLines counts the total number of non-empty lines in a .jsonl file. +// Used by TruncateHistory to reconcile a stale meta.Count without +// the overhead of unmarshaling every message. +func countLines(path string) (int, error) { + f, err := os.Open(path) + if os.IsNotExist(err) { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("memory: open jsonl: %w", err) + } + defer f.Close() + + n := 0 + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + if len(scanner.Bytes()) > 0 { + n++ + } + } + return n, scanner.Err() +} + func (s *JSONLStore) AddMessage( _ context.Context, sessionKey, role, content string, ) error { @@ -234,18 +253,13 @@ func (s *JSONLStore) GetHistory( return nil, err } - msgs, err := readMessages(s.jsonlPath(sessionKey)) + // Pass meta.Skip so readMessages skips those lines without + // unmarshaling them — avoids wasted CPU on truncated messages. + msgs, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) if err != nil { return nil, err } - // Apply logical truncation: skip the first meta.Skip messages. - if meta.Skip > 0 && meta.Skip < len(msgs) { - msgs = msgs[meta.Skip:] - } else if meta.Skip >= len(msgs) { - msgs = []providers.Message{} - } - return msgs, nil } @@ -299,11 +313,11 @@ func (s *JSONLStore) TruncateHistory( // If the meta count might be stale (e.g. after a crash during // addMsg), reconcile with the actual line count on disk. if meta.Count == 0 { - msgs, readErr := readMessages(s.jsonlPath(sessionKey)) - if readErr != nil { - return readErr + n, countErr := countLines(s.jsonlPath(sessionKey)) + if countErr != nil { + return countErr } - meta.Count = len(msgs) + meta.Count = n } if keepLast <= 0 { @@ -369,17 +383,13 @@ func (s *JSONLStore) Compact( return nil } - all, err := readMessages(s.jsonlPath(sessionKey)) + // Read only the active messages, skipping truncated lines + // without unmarshaling them. + active, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) if err != nil { return err } - // Keep only the active (non-skipped) messages. - var active []providers.Message - if meta.Skip < len(all) { - active = all[meta.Skip:] - } - err = s.rewriteJSONL(sessionKey, active) if err != nil { return err diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go index e3b53bfde..779cab041 100644 --- a/pkg/memory/jsonl_test.go +++ b/pkg/memory/jsonl_test.go @@ -440,7 +440,7 @@ func TestCompact_RemovesSkippedMessages(t *testing.T) { } // Before compact: file still has 10 lines. - allOnDisk, err := readMessages(store.jsonlPath("compact")) + allOnDisk, err := readMessages(store.jsonlPath("compact"), 0) if err != nil { t.Fatalf("readMessages: %v", err) } @@ -455,7 +455,7 @@ func TestCompact_RemovesSkippedMessages(t *testing.T) { } // After compact: file should have only 3 lines. - allOnDisk, err = readMessages(store.jsonlPath("compact")) + allOnDisk, err = readMessages(store.jsonlPath("compact"), 0) if err != nil { t.Fatalf("readMessages: %v", err) } From d55e5540af6de070d3b1b9750e825c3054c6a30f Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Thu, 26 Feb 2026 15:35:04 +0800 Subject: [PATCH 07/10] fix(memory): bound lock memory and increase scanner buffer Address feedback from @yinwm for long-running daemon use: - Replace sync.Map with a fixed-size sharded lock array (64 mutexes). Keys are mapped via FNV hash, so memory is O(1) regardless of how many sessions are created over the process lifetime. - Increase scanner buffer cap from 1 MB to 10 MB. Tool results (read_file on large files, web search responses) can easily exceed 1 MB. The scanner still starts at 64 KB and only grows as needed. --- pkg/memory/jsonl.go | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index eda9563fe..13f450835 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "hash/fnv" "os" "path/filepath" "strings" @@ -14,6 +15,20 @@ import ( "github.com/sipeed/picoclaw/pkg/providers" ) +const ( + // numLockShards is the fixed number of mutexes used to serialize + // per-session access. Using a sharded array instead of a map keeps + // memory bounded regardless of how many sessions are created over + // the lifetime of the process — important for a long-running daemon. + numLockShards = 64 + + // maxLineSize is the maximum size of a single JSON line in a .jsonl + // file. Tool results (read_file, web search, etc.) can be large, so + // we set a generous limit. The scanner starts at 64 KB and grows + // only as needed up to this cap. + maxLineSize = 10 * 1024 * 1024 // 10 MB +) + // sessionMeta holds per-session metadata stored in a .meta.json file. type sessionMeta struct { Key string `json:"key"` @@ -37,7 +52,7 @@ type sessionMeta struct { // append-only, which is both fast and crash-safe. type JSONLStore struct { dir string - locks sync.Map // map[string]*sync.Mutex, one per session + locks [numLockShards]sync.Mutex } // NewJSONLStore creates a new JSONL-backed store rooted at dir. @@ -49,10 +64,13 @@ func NewJSONLStore(dir string) (*JSONLStore, error) { return &JSONLStore{dir: dir}, nil } -// sessionLock returns (or creates) a per-session mutex. +// sessionLock returns a mutex for the given session key. +// Keys are mapped to a fixed pool of shards via FNV hash, so +// memory usage is O(1) regardless of total session count. func (s *JSONLStore) sessionLock(key string) *sync.Mutex { - v, _ := s.locks.LoadOrStore(key, &sync.Mutex{}) - return v.(*sync.Mutex) + h := fnv.New32a() + h.Write([]byte(key)) + return &s.locks[h.Sum32()%numLockShards] } func (s *JSONLStore) jsonlPath(key string) string { @@ -126,8 +144,8 @@ func readMessages(path string, skip int) ([]providers.Message, error) { var msgs []providers.Message scanner := bufio.NewScanner(f) - // Allow up to 1 MB per line for messages with large content. - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + // Allow large lines for tool results (read_file, web search, etc.). + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) lineNum := 0 for scanner.Scan() { @@ -172,7 +190,7 @@ func countLines(path string) (int, error) { n := 0 scanner := bufio.NewScanner(f) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) for scanner.Scan() { if len(scanner.Bytes()) > 0 { n++ From 1f0b85280a5d93d01be4c7b76e72577ccbda515b Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Thu, 26 Feb 2026 16:12:34 +0800 Subject: [PATCH 08/10] fix(memory): always reconcile line count in TruncateHistory A crash between the JSONL append and the meta update in addMsg can leave meta.Count stale (e.g. file has 101 lines but meta says 100). The previous code only reconciled when Count==0, so a nonzero stale count was silently trusted, causing keepLast/skip to be calculated against the wrong total. Now TruncateHistory always counts the actual lines on disk. This is cheap (scan without unmarshal) and TruncateHistory is not a hot path. --- pkg/memory/jsonl.go | 19 ++++++++------- pkg/memory/jsonl_test.go | 51 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index 13f450835..6e6722b96 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -328,15 +328,16 @@ func (s *JSONLStore) TruncateHistory( return err } - // If the meta count might be stale (e.g. after a crash during - // addMsg), reconcile with the actual line count on disk. - if meta.Count == 0 { - n, countErr := countLines(s.jsonlPath(sessionKey)) - if countErr != nil { - return countErr - } - meta.Count = n - } + // Always reconcile meta.Count with the actual line count on disk. + // A crash between the JSONL append and the meta update in addMsg + // leaves meta.Count stale (e.g. file has 101 lines but meta says + // 100). Counting lines is cheap — no unmarshal, just a scan — and + // TruncateHistory is not a hot path, so always re-count. + n, countErr := countLines(s.jsonlPath(sessionKey)) + if countErr != nil { + return countErr + } + meta.Count = n if keepLast <= 0 { meta.Skip = meta.Count diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go index 779cab041..356ff14ff 100644 --- a/pkg/memory/jsonl_test.go +++ b/pkg/memory/jsonl_test.go @@ -544,6 +544,57 @@ func TestCompact_ThenAppend(t *testing.T) { } } +func TestTruncateHistory_StaleMetaCount(t *testing.T) { + // Simulates a crash between JSONL append and meta update in addMsg: + // file has N+1 lines but meta.Count is still N. TruncateHistory must + // reconcile with the real line count so that keepLast is accurate. + store := newTestStore(t) + ctx := context.Background() + + // Write 10 messages normally (meta.Count = 10). + for i := 0; i < 10; i++ { + err := store.AddMessage(ctx, "stale", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Simulate crash: append a line to JSONL but do NOT update meta. + // This leaves meta.Count = 10 while the file has 11 lines. + jsonlPath := store.jsonlPath("stale") + f, err := os.OpenFile(jsonlPath, os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + t.Fatalf("open for append: %v", err) + } + _, err = f.WriteString(`{"role":"user","content":"orphan"}` + "\n") + if err != nil { + t.Fatalf("write orphan: %v", err) + } + f.Close() + + // TruncateHistory(keepLast=4) should keep the last 4 of 11 lines, + // not the last 4 of 10. + err = store.TruncateHistory(ctx, "stale", 4) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "stale") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 4 { + t.Fatalf("expected 4, got %d", len(history)) + } + // Last 4 of [a,b,c,d,e,f,g,h,i,j,orphan] = [h,i,j,orphan] + if history[0].Content != "h" { + t.Errorf("first kept = %q, want 'h'", history[0].Content) + } + if history[3].Content != "orphan" { + t.Errorf("last kept = %q, want 'orphan'", history[3].Content) + } +} + func TestCrashRecovery_PartialLine(t *testing.T) { store := newTestStore(t) ctx := context.Background() From 9c72317b9b497671ebe0de8e38993f638e5fa056 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Thu, 26 Feb 2026 16:13:57 +0800 Subject: [PATCH 09/10] fix(memory): write meta before JSONL rewrite for crash safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In SetHistory and Compact, the JSONL file was rewritten before updating the meta file. If the process crashed between the two writes, the meta still had a large Skip value pointing past the now-shorter JSONL file, causing GetHistory to return empty — effectively data loss. Reverse the order: write meta (with Skip=0) first, then rewrite JSONL. On crash between the two writes, the old uncompacted file is still intact and GetHistory reads from line 1, returning stale-but-complete data. The next operation self-corrects. --- pkg/memory/jsonl.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go index 6e6722b96..222d91f02 100644 --- a/pkg/memory/jsonl.go +++ b/pkg/memory/jsonl.go @@ -361,11 +361,6 @@ func (s *JSONLStore) SetHistory( l.Lock() defer l.Unlock() - err := s.rewriteJSONL(sessionKey, history) - if err != nil { - return err - } - meta, err := s.readMeta(sessionKey) if err != nil { return err @@ -378,7 +373,16 @@ func (s *JSONLStore) SetHistory( meta.Count = len(history) meta.UpdatedAt = now - return s.writeMeta(sessionKey, meta) + // Write meta BEFORE rewriting the JSONL file. If we crash between + // the two writes, meta has Skip=0 and the old file is still intact, + // so GetHistory reads from line 1 — returning "too many" messages + // rather than losing data. The next SetHistory call corrects this. + err = s.writeMeta(sessionKey, meta) + if err != nil { + return err + } + + return s.rewriteJSONL(sessionKey, history) } // Compact physically rewrites the JSONL file, dropping all logically @@ -409,16 +413,21 @@ func (s *JSONLStore) Compact( return err } - err = s.rewriteJSONL(sessionKey, active) - if err != nil { - return err - } - + // Write meta BEFORE rewriting the JSONL file. If the process + // crashes between the two writes, meta has Skip=0 and the old + // (uncompacted) file is still intact, so GetHistory reads from + // line 1 — returning previously-truncated messages rather than + // losing data. The next Compact or TruncateHistory corrects this. meta.Skip = 0 meta.Count = len(active) meta.UpdatedAt = time.Now() - return s.writeMeta(sessionKey, meta) + err = s.writeMeta(sessionKey, meta) + if err != nil { + return err + } + + return s.rewriteJSONL(sessionKey, active) } // rewriteJSONL atomically replaces the JSONL file with the given messages. From e810331dd8d8875d440ffe2cff6d8f530a2b13b2 Mon Sep 17 00:00:00 2001 From: xiaoen <2768753269@qq.com> Date: Thu, 26 Feb 2026 16:15:11 +0800 Subject: [PATCH 10/10] fix(memory): use SetHistory in migration for crash idempotency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MigrateFromJSON previously called AddFullMessage in a loop, then renamed the .json file to .json.migrated. If the process crashed after appending some messages but before the rename, a retry would re-read the same .json and append all messages again — duplicating whatever was written before the crash. Switch to SetHistory which atomically replaces the session contents. A retry after crash overwrites the partial data instead of appending. --- pkg/memory/migration.go | 21 +++++++------- pkg/memory/migration_test.go | 56 ++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/pkg/memory/migration.go b/pkg/memory/migration.go index 5b2f69ab3..c9d5176ab 100644 --- a/pkg/memory/migration.go +++ b/pkg/memory/migration.go @@ -74,19 +74,20 @@ func MigrateFromJSON( key = strings.TrimSuffix(name, ".json") } - for _, msg := range sess.Messages { - addErr := store.AddFullMessage(ctx, key, msg) - if addErr != nil { - return migrated, fmt.Errorf( - "memory: migrate %s: add message: %w", - name, addErr, - ) - } + // Use SetHistory (atomic replace) instead of per-message + // AddFullMessage. This makes migration idempotent: if the + // process crashes after writing messages but before the + // rename below, a retry replaces the partial data cleanly + // instead of duplicating messages. + if setErr := store.SetHistory(ctx, key, sess.Messages); setErr != nil { + return migrated, fmt.Errorf( + "memory: migrate %s: set history: %w", + name, setErr, + ) } if sess.Summary != "" { - sumErr := store.SetSummary(ctx, key, sess.Summary) - if sumErr != nil { + if sumErr := store.SetSummary(ctx, key, sess.Summary); sumErr != nil { return migrated, fmt.Errorf( "memory: migrate %s: set summary: %w", name, sumErr, diff --git a/pkg/memory/migration_test.go b/pkg/memory/migration_test.go index bf16c32f8..3170758b7 100644 --- a/pkg/memory/migration_test.go +++ b/pkg/memory/migration_test.go @@ -314,6 +314,62 @@ func TestMigrateFromJSON_ColonInKey(t *testing.T) { } } +func TestMigrateFromJSON_RetryAfterCrash(t *testing.T) { + // Simulates a crash during migration: first run writes messages + // but doesn't rename the .json file. Second run must replace + // (not duplicate) the messages thanks to SetHistory semantics. + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "retry.json", jsonSession{ + Key: "retry", + Messages: []providers.Message{ + {Role: "user", Content: "one"}, + {Role: "assistant", Content: "two"}, + }, + Created: time.Now(), + Updated: time.Now(), + }) + + // First migration succeeds — writes messages and renames file. + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("first migration: %v", err) + } + if count != 1 { + t.Fatalf("expected 1, got %d", count) + } + + // Simulate "crash before rename": restore the .json file. + src := filepath.Join(sessionsDir, "retry.json.migrated") + dst := filepath.Join(sessionsDir, "retry.json") + if renameErr := os.Rename(src, dst); renameErr != nil { + t.Fatalf("restore .json: %v", renameErr) + } + + // Second migration should re-import without duplicating messages. + count, err = MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("second migration: %v", err) + } + if count != 1 { + t.Fatalf("expected 1, got %d", count) + } + + history, err := store.GetHistory(ctx, "retry") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + // Must be exactly 2 messages (not 4 from duplication). + if len(history) != 2 { + t.Fatalf("expected 2 messages (no duplicates), got %d", len(history)) + } + if history[0].Content != "one" || history[1].Content != "two" { + t.Errorf("unexpected messages: %+v", history) + } +} + func TestMigrateFromJSON_NonexistentDir(t *testing.T) { store := newTestStore(t) ctx := context.Background()