293 changes: 293 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions crates/token_proxy_core/src/proxy/dashboard.rs
@@ -22,6 +22,7 @@ pub struct DashboardSummary {
pub output_tokens: u64,
pub cached_tokens: u64,
pub avg_latency_ms: u64,
pub median_latency_ms: u64,
}

#[derive(Debug, Clone, Serialize)]
@@ -145,6 +146,9 @@ WHERE (?1 IS NULL OR ts_ms >= ?1) AND (?2 IS NULL OR ts_ms <= ?2);
latency_sum_ms / total_requests
};

// Median query: computed in a single statement by query_median_latency below
let median_latency_ms = query_median_latency(pool, from_ts_ms, to_ts_ms).await?;

Ok(DashboardSummary {
total_requests,
success_requests,
@@ -154,9 +158,57 @@
output_tokens,
cached_tokens,
avg_latency_ms,
median_latency_ms,
})
}

/// Computes the median latency (SQLite has no built-in MEDIAN; a single query keeps the count and the rows in one snapshot, avoiding count/offset skew under concurrent writes)
async fn query_median_latency(
pool: &sqlx::SqlitePool,
from_ts_ms: Option<i64>,
to_ts_ms: Option<i64>,
) -> Result<u64, String> {
// Median computed in a single SQL statement:
// - CTEs keep the count and the rows in the same snapshot
// - an odd count takes the middle value; an even count takes the integer-division average of the two middle values
let row = sqlx::query(
r#"
WITH filtered AS (
SELECT latency_ms
FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1) AND (?2 IS NULL OR ts_ms <= ?2)
),
cnt AS (
SELECT COUNT(*) AS n FROM filtered
),
ordered AS (
SELECT latency_ms, ROW_NUMBER() OVER (ORDER BY latency_ms) AS rn
FROM filtered
)
SELECT COALESCE(
CASE
WHEN (SELECT n FROM cnt) = 0 THEN 0
WHEN (SELECT n FROM cnt) % 2 = 1 THEN
(SELECT latency_ms FROM ordered WHERE rn = ((SELECT n FROM cnt) + 1) / 2)
ELSE
(SELECT (o1.latency_ms + o2.latency_ms) / 2
FROM ordered o1, ordered o2
WHERE o1.rn = (SELECT n FROM cnt) / 2 AND o2.rn = (SELECT n FROM cnt) / 2 + 1)
END,
0
) AS median_latency;
"#,
)
.bind(from_ts_ms)
.bind(to_ts_ms)
.fetch_one(pool)
.await
.map_err(|err| format!("Failed to query median latency: {err}"))?;

let median: i64 = row.try_get("median_latency").unwrap_or(0);
Ok(i64_to_u64(median))
}

async fn query_providers(
pool: &sqlx::SqlitePool,
from_ts_ms: Option<i64>,
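For reference, the odd/even rule encoded by the median SQL in query_median_latency can be checked against a plain-Rust equivalent. This is a minimal illustrative sketch only; `median_of` is a hypothetical helper, not part of this PR:

```rust
/// Hypothetical reference implementation of the same rule as the SQL above:
/// an empty set yields 0, an odd count takes the middle value, and an even
/// count takes the integer-division average of the two middle values.
fn median_of(mut latencies: Vec<i64>) -> i64 {
    if latencies.is_empty() {
        return 0;
    }
    latencies.sort_unstable();
    let n = latencies.len();
    if n % 2 == 1 {
        latencies[n / 2]
    } else {
        (latencies[n / 2 - 1] + latencies[n / 2]) / 2
    }
}

fn main() {
    assert_eq!(median_of(vec![]), 0);
    assert_eq!(median_of(vec![10, 30, 20]), 20); // odd count
    assert_eq!(median_of(vec![10, 40, 20, 30]), 25); // even count
    assert_eq!(median_of(vec![10, 21]), 15); // integer division rounds down
}
```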
150 changes: 150 additions & 0 deletions crates/token_proxy_core/src/proxy/dashboard.test.rs
@@ -59,3 +59,153 @@ fn fill_series_buckets_returns_original_when_range_unknown_and_empty() {
let filled = fill_series_buckets(Vec::new(), None, None, bucket_ms);
assert!(filled.is_empty());
}

// ============================================================================
// query_median_latency tests
// ============================================================================

use sqlx::{sqlite::SqlitePoolOptions, SqlitePool};

/// Creates an in-memory database and initializes the schema
async fn setup_test_db() -> SqlitePool {
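// max_connections(1): every new connection to sqlite::memory: opens its own empty
// database, so the pool is pinned to a single shared connection for the test.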
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.expect("Failed to create in-memory database");

sqlx::query(
r#"
CREATE TABLE request_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts_ms INTEGER NOT NULL,
path TEXT NOT NULL,
provider TEXT NOT NULL,
upstream_id TEXT NOT NULL,
model TEXT,
mapped_model TEXT,
stream INTEGER NOT NULL,
status INTEGER NOT NULL,
input_tokens INTEGER,
output_tokens INTEGER,
total_tokens INTEGER,
cached_tokens INTEGER,
usage_json TEXT,
upstream_request_id TEXT,
request_headers TEXT,
request_body TEXT,
response_error TEXT,
latency_ms INTEGER NOT NULL
);
"#,
)
.execute(&pool)
.await
.expect("Failed to create table");

pool
}

/// Inserts a test row; only latency_ms needs to be specified
async fn insert_latency(pool: &SqlitePool, latency_ms: i64) {
sqlx::query(
r#"
INSERT INTO request_logs (ts_ms, path, provider, upstream_id, stream, status, latency_ms)
VALUES (0, '/test', 'test', 'test', 0, 200, ?)
"#,
)
.bind(latency_ms)
.execute(pool)
.await
.expect("Failed to insert test data");
}

#[tokio::test]
async fn median_latency_empty_table_returns_zero() {
let pool = setup_test_db().await;
let result = query_median_latency(&pool, None, None).await.unwrap();
assert_eq!(result, 0, "Empty table should return 0");
}

#[tokio::test]
async fn median_latency_single_value() {
let pool = setup_test_db().await;
insert_latency(&pool, 100).await;

let result = query_median_latency(&pool, None, None).await.unwrap();
assert_eq!(result, 100, "Single value should be the median");
}

#[tokio::test]
async fn median_latency_odd_count() {
let pool = setup_test_db().await;
// Insert 3 values: 10, 20, 30 -> the median should be 20
insert_latency(&pool, 10).await;
insert_latency(&pool, 30).await;
insert_latency(&pool, 20).await;

let result = query_median_latency(&pool, None, None).await.unwrap();
assert_eq!(result, 20, "Odd count median should be middle value");
}

#[tokio::test]
async fn median_latency_even_count() {
let pool = setup_test_db().await;
// Insert 4 values: 10, 20, 30, 40 -> the median should be (20+30)/2 = 25
insert_latency(&pool, 10).await;
insert_latency(&pool, 40).await;
insert_latency(&pool, 20).await;
insert_latency(&pool, 30).await;

let result = query_median_latency(&pool, None, None).await.unwrap();
assert_eq!(
result, 25,
"Even count median should be average of two middle values"
);
}

#[tokio::test]
async fn median_latency_even_count_rounds_down() {
let pool = setup_test_db().await;
// Insert 2 values: 10, 21 -> the median should be (10+21)/2 = 15 (integer division rounds down)
insert_latency(&pool, 10).await;
insert_latency(&pool, 21).await;

let result = query_median_latency(&pool, None, None).await.unwrap();
assert_eq!(result, 15, "Median should use integer division");
}

#[tokio::test]
async fn median_latency_with_time_range_filter() {
let pool = setup_test_db().await;

// Insert rows with different timestamps
sqlx::query(
"INSERT INTO request_logs (ts_ms, path, provider, upstream_id, stream, status, latency_ms) VALUES (100, '/test', 'test', 'test', 0, 200, 50)",
)
.execute(&pool)
.await
.unwrap();

sqlx::query(
"INSERT INTO request_logs (ts_ms, path, provider, upstream_id, stream, status, latency_ms) VALUES (200, '/test', 'test', 'test', 0, 200, 100)",
)
.execute(&pool)
.await
.unwrap();

sqlx::query(
"INSERT INTO request_logs (ts_ms, path, provider, upstream_id, stream, status, latency_ms) VALUES (300, '/test', 'test', 'test', 0, 200, 150)",
)
.execute(&pool)
.await
.unwrap();

// Query only rows with ts_ms in the 150-250 range; only the record with latency_ms=100 should match
let result = query_median_latency(&pool, Some(150), Some(250)).await.unwrap();
assert_eq!(result, 100, "Should filter by time range");

// Query all rows; the median should be 100
let result_all = query_median_latency(&pool, None, None).await.unwrap();
assert_eq!(result_all, 100, "All data median should be 100");
}
61 changes: 52 additions & 9 deletions crates/token_proxy_core/src/proxy/logs.rs
@@ -1,10 +1,28 @@
use serde::Serialize;
use sqlx::Row;

/// Request log detail: the base fields shown in the table plus the extended fields for the detail panel
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RequestLogDetail {
pub id: u64,
// Base fields (same as the table)
pub ts_ms: i64,
pub path: String,
pub provider: String,
pub upstream_id: String,
pub model: Option<String>,
pub mapped_model: Option<String>,
pub stream: bool,
pub status: i32,
pub input_tokens: Option<i64>,
pub output_tokens: Option<i64>,
pub total_tokens: Option<i64>,
pub cached_tokens: Option<i64>,
pub latency_ms: i64,
pub upstream_request_id: Option<String>,
// Extended detail-only fields
pub usage_json: Option<String>,
pub request_headers: Option<String>,
pub request_body: Option<String>,
pub response_error: Option<String>,
@@ -18,6 +36,21 @@ pub async fn read_request_log_detail(
r#"
SELECT
id,
ts_ms,
path,
provider,
upstream_id,
model,
mapped_model,
stream,
status,
input_tokens,
output_tokens,
total_tokens,
cached_tokens,
latency_ms,
upstream_request_id,
usage_json,
request_headers,
request_body,
response_error
@@ -35,15 +68,25 @@ LIMIT 1;
return Err("Request log not found.".to_string());
};

let id = row.try_get::<i64, _>("id").unwrap_or_default();
let request_headers = row.try_get::<Option<String>, _>("request_headers").ok().flatten();
let request_body = row.try_get::<Option<String>, _>("request_body").ok().flatten();
let response_error = row.try_get::<Option<String>, _>("response_error").ok().flatten();

Ok(RequestLogDetail {
id: id.max(0) as u64,
request_headers,
request_body,
response_error,
id: row.try_get::<i64, _>("id").unwrap_or_default().max(0) as u64,
ts_ms: row.try_get::<i64, _>("ts_ms").unwrap_or_default(),
path: row.try_get::<String, _>("path").unwrap_or_default(),
provider: row.try_get::<String, _>("provider").unwrap_or_default(),
upstream_id: row.try_get::<String, _>("upstream_id").unwrap_or_default(),
model: row.try_get::<Option<String>, _>("model").ok().flatten(),
mapped_model: row.try_get::<Option<String>, _>("mapped_model").ok().flatten(),
stream: row.try_get::<i32, _>("stream").unwrap_or_default() != 0,
status: row.try_get::<i32, _>("status").unwrap_or_default(),
input_tokens: row.try_get::<Option<i64>, _>("input_tokens").ok().flatten(),
output_tokens: row.try_get::<Option<i64>, _>("output_tokens").ok().flatten(),
total_tokens: row.try_get::<Option<i64>, _>("total_tokens").ok().flatten(),
cached_tokens: row.try_get::<Option<i64>, _>("cached_tokens").ok().flatten(),
latency_ms: row.try_get::<i64, _>("latency_ms").unwrap_or_default(),
upstream_request_id: row.try_get::<Option<String>, _>("upstream_request_id").ok().flatten(),
usage_json: row.try_get::<Option<String>, _>("usage_json").ok().flatten(),
request_headers: row.try_get::<Option<String>, _>("request_headers").ok().flatten(),
request_body: row.try_get::<Option<String>, _>("request_body").ok().flatten(),
response_error: row.try_get::<Option<String>, _>("response_error").ok().flatten(),
})
}
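A note on the repeated `.ok().flatten()` calls above: `try_get::<Option<T>, _>` yields `Result<Option<T>, sqlx::Error>` for nullable columns, so the idiom maps both a SQL NULL and a decode failure to `None`. A minimal standalone sketch of the same pattern, using plain `Result`/`Option` values so no database is needed:

```rust
fn main() {
    // Stand-ins for what try_get::<Option<i64>, _>("col") can return.
    let null_column: Result<Option<i64>, &str> = Ok(None);
    let value_column: Result<Option<i64>, &str> = Ok(Some(42));
    let decode_error: Result<Option<i64>, &str> = Err("decode error");

    // .ok() turns Result<Option<T>, E> into Option<Option<T>>,
    // .flatten() collapses it to Option<T>.
    assert_eq!(null_column.ok().flatten(), None);
    assert_eq!(value_column.ok().flatten(), Some(42));
    assert_eq!(decode_error.ok().flatten(), None);
}
```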
8 changes: 8 additions & 0 deletions crates/token_proxy_core/src/proxy/sqlite.rs
@@ -120,6 +120,14 @@ CREATE TABLE IF NOT EXISTS request_logs (
.await
.map_err(|err| format!("Failed to create idx_request_logs_provider_ts_ms: {err}"))?;

// Composite index: speeds up the median latency query (filter by time range, then order by latency)
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_request_logs_ts_latency ON request_logs(ts_ms, latency_ms);",
)
.execute(pool)
.await
.map_err(|err| format!("Failed to create idx_request_logs_ts_latency: {err}"))?;

Ok(())
}

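If it is worth confirming that the new index is actually used for the time-range scan, SQLite's EXPLAIN QUERY PLAN is one way to check. The sketch below is illustrative only; `explain_median_scan` and the bound range are made up, and it simply prints the `detail` column of the plan rows:

```rust
use sqlx::{Row, SqlitePool};

// Hypothetical helper: print the query plan for the median query's filtered scan.
async fn explain_median_scan(pool: &SqlitePool) -> Result<(), sqlx::Error> {
    let rows = sqlx::query(
        "EXPLAIN QUERY PLAN \
         SELECT latency_ms FROM request_logs \
         WHERE ts_ms >= ?1 AND ts_ms <= ?2 \
         ORDER BY latency_ms;",
    )
    .bind(0_i64)
    .bind(i64::MAX)
    .fetch_all(pool)
    .await?;

    for row in rows {
        // EXPLAIN QUERY PLAN rows expose a human-readable `detail` column.
        let detail: String = row.try_get("detail")?;
        println!("{detail}");
    }
    Ok(())
}
```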
3 changes: 2 additions & 1 deletion crates/token_proxy_core/src/proxy/upstream.rs
@@ -680,10 +680,11 @@ async fn resolve_antigravity_upstream(
}

fn build_mapped_meta(meta: &RequestMeta, upstream: &UpstreamRuntime, provider: &str) -> RequestMeta {
// Only set mapped_model when a mapping actually occurred, to avoid duplicating original_model
let mapped_model = meta
.original_model
.as_deref()
.map(|original| upstream.map_model(original).unwrap_or_else(|| original.to_string()));
.and_then(|original| upstream.map_model(original));
let (mapped_model, reasoning_effort) = normalize_mapped_model_reasoning_suffix(
mapped_model,
meta.reasoning_effort.clone(),
Expand Down
14 changes: 12 additions & 2 deletions messages/en.json
@@ -312,7 +312,7 @@
"update_release_notes_empty": "No release notes.",
"update_last_checked": "Last checked: {time}",
"update_check": "Check for updates",
"update_download_install": "Download & install",
"update_download_install": "Download",
"update_restart_now": "Restart now",
"update_status_idle": "Not checked",
"update_status_checking": "Checking",
@@ -376,7 +376,7 @@
"dashboard_hint_error_rate": "Error rate {rate}",
"dashboard_tokens_hint_no_cache": "Input {input} · Output {output}",
"dashboard_tokens_hint_with_cache": "Input {input} · Output {output} · Cached {cached}",
"dashboard_latency_hint": "Time to first byte (avg.)",
"dashboard_latency_hint": "Median {median}",
"dashboard_providers_title": "Providers",
"dashboard_providers_desc": "Sorted by tokens (Top 10)",
"dashboard_no_data": "No data",
@@ -402,6 +402,16 @@
"logs_detail_desc": "Headers/body appear only when capture is enabled; error responses are always recorded for failed requests.",
"logs_detail_loading": "Loading…",
"logs_detail_error": "Load failed",
"logs_detail_copy": "Copy all",
"logs_detail_copied": "Copied",
"logs_detail_copy_failed": "Copy failed",
"logs_detail_basic_info": "Basic info",
"logs_detail_stream": "Stream",
"logs_detail_stream_yes": "Yes",
"logs_detail_stream_no": "No",
"logs_detail_upstream_request_id": "Upstream request ID",
"logs_detail_model_mapped": "Model (mapped)",
"logs_detail_usage_json": "Usage (JSON)",
"logs_detail_headers": "Request headers",
"logs_detail_body": "Request body",
"logs_detail_response": "Error response",