Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/bus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ type InboundMessage struct {
}

type OutboundMessage struct {
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
MsgType string `json:"msg_type,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}

type MessageHandler func(InboundMessage) error
128 changes: 124 additions & 4 deletions pkg/channels/feishu_64.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type FeishuChannel struct {
client *lark.Client
wsClient *larkws.Client

streamingMu sync.RWMutex
streamingCache map[string]*FeishuStreamingSession

mu sync.Mutex
cancel context.CancelFunc
}
Expand All @@ -34,9 +37,10 @@ func NewFeishuChannel(cfg config.FeishuConfig, bus *bus.MessageBus) (*FeishuChan
base := NewBaseChannel("feishu", cfg, bus, cfg.AllowFrom)

return &FeishuChannel{
BaseChannel: base,
config: cfg,
client: lark.NewClient(cfg.AppID, cfg.AppSecret),
BaseChannel: base,
config: cfg,
client: lark.NewClient(cfg.AppID, cfg.AppSecret),
streamingCache: make(map[string]*FeishuStreamingSession),
}, nil
}

Expand Down Expand Up @@ -97,6 +101,47 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
return fmt.Errorf("chat ID is empty")
}

msgType := c.resolveMessageType(msg.Content, msg.MsgType)

switch msgType {
case "card":
return c.sendCardMessage(ctx, msg)
case "streaming":
return c.sendStreamingMessage(ctx, msg)
default:
return c.sendTextMessage(ctx, msg)
}
}

func (c *FeishuChannel) resolveMessageType(content, explicitMsgType string) string {
if explicitMsgType == "card" || explicitMsgType == "interactive" {
return "card"
}
if explicitMsgType == "text" {
return "text"
}

renderMode := c.config.RenderMode
if renderMode == "" {
renderMode = "raw"
}

switch renderMode {
case "card":
return "card"
case "raw":
return "text"
case "auto":
if ShouldUseCard(content) {
return "card"
}
return "text"
default:
return "text"
}
}

func (c *FeishuChannel) sendTextMessage(ctx context.Context, msg bus.OutboundMessage) error {
payload, err := json.Marshal(map[string]string{"text": msg.Content})
if err != nil {
return fmt.Errorf("failed to marshal feishu content: %w", err)
Expand All @@ -121,13 +166,88 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
return fmt.Errorf("feishu api error: code=%d msg=%s", resp.Code, resp.Msg)
}

logger.DebugCF("feishu", "Feishu message sent", map[string]any{
logger.DebugCF("feishu", "Feishu text message sent", map[string]any{
"chat_id": msg.ChatID,
})

return nil
}

func (c *FeishuChannel) sendCardMessage(ctx context.Context, msg bus.OutboundMessage) error {
card := BuildMarkdownCard(msg.Content)
cardJSON, err := json.Marshal(card)
if err != nil {
return fmt.Errorf("failed to marshal card: %w", err)
}

req := larkim.NewCreateMessageReqBuilder().
ReceiveIdType(larkim.ReceiveIdTypeChatId).
Body(larkim.NewCreateMessageReqBodyBuilder().
ReceiveId(msg.ChatID).
MsgType(larkim.MsgTypeInteractive).
Content(string(cardJSON)).
Uuid(fmt.Sprintf("picoclaw-card-%d", time.Now().UnixNano())).
Build()).
Build()

resp, err := c.client.Im.V1.Message.Create(ctx, req)
if err != nil {
return fmt.Errorf("failed to send feishu card: %w", err)
}

if !resp.Success() {
return fmt.Errorf("feishu card api error: code=%d msg=%s", resp.Code, resp.Msg)
}

logger.DebugCF("feishu", "Feishu card message sent", map[string]any{
"chat_id": msg.ChatID,
})

return nil
}

func (c *FeishuChannel) sendStreamingMessage(ctx context.Context, msg bus.OutboundMessage) error {
streamingEnabled := c.config.StreamingEnabled
if !streamingEnabled {
return c.sendCardMessage(ctx, msg)
}

c.streamingMu.Lock()
session, ok := c.streamingCache[msg.ChatID]
if !ok {
interval := c.config.StreamingInterval
if interval <= 0 {
interval = 100
}
session = NewStreamingSession(c.client, c.config.AppID, c.config.AppSecret, "", interval)
c.streamingCache[msg.ChatID] = session
}
c.streamingMu.Unlock()

if !session.IsActive() {
if err := session.Start(ctx, msg.ChatID); err != nil {
logger.ErrorCF("feishu", "Failed to start streaming session", map[string]any{
"error": err.Error(),
})
return c.sendCardMessage(ctx, msg)
}
}

session.Update(msg.Content)

if msg.Metadata != nil && msg.Metadata["streaming_end"] == "true" {
c.streamingMu.Lock()
delete(c.streamingCache, msg.ChatID)
c.streamingMu.Unlock()
session.Close(msg.Content)
logger.DebugCF("feishu", "Streaming session closed", map[string]any{
"chat_id": msg.ChatID,
})
}

return nil
}

func (c *FeishuChannel) handleMessageReceive(_ context.Context, event *larkim.P2MessageReceiveV1) error {
if event == nil || event.Event == nil || event.Event.Message == nil {
return nil
Expand Down
Loading