fix: fix the ghost queue bug with s3fifo #432

Merged · 5 commits · Apr 27, 2024
1 change: 1 addition & 0 deletions foyer-memory/Cargo.toml
@@ -24,6 +24,7 @@
 parking_lot = "0.12"
 tokio = { workspace = true }
 
 [dev-dependencies]
+csv = "1.3.0"
 moka = { version = "0", features = ["sync"] }
 rand = { version = "0.8", features = ["small_rng"] }
 zipf = "7.0.1"
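Note that `csv` is added under `[dev-dependencies]` only: it is used exclusively by the hit-ratio benchmark below to parse the Twitter trace, so it adds nothing to foyer-memory's runtime dependency tree.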
98 changes: 81 additions & 17 deletions foyer-memory/benches/bench_hit_ratio.rs
@@ -14,6 +14,7 @@
 
 use std::sync::Arc;
 
+use csv::Reader;
 use foyer_memory::{Cache, CacheBuilder, FifoConfig, LfuConfig, LruConfig, S3FifoConfig};
 use rand::{distributions::Distribution, thread_rng};

@@ -66,7 +67,7 @@ fn cache_hit(cache: Cache<CacheKey, CacheValue>, keys: Arc<Vec<CacheKey>>) -> f64 {
             cache.insert(key.clone(), ());
         }
     }
-    hit as f64 / ITERATIONS as f64
+    (hit as f64) / (keys.len() as f64)
 }
 
 fn moka_cache_hit(cache: &moka::sync::Cache<CacheKey, CacheValue>, keys: &[String]) -> f64 {
@@ -79,7 +80,7 @@ fn moka_cache_hit(cache: &moka::sync::Cache<CacheKey, CacheValue>, keys: &[String]) -> f64 {
             cache.insert(key.clone(), ());
         }
     }
-    hit as f64 / ITERATIONS as f64
+    hit as f64 / (keys.len() as f64)
 }

 fn new_fifo_cache(capacity: usize) -> Cache<CacheKey, CacheValue> {
@@ -130,33 +131,21 @@ fn new_s3fifo_cache_w_ghost(capacity: usize) -> Cache<CacheKey, CacheValue> {
         .with_shards(SHARDS)
         .with_eviction_config(S3FifoConfig {
             small_queue_capacity_ratio: 0.1,
-            ghost_queue_capacity_ratio: 10.0,
+            ghost_queue_capacity_ratio: 1.0,
             small_to_main_freq_threshold: 2,
         })
         .with_object_pool_capacity(OBJECT_POOL_CAPACITY)
         .build()
 }

-fn bench_one(zif_exp: f64, cache_size_percent: f64) {
-    print!("{zif_exp:6.2}, {cache_size_percent:6}{:6}", "");
-    let mut rng = thread_rng();
-    let zipf = zipf::ZipfDistribution::new(ITEMS, zif_exp).unwrap();
-
-    let cache_size = (ITEMS as f64 * cache_size_percent) as usize;
-
+fn bench_workload(keys: Vec<String>, cache_size: usize) {
     let fifo_cache = new_fifo_cache(cache_size);
     let lru_cache = new_lru_cache(cache_size);
     let lfu_cache = new_lfu_cache(cache_size);
     let s3fifo_cache_wo_ghost = new_s3fifo_cache_wo_ghost(cache_size);
     let s3fifo_cache_w_ghost = new_s3fifo_cache_w_ghost(cache_size);
     let moka_cache = moka::sync::Cache::new(cache_size as u64);
 
-    let mut keys = Vec::with_capacity(ITERATIONS);
-    for _ in 0..ITERATIONS {
-        let key = zipf.sample(&mut rng).to_string();
-        keys.push(key.clone());
-    }
-
     let keys = Arc::new(keys);
 
     // Use multiple threads to simulate concurrent read-through requests.
@@ -212,10 +201,24 @@ fn bench_one(zif_exp: f64, cache_size_percent: f64) {
     println!();
 }

+fn bench_one(zif_exp: f64, cache_size_percent: f64) {
+    print!("{zif_exp:6.2}, {cache_size_percent:6}{:6}", "");
+    let mut rng = thread_rng();
+    let zipf = zipf::ZipfDistribution::new(ITEMS, zif_exp).unwrap();
+
+    let cache_size = (ITEMS as f64 * cache_size_percent) as usize;
+    let mut keys = Vec::with_capacity(ITERATIONS);
+    for _ in 0..ITERATIONS {
+        let key = zipf.sample(&mut rng).to_string();
+        keys.push(key.clone());
+    }
+    bench_workload(keys, cache_size);
+}
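Two things happen in this refactor. First, key generation is split out of the measurement loop: `bench_one` now only synthesizes a Zipf-distributed key sequence and delegates to `bench_workload`, so a trace-driven key sequence can reuse the same path. Second, the hit ratio is divided by `keys.len()` rather than `ITERATIONS`, which keeps the ratio correct when a trace supplies a different number of requests than the synthetic workload. A minimal sketch of the two call paths (values and file name are hypothetical; `bench_one`, `bench_workload`, and `read_twitter_trace` are the functions in this file):

    // Sketch: synthetic and trace-driven workloads share one benchmark path.
    fn example() {
        // Synthetic: bench_one generates ITERATIONS Zipf keys, then delegates.
        bench_one(1.0, 0.05);

        // Trace replay: feed any key sequence straight into bench_workload.
        let keys = read_twitter_trace("cluster52.csv", 1_000_000);
        bench_workload(keys, 50_000);
    }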

 fn bench_zipf_hit() {
     println!(
         "{:30}{:16}{:16}{:16}{:16}{:16}{:16}",
-        "zif_exp, cache_size", "fifo", "lru", "lfu", "s3fifo (0g)", "s3fifo (10g)", "moka"
+        "zif_exp, cache_size", "fifo", "lru", "lfu", "s3fifo (0g)", "s3fifo (1g)", "moka"
     );
     for zif_exp in [0.9, 1.0, 1.05, 1.1, 1.5] {
         for cache_capacity in [0.005, 0.01, 0.05, 0.1, 0.25] {
@@ -224,6 +227,67 @@ fn bench_zipf_hit() {
             bench_one(zif_exp, cache_capacity);
         }
     }
 }

+fn read_twitter_trace(path: &str, limit: usize) -> Vec<String> {
+    let file = std::fs::File::open(path).unwrap();
+    let mut reader = Reader::from_reader(file);
+    let mut keys = Vec::new();
+    for result in reader.records() {
+        let record = result.unwrap();
+        let key = record.get(1).unwrap().to_string();
+        keys.push(key);
+        if keys.len() >= limit {
+            break;
+        }
+    }
+    keys
+}
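`read_twitter_trace` assumes the key sits in the second CSV column (index 1), which matches the twitter/cache-trace row format (timestamp, anonymized key, key size, value size, client id, operation, TTL) referenced in the review thread below. One caveat: `csv::Reader::from_reader` treats the first row as a header by default, so the very first trace record is silently skipped, which is harmless for a multi-million-row trace. A self-contained sketch of the parsing assumption, with fabricated rows:

    use csv::ReaderBuilder;

    fn main() {
        // Two fabricated rows in the twitter/cache-trace column order:
        // timestamp, anonymized key, key size, value size, client id, operation, TTL.
        let data = "0,keyA,10,100,7,get,0\n1,keyB,10,100,7,get,0\n";
        // has_headers(false) yields both rows; the default used by the benchmark
        // would treat the "keyA" row as a header and skip it.
        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .from_reader(data.as_bytes());
        for result in reader.records() {
            let record = result.expect("malformed CSV row");
            println!("key = {}", record.get(1).expect("missing key column"));
        }
    }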

+/*
+cache_size           fifo         lru          lfu          s3fifo (0g)  s3fifo (1g)  moka
+50000                67.50%       70.51%       74.99%       70.88%       72.33%       64.70%
+zif_exp, cache_size  fifo         lru          lfu          s3fifo (0g)  s3fifo (1g)  moka
+0.90, 0.005          16.24%       19.20%       32.38%       32.06%       31.94%       33.44%
+0.90, 0.01           22.55%       26.21%       38.56%       39.27%       38.46%       37.86%
+0.90, 0.05           41.10%       45.61%       55.41%       56.64%       55.37%       55.19%
+0.90, 0.1            51.05%       55.69%       63.81%       65.27%       63.61%       64.16%
+0.90, 0.25           66.76%       71.15%       76.17%       77.53%       75.68%       77.11%
+1.00, 0.005          26.59%       31.05%       44.11%       44.37%       43.54%       45.54%
+1.00, 0.01           34.36%       39.13%       50.64%       51.40%       50.59%       50.69%
+1.00, 0.05           54.03%       58.75%       66.80%       67.94%       66.81%       66.91%
+1.00, 0.1            63.16%       67.62%       73.93%       75.01%       73.83%       74.38%
+1.00, 0.25           76.17%       79.93%       83.61%       84.53%       83.27%       84.36%
+1.05, 0.005          32.64%       37.67%       50.23%       50.31%       49.63%       51.83%
+1.05, 0.01           40.90%       46.02%       56.71%       57.59%       56.67%       56.99%
+1.05, 0.05           60.44%       65.03%       72.05%       73.02%       72.09%       72.31%
+1.05, 0.1            68.90%       73.10%       78.50%       79.36%       78.39%       78.98%
+1.05, 0.25           80.34%       83.71%       86.77%       87.47%       86.49%       87.40%
+1.10, 0.005          38.98%       44.45%       56.25%       56.62%       55.62%       57.89%
+1.10, 0.01           47.66%       52.98%       62.65%       63.55%       62.62%       63.22%
+1.10, 0.05           66.55%       70.91%       76.90%       77.77%       76.96%       77.22%
+1.10, 0.1            74.27%       78.07%       82.56%       83.38%       82.51%       82.97%
+1.10, 0.25           84.19%       87.10%       89.56%       90.08%       89.34%       90.08%
+1.50, 0.005          81.20%       85.30%       88.90%       89.32%       88.79%       89.92%
+1.50, 0.01           86.91%       89.87%       92.25%       92.66%       92.29%       92.76%
+1.50, 0.05           94.77%       96.05%       96.96%       97.08%       96.96%       97.10%
+1.50, 0.1            96.64%       97.50%       98.05%       98.10%       98.04%       98.12%
+1.50, 0.25           98.37%       98.82%       99.05%       99.03%       99.03%       99.10%
+*/
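Reading the numbers: with the corrected ghost queue, s3fifo (1g) sits within about a point of lfu across the Zipf sweep and clearly above fifo and lru, while on the Twitter trace (top row) the ghost queue lifts s3fifo from 70.88% to 72.33%, still short of lfu's 74.99% but well ahead of moka's 64.70%.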
 fn main() {
+    // Try to read the csv trace file path from an environment variable.
+    let path = std::env::var("TWITTER_TRACE_PATH").ok();
+    if let Some(path) = path {
+        // Limit the number of keys to read.
+        // MAX means read all keys, which may take a really long time.
Comment on lines +276 to +280

Collaborator: Add an interactive command to download the dataset?

Contributor (author): Downloading it is a little bit complex.

Collaborator: Hi @xiaguan, where can I find the dataset? Let me handle the download part.

Contributor (author): https://github.com/twitter/cache-trace; I think I used the miss-ratio-related cluster's data. I downloaded it via SNIA, which let me fetch just part of it (6 GB).

Collaborator: Yep, it is really big. 🤣
+        let limit = usize::MAX;
+        let capacity = 50_000;
+        let keys = read_twitter_trace(&path, limit);
+        println!(
+            "{:30}{:16}{:16}{:16}{:16}{:16}{:16}",
+            "cache_size", "fifo", "lru", "lfu", "s3fifo (0g)", "s3fifo (1g)", "moka"
+        );
+        print!("{capacity:10}");
+        print!("{:9}", " ");
+        bench_workload(keys, capacity);
+    }
     bench_zipf_hit();
 }
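To reproduce the trace run, point the benchmark at a downloaded trace file via the environment variable, e.g. `TWITTER_TRACE_PATH=/path/to/cluster.csv cargo bench --bench bench_hit_ratio` (the exact invocation is an assumption; it depends on how the bench target is registered in foyer-memory's Cargo.toml). When the variable is unset, only the synthetic Zipf sweep runs.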
24 changes: 7 additions & 17 deletions foyer-memory/src/eviction/s3fifo.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::{
-    collections::{hash_map::Entry, HashMap, VecDeque},
+    collections::{HashSet, VecDeque},
     fmt::Debug,
     ptr::NonNull,
 };
@@ -235,7 +235,6 @@ where

     unsafe fn push(&mut self, mut ptr: NonNull<Self::Handle>) {
         let handle = ptr.as_mut();
-
         debug_assert_eq!(handle.freq, 0);
         debug_assert_eq!(handle.queue, Queue::None);

@@ -331,7 +330,7 @@ unsafe impl<T> Send for S3Fifo<T> where T: Send + Sync + 'static {}
 unsafe impl<T> Sync for S3Fifo<T> where T: Send + Sync + 'static {}
 
 struct GhostQueue {
-    counts: HashMap<u64, usize>,
+    counts: HashSet<u64>,
     queue: VecDeque<(u64, usize)>,
     capacity: usize,
     weight: usize,
@@ -340,7 +339,7 @@ struct GhostQueue {
 impl GhostQueue {
     fn new(capacity: usize) -> Self {
         Self {
-            counts: HashMap::default(),
+            counts: HashSet::default(),
             queue: VecDeque::new(),
             capacity,
             weight: 0,
@@ -351,32 +350,23 @@ impl GhostQueue {
         if self.capacity == 0 {
             return;
         }
-
         while self.weight + weight > self.capacity && self.weight > 0 {
             self.pop();
         }
-        *self.counts.entry(hash).or_default() += 1;
         self.queue.push_back((hash, weight));
+        self.counts.insert(hash);
         self.weight += weight;
     }

     fn pop(&mut self) {
         if let Some((hash, weight)) = self.queue.pop_front() {
             self.weight -= weight;
-            match self.counts.entry(hash) {
-                Entry::Vacant(_) => unreachable!(),
-                Entry::Occupied(mut o) => {
-                    let count = o.get_mut();
-                    *count -= 1;
-                    if *count == 0 {
-                        o.remove();
-                    }
-                }
-            }
+            self.counts.remove(&hash);
         }
     }

     fn contains(&self, hash: u64) -> bool {
-        self.counts.contains_key(&hash)
+        self.counts.contains(&hash)
     }
 }
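The substance of the fix: the ghost queue previously kept a `HashMap<u64, usize>` of per-hash occurrence counts and decremented them on pop, while the benchmark sized the ghost queue at 10x the cache capacity. The PR replaces the counting map with a plain `HashSet` whose membership stays in lockstep with the FIFO queue, and sizes the ghost queue at 1x capacity, which matches the common S3-FIFO guideline of a ghost queue roughly the size of the cache. A self-contained sketch of the resulting behavior (names are mine, not foyer's; weights are 1 for simplicity):

    use std::collections::{HashSet, VecDeque};

    struct Ghost {
        seen: HashSet<u64>,            // hashes currently tracked as ghosts
        queue: VecDeque<(u64, usize)>, // FIFO of (hash, weight)
        capacity: usize,               // total weight budget
        weight: usize,                 // current total weight
    }

    impl Ghost {
        fn new(capacity: usize) -> Self {
            Self { seen: HashSet::new(), queue: VecDeque::new(), capacity, weight: 0 }
        }

        fn push(&mut self, hash: u64, weight: usize) {
            if self.capacity == 0 {
                return;
            }
            // Evict the oldest ghosts until the new entry fits.
            while self.weight + weight > self.capacity && self.weight > 0 {
                if let Some((h, w)) = self.queue.pop_front() {
                    self.weight -= w;
                    self.seen.remove(&h);
                }
            }
            self.queue.push_back((hash, weight));
            self.seen.insert(hash);
            self.weight += weight;
        }

        fn contains(&self, hash: u64) -> bool {
            self.seen.contains(&hash)
        }
    }

    fn main() {
        let mut ghost = Ghost::new(2);
        ghost.push(1, 1);
        ghost.push(2, 1);
        assert!(ghost.contains(1) && ghost.contains(2));
        ghost.push(3, 1); // exceeds the budget, so hash 1 (the oldest) is evicted
        assert!(!ghost.contains(1));
        assert!(ghost.contains(2) && ghost.contains(3));
        println!("ghost queue sketch ok");
    }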
