pageserver: API for invoking page trace
jcsp committed Jan 14, 2025
1 parent 44f1c49 commit 7dc86ec
Showing 3 changed files with 62 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pageserver/Cargo.toml
@@ -16,6 +16,7 @@ arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
bit_field.workspace = true
bincode.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true
60 changes: 60 additions & 0 deletions pageserver/src/http/routes.rs
@@ -7,6 +7,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
@@ -90,6 +91,7 @@ use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRequest;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::PageTrace;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::OffloadedTimeline;
@@ -1521,6 +1523,60 @@ async fn timeline_gc_unblocking_handler(
    block_or_unblock_gc(request, false).await
}

async fn timeline_page_trace_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let state = get_state(&request);

    let size_limit =
        parse_query_param::<_, u64>(&request, "size_limit_bytes")?.unwrap_or(1024 * 1024);
    let time_limit_secs = parse_query_param::<_, u64>(&request, "time_limit_secs")?.unwrap_or(5);

    // Convert size limit to event limit based on known serialized size of an event
    let event_limit = size_limit / 32;

    let timeline =
        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
            .await?;

    let (page_trace, mut trace_rx) = PageTrace::new(event_limit);
    timeline.page_trace.store(Arc::new(Some(page_trace)));

    let mut buffer = Vec::with_capacity(size_limit as usize);

    let deadline = Instant::now() + Duration::from_secs(time_limit_secs);

    loop {
        let timeout = deadline.saturating_duration_since(Instant::now());
        tokio::select! {
            event = trace_rx.recv() => {
                buffer.extend(bincode::serialize(&event).unwrap());

                if buffer.len() >= size_limit as usize {
                    // Size threshold reached
                    break;
                }
            }
            _ = tokio::time::sleep(timeout) => {
                // Time threshold reached
                break;
            }
        }
    }

    // Above code is infallible, so we guarantee to switch the trace off when done
    timeline.page_trace.store(Arc::new(None));

    Ok(Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "application/octet-stream")
        .body(hyper::Body::from(buffer))
        .unwrap())
}

/// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`.
///
/// Both are technically unsafe because they might fire off index uploads, thus they are POST.
@@ -3487,6 +3543,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc",
|r| api_handler(r, timeline_gc_unblocking_handler),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/page_trace",
|r| api_handler(r, timeline_page_trace_handler),
)
.post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
api_handler(r, secondary_upload_handler)
})
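The new endpoint is a plain POST on the pageserver HTTP API: size_limit_bytes (default 1 MiB) and time_limit_secs (default 5) bound the trace, and the response body is the raw bincode-encoded event buffer. Below is a minimal client-side sketch, not part of this commit: it assumes the reqwest, tokio, serde, bincode and anyhow crates are available, and uses a hypothetical PageTraceEvent layout standing in for the real type defined in pageserver/src/tenant/timeline.rs.

// Client-side sketch only (not part of this commit).
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct PageTraceEvent {
    // Hypothetical fields, for illustration only.
    key: [u8; 16],
    lsn: u64,
    timestamp_micros: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Substitute a real pageserver HTTP address, tenant shard ID and timeline ID.
    let url = "http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/timeline/<timeline_id>\
               /page_trace?size_limit_bytes=1048576&time_limit_secs=5";
    let body = reqwest::Client::new().post(url).send().await?.bytes().await?;

    // The handler concatenates bincode-serialized values of whatever recv() yields;
    // with a tokio mpsc receiver that would be an Option<PageTraceEvent>. Decode
    // records one by one from a cursor until the buffer is exhausted.
    let mut cursor = std::io::Cursor::new(body.as_ref());
    while (cursor.position() as usize) < body.len() {
        let event: Option<PageTraceEvent> = bincode::deserialize_from(&mut cursor)?;
        if let Some(event) = event {
            println!("{event:?}");
        }
    }
    Ok(())
}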
