From 4c740ea6fb35084a5a9a965c9ca7eea587255ba4 Mon Sep 17 00:00:00 2001
From: Jure Bajic
Date: Wed, 15 May 2024 18:41:12 +0200
Subject: [PATCH] Add performance regression test
 `test_ondemand_download_churn.py` (#7242)

Add performance regression test for on-demand download throughput.

Closes https://github.com/neondatabase/neon/issues/7146

Co-authored-by: Christian Schwarz
Co-authored-by: Alexander Bayandin
---
 libs/pageserver_api/src/models.rs             |  10 +
 .../src/cmd/ondemand_download_churn.rs        | 103 ++++++++---
 .../pagebench/test_ondemand_download_churn.py | 175 ++++++++++++++++++
 3 files changed, 267 insertions(+), 21 deletions(-)
 create mode 100644 test_runner/performance/pageserver/pagebench/test_ondemand_download_churn.py

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index d78d2bcbeae0..7cf54bf32a95 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -745,6 +745,16 @@ impl HistoricLayerInfo {
         };
         *field = value;
     }
+    pub fn layer_file_size(&self) -> u64 {
+        match self {
+            HistoricLayerInfo::Delta {
+                layer_file_size, ..
+            } => *layer_file_size,
+            HistoricLayerInfo::Image {
+                layer_file_size, ..
+            } => *layer_file_size,
+        }
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
diff --git a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs
index 197e782dca47..1bb71b93533c 100644
--- a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs
+++ b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs
@@ -2,9 +2,11 @@
 use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId};
 use pageserver_client::mgmt_api;
 use rand::seq::SliceRandom;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, info};
 use utils::id::{TenantTimelineId, TimelineId};
 
+use std::{f64, sync::Arc};
 use tokio::{
     sync::{mpsc, OwnedSemaphorePermit},
     task::JoinSet,
@@ -12,10 +14,7 @@
 
 use std::{
     num::NonZeroUsize,
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
+    sync::atomic::{AtomicU64, Ordering},
     time::{Duration, Instant},
 };
 
@@ -51,19 +50,31 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     Ok(())
 }
 
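+/// Totals for the whole run, printed to stdout as JSON when the benchmark exits.
+/// Illustrative shape only (values made up):
+///   {"downloads_count": 9000, "downloads_bytes": 94371840, "evictions_count": 9000,
+///    "timeline_restarts": 12, "runtime": "30s"}
+/// The `runtime` field is humantime-formatted via `humantime_serde`.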
+#[derive(serde::Serialize)]
+struct Output {
+    downloads_count: u64,
+    downloads_bytes: u64,
+    evictions_count: u64,
+    timeline_restarts: u64,
+    #[serde(with = "humantime_serde")]
+    runtime: Duration,
+}
+
 #[derive(Debug, Default)]
 struct LiveStats {
-    evictions: AtomicU64,
-    downloads: AtomicU64,
+    evictions_count: AtomicU64,
+    downloads_count: AtomicU64,
+    downloads_bytes: AtomicU64,
     timeline_restarts: AtomicU64,
 }
 
 impl LiveStats {
     fn eviction_done(&self) {
-        self.evictions.fetch_add(1, Ordering::Relaxed);
+        self.evictions_count.fetch_add(1, Ordering::Relaxed);
     }
-    fn download_done(&self) {
-        self.downloads.fetch_add(1, Ordering::Relaxed);
+    fn download_done(&self, size: u64) {
+        self.downloads_count.fetch_add(1, Ordering::Relaxed);
+        self.downloads_bytes.fetch_add(size, Ordering::Relaxed);
     }
     fn timeline_restart_done(&self) {
         self.timeline_restarts.fetch_add(1, Ordering::Relaxed);
@@ -92,28 +103,49 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
     )
     .await?;
 
+    let token = CancellationToken::new();
     let mut tasks = JoinSet::new();
 
-    let live_stats = Arc::new(LiveStats::default());
+    let periodic_stats = Arc::new(LiveStats::default());
+    let total_stats = Arc::new(LiveStats::default());
+
+    let start = Instant::now();
 
     tasks.spawn({
-        let live_stats = Arc::clone(&live_stats);
+        let periodic_stats = Arc::clone(&periodic_stats);
+        let total_stats = Arc::clone(&total_stats);
+        let cloned_token = token.clone();
         async move {
             let mut last_at = Instant::now();
             loop {
+                if cloned_token.is_cancelled() {
+                    return;
+                }
                 tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await;
                 let now = Instant::now();
                 let delta: Duration = now - last_at;
                 last_at = now;
 
                 let LiveStats {
-                    evictions,
-                    downloads,
+                    evictions_count,
+                    downloads_count,
+                    downloads_bytes,
                     timeline_restarts,
-                } = &*live_stats;
-                let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
-                let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
+                } = &*periodic_stats;
+                let evictions_count = evictions_count.swap(0, Ordering::Relaxed);
+                let downloads_count = downloads_count.swap(0, Ordering::Relaxed);
+                let downloads_bytes = downloads_bytes.swap(0, Ordering::Relaxed);
                 let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed);
-                info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}");
+
+                total_stats.evictions_count.fetch_add(evictions_count, Ordering::Relaxed);
+                total_stats.downloads_count.fetch_add(downloads_count, Ordering::Relaxed);
+                total_stats.downloads_bytes.fetch_add(downloads_bytes, Ordering::Relaxed);
+                total_stats.timeline_restarts.fetch_add(timeline_restarts, Ordering::Relaxed);
+
+                let evictions_per_s = evictions_count as f64 / delta.as_secs_f64();
+                let downloads_per_s = downloads_count as f64 / delta.as_secs_f64();
+                let downloads_mibs_per_s = downloads_bytes as f64 / delta.as_secs_f64() / ((1 << 20) as f64);
+
+                info!("evictions={evictions_per_s:.2}/s downloads={downloads_per_s:.2}/s download_bytes={downloads_mibs_per_s:.2}MiB/s timeline_restarts={timeline_restarts}");
             }
         }
     });
@@ -124,14 +156,42 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
                 args,
                 Arc::clone(&mgmt_api_client),
                 tl,
-                Arc::clone(&live_stats),
+                Arc::clone(&periodic_stats),
+                token.clone(),
             ));
         }
     }
+    if let Some(runtime) = args.runtime {
+        tokio::spawn(async move {
+            tokio::time::sleep(runtime.into()).await;
+            token.cancel();
+        });
+    }
     while let Some(res) = tasks.join_next().await {
         res.unwrap();
     }
+    let end = Instant::now();
+    let duration: Duration = end - start;
+
+    let output = {
+        let LiveStats {
+            evictions_count,
+            downloads_count,
+            downloads_bytes,
+            timeline_restarts,
+        } = &*total_stats;
+        Output {
+            downloads_count: downloads_count.load(Ordering::Relaxed),
+            downloads_bytes: downloads_bytes.load(Ordering::Relaxed),
+            evictions_count: evictions_count.load(Ordering::Relaxed),
+            timeline_restarts: timeline_restarts.load(Ordering::Relaxed),
+            runtime: duration,
+        }
+    };
+    let output = serde_json::to_string_pretty(&output).unwrap();
+    println!("{output}");
+
     Ok(())
 }
 
@@ -140,6 +200,7 @@ async fn timeline_actor(
     mgmt_api_client: Arc<mgmt_api::Client>,
     timeline: TenantTimelineId,
     live_stats: Arc<LiveStats>,
+    token: CancellationToken,
 ) {
     // TODO: support sharding
     let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
@@ -149,7 +210,7 @@ async fn timeline_actor(
         layers: Vec<mpsc::Sender<OwnedSemaphorePermit>>,
         concurrency: Arc<tokio::sync::Semaphore>,
     }
-    loop {
+    while !token.is_cancelled() {
         debug!("restarting timeline");
         let layer_map_info = mgmt_api_client
             .layer_map_info(tenant_shard_id, timeline.timeline_id)
             .await
             .unwrap();
@@ -185,7 +246,7 @@ async fn timeline_actor(
 
         live_stats.timeline_restart_done();
 
-        loop {
+        while !token.is_cancelled() {
             assert!(!timeline.joinset.is_empty());
             if let Some(res) = timeline.joinset.try_join_next() {
                 debug!(?res, "a layer actor exited, should not happen");
@@ -255,7 +316,7 @@ async fn layer_actor(
             .layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name())
             .await
             .unwrap();
-            live_stats.download_done();
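+            // Credit downloaded bytes using the size recorded in the layer map
+            // (`layer_file_size`), i.e. the layer file's size at rest, not bytes on the wire.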
+            live_stats.download_done(layer.layer_file_size());
             did_it
         }
     };
diff --git a/test_runner/performance/pageserver/pagebench/test_ondemand_download_churn.py b/test_runner/performance/pageserver/pagebench/test_ondemand_download_churn.py
new file mode 100644
index 000000000000..644c1f559b80
--- /dev/null
+++ b/test_runner/performance/pageserver/pagebench/test_ondemand_download_churn.py
@@ -0,0 +1,175 @@
+import json
+from pathlib import Path
+from typing import Any, Dict, Tuple
+
+import pytest
+from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
+from fixtures.pageserver.utils import wait_for_upload_queue_empty
+from fixtures.remote_storage import s3_storage
+from fixtures.utils import humantime_to_ms
+
+
+@pytest.mark.parametrize("duration", [30])
+@pytest.mark.parametrize("io_engine", ["tokio-epoll-uring", "std-fs"])
+@pytest.mark.parametrize("concurrency_per_target", [1, 10, 100])
+@pytest.mark.timeout(1000)
+def test_download_churn(
+    neon_env_builder: NeonEnvBuilder,
+    zenbenchmark: NeonBenchmarker,
+    pg_bin: PgBin,
+    io_engine: str,
+    concurrency_per_target: int,
+    duration: int,
+):
+    def record(metric, **kwargs):
+        zenbenchmark.record(metric_name=f"pageserver_ondemand_download_churn.{metric}", **kwargs)
+
+    params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
+
+    # params from fixtures
+    params.update(
+        {
+            # we don't capture `duration`, but instead use the `runtime` output field from pagebench
+        }
+    )
+
+    # configure cache sizes like in prod
+    page_cache_size = 16384
+    max_file_descriptors = 500000
+    neon_env_builder.pageserver_config_override = (
+        f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
+    )
+    params.update(
+        {
+            "pageserver_config_override.page_cache_size": (
+                page_cache_size * 8192,
+                {"unit": "byte"},
+            ),
+            "pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
+        }
+    )
+
+    for param, (value, kwargs) in params.items():
+        record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
+
+    # Set up env
+    env = setup_env(neon_env_builder, pg_bin)
+    env.pageserver.allowed_errors.append(
+        f".*path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
+    )
+
+    run_benchmark(env, pg_bin, record, io_engine, concurrency_per_target, duration)
+
+
+def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    remote_storage_kind = s3_storage()
+    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
+
+    # We configure the tenant conf such that the SQL query below produces a lot of layers.
+    # We don't care what's in the layers, really; we just care that layers are created.
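+    # With the settings below, that is 300 layers * 10 MiB/layer ~= 3 GiB of table data,
+    # i.e. about 6.1 million rows at 512 bytes per row.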
+    bytes_per_layer = 10 * (1024**2)
+    env = neon_env_builder.init_start(
+        initial_tenant_conf={
+            "pitr_interval": "1000d",  # let's not make it get in the way
+            "gc_period": "0s",  # disable periodic gc to avoid noise
+            "compaction_period": "0s",  # disable L0=>L1 compaction
+            "checkpoint_timeout": "10years",  # rely solely on checkpoint_distance
+            "checkpoint_distance": bytes_per_layer,  # 10M instead of 256M to create more, smaller layers
+            "image_creation_threshold": 100000,  # don't create image layers ever
+        }
+    )
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+    client = env.pageserver.http_client()
+
+    with env.endpoints.create_start("main", tenant_id=tenant_id) as ep:
+        ep.safe_psql("CREATE TABLE data (random_text text)")
+        bytes_per_row = 512  # make big enough so WAL record size doesn't dominate
+        desired_layers = 300
+        desired_bytes = bytes_per_layer * desired_layers
+        nrows = desired_bytes / bytes_per_row
+        ep.safe_psql(
+            f"INSERT INTO data SELECT lpad(i::text, {bytes_per_row}, '0') FROM generate_series(1, {int(nrows)}) as i",
+            options="-c statement_timeout=0",
+        )
+        wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
+        # TODO: this is a bit imprecise, there could be frozen layers being written out that we don't observe here
+        wait_for_upload_queue_empty(client, tenant_id, timeline_id)
+
+    return env
+
+
+def run_benchmark(
+    env: NeonEnv,
+    pg_bin: PgBin,
+    record,
+    io_engine: str,
+    concurrency_per_target: int,
+    duration_secs: int,
+):
+    ps_http = env.pageserver.http_client()
+    cmd = [
+        str(env.neon_binpath / "pagebench"),
+        "ondemand-download-churn",
+        "--mgmt-api-endpoint",
+        ps_http.base_url,
+        "--runtime",
+        f"{duration_secs}s",
+        "--set-io-engine",
+        f"{io_engine}",
+        "--concurrency-per-target",
+        f"{concurrency_per_target}",
+        # don't specify the targets explicitly, let pagebench auto-discover them
+    ]
+
+    log.info(f"command: {' '.join(cmd)}")
+    basepath = pg_bin.run_capture(cmd, with_command_header=False)
+    results_path = Path(basepath + ".stdout")
+    log.info(f"Benchmark results at: {results_path}")
+
+    with open(results_path, "r") as f:
+        results = json.load(f)
+    log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
+
+    metric = "downloads_count"
+    record(
+        metric,
+        metric_value=results[metric],
+        unit="",
+        report=MetricReport.HIGHER_IS_BETTER,
+    )
+
+    metric = "downloads_bytes"
+    record(
+        metric,
+        metric_value=results[metric],
+        unit="byte",
+        report=MetricReport.HIGHER_IS_BETTER,
+    )
+
+    metric = "evictions_count"
+    record(
+        metric,
+        metric_value=results[metric],
+        unit="",
+        report=MetricReport.HIGHER_IS_BETTER,
+    )
+
+    metric = "timeline_restarts"
+    record(
+        metric,
+        metric_value=results[metric],
+        unit="",
+        report=MetricReport.LOWER_IS_BETTER,
+    )
+
+    metric = "runtime"
+    record(
+        metric,
+        metric_value=humantime_to_ms(results[metric]) / 1000,
+        unit="s",
+        report=MetricReport.TEST_PARAM,
+    )