diff --git a/pkg/memory/jsonl.go b/pkg/memory/jsonl.go new file mode 100644 index 000000000..222d91f02 --- /dev/null +++ b/pkg/memory/jsonl.go @@ -0,0 +1,477 @@ +package memory + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "hash/fnv" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +const ( + // numLockShards is the fixed number of mutexes used to serialize + // per-session access. Using a sharded array instead of a map keeps + // memory bounded regardless of how many sessions are created over + // the lifetime of the process — important for a long-running daemon. + numLockShards = 64 + + // maxLineSize is the maximum size of a single JSON line in a .jsonl + // file. Tool results (read_file, web search, etc.) can be large, so + // we set a generous limit. The scanner starts at 64 KB and grows + // only as needed up to this cap. + maxLineSize = 10 * 1024 * 1024 // 10 MB +) + +// sessionMeta holds per-session metadata stored in a .meta.json file. +type sessionMeta struct { + Key string `json:"key"` + Summary string `json:"summary"` + Skip int `json:"skip"` + Count int `json:"count"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// JSONLStore implements Store using append-only JSONL files. +// +// Each session is stored as two files: +// +// {sanitized_key}.jsonl — one JSON-encoded message per line, append-only +// {sanitized_key}.meta.json — session metadata (summary, logical truncation offset) +// +// Messages are never physically deleted from the JSONL file. Instead, +// TruncateHistory records a "skip" offset in the metadata file and +// GetHistory ignores lines before that offset. This keeps all writes +// append-only, which is both fast and crash-safe. +type JSONLStore struct { + dir string + locks [numLockShards]sync.Mutex +} + +// NewJSONLStore creates a new JSONL-backed store rooted at dir. +func NewJSONLStore(dir string) (*JSONLStore, error) { + err := os.MkdirAll(dir, 0o755) + if err != nil { + return nil, fmt.Errorf("memory: create directory: %w", err) + } + return &JSONLStore{dir: dir}, nil +} + +// sessionLock returns a mutex for the given session key. +// Keys are mapped to a fixed pool of shards via FNV hash, so +// memory usage is O(1) regardless of total session count. +func (s *JSONLStore) sessionLock(key string) *sync.Mutex { + h := fnv.New32a() + h.Write([]byte(key)) + return &s.locks[h.Sum32()%numLockShards] +} + +func (s *JSONLStore) jsonlPath(key string) string { + return filepath.Join(s.dir, sanitizeKey(key)+".jsonl") +} + +func (s *JSONLStore) metaPath(key string) string { + return filepath.Join(s.dir, sanitizeKey(key)+".meta.json") +} + +// sanitizeKey converts a session key to a safe filename component. +// Mirrors pkg/session.sanitizeFilename so that migration paths match. +func sanitizeKey(key string) string { + return strings.ReplaceAll(key, ":", "_") +} + +// readMeta loads the metadata file for a session. +// Returns a zero-value sessionMeta if the file does not exist. +func (s *JSONLStore) readMeta(key string) (sessionMeta, error) { + data, err := os.ReadFile(s.metaPath(key)) + if os.IsNotExist(err) { + return sessionMeta{Key: key}, nil + } + if err != nil { + return sessionMeta{}, fmt.Errorf("memory: read meta: %w", err) + } + var meta sessionMeta + err = json.Unmarshal(data, &meta) + if err != nil { + return sessionMeta{}, fmt.Errorf("memory: decode meta: %w", err) + } + return meta, nil +} + +// writeMeta atomically writes the metadata file (temp + rename). +func (s *JSONLStore) writeMeta(key string, meta sessionMeta) error { + data, err := json.MarshalIndent(meta, "", " ") + if err != nil { + return fmt.Errorf("memory: encode meta: %w", err) + } + + target := s.metaPath(key) + tmp := target + ".tmp" + + err = os.WriteFile(tmp, data, 0o644) + if err != nil { + return fmt.Errorf("memory: write meta tmp: %w", err) + } + + err = os.Rename(tmp, target) + if err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("memory: rename meta: %w", err) + } + return nil +} + +// readMessages reads valid JSON lines from a .jsonl file, skipping +// the first `skip` lines without unmarshaling them. This avoids the +// cost of json.Unmarshal on logically truncated messages. +// Malformed trailing lines (e.g. from a crash) are silently skipped. +func readMessages(path string, skip int) ([]providers.Message, error) { + f, err := os.Open(path) + if os.IsNotExist(err) { + return []providers.Message{}, nil + } + if err != nil { + return nil, fmt.Errorf("memory: open jsonl: %w", err) + } + defer f.Close() + + var msgs []providers.Message + scanner := bufio.NewScanner(f) + // Allow large lines for tool results (read_file, web search, etc.). + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) + + lineNum := 0 + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + lineNum++ + if lineNum <= skip { + continue + } + var msg providers.Message + if json.Unmarshal(line, &msg) != nil { + // Corrupt line — likely a partial write from a crash. + // Skip it; this is the standard JSONL recovery pattern. + continue + } + msgs = append(msgs, msg) + } + if scanner.Err() != nil { + return nil, fmt.Errorf("memory: scan jsonl: %w", scanner.Err()) + } + + if msgs == nil { + msgs = []providers.Message{} + } + return msgs, nil +} + +// countLines counts the total number of non-empty lines in a .jsonl file. +// Used by TruncateHistory to reconcile a stale meta.Count without +// the overhead of unmarshaling every message. +func countLines(path string) (int, error) { + f, err := os.Open(path) + if os.IsNotExist(err) { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("memory: open jsonl: %w", err) + } + defer f.Close() + + n := 0 + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) + for scanner.Scan() { + if len(scanner.Bytes()) > 0 { + n++ + } + } + return n, scanner.Err() +} + +func (s *JSONLStore) AddMessage( + _ context.Context, sessionKey, role, content string, +) error { + return s.addMsg(sessionKey, providers.Message{ + Role: role, + Content: content, + }) +} + +func (s *JSONLStore) AddFullMessage( + _ context.Context, sessionKey string, msg providers.Message, +) error { + return s.addMsg(sessionKey, msg) +} + +// addMsg is the shared implementation for AddMessage and AddFullMessage. +func (s *JSONLStore) addMsg(sessionKey string, msg providers.Message) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + // Append the message as a single JSON line. + line, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("memory: marshal message: %w", err) + } + line = append(line, '\n') + + f, err := os.OpenFile( + s.jsonlPath(sessionKey), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0o644, + ) + if err != nil { + return fmt.Errorf("memory: open jsonl for append: %w", err) + } + _, writeErr := f.Write(line) + closeErr := f.Close() + if writeErr != nil { + return fmt.Errorf("memory: append message: %w", writeErr) + } + if closeErr != nil { + return fmt.Errorf("memory: close jsonl: %w", closeErr) + } + + // Update metadata. + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.Count == 0 && meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Count++ + meta.UpdatedAt = now + + return s.writeMeta(sessionKey, meta) +} + +func (s *JSONLStore) GetHistory( + _ context.Context, sessionKey string, +) ([]providers.Message, error) { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return nil, err + } + + // Pass meta.Skip so readMessages skips those lines without + // unmarshaling them — avoids wasted CPU on truncated messages. + msgs, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) + if err != nil { + return nil, err + } + + return msgs, nil +} + +func (s *JSONLStore) GetSummary( + _ context.Context, sessionKey string, +) (string, error) { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return "", err + } + return meta.Summary, nil +} + +func (s *JSONLStore) SetSummary( + _ context.Context, sessionKey, summary string, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Summary = summary + meta.UpdatedAt = now + + return s.writeMeta(sessionKey, meta) +} + +func (s *JSONLStore) TruncateHistory( + _ context.Context, sessionKey string, keepLast int, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + + // Always reconcile meta.Count with the actual line count on disk. + // A crash between the JSONL append and the meta update in addMsg + // leaves meta.Count stale (e.g. file has 101 lines but meta says + // 100). Counting lines is cheap — no unmarshal, just a scan — and + // TruncateHistory is not a hot path, so always re-count. + n, countErr := countLines(s.jsonlPath(sessionKey)) + if countErr != nil { + return countErr + } + meta.Count = n + + if keepLast <= 0 { + meta.Skip = meta.Count + } else { + effective := meta.Count - meta.Skip + if keepLast < effective { + meta.Skip = meta.Count - keepLast + } + } + meta.UpdatedAt = time.Now() + + return s.writeMeta(sessionKey, meta) +} + +func (s *JSONLStore) SetHistory( + _ context.Context, + sessionKey string, + history []providers.Message, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + now := time.Now() + if meta.CreatedAt.IsZero() { + meta.CreatedAt = now + } + meta.Skip = 0 + meta.Count = len(history) + meta.UpdatedAt = now + + // Write meta BEFORE rewriting the JSONL file. If we crash between + // the two writes, meta has Skip=0 and the old file is still intact, + // so GetHistory reads from line 1 — returning "too many" messages + // rather than losing data. The next SetHistory call corrects this. + err = s.writeMeta(sessionKey, meta) + if err != nil { + return err + } + + return s.rewriteJSONL(sessionKey, history) +} + +// Compact physically rewrites the JSONL file, dropping all logically +// skipped lines. This reclaims disk space that accumulates after +// repeated TruncateHistory calls. +// +// It is safe to call at any time; if there is nothing to compact +// (skip == 0) the method returns immediately. +func (s *JSONLStore) Compact( + _ context.Context, sessionKey string, +) error { + l := s.sessionLock(sessionKey) + l.Lock() + defer l.Unlock() + + meta, err := s.readMeta(sessionKey) + if err != nil { + return err + } + if meta.Skip == 0 { + return nil + } + + // Read only the active messages, skipping truncated lines + // without unmarshaling them. + active, err := readMessages(s.jsonlPath(sessionKey), meta.Skip) + if err != nil { + return err + } + + // Write meta BEFORE rewriting the JSONL file. If the process + // crashes between the two writes, meta has Skip=0 and the old + // (uncompacted) file is still intact, so GetHistory reads from + // line 1 — returning previously-truncated messages rather than + // losing data. The next Compact or TruncateHistory corrects this. + meta.Skip = 0 + meta.Count = len(active) + meta.UpdatedAt = time.Now() + + err = s.writeMeta(sessionKey, meta) + if err != nil { + return err + } + + return s.rewriteJSONL(sessionKey, active) +} + +// rewriteJSONL atomically replaces the JSONL file with the given messages. +func (s *JSONLStore) rewriteJSONL( + sessionKey string, msgs []providers.Message, +) error { + target := s.jsonlPath(sessionKey) + tmp := target + ".tmp" + + f, err := os.Create(tmp) + if err != nil { + return fmt.Errorf("memory: create jsonl tmp: %w", err) + } + + for i, msg := range msgs { + line, marshalErr := json.Marshal(msg) + if marshalErr != nil { + f.Close() + _ = os.Remove(tmp) + return fmt.Errorf("memory: marshal message %d: %w", i, marshalErr) + } + line = append(line, '\n') + _, writeErr := f.Write(line) + if writeErr != nil { + f.Close() + _ = os.Remove(tmp) + return fmt.Errorf("memory: write message %d: %w", i, writeErr) + } + } + + err = f.Close() + if err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("memory: close jsonl tmp: %w", err) + } + + err = os.Rename(tmp, target) + if err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("memory: rename jsonl: %w", err) + } + return nil +} + +func (s *JSONLStore) Close() error { + return nil +} diff --git a/pkg/memory/jsonl_test.go b/pkg/memory/jsonl_test.go new file mode 100644 index 000000000..356ff14ff --- /dev/null +++ b/pkg/memory/jsonl_test.go @@ -0,0 +1,835 @@ +package memory + +import ( + "context" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +func newTestStore(t *testing.T) *JSONLStore { + t.Helper() + store, err := NewJSONLStore(t.TempDir()) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + return store +} + +func TestNewJSONLStore_CreatesDirectory(t *testing.T) { + dir := filepath.Join(t.TempDir(), "nested", "sessions") + store, err := NewJSONLStore(dir) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + + info, err := os.Stat(dir) + if err != nil { + t.Fatalf("Stat: %v", err) + } + if !info.IsDir() { + t.Errorf("expected directory, got file") + } +} + +func TestAddMessage_BasicRoundtrip(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + err := store.AddMessage(ctx, "s1", "user", "hello") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + err = store.AddMessage(ctx, "s1", "assistant", "hi there") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "s1") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 messages, got %d", len(history)) + } + if history[0].Role != "user" || history[0].Content != "hello" { + t.Errorf("msg[0] = %+v", history[0]) + } + if history[1].Role != "assistant" || history[1].Content != "hi there" { + t.Errorf("msg[1] = %+v", history[1]) + } +} + +func TestAddMessage_AutoCreatesSession(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Adding a message to a non-existent session should work. + err := store.AddMessage(ctx, "new-session", "user", "first message") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "new-session") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1 message, got %d", len(history)) + } +} + +func TestAddFullMessage_WithToolCalls(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + msg := providers.Message{ + Role: "assistant", + Content: "Let me search that.", + ToolCalls: []providers.ToolCall{ + { + ID: "call_abc", + Type: "function", + Function: &providers.FunctionCall{ + Name: "web_search", + Arguments: `{"q":"golang jsonl"}`, + }, + }, + }, + } + + err := store.AddFullMessage(ctx, "tc", msg) + if err != nil { + t.Fatalf("AddFullMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "tc") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + if len(history[0].ToolCalls) != 1 { + t.Fatalf("expected 1 tool call, got %d", len(history[0].ToolCalls)) + } + tc := history[0].ToolCalls[0] + if tc.ID != "call_abc" { + t.Errorf("tool call ID = %q", tc.ID) + } + if tc.Function == nil || tc.Function.Name != "web_search" { + t.Errorf("tool call function = %+v", tc.Function) + } +} + +func TestAddFullMessage_ToolCallID(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + msg := providers.Message{ + Role: "tool", + Content: "search results here", + ToolCallID: "call_abc", + } + + err := store.AddFullMessage(ctx, "tr", msg) + if err != nil { + t.Fatalf("AddFullMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "tr") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + if history[0].ToolCallID != "call_abc" { + t.Errorf("ToolCallID = %q", history[0].ToolCallID) + } +} + +func TestGetHistory_EmptySession(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + history, err := store.GetHistory(ctx, "nonexistent") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if history == nil { + t.Fatal("expected non-nil empty slice") + } + if len(history) != 0 { + t.Errorf("expected 0 messages, got %d", len(history)) + } +} + +func TestGetHistory_Ordering(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 5; i++ { + err := store.AddMessage( + ctx, "order", + "user", + string(rune('a'+i)), + ) + if err != nil { + t.Fatalf("AddMessage(%d): %v", i, err) + } + } + + history, err := store.GetHistory(ctx, "order") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 5 { + t.Fatalf("expected 5, got %d", len(history)) + } + for i := 0; i < 5; i++ { + expected := string(rune('a' + i)) + if history[i].Content != expected { + t.Errorf("msg[%d].Content = %q, want %q", i, history[i].Content, expected) + } + } +} + +func TestSetSummary_GetSummary(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // No summary yet. + summary, err := store.GetSummary(ctx, "s1") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "" { + t.Errorf("expected empty, got %q", summary) + } + + // Set a summary. + err = store.SetSummary(ctx, "s1", "talked about Go") + if err != nil { + t.Fatalf("SetSummary: %v", err) + } + + summary, err = store.GetSummary(ctx, "s1") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "talked about Go" { + t.Errorf("summary = %q", summary) + } + + // Update summary. + err = store.SetSummary(ctx, "s1", "updated summary") + if err != nil { + t.Fatalf("SetSummary: %v", err) + } + + summary, err = store.GetSummary(ctx, "s1") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "updated summary" { + t.Errorf("summary = %q", summary) + } +} + +func TestTruncateHistory_KeepLast(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 10; i++ { + err := store.AddMessage( + ctx, "trunc", + "user", + string(rune('a'+i)), + ) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + err := store.TruncateHistory(ctx, "trunc", 4) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "trunc") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 4 { + t.Fatalf("expected 4, got %d", len(history)) + } + // Should be the last 4: g, h, i, j + if history[0].Content != "g" { + t.Errorf("first kept = %q, want 'g'", history[0].Content) + } + if history[3].Content != "j" { + t.Errorf("last kept = %q, want 'j'", history[3].Content) + } +} + +func TestTruncateHistory_KeepZero(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 5; i++ { + err := store.AddMessage(ctx, "empty", "user", "msg") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + err := store.TruncateHistory(ctx, "empty", 0) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "empty") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 0 { + t.Errorf("expected 0, got %d", len(history)) + } +} + +func TestTruncateHistory_KeepMoreThanExists(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 3; i++ { + err := store.AddMessage(ctx, "few", "user", "msg") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Keep 100, but only 3 exist — should keep all. + err := store.TruncateHistory(ctx, "few", 100) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "few") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Errorf("expected 3, got %d", len(history)) + } +} + +func TestSetHistory_ReplacesAll(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Add some initial messages. + for i := 0; i < 5; i++ { + err := store.AddMessage(ctx, "replace", "user", "old") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Replace with new history. + newHistory := []providers.Message{ + {Role: "user", Content: "new1"}, + {Role: "assistant", Content: "new2"}, + } + err := store.SetHistory(ctx, "replace", newHistory) + if err != nil { + t.Fatalf("SetHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "replace") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2, got %d", len(history)) + } + if history[0].Content != "new1" || history[1].Content != "new2" { + t.Errorf("history = %+v", history) + } +} + +func TestSetHistory_ResetsSkip(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Add messages and truncate. + for i := 0; i < 10; i++ { + err := store.AddMessage(ctx, "skip-reset", "user", "old") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + err := store.TruncateHistory(ctx, "skip-reset", 3) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + // SetHistory should reset skip to 0. + newHistory := []providers.Message{ + {Role: "user", Content: "fresh"}, + } + err = store.SetHistory(ctx, "skip-reset", newHistory) + if err != nil { + t.Fatalf("SetHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "skip-reset") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + if history[0].Content != "fresh" { + t.Errorf("content = %q", history[0].Content) + } +} + +func TestColonInKey(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + err := store.AddMessage(ctx, "telegram:123", "user", "hi") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + history, err := store.GetHistory(ctx, "telegram:123") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1, got %d", len(history)) + } + + // Verify the file is named with underscore. + jsonlFile := filepath.Join(store.dir, "telegram_123.jsonl") + if _, statErr := os.Stat(jsonlFile); statErr != nil { + t.Errorf("expected file %s to exist: %v", jsonlFile, statErr) + } +} + +func TestCompact_RemovesSkippedMessages(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Write 10 messages, then truncate to keep last 3. + for i := 0; i < 10; i++ { + err := store.AddMessage(ctx, "compact", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + err := store.TruncateHistory(ctx, "compact", 3) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + // Before compact: file still has 10 lines. + allOnDisk, err := readMessages(store.jsonlPath("compact"), 0) + if err != nil { + t.Fatalf("readMessages: %v", err) + } + if len(allOnDisk) != 10 { + t.Fatalf("before compact: expected 10 on disk, got %d", len(allOnDisk)) + } + + // Compact. + err = store.Compact(ctx, "compact") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + // After compact: file should have only 3 lines. + allOnDisk, err = readMessages(store.jsonlPath("compact"), 0) + if err != nil { + t.Fatalf("readMessages: %v", err) + } + if len(allOnDisk) != 3 { + t.Fatalf("after compact: expected 3 on disk, got %d", len(allOnDisk)) + } + + // GetHistory should still return the same 3 messages. + history, err := store.GetHistory(ctx, "compact") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Fatalf("expected 3, got %d", len(history)) + } + if history[0].Content != "h" || history[2].Content != "j" { + t.Errorf("wrong content: %+v", history) + } +} + +func TestCompact_NoOpWhenNoSkip(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 5; i++ { + err := store.AddMessage(ctx, "noop", "user", "msg") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Compact without prior truncation — should be a no-op. + err := store.Compact(ctx, "noop") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + history, err := store.GetHistory(ctx, "noop") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 5 { + t.Errorf("expected 5, got %d", len(history)) + } +} + +func TestCompact_ThenAppend(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 8; i++ { + err := store.AddMessage(ctx, "cap", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + err := store.TruncateHistory(ctx, "cap", 2) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + err = store.Compact(ctx, "cap") + if err != nil { + t.Fatalf("Compact: %v", err) + } + + // Append after compaction should work correctly. + err = store.AddMessage(ctx, "cap", "user", "new") + if err != nil { + t.Fatalf("AddMessage after compact: %v", err) + } + + history, err := store.GetHistory(ctx, "cap") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 3 { + t.Fatalf("expected 3, got %d", len(history)) + } + // g, h (kept from truncation), new (appended after compaction). + if history[0].Content != "g" { + t.Errorf("first = %q, want 'g'", history[0].Content) + } + if history[2].Content != "new" { + t.Errorf("last = %q, want 'new'", history[2].Content) + } +} + +func TestTruncateHistory_StaleMetaCount(t *testing.T) { + // Simulates a crash between JSONL append and meta update in addMsg: + // file has N+1 lines but meta.Count is still N. TruncateHistory must + // reconcile with the real line count so that keepLast is accurate. + store := newTestStore(t) + ctx := context.Background() + + // Write 10 messages normally (meta.Count = 10). + for i := 0; i < 10; i++ { + err := store.AddMessage(ctx, "stale", "user", string(rune('a'+i))) + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + // Simulate crash: append a line to JSONL but do NOT update meta. + // This leaves meta.Count = 10 while the file has 11 lines. + jsonlPath := store.jsonlPath("stale") + f, err := os.OpenFile(jsonlPath, os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + t.Fatalf("open for append: %v", err) + } + _, err = f.WriteString(`{"role":"user","content":"orphan"}` + "\n") + if err != nil { + t.Fatalf("write orphan: %v", err) + } + f.Close() + + // TruncateHistory(keepLast=4) should keep the last 4 of 11 lines, + // not the last 4 of 10. + err = store.TruncateHistory(ctx, "stale", 4) + if err != nil { + t.Fatalf("TruncateHistory: %v", err) + } + + history, err := store.GetHistory(ctx, "stale") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 4 { + t.Fatalf("expected 4, got %d", len(history)) + } + // Last 4 of [a,b,c,d,e,f,g,h,i,j,orphan] = [h,i,j,orphan] + if history[0].Content != "h" { + t.Errorf("first kept = %q, want 'h'", history[0].Content) + } + if history[3].Content != "orphan" { + t.Errorf("last kept = %q, want 'orphan'", history[3].Content) + } +} + +func TestCrashRecovery_PartialLine(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + // Write a valid message first. + err := store.AddMessage(ctx, "crash", "user", "valid") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + // Simulate a crash by appending a partial JSON line directly. + jsonlPath := store.jsonlPath("crash") + f, err := os.OpenFile(jsonlPath, os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + t.Fatalf("open for append: %v", err) + } + _, err = f.WriteString(`{"role":"user","content":"incomple`) + if err != nil { + t.Fatalf("write partial: %v", err) + } + f.Close() + + // GetHistory should return only the valid message. + history, err := store.GetHistory(ctx, "crash") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1 valid message, got %d", len(history)) + } + if history[0].Content != "valid" { + t.Errorf("content = %q", history[0].Content) + } +} + +func TestPersistence_AcrossInstances(t *testing.T) { + dir := t.TempDir() + ctx := context.Background() + + // Write with first instance. + store1, err := NewJSONLStore(dir) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + err = store1.AddMessage(ctx, "persist", "user", "remember me") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + err = store1.SetSummary(ctx, "persist", "a test session") + if err != nil { + t.Fatalf("SetSummary: %v", err) + } + store1.Close() + + // Read with second instance. + store2, err := NewJSONLStore(dir) + if err != nil { + t.Fatalf("NewJSONLStore: %v", err) + } + defer store2.Close() + + history, err := store2.GetHistory(ctx, "persist") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 || history[0].Content != "remember me" { + t.Errorf("history = %+v", history) + } + + summary, err := store2.GetSummary(ctx, "persist") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "a test session" { + t.Errorf("summary = %q", summary) + } +} + +func TestConcurrent_AddAndRead(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + var wg sync.WaitGroup + const goroutines = 10 + const msgsPerGoroutine = 20 + + // Concurrent writes. + for g := 0; g < goroutines; g++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < msgsPerGoroutine; i++ { + _ = store.AddMessage(ctx, "concurrent", "user", "msg") + } + }() + } + wg.Wait() + + history, err := store.GetHistory(ctx, "concurrent") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + expected := goroutines * msgsPerGoroutine + if len(history) != expected { + t.Errorf("expected %d messages, got %d", expected, len(history)) + } +} + +func TestConcurrent_SummarizeRace(t *testing.T) { + // Simulates the #704 race: one goroutine adds messages while + // another truncates + sets summary — like summarizeSession(). + store := newTestStore(t) + ctx := context.Background() + + // Seed with some messages. + for i := 0; i < 20; i++ { + err := store.AddMessage(ctx, "race", "user", "seed") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + } + + var wg sync.WaitGroup + + // Writer goroutine (main agent loop). + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + _ = store.AddMessage(ctx, "race", "user", "new") + } + }() + + // Summarizer goroutine (background task). + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + _ = store.SetSummary(ctx, "race", "summary") + _ = store.TruncateHistory(ctx, "race", 5) + } + }() + + wg.Wait() + + // Verify the store is still in a consistent state. + _, err := store.GetHistory(ctx, "race") + if err != nil { + t.Fatalf("GetHistory after race: %v", err) + } + _, err = store.GetSummary(ctx, "race") + if err != nil { + t.Fatalf("GetSummary after race: %v", err) + } +} + +func TestMultipleSessions_Isolation(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + err := store.AddMessage(ctx, "s1", "user", "msg for s1") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + err = store.AddMessage(ctx, "s2", "user", "msg for s2") + if err != nil { + t.Fatalf("AddMessage: %v", err) + } + + h1, err := store.GetHistory(ctx, "s1") + if err != nil { + t.Fatalf("GetHistory s1: %v", err) + } + h2, err := store.GetHistory(ctx, "s2") + if err != nil { + t.Fatalf("GetHistory s2: %v", err) + } + + if len(h1) != 1 || h1[0].Content != "msg for s1" { + t.Errorf("s1 history = %+v", h1) + } + if len(h2) != 1 || h2[0].Content != "msg for s2" { + t.Errorf("s2 history = %+v", h2) + } +} + +func BenchmarkAddMessage(b *testing.B) { + dir := b.TempDir() + store, err := NewJSONLStore(dir) + if err != nil { + b.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + ctx := context.Background() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = store.AddMessage(ctx, "bench", "user", "benchmark message content") + } +} + +func BenchmarkGetHistory_100(b *testing.B) { + dir := b.TempDir() + store, err := NewJSONLStore(dir) + if err != nil { + b.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + ctx := context.Background() + + for i := 0; i < 100; i++ { + _ = store.AddMessage(ctx, "bench", "user", "message content") + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = store.GetHistory(ctx, "bench") + } +} + +func BenchmarkGetHistory_1000(b *testing.B) { + dir := b.TempDir() + store, err := NewJSONLStore(dir) + if err != nil { + b.Fatalf("NewJSONLStore: %v", err) + } + defer store.Close() + ctx := context.Background() + + for i := 0; i < 1000; i++ { + _ = store.AddMessage(ctx, "bench", "user", "message content") + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = store.GetHistory(ctx, "bench") + } +} diff --git a/pkg/memory/migration.go b/pkg/memory/migration.go new file mode 100644 index 000000000..c9d5176ab --- /dev/null +++ b/pkg/memory/migration.go @@ -0,0 +1,108 @@ +package memory + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +// jsonSession mirrors pkg/session.Session for migration purposes. +type jsonSession struct { + Key string `json:"key"` + Messages []providers.Message `json:"messages"` + Summary string `json:"summary,omitempty"` + Created time.Time `json:"created"` + Updated time.Time `json:"updated"` +} + +// MigrateFromJSON reads legacy sessions/*.json files from sessionsDir, +// writes them into the Store, and renames each migrated file to +// .json.migrated as a backup. Returns the number of sessions migrated. +// +// Files that fail to parse are logged and skipped. Already-migrated +// files (.json.migrated) are ignored, making the function idempotent. +func MigrateFromJSON( + ctx context.Context, sessionsDir string, store Store, +) (int, error) { + entries, err := os.ReadDir(sessionsDir) + if os.IsNotExist(err) { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("memory: read sessions dir: %w", err) + } + + migrated := 0 + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if !strings.HasSuffix(name, ".json") { + continue + } + // Skip already-migrated files. + if strings.HasSuffix(name, ".migrated") { + continue + } + + srcPath := filepath.Join(sessionsDir, name) + + data, readErr := os.ReadFile(srcPath) + if readErr != nil { + log.Printf("memory: migrate: skip %s: %v", name, readErr) + continue + } + + var sess jsonSession + if parseErr := json.Unmarshal(data, &sess); parseErr != nil { + log.Printf("memory: migrate: skip %s: %v", name, parseErr) + continue + } + + // Use the key from the JSON content, not the filename. + // Filenames are sanitized (":" → "_") but keys are not. + key := sess.Key + if key == "" { + key = strings.TrimSuffix(name, ".json") + } + + // Use SetHistory (atomic replace) instead of per-message + // AddFullMessage. This makes migration idempotent: if the + // process crashes after writing messages but before the + // rename below, a retry replaces the partial data cleanly + // instead of duplicating messages. + if setErr := store.SetHistory(ctx, key, sess.Messages); setErr != nil { + return migrated, fmt.Errorf( + "memory: migrate %s: set history: %w", + name, setErr, + ) + } + + if sess.Summary != "" { + if sumErr := store.SetSummary(ctx, key, sess.Summary); sumErr != nil { + return migrated, fmt.Errorf( + "memory: migrate %s: set summary: %w", + name, sumErr, + ) + } + } + + // Rename to .migrated as backup (not delete). + renameErr := os.Rename(srcPath, srcPath+".migrated") + if renameErr != nil { + log.Printf("memory: migrate: rename %s: %v", name, renameErr) + } + + migrated++ + } + + return migrated, nil +} diff --git a/pkg/memory/migration_test.go b/pkg/memory/migration_test.go new file mode 100644 index 000000000..3170758b7 --- /dev/null +++ b/pkg/memory/migration_test.go @@ -0,0 +1,384 @@ +package memory + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +func writeJSONSession( + t *testing.T, dir string, filename string, sess jsonSession, +) { + t.Helper() + data, err := json.MarshalIndent(sess, "", " ") + if err != nil { + t.Fatalf("marshal session: %v", err) + } + err = os.WriteFile(filepath.Join(dir, filename), data, 0o644) + if err != nil { + t.Fatalf("write session file: %v", err) + } +} + +func TestMigrateFromJSON_Basic(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "test.json", jsonSession{ + Key: "test", + Messages: []providers.Message{ + {Role: "user", Content: "hello"}, + {Role: "assistant", Content: "hi"}, + }, + Summary: "A greeting.", + Created: time.Now(), + Updated: time.Now(), + }) + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1 migrated, got %d", count) + } + + history, err := store.GetHistory(ctx, "test") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 messages, got %d", len(history)) + } + if history[0].Content != "hello" || history[1].Content != "hi" { + t.Errorf("unexpected messages: %+v", history) + } + + summary, err := store.GetSummary(ctx, "test") + if err != nil { + t.Fatalf("GetSummary: %v", err) + } + if summary != "A greeting." { + t.Errorf("summary = %q", summary) + } +} + +func TestMigrateFromJSON_WithToolCalls(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "tools.json", jsonSession{ + Key: "tools", + Messages: []providers.Message{ + { + Role: "assistant", + Content: "Searching...", + ToolCalls: []providers.ToolCall{ + { + ID: "call_1", + Type: "function", + Function: &providers.FunctionCall{ + Name: "web_search", + Arguments: `{"q":"test"}`, + }, + }, + }, + }, + { + Role: "tool", + Content: "result", + ToolCallID: "call_1", + }, + }, + Created: time.Now(), + Updated: time.Now(), + }) + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1, got %d", count) + } + + history, err := store.GetHistory(ctx, "tools") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 2 { + t.Fatalf("expected 2 messages, got %d", len(history)) + } + if len(history[0].ToolCalls) != 1 { + t.Fatalf("expected 1 tool call, got %d", len(history[0].ToolCalls)) + } + if history[0].ToolCalls[0].Function.Name != "web_search" { + t.Errorf("function = %q", history[0].ToolCalls[0].Function.Name) + } + if history[1].ToolCallID != "call_1" { + t.Errorf("ToolCallID = %q", history[1].ToolCallID) + } +} + +func TestMigrateFromJSON_MultipleFiles(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + for i := 0; i < 3; i++ { + key := string(rune('a' + i)) + writeJSONSession(t, sessionsDir, key+".json", jsonSession{ + Key: key, + Messages: []providers.Message{{Role: "user", Content: "msg " + key}}, + Created: time.Now(), + Updated: time.Now(), + }) + } + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 3 { + t.Errorf("expected 3, got %d", count) + } + + for i := 0; i < 3; i++ { + key := string(rune('a' + i)) + history, histErr := store.GetHistory(ctx, key) + if histErr != nil { + t.Fatalf("GetHistory(%q): %v", key, histErr) + } + if len(history) != 1 { + t.Errorf("session %q: expected 1 msg, got %d", key, len(history)) + } + } +} + +func TestMigrateFromJSON_InvalidJSON(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + // One valid, one invalid. + writeJSONSession(t, sessionsDir, "good.json", jsonSession{ + Key: "good", + Messages: []providers.Message{{Role: "user", Content: "ok"}}, + Created: time.Now(), + Updated: time.Now(), + }) + err := os.WriteFile( + filepath.Join(sessionsDir, "bad.json"), + []byte("{invalid json"), + 0o644, + ) + if err != nil { + t.Fatalf("write bad file: %v", err) + } + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1 (bad file skipped), got %d", count) + } + + history, err := store.GetHistory(ctx, "good") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Errorf("expected 1 message, got %d", len(history)) + } +} + +func TestMigrateFromJSON_RenamesFiles(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "rename.json", jsonSession{ + Key: "rename", + Messages: []providers.Message{{Role: "user", Content: "hi"}}, + Created: time.Now(), + Updated: time.Now(), + }) + + _, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + + // Original .json should not exist. + _, statErr := os.Stat(filepath.Join(sessionsDir, "rename.json")) + if !os.IsNotExist(statErr) { + t.Error("rename.json should have been renamed") + } + // .json.migrated should exist. + _, statErr = os.Stat( + filepath.Join(sessionsDir, "rename.json.migrated"), + ) + if statErr != nil { + t.Errorf("rename.json.migrated should exist: %v", statErr) + } +} + +func TestMigrateFromJSON_Idempotent(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "idem.json", jsonSession{ + Key: "idem", + Messages: []providers.Message{{Role: "user", Content: "once"}}, + Created: time.Now(), + Updated: time.Now(), + }) + + count1, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("first migration: %v", err) + } + if count1 != 1 { + t.Errorf("first run: expected 1, got %d", count1) + } + + // Second run should find only .migrated files, skip them. + count2, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("second migration: %v", err) + } + if count2 != 0 { + t.Errorf("second run: expected 0, got %d", count2) + } + + history, err := store.GetHistory(ctx, "idem") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Errorf("expected 1 message, got %d", len(history)) + } +} + +func TestMigrateFromJSON_ColonInKey(t *testing.T) { + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + // File is named telegram_123 (sanitized), but the key inside is telegram:123. + writeJSONSession(t, sessionsDir, "telegram_123.json", jsonSession{ + Key: "telegram:123", + Messages: []providers.Message{{Role: "user", Content: "from telegram"}}, + Created: time.Now(), + Updated: time.Now(), + }) + + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 1 { + t.Errorf("expected 1, got %d", count) + } + + // Accessible via the original key "telegram:123". + history, err := store.GetHistory(ctx, "telegram:123") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history) != 1 { + t.Fatalf("expected 1 message, got %d", len(history)) + } + if history[0].Content != "from telegram" { + t.Errorf("content = %q", history[0].Content) + } + + // In the file-based store, "telegram:123" and "telegram_123" both + // sanitize to the same filename, so they share storage. This is + // expected — the colon-to-underscore mapping is a one-way function. + history2, err := store.GetHistory(ctx, "telegram_123") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + if len(history2) != 1 { + t.Errorf("expected 1 (same file), got %d", len(history2)) + } +} + +func TestMigrateFromJSON_RetryAfterCrash(t *testing.T) { + // Simulates a crash during migration: first run writes messages + // but doesn't rename the .json file. Second run must replace + // (not duplicate) the messages thanks to SetHistory semantics. + sessionsDir := t.TempDir() + store := newTestStore(t) + ctx := context.Background() + + writeJSONSession(t, sessionsDir, "retry.json", jsonSession{ + Key: "retry", + Messages: []providers.Message{ + {Role: "user", Content: "one"}, + {Role: "assistant", Content: "two"}, + }, + Created: time.Now(), + Updated: time.Now(), + }) + + // First migration succeeds — writes messages and renames file. + count, err := MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("first migration: %v", err) + } + if count != 1 { + t.Fatalf("expected 1, got %d", count) + } + + // Simulate "crash before rename": restore the .json file. + src := filepath.Join(sessionsDir, "retry.json.migrated") + dst := filepath.Join(sessionsDir, "retry.json") + if renameErr := os.Rename(src, dst); renameErr != nil { + t.Fatalf("restore .json: %v", renameErr) + } + + // Second migration should re-import without duplicating messages. + count, err = MigrateFromJSON(ctx, sessionsDir, store) + if err != nil { + t.Fatalf("second migration: %v", err) + } + if count != 1 { + t.Fatalf("expected 1, got %d", count) + } + + history, err := store.GetHistory(ctx, "retry") + if err != nil { + t.Fatalf("GetHistory: %v", err) + } + // Must be exactly 2 messages (not 4 from duplication). + if len(history) != 2 { + t.Fatalf("expected 2 messages (no duplicates), got %d", len(history)) + } + if history[0].Content != "one" || history[1].Content != "two" { + t.Errorf("unexpected messages: %+v", history) + } +} + +func TestMigrateFromJSON_NonexistentDir(t *testing.T) { + store := newTestStore(t) + ctx := context.Background() + + count, err := MigrateFromJSON(ctx, "/nonexistent/path", store) + if err != nil { + t.Fatalf("MigrateFromJSON: %v", err) + } + if count != 0 { + t.Errorf("expected 0, got %d", count) + } +} diff --git a/pkg/memory/store.go b/pkg/memory/store.go new file mode 100644 index 000000000..b6e11707d --- /dev/null +++ b/pkg/memory/store.go @@ -0,0 +1,42 @@ +package memory + +import ( + "context" + + "github.com/sipeed/picoclaw/pkg/providers" +) + +// Store defines an interface for persistent session storage. +// Each method is an atomic operation — there is no separate Save() call. +type Store interface { + // AddMessage appends a simple text message to a session. + AddMessage(ctx context.Context, sessionKey, role, content string) error + + // AddFullMessage appends a complete message (with tool calls, etc.) to a session. + AddFullMessage(ctx context.Context, sessionKey string, msg providers.Message) error + + // GetHistory returns all messages for a session in insertion order. + // Returns an empty slice (not nil) if the session does not exist. + GetHistory(ctx context.Context, sessionKey string) ([]providers.Message, error) + + // GetSummary returns the conversation summary for a session. + // Returns an empty string if no summary exists. + GetSummary(ctx context.Context, sessionKey string) (string, error) + + // SetSummary updates the conversation summary for a session. + SetSummary(ctx context.Context, sessionKey, summary string) error + + // TruncateHistory removes all but the last keepLast messages from a session. + // If keepLast <= 0, all messages are removed. + TruncateHistory(ctx context.Context, sessionKey string, keepLast int) error + + // SetHistory replaces all messages in a session with the provided history. + SetHistory(ctx context.Context, sessionKey string, history []providers.Message) error + + // Compact reclaims storage by physically removing logically truncated + // data. Backends that do not accumulate dead data may return nil. + Compact(ctx context.Context, sessionKey string) error + + // Close releases any resources held by the store. + Close() error +}