From e09d109aac85d3b99e8f78d8f1d055fd16e98ee7 Mon Sep 17 00:00:00 2001 From: Asura Date: Sun, 22 Feb 2026 20:01:03 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20=E5=89=8D=E7=AB=AF/req?= =?UTF-8?q?uests=20=E5=8D=A1=E6=88=90=E7=8B=97=E5=B1=8E=EF=BC=8C=20api/adm?= =?UTF-8?q?in/requests=3Flimit=3D100=20=20=E5=90=8E=E7=AB=AF=E6=85=A2?= =?UTF-8?q?=E6=88=90=E7=8B=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/repository/sqlite/proxy_request.go | 14 +- web/src/hooks/queries/use-requests.ts | 222 +++++++++++++------- web/src/pages/requests/index.tsx | 78 ++++--- 3 files changed, 202 insertions(+), 112 deletions(-) diff --git a/internal/repository/sqlite/proxy_request.go b/internal/repository/sqlite/proxy_request.go index cc1fe2e4..96fdbe68 100644 --- a/internal/repository/sqlite/proxy_request.go +++ b/internal/repository/sqlite/proxy_request.go @@ -101,9 +101,17 @@ func (r *ProxyRequestRepository) ListCursor(limit int, before, after uint64, fil } var models []ProxyRequest - // 按结束时间排序:未完成的请求(end_time=0)在最前面,已完成的按 end_time DESC 排序 - // SQLite 不支持 NULLS FIRST,使用 CASE WHEN 实现 - if err := query.Order("CASE WHEN end_time = 0 THEN 0 ELSE 1 END, end_time DESC, id DESC").Limit(limit).Find(&models).Error; err != nil { + // 注意:这里使用基于主键 id 的稳定排序(与 before/after 游标保持一致),避免复杂 ORDER BY + // 导致 SQLite 无法利用索引、在大数据量下触发全表排序,从而出现“limit=100 仍需数分钟”的性能问题。 + // + // 约定: + // - 默认/向后翻页(before):按 id DESC(最新在前) + // - 向前翻页/获取新数据(after):按 id ASC(较旧的新数据在前,便于按时间顺序追加) + orderBy := "id DESC" + if after > 0 { + orderBy = "id ASC" + } + if err := query.Order(orderBy).Limit(limit).Find(&models).Error; err != nil { return nil, err } return r.toDomainList(models), nil diff --git a/web/src/hooks/queries/use-requests.ts b/web/src/hooks/queries/use-requests.ts index ac36dc32..a5f5ee6f 100644 --- a/web/src/hooks/queries/use-requests.ts +++ b/web/src/hooks/queries/use-requests.ts @@ -1,4 +1,4 @@ -/** +/** * ProxyRequest React Query Hooks */ @@ -80,31 +80,54 @@ export function useProxyRequestUpdates() { useEffect(() => { const transport = getTransport(); + const queryCache = queryClient.getQueryCache(); + + const flushIntervalMs = 250; + const pendingRequests = new Map(); + const knownRequestIds = new Set(); + let flushTimer: ReturnType | null = null; + + const flush = () => { + if (pendingRequests.size === 0) { + return; + } + + const updates = Array.from(pendingRequests.values()); + pendingRequests.clear(); + + const listQueries = queryCache + .findAll({ queryKey: requestKeys.lists() }) + .filter((q) => q.getObserversCount() > 0); + const infiniteQueries = queryCache + .findAll({ queryKey: [...requestKeys.all, 'infinite'] }) + .filter((q) => q.getObserversCount() > 0); + const countQueries = queryCache + .findAll({ queryKey: ['requestsCount'] }) + .filter((q) => q.getObserversCount() > 0); + + let invalidateDashboard = false; + let invalidateProviderStats = false; + let invalidateCooldowns = false; + + for (const updatedRequest of updates) { + const requestId = updatedRequest.id; + let isKnown = knownRequestIds.has(requestId); + + // 仅当详情查询正在被观察时才更新详情缓存,避免列表页“写缓存造内存” + const detailKey = requestKeys.detail(requestId); + const detailQuery = queryCache.find({ queryKey: detailKey, exact: true }); + if (detailQuery && detailQuery.getObserversCount() > 0) { + queryClient.setQueryData(detailKey, updatedRequest); + isKnown = true; + } - // 订阅 ProxyRequest 更新事件 (连接由 main.tsx 统一管理) - const unsubscribeRequest = transport.subscribe( - 'proxy_request_update', - (updatedRequest) => { - // 检查是否是新请求(通过详情缓存判断) - const existingDetail = queryClient.getQueryData(requestKeys.detail(updatedRequest.id)); - const isNewRequest = !existingDetail; - - // 更新单个请求的缓存 - queryClient.setQueryData(requestKeys.detail(updatedRequest.id), updatedRequest); - - // 更新列表缓存(乐观更新)- 适配 CursorPaginationResult 结构 - // 使用 queryCache 遍历所有匹配的查询,以获取每个查询的过滤参数 - const queryCache = queryClient.getQueryCache(); - const listQueries = queryCache.findAll({ queryKey: requestKeys.lists() }); - + // 更新 Cursor 列表查询(仅更新正在被观察的 query) for (const query of listQueries) { const queryKey = query.queryKey as ReturnType; - // 从 queryKey 中提取过滤参数: ['requests', 'list', params] const params = queryKey[2] as CursorPaginationParams | undefined; const filterProviderId = params?.providerId; const filterStatus = params?.status; - // 检查是否匹配过滤条件的辅助函数 const matchesFilter = (request: ProxyRequest) => { if (filterProviderId !== undefined && request.providerID !== filterProviderId) { return false; @@ -140,29 +163,23 @@ export function useProxyRequestUpdates() { }; }; - const index = old.items.findIndex((r) => r.id === updatedRequest.id); + const index = old.items.findIndex((r) => r.id === requestId); if (index >= 0) { - // 已存在的请求:检查是否仍然匹配过滤条件 + isKnown = true; if (!matchesFilter(updatedRequest)) { - // 不再匹配过滤条件,从列表中移除 - const newItems = old.items.filter((r) => r.id !== updatedRequest.id); + const newItems = old.items.filter((r) => r.id !== requestId); return normalizePage(newItems); } - // 仍然匹配,更新 const newItems = [...old.items]; newItems[index] = updatedRequest; return normalizePage(newItems); } - // 新请求:检查是否匹配过滤条件 if (!matchesFilter(updatedRequest)) { - // 不匹配过滤条件,不添加 return old; } - // 新请求添加到列表开头(只在首页,即没有 before 参数的查询) if (params?.before) { - // 不是首页,不添加新请求 return old; } @@ -170,14 +187,9 @@ export function useProxyRequestUpdates() { }); } - // 更新 Infinite Queries 缓存 - const infiniteQueries = queryCache.findAll({ - queryKey: [...requestKeys.all, 'infinite'], - }); - + // 更新 Infinite Queries(仅更新正在被观察的 query) for (const query of infiniteQueries) { const queryKey = query.queryKey as ReturnType; - // queryKey: ['requests', 'infinite', providerId, status] const filterProviderId = queryKey[2] as number | undefined; const filterStatus = queryKey[3] as string | undefined; @@ -197,72 +209,119 @@ export function useProxyRequestUpdates() { }>(queryKey, (old) => { if (!old || !old.pages || old.pages.length === 0) return old; - const updatedPages = old.pages.map((page, pageIndex) => { - const index = page.items.findIndex((r) => r.id === updatedRequest.id); - - if (index >= 0) { - // 已存在的请求:检查是否仍然匹配过滤条件 - if (!matchesFilter(updatedRequest)) { - // 不再匹配,从列表中移除 - const newItems = page.items.filter((r) => r.id !== updatedRequest.id); - return { ...page, items: newItems }; - } - // 仍然匹配,更新 - const newItems = [...page.items]; - newItems[index] = updatedRequest; - return { ...page, items: newItems }; + let hasExisting = false; + + const updatedPages = old.pages.map((page) => { + const index = page.items.findIndex((r) => r.id === requestId); + if (index < 0) { + return page; } - // 只在第一页添加新请求 - if (pageIndex === 0 && isNewRequest && matchesFilter(updatedRequest)) { - return { ...page, items: [updatedRequest, ...page.items] }; + hasExisting = true; + + if (!matchesFilter(updatedRequest)) { + const newItems = page.items.filter((r) => r.id !== requestId); + return { ...page, items: newItems }; } - return page; + const newItems = [...page.items]; + newItems[index] = updatedRequest; + return { ...page, items: newItems }; }); - return { ...old, pages: updatedPages }; + if (hasExisting) { + isKnown = true; + return { ...old, pages: updatedPages }; + } + + if (!matchesFilter(updatedRequest)) { + return { ...old, pages: updatedPages }; + } + + // 仅在第一页插入“新请求”,避免重复插入导致列表膨胀 + const firstPage = updatedPages[0]; + if (!firstPage) { + return { ...old, pages: updatedPages }; + } + + return { + ...old, + pages: [{ ...firstPage, items: [updatedRequest, ...firstPage.items] }, ...updatedPages.slice(1)], + }; }); } - // 新请求时乐观更新 count(需要考虑每个 count 查询的过滤条件) - if (isNewRequest) { - // 遍历所有 requestsCount 缓存 - const countQueries = queryCache.findAll({ queryKey: ['requestsCount'] }); - for (const query of countQueries) { - // queryKey: ['requestsCount', providerId, status] - const filterProviderId = query.queryKey[1] as number | undefined; - const filterStatus = query.queryKey[2] as string | undefined; - // 如果有过滤条件且不匹配,不更新计数 - if (filterProviderId !== undefined && updatedRequest.providerID !== filterProviderId) { - continue; - } - if (filterStatus !== undefined && updatedRequest.status !== filterStatus) { - continue; + // 新请求时乐观更新 count(增加保护:避免因“未观察详情缓存”导致重复 +1) + if (!isKnown) { + const startTimeMs = new Date(updatedRequest.startTime).getTime(); + const looksLikeNewRequest = + updatedRequest.status === 'PENDING' && + Number.isFinite(startTimeMs) && + Date.now() - startTimeMs < 15_000; + + if (looksLikeNewRequest) { + for (const query of countQueries) { + const filterProviderId = query.queryKey[1] as number | undefined; + const filterStatus = query.queryKey[2] as string | undefined; + if (filterProviderId !== undefined && updatedRequest.providerID !== filterProviderId) { + continue; + } + if (filterStatus !== undefined && updatedRequest.status !== filterStatus) { + continue; + } + queryClient.setQueryData(query.queryKey, (old) => (old ?? 0) + 1); } - queryClient.setQueryData(query.queryKey, (old) => (old ?? 0) + 1); } } - // 请求完成或失败时刷新相关数据 + knownRequestIds.add(requestId); + if (updatedRequest.status === 'COMPLETED' || updatedRequest.status === 'FAILED') { - // 刷新 dashboard 数据 - queryClient.invalidateQueries({ queryKey: ['dashboard'] }); - // 刷新 provider stats(因为统计数据变化了) - queryClient.invalidateQueries({ queryKey: ['providers', 'stats'] }); - // 刷新 cooldowns(请求可能触发了冷却,即使最终成功也可能有 provider 进入冷却) - queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); + invalidateDashboard = true; + invalidateProviderStats = true; + invalidateCooldowns = true; } - }, - ); + } + + if (invalidateDashboard) { + queryClient.invalidateQueries({ queryKey: ['dashboard'] }); + } + if (invalidateProviderStats) { + queryClient.invalidateQueries({ queryKey: ['providers', 'stats'] }); + } + if (invalidateCooldowns) { + queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); + } + }; + + const scheduleFlush = () => { + if (flushTimer) { + return; + } + flushTimer = setTimeout(() => { + flushTimer = null; + flush(); + }, flushIntervalMs); + }; + + const unsubscribeRequest = transport.subscribe('proxy_request_update', (updatedRequest) => { + pendingRequests.set(updatedRequest.id, updatedRequest); + scheduleFlush(); + }); // 订阅 ProxyUpstreamAttempt 更新事件 const unsubscribeAttempt = transport.subscribe( 'proxy_upstream_attempt_update', (updatedAttempt) => { - // 更新 Attempts 缓存 + // 仅当 attempts 查询正在被观察时才更新,避免列表页“写缓存造内存” + const attemptsKey = requestKeys.attempts(updatedAttempt.proxyRequestID); + const attemptsQuery = queryCache.find({ queryKey: attemptsKey, exact: true }); + if (!attemptsQuery || attemptsQuery.getObserversCount() === 0) { + return; + } + queryClient.setQueryData( - requestKeys.attempts(updatedAttempt.proxyRequestID), + attemptsKey, (old) => { if (!old) return [updatedAttempt]; const index = old.findIndex((a) => a.id === updatedAttempt.id); @@ -278,6 +337,11 @@ export function useProxyRequestUpdates() { ); return () => { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + pendingRequests.clear(); unsubscribeRequest(); unsubscribeAttempt(); }; diff --git a/web/src/pages/requests/index.tsx b/web/src/pages/requests/index.tsx index 6c7ed5ff..6d3f8bfb 100644 --- a/web/src/pages/requests/index.tsx +++ b/web/src/pages/requests/index.tsx @@ -1,4 +1,4 @@ -import { useState, useMemo, useRef, useEffect } from 'react'; +import { useState, useMemo, useRef, useEffect } from 'react'; import { useNavigate } from 'react-router-dom'; import { useTranslation } from 'react-i18next'; import { @@ -109,6 +109,27 @@ export function RequestsPage() { return data?.pages.flatMap((page) => page.items) ?? []; }, [data]); + const activeCount = useMemo(() => { + return allRequests.reduce((count, req) => { + return req.status === 'PENDING' || req.status === 'IN_PROGRESS' ? count + 1 : count; + }, 0); + }, [allRequests]); + const hasActiveRequests = activeCount > 0; + const enableMarquee = activeCount <= 10; + + const [nowMs, setNowMs] = useState(() => Date.now()); + + // 全局 tick:仅在有“传输中”请求时更新,避免每行一个定时器导致重渲染风暴 + useEffect(() => { + if (!hasActiveRequests) { + return; + } + + setNowMs(Date.now()); + const interval = window.setInterval(() => setNowMs(Date.now()), 1000); + return () => window.clearInterval(interval); + }, [hasActiveRequests]); + // IntersectionObserver 触底检测 useEffect(() => { const loadMoreEl = loadMoreRef.current; @@ -292,7 +313,7 @@ export function RequestsPage() { {allRequests.map((req, index) => ( - navigate(`/requests/${req.id}`)} /> ))} @@ -466,6 +489,8 @@ function LogRow({ showProjectColumn, showTokenColumn, forceProjectBinding, + nowMs, + enableMarquee, onClick, }: { request: ProxyRequest; @@ -476,6 +501,8 @@ function LogRow({ showProjectColumn?: boolean; showTokenColumn?: boolean; forceProjectBinding?: boolean; + nowMs: number; + enableMarquee: boolean; onClick: () => void; }) { const isPending = request.status === 'PENDING' || request.status === 'IN_PROGRESS'; @@ -486,9 +513,6 @@ function LogRow({ (!request.projectID || request.projectID === 0); const [isRecent, setIsRecent] = useState(false); - // Live duration calculation for pending requests - const [liveDuration, setLiveDuration] = useState(null); - useEffect(() => { // Check if request is new (less than 5 seconds old) const startTime = new Date(request.startTime).getTime(); @@ -499,35 +523,22 @@ function LogRow({ } }, [request.startTime]); - useEffect(() => { - if (!isPending) { - setLiveDuration(null); - return; - } - - const startTime = new Date(request.startTime).getTime(); - const updateDuration = () => { - const now = Date.now(); - setLiveDuration(now - startTime); - }; - - updateDuration(); - const interval = setInterval(updateDuration, 100); - - return () => clearInterval(interval); - }, [isPending, request.startTime]); + const startTimeMs = useMemo(() => new Date(request.startTime).getTime(), [request.startTime]); + const liveDurationMs = + isPending && Number.isFinite(startTimeMs) ? Math.max(0, nowMs - startTimeMs) : null; const formatDuration = (ns?: number | null) => { if (ns === undefined || ns === null) return '-'; - // If it's live duration (ms), convert directly to seconds - if (isPending && liveDuration !== null) { - return `${(liveDuration / 1000).toFixed(2)}s`; - } // Convert nanoseconds to seconds with 2 decimal places const seconds = ns / 1_000_000_000; return `${seconds.toFixed(2)}s`; }; + const formatLiveDuration = (ms: number | null) => { + if (ms === null) return '-'; + return `${(ms / 1000).toFixed(2)}s`; + }; + const formatTime = (dateStr: string) => { const date = new Date(dateStr); const yyyy = date.getFullYear(); @@ -540,7 +551,7 @@ function LogRow({ }; // Display duration - const displayDuration = isPending ? liveDuration : request.duration; + const displayDuration = request.duration; // Duration color const durationColor = isPending @@ -576,8 +587,15 @@ function LogRow({ 'border-l-2 border-l-amber-500', ), - // Active/Pending state - Blue left border + Marquee animation - isPending && !isPendingBinding && 'animate-marquee-row', + // Active/Pending state - Blue left border + (optional) Marquee animation + isPending && + !isPendingBinding && + (enableMarquee + ? 'animate-marquee-row' + : cn( + index % 2 === 1 ? 'bg-blue-500/10' : 'bg-blue-500/5', + 'border-l-2 border-l-blue-500/50', + )), // New Item Flash Animation isRecent && !isPending && !isPendingBinding && 'bg-accent/20', @@ -687,7 +705,7 @@ function LogRow({ className={`text-xs font-mono ${durationColor}`} title={`${formatTime(request.startTime || request.createdAt)} → ${request.endTime && new Date(request.endTime).getTime() > 0 ? formatTime(request.endTime) : '...'}`} > - {formatDuration(displayDuration)} + {isPending ? formatLiveDuration(liveDurationMs) : formatDuration(displayDuration)} From 7719e475fc7ac07929252bf3fe00908304b3ca93 Mon Sep 17 00:00:00 2001 From: Asura Date: Sun, 22 Feb 2026 22:25:30 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=85=A8=E8=AE=BF=E9=97=AE=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/event/broadcaster.go | 43 ++++++++- internal/event/wails_broadcaster_desktop.go | 4 +- internal/executor/executor.go | 102 ++++++++++++-------- internal/handler/websocket.go | 52 ++++++++-- internal/repository/sqlite/migrations.go | 31 +++++- internal/repository/sqlite/models.go | 4 +- web/src/contexts/cooldowns-context.tsx | 12 +-- web/src/hooks/queries/use-providers.ts | 3 +- web/src/hooks/queries/use-requests.ts | 83 +++++++++++++--- web/src/hooks/use-cooldowns.ts | 13 +-- web/src/hooks/use-streaming.ts | 75 +++++++------- web/src/lib/cooldown-update-subscription.ts | 29 ++++++ web/src/pages/requests/index.tsx | 98 +++++++++++++------ 13 files changed, 394 insertions(+), 155 deletions(-) create mode 100644 web/src/lib/cooldown-update-subscription.ts diff --git a/internal/event/broadcaster.go b/internal/event/broadcaster.go index 33382877..c4b83ff9 100644 --- a/internal/event/broadcaster.go +++ b/internal/event/broadcaster.go @@ -1,4 +1,4 @@ -package event +package event import "github.com/awsl-project/maxx/internal/domain" @@ -14,7 +14,42 @@ type Broadcaster interface { // NopBroadcaster 空实现,用于测试或不需要广播的场景 type NopBroadcaster struct{} -func (n *NopBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {} +func (n *NopBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) {} func (n *NopBroadcaster) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) {} -func (n *NopBroadcaster) BroadcastLog(message string) {} -func (n *NopBroadcaster) BroadcastMessage(messageType string, data interface{}) {} +func (n *NopBroadcaster) BroadcastLog(message string) {} +func (n *NopBroadcaster) BroadcastMessage(messageType string, data interface{}) {} + +// SanitizeProxyRequestForBroadcast 用于“实时广播”场景瘦身 payload: +// 去掉 request/response 大字段,避免 WebSocket 消息动辄几十/几百 KB,导致前端 JSON.parse / GC 卡死。 +// +// 说明: +// - /requests 列表页只需要轻量字段(状态、耗时、tokens、成本等)。 +// - 详情页需要的大字段应通过 /admin/requests/{id} 与 /admin/requests/{id}/attempts 拉取。 +func SanitizeProxyRequestForBroadcast(req *domain.ProxyRequest) *domain.ProxyRequest { + if req == nil { + return nil + } + // 已经是瘦身后的对象,避免重复拷贝(高频场景会产生额外 GC 压力) + if req.RequestInfo == nil && req.ResponseInfo == nil { + return req + } + copied := *req + copied.RequestInfo = nil + copied.ResponseInfo = nil + return &copied +} + +// SanitizeProxyUpstreamAttemptForBroadcast 用于“实时广播”场景瘦身 payload。 +func SanitizeProxyUpstreamAttemptForBroadcast(attempt *domain.ProxyUpstreamAttempt) *domain.ProxyUpstreamAttempt { + if attempt == nil { + return nil + } + // 已经是瘦身后的对象,避免重复拷贝(高频场景会产生额外 GC 压力) + if attempt.RequestInfo == nil && attempt.ResponseInfo == nil { + return attempt + } + copied := *attempt + copied.RequestInfo = nil + copied.ResponseInfo = nil + return &copied +} diff --git a/internal/event/wails_broadcaster_desktop.go b/internal/event/wails_broadcaster_desktop.go index 3afc62ae..0e9c6a95 100644 --- a/internal/event/wails_broadcaster_desktop.go +++ b/internal/event/wails_broadcaster_desktop.go @@ -1,4 +1,4 @@ -//go:build desktop +//go:build desktop package event @@ -47,6 +47,7 @@ func (w *WailsBroadcaster) emitWailsEvent(eventType string, data interface{}) { // BroadcastProxyRequest broadcasts a proxy request update func (w *WailsBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) { + req = SanitizeProxyRequestForBroadcast(req) // Broadcast via inner broadcaster (WebSocket) if w.inner != nil { w.inner.BroadcastProxyRequest(req) @@ -57,6 +58,7 @@ func (w *WailsBroadcaster) BroadcastProxyRequest(req *domain.ProxyRequest) { // BroadcastProxyUpstreamAttempt broadcasts a proxy upstream attempt update func (w *WailsBroadcaster) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) { + attempt = SanitizeProxyUpstreamAttemptForBroadcast(attempt) if w.inner != nil { w.inner.BroadcastProxyUpstreamAttempt(attempt) } diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 6e8d4bb0..a24451c0 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -1,4 +1,4 @@ -package executor +package executor import ( "context" @@ -321,51 +321,71 @@ func (e *Executor) processAdapterEventsRealtime(eventChan domain.AdapterEventCha return } - for event := range eventChan { - if event == nil { - continue - } + // 事件节流:合并多个 adapter 事件为一次广播,避免在流式高并发下产生“广播风暴” + const broadcastThrottle = 200 * time.Millisecond + ticker := time.NewTicker(broadcastThrottle) + defer ticker.Stop() - needsBroadcast := false + dirty := false - switch event.Type { - case domain.EventRequestInfo: - if !e.shouldClearRequestDetail() && event.RequestInfo != nil { - attempt.RequestInfo = event.RequestInfo - needsBroadcast = true - } - case domain.EventResponseInfo: - if !e.shouldClearRequestDetail() && event.ResponseInfo != nil { - attempt.ResponseInfo = event.ResponseInfo - needsBroadcast = true - } - case domain.EventMetrics: - if event.Metrics != nil { - attempt.InputTokenCount = event.Metrics.InputTokens - attempt.OutputTokenCount = event.Metrics.OutputTokens - attempt.CacheReadCount = event.Metrics.CacheReadCount - attempt.CacheWriteCount = event.Metrics.CacheCreationCount - attempt.Cache5mWriteCount = event.Metrics.Cache5mCreationCount - attempt.Cache1hWriteCount = event.Metrics.Cache1hCreationCount - needsBroadcast = true - } - case domain.EventResponseModel: - if event.ResponseModel != "" { - attempt.ResponseModel = event.ResponseModel - needsBroadcast = true + flush := func() { + if !dirty || e.broadcaster == nil { + dirty = false + return + } + // 广播前做一次瘦身 + 快照,避免发送大字段、也避免指针被后续修改导致数据竞争 + snapshot := event.SanitizeProxyUpstreamAttemptForBroadcast(attempt) + e.broadcaster.BroadcastProxyUpstreamAttempt(snapshot) + dirty = false + } + + for { + select { + case ev, ok := <-eventChan: + if !ok { + flush() + return } - case domain.EventFirstToken: - if event.FirstTokenTime > 0 { - // Calculate TTFT as duration from start time to first token time - firstTokenTime := time.UnixMilli(event.FirstTokenTime) - attempt.TTFT = firstTokenTime.Sub(attempt.StartTime) - needsBroadcast = true + if ev == nil { + continue } - } - // Broadcast update immediately for real-time visibility - if needsBroadcast && e.broadcaster != nil { - e.broadcaster.BroadcastProxyUpstreamAttempt(attempt) + switch ev.Type { + case domain.EventRequestInfo: + if !e.shouldClearRequestDetail() && ev.RequestInfo != nil { + attempt.RequestInfo = ev.RequestInfo + dirty = true + } + case domain.EventResponseInfo: + if !e.shouldClearRequestDetail() && ev.ResponseInfo != nil { + attempt.ResponseInfo = ev.ResponseInfo + dirty = true + } + case domain.EventMetrics: + if ev.Metrics != nil { + attempt.InputTokenCount = ev.Metrics.InputTokens + attempt.OutputTokenCount = ev.Metrics.OutputTokens + attempt.CacheReadCount = ev.Metrics.CacheReadCount + attempt.CacheWriteCount = ev.Metrics.CacheCreationCount + attempt.Cache5mWriteCount = ev.Metrics.Cache5mCreationCount + attempt.Cache1hWriteCount = ev.Metrics.Cache1hCreationCount + dirty = true + } + case domain.EventResponseModel: + if ev.ResponseModel != "" { + attempt.ResponseModel = ev.ResponseModel + dirty = true + } + case domain.EventFirstToken: + if ev.FirstTokenTime > 0 { + // Calculate TTFT as duration from start time to first token time + firstTokenTime := time.UnixMilli(ev.FirstTokenTime) + attempt.TTFT = firstTokenTime.Sub(attempt.StartTime) + dirty = true + } + } + case <-ticker.C: + flush() } } } diff --git a/internal/handler/websocket.go b/internal/handler/websocket.go index 6e49c29e..4f415eaa 100644 --- a/internal/handler/websocket.go +++ b/internal/handler/websocket.go @@ -1,4 +1,4 @@ -package handler +package handler import ( "bufio" @@ -10,6 +10,7 @@ import ( "sync" "github.com/awsl-project/maxx/internal/domain" + "github.com/awsl-project/maxx/internal/event" "github.com/gorilla/websocket" ) @@ -41,15 +42,29 @@ func NewWebSocketHub() *WebSocketHub { func (h *WebSocketHub) run() { for msg := range h.broadcast { + // 避免在持锁状态下进行网络写入;同时修复 RLock 下 delete map 的数据竞争风险 h.mu.RLock() + clients := make([]*websocket.Conn, 0, len(h.clients)) for client := range h.clients { - err := client.WriteJSON(msg) - if err != nil { - client.Close() + clients = append(clients, client) + } + h.mu.RUnlock() + + var toRemove []*websocket.Conn + for _, client := range clients { + if err := client.WriteJSON(msg); err != nil { + _ = client.Close() + toRemove = append(toRemove, client) + } + } + + if len(toRemove) > 0 { + h.mu.Lock() + for _, client := range toRemove { delete(h.clients, client) } + h.mu.Unlock() } - h.mu.RUnlock() } } @@ -81,33 +96,52 @@ func (h *WebSocketHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { } func (h *WebSocketHub) BroadcastProxyRequest(req *domain.ProxyRequest) { - h.broadcast <- WSMessage{ + req = event.SanitizeProxyRequestForBroadcast(req) + msg := WSMessage{ Type: "proxy_request_update", Data: req, } + select { + case h.broadcast <- msg: + default: + // Channel 满时丢弃,避免阻塞 executor/请求处理链路 + } } func (h *WebSocketHub) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) { - h.broadcast <- WSMessage{ + attempt = event.SanitizeProxyUpstreamAttemptForBroadcast(attempt) + msg := WSMessage{ Type: "proxy_upstream_attempt_update", Data: attempt, } + select { + case h.broadcast <- msg: + default: + } } // BroadcastMessage sends a custom message with specified type to all connected clients func (h *WebSocketHub) BroadcastMessage(messageType string, data interface{}) { - h.broadcast <- WSMessage{ + msg := WSMessage{ Type: messageType, Data: data, } + select { + case h.broadcast <- msg: + default: + } } // BroadcastLog sends a log message to all connected clients func (h *WebSocketHub) BroadcastLog(message string) { - h.broadcast <- WSMessage{ + msg := WSMessage{ Type: "log_message", Data: message, } + select { + case h.broadcast <- msg: + default: + } } // WebSocketLogWriter implements io.Writer to capture logs and broadcast via WebSocket diff --git a/internal/repository/sqlite/migrations.go b/internal/repository/sqlite/migrations.go index e28dfc65..f20972ce 100644 --- a/internal/repository/sqlite/migrations.go +++ b/internal/repository/sqlite/migrations.go @@ -1,8 +1,9 @@ -package sqlite +package sqlite import ( "log" "sort" + "strings" "time" "gorm.io/gorm" @@ -51,6 +52,34 @@ var migrations = []Migration{ return nil }, }, + { + Version: 2, + Description: "Add index on proxy_requests.provider_id", + Up: func(db *gorm.DB) error { + // 说明:这是高频列表/过滤路径的关键优化点。 + // 不同数据库方言对 IF NOT EXISTS 的支持不同,这里做最小兼容处理。 + switch db.Dialector.Name() { + case "mysql": + err := db.Exec("CREATE INDEX idx_proxy_requests_provider_id ON proxy_requests(provider_id)").Error + if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate") { + return nil + } + return err + default: + return db.Exec("CREATE INDEX IF NOT EXISTS idx_proxy_requests_provider_id ON proxy_requests(provider_id)").Error + } + }, + Down: func(db *gorm.DB) error { + switch db.Dialector.Name() { + case "mysql": + // MySQL 不支持 DROP INDEX IF EXISTS;这里尽量执行,失败则忽略(回滚不是主路径)。 + _ = db.Exec("DROP INDEX idx_proxy_requests_provider_id ON proxy_requests").Error + return nil + default: + return db.Exec("DROP INDEX IF EXISTS idx_proxy_requests_provider_id").Error + } + }, + }, } // RunMigrations 运行所有待执行的迁移 diff --git a/internal/repository/sqlite/models.go b/internal/repository/sqlite/models.go index 4e2e2e6d..4cb956e5 100644 --- a/internal/repository/sqlite/models.go +++ b/internal/repository/sqlite/models.go @@ -1,4 +1,4 @@ -package sqlite +package sqlite import ( "time" @@ -222,7 +222,7 @@ type ProxyRequest struct { Multiplier uint64 // 倍率(10000=1倍) Cost uint64 RouteID uint64 - ProviderID uint64 + ProviderID uint64 `gorm:"index"` IsStream int StatusCode int ProjectID uint64 diff --git a/web/src/contexts/cooldowns-context.tsx b/web/src/contexts/cooldowns-context.tsx index 467ac5c8..eb7eafcb 100644 --- a/web/src/contexts/cooldowns-context.tsx +++ b/web/src/contexts/cooldowns-context.tsx @@ -1,4 +1,4 @@ -/** +/** * Cooldowns Context * 提供共享的 Cooldowns 数据,减少重复请求 */ @@ -7,6 +7,7 @@ import { createContext, useContext, useEffect, useCallback, type ReactNode } fro import { useQuery, useQueryClient, useMutation } from '@tanstack/react-query'; import { getTransport } from '@/lib/transport'; import type { Cooldown } from '@/lib/transport'; +import { subscribeCooldownUpdates } from '@/lib/cooldown-update-subscription'; interface CooldownsContextValue { cooldowns: Cooldown[]; @@ -38,14 +39,7 @@ export function CooldownsProvider({ children }: CooldownsProviderProps) { // Subscribe to cooldown_update WebSocket event useEffect(() => { - const transport = getTransport(); - const unsubscribe = transport.subscribe('cooldown_update', () => { - queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); - }); - - return () => { - unsubscribe(); - }; + return subscribeCooldownUpdates(queryClient); }, [queryClient]); // Mutation for clearing cooldown diff --git a/web/src/hooks/queries/use-providers.ts b/web/src/hooks/queries/use-providers.ts index d5bcfa16..66472f6c 100644 --- a/web/src/hooks/queries/use-providers.ts +++ b/web/src/hooks/queries/use-providers.ts @@ -1,4 +1,4 @@ -/** +/** * Provider React Query Hooks */ @@ -98,6 +98,7 @@ export function useAllProviderStats() { queryKey: [...providerKeys.stats(), 'all'], queryFn: () => getTransport().getProviderStats(), // 不再轮询,改为通过 WebSocket 事件触发刷新 (useProxyRequestUpdates) + staleTime: 5000, }); } diff --git a/web/src/hooks/queries/use-requests.ts b/web/src/hooks/queries/use-requests.ts index a5f5ee6f..eae8d3e6 100644 --- a/web/src/hooks/queries/use-requests.ts +++ b/web/src/hooks/queries/use-requests.ts @@ -84,11 +84,57 @@ export function useProxyRequestUpdates() { const flushIntervalMs = 250; const pendingRequests = new Map(); + const pendingAttemptsByRequest = new Map>(); const knownRequestIds = new Set(); let flushTimer: ReturnType | null = null; + const flushAttempts = () => { + if (pendingAttemptsByRequest.size === 0) { + return; + } + + const entries = Array.from(pendingAttemptsByRequest.entries()); + pendingAttemptsByRequest.clear(); + + for (const [proxyRequestID, attemptsById] of entries) { + const attemptsKey = requestKeys.attempts(proxyRequestID); + const attemptsQuery = queryCache.find({ queryKey: attemptsKey, exact: true }); + if (!attemptsQuery || attemptsQuery.getObserversCount() === 0) { + continue; + } + + const updates = Array.from(attemptsById.values()); + + queryClient.setQueryData(attemptsKey, (old) => { + const list = old ? [...old] : []; + + for (const updatedAttempt of updates) { + const index = list.findIndex((a) => a.id === updatedAttempt.id); + if (index >= 0) { + const prev = list[index]; + list[index] = { + ...prev, + ...updatedAttempt, + requestInfo: updatedAttempt.requestInfo ?? prev.requestInfo, + responseInfo: updatedAttempt.responseInfo ?? prev.responseInfo, + }; + continue; + } + list.push(updatedAttempt); + } + + return list; + }); + } + }; + const flush = () => { + if (pendingRequests.size === 0 && pendingAttemptsByRequest.size === 0) { + return; + } + if (pendingRequests.size === 0) { + flushAttempts(); return; } @@ -117,7 +163,19 @@ export function useProxyRequestUpdates() { const detailKey = requestKeys.detail(requestId); const detailQuery = queryCache.find({ queryKey: detailKey, exact: true }); if (detailQuery && detailQuery.getObserversCount() > 0) { - queryClient.setQueryData(detailKey, updatedRequest); + // 后端可能会对 WS 广播做“瘦身”(不带 requestInfo/responseInfo 大字段), + // 这里合并旧值,避免把详情页已加载的内容覆盖成空。 + queryClient.setQueryData(detailKey, (old) => { + if (!old) { + return updatedRequest; + } + return { + ...old, + ...updatedRequest, + requestInfo: updatedRequest.requestInfo ?? old.requestInfo, + responseInfo: updatedRequest.responseInfo ?? old.responseInfo, + }; + }); isKnown = true; } @@ -292,6 +350,8 @@ export function useProxyRequestUpdates() { if (invalidateCooldowns) { queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); } + + flushAttempts(); }; const scheduleFlush = () => { @@ -320,19 +380,13 @@ export function useProxyRequestUpdates() { return; } - queryClient.setQueryData( - attemptsKey, - (old) => { - if (!old) return [updatedAttempt]; - const index = old.findIndex((a) => a.id === updatedAttempt.id); - if (index >= 0) { - const newList = [...old]; - newList[index] = updatedAttempt; - return newList; - } - return [...old, updatedAttempt]; - }, - ); + let perRequest = pendingAttemptsByRequest.get(updatedAttempt.proxyRequestID); + if (!perRequest) { + perRequest = new Map(); + pendingAttemptsByRequest.set(updatedAttempt.proxyRequestID, perRequest); + } + perRequest.set(updatedAttempt.id, updatedAttempt); + scheduleFlush(); }, ); @@ -342,6 +396,7 @@ export function useProxyRequestUpdates() { flushTimer = null; } pendingRequests.clear(); + pendingAttemptsByRequest.clear(); unsubscribeRequest(); unsubscribeAttempt(); }; diff --git a/web/src/hooks/use-cooldowns.ts b/web/src/hooks/use-cooldowns.ts index aa21fe2f..96798eb6 100644 --- a/web/src/hooks/use-cooldowns.ts +++ b/web/src/hooks/use-cooldowns.ts @@ -1,7 +1,8 @@ -import { useQuery, useQueryClient, useMutation } from '@tanstack/react-query'; +import { useQuery, useQueryClient, useMutation } from '@tanstack/react-query'; import { getTransport } from '@/lib/transport'; import type { Cooldown } from '@/lib/transport'; import { useEffect, useState, useCallback } from 'react'; +import { subscribeCooldownUpdates } from '@/lib/cooldown-update-subscription'; export function useCooldowns() { const queryClient = useQueryClient(); @@ -20,15 +21,7 @@ export function useCooldowns() { // Subscribe to cooldown_update WebSocket event useEffect(() => { - const transport = getTransport(); - const unsubscribe = transport.subscribe('cooldown_update', () => { - // Invalidate and refetch cooldowns when a cooldown update is received - queryClient.invalidateQueries({ queryKey: ['cooldowns'] }); - }); - - return () => { - unsubscribe(); - }; + return subscribeCooldownUpdates(queryClient); }, [queryClient]); // Mutation for clearing cooldown diff --git a/web/src/hooks/use-streaming.ts b/web/src/hooks/use-streaming.ts index af97b864..22a6762d 100644 --- a/web/src/hooks/use-streaming.ts +++ b/web/src/hooks/use-streaming.ts @@ -1,9 +1,9 @@ -/** +/** * Streaming Requests Hook * 追踪实时活动请求状态 */ -import { useState, useEffect, useCallback, useRef } from 'react'; +import { useState, useEffect, useCallback, useRef, useMemo } from 'react'; import { getTransport, type ProxyRequest, type ClientType } from '@/lib/transport'; export interface StreamingState { @@ -105,43 +105,46 @@ export function useStreamingRequests(): StreamingState { }; }, [handleRequestUpdate, loadActiveRequests]); - // 计算按 clientType 和 providerID 的统计 - const countsByClient = new Map(); - const countsByProvider = new Map(); - const countsByProviderAndClient = new Map(); - const countsByRoute = new Map(); - - for (const request of activeRequests.values()) { - // 按 clientType 统计 - const clientCount = countsByClient.get(request.clientType) || 0; - countsByClient.set(request.clientType, clientCount + 1); - - // 按 routeID 统计 - if (request.routeID > 0) { - const routeCount = countsByRoute.get(request.routeID) || 0; - countsByRoute.set(request.routeID, routeCount + 1); - } + return useMemo((): StreamingState => { + // 计算按 clientType 和 providerID 的统计 + const countsByClient = new Map(); + const countsByProvider = new Map(); + const countsByProviderAndClient = new Map(); + const countsByRoute = new Map(); + const requests = Array.from(activeRequests.values()); + + for (const request of requests) { + // 按 clientType 统计 + const clientCount = countsByClient.get(request.clientType) || 0; + countsByClient.set(request.clientType, clientCount + 1); + + // 按 routeID 统计 + if (request.routeID > 0) { + const routeCount = countsByRoute.get(request.routeID) || 0; + countsByRoute.set(request.routeID, routeCount + 1); + } - // 按 providerID 统计 - if (request.providerID > 0) { - const providerCount = countsByProvider.get(request.providerID) || 0; - countsByProvider.set(request.providerID, providerCount + 1); + // 按 providerID 统计 + if (request.providerID > 0) { + const providerCount = countsByProvider.get(request.providerID) || 0; + countsByProvider.set(request.providerID, providerCount + 1); - // 按 providerID + clientType 组合统计 - const key = `${request.providerID}:${request.clientType}`; - const combinedCount = countsByProviderAndClient.get(key) || 0; - countsByProviderAndClient.set(key, combinedCount + 1); + // 按 providerID + clientType 组合统计 + const key = `${request.providerID}:${request.clientType}`; + const combinedCount = countsByProviderAndClient.get(key) || 0; + countsByProviderAndClient.set(key, combinedCount + 1); + } } - } - - return { - total: activeRequests.size, - requests: Array.from(activeRequests.values()), - countsByClient, - countsByProvider, - countsByProviderAndClient, - countsByRoute, - }; + + return { + total: activeRequests.size, + requests, + countsByClient, + countsByProvider, + countsByProviderAndClient, + countsByRoute, + }; + }, [activeRequests]); } /** diff --git a/web/src/lib/cooldown-update-subscription.ts b/web/src/lib/cooldown-update-subscription.ts new file mode 100644 index 00000000..8ccdd627 --- /dev/null +++ b/web/src/lib/cooldown-update-subscription.ts @@ -0,0 +1,29 @@ +import type { QueryClient } from '@tanstack/react-query'; +import { getTransport } from '@/lib/transport'; + +type Unsubscribe = () => void; + +let transportUnsubscribe: Unsubscribe | null = null; +const queryClients = new Set(); + +export function subscribeCooldownUpdates(queryClient: QueryClient): Unsubscribe { + queryClients.add(queryClient); + + if (!transportUnsubscribe) { + const transport = getTransport(); + transportUnsubscribe = transport.subscribe('cooldown_update', () => { + for (const qc of queryClients) { + qc.invalidateQueries({ queryKey: ['cooldowns'] }); + } + }); + } + + return () => { + queryClients.delete(queryClient); + if (queryClients.size === 0 && transportUnsubscribe) { + transportUnsubscribe(); + transportUnsubscribe = null; + } + }; +} + diff --git a/web/src/pages/requests/index.tsx b/web/src/pages/requests/index.tsx index 6d3f8bfb..0ac8ecaf 100644 --- a/web/src/pages/requests/index.tsx +++ b/web/src/pages/requests/index.tsx @@ -1,4 +1,5 @@ import { useState, useMemo, useRef, useEffect } from 'react'; +import { useCallback, memo } from 'react'; import { useNavigate } from 'react-router-dom'; import { useTranslation } from 'react-i18next'; import { @@ -167,6 +168,13 @@ export function RequestsPage() { scrollContainerRef.current?.scrollTo({ top: 0 }); }; + const handleOpenRequest = useCallback( + (id: number) => { + navigate(`/requests/${id}`); + }, + [navigate], + ); + return (
{allRequests.map((req) => ( - navigate(`/requests/${req.id}`)} + onOpenRequest={handleOpenRequest} /> ))} {/* 触底加载指示器 */} @@ -313,7 +321,7 @@ export function RequestsPage() { {allRequests.map((req, index) => ( - navigate(`/requests/${req.id}`)} + onOpenRequest={handleOpenRequest} /> ))} @@ -480,6 +488,20 @@ function CostCell({ cost }: { cost: number }) { } // Log Row Component +type LogRowProps = { + request: ProxyRequest; + index: number; + providerName?: string; + projectName?: string; + tokenName?: string; + showProjectColumn?: boolean; + showTokenColumn?: boolean; + forceProjectBinding?: boolean; + nowMs: number; + enableMarquee: boolean; + onOpenRequest: (id: number) => void; +}; + function LogRow({ request, index, @@ -491,20 +513,8 @@ function LogRow({ forceProjectBinding, nowMs, enableMarquee, - onClick, -}: { - request: ProxyRequest; - index: number; - providerName?: string; - projectName?: string; - tokenName?: string; - showProjectColumn?: boolean; - showTokenColumn?: boolean; - forceProjectBinding?: boolean; - nowMs: number; - enableMarquee: boolean; - onClick: () => void; -}) { + onOpenRequest, +}: LogRowProps) { const isPending = request.status === 'PENDING' || request.status === 'IN_PROGRESS'; const isFailed = request.status === 'FAILED'; const isPendingBinding = @@ -565,10 +575,11 @@ function LogRow({ // Zebra striping base class const zebraClass = index % 2 === 1 ? 'bg-foreground/[0.03]' : ''; + const handleClick = useCallback(() => onOpenRequest(request.id), [onOpenRequest, request.id]); return ( , next: Readonly) => { + if (prev.request !== next.request) return false; + if (prev.index !== next.index) return false; + if (prev.providerName !== next.providerName) return false; + if (prev.projectName !== next.projectName) return false; + if (prev.tokenName !== next.tokenName) return false; + if (prev.showProjectColumn !== next.showProjectColumn) return false; + if (prev.showTokenColumn !== next.showTokenColumn) return false; + if (prev.forceProjectBinding !== next.forceProjectBinding) return false; + if (prev.enableMarquee !== next.enableMarquee) return false; + if (prev.onOpenRequest !== next.onOpenRequest) return false; + + const prevPending = prev.request.status === 'PENDING' || prev.request.status === 'IN_PROGRESS'; + const nextPending = next.request.status === 'PENDING' || next.request.status === 'IN_PROGRESS'; + if (prevPending || nextPending) { + return prev.nowMs === next.nowMs; + } + + return true; + }, +); + // Mobile Request Card Component -function MobileRequestCard({ - request, - providerName, - onClick, -}: { +type MobileRequestCardProps = { request: ProxyRequest; providerName?: string; - onClick: () => void; -}) { + onOpenRequest: (id: number) => void; +}; + +function MobileRequestCard({ request, providerName, onOpenRequest }: MobileRequestCardProps) { const isPending = request.status === 'PENDING' || request.status === 'IN_PROGRESS'; const isFailed = request.status === 'FAILED'; + const handleClick = useCallback(() => onOpenRequest(request.id), [onOpenRequest, request.id]); const formatTime = (dateStr: string) => { const date = new Date(dateStr); @@ -790,7 +824,7 @@ function MobileRequestCard({ return (
, next: Readonly) => { + if (prev.request !== next.request) return false; + if (prev.providerName !== next.providerName) return false; + if (prev.onOpenRequest !== next.onOpenRequest) return false; + return true; + }, +); + // Provider Filter Component using Select function ProviderFilter({ providers, From 7b80677e0e384822724757146f33ec2f3b0cc8ef Mon Sep 17 00:00:00 2001 From: Asura Date: Sun, 22 Feb 2026 23:23:47 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=20WebSocket=20=E5=B9=BF?= =?UTF-8?q?=E6=92=AD=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A1=AE=E4=BF=9D=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=BF=AB=E7=85=A7=EF=BC=8C=E6=B7=BB=E5=8A=A0=20MySQL?= =?UTF-8?q?=20=E9=87=8D=E5=A4=8D=E7=B4=A2=E5=BC=95=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/event/broadcaster.go | 2 +- internal/event/wails_broadcaster_desktop.go | 2 +- internal/handler/websocket.go | 83 ++++++++---- internal/handler/websocket_test.go | 123 ++++++++++++++++++ internal/repository/sqlite/migrations.go | 24 +++- internal/repository/sqlite/migrations_test.go | 23 ++++ web/src/pages/requests/index.tsx | 4 +- 7 files changed, 232 insertions(+), 29 deletions(-) create mode 100644 internal/handler/websocket_test.go create mode 100644 internal/repository/sqlite/migrations_test.go diff --git a/internal/event/broadcaster.go b/internal/event/broadcaster.go index c4b83ff9..326ef352 100644 --- a/internal/event/broadcaster.go +++ b/internal/event/broadcaster.go @@ -1,4 +1,4 @@ -package event +package event import "github.com/awsl-project/maxx/internal/domain" diff --git a/internal/event/wails_broadcaster_desktop.go b/internal/event/wails_broadcaster_desktop.go index 0e9c6a95..377c6989 100644 --- a/internal/event/wails_broadcaster_desktop.go +++ b/internal/event/wails_broadcaster_desktop.go @@ -1,4 +1,4 @@ -//go:build desktop +//go:build desktop package event diff --git a/internal/handler/websocket.go b/internal/handler/websocket.go index 4f415eaa..4fc040b3 100644 --- a/internal/handler/websocket.go +++ b/internal/handler/websocket.go @@ -1,4 +1,4 @@ -package handler +package handler import ( "bufio" @@ -6,8 +6,11 @@ import ( "log" "net/http" "os" + "strconv" "strings" "sync" + "sync/atomic" + "time" "github.com/awsl-project/maxx/internal/domain" "github.com/awsl-project/maxx/internal/event" @@ -29,8 +32,13 @@ type WebSocketHub struct { clients map[*websocket.Conn]bool broadcast chan WSMessage mu sync.RWMutex + + // broadcast channel 满时的丢弃计数(热路径:只做原子累加) + broadcastDroppedTotal atomic.Uint64 } +const websocketWriteTimeout = 5 * time.Second + func NewWebSocketHub() *WebSocketHub { hub := &WebSocketHub{ clients: make(map[*websocket.Conn]bool), @@ -52,6 +60,7 @@ func (h *WebSocketHub) run() { var toRemove []*websocket.Conn for _, client := range clients { + _ = client.SetWriteDeadline(time.Now().Add(websocketWriteTimeout)) if err := client.WriteJSON(msg); err != nil { _ = client.Close() toRemove = append(toRemove, client) @@ -68,6 +77,23 @@ func (h *WebSocketHub) run() { } } +func (h *WebSocketHub) tryEnqueueBroadcast(msg WSMessage, meta string) { + select { + case h.broadcast <- msg: + default: + dropped := h.broadcastDroppedTotal.Add(1) + // 避免日志刷屏:首次 + 每100次打印一次,确保可观测性但不拖慢热路径。 + if dropped == 1 || dropped%100 == 0 { + meta = strings.TrimSpace(meta) + if meta != "" { + log.Printf("[WebSocket] drop broadcast message type=%s %s dropped_total=%d", msg.Type, meta, dropped) + } else { + log.Printf("[WebSocket] drop broadcast message type=%s dropped_total=%d", msg.Type, dropped) + } + } + } +} + func (h *WebSocketHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -96,28 +122,47 @@ func (h *WebSocketHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { } func (h *WebSocketHub) BroadcastProxyRequest(req *domain.ProxyRequest) { - req = event.SanitizeProxyRequestForBroadcast(req) + sanitized := event.SanitizeProxyRequestForBroadcast(req) + var data interface{} = sanitized + var meta string + if sanitized != nil { + // 无论 Sanitize 是否返回原指针,都强制做一次浅拷贝快照,避免异步消费者读到后续可变更的数据。 + snapshot := *sanitized + data = snapshot + meta = "requestID=" + snapshot.RequestID + if snapshot.ID != 0 { + meta += " requestDbID=" + strconv.FormatUint(snapshot.ID, 10) + } + } msg := WSMessage{ Type: "proxy_request_update", - Data: req, - } - select { - case h.broadcast <- msg: - default: - // Channel 满时丢弃,避免阻塞 executor/请求处理链路 + Data: data, } + h.tryEnqueueBroadcast(msg, meta) } func (h *WebSocketHub) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstreamAttempt) { - attempt = event.SanitizeProxyUpstreamAttemptForBroadcast(attempt) + sanitized := event.SanitizeProxyUpstreamAttemptForBroadcast(attempt) + var data interface{} = sanitized + var meta string + if sanitized != nil { + snapshot := *sanitized + data = snapshot + if snapshot.ProxyRequestID != 0 { + meta = "proxyRequestID=" + strconv.FormatUint(snapshot.ProxyRequestID, 10) + } + if snapshot.ID != 0 { + if meta != "" { + meta += " " + } + meta += "attemptDbID=" + strconv.FormatUint(snapshot.ID, 10) + } + } msg := WSMessage{ Type: "proxy_upstream_attempt_update", - Data: attempt, - } - select { - case h.broadcast <- msg: - default: + Data: data, } + h.tryEnqueueBroadcast(msg, meta) } // BroadcastMessage sends a custom message with specified type to all connected clients @@ -126,10 +171,7 @@ func (h *WebSocketHub) BroadcastMessage(messageType string, data interface{}) { Type: messageType, Data: data, } - select { - case h.broadcast <- msg: - default: - } + h.tryEnqueueBroadcast(msg, "") } // BroadcastLog sends a log message to all connected clients @@ -138,10 +180,7 @@ func (h *WebSocketHub) BroadcastLog(message string) { Type: "log_message", Data: message, } - select { - case h.broadcast <- msg: - default: - } + h.tryEnqueueBroadcast(msg, "") } // WebSocketLogWriter implements io.Writer to capture logs and broadcast via WebSocket diff --git a/internal/handler/websocket_test.go b/internal/handler/websocket_test.go new file mode 100644 index 00000000..e3942b8c --- /dev/null +++ b/internal/handler/websocket_test.go @@ -0,0 +1,123 @@ +package handler + +import ( + "bytes" + "log" + "strings" + "testing" + + "github.com/awsl-project/maxx/internal/domain" +) + +func TestWebSocketHub_BroadcastProxyRequest_SendsSnapshot(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + + req := &domain.ProxyRequest{ + ID: 1, + RequestID: "req_1", + Status: "IN_PROGRESS", + } + + hub.BroadcastProxyRequest(req) + + // 如果 Broadcast 发送的是同一个指针,那么这里对原对象的修改会“污染”队列中的消息。 + req.Status = "COMPLETED" + + msg := <-hub.broadcast + if msg.Type != "proxy_request_update" { + t.Fatalf("unexpected message type: %s", msg.Type) + } + + switch v := msg.Data.(type) { + case *domain.ProxyRequest: + if v == req { + t.Fatalf("expected snapshot (different pointer), got original pointer") + } + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + case domain.ProxyRequest: + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + default: + t.Fatalf("unexpected data type: %T", msg.Data) + } +} + +func TestWebSocketHub_BroadcastProxyUpstreamAttempt_SendsSnapshot(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + + attempt := &domain.ProxyUpstreamAttempt{ + ID: 2, + ProxyRequestID: 1, + Status: "IN_PROGRESS", + } + + hub.BroadcastProxyUpstreamAttempt(attempt) + attempt.Status = "COMPLETED" + + msg := <-hub.broadcast + if msg.Type != "proxy_upstream_attempt_update" { + t.Fatalf("unexpected message type: %s", msg.Type) + } + + switch v := msg.Data.(type) { + case *domain.ProxyUpstreamAttempt: + if v == attempt { + t.Fatalf("expected snapshot (different pointer), got original pointer") + } + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + case domain.ProxyUpstreamAttempt: + if v.Status != "IN_PROGRESS" { + t.Fatalf("expected snapshot status IN_PROGRESS, got %s", v.Status) + } + default: + t.Fatalf("unexpected data type: %T", msg.Data) + } +} + +func TestWebSocketHub_BroadcastProxyRequest_LogsWhenDropped(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + hub.broadcast <- WSMessage{Type: "dummy", Data: nil} + + var buf bytes.Buffer + oldOutput := log.Writer() + oldFlags := log.Flags() + oldPrefix := log.Prefix() + log.SetOutput(&buf) + log.SetFlags(0) + log.SetPrefix("") + defer func() { + log.SetOutput(oldOutput) + log.SetFlags(oldFlags) + log.SetPrefix(oldPrefix) + }() + + req := &domain.ProxyRequest{ + ID: 1, + RequestID: "req_1", + Status: "IN_PROGRESS", + } + + hub.BroadcastProxyRequest(req) + + out := buf.String() + if !strings.Contains(out, "drop") && !strings.Contains(out, "丢弃") { + t.Fatalf("expected drop log, got: %q", out) + } + if !strings.Contains(out, "proxy_request_update") { + t.Fatalf("expected message type in log, got: %q", out) + } + if !strings.Contains(out, "req_1") { + t.Fatalf("expected requestID in log, got: %q", out) + } +} diff --git a/internal/repository/sqlite/migrations.go b/internal/repository/sqlite/migrations.go index f20972ce..498b9eb7 100644 --- a/internal/repository/sqlite/migrations.go +++ b/internal/repository/sqlite/migrations.go @@ -1,11 +1,13 @@ -package sqlite +package sqlite import ( + "errors" "log" "sort" "strings" "time" + mysqlDriver "github.com/go-sql-driver/mysql" "gorm.io/gorm" ) @@ -61,7 +63,7 @@ var migrations = []Migration{ switch db.Dialector.Name() { case "mysql": err := db.Exec("CREATE INDEX idx_proxy_requests_provider_id ON proxy_requests(provider_id)").Error - if err != nil && strings.Contains(strings.ToLower(err.Error()), "duplicate") { + if isMySQLDuplicateIndexError(err) { return nil } return err @@ -73,7 +75,10 @@ var migrations = []Migration{ switch db.Dialector.Name() { case "mysql": // MySQL 不支持 DROP INDEX IF EXISTS;这里尽量执行,失败则忽略(回滚不是主路径)。 - _ = db.Exec("DROP INDEX idx_proxy_requests_provider_id ON proxy_requests").Error + sql := "DROP INDEX idx_proxy_requests_provider_id ON proxy_requests" + if err := db.Exec(sql).Error; err != nil { + log.Printf("[Migration] Warning: rollback v2 failed (dialector=mysql) sql=%q err=%v", sql, err) + } return nil default: return db.Exec("DROP INDEX IF EXISTS idx_proxy_requests_provider_id").Error @@ -82,6 +87,19 @@ var migrations = []Migration{ }, } +func isMySQLDuplicateIndexError(err error) bool { + if err == nil { + return false + } + var mysqlErr *mysqlDriver.MySQLError + if errors.As(err, &mysqlErr) { + return mysqlErr.Number == 1061 // ER_DUP_KEYNAME + } + // 兜底:错误可能被包装成字符串,避免使用过宽的 "duplicate" 匹配导致吞掉其它错误。 + lower := strings.ToLower(err.Error()) + return strings.Contains(lower, "duplicate key name") || strings.Contains(lower, "error 1061") +} + // RunMigrations 运行所有待执行的迁移 func (d *DB) RunMigrations() error { // 确保迁移表存在(由 GORM AutoMigrate 处理) diff --git a/internal/repository/sqlite/migrations_test.go b/internal/repository/sqlite/migrations_test.go new file mode 100644 index 00000000..9f48aa0d --- /dev/null +++ b/internal/repository/sqlite/migrations_test.go @@ -0,0 +1,23 @@ +package sqlite + +import ( + "errors" + "testing" + + mysqlDriver "github.com/go-sql-driver/mysql" +) + +func TestIsMySQLDuplicateIndexError(t *testing.T) { + if !isMySQLDuplicateIndexError(&mysqlDriver.MySQLError{Number: 1061, Message: "Duplicate key name"}) { + t.Fatalf("expected true for ER_DUP_KEYNAME(1061)") + } + if isMySQLDuplicateIndexError(&mysqlDriver.MySQLError{Number: 1146, Message: "Table doesn't exist"}) { + t.Fatalf("expected false for non-duplicate mysql error") + } + if !isMySQLDuplicateIndexError(errors.New("Error 1061: Duplicate key name 'idx_proxy_requests_provider_id'")) { + t.Fatalf("expected true for duplicate key name string match fallback") + } + if isMySQLDuplicateIndexError(errors.New("some other error")) { + t.Fatalf("expected false for unrelated error") + } +} diff --git a/web/src/pages/requests/index.tsx b/web/src/pages/requests/index.tsx index 0ac8ecaf..4884f172 100644 --- a/web/src/pages/requests/index.tsx +++ b/web/src/pages/requests/index.tsx @@ -1,4 +1,4 @@ -import { useState, useMemo, useRef, useEffect } from 'react'; +import { useState, useMemo, useRef, useEffect } from 'react'; import { useCallback, memo } from 'react'; import { useNavigate } from 'react-router-dom'; import { useTranslation } from 'react-i18next'; @@ -575,7 +575,7 @@ function LogRow({ // Zebra striping base class const zebraClass = index % 2 === 1 ? 'bg-foreground/[0.03]' : ''; - const handleClick = useCallback(() => onOpenRequest(request.id), [onOpenRequest, request.id]); + const handleClick = () => onOpenRequest(request.id); return ( Date: Sun, 22 Feb 2026 23:57:47 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=20WebSocket=20=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=B9=BF=E6=92=AD=EF=BC=8C=E7=A1=AE=E4=BF=9D=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BF=AB=E7=85=A7=EF=BC=9B=E6=B7=BB=E5=8A=A0=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95=E4=BB=A5=E9=AA=8C=E8=AF=81=E8=A1=8C?= =?UTF-8?q?=E4=B8=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/handler/websocket.go | 22 ++++++++++++++- internal/handler/websocket_test.go | 35 ++++++++++++++++++++++++ internal/repository/sqlite/migrations.go | 6 ++++ web/src/pages/requests/index.tsx | 18 ++++-------- 4 files changed, 68 insertions(+), 13 deletions(-) diff --git a/internal/handler/websocket.go b/internal/handler/websocket.go index 4fc040b3..642b5257 100644 --- a/internal/handler/websocket.go +++ b/internal/handler/websocket.go @@ -2,6 +2,7 @@ package handler import ( "bufio" + "encoding/json" "io" "log" "net/http" @@ -167,9 +168,28 @@ func (h *WebSocketHub) BroadcastProxyUpstreamAttempt(attempt *domain.ProxyUpstre // BroadcastMessage sends a custom message with specified type to all connected clients func (h *WebSocketHub) BroadcastMessage(messageType string, data interface{}) { + // 约定:BroadcastMessage 允许调用方传入 map/struct/指针等可变对象。 + // + // 但由于实际发送是异步的(入队后由 run() 写到各连接),如果这里直接把可变指针放进 channel, + // 调用方在入队后继续修改数据,会导致与 BroadcastProxyRequest 类似的数据竞态。 + // + // 因此这里先把 data 预先序列化为 json.RawMessage,形成不可变快照;后续 WriteJSON 会直接写入该快照。 + var snapshot interface{} = data + if data != nil { + if raw, ok := data.(json.RawMessage); ok { + snapshot = raw + } else { + b, err := json.Marshal(data) + if err != nil { + log.Printf("[WebSocket] drop broadcast message type=%s: marshal snapshot failed: %v", messageType, err) + return + } + snapshot = json.RawMessage(b) + } + } msg := WSMessage{ Type: messageType, - Data: data, + Data: snapshot, } h.tryEnqueueBroadcast(msg, "") } diff --git a/internal/handler/websocket_test.go b/internal/handler/websocket_test.go index e3942b8c..c84c7cac 100644 --- a/internal/handler/websocket_test.go +++ b/internal/handler/websocket_test.go @@ -2,6 +2,7 @@ package handler import ( "bytes" + "encoding/json" "log" "strings" "testing" @@ -121,3 +122,37 @@ func TestWebSocketHub_BroadcastProxyRequest_LogsWhenDropped(t *testing.T) { t.Fatalf("expected requestID in log, got: %q", out) } } + +func TestWebSocketHub_BroadcastMessage_SendsSnapshot(t *testing.T) { + hub := &WebSocketHub{ + broadcast: make(chan WSMessage, 1), + } + + type payload struct { + A int `json:"a"` + } + + p := &payload{A: 1} + hub.BroadcastMessage("custom_event", p) + + // 如果 BroadcastMessage 直接把指针放进队列,这里修改会污染后续消费者看到的数据。 + p.A = 2 + + msg := <-hub.broadcast + if msg.Type != "custom_event" { + t.Fatalf("unexpected message type: %s", msg.Type) + } + + raw, ok := msg.Data.(json.RawMessage) + if !ok { + t.Fatalf("expected Data to be json.RawMessage snapshot, got %T", msg.Data) + } + + var got payload + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("failed to unmarshal snapshot: %v", err) + } + if got.A != 1 { + t.Fatalf("expected snapshot A=1, got %d", got.A) + } +} diff --git a/internal/repository/sqlite/migrations.go b/internal/repository/sqlite/migrations.go index 498b9eb7..946f0441 100644 --- a/internal/repository/sqlite/migrations.go +++ b/internal/repository/sqlite/migrations.go @@ -150,6 +150,10 @@ func (d *DB) getCurrentVersion() int { // runMigration 在事务中运行单个迁移 func (d *DB) runMigration(m Migration) error { + // 注意:MySQL 的 DDL(如 CREATE/DROP INDEX)会触发隐式提交(implicit commit), + // 这意味着即使这里用 gorm.Transaction 包裹,MySQL 路径也无法提供严格的“DDL + 迁移记录”原子性。 + // + // 因此迁移实现必须尽量幂等:例如重复执行 CREATE INDEX 时,仅在 ER_DUP_KEYNAME(1061) 场景下视为成功。 return d.gorm.Transaction(func(tx *gorm.DB) error { // 运行迁移 if m.Up != nil { @@ -207,6 +211,8 @@ func (d *DB) RollbackMigration(targetVersion int) error { // rollbackMigration 在事务中回滚单个迁移 func (d *DB) rollbackMigration(m Migration) error { + // 同 runMigration:MySQL DDL 在回滚路径同样可能发生隐式提交,因此这里的事务主要用于把“回滚逻辑” + // 与“删除迁移记录”尽量绑定在一起,但不应假设 MySQL 上能做到严格原子回滚。 return d.gorm.Transaction(func(tx *gorm.DB) error { // 运行回滚 if m.Down != nil { diff --git a/web/src/pages/requests/index.tsx b/web/src/pages/requests/index.tsx index 4884f172..b84df678 100644 --- a/web/src/pages/requests/index.tsx +++ b/web/src/pages/requests/index.tsx @@ -320,11 +320,10 @@ export function RequestsPage() { - {allRequests.map((req, index) => ( + {allRequests.map((req) => ( onOpenRequest(request.id); return ( @@ -582,18 +577,18 @@ function LogRow({ onClick={handleClick} className={cn( 'cursor-pointer group transition-colors', - // Zebra striping - applies to all rows as base layer - zebraClass, + // Zebra striping - use CSS selector to avoid passing index (inserts won't invalidate memo) + 'even:bg-foreground/[0.03]', // Base hover effect (stronger background change) !isRecent && !isFailed && !isPending && !isPendingBinding && 'hover:bg-accent/50', // Failed state - Red background only (testing without border) - isFailed && cn(index % 2 === 1 ? 'bg-red-500/25' : 'bg-red-500/20', 'hover:bg-red-500/40'), + isFailed && cn('bg-red-500/20 even:bg-red-500/25', 'hover:bg-red-500/40'), // Pending binding state - Amber background with left border isPendingBinding && cn( - index % 2 === 1 ? 'bg-amber-500/15' : 'bg-amber-500/10', + 'bg-amber-500/10 even:bg-amber-500/15', 'hover:bg-amber-500/25', 'border-l-2 border-l-amber-500', ), @@ -604,7 +599,7 @@ function LogRow({ (enableMarquee ? 'animate-marquee-row' : cn( - index % 2 === 1 ? 'bg-blue-500/10' : 'bg-blue-500/5', + 'bg-blue-500/5 even:bg-blue-500/10', 'border-l-2 border-l-blue-500/50', )), @@ -765,7 +760,6 @@ const MemoLogRow = memo( LogRow, (prev: Readonly, next: Readonly) => { if (prev.request !== next.request) return false; - if (prev.index !== next.index) return false; if (prev.providerName !== next.providerName) return false; if (prev.projectName !== next.projectName) return false; if (prev.tokenName !== next.tokenName) return false;