From 7024e7872313b68416be987bf52ae52096174a77 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 15 May 2024 14:59:50 -0400 Subject: [PATCH 1/6] feat(pageserver): generate image layers for sparse keyspace Signed-off-by: Alex Chi Z --- pageserver/src/context.rs | 5 + pageserver/src/tenant.rs | 31 ++- pageserver/src/tenant/storage_layer.rs | 13 +- pageserver/src/tenant/timeline.rs | 240 ++++++++++++------- pageserver/src/tenant/timeline/compaction.rs | 35 +-- 5 files changed, 204 insertions(+), 120 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index 86d0390c30b1..8eadd4b0c8e2 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -85,6 +85,7 @@ //! The solution is that all code paths are infected with precisely one //! [`RequestContext`] argument. Functions in the middle of the call chain //! only need to pass it on. +use std::sync::atomic::AtomicUsize; use crate::task_mgr::TaskKind; @@ -98,6 +99,8 @@ pub struct RequestContext { access_stats_behavior: AccessStatsBehavior, page_content_kind: PageContentKind, pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32, + /// Total number of delta layer files processed in this request. + pub vectored_access_delta_file_cnt: AtomicUsize, } /// The kind of access to the page cache. @@ -154,6 +157,7 @@ impl RequestContextBuilder { access_stats_behavior: AccessStatsBehavior::Update, page_content_kind: PageContentKind::Unknown, micros_spent_throttled: Default::default(), + vectored_access_delta_file_cnt: AtomicUsize::new(0), }, } } @@ -168,6 +172,7 @@ impl RequestContextBuilder { access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, micros_spent_throttled: Default::default(), + vectored_access_delta_file_cnt: original.vectored_access_delta_file_cnt.clone(), }, } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 80d354d79e18..450295e85219 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5534,7 +5534,7 @@ mod tests { .await?; const NUM_KEYS: usize = 1000; - const STEP: usize = 100; // random update + scan base_key + idx * STEP + const STEP: usize = 10000; // random update + scan base_key + idx * STEP let cancel = CancellationToken::new(); @@ -5567,7 +5567,7 @@ mod tests { let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32)); - for _ in 0..10 { + for iter in 0..=10 { // Read all the blocks for (blknum, last_lsn) in updated.iter().enumerate() { test_key.field6 = (blknum * STEP) as u32; @@ -5618,12 +5618,27 @@ mod tests { updated[blknum] = lsn; } - // Perform a cycle of flush, compact, and GC - tline.freeze_and_flush().await?; - tline.compact(&cancel, EnumSet::empty(), &ctx).await?; - tenant - .gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx) - .await?; + // Perform two cycles of flush, compact, and GC + for round in 0..2 { + tline.freeze_and_flush().await?; + tline + .compact( + &cancel, + if iter % 5 == 0 && round == 0 { + let mut flags = EnumSet::new(); + flags.insert(CompactFlags::ForceImageLayerCreation); + flags.insert(CompactFlags::ForceRepartition); + flags + } else { + EnumSet::empty() + }, + &ctx, + ) + .await?; + tenant + .gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx) + .await?; + } } Ok(()) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 94a5e9ec47af..aba773ab4b50 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs 
@@ -119,6 +119,7 @@ pub(crate) struct ValuesReconstructState { keys_done: KeySpaceRandomAccum, layers_visited: u32, + delta_layers_visited: u32, } impl ValuesReconstructState { @@ -127,6 +128,7 @@ impl ValuesReconstructState { keys: HashMap::new(), keys_done: KeySpaceRandomAccum::new(), layers_visited: 0, + delta_layers_visited: 0, } } @@ -140,8 +142,17 @@ impl ValuesReconstructState { } } - pub(crate) fn on_layer_visited(&mut self) { + pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) { self.layers_visited += 1; + if let ReadableLayer::PersistentLayer(layer) = layer { + if layer.layer_desc().is_delta() { + self.delta_layer_visited += 1; + } + } + } + + pub(crate) fn get_delta_layers_visited(&self) -> usize { + self.delta_layers_visited } pub(crate) fn get_layers_visited(&self) -> u32 { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 9ee24a4ff0d4..08fa6e0d795a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -18,8 +18,8 @@ use fail::fail_point; use once_cell::sync::Lazy; use pageserver_api::{ key::{ - AUX_FILES_KEY, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE, - NON_INHERITED_SPARSE_RANGE, + AUX_FILES_KEY, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, + NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE, }, keyspace::{KeySpaceAccum, SparseKeyPartitioning}, models::{ @@ -1210,6 +1210,10 @@ impl Timeline { .start_timer(); let mut results: BTreeMap> = BTreeMap::new(); let layers_visited = reconstruct_state.get_layers_visited(); + ctx.vectored_access_delta_file_cnt.fetch_add( + reconstruct_state.get_delta_layers_visited(), + AtomicOrdering::SeqCst, + ); for (key, res) in reconstruct_state.keys { match res { Err(err) => { @@ -3463,7 +3467,7 @@ impl Timeline { unmapped_keyspace = keyspace_to_read; cont_lsn = next_cont_lsn; - reconstruct_state.on_layer_visited(); + reconstruct_state.on_layer_visited(&layer_to_read); } else { break; } @@ -4196,19 +4200,17 @@ impl Timeline { for partition in partitioning.parts.iter() { let img_range = start..partition.ranges.last().unwrap().end; - - if partition.overlaps(&Key::metadata_key_range()) { - // TODO(chi): The next patch will correctly create image layers for metadata keys, and it would be a - // rather big change. Keep this patch small for now. - match mode { - ImageLayerCreationMode::Force | ImageLayerCreationMode::Try => { - // skip image layer creation anyways for metadata keys. 
- start = img_range.end; - continue; - } - ImageLayerCreationMode::Initial => { - return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers"))); - } + let compact_metadata = partition.overlaps(&Key::metadata_key_range()); + if compact_metadata { + for range in &partition.ranges { + assert!( + range.start.field1 >= METADATA_KEY_BEGIN_PREFIX + && range.end.field1 <= METADATA_KEY_END_PREFIX, + "metadata keys must be partitioned separately" + ); + } + if mode == ImageLayerCreationMode::Initial { + return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers"))); } } else if let ImageLayerCreationMode::Try = mode { // check_for_image_layers = false -> skip @@ -4217,7 +4219,7 @@ impl Timeline { start = img_range.end; continue; } - } + }; let mut image_layer_writer = ImageLayerWriter::new( self.conf, @@ -4235,87 +4237,147 @@ impl Timeline { ))) }); - let mut wrote_keys = false; - - let mut key_request_accum = KeySpaceAccum::new(); - for range in &partition.ranges { - let mut key = range.start; - while key < range.end { - // Decide whether to retain this key: usually we do, but sharded tenants may - // need to drop keys that don't belong to them. If we retain the key, add it - // to `key_request_accum` for later issuing a vectored get - if self.shard_identity.is_key_disposable(&key) { - debug!( - "Dropping key {} during compaction (it belongs on shard {:?})", - key, - self.shard_identity.get_shard_number(&key) - ); - } else { - key_request_accum.add_key(key); - } + if !compact_metadata { + let mut wrote_keys = false; + + let mut key_request_accum = KeySpaceAccum::new(); + for range in &partition.ranges { + let mut key = range.start; + while key < range.end { + // Decide whether to retain this key: usually we do, but sharded tenants may + // need to drop keys that don't belong to them. If we retain the key, add it + // to `key_request_accum` for later issuing a vectored get + if self.shard_identity.is_key_disposable(&key) { + debug!( + "Dropping key {} during compaction (it belongs on shard {:?})", + key, + self.shard_identity.get_shard_number(&key) + ); + } else { + key_request_accum.add_key(key); + } - let last_key_in_range = key.next() == range.end; - key = key.next(); + let last_key_in_range = key.next() == range.end; + key = key.next(); - // Maybe flush `key_rest_accum` - if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS - || (last_key_in_range && key_request_accum.raw_size() > 0) - { - let results = self - .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx) - .await?; - - for (img_key, img) in results { - let img = match img { - Ok(img) => img, - Err(err) => { - // If we fail to reconstruct a VM or FSM page, we can zero the - // page without losing any actual user data. That seems better - // than failing repeatedly and getting stuck. - // - // We had a bug at one point, where we truncated the FSM and VM - // in the pageserver, but the Postgres didn't know about that - // and continued to generate incremental WAL records for pages - // that didn't exist in the pageserver. Trying to replay those - // WAL records failed to find the previous image of the page. - // This special case allows us to recover from that situation. - // See https://github.com/neondatabase/neon/issues/2601. 
- // - // Unfortunately we cannot do this for the main fork, or for - // any metadata keys, keys, as that would lead to actual data - // loss. - if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key) - { - warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}"); - ZERO_PAGE.clone() - } else { - return Err(CreateImageLayersError::PageReconstructError( - err, - )); + // Maybe flush `key_rest_accum` + if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS + || (last_key_in_range && key_request_accum.raw_size() > 0) + { + let results = self + .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx) + .await?; + + for (img_key, img) in results { + let img = match img { + Ok(img) => img, + Err(err) => { + // If we fail to reconstruct a VM or FSM page, we can zero the + // page without losing any actual user data. That seems better + // than failing repeatedly and getting stuck. + // + // We had a bug at one point, where we truncated the FSM and VM + // in the pageserver, but the Postgres didn't know about that + // and continued to generate incremental WAL records for pages + // that didn't exist in the pageserver. Trying to replay those + // WAL records failed to find the previous image of the page. + // This special case allows us to recover from that situation. + // See https://github.com/neondatabase/neon/issues/2601. + // + // Unfortunately we cannot do this for the main fork, or for + // any metadata keys, keys, as that would lead to actual data + // loss. + if is_rel_fsm_block_key(img_key) + || is_rel_vm_block_key(img_key) + { + warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}"); + ZERO_PAGE.clone() + } else { + return Err( + CreateImageLayersError::PageReconstructError(err), + ); + } } - } - }; + }; - // Write all the keys we just read into our new image layer. - image_layer_writer.put_image(img_key, img, ctx).await?; - wrote_keys = true; + // Write all the keys we just read into our new image layer. + image_layer_writer.put_image(img_key, img, ctx).await?; + wrote_keys = true; + } } } } - } - if wrote_keys { - // Normal path: we have written some data into the new image layer for this - // partition, so flush it to disk. - start = img_range.end; - let image_layer = image_layer_writer.finish(self, ctx).await?; - image_layers.push(image_layer); + if wrote_keys { + // Normal path: we have written some data into the new image layer for this + // partition, so flush it to disk. + start = img_range.end; + let image_layer = image_layer_writer.finish(self, ctx).await?; + image_layers.push(image_layer); + } else { + // Special case: the image layer may be empty if this is a sharded tenant and the + // partition does not cover any keys owned by this shard. In this case, to ensure + // we don't leave gaps between image layers, leave `start` where it is, so that the next + // layer we write will cover the key range that we just scanned. + tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + } } else { - // Special case: the image layer may be empty if this is a sharded tenant and the - // partition does not cover any keys owned by this shard. In this case, to ensure - // we don't leave gaps between image layers, leave `start` where it is, so that the next - // layer we write will cover the key range that we just scanned. - tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + // Metadata keys image layer creation. 
+ let total_kb_reads_begin = ctx + .vectored_access_delta_file_size_kb + .load(AtomicOrdering::SeqCst); + let data = self + .get_vectored_impl( + partition.clone(), + lsn, + ValuesReconstructState::default(), + ctx, + ) + .await?; + let (data, total_kb_retrieved) = { + let mut new_data = BTreeMap::new(); + let mut total_kb_retrieved = 0; + for (k, v) in data { + let v = v.map_err(CreateImageLayersError::PageReconstructError)?; + total_kb_retrieved += KEY_SIZE + v.len(); + new_data.insert(k, v); + } + (new_data, total_kb_retrieved / 1024) + }; + let total_kb_reads = ctx + .vectored_access_delta_file_size_kb + .load(AtomicOrdering::SeqCst) + - total_kb_reads_begin; + // Data too small, or most data are garbage + + let trigger_generation = + total_kb_reads >= 16000 && total_kb_reads >= total_kb_retrieved * 10; + info!("generate image layers for metadata keys: trigger_generation={trigger_generation}, total_kb_reads={total_kb_reads}, total_kb_retrieved={total_kb_retrieved}"); + if !trigger_generation && mode == ImageLayerCreationMode::Try { + start = img_range.end; + continue; + } + let has_keys = !data.is_empty(); + for (k, v) in data { + // Even if the value is empty (deleted), we do not delete it for now until we can ensure vectored get + // considers this situation properly. + // if v.is_empty() { + // continue; + // } + + // No need to handle sharding b/c metadata keys are always on the 0-th shard. + + // TODO: split image layers to avoid too large layer files. Too large image files are not handled + // on the normal data path either. + image_layer_writer.put_image(k, v, ctx).await?; + } + start = img_range.end; + if has_keys { + let image_layer = image_layer_writer.finish(self, ctx).await?; + image_layers.push(image_layer); + } else { + tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + } } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 4226bf431e5a..37004d1d0605 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -116,9 +116,13 @@ impl Timeline { // 3. Create new image layers for partitions that have been modified // "enough". - let dense_layers = self + let mut partitioning = dense_partitioning; + partitioning + .parts + .extend(sparse_partitioning.into_dense().parts); + let image_layers = self .create_image_layers( - &dense_partitioning, + &partitioning, lsn, if flags.contains(CompactFlags::ForceImageLayerCreation) { ImageLayerCreationMode::Force @@ -130,24 +134,8 @@ impl Timeline { .await .map_err(anyhow::Error::from)?; - // For now, nothing will be produced... - let sparse_layers = self - .create_image_layers( - &sparse_partitioning.clone().into_dense(), - lsn, - if flags.contains(CompactFlags::ForceImageLayerCreation) { - ImageLayerCreationMode::Force - } else { - ImageLayerCreationMode::Try - }, - &image_ctx, - ) - .await - .map_err(anyhow::Error::from)?; - assert!(sparse_layers.is_empty()); - - self.upload_new_image_layers(dense_layers)?; - dense_partitioning.parts.len() + self.upload_new_image_layers(image_layers)?; + partitioning.parts.len() } Err(err) => { // no partitioning? This is normal, if the timeline was just created @@ -501,8 +489,11 @@ impl Timeline { for &DeltaEntry { key: next_key, .. } in all_keys.iter() { if let Some(prev_key) = prev { - // just first fast filter - if next_key.to_i128() - prev_key.to_i128() >= min_hole_range { + // just first fast filter, do not create hole entries for metadata keys. 
The last hole in the + // compaction is the gap between data key and metadata keys. + if next_key.to_i128() - prev_key.to_i128() >= min_hole_range + && !Key::is_metadata_key(&prev_key) + { let key_range = prev_key..next_key; // Measuring hole by just subtraction of i128 representation of key range boundaries // has not so much sense, because largest holes will corresponds field1/field2 changes. From 5af19aa627fd269875e52a40ac52f0d76fb22bc8 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 15 May 2024 15:18:38 -0400 Subject: [PATCH 2/6] split image layer creation into two functions Signed-off-by: Alex Chi Z --- pageserver/src/context.rs | 6 +- pageserver/src/pgdatadir_mapping.rs | 6 +- pageserver/src/tenant/storage_layer.rs | 4 +- pageserver/src/tenant/timeline.rs | 327 ++++++++++++++----------- 4 files changed, 200 insertions(+), 143 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index 8eadd4b0c8e2..8c96fa835715 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -172,7 +172,11 @@ impl RequestContextBuilder { access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, micros_spent_throttled: Default::default(), - vectored_access_delta_file_cnt: original.vectored_access_delta_file_cnt.clone(), + vectored_access_delta_file_cnt: AtomicUsize::new( + original + .vectored_access_delta_file_cnt + .load(std::sync::atomic::Ordering::SeqCst), + ), }, } } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 1092d64d33fe..4a86b46081a5 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -40,7 +40,11 @@ use utils::bin_ser::DeserializeError; use utils::vec_map::{VecMap, VecMapOrdering}; use utils::{bin_ser::BeSer, lsn::Lsn}; -const MAX_AUX_FILE_DELTAS: usize = 1024; +/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached. +pub const MAX_AUX_FILE_DELTAS: usize = 1024; + +/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached. 
+pub const MAX_AUX_FILE_V2_DELTAS: usize = 64; #[derive(Debug)] pub enum LsnForTimestamp { diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index aba773ab4b50..d51ababcd957 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -146,12 +146,12 @@ impl ValuesReconstructState { self.layers_visited += 1; if let ReadableLayer::PersistentLayer(layer) = layer { if layer.layer_desc().is_delta() { - self.delta_layer_visited += 1; + self.delta_layers_visited += 1; } } } - pub(crate) fn get_delta_layers_visited(&self) -> usize { + pub(crate) fn get_delta_layers_visited(&self) -> u32 { self.delta_layers_visited } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 08fa6e0d795a..22c535a69ba2 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -60,7 +60,6 @@ use std::{ ops::ControlFlow, }; -use crate::tenant::timeline::init::LocalLayerFileMetadata; use crate::{ aux_file::AuxFileSizeEstimator, tenant::{ @@ -89,6 +88,9 @@ use crate::{ metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize, }; use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind}; +use crate::{ + pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::timeline::init::LocalLayerFileMetadata, +}; use crate::{ pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind}, virtual_file::{MaybeFatalIo, VirtualFile}, @@ -1211,7 +1213,7 @@ impl Timeline { let mut results: BTreeMap> = BTreeMap::new(); let layers_visited = reconstruct_state.get_layers_visited(); ctx.vectored_access_delta_file_cnt.fetch_add( - reconstruct_state.get_delta_layers_visited(), + reconstruct_state.get_delta_layers_visited() as usize, AtomicOrdering::SeqCst, ); for (key, res) in reconstruct_state.keys { @@ -4159,6 +4161,169 @@ impl Timeline { false } + async fn create_image_layers_for_rel_blocks( + self: &Arc, + partition: &KeySpace, + mut image_layer_writer: ImageLayerWriter, + lsn: Lsn, + ctx: &RequestContext, + img_range: Range, + start: &mut Key, + ) -> Result, CreateImageLayersError> { + let mut wrote_keys = false; + + let mut key_request_accum = KeySpaceAccum::new(); + for range in &partition.ranges { + let mut key = range.start; + while key < range.end { + // Decide whether to retain this key: usually we do, but sharded tenants may + // need to drop keys that don't belong to them. If we retain the key, add it + // to `key_request_accum` for later issuing a vectored get + if self.shard_identity.is_key_disposable(&key) { + debug!( + "Dropping key {} during compaction (it belongs on shard {:?})", + key, + self.shard_identity.get_shard_number(&key) + ); + } else { + key_request_accum.add_key(key); + } + + let last_key_in_range = key.next() == range.end; + key = key.next(); + + // Maybe flush `key_rest_accum` + if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS + || (last_key_in_range && key_request_accum.raw_size() > 0) + { + let results = self + .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx) + .await?; + + for (img_key, img) in results { + let img = match img { + Ok(img) => img, + Err(err) => { + // If we fail to reconstruct a VM or FSM page, we can zero the + // page without losing any actual user data. That seems better + // than failing repeatedly and getting stuck. 
+ // + // We had a bug at one point, where we truncated the FSM and VM + // in the pageserver, but the Postgres didn't know about that + // and continued to generate incremental WAL records for pages + // that didn't exist in the pageserver. Trying to replay those + // WAL records failed to find the previous image of the page. + // This special case allows us to recover from that situation. + // See https://github.com/neondatabase/neon/issues/2601. + // + // Unfortunately we cannot do this for the main fork, or for + // any metadata keys, keys, as that would lead to actual data + // loss. + if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key) { + warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}"); + ZERO_PAGE.clone() + } else { + return Err(CreateImageLayersError::PageReconstructError(err)); + } + } + }; + + // Write all the keys we just read into our new image layer. + image_layer_writer.put_image(img_key, img, ctx).await?; + wrote_keys = true; + } + } + } + } + + let mut image_layers = Vec::new(); + + if wrote_keys { + // Normal path: we have written some data into the new image layer for this + // partition, so flush it to disk. + *start = img_range.end; + let image_layer = image_layer_writer.finish(self, ctx).await?; + image_layers.push(image_layer); + } else { + // Special case: the image layer may be empty if this is a sharded tenant and the + // partition does not cover any keys owned by this shard. In this case, to ensure + // we don't leave gaps between image layers, leave `start` where it is, so that the next + // layer we write will cover the key range that we just scanned. + tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + } + + Ok(image_layers) + } + + #[allow(clippy::too_many_arguments)] + async fn create_image_layers_for_metadata_keys( + self: &Arc, + partition: &KeySpace, + mut image_layer_writer: ImageLayerWriter, + lsn: Lsn, + ctx: &RequestContext, + img_range: Range, + start: &mut Key, + mode: ImageLayerCreationMode, + ) -> Result, CreateImageLayersError> { + // Metadata keys image layer creation. + let delta_file_accessed_begin = ctx + .vectored_access_delta_file_cnt + .load(AtomicOrdering::SeqCst); + let data = self + .get_vectored_impl( + partition.clone(), + lsn, + ValuesReconstructState::default(), + ctx, + ) + .await?; + let (data, total_kb_retrieved, total_key_retrieved) = { + let mut new_data = BTreeMap::new(); + let mut total_kb_retrieved = 0; + let mut total_key_retrieved = 0; + for (k, v) in data { + let v = v.map_err(CreateImageLayersError::PageReconstructError)?; + total_kb_retrieved += KEY_SIZE + v.len(); + total_key_retrieved += 1; + new_data.insert(k, v); + } + (new_data, total_kb_retrieved / 1024, total_key_retrieved) + }; + let delta_file_accessed = ctx + .vectored_access_delta_file_cnt + .load(AtomicOrdering::SeqCst) + - delta_file_accessed_begin; + + let trigger_generation = delta_file_accessed >= MAX_AUX_FILE_V2_DELTAS; + info!("generate image layers for metadata keys: trigger_generation={trigger_generation}, delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, total_key_retrieved={total_key_retrieved}"); + if !trigger_generation && mode == ImageLayerCreationMode::Try { + return Ok(Vec::new()); + } + let has_keys = !data.is_empty(); + for (k, v) in data { + // Even if the value is empty (deleted), we do not delete it for now until we can ensure vectored get + // considers this situation properly. 
+ // if v.is_empty() { + // continue; + // } + + // No need to handle sharding b/c metadata keys are always on the 0-th shard. + + // TODO: split image layers to avoid too large layer files. Too large image files are not handled + // on the normal data path either. + image_layer_writer.put_image(k, v, ctx).await?; + } + *start = img_range.end; + if has_keys { + let image_layer = image_layer_writer.finish(self, ctx).await?; + Ok(vec![image_layer]) + } else { + tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + Ok(Vec::new()) + } + } + #[tracing::instrument(skip_all, fields(%lsn, %mode))] async fn create_image_layers( self: &Arc, @@ -4221,7 +4386,7 @@ impl Timeline { } }; - let mut image_layer_writer = ImageLayerWriter::new( + let image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, self.tenant_shard_id, @@ -4238,146 +4403,30 @@ impl Timeline { }); if !compact_metadata { - let mut wrote_keys = false; - - let mut key_request_accum = KeySpaceAccum::new(); - for range in &partition.ranges { - let mut key = range.start; - while key < range.end { - // Decide whether to retain this key: usually we do, but sharded tenants may - // need to drop keys that don't belong to them. If we retain the key, add it - // to `key_request_accum` for later issuing a vectored get - if self.shard_identity.is_key_disposable(&key) { - debug!( - "Dropping key {} during compaction (it belongs on shard {:?})", - key, - self.shard_identity.get_shard_number(&key) - ); - } else { - key_request_accum.add_key(key); - } - - let last_key_in_range = key.next() == range.end; - key = key.next(); - - // Maybe flush `key_rest_accum` - if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS - || (last_key_in_range && key_request_accum.raw_size() > 0) - { - let results = self - .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx) - .await?; - - for (img_key, img) in results { - let img = match img { - Ok(img) => img, - Err(err) => { - // If we fail to reconstruct a VM or FSM page, we can zero the - // page without losing any actual user data. That seems better - // than failing repeatedly and getting stuck. - // - // We had a bug at one point, where we truncated the FSM and VM - // in the pageserver, but the Postgres didn't know about that - // and continued to generate incremental WAL records for pages - // that didn't exist in the pageserver. Trying to replay those - // WAL records failed to find the previous image of the page. - // This special case allows us to recover from that situation. - // See https://github.com/neondatabase/neon/issues/2601. - // - // Unfortunately we cannot do this for the main fork, or for - // any metadata keys, keys, as that would lead to actual data - // loss. - if is_rel_fsm_block_key(img_key) - || is_rel_vm_block_key(img_key) - { - warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}"); - ZERO_PAGE.clone() - } else { - return Err( - CreateImageLayersError::PageReconstructError(err), - ); - } - } - }; - - // Write all the keys we just read into our new image layer. - image_layer_writer.put_image(img_key, img, ctx).await?; - wrote_keys = true; - } - } - } - } - - if wrote_keys { - // Normal path: we have written some data into the new image layer for this - // partition, so flush it to disk. 
- start = img_range.end; - let image_layer = image_layer_writer.finish(self, ctx).await?; - image_layers.push(image_layer); - } else { - // Special case: the image layer may be empty if this is a sharded tenant and the - // partition does not cover any keys owned by this shard. In this case, to ensure - // we don't leave gaps between image layers, leave `start` where it is, so that the next - // layer we write will cover the key range that we just scanned. - tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); - } + image_layers.extend( + self.create_image_layers_for_rel_blocks( + partition, + image_layer_writer, + lsn, + ctx, + img_range, + &mut start, + ) + .await?, + ); } else { - // Metadata keys image layer creation. - let total_kb_reads_begin = ctx - .vectored_access_delta_file_size_kb - .load(AtomicOrdering::SeqCst); - let data = self - .get_vectored_impl( - partition.clone(), + image_layers.extend( + self.create_image_layers_for_metadata_keys( + partition, + image_layer_writer, lsn, - ValuesReconstructState::default(), ctx, + img_range, + &mut start, + mode, ) - .await?; - let (data, total_kb_retrieved) = { - let mut new_data = BTreeMap::new(); - let mut total_kb_retrieved = 0; - for (k, v) in data { - let v = v.map_err(CreateImageLayersError::PageReconstructError)?; - total_kb_retrieved += KEY_SIZE + v.len(); - new_data.insert(k, v); - } - (new_data, total_kb_retrieved / 1024) - }; - let total_kb_reads = ctx - .vectored_access_delta_file_size_kb - .load(AtomicOrdering::SeqCst) - - total_kb_reads_begin; - // Data too small, or most data are garbage - - let trigger_generation = - total_kb_reads >= 16000 && total_kb_reads >= total_kb_retrieved * 10; - info!("generate image layers for metadata keys: trigger_generation={trigger_generation}, total_kb_reads={total_kb_reads}, total_kb_retrieved={total_kb_retrieved}"); - if !trigger_generation && mode == ImageLayerCreationMode::Try { - start = img_range.end; - continue; - } - let has_keys = !data.is_empty(); - for (k, v) in data { - // Even if the value is empty (deleted), we do not delete it for now until we can ensure vectored get - // considers this situation properly. - // if v.is_empty() { - // continue; - // } - - // No need to handle sharding b/c metadata keys are always on the 0-th shard. - - // TODO: split image layers to avoid too large layer files. Too large image files are not handled - // on the normal data path either. - image_layer_writer.put_image(k, v, ctx).await?; - } - start = img_range.end; - if has_keys { - let image_layer = image_layer_writer.finish(self, ctx).await?; - image_layers.push(image_layer); - } else { - tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); - } + .await?, + ); } } From 0823f86e4408d22a3764b6fd41d2780a1076e9d1 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 16 May 2024 14:17:43 -0400 Subject: [PATCH 3/6] return Option for create image layers Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 22c535a69ba2..4b55b4864e67 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4161,6 +4161,8 @@ impl Timeline { false } + /// Create image layers for Postgres data. Assumes the caller passes a partition that is not too large, + /// so that at most one image layer will be produced from this function. 
async fn create_image_layers_for_rel_blocks( self: &Arc, partition: &KeySpace, @@ -4169,7 +4171,7 @@ impl Timeline { ctx: &RequestContext, img_range: Range, start: &mut Key, - ) -> Result, CreateImageLayersError> { + ) -> Result, CreateImageLayersError> { let mut wrote_keys = false; let mut key_request_accum = KeySpaceAccum::new(); @@ -4236,25 +4238,25 @@ impl Timeline { } } - let mut image_layers = Vec::new(); - if wrote_keys { // Normal path: we have written some data into the new image layer for this // partition, so flush it to disk. *start = img_range.end; let image_layer = image_layer_writer.finish(self, ctx).await?; - image_layers.push(image_layer); + Ok(Some(image_layer)) } else { // Special case: the image layer may be empty if this is a sharded tenant and the // partition does not cover any keys owned by this shard. In this case, to ensure // we don't leave gaps between image layers, leave `start` where it is, so that the next // layer we write will cover the key range that we just scanned. tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + Ok(None) } - - Ok(image_layers) } + /// Create an image layer for metadata keys. This function produces one image layer for all metadata + /// keys for now. Because metadata keys cannot exceed basebackup size limit, the image layer for it + /// would not be too large to fit in a single image layer. #[allow(clippy::too_many_arguments)] async fn create_image_layers_for_metadata_keys( self: &Arc, @@ -4265,7 +4267,7 @@ impl Timeline { img_range: Range, start: &mut Key, mode: ImageLayerCreationMode, - ) -> Result, CreateImageLayersError> { + ) -> Result, CreateImageLayersError> { // Metadata keys image layer creation. let delta_file_accessed_begin = ctx .vectored_access_delta_file_cnt @@ -4298,7 +4300,7 @@ impl Timeline { let trigger_generation = delta_file_accessed >= MAX_AUX_FILE_V2_DELTAS; info!("generate image layers for metadata keys: trigger_generation={trigger_generation}, delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, total_key_retrieved={total_key_retrieved}"); if !trigger_generation && mode == ImageLayerCreationMode::Try { - return Ok(Vec::new()); + return Ok(None); } let has_keys = !data.is_empty(); for (k, v) in data { @@ -4317,10 +4319,10 @@ impl Timeline { *start = img_range.end; if has_keys { let image_layer = image_layer_writer.finish(self, ctx).await?; - Ok(vec![image_layer]) + Ok(Some(image_layer)) } else { tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); - Ok(Vec::new()) + Ok(None) } } From 6eaacbf14d8fc3e5c18239296c46d0324fec7f56 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 16 May 2024 14:29:13 -0400 Subject: [PATCH 4/6] add test case on metadata l0 compaction Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 59 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 450295e85219..c254235aba10 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5643,4 +5643,63 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_metadata_compaction_trigger() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_metadata_compaction_trigger")?; + let (tenant, ctx) = harness.load().await; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await?; + + let cancel = CancellationToken::new(); + + let mut base_key = 
Key::from_hex("000000000033333333444444445500000000").unwrap(); + base_key.field1 = AUX_KEY_PREFIX; + let test_key = base_key; + let mut lsn = Lsn(0x10); + + for _ in 0..20 { + lsn = Lsn(lsn.0 + 0x10); + let mut writer = tline.writer().await; + writer + .put( + test_key, + lsn, + &Value::Image(test_img(&format!("{} at {}", 0, lsn))), + &ctx, + ) + .await?; + writer.finish_write(lsn); + drop(writer); + tline.freeze_and_flush().await?; // force create a delta layer + } + + let before_num_l0_delta_files = tline + .layers + .read() + .await + .layer_map() + .get_level0_deltas()? + .len(); + + tline.compact(&cancel, EnumSet::empty(), &ctx).await?; + + let after_num_l0_delta_files = tline + .layers + .read() + .await + .layer_map() + .get_level0_deltas()? + .len(); + + assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}"); + + assert_eq!( + tline.get(test_key, lsn, &ctx).await?, + test_img(&format!("{} at {}", 0, lsn)) + ); + + Ok(()) + } } From 26d6c12f5d38dec4e69d8fff885a92a0c3b585be Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 17 May 2024 12:44:47 -0400 Subject: [PATCH 5/6] resolve comments Signed-off-by: Alex Chi Z --- pageserver/src/context.rs | 9 ----- pageserver/src/tenant.rs | 15 ++++--- pageserver/src/tenant/storage_layer.rs | 6 ++- pageserver/src/tenant/timeline.rs | 54 ++++++++++++-------------- 4 files changed, 40 insertions(+), 44 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index 8c96fa835715..86d0390c30b1 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -85,7 +85,6 @@ //! The solution is that all code paths are infected with precisely one //! [`RequestContext`] argument. Functions in the middle of the call chain //! only need to pass it on. -use std::sync::atomic::AtomicUsize; use crate::task_mgr::TaskKind; @@ -99,8 +98,6 @@ pub struct RequestContext { access_stats_behavior: AccessStatsBehavior, page_content_kind: PageContentKind, pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32, - /// Total number of delta layer files processed in this request. - pub vectored_access_delta_file_cnt: AtomicUsize, } /// The kind of access to the page cache. 
@@ -157,7 +154,6 @@ impl RequestContextBuilder { access_stats_behavior: AccessStatsBehavior::Update, page_content_kind: PageContentKind::Unknown, micros_spent_throttled: Default::default(), - vectored_access_delta_file_cnt: AtomicUsize::new(0), }, } } @@ -172,11 +168,6 @@ impl RequestContextBuilder { access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, micros_spent_throttled: Default::default(), - vectored_access_delta_file_cnt: AtomicUsize::new( - original - .vectored_access_delta_file_cnt - .load(std::sync::atomic::Ordering::SeqCst), - ), }, } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c254235aba10..451620efe7ea 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4764,7 +4764,12 @@ mod tests { info!("Doing vectored read on {:?}", read); let vectored_res = tline - .get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx) + .get_vectored_impl( + read.clone(), + reads_lsn, + &mut ValuesReconstructState::new(), + &ctx, + ) .await; tline .validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx) @@ -4813,7 +4818,7 @@ mod tests { .get_vectored_impl( aux_keyspace.clone(), read_lsn, - ValuesReconstructState::new(), + &mut ValuesReconstructState::new(), &ctx, ) .await; @@ -4958,7 +4963,7 @@ mod tests { .get_vectored_impl( read.clone(), current_lsn, - ValuesReconstructState::new(), + &mut ValuesReconstructState::new(), &ctx, ) .await?; @@ -5093,7 +5098,7 @@ mod tests { ranges: vec![child_gap_at_key..child_gap_at_key.next()], }, query_lsn, - ValuesReconstructState::new(), + &mut ValuesReconstructState::new(), &ctx, ) .await; @@ -5582,7 +5587,7 @@ mod tests { .get_vectored_impl( keyspace.clone(), lsn, - ValuesReconstructState::default(), + &mut ValuesReconstructState::default(), &ctx, ) .await? diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index d51ababcd957..4c8a5185519e 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -113,11 +113,15 @@ impl From for ValueReconstructState { } } -/// Bag of data accumulated during a vectored get +/// Bag of data accumulated during a vectored get. pub(crate) struct ValuesReconstructState { + /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline` + /// should not expect to get anything from this hashmap. pub(crate) keys: HashMap>, keys_done: KeySpaceRandomAccum, + + // Statistics that are still accessible as a caller of `get_vectored_impl`. 
layers_visited: u32, delta_layers_visited: u32, } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4b55b4864e67..b9a55d2bb58e 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -881,7 +881,7 @@ impl Timeline { } let vectored_res = self - .get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx) + .get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx) .await; if self.conf.validate_vectored_get { @@ -1026,7 +1026,12 @@ impl Timeline { } GetVectoredImpl::Vectored => { let vectored_res = self - .get_vectored_impl(keyspace.clone(), lsn, ValuesReconstructState::new(), ctx) + .get_vectored_impl( + keyspace.clone(), + lsn, + &mut ValuesReconstructState::new(), + ctx, + ) .await; if self.conf.validate_vectored_get { @@ -1114,7 +1119,7 @@ impl Timeline { .get_vectored_impl( keyspace.clone(), lsn, - ValuesReconstructState::default(), + &mut ValuesReconstructState::default(), ctx, ) .await; @@ -1191,7 +1196,7 @@ impl Timeline { &self, keyspace: KeySpace, lsn: Lsn, - mut reconstruct_state: ValuesReconstructState, + reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result>, GetVectoredError> { let get_kind = if keyspace.total_raw_size() == 1 { @@ -1203,7 +1208,7 @@ impl Timeline { let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME .for_get_kind(get_kind) .start_timer(); - self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx) + self.get_vectored_reconstruct_data(keyspace, lsn, reconstruct_state, ctx) .await?; get_data_timer.stop_and_record(); @@ -1212,11 +1217,8 @@ impl Timeline { .start_timer(); let mut results: BTreeMap> = BTreeMap::new(); let layers_visited = reconstruct_state.get_layers_visited(); - ctx.vectored_access_delta_file_cnt.fetch_add( - reconstruct_state.get_delta_layers_visited() as usize, - AtomicOrdering::SeqCst, - ); - for (key, res) in reconstruct_state.keys { + + for (key, res) in std::mem::take(&mut reconstruct_state.keys) { match res { Err(err) => { results.insert(key, Err(err)); @@ -4163,7 +4165,7 @@ impl Timeline { /// Create image layers for Postgres data. Assumes the caller passes a partition that is not too large, /// so that at most one image layer will be produced from this function. - async fn create_image_layers_for_rel_blocks( + async fn create_image_layer_for_rel_blocks( self: &Arc, partition: &KeySpace, mut image_layer_writer: ImageLayerWriter, @@ -4258,7 +4260,7 @@ impl Timeline { /// keys for now. Because metadata keys cannot exceed basebackup size limit, the image layer for it /// would not be too large to fit in a single image layer. #[allow(clippy::too_many_arguments)] - async fn create_image_layers_for_metadata_keys( + async fn create_image_layer_for_metadata_keys( self: &Arc, partition: &KeySpace, mut image_layer_writer: ImageLayerWriter, @@ -4269,16 +4271,9 @@ impl Timeline { mode: ImageLayerCreationMode, ) -> Result, CreateImageLayersError> { // Metadata keys image layer creation. 
- let delta_file_accessed_begin = ctx - .vectored_access_delta_file_cnt - .load(AtomicOrdering::SeqCst); + let mut reconstruct_state = ValuesReconstructState::default(); let data = self - .get_vectored_impl( - partition.clone(), - lsn, - ValuesReconstructState::default(), - ctx, - ) + .scan(partition.clone(), lsn, &mut reconstruct_state, ctx) .await?; let (data, total_kb_retrieved, total_key_retrieved) = { let mut new_data = BTreeMap::new(); @@ -4292,13 +4287,14 @@ impl Timeline { } (new_data, total_kb_retrieved / 1024, total_key_retrieved) }; - let delta_file_accessed = ctx - .vectored_access_delta_file_cnt - .load(AtomicOrdering::SeqCst) - - delta_file_accessed_begin; + let delta_file_accessed = reconstruct_state.get_delta_layers_visited(); - let trigger_generation = delta_file_accessed >= MAX_AUX_FILE_V2_DELTAS; - info!("generate image layers for metadata keys: trigger_generation={trigger_generation}, delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, total_key_retrieved={total_key_retrieved}"); + let trigger_generation = delta_file_accessed as usize >= MAX_AUX_FILE_V2_DELTAS; + info!( + "generate image layers for metadata keys: trigger_generation={trigger_generation}, \ + delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, \ + total_key_retrieved={total_key_retrieved}" + ); if !trigger_generation && mode == ImageLayerCreationMode::Try { return Ok(None); } @@ -4406,7 +4402,7 @@ impl Timeline { if !compact_metadata { image_layers.extend( - self.create_image_layers_for_rel_blocks( + self.create_image_layer_for_rel_blocks( partition, image_layer_writer, lsn, @@ -4418,7 +4414,7 @@ impl Timeline { ); } else { image_layers.extend( - self.create_image_layers_for_metadata_keys( + self.create_image_layer_for_metadata_keys( partition, image_layer_writer, lsn, From feb2ca45cdeb7173e3302320a797ac02ac99b881 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 17 May 2024 12:56:52 -0400 Subject: [PATCH 6/6] refactor start key modification Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline.rs | 80 ++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b9a55d2bb58e..aa406bc5438c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -780,6 +780,11 @@ pub(crate) enum ShutdownMode { Hard, } +struct ImageLayerCreationOutcome { + image: Option, + next_start_key: Key, +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -4172,8 +4177,8 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, img_range: Range, - start: &mut Key, - ) -> Result, CreateImageLayersError> { + start: Key, + ) -> Result { let mut wrote_keys = false; let mut key_request_accum = KeySpaceAccum::new(); @@ -4243,16 +4248,21 @@ impl Timeline { if wrote_keys { // Normal path: we have written some data into the new image layer for this // partition, so flush it to disk. - *start = img_range.end; let image_layer = image_layer_writer.finish(self, ctx).await?; - Ok(Some(image_layer)) + Ok(ImageLayerCreationOutcome { + image: Some(image_layer), + next_start_key: img_range.end, + }) } else { // Special case: the image layer may be empty if this is a sharded tenant and the // partition does not cover any keys owned by this shard. 
In this case, to ensure // we don't leave gaps between image layers, leave `start` where it is, so that the next // layer we write will cover the key range that we just scanned. tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); - Ok(None) + Ok(ImageLayerCreationOutcome { + image: None, + next_start_key: start, + }) } } @@ -4267,13 +4277,14 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, img_range: Range, - start: &mut Key, mode: ImageLayerCreationMode, - ) -> Result, CreateImageLayersError> { + ) -> Result { + assert!(!matches!(mode, ImageLayerCreationMode::Initial)); + // Metadata keys image layer creation. let mut reconstruct_state = ValuesReconstructState::default(); let data = self - .scan(partition.clone(), lsn, &mut reconstruct_state, ctx) + .get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx) .await?; let (data, total_kb_retrieved, total_key_retrieved) = { let mut new_data = BTreeMap::new(); @@ -4296,7 +4307,10 @@ impl Timeline { total_key_retrieved={total_key_retrieved}" ); if !trigger_generation && mode == ImageLayerCreationMode::Try { - return Ok(None); + return Ok(ImageLayerCreationOutcome { + image: None, + next_start_key: img_range.end, + }); } let has_keys = !data.is_empty(); for (k, v) in data { @@ -4312,14 +4326,16 @@ impl Timeline { // on the normal data path either. image_layer_writer.put_image(k, v, ctx).await?; } - *start = img_range.end; - if has_keys { - let image_layer = image_layer_writer.finish(self, ctx).await?; - Ok(Some(image_layer)) - } else { - tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); - Ok(None) - } + Ok(ImageLayerCreationOutcome { + image: if has_keys { + let image_layer = image_layer_writer.finish(self, ctx).await?; + Some(image_layer) + } else { + tracing::debug!("no data in range {}-{}", img_range.start, img_range.end); + None + }, + next_start_key: img_range.end, + }) } #[tracing::instrument(skip_all, fields(%lsn, %mode))] @@ -4382,7 +4398,7 @@ impl Timeline { start = img_range.end; continue; } - }; + } let image_layer_writer = ImageLayerWriter::new( self.conf, @@ -4401,30 +4417,38 @@ impl Timeline { }); if !compact_metadata { - image_layers.extend( - self.create_image_layer_for_rel_blocks( + let ImageLayerCreationOutcome { + image, + next_start_key, + } = self + .create_image_layer_for_rel_blocks( partition, image_layer_writer, lsn, ctx, img_range, - &mut start, + start, ) - .await?, - ); + .await?; + + start = next_start_key; + image_layers.extend(image); } else { - image_layers.extend( - self.create_image_layer_for_metadata_keys( + let ImageLayerCreationOutcome { + image, + next_start_key, + } = self + .create_image_layer_for_metadata_keys( partition, image_layer_writer, lsn, ctx, img_range, - &mut start, mode, ) - .await?, - ); + .await?; + start = next_start_key; + image_layers.extend(image); } }