Skip to content

Refactor Channel System - Rebased to Latest Main#817

Closed
Zhaoyikaiii wants to merge 28 commits intosipeed:mainfrom
Zhaoyikaiii:refactor/channel-system-upstream
Closed

Refactor Channel System - Rebased to Latest Main#817
Zhaoyikaiii wants to merge 28 commits intosipeed:mainfrom
Zhaoyikaiii:refactor/channel-system-upstream

Conversation

@Zhaoyikaiii
Copy link
Collaborator

Summary

This PR rebases the channel system refactor (PR #662) onto the latest main branch, incorporating approximately 60 commits from main that were made after the original refactor branch.

Changes Included

  • Channel subpackage architecture and reorganization
  • MessageBus deadlock fixes and concurrency improvements
  • Per-channel rate limiting with error classification
  • MediaStore for unified media lifecycle management
  • HTTP server consolidation under channel Manager
  • Identity system with canonical platform:id format
  • LINE channel typing indicators and PlaceholderRecorder
  • Various bug fixes and review comment resolutions

Rebase Notes

  • Resolved conflicts in:
    • pkg/channels/* (old file deletion vs main modifications)
    • go.mod (dependency merge: pflag + golang.org/x/time)
    • pkg/agent/loop.go (memory threshold notification)
    • cmd/picoclaw/internal/gateway/helpers.go (HTTP server setup)

Testing

Please verify:

  • All channels start and connect properly
  • Message sending works across all channel types
  • Rate limiting and retry mechanisms function correctly
  • Media handling works as expected
  • Health endpoints are accessible

🤖 Generated with Claude Code

alexhoshina and others added 28 commits February 26, 2026 20:40
…tructured fields

Add bus.Peer struct and explicit Peer/MessageID fields to InboundMessage,
replacing the implicit peer_kind/peer_id/message_id metadata convention.

- Add Peer{Kind, ID} type to pkg/bus/types.go
- Extend InboundMessage with Peer and MessageID fields
- Change BaseChannel.HandleMessage signature to accept peer and messageID
- Adapt all 12 channel implementations to pass structured peer/messageID
- Simplify agent extractPeer() to read msg.Peer directly
- extractParentPeer unchanged (parent_peer still via metadata)
…ext leaks

- OneBot: remove close(ch) race in Stop() pending cleanup; add WriteDeadline to Send/sendAPIRequest
- Telegram: add cancelCtx; Stop() now calls bh.Stop(), cancel(), and cleans up thinking CancelFuncs
- Discord: add cancelCtx via WithCancel; Stop() calls cancel(); remove unused getContext()
- WhatsApp: add cancelCtx; Send() adds WriteDeadline; replace stdlib log with project logger
- MaixCam: add cancelCtx; Send() adds WriteDeadline; Stop() calls cancel() before closing
…r queues

Move message splitting from individual channels (Discord) to the Manager
layer via per-channel worker goroutines. Each channel now declares its
max message length through BaseChannelOption/MessageLengthProvider, and
the Manager automatically splits oversized outbound messages before
dispatch. This prevents one slow channel from blocking all others.

- Add WithMaxMessageLength option and MessageLengthProvider interface
- Set platform-specific limits (Discord 2000, Telegram 4096, Slack 40000, etc.)
- Convert SplitMessage to rune-aware counting for correct Unicode handling
- Replace single dispatcher goroutine with per-channel buffered worker queues
- Remove Discord's internal SplitMessage call (now handled centrally)
…gement

Channels previously deleted downloaded media files via defer os.Remove,
racing with the async Agent consumer. Introduce MediaStore to decouple
file ownership: channels register files on download, Agent releases them
after processing via ReleaseAll(scope).

- New pkg/media with MediaStore interface + FileMediaStore implementation
- InboundMessage gains MediaScope field for lifecycle tracking
- BaseChannel gains SetMediaStore/GetMediaStore + BuildMediaScope helper
- Manager injects MediaStore into channels; AgentLoop releases on completion
- Telegram, Discord, Slack, OneBot, LINE channels migrated from defer
  os.Remove to store.Store() with media:// refs
… error classification

Define sentinel error types (ErrNotRunning, ErrRateLimit, ErrTemporary,
ErrSendFailed) so the Manager can classify Send failures and choose the
right retry strategy: permanent errors bail immediately, rate-limit
errors use a fixed 1s delay, and temporary/unknown errors use exponential
backoff (500ms→1s→2s, capped at 8s, up to 3 retries). A per-channel
token-bucket rate limiter (golang.org/x/time/rate) throttles outbound
sends before they hit the platform API.
PublishInbound/PublishOutbound held RLock during blocking channel sends,
deadlocking against Close() which needs a write lock when the buffer is
full. ConsumeInbound/SubscribeOutbound used bare receives instead of
comma-ok, causing zero-value processing or busy loops after close.

Replace sync.RWMutex+bool with atomic.Bool+done channel so Publish
methods use a lock-free 3-way select (send / done / ctx.Done). Add
context.Context parameter to both Publish methods so callers can cancel
or timeout blocked sends. Close() now only sets the atomic flag and
closes the done channel—never closes the data channels—eliminating
send-on-closed-channel panics.

- Remove dead code: RegisterHandler, GetHandler, handlers map,
  MessageHandler type (zero callers across the whole repo)
- Add ErrBusClosed sentinel error
- Update all 10 caller sites to pass context
- Add msgBus.Close() to gateway and agent shutdown flows
- Add pkg/bus/bus_test.go with 11 test cases covering basic round-trip,
  context cancellation, closed-bus behavior, concurrent publish+close,
  full-buffer timeout, and idempotent Close
…el types

All 12 channel Send methods now return proper sentinel errors (ErrNotRunning,
ErrTemporary, ErrRateLimit, ErrSendFailed) instead of plain fmt.Errorf strings,
enabling Manager's sendWithRetry classification logic to actually work.

- Add ClassifySendError/ClassifyNetError helpers in errutil.go for HTTP-based channels
- LINE/WeCom Bot/WeCom App: use ClassifySendError for HTTP status-based classification
- SDK channels (Telegram/Discord/Slack/QQ/DingTalk/Feishu): wrap errors as ErrTemporary
- WebSocket channels (OneBot/WhatsApp/MaixCam): wrap write errors as ErrTemporary
- WhatsApp: add missing IsRunning() check in Send
- WhatsApp/OneBot/MaixCam: add ctx.Done() check before entering write path
- Telegram Stop: clean up placeholders sync.Map to prevent state leaks
…ed by Manager

Merge 3 independent channel HTTP servers (LINE :18791, WeCom Bot :18793,
WeCom App :18792) and the health server (:18790) into a single shared
HTTP server on the Gateway address. Channels implement WebhookHandler
and/or HealthChecker interfaces to register their handlers on the shared
mux. Also change Gateway default host from 0.0.0.0 to 127.0.0.1 for
security.
Add outbound media sending capability so the agent can publish media
attachments (images, files, audio, video) through channels via the bus.

- Add MediaPart and OutboundMediaMessage types to bus
- Add PublishOutboundMedia/SubscribeOutboundMedia bus methods
- Add MediaSender interface discovered via type assertion by Manager
- Add media dispatch/worker in Manager with shared retry logic
- Extend ToolResult with Media field and MediaResult constructor
- Publish outbound media from agent loop on tool results
- Implement SendMedia for Telegram, Discord, Slack, LINE, OneBot, WeCom
Remove SetTranscriber and inline transcription logic from 4 channels
(Telegram, Discord, Slack, OneBot) and the gateway wiring. Voice/audio
files are still downloaded and stored in MediaStore with simple text
annotations ([voice], [audio: filename], [file: name]). The pkg/voice
package is preserved for future Agent-level transcription middleware.
Add unified ShouldRespondInGroup to BaseChannel, replacing scattered
per-channel group filtering logic. Introduce GroupTriggerConfig (with
mention_only + prefixes), TypingConfig, and PlaceholderConfig types.
Migrate Discord MentionOnly, OneBot checkGroupTrigger, and LINE
hardcoded mention-only to the shared mechanism. Add group trigger
entry points for Slack, Telegram, QQ, Feishu, DingTalk, and WeCom.
Legacy config fields are preserved with automatic migration.
…hannel (Phase 10 + 7)

Phase 10: Define TypingCapable, MessageEditor, PlaceholderRecorder interfaces.
Manager orchestrates outbound typing stop and placeholder editing via preSend.
Migrate Telegram, Discord, Slack, OneBot to register state with Manager instead
of handling locally in Send. Phase 7: Add native WebSocket Pico Protocol channel
as reference implementation of all optional capability interfaces.
Message splitting is exclusively a Manager responsibility. Moving it
into the channels package eliminates the cross-package dependency and
aligns with the refactoring plan.
- MediaStore: use full UUID to prevent ref collisions, preserve and
  expose metadata via ResolveWithMeta, include underlying OS errors
- Agent loop: populate MediaPart Type/Filename/ContentType from
  MediaStore metadata so channels can dispatch media correctly
- SplitMessage: fix byte-vs-rune index mixup in code block header
  parsing, remove dead candidateStr variable
- Pico auth: restrict query-param token behind AllowTokenQuery config
  flag (default false) to prevent token leakage via logs/referer
- HandleMessage: replace context.TODO with caller-propagated ctx,
  log PublishInbound failures instead of silently discarding
- Gateway shutdown: use fresh 15s timeout context for StopAll so
  graceful shutdown is not short-circuited by the cancelled parent ctx
…format

Introduce SenderInfo struct and pkg/identity package to standardize user
identification across all channels. Each channel now constructs structured
sender info (platform, platformID, canonicalID, username, displayName)
instead of ad-hoc string IDs. Allow-list matching supports all legacy
formats (numeric ID, @username, id|username) plus the new canonical
"platform:id" format. Session key resolution also handles canonical
peerIDs for backward-compatible identity link matching.
…ts, onebot leak)

- Drain buffered messages in MessageBus.Close() so they aren't silently lost
- Replace all context.TODO() with context.WithTimeout(5s) across 7 call sites
- Fix OneBot pending channel leak: send nil sentinel in Stop() and handle
  nil response in sendAPIRequest() to unblock waiting goroutines
… subpackages

Translate Chinese comments to English in qq, slack, and telegram channel
implementations, following the translation work done in PR sipeed#697. The
original PR modified the old parent package files, but these have been
moved to subpackages during the refactor, so translations are applied
to the new locations.
Implement TypingCapable interface for LINE channel using the
loading animation API (1:1 chats only, no group support).

- Add StartTyping() with 50s periodic refresh and context-based stop
- Integrate PlaceholderRecorder.RecordTypingStop in processEvent
- Skip RecordPlaceholder (LINE has no message edit API)
- Change sendLoading to accept context and return error
- Relax callAPI status check from 200 to 2xx range

Design consulted with Codex (GPT-5.2).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Address review feedback from @alexhoshina and Codex:
- Log sendLoading errors in ticker goroutine instead of discarding
- Only start typing indicator when PlaceholderRecorder is available
  to avoid wasted API calls and unnecessary goroutine creation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants