Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cmd/picoclaw/cmd_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ func gatewayCmd() {
return tools.SilentResult(response)
})

// Create media store for file lifecycle management
mediaStore := media.NewFileMediaStore()
// Create media store for file lifecycle management with TTL cleanup
mediaStore := media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{
Enabled: cfg.Tools.MediaCleanup.Enabled,
MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute,
Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute,
})
mediaStore.Start()

channelManager, err := channels.NewManager(cfg, msgBus, mediaStore)
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there err path not close the goroutine?

mediaStore.Stop()
fmt.Printf("Error creating channel manager: %v\n", err)
os.Exit(1)
}
Expand Down Expand Up @@ -200,6 +206,7 @@ func gatewayCmd() {
deviceService.Stop()
heartbeatService.Stop()
cronService.Stop()
mediaStore.Stop()
agentLoop.Stop()
fmt.Println("✓ Gateway stopped")
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,18 @@ type ExecConfig struct {
CustomDenyPatterns []string `json:"custom_deny_patterns" env:"PICOCLAW_TOOLS_EXEC_CUSTOM_DENY_PATTERNS"`
}

type MediaCleanupConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_MEDIA_CLEANUP_ENABLED"`
MaxAge int `json:"max_age_minutes" env:"PICOCLAW_MEDIA_CLEANUP_MAX_AGE"`
Interval int `json:"interval_minutes" env:"PICOCLAW_MEDIA_CLEANUP_INTERVAL"`
}

type ToolsConfig struct {
Web WebToolsConfig `json:"web"`
Cron CronToolsConfig `json:"cron"`
Exec ExecConfig `json:"exec"`
Skills SkillsToolsConfig `json:"skills"`
Web WebToolsConfig `json:"web"`
Cron CronToolsConfig `json:"cron"`
Exec ExecConfig `json:"exec"`
Skills SkillsToolsConfig `json:"skills"`
MediaCleanup MediaCleanupConfig `json:"media_cleanup"`
}

type SkillsToolsConfig struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func DefaultConfig() *Config {
Port: 18790,
},
Tools: ToolsConfig{
MediaCleanup: MediaCleanupConfig{
Enabled: true,
MaxAge: 30,
Interval: 5,
},
Web: WebToolsConfig{
Brave: BraveConfig{
Enabled: false,
Expand Down
168 changes: 158 additions & 10 deletions pkg/media/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"fmt"
"os"
"sync"
"time"

"github.com/google/uuid"

"github.com/sipeed/picoclaw/pkg/logger"
)

// MediaMeta holds metadata about a stored media file.
Expand Down Expand Up @@ -35,8 +38,16 @@ type MediaStore interface {

// mediaEntry holds the path and metadata for a stored media file.
type mediaEntry struct {
path string
meta MediaMeta
path string
meta MediaMeta
storedAt time.Time
}

// MediaCleanerConfig configures the background TTL cleanup.
type MediaCleanerConfig struct {
Enabled bool
MaxAge time.Duration
Interval time.Duration
}

// FileMediaStore is a pure in-memory implementation of MediaStore.
Expand All @@ -45,13 +56,34 @@ type FileMediaStore struct {
mu sync.RWMutex
refs map[string]mediaEntry
scopeToRefs map[string]map[string]struct{}
refToScope map[string]string

cleanerCfg MediaCleanerConfig
stop chan struct{}
startOnce sync.Once
stopOnce sync.Once
nowFunc func() time.Time // for testing
}

// NewFileMediaStore creates a new FileMediaStore.
// NewFileMediaStore creates a new FileMediaStore without background cleanup.
func NewFileMediaStore() *FileMediaStore {
return &FileMediaStore{
refs: make(map[string]mediaEntry),
scopeToRefs: make(map[string]map[string]struct{}),
refToScope: make(map[string]string),
nowFunc: time.Now,
}
}

// NewFileMediaStoreWithCleanup creates a FileMediaStore with TTL-based background cleanup.
func NewFileMediaStoreWithCleanup(cfg MediaCleanerConfig) *FileMediaStore {
return &FileMediaStore{
refs: make(map[string]mediaEntry),
scopeToRefs: make(map[string]map[string]struct{}),
refToScope: make(map[string]string),
cleanerCfg: cfg,
stop: make(chan struct{}),
nowFunc: time.Now,
}
}

Expand All @@ -66,11 +98,12 @@ func (s *FileMediaStore) Store(localPath string, meta MediaMeta, scope string) (
s.mu.Lock()
defer s.mu.Unlock()

s.refs[ref] = mediaEntry{path: localPath, meta: meta}
s.refs[ref] = mediaEntry{path: localPath, meta: meta, storedAt: s.nowFunc()}
if s.scopeToRefs[scope] == nil {
s.scopeToRefs[scope] = make(map[string]struct{})
}
s.scopeToRefs[scope][ref] = struct{}{}
s.refToScope[ref] = scope

return ref, nil
}
Expand Down Expand Up @@ -100,24 +133,139 @@ func (s *FileMediaStore) ResolveWithMeta(ref string) (string, MediaMeta, error)
}

// ReleaseAll removes all files under the given scope and cleans up mappings.
// Phase 1 (under lock): remove entries from maps.
// Phase 2 (no lock): delete files from disk.
func (s *FileMediaStore) ReleaseAll(scope string) error {
s.mu.Lock()
defer s.mu.Unlock()
// Phase 1: collect paths and remove from maps under lock
var paths []string

s.mu.Lock()
refs, ok := s.scopeToRefs[scope]
if !ok {
s.mu.Unlock()
return nil
}

for ref := range refs {
if entry, exists := s.refs[ref]; exists {
if err := os.Remove(entry.path); err != nil && !os.IsNotExist(err) {
// Log but continue — best effort cleanup
paths = append(paths, entry.path)
}
delete(s.refs, ref)
delete(s.refToScope, ref)
}
delete(s.scopeToRefs, scope)
s.mu.Unlock()

// Phase 2: delete files without holding the lock
for _, p := range paths {
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
logger.WarnCF("media", "release: failed to remove file", map[string]any{
"path": p,
"error": err.Error(),
})
}
}

return nil
}

// CleanExpired removes all entries older than MaxAge.
// Phase 1 (under lock): identify expired entries and remove from maps.
// Phase 2 (no lock): delete files from disk to minimize lock contention.
func (s *FileMediaStore) CleanExpired() int {
if s.cleanerCfg.MaxAge <= 0 {
return 0
}

// Phase 1: collect expired entries under lock
type expiredEntry struct {
ref string
path string
}

s.mu.Lock()
cutoff := s.nowFunc().Add(-s.cleanerCfg.MaxAge)
var expired []expiredEntry

for ref, entry := range s.refs {
if entry.storedAt.Before(cutoff) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and should cutoff >= 0 ?

expired = append(expired, expiredEntry{ref: ref, path: entry.path})

if scope, ok := s.refToScope[ref]; ok {
if scopeRefs, ok := s.scopeToRefs[scope]; ok {
delete(scopeRefs, ref)
if len(scopeRefs) == 0 {
delete(s.scopeToRefs, scope)
}
}
}

delete(s.refs, ref)
delete(s.refToScope, ref)
}
}
s.mu.Unlock()

delete(s.scopeToRefs, scope)
return nil
// Phase 2: delete files without holding the lock
for _, e := range expired {
if err := os.Remove(e.path); err != nil && !os.IsNotExist(err) {
logger.WarnCF("media", "cleanup: failed to remove file", map[string]any{
"path": e.path,
"error": err.Error(),
})
}
}

return len(expired)
}

// Start begins the background cleanup goroutine if cleanup is enabled.
// Safe to call multiple times; only the first call starts the goroutine.
func (s *FileMediaStore) Start() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start function need match the Stop function also using s.once.Do to dispatch goroutine.

if !s.cleanerCfg.Enabled || s.stop == nil {
return
}
if s.cleanerCfg.Interval <= 0 || s.cleanerCfg.MaxAge <= 0 {
logger.WarnCF("media", "cleanup: skipped due to invalid config", map[string]any{
"interval": s.cleanerCfg.Interval.String(),
"max_age": s.cleanerCfg.MaxAge.String(),
})
return
}

s.startOnce.Do(func() {
logger.InfoCF("media", "cleanup enabled", map[string]any{
"interval": s.cleanerCfg.Interval.String(),
"max_age": s.cleanerCfg.MaxAge.String(),
})

go func() {
ticker := time.NewTicker(s.cleanerCfg.Interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if n := s.CleanExpired(); n > 0 {
logger.InfoCF("media", "cleanup: removed expired entries", map[string]any{
"count": n,
})
}
case <-s.stop:
return
}
}
}()
})
}

// Stop terminates the background cleanup goroutine.
// Safe to call multiple times; only the first call closes the channel.
func (s *FileMediaStore) Stop() {
if s.stop == nil {
return
}
s.stopOnce.Do(func() {
close(s.stop)
})
}
Loading