@@ -7,6 +7,7 @@ use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
+use std::time::Instant;
 
 use anyhow::{anyhow, Context, Result};
 use enumset::EnumSet;
@@ -90,6 +91,7 @@ use crate::tenant::timeline::CompactFlags;
 use crate::tenant::timeline::CompactOptions;
 use crate::tenant::timeline::CompactRequest;
 use crate::tenant::timeline::CompactionError;
+use crate::tenant::timeline::PageTrace;
 use crate::tenant::timeline::Timeline;
 use crate::tenant::GetTimelineError;
 use crate::tenant::OffloadedTimeline;
@@ -1521,6 +1523,60 @@ async fn timeline_gc_unblocking_handler(
     block_or_unblock_gc(request, false).await
 }
 
+async fn timeline_page_trace_handler(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    let state = get_state(&request);
+
+    let size_limit =
+        parse_query_param::<_, u64>(&request, "size_limit_bytes")?.unwrap_or(1024 * 1024);
+    let time_limit_secs = parse_query_param::<_, u64>(&request, "time_limit_secs")?.unwrap_or(5);
+
+    // Convert size limit to event limit based on known serialized size of an event
+    let event_limit = size_limit / 32;
+
+    let timeline =
+        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
+            .await?;
+
+    let (page_trace, mut trace_rx) = PageTrace::new(event_limit);
+    timeline.page_trace.store(Arc::new(Some(page_trace)));
+
+    let mut buffer = Vec::with_capacity(size_limit as usize);
+
+    let deadline = Instant::now() + Duration::from_secs(time_limit_secs);
+
+    loop {
+        let timeout = deadline.saturating_duration_since(Instant::now());
+        tokio::select! {
+            event = trace_rx.recv() => {
+                buffer.extend(bincode::serialize(&event).unwrap());
+
+                if buffer.len() >= size_limit as usize {
+                    // Size threshold reached
+                    break;
+                }
+            }
+            _ = tokio::time::sleep(timeout) => {
+                // Time threshold reached
+                break;
+            }
+        }
+    }
+
+    // Above code is infallible, so we guarantee to switch the trace off when done
+    timeline.page_trace.store(Arc::new(None));
+
+    Ok(Response::builder()
+        .status(StatusCode::OK)
+        .header(header::CONTENT_TYPE, "application/octet-stream")
+        .body(hyper::Body::from(buffer))
+        .unwrap())
+}
+
 /// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`.
 ///
 /// Both are technically unsafe because they might fire off index uploads, thus they are POST.
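
The handler above relies on PageTrace::new(event_limit) handing back a trace handle together with a receiver that the HTTP request drains. The real PageTrace lives in crate::tenant::timeline and is not shown in this diff; the sketch below only illustrates, assuming a bounded tokio mpsc channel and an invented PageTraceEvent layout, how such a pair could be wired so the read path never blocks on a slow or absent consumer:

use tokio::sync::mpsc;

// Hypothetical event payload: the field layout is an assumption chosen to
// roughly match the ~32-byte-per-event heuristic used in the handler, not
// the actual type from crate::tenant::timeline.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct PageTraceEvent {
    pub key: [u8; 16],
    pub lsn: u64,
    pub timestamp_micros: u64,
}

// Sketch of a trace handle: a bounded sender sized to the event limit.
pub struct PageTrace {
    tx: mpsc::Sender<PageTraceEvent>,
}

impl PageTrace {
    pub fn new(event_limit: u64) -> (Self, mpsc::Receiver<PageTraceEvent>) {
        let (tx, rx) = mpsc::channel(event_limit.max(1) as usize);
        (PageTrace { tx }, rx)
    }

    // The getpage read path would call something like this; try_send drops
    // events once the buffer is full rather than stalling page reads.
    pub fn record(&self, event: PageTraceEvent) {
        let _ = self.tx.try_send(event);
    }
}
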
@@ -3487,6 +3543,10 @@ pub fn make_router(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc",
             |r| api_handler(r, timeline_gc_unblocking_handler),
         )
+        .post(
+            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/page_trace",
+            |r| api_handler(r, timeline_page_trace_handler),
+        )
         .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
             api_handler(r, secondary_upload_handler)
         })
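
On the wire, the new endpoint returns a raw concatenation of bincode-serialized events, capped by size_limit_bytes or time_limit_secs, whichever is reached first. A rough consumer sketch, assuming a reqwest client and reusing the hypothetical PageTraceEvent layout from the sketch above (the base URL, function name, and query values are placeholders, not part of this change):

use std::io::Cursor;

async fn fetch_page_trace(
    base_url: &str,
    tenant_shard_id: &str,
    timeline_id: &str,
) -> anyhow::Result<Vec<PageTraceEvent>> {
    let url = format!(
        "{base_url}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/page_trace\
         ?size_limit_bytes=1048576&time_limit_secs=5"
    );
    let body = reqwest::Client::new().post(url).send().await?.bytes().await?;

    // The handler serializes the Option<_> returned by trace_rx.recv(), so each
    // message decodes as an Option<PageTraceEvent>; read until the buffer is drained.
    let mut reader = Cursor::new(body.as_ref());
    let mut events = Vec::new();
    while (reader.position() as usize) < body.len() {
        let event: Option<PageTraceEvent> = bincode::deserialize_from(&mut reader)?;
        events.extend(event);
    }
    Ok(events)
}
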