Refactor/channel system#662
Conversation
nikolasdehor
left a comment
There was a problem hiding this comment.
Substantial refactor — the channel system is significantly better structured after this. Key improvements: self-registering channel factories, context-aware bus with proper close semantics, structured sender identity, media lifecycle store, rate-limited per-channel workers, and unified group trigger logic.
Reviewed the full diff. Several observations:
Architecture (positive)
- The
init()+RegisterFactorypattern for channels is clean and idiomatic Go (similar todatabase/sqldrivers). Blank imports incmd_gateway.gomake the dependencies explicit. MessageBus.Close()usingatomic.Bool+ adonechannel instead ofsync.RWMutexeliminates the potential deadlock the old code had whenPublishInboundblocked on a full buffer while holding the read lock.- The
SplitMessagelogic move frompkg/utils/message.gointopkg/channels/split.gois appropriate — message splitting is channel-specific. FileMediaStorewith scope-based lifecycle is a good abstraction for cross-channel media.
Potential issues
-
Bus channels never closed:
NewMessageBuscreates buffered channels (inbound,outbound,outboundMedia) butClose()only closesdone— it never closes the actual channels. This means messages buffered in those channels at shutdown are silently dropped. The old code closed both channels. Consider draining or closing them inClose()(after setting theclosedflag). -
Rate limiter values hardcoded:
channelRateConfighas Discord and Slack at 1 msg/s. Discord's actual limit is 5 msg/s per channel, Slack is 1 msg/s per method. These should probably be configurable or at least documented that they are conservative defaults. -
context.TODO()usage: Several call sites usecontext.TODO()forPublishOutbound/PublishInbound(e.g.,cron.go,subagent.go, summarize goroutine). These would silently hang if the bus buffer is full and the bus is not yet closed. Consider using a context with timeout for fire-and-forget publishes. -
ShouldRespondInGroupprefix matching: The current implementation usesstrings.HasPrefixwhich is case-sensitive. If a user types/Askinstead of/ask, it won't match. This may be intentional, but worth documenting. -
Shutdown order change: The old code called
channelManager.StopAll(ctx)last; the new code calls it first (before stopping cron, heartbeat, agent). This seems correct (stop accepting new messages first), but theshutdownCtxwith 15s timeout is shared across all the stop calls that follow. IfStopAlltakes close to 15s, the subsequent stops get no grace period. Might be fine in practice but worth noting. -
Media cleanup disabled: The TODO comment in
loop.gonotes media cleanup is disabled because files are deleted before the LLM accesses them. This means/tmp/picoclaw_media/files will accumulate indefinitely. Consider a periodic cleanup or TTL-based sweep inFileMediaStore. -
OneBotChannel.Stop— pending channels not closed: In the old code,close(ch)was called for pending response channels. The new code doesdelete(c.pending, echo)without closing. Any goroutine blocked on a read from that channel will now leak. This looks like a bug. -
uuiddependency: The newgoogle/uuiddependency is only used inBuildMediaScopeandFileMediaStore.Store. Both usages are fine, but the import inbase.gomeans thechannelspackage itself now depends onuuid.
Test coverage: Good additions — bus_test.go, base_test.go, identity_test.go, media/store_test.go, manager_test.go, session_key_test.go, errutil_test.go all cover the new abstractions well. The concurrent safety test for FileMediaStore is appreciated.
Overall this is solid work. The items above are mostly about edge cases around shutdown and resource cleanup. Item 7 (OneBot pending channel leak) looks like an actual bug that should be addressed.
Thank you for your review, I will make the necessary fixes later |
…ts, onebot leak) - Drain buffered messages in MessageBus.Close() so they aren't silently lost - Replace all context.TODO() with context.WithTimeout(5s) across 7 call sites - Fix OneBot pending channel leak: send nil sentinel in Stop() and handle nil response in sendAPIRequest() to unblock waiting goroutines
|
@alexhoshina Sounds good, take your time with the fixes. For prioritization, I'd focus on:
The rest (rate limiter values, I am also watching #619 and #639 which both feed into this refactor. The Happy to review the next iteration. |
…tructured fields
Add bus.Peer struct and explicit Peer/MessageID fields to InboundMessage,
replacing the implicit peer_kind/peer_id/message_id metadata convention.
- Add Peer{Kind, ID} type to pkg/bus/types.go
- Extend InboundMessage with Peer and MessageID fields
- Change BaseChannel.HandleMessage signature to accept peer and messageID
- Adapt all 12 channel implementations to pass structured peer/messageID
- Simplify agent extractPeer() to read msg.Peer directly
- extractParentPeer unchanged (parent_peer still via metadata)
…ext leaks - OneBot: remove close(ch) race in Stop() pending cleanup; add WriteDeadline to Send/sendAPIRequest - Telegram: add cancelCtx; Stop() now calls bh.Stop(), cancel(), and cleans up thinking CancelFuncs - Discord: add cancelCtx via WithCancel; Stop() calls cancel(); remove unused getContext() - WhatsApp: add cancelCtx; Send() adds WriteDeadline; replace stdlib log with project logger - MaixCam: add cancelCtx; Send() adds WriteDeadline; Stop() calls cancel() before closing
…r queues Move message splitting from individual channels (Discord) to the Manager layer via per-channel worker goroutines. Each channel now declares its max message length through BaseChannelOption/MessageLengthProvider, and the Manager automatically splits oversized outbound messages before dispatch. This prevents one slow channel from blocking all others. - Add WithMaxMessageLength option and MessageLengthProvider interface - Set platform-specific limits (Discord 2000, Telegram 4096, Slack 40000, etc.) - Convert SplitMessage to rune-aware counting for correct Unicode handling - Replace single dispatcher goroutine with per-channel buffered worker queues - Remove Discord's internal SplitMessage call (now handled centrally)
…gement Channels previously deleted downloaded media files via defer os.Remove, racing with the async Agent consumer. Introduce MediaStore to decouple file ownership: channels register files on download, Agent releases them after processing via ReleaseAll(scope). - New pkg/media with MediaStore interface + FileMediaStore implementation - InboundMessage gains MediaScope field for lifecycle tracking - BaseChannel gains SetMediaStore/GetMediaStore + BuildMediaScope helper - Manager injects MediaStore into channels; AgentLoop releases on completion - Telegram, Discord, Slack, OneBot, LINE channels migrated from defer os.Remove to store.Store() with media:// refs
… error classification Define sentinel error types (ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed) so the Manager can classify Send failures and choose the right retry strategy: permanent errors bail immediately, rate-limit errors use a fixed 1s delay, and temporary/unknown errors use exponential backoff (500ms→1s→2s, capped at 8s, up to 3 retries). A per-channel token-bucket rate limiter (golang.org/x/time/rate) throttles outbound sends before they hit the platform API.
PublishInbound/PublishOutbound held RLock during blocking channel sends, deadlocking against Close() which needs a write lock when the buffer is full. ConsumeInbound/SubscribeOutbound used bare receives instead of comma-ok, causing zero-value processing or busy loops after close. Replace sync.RWMutex+bool with atomic.Bool+done channel so Publish methods use a lock-free 3-way select (send / done / ctx.Done). Add context.Context parameter to both Publish methods so callers can cancel or timeout blocked sends. Close() now only sets the atomic flag and closes the done channel—never closes the data channels—eliminating send-on-closed-channel panics. - Remove dead code: RegisterHandler, GetHandler, handlers map, MessageHandler type (zero callers across the whole repo) - Add ErrBusClosed sentinel error - Update all 10 caller sites to pass context - Add msgBus.Close() to gateway and agent shutdown flows - Add pkg/bus/bus_test.go with 11 test cases covering basic round-trip, context cancellation, closed-bus behavior, concurrent publish+close, full-buffer timeout, and idempotent Close
…el types All 12 channel Send methods now return proper sentinel errors (ErrNotRunning, ErrTemporary, ErrRateLimit, ErrSendFailed) instead of plain fmt.Errorf strings, enabling Manager's sendWithRetry classification logic to actually work. - Add ClassifySendError/ClassifyNetError helpers in errutil.go for HTTP-based channels - LINE/WeCom Bot/WeCom App: use ClassifySendError for HTTP status-based classification - SDK channels (Telegram/Discord/Slack/QQ/DingTalk/Feishu): wrap errors as ErrTemporary - WebSocket channels (OneBot/WhatsApp/MaixCam): wrap write errors as ErrTemporary - WhatsApp: add missing IsRunning() check in Send - WhatsApp/OneBot/MaixCam: add ctx.Done() check before entering write path - Telegram Stop: clean up placeholders sync.Map to prevent state leaks
…ed by Manager Merge 3 independent channel HTTP servers (LINE :18791, WeCom Bot :18793, WeCom App :18792) and the health server (:18790) into a single shared HTTP server on the Gateway address. Channels implement WebhookHandler and/or HealthChecker interfaces to register their handlers on the shared mux. Also change Gateway default host from 0.0.0.0 to 127.0.0.1 for security.
Add outbound media sending capability so the agent can publish media attachments (images, files, audio, video) through channels via the bus. - Add MediaPart and OutboundMediaMessage types to bus - Add PublishOutboundMedia/SubscribeOutboundMedia bus methods - Add MediaSender interface discovered via type assertion by Manager - Add media dispatch/worker in Manager with shared retry logic - Extend ToolResult with Media field and MediaResult constructor - Publish outbound media from agent loop on tool results - Implement SendMedia for Telegram, Discord, Slack, LINE, OneBot, WeCom
Remove SetTranscriber and inline transcription logic from 4 channels (Telegram, Discord, Slack, OneBot) and the gateway wiring. Voice/audio files are still downloaded and stored in MediaStore with simple text annotations ([voice], [audio: filename], [file: name]). The pkg/voice package is preserved for future Agent-level transcription middleware.
Add unified ShouldRespondInGroup to BaseChannel, replacing scattered per-channel group filtering logic. Introduce GroupTriggerConfig (with mention_only + prefixes), TypingConfig, and PlaceholderConfig types. Migrate Discord MentionOnly, OneBot checkGroupTrigger, and LINE hardcoded mention-only to the shared mechanism. Add group trigger entry points for Slack, Telegram, QQ, Feishu, DingTalk, and WeCom. Legacy config fields are preserved with automatic migration.
…hannel (Phase 10 + 7) Phase 10: Define TypingCapable, MessageEditor, PlaceholderRecorder interfaces. Manager orchestrates outbound typing stop and placeholder editing via preSend. Migrate Telegram, Discord, Slack, OneBot to register state with Manager instead of handling locally in Send. Phase 7: Add native WebSocket Pico Protocol channel as reference implementation of all optional capability interfaces.
Message splitting is exclusively a Manager responsibility. Moving it into the channels package eliminates the cross-package dependency and aligns with the refactoring plan.
- MediaStore: use full UUID to prevent ref collisions, preserve and expose metadata via ResolveWithMeta, include underlying OS errors - Agent loop: populate MediaPart Type/Filename/ContentType from MediaStore metadata so channels can dispatch media correctly - SplitMessage: fix byte-vs-rune index mixup in code block header parsing, remove dead candidateStr variable - Pico auth: restrict query-param token behind AllowTokenQuery config flag (default false) to prevent token leakage via logs/referer - HandleMessage: replace context.TODO with caller-propagated ctx, log PublishInbound failures instead of silently discarding - Gateway shutdown: use fresh 15s timeout context for StopAll so graceful shutdown is not short-circuited by the cancelled parent ctx
…format Introduce SenderInfo struct and pkg/identity package to standardize user identification across all channels. Each channel now constructs structured sender info (platform, platformID, canonicalID, username, displayName) instead of ad-hoc string IDs. Allow-list matching supports all legacy formats (numeric ID, @username, id|username) plus the new canonical "platform:id" format. Session key resolution also handles canonical peerIDs for backward-compatible identity link matching.
…ts, onebot leak) - Drain buffered messages in MessageBus.Close() so they aren't silently lost - Replace all context.TODO() with context.WithTimeout(5s) across 7 call sites - Fix OneBot pending channel leak: send nil sentinel in Stop() and handle nil response in sendAPIRequest() to unblock waiting goroutines
… subpackages Translate Chinese comments to English in qq, slack, and telegram channel implementations, following the translation work done in PR sipeed#697. The original PR modified the old parent package files, but these have been moved to subpackages during the refactor, so translations are applied to the new locations.
10a141c to
692efb2
Compare
…ts, onebot leak) - Drain buffered messages in MessageBus.Close() so they aren't silently lost - Replace all context.TODO() with context.WithTimeout(5s) across 7 call sites - Fix OneBot pending channel leak: send nil sentinel in Stop() and handle nil response in sendAPIRequest() to unblock waiting goroutines
…ts, onebot leak) - Drain buffered messages in MessageBus.Close() so they aren't silently lost - Replace all context.TODO() with context.WithTimeout(5s) across 7 call sites - Fix OneBot pending channel leak: send nil sentinel in Stop() and handle nil response in sendAPIRequest() to unblock waiting goroutines
…ebot leak) - Drain buffered messages in MessageBus.Close() so they aren't silently lost - Replace all context.TODO() with context.WithTimeout(5s) across 7 call sites - Fix OneBot pending channel leak: send nil sentinel in Stop() and handle nil response in sendAPIRequest() to unblock waiting goroutines
📝 Description
Comprehensive refactoring of the channel system (
pkg/channels/), message bus (pkg/bus/), and supporting infrastructure across 12 phases. This PR restructures how PicoClaw manages chat platform connections — from a flat, tightly-coupled package into a modular, capability-driven architecture with proper concurrency control, error handling, and lifecycle management.Impact: ~5,900 additions / ~1,400 deletions across 68 files, touching all 12 channel implementations + bus + agent loop + config + 3 new packages.
🗣️ Type of Change
🔗 Related Issue
Closes #621
Resolves: #310 #381 #619 #449 #244 #285 #216 #9 #36
📚 Technical Context
Why This Refactoring
The channel system had accumulated significant technical debt: implicit metadata contracts (
peer_kind,peer_idas magic strings), media files deleted before async consumers read them, a single dispatcher goroutine where one slow channel blocks all others, no retry/rate-limiting on sends, scattered group-trigger logic duplicated across channels, and aMessageBusthat could deadlock when the buffer was full during shutdown.This PR addresses all of these systematically across 12 phases.
Architecture Changes
1. Channel Subpackages + Factory Registry
Channels moved from flat files (
pkg/channels/telegram.go) into subpackages (pkg/channels/telegram/telegram.go). Each registers itself viainit()+RegisterFactory()— the same pattern asdatabase/sqldrivers. The Manager dynamically creates channels from config without import-time coupling.2. MessageBus Concurrency Fix
Replaced
sync.RWMutex+boolwithatomic.Bool+donechannel. Publish methods use a lock-free 3-wayselect(send | done | ctx.Done), eliminating the deadlock wherePublishInboundblocked on a full buffer while holding the read lock, preventingClose()from acquiring the write lock.context.ContextClose()drains buffered messages instead of silently dropping themRegisterHandler,GetHandler,handlersmapErrBusClosedsentinel error3. Structured Identity (
bus.Peer+pkg/identity)Promoted
peer_kind/peer_id/message_idfrom implicit metadata strings to typed struct fields onInboundMessage. Newpkg/identitypackage provides canonicalplatform:idformat and backward-compatible allow-list matching (supports legacy numeric,@username,id|username, and newtelegram:123456formats).4. MediaStore File Lifecycle (
pkg/media)Channels previously deleted downloaded media via
defer os.Remove, racing with the async Agent consumer.FileMediaStoredecouples file ownership: channels register files on download (Store()→media://<uuid>ref), Agent resolves refs after processing. Scope-basedReleaseAll()implemented but currently disabled pending proper consumption confirmation.5. Per-Channel Worker Queues + Rate Limiting
Manager now runs a dedicated worker goroutine per channel with buffered queues, replacing the single dispatcher that could block all channels. Each worker has a
golang.org/x/time/ratetoken-bucket limiter with platform-specific defaults (Discord 1 msg/s burst 5, Telegram 20 msg/s, etc.).6. Error Classification + Retry
Defined sentinel errors (
ErrNotRunning,ErrRateLimit,ErrTemporary,ErrSendFailed). All 12 channels return typed errors fromSend(). Manager'ssendWithRetry()classifies errors and applies the right strategy: permanent → bail, rate-limit → fixed 1s delay, temporary → exponential backoff (500ms→1s→2s, max 3 retries).7. Unified Channel Lifecycle
All channels follow a standard contract:
Start(): createcancelCtxviacontext.WithCancel, spawn goroutines,SetRunning(true)Stop(): callcancel(), wait for goroutines, release resources,SetRunning(false)Send(): checkIsRunning(), respectctx.Done(), return typed errorsFixed goroutine/context leaks in OneBot, Telegram, Discord, WhatsApp, and MaixCam.
8. Shared HTTP Server
Merged 3 independent webhook servers (LINE
:18791, WeCom Bot:18793, WeCom App:18792) + health server (:18790) into a single shared HTTP server on the Gateway address. Channels implementWebhookHandlerand/orHealthCheckerinterfaces to register routes. Default listen address changed from0.0.0.0to127.0.0.1for security.9. Outbound Media via Bus
New
MediaSenderoptional interface +OutboundMediaMessagebus type. Agent publishes media from tool results via bus; Manager dispatches to channels that implementSendMedia(). Implemented for 7 channels: Telegram, Discord, Slack, OneBot, LINE, WeCom App, Pico.10. Group Trigger Standardization
Unified
ShouldRespondInGroup(isMentioned, content)in BaseChannel replaces 9 scattered per-channel implementations. Configurable viaGroupTriggerConfigwithmention_onlyandprefixesfields. Legacy config fields (Discord.MentionOnly,OneBot.GroupTriggerPrefix) auto-migrate.11. Typing/Placeholder Orchestration
Manager tracks typing indicators and placeholder messages via
PlaceholderRecorderinterface. On outbound message,preSend()stops typing and attempts to edit the placeholder (if channel implementsMessageEditor). Implemented for Telegram, Discord, and Pico.12. Pico Protocol Channel (New)
Native WebSocket channel as a reference implementation of all optional interfaces (
Channel,MessageEditor,TypingCapable,MediaSender,WebhookHandler). JSON protocol withmessage.send/create/update,typing.start/stop,media.send/create,ping/pong. Bearer token auth with configurable query-param fallback.Message Flow (After Refactoring)
New Packages
pkg/mediaMediaStoreinterface +FileMediaStorefor file lifecycle with scope-based cleanuppkg/identityplatform:idformat,MatchAllowed()with backward-compatible allow-list matchingpkg/channels/picoTest Coverage
Added 1,816 lines of new test code across 12 test files:
bus/bus_test.gochannels/manager_test.gochannels/base_test.gochannels/errors_test.goerrors.Issemantics, error messageschannels/errutil_test.gochannels/split_test.gomedia/store_test.goidentity/identity_test.gorouting/session_key_test.goKnown Limitations
ReleaseAll()is commented out in agent loop to prevent premature file deletion. Files in/tmp/picoclaw_media/will accumulate. A TTL-based or periodic cleanup strategy is needed.☑️ Checklist