Skip to content

Commit

Permalink
add storage upgrade from v3 to v4
Browse files Browse the repository at this point in the history
  • Loading branch information
simonjiao committed Dec 7, 2023
1 parent ad21c67 commit af62216
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 41 deletions.
9 changes: 4 additions & 5 deletions flexidag/src/flexidag_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,7 @@ impl ServiceHandler<Self, DumpTipsToAccumulator> for FlexidagService {
k_total_difficulties: [msg.k_total_difficulty].into_iter().collect(),
});
// broadcast the tip
ctx.broadcast(NewTips {
tips: new_tips,
});
ctx.broadcast(NewTips { tips: new_tips });
self.storage = storage.clone();
Ok(())
}
Expand Down Expand Up @@ -447,7 +445,8 @@ impl ServiceHandler<Self, UpdateDagTips> for FlexidagService {
None => {
let storage = ctx.get_shared::<Arc<Storage>>()?;
let config = ctx.get_shared::<Arc<NodeConfig>>()?;
if header.number() == BlockDAG::dag_fork_height_with_net(config.net().id().clone()) {
if header.number() == BlockDAG::dag_fork_height_with_net(config.net().id().clone())
{
let (dag, dag_accumulator) =
BlockDAG::try_init_with_storage(storage.clone(), config)?;
if dag.is_none() {
Expand All @@ -470,7 +469,7 @@ impl ServiceHandler<Self, UpdateDagTips> for FlexidagService {
storage
.get_startup_info()?
.map(|mut startup_info| {
startup_info.dag_main = Some(header.id());
startup_info.update_dag_main(header.id());
storage.save_startup_info(startup_info)
})
.expect("starup info should not be none")
Expand Down
1 change: 1 addition & 0 deletions genesis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl Genesis {
self.block.clone(),
net.id().clone(),
)?;
// todo: check if it's dag genesis, and update StartupInfo.dag_main
let startup_info = StartupInfo::new(genesis_chain.current_header().id());
storage.save_startup_info(startup_info)?;
storage
Expand Down
254 changes: 243 additions & 11 deletions storage/src/block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0
use crate::define_storage;
use crate::storage::{CodecKVStore, StorageInstance, ValueCodec};
use crate::storage::{CodecKVStore, CodecWriteBatch, StorageInstance, ValueCodec};
use crate::{
BLOCK_BODY_PREFIX_NAME, BLOCK_HEADER_PREFIX_NAME, BLOCK_PREFIX_NAME,
BLOCK_TRANSACTIONS_PREFIX_NAME, BLOCK_TRANSACTION_INFOS_PREFIX_NAME, FAILED_BLOCK_PREFIX_NAME,
BLOCK_BODY_PREFIX_NAME, BLOCK_HEADER_PREFIX_NAME, BLOCK_HEADER_PREFIX_NAME_V2,
BLOCK_PREFIX_NAME, BLOCK_PREFIX_NAME_V2, BLOCK_TRANSACTIONS_PREFIX_NAME,
BLOCK_TRANSACTION_INFOS_PREFIX_NAME, FAILED_BLOCK_PREFIX_NAME, FAILED_BLOCK_PREFIX_NAME_V2,
};
use anyhow::{bail, Result};
use bcs_ext::{BCSCodec, Sample};
use network_p2p_types::peer_id::PeerId;
use serde::{Deserialize, Serialize};
use starcoin_crypto::HashValue;
use starcoin_logger::prelude::*;
use starcoin_types::block::{Block, BlockBody, BlockHeader};
use starcoin_types::block::{Block, BlockBody, BlockHeader, OldBlock, OldBlockHeader};

#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct OldFailedBlock {
Expand Down Expand Up @@ -46,6 +47,26 @@ pub struct FailedBlock {
version: String,
}

// Schema-v3 on-disk layout of a failed block, kept only so existing
// entries can be decoded during the v3 -> v4 storage upgrade; new writes
// use `FailedBlock`. The serde attribute makes this type deserialize
// under the old container name `FailedBlock`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename(deserialize = "FailedBlock"))]
pub struct OldFailedBlockV2 {
    // Legacy (pre-upgrade) block payload.
    block: OldBlock,
    // Peer the block was received from, if any — presumably set on sync; verify against writers.
    peer_id: Option<PeerId>,
    // Failure description recorded when the block was stored as failed.
    failed: String,
    // Version string recorded alongside the failure.
    version: String,
}

impl From<OldFailedBlockV2> for FailedBlock {
fn from(value: OldFailedBlockV2) -> Self {
Self {
block: value.block.into(),
peer_id: value.peer_id,
failed: value.failed,
version: value.version,
}
}
}

#[allow(clippy::from_over_into)]
impl Into<(Block, Option<PeerId>, String, String)> for FailedBlock {
fn into(self) -> (Block, Option<PeerId>, String, String) {
Expand Down Expand Up @@ -75,11 +96,18 @@ impl Sample for FailedBlock {
}
}

define_storage!(BlockInnerStorage, HashValue, Block, BLOCK_PREFIX_NAME);
define_storage!(BlockInnerStorage, HashValue, Block, BLOCK_PREFIX_NAME_V2);
define_storage!(
BlockHeaderStorage,
HashValue,
BlockHeader,
BLOCK_HEADER_PREFIX_NAME_V2
);
define_storage!(OldBlockInnerStorage, HashValue, OldBlock, BLOCK_PREFIX_NAME);
define_storage!(
OldBlockHeaderStorage,
HashValue,
OldBlockHeader,
BLOCK_HEADER_PREFIX_NAME
);

Expand All @@ -102,10 +130,18 @@ define_storage!(
Vec<HashValue>,
BLOCK_TRANSACTION_INFOS_PREFIX_NAME
);

define_storage!(
FailedBlockStorage,
HashValue,
FailedBlock,
FAILED_BLOCK_PREFIX_NAME_V2
);

define_storage!(
OldFailedBlockStorage,
HashValue,
OldFailedBlockV2,
FAILED_BLOCK_PREFIX_NAME
);

Expand Down Expand Up @@ -139,6 +175,26 @@ impl ValueCodec for BlockHeader {
}
}

// Codec for the legacy (schema-v3) block representation; needed so the
// upgrade can read entries out of the old column family.
impl ValueCodec for OldBlock {
    // Serialize via the file's BCS codec (same scheme as the other ValueCodec impls here).
    fn encode_value(&self) -> Result<Vec<u8>> {
        self.encode()
    }

    // Deserialize a legacy block from its stored bytes.
    fn decode_value(data: &[u8]) -> Result<Self> {
        Self::decode(data)
    }
}

// Codec for the legacy (schema-v3) block header; needed so the upgrade
// can read entries out of the old header column family.
impl ValueCodec for OldBlockHeader {
    // Serialize via the file's BCS codec (same scheme as the other ValueCodec impls here).
    fn encode_value(&self) -> Result<Vec<u8>> {
        self.encode()
    }

    // Deserialize a legacy header from its stored bytes.
    fn decode_value(data: &[u8]) -> Result<Self> {
        Self::decode(data)
    }
}

impl ValueCodec for Vec<BlockHeader> {
fn encode_value(&self) -> Result<Vec<u8>> {
self.encode()
Expand Down Expand Up @@ -178,6 +234,16 @@ impl ValueCodec for FailedBlock {
}
}

// Codec for the legacy (schema-v3) failed-block record; needed so the
// upgrade can read entries out of the old failed-block column family.
impl ValueCodec for OldFailedBlockV2 {
    // Serialize via the file's BCS codec (same scheme as the other ValueCodec impls here).
    fn encode_value(&self) -> Result<Vec<u8>> {
        self.encode()
    }

    // Deserialize a legacy failed-block record from its stored bytes.
    fn decode_value(data: &[u8]) -> Result<Self> {
        Self::decode(data)
    }
}

impl BlockStorage {
pub fn new(instance: StorageInstance) -> Self {
BlockStorage {
Expand Down Expand Up @@ -220,12 +286,7 @@ impl BlockStorage {
}

pub fn get_blocks(&self, ids: Vec<HashValue>) -> Result<Vec<Option<Block>>> {
Ok(self
.block_store
.multiple_get(ids)?
.into_iter()
.map(|cb| cb.map(|cb| cb.into()))
.collect())
Ok(self.block_store.multiple_get(ids)?.into_iter().collect())
}

pub fn get_body(&self, block_id: HashValue) -> Result<Option<BlockBody>> {
Expand Down Expand Up @@ -331,4 +392,175 @@ impl BlockStorage {
self.failed_block_storage
.put_raw(block_id, old_block.encode_value()?)
}

/// Migrate every legacy header out of the v3 column family into the v4
/// one, converting `OldBlockHeader` -> `BlockHeader` on the way.
///
/// Entries are moved in batches of `batch_size` (a delete batch against
/// the old store, a put batch against the new store) to bound memory use.
/// Returns the total number of headers migrated.
///
/// NOTE(review): the delete batch and the put batch are committed
/// separately, so a crash between the two writes could drop headers that
/// were deleted but not yet re-written — confirm the upgrade is
/// retried/recovered from scratch in that case.
fn upgrade_header_store(
    old_header_store: OldBlockHeaderStorage,
    header_store: BlockHeaderStorage,
    batch_size: usize,
) -> Result<usize> {
    let mut total_size: usize = 0;
    let mut old_header_iter = old_header_store.iter()?;
    old_header_iter.seek_to_first();
    // Pending batches, flushed and replaced with fresh ones every
    // `batch_size` items; `mem::replace` avoids the Option take/unwrap
    // dance. (Renamed `to_deleted` -> `to_delete` for consistency with
    // the sibling upgrade functions.)
    let mut to_delete = CodecWriteBatch::<HashValue, OldBlockHeader>::new();
    let mut to_put = CodecWriteBatch::<HashValue, BlockHeader>::new();
    let mut item_count = 0usize;
    for item in old_header_iter {
        let (id, old_header) = item?;
        let header: BlockHeader = old_header.into();
        // Propagate batch errors instead of panicking: the function
        // already returns Result.
        to_delete.delete(id)?;
        to_put.put(id, header)?;
        item_count += 1;
        if item_count == batch_size {
            total_size = total_size.saturating_add(item_count);
            item_count = 0;
            old_header_store.write_batch(std::mem::replace(
                &mut to_delete,
                CodecWriteBatch::new(),
            ))?;
            header_store.write_batch(std::mem::replace(&mut to_put, CodecWriteBatch::new()))?;
        }
    }
    // Flush the remainder smaller than a full batch.
    if item_count != 0 {
        total_size = total_size.saturating_add(item_count);
        old_header_store.write_batch(to_delete)?;
        header_store.write_batch(to_put)?;
    }

    Ok(total_size)
}

/// Migrate every legacy block out of the v3 column family into the v4
/// one, converting `OldBlock` -> `Block` on the way.
///
/// Entries are moved in batches of `batch_size` (a delete batch against
/// the old store, a put batch against the new store) to bound memory use.
/// Returns the total number of blocks migrated.
///
/// NOTE(review): the delete batch and the put batch are committed
/// separately, so a crash between the two writes could drop blocks that
/// were deleted but not yet re-written — confirm the upgrade is
/// retried/recovered from scratch in that case.
fn upgrade_block_store(
    old_block_store: OldBlockInnerStorage,
    block_store: BlockInnerStorage,
    batch_size: usize,
) -> Result<usize> {
    let mut total_size: usize = 0;
    let mut old_block_iter = old_block_store.iter()?;
    old_block_iter.seek_to_first();

    // Pending batches, flushed and replaced with fresh ones every
    // `batch_size` items.
    let mut to_delete = CodecWriteBatch::new();
    let mut to_put = CodecWriteBatch::new();
    let mut item_count = 0;

    for item in old_block_iter {
        let (id, old_block) = item?;
        let block: Block = old_block.into();
        // Propagate errors instead of panicking with
        // `expect("should never fail")`: storage writes can legitimately
        // fail, and the function already returns Result.
        to_delete.delete(id)?;
        to_put.put(id, block)?;

        item_count += 1;
        if item_count == batch_size {
            total_size = total_size.saturating_add(item_count);
            item_count = 0;
            old_block_store
                .write_batch(std::mem::replace(&mut to_delete, CodecWriteBatch::new()))?;
            block_store.write_batch(std::mem::replace(&mut to_put, CodecWriteBatch::new()))?;
        }
    }
    // Flush the remainder smaller than a full batch.
    if item_count != 0 {
        total_size = total_size.saturating_add(item_count);
        old_block_store.write_batch(to_delete)?;
        block_store.write_batch(to_put)?;
    }

    Ok(total_size)
}

/// Migrate every legacy failed-block record out of the v3 column family
/// into the v4 one, converting `OldFailedBlockV2` -> `FailedBlock` on the
/// way.
///
/// Entries are moved in batches of `batch_size` (a delete batch against
/// the old store, a put batch against the new store) to bound memory use.
/// Returns the total number of records migrated.
///
/// NOTE(review): the delete batch and the put batch are committed
/// separately, so a crash between the two writes could drop records that
/// were deleted but not yet re-written — confirm the upgrade is
/// retried/recovered from scratch in that case.
fn upgrade_failed_block_store(
    old_failed_block_store: OldFailedBlockStorage,
    failed_block_store: FailedBlockStorage,
    batch_size: usize,
) -> Result<usize> {
    let mut total_size: usize = 0;
    let mut old_failed_block_iter = old_failed_block_store.iter()?;
    old_failed_block_iter.seek_to_first();

    // Pending batches, flushed and replaced with fresh ones every
    // `batch_size` items.
    let mut to_delete = CodecWriteBatch::new();
    let mut to_put = CodecWriteBatch::new();
    let mut item_count = 0;

    for item in old_failed_block_iter {
        let (id, old_block) = item?;
        let block: FailedBlock = old_block.into();
        // Propagate errors instead of panicking with
        // `expect("should never fail")`: storage writes can legitimately
        // fail, and the function already returns Result.
        to_delete.delete(id)?;
        to_put.put(id, block)?;

        item_count += 1;
        if item_count == batch_size {
            total_size = total_size.saturating_add(item_count);
            item_count = 0;
            old_failed_block_store
                .write_batch(std::mem::replace(&mut to_delete, CodecWriteBatch::new()))?;
            failed_block_store
                .write_batch(std::mem::replace(&mut to_put, CodecWriteBatch::new()))?;
        }
    }
    // Flush the remainder smaller than a full batch.
    if item_count != 0 {
        total_size = total_size.saturating_add(item_count);
        old_failed_block_store.write_batch(to_delete)?;
        failed_block_store.write_batch(to_put)?;
    }

    Ok(total_size)
}

/// Run the v3 -> v4 block-storage migration: headers first, then full
/// blocks, then failed blocks, each moved out of its old column family
/// into the new one in fixed-size batches.
pub fn upgrade_block_header(instance: StorageInstance) -> Result<()> {
    const BATCH_SIZE: usize = 1000usize;

    Self::upgrade_header_store(
        OldBlockHeaderStorage::new(instance.clone()),
        BlockHeaderStorage::new(instance.clone()),
        BATCH_SIZE,
    )?;

    Self::upgrade_block_store(
        OldBlockInnerStorage::new(instance.clone()),
        BlockInnerStorage::new(instance.clone()),
        BATCH_SIZE,
    )?;

    // Last construction takes `instance` by value; no clone needed.
    Self::upgrade_failed_block_store(
        OldFailedBlockStorage::new(instance.clone()),
        FailedBlockStorage::new(instance),
        BATCH_SIZE,
    )?;

    Ok(())
}
}
Loading

0 comments on commit af62216

Please sign in to comment.