diff --git a/cmd/picoclaw/cmd_gateway.go b/cmd/picoclaw/cmd_gateway.go index 798ad2813..e118d20d8 100644 --- a/cmd/picoclaw/cmd_gateway.go +++ b/cmd/picoclaw/cmd_gateway.go @@ -122,11 +122,17 @@ func gatewayCmd() { return tools.SilentResult(response) }) - // Create media store for file lifecycle management - mediaStore := media.NewFileMediaStore() + // Create media store for file lifecycle management with TTL cleanup + mediaStore := media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ + Enabled: cfg.Tools.MediaCleanup.Enabled, + MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, + Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute, + }) + mediaStore.Start() channelManager, err := channels.NewManager(cfg, msgBus, mediaStore) if err != nil { + mediaStore.Stop() fmt.Printf("Error creating channel manager: %v\n", err) os.Exit(1) } @@ -200,6 +206,7 @@ func gatewayCmd() { deviceService.Stop() heartbeatService.Stop() cronService.Stop() + mediaStore.Stop() agentLoop.Stop() fmt.Println("✓ Gateway stopped") } diff --git a/pkg/config/config.go b/pkg/config/config.go index fd5def625..eef9d9261 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -504,11 +504,18 @@ type ExecConfig struct { CustomDenyPatterns []string `json:"custom_deny_patterns" env:"PICOCLAW_TOOLS_EXEC_CUSTOM_DENY_PATTERNS"` } +type MediaCleanupConfig struct { + Enabled bool `json:"enabled" env:"PICOCLAW_MEDIA_CLEANUP_ENABLED"` + MaxAge int `json:"max_age_minutes" env:"PICOCLAW_MEDIA_CLEANUP_MAX_AGE"` + Interval int `json:"interval_minutes" env:"PICOCLAW_MEDIA_CLEANUP_INTERVAL"` +} + type ToolsConfig struct { - Web WebToolsConfig `json:"web"` - Cron CronToolsConfig `json:"cron"` - Exec ExecConfig `json:"exec"` - Skills SkillsToolsConfig `json:"skills"` + Web WebToolsConfig `json:"web"` + Cron CronToolsConfig `json:"cron"` + Exec ExecConfig `json:"exec"` + Skills SkillsToolsConfig `json:"skills"` + MediaCleanup MediaCleanupConfig `json:"media_cleanup"` } type SkillsToolsConfig struct { diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 604b53e24..fb2e8ae22 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -291,6 +291,11 @@ func DefaultConfig() *Config { Port: 18790, }, Tools: ToolsConfig{ + MediaCleanup: MediaCleanupConfig{ + Enabled: true, + MaxAge: 30, + Interval: 5, + }, Web: WebToolsConfig{ Brave: BraveConfig{ Enabled: false, diff --git a/pkg/media/store.go b/pkg/media/store.go index 2df4420e9..30220986c 100644 --- a/pkg/media/store.go +++ b/pkg/media/store.go @@ -4,8 +4,11 @@ import ( "fmt" "os" "sync" + "time" "github.com/google/uuid" + + "github.com/sipeed/picoclaw/pkg/logger" ) // MediaMeta holds metadata about a stored media file. @@ -35,8 +38,16 @@ type MediaStore interface { // mediaEntry holds the path and metadata for a stored media file. type mediaEntry struct { - path string - meta MediaMeta + path string + meta MediaMeta + storedAt time.Time +} + +// MediaCleanerConfig configures the background TTL cleanup. +type MediaCleanerConfig struct { + Enabled bool + MaxAge time.Duration + Interval time.Duration } // FileMediaStore is a pure in-memory implementation of MediaStore. @@ -45,13 +56,34 @@ type FileMediaStore struct { mu sync.RWMutex refs map[string]mediaEntry scopeToRefs map[string]map[string]struct{} + refToScope map[string]string + + cleanerCfg MediaCleanerConfig + stop chan struct{} + startOnce sync.Once + stopOnce sync.Once + nowFunc func() time.Time // for testing } -// NewFileMediaStore creates a new FileMediaStore. +// NewFileMediaStore creates a new FileMediaStore without background cleanup. func NewFileMediaStore() *FileMediaStore { return &FileMediaStore{ refs: make(map[string]mediaEntry), scopeToRefs: make(map[string]map[string]struct{}), + refToScope: make(map[string]string), + nowFunc: time.Now, + } +} + +// NewFileMediaStoreWithCleanup creates a FileMediaStore with TTL-based background cleanup. +func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore { + return &FileMediaStore{ + refs: make(map[string]mediaEntry), + scopeToRefs: make(map[string]map[string]struct{}), + refToScope: make(map[string]string), + cleanerCfg: cfg, + stop: make(chan struct{}), + nowFunc: time.Now, } } @@ -66,11 +98,12 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) ( s.mu.Lock() defer s.mu.Unlock() - s.refs[ref] = mediaEntry{path: localPath, meta: meta} + s.refs[ref] = mediaEntry{path: localPath, meta: meta, storedAt: s.nowFunc()} if s.scopeToRefs[scope] == nil { s.scopeToRefs[scope] = make(map[string]struct{}) } s.scopeToRefs[scope][ref] = struct{}{} + s.refToScope[ref] = scope return ref, nil } @@ -100,24 +133,139 @@ func (s *FileMediaStore) ResolveWithMeta(ref string) (string, MediaMeta, error) } // ReleaseAll removes all files under the given scope and cleans up mappings. +// Phase 1 (under lock): remove entries from maps. +// Phase 2 (no lock): delete files from disk. func (s *FileMediaStore) ReleaseAll(scope string) error { - s.mu.Lock() - defer s.mu.Unlock() + // Phase 1: collect paths and remove from maps under lock + var paths []string + s.mu.Lock() refs, ok := s.scopeToRefs[scope] if !ok { + s.mu.Unlock() return nil } for ref := range refs { if entry, exists := s.refs[ref]; exists { - if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) { - // Log but continue — best effort cleanup + paths = append(paths, entry.path) + } + delete(s.refs, ref) + delete(s.refToScope, ref) + } + delete(s.scopeToRefs, scope) + s.mu.Unlock() + + // Phase 2: delete files without holding the lock + for _, p := range paths { + if err := os.Remove(p); err != nil && !os.IsNotExist(err) { + logger.WarnCF("media", "release: failed to remove file", map[string]any{ + "path": p, + "error": err.Error(), + }) + } + } + + return nil +} + +// CleanExpired removes all entries older than MaxAge. +// Phase 1 (under lock): identify expired entries and remove from maps. +// Phase 2 (no lock): delete files from disk to minimize lock contention. +func (s *FileMediaStore) CleanExpired() int { + if s.cleanerCfg.MaxAge <= 0 { + return 0 + } + + // Phase 1: collect expired entries under lock + type expiredEntry struct { + ref string + path string + } + + s.mu.Lock() + cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge) + var expired []expiredEntry + + for ref, entry := range s.refs { + if entry.storedAt.Before(cutoff) { + expired = append(expired, expiredEntry{ref: ref, path: entry.path}) + + if scope, ok := s.refToScope[ref]; ok { + if scopeRefs, ok := s.scopeToRefs[scope]; ok { + delete(scopeRefs, ref) + if len(scopeRefs) == 0 { + delete(s.scopeToRefs, scope) + } + } } + delete(s.refs, ref) + delete(s.refToScope, ref) } } + s.mu.Unlock() - delete(s.scopeToRefs, scope) - return nil + // Phase 2: delete files without holding the lock + for _, e := range expired { + if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) { + logger.WarnCF("media", "cleanup: failed to remove file", map[string]any{ + "path": e.path, + "error": err.Error(), + }) + } + } + + return len(expired) +} + +// Start begins the background cleanup goroutine if cleanup is enabled. +// Safe to call multiple times; only the first call starts the goroutine. +func (s *FileMediaStore) Start() { + if !s.cleanerCfg.Enabled || s.stop == nil { + return + } + if s.cleanerCfg.Interval <= 0 || s.cleanerCfg.MaxAge <= 0 { + logger.WarnCF("media", "cleanup: skipped due to invalid config", map[string]any{ + "interval": s.cleanerCfg.Interval.String(), + "max_age": s.cleanerCfg.MaxAge.String(), + }) + return + } + + s.startOnce.Do(func() { + logger.InfoCF("media", "cleanup enabled", map[string]any{ + "interval": s.cleanerCfg.Interval.String(), + "max_age": s.cleanerCfg.MaxAge.String(), + }) + + go func() { + ticker := time.NewTicker(s.cleanerCfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if n := s.CleanExpired(); n > 0 { + logger.InfoCF("media", "cleanup: removed expired entries", map[string]any{ + "count": n, + }) + } + case <-s.stop: + return + } + } + }() + }) +} + +// Stop terminates the background cleanup goroutine. +// Safe to call multiple times; only the first call closes the channel. +func (s *FileMediaStore) Stop() { + if s.stop == nil { + return + } + s.stopOnce.Do(func() { + close(s.stop) + }) } diff --git a/pkg/media/store_test.go b/pkg/media/store_test.go index 95bd1eb7a..989f90d7c 100644 --- a/pkg/media/store_test.go +++ b/pkg/media/store_test.go @@ -1,11 +1,13 @@ package media import ( + "fmt" "os" "path/filepath" "strings" "sync" "testing" + "time" ) func createTempFile(t *testing.T, dir, name string) string { @@ -132,6 +134,36 @@ func TestReleaseAllIdempotent(t *testing.T) { } } +func TestReleaseAllCleansMappingsIfRefsMissing(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + path := createTempFile(t, dir, "file.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + // Simulate internal inconsistency: scopeToRefs/refToScope contains ref but refs map doesn't. + store.mu.Lock() + delete(store.refs, ref) + store.mu.Unlock() + + if err := store.ReleaseAll("scope1"); err != nil { + t.Fatalf("ReleaseAll failed: %v", err) + } + + // ReleaseAll should still clean mappings (even if it can't delete the file without the path). + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.refToScope[ref]; ok { + t.Error("refToScope should not contain ref after ReleaseAll") + } + if _, ok := store.scopeToRefs["scope1"]; ok { + t.Error("scopeToRefs should not contain scope1 after ReleaseAll") + } +} + func TestStoreNonexistentFile(t *testing.T) { store := NewFileMediaStore() @@ -140,7 +172,8 @@ func TestStoreNonexistentFile(t *testing.T) { t.Error("Store should fail for nonexistent file") } // Error message should include the underlying os error, not just "file does not exist" - if !strings.Contains(err.Error(), "no such file or directory") { + if !strings.Contains(err.Error(), "no such file or directory") && + !strings.Contains(err.Error(), "cannot find") { t.Errorf("Error should contain OS error detail, got: %v", err) } } @@ -221,3 +254,277 @@ func TestConcurrentSafety(t *testing.T) { wg.Wait() } + +// --- TTL cleanup tests --- + +func newTestStoreWithCleanup(maxAge time.Duration) *FileMediaStore { + s := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: maxAge, + Interval: time.Hour, // won't tick in tests + }) + return s +} + +func TestCleanExpiredRemovesOldEntries(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + + path := createTempFile(t, dir, "old.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + // Advance clock to present + store.nowFunc = func() time.Time { return now } + removed := store.CleanExpired() + + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + if _, err := store.Resolve(ref); err == nil { + t.Error("expired ref should be unresolvable") + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Error("expired file should be deleted") + } +} + +func TestCleanExpiredKeepsNonExpired(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + store.nowFunc = func() time.Time { return now } + + path := createTempFile(t, dir, "fresh.jpg") + ref, err := store.Store(path, MediaMeta{Source: "test"}, "scope1") + if err != nil { + t.Fatalf("Store failed: %v", err) + } + + removed := store.CleanExpired() + if removed != 0 { + t.Errorf("expected 0 removed, got %d", removed) + } + + if _, err := store.Resolve(ref); err != nil { + t.Errorf("fresh ref should still resolve: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Error("fresh file should still exist") + } +} + +func TestCleanExpiredMixedAges(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + + // Store old entry + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + oldPath := createTempFile(t, dir, "old.jpg") + oldRef, _ := store.Store(oldPath, MediaMeta{Source: "test"}, "scope1") + + // Store fresh entry + store.nowFunc = func() time.Time { return now } + freshPath := createTempFile(t, dir, "fresh.jpg") + freshRef, _ := store.Store(freshPath, MediaMeta{Source: "test"}, "scope1") + + removed := store.CleanExpired() + if removed != 1 { + t.Errorf("expected 1 removed, got %d", removed) + } + + if _, err := store.Resolve(oldRef); err == nil { + t.Error("old ref should be gone") + } + if _, err := store.Resolve(freshRef); err != nil { + t.Errorf("fresh ref should still resolve: %v", err) + } +} + +func TestCleanExpiredCleansEmptyScopes(t *testing.T) { + dir := t.TempDir() + now := time.Now() + store := newTestStoreWithCleanup(10 * time.Minute) + + // Store old entry as the only one in scope + store.nowFunc = func() time.Time { return now.Add(-20 * time.Minute) } + path := createTempFile(t, dir, "only.jpg") + store.Store(path, MediaMeta{Source: "test"}, "lonely_scope") + + store.nowFunc = func() time.Time { return now } + store.CleanExpired() + + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.scopeToRefs["lonely_scope"]; ok { + t.Error("empty scope should be cleaned up") + } +} + +func TestStartStopLifecycle(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: time.Minute, + Interval: 50 * time.Millisecond, + }) + + // Start and stop should not panic + store.Start() + // Double start should not spawn a second goroutine + store.Start() + time.Sleep(100 * time.Millisecond) + store.Stop() + + // Double stop should not panic + store.Stop() +} + +func TestCleanExpiredZeroMaxAge(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: 0, + Interval: time.Hour, + }) + + dir := t.TempDir() + path := createTempFile(t, dir, "file.jpg") + ref, _ := store.Store(path, MediaMeta{Source: "test"}, "scope1") + + // Zero MaxAge should be a no-op + removed := store.CleanExpired() + if removed != 0 { + t.Errorf("expected 0 removed with zero MaxAge, got %d", removed) + } + if _, err := store.Resolve(ref); err != nil { + t.Errorf("ref should still resolve: %v", err) + } +} + +func TestStartDisabledIsNoop(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: false, + MaxAge: time.Minute, + Interval: time.Minute, + }) + // Should not start any goroutine or panic + store.Start() + store.Stop() +} + +func TestStartZeroIntervalNoPanic(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: time.Minute, + Interval: 0, + }) + // Zero interval should not panic (time.NewTicker panics on <= 0) + store.Start() + store.Stop() +} + +func TestStartZeroMaxAgeNoPanic(t *testing.T) { + store := NewFileMediaStoreWithCleanup(MediaCleanerConfig{ + Enabled: true, + MaxAge: 0, + Interval: time.Minute, + }) + store.Start() + store.Stop() +} + +func TestConcurrentCleanupSafety(t *testing.T) { + dir := t.TempDir() + store := newTestStoreWithCleanup(50 * time.Millisecond) + store.nowFunc = time.Now + + const workers = 10 + const ops = 20 + var wg sync.WaitGroup + wg.Add(workers * 4) + + // Store workers + for w := 0; w < workers; w++ { + go func(wIdx int) { + defer wg.Done() + scope := fmt.Sprintf("scope-%d", wIdx) + for i := 0; i < ops; i++ { + p := createTempFile(t, dir, fmt.Sprintf("w%d-f%d.tmp", wIdx, i)) + store.Store(p, MediaMeta{Source: "test"}, scope) + } + }(w) + } + + // Resolve workers + for w := 0; w < workers; w++ { + go func() { + defer wg.Done() + for i := 0; i < ops; i++ { + store.Resolve("media://nonexistent") + } + }() + } + + // ReleaseAll workers + for w := 0; w < workers; w++ { + go func(wIdx int) { + defer wg.Done() + for i := 0; i < ops; i++ { + store.ReleaseAll(fmt.Sprintf("scope-%d", wIdx)) + } + }(w) + } + + // CleanExpired workers + for w := 0; w < workers; w++ { + go func() { + defer wg.Done() + for i := 0; i < ops; i++ { + store.CleanExpired() + } + }() + } + + wg.Wait() +} + +func TestRefToScopeConsistency(t *testing.T) { + dir := t.TempDir() + store := NewFileMediaStore() + + // Store entries in two scopes + ref1, _ := store.Store(createTempFile(t, dir, "a.jpg"), MediaMeta{Source: "test"}, "s1") + ref2, _ := store.Store(createTempFile(t, dir, "b.jpg"), MediaMeta{Source: "test"}, "s1") + ref3, _ := store.Store(createTempFile(t, dir, "c.jpg"), MediaMeta{Source: "test"}, "s2") + + store.mu.RLock() + checkRef := func(ref, expectedScope string) { + t.Helper() + if scope, ok := store.refToScope[ref]; !ok || scope != expectedScope { + t.Errorf("refToScope[%s] = %q, want %q", ref, scope, expectedScope) + } + } + checkRef(ref1, "s1") + checkRef(ref2, "s1") + checkRef(ref3, "s2") + store.mu.RUnlock() + + // Release s1 and verify refToScope is cleaned + store.ReleaseAll("s1") + + store.mu.RLock() + defer store.mu.RUnlock() + if _, ok := store.refToScope[ref1]; ok { + t.Error("refToScope should not contain ref1 after ReleaseAll") + } + if _, ok := store.refToScope[ref2]; ok { + t.Error("refToScope should not contain ref2 after ReleaseAll") + } + if _, ok := store.refToScope[ref3]; !ok { + t.Error("refToScope should still contain ref3") + } +}