Commit fc78774

fix: EphemeralFiles can outlive their Timeline via enum LayerManager (#8229)

Ephemeral files are cleaned up on drop, but dropping did not delay shutdown,
leading to problems with restarting the tenant. The solution is as proposed:
- make ephemeral files carry a gate guard to delay `Timeline::gate` closing
- flush in-memory layers, and release the strong references to them, on
  `Timeline::shutdown`

The above are realized by making `LayerManager` an `enum` with `Open` and
`Closed` variants (sketched below), and failing requests to modify the
`LayerMap` once closed.

Additionally:

- fix overly eager `anyhow` conversions in compaction
- unify how we freeze layers and handle errors
- optimize `likely_resident_layers` to read the `LayerFileManager` hashmap
  values directly instead of bouncing through `LayerMap`

Fixes: #7830
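
For illustration, a minimal, hypothetical sketch of the `Open`/`Closed` pattern described above. Only `open_mut`, `likely_resident_layers`, and the `Shutdown` error mirror names that appear in the diffs below; the stand-in layer types simplify the real `LayerManager` in `pageserver/src/tenant/timeline/layer_manager.rs`:

// Minimal sketch, not the real implementation: the actual Closed variant
// keeps its layers around so Timeline::shutdown can finish flushing them.
use std::collections::HashMap;

#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shut down")]
pub struct Shutdown;

#[derive(Default)]
pub struct OpenLayerManager {
    // stand-in for the LayerFileManager hashmap of resident layers
    layers: HashMap<String, String>,
}

pub enum LayerManager {
    Open(OpenLayerManager),
    Closed { layers: HashMap<String, String> },
}

impl LayerManager {
    /// Reads keep working in both variants (e.g. for eviction or deletion)...
    pub fn likely_resident_layers(&self) -> impl Iterator<Item = &String> {
        match self {
            LayerManager::Open(open) => open.layers.values(),
            LayerManager::Closed { layers } => layers.values(),
        }
    }

    /// ...but anything that would modify the LayerMap must go through
    /// open_mut(), which fails once the timeline has shut down.
    pub fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
        match self {
            LayerManager::Open(open) => Ok(open),
            LayerManager::Closed { .. } => Err(Shutdown),
        }
    }
}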
koivunej authored Aug 7, 2024
1 parent ad0988f commit fc78774
Showing 16 changed files with 507 additions and 308 deletions.
3 changes: 2 additions & 1 deletion libs/utils/src/sync/gate.rs
@@ -78,8 +78,9 @@ impl Drop for GateGuard {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, thiserror::Error)]
 pub enum GateError {
+    #[error("gate is closed")]
     GateClosed,
 }
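
For context, the `Gate` contract the rest of this change leans on, as a small usage sketch (assuming the `utils::sync::gate` API exercised in the diffs below: `enter()` hands out an owned `GateGuard`, and `close()` resolves only once every guard has been dropped):

use utils::sync::gate::Gate;

#[tokio::main]
async fn main() {
    let gate = Gate::default();
    let guard = gate.enter().expect("gate is still open");

    // close() blocks for as long as any GateGuard is alive
    let closing = tokio::spawn(async move { gate.close().await });

    // ... do work in the guarded section (e.g. write to an ephemeral file) ...

    drop(guard); // last guard gone: close() can now resolve
    closing.await.unwrap();

    // any later enter() fails with GateError::GateClosed
}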
6 changes: 5 additions & 1 deletion pageserver/benches/bench_ingest.rs
@@ -61,7 +61,11 @@ async fn ingest(
 
     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
 
-    let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;
+    let gate = utils::sync::gate::Gate::default();
+    let entered = gate.enter().unwrap();
+
+    let layer =
+        InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
 
     let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
     let ctx = RequestContext::new(
5 changes: 4 additions & 1 deletion pageserver/src/http/routes.rs
@@ -1162,7 +1162,10 @@ async fn layer_map_info_handler(
     let timeline =
         active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
             .await?;
-    let layer_map_info = timeline.layer_map_info(reset).await;
+    let layer_map_info = timeline
+        .layer_map_info(reset)
+        .await
+        .map_err(|_shutdown| ApiError::ShuttingDown)?;
 
     json_response(StatusCode::OK, layer_map_info)
 }
38 changes: 18 additions & 20 deletions pageserver/src/tenant.rs
@@ -601,6 +601,12 @@ impl From<PageReconstructError> for GcError {
     }
 }
 
+impl From<timeline::layer_manager::Shutdown> for GcError {
+    fn from(_: timeline::layer_manager::Shutdown) -> Self {
+        GcError::TimelineCancelled
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub(crate) enum LoadConfigError {
     #[error("TOML deserialization error: '{0}'")]
@@ -710,6 +716,7 @@ impl Tenant {
                 .read()
                 .await
                 .layer_map()
+                .expect("currently loading, layer manager cannot be shutdown already")
                 .iter_historic_layers()
                 .next()
                 .is_some(),
@@ -4674,10 +4681,10 @@ mod tests {
 
         let layer_map = tline.layers.read().await;
         let level0_deltas = layer_map
-            .layer_map()
-            .get_level0_deltas()
-            .into_iter()
-            .map(|desc| layer_map.get_from_desc(&desc))
+            .layer_map()?
+            .level0_deltas()
+            .iter()
+            .map(|desc| layer_map.get_from_desc(desc))
             .collect::<Vec<_>>();
 
         assert!(!level0_deltas.is_empty());
@@ -4908,11 +4915,13 @@ mod tests {
         let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
 
         let guard = tline.layers.read().await;
-        guard.layer_map().dump(true, &ctx).await?;
+        let lm = guard.layer_map()?;
+
+        lm.dump(true, &ctx).await?;
 
         let mut reads = Vec::new();
         let mut prev = None;
-        guard.layer_map().iter_historic_layers().for_each(|desc| {
+        lm.iter_historic_layers().for_each(|desc| {
             if !desc.is_delta() {
                 prev = Some(desc.clone());
                 return;
@@ -5918,23 +5927,12 @@ mod tests {
             tline.freeze_and_flush().await?; // force create a delta layer
         }
 
-        let before_num_l0_delta_files = tline
-            .layers
-            .read()
-            .await
-            .layer_map()
-            .get_level0_deltas()
-            .len();
+        let before_num_l0_delta_files =
+            tline.layers.read().await.layer_map()?.level0_deltas().len();
 
         tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
 
-        let after_num_l0_delta_files = tline
-            .layers
-            .read()
-            .await
-            .layer_map()
-            .get_level0_deltas()
-            .len();
+        let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
 
         assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");

45 changes: 43 additions & 2 deletions pageserver/src/tenant/ephemeral_file.rs
@@ -29,6 +29,7 @@ impl EphemeralFile {
         conf: &PageServerConf,
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
+        gate_guard: utils::sync::gate::GateGuard,
         ctx: &RequestContext,
     ) -> Result<EphemeralFile, io::Error> {
         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
@@ -51,10 +52,12 @@ impl EphemeralFile {
         )
         .await?;
 
+        let prewarm = conf.l0_flush.prewarm_on_write();
+
         Ok(EphemeralFile {
             _tenant_shard_id: tenant_shard_id,
             _timeline_id: timeline_id,
-            rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
+            rw: page_caching::RW::new(file, prewarm, gate_guard),
         })
     }
@@ -161,7 +164,11 @@ mod tests {
     async fn test_ephemeral_blobs() -> Result<(), io::Error> {
         let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
 
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;
+        let gate = utils::sync::gate::Gate::default();
+
+        let entered = gate.enter().unwrap();
+
+        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;
 
         let pos_foo = file.write_blob(b"foo", &ctx).await?;
         assert_eq!(
@@ -215,4 +222,38 @@
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn ephemeral_file_holds_gate_open() {
+        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
+
+        let (conf, tenant_id, timeline_id, ctx) =
+            harness("ephemeral_file_holds_gate_open").unwrap();
+
+        let gate = utils::sync::gate::Gate::default();
+
+        let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
+            .await
+            .unwrap();
+
+        let mut closing = tokio::task::spawn(async move {
+            gate.close().await;
+        });
+
+        // the gate stays entered until the ephemeral file is dropped;
+        // do not start paused: tokio-epoll-uring has a sleep loop
+        tokio::time::pause();
+        tokio::time::timeout(FOREVER, &mut closing)
+            .await
+            .expect_err("closing cannot complete before dropping");
+
+        // this is a requirement of the reset_tenant functionality: we have to be able to restart a
+        // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate
+        drop(file);
+
+        tokio::time::timeout(FOREVER, &mut closing)
+            .await
+            .expect("closing completes right away")
+            .expect("closing does not panic");
+    }
 }
10 changes: 9 additions & 1 deletion pageserver/src/tenant/ephemeral_file/page_caching.rs
@@ -18,6 +18,8 @@ use super::zero_padded_read_write;
 pub struct RW {
     page_cache_file_id: page_cache::FileId,
     rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
+    /// The gate guard is held for as long as we need to do operations in the path (delete on drop).
+    _gate_guard: utils::sync::gate::GateGuard,
 }
 
 /// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
@@ -29,7 +31,11 @@ pub enum PrewarmOnWrite {
 }
 
 impl RW {
-    pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self {
+    pub fn new(
+        file: VirtualFile,
+        prewarm_on_write: PrewarmOnWrite,
+        _gate_guard: utils::sync::gate::GateGuard,
+    ) -> Self {
         let page_cache_file_id = page_cache::next_file_id();
         Self {
             page_cache_file_id,
@@ -38,6 +44,7 @@ impl RW {
                 file,
                 prewarm_on_write,
             )),
+            _gate_guard,
         }
     }

@@ -145,6 +152,7 @@ impl Drop for RW {
         // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
 
         // unlink the file
+        // we are clear to do this, because we have entered a gate
         let res = std::fs::remove_file(&self.rw.as_writer().file.path);
         if let Err(e) = res {
             if e.kind() != std::io::ErrorKind::NotFound {
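
Putting the two pieces together, a condensed, hypothetical example of the delete-on-drop pattern above (`TempTimelineFile` is made up for illustration; `page_caching::RW` is the real carrier):

// Hypothetical sketch: owning a GateGuard next to the path means
// Timeline::gate.close() (and thus tenant restart) cannot complete
// before Drop has unlinked the file.
struct TempTimelineFile {
    path: std::path::PathBuf,
    _gate_guard: utils::sync::gate::GateGuard,
}

impl Drop for TempTimelineFile {
    fn drop(&mut self) {
        // safe to unlink: the held guard keeps the timeline directory alive
        if let Err(e) = std::fs::remove_file(&self.path) {
            if e.kind() != std::io::ErrorKind::NotFound {
                eprintln!("failed to unlink {}: {e}", self.path.display());
            }
        }
    }
}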
4 changes: 2 additions & 2 deletions pageserver/src/tenant/layer_map.rs
@@ -846,8 +846,8 @@ impl LayerMap {
     }
 
     /// Return all L0 delta layers
-    pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
-        self.l0_delta_layers.to_vec()
+    pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
+        &self.l0_delta_layers
     }
 
     /// debugging function to print out the contents of the layer map
9 changes: 2 additions & 7 deletions pageserver/src/tenant/mgr.rs
@@ -1767,14 +1767,9 @@ impl TenantManager {
         let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
         for timeline in timelines.values() {
             tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
-            let timeline_layers = timeline
-                .layers
-                .read()
-                .await
-                .likely_resident_layers()
-                .collect::<Vec<_>>();
+            let layers = timeline.layers.read().await;
 
-            for layer in timeline_layers {
+            for layer in layers.likely_resident_layers() {
                 let relative_path = layer
                     .local_path()
                     .strip_prefix(&parent_path)
4 changes: 3 additions & 1 deletion pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -1957,6 +1957,7 @@ pub(crate) mod test {
             .await
             .likely_resident_layers()
             .next()
+            .cloned()
             .unwrap();
 
         {
@@ -2031,7 +2032,8 @@
             .read()
             .await
             .likely_resident_layers()
-            .find(|x| x != &initdb_layer)
+            .find(|&x| x != &initdb_layer)
+            .cloned()
             .unwrap();
 
         // create a copy for the timeline, so we don't overwrite the file
4 changes: 3 additions & 1 deletion pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -385,11 +385,13 @@ impl InMemoryLayer {
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         start_lsn: Lsn,
+        gate_guard: utils::sync::gate::GateGuard,
         ctx: &RequestContext,
     ) -> Result<InMemoryLayer> {
         trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
 
-        let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
+        let file =
+            EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
         let key = InMemoryLayerFileId(file.page_cache_file_id());
 
         Ok(InMemoryLayer {
20 changes: 10 additions & 10 deletions pageserver/src/tenant/storage_layer/layer/tests.rs
@@ -39,7 +39,7 @@ async fn smoke_test() {
     let layer = {
         let mut layers = {
             let layers = timeline.layers.read().await;
-            layers.likely_resident_layers().collect::<Vec<_>>()
+            layers.likely_resident_layers().cloned().collect::<Vec<_>>()
         };
 
         assert_eq!(layers.len(), 1);
@@ -176,7 +176,7 @@ async fn smoke_test() {
     {
         let layers = &[layer];
         let mut g = timeline.layers.write().await;
-        g.finish_gc_timeline(layers);
+        g.open_mut().unwrap().finish_gc_timeline(layers);
         // this just updates the remote_physical_size for demonstration purposes
         rtc.schedule_gc_update(layers).unwrap();
     }
@@ -216,7 +216,7 @@ async fn evict_and_wait_on_wanted_deleted() {
     let layer = {
         let mut layers = {
             let layers = timeline.layers.read().await;
-            layers.likely_resident_layers().collect::<Vec<_>>()
+            layers.likely_resident_layers().cloned().collect::<Vec<_>>()
         };
 
         assert_eq!(layers.len(), 1);
@@ -260,7 +260,7 @@
     // the deletion of the layer in remote_storage happens.
     {
         let mut layers = timeline.layers.write().await;
-        layers.finish_gc_timeline(&[layer]);
+        layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
     }
 
     SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
@@ -301,7 +301,7 @@ fn read_wins_pending_eviction() {
     let layer = {
         let mut layers = {
             let layers = timeline.layers.read().await;
-            layers.likely_resident_layers().collect::<Vec<_>>()
+            layers.likely_resident_layers().cloned().collect::<Vec<_>>()
        };
 
         assert_eq!(layers.len(), 1);
@@ -433,7 +433,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
     let layer = {
         let mut layers = {
             let layers = timeline.layers.read().await;
-            layers.likely_resident_layers().collect::<Vec<_>>()
+            layers.likely_resident_layers().cloned().collect::<Vec<_>>()
         };
 
         assert_eq!(layers.len(), 1);
@@ -602,7 +602,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
     let layer = {
         let mut layers = {
             let layers = timeline.layers.read().await;
-            layers.likely_resident_layers().collect::<Vec<_>>()
+            layers.likely_resident_layers().cloned().collect::<Vec<_>>()
         };
 
         assert_eq!(layers.len(), 1);
@@ -682,7 +682,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
     let layer = {
         let mut layers = {
             let layers = timeline.layers.read().await;
-            layers.likely_resident_layers().collect::<Vec<_>>()
+            layers.likely_resident_layers().cloned().collect::<Vec<_>>()
        };
 
         assert_eq!(layers.len(), 1);
@@ -801,9 +801,9 @@ async fn eviction_cancellation_on_drop() {
     let (evicted_layer, not_evicted) = {
         let mut layers = {
             let mut guard = timeline.layers.write().await;
-            let layers = guard.likely_resident_layers().collect::<Vec<_>>();
+            let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
             // remove the layers from layermap
-            guard.finish_gc_timeline(&layers);
+            guard.open_mut().unwrap().finish_gc_timeline(&layers);
 
             layers
         };

1 comment on commit fc78774

@github-actions

2112 tests run: 2043 passed, 0 failed, 69 skipped (full report)


Flaky tests (1)

Postgres 14

  • test_ondemand_wal_download_in_replication_slot_funcs: release

Test coverage report is not available

The comment gets automatically updated with the latest test results
fc78774 at 2024-08-07T15:37:09.310Z :recycle:
