Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pageserver): generate image layers for sparse keyspace #7567

Merged
merged 7 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ use utils::bin_ser::DeserializeError;
use utils::vec_map::{VecMap, VecMapOrdering};
use utils::{bin_ser::BeSer, lsn::Lsn};

const MAX_AUX_FILE_DELTAS: usize = 1024;
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
pub const MAX_AUX_FILE_DELTAS: usize = 1024;

/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;

#[derive(Debug)]
pub enum LsnForTimestamp {
Expand Down
103 changes: 91 additions & 12 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4777,7 +4777,12 @@ mod tests {
info!("Doing vectored read on {:?}", read);

let vectored_res = tline
.get_vectored_impl(read.clone(), reads_lsn, ValuesReconstructState::new(), &ctx)
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
Expand Down Expand Up @@ -4826,7 +4831,7 @@ mod tests {
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
Expand Down Expand Up @@ -4971,7 +4976,7 @@ mod tests {
.get_vectored_impl(
read.clone(),
current_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await?;
Expand Down Expand Up @@ -5106,7 +5111,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
ValuesReconstructState::new(),
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
Expand Down Expand Up @@ -5547,7 +5552,7 @@ mod tests {
.await?;

const NUM_KEYS: usize = 1000;
const STEP: usize = 100; // random update + scan base_key + idx * STEP
const STEP: usize = 10000; // random update + scan base_key + idx * STEP

let cancel = CancellationToken::new();

Expand Down Expand Up @@ -5580,7 +5585,7 @@ mod tests {

let keyspace = KeySpace::single(base_key..base_key.add((NUM_KEYS * STEP) as u32));

for _ in 0..10 {
for iter in 0..=10 {
// Read all the blocks
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = (blknum * STEP) as u32;
Expand All @@ -5595,7 +5600,7 @@ mod tests {
.get_vectored_impl(
keyspace.clone(),
lsn,
ValuesReconstructState::default(),
&mut ValuesReconstructState::default(),
&ctx,
)
.await?
Expand Down Expand Up @@ -5631,14 +5636,88 @@ mod tests {
updated[blknum] = lsn;
}

// Perform a cycle of flush, compact, and GC
tline.freeze_and_flush().await?;
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
// Perform two cycles of flush, compact, and GC
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
for round in 0..2 {
tline.freeze_and_flush().await?;
tline
.compact(
&cancel,
if iter % 5 == 0 && round == 0 {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags
} else {
EnumSet::empty()
},
&ctx,
)
.await?;
tenant
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
.await?;
}
}

Ok(())
}

/// Checks that a plain (flag-less) compaction reduces the number of L0 delta
/// layers built up by repeated writes to a single key under `AUX_KEY_PREFIX`,
/// and that the most recent image written for that key is still readable.
#[tokio::test]
async fn test_metadata_compaction_trigger() -> anyhow::Result<()> {
    let harness = TenantHarness::create("test_metadata_compaction_trigger")?;
    let (tenant, ctx) = harness.load().await;
    let tline = tenant
        .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
        .await?;

    let cancel = CancellationToken::new();

    // Single test key: a fixed hex key whose first field is overridden with the aux prefix.
    let test_key = {
        let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
        key.field1 = AUX_KEY_PREFIX;
        key
    };

    // Write 20 images of the same key at increasing LSNs, flushing after every write.
    let mut lsn = Lsn(0x10);
    for _ in 0..20 {
        lsn = Lsn(lsn.0 + 0x10);
        {
            // The writer lives only inside this scope, so it is released before the flush.
            let mut writer = tline.writer().await;
            let image = Value::Image(test_img(&format!("{} at {}", 0, lsn)));
            writer.put(test_key, lsn, &image, &ctx).await?;
            writer.finish_write(lsn);
        }
        tline.freeze_and_flush().await?; // force create a delta layer
    }

    // Count L0 deltas before compaction; the read guard is dropped at the end of the block.
    let before_num_l0_delta_files = {
        let guard = tline.layers.read().await;
        guard.layer_map().get_level0_deltas()?.len()
    };

    tline.compact(&cancel, EnumSet::empty(), &ctx).await?;

    // Count again after compaction.
    let after_num_l0_delta_files = {
        let guard = tline.layers.read().await;
        guard.layer_map().get_level0_deltas()?.len()
    };

    assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");

    // The latest write must still be visible at the last LSN.
    let actual = tline.get(test_key, lsn, &ctx).await?;
    let expected = test_img(&format!("{} at {}", 0, lsn));
    assert_eq!(actual, expected);

    Ok(())
}

Expand Down
19 changes: 17 additions & 2 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,17 @@ impl From<VectoredValueReconstructState> for ValueReconstructState {
}
}

/// Bag of data accumulated during a vectored get
/// Bag of data accumulated during a vectored get.
///
/// Besides the per-key reconstruct data, it carries visit counters that are
/// updated through `on_layer_visited`.
pub(crate) struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,

// NOTE(review): presumably accumulates the keys whose reconstruct data is already
// complete — confirm against the code that updates it.
keys_done: KeySpaceRandomAccum,

// Statistics that are still accessible as a caller of `get_vectored_impl`.
// `layers_visited` is bumped once per `on_layer_visited` call, whatever the layer is.
layers_visited: u32,
// `delta_layers_visited` is bumped only when the visited layer is a persistent layer
// whose descriptor reports it as a delta.
delta_layers_visited: u32,
}

impl ValuesReconstructState {
Expand All @@ -127,6 +132,7 @@ impl ValuesReconstructState {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
layers_visited: 0,
delta_layers_visited: 0,
}
}

Expand All @@ -140,8 +146,17 @@ impl ValuesReconstructState {
}
}

pub(crate) fn on_layer_visited(&mut self) {
/// Records that `layer` was visited.
///
/// The total visit counter is always incremented. The delta counter is
/// incremented as well when `layer` is a persistent layer whose descriptor
/// reports it as a delta.
pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
    self.layers_visited += 1;
    match layer {
        ReadableLayer::PersistentLayer(persistent) if persistent.layer_desc().is_delta() => {
            self.delta_layers_visited += 1;
        }
        // Every other kind of layer only counts towards the total.
        _ => {}
    }
}

pub(crate) fn get_delta_layers_visited(&self) -> u32 {
skyzh marked this conversation as resolved.
Show resolved Hide resolved
self.delta_layers_visited
}

pub(crate) fn get_layers_visited(&self) -> u32 {
Expand Down
Loading
Loading