Skip to content

Commit

Permalink
safekeeper: don't pass conf into storage constructors (#9523)
Browse files Browse the repository at this point in the history
## Problem

The storage components take an entire `SafekeeperConf` during
construction, but only actually use the `no_sync` field. This makes it
hard to understand the storage inputs (which fields do they actually
care about?), and is also inconvenient for tests and benchmarks that
need to set up a lot of unnecessary boilerplate.

## Summary of changes

* Don't take the entire config, but pass in the `no_sync` field
explicitly.
* Take the timeline dir instead of `ttid` as an input, since it's the
only thing it cares about.
* Fix a couple of tests to not leak tempdirs.
* Various minor tweaks.
  • Loading branch information
erikgrinaker authored Oct 25, 2024
1 parent 9909551 commit b54b632
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 109 deletions.
136 changes: 45 additions & 91 deletions safekeeper/src/control_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ use std::path::Path;
use std::time::Instant;

use crate::control_file_upgrade::downgrade_v9_to_v8;
use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::state::{EvictionState, TimelinePersistentState};
use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
use utils::{bin_ser::LeSer, id::TenantTimelineId};

use crate::SafeKeeperConf;
use utils::bin_ser::LeSer;

pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 9;
Expand Down Expand Up @@ -54,13 +52,12 @@ pub struct FileStorage {

impl FileStorage {
/// Initialize storage by loading state from disk.
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
let timeline_dir = get_timeline_dir(conf, ttid);
let state = Self::load_control_file_from_dir(&timeline_dir)?;
pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
let state = Self::load_control_file_from_dir(timeline_dir)?;

Ok(FileStorage {
timeline_dir,
no_sync: conf.no_sync,
timeline_dir: timeline_dir.to_path_buf(),
no_sync,
state,
last_persist_at: Instant::now(),
})
Expand All @@ -71,16 +68,16 @@ impl FileStorage {
/// Note: we normally call this in temp directory for atomic init, so
/// interested in FileStorage as a result only in tests.
pub async fn create_new(
dir: Utf8PathBuf,
conf: &SafeKeeperConf,
timeline_dir: &Utf8Path,
state: TimelinePersistentState,
no_sync: bool,
) -> Result<FileStorage> {
// we don't support creating new timelines in offloaded state
assert!(matches!(state.eviction_state, EvictionState::Present));

let mut store = FileStorage {
timeline_dir: dir,
no_sync: conf.no_sync,
timeline_dir: timeline_dir.to_path_buf(),
no_sync,
state: state.clone(),
last_persist_at: Instant::now(),
};
Expand Down Expand Up @@ -239,89 +236,46 @@ mod test {
use tokio::fs;
use utils::lsn::Lsn;

fn stub_conf() -> SafeKeeperConf {
let workdir = camino_tempfile::tempdir().unwrap().into_path();
SafeKeeperConf {
workdir,
..SafeKeeperConf::dummy()
}
}

async fn load_from_control_file(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
Ok((
FileStorage::restore_new(ttid, conf)?,
FileStorage::load_control_file_from_dir(&timeline_dir)?,
))
}

async fn create(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
let state = TimelinePersistentState::empty();
let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
Ok((storage, state))
}
const NO_SYNC: bool = true;

#[tokio::test]
async fn test_read_write_safekeeper_state() {
let conf = stub_conf();
let ttid = TenantTimelineId::generate();
{
let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to create state");
// change something
state.commit_lsn = Lsn(42);
storage
.persist(&state)
.await
.expect("failed to persist state");
}

let (_, state) = load_from_control_file(&conf, &ttid)
.await
.expect("failed to read state");
assert_eq!(state.commit_lsn, Lsn(42));
async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
let tempdir = camino_tempfile::tempdir()?;
let mut state = TimelinePersistentState::empty();
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;

// Make a change.
state.commit_lsn = Lsn(42);
storage.persist(&state).await?;

// Reload the state. It should match the previously persisted state.
let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
assert_eq!(loaded_state, state);
Ok(())
}

#[tokio::test]
async fn test_safekeeper_state_checksum_mismatch() {
let conf = stub_conf();
let ttid = TenantTimelineId::generate();
{
let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to read state");

// change something
state.commit_lsn = Lsn(42);
storage
.persist(&state)
.await
.expect("failed to persist state");
}
let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).await.unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data)
.await
.expect("failed to write control file");

match load_from_control_file(&conf, &ttid).await {
Err(err) => assert!(err
.to_string()
.contains("safekeeper control file checksum mismatch")),
Ok(_) => panic!("expected error"),
async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
let tempdir = camino_tempfile::tempdir()?;
let mut state = TimelinePersistentState::empty();
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;

// Make a change.
state.commit_lsn = Lsn(42);
storage.persist(&state).await?;

// Change the first byte to fail checksum validation.
let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
let mut data = fs::read(&ctrl_path).await?;
data[0] += 1;
fs::write(&ctrl_path, &data).await?;

// Loading the file should fail checksum validation.
if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
assert!(err.to_string().contains("control file checksum mismatch"))
} else {
panic!("expected checksum error")
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion safekeeper/src/copy_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
new_state.peer_horizon_lsn = request.until_lsn;
new_state.backup_lsn = new_backup_lsn;

FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?;
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;

// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
Expand Down
1 change: 1 addition & 0 deletions safekeeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl SafeKeeperConf {

impl SafeKeeperConf {
#[cfg(test)]
#[allow(unused)]
fn dummy() -> Self {
SafeKeeperConf {
workdir: Utf8PathBuf::from("./"),
Expand Down
14 changes: 9 additions & 5 deletions safekeeper/src/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,19 @@ impl SharedState {
/// Restore SharedState from control file. If file doesn't exist, bails out.
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
let timeline_dir = get_timeline_dir(conf, ttid);
let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
if control_store.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}

let sk = match control_store.eviction_state {
EvictionState::Present => {
let wal_store =
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
let wal_store = wal_storage::PhysicalStorage::new(
ttid,
&timeline_dir,
&control_store,
conf.no_sync,
)?;
StateSK::Loaded(SafeKeeper::new(
TimelineState::new(control_store),
wal_store,
Expand Down Expand Up @@ -1046,9 +1050,9 @@ impl ManagerTimeline {
// trying to restore WAL storage
let wal_store = wal_storage::PhysicalStorage::new(
&self.ttid,
self.timeline_dir.clone(),
&conf,
&self.timeline_dir,
shared.sk.state(),
conf.no_sync,
)?;

// updating control file
Expand Down
4 changes: 2 additions & 2 deletions safekeeper/src/timelines_global_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl GlobalTimelines {
// immediately initialize first WAL segment as well.
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?;
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
Ok(timeline)
}
Expand Down Expand Up @@ -596,7 +596,7 @@ pub async fn validate_temp_timeline(
bail!("wal_seg_size is not set");
}

let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;

let commit_lsn = control_store.commit_lsn;
let flush_lsn = wal_store.flush_lsn();
Expand Down
19 changes: 9 additions & 10 deletions safekeeper/src/wal_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::metrics::{
};
use crate::state::TimelinePersistentState;
use crate::wal_backup::{read_object, remote_timeline_path};
use crate::SafeKeeperConf;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ;
Expand Down Expand Up @@ -87,7 +86,9 @@ pub trait Storage {
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
timeline_dir: Utf8PathBuf,
conf: SafeKeeperConf,

/// Disables fsync if true.
no_sync: bool,

/// Size of WAL segment in bytes.
wal_seg_size: usize,
Expand Down Expand Up @@ -151,9 +152,9 @@ impl PhysicalStorage {
/// the disk. Otherwise, all LSNs are set to zero.
pub fn new(
ttid: &TenantTimelineId,
timeline_dir: Utf8PathBuf,
conf: &SafeKeeperConf,
timeline_dir: &Utf8Path,
state: &TimelinePersistentState,
no_sync: bool,
) -> Result<PhysicalStorage> {
let wal_seg_size = state.server.wal_seg_size as usize;

Expand Down Expand Up @@ -198,8 +199,8 @@ impl PhysicalStorage {

Ok(PhysicalStorage {
metrics: WalStorageMetrics::default(),
timeline_dir,
conf: conf.clone(),
timeline_dir: timeline_dir.to_path_buf(),
no_sync,
wal_seg_size,
pg_version: state.server.pg_version,
system_id: state.server.system_id,
Expand All @@ -224,7 +225,7 @@ impl PhysicalStorage {

/// Call fdatasync if config requires so.
async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
if !self.conf.no_sync {
if !self.no_sync {
self.metrics
.observe_flush_seconds(time_io_closure(file.sync_data()).await?);
}
Expand Down Expand Up @@ -263,9 +264,7 @@ impl PhysicalStorage {

// Note: this doesn't get into observe_flush_seconds metric. But
// segment init should be separate metric, if any.
if let Err(e) =
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
{
if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
// Probably rename succeeded, but fsync of it failed. Remove
// the file then to avoid using it.
remove_file(wal_file_partial_path)
Expand Down

1 comment on commit b54b632

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5256 tests run: 5040 passed, 1 failed, 215 skipped (full report)


Failures on Postgres 17

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pageserver_metrics_removed_after_detach[release-pg17]"

Test coverage report is not available

The comment gets automatically updated with the latest test results
b54b632 at 2024-10-25T18:23:00.829Z :recycle:

Please sign in to comment.