Skip to content

Commit 4033b00

Browse files
authored
fix: fix the ghost queue bug with s3fifo (#432)
* add real workload bench * fix s3fifo impl * make fast * fix pull request comments
1 parent e612b5a commit 4033b00

File tree

3 files changed

+89
-34
lines changed

3 files changed

+89
-34
lines changed

foyer-memory/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ parking_lot = "0.12"
2424
tokio = { workspace = true }
2525

2626
[dev-dependencies]
27+
csv = "1.3.0"
2728
moka = { version = "0", features = ["sync"] }
2829
rand = { version = "0.8", features = ["small_rng"] }
2930
zipf = "7.0.1"

foyer-memory/benches/bench_hit_ratio.rs

+81-17
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use csv::Reader;
1718
use foyer_memory::{Cache, CacheBuilder, FifoConfig, LfuConfig, LruConfig, S3FifoConfig};
1819
use rand::{distributions::Distribution, thread_rng};
1920

@@ -66,7 +67,7 @@ fn cache_hit(cache: Cache<CacheKey, CacheValue>, keys: Arc<Vec<CacheKey>>) -> f6
6667
cache.insert(key.clone(), ());
6768
}
6869
}
69-
hit as f64 / ITERATIONS as f64
70+
(hit as f64) / (keys.len() as f64)
7071
}
7172

7273
fn moka_cache_hit(cache: &moka::sync::Cache<CacheKey, CacheValue>, keys: &[String]) -> f64 {
@@ -79,7 +80,7 @@ fn moka_cache_hit(cache: &moka::sync::Cache<CacheKey, CacheValue>, keys: &[Strin
7980
cache.insert(key.clone(), ());
8081
}
8182
}
82-
hit as f64 / ITERATIONS as f64
83+
hit as f64 / (keys.len() as f64)
8384
}
8485

8586
fn new_fifo_cache(capacity: usize) -> Cache<CacheKey, CacheValue> {
@@ -130,33 +131,21 @@ fn new_s3fifo_cache_w_ghost(capacity: usize) -> Cache<CacheKey, CacheValue> {
130131
.with_shards(SHARDS)
131132
.with_eviction_config(S3FifoConfig {
132133
small_queue_capacity_ratio: 0.1,
133-
ghost_queue_capacity_ratio: 10.0,
134+
ghost_queue_capacity_ratio: 1.0,
134135
small_to_main_freq_threshold: 2,
135136
})
136137
.with_object_pool_capacity(OBJECT_POOL_CAPACITY)
137138
.build()
138139
}
139140

140-
fn bench_one(zif_exp: f64, cache_size_percent: f64) {
141-
print!("{zif_exp:6.2}, {cache_size_percent:6}{:6}", "");
142-
let mut rng = thread_rng();
143-
let zipf = zipf::ZipfDistribution::new(ITEMS, zif_exp).unwrap();
144-
145-
let cache_size = (ITEMS as f64 * cache_size_percent) as usize;
146-
141+
fn bench_workload(keys: Vec<String>, cache_size: usize) {
147142
let fifo_cache = new_fifo_cache(cache_size);
148143
let lru_cache = new_lru_cache(cache_size);
149144
let lfu_cache = new_lfu_cache(cache_size);
150145
let s3fifo_cache_wo_ghost = new_s3fifo_cache_wo_ghost(cache_size);
151146
let s3fifo_cache_w_ghost = new_s3fifo_cache_w_ghost(cache_size);
152147
let moka_cache = moka::sync::Cache::new(cache_size as u64);
153148

154-
let mut keys = Vec::with_capacity(ITERATIONS);
155-
for _ in 0..ITERATIONS {
156-
let key = zipf.sample(&mut rng).to_string();
157-
keys.push(key.clone());
158-
}
159-
160149
let keys = Arc::new(keys);
161150

162151
// Use multiple threads to simulate concurrent read-through requests.
@@ -212,10 +201,24 @@ fn bench_one(zif_exp: f64, cache_size_percent: f64) {
212201
println!();
213202
}
214203

204+
fn bench_one(zif_exp: f64, cache_size_percent: f64) {
205+
print!("{zif_exp:6.2}, {cache_size_percent:6}{:6}", "");
206+
let mut rng = thread_rng();
207+
let zipf = zipf::ZipfDistribution::new(ITEMS, zif_exp).unwrap();
208+
209+
let cache_size = (ITEMS as f64 * cache_size_percent) as usize;
210+
let mut keys = Vec::with_capacity(ITERATIONS);
211+
for _ in 0..ITERATIONS {
212+
let key = zipf.sample(&mut rng).to_string();
213+
keys.push(key.clone());
214+
}
215+
bench_workload(keys, cache_size);
216+
}
217+
215218
fn bench_zipf_hit() {
216219
println!(
217220
"{:30}{:16}{:16}{:16}{:16}{:16}{:16}",
218-
"zif_exp, cache_size", "fifo", "lru", "lfu", "s3fifo (0g)", "s3fifo (10g)", "moka"
221+
"zif_exp, cache_size", "fifo", "lru", "lfu", "s3fifo (0g)", "s3fifo (1g)", "moka"
219222
);
220223
for zif_exp in [0.9, 1.0, 1.05, 1.1, 1.5] {
221224
for cache_capacity in [0.005, 0.01, 0.05, 0.1, 0.25] {
@@ -224,6 +227,67 @@ fn bench_zipf_hit() {
224227
}
225228
}
226229

230+
fn read_twitter_trace(path: &str, limit: usize) -> Vec<String> {
231+
let file = std::fs::File::open(path).unwrap();
232+
let mut reader = Reader::from_reader(file);
233+
let mut keys = Vec::new();
234+
for result in reader.records() {
235+
let record = result.unwrap();
236+
let key = record.get(1).unwrap().to_string();
237+
keys.push(key);
238+
if keys.len() >= limit {
239+
break;
240+
}
241+
}
242+
keys
243+
}
244+
245+
/*
246+
cache_size fifo lru lfu s3fifo (0g) s3fifo (1g) moka
247+
50000 67.50% 70.51% 74.99% 70.88% 72.33% 64.70%
248+
zif_exp, cache_size fifo lru lfu s3fifo (0g) s3fifo (1g) moka
249+
0.90, 0.005 16.24% 19.20% 32.38% 32.06% 31.94% 33.44%
250+
0.90, 0.01 22.55% 26.21% 38.56% 39.27% 38.46% 37.86%
251+
0.90, 0.05 41.10% 45.61% 55.41% 56.64% 55.37% 55.19%
252+
0.90, 0.1 51.05% 55.69% 63.81% 65.27% 63.61% 64.16%
253+
0.90, 0.25 66.76% 71.15% 76.17% 77.53% 75.68% 77.11%
254+
1.00, 0.005 26.59% 31.05% 44.11% 44.37% 43.54% 45.54%
255+
1.00, 0.01 34.36% 39.13% 50.64% 51.40% 50.59% 50.69%
256+
1.00, 0.05 54.03% 58.75% 66.80% 67.94% 66.81% 66.91%
257+
1.00, 0.1 63.16% 67.62% 73.93% 75.01% 73.83% 74.38%
258+
1.00, 0.25 76.17% 79.93% 83.61% 84.53% 83.27% 84.36%
259+
1.05, 0.005 32.64% 37.67% 50.23% 50.31% 49.63% 51.83%
260+
1.05, 0.01 40.90% 46.02% 56.71% 57.59% 56.67% 56.99%
261+
1.05, 0.05 60.44% 65.03% 72.05% 73.02% 72.09% 72.31%
262+
1.05, 0.1 68.90% 73.10% 78.50% 79.36% 78.39% 78.98%
263+
1.05, 0.25 80.34% 83.71% 86.77% 87.47% 86.49% 87.40%
264+
1.10, 0.005 38.98% 44.45% 56.25% 56.62% 55.62% 57.89%
265+
1.10, 0.01 47.66% 52.98% 62.65% 63.55% 62.62% 63.22%
266+
1.10, 0.05 66.55% 70.91% 76.90% 77.77% 76.96% 77.22%
267+
1.10, 0.1 74.27% 78.07% 82.56% 83.38% 82.51% 82.97%
268+
1.10, 0.25 84.19% 87.10% 89.56% 90.08% 89.34% 90.08%
269+
1.50, 0.005 81.20% 85.30% 88.90% 89.32% 88.79% 89.92%
270+
1.50, 0.01 86.91% 89.87% 92.25% 92.66% 92.29% 92.76%
271+
1.50, 0.05 94.77% 96.05% 96.96% 97.08% 96.96% 97.10%
272+
1.50, 0.1 96.64% 97.50% 98.05% 98.10% 98.04% 98.12%
273+
1.50, 0.25 98.37% 98.82% 99.05% 99.03% 99.03% 99.10%
274+
*/
227275
fn main() {
276+
// Try to read the CSV trace file path from the `TWITTER_TRACE_PATH` environment variable.
277+
let path = std::env::var("TWITTER_TRACE_PATH").ok();
278+
if let Some(path) = path {
279+
// Limit the number of keys to read.
280+
// `usize::MAX` means read all keys, which may take a really long time.
281+
let limit = usize::MAX;
282+
let capacity = 50_000;
283+
let keys = read_twitter_trace(&path, limit);
284+
println!(
285+
"{:30}{:16}{:16}{:16}{:16}{:16}{:16}",
286+
"cache_size", "fifo", "lru", "lfu", "s3fifo (0g)", "s3fifo (1g)", "moka"
287+
);
288+
print!("{capacity:10}");
289+
print!("{:9}", " ");
290+
bench_workload(keys, capacity);
291+
}
228292
bench_zipf_hit();
229293
}

foyer-memory/src/eviction/s3fifo.rs

+7-17
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::{
16-
collections::{hash_map::Entry, HashMap, VecDeque},
16+
collections::{HashSet, VecDeque},
1717
fmt::Debug,
1818
ptr::NonNull,
1919
};
@@ -235,7 +235,6 @@ where
235235

236236
unsafe fn push(&mut self, mut ptr: NonNull<Self::Handle>) {
237237
let handle = ptr.as_mut();
238-
239238
debug_assert_eq!(handle.freq, 0);
240239
debug_assert_eq!(handle.queue, Queue::None);
241240

@@ -331,7 +330,7 @@ unsafe impl<T> Send for S3Fifo<T> where T: Send + Sync + 'static {}
331330
unsafe impl<T> Sync for S3Fifo<T> where T: Send + Sync + 'static {}
332331

333332
struct GhostQueue {
334-
counts: HashMap<u64, usize>,
333+
counts: HashSet<u64>,
335334
queue: VecDeque<(u64, usize)>,
336335
capacity: usize,
337336
weight: usize,
@@ -340,7 +339,7 @@ struct GhostQueue {
340339
impl GhostQueue {
341340
fn new(capacity: usize) -> Self {
342341
Self {
343-
counts: HashMap::default(),
342+
counts: HashSet::default(),
344343
queue: VecDeque::new(),
345344
capacity,
346345
weight: 0,
@@ -351,32 +350,23 @@ impl GhostQueue {
351350
if self.capacity == 0 {
352351
return;
353352
}
354-
355353
while self.weight + weight > self.capacity && self.weight > 0 {
356354
self.pop();
357355
}
358-
*self.counts.entry(hash).or_default() += 1;
359356
self.queue.push_back((hash, weight));
357+
self.counts.insert(hash);
358+
self.weight += weight;
360359
}
361360

362361
fn pop(&mut self) {
363362
if let Some((hash, weight)) = self.queue.pop_front() {
364363
self.weight -= weight;
365-
match self.counts.entry(hash) {
366-
Entry::Vacant(_) => unreachable!(),
367-
Entry::Occupied(mut o) => {
368-
let count = o.get_mut();
369-
*count -= 1;
370-
if *count == 0 {
371-
o.remove();
372-
}
373-
}
374-
}
364+
self.counts.remove(&hash);
375365
}
376366
}
377367

378368
fn contains(&self, hash: u64) -> bool {
379-
self.counts.contains_key(&hash)
369+
self.counts.contains(&hash)
380370
}
381371
}
382372

0 commit comments

Comments
 (0)