Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/picoclaw/cmd_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/state"
"github.com/sipeed/picoclaw/pkg/tools"
"github.com/sipeed/picoclaw/pkg/utils"
"github.com/sipeed/picoclaw/pkg/voice"
)

Expand Down Expand Up @@ -192,6 +193,15 @@ func gatewayCmd() {
fmt.Println("✓ Device event service started")
}

if cfg.Tools.MediaCleanup.Enabled {
mediaCleaner := utils.NewMediaCleaner(
cfg.Tools.MediaCleanup.Interval,
cfg.Tools.MediaCleanup.MaxAge,
)
mediaCleaner.Start()
defer mediaCleaner.Stop()
}

if err := channelManager.StartAll(ctx); err != nil {
fmt.Printf("Error starting channels: %v\n", err)
}
Expand Down
16 changes: 0 additions & 16 deletions pkg/channels/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -307,18 +306,6 @@ func (c *LINEChannel) processEvent(event lineEvent) {

var content string
var mediaPaths []string
localFiles := []string{}

defer func() {
for _, file := range localFiles {
if err := os.Remove(file); err != nil {
logger.DebugCF("line", "Failed to cleanup temp file", map[string]any{
"file": file,
"error": err.Error(),
})
}
}
}()

switch msg.Type {
case "text":
Expand All @@ -330,21 +317,18 @@ func (c *LINEChannel) processEvent(event lineEvent) {
case "image":
localPath := c.downloadContent(msg.ID, "image.jpg")
if localPath != "" {
localFiles = append(localFiles, localPath)
mediaPaths = append(mediaPaths, localPath)
content = "[image]"
}
case "audio":
localPath := c.downloadContent(msg.ID, "audio.m4a")
if localPath != "" {
localFiles = append(localFiles, localPath)
mediaPaths = append(mediaPaths, localPath)
content = "[audio]"
}
case "video":
localPath := c.downloadContent(msg.ID, "video.mp4")
if localPath != "" {
localFiles = append(localFiles, localPath)
mediaPaths = append(mediaPaths, localPath)
content = "[video]"
}
Expand Down
20 changes: 0 additions & 20 deletions pkg/channels/onebot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -571,7 +570,6 @@ type parseMessageResult struct {
Text string
IsBotMentioned bool
Media []string
LocalFiles []string
ReplyTo string
}

Expand Down Expand Up @@ -603,7 +601,6 @@ func (c *OneBotChannel) parseMessageSegments(raw json.RawMessage, selfID int64)
mentioned := false
selfIDStr := strconv.FormatInt(selfID, 10)
var media []string
var localFiles []string
var replyTo string

for _, seg := range segments {
Expand Down Expand Up @@ -642,7 +639,6 @@ func (c *OneBotChannel) parseMessageSegments(raw json.RawMessage, selfID int64)
})
if localPath != "" {
media = append(media, localPath)
localFiles = append(localFiles, localPath)
textParts = append(textParts, fmt.Sprintf("[%s]", segType))
}
}
Expand All @@ -656,7 +652,6 @@ func (c *OneBotChannel) parseMessageSegments(raw json.RawMessage, selfID int64)
LoggerPrefix: "onebot",
})
if localPath != "" {
localFiles = append(localFiles, localPath)
if c.transcriber != nil && c.transcriber.IsAvailable() {
tctx, tcancel := context.WithTimeout(c.ctx, 30*time.Second)
result, err := c.transcriber.Transcribe(tctx, localPath)
Expand Down Expand Up @@ -703,7 +698,6 @@ func (c *OneBotChannel) parseMessageSegments(raw json.RawMessage, selfID int64)
Text: strings.TrimSpace(strings.Join(textParts, "")),
IsBotMentioned: mentioned,
Media: media,
LocalFiles: localFiles,
ReplyTo: replyTo,
}
}
Expand Down Expand Up @@ -824,20 +818,6 @@ func (c *OneBotChannel) handleMessage(raw *oneBotRawEvent) {
}
}

// Clean up temp files when done
if len(parsed.LocalFiles) > 0 {
defer func() {
for _, f := range parsed.LocalFiles {
if err := os.Remove(f); err != nil {
logger.DebugCF("onebot", "Failed to remove temp file", map[string]any{
"path": f,
"error": err.Error(),
})
}
}
}()
}

if c.isDuplicate(messageID) {
logger.DebugCF("onebot", "Duplicate message, skipping", map[string]any{
"message_id": messageID,
Expand Down
15 changes: 0 additions & 15 deletions pkg/channels/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package channels
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -232,27 +231,13 @@ func (c *SlackChannel) handleMessageEvent(ev *slackevents.MessageEvent) {
content = c.stripBotMention(content)

var mediaPaths []string
localFiles := []string{} // 跟踪需要清理的本地文件

// 确保临时文件在函数返回时被清理
defer func() {
for _, file := range localFiles {
if err := os.Remove(file); err != nil {
logger.DebugCF("slack", "Failed to cleanup temp file", map[string]any{
"file": file,
"error": err.Error(),
})
}
}
}()

if ev.Message != nil && len(ev.Message.Files) > 0 {
for _, file := range ev.Message.Files {
localPath := c.downloadSlackFile(file)
if localPath == "" {
continue
}
localFiles = append(localFiles, localPath)
mediaPaths = append(mediaPaths, localPath)

if utils.IsAudioFile(file.Name, file.Mimetype) && c.transcriber != nil && c.transcriber.IsAvailable() {
Expand Down
17 changes: 0 additions & 17 deletions pkg/channels/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,6 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes

content := ""
mediaPaths := []string{}
localFiles := []string{} // 跟踪需要清理的本地文件

// 确保临时文件在函数返回时被清理
defer func() {
for _, file := range localFiles {
if err := os.Remove(file); err != nil {
logger.DebugCF("telegram", "Failed to cleanup temp file", map[string]any{
"file": file,
"error": err.Error(),
})
}
}
}()

if message.Text != "" {
content += message.Text
Expand All @@ -250,7 +237,6 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
photo := message.Photo[len(message.Photo)-1]
photoPath := c.downloadPhoto(ctx, photo.FileID)
if photoPath != "" {
localFiles = append(localFiles, photoPath)
mediaPaths = append(mediaPaths, photoPath)
if content != "" {
content += "\n"
Expand All @@ -262,7 +248,6 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
if message.Voice != nil {
voicePath := c.downloadFile(ctx, message.Voice.FileID, ".ogg")
if voicePath != "" {
localFiles = append(localFiles, voicePath)
mediaPaths = append(mediaPaths, voicePath)

transcribedText := ""
Expand Down Expand Up @@ -297,7 +282,6 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
if message.Audio != nil {
audioPath := c.downloadFile(ctx, message.Audio.FileID, ".mp3")
if audioPath != "" {
localFiles = append(localFiles, audioPath)
mediaPaths = append(mediaPaths, audioPath)
if content != "" {
content += "\n"
Expand All @@ -309,7 +293,6 @@ func (c *TelegramChannel) handleMessage(ctx context.Context, message *telego.Mes
if message.Document != nil {
docPath := c.downloadFile(ctx, message.Document.FileID, "")
if docPath != "" {
localFiles = append(localFiles, docPath)
mediaPaths = append(mediaPaths, docPath)
if content != "" {
content += "\n"
Expand Down
15 changes: 11 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,11 +452,18 @@ type ExecConfig struct {
CustomDenyPatterns []string `json:"custom_deny_patterns" env:"PICOCLAW_TOOLS_EXEC_CUSTOM_DENY_PATTERNS"`
}

type MediaCleanupConfig struct {
Enabled bool `json:"enabled" env:"PICOCLAW_MEDIA_CLEANUP_ENABLED"`
MaxAge int `json:"max_age_minutes" env:"PICOCLAW_MEDIA_CLEANUP_MAX_AGE"`
Interval int `json:"interval_minutes" env:"PICOCLAW_MEDIA_CLEANUP_INTERVAL"`
}

type ToolsConfig struct {
Web WebToolsConfig `json:"web"`
Cron CronToolsConfig `json:"cron"`
Exec ExecConfig `json:"exec"`
Skills SkillsToolsConfig `json:"skills"`
Web WebToolsConfig `json:"web"`
Cron CronToolsConfig `json:"cron"`
Exec ExecConfig `json:"exec"`
Skills SkillsToolsConfig `json:"skills"`
MediaCleanup MediaCleanupConfig `json:"media_cleanup"`
}

type SkillsToolsConfig struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ func DefaultConfig() *Config {
TTLSeconds: 300,
},
},
MediaCleanup: MediaCleanupConfig{
Enabled: true,
MaxAge: 30,
Interval: 5,
},
},
Heartbeat: HeartbeatConfig{
Enabled: true,
Expand Down
97 changes: 96 additions & 1 deletion pkg/utils/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/google/uuid"

"github.com/sipeed/picoclaw/pkg/logger"
)

// MediaDir is the subdirectory name under os.TempDir() where downloaded media files are stored.
const MediaDir = "picoclaw_media"

// IsAudioFile checks if a file is an audio file based on its filename extension and content type.
func IsAudioFile(filename, contentType string) bool {
audioExtensions := []string{".mp3", ".wav", ".ogg", ".m4a", ".flac", ".aac", ".wma"}
Expand Down Expand Up @@ -65,7 +69,7 @@ func DownloadFile(url, filename string, opts DownloadOptions) string {
opts.LoggerPrefix = "utils"
}

mediaDir := filepath.Join(os.TempDir(), "picoclaw_media")
mediaDir := filepath.Join(os.TempDir(), MediaDir)
if err := os.MkdirAll(mediaDir, 0o700); err != nil {
logger.ErrorCF(opts.LoggerPrefix, "Failed to create media directory", map[string]any{
"error": err.Error(),
Expand Down Expand Up @@ -141,3 +145,94 @@ func DownloadFileSimple(url, filename string) string {
LoggerPrefix: "media",
})
}

// MediaCleaner periodically removes old files from the media temp directory.
type MediaCleaner struct {
interval time.Duration
maxAge time.Duration
stop chan struct{}
once sync.Once
}

// NewMediaCleaner creates a new MediaCleaner with the given settings.
// If intervalMinutes or maxAgeMinutes are <= 0, defaults are used (5 and 30 respectively).
func NewMediaCleaner(intervalMinutes, maxAgeMinutes int) *MediaCleaner {
interval := time.Duration(intervalMinutes) * time.Minute
if interval <= 0 {
interval = 5 * time.Minute
}
maxAge := time.Duration(maxAgeMinutes) * time.Minute
if maxAge <= 0 {
maxAge = 30 * time.Minute
}
return &MediaCleaner{
interval: interval,
maxAge: maxAge,
stop: make(chan struct{}),
}
}

// Start begins the background cleanup goroutine. Safe to call multiple times.
func (mc *MediaCleaner) Start() {
mc.once.Do(func() {
go mc.loop()
logger.InfoCF("media", "Media cleaner started", map[string]any{
"interval": mc.interval.String(),
"max_age": mc.maxAge.String(),
})
})
}

// Stop signals the cleanup goroutine to exit. Safe to call multiple times.
func (mc *MediaCleaner) Stop() {
select {
case <-mc.stop:
default:
close(mc.stop)
logger.InfoC("media", "Media cleaner stopped")
}
}

func (mc *MediaCleaner) loop() {
ticker := time.NewTicker(mc.interval)
defer ticker.Stop()
for {
select {
case <-mc.stop:
return
case <-ticker.C:
mc.cleanup()
}
}
}

func (mc *MediaCleaner) cleanup() {
mediaDir := filepath.Join(os.TempDir(), MediaDir)
entries, err := os.ReadDir(mediaDir)
if err != nil {
return
}

now := time.Now()
removed := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
info, err := entry.Info()
if err != nil {
continue
}
if now.Sub(info.ModTime()) > mc.maxAge {
path := filepath.Join(mediaDir, entry.Name())
if err := os.Remove(path); err == nil {
removed++
}
}
}
if removed > 0 {
logger.DebugCF("media", "Cleaned up old media files", map[string]any{
"removed": removed,
})
}
}
Loading