perf: split large io by flush_io_size in the new model #874

Open · wants to merge 4 commits into base: main
52 changes: 46 additions & 6 deletions foyer-storage/src/large/flusher.rs
@@ -28,15 +28,16 @@
bits,
code::{StorageKey, StorageValue},
metrics::Metrics,
rate::RateLimiter,
};
use foyer_memory::Piece;
use futures_core::future::BoxFuture;
use futures_util::{
future::{try_join, try_join_all},
future::{join_all, try_join, try_join_all},
FutureExt,
};
use itertools::Itertools;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, Semaphore};

use super::{
batch::{Batch, BatchWriter, EntryWriter, Op},
@@ -152,6 +153,8 @@
stats: Arc<Statistics>,
metrics: Arc<Metrics>,
runtime: &Runtime,
flush_io_depth_limiter: Arc<Semaphore>,
flush_io_throughput_limiter: Option<Arc<RateLimiter>>,
#[cfg(test)] flush_holder: FlushHolder,
) -> Result<Self> {
let (tx, rx) = flume::unbounded();
@@ -167,6 +170,8 @@
let current_region_handle = region_manager.get_clean_region();
let remain = device.region_size();

let flush_io_size = bits::align_up(PAGE, config.flush_io_size);

let runner = Runner {
rx: Some(rx),
writer,
@@ -175,6 +180,9 @@
rotate_buffer,
queue_init: None,
submit_queue_size: submit_queue_size.clone(),
flush_io_size,
flush_io_depth_limiter,
flush_io_throughput_limiter,
region_manager,
device,
indexer,
@@ -229,7 +237,6 @@
current_region_remain: usize,
metrics: Arc<Metrics>,
) -> BatchWriter {
// TODO(MrCroxx): optimize buffer allocation.
BatchWriter::new(buffer, region_size, current_region_remain, metrics)
}

@@ -271,6 +278,9 @@
rotate_buffer: Option<IoBuffer>,

submit_queue_size: Arc<AtomicUsize>,
flush_io_size: usize,
flush_io_depth_limiter: Arc<Semaphore>,
flush_io_throughput_limiter: Option<Arc<RateLimiter>>,

current_region_handle: GetCleanRegionHandle,
remain: usize,
@@ -425,7 +435,6 @@
waiters: Vec<oneshot::Sender<()>>,
init: Instant,
) -> BoxFuture<'static, IoTaskCtx> {
// ) {
tracing::trace!(
?batch,
?tombstone_infos,
@@ -481,6 +490,9 @@
let flush = self.flush;
let slice = shared.absolute_slice(window.absolute_dirty_range.clone());
let metrics = self.metrics.clone();
let flush_io_size = self.flush_io_size;
let flush_io_depth_limiter = self.flush_io_depth_limiter.clone();
let flush_io_throughput_limiter = self.flush_io_throughput_limiter.clone();

async move {
// Wait for the region to be clean.
@@ -495,8 +507,36 @@
tracing::trace!(region = region.id(), offset, len, "[flusher]: prepare to write region");

if !window.is_empty() {
let (_, res) = region.write(slice, offset as _).await;
res?;
let ios = len / flush_io_size + if len % flush_io_size == 0 { 0 } else { 1 };
let ios = (0..ios).map(|i| {
let start = i * flush_io_size;
let end = std::cmp::min(start + flush_io_size, len);
let io_slice = slice.slice(start..end);
let region = region.clone();
let flush_io_depth_limiter = flush_io_depth_limiter.clone();
let flush_io_throughput_limiter = flush_io_throughput_limiter.clone();
async move {
let guard = flush_io_depth_limiter.acquire_owned().await.unwrap();
if let Some(limiter) = flush_io_throughput_limiter {
if let Some(duration) = limiter.consume(io_slice.len() as _) {
tokio::time::sleep(duration).await;
}
}
let res = region.write(io_slice, (offset + start) as _).await;
drop(guard);
res
}
});
let res = join_all(ios).await;
let mut errs = vec![];
for (_, r) in res {
if let Err(e) = r {
errs.push(e);
}
}
if !errs.is_empty() {
return Err(Error::multiple(errs));
}

if flush {
region.flush().await?;
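
The hunk above replaces the single `region.write(slice, offset)` call with a chunked write: the dirty range is split into IOs no larger than `flush_io_size`, the number of in-flight IOs is bounded by a shared semaphore, and each IO may wait on the throughput limiter before hitting the device. Below is a minimal standalone sketch of that pattern, not the foyer code itself: `write_chunk` is a hypothetical stand-in for `Region::write`, a plain `Vec<u8>` replaces the `IoBuffer`/slice types, and the throughput limiter is omitted for brevity.

use std::sync::Arc;

use futures_util::future::join_all;
use tokio::sync::Semaphore;

// Hypothetical stand-in for `Region::write`; the real call writes to the device.
async fn write_chunk(_offset: usize, _chunk: &[u8]) -> std::io::Result<()> {
    Ok(())
}

// Write `buf` at `offset` as a series of IOs no larger than `flush_io_size`,
// with the number of concurrent writes bounded by the semaphore's permits.
async fn flush_split(
    buf: Arc<Vec<u8>>,
    offset: usize,
    flush_io_size: usize,
    depth_limiter: Arc<Semaphore>,
) -> std::io::Result<()> {
    let len = buf.len();
    // Ceiling division: number of chunks needed to cover `len` bytes.
    let ios = len / flush_io_size + usize::from(len % flush_io_size != 0);
    let tasks = (0..ios).map(|i| {
        let start = i * flush_io_size;
        let end = (start + flush_io_size).min(len);
        let buf = buf.clone();
        let depth_limiter = depth_limiter.clone();
        async move {
            // The permit is held for the duration of the write and released on drop.
            let _permit = depth_limiter.acquire_owned().await.expect("semaphore closed");
            write_chunk(offset + start, &buf[start..end]).await
        }
    });
    // Run all chunk writes concurrently. The PR aggregates every error into
    // `Error::multiple`; this sketch simply surfaces the first one.
    for res in join_all(tasks).await {
        res?;
    }
    Ok(())
}
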
17 changes: 17 additions & 0 deletions foyer-storage/src/large/generic.rs
@@ -29,6 +29,7 @@
bits,
code::{StorageKey, StorageValue},
metrics::Metrics,
rate::RateLimiter,
};
use foyer_memory::Piece;
use futures_util::future::{join_all, try_join_all};
@@ -76,6 +77,9 @@
pub reclaimers: usize,
pub buffer_pool_size: usize,
pub submit_queue_size_threshold: usize,
pub flush_io_size: usize,
pub flush_io_depth: usize,
pub flush_io_throughput: Option<usize>,
pub clean_region_threshold: usize,
pub eviction_pickers: Vec<Box<dyn EvictionPicker>>,
pub reinsertion_picker: Arc<dyn ReinsertionPicker>,
@@ -102,6 +106,9 @@
.field("reclaimers", &self.reclaimers)
.field("buffer_pool_size", &self.buffer_pool_size)
.field("submit_queue_size_threshold", &self.submit_queue_size_threshold)
.field("flush_io_size", &self.flush_io_size)
.field("flush_io_depth", &self.flush_io_depth)
.field("flush_io_throughput", &self.flush_io_throughput)

Check warning on line 111 in foyer-storage/src/large/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/large/generic.rs#L109-L111

Added lines #L109 - L111 were not covered by tests
.field("clean_region_threshold", &self.clean_region_threshold)
.field("eviction_pickers", &self.eviction_pickers)
.field("reinsertion_pickers", &self.reinsertion_picker)
@@ -238,6 +245,8 @@
#[cfg(test)]
let flush_holder = FlushHolder::default();

let flush_io_depth_limiter = Arc::new(Semaphore::new(config.flush_io_depth));
let flush_io_throughput_limiter = config.flush_io_throughput.map(|v| Arc::new(RateLimiter::new(v as _)));
let flushers = try_join_all((0..config.flushers).map(|_| async {
Flusher::open(
&config,
@@ -249,6 +258,8 @@
stats.clone(),
metrics.clone(),
&config.runtime,
flush_io_depth_limiter.clone(),
flush_io_throughput_limiter.clone(),
#[cfg(test)]
flush_holder.clone(),
)
@@ -597,6 +608,9 @@
tombstone_log_config: None,
buffer_pool_size: 16 * 1024 * 1024,
submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
flush_io_size: 128 * 1024,
flush_io_depth: 256,
flush_io_throughput: None,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
marker: PhantomData,
@@ -626,6 +640,9 @@
tombstone_log_config: Some(TombstoneLogConfigBuilder::new(path).with_flush(true).build()),
buffer_pool_size: 16 * 1024 * 1024,
submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
flush_io_size: 128 * 1024,
flush_io_depth: 256,
flush_io_throughput: None,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
marker: PhantomData,
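
generic.rs constructs one `Semaphore` and, when `flush_io_throughput` is set, one `RateLimiter`, both shared by all flushers. `foyer_common::rate::RateLimiter` is crate-internal, so the following is only a hypothetical token-bucket-style stand-in for the consume-then-sleep contract used in flusher.rs, assuming `consume(bytes)` returns how long the caller should sleep to stay under a per-second quota; the real limiter is shared behind an `Arc` and would therefore need interior mutability, which is skipped here for brevity.

use std::time::{Duration, Instant};

// Hypothetical stand-in for the throughput limiter used by the flusher.
struct ThroughputLimiter {
    quota_per_sec: f64, // allowed bytes per second (assumed unit)
    last: Instant,
    debt: f64, // bytes consumed ahead of what the quota has accrued
}

impl ThroughputLimiter {
    fn new(quota_per_sec: f64) -> Self {
        Self { quota_per_sec, last: Instant::now(), debt: 0.0 }
    }

    // Record `bytes` of IO; return how long the caller should sleep, if at all.
    fn consume(&mut self, bytes: f64) -> Option<Duration> {
        let now = Instant::now();
        // Pay down the debt with the quota accrued since the last call.
        let accrued = now.duration_since(self.last).as_secs_f64() * self.quota_per_sec;
        self.debt = (self.debt - accrued).max(0.0) + bytes;
        self.last = now;
        if self.debt <= self.quota_per_sec {
            None // still within roughly one second's budget, no need to wait
        } else {
            // Sleep long enough for the surplus debt to drain at the quota rate.
            Some(Duration::from_secs_f64(
                (self.debt - self.quota_per_sec) / self.quota_per_sec,
            ))
        }
    }
}
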
44 changes: 44 additions & 0 deletions foyer-storage/src/store.rs
@@ -563,6 +563,9 @@
tombstone_log_config: self.large.tombstone_log_config,
buffer_pool_size: self.large.buffer_pool_size,
submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2),
flush_io_size: self.large.flush_io_size,
flush_io_depth: self.large.flush_io_depth,
flush_io_throughput: self.large.flush_io_throughput,
statistics: statistics.clone(),
runtime,
marker: PhantomData,
@@ -621,6 +624,9 @@
tombstone_log_config: self.large.tombstone_log_config,
buffer_pool_size: self.large.buffer_pool_size,
submit_queue_size_threshold: self.large.submit_queue_size_threshold.unwrap_or(self.large.buffer_pool_size * 2),
flush_io_size: self.large.flush_io_size,
flush_io_depth: self.large.flush_io_depth,
flush_io_throughput: self.large.flush_io_throughput,
statistics: statistics.clone(),
runtime,
marker: PhantomData,
@@ -670,6 +676,9 @@
reclaimers: usize,
buffer_pool_size: usize,
submit_queue_size_threshold: Option<usize>,
flush_io_size: usize,
flush_io_depth: usize,
flush_io_throughput: Option<usize>,
clean_region_threshold: Option<usize>,
eviction_pickers: Vec<Box<dyn EvictionPicker>>,
reinsertion_picker: Arc<dyn ReinsertionPicker>,
@@ -704,6 +713,9 @@
reclaimers: 1,
buffer_pool_size: 16 * 1024 * 1024, // 16 MiB
submit_queue_size_threshold: None,
flush_io_size: 128 * 1024, // 128 KiB
flush_io_depth: 256,
flush_io_throughput: None,
clean_region_threshold: None,
eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::<FifoPicker>::default()],
reinsertion_picker: Arc::<RejectAllPicker>::default(),
@@ -818,6 +830,38 @@
self.tombstone_log_config = Some(tombstone_log_config);
self
}

/// Limit the maximum size of a single flush IO.
///
/// A large disk write issued while flushing will be split into multiple disk writes no larger than this value.
///
/// The value will be aligned up to 4 KiB if it is not already aligned.
///
/// Default: 128 * 1024 (128 KiB)
pub fn with_flush_io_size(mut self, flush_io_size: usize) -> Self {
self.flush_io_size = flush_io_size;
self
}

/// Limit the flush IO depth.
///
/// Limits how many flush disk writes can be in flight at the same time.
///
/// Default: 256.
pub fn with_flush_io_depth(mut self, flush_io_depth: usize) -> Self {
self.flush_io_depth = flush_io_depth;
self
}

/// Limit the flush IO throughput.
///
/// Limits the disk write throughput of flush IOs submitted to the device.
///
/// Default: unlimited.
pub fn with_flush_io_throughput(mut self, flush_io_throughput: usize) -> Self {
self.flush_io_throughput = Some(flush_io_throughput);
self
}
}

/// Small object disk cache engine default options.
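
Taken together, the three new builder methods expose the split size, the IO depth, and the throughput cap to users. A hedged usage sketch follows, assuming the methods land on foyer's `LargeEngineOptions` as the surrounding store.rs code suggests; the import path, the `new()` constructor, and the 256 MiB/s figure are illustrative rather than taken from this diff.

use foyer::LargeEngineOptions;

// Configure the flush IO knobs added by this PR. Only the three
// `with_flush_io_*` methods come from the diff; everything else is assumed.
fn large_engine_options() -> LargeEngineOptions {
    LargeEngineOptions::new()
        .with_flush_io_size(128 * 1024) // split each flush into <= 128 KiB device writes
        .with_flush_io_depth(256) // allow at most 256 flush writes in flight
        .with_flush_io_throughput(256 * 1024 * 1024) // cap flush writes, assumed bytes per second
}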