Skip to content

Commit

Permalink
pageserver: fix race cleaning up timeline files when shut down during bootstrap (#10532)
Browse files Browse the repository at this point in the history

## Problem

Timeline bootstrap starts a flush loop, but does not reliably shut down
the timeline (including waiting for the flush loop to exit) before
destroying the UninitializedTimeline, whose destructor tries to clean up
local storage. If local storage is still being written to at that point,
the cleanup is unsound.

Currently the symptom is that we see a "Directory not empty" error log,
e.g.
https://neon-github-public-dev.s3.amazonaws.com/reports/main/12966756686/index.html#testresult/5523f7d15f46f7f7/retries

## Summary of changes

- Move the fallible I/O part of bootstrap into a function (notably, this
part is fallible when the tenant is shut down while creation is in
progress)
- When that function returns an error, call shutdown() on the timeline
  • Loading branch information
jcsp authored Jan 30, 2025
1 parent bf6d5e9 commit 6da7c55
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 73 deletions.
8 changes: 6 additions & 2 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3169,12 +3169,16 @@ async fn put_tenant_timeline_import_basebackup(

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);

let span = info_span!("import_basebackup", tenant_id=%tenant_id, timeline_id=%timeline_id, base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
let tenant_shard_id = TenantShardId::unsharded(tenant_id);

let span = info_span!("import_basebackup",
tenant_id=%tenant_id, timeline_id=%timeline_id, shard_id=%tenant_shard_id.shard_slug(),
base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
async move {
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(TenantShardId::unsharded(tenant_id))?;
.get_attached_tenant_shard(tenant_shard_id)?;

let broker_client = state.broker_client.clone();

Expand Down
66 changes: 29 additions & 37 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2426,7 +2426,7 @@ impl Tenant {
// Make sure the freeze_and_flush reaches remote storage.
tline.remote_client.wait_completion().await.unwrap();

let tl = uninit_tl.finish_creation()?;
let tl = uninit_tl.finish_creation().await?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
Ok(tl)
Expand Down Expand Up @@ -4702,7 +4702,7 @@ impl Tenant {
)
.await?;

let new_timeline = uninitialized_timeline.finish_creation()?;
let new_timeline = uninitialized_timeline.finish_creation().await?;

// Root timeline gets its layers during creation and uploads them along with the metadata.
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
Expand Down Expand Up @@ -4892,10 +4892,11 @@ impl Tenant {
}

// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
let pgdata_path_deferred = pgdata_path.clone();
scopeguard::defer! {
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
if let Err(e) = fs::remove_dir_all(&pgdata_path_deferred) {
// this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
error!("Failed to remove temporary initdb directory '{pgdata_path_deferred}': {e}");
}
}
if let Some(existing_initdb_timeline_id) = load_existing_initdb {
Expand Down Expand Up @@ -4962,7 +4963,7 @@ impl Tenant {
pgdata_lsn,
pg_version,
);
let raw_timeline = self
let mut raw_timeline = self
.prepare_new_timeline(
timeline_id,
&new_metadata,
Expand All @@ -4973,42 +4974,33 @@ impl Tenant {
.await?;

let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;

// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();

import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
&pgdata_path,
pgdata_lsn,
ctx,
)
.await
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
})?;
raw_timeline
.write(|unfinished_timeline| async move {
import_datadir::import_timeline_from_postgres_datadir(
&unfinished_timeline,
&pgdata_path,
pgdata_lsn,
ctx,
)
.await
.with_context(|| {
format!(
"Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}"
)
})?;

fail::fail_point!("before-checkpoint-new-timeline", |_| {
Err(CreateTimelineError::Other(anyhow::anyhow!(
"failpoint before-checkpoint-new-timeline"
)))
});
fail::fail_point!("before-checkpoint-new-timeline", |_| {
Err(CreateTimelineError::Other(anyhow::anyhow!(
"failpoint before-checkpoint-new-timeline"
)))
});

unfinished_timeline
.freeze_and_flush()
.await
.with_context(|| {
format!(
"Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
)
})?;
Ok(())
})
.await?;

// All done!
let timeline = raw_timeline.finish_creation()?;
let timeline = raw_timeline.finish_creation().await?;

// Callers are responsible to wait for uploads to complete and for activating the timeline.

Expand Down
129 changes: 95 additions & 34 deletions pageserver/src/tenant/timeline/uninit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::hash_map::Entry, fs, sync::Arc};
use std::{collections::hash_map::Entry, fs, future::Future, sync::Arc};

use anyhow::Context;
use camino::Utf8PathBuf;
Expand All @@ -8,7 +8,8 @@ use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
use crate::{
context::RequestContext,
import_datadir,
tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
span::debug_assert_current_span_has_tenant_and_timeline_id,
tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
};

use super::Timeline;
Expand All @@ -24,6 +25,9 @@ pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
/// Whether we spawned the inner Timeline's tasks such that we must later shut it down
/// if aborting the timeline creation
needs_shutdown: bool,
}

impl<'t> UninitializedTimeline<'t> {
Expand All @@ -36,6 +40,50 @@ impl<'t> UninitializedTimeline<'t> {
owning_tenant,
timeline_id,
raw_timeline,
needs_shutdown: false,
}
}

/// When writing data to this timeline during creation, use this wrapper: it will take care of
/// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
/// later.
pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
where
F: FnOnce(Arc<Timeline>) -> Fut,
Fut: Future<Output = Result<(), CreateTimelineError>>,
{
debug_assert_current_span_has_tenant_and_timeline_id();

// Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
self.needs_shutdown = true;

let timeline = self.raw_timeline()?;

// Spawn flush loop so that the Timeline is ready to accept writes
timeline.maybe_spawn_flush_loop();

// Invoke the provided function, which will write some data into the new timeline
if let Err(e) = f(timeline.clone()).await {
self.abort().await;
return Err(e.into());
}

// Flush the underlying timeline's ephemeral layers to disk
if let Err(e) = timeline
.freeze_and_flush()
.await
.context("Failed to flush after timeline creation writes")
{
self.abort().await;
return Err(e);
}

Ok(())
}

pub(crate) async fn abort(&self) {
if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
raw_timeline.shutdown(super::ShutdownMode::Hard).await;
}
}

Expand All @@ -44,11 +92,13 @@ impl<'t> UninitializedTimeline<'t> {
/// This function launches the flush loop if not already done.
///
/// The caller is responsible for activating the timeline (function `.activate()`).
pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
let timeline_id = self.timeline_id;
let tenant_shard_id = self.owning_tenant.tenant_shard_id;

if self.raw_timeline.is_none() {
self.abort().await;

return Err(anyhow::anyhow!(
"No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
));
Expand All @@ -62,16 +112,25 @@ impl<'t> UninitializedTimeline<'t> {
.0
.get_disk_consistent_lsn();

anyhow::ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
);
if !new_disk_consistent_lsn.is_valid() {
self.abort().await;

return Err(anyhow::anyhow!(
"new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
));
}

let mut timelines = self.owning_tenant.timelines.lock().unwrap();
match timelines.entry(timeline_id) {
Entry::Occupied(_) => anyhow::bail!(
Entry::Occupied(_) => {
// Unexpected, bug in the caller. Tenant is responsible for preventing concurrent creation of the same timeline.
//
// We do not call Self::abort here. Because we don't cleanly shut down our Timeline, [`Self::drop`] should
// skip trying to delete the timeline directory too.
anyhow::bail!(
"Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
),
)
}
Entry::Vacant(v) => {
// after taking here should be no fallible operations, because the drop guard will not
// cleanup after and would block for example the tenant deletion
Expand All @@ -93,36 +152,31 @@ impl<'t> UninitializedTimeline<'t> {

/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
self,
mut self,
tenant: Arc<Tenant>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
self.write(|raw_timeline| async move {
import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
.await
.context("Failed to import basebackup")
.map_err(CreateTimelineError::Other)?;

import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
.await
.context("Failed to import basebackup")?;

// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
raw_timeline.maybe_spawn_flush_loop();

fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
fail::fail_point!("before-checkpoint-new-timeline", |_| {
Err(CreateTimelineError::Other(anyhow::anyhow!(
"failpoint before-checkpoint-new-timeline"
)))
});

raw_timeline
.freeze_and_flush()
.await
.context("Failed to flush after basebackup import")?;
Ok(())
})
.await?;

// All the data has been imported. Insert the Timeline into the tenant's timelines map
let tl = self.finish_creation()?;
let tl = self.finish_creation().await?;
tl.activate(tenant, broker_client, None, ctx);
Ok(tl)
}
Expand All @@ -143,12 +197,19 @@ impl<'t> UninitializedTimeline<'t> {

impl Drop for UninitializedTimeline<'_> {
fn drop(&mut self) {
if let Some((_, create_guard)) = self.raw_timeline.take() {
if let Some((timeline, create_guard)) = self.raw_timeline.take() {
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
// This is unusual, but can happen harmlessly if the pageserver is stopped while
// creating a timeline.
info!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(create_guard);
if self.needs_shutdown && !timeline.gate.close_complete() {
// This should not happen: caller should call [`Self::abort`] on failures
tracing::warn!(
"Timeline not shut down after initialization failure, cannot clean up files"
);
} else {
// This is unusual, but can happen harmlessly if the pageserver is stopped while
// creating a timeline.
info!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(create_guard);
}
}
}
}
Expand Down

1 comment on commit 6da7c55

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7565 tests run: 7204 passed, 0 failed, 361 skipped (full report)


Flaky tests (7)

Postgres 17

Postgres 14

Code coverage* (full report)

  • functions: 33.4% (8516 of 25532 functions)
  • lines: 49.1% (71500 of 145551 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
6da7c55 at 2025-01-30T23:08:28.751Z :recycle:

Please sign in to comment.