Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions config/config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,50 +52,58 @@
"proxy": "",
"allow_from": [
"YOUR_USER_ID"
]
],
"reasoning_channel_id": ""
},
"discord": {
"enabled": false,
"token": "YOUR_DISCORD_BOT_TOKEN",
"allow_from": [],
"mention_only": false
"mention_only": false,
"reasoning_channel_id": ""
},
"qq": {
"enabled": false,
"app_id": "YOUR_QQ_APP_ID",
"app_secret": "YOUR_QQ_APP_SECRET",
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"maixcam": {
"enabled": false,
"host": "0.0.0.0",
"port": 18790,
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"whatsapp": {
"enabled": false,
"bridge_url": "ws://localhost:3001",
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"feishu": {
"enabled": false,
"app_id": "",
"app_secret": "",
"encrypt_key": "",
"verification_token": "",
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"dingtalk": {
"enabled": false,
"client_id": "YOUR_CLIENT_ID",
"client_secret": "YOUR_CLIENT_SECRET",
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"slack": {
"enabled": false,
"bot_token": "xoxb-YOUR-BOT-TOKEN",
"app_token": "xapp-YOUR-APP-TOKEN",
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"line": {
"enabled": false,
Expand All @@ -104,15 +112,17 @@
"webhook_host": "0.0.0.0",
"webhook_port": 18791,
"webhook_path": "/webhook/line",
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"onebot": {
"enabled": false,
"ws_url": "ws://127.0.0.1:3001",
"access_token": "",
"reconnect_interval": 5,
"group_trigger_prefix": [],
"allow_from": []
"allow_from": [],
"reasoning_channel_id": ""
},
"wecom": {
"_comment": "WeCom Bot (ζ™Ίθƒ½ζœΊε™¨δΊΊ) - Easier setup, supports group chats",
Expand All @@ -124,7 +134,8 @@
"webhook_port": 18793,
"webhook_path": "/webhook/wecom",
"allow_from": [],
"reply_timeout": 5
"reply_timeout": 5,
"reasoning_channel_id": ""
},
"wecom_app": {
"_comment": "WeCom App (θ‡ͺ建应用) - More features, proactive messaging, private chat only. See docs/wecom-app-configuration.md",
Expand All @@ -138,7 +149,8 @@
"webhook_port": 18792,
"webhook_path": "/webhook/wecom-app",
"allow_from": [],
"reply_timeout": 5
"reply_timeout": 5,
"reasoning_channel_id": ""
}
},
"providers": {
Expand Down Expand Up @@ -250,4 +262,4 @@
"host": "127.0.0.1",
"port": 18790
}
}
}
34 changes: 34 additions & 0 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,28 @@ func (al *AgentLoop) runAgentLoop(ctx context.Context, agent *AgentInstance, opt
return finalContent, nil
}

func (al *AgentLoop) targetReasoningChannelID(channelName string) (chatID string) {
if al.channelManager == nil {
return ""
}
if ch, ok := al.channelManager.GetChannel(channelName); ok {
return ch.ReasoningChannelID()
}
return ""
}

func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, channelName, channelID string) {
if reasoningContent == "" || channelName == "" || channelID == "" {
return
}

al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: channelName,
ChatID: channelID,
Content: reasoningContent,
})
}

// runLLMIteration executes the LLM call loop with tool handling.
func (al *AgentLoop) runLLMIteration(
ctx context.Context,
Expand Down Expand Up @@ -649,6 +671,18 @@ func (al *AgentLoop) runLLMIteration(
return "", iteration, fmt.Errorf("LLM call failed after retries: %w", err)
}

go al.handleReasoning(ctx, response.Reasoning, opts.Channel, al.targetReasoningChannelID(opts.Channel))

logger.DebugCF("agent", "LLM response",
map[string]any{
"agent_id": agent.ID,
"iteration": iteration,
"content_chars": len(response.Content),
"tool_calls": len(response.ToolCalls),
"reasoning": response.Reasoning,
"target_channel": al.targetReasoningChannelID(opts.Channel),
"channel": opts.Channel,
})
// Check if no tool calls - we're done
if len(response.ToolCalls) == 0 {
finalContent = response.Content
Expand Down
167 changes: 167 additions & 0 deletions pkg/agent/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,23 @@ import (
"time"

"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/providers"
"github.com/sipeed/picoclaw/pkg/tools"
)

type fakeChannel struct{ id string }

func (f *fakeChannel) Name() string { return "fake" }
func (f *fakeChannel) Start(ctx context.Context) error { return nil }
func (f *fakeChannel) Stop(ctx context.Context) error { return nil }
func (f *fakeChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { return nil }
func (f *fakeChannel) IsRunning() bool { return true }
func (f *fakeChannel) IsAllowed(string) bool { return true }
func (f *fakeChannel) IsAllowedSender(sender bus.SenderInfo) bool { return true }
func (f *fakeChannel) ReasoningChannelID() string { return f.id }

func TestRecordLastChannel(t *testing.T) {
// Create temp workspace
tmpDir, err := os.MkdirTemp("", "agent-test-*")
Expand Down Expand Up @@ -631,3 +643,158 @@ func TestAgentLoop_ContextExhaustionRetry(t *testing.T) {
t.Errorf("Expected history to be compressed (len < 8), got %d", len(finalHistory))
}
}

func TestTargetReasoningChannelID_AllChannels(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "agent-test-*")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)

cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
},
}

al := NewAgentLoop(cfg, bus.NewMessageBus(), &mockProvider{})
chManager, err := channels.NewManager(&config.Config{}, bus.NewMessageBus(), nil)
if err != nil {
t.Fatalf("Failed to create channel manager: %v", err)
}
for name, id := range map[string]string{
"whatsapp": "rid-whatsapp",
"telegram": "rid-telegram",
"feishu": "rid-feishu",
"discord": "rid-discord",
"maixcam": "rid-maixcam",
"qq": "rid-qq",
"dingtalk": "rid-dingtalk",
"slack": "rid-slack",
"line": "rid-line",
"onebot": "rid-onebot",
"wecom": "rid-wecom",
"wecom_app": "rid-wecom-app",
} {
chManager.RegisterChannel(name, &fakeChannel{id: id})
}
al.SetChannelManager(chManager)
tests := []struct {
channel string
wantID string
}{
{channel: "whatsapp", wantID: "rid-whatsapp"},
{channel: "telegram", wantID: "rid-telegram"},
{channel: "feishu", wantID: "rid-feishu"},
{channel: "discord", wantID: "rid-discord"},
{channel: "maixcam", wantID: "rid-maixcam"},
{channel: "qq", wantID: "rid-qq"},
{channel: "dingtalk", wantID: "rid-dingtalk"},
{channel: "slack", wantID: "rid-slack"},
{channel: "line", wantID: "rid-line"},
{channel: "onebot", wantID: "rid-onebot"},
{channel: "wecom", wantID: "rid-wecom"},
{channel: "wecom_app", wantID: "rid-wecom-app"},
{channel: "unknown", wantID: ""},
}

for _, tt := range tests {
t.Run(tt.channel, func(t *testing.T) {
got := al.targetReasoningChannelID(tt.channel)
if got != tt.wantID {
t.Fatalf("targetReasoningChannelID(%q) = %q, want %q", tt.channel, got, tt.wantID)
}
})
}
}

func TestHandleReasoning(t *testing.T) {
newLoop := func(t *testing.T) (*AgentLoop, *bus.MessageBus) {
t.Helper()
tmpDir, err := os.MkdirTemp("", "agent-test-*")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
t.Cleanup(func() { _ = os.RemoveAll(tmpDir) })
cfg := &config.Config{
Agents: config.AgentsConfig{
Defaults: config.AgentDefaults{
Workspace: tmpDir,
Model: "test-model",
MaxTokens: 4096,
MaxToolIterations: 10,
},
},
}
msgBus := bus.NewMessageBus()
return NewAgentLoop(cfg, msgBus, &mockProvider{}), msgBus
}

t.Run("skips when any required field is empty", func(t *testing.T) {
al, msgBus := newLoop(t)
al.handleReasoning(context.Background(), "reasoning", "telegram", "")

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
if msg, ok := msgBus.SubscribeOutbound(ctx); ok {
t.Fatalf("expected no outbound message, got %+v", msg)
}
})

t.Run("publishes one message for non telegram", func(t *testing.T) {
al, msgBus := newLoop(t)
al.handleReasoning(context.Background(), "hello reasoning", "slack", "channel-1")

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
msg, ok := msgBus.SubscribeOutbound(ctx)
if !ok {
t.Fatal("expected an outbound message")
}
if msg.Channel != "slack" || msg.ChatID != "channel-1" || msg.Content != "hello reasoning" {
t.Fatalf("unexpected outbound message: %+v", msg)
}
})

t.Run("publishes one message for telegram", func(t *testing.T) {
al, msgBus := newLoop(t)
reasoning := "hello telegram reasoning"
al.handleReasoning(context.Background(), reasoning, "telegram", "tg-chat")

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
msg, ok := msgBus.SubscribeOutbound(ctx)
if !ok {
t.Fatal("expected outbound message")
}

if msg.Channel != "telegram" {
t.Fatalf("expected telegram channel message, got %+v", msg)
}
if msg.ChatID != "tg-chat" {
t.Fatalf("expected chatID tg-chat, got %+v", msg)
}
if msg.Content != reasoning {
t.Fatalf("content mismatch: got %q want %q", msg.Content, reasoning)
}
})
t.Run("expired ctx", func(t *testing.T) {
al, msgBus := newLoop(t)
reasoning := "hello telegram reasoning"
ctx, cancel := context.WithCancel(context.Background())
cancel()
al.handleReasoning(ctx, reasoning, "telegram", "tg-chat")

ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
msg, ok := msgBus.SubscribeOutbound(ctx)
if ok {
t.Fatalf("expected no outbound message, got %+v", msg)
}
})
}
9 changes: 9 additions & 0 deletions pkg/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func (mb *MessageBus) PublishInbound(ctx context.Context, msg InboundMessage) er
if mb.closed.Load() {
return ErrBusClosed
}
if err := ctx.Err(); err != nil {
return err
}
select {
case mb.inbound <- msg:
return nil
Expand All @@ -57,6 +60,9 @@ func (mb *MessageBus) PublishOutbound(ctx context.Context, msg OutboundMessage)
if mb.closed.Load() {
return ErrBusClosed
}
if err := ctx.Err(); err != nil {
return err
}
select {
case mb.outbound <- msg:
return nil
Expand All @@ -82,6 +88,9 @@ func (mb *MessageBus) PublishOutboundMedia(ctx context.Context, msg OutboundMedi
if mb.closed.Load() {
return ErrBusClosed
}
if err := ctx.Err(); err != nil {
return err
}
select {
case mb.outboundMedia <- msg:
return nil
Expand Down
Loading