# pageserver: implement auto-splitting (#7681)
## Problem

Currently, tenants are only split into multiple shards when a human
operator calls the API to do it.

Issue: #7388 

## Summary of changes

- Add a pageserver API for returning the top tenants by size.
- Add a step to the controller's background loop: when there is no
  reconciliation or optimization work to do, it looks for tenants to split
  (a sketch of the selection step follows this summary).
- Add a test that runs pgbench on many tenants concurrently and checks
  that splitting happens as expected as tenants grow, without interrupting
  client I/O.

This PR is quite basic: there is a task list in
#7388 for further work. It is meant to be safe (off by default) and
sufficient to let our staging environment run many sharded tenants
without a human having to set them up.
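
For illustration, the candidate-selection step can be written against the new pageserver API roughly as follows. This is a sketch only, not the controller code in this PR: the crate paths and the `mgmt_api::Result` alias are assumed from the diff below, and the `limit`/ordering choices are arbitrary.

```rust
use pageserver_api::models::{TenantSorting, TopTenantShardItem, TopTenantShardsRequest};
use pageserver_api::shard::ShardCount;
use pageserver_client::mgmt_api;

/// Sketch: ask one pageserver for tenants that have grown past `split_threshold`
/// and have not yet been split to `max_shards`.
async fn find_split_candidates(
    client: &mgmt_api::Client,
    split_threshold: u64,
    max_shards: ShardCount,
) -> mgmt_api::Result<Vec<TopTenantShardItem>> {
    let top = client
        .top_tenant_shards(TopTenantShardsRequest {
            // Order by the largest timeline's logical size, which grows as data is written.
            order_by: TenantSorting::MaxLogicalSize,
            // An arbitrary cap on how many candidates to consider per iteration.
            limit: 8,
            // Skip tenants that are already at (or beyond) the target shard count.
            where_shards_lt: Some(max_shards),
            // Skip tenants still below the split threshold.
            where_gt: Some(split_threshold),
        })
        .await?;
    Ok(top.shards)
}
```

The controller would then issue shard-split requests for the returned candidates; that part lives in the storage controller changes that are not expanded in this view.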
jcsp authored May 17, 2024
1 parent af99c95 commit c84656a
Showing 15 changed files with 689 additions and 46 deletions.
4 changes: 4 additions & 0 deletions control_plane/src/local_env.rs
@@ -152,6 +152,9 @@ pub struct NeonStorageControllerConf {
    /// Heartbeat timeout before marking a node offline
    #[serde(with = "humantime_serde")]
    pub max_unavailable: Duration,

    /// Threshold for auto-splitting a tenant into shards
    pub split_threshold: Option<u64>,
}

impl NeonStorageControllerConf {
@@ -164,6 +167,7 @@ impl Default for NeonStorageControllerConf {
    fn default() -> Self {
        Self {
            max_unavailable: Self::DEFAULT_MAX_UNAVAILABLE_INTERVAL,
            split_threshold: None,
        }
    }
}
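
As a sketch of how the new field might be set for a local environment (the module path, the 64 GiB value, and the assumption that the threshold is expressed in bytes are illustrative, not taken from this diff):

```rust
use control_plane::local_env::NeonStorageControllerConf;

fn main() {
    // Enable auto-splitting once a tenant grows past ~64 GiB (illustrative value,
    // assuming the threshold is interpreted in bytes); other fields keep defaults.
    let conf = NeonStorageControllerConf {
        split_threshold: Some(64 * 1024 * 1024 * 1024),
        ..Default::default()
    };
    assert_eq!(conf.split_threshold, Some(64 * 1024 * 1024 * 1024));
}
```

When set, `neon_local` forwards the value to the storage controller as `--split-threshold` (see `control_plane/src/storage_controller.rs` below); leaving it `None` keeps auto-splitting off, matching the PR's off-by-default intent.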
4 changes: 4 additions & 0 deletions control_plane/src/storage_controller.rs
@@ -305,6 +305,10 @@ impl StorageController {
            ));
        }

        if let Some(split_threshold) = self.config.split_threshold.as_ref() {
            args.push(format!("--split-threshold={split_threshold}"))
        }

        background_process::start_process(
            COMMAND,
            &self.env.base_data_dir,
49 changes: 49 additions & 0 deletions libs/pageserver_api/src/models.rs
@@ -824,6 +824,55 @@ pub struct TenantScanRemoteStorageResponse {
    pub shards: Vec<TenantScanRemoteStorageShard>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum TenantSorting {
    ResidentSize,
    MaxLogicalSize,
}

impl Default for TenantSorting {
    fn default() -> Self {
        Self::ResidentSize
    }
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TopTenantShardsRequest {
    // How would you like to sort the tenants?
    pub order_by: TenantSorting,

    // How many results?
    pub limit: usize,

    // Omit tenants with more than this many shards (e.g. if this is the max number of shards
    // that the caller would ever split to)
    pub where_shards_lt: Option<ShardCount>,

    // Omit tenants where the ordering metric is less than this (this is an optimization to
    // let us quickly exclude numerous tiny shards)
    pub where_gt: Option<u64>,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct TopTenantShardItem {
    pub id: TenantShardId,

    /// Total size of layers on local disk for all timelines in this tenant
    pub resident_size: u64,

    /// Total size of layers in remote storage for all timelines in this tenant
    pub physical_size: u64,

    /// The largest logical size of a timeline within this tenant
    pub max_logical_size: u64,
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TopTenantShardsResponse {
    pub shards: Vec<TopTenantShardItem>,
}

pub mod virtual_file {
    #[derive(
        Copy,
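
A minimal sketch of constructing a request with these types (assuming `pageserver_api::shard::ShardCount` as the import path and `serde_json` as a dev-dependency):

```rust
use pageserver_api::models::{TenantSorting, TopTenantShardsRequest};
use pageserver_api::shard::ShardCount;

fn main() {
    // Ask for the five largest tenants by resident size, ignoring tenants that
    // already have 8 or more shards or whose resident size is below 1 GiB.
    let req = TopTenantShardsRequest {
        order_by: TenantSorting::ResidentSize,
        limit: 5,
        where_shards_lt: Some(ShardCount::new(8)),
        where_gt: Some(1024 * 1024 * 1024),
    };
    println!("{req:?}");

    // The sorting enum is renamed to snake_case on the wire.
    assert_eq!(
        serde_json::to_string(&TenantSorting::MaxLogicalSize).unwrap(),
        "\"max_logical_size\""
    );
}
```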
2 changes: 1 addition & 1 deletion libs/pageserver_api/src/shard.rs
@@ -125,7 +125,7 @@ impl ShardCount {

    /// `v` may be zero, or the number of shards in the tenant. `v` is what
    /// [`Self::literal`] would return.
-    pub fn new(val: u8) -> Self {
    pub const fn new(val: u8) -> Self {
        Self(val)
    }
}
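
The only change here is making the constructor `const`, which lets shard counts be built in const contexts; a trivial illustration (the constant's name and value are made up):

```rust
use pageserver_api::shard::ShardCount;

// With `const fn new`, a shard count can now be evaluated at compile time.
const DEFAULT_SPLIT_TO: ShardCount = ShardCount::new(8);

fn main() {
    assert_eq!(DEFAULT_SPLIT_TO, ShardCount::new(8));
}
```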
12 changes: 12 additions & 0 deletions pageserver/client/src/mgmt_api.rs
@@ -486,6 +486,18 @@ impl Client {
            .map_err(Error::ReceiveBody)
    }

    pub async fn top_tenant_shards(
        &self,
        request: TopTenantShardsRequest,
    ) -> Result<TopTenantShardsResponse> {
        let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
        self.request(Method::POST, uri, request)
            .await?
            .json()
            .await
            .map_err(Error::ReceiveBody)
    }

    pub async fn layer_map_info(
        &self,
        tenant_shard_id: TenantShardId,
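
Each pageserver only reports its own attached tenant shards, so a caller that wants a fleet-wide view has to merge the per-node responses itself; a small sketch of one way to do that (merging by `max_logical_size` is an illustrative choice):

```rust
use pageserver_api::models::{TopTenantShardItem, TopTenantShardsResponse};

/// Sketch: combine per-pageserver responses into one global top-`limit` list.
fn merge_top_tenants(
    per_node: Vec<TopTenantShardsResponse>,
    limit: usize,
) -> Vec<TopTenantShardItem> {
    let mut all: Vec<TopTenantShardItem> = per_node
        .into_iter()
        .flat_map(|response| response.shards)
        .collect();
    // Largest first, then keep only the requested number of entries.
    all.sort_by_key(|item| std::cmp::Reverse(item.max_logical_size));
    all.truncate(limit);
    all
}
```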
98 changes: 98 additions & 0 deletions pageserver/src/http/routes.rs
@@ -1,6 +1,8 @@
//!
//! Management HTTP API
//!
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
@@ -24,7 +26,11 @@ use pageserver_api::models::TenantScanRemoteStorageShard;
use pageserver_api::models::TenantShardLocation;
use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TenantState;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::TopTenantShardsRequest;
use pageserver_api::models::TopTenantShardsResponse;
use pageserver_api::models::{
    DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
    TenantLoadRequest, TenantLocationConfigRequest,
@@ -2323,6 +2329,97 @@ async fn get_utilization(
        .map_err(ApiError::InternalServerError)
}

/// Report on the largest tenants on this pageserver, for the storage controller to identify
/// candidates for splitting
async fn post_top_tenants(
    mut r: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    check_permission(&r, None)?;
    let request: TopTenantShardsRequest = json_request(&mut r).await?;
    let state = get_state(&r);

    fn get_size_metric(sizes: &TopTenantShardItem, order_by: &TenantSorting) -> u64 {
        match order_by {
            TenantSorting::ResidentSize => sizes.resident_size,
            TenantSorting::MaxLogicalSize => sizes.max_logical_size,
        }
    }

    #[derive(Eq, PartialEq)]
    struct HeapItem {
        metric: u64,
        sizes: TopTenantShardItem,
    }

    impl PartialOrd for HeapItem {
        fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }

    /// Heap items have reverse ordering on their metric: this enables using BinaryHeap, which
    /// supports popping the greatest item but not the smallest.
    impl Ord for HeapItem {
        fn cmp(&self, other: &Self) -> std::cmp::Ordering {
            Reverse(self.metric).cmp(&Reverse(other.metric))
        }
    }

    let mut top_n: BinaryHeap<HeapItem> = BinaryHeap::with_capacity(request.limit);

    // FIXME: this is a lot of clones to take this tenant list
    for (tenant_shard_id, tenant_slot) in state.tenant_manager.list() {
        if let Some(shards_lt) = request.where_shards_lt {
            // Ignore tenants which already have >= this many shards
            if tenant_shard_id.shard_count >= shards_lt {
                continue;
            }
        }

        let sizes = match tenant_slot {
            TenantSlot::Attached(tenant) => tenant.get_sizes(),
            TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
                continue;
            }
        };
        let metric = get_size_metric(&sizes, &request.order_by);

        if let Some(gt) = request.where_gt {
            // Ignore tenants whose metric is <= the lower size threshold, to do less sorting work
            if metric <= gt {
                continue;
            }
        };

        match top_n.peek() {
            None => {
                // Top N list is empty: candidate becomes first member
                top_n.push(HeapItem { metric, sizes });
            }
            Some(i) if i.metric > metric && top_n.len() < request.limit => {
                // Lowest item in list is greater than our candidate, but we aren't at limit yet: push to end
                top_n.push(HeapItem { metric, sizes });
            }
            Some(i) if i.metric > metric => {
                // List is at limit and lowest value is greater than our candidate, drop it.
            }
            Some(_) => top_n.push(HeapItem { metric, sizes }),
        }

        while top_n.len() > request.limit {
            top_n.pop();
        }
    }

    json_response(
        StatusCode::OK,
        TopTenantShardsResponse {
            shards: top_n.into_iter().map(|i| i.sizes).collect(),
        },
    )
}
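
The handler above keeps a bounded top-N set by reversing the heap order, so the smallest retained item sits at the top of the `BinaryHeap` and can be evicted cheaply. The same technique on plain integers, as a self-contained sketch:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep the `limit` largest values in O(n log limit): `Reverse` turns the
/// max-heap into a min-heap, so the smallest retained value is always on top
/// and gets popped whenever the heap grows past `limit`.
fn top_n(values: &[u64], limit: usize) -> Vec<u64> {
    let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(limit);
    for &v in values {
        heap.push(Reverse(v));
        if heap.len() > limit {
            heap.pop();
        }
    }
    heap.into_iter().map(|Reverse(v)| v).collect()
}

fn main() {
    let mut result = top_n(&[5, 1, 9, 3, 7, 2], 3);
    result.sort_unstable();
    assert_eq!(result, vec![5, 7, 9]);
}
```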

/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -2609,5 +2706,6 @@ pub fn make_router(
        )
        .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
        .get("/v1/utilization", |r| api_handler(r, get_utilization))
        .post("/v1/top_tenants", |r| api_handler(r, post_top_tenants))
        .any(handler_404))
}
64 changes: 27 additions & 37 deletions pageserver/src/metrics.rs
@@ -2098,7 +2098,7 @@ pub(crate) struct TimelineMetrics {
    pub garbage_collect_histo: StorageTimeMetrics,
    pub find_gc_cutoffs_histo: StorageTimeMetrics,
    pub last_record_gauge: IntGauge,
-    resident_physical_size_gauge: UIntGauge,
    pub resident_physical_size_gauge: UIntGauge,
    /// copy of LayeredTimeline.current_logical_size
    pub current_logical_size_gauge: UIntGauge,
    pub aux_file_size_gauge: IntGauge,
@@ -2312,6 +2313,7 @@ use pin_project_lite::pin_project;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
@@ -2321,82 +2322,71 @@ use crate::task_mgr::TaskKind;
use crate::tenant::mgr::TenantSlot;

/// Maintain a per timeline gauge in addition to the global gauge.
-struct PerTimelineRemotePhysicalSizeGauge {
-    last_set: u64,
pub(crate) struct PerTimelineRemotePhysicalSizeGauge {
    last_set: AtomicU64,
    gauge: UIntGauge,
}

impl PerTimelineRemotePhysicalSizeGauge {
    fn new(per_timeline_gauge: UIntGauge) -> Self {
        Self {
-            last_set: per_timeline_gauge.get(),
            last_set: AtomicU64::new(0),
            gauge: per_timeline_gauge,
        }
    }
-    fn set(&mut self, sz: u64) {
    pub(crate) fn set(&self, sz: u64) {
        self.gauge.set(sz);
-        if sz < self.last_set {
-            REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set - sz);
        let prev = self.last_set.swap(sz, std::sync::atomic::Ordering::Relaxed);
        if sz < prev {
            REMOTE_PHYSICAL_SIZE_GLOBAL.sub(prev - sz);
        } else {
-            REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - self.last_set);
            REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - prev);
        };
-        self.last_set = sz;
    }
-    fn get(&self) -> u64 {
    pub(crate) fn get(&self) -> u64 {
        self.gauge.get()
    }
}

impl Drop for PerTimelineRemotePhysicalSizeGauge {
    fn drop(&mut self) {
-        REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set);
        REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set.load(std::sync::atomic::Ordering::Relaxed));
    }
}

pub(crate) struct RemoteTimelineClientMetrics {
    tenant_id: String,
    shard_id: String,
    timeline_id: String,
-    remote_physical_size_gauge: Mutex<Option<PerTimelineRemotePhysicalSizeGauge>>,
    pub(crate) remote_physical_size_gauge: PerTimelineRemotePhysicalSizeGauge,
    calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
    bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
    bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
}

impl RemoteTimelineClientMetrics {
    pub fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
        let tenant_id_str = tenant_shard_id.tenant_id.to_string();
        let shard_id_str = format!("{}", tenant_shard_id.shard_slug());
        let timeline_id_str = timeline_id.to_string();

        let remote_physical_size_gauge = PerTimelineRemotePhysicalSizeGauge::new(
            REMOTE_PHYSICAL_SIZE
                .get_metric_with_label_values(&[&tenant_id_str, &shard_id_str, &timeline_id_str])
                .unwrap(),
        );

        RemoteTimelineClientMetrics {
-            tenant_id: tenant_shard_id.tenant_id.to_string(),
-            shard_id: format!("{}", tenant_shard_id.shard_slug()),
-            timeline_id: timeline_id.to_string(),
            tenant_id: tenant_id_str,
            shard_id: shard_id_str,
            timeline_id: timeline_id_str,
            calls: Mutex::new(HashMap::default()),
            bytes_started_counter: Mutex::new(HashMap::default()),
            bytes_finished_counter: Mutex::new(HashMap::default()),
-            remote_physical_size_gauge: Mutex::new(None),
            remote_physical_size_gauge,
        }
    }

-    pub(crate) fn remote_physical_size_set(&self, sz: u64) {
-        let mut guard = self.remote_physical_size_gauge.lock().unwrap();
-        let gauge = guard.get_or_insert_with(|| {
-            PerTimelineRemotePhysicalSizeGauge::new(
-                REMOTE_PHYSICAL_SIZE
-                    .get_metric_with_label_values(&[
-                        &self.tenant_id,
-                        &self.shard_id,
-                        &self.timeline_id,
-                    ])
-                    .unwrap(),
-            )
-        });
-        gauge.set(sz);
-    }
-
-    pub(crate) fn remote_physical_size_get(&self) -> u64 {
-        let guard = self.remote_physical_size_gauge.lock().unwrap();
-        guard.as_ref().map(|gauge| gauge.get()).unwrap_or(0)
-    }
-
    pub fn remote_operation_time(
        &self,
        file_kind: &RemoteOpFileKind,
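
The gauge change above replaces a `&mut self` field with an `AtomicU64`, so the per-timeline value can be updated through a shared reference while keeping the global counter consistent. A standalone sketch of the swap-and-adjust pattern (using a second `AtomicU64` in place of the prometheus global gauge):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static GLOBAL_TOTAL: AtomicU64 = AtomicU64::new(0);

struct PerEntityGauge {
    last_set: AtomicU64,
}

impl PerEntityGauge {
    fn set(&self, sz: u64) {
        // Swap in the new value and apply the delta to the shared total.
        let prev = self.last_set.swap(sz, Ordering::Relaxed);
        if sz < prev {
            GLOBAL_TOTAL.fetch_sub(prev - sz, Ordering::Relaxed);
        } else {
            GLOBAL_TOTAL.fetch_add(sz - prev, Ordering::Relaxed);
        }
    }
}

fn main() {
    let gauge = PerEntityGauge { last_set: AtomicU64::new(0) };
    gauge.set(100);
    gauge.set(40); // shrinks: the total drops by 60
    assert_eq!(GLOBAL_TOTAL.load(Ordering::Relaxed), 40);
}
```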

1 comment on commit c84656a

@github-actions

3160 tests run: 3017 passed, 3 failed, 140 skipped (full report)


Failures on Postgres 14

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release
  • test_basebackup_with_high_slru_count[github-actions-selfhosted-sequential-10-13-30]: release
  • test_basebackup_with_high_slru_count[github-actions-selfhosted-vectored-10-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg14-github-actions-selfhosted] or test_basebackup_with_high_slru_count[release-pg14-github-actions-selfhosted-sequential-10-13-30] or test_basebackup_with_high_slru_count[release-pg14-github-actions-selfhosted-vectored-10-13-30]"
Flaky tests (3)

Postgres 14

  • test_crafted_wal_end[last_wal_record_xlog_switch_ends_on_page_boundary]: debug
  • test_hot_standby: debug
  • test_synthetic_size_while_deleting: release

Code coverage* (full report)

  • functions: 31.3% (6345 of 20287 functions)
  • lines: 47.3% (47927 of 101251 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
c84656a at 2024-05-17T17:19:06.745Z :recycle:
