diff --git a/internal/event/broadcaster.go b/internal/event/broadcaster.go index 33382877..326ef352 100644 --- a/internal/event/broadcaster.go +++ b/internal/event/broadcaster.go @@ -14,7 +14,42 @@ type Broadcaster interface { // NopBroadcaster 空实现,用于测试或不需要广播的场景 type NopBroadcaster struct{} -func (n *NopBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {} +func (n *NopBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {} func (n *NopBroadcaster) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) {} -func (n *NopBroadcaster) BroadcastLog(message string) {} -func (n *NopBroadcaster) BroadcastMessage(messageType string, data interface{}) {} +func (n *NopBroadcaster) BroadcastLog(message string) {} +func (n *NopBroadcaster) BroadcastMessage(messageType string, data interface{}) {} + +// SanitizeProxyRequestForBroadcast 用于“实时广播”场景瘦身 payload: +// 去掉 request/response 大字段,避免 WebSocket 消息动辄几十/几百 KB,导致前端 JSON.parse / GC 卡死。 +// +// 说明: +// - /requests 列表页只需要轻量字段(状态、耗时、tokens、成本等)。 +// - 详情页需要的大字段应通过 /admin/requests/{id} 与 /admin/requests/{id}/attempts 拉取。 +func SanitizeProxyRequestForBroadcast(req *domain.ProxyRequest) *domain.ProxyRequest { + if req == nil { + return nil + } + // 已经是瘦身后的对象,避免重复拷贝(高频场景会产生额外 GC 压力) + if req.RequestInfo == nil && req.ResponseInfo == nil { + return req + } + copied := *req + copied.RequestInfo = nil + copied.ResponseInfo = nil + return &copied +} + +// SanitizeProxyUpstreamAttemptForBroadcast 用于“实时广播”场景瘦身 payload。 +func SanitizeProxyUpstreamAttemptForBroadcast(attempt *domain.ProxyUpstreamAttempt) *domain.ProxyUpstreamAttempt { + if attempt == nil { + return nil + } + // 已经是瘦身后的对象,避免重复拷贝(高频场景会产生额外 GC 压力) + if attempt.RequestInfo == nil && attempt.ResponseInfo == nil { + return attempt + } + copied := *attempt + copied.RequestInfo = nil + copied.ResponseInfo = nil + return &copied +} diff --git a/internal/event/wails_broadcaster_desktop.go b/internal/event/wails_broadcaster_desktop.go index 3afc62ae..377c6989 100644 --- a/internal/event/wails_broadcaster_desktop.go +++ b/internal/event/wails_broadcaster_desktop.go @@ -47,6 +47,7 @@ func (w *WailsBroadcaster) emitWailsEvent(eventType string, data interface{}) { // BroadcastProxyRequest broadcasts a proxy request update func (w *WailsBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) { + req = SanitizeProxyRequestForBroadcast(req) // Broadcast via inner broadcaster (WebSocket) if w.inner != nil { w.inner.BroadcastProxyRequest(req) @@ -57,6 +58,7 @@ func (w *WailsBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) { // BroadcastProxyUpstreamAttempt broadcasts a proxy upstream attempt update func (w *WailsBroadcaster) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) { + attempt = SanitizeProxyUpstreamAttemptForBroadcast(attempt) if w.inner != nil { w.inner.BroadcastProxyUpstreamAttempt(attempt) } diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 6e8d4bb0..a24451c0 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -1,4 +1,4 @@ -package executor +package executor import ( "context" @@ -321,51 +321,71 @@ func (e *Executor) processAdapterEventsRealtime(eventChan domain.AdapterEventCha return } - for event := range eventChan { - if event == nil { - continue - } + // 事件节流:合并多个 adapter 事件为一次广播,避免在流式高并发下产生“广播风暴” + const broadcastThrottle = 200 * time.Millisecond + ticker := time.NewTicker(broadcastThrottle) + defer ticker.Stop() - needsBroadcast := false + dirty := false - switch event.Type { - case domain.EventRequestInfo: - if !e.shouldClearRequestDetail() && event.RequestInfo != nil { - attempt.RequestInfo = event.RequestInfo - needsBroadcast = true - } - case domain.EventResponseInfo: - if !e.shouldClearRequestDetail() && event.ResponseInfo != nil { - attempt.ResponseInfo = event.ResponseInfo - needsBroadcast = true - } - case domain.EventMetrics: - if event.Metrics != nil { - attempt.InputTokenCount = event.Metrics.InputTokens - attempt.OutputTokenCount = event.Metrics.OutputTokens - attempt.CacheReadCount = event.Metrics.CacheReadCount - attempt.CacheWriteCount = event.Metrics.CacheCreationCount - attempt.Cache5mWriteCount = event.Metrics.Cache5mCreationCount - attempt.Cache1hWriteCount = event.Metrics.Cache1hCreationCount - needsBroadcast = true - } - case domain.EventResponseModel: - if event.ResponseModel != "" { - attempt.ResponseModel = event.ResponseModel - needsBroadcast = true + flush := func() { + if !dirty || e.broadcaster == nil { + dirty = false + return + } + // 广播前做一次瘦身 + 快照,避免发送大字段、也避免指针被后续修改导致数据竞争 + snapshot := event.SanitizeProxyUpstreamAttemptForBroadcast(attempt) + e.broadcaster.BroadcastProxyUpstreamAttempt(snapshot) + dirty = false + } + + for { + select { + case ev, ok := <-eventChan: + if !ok { + flush() + return } - case domain.EventFirstToken: - if event.FirstTokenTime > 0 { - // Calculate TTFT as duration from start time to first token time - firstTokenTime := time.UnixMilli(event.FirstTokenTime) - attempt.TTFT = firstTokenTime.Sub(attempt.StartTime) - needsBroadcast = true + if ev == nil { + continue } - } - // Broadcast update immediately for real-time visibility - if needsBroadcast && e.broadcaster != nil { - e.broadcaster.BroadcastProxyUpstreamAttempt(attempt) + switch ev.Type { + case domain.EventRequestInfo: + if !e.shouldClearRequestDetail() && ev.RequestInfo != nil { + attempt.RequestInfo = ev.RequestInfo + dirty = true + } + case domain.EventResponseInfo: + if !e.shouldClearRequestDetail() && ev.ResponseInfo != nil { + attempt.ResponseInfo = ev.ResponseInfo + dirty = true + } + case domain.EventMetrics: + if ev.Metrics != nil { + attempt.InputTokenCount = ev.Metrics.InputTokens + attempt.OutputTokenCount = ev.Metrics.OutputTokens + attempt.CacheReadCount = ev.Metrics.CacheReadCount + attempt.CacheWriteCount = ev.Metrics.CacheCreationCount + attempt.Cache5mWriteCount = ev.Metrics.Cache5mCreationCount + attempt.Cache1hWriteCount = ev.Metrics.Cache1hCreationCount + dirty = true + } + case domain.EventResponseModel: + if ev.ResponseModel != "" { + attempt.ResponseModel = ev.ResponseModel + dirty = true + } + case domain.EventFirstToken: + if ev.FirstTokenTime > 0 { + // Calculate TTFT as duration from start time to first token time + firstTokenTime := time.UnixMilli(ev.FirstTokenTime) + attempt.TTFT = firstTokenTime.Sub(attempt.StartTime) + dirty = true + } + } + case <-ticker.C: + flush() } } } diff --git a/internal/handler/websocket.go b/internal/handler/websocket.go index 6e49c29e..642b5257 100644 --- a/internal/handler/websocket.go +++ b/internal/handler/websocket.go @@ -2,14 +2,19 @@ package handler import ( "bufio" + "encoding/json" "io" "log" "net/http" "os" + "strconv" "strings" "sync" + "sync/atomic" + "time" "github.com/awsl-project/maxx/internal/domain" + "github.com/awsl-project/maxx/internal/event" "github.com/gorilla/websocket" ) @@ -28,8 +33,13 @@ type WebSocketHub struct { clients map[*websocket.Conn]bool broadcast chan WSMessage mu sync.RWMutex + + // broadcast channel 满时的丢弃计数(热路径:只做原子累加) + broadcastDroppedTotal atomic.Uint64 } +const websocketWriteTimeout = 5 * time.Second + func NewWebSocketHub() *WebSocketHub { hub := &WebSocketHub{ clients: make(map[*websocket.Conn]bool), @@ -41,15 +51,47 @@ func NewWebSocketHub() *WebSocketHub { func (h *WebSocketHub) run() { for msg := range h.broadcast { + // 避免在持锁状态下进行网络写入;同时修复 RLock 下 delete map 的数据竞争风险 h.mu.RLock() + clients := make([]*websocket.Conn, 0, len(h.clients)) for client := range h.clients { - err := client.WriteJSON(msg) - if err != nil { - client.Close() + clients = append(clients, client) + } + h.mu.RUnlock() + + var toRemove []*websocket.Conn + for _, client := range clients { + _ = client.SetWriteDeadline(time.Now().Add(websocketWriteTimeout)) + if err := client.WriteJSON(msg); err != nil { + _ = client.Close() + toRemove = append(toRemove, client) + } + } + + if len(toRemove) > 0 { + h.mu.Lock() + for _, client := range toRemove { delete(h.clients, client) } + h.mu.Unlock() + } + } +} + +func (h *WebSocketHub) tryEnqueueBroadcast(msg WSMessage, meta string) { + select { + case h.broadcast <- msg: + default: + dropped := h.broadcastDroppedTotal.Add(1) + // 避免日志刷屏:首次 + 每100次打印一次,确保可观测性但不拖慢热路径。 + if dropped == 1 || dropped%100 == 0 { + meta = strings.TrimSpace(meta) + if meta != "" { + log.Printf("[WebSocket] drop broadcast message type=%s %s dropped_total=%d", msg.Type, meta, dropped) + } else { + log.Printf("[WebSocket] drop broadcast message type=%s dropped_total=%d", msg.Type, dropped) + } } - h.mu.RUnlock() } } @@ -81,33 +123,84 @@ func (h *WebSocketHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { } func (h *WebSocketHub) BroadcastProxyRequest(req *domain.ProxyRequest) { - h.broadcast <- WSMessage{ + sanitized := event.SanitizeProxyRequestForBroadcast(req) + var data interface{} = sanitized + var meta string + if sanitized != nil { + // 无论 Sanitize 是否返回原指针,都强制做一次浅拷贝快照,避免异步消费者读到后续可变更的数据。 + snapshot := *sanitized + data = snapshot + meta = "requestID=" + snapshot.RequestID + if snapshot.ID != 0 { + meta += " requestDbID=" + strconv.FormatUint(snapshot.ID, 10) + } + } + msg := WSMessage{ Type: "proxy_request_update", - Data: req, + Data: data, } + h.tryEnqueueBroadcast(msg, meta) } func (h *WebSocketHub) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) { - h.broadcast <- WSMessage{ + sanitized := event.SanitizeProxyUpstreamAttemptForBroadcast(attempt) + var data interface{} = sanitized + var meta string + if sanitized != nil { + snapshot := *sanitized + data = snapshot + if snapshot.ProxyRequestID != 0 { + meta = "proxyRequestID=" + strconv.FormatUint(snapshot.ProxyRequestID, 10) + } + if snapshot.ID != 0 { + if meta != "" { + meta += " " + } + meta += "attemptDbID=" + strconv.FormatUint(snapshot.ID, 10) + } + } + msg := WSMessage{ Type: "proxy_upstream_attempt_update", - Data: attempt, + Data: data, } + h.tryEnqueueBroadcast(msg, meta) } // BroadcastMessage sends a custom message with specified type to all connected clients func (h *WebSocketHub) BroadcastMessage(messageType string, data interface{}) { - h.broadcast <- WSMessage{ + // 约定:BroadcastMessage 允许调用方传入 map/struct/指针等可变对象。 + // + // 但由于实际发送是异步的(入队后由 run() 写到各连接),如果这里直接把可变指针放进 channel, + // 调用方在入队后继续修改数据,会导致与 BroadcastProxyRequest 类似的数据竞态。 + // + // 因此这里先把 data 预先序列化为 json.RawMessage,形成不可变快照;后续 WriteJSON 会直接写入该快照。 + var snapshot interface{} = data + if data != nil { + if raw, ok := data.(json.RawMessage); ok { + snapshot = raw + } else { + b, err := json.Marshal(data) + if err != nil { + log.Printf("[WebSocket] drop broadcast message type=%s: marshal snapshot failed: %v", messageType, err) + return + } + snapshot = json.RawMessage(b) + } + } + msg := WSMessage{ Type: messageType, - Data: data, + Data: snapshot, } + h.tryEnqueueBroadcast(msg, "") } // BroadcastLog sends a log message to all connected clients func (h *WebSocketHub) BroadcastLog(message string) { - h.broadcast <- WSMessage{ + msg := WSMessage{ Type: "log_message", Data: message, } + h.tryEnqueueBroadcast(msg, "") } // WebSocketLogWriter implements io.Writer to capture logs and broadcast via WebSocket diff --git a/internal/handler/websocket_test.go b/internal/handler/websocket_test.go new file mode 100644 index 00000000..c84c7cac --- /dev/null +++ b/internal/handler/websocket_test.go @@ -0,0 +1,158 @@ +package handler + +import ( + "bytes" + "encoding/json" + "log" + "strings" + "testing" + + "github.com/awsl-project/maxx/internal/domain" +) + +func TestWebSocketHub_BroadcastProxyRequest_SendsSnapshot(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + + req := &domain.ProxyRequest{ + ID: 1, + RequestID: "req_1", + Status: "IN_PROGRESS", + } + + hub.BroadcastProxyRequest(req) + + // 如果 Broadcast 发送的是同一个指针,那么这里对原对象的修改会“污染”队列中的消息。 + req.Status = "COMPLETED" + + msg := <-hub.broadcast + if msg.Type != "proxy_request_update" { + t.Fatalf("unexpected message type: %s", msg.Type) + } + + switch v := msg.Data.(type) { + case *domain.ProxyRequest: + if v == req { + t.Fatalf("expected snapshot (different pointer), got original pointer") + } + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + case domain.ProxyRequest: + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + default: + t.Fatalf("unexpected data type: %T", msg.Data) + } +} + +func TestWebSocketHub_BroadcastProxyUpstreamAttempt_SendsSnapshot(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + + attempt := &domain.ProxyUpstreamAttempt{ + ID: 2, + ProxyRequestID: 1, + Status: "IN_PROGRESS", + } + + hub.BroadcastProxyUpstreamAttempt(attempt) + attempt.Status = "COMPLETED" + + msg := <-hub.broadcast + if msg.Type != "proxy_upstream_attempt_update" { + t.Fatalf("unexpected message type: %s", msg.Type) + } + + switch v := msg.Data.(type) { + case *domain.ProxyUpstreamAttempt: + if v == attempt { + t.Fatalf("expected snapshot (different pointer), got original pointer") + } + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + case domain.ProxyUpstreamAttempt: + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + default: + t.Fatalf("unexpected data type: %T", msg.Data) + } +} + +func TestWebSocketHub_BroadcastProxyRequest_LogsWhenDropped(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + hub.broadcast <- WSMessage{Type: "dummy", Data: nil} + + var buf bytes.Buffer + oldOutput := log.Writer() + oldFlags := log.Flags() + oldPrefix := log.Prefix() + log.SetOutput(&buf) + log.SetFlags(0) + log.SetPrefix("") + defer func() { + log.SetOutput(oldOutput) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + req := &domain.ProxyRequest{ + ID: 1, + RequestID: "req_1", + Status: "IN_PROGRESS", + } + + hub.BroadcastProxyRequest(req) + + out := buf.String() + if !strings.Contains(out, "drop") && !strings.Contains(out, "丢弃") { + t.Fatalf("expected drop log, got: %q", out) + } + if !strings.Contains(out, "proxy_request_update") { + t.Fatalf("expected message type in log, got: %q", out) + } + if !strings.Contains(out, "req_1") { + t.Fatalf("expected requestID in log, got: %q", out) + } +} + +func TestWebSocketHub_BroadcastMessage_SendsSnapshot(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + + type payload struct { + A int `json:"a"` + } + + p := &payload{A: 1} + hub.BroadcastMessage("custom_event", p) + + // 如果 BroadcastMessage 直接把指针放进队列,这里修改会污染后续消费者看到的数据。 + p.A = 2 + + msg := <-hub.broadcast + if msg.Type != "custom_event" { + t.Fatalf("unexpected message type: %s", msg.Type) + } + + raw, ok := msg.Data.(json.RawMessage) + if !ok { + t.Fatalf("expected Data to be json.RawMessage snapshot, got %T", msg.Data) + } + + var got payload + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("failed to unmarshal snapshot: %v", err) + } + if got.A != 1 { + t.Fatalf("expected snapshot A=1, got %d", got.A) + } +} diff --git a/internal/repository/sqlite/migrations.go b/internal/repository/sqlite/migrations.go index e28dfc65..946f0441 100644 --- a/internal/repository/sqlite/migrations.go +++ b/internal/repository/sqlite/migrations.go @@ -1,10 +1,13 @@ package sqlite import ( + "errors" "log" "sort" + "strings" "time" + mysqlDriver "github.com/go-sql-driver/mysql" "gorm.io/gorm" ) @@ -51,6 +54,50 @@ var migrations = []Migration{ return nil }, }, + { + Version: 2, + Description: "Add index on proxy_requests.provider_id", + Up: func(db *gorm.DB) error { + // 说明:这是高频列表/过滤路径的关键优化点。 + // 不同数据库方言对 IF NOT EXISTS 的支持不同,这里做最小兼容处理。 + switch db.Dialector.Name() { + case "mysql": + err := db.Exec("CREATE INDEX idx_proxy_requests_provider_id ON proxy_requests(provider_id)").Error + if isMySQLDuplicateIndexError(err) { + return nil + } + return err + default: + return db.Exec("CREATE INDEX IF NOT EXISTS idx_proxy_requests_provider_id ON proxy_requests(provider_id)").Error + } + }, + Down: func(db *gorm.DB) error { + switch db.Dialector.Name() { + case "mysql": + // MySQL 不支持 DROP INDEX IF EXISTS;这里尽量执行,失败则忽略(回滚不是主路径)。 + sql := "DROP INDEX idx_proxy_requests_provider_id ON proxy_requests" + if err := db.Exec(sql).Error; err != nil { + log.Printf("[Migration] Warning: rollback v2 failed (dialector=mysql) sql=%q err=%v", sql, err) + } + return nil + default: + return db.Exec("DROP INDEX IF EXISTS idx_proxy_requests_provider_id").Error + } + }, + }, +} + +func isMySQLDuplicateIndexError(err error) bool { + if err == nil { + return false + } + var mysqlErr *mysqlDriver.MySQLError + if errors.As(err, &mysqlErr) { + return mysqlErr.Number == 1061 // ER_DUP_KEYNAME + } + // 兜底:错误可能被包装成字符串,避免使用过宽的 "duplicate" 匹配导致吞掉其它错误。 + lower := strings.ToLower(err.Error()) + return strings.Contains(lower, "duplicate key name") || strings.Contains(lower, "error 1061") } // RunMigrations 运行所有待执行的迁移 @@ -103,6 +150,10 @@ func (d *DB) getCurrentVersion() int { // runMigration 在事务中运行单个迁移 func (d *DB) runMigration(m Migration) error { + // 注意:MySQL 的 DDL(如 CREATE/DROP INDEX)会触发隐式提交(implicit commit), + // 这意味着即使这里用 gorm.Transaction 包裹,MySQL 路径也无法提供严格的“DDL + 迁移记录”原子性。 + // + // 因此迁移实现必须尽量幂等:例如重复执行 CREATE INDEX 时,仅在 ER_DUP_KEYNAME(1061) 场景下视为成功。 return d.gorm.Transaction(func(tx *gorm.DB) error { // 运行迁移 if m.Up != nil { @@ -160,6 +211,8 @@ func (d *DB) RollbackMigration(targetVersion int) error { // rollbackMigration 在事务中回滚单个迁移 func (d *DB) rollbackMigration(m Migration) error { + // 同 runMigration:MySQL DDL 在回滚路径同样可能发生隐式提交,因此这里的事务主要用于把“回滚逻辑” + // 与“删除迁移记录”尽量绑定在一起,但不应假设 MySQL 上能做到严格原子回滚。 return d.gorm.Transaction(func(tx *gorm.DB) error { // 运行回滚 if m.Down != nil { diff --git a/internal/repository/sqlite/migrations_test.go b/internal/repository/sqlite/migrations_test.go new file mode 100644 index 00000000..9f48aa0d --- /dev/null +++ b/internal/repository/sqlite/migrations_test.go @@ -0,0 +1,23 @@ +package sqlite + +import ( + "errors" + "testing" + + mysqlDriver "github.com/go-sql-driver/mysql" +) + +func TestIsMySQLDuplicateIndexError(t *testing.T) { + if !isMySQLDuplicateIndexError(&mysqlDriver.MySQLError{Number: 1061, Message: "Duplicate key name"}) { + t.Fatalf("expected true for ER_DUP_KEYNAME(1061)") + } + if isMySQLDuplicateIndexError(&mysqlDriver.MySQLError{Number: 1146, Message: "Table doesn't exist"}) { + t.Fatalf("expected false for non-duplicate mysql error") + } + if !isMySQLDuplicateIndexError(errors.New("Error 1061: Duplicate key name 'idx_proxy_requests_provider_id'")) { + t.Fatalf("expected true for duplicate key name string match fallback") + } + if isMySQLDuplicateIndexError(errors.New("some other error")) { + t.Fatalf("expected false for unrelated error") + } +} diff --git a/internal/repository/sqlite/models.go b/internal/repository/sqlite/models.go index 4e2e2e6d..4cb956e5 100644 --- a/internal/repository/sqlite/models.go +++ b/internal/repository/sqlite/models.go @@ -1,4 +1,4 @@ -package sqlite +package sqlite import ( "time" @@ -222,7 +222,7 @@ type ProxyRequest struct { Multiplier uint64 // 倍率(10000=1倍) Cost uint64 RouteID uint64 - ProviderID uint64 + ProviderID uint64 `gorm:"index"` IsStream int StatusCode int ProjectID uint64 diff --git a/web/src/contexts/cooldowns-context.tsx b/web/src/contexts/cooldowns-context.tsx index 467ac5c8..eb7eafcb 100644 --- a/web/src/contexts/cooldowns-context.tsx +++ b/web/src/contexts/cooldowns-context.tsx @@ -1,4 +1,4 @@ -/** +/** * Cooldowns Context * 提供共享的 Cooldowns 数据,减少重复请求 */ @@ -7,6 +7,7 @@ import { createContext, useContext, useEffect, useCallback, type ReactNode } fro import { useQuery, useQueryClient, useMutation } from '@tanstack/react-query'; import { getTransport } from '@/lib/transport'; import type { Cooldown } from '@/lib/transport'; +import { subscribeCooldownUpdates } from '@/lib/cooldown-update-subscription'; interface CooldownsContextValue { cooldowns: Cooldown[]; @@ -38,14 +39,7 @@ export function CooldownsProvider({ children }: CooldownsProviderProps) { // Subscribe to cooldown_update WebSocket event useEffect(() => { - const transport = getTransport(); - const unsubscribe = transport.subscribe('cooldown_update', () => { - queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); - }); - - return () => { - unsubscribe(); - }; + return subscribeCooldownUpdates(queryClient); }, [queryClient]); // Mutation for clearing cooldown diff --git a/web/src/hooks/queries/use-providers.ts b/web/src/hooks/queries/use-providers.ts index d5bcfa16..66472f6c 100644 --- a/web/src/hooks/queries/use-providers.ts +++ b/web/src/hooks/queries/use-providers.ts @@ -1,4 +1,4 @@ -/** +/** * Provider React Query Hooks */ @@ -98,6 +98,7 @@ export function useAllProviderStats() { queryKey: [...providerKeys.stats(), 'all'], queryFn: () => getTransport().getProviderStats(), // 不再轮询,改为通过 WebSocket 事件触发刷新 (useProxyRequestUpdates) + staleTime: 5000, }); } diff --git a/web/src/hooks/queries/use-requests.ts b/web/src/hooks/queries/use-requests.ts index a5f5ee6f..eae8d3e6 100644 --- a/web/src/hooks/queries/use-requests.ts +++ b/web/src/hooks/queries/use-requests.ts @@ -84,11 +84,57 @@ export function useProxyRequestUpdates() { const flushIntervalMs = 250; const pendingRequests = new Map(); + const pendingAttemptsByRequest = new Map>(); const knownRequestIds = new Set(); let flushTimer: ReturnType | null = null; + const flushAttempts = () => { + if (pendingAttemptsByRequest.size === 0) { + return; + } + + const entries = Array.from(pendingAttemptsByRequest.entries()); + pendingAttemptsByRequest.clear(); + + for (const [proxyRequestID, attemptsById] of entries) { + const attemptsKey = requestKeys.attempts(proxyRequestID); + const attemptsQuery = queryCache.find({ queryKey: attemptsKey, exact: true }); + if (!attemptsQuery || attemptsQuery.getObserversCount() === 0) { + continue; + } + + const updates = Array.from(attemptsById.values()); + + queryClient.setQueryData(attemptsKey, (old) => { + const list = old ? [...old] : []; + + for (const updatedAttempt of updates) { + const index = list.findIndex((a) => a.id === updatedAttempt.id); + if (index >= 0) { + const prev = list[index]; + list[index] = { + ...prev, + ...updatedAttempt, + requestInfo: updatedAttempt.requestInfo ?? prev.requestInfo, + responseInfo: updatedAttempt.responseInfo ?? prev.responseInfo, + }; + continue; + } + list.push(updatedAttempt); + } + + return list; + }); + } + }; + const flush = () => { + if (pendingRequests.size === 0 && pendingAttemptsByRequest.size === 0) { + return; + } + if (pendingRequests.size === 0) { + flushAttempts(); return; } @@ -117,7 +163,19 @@ export function useProxyRequestUpdates() { const detailKey = requestKeys.detail(requestId); const detailQuery = queryCache.find({ queryKey: detailKey, exact: true }); if (detailQuery && detailQuery.getObserversCount() > 0) { - queryClient.setQueryData(detailKey, updatedRequest); + // 后端可能会对 WS 广播做“瘦身”(不带 requestInfo/responseInfo 大字段), + // 这里合并旧值,避免把详情页已加载的内容覆盖成空。 + queryClient.setQueryData(detailKey, (old) => { + if (!old) { + return updatedRequest; + } + return { + ...old, + ...updatedRequest, + requestInfo: updatedRequest.requestInfo ?? old.requestInfo, + responseInfo: updatedRequest.responseInfo ?? old.responseInfo, + }; + }); isKnown = true; } @@ -292,6 +350,8 @@ export function useProxyRequestUpdates() { if (invalidateCooldowns) { queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); } + + flushAttempts(); }; const scheduleFlush = () => { @@ -320,19 +380,13 @@ export function useProxyRequestUpdates() { return; } - queryClient.setQueryData( - attemptsKey, - (old) => { - if (!old) return [updatedAttempt]; - const index = old.findIndex((a) => a.id === updatedAttempt.id); - if (index >= 0) { - const newList = [...old]; - newList[index] = updatedAttempt; - return newList; - } - return [...old, updatedAttempt]; - }, - ); + let perRequest = pendingAttemptsByRequest.get(updatedAttempt.proxyRequestID); + if (!perRequest) { + perRequest = new Map(); + pendingAttemptsByRequest.set(updatedAttempt.proxyRequestID, perRequest); + } + perRequest.set(updatedAttempt.id, updatedAttempt); + scheduleFlush(); }, ); @@ -342,6 +396,7 @@ export function useProxyRequestUpdates() { flushTimer = null; } pendingRequests.clear(); + pendingAttemptsByRequest.clear(); unsubscribeRequest(); unsubscribeAttempt(); }; diff --git a/web/src/hooks/use-cooldowns.ts b/web/src/hooks/use-cooldowns.ts index aa21fe2f..96798eb6 100644 --- a/web/src/hooks/use-cooldowns.ts +++ b/web/src/hooks/use-cooldowns.ts @@ -1,7 +1,8 @@ -import { useQuery, useQueryClient, useMutation } from '@tanstack/react-query'; +import { useQuery, useQueryClient, useMutation } from '@tanstack/react-query'; import { getTransport } from '@/lib/transport'; import type { Cooldown } from '@/lib/transport'; import { useEffect, useState, useCallback } from 'react'; +import { subscribeCooldownUpdates } from '@/lib/cooldown-update-subscription'; export function useCooldowns() { const queryClient = useQueryClient(); @@ -20,15 +21,7 @@ export function useCooldowns() { // Subscribe to cooldown_update WebSocket event useEffect(() => { - const transport = getTransport(); - const unsubscribe = transport.subscribe('cooldown_update', () => { - // Invalidate and refetch cooldowns when a cooldown update is received - queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); - }); - - return () => { - unsubscribe(); - }; + return subscribeCooldownUpdates(queryClient); }, [queryClient]); // Mutation for clearing cooldown diff --git a/web/src/hooks/use-streaming.ts b/web/src/hooks/use-streaming.ts index af97b864..22a6762d 100644 --- a/web/src/hooks/use-streaming.ts +++ b/web/src/hooks/use-streaming.ts @@ -1,9 +1,9 @@ -/** +/** * Streaming Requests Hook * 追踪实时活动请求状态 */ -import { useState, useEffect, useCallback, useRef } from 'react'; +import { useState, useEffect, useCallback, useRef, useMemo } from 'react'; import { getTransport, type ProxyRequest, type ClientType } from '@/lib/transport'; export interface StreamingState { @@ -105,43 +105,46 @@ export function useStreamingRequests(): StreamingState { }; }, [handleRequestUpdate, loadActiveRequests]); - // 计算按 clientType 和 providerID 的统计 - const countsByClient = new Map(); - const countsByProvider = new Map(); - const countsByProviderAndClient = new Map(); - const countsByRoute = new Map(); - - for (const request of activeRequests.values()) { - // 按 clientType 统计 - const clientCount = countsByClient.get(request.clientType) || 0; - countsByClient.set(request.clientType, clientCount + 1); - - // 按 routeID 统计 - if (request.routeID > 0) { - const routeCount = countsByRoute.get(request.routeID) || 0; - countsByRoute.set(request.routeID, routeCount + 1); - } + return useMemo((): StreamingState => { + // 计算按 clientType 和 providerID 的统计 + const countsByClient = new Map(); + const countsByProvider = new Map(); + const countsByProviderAndClient = new Map(); + const countsByRoute = new Map(); + const requests = Array.from(activeRequests.values()); + + for (const request of requests) { + // 按 clientType 统计 + const clientCount = countsByClient.get(request.clientType) || 0; + countsByClient.set(request.clientType, clientCount + 1); + + // 按 routeID 统计 + if (request.routeID > 0) { + const routeCount = countsByRoute.get(request.routeID) || 0; + countsByRoute.set(request.routeID, routeCount + 1); + } - // 按 providerID 统计 - if (request.providerID > 0) { - const providerCount = countsByProvider.get(request.providerID) || 0; - countsByProvider.set(request.providerID, providerCount + 1); + // 按 providerID 统计 + if (request.providerID > 0) { + const providerCount = countsByProvider.get(request.providerID) || 0; + countsByProvider.set(request.providerID, providerCount + 1); - // 按 providerID + clientType 组合统计 - const key = `${request.providerID}:${request.clientType}`; - const combinedCount = countsByProviderAndClient.get(key) || 0; - countsByProviderAndClient.set(key, combinedCount + 1); + // 按 providerID + clientType 组合统计 + const key = `${request.providerID}:${request.clientType}`; + const combinedCount = countsByProviderAndClient.get(key) || 0; + countsByProviderAndClient.set(key, combinedCount + 1); + } } - } - - return { - total: activeRequests.size, - requests: Array.from(activeRequests.values()), - countsByClient, - countsByProvider, - countsByProviderAndClient, - countsByRoute, - }; + + return { + total: activeRequests.size, + requests, + countsByClient, + countsByProvider, + countsByProviderAndClient, + countsByRoute, + }; + }, [activeRequests]); } /** diff --git a/web/src/lib/cooldown-update-subscription.ts b/web/src/lib/cooldown-update-subscription.ts new file mode 100644 index 00000000..8ccdd627 --- /dev/null +++ b/web/src/lib/cooldown-update-subscription.ts @@ -0,0 +1,29 @@ +import type { QueryClient } from '@tanstack/react-query'; +import { getTransport } from '@/lib/transport'; + +type Unsubscribe = () => void; + +let transportUnsubscribe: Unsubscribe | null = null; +const queryClients = new Set(); + +export function subscribeCooldownUpdates(queryClient: QueryClient): Unsubscribe { + queryClients.add(queryClient); + + if (!transportUnsubscribe) { + const transport = getTransport(); + transportUnsubscribe = transport.subscribe('cooldown_update', () => { + for (const qc of queryClients) { + qc.invalidateQueries({ queryKey: ['cooldowns'] }); + } + }); + } + + return () => { + queryClients.delete(queryClient); + if (queryClients.size === 0 && transportUnsubscribe) { + transportUnsubscribe(); + transportUnsubscribe = null; + } + }; +} + diff --git a/web/src/pages/requests/index.tsx b/web/src/pages/requests/index.tsx index 6d3f8bfb..b84df678 100644 --- a/web/src/pages/requests/index.tsx +++ b/web/src/pages/requests/index.tsx @@ -1,4 +1,5 @@ -import { useState, useMemo, useRef, useEffect } from 'react'; +import { useState, useMemo, useRef, useEffect } from 'react'; +import { useCallback, memo } from 'react'; import { useNavigate } from 'react-router-dom'; import { useTranslation } from 'react-i18next'; import { @@ -167,6 +168,13 @@ export function RequestsPage() { scrollContainerRef.current?.scrollTo({ top: 0 }); }; + const handleOpenRequest = useCallback( + (id: number) => { + navigate(`/requests/${id}`); + }, + [navigate], + ); + return (
{allRequests.map((req) => ( - navigate(`/requests/${req.id}`)} + onOpenRequest={handleOpenRequest} /> ))} {/* 触底加载指示器 */} @@ -312,11 +320,10 @@ export function RequestsPage() { - {allRequests.map((req, index) => ( - ( + navigate(`/requests/${req.id}`)} + onOpenRequest={handleOpenRequest} /> ))} @@ -480,9 +487,21 @@ function CostCell({ cost }: { cost: number }) { } // Log Row Component +type LogRowProps = { + request: ProxyRequest; + providerName?: string; + projectName?: string; + tokenName?: string; + showProjectColumn?: boolean; + showTokenColumn?: boolean; + forceProjectBinding?: boolean; + nowMs: number; + enableMarquee: boolean; + onOpenRequest: (id: number) => void; +}; + function LogRow({ request, - index, providerName, projectName, tokenName, @@ -491,20 +510,8 @@ function LogRow({ forceProjectBinding, nowMs, enableMarquee, - onClick, -}: { - request: ProxyRequest; - index: number; - providerName?: string; - projectName?: string; - tokenName?: string; - showProjectColumn?: boolean; - showTokenColumn?: boolean; - forceProjectBinding?: boolean; - nowMs: number; - enableMarquee: boolean; - onClick: () => void; -}) { + onOpenRequest, +}: LogRowProps) { const isPending = request.status === 'PENDING' || request.status === 'IN_PROGRESS'; const isFailed = request.status === 'FAILED'; const isPendingBinding = @@ -563,26 +570,25 @@ function LogRow({ // Get HTTP status code (use denormalized field for list performance) const statusCode = request.statusCode || request.responseInfo?.status; - // Zebra striping base class - const zebraClass = index % 2 === 1 ? 'bg-foreground/[0.03]' : ''; + const handleClick = () => onOpenRequest(request.id); return ( , next: Readonly) => { + if (prev.request !== next.request) return false; + if (prev.providerName !== next.providerName) return false; + if (prev.projectName !== next.projectName) return false; + if (prev.tokenName !== next.tokenName) return false; + if (prev.showProjectColumn !== next.showProjectColumn) return false; + if (prev.showTokenColumn !== next.showTokenColumn) return false; + if (prev.forceProjectBinding !== next.forceProjectBinding) return false; + if (prev.enableMarquee !== next.enableMarquee) return false; + if (prev.onOpenRequest !== next.onOpenRequest) return false; + + const prevPending = prev.request.status === 'PENDING' || prev.request.status === 'IN_PROGRESS'; + const nextPending = next.request.status === 'PENDING' || next.request.status === 'IN_PROGRESS'; + if (prevPending || nextPending) { + return prev.nowMs === next.nowMs; + } + + return true; + }, +); + // Mobile Request Card Component -function MobileRequestCard({ - request, - providerName, - onClick, -}: { +type MobileRequestCardProps = { request: ProxyRequest; providerName?: string; - onClick: () => void; -}) { + onOpenRequest: (id: number) => void; +}; + +function MobileRequestCard({ request, providerName, onOpenRequest }: MobileRequestCardProps) { const isPending = request.status === 'PENDING' || request.status === 'IN_PROGRESS'; const isFailed = request.status === 'FAILED'; + const handleClick = useCallback(() => onOpenRequest(request.id), [onOpenRequest, request.id]); const formatTime = (dateStr: string) => { const date = new Date(dateStr); @@ -790,7 +818,7 @@ function MobileRequestCard({ return (
, next: Readonly) => { + if (prev.request !== next.request) return false; + if (prev.providerName !== next.providerName) return false; + if (prev.onOpenRequest !== next.onOpenRequest) return false; + return true; + }, +); + // Provider Filter Component using Select function ProviderFilter({ providers,