Skip to content

Commit

Permalink
Simplify pageserver_physical_gc function (#10104)
Browse files Browse the repository at this point in the history
This simplifies the code in `pageserver_physical_gc` a little, following
the feedback in #10007 that the code was too complicated.

Most importantly, we no longer pass `GcSummary` around in a complicated
fashion, and in one place we replace deeply nested async stream
combinators with `try_stream!{}`.

Follow-up to #10007
  • Loading branch information
arpad-m authored Jan 20, 2025
1 parent 2de2b26 commit 2ab9f69
Showing 1 changed file with 40 additions and 46 deletions.
86 changes: 40 additions & 46 deletions storage_scrubber/src/pageserver_physical_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::checks::{
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES};
use async_stream::try_stream;
use futures::future::Either;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest;
Expand Down Expand Up @@ -578,7 +580,7 @@ async fn gc_timeline(
target: &RootTarget,
mode: GcMode,
ttid: TenantShardTimelineId,
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
accumulator: &std::sync::Mutex<TenantRefAccumulator>,
tenant_manifest_info: Arc<Option<RemoteTenantManifestInfo>>,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
Expand Down Expand Up @@ -721,26 +723,26 @@ pub async fn pageserver_physical_gc(

let remote_client = Arc::new(remote_client);
let tenants = if tenant_shard_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&remote_client, &target))
Either::Left(stream_tenants(&remote_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
};

// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
const CONCURRENCY: usize = 32;

// Accumulate information about each tenant for cross-shard GC step we'll do at the end
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
let accumulator = std::sync::Mutex::new(TenantRefAccumulator::default());

// Accumulate information about how many manifests we have GCd
let manifest_gc_summary = std::sync::Mutex::new(GcSummary::default());

// Generate a stream of TenantTimelineId
enum GcSummaryOrContent<T> {
Content(T),
GcSummary(GcSummary),
}
let timelines = tenants.map_ok(|tenant_shard_id| {
let target_ref = &target;
let remote_client_ref = &remote_client;
let manifest_gc_summary_ref = &manifest_gc_summary;
async move {
let gc_manifest_result = gc_tenant_manifests(
remote_client_ref,
Expand All @@ -757,64 +759,56 @@ pub async fn pageserver_physical_gc(
(GcSummary::default(), None)
}
};
manifest_gc_summary_ref
.lock()
.unwrap()
.merge(summary_from_manifest);
let tenant_manifest_arc = Arc::new(tenant_manifest_opt);
let summary_from_manifest = Ok(GcSummaryOrContent::<(_, _)>::GcSummary(
summary_from_manifest,
));
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id)
.await
.map(|stream| {
stream
.zip(futures::stream::iter(std::iter::repeat(
tenant_manifest_arc,
)))
.map(|(ttid_res, tenant_manifest_arc)| {
ttid_res.map(move |ttid| {
GcSummaryOrContent::Content((ttid, tenant_manifest_arc))
})
})
.chain(futures::stream::iter([summary_from_manifest].into_iter()))
})
let mut timelines = Box::pin(
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
);
Ok(try_stream! {
while let Some(ttid_res) = timelines.next().await {
let ttid = ttid_res?;
yield (ttid, tenant_manifest_arc.clone());
}
})
}
});
let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
let timelines = timelines.try_flatten();

let mut summary = GcSummary::default();

// Drain futures for per-shard GC, populating accumulator as a side effect
{
let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid {
GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) => {
futures::future::Either::Left(gc_timeline(
&remote_client,
&min_age,
&target,
mode,
ttid,
&accumulator,
tenant_manifest_arc,
))
}
GcSummaryOrContent::GcSummary(gc_summary) => {
futures::future::Either::Right(futures::future::ok(gc_summary))
}
let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
let timelines = timelines.try_flatten();

let timelines = timelines.map_ok(|(ttid, tenant_manifest_arc)| {
gc_timeline(
&remote_client,
&min_age,
&target,
mode,
ttid,
&accumulator,
tenant_manifest_arc,
)
});
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));

// Drain futures for per-shard GC, populating accumulator as a side effect
while let Some(i) = timelines.next().await {
summary.merge(i?);
}
}
// Streams are lazily evaluated, so only now do we have access to the inner object
summary.merge(manifest_gc_summary.into_inner().unwrap());

// Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
let Some(client) = controller_client else {
tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
return Ok(summary);
};

let (ancestor_shards, ancestor_refs) = Arc::into_inner(accumulator)
.unwrap()
let (ancestor_shards, ancestor_refs) = accumulator
.into_inner()
.unwrap()
.into_gc_ancestors(client, &mut summary)
Expand Down

1 comment on commit 2ab9f69

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7510 tests run: 7118 passed, 2 failed, 390 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_layer_map[release-pg16-github-actions-selfhosted] or test_sharding_autosplit[release-pg16-github-actions-selfhosted]"
Flaky tests (6)

Postgres 17

Postgres 15

Postgres 14

Code coverage* (full report)

  • functions: 33.6% (8446 of 25111 functions)
  • lines: 49.2% (70795 of 143967 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
2ab9f69 at 2025-01-21T00:16:24.322Z :recycle:

Please sign in to comment.