diff --git a/pkg/channels/telegram.go b/pkg/channels/telegram.go index 5cd51e8bc..1176f34aa 100644 --- a/pkg/channels/telegram.go +++ b/pkg/channels/telegram.go @@ -3,6 +3,7 @@ package channels import ( "context" "fmt" + "net" "net/http" "net/url" "os" @@ -48,25 +49,31 @@ func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChann var opts []telego.BotOption telegramCfg := cfg.Channels.Telegram + // Always use a custom transport with TCP keepalive to prevent "unexpected EOF" + // errors caused by NAT/firewall silently dropping idle long-poll connections. + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + } + if telegramCfg.Proxy != "" { proxyURL, parseErr := url.Parse(telegramCfg.Proxy) if parseErr != nil { return nil, fmt.Errorf("invalid proxy URL %q: %w", telegramCfg.Proxy, parseErr) } - opts = append(opts, telego.WithHTTPClient(&http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyURL(proxyURL), - }, - })) + transport.Proxy = http.ProxyURL(proxyURL) } else if os.Getenv("HTTP_PROXY") != "" || os.Getenv("HTTPS_PROXY") != "" { - // Use environment proxy if configured - opts = append(opts, telego.WithHTTPClient(&http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - }, - })) + transport.Proxy = http.ProxyFromEnvironment } + opts = append(opts, telego.WithHTTPClient(&http.Client{ + Transport: transport, + })) + bot, err := telego.NewBot(telegramCfg.Token, opts...) if err != nil { return nil, fmt.Errorf("failed to create telegram bot: %w", err) diff --git a/pkg/cron/service.go b/pkg/cron/service.go index e699a44b5..dc0e0ab0a 100644 --- a/pkg/cron/service.go +++ b/pkg/cron/service.go @@ -340,7 +340,15 @@ func (cs *CronService) saveStoreUnsafe() error { return err } - return os.WriteFile(cs.storePath, data, 0o600) + // Write atomically: write to a temp file then rename. + // os.WriteFile truncates the file before writing, so a crash between + // truncation and completion leaves an empty or partial file. + // os.Rename on the same filesystem is atomic on Linux. + tmpPath := cs.storePath + ".tmp" + if err := os.WriteFile(tmpPath, data, 0o600); err != nil { + return err + } + return os.Rename(tmpPath, cs.storePath) } func (cs *CronService) AddJob( diff --git a/pkg/tools/cron.go b/pkg/tools/cron.go index 562fffc84..65d93dd5c 100644 --- a/pkg/tools/cron.go +++ b/pkg/tools/cron.go @@ -327,7 +327,12 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string { return fmt.Sprintf("Error: %v", err) } - // Response is automatically sent via MessageBus by AgentLoop - _ = response // Will be sent by AgentLoop + if response != "" { + t.msgBus.PublishOutbound(bus.OutboundMessage{ + Channel: channel, + ChatID: chatID, + Content: response, + }) + } return "ok" }