From c05e3f89c568898936cd814143fa92bd4ec42e0d Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sat, 26 Oct 2024 11:06:09 +0000 Subject: [PATCH 1/6] feat: introduce manifest file for fast disk cache clear Signed-off-by: MrCroxx --- foyer-storage/src/device/direct_fs.rs | 4 + foyer-storage/src/lib.rs | 1 + foyer-storage/src/manifest.rs | 238 +++++++++++++++++++++++++ foyer-storage/src/small/batch.rs | 2 +- foyer-storage/src/small/generic.rs | 11 +- foyer-storage/src/small/set_manager.rs | 89 ++------- foyer-storage/src/store.rs | 31 ++++ 7 files changed, 299 insertions(+), 77 deletions(-) create mode 100644 foyer-storage/src/manifest.rs diff --git a/foyer-storage/src/device/direct_fs.rs b/foyer-storage/src/device/direct_fs.rs index 30310a06..9cb0cdf1 100644 --- a/foyer-storage/src/device/direct_fs.rs +++ b/foyer-storage/src/device/direct_fs.rs @@ -39,6 +39,10 @@ pub struct DirectFsDeviceConfig { } impl DirectFsDeviceConfig { + pub fn dir(&self) -> &PathBuf { + &self.dir + } + fn verify(&self) -> Result<()> { if self.file_size == 0 || self.file_size % ALIGN != 0 { return Err(anyhow::anyhow!( diff --git a/foyer-storage/src/lib.rs b/foyer-storage/src/lib.rs index 4d853fba..e588ce26 100644 --- a/foyer-storage/src/lib.rs +++ b/foyer-storage/src/lib.rs @@ -23,6 +23,7 @@ mod engine; mod error; mod io_buffer_pool; mod large; +mod manifest; mod picker; mod region; mod runtime; diff --git a/foyer-storage/src/manifest.rs b/foyer-storage/src/manifest.rs new file mode 100644 index 00000000..083b27e1 --- /dev/null +++ b/foyer-storage/src/manifest.rs @@ -0,0 +1,238 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fs::{File, OpenOptions}, + os::unix::fs::FileExt, + path::Path, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use bytes::{Buf, BufMut}; +use foyer_common::asyncify::asyncify_with_runtime; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock as AsyncRwLock; + +use crate::{ + error::{Error, Result}, + runtime::Runtime, + serde::Checksummer, +}; + +/// Persistent metadata for the disk cache. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +struct Metadata { + watermark: u128, +} + +impl Default for Metadata { + /// If the metadata is corrupted, the watermark is supposed to set as the current timestamp to prevent from + /// accessing stale data. + fn default() -> Self { + Self { + watermark: Self::timestamp(), + } + } +} + +impl Metadata { + /// | magic 8B | checksum 8B | watermark 16B | + const LENGTH: usize = 8 + 8 + 16; + /// | magic 8B | checksum 8B | + const HEADER: usize = 16; + /// magic number for metadata + const MAGIC: u64 = 0x20230512deadbeef; + + fn read(buf: &[u8]) -> Self { + let magic = (&buf[0..8]).get_u64(); + let checksum = (&buf[8..16]).get_u64(); + let watermark = (&buf[16..32]).get_u128(); + + let c = Checksummer::checksum64(&buf[Self::HEADER..Self::LENGTH]); + + if magic != Self::MAGIC || checksum != c { + tracing::warn!( + "[manifest]: manifest magic or checksum mismatch, update the watermark to the current timestamp." + ); + return Self::default(); + } + + Self { watermark } + } + + fn write(&self, buf: &mut [u8]) { + (&mut buf[16..32]).put_u128(self.watermark); + + let checksum = Checksummer::checksum64(&buf[Self::HEADER..Self::LENGTH]); + + (&mut buf[0..8]).put_u64(Self::MAGIC); + (&mut buf[8..16]).put_u64(checksum); + } + + fn timestamp() -> u128 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() + } +} + +/// Manifest file for persistent metadata for the disk cache. +#[derive(Debug, Clone)] +pub struct Manifest { + inner: Arc>, +} + +#[derive(Debug)] +struct ManifestInner { + metadata: Metadata, + + file: Arc, + + flush: bool, + runtime: Runtime, +} + +impl Manifest { + /// Default manifest filename + pub const DEFAULT_FILENAME: &str = "manifest"; + + pub async fn open

(path: P, flush: bool, runtime: Runtime) -> Result + where + P: AsRef + Send + 'static, + { + let file = asyncify_with_runtime(runtime.user(), move || { + let mut opts = OpenOptions::new(); + opts.create(true).read(true).write(true); + opts.open(path) + }) + .await?; + let file = Arc::new(file); + + let f = file.clone(); + let metadata = asyncify_with_runtime(runtime.read(), move || { + let mut buf = [0; Metadata::LENGTH]; + let _ = f.read_exact_at(&mut buf[..], 0); + let metadata = Metadata::read(&buf[..]); + Ok::<_, Error>(metadata) + }) + .await?; + + let inner = ManifestInner { + metadata, + file, + flush, + runtime, + }; + + Ok(Self { + inner: Arc::new(AsyncRwLock::new(inner)), + }) + } + + pub async fn watermark(&self) -> u128 { + self.inner.read().await.metadata.watermark + } + + /// Update watermark and flush. + pub async fn update_watermark(&self, watermark: u128) -> Result<()> { + let mut inner = self.inner.write().await; + + inner.metadata.watermark = watermark; + + let mut buf = [0; Metadata::LENGTH]; + inner.metadata.write(&mut buf[..]); + + let file = inner.file.clone(); + let flush = inner.flush; + asyncify_with_runtime(inner.runtime.write(), move || { + file.write_all_at(&buf[..], 0)?; + if flush { + file.sync_data()?; + } + Ok::<_, Error>(()) + }) + .await?; + + drop(inner); + + Ok(()) + } + + /// Update watermark to latest and flush. + pub async fn update(&self) -> Result<()> { + self.update_watermark(Metadata::timestamp()).await + } + + /// Flush manifest for persistent. + #[expect(unused)] + pub async fn flush(&self) -> Result<()> { + let inner = self.inner.read().await; + + let mut buf = [0; Metadata::LENGTH]; + inner.metadata.write(&mut buf[..]); + + let file = inner.file.clone(); + let flush = inner.flush; + asyncify_with_runtime(inner.runtime.write(), move || { + file.write_all_at(&buf[..], 0)?; + if flush { + file.sync_data()?; + } + Ok::<_, Error>(()) + }) + .await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use tempfile::tempdir; + + use super::*; + + #[test_log::test] + fn test_metadata_serde() { + let mut buf = [0; Metadata::LENGTH]; + let mut metadata = Metadata::read(&buf[..]); + assert!(metadata.watermark > 0); + + metadata.watermark = 0x0123456789abcdef; + metadata.write(&mut buf[..]); + + let m = Metadata::read(&buf[..]); + assert_eq!(metadata, m); + } + + #[test_log::test(tokio::test)] + async fn test_manifest_file() { + let dir = tempdir().unwrap(); + + let manifest = Manifest::open(dir.path().join("manifest"), true, Runtime::current()) + .await + .unwrap(); + + let w = Metadata::timestamp(); + + manifest.update_watermark(w).await.unwrap(); + + let manifest = Manifest::open(dir.path().join("manifest"), true, Runtime::current()) + .await + .unwrap(); + + let watermark = manifest.watermark().await; + + assert_eq!(watermark, w); + } +} diff --git a/foyer-storage/src/small/batch.rs b/foyer-storage/src/small/batch.rs index 30fb3385..32175f67 100644 --- a/foyer-storage/src/small/batch.rs +++ b/foyer-storage/src/small/batch.rs @@ -190,7 +190,7 @@ where } fn sid(&self, hash: u64) -> SetId { - self.set_picker.sid(hash) + self.set_picker.pick(hash) } pub fn is_empty(&self) -> bool { diff --git a/foyer-storage/src/small/generic.rs b/foyer-storage/src/small/generic.rs index 2f94e6b8..6e2ecb2d 100644 --- a/foyer-storage/src/small/generic.rs +++ b/foyer-storage/src/small/generic.rs @@ -30,6 +30,7 @@ use itertools::Itertools; use crate::{ device::{MonitoredDevice, RegionId}, error::Result, + manifest::Manifest, small::{ flusher::{Flusher, Submission}, set_manager::SetManager, @@ -48,6 +49,7 @@ where pub set_cache_capacity: usize, pub set_cache_shards: usize, pub device: MonitoredDevice, + pub manifest: Manifest, pub regions: Range, pub flush: bool, pub flushers: usize, @@ -69,6 +71,7 @@ where .field("set_cache_capacity", &self.set_cache_capacity) .field("set_cache_shards", &self.set_cache_shards) .field("device", &self.device) + .field("manifest", &self.manifest) .field("regions", &self.regions) .field("flush", &self.flush) .field("flushers", &self.flushers) @@ -319,19 +322,25 @@ mod tests { } async fn store_for_test(dir: impl AsRef) -> GenericSmallStorage, RandomState> { + let runtime = Runtime::new(None, None, Handle::current()); + let dir = dir.as_ref(); let device = device_for_test(dir).await; + let manifest = Manifest::open(dir.join(Manifest::DEFAULT_FILENAME), true, runtime.clone()) + .await + .unwrap(); let regions = 0..device.regions() as RegionId; let config = GenericSmallStorageConfig { set_size: ByteSize::kib(4).as_u64() as _, set_cache_capacity: 4, set_cache_shards: 1, device, + manifest, regions, flush: false, flushers: 1, buffer_pool_size: ByteSize::kib(64).as_u64() as _, statistics: Arc::::default(), - runtime: Runtime::new(None, None, Handle::current()), + runtime, marker: PhantomData, }; GenericSmallStorage::open(config).await.unwrap() diff --git a/foyer-storage/src/small/set_manager.rs b/foyer-storage/src/small/set_manager.rs index 46d366b3..acaa79d3 100644 --- a/foyer-storage/src/small/set_manager.rs +++ b/foyer-storage/src/small/set_manager.rs @@ -14,7 +14,6 @@ use std::{collections::HashSet, fmt::Debug, ops::Range, sync::Arc}; -use bytes::{Buf, BufMut}; use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; use itertools::Itertools; use parking_lot::RwLock; @@ -24,13 +23,13 @@ use super::{ batch::Item, bloom_filter::BloomFilterU64, generic::GenericSmallStorageConfig, - set::{SetId, SetStorage, SetTimestamp}, + set::{SetId, SetStorage}, set_cache::SetCache, }; use crate::{ device::{Dev, MonitoredDevice, RegionId}, error::Result, - IoBytesMut, + manifest::Manifest, }; /// # Lock Order @@ -60,11 +59,12 @@ struct SetManagerInner { /// correctness. loose_bloom_filters: Vec>>, set_cache: SetCache, - metadata: AsyncRwLock, set_picker: SetPicker, - set_size: usize, + device: MonitoredDevice, + manifest: Manifest, + regions: Range, flush: bool, } @@ -81,9 +81,9 @@ impl Debug for SetManager { .field("loose_bloom_filters", &self.inner.loose_bloom_filters) .field("set_picker", &self.inner.set_picker) .field("set_cache", &self.inner.set_cache) - .field("metadata", &self.inner.metadata) .field("set_size", &self.inner.set_size) .field("device", &self.inner.device) + .field("manifest", &self.inner.manifest) .field("regions", &self.inner.regions) .field("flush", &self.inner.flush) .finish() @@ -105,11 +105,6 @@ impl SetManager { let set_picker = SetPicker::new(sets); - // load & flush metadata - let metadata = Metadata::load(&device).await?; - metadata.flush(&device).await?; - let metadata = AsyncRwLock::new(metadata); - let set_cache = SetCache::new(config.set_cache_capacity, config.set_cache_shards); let loose_bloom_filters = (0..sets).map(|_| RwLock::new(BloomFilterU64::new())).collect_vec(); @@ -120,9 +115,9 @@ impl SetManager { loose_bloom_filters, set_cache, set_picker, - metadata, set_size: config.set_size, device, + manifest: config.manifest.clone(), regions, flush: config.flush, }; @@ -131,7 +126,7 @@ impl SetManager { } pub fn may_contains(&self, hash: u64) -> bool { - let sid = self.inner.set_picker.sid(hash); + let sid = self.inner.set_picker.pick(hash); self.inner.loose_bloom_filters[sid as usize].read().lookup(hash) } @@ -140,7 +135,7 @@ impl SetManager { K: StorageKey, V: StorageValue, { - let sid = self.inner.set_picker.sid(hash); + let sid = self.inner.set_picker.pick(hash); // Query bloom filter. if !self.inner.loose_bloom_filters[sid as usize].read().lookup(hash) { @@ -207,7 +202,7 @@ impl SetManager { } pub async fn watermark(&self) -> u128 { - self.inner.metadata.read().await.watermark + self.inner.manifest.watermark().await } pub async fn destroy(&self) -> Result<()> { @@ -217,11 +212,7 @@ impl SetManager { } async fn update_watermark(&self) -> Result<()> { - let mut metadata = self.inner.metadata.write().await; - - let watermark = SetTimestamp::current(); - metadata.watermark = watermark; - metadata.flush(&self.inner.device).await + self.inner.manifest.update().await } async fn storage(&self, id: SetId) -> Result { @@ -254,65 +245,13 @@ impl SetPicker { /// Create a [`SetPicker`] with a total size count. /// /// The `sets` should be the count of all sets. - /// - /// Note: - /// - /// The 0th set will be used as the meta set. + pub fn new(sets: usize) -> Self { Self { sets } } - pub fn sid(&self, hash: u64) -> SetId { + pub fn pick(&self, hash: u64) -> SetId { // skip the meta set - hash % (self.sets as SetId - 1) + 1 - } -} - -#[derive(Debug)] -struct Metadata { - /// watermark timestamp - watermark: u128, -} - -impl Default for Metadata { - fn default() -> Self { - Self { - watermark: SetTimestamp::current(), - } - } -} - -impl Metadata { - const MAGIC: u64 = 0x20230512deadbeef; - const SIZE: usize = 8 + 16; - - fn write(&self, mut buf: impl BufMut) { - buf.put_u64(Self::MAGIC); - buf.put_u128(self.watermark); - } - - fn read(mut buf: impl Buf) -> Self { - let magic = buf.get_u64(); - let watermark = buf.get_u128(); - - if magic != Self::MAGIC || watermark > SetTimestamp::current() { - return Self::default(); - } - - Self { watermark } - } - - async fn flush(&self, device: &MonitoredDevice) -> Result<()> { - let mut buf = IoBytesMut::with_capacity(Self::SIZE); - self.write(&mut buf); - let buf = buf.freeze(); - device.write(buf, 0, 0).await?; - Ok(()) - } - - async fn load(device: &MonitoredDevice) -> Result { - let buf = device.read(0, 0, Metadata::SIZE).await?; - let metadata = Metadata::read(&buf[..Metadata::SIZE]); - Ok(metadata) + hash % (self.sets as SetId) } } diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index f2379c02..d6ba540a 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -16,6 +16,7 @@ use std::{ fmt::{Debug, Display}, hash::Hash, marker::PhantomData, + path::{Path, PathBuf}, str::FromStr, sync::Arc, time::Instant, @@ -42,6 +43,7 @@ use crate::{ engine::{EngineConfig, EngineEnum, SizeSelector}, error::{Error, Result}, large::{generic::GenericLargeStorageConfig, recover::RecoverMode, tombstone::TombstoneLogConfig}, + manifest::Manifest, picker::{ utils::{AdmitAllPicker, FifoPicker, InvalidRatioPicker, RejectAllPicker}, AdmissionPicker, EvictionPicker, ReinsertionPicker, @@ -348,6 +350,8 @@ where engine: Engine, runtime_config: RuntimeOptions, + manifest_file_path: Option, + admission_picker: Arc>, compression: Compression, recover_mode: RecoverMode, @@ -377,6 +381,8 @@ where engine, runtime_config: RuntimeOptions::Disabled, + manifest_file_path: None, + admission_picker: Arc::>::default(), compression: Compression::None, recover_mode: RecoverMode::Quiet, @@ -403,6 +409,18 @@ where self } + /// Set the path for the manifest file. + /// + /// The manifest file is used to tracking the watermark for fast disk cache clearing. + /// + /// When creating a disk cache on a fs, it is not required to set the manifest file path. + /// + /// When creating a disk cache on a raw device, it is required to set the manifest file path. + pub fn with_manifest_file_path(mut self, path: impl AsRef) -> Self { + self.manifest_file_path = Some(path.as_ref().into()); + self + } + /// Enable/disable `sync` after writes. /// /// Default: `false`. @@ -514,6 +532,17 @@ where }; let runtime = Runtime::new(read_runtime, write_runtime, user_runtime_handle); + let manifest_file_path = match &self.device_options { + DeviceOptions::None => "phantom".into(), + DeviceOptions::DeviceConfig(DeviceConfig::DirectFile(_)) => self + .manifest_file_path + .expect("manifest file path must be set when using a direct file device for disk cache"), + DeviceOptions::DeviceConfig(DeviceConfig::DirectFs(config)) => self + .manifest_file_path + .unwrap_or_else(|| config.dir().join(Manifest::DEFAULT_FILENAME)), + }; + let manifest = Manifest::open(manifest_file_path, self.flush, runtime.clone()).await?; + let engine = { let statistics = statistics.clone(); let metrics = metrics.clone(); @@ -569,6 +598,7 @@ where set_cache_capacity: self.small.set_cache_capacity, set_cache_shards: self.small.set_cache_shards, device, + manifest, regions, flush: self.flush, flushers: self.small.flushers, @@ -590,6 +620,7 @@ where set_cache_capacity: self.small.set_cache_capacity, set_cache_shards: self.small.set_cache_shards, device: device.clone(), + manifest, regions: small_regions, flush: self.flush, flushers: self.small.flushers, From 602d47d4989a6358fdc42efa777b86fbb26702a9 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sat, 26 Oct 2024 14:32:48 +0000 Subject: [PATCH 2/6] feat: use manifest to clear both large & small disk cache Signed-off-by: MrCroxx --- foyer-storage/src/large/batch.rs | 14 ++-- foyer-storage/src/large/generic.rs | 51 ++++++++------ foyer-storage/src/large/reclaimer.rs | 11 +++ foyer-storage/src/large/recover.rs | 11 ++- foyer-storage/src/manifest.rs | 95 ++++++++++++++------------ foyer-storage/src/region.rs | 5 +- foyer-storage/src/small/set_manager.rs | 9 ++- foyer-storage/src/store.rs | 4 +- 8 files changed, 128 insertions(+), 72 deletions(-) diff --git a/foyer-storage/src/large/batch.rs b/foyer-storage/src/large/batch.rs index 5adcea96..493c088f 100644 --- a/foyer-storage/src/large/batch.rs +++ b/foyer-storage/src/large/batch.rs @@ -111,17 +111,19 @@ where } pub fn entry(&mut self, entry: CacheEntry, compression: &Compression, sequence: Sequence) -> bool { - tracing::trace!("[batch]: append entry with sequence: {sequence}"); + tracing::trace!("[lodc batch]: append entry with sequence: {sequence}"); self.may_init(); if entry.is_outdated() { + tracing::trace!("[lodc batch]: skip outdated entry"); return false; } let pos = self.len; if pos + EntryHeader::serialized_len() >= self.buffer.len() { + tracing::trace!("[lodc batch]: entry out of buffer capacity, skip"); // Only handle start position overflow. End position overflow will be handled by serde. return false; } @@ -157,7 +159,7 @@ where self.advance(aligned); let group = self.groups.last_mut().unwrap(); - group.indices.push(HashedEntryAddress { + let addr = HashedEntryAddress { hash: entry.hash(), address: EntryAddress { region: RegionId::MAX, @@ -165,7 +167,9 @@ where len: header.entry_len() as _, sequence, }, - }); + }; + tracing::trace!("[lodc batch]: entry addr: {addr:?}"); + group.indices.push(addr); group.entries.push(entry); group.region.len += aligned; group.range.end += aligned; @@ -225,6 +229,8 @@ where } pub fn rotate(&mut self) -> Option> { + tracing::trace!("[lodc batch]: rotate"); + if self.is_empty() { return None; } @@ -351,7 +357,7 @@ impl Debug for RegionHandle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RegionHandle") .field("offset", &self.offset) - .field("size", &self.len) + .field("len", &self.len) .field("is_full", &self.is_full) .finish() } diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index 047d96a7..f8408b70 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -46,10 +46,10 @@ use crate::{ device::{monitor::DeviceStats, Dev, DevExt, MonitoredDevice, RegionId}, error::{Error, Result}, large::{ - reclaimer::RegionCleaner, serde::{AtomicSequence, EntryHeader}, tombstone::{Tombstone, TombstoneLog, TombstoneLogConfig}, }, + manifest::Manifest, picker::{EvictionPicker, ReinsertionPicker}, region::RegionManager, runtime::Runtime, @@ -66,6 +66,7 @@ where { pub name: String, pub device: MonitoredDevice, + pub manifest: Manifest, pub regions: Range, pub compression: Compression, pub flush: bool, @@ -95,6 +96,7 @@ where f.debug_struct("GenericStoreConfig") .field("name", &self.name) .field("device", &self.device) + .field("manifest", &self.manifest) .field("compression", &self.compression) .field("flush", &self.flush) .field("indexer_shards", &self.indexer_shards) @@ -142,7 +144,8 @@ where { indexer: Indexer, device: MonitoredDevice, - region_manager: RegionManager, + manifest: Manifest, + _region_manager: RegionManager, flushers: Vec>, reclaimers: Vec, @@ -152,8 +155,6 @@ where statistics: Arc, - flush: bool, - sequence: AtomicSequence, runtime: Runtime, @@ -249,6 +250,7 @@ where let reclaimers = join_all((0..config.reclaimers).map(|_| async { Reclaimer::open( + config.manifest.clone(), region_manager.clone(), reclaim_semaphore.clone(), config.reinsertion_picker.clone(), @@ -266,13 +268,13 @@ where inner: Arc::new(GenericStoreInner { indexer, device, - region_manager, + manifest: config.manifest, + _region_manager: region_manager, flushers, reclaimers, submit_queue_size, submit_queue_size_threshold: config.submit_queue_size_threshold, statistics: stats, - flush: config.flush, sequence, runtime: config.runtime, active: AtomicBool::new(true), @@ -430,6 +432,8 @@ where tombstone: Tombstone { hash: 0, sequence }, stats: None, }); + + // Wait all inflight data to finish. self.wait().await; // Clear indices. @@ -438,16 +442,11 @@ where // otherwise the indices of the latest batch cannot be cleared. self.inner.indexer.clear(); - // Clean regions. - try_join_all((0..self.inner.region_manager.regions() as RegionId).map(|id| { - let region = self.inner.region_manager.region(id).clone(); - async move { - let res = RegionCleaner::clean(®ion, self.inner.flush).await; - region.stats().reset(); - res - } - })) - .await?; + // Update manifest watermark to prevent stale regions to be read in future. + self.inner + .manifest + .update_sequence_watermark(self.inner.sequence.fetch_add(1, Ordering::Relaxed)) + .await?; Ok(()) } @@ -534,8 +533,8 @@ mod tests { Monitored::open( MonitoredConfig { config: DirectFsDeviceOptions::new(dir) - .with_capacity(ByteSize::kib(64).as_u64() as _) - .with_file_size(ByteSize::kib(16).as_u64() as _) + .with_capacity(ByteSize::kib(80).as_u64() as _) + .with_file_size(ByteSize::kib(20).as_u64() as _) .into(), metrics: Arc::new(Metrics::new("test")), }, @@ -554,11 +553,18 @@ mod tests { dir: impl AsRef, reinsertion_picker: Arc>, ) -> GenericLargeStorage, RandomState> { + let dir = dir.as_ref(); + let runtime = Runtime::new(None, None, Handle::current()); let device = device_for_test(dir).await; + let manifest = Manifest::open(dir.join(Manifest::DEFAULT_FILENAME), true, runtime.clone()) + .await + .unwrap(); + let regions = 0..device.regions() as RegionId; let config = GenericLargeStorageConfig { name: "test".to_string(), device, + manifest, regions, compression: Compression::None, flush: true, @@ -574,7 +580,7 @@ mod tests { buffer_pool_size: 16 * 1024 * 1024, submit_queue_size_threshold: 16 * 1024 * 1024 * 2, statistics: Arc::::default(), - runtime: Runtime::new(None, None, Handle::current()), + runtime, marker: PhantomData, }; GenericLargeStorage::open(config).await.unwrap() @@ -584,11 +590,18 @@ mod tests { dir: impl AsRef, path: impl AsRef, ) -> GenericLargeStorage, RandomState> { + let dir = dir.as_ref(); + let runtime = Runtime::new(None, None, Handle::current()); let device = device_for_test(dir).await; + let manifest = Manifest::open(dir.join(Manifest::DEFAULT_FILENAME), true, runtime.clone()) + .await + .unwrap(); + let regions = 0..device.regions() as RegionId; let config = GenericLargeStorageConfig { name: "test".to_string(), device, + manifest, regions, compression: Compression::None, flush: true, diff --git a/foyer-storage/src/large/reclaimer.rs b/foyer-storage/src/large/reclaimer.rs index ac2ffb64..1c24db32 100644 --- a/foyer-storage/src/large/reclaimer.rs +++ b/foyer-storage/src/large/reclaimer.rs @@ -31,6 +31,7 @@ use crate::{ scanner::RegionScanner, serde::Sequence, }, + manifest::Manifest, picker::ReinsertionPicker, region::{Region, RegionManager}, runtime::Runtime, @@ -46,6 +47,7 @@ pub struct Reclaimer { impl Reclaimer { #[expect(clippy::too_many_arguments)] pub fn open( + manifest: Manifest, region_manager: RegionManager, reclaim_semaphore: Arc, reinsertion_picker: Arc>, @@ -64,6 +66,7 @@ impl Reclaimer { let (wait_tx, wait_rx) = mpsc::unbounded_channel(); let runner = ReclaimRunner { + manifest, region_manager, reclaim_semaphore, indexer, @@ -99,7 +102,10 @@ where { reinsertion_picker: Arc>, + manifest: Manifest, + region_manager: RegionManager, + reclaim_semaphore: Arc, indexer: Indexer, @@ -171,6 +177,7 @@ where tracing::debug!("[reclaimer]: Start reclaiming region {id}."); + let watermark = self.manifest.sequence_watermark().await; let mut scanner = RegionScanner::new(region.clone(), self.metrics.clone()); let mut picked_count = 0; let mut unpicked = vec![]; @@ -193,6 +200,10 @@ where } Ok(Some((info, key))) => (info, key), }; + if info.sequence < watermark { + unpicked.push(info.hash); + continue; + } if self.reinsertion_picker.pick(&self.stats, &key) { let buffer = match region.read(info.addr.offset as _, info.addr.len as _).await { Err(e) => { diff --git a/foyer-storage/src/large/recover.rs b/foyer-storage/src/large/recover.rs index cc4bf827..b28ae1eb 100644 --- a/foyer-storage/src/large/recover.rs +++ b/foyer-storage/src/large/recover.rs @@ -83,6 +83,7 @@ impl RecoverRunner { // Recover regions concurrently. let semaphore = Arc::new(Semaphore::new(config.recover_concurrency)); let mode = config.recover_mode; + let watermark = config.manifest.sequence_watermark().await; let handles = regions.map(|id| { let semaphore = semaphore.clone(); let region = region_manager.region(id).clone(); @@ -145,6 +146,7 @@ impl RecoverRunner { tracing::trace!("[recover runner]: hash {hash} has versions: {versions:?}"); match versions.pop() { None => None, + Some((sequence, _)) if sequence < watermark => None, Some((_, EntryAddressOrTombstone::Tombstone)) => None, Some((_, EntryAddressOrTombstone::EntryAddress(address))) => { Some(HashedEntryAddress { hash, address }) @@ -167,8 +169,9 @@ impl RecoverRunner { ); // Update components. + let seq = latest_sequence + 1; indexer.insert_batch(indices); - sequence.store(latest_sequence + 1, Ordering::Release); + sequence.store(seq, Ordering::Release); for region in clean_regions { region_manager.mark_clean(region).await; } @@ -178,6 +181,12 @@ impl RecoverRunner { region_manager.reclaim_semaphore().add_permits(permits); region_manager.reclaim_semaphore_countdown().reset(countdown); + if watermark == Sequence::MAX { + // Only manifest error may cause MAX sequence watermark. + // Set it to a normal value after recovery. + config.manifest.update_sequence_watermark(seq).await?; + } + // Note: About reclaim semaphore permits and countdown: // // ``` diff --git a/foyer-storage/src/manifest.rs b/foyer-storage/src/manifest.rs index 083b27e1..e088185d 100644 --- a/foyer-storage/src/manifest.rs +++ b/foyer-storage/src/manifest.rs @@ -27,38 +27,40 @@ use tokio::sync::RwLock as AsyncRwLock; use crate::{ error::{Error, Result}, + large::serde::Sequence, runtime::Runtime, serde::Checksummer, }; /// Persistent metadata for the disk cache. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -struct Metadata { - watermark: u128, -} - -impl Default for Metadata { - /// If the metadata is corrupted, the watermark is supposed to set as the current timestamp to prevent from - /// accessing stale data. - fn default() -> Self { - Self { - watermark: Self::timestamp(), - } - } +pub struct Metadata { + /// watermark for large object disk cache. + pub sequence_watermark: Sequence, + /// watermark for small object disk cache + pub timestamp_watermark: u128, } impl Metadata { - /// | magic 8B | checksum 8B | watermark 16B | - const LENGTH: usize = 8 + 8 + 16; + /// | magic 8B | checksum 8B | sequence watermark 8B | timestamp watermark 16B | + pub const LENGTH: usize = 8 + 8 + 8 + 16; /// | magic 8B | checksum 8B | - const HEADER: usize = 16; + pub const HEADER: usize = 16; /// magic number for metadata - const MAGIC: u64 = 0x20230512deadbeef; + pub const MAGIC: u64 = 0x20230512deadbeef; + + pub fn new(sequence: Sequence, timestamp: u128) -> Self { + Self { + sequence_watermark: sequence, + timestamp_watermark: timestamp, + } + } - fn read(buf: &[u8]) -> Self { + pub fn read(buf: &[u8], default: Self) -> Self { let magic = (&buf[0..8]).get_u64(); let checksum = (&buf[8..16]).get_u64(); - let watermark = (&buf[16..32]).get_u128(); + let sequence_watermark = (&buf[16..24]).get_u64(); + let timestamp_watermark = (&buf[24..40]).get_u128(); let c = Checksummer::checksum64(&buf[Self::HEADER..Self::LENGTH]); @@ -66,14 +68,18 @@ impl Metadata { tracing::warn!( "[manifest]: manifest magic or checksum mismatch, update the watermark to the current timestamp." ); - return Self::default(); + return default; } - Self { watermark } + Self { + sequence_watermark, + timestamp_watermark, + } } - fn write(&self, buf: &mut [u8]) { - (&mut buf[16..32]).put_u128(self.watermark); + pub fn write(&self, buf: &mut [u8]) { + (&mut buf[16..24]).put_u64(self.sequence_watermark); + (&mut buf[24..40]).put_u128(self.timestamp_watermark); let checksum = Checksummer::checksum64(&buf[Self::HEADER..Self::LENGTH]); @@ -81,7 +87,7 @@ impl Metadata { (&mut buf[8..16]).put_u64(checksum); } - fn timestamp() -> u128 { + pub fn timestamp() -> u128 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() } } @@ -122,7 +128,9 @@ impl Manifest { let metadata = asyncify_with_runtime(runtime.read(), move || { let mut buf = [0; Metadata::LENGTH]; let _ = f.read_exact_at(&mut buf[..], 0); - let metadata = Metadata::read(&buf[..]); + // If the metadata is corrupted, the watermark is supposed to set as the current timestamp to prevent from + // accessing stale data. + let metadata = Metadata::read(&buf[..], Metadata::new(u64::MAX, Metadata::timestamp())); Ok::<_, Error>(metadata) }) .await?; @@ -139,15 +147,19 @@ impl Manifest { }) } - pub async fn watermark(&self) -> u128 { - self.inner.read().await.metadata.watermark + pub async fn sequence_watermark(&self) -> Sequence { + self.inner.read().await.metadata.sequence_watermark + } + + pub async fn timestamp_watermark(&self) -> u128 { + self.inner.read().await.metadata.timestamp_watermark } /// Update watermark and flush. - pub async fn update_watermark(&self, watermark: u128) -> Result<()> { + pub async fn update_timestamp_watermark(&self, watermark: u128) -> Result<()> { let mut inner = self.inner.write().await; - inner.metadata.watermark = watermark; + inner.metadata.timestamp_watermark = watermark; let mut buf = [0; Metadata::LENGTH]; inner.metadata.write(&mut buf[..]); @@ -168,15 +180,11 @@ impl Manifest { Ok(()) } - /// Update watermark to latest and flush. - pub async fn update(&self) -> Result<()> { - self.update_watermark(Metadata::timestamp()).await - } + /// Update watermark and flush. + pub async fn update_sequence_watermark(&self, watermark: Sequence) -> Result<()> { + let mut inner = self.inner.write().await; - /// Flush manifest for persistent. - #[expect(unused)] - pub async fn flush(&self) -> Result<()> { - let inner = self.inner.read().await; + inner.metadata.sequence_watermark = watermark; let mut buf = [0; Metadata::LENGTH]; inner.metadata.write(&mut buf[..]); @@ -192,6 +200,8 @@ impl Manifest { }) .await?; + drop(inner); + Ok(()) } } @@ -205,13 +215,14 @@ mod tests { #[test_log::test] fn test_metadata_serde() { let mut buf = [0; Metadata::LENGTH]; - let mut metadata = Metadata::read(&buf[..]); - assert!(metadata.watermark > 0); + let mut metadata = Metadata::read(&buf[..], Metadata::new(114, 514)); + assert_eq!(metadata.sequence_watermark, 114); + assert_eq!(metadata.timestamp_watermark, 514); - metadata.watermark = 0x0123456789abcdef; + metadata.timestamp_watermark = 0x0123456789abcdef; metadata.write(&mut buf[..]); - let m = Metadata::read(&buf[..]); + let m = Metadata::read(&buf[..], Metadata::new(114, 514)); assert_eq!(metadata, m); } @@ -225,13 +236,13 @@ mod tests { let w = Metadata::timestamp(); - manifest.update_watermark(w).await.unwrap(); + manifest.update_timestamp_watermark(w).await.unwrap(); let manifest = Manifest::open(dir.path().join("manifest"), true, Runtime::current()) .await .unwrap(); - let watermark = manifest.watermark().await; + let watermark = manifest.timestamp_watermark().await; assert_eq!(watermark, w); } diff --git a/foyer-storage/src/region.rs b/foyer-storage/src/region.rs index fad51781..6891ae87 100644 --- a/foyer-storage/src/region.rs +++ b/foyer-storage/src/region.rs @@ -284,16 +284,17 @@ impl RegionManager { ) } + #[expect(unused)] pub fn regions(&self) -> usize { self.inner.regions.len() } - #[expect(dead_code)] + #[expect(unused)] pub fn evictable_regions(&self) -> usize { self.inner.eviction.lock().evictable.len() } - #[expect(dead_code)] + #[expect(unused)] pub fn clean_regions(&self) -> usize { self.inner.clean_region_rx.len() } diff --git a/foyer-storage/src/small/set_manager.rs b/foyer-storage/src/small/set_manager.rs index acaa79d3..9594882c 100644 --- a/foyer-storage/src/small/set_manager.rs +++ b/foyer-storage/src/small/set_manager.rs @@ -29,7 +29,7 @@ use super::{ use crate::{ device::{Dev, MonitoredDevice, RegionId}, error::Result, - manifest::Manifest, + manifest::{Manifest, Metadata}, }; /// # Lock Order @@ -202,7 +202,7 @@ impl SetManager { } pub async fn watermark(&self) -> u128 { - self.inner.manifest.watermark().await + self.inner.manifest.timestamp_watermark().await } pub async fn destroy(&self) -> Result<()> { @@ -212,7 +212,10 @@ impl SetManager { } async fn update_watermark(&self) -> Result<()> { - self.inner.manifest.update().await + self.inner + .manifest + .update_timestamp_watermark(Metadata::timestamp()) + .await } async fn storage(&self, id: SetId) -> Result { diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index d6ba540a..69734af4 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -571,6 +571,7 @@ where EngineEnum::open(EngineConfig::Large(GenericLargeStorageConfig { name: self.name, device, + manifest, regions, compression: self.compression, flush: self.flush, @@ -620,7 +621,7 @@ where set_cache_capacity: self.small.set_cache_capacity, set_cache_shards: self.small.set_cache_shards, device: device.clone(), - manifest, + manifest: manifest.clone(), regions: small_regions, flush: self.flush, flushers: self.small.flushers, @@ -632,6 +633,7 @@ where right: GenericLargeStorageConfig { name: self.name, device, + manifest, regions: large_regions, compression: self.compression, flush: self.flush, From 4e8250a01214b526fc9f401f3e5ccf491263b2fa Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sat, 26 Oct 2024 14:55:07 +0000 Subject: [PATCH 3/6] fix: fix build on windows Signed-off-by: MrCroxx --- foyer-storage/src/manifest.rs | 41 +++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/foyer-storage/src/manifest.rs b/foyer-storage/src/manifest.rs index e088185d..882b1e33 100644 --- a/foyer-storage/src/manifest.rs +++ b/foyer-storage/src/manifest.rs @@ -14,7 +14,6 @@ use std::{ fs::{File, OpenOptions}, - os::unix::fs::FileExt, path::Path, sync::Arc, time::{SystemTime, UNIX_EPOCH}, @@ -127,7 +126,19 @@ impl Manifest { let f = file.clone(); let metadata = asyncify_with_runtime(runtime.read(), move || { let mut buf = [0; Metadata::LENGTH]; - let _ = f.read_exact_at(&mut buf[..], 0); + + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + let _ = f.seek_read(&mut buf[..], 0); + }; + + #[cfg(target_family = "unix")] + { + use std::os::unix::fs::FileExt; + let _ = f.read_exact_at(&mut buf[..], 0); + }; + // If the metadata is corrupted, the watermark is supposed to set as the current timestamp to prevent from // accessing stale data. let metadata = Metadata::read(&buf[..], Metadata::new(u64::MAX, Metadata::timestamp())); @@ -167,7 +178,18 @@ impl Manifest { let file = inner.file.clone(); let flush = inner.flush; asyncify_with_runtime(inner.runtime.write(), move || { - file.write_all_at(&buf[..], 0)?; + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + f.seek_write(&buf[..], 0)?; + }; + + #[cfg(target_family = "unix")] + { + use std::os::unix::fs::FileExt; + file.write_all_at(&buf[..], 0)?; + }; + if flush { file.sync_data()?; } @@ -192,7 +214,18 @@ impl Manifest { let file = inner.file.clone(); let flush = inner.flush; asyncify_with_runtime(inner.runtime.write(), move || { - file.write_all_at(&buf[..], 0)?; + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + file.seek_write(&buf[..], 0)?; + }; + + #[cfg(target_family = "unix")] + { + use std::os::unix::fs::FileExt; + file.write_all_at(&buf[..], 0)?; + }; + if flush { file.sync_data()?; } From 091793018d54638a9ed244b320a54d340486a28f Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sun, 27 Oct 2024 07:14:01 +0000 Subject: [PATCH 4/6] fix: fix build on windows Signed-off-by: MrCroxx --- foyer-storage/src/manifest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/foyer-storage/src/manifest.rs b/foyer-storage/src/manifest.rs index 882b1e33..a3afa863 100644 --- a/foyer-storage/src/manifest.rs +++ b/foyer-storage/src/manifest.rs @@ -181,7 +181,7 @@ impl Manifest { #[cfg(target_family = "windows")] { use std::os::windows::fs::FileExt; - f.seek_write(&buf[..], 0)?; + file.seek_write(&buf[..], 0)?; }; #[cfg(target_family = "unix")] From d7b29794bc31977a92f5108b0d0d8d8d87105a58 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sun, 27 Oct 2024 15:57:37 +0000 Subject: [PATCH 5/6] feat: support set manifest path in foyer-bench Signed-off-by: MrCroxx --- foyer-bench/src/main.rs | 10 ++++++++++ foyer-storage/src/large/recover.rs | 8 ++------ foyer/src/hybrid/builder.rs | 19 ++++++++++++++++++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/foyer-bench/src/main.rs b/foyer-bench/src/main.rs index 251084ae..acf48648 100644 --- a/foyer-bench/src/main.rs +++ b/foyer-bench/src/main.rs @@ -81,6 +81,12 @@ struct Args { #[arg(short, long)] dir: Option, + /// Setup path for the manifest file. + /// + /// The manifest file is required with `DirectFile` device. + #[arg(short, long)] + manifest: Option, + /// In-memory cache capacity. #[arg(long, default_value_t = ByteSize::gib(1))] mem: ByteSize, @@ -484,6 +490,10 @@ async fn benchmark(args: Args) { _ => unreachable!(), }; + if let Some(path) = &args.manifest { + builder = builder.with_manifest_file_path(path); + } + builder = builder .with_flush(args.flush) .with_recover_mode(args.recover_mode) diff --git a/foyer-storage/src/large/recover.rs b/foyer-storage/src/large/recover.rs index b28ae1eb..13b6d3de 100644 --- a/foyer-storage/src/large/recover.rs +++ b/foyer-storage/src/large/recover.rs @@ -180,12 +180,8 @@ impl RecoverRunner { } region_manager.reclaim_semaphore().add_permits(permits); region_manager.reclaim_semaphore_countdown().reset(countdown); - - if watermark == Sequence::MAX { - // Only manifest error may cause MAX sequence watermark. - // Set it to a normal value after recovery. - config.manifest.update_sequence_watermark(seq).await?; - } + // Update the manifest sequence watermark with the smallest possible value. + config.manifest.update_sequence_watermark(seq).await?; // Note: About reclaim semaphore permits and countdown: // diff --git a/foyer/src/hybrid/builder.rs b/foyer/src/hybrid/builder.rs index 826c091a..9f274972 100644 --- a/foyer/src/hybrid/builder.rs +++ b/foyer/src/hybrid/builder.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, path::Path, sync::Arc}; use ahash::RandomState; use foyer_common::{ @@ -215,6 +215,23 @@ where } } + /// Set the path for the manifest file. + /// + /// The manifest file is used to tracking the watermark for fast disk cache clearing. + /// + /// When creating a disk cache on a fs, it is not required to set the manifest file path. + /// + /// When creating a disk cache on a raw device, it is required to set the manifest file path. + pub fn with_manifest_file_path(self, path: impl AsRef) -> Self { + let builder = self.builder.with_manifest_file_path(path); + Self { + name: self.name, + tracing_options: self.tracing_options, + memory: self.memory, + builder, + } + } + /// Enable/disable `sync` after writes. /// /// Default: `false`. From d6c6690cdca2fb6f7f05cde0816267a1ddd87f38 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sun, 27 Oct 2024 16:15:26 +0000 Subject: [PATCH 6/6] fix: fix watermark updates Signed-off-by: MrCroxx --- foyer-storage/src/large/recover.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/foyer-storage/src/large/recover.rs b/foyer-storage/src/large/recover.rs index 13b6d3de..51faa8e3 100644 --- a/foyer-storage/src/large/recover.rs +++ b/foyer-storage/src/large/recover.rs @@ -180,8 +180,10 @@ impl RecoverRunner { } region_manager.reclaim_semaphore().add_permits(permits); region_manager.reclaim_semaphore_countdown().reset(countdown); - // Update the manifest sequence watermark with the smallest possible value. - config.manifest.update_sequence_watermark(seq).await?; + if watermark > seq { + // Update the manifest sequence watermark with the smallest possible value. + config.manifest.update_sequence_watermark(seq).await?; + } // Note: About reclaim semaphore permits and countdown: //