-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathrecover.rs
240 lines (220 loc) · 8.31 KB
/
recover.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright 2024 foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::HashMap,
fmt::Debug,
ops::Range,
sync::{atomic::Ordering, Arc},
};
use clap::ValueEnum;
use foyer_common::{
code::{HashBuilder, StorageKey, StorageValue},
metrics::Metrics,
};
use futures::future::try_join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use super::{
generic::GenericLargeStorageConfig,
indexer::{EntryAddress, Indexer},
};
use crate::{
device::RegionId,
error::{Error, Result},
large::{
indexer::HashedEntryAddress,
scanner::{EntryInfo, RegionScanner},
serde::{AtomicSequence, Sequence},
tombstone::Tombstone,
},
region::{Region, RegionManager},
runtime::Runtime,
};
/// The recover mode of the disk cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ValueEnum)]
pub enum RecoverMode {
    /// Do not recover disk cache.
    ///
    /// For an updatable cache, either [`RecoverMode::None`] or the tombstone log must be used to prevent phantom
    /// entries on reopen.
    None,
    /// Recover disk cache and skip errors, keeping whatever entries were read before the error.
    Quiet,
    /// Recover disk cache and fail with an error on recovery errors.
    ///
    /// NOTE(review): previously documented as "panic on errors", but the recovery path returns
    /// `Err` rather than panicking — see `RegionRecoverRunner::run`.
    Strict,
}
/// Stateless runner that rebuilds the disk cache state from on-disk regions
/// (see [`RecoverRunner::run`]).
#[derive(Debug)]
pub struct RecoverRunner;
impl RecoverRunner {
    /// Recover the disk cache state for `regions` according to `config.recover_mode`.
    ///
    /// Steps:
    /// 1. Scan each region concurrently, bounded by `config.recover_concurrency`.
    /// 2. Merge scanned entries with `tombstones`; keep only the latest version per hash,
    ///    dropping tombstoned entries and entries below the manifest sequence watermark.
    /// 3. Rebuild the indexer, restore the sequence counter, and mark regions clean/evictable.
    ///
    /// # Errors
    ///
    /// Returns the collected region-scan errors (as [`Error::multiple`]) when any region fails
    /// to recover, and propagates manifest-update errors.
    #[expect(clippy::too_many_arguments)]
    pub async fn run<K, V, S>(
        config: &GenericLargeStorageConfig<K, V, S>,
        regions: Range<RegionId>,
        sequence: &AtomicSequence,
        indexer: &Indexer,
        region_manager: &RegionManager,
        tombstones: &[Tombstone],
        metrics: Arc<Metrics>,
        runtime: Runtime,
    ) -> Result<()>
    where
        K: StorageKey,
        V: StorageValue,
        S: HashBuilder + Debug,
    {
        // Recover regions concurrently; the semaphore bounds in-flight scans.
        let semaphore = Arc::new(Semaphore::new(config.recover_concurrency));
        let mode = config.recover_mode;
        let watermark = config.manifest.sequence_watermark().await;
        let handles = regions.map(|id| {
            let semaphore = semaphore.clone();
            let region = region_manager.region(id).clone();
            let metrics = metrics.clone();
            runtime.user().spawn(async move {
                let permit = semaphore.acquire().await;
                let res = RegionRecoverRunner::run(mode, region, metrics).await;
                drop(permit);
                res
            })
        });
        // NOTE(review): this `unwrap` panics only on a task JoinError (i.e. a panicked or
        // cancelled recovery task); scan failures travel in the inner `Result` handled below.
        let total = try_join_all(handles).await.unwrap();
        // Return the collected errors, if any.
        let (total, errs): (Vec<_>, Vec<_>) = total.into_iter().partition(|res| res.is_ok());
        if !errs.is_empty() {
            let errs = errs.into_iter().map(|r| r.unwrap_err()).collect_vec();
            return Err(Error::multiple(errs));
        }

        // A per-hash version is either a live entry address or a tombstone marker.
        #[derive(Debug)]
        enum EntryAddressOrTombstone {
            EntryAddress(EntryAddress),
            Tombstone,
        }

        // Dedup entries: gather every (sequence, version) observed for each hash.
        let mut latest_sequence = 0;
        let mut indices: HashMap<u64, Vec<(Sequence, EntryAddressOrTombstone)>> = HashMap::new();
        let mut clean_regions = vec![];
        let mut evictable_regions = vec![];
        // NOTE(review): the enumerate index is cast directly to `RegionId`, which assumes
        // `regions` starts at 0 (`try_join_all` preserves input order) — TODO confirm callers.
        for (region, infos) in total.into_iter().map(|r| r.unwrap()).enumerate() {
            let region = region as RegionId;
            if infos.is_empty() {
                clean_regions.push(region);
            } else {
                evictable_regions.push(region);
            }
            for EntryInfo { hash, sequence, addr } in infos {
                latest_sequence = latest_sequence.max(sequence);
                indices
                    .entry(hash)
                    .or_default()
                    .push((sequence, EntryAddressOrTombstone::EntryAddress(addr)));
            }
        }
        // Tombstones participate in versioning so a deletion can shadow an older entry.
        tombstones.iter().for_each(|tombstone| {
            latest_sequence = latest_sequence.max(tombstone.sequence);
            indices
                .entry(tombstone.hash)
                .or_default()
                .push((tombstone.sequence, EntryAddressOrTombstone::Tombstone))
        });
        // Keep only the newest version of each hash; discard it when it is a tombstone
        // or falls below the manifest sequence watermark.
        let indices = indices
            .into_iter()
            .filter_map(|(hash, mut versions)| {
                versions.sort_by_key(|(sequence, _)| *sequence);
                tracing::trace!("[recover runner]: hash {hash} has versions: {versions:?}");
                match versions.pop() {
                    None => None,
                    Some((sequence, _)) if sequence < watermark => None,
                    Some((_, EntryAddressOrTombstone::Tombstone)) => None,
                    Some((_, EntryAddressOrTombstone::EntryAddress(address))) => {
                        Some(HashedEntryAddress { hash, address })
                    }
                }
            })
            .collect_vec();
        // See the note on reclaim semaphore permits and countdown at the end of this function.
        let permits = config.clean_region_threshold.saturating_sub(clean_regions.len());
        let countdown = clean_regions.len().saturating_sub(config.clean_region_threshold);
        // Log recovery.
        tracing::info!(
            "Recovers {e} regions with data, {c} clean regions, {t} total entries with max sequence as {s}, initial reclaim permits is {p}.",
            e = evictable_regions.len(),
            c = clean_regions.len(),
            t = indices.len(),
            s = latest_sequence,
            p = permits,
        );
        // Update components.
        let seq = latest_sequence + 1;
        indexer.insert_batch(indices);
        sequence.store(seq, Ordering::Release);
        for region in clean_regions {
            region_manager.mark_clean(region).await;
        }
        for region in evictable_regions {
            region_manager.mark_evictable(region);
        }
        region_manager.reclaim_semaphore().add_permits(permits);
        region_manager.reclaim_semaphore_countdown().reset(countdown);
        if watermark > seq {
            // Update the manifest sequence watermark with the smallest possible value,
            // so future entries (sequences >= seq) are not spuriously filtered on the
            // next recovery — presumably the watermark must never exceed the live
            // sequence counter; TODO confirm against manifest semantics.
            config.manifest.update_sequence_watermark(seq).await?;
        }
        // Note: About reclaim semaphore permits and countdown:
        //
        // ```
        // permits = clean region threshold - clean region - reclaiming region
        // ```
        //
        // When recovery, `reclaiming region` is always `0`.
        //
        // If `clean region threshold >= clean region`, permits is simply `clean region threshold - clean region`.
        //
        // If `clean region threshold < clean region`, for permits must be NON-NEGATIVE, we can temporarily set permits
        // to `0`, and skip first `clean region - clean region threshold` permits increments. It is implemented by
        // `Countdown`.
        Ok(())
    }
}
/// Recovers the entries of a single region by scanning it sequentially.
#[derive(Debug)]
struct RegionRecoverRunner;

impl RegionRecoverRunner {
    /// Scan `region` and return the recovered entry infos, honoring the recover `mode`.
    ///
    /// - [`RecoverMode::None`]: skip scanning entirely and return no entries.
    /// - [`RecoverMode::Quiet`]: on a scan error, log it and keep whatever was read so far.
    /// - [`RecoverMode::Strict`]: on a scan error, propagate it.
    ///
    /// Scanning also stops early at the first entry whose sequence is smaller than the
    /// previous one — presumably the boundary where the remaining region data belongs to
    /// an older generation; TODO confirm against the writer's append order.
    ///
    /// # Errors
    ///
    /// Returns the scanner error only in [`RecoverMode::Strict`].
    async fn run(mode: RecoverMode, region: Region, metrics: Arc<Metrics>) -> Result<Vec<EntryInfo>> {
        if mode == RecoverMode::None {
            return Ok(vec![]);
        }

        let mut infos = vec![];
        let id = region.id();
        let mut iter = RegionScanner::new(region, metrics);
        loop {
            match iter.next().await {
                Err(e) => {
                    if mode == RecoverMode::Strict {
                        return Err(e);
                    } else {
                        // FIX: include the error itself in the message — it was previously
                        // discarded, leaving no way to diagnose quiet-mode recovery failures.
                        tracing::warn!(
                            "error raised when recovering region {id}, skip further recovery for {id}: {e}"
                        );
                        break;
                    }
                }
                // Sequence regression: stop and keep the monotonically increasing prefix.
                Ok(Some(info)) if info.sequence < infos.last().map(|last: &EntryInfo| last.sequence).unwrap_or(0) => {
                    break;
                }
                Ok(Some(info)) => infos.push(info),
                Ok(None) => break,
            }
        }
        Ok(infos)
    }
}