From 4132b106a9830f107a00c515a8a4b9be13dfb5eb Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 7 Jan 2025 11:52:04 +0000 Subject: [PATCH] pageserver: API for invoking page trace --- Cargo.lock | 1 + pageserver/Cargo.toml | 1 + pageserver/src/http/routes.rs | 60 +++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index e2d5e03613b1..dc81cc573b5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3877,6 +3877,7 @@ dependencies = [ "arc-swap", "async-compression", "async-stream", + "bincode", "bit_field", "byteorder", "bytes", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 140b287ccc18..8ed93f1aed25 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -16,6 +16,7 @@ arc-swap.workspace = true async-compression.workspace = true async-stream.workspace = true bit_field.workspace = true +bincode.workspace = true byteorder.workspace = true bytes.workspace = true camino.workspace = true diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index db7d29385641..573d9f350be8 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use anyhow::{anyhow, Context, Result}; use enumset::EnumSet; @@ -90,6 +91,7 @@ use crate::tenant::timeline::CompactFlags; use crate::tenant::timeline::CompactOptions; use crate::tenant::timeline::CompactRequest; use crate::tenant::timeline::CompactionError; +use crate::tenant::timeline::PageTrace; use crate::tenant::timeline::Timeline; use crate::tenant::GetTimelineError; use crate::tenant::OffloadedTimeline; @@ -1521,6 +1523,60 @@ async fn timeline_gc_unblocking_handler( block_or_unblock_gc(request, false).await } +async fn timeline_page_trace_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + let state = get_state(&request); + + let size_limit = + parse_query_param::<_, u64>(&request, "size_limit_bytes")?.unwrap_or(1024 * 1024); + let time_limit_secs = parse_query_param::<_, u64>(&request, "time_limit_secs")?.unwrap_or(5); + + // Convert size limit to event limit based on known serialized size of an event + let event_limit = size_limit / 32; + + let timeline = + active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) + .await?; + + let (page_trace, mut trace_rx) = PageTrace::new(event_limit); + timeline.page_trace.store(Arc::new(Some(page_trace))); + + let mut buffer = Vec::with_capacity(size_limit as usize); + + let deadline = Instant::now() + Duration::from_secs(time_limit_secs); + + loop { + let timeout = deadline.saturating_duration_since(Instant::now()); + tokio::select! { + event = trace_rx.recv() => { + buffer.extend(bincode::serialize(&event).unwrap()); + + if buffer.len() >= size_limit as usize { + // Size threshold reached + break; + } + } + _ = tokio::time::sleep(timeout) => { + // Time threshold reached + break; + } + } + } + + // Above code is infallible, so we guarantee to switch the trace off when done + timeline.page_trace.store(Arc::new(None)); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/octet-stream") + .body(hyper::Body::from(buffer)) + .unwrap()) +} + /// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`. /// /// Both are technically unsafe because they might fire off index uploads, thus they are POST. @@ -3455,6 +3511,10 @@ pub fn make_router( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc", |r| api_handler(r, timeline_gc_unblocking_handler), ) + .post( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/page_trace", + |r| api_handler(r, timeline_page_trace_handler), + ) .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| { api_handler(r, secondary_upload_handler) })