Skip to content

Commit 62db246

Browse files
committed
dynamic chunk id calculation
1 parent bde2dea commit 62db246

File tree

2 files changed

+153
-191
lines changed

2 files changed

+153
-191
lines changed

grovedb/src/lib.rs

Lines changed: 142 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -234,64 +234,21 @@ use crate::helpers::raw_decode;
234234
use crate::util::{root_merk_optional_tx, storage_context_optional_tx};
235235
use crate::Error::MerkError;
236236

237+
use std::rc::Rc;
238+
use std::cell::RefCell;
239+
237240
#[cfg(feature = "full")]
238241
type Hash = [u8; 32];
239242

240243
/// GroveDb
241244
pub struct GroveDb {
242245
#[cfg(feature = "full")]
243246
db: RocksDbStorage,
244-
}
245-
246-
pub struct s_db_snapshot {
247-
pub root_hash: CryptoHash,
248-
pub data: Vec<(String, Vec<Op>)>
249-
}
250-
251-
impl s_db_snapshot {
252-
pub fn new() -> s_db_snapshot {
253-
s_db_snapshot {
254-
root_hash: CryptoHash::default(),
255-
data: Vec::new(),
256-
}
257-
}
258-
}
259-
260-
impl fmt::Debug for s_db_snapshot {
261-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
262-
write!(f, "root_hash:{:?}\n", hex::encode(self.root_hash));
263-
for (global_chunk_id, _) in self.data.iter() {
264-
write!(f, " global_chunk_id:{:?}\n", global_chunk_id);
265-
}
266-
Ok(())
267-
}
268-
}
269247

270-
pub struct s_db_snapshot_sorted {
271-
pub root_hash: CryptoHash,
272-
pub data: BTreeMap<String, Vec<(String, Vec<Op>)>>
273-
}
274-
275-
impl s_db_snapshot_sorted {
276-
pub fn new() -> s_db_snapshot_sorted {
277-
s_db_snapshot_sorted {
278-
root_hash: CryptoHash::default(),
279-
data: BTreeMap::new(),
280-
}
281-
}
282-
}
283-
284-
impl fmt::Debug for s_db_snapshot_sorted {
285-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286-
write!(f, "root_hash:{:?}\n", hex::encode(self.root_hash));
287-
for (prefix, chunk_vec) in self.data.iter() {
288-
write!(f, " prefix:{:?}\n", prefix);
289-
for (chunk_id, _) in chunk_vec.iter() {
290-
write!(f, " chunk_id:{:?}\n", chunk_id);
291-
}
292-
}
293-
Ok(())
294-
}
248+
version: i32,
249+
pending_chunks: BTreeMap<String, Vec<Op>>,
250+
//current_tx: Option<Transaction<'db>>,
251+
//restorer: Restorer<T>,
295252
}
296253

297254
pub struct s_subtrees_metadata {
@@ -330,8 +287,10 @@ pub type TransactionArg<'db, 'a> = Option<&'a Transaction<'db>>;
330287
impl GroveDb {
331288
/// Opens a given path
332289
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
290+
333291
let db = RocksDbStorage::default_rocksdb_with_path(path)?;
334-
Ok(GroveDb { db })
292+
let pending_chunks = BTreeMap::new();
293+
Ok(GroveDb { db, version: 1, pending_chunks })
335294
}
336295

337296
/// Uses raw iter to delete GroveDB key values pairs from rocksdb
@@ -1091,81 +1050,6 @@ impl GroveDb {
10911050
Ok(issues)
10921051
}
10931052

1094-
pub fn s_create_db_snapshot(
1095-
&self,
1096-
list_only_chunk_ids: bool,
1097-
) -> Result<s_db_snapshot, Error> {
1098-
let mut db_snapsot = s_db_snapshot::new();
1099-
1100-
db_snapsot.root_hash = self.root_hash(None).unwrap().unwrap();
1101-
1102-
let subtrees_root = self.find_subtrees(&SubtreePath::empty(), None).unwrap()?;
1103-
for subtree in subtrees_root.into_iter() {
1104-
let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect();
1105-
let path: &[&[u8]] = &subtree_path;
1106-
1107-
let continue_storage_batch = StorageBatch::new();
1108-
1109-
let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap();
1110-
let merk = self.open_batch_merk_at_path(&continue_storage_batch, path.into(), false).value.unwrap();
1111-
1112-
if (merk.is_empty_tree().unwrap()) {
1113-
continue;
1114-
}
1115-
1116-
let mut chunk_producer = ChunkProducer::new(&merk).unwrap();
1117-
1118-
let mut chunk_id_opt = Some("".to_string());
1119-
while let Some(chunk_id) = chunk_id_opt {
1120-
let (chunk, next_chunk_id) = chunk_producer.chunk(chunk_id.as_str()).unwrap();
1121-
1122-
let global_chunk_id = hex::encode(prefix) + &chunk_id;
1123-
if (list_only_chunk_ids) {
1124-
db_snapsot.data.push((global_chunk_id, vec![]));
1125-
}
1126-
else {
1127-
db_snapsot.data.push((global_chunk_id, chunk));
1128-
}
1129-
1130-
chunk_id_opt = next_chunk_id;
1131-
}
1132-
}
1133-
1134-
Ok(db_snapsot)
1135-
}
1136-
1137-
fn s_sort_db_snapshot(
1138-
&self,
1139-
snapshot: s_db_snapshot,
1140-
) -> Result<s_db_snapshot_sorted, Error> {
1141-
let mut db_snapsot_sorted = s_db_snapshot_sorted::new();
1142-
db_snapsot_sorted.root_hash = snapshot.root_hash;
1143-
1144-
let CHUNK_PREFIX_LENGTH: usize = 64;
1145-
1146-
for chunk_entry in snapshot.data {
1147-
let global_chunk_id = chunk_entry.0;
1148-
let chunk_data = chunk_entry.1;
1149-
1150-
if (global_chunk_id.len() < CHUNK_PREFIX_LENGTH) {
1151-
return Err(Error::CorruptedData(
1152-
"expected global chunk id of at least 64 length".to_string(),
1153-
));
1154-
}
1155-
1156-
let chunk_prefix = global_chunk_id.chars().take(CHUNK_PREFIX_LENGTH).collect::<String>();
1157-
let chunk_id = global_chunk_id.chars().skip(CHUNK_PREFIX_LENGTH).collect::<String>();
1158-
1159-
db_snapsot_sorted.data.entry(chunk_prefix).or_insert(Vec::new()).push((chunk_id, chunk_data.to_vec()));
1160-
}
1161-
1162-
for (_key, vec) in db_snapsot_sorted.data.iter_mut() {
1163-
vec.sort_by(|a, b| a.0.len().cmp(&b.0.len()));
1164-
}
1165-
1166-
Ok(db_snapsot_sorted)
1167-
}
1168-
11691053
fn s_get_subtrees_metadata<B: AsRef<[u8]>>(
11701054
&self,
11711055
path: &SubtreePath<B>,
@@ -1232,9 +1116,7 @@ impl GroveDb {
12321116
let merk = self.open_batch_merk_at_path(&continue_storage_batch, path.into(), false).value?;
12331117

12341118
if (merk.is_empty_tree().unwrap()) {
1235-
return Err(Error::CorruptedData(
1236-
"Empty merk".to_string(),
1237-
));
1119+
return Ok(vec![]);
12381120
}
12391121

12401122
let mut chunk_producer = ChunkProducer::new(&merk).unwrap();
@@ -1249,82 +1131,149 @@ impl GroveDb {
12491131
}
12501132
}
12511133

1252-
pub fn s_reconstruct_db(
1253-
&self,
1254-
snapshot: s_db_snapshot
1134+
pub fn s_sync_db_demo(
1135+
&mut self,
1136+
source_db: &GroveDb,
12551137
) -> Result<(), Error> {
1256-
let mut sorted_snapshot = self.s_sort_db_snapshot(snapshot)?;
12571138

1258-
//Always start by empty prefix = root
1259-
if let Some(chunk_vec) = sorted_snapshot.data.remove(&hex::encode(CryptoHash::default())) {
1139+
// Start always by root
1140+
let app_hash = source_db.root_hash(None).value.unwrap();
1141+
let root_global_chunk_id = hex::encode(vec![0u8; 32]);
1142+
1143+
let root_chunk = source_db.s_fetch_chunk(root_global_chunk_id.to_string())?;
1144+
let (root_chunk_prefix, _) = s_util_split_global_chunk_id(&root_global_chunk_id)?;
1145+
1146+
let mut pending_chunks :BTreeMap<String, Vec<Op>> = BTreeMap::new();
1147+
let mut processed_prefixes :BTreeSet<String> = BTreeSet::new();
1148+
{
12601149
let tx = self.start_transaction();
12611150
let merk = self.open_merk_for_replication(SubtreePath::empty(), &tx).unwrap();
1262-
let mut restorer = Restorer::new(merk, sorted_snapshot.root_hash, None);
1263-
for (chunk_id, chunk) in chunk_vec {
1264-
restorer.process_chunk(chunk_id, chunk).expect("should process chunk successfully");
1151+
let mut restorer = Restorer::new(merk, app_hash, None);
1152+
let next_chunk_ids = restorer.process_chunk("".to_string(), root_chunk).expect("should process chunk successfully");
1153+
for next_chunk_id in next_chunk_ids {
1154+
let next_global_chunk_id = hex::encode(root_chunk_prefix.to_string()) + &next_chunk_id;
1155+
pending_chunks.insert(next_global_chunk_id, vec![]);
12651156
}
1157+
1158+
while (!pending_chunks.is_empty()) {
1159+
for (global_chunk_id, chunk_data) in pending_chunks.iter_mut() {
1160+
match source_db.s_fetch_chunk(global_chunk_id.to_string()) {
1161+
Ok(chunk) => {
1162+
*chunk_data = chunk;
1163+
}
1164+
Err(e) => {
1165+
println!("Error while updating {}", e);
1166+
}
1167+
}
1168+
}
1169+
1170+
// Collect the keys to avoid borrowing issues during removal
1171+
let keys: Vec<String> = pending_chunks.keys().cloned().collect();
1172+
1173+
// Iterate over the collected keys and remove each entry from the map
1174+
for key in keys {
1175+
if let Some(chunk) = pending_chunks.remove(&key) {
1176+
let (_, chunk_id) = s_util_split_global_chunk_id(&key)?;
1177+
let next_chunk_ids = restorer.process_chunk(chunk_id, chunk).expect("should process chunk successfully");
1178+
for next_chunk_id in next_chunk_ids {
1179+
let next_global_chunk_id = hex::encode(root_chunk_prefix.to_string()) + &next_chunk_id;
1180+
pending_chunks.insert(next_global_chunk_id, vec![]);
1181+
}
1182+
}
1183+
}
1184+
}
1185+
12661186
restorer.finalize().expect("should finalize");
12671187
self.commit_transaction(tx);
1268-
} else {
1269-
return Err(Error::CorruptedData(
1270-
"No root prefix chunks found".to_string(),
1271-
));
12721188
}
1273-
1274-
let mut processed_prefixes :BTreeSet<String> = BTreeSet::new();
1275-
processed_prefixes.insert(hex::encode(CryptoHash::default()));
1276-
1277-
let mut queue_prefixes_to_be_processed : VecDeque<String> = VecDeque::new();
1189+
processed_prefixes.insert(root_chunk_prefix.to_string());
12781190

12791191
let mut subtrees_metadata = self.s_get_subtrees_metadata(&SubtreePath::empty()).unwrap();
1280-
for prefix in subtrees_metadata.data.keys() {
1192+
1193+
let mut current_subtree_opt :Option<(String, Vec<Vec<u8>>, CryptoHash, CryptoHash)> = None;
1194+
for (prefix, prefix_metadata) in &subtrees_metadata.data {
12811195
if !processed_prefixes.contains(prefix) {
1282-
//println!("prefix:{:?} pending for processing", prefix);
1283-
queue_prefixes_to_be_processed.push_back(prefix.to_string());
1196+
current_subtree_opt = Some((prefix.to_string(), prefix_metadata.0.to_vec(), prefix_metadata.1, prefix_metadata.2));
1197+
break;
12841198
}
12851199
}
12861200

1287-
while (!queue_prefixes_to_be_processed.is_empty()) {
1288-
while let Some(current_prefix) = queue_prefixes_to_be_processed.pop_front() {
1289-
let prefix_metadata = &subtrees_metadata.data[&current_prefix];
1290-
let s_path = &prefix_metadata.0;
1291-
let s_actual_value_hash = &prefix_metadata.1;
1292-
let s_elem_value_hash = &prefix_metadata.2;
1293-
println!(" about to process{:?} with ({:?}:{:?})", s_util_path_to_string(&s_path), hex::encode(s_actual_value_hash), hex::encode(s_elem_value_hash));
1201+
while current_subtree_opt.is_some() {
1202+
if let Some(current_subtree) = current_subtree_opt {
1203+
let current_prefix = &current_subtree.0;
1204+
let current_path = &current_subtree.1;
1205+
let s_actual_value_hash = &current_subtree.2;
1206+
let s_elem_value_hash = &current_subtree.3;
12941207

1295-
let subtree_path: Vec<&[u8]> = s_path.iter().map(|vec| vec.as_slice()).collect();
1208+
println!(" about to process prefix:{:?} {:?})", current_prefix, s_util_path_to_string(&current_path));
1209+
1210+
let subtree_path: Vec<&[u8]> = current_path.iter().map(|vec| vec.as_slice()).collect();
12961211
let path: &[&[u8]] = &subtree_path;
12971212

1298-
if let Some(chunk_vec) = sorted_snapshot.data.remove(&current_prefix) {
1299-
let tx = self.start_transaction();
1300-
if (chunk_vec.is_empty()) {println!("empty"); }
1301-
let merk = self.open_merk_for_replication(path.into(), &tx).unwrap();
1302-
let mut restorer = Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash));
1303-
for (chunk_id, chunk) in chunk_vec {
1304-
restorer.process_chunk(chunk_id, chunk).expect("should process chunk successfully");
1213+
let tx = self.start_transaction();
1214+
let merk = self.open_merk_for_replication(path.into(), &tx).unwrap();
1215+
let mut restorer = Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash));
1216+
1217+
let subtree_root_chunk = source_db.s_fetch_chunk(current_prefix.to_string())?;
1218+
if (!subtree_root_chunk.is_empty()) {
1219+
let next_chunk_ids = restorer.process_chunk("".to_string(), subtree_root_chunk).expect("should process chunk successfully");
1220+
for next_chunk_id in next_chunk_ids {
1221+
let next_global_chunk_id = current_prefix.to_string() + &next_chunk_id;
1222+
pending_chunks.insert(next_global_chunk_id, vec![]);
13051223
}
1224+
while (!pending_chunks.is_empty()) {
1225+
for (global_chunk_id, chunk_data) in pending_chunks.iter_mut() {
1226+
match source_db.s_fetch_chunk(global_chunk_id.to_string()) {
1227+
Ok(chunk) => {
1228+
*chunk_data = chunk;
1229+
}
1230+
Err(e) => {
1231+
println!("Error while updating {}", e);
1232+
}
1233+
}
1234+
}
1235+
1236+
// Collect the keys to avoid borrowing issues during removal
1237+
let keys: Vec<String> = pending_chunks.keys().cloned().collect();
1238+
1239+
// Iterate over the collected keys and remove each entry from the map
1240+
for key in keys {
1241+
if let Some(chunk) = pending_chunks.remove(&key) {
1242+
let (_, chunk_id) = s_util_split_global_chunk_id(&key)?;
1243+
let next_chunk_ids = restorer.process_chunk(chunk_id, chunk).expect("should process chunk successfully");
1244+
for next_chunk_id in next_chunk_ids {
1245+
let next_global_chunk_id = current_prefix.to_string() + &next_chunk_id;
1246+
pending_chunks.insert(next_global_chunk_id, vec![]);
1247+
}
1248+
}
1249+
}
1250+
}
1251+
13061252
restorer.finalize().expect("should finalize");
13071253
self.commit_transaction(tx);
1308-
} else {
1309-
println!(" skipping empty {:?}", s_util_path_to_string(&s_path));
13101254
}
1311-
1312-
processed_prefixes.insert(current_prefix);
1255+
else {
1256+
self.rollback_transaction(&tx);
1257+
println!(" subtree{:?} is empty", s_util_path_to_string(&current_path));
1258+
}
1259+
processed_prefixes.insert(current_prefix.to_string());
1260+
println!(" prefix:{:?} done", current_prefix);
13131261
}
13141262

1263+
current_subtree_opt = None;
13151264
subtrees_metadata = self.s_get_subtrees_metadata(&SubtreePath::empty()).unwrap();
1316-
for prefix in subtrees_metadata.data.keys() {
1265+
1266+
for (prefix, prefix_metadata) in &subtrees_metadata.data {
13171267
if !processed_prefixes.contains(prefix) {
1318-
queue_prefixes_to_be_processed.push_back(prefix.to_string());
1268+
current_subtree_opt = Some((prefix.to_string(), prefix_metadata.0.to_vec(), prefix_metadata.1, prefix_metadata.2));
1269+
break;
13191270
}
13201271
}
13211272
}
13221273

1323-
if (sorted_snapshot.data.len() > 0) {
1324-
return Err(Error::CorruptedData(
1325-
"Remaining chunks not processed".to_string(),
1326-
));
1327-
}
1274+
subtrees_metadata = self.s_get_subtrees_metadata(&SubtreePath::empty()).unwrap();
1275+
println!("now containing:{:?}", subtrees_metadata);
1276+
println!("processed prefixes:{:?}", processed_prefixes);
13281277

13291278
let incorrect_hashes = self.verify_grovedb(None)?;
13301279
if (incorrect_hashes.len() > 0) {
@@ -1347,3 +1296,20 @@ pub fn s_util_path_to_string(
13471296
}
13481297
subtree_path_str
13491298
}
1299+
1300+
pub fn s_util_split_global_chunk_id(
1301+
global_chunk_id: &String,
1302+
) -> Result<(String, String), Error> {
1303+
let CHUNK_PREFIX_LENGTH: usize = 64;
1304+
1305+
if (global_chunk_id.len() < CHUNK_PREFIX_LENGTH) {
1306+
return Err(Error::CorruptedData(
1307+
"expected global chunk id of at least 64 length".to_string(),
1308+
));
1309+
}
1310+
1311+
let chunk_prefix = global_chunk_id.chars().take(CHUNK_PREFIX_LENGTH).collect::<String>();
1312+
let chunk_id = global_chunk_id.chars().skip(CHUNK_PREFIX_LENGTH).collect::<String>();
1313+
1314+
Ok((chunk_prefix, chunk_id))
1315+
}

0 commit comments

Comments
 (0)