From e4d3650005b980d03c3671c2932a51ed0ca1c8a3 Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Tue, 24 Feb 2026 21:24:32 +0900 Subject: [PATCH 1/7] feat(media): integrate TTL cleanup into FileMediaStore Add background TTL-based cleanup (L2 safety net) directly into FileMediaStore so file deletion and in-memory ref removal happen atomically under the same mutex, preventing dangling references. - Add storedAt timestamp and refToScope reverse map to mediaEntry - Add CleanExpired() for atomic TTL-based expiration - Add Start()/Stop() for background goroutine lifecycle - Add MediaCleanupConfig (enabled, max_age, interval) to config - Wire up in cmd_gateway.go with config-driven defaults - Add 8 new tests including concurrent cleanup safety Co-Authored-By: Claude Opus 4.6 --- cmd/picoclaw/cmd_gateway.go | 10 +- pkg/config/config.go | 15 ++- pkg/config/defaults.go | 5 + pkg/media/store.go | 109 ++++++++++++++++- pkg/media/store_test.go | 235 +++++++++++++++++++++++++++++++++++- 5 files changed, 363 insertions(+), 11 deletions(-) diff --git a/cmd/picoclaw/cmd_gateway.go b/cmd/picoclaw/cmd_gateway.go index 798ad2813..2ec60317f 100644 --- a/cmd/picoclaw/cmd_gateway.go +++ b/cmd/picoclaw/cmd_gateway.go @@ -122,8 +122,13 @@ func gatewayCmd() { return tools.SilentResult(response) }) - // Create media store for file lifecycle management - mediaStore := media.NewFileMediaStore() + // Create media store for file lifecycle management with TTL cleanup + mediaStore := media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ + Enabled: cfg.Tools.MediaCleanup.Enabled, + MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, + Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute, + }) + mediaStore.Start() channelManager, err := channels.NewManager(cfg, msgBus, mediaStore) if err != nil { @@ -200,6 +205,7 @@ func gatewayCmd() { deviceService.Stop() heartbeatService.Stop() cronService.Stop() + mediaStore.Stop() agentLoop.Stop() fmt.Println("✓ Gateway stopped") } diff --git a/pkg/config/config.go b/pkg/config/config.go index fd5def625..eef9d9261 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -504,11 +504,18 @@ type ExecConfig struct { CustomDenyPatterns []string `json:"custom_deny_patterns" env:"PICOCLAW_TOOLS_EXEC_CUSTOM_DENY_PATTERNS"` } +type MediaCleanupConfig struct { + Enabled bool `json:"enabled" env:"PICOCLAW_MEDIA_CLEANUP_ENABLED"` + MaxAge int `json:"max_age_minutes" env:"PICOCLAW_MEDIA_CLEANUP_MAX_AGE"` + Interval int `json:"interval_minutes" env:"PICOCLAW_MEDIA_CLEANUP_INTERVAL"` +} + type ToolsConfig struct { - Web WebToolsConfig `json:"web"` - Cron CronToolsConfig `json:"cron"` - Exec ExecConfig `json:"exec"` - Skills SkillsToolsConfig `json:"skills"` + Web WebToolsConfig `json:"web"` + Cron CronToolsConfig `json:"cron"` + Exec ExecConfig `json:"exec"` + Skills SkillsToolsConfig `json:"skills"` + MediaCleanup MediaCleanupConfig `json:"media_cleanup"` } type SkillsToolsConfig struct { diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 604b53e24..fb2e8ae22 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -291,6 +291,11 @@ func DefaultConfig() *Config { Port: 18790, }, Tools: ToolsConfig{ + MediaCleanup: MediaCleanupConfig{ + Enabled: true, + MaxAge: 30, + Interval: 5, + }, Web: WebToolsConfig{ Brave: BraveConfig{ Enabled: false, diff --git a/pkg/media/store.go b/pkg/media/store.go index 2df4420e9..6851f9512 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -2,8 +2,10 @@ package media import ( "fmt" + "log" "os" "sync" + "time" "github.com/google/uuid" ) @@ -35,8 +37,16 @@ type MediaStore interface { // mediaEntry holds the path and metadata for a stored media file. type mediaEntry struct { - path string - meta MediaMeta + path string + meta MediaMeta + storedAt time.Time +} + +// MediaCleanerConfig configures the background TTL cleanup. +type MediaCleanerConfig struct { + Enabled bool + MaxAge time.Duration + Interval time.Duration } // FileMediaStore is a pure in-memory implementation of MediaStore. @@ -45,13 +55,33 @@ type FileMediaStore struct { mu sync.RWMutex refs map[string]mediaEntry scopeToRefs map[string]map[string]struct{} + refToScope map[string]string + + cleanerCfg MediaCleanerConfig + stop chan struct{} + once sync.Once + nowFunc func() time.Time // for testing } -// NewFileMediaStore creates a new FileMediaStore. +// NewFileMediaStore creates a new FileMediaStore without background cleanup. func NewFileMediaStore() *FileMediaStore { return &FileMediaStore{ refs: make(map[string]mediaEntry), scopeToRefs: make(map[string]map[string]struct{}), + refToScope: make(map[string]string), + nowFunc: time.Now, + } +} + +// NewFileMediaStoreWithCleanup creates a FileMediaStore with TTL-based background cleanup. +func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore { + return &FileMediaStore{ + refs: make(map[string]mediaEntry), + scopeToRefs: make(map[string]map[string]struct{}), + refToScope: make(map[string]string), + cleanerCfg: cfg, + stop: make(chan struct{}), + nowFunc: time.Now, } } @@ -66,11 +96,12 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) ( s.mu.Lock() defer s.mu.Unlock() - s.refs[ref] = mediaEntry{path: localPath, meta: meta} + s.refs[ref] = mediaEntry{path: localPath, meta: meta, storedAt: s.nowFunc()} if s.scopeToRefs[scope] == nil { s.scopeToRefs[scope] = make(map[string]struct{}) } s.scopeToRefs[scope][ref] = struct{}{} + s.refToScope[ref] = scope return ref, nil } @@ -115,9 +146,79 @@ func (s *FileMediaStore) ReleaseAll(scope string) error { // Log but continue — best effort cleanup } delete(s.refs, ref) + delete(s.refToScope, ref) } } delete(s.scopeToRefs, scope) return nil } + +// CleanExpired removes all entries older than MaxAge. +// Both the file on disk and the in-memory references are deleted atomically +// under the same mutex, preventing dangling references. +func (s *FileMediaStore) CleanExpired() int { + s.mu.Lock() + defer s.mu.Unlock() + + cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge) + removed := 0 + + for ref, entry := range s.refs { + if entry.storedAt.Before(cutoff) { + if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) { + // Log but continue — best effort cleanup + } + + scope := s.refToScope[ref] + if scopeRefs, ok := s.scopeToRefs[scope]; ok { + delete(scopeRefs, ref) + if len(scopeRefs) == 0 { + delete(s.scopeToRefs, scope) + } + } + + delete(s.refs, ref) + delete(s.refToScope, ref) + removed++ + } + } + + return removed +} + +// Start begins the background cleanup goroutine if cleanup is enabled. +func (s *FileMediaStore) Start() { + if !s.cleanerCfg.Enabled || s.stop == nil { + return + } + + log.Printf("[media] cleanup enabled: interval=%s, max_age=%s", + s.cleanerCfg.Interval, s.cleanerCfg.MaxAge) + + go func() { + ticker := time.NewTicker(s.cleanerCfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if n := s.CleanExpired(); n > 0 { + log.Printf("[media] cleanup: removed %d expired entries", n) + } + case <-s.stop: + return + } + } + }() +} + +// Stop terminates the background cleanup goroutine. +func (s *FileMediaStore) Stop() { + if s.stop == nil { + return + } + s.once.Do(func() { + close(s.stop) + }) +} diff --git a/pkg/media/store_test.go b/pkg/media/store_test.go index 95bd1eb7a..e934bf9c0 100644 --- a/pkg/media/store_test.go +++ b/pkg/media/store_test.go @@ -1,11 +1,13 @@ package media import ( + "fmt" "os" "path/filepath" "strings" "sync" "testing" + "time" ) func createTempFile(t *testing.T, dir, name string) string { @@ -140,7 +142,8 @@ func TestStoreNonexistentFile(t *testing.T) { t.Error("Store should fail for nonexistent file") } // Error message should include the underlying os error, not just "file does not exist" - if !strings.Contains(err.Error(), "no such file or directory") { + if !strings.Contains(err.Error(), "no such file or directory") && + !strings.Contains(err.Error(), "cannot find") { t.Errorf("Error should contain OS error detail, got: %v", err) } } @@ -221,3 +224,233 @@ func TestConcurrentSafety(t *testing.T) { wg.Wait() } + +// --- TTL cleanup tests --- + +func newTestStoreWithCleanup(maxAge time.Duration) *FileMediaStore { + s := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: maxAge, + Interval: time.Hour, // won't tick in tests + }) + return s +} + +func TestCleanExpiredRemovesOldEntries(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + + path := createTempFile(t, dir, "old.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + // Advance clock to present + store.nowFunc = func() time.Time { return now } + removed := store.CleanExpired() + + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + if _, err := store.Resolve(ref); err == nil { + t.Error("expired ref should be unresolvable") + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Error("expired file should be deleted") + } +} + +func TestCleanExpiredKeepsNonExpired(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + store.nowFunc = func() time.Time { return now } + + path := createTempFile(t, dir, "fresh.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + removed := store.CleanExpired() + if removed != 0 { + t.Errorf("expected 0 removed, got %d", removed) + } + + if _, err := store.Resolve(ref); err != nil { + t.Errorf("fresh ref should still resolve: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Error("fresh file should still exist") + } +} + +func TestCleanExpiredMixedAges(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + + // Store old entry + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + oldPath := createTempFile(t, dir, "old.jpg") + oldRef, _ := store.Store(oldPath, MediaMeta{Source: "test"}, "scope1") + + // Store fresh entry + store.nowFunc = func() time.Time { return now } + freshPath := createTempFile(t, dir, "fresh.jpg") + freshRef, _ := store.Store(freshPath, MediaMeta{Source: "test"}, "scope1") + + removed := store.CleanExpired() + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + + if _, err := store.Resolve(oldRef); err == nil { + t.Error("old ref should be gone") + } + if _, err := store.Resolve(freshRef); err != nil { + t.Errorf("fresh ref should still resolve: %v", err) + } +} + +func TestCleanExpiredCleansEmptyScopes(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + + // Store old entry as the only one in scope + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + path := createTempFile(t, dir, "only.jpg") + store.Store(path, MediaMeta{Source: "test"}, "lonely_scope") + + store.nowFunc = func() time.Time { return now } + store.CleanExpired() + + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.scopeToRefs["lonely_scope"]; ok { + t.Error("empty scope should be cleaned up") + } +} + +func TestStartStopLifecycle(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: time.Minute, + Interval: 50 * time.Millisecond, + }) + + // Start and stop should not panic + store.Start() + time.Sleep(100 * time.Millisecond) + store.Stop() + + // Double stop should not panic + store.Stop() +} + +func TestStartDisabledIsNoop(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: false, + MaxAge: time.Minute, + Interval: time.Minute, + }) + // Should not start any goroutine or panic + store.Start() + store.Stop() +} + +func TestConcurrentCleanupSafety(t *testing.T) { + dir := t.TempDir() + store := newTestStoreWithCleanup(50 * time.Millisecond) + store.nowFunc = time.Now + + const workers = 10 + const ops = 20 + var wg sync.WaitGroup + wg.Add(workers * 4) + + // Store workers + for w := 0; w < workers; w++ { + go func(wIdx int) { + defer wg.Done() + scope := fmt.Sprintf("scope-%d", wIdx) + for i := 0; i < ops; i++ { + p := createTempFile(t, dir, fmt.Sprintf("w%d-f%d.tmp", wIdx, i)) + store.Store(p, MediaMeta{Source: "test"}, scope) + } + }(w) + } + + // Resolve workers + for w := 0; w < workers; w++ { + go func() { + defer wg.Done() + for i := 0; i < ops; i++ { + store.Resolve("media://nonexistent") + } + }() + } + + // ReleaseAll workers + for w := 0; w < workers; w++ { + go func(wIdx int) { + defer wg.Done() + for i := 0; i < ops; i++ { + store.ReleaseAll(fmt.Sprintf("scope-%d", wIdx)) + } + }(w) + } + + // CleanExpired workers + for w := 0; w < workers; w++ { + go func() { + defer wg.Done() + for i := 0; i < ops; i++ { + store.CleanExpired() + } + }() + } + + wg.Wait() +} + +func TestRefToScopeConsistency(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + // Store entries in two scopes + ref1, _ := store.Store(createTempFile(t, dir, "a.jpg"), MediaMeta{Source: "test"}, "s1") + ref2, _ := store.Store(createTempFile(t, dir, "b.jpg"), MediaMeta{Source: "test"}, "s1") + ref3, _ := store.Store(createTempFile(t, dir, "c.jpg"), MediaMeta{Source: "test"}, "s2") + + store.mu.RLock() + checkRef := func(ref, expectedScope string) { + t.Helper() + if scope, ok := store.refToScope[ref]; !ok || scope != expectedScope { + t.Errorf("refToScope[%s] = %q, want %q", ref, scope, expectedScope) + } + } + checkRef(ref1, "s1") + checkRef(ref2, "s1") + checkRef(ref3, "s2") + store.mu.RUnlock() + + // Release s1 and verify refToScope is cleaned + store.ReleaseAll("s1") + + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.refToScope[ref1]; ok { + t.Error("refToScope should not contain ref1 after ReleaseAll") + } + if _, ok := store.refToScope[ref2]; ok { + t.Error("refToScope should not contain ref2 after ReleaseAll") + } + if _, ok := store.refToScope[ref3]; !ok { + t.Error("refToScope should still contain ref3") + } +} From 9a16f3b1112f1475c2e1c6b4ae4bfa621991dcbc Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Thu, 26 Feb 2026 16:22:49 +0900 Subject: [PATCH 2/7] fix(media): address review comments on TTL cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CleanExpired: split into two phases — collect expired entries under lock, then delete files after releasing the lock to minimize contention - CleanExpired: guard against zero MaxAge (no-op if unconfigured) - CleanExpired: log file removal errors instead of silently ignoring - Start: protect with startOnce to prevent multiple goroutines - Stop: rename once -> stopOnce for clarity - cmd_gateway: call mediaStore.Stop() on error path after Start() - Add TestCleanExpiredZeroMaxAge and double-Start test Co-Authored-By: Claude Opus 4.6 --- cmd/picoclaw/cmd_gateway.go | 1 + pkg/media/store.go | 74 +++++++++++++++++++++++-------------- pkg/media/store_test.go | 23 ++++++++++++ 3 files changed, 70 insertions(+), 28 deletions(-) diff --git a/cmd/picoclaw/cmd_gateway.go b/cmd/picoclaw/cmd_gateway.go index 2ec60317f..e118d20d8 100644 --- a/cmd/picoclaw/cmd_gateway.go +++ b/cmd/picoclaw/cmd_gateway.go @@ -132,6 +132,7 @@ func gatewayCmd() { channelManager, err := channels.NewManager(cfg, msgBus, mediaStore) if err != nil { + mediaStore.Stop() fmt.Printf("Error creating channel manager: %v\n", err) os.Exit(1) } diff --git a/pkg/media/store.go b/pkg/media/store.go index 6851f9512..c6a6502a1 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -59,7 +59,8 @@ type FileMediaStore struct { cleanerCfg MediaCleanerConfig stop chan struct{} - once sync.Once + startOnce sync.Once + stopOnce sync.Once nowFunc func() time.Time // for testing } @@ -155,20 +156,26 @@ func (s *FileMediaStore) ReleaseAll(scope string) error { } // CleanExpired removes all entries older than MaxAge. -// Both the file on disk and the in-memory references are deleted atomically -// under the same mutex, preventing dangling references. +// Phase 1 (under lock): identify expired entries and remove from maps. +// Phase 2 (no lock): delete files from disk to minimize lock contention. func (s *FileMediaStore) CleanExpired() int { - s.mu.Lock() - defer s.mu.Unlock() + if s.cleanerCfg.MaxAge <= 0 { + return 0 + } + + // Phase 1: collect expired entries under lock + type expiredEntry struct { + ref string + path string + } + s.mu.Lock() cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge) - removed := 0 + var expired []expiredEntry for ref, entry := range s.refs { if entry.storedAt.Before(cutoff) { - if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) { - // Log but continue — best effort cleanup - } + expired = append(expired, expiredEntry{ref: ref, path: entry.path}) scope := s.refToScope[ref] if scopeRefs, ok := s.scopeToRefs[scope]; ok { @@ -180,45 +187,56 @@ func (s *FileMediaStore) CleanExpired() int { delete(s.refs, ref) delete(s.refToScope, ref) - removed++ + } + } + s.mu.Unlock() + + // Phase 2: delete files without holding the lock + for _, e := range expired { + if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) { + log.Printf("[media] cleanup: failed to remove %s: %v", e.path, err) } } - return removed + return len(expired) } // Start begins the background cleanup goroutine if cleanup is enabled. +// Safe to call multiple times; only the first call starts the goroutine. func (s *FileMediaStore) Start() { if !s.cleanerCfg.Enabled || s.stop == nil { return } - log.Printf("[media] cleanup enabled: interval=%s, max_age=%s", - s.cleanerCfg.Interval, s.cleanerCfg.MaxAge) - - go func() { - ticker := time.NewTicker(s.cleanerCfg.Interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if n := s.CleanExpired(); n > 0 { - log.Printf("[media] cleanup: removed %d expired entries", n) + s.startOnce.Do(func() { + log.Printf("[media] cleanup enabled: interval=%s, max_age=%s", + s.cleanerCfg.Interval, s.cleanerCfg.MaxAge) + + go func() { + ticker := time.NewTicker(s.cleanerCfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if n := s.CleanExpired(); n > 0 { + log.Printf("[media] cleanup: removed %d expired entries", n) + } + case <-s.stop: + return } - case <-s.stop: - return } - } - }() + }() + }) } // Stop terminates the background cleanup goroutine. +// Safe to call multiple times; only the first call closes the channel. func (s *FileMediaStore) Stop() { if s.stop == nil { return } - s.once.Do(func() { + s.stopOnce.Do(func() { close(s.stop) }) } diff --git a/pkg/media/store_test.go b/pkg/media/store_test.go index e934bf9c0..e45212671 100644 --- a/pkg/media/store_test.go +++ b/pkg/media/store_test.go @@ -345,6 +345,8 @@ func TestStartStopLifecycle(t *testing.T) { // Start and stop should not panic store.Start() + // Double start should not spawn a second goroutine + store.Start() time.Sleep(100 * time.Millisecond) store.Stop() @@ -352,6 +354,27 @@ func TestStartStopLifecycle(t *testing.T) { store.Stop() } +func TestCleanExpiredZeroMaxAge(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: 0, + Interval: time.Hour, + }) + + dir := t.TempDir() + path := createTempFile(t, dir, "file.jpg") + ref, _ := store.Store(path, MediaMeta{Source: "test"}, "scope1") + + // Zero MaxAge should be a no-op + removed := store.CleanExpired() + if removed != 0 { + t.Errorf("expected 0 removed with zero MaxAge, got %d", removed) + } + if _, err := store.Resolve(ref); err != nil { + t.Errorf("ref should still resolve: %v", err) + } +} + func TestStartDisabledIsNoop(t *testing.T) { store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ Enabled: false, From 8e2064468d6a2eff3571495e4340993f94f525a8 Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Thu, 26 Feb 2026 16:33:32 +0900 Subject: [PATCH 3/7] fix(media): guard Interval<=0 panic, two-phase ReleaseAll Address Codex (GPT-5.2) review feedback: - Start: guard against Interval<=0 or MaxAge<=0 to prevent time.NewTicker panic on misconfiguration - ReleaseAll: split into two phases (collect under lock, delete after unlock) matching CleanExpired pattern - ReleaseAll: log file removal errors - Add TestStartZeroIntervalNoPanic and TestStartZeroMaxAgeNoPanic Co-Authored-By: Claude Opus 4.6 --- pkg/media/store.go | 27 +++++++++++++++++++++------ pkg/media/store_test.go | 21 +++++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/pkg/media/store.go b/pkg/media/store.go index c6a6502a1..c4cd816d2 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -132,26 +132,36 @@ func (s *FileMediaStore) ResolveWithMeta(ref string) (string, MediaMeta, error) } // ReleaseAll removes all files under the given scope and cleans up mappings. +// Phase 1 (under lock): remove entries from maps. +// Phase 2 (no lock): delete files from disk. func (s *FileMediaStore) ReleaseAll(scope string) error { - s.mu.Lock() - defer s.mu.Unlock() + // Phase 1: collect paths and remove from maps under lock + var paths []string + s.mu.Lock() refs, ok := s.scopeToRefs[scope] if !ok { + s.mu.Unlock() return nil } for ref := range refs { if entry, exists := s.refs[ref]; exists { - if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) { - // Log but continue — best effort cleanup - } + paths = append(paths, entry.path) delete(s.refs, ref) delete(s.refToScope, ref) } } - delete(s.scopeToRefs, scope) + s.mu.Unlock() + + // Phase 2: delete files without holding the lock + for _, p := range paths { + if err := os.Remove(p); err != nil && !os.IsNotExist(err) { + log.Printf("[media] release: failed to remove %s: %v", p, err) + } + } + return nil } @@ -207,6 +217,11 @@ func (s *FileMediaStore) Start() { if !s.cleanerCfg.Enabled || s.stop == nil { return } + if s.cleanerCfg.Interval <= 0 || s.cleanerCfg.MaxAge <= 0 { + log.Printf("[media] cleanup: skipped (interval=%s, max_age=%s)", + s.cleanerCfg.Interval, s.cleanerCfg.MaxAge) + return + } s.startOnce.Do(func() { log.Printf("[media] cleanup enabled: interval=%s, max_age=%s", diff --git a/pkg/media/store_test.go b/pkg/media/store_test.go index e45212671..b14e1ff9a 100644 --- a/pkg/media/store_test.go +++ b/pkg/media/store_test.go @@ -386,6 +386,27 @@ func TestStartDisabledIsNoop(t *testing.T) { store.Stop() } +func TestStartZeroIntervalNoPanic(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: time.Minute, + Interval: 0, + }) + // Zero interval should not panic (time.NewTicker panics on <= 0) + store.Start() + store.Stop() +} + +func TestStartZeroMaxAgeNoPanic(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: 0, + Interval: time.Minute, + }) + store.Start() + store.Stop() +} + func TestConcurrentCleanupSafety(t *testing.T) { dir := t.TempDir() store := newTestStoreWithCleanup(50 * time.Millisecond) From ad736d71cb603cf1f1d20ea17758f95be26952cc Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Thu, 26 Feb 2026 16:43:41 +0900 Subject: [PATCH 4/7] feat(line): add StartTyping and PlaceholderRecorder integration Implement TypingCapable interface for LINE channel using the loading animation API (1:1 chats only, no group support). - Add StartTyping() with 50s periodic refresh and context-based stop - Integrate PlaceholderRecorder.RecordTypingStop in processEvent - Skip RecordPlaceholder (LINE has no message edit API) - Change sendLoading to accept context and return error - Relax callAPI status check from 200 to 2xx range Design consulted with Codex (GPT-5.2). Co-Authored-By: Claude Opus 4.6 --- pkg/channels/line/line.go | 74 +++++++++++++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/pkg/channels/line/line.go b/pkg/channels/line/line.go index 28d5ad8f7..91ebc7f72 100644 --- a/pkg/channels/line/line.go +++ b/pkg/channels/line/line.go @@ -368,9 +368,6 @@ func (c *LINEChannel) processEvent(event lineEvent) { "preview": utils.Truncate(content, 50), }) - // Show typing/loading indicator (requires user ID, not group ID) - c.sendLoading(senderID) - sender := bus.SenderInfo{ Platform: "line", PlatformID: senderID, @@ -381,6 +378,30 @@ func (c *LINEChannel) processEvent(event lineEvent) { return } + // Thinking indicator (LINE loading animation is 1:1 only). + // For group/room chats, LINE provides no equivalent API. + if !isGroup { + typingCtx, typingCancel := context.WithTimeout(c.ctx, 5*time.Minute) + stop, err := c.StartTyping(typingCtx, chatID) + if err == nil { + var stopOnce sync.Once + stopFn := func() { + stopOnce.Do(func() { + stop() + typingCancel() + }) + } + if rec := c.GetPlaceholderRecorder(); rec != nil { + rec.RecordTypingStop("line", chatID, stopFn) + } else { + // No recorder — stop immediately to avoid goroutine leaks. + stopFn() + } + } else { + typingCancel() + } + } + c.HandleMessage(c.ctx, peer, msg.ID, senderID, chatID, content, mediaPaths, metadata, sender) } @@ -577,17 +598,50 @@ func (c *LINEChannel) sendPush(ctx context.Context, to, content, quoteToken stri return c.callAPI(ctx, linePushEndpoint, payload) } +// StartTyping implements channels.TypingCapable using LINE's loading animation. +// +// NOTE: The LINE loading animation API only works for 1:1 chats. Callers must ensure +// the provided chatID is a user chat ID (not a group/room ID). +// There is no explicit "stop" API; we periodically re-send start requests to keep +// the indicator alive, and stop by canceling the context. +func (c *LINEChannel) StartTyping(ctx context.Context, chatID string) (func(), error) { + if chatID == "" { + return func() {}, nil + } + + typingCtx, cancel := context.WithCancel(ctx) + var once sync.Once + stop := func() { once.Do(cancel) } + + // Send immediately, then refresh periodically for long-running tasks. + if err := c.sendLoading(typingCtx, chatID); err != nil { + stop() + return stop, err + } + + ticker := time.NewTicker(50 * time.Second) + go func() { + defer ticker.Stop() + for { + select { + case <-typingCtx.Done(): + return + case <-ticker.C: + _ = c.sendLoading(typingCtx, chatID) + } + } + }() + + return stop, nil +} + // sendLoading sends a loading animation indicator to the chat. -func (c *LINEChannel) sendLoading(chatID string) { +func (c *LINEChannel) sendLoading(ctx context.Context, chatID string) error { payload := map[string]any{ "chatId": chatID, "loadingSeconds": 60, } - if err := c.callAPI(c.ctx, lineLoadingEndpoint, payload); err != nil { - logger.DebugCF("line", "Failed to send loading indicator", map[string]any{ - "error": err.Error(), - }) - } + return c.callAPI(ctx, lineLoadingEndpoint, payload) } // callAPI makes an authenticated POST request to the LINE API. @@ -612,7 +666,7 @@ func (c *LINEChannel) callAPI(ctx context.Context, endpoint string, payload any) } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + if resp.StatusCode < 200 || resp.StatusCode >= 300 { respBody, _ := io.ReadAll(resp.Body) return channels.ClassifySendError(resp.StatusCode, fmt.Errorf("LINE API error: %s", string(respBody))) } From af9754097c96be3a4e1a5c39755dc15e0bcbc0ec Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Thu, 26 Feb 2026 20:17:45 +0900 Subject: [PATCH 5/7] Revert "feat(line): add StartTyping and PlaceholderRecorder integration" This reverts commit ad736d71cb603cf1f1d20ea17758f95be26952cc. --- pkg/channels/line/line.go | 74 ++++++--------------------------------- 1 file changed, 10 insertions(+), 64 deletions(-) diff --git a/pkg/channels/line/line.go b/pkg/channels/line/line.go index 91ebc7f72..28d5ad8f7 100644 --- a/pkg/channels/line/line.go +++ b/pkg/channels/line/line.go @@ -368,6 +368,9 @@ func (c *LINEChannel) processEvent(event lineEvent) { "preview": utils.Truncate(content, 50), }) + // Show typing/loading indicator (requires user ID, not group ID) + c.sendLoading(senderID) + sender := bus.SenderInfo{ Platform: "line", PlatformID: senderID, @@ -378,30 +381,6 @@ func (c *LINEChannel) processEvent(event lineEvent) { return } - // Thinking indicator (LINE loading animation is 1:1 only). - // For group/room chats, LINE provides no equivalent API. - if !isGroup { - typingCtx, typingCancel := context.WithTimeout(c.ctx, 5*time.Minute) - stop, err := c.StartTyping(typingCtx, chatID) - if err == nil { - var stopOnce sync.Once - stopFn := func() { - stopOnce.Do(func() { - stop() - typingCancel() - }) - } - if rec := c.GetPlaceholderRecorder(); rec != nil { - rec.RecordTypingStop("line", chatID, stopFn) - } else { - // No recorder — stop immediately to avoid goroutine leaks. - stopFn() - } - } else { - typingCancel() - } - } - c.HandleMessage(c.ctx, peer, msg.ID, senderID, chatID, content, mediaPaths, metadata, sender) } @@ -598,50 +577,17 @@ func (c *LINEChannel) sendPush(ctx context.Context, to, content, quoteToken stri return c.callAPI(ctx, linePushEndpoint, payload) } -// StartTyping implements channels.TypingCapable using LINE's loading animation. -// -// NOTE: The LINE loading animation API only works for 1:1 chats. Callers must ensure -// the provided chatID is a user chat ID (not a group/room ID). -// There is no explicit "stop" API; we periodically re-send start requests to keep -// the indicator alive, and stop by canceling the context. -func (c *LINEChannel) StartTyping(ctx context.Context, chatID string) (func(), error) { - if chatID == "" { - return func() {}, nil - } - - typingCtx, cancel := context.WithCancel(ctx) - var once sync.Once - stop := func() { once.Do(cancel) } - - // Send immediately, then refresh periodically for long-running tasks. - if err := c.sendLoading(typingCtx, chatID); err != nil { - stop() - return stop, err - } - - ticker := time.NewTicker(50 * time.Second) - go func() { - defer ticker.Stop() - for { - select { - case <-typingCtx.Done(): - return - case <-ticker.C: - _ = c.sendLoading(typingCtx, chatID) - } - } - }() - - return stop, nil -} - // sendLoading sends a loading animation indicator to the chat. -func (c *LINEChannel) sendLoading(ctx context.Context, chatID string) error { +func (c *LINEChannel) sendLoading(chatID string) { payload := map[string]any{ "chatId": chatID, "loadingSeconds": 60, } - return c.callAPI(ctx, lineLoadingEndpoint, payload) + if err := c.callAPI(c.ctx, lineLoadingEndpoint, payload); err != nil { + logger.DebugCF("line", "Failed to send loading indicator", map[string]any{ + "error": err.Error(), + }) + } } // callAPI makes an authenticated POST request to the LINE API. @@ -666,7 +612,7 @@ func (c *LINEChannel) callAPI(ctx context.Context, endpoint string, payload any) } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { + if resp.StatusCode != http.StatusOK { respBody, _ := io.ReadAll(resp.Body) return channels.ClassifySendError(resp.StatusCode, fmt.Errorf("LINE API error: %s", string(respBody))) } From 13a8ae47007211fc15aa9565820d1ca7df4c5680 Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Thu, 26 Feb 2026 22:39:58 +0900 Subject: [PATCH 6/7] fix(media): use project logger and harden map cleanup - Replace stdlib log.Printf with logger.InfoCF/WarnCF for consistency with the rest of the codebase (addresses @nikolasdehor review point #3) - ReleaseAll: clean refToScope/refs mappings even if refs entry is missing - CleanExpired: guard refToScope lookup before scope cleanup - Add TestReleaseAllCleansMappingsIfRefsMissing for robustness Co-Authored-By: Claude Opus 4.6 --- pkg/media/store.go | 43 +++++++++++++++++++++++++++-------------- pkg/media/store_test.go | 30 ++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/pkg/media/store.go b/pkg/media/store.go index c4cd816d2..902bfa540 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -2,12 +2,12 @@ package media import ( "fmt" - "log" "os" "sync" "time" "github.com/google/uuid" + "github.com/sipeed/picoclaw/pkg/logger" ) // MediaMeta holds metadata about a stored media file. @@ -148,9 +148,9 @@ func (s *FileMediaStore) ReleaseAll(scope string) error { for ref := range refs { if entry, exists := s.refs[ref]; exists { paths = append(paths, entry.path) - delete(s.refs, ref) - delete(s.refToScope, ref) } + delete(s.refs, ref) + delete(s.refToScope, ref) } delete(s.scopeToRefs, scope) s.mu.Unlock() @@ -158,7 +158,10 @@ func (s *FileMediaStore) ReleaseAll(scope string) error { // Phase 2: delete files without holding the lock for _, p := range paths { if err := os.Remove(p); err != nil && !os.IsNotExist(err) { - log.Printf("[media] release: failed to remove %s: %v", p, err) + logger.WarnCF("media", "release: failed to remove file", map[string]any{ + "path": p, + "error": err.Error(), + }) } } @@ -187,11 +190,12 @@ func (s *FileMediaStore) CleanExpired() int { if entry.storedAt.Before(cutoff) { expired = append(expired, expiredEntry{ref: ref, path: entry.path}) - scope := s.refToScope[ref] - if scopeRefs, ok := s.scopeToRefs[scope]; ok { - delete(scopeRefs, ref) - if len(scopeRefs) == 0 { - delete(s.scopeToRefs, scope) + if scope, ok := s.refToScope[ref]; ok { + if scopeRefs, ok := s.scopeToRefs[scope]; ok { + delete(scopeRefs, ref) + if len(scopeRefs) == 0 { + delete(s.scopeToRefs, scope) + } } } @@ -204,7 +208,10 @@ func (s *FileMediaStore) CleanExpired() int { // Phase 2: delete files without holding the lock for _, e := range expired { if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) { - log.Printf("[media] cleanup: failed to remove %s: %v", e.path, err) + logger.WarnCF("media", "cleanup: failed to remove file", map[string]any{ + "path": e.path, + "error": err.Error(), + }) } } @@ -218,14 +225,18 @@ func (s *FileMediaStore) Start() { return } if s.cleanerCfg.Interval <= 0 || s.cleanerCfg.MaxAge <= 0 { - log.Printf("[media] cleanup: skipped (interval=%s, max_age=%s)", - s.cleanerCfg.Interval, s.cleanerCfg.MaxAge) + logger.WarnCF("media", "cleanup: skipped due to invalid config", map[string]any{ + "interval": s.cleanerCfg.Interval.String(), + "max_age": s.cleanerCfg.MaxAge.String(), + }) return } s.startOnce.Do(func() { - log.Printf("[media] cleanup enabled: interval=%s, max_age=%s", - s.cleanerCfg.Interval, s.cleanerCfg.MaxAge) + logger.InfoCF("media", "cleanup enabled", map[string]any{ + "interval": s.cleanerCfg.Interval.String(), + "max_age": s.cleanerCfg.MaxAge.String(), + }) go func() { ticker := time.NewTicker(s.cleanerCfg.Interval) @@ -235,7 +246,9 @@ func (s *FileMediaStore) Start() { select { case <-ticker.C: if n := s.CleanExpired(); n > 0 { - log.Printf("[media] cleanup: removed %d expired entries", n) + logger.InfoCF("media", "cleanup: removed expired entries", map[string]any{ + "count": n, + }) } case <-s.stop: return diff --git a/pkg/media/store_test.go b/pkg/media/store_test.go index b14e1ff9a..989f90d7c 100644 --- a/pkg/media/store_test.go +++ b/pkg/media/store_test.go @@ -134,6 +134,36 @@ func TestReleaseAllIdempotent(t *testing.T) { } } +func TestReleaseAllCleansMappingsIfRefsMissing(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + path := createTempFile(t, dir, "file.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + // Simulate internal inconsistency: scopeToRefs/refToScope contains ref but refs map doesn't. + store.mu.Lock() + delete(store.refs, ref) + store.mu.Unlock() + + if err := store.ReleaseAll("scope1"); err != nil { + t.Fatalf("ReleaseAll failed: %v", err) + } + + // ReleaseAll should still clean mappings (even if it can't delete the file without the path). + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.refToScope[ref]; ok { + t.Error("refToScope should not contain ref after ReleaseAll") + } + if _, ok := store.scopeToRefs["scope1"]; ok { + t.Error("scopeToRefs should not contain scope1 after ReleaseAll") + } +} + func TestStoreNonexistentFile(t *testing.T) { store := NewFileMediaStore() From 897968348aa3ae05abfc0acdeb50c92a01dc531d Mon Sep 17 00:00:00 2001 From: ex-takashima Date: Thu, 26 Feb 2026 22:45:59 +0900 Subject: [PATCH 7/7] fix(media): separate import groups for gci linter Co-Authored-By: Claude Opus 4.6 --- pkg/media/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/media/store.go b/pkg/media/store.go index 902bfa540..30220986c 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "github.com/sipeed/picoclaw/pkg/logger" )