Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions internal/event/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,42 @@ type Broadcaster interface {
// NopBroadcaster 空实现,用于测试或不需要广播的场景
type NopBroadcaster struct{}

func (n *NopBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {}
func (n *NopBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {}
func (n *NopBroadcaster) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) {}
func (n *NopBroadcaster) BroadcastLog(message string) {}
func (n *NopBroadcaster) BroadcastMessage(messageType string, data interface{}) {}
func (n *NopBroadcaster) BroadcastLog(message string) {}
func (n *NopBroadcaster) BroadcastMessage(messageType string, data interface{}) {}

// SanitizeProxyRequestForBroadcast slims down a proxy request for real-time
// broadcasting: it strips the large request/response fields so that WebSocket
// messages do not grow to tens or hundreds of KB and stall the frontend in
// JSON.parse / GC.
//
// Notes:
//   - The /requests list page only needs lightweight fields (status, duration,
//     tokens, cost, ...).
//   - Large fields needed by the detail page should be fetched via
//     /admin/requests/{id} and /admin/requests/{id}/attempts.
//
// A nil input yields nil. An input whose RequestInfo and ResponseInfo are both
// already nil is returned as-is (same pointer). Otherwise a shallow copy with
// both fields cleared is returned and the input itself is left untouched.
func SanitizeProxyRequestForBroadcast(req *domain.ProxyRequest) *domain.ProxyRequest {
	switch {
	case req == nil:
		return nil
	case req.RequestInfo == nil && req.ResponseInfo == nil:
		// Already slim: skip the copy, which would only add GC pressure on
		// this high-frequency path.
		return req
	}

	slim := *req
	slim.RequestInfo, slim.ResponseInfo = nil, nil
	return &slim
}

// SanitizeProxyUpstreamAttemptForBroadcast slims down an upstream attempt for
// real-time broadcasting.
//
// A nil input, or an input whose RequestInfo and ResponseInfo are both already
// nil, is returned unchanged (same pointer). Otherwise a shallow copy with both
// fields cleared is returned and the input itself is left untouched.
func SanitizeProxyUpstreamAttemptForBroadcast(attempt *domain.ProxyUpstreamAttempt) *domain.ProxyUpstreamAttempt {
	// Nothing to strip: either there is no attempt at all, or it is already
	// slim and copying it would only add GC pressure on this high-frequency path.
	if attempt == nil || (attempt.RequestInfo == nil && attempt.ResponseInfo == nil) {
		return attempt
	}

	slim := new(domain.ProxyUpstreamAttempt)
	*slim = *attempt
	slim.RequestInfo = nil
	slim.ResponseInfo = nil
	return slim
}
2 changes: 2 additions & 0 deletions internal/event/wails_broadcaster_desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (w *WailsBroadcaster) emitWailsEvent(eventType string, data interface{}) {

// BroadcastProxyRequest broadcasts a proxy request update
func (w *WailsBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {
req = SanitizeProxyRequestForBroadcast(req)
// Broadcast via inner broadcaster (WebSocket)
if w.inner != nil {
w.inner.BroadcastProxyRequest(req)
Expand All @@ -57,6 +58,7 @@ func (w *WailsBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {

// BroadcastProxyUpstreamAttempt broadcasts a proxy upstream attempt update
func (w *WailsBroadcaster) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) {
attempt = SanitizeProxyUpstreamAttemptForBroadcast(attempt)
if w.inner != nil {
w.inner.BroadcastProxyUpstreamAttempt(attempt)
}
Expand Down
102 changes: 61 additions & 41 deletions internal/executor/executor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package executor
package executor

import (
"context"
Expand Down Expand Up @@ -321,51 +321,71 @@ func (e *Executor) processAdapterEventsRealtime(eventChan domain.AdapterEventCha
return
}

for event := range eventChan {
if event == nil {
continue
}
// 事件节流:合并多个 adapter 事件为一次广播,避免在流式高并发下产生“广播风暴”
const broadcastThrottle = 200 * time.Millisecond
ticker := time.NewTicker(broadcastThrottle)
defer ticker.Stop()

needsBroadcast := false
dirty := false

switch event.Type {
case domain.EventRequestInfo:
if !e.shouldClearRequestDetail() && event.RequestInfo != nil {
attempt.RequestInfo = event.RequestInfo
needsBroadcast = true
}
case domain.EventResponseInfo:
if !e.shouldClearRequestDetail() && event.ResponseInfo != nil {
attempt.ResponseInfo = event.ResponseInfo
needsBroadcast = true
}
case domain.EventMetrics:
if event.Metrics != nil {
attempt.InputTokenCount = event.Metrics.InputTokens
attempt.OutputTokenCount = event.Metrics.OutputTokens
attempt.CacheReadCount = event.Metrics.CacheReadCount
attempt.CacheWriteCount = event.Metrics.CacheCreationCount
attempt.Cache5mWriteCount = event.Metrics.Cache5mCreationCount
attempt.Cache1hWriteCount = event.Metrics.Cache1hCreationCount
needsBroadcast = true
}
case domain.EventResponseModel:
if event.ResponseModel != "" {
attempt.ResponseModel = event.ResponseModel
needsBroadcast = true
flush := func() {
if !dirty || e.broadcaster == nil {
dirty = false
return
}
// 广播前做一次瘦身 + 快照,避免发送大字段、也避免指针被后续修改导致数据竞争
snapshot := event.SanitizeProxyUpstreamAttemptForBroadcast(attempt)
e.broadcaster.BroadcastProxyUpstreamAttempt(snapshot)
dirty = false
}

for {
select {
case ev, ok := <-eventChan:
if !ok {
flush()
return
}
case domain.EventFirstToken:
if event.FirstTokenTime > 0 {
// Calculate TTFT as duration from start time to first token time
firstTokenTime := time.UnixMilli(event.FirstTokenTime)
attempt.TTFT = firstTokenTime.Sub(attempt.StartTime)
needsBroadcast = true
if ev == nil {
continue
}
}

// Broadcast update immediately for real-time visibility
if needsBroadcast && e.broadcaster != nil {
e.broadcaster.BroadcastProxyUpstreamAttempt(attempt)
switch ev.Type {
case domain.EventRequestInfo:
if !e.shouldClearRequestDetail() && ev.RequestInfo != nil {
attempt.RequestInfo = ev.RequestInfo
dirty = true
}
case domain.EventResponseInfo:
if !e.shouldClearRequestDetail() && ev.ResponseInfo != nil {
attempt.ResponseInfo = ev.ResponseInfo
dirty = true
}
case domain.EventMetrics:
if ev.Metrics != nil {
attempt.InputTokenCount = ev.Metrics.InputTokens
attempt.OutputTokenCount = ev.Metrics.OutputTokens
attempt.CacheReadCount = ev.Metrics.CacheReadCount
attempt.CacheWriteCount = ev.Metrics.CacheCreationCount
attempt.Cache5mWriteCount = ev.Metrics.Cache5mCreationCount
attempt.Cache1hWriteCount = ev.Metrics.Cache1hCreationCount
dirty = true
}
case domain.EventResponseModel:
if ev.ResponseModel != "" {
attempt.ResponseModel = ev.ResponseModel
dirty = true
}
case domain.EventFirstToken:
if ev.FirstTokenTime > 0 {
// Calculate TTFT as duration from start time to first token time
firstTokenTime := time.UnixMilli(ev.FirstTokenTime)
attempt.TTFT = firstTokenTime.Sub(attempt.StartTime)
dirty = true
}
}
case <-ticker.C:
flush()
}
}
}
Expand Down
115 changes: 104 additions & 11 deletions internal/handler/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package handler

import (
"bufio"
"encoding/json"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/awsl-project/maxx/internal/domain"
"github.com/awsl-project/maxx/internal/event"
"github.com/gorilla/websocket"
)

Expand All @@ -28,8 +33,13 @@ type WebSocketHub struct {
clients map[*websocket.Conn]bool
broadcast chan WSMessage
mu sync.RWMutex

// broadcast channel 满时的丢弃计数(热路径:只做原子累加)
broadcastDroppedTotal atomic.Uint64
}

// websocketWriteTimeout is the write deadline window applied to a client
// connection before each broadcast message is written to it.
const websocketWriteTimeout = 5 * time.Second

func NewWebSocketHub() *WebSocketHub {
hub := &WebSocketHub{
clients: make(map[*websocket.Conn]bool),
Expand All @@ -41,15 +51,47 @@ func NewWebSocketHub() *WebSocketHub {

func (h *WebSocketHub) run() {
for msg := range h.broadcast {
// 避免在持锁状态下进行网络写入;同时修复 RLock 下 delete map 的数据竞争风险
h.mu.RLock()
clients := make([]*websocket.Conn, 0, len(h.clients))
for client := range h.clients {
err := client.WriteJSON(msg)
if err != nil {
client.Close()
clients = append(clients, client)
}
h.mu.RUnlock()

var toRemove []*websocket.Conn
for _, client := range clients {
_ = client.SetWriteDeadline(time.Now().Add(websocketWriteTimeout))
if err := client.WriteJSON(msg); err != nil {
_ = client.Close()
toRemove = append(toRemove, client)
}
}

if len(toRemove) > 0 {
h.mu.Lock()
for _, client := range toRemove {
delete(h.clients, client)
}
h.mu.Unlock()
}
}
}

// tryEnqueueBroadcast performs a non-blocking send of msg onto the broadcast
// channel. When the channel cannot accept the message it is dropped and
// broadcastDroppedTotal is incremented.
//
// meta is optional context (for example request identifiers) that is included
// in the drop log line when non-blank.
func (h *WebSocketHub) tryEnqueueBroadcast(msg WSMessage, meta string) {
	select {
	case h.broadcast <- msg:
	default:
		dropped := h.broadcastDroppedTotal.Add(1)
		// Avoid log spam: log the first drop and then every 100th, which keeps
		// drops observable without slowing down the hot path.
		//
		// No lock is held on this path, so nothing must be unlocked here.
		if dropped == 1 || dropped%100 == 0 {
			meta = strings.TrimSpace(meta)
			if meta != "" {
				log.Printf("[WebSocket] drop broadcast message type=%s %s dropped_total=%d", msg.Type, meta, dropped)
			} else {
				log.Printf("[WebSocket] drop broadcast message type=%s dropped_total=%d", msg.Type, dropped)
			}
		}
	}
}

Expand Down Expand Up @@ -81,33 +123,84 @@ func (h *WebSocketHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
}

func (h *WebSocketHub) BroadcastProxyRequest(req *domain.ProxyRequest) {
h.broadcast <- WSMessage{
sanitized := event.SanitizeProxyRequestForBroadcast(req)
var data interface{} = sanitized
var meta string
if sanitized != nil {
// 无论 Sanitize 是否返回原指针,都强制做一次浅拷贝快照,避免异步消费者读到后续可变更的数据。
snapshot := *sanitized
data = snapshot
meta = "requestID=" + snapshot.RequestID
if snapshot.ID != 0 {
meta += " requestDbID=" + strconv.FormatUint(snapshot.ID, 10)
}
}
msg := WSMessage{
Type: "proxy_request_update",
Data: req,
Data: data,
}
h.tryEnqueueBroadcast(msg, meta)
}

func (h *WebSocketHub) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) {
h.broadcast <- WSMessage{
sanitized := event.SanitizeProxyUpstreamAttemptForBroadcast(attempt)
var data interface{} = sanitized
var meta string
if sanitized != nil {
snapshot := *sanitized
data = snapshot
if snapshot.ProxyRequestID != 0 {
meta = "proxyRequestID=" + strconv.FormatUint(snapshot.ProxyRequestID, 10)
}
if snapshot.ID != 0 {
if meta != "" {
meta += " "
}
meta += "attemptDbID=" + strconv.FormatUint(snapshot.ID, 10)
}
}
msg := WSMessage{
Type: "proxy_upstream_attempt_update",
Data: attempt,
Data: data,
}
h.tryEnqueueBroadcast(msg, meta)
}

// BroadcastMessage sends a custom message with specified type to all connected clients
func (h *WebSocketHub) BroadcastMessage(messageType string, data interface{}) {
h.broadcast <- WSMessage{
// 约定:BroadcastMessage 允许调用方传入 map/struct/指针等可变对象。
//
// 但由于实际发送是异步的(入队后由 run() 写到各连接),如果这里直接把可变指针放进 channel,
// 调用方在入队后继续修改数据,会导致与 BroadcastProxyRequest 类似的数据竞态。
//
// 因此这里先把 data 预先序列化为 json.RawMessage,形成不可变快照;后续 WriteJSON 会直接写入该快照。
var snapshot interface{} = data
if data != nil {
if raw, ok := data.(json.RawMessage); ok {
snapshot = raw
} else {
b, err := json.Marshal(data)
if err != nil {
log.Printf("[WebSocket] drop broadcast message type=%s: marshal snapshot failed: %v", messageType, err)
return
}
snapshot = json.RawMessage(b)
}
}
msg := WSMessage{
Type: messageType,
Data: data,
Data: snapshot,
}
h.tryEnqueueBroadcast(msg, "")
}

// BroadcastLog sends a log message to all connected clients
func (h *WebSocketHub) BroadcastLog(message string) {
h.broadcast <- WSMessage{
msg := WSMessage{
Type: "log_message",
Data: message,
}
h.tryEnqueueBroadcast(msg, "")
}

// WebSocketLogWriter implements io.Writer to capture logs and broadcast via WebSocket
Expand Down
Loading