Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 165 additions & 14 deletions crates/token_proxy_core/src/proxy/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ pub struct DashboardProviderStat {
#[serde(rename_all = "camelCase")]
pub struct DashboardUpstreamStat {
pub upstream_id: String,
pub provider: String,
pub requests: u64,
pub total_tokens: u64,
pub cached_tokens: u64,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DashboardAccountStat {
pub upstream_id: String,
pub account_id: Option<String>,
pub requests: u64,
pub total_tokens: u64,
pub cached_tokens: u64,
Expand Down Expand Up @@ -82,6 +91,7 @@ pub struct DashboardSnapshot {
pub summary: DashboardSummary,
pub providers: Vec<DashboardProviderStat>,
pub upstreams: Vec<DashboardUpstreamStat>,
pub accounts: Vec<DashboardAccountStat>,
pub series: Vec<DashboardSeriesPoint>,
pub recent: Vec<DashboardRequestItem>,
/// 是否只基于日志文件末尾片段做统计(Step1:true;Step2 SQLite 后应为 false)。
Expand All @@ -93,26 +103,74 @@ pub async fn read_snapshot(
range: DashboardRange,
offset: Option<u32>,
upstream_id: Option<String>,
account_id: Option<String>,
public_only: bool,
) -> Result<DashboardSnapshot, String> {
let offset = offset.unwrap_or(0);

let from_ts_ms = range.from_ts_ms.map(|value| value as i64);
let to_ts_ms = range.to_ts_ms.map(|value| value as i64);
let upstream_id = upstream_id.as_deref();
let bucket_ms = resolve_bucket_ms(&pool, from_ts_ms, to_ts_ms, upstream_id).await?;

let summary = query_summary(&pool, from_ts_ms, to_ts_ms, upstream_id).await?;
let providers = query_providers(&pool, from_ts_ms, to_ts_ms, upstream_id).await?;
let account_id = account_id.as_deref();
let bucket_ms = resolve_bucket_ms(
&pool,
from_ts_ms,
to_ts_ms,
upstream_id,
account_id,
public_only,
)
.await?;

let summary = query_summary(
&pool,
from_ts_ms,
to_ts_ms,
upstream_id,
account_id,
public_only,
)
.await?;
let providers = query_providers(
&pool,
from_ts_ms,
to_ts_ms,
upstream_id,
account_id,
public_only,
)
.await?;
// 选项列表只受时间范围限制,切换筛选时仍可看到同一范围内的其它上游。
let upstreams = query_upstreams(&pool, from_ts_ms, to_ts_ms).await?;
let series = query_series(&pool, from_ts_ms, to_ts_ms, bucket_ms, upstream_id).await?;
// 账户选项跟随上游收窄,但不受当前账户筛选影响。
let accounts = query_accounts(&pool, from_ts_ms, to_ts_ms, upstream_id).await?;
let series = query_series(
&pool,
from_ts_ms,
to_ts_ms,
bucket_ms,
upstream_id,
account_id,
public_only,
)
.await?;
let series = fill_series_buckets(series, from_ts_ms, to_ts_ms, bucket_ms);
let recent = query_recent(&pool, from_ts_ms, to_ts_ms, offset, upstream_id).await?;
let recent = query_recent(
&pool,
from_ts_ms,
to_ts_ms,
offset,
upstream_id,
account_id,
public_only,
)
.await?;

Ok(DashboardSnapshot {
summary,
providers,
upstreams,
accounts,
series,
recent,
truncated: false,
Expand All @@ -124,6 +182,8 @@ async fn query_summary(
from_ts_ms: Option<i64>,
to_ts_ms: Option<i64>,
upstream_id: Option<&str>,
account_id: Option<&str>,
public_only: bool,
) -> Result<DashboardSummary, String> {
let row = sqlx::query(
r#"
Expand All @@ -143,12 +203,16 @@ SELECT
FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
AND (?3 IS NULL OR upstream_id = ?3);
AND (?3 IS NULL OR upstream_id = ?3)
AND (?4 IS NULL OR account_id = ?4)
AND (?5 = 0 OR account_id IS NULL);
"#,
)
.bind(from_ts_ms)
.bind(to_ts_ms)
.bind(upstream_id)
.bind(account_id)
.bind(public_only)
.fetch_one(pool)
.await
.map_err(|err| format!("Failed to query dashboard summary: {err}"))?;
Expand All @@ -169,7 +233,15 @@ WHERE (?1 IS NULL OR ts_ms >= ?1)
};

// 中位数查询:使用 LIMIT/OFFSET 取中间值
let median_latency_ms = query_median_latency(pool, from_ts_ms, to_ts_ms, upstream_id).await?;
let median_latency_ms = query_median_latency(
pool,
from_ts_ms,
to_ts_ms,
upstream_id,
account_id,
public_only,
)
.await?;

Ok(DashboardSummary {
total_requests,
Expand All @@ -190,6 +262,8 @@ async fn query_median_latency(
from_ts_ms: Option<i64>,
to_ts_ms: Option<i64>,
upstream_id: Option<&str>,
account_id: Option<&str>,
public_only: bool,
) -> Result<u64, String> {
// 单条 SQL 完成中位数计算:
// - 使用 CTE 保证 count 和数据在同一快照内
Expand All @@ -202,6 +276,8 @@ WITH filtered AS (
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
AND (?3 IS NULL OR upstream_id = ?3)
AND (?4 IS NULL OR account_id = ?4)
AND (?5 = 0 OR account_id IS NULL)
),
cnt AS (
SELECT COUNT(*) AS n FROM filtered
Expand All @@ -227,6 +303,8 @@ SELECT COALESCE(
.bind(from_ts_ms)
.bind(to_ts_ms)
.bind(upstream_id)
.bind(account_id)
.bind(public_only)
.fetch_one(pool)
.await
.map_err(|err| format!("Failed to query median latency: {err}"))?;
Expand All @@ -240,6 +318,8 @@ async fn query_providers(
from_ts_ms: Option<i64>,
to_ts_ms: Option<i64>,
upstream_id: Option<&str>,
account_id: Option<&str>,
public_only: bool,
) -> Result<Vec<DashboardProviderStat>, String> {
let providers = sqlx::query(
r#"
Expand All @@ -256,13 +336,17 @@ FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
AND (?3 IS NULL OR upstream_id = ?3)
AND (?4 IS NULL OR account_id = ?4)
AND (?5 = 0 OR account_id IS NULL)
GROUP BY provider
ORDER BY total_tokens DESC;
"#,
)
.bind(from_ts_ms)
.bind(to_ts_ms)
.bind(upstream_id)
.bind(account_id)
.bind(public_only)
.fetch_all(pool)
.await
.map_err(|err| format!("Failed to query provider stats: {err}"))?
Expand Down Expand Up @@ -293,7 +377,6 @@ async fn query_upstreams(
r#"
SELECT
upstream_id,
provider,
COUNT(*) AS requests,
COALESCE(SUM(CASE
WHEN total_tokens IS NOT NULL THEN total_tokens
Expand All @@ -304,7 +387,7 @@ SELECT
FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
GROUP BY upstream_id, provider
GROUP BY upstream_id
ORDER BY total_tokens DESC, requests DESC, upstream_id ASC;
"#,
)
Expand All @@ -316,13 +399,11 @@ ORDER BY total_tokens DESC, requests DESC, upstream_id ASC;
.into_iter()
.filter_map(|row| {
let upstream_id: String = row.try_get("upstream_id").ok()?;
let provider: String = row.try_get("provider").ok()?;
let requests: i64 = row.try_get("requests").ok()?;
let total_tokens: i64 = row.try_get("total_tokens").ok()?;
let cached_tokens: i64 = row.try_get("cached_tokens").ok()?;
Some(DashboardUpstreamStat {
upstream_id,
provider,
requests: i64_to_u64(requests),
total_tokens: i64_to_u64(total_tokens),
cached_tokens: i64_to_u64(cached_tokens),
Expand All @@ -333,12 +414,66 @@ ORDER BY total_tokens DESC, requests DESC, upstream_id ASC;
Ok(upstreams)
}

async fn query_accounts(
pool: &sqlx::SqlitePool,
from_ts_ms: Option<i64>,
to_ts_ms: Option<i64>,
upstream_id: Option<&str>,
) -> Result<Vec<DashboardAccountStat>, String> {
let accounts = sqlx::query(
r#"
SELECT
upstream_id,
account_id,
COUNT(*) AS requests,
COALESCE(SUM(CASE
WHEN total_tokens IS NOT NULL THEN total_tokens
WHEN input_tokens IS NOT NULL OR output_tokens IS NOT NULL THEN COALESCE(input_tokens, 0) + COALESCE(output_tokens, 0)
ELSE 0
END), 0) AS total_tokens,
COALESCE(SUM(COALESCE(cached_tokens, 0)), 0) AS cached_tokens
FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
AND (?3 IS NULL OR upstream_id = ?3)
GROUP BY upstream_id, account_id
ORDER BY upstream_id ASC, account_id IS NULL DESC, requests DESC, account_id ASC;
"#,
)
.bind(from_ts_ms)
.bind(to_ts_ms)
.bind(upstream_id)
.fetch_all(pool)
.await
.map_err(|err| format!("Failed to query dashboard accounts: {err}"))?
.into_iter()
.filter_map(|row| {
let upstream_id: String = row.try_get("upstream_id").ok()?;
let account_id: Option<String> = row.try_get("account_id").ok()?;
let requests: i64 = row.try_get("requests").ok()?;
let total_tokens: i64 = row.try_get("total_tokens").ok()?;
let cached_tokens: i64 = row.try_get("cached_tokens").ok()?;
Some(DashboardAccountStat {
upstream_id,
account_id,
requests: i64_to_u64(requests),
total_tokens: i64_to_u64(total_tokens),
cached_tokens: i64_to_u64(cached_tokens),
})
})
.collect::<Vec<_>>();

Ok(accounts)
}

async fn query_series(
pool: &sqlx::SqlitePool,
from_ts_ms: Option<i64>,
to_ts_ms: Option<i64>,
bucket_ms: u64,
upstream_id: Option<&str>,
account_id: Option<&str>,
public_only: bool,
) -> Result<Vec<DashboardSeriesPoint>, String> {
let series = sqlx::query(
r#"
Expand All @@ -358,6 +493,8 @@ FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
AND (?4 IS NULL OR upstream_id = ?4)
AND (?5 IS NULL OR account_id = ?5)
AND (?6 = 0 OR account_id IS NULL)
GROUP BY bucket_ts_ms
ORDER BY bucket_ts_ms ASC;
"#,
Expand All @@ -366,6 +503,8 @@ ORDER BY bucket_ts_ms ASC;
.bind(to_ts_ms)
.bind(i64::try_from(bucket_ms).unwrap_or(i64::MAX))
.bind(upstream_id)
.bind(account_id)
.bind(public_only)
.fetch_all(pool)
.await
.map_err(|err| format!("Failed to query dashboard series: {err}"))?
Expand Down Expand Up @@ -476,6 +615,8 @@ async fn query_recent(
to_ts_ms: Option<i64>,
offset: u32,
upstream_id: Option<&str>,
account_id: Option<&str>,
public_only: bool,
) -> Result<Vec<DashboardRequestItem>, String> {
let recent = sqlx::query(
r#"
Expand Down Expand Up @@ -503,6 +644,8 @@ FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
AND (?5 IS NULL OR upstream_id = ?5)
AND (?6 IS NULL OR account_id = ?6)
AND (?7 = 0 OR account_id IS NULL)
ORDER BY ts_ms DESC
LIMIT ?3 OFFSET ?4;
"#,
Expand All @@ -512,6 +655,8 @@ LIMIT ?3 OFFSET ?4;
.bind(i64::from(RECENT_PAGE_SIZE))
.bind(i64::from(offset))
.bind(upstream_id)
.bind(account_id)
.bind(public_only)
.fetch_all(pool)
.await
.map_err(|err| format!("Failed to query recent requests: {err}"))?
Expand Down Expand Up @@ -560,6 +705,8 @@ async fn resolve_bucket_ms(
from_ts_ms: Option<i64>,
to_ts_ms: Option<i64>,
upstream_id: Option<&str>,
account_id: Option<&str>,
public_only: bool,
) -> Result<u64, String> {
if let (Some(from), Some(to)) = (from_ts_ms, to_ts_ms) {
let span_ms = (to - from).max(0) as u64;
Expand All @@ -574,12 +721,16 @@ SELECT
FROM request_logs
WHERE (?1 IS NULL OR ts_ms >= ?1)
AND (?2 IS NULL OR ts_ms <= ?2)
AND (?3 IS NULL OR upstream_id = ?3);
AND (?3 IS NULL OR upstream_id = ?3)
AND (?4 IS NULL OR account_id = ?4)
AND (?5 = 0 OR account_id IS NULL);
"#,
)
.bind(from_ts_ms)
.bind(to_ts_ms)
.bind(upstream_id)
.bind(account_id)
.bind(public_only)
.fetch_one(pool)
.await
.map_err(|err| format!("Failed to query dashboard range: {err}"))?;
Expand Down
Loading
Loading