Add performance regress test_ondemand_download_churn.py (#7242)
Add performance regression test for on-demand download throughput.

Closes #7146

Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
3 people authored and a-masterov committed May 20, 2024
1 parent 9783e0f commit 4c740ea
Showing 3 changed files with 267 additions and 21 deletions.
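
For context: the reworked pagebench subcommand prints a single JSON summary on stdout (the new Output struct below, serialized with serde_json; the runtime field goes through humantime_serde), and the new test parses that summary into benchmark metrics. A minimal sketch of what the test consumes — the field names match the struct, the values are made up for illustration:

import json

# Illustrative stdout of `pagebench ondemand-download-churn`; the field names come
# from the new Output struct, the numbers are invented for this example.
example_stdout = """
{
  "downloads_count": 12345,
  "downloads_bytes": 1073741824,
  "evictions_count": 12300,
  "timeline_restarts": 7,
  "runtime": "30s"
}
"""

results = json.loads(example_stdout)
assert set(results) == {
    "downloads_count",
    "downloads_bytes",
    "evictions_count",
    "timeline_restarts",
    "runtime",
}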
10 changes: 10 additions & 0 deletions libs/pageserver_api/src/models.rs
@@ -745,6 +745,16 @@ impl HistoricLayerInfo {
};
*field = value;
}
pub fn layer_file_size(&self) -> u64 {
match self {
HistoricLayerInfo::Delta {
layer_file_size, ..
} => *layer_file_size,
HistoricLayerInfo::Image {
layer_file_size, ..
} => *layer_file_size,
}
}
}

#[derive(Debug, Serialize, Deserialize)]
103 changes: 82 additions & 21 deletions pageserver/pagebench/src/cmd/ondemand_download_churn.rs
@@ -2,20 +2,19 @@ use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId};

use pageserver_client::mgmt_api;
use rand::seq::SliceRandom;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use utils::id::{TenantTimelineId, TimelineId};

use std::{f64, sync::Arc};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit},
task::JoinSet,
};

use std::{
num::NonZeroUsize,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::atomic::{AtomicU64, Ordering},
time::{Duration, Instant},
};

@@ -51,19 +50,31 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
Ok(())
}

#[derive(serde::Serialize)]
struct Output {
downloads_count: u64,
downloads_bytes: u64,
evictions_count: u64,
timeline_restarts: u64,
#[serde(with = "humantime_serde")]
runtime: Duration,
}

#[derive(Debug, Default)]
struct LiveStats {
evictions: AtomicU64,
downloads: AtomicU64,
evictions_count: AtomicU64,
downloads_count: AtomicU64,
downloads_bytes: AtomicU64,
timeline_restarts: AtomicU64,
}

impl LiveStats {
fn eviction_done(&self) {
self.evictions.fetch_add(1, Ordering::Relaxed);
self.evictions_count.fetch_add(1, Ordering::Relaxed);
}
fn download_done(&self) {
self.downloads.fetch_add(1, Ordering::Relaxed);
fn download_done(&self, size: u64) {
self.downloads_count.fetch_add(1, Ordering::Relaxed);
self.downloads_bytes.fetch_add(size, Ordering::Relaxed);
}
fn timeline_restart_done(&self) {
self.timeline_restarts.fetch_add(1, Ordering::Relaxed);
@@ -92,28 +103,49 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
)
.await?;

let token = CancellationToken::new();
let mut tasks = JoinSet::new();

let live_stats = Arc::new(LiveStats::default());
let periodic_stats = Arc::new(LiveStats::default());
let total_stats = Arc::new(LiveStats::default());

let start = Instant::now();
tasks.spawn({
let live_stats = Arc::clone(&live_stats);
let periodic_stats = Arc::clone(&periodic_stats);
let total_stats = Arc::clone(&total_stats);
let cloned_token = token.clone();
async move {
let mut last_at = Instant::now();
loop {
if cloned_token.is_cancelled() {
return;
}
tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await;
let now = Instant::now();
let delta: Duration = now - last_at;
last_at = now;

let LiveStats {
evictions,
downloads,
evictions_count,
downloads_count,
downloads_bytes,
timeline_restarts,
} = &*live_stats;
let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
} = &*periodic_stats;
let evictions_count = evictions_count.swap(0, Ordering::Relaxed);
let downloads_count = downloads_count.swap(0, Ordering::Relaxed);
let downloads_bytes = downloads_bytes.swap(0, Ordering::Relaxed);
let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed);
info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}");

total_stats.evictions_count.fetch_add(evictions_count, Ordering::Relaxed);
total_stats.downloads_count.fetch_add(downloads_count, Ordering::Relaxed);
total_stats.downloads_bytes.fetch_add(downloads_bytes, Ordering::Relaxed);
total_stats.timeline_restarts.fetch_add(timeline_restarts, Ordering::Relaxed);

let evictions_per_s = evictions_count as f64 / delta.as_secs_f64();
let downloads_per_s = downloads_count as f64 / delta.as_secs_f64();
let downloads_mibs_per_s = downloads_bytes as f64 / delta.as_secs_f64() / ((1 << 20) as f64);

info!("evictions={evictions_per_s:.2}/s downloads={downloads_per_s:.2}/s download_bytes={downloads_mibs_per_s:.2}MiB/s timeline_restarts={timeline_restarts}");
}
}
});
@@ -124,14 +156,42 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
args,
Arc::clone(&mgmt_api_client),
tl,
Arc::clone(&live_stats),
Arc::clone(&periodic_stats),
token.clone(),
));
}
}
if let Some(runtime) = args.runtime {
tokio::spawn(async move {
tokio::time::sleep(runtime.into()).await;
token.cancel();
});
}

while let Some(res) = tasks.join_next().await {
res.unwrap();
}
let end = Instant::now();
let duration: Duration = end - start;

let output = {
let LiveStats {
evictions_count,
downloads_count,
downloads_bytes,
timeline_restarts,
} = &*total_stats;
Output {
downloads_count: downloads_count.load(Ordering::Relaxed),
downloads_bytes: downloads_bytes.load(Ordering::Relaxed),
evictions_count: evictions_count.load(Ordering::Relaxed),
timeline_restarts: timeline_restarts.load(Ordering::Relaxed),
runtime: duration,
}
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");

Ok(())
}

@@ -140,6 +200,7 @@ async fn timeline_actor(
mgmt_api_client: Arc<pageserver_client::mgmt_api::Client>,
timeline: TenantTimelineId,
live_stats: Arc<LiveStats>,
token: CancellationToken,
) {
// TODO: support sharding
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
@@ -149,7 +210,7 @@
layers: Vec<mpsc::Sender<OwnedSemaphorePermit>>,
concurrency: Arc<tokio::sync::Semaphore>,
}
loop {
while !token.is_cancelled() {
debug!("restarting timeline");
let layer_map_info = mgmt_api_client
.layer_map_info(tenant_shard_id, timeline.timeline_id)
@@ -185,7 +246,7 @@

live_stats.timeline_restart_done();

loop {
while !token.is_cancelled() {
assert!(!timeline.joinset.is_empty());
if let Some(res) = timeline.joinset.try_join_next() {
debug!(?res, "a layer actor exited, should not happen");
@@ -255,7 +316,7 @@ async fn layer_actor(
.layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name())
.await
.unwrap();
live_stats.download_done();
live_stats.download_done(layer.layer_file_size());
did_it
}
};
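
The periodic stats task above drains the per-interval counters from periodic_stats, folds them into total_stats, and logs per-second rates; the MiB/s figure is simply bytes divided by the measured interval and by 2^20. A quick illustration of that arithmetic with made-up numbers:

# Rate computation mirroring the periodic stats task (numbers are made up).
downloads_bytes = 512 * 1024**2   # bytes downloaded during one reporting interval
delta_secs = 1.02                 # measured interval length, roughly one second
downloads_mibs_per_s = downloads_bytes / delta_secs / (1 << 20)
print(f"download_bytes={downloads_mibs_per_s:.2f}MiB/s")  # ~501.96 MiB/s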
175 changes: 175 additions & 0 deletions test_ondemand_download_churn.py
@@ -0,0 +1,175 @@
import json
from pathlib import Path
from typing import Any, Dict, Tuple

import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import s3_storage
from fixtures.utils import humantime_to_ms


@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("io_engine", ["tokio-epoll-uring", "std-fs"])
@pytest.mark.parametrize("concurrency_per_target", [1, 10, 100])
@pytest.mark.timeout(1000)
def test_download_churn(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
io_engine: str,
concurrency_per_target: int,
duration: int,
):
def record(metric, **kwargs):
zenbenchmark.record(metric_name=f"pageserver_ondemand_download_churn.{metric}", **kwargs)

params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}

# params from fixtures
params.update(
{
# we don't capture `duration`, but instead use the `runtime` output field from pagebench
}
)

# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
}
)

for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)

# Setup env
env = setup_env(neon_env_builder, pg_bin)
env.pageserver.allowed_errors.append(
f".*path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)

run_benchmark(env, pg_bin, record, io_engine, concurrency_per_target, duration)


def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)

# We configure tenant conf such that SQL query below produces a lot of layers.
# We don't care what's in the layers really, we just care that layers are created.
bytes_per_layer = 10 * (1024**2)
env = neon_env_builder.init_start(
initial_tenant_conf={
"pitr_interval": "1000d", # let's not make it get in the way
"gc_period": "0s", # disable periodic gc to avoid noise
"compaction_period": "0s", # disable L0=>L1 compaction
"checkpoint_timeout": "10years", # rely solely on checkpoint_distance
"checkpoint_distance": bytes_per_layer, # 10M instead of 256M to create more smaller layers
"image_creation_threshold": 100000, # don't create image layers ever
}
)

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
client = env.pageserver.http_client()

with env.endpoints.create_start("main", tenant_id=tenant_id) as ep:
ep.safe_psql("CREATE TABLE data (random_text text)")
bytes_per_row = 512 # make big enough so WAL record size doesn't dominate
desired_layers = 300
desired_bytes = bytes_per_layer * desired_layers
nrows = desired_bytes / bytes_per_row
ep.safe_psql(
f"INSERT INTO data SELECT lpad(i::text, {bytes_per_row}, '0') FROM generate_series(1, {int(nrows)}) as i",
options="-c statement_timeout=0",
)
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
# TODO: this is a bit imprecise, there could be frozen layers being written out that we don't observe here
wait_for_upload_queue_empty(client, tenant_id, timeline_id)

return env


def run_benchmark(
env: NeonEnv,
pg_bin: PgBin,
record,
io_engine: str,
concurrency_per_target: int,
duration_secs: int,
):
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"ondemand-download-churn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--runtime",
f"{duration_secs}s",
"--set-io-engine",
f"{io_engine}",
"--concurrency-per-target",
f"{concurrency_per_target}",
# don't specify the targets explicitly, let pagebench auto-discover them
]

log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")

with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")

metric = "downloads_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)

metric = "downloads_bytes"
record(
metric,
metric_value=results[metric],
unit="byte",
report=MetricReport.HIGHER_IS_BETTER,
)

metric = "evictions_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)

metric = "timeline_restarts"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.LOWER_IS_BETTER,
)

metric = "runtime"
record(
metric,
metric_value=humantime_to_ms(results[metric]) / 1000,
unit="s",
report=MetricReport.TEST_PARAM,
)
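
As a sanity check on the sizing in setup_env: with checkpoint_distance set to 10 MiB, 300 desired layers, and 512 bytes per row, the INSERT produces roughly 3 GiB of data across about six million rows (back-of-the-envelope only, not part of the test):

# Back-of-the-envelope for the workload generated in setup_env.
bytes_per_layer = 10 * (1024**2)  # checkpoint_distance: roughly one 10 MiB layer per flush
desired_layers = 300
bytes_per_row = 512               # padded text column, so WAL record overhead stays small

desired_bytes = bytes_per_layer * desired_layers  # 3,145,728,000 bytes ~= 2.93 GiB
nrows = desired_bytes // bytes_per_row            # 6,144,000 rows
print(f"{desired_bytes / 1024**3:.2f} GiB in ~{desired_layers} layers, {nrows:,} rows")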

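One non-obvious step in run_benchmark: pagebench reports runtime as a humantime string (the Output struct serializes it with humantime_serde), so the test converts it with humantime_to_ms from fixtures.utils and divides by 1000 to record seconds. A tiny sketch of that conversion, assuming the helper behaves as its name suggests:

from fixtures.utils import humantime_to_ms

# "30s" (humantime_serde output) -> 30000 ms -> 30.0 s; assumed helper semantics.
runtime_secs = humantime_to_ms("30s") / 1000
print(runtime_secs)  # expected: 30.0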