Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions pkg/channels/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package channels
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -48,25 +49,31 @@ func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChann
var opts []telego.BotOption
telegramCfg := cfg.Channels.Telegram

// Always use a custom transport with TCP keepalive to prevent "unexpected EOF"
// errors caused by NAT/firewall silently dropping idle long-poll connections.
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}

if telegramCfg.Proxy != "" {
proxyURL, parseErr := url.Parse(telegramCfg.Proxy)
if parseErr != nil {
return nil, fmt.Errorf("invalid proxy URL %q: %w", telegramCfg.Proxy, parseErr)
}
opts = append(opts, telego.WithHTTPClient(&http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyURL),
},
}))
transport.Proxy = http.ProxyURL(proxyURL)
} else if os.Getenv("HTTP_PROXY") != "" || os.Getenv("HTTPS_PROXY") != "" {
// Use environment proxy if configured
opts = append(opts, telego.WithHTTPClient(&http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
}))
transport.Proxy = http.ProxyFromEnvironment
}

opts = append(opts, telego.WithHTTPClient(&http.Client{
Transport: transport,
}))

bot, err := telego.NewBot(telegramCfg.Token, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create telegram bot: %w", err)
Expand Down
10 changes: 9 additions & 1 deletion pkg/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,15 @@ func (cs *CronService) saveStoreUnsafe() error {
return err
}

return os.WriteFile(cs.storePath, data, 0o600)
// Write atomically: write to a temp file then rename.
// os.WriteFile truncates the file before writing, so a crash between
// truncation and completion leaves an empty or partial file.
// os.Rename on the same filesystem is atomic on Linux.
tmpPath := cs.storePath + ".tmp"
if err := os.WriteFile(tmpPath, data, 0o600); err != nil {
return err
}
return os.Rename(tmpPath, cs.storePath)
}

func (cs *CronService) AddJob(
Expand Down
9 changes: 7 additions & 2 deletions pkg/tools/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,12 @@ func (t *CronTool) ExecuteJob(ctx context.Context, job *cron.CronJob) string {
return fmt.Sprintf("Error: %v", err)
}

// Response is automatically sent via MessageBus by AgentLoop
_ = response // Will be sent by AgentLoop
if response != "" {
t.msgBus.PublishOutbound(bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: response,
})
}
return "ok"
}