From bf5d443322a739eb929475a9df35f0ee70129c2b Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sat, 1 Apr 2023 19:07:16 +0900 Subject: [PATCH 01/24] Make test setup also return finalization info --- test-suite/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test-suite/src/lib.rs b/test-suite/src/lib.rs index 75740b7c..d5fef15a 100644 --- a/test-suite/src/lib.rs +++ b/test-suite/src/lib.rs @@ -101,14 +101,15 @@ pub async fn setup_server_client_nodes( (ServerNetworkConfig, PrivateKey), Vec<(ClientNetworkConfig, PrivateKey)>, Vec, + FinalizationInfo, ) { + let (fi, keys) = simperby_core::test_utils::generate_fi(client_n); let (_, server_private_key) = generate_keypair_random(); let server = ServerNetworkConfig { port: dispense_port(), }; let mut clients = Vec::new(); - for _ in 0..client_n { - let (_, private_key) = generate_keypair_random(); + for (_, private_key) in keys { let network_config = ClientNetworkConfig { peers: vec![Peer { public_key: server_private_key.public_key(), @@ -128,7 +129,7 @@ pub async fn setup_server_client_nodes( .map(|(_, private_key)| private_key.public_key()) .collect::>(); pubkeys.push(server_private_key.public_key()); - ((server, server_private_key), clients, pubkeys) + ((server, server_private_key), clients, pubkeys, fi) } pub async fn sleep_ms(ms: u64) { From 9f25bb4ad5f694178398cbff38016520c287dd4a Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 15:50:16 +0900 Subject: [PATCH 02/24] Update Cargo.lock --- Cargo.lock | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 2fb94976..47beccbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1655,6 +1655,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" + [[package]] name = "serde" version = "1.0.158" @@ -1764,6 +1770,28 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "simperby" +version = "0.0.0" +dependencies = [ + "async-trait", + "chrono", + "eyre", + "futures", + "log", + "rand 0.8.5", + "semver", + "serde", + "simperby-consensus", + "simperby-core", + "simperby-governance", + "simperby-network", + "simperby-repository", + "simperby-test-suite", + "thiserror", + "tokio", +] + [[package]] name = "simperby-consensus" version = "0.1.0" From d7ea2f1f8f94a54c01345ca61892bcd69914979b Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 15:51:06 +0900 Subject: [PATCH 03/24] Remove node --- node/Cargo.toml | 25 --- node/src/lib.rs | 144 ------------ node/src/node.rs | 381 -------------------------------- node/tests/integration_tests.rs | 169 -------------- 4 files changed, 719 deletions(-) delete mode 100644 node/Cargo.toml delete mode 100644 node/src/lib.rs delete mode 100644 node/src/node.rs delete mode 100644 node/tests/integration_tests.rs diff --git a/node/Cargo.toml b/node/Cargo.toml deleted file mode 100644 index 7d26a1db..00000000 --- a/node/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "simperby-node" -version = "0.0.0" -authors = ["PDAO Team "] -edition = "2021" - -[dependencies] -eyre = "0.6.8" -async-trait = "0.1.42" -serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.0", features = ["full"] } -chrono = { version = "0.4", features = ["serde"] } -futures = "0.3" -log = "0.4" -simperby-core = { version = "0.0.0", path = "../core" } -simperby-network = { version = "0.0.0", path = "../network" } -simperby-governance = { version = "0.0.0", path = "../governance" } -simperby-consensus = { version = "0.0.0", path = "../consensus" } -simperby-repository = { version = "0.0.0", path = "../repository" } -thiserror = "1.0.32" -semver = "1.0.0" - -[dev-dependencies] -rand = "0.8.5" -simperby-test-suite = { path = "../test-suite" } diff --git a/node/src/lib.rs b/node/src/lib.rs deleted file mode 100644 index 47fb789a..00000000 --- a/node/src/lib.rs +++ /dev/null @@ -1,144 +0,0 @@ -//! A Simperby node. -//! -//! The following CLI commands are provided by `SimperbyNode` as they are based on the node state. -//! -//! - `sync` -//! - `clean` -//! - `create` -//! - `vote` -//! - `veto` -//! - `consensus` -//! - `git` -//! - `show` -//! - `network` -//! - `update` -//! - `broadcast` -//! - `chat` -//! -//! The following CLI commands are provided as global functions as they are node-stateless. -//! -//! - `genesis` -//! -//! The following CLI commands are provided as global functions as they are about the node creation. -//! -//! - `init` -//! - `clone` -//! - `serve` -//! -//! The following CLI commands are not provided here because they are simple -//! and so directly implemented in the CLI. -//! -//! - `sign` -pub mod node; - -pub use simperby_core; -pub use simperby_network; -pub use simperby_repository; - -use eyre::Result; -use serde::{Deserialize, Serialize}; -use simperby_core::crypto::*; -use simperby_core::*; -use simperby_governance::Governance; -use simperby_network::Peer; -use simperby_repository::raw::{RawRepository, SemanticCommit}; -use simperby_repository::CommitHash; -use simperby_repository::DistributedRepository; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Config { - pub chain_name: String, - - pub public_key: PublicKey, - pub private_key: PrivateKey, - - pub broadcast_interval_ms: Option, - pub fetch_interval_ms: Option, - - /// Public repos (usually mirrors) for the read-only accesses - /// - /// They're added as a remote repo, named `public_#`. - pub public_repo_url: Vec, - - pub governance_port: u16, - pub consensus_port: u16, - pub repository_port: u16, - - /// TODO: remove this and introduce a proper peer discovery protocol - pub peers: Vec, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ConsensusStatus { - // TODO -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct NetworkStatus { - // TODO -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum CommitInfo { - Block { - semantic_commit: SemanticCommit, - block_header: BlockHeader, - // TODO: block-specific consensus status - }, - Agenda { - semantic_commit: SemanticCommit, - agenda: Agenda, - voters: Vec<(MemberName, Timestamp)>, - }, - AgendaProof { - semantic_commit: SemanticCommit, - agenda_proof: AgendaProof, - }, - Transaction { - semantic_commit: SemanticCommit, - transaction: Transaction, - }, - PreGenesisCommit { - title: String, - }, - Unknown { - semantic_commit: SemanticCommit, - msg: String, - }, // TODO -} - -pub type SimperbyNode = node::Node; - -/// Creates a genesis commit. -pub async fn genesis(config: Config, path: &str) -> Result<()> { - let raw_repository = RawRepository::open(&format!("{path}/repository/repo")).await?; - let mut repository = DistributedRepository::new( - raw_repository, - simperby_repository::Config { - mirrors: config.public_repo_url.clone(), - long_range_attack_distance: 3, - }, - None, - ) - .await?; - repository.genesis().await?; - Ok(()) -} - -/// Initializes a node. -pub async fn initialize(config: Config, path: &str) -> Result { - SimperbyNode::initialize(config, path).await -} - -/// Clones a remote repository and initializes a node. -pub async fn clone(config: Config, path: &str, url: &str) -> Result { - RawRepository::clone(&format!("{path}/repository/repo"), url) - .await - .unwrap(); - SimperbyNode::initialize(config, path).await -} - -/// Runs a server node indefinitely. -pub async fn serve(_config: Config, _path: &str) -> Result<()> { - todo!() -} diff --git a/node/src/node.rs b/node/src/node.rs deleted file mode 100644 index 7610d02b..00000000 --- a/node/src/node.rs +++ /dev/null @@ -1,381 +0,0 @@ -use super::*; -use eyre::eyre; -use simperby_core::utils::get_timestamp; -use simperby_consensus::{Consensus, ConsensusParameters, ProgressResult}; -use simperby_network::primitives::Storage; -use simperby_network::{dms::Config as DmsConfig, storage::StorageImpl, Dms}; -use simperby_network::{ClientNetworkConfig, ServerNetworkConfig}; -use simperby_repository::raw::RawRepository; -use simperby_repository::{DistributedRepository, WORK_BRANCH_NAME}; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; - -pub struct Node { - config: Config, - repository: DistributedRepository, - governance: Governance, - consensus: Consensus, - - last_reserved_state: ReservedState, - #[allow(dead_code)] - last_finalized_header: BlockHeader, - _path: String, - - _client_network_config: ClientNetworkConfig, - _server_network_config: ServerNetworkConfig, -} - -impl SimperbyNode { - pub async fn initialize(config: Config, path: &str) -> Result { - // Step 0: initialize the repository module - let raw_repository = RawRepository::open(&format!("{path}/repository/repo")).await?; - let repository = DistributedRepository::new( - raw_repository, - simperby_repository::Config { - mirrors: config.public_repo_url.clone(), - long_range_attack_distance: 3, - }, - Some(config.private_key.clone()), - ) - .await?; - - // Step 1: initialize configs - let last_finalized_header = repository.get_last_finalized_block_header().await?; - let reserved_state = repository.get_reserved_state().await?; - let governance_dms_key = simperby_governance::generate_dms_key(&last_finalized_header); - let consensus_dms_key = simperby_consensus::generate_dms_key(&last_finalized_header); - - let server_network_config = ServerNetworkConfig { - network_id: reserved_state.genesis_info.chain_name.clone(), - ports: vec![ - ( - format!("dms-{}", governance_dms_key.clone()), - config.governance_port, - ), - ( - format!("dms-{}", consensus_dms_key.clone()), - config.consensus_port, - ), - ("repository".to_owned(), config.repository_port), - ] - .into_iter() - .collect(), - members: reserved_state - .members - .iter() - .map(|m| m.public_key.clone()) - .collect(), - public_key: config.public_key.clone(), - private_key: config.private_key.clone(), - }; - - let client_network_config = ClientNetworkConfig { - network_id: server_network_config.network_id.clone(), - members: server_network_config.members.clone(), - public_key: server_network_config.public_key.clone(), - private_key: server_network_config.private_key.clone(), - peers: config.peers.clone(), - }; - - let dms_peers = reserved_state - .get_governance_set() - .map_err(|e| eyre!("{e}"))? - .into_iter() - .map(|(public_key, _)| public_key) - .collect::>(); - - // Step 2: initialize the governance module - let dms_path = format!("{path}/governance/dms"); - StorageImpl::create(&dms_path).await.unwrap(); - let storage = StorageImpl::open(&dms_path).await.unwrap(); - let dms = Dms::new( - storage, - DmsConfig { - dms_key: governance_dms_key, - peers: dms_peers.clone(), - }, - config.private_key.clone(), - ) - .await?; - let governance = - Governance::new(Arc::new(RwLock::new(dms)), Some(config.private_key.clone())).await?; - - // Step 3: initialize the consensus module - let dms_path = format!("{path}/consensus/dms"); - StorageImpl::create(&dms_path).await.unwrap(); - let storage = StorageImpl::open(&dms_path).await.unwrap(); - let dms = Dms::new( - storage, - DmsConfig { - dms_key: consensus_dms_key, - peers: dms_peers.clone(), - }, - config.private_key.clone(), - ) - .await?; - let state_path = format!("{path}/consensus/state"); - StorageImpl::create(&state_path).await.unwrap(); - let consensus_state_storage = StorageImpl::open(&state_path).await.unwrap(); - let consensus = Consensus::new( - Arc::new(RwLock::new(dms)), - consensus_state_storage, - last_finalized_header.clone(), - // TODO: replace params and timestamp with proper values - ConsensusParameters { - timeout_ms: 10000000, - repeat_round_for_first_leader: 100, - }, - 0, - Some(config.private_key.clone()), - ) - .await?; - Ok(Self { - config, - repository, - governance, - consensus, - last_reserved_state: reserved_state, - last_finalized_header, - _path: path.to_owned(), - _client_network_config: client_network_config, - _server_network_config: server_network_config, - }) - } - - pub fn get_raw_repo(&self) -> &RawRepository { - self.repository.get_raw() - } - - pub fn get_raw_repo_mut(&mut self) -> &mut RawRepository { - self.repository.get_raw_mut() - } - - /// Synchronizes the `finalized` branch to the last block of the `work` branch. - pub async fn sync(&mut self, last_finalization_proof: LastFinalizationProof) -> Result<()> { - let work_branch_tip = self - .repository - .get_raw() - .locate_branch(WORK_BRANCH_NAME.into()) - .await?; - let work_branch_tip_commit = self.repository.read_commit(work_branch_tip).await?; - if let Commit::Block(_) = work_branch_tip_commit { - self.repository - .sync( - &work_branch_tip_commit.to_hash256(), - &last_finalization_proof.proof, - ) - .await - } else { - Err(eyre!( - "last commit of the work branch is not a block commit" - )) - } - } - - /// Cleans the repository, removing all the outdated commits. - pub async fn clean(&mut self, hard: bool) -> Result<()> { - self.repository.clean(hard).await - } - - /// Creates a block commit on the `work` branch. - pub async fn create_block(&mut self) -> Result { - let (header, commit_hash) = self - .repository - .create_block(self.config.public_key.clone()) - .await?; - // automatically set as my proposal - self.consensus - .register_verified_block_hash(header.to_hash256()) - .await?; - self.consensus - .set_proposal_candidate(header.to_hash256(), get_timestamp()) - .await?; - Ok(commit_hash) - } - - /// Creates an agenda commit on the `work` branch. - pub async fn create_agenda(&mut self) -> Result { - let rs = self.repository.get_reserved_state().await?; - let (_, commit_hash) = self - .repository - .create_agenda( - rs.query_name(&self.config.public_key) - .expect("already checked in initialization"), - ) - .await?; - Ok(commit_hash) - } - - /// Creates an extra-agenda transaction on the `work` branch. - pub async fn create_extra_agenda_transaction( - &mut self, - tx: ExtraAgendaTransaction, - ) -> Result<()> { - self.repository.create_extra_agenda_transaction(&tx).await?; - Ok(()) - } - - /// Votes on the agenda corresponding to the given `agenda_commit` and propagates the result. - pub async fn vote(&mut self, agenda_commit: CommitHash) -> Result<()> { - let valid_agendas = self.repository.get_agendas().await?; - let agenda_hash = if let Some(x) = valid_agendas.iter().find(|(x, _)| *x == agenda_commit) { - x.1 - } else { - return Err(eyre!( - "the given commit hash {} is not one of the valid agendas", - agenda_commit - )); - }; - self.repository.vote(agenda_commit).await?; - self.governance.vote(agenda_hash).await?; - Ok(()) - } - - /// Vetoes the current round. - pub async fn veto_round(&mut self) -> Result<()> { - unimplemented!() - } - - /// Vetoes the given block. - pub async fn veto_block(&mut self, _block_commit: CommitHash) -> Result<()> { - unimplemented!() - } - - /// Shows information about the given commit. - pub async fn show(&self, commit_hash: CommitHash) -> Result { - let semantic_commit = self - .repository - .get_raw() - .read_semantic_commit(commit_hash) - .await?; - let commit = simperby_repository::format::from_semantic_commit(semantic_commit.clone())?; - let result = match commit { - Commit::Block(block_header) => CommitInfo::Block { - semantic_commit, - block_header, - }, - Commit::Agenda(agenda) => CommitInfo::Agenda { - semantic_commit, - agenda: agenda.clone(), - voters: self - .governance - .read() - .await? - .votes - .get(&agenda.to_hash256()) - .unwrap_or(&Default::default()) - .iter() - .filter_map(|(public_key, _)| { - self.last_reserved_state - .query_name(public_key) - .map(|x| (x, 0)) - }) - .collect(), // TODO - }, - Commit::AgendaProof(agenda_proof) => CommitInfo::AgendaProof { - semantic_commit, - agenda_proof, - }, - x => CommitInfo::Unknown { - semantic_commit, - msg: format!("{x:?}"), - }, - }; - Ok(result) - } - - /// Makes a progress for the consensus, returning the result. - /// - /// TODO: it has to consume the object if finalized. - pub async fn progress_for_consensus(&mut self) -> Result { - let result = self.consensus.progress(get_timestamp()).await?; - for result in result.iter() { - if let ProgressResult::Finalized(hash, _, proof) = result { - self.repository.sync(hash, proof).await?; - } - } - Ok(format!("{result:?}")) - } - - /// Gets the current status of the consensus. - pub async fn get_consensus_status(&self) -> Result { - todo!() - } - - /// Gets the current status of the p2p network. - pub async fn get_network_status(&self) -> Result { - unimplemented!() - } - - pub async fn fetch(&mut self) -> Result<()> { - // TODO: perform the actual network operations - - // Update governance - let governance_set = self - .last_reserved_state - .get_governance_set() - .unwrap() - .into_iter() - .collect::>(); - let governance_state = self.governance.read().await?; - let votes: Vec<(Hash256, VotingPower)> = governance_state - .votes - .iter() - .map(|(agenda, votes)| { - ( - *agenda, - votes - .keys() - .map(|voter| governance_set.get(voter).unwrap()) - .sum(), - ) - }) - .collect(); - let total_voting_power = governance_set.values().sum::(); - for (agenda, voted_power) in votes { - if voted_power * 2 > total_voting_power { - // TODO: handle this error - let _ = self - .repository - .approve( - &agenda, - governance_state.votes[&agenda] - .iter() - .map(|(k, s)| TypedSignature::new(s.clone(), k.clone())) - .collect(), - get_timestamp(), - ) - .await; - } - } - - // Update consensus - for (_, block_hash) in self.repository.get_blocks().await? { - self.consensus - .register_verified_block_hash(block_hash) - .await?; - } - Ok(()) - } - - /// Broadcasts all the local messages and reports the result. - pub async fn broadcast(&mut self) -> Result> { - // TODO - Ok(vec![]) - } - - pub async fn check_push( - &mut self, - _commit_hash: CommitHash, - _branch_name: String, - _timestamp: Timestamp, - _signature: Signature, - ) -> Result { - todo!() - } - - pub async fn notify_push(&mut self, _commit_hash: CommitHash) -> Result<()> { - todo!() - } -} diff --git a/node/tests/integration_tests.rs b/node/tests/integration_tests.rs deleted file mode 100644 index 81a8b701..00000000 --- a/node/tests/integration_tests.rs +++ /dev/null @@ -1,169 +0,0 @@ -#![cfg(never)] - -use simperby_core::*; -use simperby_network::Peer; -use simperby_node::{genesis, *}; -use simperby_repository::raw::RawRepository; -use simperby_test_suite::*; -use tokio::io::AsyncWriteExt; - -fn generate_config(key: PrivateKey, chain_name: String) -> Config { - Config { - chain_name, - public_key: key.public_key(), - private_key: key, - broadcast_interval_ms: None, - fetch_interval_ms: None, - public_repo_url: vec![], - governance_port: dispense_port(), - consensus_port: dispense_port(), - repository_port: dispense_port(), - } -} - -async fn setup_peer(path: &str, peers: &[Peer]) { - let mut file = tokio::fs::File::create(format!("{path}/peers.json")) - .await - .unwrap(); - file.write_all(serde_spb::to_string(&peers).unwrap().as_bytes()) - .await - .unwrap(); - file.flush().await.unwrap(); -} - -#[tokio::test] -async fn normal_1() { - setup_test(); - let (rs, keys) = test_utils::generate_standard_genesis(5); - let chain_name = "normal_1".to_owned(); - - let configs = keys - .iter() - .map(|(_, private_key)| generate_config(private_key.clone(), chain_name.clone())) - .collect::>(); - - // Step 0: initialize each's repo - let server_dir = create_temp_dir(); - setup_peer(&server_dir, &[]).await; - setup_pre_genesis_repository(&server_dir, rs.clone()).await; - // Add push configs to server repository. - run_command(format!( - "cd {server_dir}/repository/repo && git config receive.advertisePushOptions true" - )) - .await; - run_command(format!( - "cd {server_dir}/repository/repo && git config sendpack.sideband false" - )) - .await; - - genesis(configs[0].clone(), &server_dir).await.unwrap(); - let mut proposer_node = initialize(configs[0].clone(), &server_dir).await.unwrap(); - let mut other_nodes = Vec::new(); - for config in configs[1..=4].iter() { - let dir = create_temp_dir(); - copy_repository(&server_dir, &dir).await; - setup_peer( - &dir, - &[Peer { - public_key: configs[0].public_key.clone(), - name: "proposer".to_owned(), - address: "127.0.0.1:1".parse().unwrap(), - ports: proposer_node.network_config().ports.clone(), - message: "123".to_owned(), - recently_seen_timestamp: 0, - }], - ) - .await; - other_nodes.push(initialize(config.clone(), &dir).await.unwrap()); - } - - // Step 1: create an agenda and propagate it - log::info!("STEP 1"); - proposer_node.create_agenda().await.unwrap(); - let agenda_commit = proposer_node - .get_raw_repo_mut() - .locate_branch("work".to_owned()) - .await - .unwrap(); - let serve = tokio::spawn(async move { proposer_node.serve(15000).await.unwrap() }); - sleep_ms(500).await; - for node in other_nodes.iter_mut() { - node.fetch().await.unwrap(); - node.vote(agenda_commit).await.unwrap(); - node.broadcast().await.unwrap(); - } - let mut proposer_node = serve.await.unwrap(); - // currently calling `fetch()` is the only way to notice governance approval - proposer_node.fetch().await.unwrap(); - // TODO: it is not guaranteed that `HEAD` is on the agenda proof. - run_command(format!( - "cd {server_dir}/repository/repo && git branch -f work HEAD" - )) - .await; - - // Step 2: create block and run prevote phase - log::info!("STEP 2"); - proposer_node.create_block().await.unwrap(); - proposer_node.progress_for_consensus().await.unwrap(); - proposer_node.broadcast().await.unwrap(); - let serve = tokio::spawn(async move { proposer_node.serve(30000).await.unwrap() }); - sleep_ms(500).await; - for node in other_nodes.iter_mut() { - node.fetch().await.unwrap(); - } - for node in other_nodes.iter_mut() { - node.fetch().await.unwrap(); - } - for node in other_nodes.iter_mut() { - node.progress_for_consensus().await.unwrap(); - node.broadcast().await.unwrap(); - } - let mut proposer_node = serve.await.unwrap(); - proposer_node.progress_for_consensus().await.unwrap(); - proposer_node.broadcast().await.unwrap(); - - // Step 3: Run precommit phase - log::info!("STEP 3"); - let serve = tokio::spawn(async move { proposer_node.serve(40000).await.unwrap() }); - sleep_ms(500).await; - for node in other_nodes.iter_mut() { - node.fetch().await.unwrap(); - } - for node in other_nodes.iter_mut() { - let _ = node.progress_for_consensus().await.unwrap(); - node.broadcast().await.unwrap(); - } - let mut proposer_node = serve.await.unwrap(); - let _ = proposer_node.progress_for_consensus().await.unwrap(); - proposer_node.broadcast().await.unwrap(); - - // Step 4: Propagate finalized proof - log::info!("STEP 4"); - let serve = tokio::spawn(async move { proposer_node.serve(15000).await.unwrap() }); - sleep_ms(500).await; - for node in other_nodes.iter_mut() { - node.fetch().await.unwrap(); - } - for node in other_nodes.iter_mut() { - let _ = node.progress_for_consensus().await; - node.broadcast().await.unwrap(); - } - let mut proposer_node = serve.await.unwrap(); - let _ = proposer_node.progress_for_consensus().await; - proposer_node.broadcast().await.unwrap(); - - for node in std::iter::once(proposer_node).chain(other_nodes.into_iter()) { - let finalized = node - .get_raw_repo() - .locate_branch("finalized".to_owned()) - .await - .unwrap(); - let title = node - .get_raw_repo() - .read_semantic_commit(finalized) - .await - .unwrap() - .title; - assert_eq!(title, ">block: 1"); - } -} From 581b0b7a6dc0cd06039512f4398145490e4f062a Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 15:55:47 +0900 Subject: [PATCH 04/24] Add get_dms() in Consensus --- consensus/src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 57472228..551de025 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -12,6 +12,7 @@ use tokio::sync::RwLock; pub type Error = eyre::Error; +pub use state::ConsensusMessage; pub use vetomint::ConsensusParams; const STATE_FILE_NAME: &str = "state.json"; @@ -140,6 +141,10 @@ impl Consensus { Ok(()) } + pub fn get_dms(&self) -> Arc>> { + Arc::clone(&self.dms) + } + pub async fn flush(&mut self) -> Result<(), Error> { // TODO: filter unverified messages (due to the lack of the block verification) let mut state = self.read_state().await?; From 96b860f4bb4e56ee6456c6845dccb7a4d82e23f6 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 16:06:14 +0900 Subject: [PATCH 05/24] Make Governance module calculate eligibility --- governance/src/lib.rs | 54 ++++++++++++++++++++++++++-- governance/tests/integration_test.rs | 22 +++++++----- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/governance/src/lib.rs b/governance/src/lib.rs index 7db66d39..2e9b2218 100644 --- a/governance/src/lib.rs +++ b/governance/src/lib.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use simperby_core::utils::get_timestamp; use simperby_core::*; use simperby_network::*; use std::collections::HashMap; @@ -56,12 +57,14 @@ impl DmsMessage for Vote { pub struct Governance { dms: Arc>>, + fi: FinalizationInfo, } impl Governance { - /// TODO: this must take the eligible governance set for this height. - pub async fn new(dms: Arc>>) -> Result { - Ok(Self { dms }) + pub async fn new(dms: Arc>>, fi: FinalizationInfo) -> Result { + // TODO: this must set the DMS to accept messages only from + // the eligible governance set for this height. + Ok(Self { dms, fi }) } pub async fn read(&self) -> Result { @@ -79,6 +82,51 @@ impl Governance { Ok(status) } + pub async fn get_eligible_agendas(&self) -> Result, Error> { + let governance_set = self + .fi + .reserved_state + .get_governance_set() + // TODO: handle integrity error + .unwrap() + .into_iter() + .collect::>(); + let governance_state = self.read().await?; + let votes: Vec<(Hash256, VotingPower)> = governance_state + .votes + .iter() + .map(|(agenda, votes)| { + ( + *agenda, + votes + .keys() + .map(|voter| governance_set.get(voter).unwrap()) + .sum(), + ) + }) + .collect(); + let mut result = Vec::new(); + let total_voting_power = governance_set.values().sum::(); + for (agenda, voted_power) in votes { + if voted_power * 2 > total_voting_power { + let proof: Vec<_> = governance_state.votes[&agenda] + .iter() + .map(|(k, s)| TypedSignature::::new(s.clone(), k.clone())) + .collect(); + result.push(( + agenda, + AgendaProof { + height: self.fi.header.height + 1, + agenda_hash: agenda, + proof, + timestamp: get_timestamp(), + }, + )); + } + } + Ok(result) + } + pub async fn vote(&mut self, agenda_hash: Hash256) -> Result<(), Error> { self.dms .write() diff --git a/governance/tests/integration_test.rs b/governance/tests/integration_test.rs index 28846370..8cd8bb8b 100644 --- a/governance/tests/integration_test.rs +++ b/governance/tests/integration_test.rs @@ -10,21 +10,27 @@ async fn basic_1() { setup_test(); let network_id = "governance-basic-1".to_string(); - let ((server_network_config, server_private_key), client_network_configs_and_keys, members) = + let ((server_network_config, server_private_key), client_network_configs_and_keys, members, fi) = setup_server_client_nodes(network_id.clone(), 3).await; - let mut server_node = Governance::new(Arc::new(RwLock::new( - create_test_dms(network_id.clone(), members.clone(), server_private_key).await, - ))) + let mut server_node = Governance::new( + Arc::new(RwLock::new( + create_test_dms(network_id.clone(), members.clone(), server_private_key).await, + )), + fi.clone(), + ) .await .unwrap(); let mut client_nodes = Vec::new(); for (network_config, private_key) in client_network_configs_and_keys.iter() { client_nodes.push(( - Governance::new(Arc::new(RwLock::new( - create_test_dms(network_id.clone(), members.clone(), private_key.clone()).await, - ))) + Governance::new( + Arc::new(RwLock::new( + create_test_dms(network_id.clone(), members.clone(), private_key.clone()).await, + )), + fi.clone(), + ) .await .unwrap(), network_config, @@ -35,7 +41,7 @@ async fn basic_1() { server_node.vote(agenda_hash).await.unwrap(); let serve_task = tokio::spawn(async move { - let task = tokio::spawn(dms::serve(server_node.get_dms(), server_network_config)); + let task = tokio::spawn(Dms::serve(server_node.get_dms(), server_network_config)); sleep_ms(5000).await; task.abort(); let _ = task.await; From 52f00107e188e72da3f435d32924e94139212568 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 16:08:33 +0900 Subject: [PATCH 06/24] Make serve/sync as methods --- network/src/dms/server.rs | 141 +++++++++++++++++++------------------- network/src/dms/tests.rs | 4 +- 2 files changed, 74 insertions(+), 71 deletions(-) diff --git a/network/src/dms/server.rs b/network/src/dms/server.rs index fb87b4fb..842984bd 100644 --- a/network/src/dms/server.rs +++ b/network/src/dms/server.rs @@ -1,78 +1,81 @@ use super::*; -/// Runs a DMS server. This function will block the current thread. -pub async fn serve( - dms: Arc>>, - network_config: ServerNetworkConfig, -) -> Result<(), Error> { - let rpc_task = async move { - let wrapped_dms = Arc::new(parking_lot::RwLock::new(Some(dms))); - let wrapped_dms_ = Arc::clone(&wrapped_dms); - struct DropHelper { - wrapped_dms: Arc>>>>, - } - impl Drop for DropHelper { - fn drop(&mut self) { - self.wrapped_dms.write().take().unwrap(); +impl DistributedMessageSet { + /// Runs a DMS server. This function will block the current thread. + pub async fn serve( + dms: Arc>>, + network_config: ServerNetworkConfig, + ) -> Result<(), Error> { + let rpc_task = async move { + let wrapped_dms = Arc::new(parking_lot::RwLock::new(Some(dms))); + let wrapped_dms_ = Arc::clone(&wrapped_dms); + struct DropHelper { + wrapped_dms: Arc>>>>, } - } - let _drop_helper = DropHelper { wrapped_dms }; - run_server( - network_config.port, - [( - "dms".to_owned(), - create_http_object(Arc::new(DmsWrapper { dms: wrapped_dms_ }) - as Arc), - )] - .iter() - .cloned() - .collect(), - ) - .await; - }; - rpc_task.await; - Ok(()) -} + impl Drop for DropHelper { + fn drop(&mut self) { + self.wrapped_dms.write().take().unwrap(); + } + } + let _drop_helper = DropHelper { wrapped_dms }; + run_server( + network_config.port, + [( + "dms".to_owned(), + create_http_object(Arc::new(DmsWrapper { dms: wrapped_dms_ }) + as Arc), + )] + .iter() + .cloned() + .collect(), + ) + .await; + }; + rpc_task.await; + Ok(()) + } -/// Runs a DMS client with auto-sync. This function will block the current thread. -pub async fn sync( - dms: Arc>>, - fetch_interval: Option, - broadcast_interval: Option, - network_config: ClientNetworkConfig, -) -> Result<(), Error> { - let dms_ = Arc::clone(&dms); - let network_config_ = network_config.clone(); - let fetch_task = async move { - if let Some(interval) = fetch_interval { - loop { - if let Err(e) = - DistributedMessageSet::::fetch(Arc::clone(&dms_), &network_config_).await - { - log::warn!("failed to parse message from the RPC-fetch: {}", e); + /// Runs a DMS client with auto-sync. This function will block the current thread. + pub async fn sync( + dms: Arc>>, + fetch_interval: Option, + broadcast_interval: Option, + network_config: ClientNetworkConfig, + ) -> Result<(), Error> { + let dms_ = Arc::clone(&dms); + let network_config_ = network_config.clone(); + let fetch_task = async move { + if let Some(interval) = fetch_interval { + loop { + if let Err(e) = + DistributedMessageSet::::fetch(Arc::clone(&dms_), &network_config_) + .await + { + log::warn!("failed to parse message from the RPC-fetch: {}", e); + } + tokio::time::sleep(interval).await; } - tokio::time::sleep(interval).await; + } else { + futures::future::pending::<()>().await; } - } else { - futures::future::pending::<()>().await; - } - }; - let dms_ = Arc::clone(&dms); - let broadcast_task = async move { - if let Some(interval) = broadcast_interval { - loop { - if let Err(e) = - DistributedMessageSet::::broadcast(Arc::clone(&dms_), &network_config) - .await - { - log::warn!("failed to parse message from the RPC-broadcast: {}", e); + }; + let dms_ = Arc::clone(&dms); + let broadcast_task = async move { + if let Some(interval) = broadcast_interval { + loop { + if let Err(e) = + DistributedMessageSet::::broadcast(Arc::clone(&dms_), &network_config) + .await + { + log::warn!("failed to parse message from the RPC-broadcast: {}", e); + } + tokio::time::sleep(interval).await; } - tokio::time::sleep(interval).await; + } else { + futures::future::pending::<()>().await; } - } else { - futures::future::pending::<()>().await; - } - }; - join(fetch_task, broadcast_task).await; - Ok(()) + }; + join(fetch_task, broadcast_task).await; + Ok(()) + } } diff --git a/network/src/dms/tests.rs b/network/src/dms/tests.rs index 79477242..2c86bf52 100644 --- a/network/src/dms/tests.rs +++ b/network/src/dms/tests.rs @@ -110,7 +110,7 @@ async fn run_client_node( let dms_ = Arc::clone(&dms); let network_config_ = network_config.clone(); let sync_task = tokio::spawn(async move { - sync(dms_, fetch_interval, broadcast_interval, network_config_) + Dms::sync(dms_, fetch_interval, broadcast_interval, network_config_) .await .unwrap(); }); @@ -167,7 +167,7 @@ async fn multi_1() { )); client_dmses.push(dms); } - tokio::spawn(serve(Arc::clone(&server_dms), server_network_config)); + tokio::spawn(Dms::serve(Arc::clone(&server_dms), server_network_config)); join_all(tasks).await; for dms in client_dmses { From 1012bc866431002830111c6f0df39cc8c941c9fe Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 16:22:36 +0900 Subject: [PATCH 07/24] Add Debug bound for DmsMessage --- network/src/dms/messages.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/network/src/dms/messages.rs b/network/src/dms/messages.rs index 9cfa1581..8dcb9a61 100644 --- a/network/src/dms/messages.rs +++ b/network/src/dms/messages.rs @@ -1,9 +1,13 @@ +use std::fmt::Debug; + use super::*; use serde::{de::DeserializeOwned, ser::Serialize}; pub type DmsKey = String; -pub trait DmsMessage: Send + Sync + 'static + ToHash256 + Serialize + DeserializeOwned { +pub trait DmsMessage: + Send + Sync + 'static + ToHash256 + Serialize + DeserializeOwned + Debug +{ /// Checks if the message is valid. fn check(&self) -> Result<(), Error>; From 433387f7793bcfd1aff39149e18700bb92a42cc8 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 16:23:01 +0900 Subject: [PATCH 08/24] Skip broadcast if no messages exist --- network/src/dms/rpc.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/network/src/dms/rpc.rs b/network/src/dms/rpc.rs index e33d4d3c..9ce52a0a 100644 --- a/network/src/dms/rpc.rs +++ b/network/src/dms/rpc.rs @@ -112,6 +112,9 @@ impl DistributedMessageSet { let mut tasks_and_messages = Vec::new(); let packets = this.read().await.retrieve_packets().await?; + if packets.is_empty() { + return Ok(()); + } for peer in &network_config.peers { let key = this.read().await.config.dms_key.clone(); let port_key = format!("dms-{key}"); From cd0266a52caf692e5afac87dd01b18b95ff0a572 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 4 Apr 2023 16:23:14 +0900 Subject: [PATCH 09/24] Introduce Simperby API (prev. Node) --- Cargo.toml | 3 +- simperby/Cargo.toml | 25 ++++ simperby/src/lib.rs | 233 +++++++++++++++++++++++++++++ simperby/src/storage.rs | 86 +++++++++++ simperby/src/types.rs | 65 ++++++++ simperby/tests/integration_test.rs | 169 +++++++++++++++++++++ 6 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 simperby/Cargo.toml create mode 100644 simperby/src/lib.rs create mode 100644 simperby/src/storage.rs create mode 100644 simperby/src/types.rs create mode 100644 simperby/tests/integration_test.rs diff --git a/Cargo.toml b/Cargo.toml index 617d8004..c0a4c97d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,6 @@ members = [ "repository", "consensus", "governance", - "settlement" + "settlement", + "simperby" ] diff --git a/simperby/Cargo.toml b/simperby/Cargo.toml new file mode 100644 index 00000000..e1c3bc88 --- /dev/null +++ b/simperby/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "simperby" +version = "0.0.0" +authors = ["PDAO Team "] +edition = "2021" + +[dependencies] +eyre = "0.6.8" +async-trait = "0.1.42" +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1.0", features = ["full"] } +chrono = { version = "0.4", features = ["serde"] } +futures = "0.3" +log = "0.4" +simperby-core = { version = "0.1.0", path = "../core" } +simperby-network = { version = "0.1.0", path = "../network" } +simperby-governance = { version = "0.1.0", path = "../governance" } +simperby-consensus = { version = "0.1.0", path = "../consensus" } +simperby-repository = { version = "0.1.0", path = "../repository" } +thiserror = "1.0.32" +semver = "1.0.0" + +[dev-dependencies] +rand = "0.8.5" +simperby-test-suite = { path = "../test-suite" } diff --git a/simperby/src/lib.rs b/simperby/src/lib.rs new file mode 100644 index 00000000..12c57bd0 --- /dev/null +++ b/simperby/src/lib.rs @@ -0,0 +1,233 @@ +mod storage; +pub mod types; + +use eyre::eyre; +use eyre::Result; +use simperby_consensus::*; +use simperby_core::utils::get_timestamp; +use simperby_core::*; +use simperby_governance::*; +use simperby_network::*; +use simperby_repository::raw::RawRepository; +use simperby_repository::*; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub use crate::types::*; + +/// An instance of Simperby client (a.k.a. a 'node'). +pub struct Client { + config: types::Config, + auth: Auth, + path: String, + repository: DistributedRepository, + governance: Governance, + consensus: Consensus, +} + +impl Client { + pub async fn genesis(path: &str) -> Result<()> { + let repository = RawRepository::open(path).await?; + DistributedRepository::genesis(repository).await?; + Ok(()) + } + + pub async fn init(path: &str, config: types::Config) -> Result<()> { + storage::init(path, config).await?; + Ok(()) + } + + pub async fn open(path: &str, config: types::Config, auth: Auth) -> Result { + let (governance_dms, consensus_dms, consensus_state, repository) = + storage::open(path, config.clone(), auth.clone()).await?; + let lfi = repository.read_last_finalization_info().await?; + + Ok(Self { + config, + auth: auth.clone(), + path: path.to_string(), + repository, + governance: Governance::new(Arc::new(RwLock::new(governance_dms)), lfi.clone()).await?, + consensus: Consensus::new( + Arc::new(RwLock::new(consensus_dms)), + consensus_state, + lfi.header, + ConsensusParams { + timeout_ms: 10000000, + repeat_round_for_first_leader: 100, + }, + get_timestamp(), + Some(auth.private_key), + ) + .await?, + }) + } + + pub fn config(&self) -> &types::Config { + &self.config + } + + pub fn auth(&self) -> &Auth { + &self.auth + } + + pub async fn clean(&mut self, _hard: bool) -> Result<()> { + todo!() + } + + pub fn repository(&self) -> &DistributedRepository { + &self.repository + } + + pub fn repository_mut(&mut self) -> &mut DistributedRepository { + &mut self.repository + } + + /// Makes a progress for the consensus, returning the result. + /// + /// TODO: it has to consume the object if finalized. + pub async fn progress_for_consensus(&mut self) -> Result { + let result = self.consensus.progress(get_timestamp()).await?; + let report = format!("{result:?}"); + for result in result { + if let ProgressResult::Finalized(hash, _, proof) = result { + let commit_hash = self + .repository + .read_blocks() + .await? + .iter() + .find(|(_, h)| *h == hash) + .ok_or_else(|| eyre::eyre!("finalized block can't be found in repository"))? + .0; + self.repository.finalize(commit_hash, proof).await?; + } + } + Ok(report) + } + + pub async fn vote(&mut self, agenda_commit: CommitHash) -> Result<()> { + let agendas = self.repository.read_agendas().await?; + let agenda_hash = if let Some(x) = agendas.iter().find(|(x, _)| *x == agenda_commit) { + x.1 + } else { + return Err(eyre!( + "the given commit hash {} is not one of the valid agendas", + agenda_commit + )); + }; + self.repository.vote(agenda_commit).await?; + self.governance.vote(agenda_hash).await?; + Ok(()) + } + + /// Vetoes the current round. + pub async fn veto_round(&mut self) -> Result<()> { + unimplemented!() + } + + /// Vetoes the given block. + pub async fn veto_block(&mut self, _block_commit: CommitHash) -> Result<()> { + unimplemented!() + } + + /// Shows information about the given commit. + pub async fn show(&self, _commit_hash: CommitHash) -> Result { + todo!() + } + + /// Adds remote repositories according to current peer information. + pub async fn add_remote_repositories(&mut self) -> Result<()> { + for peer in &self.config.peers { + let port = if let Some(p) = peer.ports.get("repository") { + p + } else { + continue; + }; + let url = format!("git://{}:{port}/", peer.address.ip()); + self.repository + .get_raw() + .write() + .await + .add_remote(peer.name.clone(), url) + .await?; + } + Ok(()) + } + + pub async fn serve( + self, + config: ServerConfig, + git_hook_verifier: simperby_repository::server::PushVerifier, + ) -> Result>> { + let network_config = ServerNetworkConfig { + port: config.governance_port, + }; + let t1 = async move { + Dms::serve(self.governance.get_dms(), network_config) + .await + .unwrap() + }; + + let network_config = ServerNetworkConfig { + port: config.consensus_port, + }; + let t2 = async move { + Dms::serve(self.consensus.get_dms(), network_config) + .await + .unwrap() + }; + let t3 = async move { + let _server = simperby_repository::server::run_server( + &self.path, + config.repository_port, + git_hook_verifier, + ) + .await; + std::future::pending::<()>().await; + }; + + Ok(tokio::spawn(async move { + futures::future::join3(t1, t2, t3).await; + Ok(()) + })) + } + + pub async fn update(&mut self) -> Result<()> { + let network_config = ClientNetworkConfig { + peers: self.config.peers.clone(), + }; + Dms::fetch(self.governance.get_dms(), &network_config).await?; + Dms::fetch(self.consensus.get_dms(), &network_config).await?; + self.repository.get_raw().write().await.fetch_all().await?; + self.repository.sync_all().await?; + + // Update governance + self.governance.update().await?; + for (agenda_hash, agenda_proof) in self.governance.get_eligible_agendas().await? { + self.repository + .approve(&agenda_hash, agenda_proof.proof, get_timestamp()) + .await?; + } + + // Update consensus + self.consensus.update().await?; + for (_, block_hash) in self.repository.read_blocks().await? { + self.consensus + .register_verified_block_hash(block_hash) + .await?; + } + Ok(()) + } + + pub async fn broadcast(&mut self) -> Result<()> { + let network_config = ClientNetworkConfig { + peers: self.config.peers.clone(), + }; + self.governance.flush().await?; + Dms::broadcast(self.governance.get_dms(), &network_config).await?; + self.consensus.flush().await?; + Dms::broadcast(self.consensus.get_dms(), &network_config).await?; + self.repository.broadcast().await?; + Ok(()) + } +} diff --git a/simperby/src/storage.rs b/simperby/src/storage.rs new file mode 100644 index 00000000..ed358a06 --- /dev/null +++ b/simperby/src/storage.rs @@ -0,0 +1,86 @@ +use super::*; + +fn governance_dms_path(path: &str) -> String { + format!("{path}/.simperby/governance/dms") +} + +fn consensus_dms_path(path: &str) -> String { + format!("{path}/.simperby/consensus/dms") +} + +fn consensus_state_path(path: &str) -> String { + format!("{path}/.simperby/consensus/state") +} + +pub(crate) async fn init(path: &str, _config: types::Config) -> Result<()> { + let mut repository = DistributedRepository::new( + Arc::new(RwLock::new(RawRepository::open(path).await?)), + simperby_repository::Config { + long_range_attack_distance: 3, + }, + None, + ) + .await?; + repository.check(0).await?; + if !repository.check_gitignore().await? { + repository.commit_gitignore().await?; + } + + StorageImpl::create(&governance_dms_path(path)).await?; + StorageImpl::create(&consensus_dms_path(path)).await?; + StorageImpl::create(&consensus_state_path(path)).await?; + Ok(()) +} + +/// `(Governance DMS, Consensus DMS, ConsensusState, Distributed Repository)`. +pub(crate) async fn open( + path: &str, + _config: types::Config, + auth: Auth, +) -> Result<( + Dms, + Dms, + StorageImpl, + DistributedRepository, +)> { + let repository = DistributedRepository::new( + Arc::new(RwLock::new(RawRepository::open(path).await?)), + simperby_repository::Config { + long_range_attack_distance: 3, + }, + Some(auth.private_key.clone()), + ) + .await?; + repository.check(0).await?; + let lfi = repository.read_last_finalization_info().await?; + let dms_members: Vec<_> = lfi + .reserved_state + .get_governance_set() + .map_err(simperby_repository::IntegrityError::new)? + .into_iter() + .map(|x| x.0) + .collect(); + + let storage = StorageImpl::open(&governance_dms_path(path)).await?; + let governance_dms = Dms::::new( + storage, + dms::Config { + dms_key: format!("governance-{}", lfi.header.to_hash256()), + members: dms_members.clone(), + }, + auth.private_key.clone(), + ) + .await?; + let storage = StorageImpl::open(&consensus_dms_path(path)).await?; + let consensus_dms = Dms::::new( + storage, + dms::Config { + dms_key: format!("consensus-{}", lfi.header.to_hash256()), + members: dms_members.clone(), + }, + auth.private_key.clone(), + ) + .await?; + let consensus_state = StorageImpl::open(&consensus_state_path(path)).await?; + Ok((governance_dms, consensus_dms, consensus_state, repository)) +} diff --git a/simperby/src/types.rs b/simperby/src/types.rs new file mode 100644 index 00000000..2e3724fb --- /dev/null +++ b/simperby/src/types.rs @@ -0,0 +1,65 @@ +use super::*; +use serde::{Deserialize, Serialize}; +use simperby_repository::raw::SemanticCommit; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ConsensusStatus { + // TODO +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct NetworkStatus { + // TODO +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum CommitInfo { + Block { + semantic_commit: SemanticCommit, + block_header: BlockHeader, + // TODO: block-specific consensus status + }, + Agenda { + semantic_commit: SemanticCommit, + agenda: Agenda, + voters: Vec<(MemberName, Timestamp)>, + }, + AgendaProof { + semantic_commit: SemanticCommit, + agenda_proof: AgendaProof, + }, + Transaction { + semantic_commit: SemanticCommit, + transaction: Transaction, + }, + PreGenesisCommit { + title: String, + }, + Unknown { + semantic_commit: SemanticCommit, + msg: String, + }, // TODO +} + +/// A configuration for a node. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Config { + /// TODO: remove this and introduce a proper peer discovery protocol + pub peers: Vec, +} + +/// Hosting a server node requires extra configuration. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ServerConfig { + pub governance_port: u16, + pub consensus_port: u16, + pub repository_port: u16, + + pub broadcast_interval_ms: Option, + pub fetch_interval_ms: Option, +} + +#[derive(Debug, Clone)] +pub struct Auth { + pub private_key: PrivateKey, +} diff --git a/simperby/tests/integration_test.rs b/simperby/tests/integration_test.rs new file mode 100644 index 00000000..5cea190b --- /dev/null +++ b/simperby/tests/integration_test.rs @@ -0,0 +1,169 @@ +use simperby::types::{Auth, Config}; +use simperby::*; +use simperby_core::*; +use simperby_network::Peer; +use simperby_test_suite::*; + +fn setup_network( + start_fi: &FinalizationInfo, + server_public_key: PublicKey, +) -> (Vec, ServerConfig) { + let server_config = ServerConfig { + governance_port: dispense_port(), + consensus_port: dispense_port(), + repository_port: dispense_port(), + broadcast_interval_ms: Some(500), + fetch_interval_ms: Some(500), + }; + let peer = vec![Peer { + public_key: server_public_key, + name: "server".to_owned(), + address: "127.0.0.1:1".parse().unwrap(), + ports: vec![ + ( + format!("dms-governance-{}", start_fi.header.to_hash256()), + server_config.governance_port, + ), + ( + format!("dms-consensus-{}", start_fi.header.to_hash256()), + server_config.consensus_port, + ), + ("repository".to_owned(), server_config.repository_port), + ] + .into_iter() + .collect(), + message: "".to_owned(), + recently_seen_timestamp: 0, + }]; + (peer, server_config) +} + +async fn sync_each_other(clients: &mut [Client]) { + for client in clients.iter_mut() { + client.broadcast().await.unwrap(); + } + sleep_ms(200).await; + for client in clients.iter_mut() { + client.update().await.unwrap(); + } + sleep_ms(200).await; +} + +#[tokio::test] +async fn normal_1() { + setup_test(); + let (fi, keys) = test_utils::generate_fi(4); + let (peers, server_config) = setup_network(&fi, keys[0].0.clone()); + + // Setup repository and server + let server_dir = create_temp_dir(); + setup_pre_genesis_repository(&server_dir, fi.reserved_state.clone()).await; + Client::genesis(&server_dir).await.unwrap(); + Client::init(&server_dir, Config { peers: Vec::new() }) + .await + .unwrap(); + // Add push configs to server repository. + run_command(format!( + "cd {server_dir} && git config receive.advertisePushOptions true" + )) + .await; + run_command(format!( + "cd {server_dir} && git config sendpack.sideband false" + )) + .await; + + // Setup clients + let mut clients = Vec::new(); + for (_, key) in keys.iter().take(3) { + let dir = create_temp_dir(); + run_command(format!("cp -a {server_dir}/. {dir}/")).await; + let auth = Auth { + private_key: key.clone(), + }; + let mut client = Client::open( + &dir, + Config { + peers: peers.clone(), + }, + auth, + ) + .await + .unwrap(); + client.add_remote_repositories().await.unwrap(); + clients.push(client); + } + + // Run server + let auth = Auth { + private_key: keys[3].1.clone(), + }; + tokio::spawn(async move { + let client = Client::open(&server_dir, Config { peers: Vec::new() }, auth) + .await + .unwrap(); + let task = client + .serve( + server_config, + simperby_repository::server::PushVerifier::AlwaysAccept, + ) + .await + .unwrap(); + task.await.unwrap().unwrap(); + }); + + // Step 1: create an agenda and propagate it + log::info!("STEP 1"); + let (_, agenda_commit) = clients[0] + .repository_mut() + .create_agenda(fi.reserved_state.members[0].name.clone()) + .await + .unwrap(); + + sync_each_other(&mut clients).await; + for client in clients.iter_mut().take(3) { + client.vote(agenda_commit).await.unwrap(); + } + sync_each_other(&mut clients).await; + + // Step 2: create block and run consensus + log::info!("STEP 2"); + let proposer_public_key = clients[0].auth().private_key.public_key(); + clients[0] + .repository_mut() + .create_block(proposer_public_key) + .await + .unwrap(); + sync_each_other(&mut clients).await; + for client in clients.iter_mut().take(3) { + client.progress_for_consensus().await.unwrap(); + } + sync_each_other(&mut clients).await; + for client in clients.iter_mut().take(3) { + client.progress_for_consensus().await.unwrap(); + } + sync_each_other(&mut clients).await; + for client in clients.iter_mut().take(3) { + client.progress_for_consensus().await.unwrap(); + } + sync_each_other(&mut clients).await; + for client in clients.iter_mut().take(3) { + client.progress_for_consensus().await.unwrap(); + } + sync_each_other(&mut clients).await; + + // Step 3: check the result + for client in clients { + let raw_repo = client.repository().get_raw(); + let raw_repo_ = raw_repo.read().await; + let finalized = raw_repo_ + .locate_branch("finalized".to_owned()) + .await + .unwrap(); + let title = raw_repo_ + .read_semantic_commit(finalized) + .await + .unwrap() + .title; + assert_eq!(title, ">block: 1"); + } +} From 3c51936c49bb7c153c055ba490454093982af389 Mon Sep 17 00:00:00 2001 From: JeongHunP Date: Wed, 5 Apr 2023 13:31:01 +0900 Subject: [PATCH 10/24] Update create_commit methods correctly --- repository/src/raw/implementation.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/repository/src/raw/implementation.rs b/repository/src/raw/implementation.rs index 2360c7f2..d4ce4d43 100644 --- a/repository/src/raw/implementation.rs +++ b/repository/src/raw/implementation.rs @@ -301,6 +301,7 @@ impl RawRepositoryInner { for path in paths { index.add_path(std::path::Path::new(path))?; } + index.write()?; let id = index.write_tree()?; self.repo.find_tree(id)? } else { @@ -334,6 +335,7 @@ impl RawRepositoryInner { // Add all files to the index. let mut index = self.repo.index()?; index.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)?; + index.write()?; let id = index.write_tree()?; let tree = self.repo.find_tree(id)?; let head = self.get_head()?; From 1486dd2922d0b02adf60f904a6eddc24c194fae8 Mon Sep 17 00:00:00 2001 From: JeongHunP Date: Wed, 5 Apr 2023 13:36:18 +0900 Subject: [PATCH 11/24] Add simple_git_server to normal_1 test --- simperby/tests/integration_test.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/simperby/tests/integration_test.rs b/simperby/tests/integration_test.rs index 5cea190b..91a3ef00 100644 --- a/simperby/tests/integration_test.rs +++ b/simperby/tests/integration_test.rs @@ -49,6 +49,25 @@ async fn sync_each_other(clients: &mut [Client]) { sleep_ms(200).await; } +fn build_simple_git_server() -> String { + let mut cmd = std::process::Command::new("cargo"); + cmd.arg("build"); + cmd.arg("--bin"); + cmd.arg("simple_git_server"); + cmd.arg("--release"); + cmd.current_dir(concat!( + env!("CARGO_MANIFEST_DIR"), + "/../repository/src/bin" + )); + let output = cmd.output().unwrap(); + assert!(output.status.success()); + + format!( + "{}/../target/release/simple_git_server", + env!("CARGO_MANIFEST_DIR").replace('\\', "/") + ) +} + #[tokio::test] async fn normal_1() { setup_test(); @@ -104,7 +123,9 @@ async fn normal_1() { let task = client .serve( server_config, - simperby_repository::server::PushVerifier::AlwaysAccept, + simperby_repository::server::PushVerifier::VerifierExecutable( + build_simple_git_server(), + ), ) .await .unwrap(); From 6c58b49cca3f9bc4301bc895ecad6a047e59144a Mon Sep 17 00:00:00 2001 From: JeongHunP Date: Thu, 6 Apr 2023 15:56:36 +0900 Subject: [PATCH 12/24] Fix create_block to checkout branch correctly --- repository/src/interpret/create.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/repository/src/interpret/create.rs b/repository/src/interpret/create.rs index bfd7631b..6b5a5137 100644 --- a/repository/src/interpret/create.rs +++ b/repository/src/interpret/create.rs @@ -198,11 +198,13 @@ pub async fn create_block( let semantic_commit = to_semantic_commit(&block_commit, reserved_state)?; raw.checkout_clean().await?; + raw.checkout_detach(head).await?; let result = raw.create_semantic_commit(semantic_commit).await?; let mut block_branch_name = block_commit.to_hash256().to_string(); block_branch_name.truncate(BRANCH_NAME_HASH_DIGITS); let block_branch_name = format!("b-{block_branch_name}"); - raw.create_branch(block_branch_name, result).await?; + raw.create_branch(block_branch_name.clone(), result).await?; + raw.checkout(block_branch_name).await?; Ok((block_header, result)) } From 6c5d53f8995a86359eac899f6812389cc3f615cb Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Thu, 6 Apr 2023 16:58:54 +0900 Subject: [PATCH 13/24] Fix server index --- simperby/tests/integration_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simperby/tests/integration_test.rs b/simperby/tests/integration_test.rs index 91a3ef00..f43ccdf2 100644 --- a/simperby/tests/integration_test.rs +++ b/simperby/tests/integration_test.rs @@ -72,7 +72,7 @@ fn build_simple_git_server() -> String { async fn normal_1() { setup_test(); let (fi, keys) = test_utils::generate_fi(4); - let (peers, server_config) = setup_network(&fi, keys[0].0.clone()); + let (peers, server_config) = setup_network(&fi, keys[3].0.clone()); // Setup repository and server let server_dir = create_temp_dir(); From f594c05119b791e54e4e2228d986f3368c74ac7a Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Thu, 6 Apr 2023 16:58:58 +0900 Subject: [PATCH 14/24] Don't recreate agenda proof --- repository/src/interpret/create.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/repository/src/interpret/create.rs b/repository/src/interpret/create.rs index 6b5a5137..25703a5d 100644 --- a/repository/src/interpret/create.rs +++ b/repository/src/interpret/create.rs @@ -7,6 +7,22 @@ pub async fn approve( proof: Vec>, timestamp: Timestamp, ) -> Result { + let approved_agendas = read::read_governance_approved_agendas(raw).await?; + + for (commit_hash, _) in approved_agendas { + if let Commit::AgendaProof(agenda_proof) = read::read_commit(raw, commit_hash).await? { + if agenda_proof.agenda_hash == *agenda_hash { + // already approved + return Ok(commit_hash); + } + } else { + return Err(eyre!(IntegrityError::new(format!( + "commit {} is not an agenda proof", + commit_hash + )))); + } + } + // Check if the agenda branch is rebased on top of the `finalized` branch. let last_header_commit = raw.locate_branch(FINALIZED_BRANCH_NAME.into()).await?; let agenda_branch_name = format!("a-{}", &agenda_hash.to_string()[0..BRANCH_NAME_HASH_DIGITS]); From 007e2f9486df30b23b73cb9c8952e7069abe50d8 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sun, 9 Apr 2023 19:27:40 +0900 Subject: [PATCH 15/24] Use hex encoded string for consensus state --- Cargo.lock | 1 + consensus/Cargo.toml | 1 + consensus/src/lib.rs | 6 ++++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 47beccbf..e48a1f5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1799,6 +1799,7 @@ dependencies = [ "async-trait", "eyre", "futures", + "hex", "itertools", "log", "parking_lot", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 54d9883f..7961c8f7 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -22,6 +22,7 @@ simperby-core = { version = "0.1.0", path = "../core" } simperby-network = { version = "0.1.0", path = "../network" } vetomint = { version = "0.1.0", path = "../vetomint" } parking_lot = "0.12.1" +hex = "0.4.3" [dev-dependencies] simperby-test-suite = { version = "0.1.0", path = "../test-suite" } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 551de025..f1a6a0b6 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -178,13 +178,15 @@ impl Consensus { impl Consensus { async fn read_state(&self) -> Result { let raw_state = self.state_storage.read_file(STATE_FILE_NAME).await?; - let state: State = serde_spb::from_str(&raw_state)?; + let state: State = serde_spb::from_slice(&hex::decode(raw_state)?)?; Ok(state) } async fn commit_state(&mut self, state: &State) -> Result<(), Error> { + // We can't use json because of a non-string map + let data = hex::encode(serde_spb::to_vec(state).unwrap()); self.state_storage - .add_or_overwrite_file(STATE_FILE_NAME, serde_spb::to_string(state).unwrap()) + .add_or_overwrite_file(STATE_FILE_NAME, data) .await .map_err(|_| eyre!("failed to commit consensus state to the storage")) } From 1672fac21f97e89f49fb9b4e31d11aa14a0780a6 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sun, 9 Apr 2023 19:39:21 +0900 Subject: [PATCH 16/24] Introduce wrapper type Finalization --- consensus/src/lib.rs | 11 +++++++++-- consensus/src/state.rs | 23 +++++++++++------------ simperby/src/lib.rs | 7 +++++-- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index f1a6a0b6..68e5174a 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -24,10 +24,17 @@ pub enum ProgressResult { NonNilPreCommitted(ConsensusRound, Hash256, Timestamp), NilPreVoted(ConsensusRound, Timestamp), NilPreCommitted(ConsensusRound, Timestamp), - Finalized(Hash256, Timestamp, FinalizationProof), + Finalized(Finalization), ViolationReported(PublicKey, String, Timestamp), } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Finalization { + pub block_hash: Hash256, + pub timestamp: Timestamp, + pub proof: FinalizationProof, +} + /// The consensus module pub struct Consensus { /// The distributed consensus message set. @@ -92,7 +99,7 @@ impl Consensus { } /// Checks whether the consensus is finalized. - pub async fn check_finalized(&self) -> Result, Error> { + pub async fn check_finalized(&self) -> Result, Error> { let state = self.read_state().await?; Ok(state.check_finalized()) } diff --git a/consensus/src/state.rs b/consensus/src/state.rs index a618c354..5df22575 100644 --- a/consensus/src/state.rs +++ b/consensus/src/state.rs @@ -1,4 +1,4 @@ -use super::ProgressResult; +use super::*; use eyre::eyre; use serde::{Deserialize, Serialize}; use simperby_core::*; @@ -108,7 +108,7 @@ pub struct State { precommits: BTreeMap<(Hash256, ConsensusRound), Vec>>, /// If `Some`, any operation on the consensus module will fail; /// the user must run `new()` with the next height info. - finalized: Option, + finalized: Option, } impl State { @@ -139,7 +139,7 @@ impl State { Ok(state) } - pub fn check_finalized(&self) -> Option { + pub fn check_finalized(&self) -> Option { self.finalized.clone() } @@ -279,7 +279,7 @@ impl State { } fn process_consensus_response_to_progress_result( - &self, + &mut self, response: ConsensusResponse, timestamp: Timestamp, ) -> (ProgressResult, Option) { @@ -345,14 +345,13 @@ impl State { .get(&(block_hash, round)) .cloned() .expect("there must be valid precommits for the finalized block"); - ( - ProgressResult::Finalized( - block_hash, - timestamp, - FinalizationProof { round, signatures }, - ), - None, - ) + let finalization = Finalization { + block_hash, + timestamp, + proof: FinalizationProof { round, signatures }, + }; + self.finalized = Some(finalization.clone()); + (ProgressResult::Finalized(finalization), None) } ConsensusResponse::ViolationReport { violator, diff --git a/simperby/src/lib.rs b/simperby/src/lib.rs index 12c57bd0..3c482673 100644 --- a/simperby/src/lib.rs +++ b/simperby/src/lib.rs @@ -90,13 +90,16 @@ impl Client { let result = self.consensus.progress(get_timestamp()).await?; let report = format!("{result:?}"); for result in result { - if let ProgressResult::Finalized(hash, _, proof) = result { + if let ProgressResult::Finalized(Finalization { + block_hash, proof, .. + }) = result + { let commit_hash = self .repository .read_blocks() .await? .iter() - .find(|(_, h)| *h == hash) + .find(|(_, h)| *h == block_hash) .ok_or_else(|| eyre::eyre!("finalized block can't be found in repository"))? .0; self.repository.finalize(commit_hash, proof).await?; From 707f1efa68dc3a86fed0a07b61c05129c90c2b61 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sun, 9 Apr 2023 19:32:11 +0900 Subject: [PATCH 17/24] Add missing start event --- consensus/src/state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/src/state.rs b/consensus/src/state.rs index 5df22575..4159a111 100644 --- a/consensus/src/state.rs +++ b/consensus/src/state.rs @@ -128,7 +128,7 @@ impl State { vetomint: Vetomint::new(height_info), block_header: block_header.clone(), block_identifier_count: 0, - to_be_processed_events: Vec::new(), + to_be_processed_events: vec![(ConsensusEvent::Start, round_zero_timestamp)], updated_events: BTreeSet::new(), verified_block_hashes: BTreeMap::new(), vetoed_block_hashes: BTreeSet::new(), From 553bf5fd469870000118a7bbf8fe664fa1314d7c Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sun, 9 Apr 2023 19:32:23 +0900 Subject: [PATCH 18/24] Fix wrong access in map --- consensus/src/state.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consensus/src/state.rs b/consensus/src/state.rs index 4159a111..53e54b89 100644 --- a/consensus/src/state.rs +++ b/consensus/src/state.rs @@ -208,7 +208,8 @@ impl State { if let ConsensusMessage::NonNilPreCommitted(round, block_hash) = message { self.precommits .entry((block_hash, round)) - .and_modify(|v| v.push(TypedSignature::new(signature, author))); + .and_modify(|v| v.push(TypedSignature::new(signature.clone(), author.clone()))) + .or_insert(vec![TypedSignature::new(signature, author)]); } } } From d8c9c88120643437b3888455d1e498becaa2107e Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sun, 9 Apr 2023 19:32:45 +0900 Subject: [PATCH 19/24] Include server node in clients --- governance/tests/integration_test.rs | 2 +- test-suite/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/governance/tests/integration_test.rs b/governance/tests/integration_test.rs index 8cd8bb8b..34842fc2 100644 --- a/governance/tests/integration_test.rs +++ b/governance/tests/integration_test.rs @@ -11,7 +11,7 @@ async fn basic_1() { let network_id = "governance-basic-1".to_string(); let ((server_network_config, server_private_key), client_network_configs_and_keys, members, fi) = - setup_server_client_nodes(network_id.clone(), 3).await; + setup_server_client_nodes(network_id.clone(), 4).await; let mut server_node = Governance::new( Arc::new(RwLock::new( diff --git a/test-suite/src/lib.rs b/test-suite/src/lib.rs index d5fef15a..0b56fdd3 100644 --- a/test-suite/src/lib.rs +++ b/test-suite/src/lib.rs @@ -104,7 +104,7 @@ pub async fn setup_server_client_nodes( FinalizationInfo, ) { let (fi, keys) = simperby_core::test_utils::generate_fi(client_n); - let (_, server_private_key) = generate_keypair_random(); + let (_, server_private_key) = keys.last().unwrap().clone(); let server = ServerNetworkConfig { port: dispense_port(), }; From e3504af840d5a50a48e157f24729a6e75869186b Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sun, 9 Apr 2023 19:34:42 +0900 Subject: [PATCH 20/24] Remove outdated test --- consensus/tests/integration_test.rs | 441 ---------------------------- 1 file changed, 441 deletions(-) diff --git a/consensus/tests/integration_test.rs b/consensus/tests/integration_test.rs index ae4f7985..8b137891 100644 --- a/consensus/tests/integration_test.rs +++ b/consensus/tests/integration_test.rs @@ -1,442 +1 @@ -#![cfg(never)] -use common::{ - crypto::{Signature, TypedSignature}, - BlockHeight, FinalizationProof, PrivateKey, Timestamp, -}; -use itertools::Itertools; -#[allow(unused_imports)] -use log::debug; -use simperby_consensus::{Consensus, ConsensusMessage, Precommit, Prevote, ProgressResult}; -use simperby_core::{ - self as common, - crypto::{Hash256, PublicKey}, - utils::get_timestamp, - BlockHeader, VotingPower, -}; -use simperby_network::{ - primitives::Storage, storage::StorageImpl, NetworkConfig, SharedKnownPeers, -}; -use simperby_test_suite as test_suite; -use std::fmt::Debug; -use std::iter::once; -use test_suite::*; -use vetomint::ConsensusParams; - -fn get_initial_block_header(validator_set: Vec<(PublicKey, VotingPower)>) -> BlockHeader { - BlockHeader { - author: PublicKey::zero(), - prev_block_finalization_proof: Vec::new(), - previous_hash: Hash256::zero(), - height: 0 as BlockHeight, - timestamp: 0 as Timestamp, - commit_merkle_root: Hash256::zero(), - repository_merkle_root: Hash256::zero(), - validator_set, - version: "0.0.0".to_string(), - } -} - -fn configs_to_block_header( - configs: Vec<&NetworkConfig>, - voting_powers: Vec, -) -> BlockHeader { - let pubkeys = configs.iter().map(|config| config.public_key.clone()); - let validator_set: Vec<(PublicKey, u64)> = pubkeys.zip(voting_powers.iter().cloned()).collect(); - get_initial_block_header(validator_set) -} - -async fn create_storage(dirname: String) -> StorageImpl { - StorageImpl::create(&dirname).await.unwrap(); - StorageImpl::open(&dirname).await.unwrap() -} - -fn get_network_id_and_dms_key(testname: &str) -> (String, String) { - let network_id = format!("consensus_{testname}"); - let dms_key = network_id.clone(); - (network_id, dms_key) -} - -/// This may panic. -fn assert_eq_unordered(expected: &Vec, actual: &Vec) { - let result = expected.len() == actual.len() - && actual.iter().all(|a| expected.contains(a)) - && expected.iter().all(|e| actual.contains(e)); - if !result { - panic!("assert_eq_unordered failed: \nexpected: {expected:?}\nactual: {actual:?}"); - } -} - -fn prevote(block_hash: Hash256, privkey: &PrivateKey) -> Prevote { - TypedSignature::sign(&format!("{}-{}", block_hash, "prevote"), privkey).unwrap() -} - -fn precommit(block_hash: Hash256, privkey: &PrivateKey) -> Precommit { - TypedSignature::new( - Signature::sign(block_hash, privkey).unwrap(), - privkey.public_key(), - ) -} - -/// This may panic. -fn _verify_fp(_fp: FinalizationProof) { - unimplemented!(); -} - -#[tokio::test(flavor = "multi_thread")] -async fn single_server_propose_1() { - setup_test(); - let (network_id, dms_key) = get_network_id_and_dms_key("single_server_propose_1"); - - let voting_powers = vec![1, 1, 1, 1, 1]; - let num_nodes = voting_powers.len(); - let params = ConsensusParams { - timeout_ms: 60 * 1_000, - repeat_round_for_first_leader: 100, - }; - let round_zero_timestamp = get_timestamp(); - - let (server_config, other_configs, peers) = - setup_server_client_nodes(network_id.clone(), num_nodes - 1).await; - - let block_header = configs_to_block_header( - once(&server_config).chain(&other_configs).collect(), - voting_powers, - ); - - let mut server_node = Consensus::new( - create_test_dms( - server_config.clone(), - dms_key.clone(), - SharedKnownPeers::new_static(vec![]), - ) - .await, - create_storage(create_temp_dir()).await, - block_header.clone(), - params.clone(), - round_zero_timestamp, - Some(server_config.private_key.clone()), - ) - .await - .unwrap(); - let mut other_nodes = Vec::new(); - for config in &other_configs { - let consensus = Consensus::new( - create_test_dms(config.clone(), dms_key.clone(), peers.clone()).await, - create_storage(create_temp_dir()).await, - block_header.clone(), - params.clone(), - round_zero_timestamp, - Some(config.private_key.clone()), - ) - .await - .unwrap(); - other_nodes.push(consensus); - } - - // Make a block to propose - let dummy_block_hash = Hash256::hash("dummy_block"); - server_node - .register_verified_block_hash(dummy_block_hash) - .await - .unwrap(); - for other_node in &mut other_nodes { - other_node - .register_verified_block_hash(dummy_block_hash) - .await - .unwrap(); - } - - // [Step 1] - // Action: The server node proposes a block - // Expected: The server node will propose a block and prevotes on it. - let timestamp = get_timestamp(); - let mut result = Vec::new(); - result.extend( - server_node - .set_proposal_candidate(dummy_block_hash, timestamp) - .await - .unwrap(), - ); - result.extend(server_node.progress(get_timestamp()).await.unwrap()); - let expected = vec![ - ProgressResult::Proposed(0, dummy_block_hash, timestamp), - ProgressResult::NonNilPreVoted(0, dummy_block_hash, timestamp), - ]; - assert_eq!(result, expected); - - // Action: Non-server nodes fetch messages from the server and make progress. - // Expected: Node 0, 1 will prevote, node 2, 3 will precommit. - let serve_task = tokio::spawn(async { server_node.serve(5_000).await }); - for (i, other_node) in other_nodes.iter_mut().enumerate() { - other_node.fetch().await.unwrap(); - let timestamp = get_timestamp(); - let result = other_node.progress(timestamp).await.unwrap(); - sleep_ms(200).await; - let mut expected = vec![ProgressResult::NonNilPreVoted( - 0, - dummy_block_hash, - timestamp, - )]; - // The nodes will broadcast precommits as well if they see prevotes over 2/3. - if i == 2 || i == 3 { - expected.push(ProgressResult::NonNilPreCommitted( - 0, - dummy_block_hash, - timestamp, - )); - } - assert_eq!(result, expected); - } - let mut server_node = serve_task.await.unwrap().unwrap(); - - // Check if prevotes and precommits are broadcasted well. - let received_messages = server_node.read_messages().await.unwrap(); - let mut expected_received_messages = vec![ - ( - ConsensusMessage::Proposal { - round: 0, - valid_round: None, - block_hash: dummy_block_hash, - }, - server_config.public_key.clone(), - ), - ( - ConsensusMessage::NonNilPreVoted( - 0, - dummy_block_hash, - prevote(dummy_block_hash, &server_config.private_key), - ), - server_config.public_key.clone(), - ), - ]; - for (i, config) in other_configs.iter().enumerate() { - expected_received_messages.push(( - ConsensusMessage::NonNilPreVoted( - 0, - dummy_block_hash, - prevote(dummy_block_hash, &config.private_key), - ), - config.public_key.clone(), - )); - if i == 2 || i == 3 { - expected_received_messages.push(( - ConsensusMessage::NonNilPreCommitted( - 0, - dummy_block_hash, - precommit(dummy_block_hash, &config.private_key), - ), - config.public_key.clone(), - )); - } - } - assert_eq_unordered(&expected_received_messages, &received_messages); - - // [Step 2] - // Action: The server node progresses. - // Expected: The server node will precommit. - let timestamp = get_timestamp(); - let result = server_node.progress(timestamp).await.unwrap(); - let expected = vec![ProgressResult::NonNilPreCommitted( - 0, - dummy_block_hash, - timestamp, - )]; - assert_eq!(result, expected); - - // Action: Non-server nodes fetch and progress. - // Expected: Node 0, 1 will precommit and finalize, node 2, 3 will only finalize. - let serve_task = tokio::spawn(async { server_node.serve(5_000).await }); - let mut finalization_proofs = Vec::new(); - for (i, other_node) in other_nodes.iter_mut().enumerate() { - other_node.fetch().await.unwrap(); - let timestamp = get_timestamp(); - let result = other_node.progress(timestamp).await.unwrap(); - sleep_ms(200).await; - for r in &result { - debug!("{:?}", r); - } - let precommit = ProgressResult::NonNilPreCommitted(0, dummy_block_hash, timestamp); - if i == 0 || i == 1 { - assert_eq!(result.len(), 2); - assert_eq!(result[0], precommit); - match &result[1] { - ProgressResult::Finalized(hash, time, proof) => { - assert_eq!(*hash, dummy_block_hash); - assert_eq!(*time, timestamp); - finalization_proofs.push(proof.clone()); - } - _ => panic!("expect finalization"), - } - } else { - assert_eq!(result.len(), 1); - match &result[0] { - ProgressResult::Finalized(hash, time, proof) => { - assert_eq!(*hash, dummy_block_hash); - assert_eq!(*time, timestamp); - finalization_proofs.push(proof.clone()); - } - _ => panic!("expect finalization"), - } - } - } - let mut server_node = serve_task.await.unwrap().unwrap(); - - let received_messages = server_node.read_messages().await.unwrap(); - // Check if precommits are broadcasted well. - expected_received_messages.push(( - ConsensusMessage::NonNilPreCommitted( - 0, - dummy_block_hash, - precommit(dummy_block_hash, &server_config.private_key), - ), - server_config.public_key.clone(), - )); - for config in other_configs.iter().take(2) { - expected_received_messages.push(( - ConsensusMessage::NonNilPreCommitted( - 0, - dummy_block_hash, - precommit(dummy_block_hash, &config.private_key), - ), - config.public_key.clone(), - )); - } - assert_eq_unordered(&expected_received_messages, &received_messages); - - // [Step 3] - // Action: The server node progresses. - // Expected: The server node will finalize. - let timestamp = get_timestamp(); - let result = server_node.progress(timestamp).await.unwrap(); - assert_eq!(result.len(), 1); - match &result[0] { - ProgressResult::Finalized(hash, time, proof) => { - assert_eq!(*hash, dummy_block_hash); - assert_eq!(*time, timestamp); - finalization_proofs.push(proof.clone()); - } - _ => panic!("expect finalization"), - } - - // Action: Non-server nodes fetch and progress. - // Expected: No operation. - let serve_task = tokio::spawn(async { server_node.serve(3_000).await }); - for other_node in other_nodes.iter_mut() { - other_node.fetch().await.unwrap(); - let timestamp = get_timestamp(); - let result = other_node.progress(timestamp).await.unwrap_err(); - assert_eq!( - result.to_string(), - "operation on finalized state".to_string() - ); - } - let _ = serve_task.await.unwrap().unwrap(); - - // Todo: verify finalization proofs -} - -#[tokio::test(flavor = "multi_thread")] -async fn single_seperate_server_propose_1() { - setup_test(); - let (network_id, dms_key) = get_network_id_and_dms_key("single_seperate_server_propose_1"); - - let voting_powers = vec![1, 1, 1, 1, 1]; - let num_nodes = voting_powers.len(); - let params = ConsensusParams { - timeout_ms: 60 * 1_000, // 1 minute - repeat_round_for_first_leader: 100, - }; - let round_zero_timestamp = get_timestamp(); - - let (server_config, other_configs, peers) = - setup_server_client_nodes(network_id.clone(), num_nodes - 1).await; - - let block_header = configs_to_block_header( - once(&server_config).chain(&other_configs).collect(), - voting_powers, - ); - - // Create client nodes that will be executed by CLI. - let mut nodes = Vec::new(); - for config in once(&server_config).chain(other_configs.iter()) { - let consensus = Consensus::new( - create_test_dms(config.clone(), dms_key.clone(), peers.clone()).await, - create_storage(create_temp_dir()).await, - block_header.clone(), - params.clone(), - round_zero_timestamp, - Some(config.private_key.clone()), - ) - .await - .unwrap(); - nodes.push(consensus); - } - - // Create a server node. - let server_node = Consensus::new( - // NOTE: Ongoing issue (#318). - // Check https://github.com/postech-dao/simperby/issues/318 - // to see why `SharedKnownPeers::new_static` is used for the server node. - create_test_dms( - server_config.clone(), - dms_key, - SharedKnownPeers::new_static(vec![]), - ) - .await, - create_storage(create_temp_dir()).await, - block_header, - params, - round_zero_timestamp, - Some(server_config.private_key.clone()), - ) - .await - .unwrap(); - - // Serve for 1 hour. - let serve_task = tokio::spawn(async { server_node.serve(3_600 * 1_000).await.unwrap() }); - - let mut results: Vec> = vec![vec![], vec![], vec![], vec![], vec![]]; - - // Make a block to propose. - let dummy_block_hash = Hash256::hash("dummy_block"); - for node in &mut nodes { - node.register_verified_block_hash(dummy_block_hash) - .await - .unwrap(); - } - - // Propose the block. - let proposer = &mut nodes[0]; - proposer - .set_proposal_candidate(dummy_block_hash, get_timestamp()) - .await - .unwrap(); - - // The following loop mimics how the actual users will manipulate the node with CLI. - // Fetch, progress, sleep, and repeat five times. - for (_trial, i) in (0..5).cartesian_product(0..5) { - let node = &mut nodes[i]; - node.fetch().await.unwrap(); - match node.progress(get_timestamp()).await { - Ok(r) => results[i].extend(r), - Err(e) => assert_eq!(e.to_string(), "operation on finalized state".to_string()), - } - sleep_ms(200).await; - } - - // Check if the nodes have made consensus. - for result in results { - let last = result.last().unwrap(); - match last { - ProgressResult::Finalized(block_hash, _timestamp, _finalization_proof) => { - assert_eq!(*block_hash, dummy_block_hash) - } - _ => panic!("expect finalization"), - } - } - - // Clean up. - serve_task.abort(); - let _ = serve_task.await; -} From 167ac09ad6117d7eb0a3c1601be2008436b22b5c Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sun, 9 Apr 2023 19:34:56 +0900 Subject: [PATCH 21/24] Introduce integration test for consensus --- consensus/tests/integration_test.rs | 138 ++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/consensus/tests/integration_test.rs b/consensus/tests/integration_test.rs index 8b137891..77c7ca22 100644 --- a/consensus/tests/integration_test.rs +++ b/consensus/tests/integration_test.rs @@ -1 +1,139 @@ +use simperby_consensus::*; +use simperby_core::*; +use simperby_network::*; +use simperby_test_suite::*; +use std::sync::Arc; +use tokio::sync::RwLock; +#[tokio::test] +async fn basic_1() { + setup_test(); + + let network_id = "consensus-basic-1".to_string(); + let ((server_network_config, server_private_key), client_network_configs_and_keys, members, fi) = + setup_server_client_nodes(network_id.clone(), 4).await; + let path = create_temp_dir(); + StorageImpl::create(&path).await.unwrap(); + let storage = StorageImpl::open(&path).await.unwrap(); + + let mut server_node = Consensus::new( + Arc::new(RwLock::new( + create_test_dms( + network_id.clone(), + members.clone(), + server_private_key.clone(), + ) + .await, + )), + storage, + fi.header.clone(), + ConsensusParams { + timeout_ms: 6000, + repeat_round_for_first_leader: 10, + }, + 0, + Some(server_private_key), + ) + .await + .unwrap(); + + let mut client_nodes = Vec::new(); + for (network_config, private_key) in client_network_configs_and_keys { + let path = create_temp_dir(); + StorageImpl::create(&path).await.unwrap(); + let storage = StorageImpl::open(&path).await.unwrap(); + + client_nodes.push(( + Consensus::new( + Arc::new(RwLock::new( + create_test_dms(network_id.clone(), members.clone(), private_key.clone()).await, + )), + storage, + fi.header.clone(), + ConsensusParams { + timeout_ms: 6000, + repeat_round_for_first_leader: 10, + }, + 0, + Some(private_key.clone()), + ) + .await + .unwrap(), + network_config, + )); + } + + let block_hash = Hash256::hash("block"); + server_node + .register_verified_block_hash(block_hash) + .await + .unwrap(); + for (node, _) in client_nodes.iter_mut() { + node.register_verified_block_hash(block_hash).await.unwrap(); + } + + let serve_task = tokio::spawn(async move { + let task = tokio::spawn(Dms::serve(server_node.get_dms(), server_network_config)); + sleep_ms(5000).await; + task.abort(); + let _ = task.await; + server_node.update().await.unwrap(); + server_node.progress(0).await.unwrap(); + assert_eq!( + server_node + .check_finalized() + .await + .unwrap() + .unwrap() + .block_hash, + block_hash + ); + }); + + async fn sync(client_nodes: &mut [(Consensus, ClientNetworkConfig)]) { + for (node, network_config) in client_nodes.iter_mut() { + node.flush().await.unwrap(); + dms::DistributedMessageSet::broadcast(node.get_dms(), network_config) + .await + .unwrap(); + } + for (node, network_config) in client_nodes.iter_mut() { + dms::DistributedMessageSet::fetch(node.get_dms(), network_config) + .await + .unwrap(); + node.update().await.unwrap(); + } + } + + client_nodes[0] + .0 + .set_proposal_candidate(block_hash, 0) + .await + .unwrap(); + // PROPOSE + for (node, _) in client_nodes.iter_mut() { + node.progress(0).await.unwrap(); + } + sync(&mut client_nodes).await; + // PREVOTE + for (node, _) in client_nodes.iter_mut() { + node.progress(0).await.unwrap(); + } + sync(&mut client_nodes).await; + // PRECOMMIT + for (node, _) in client_nodes.iter_mut() { + node.progress(0).await.unwrap(); + } + sync(&mut client_nodes).await; + // FINALIZE + for (node, _) in client_nodes.iter_mut() { + node.progress(0).await.unwrap(); + } + for (node, _) in client_nodes.iter_mut() { + assert_eq!( + node.check_finalized().await.unwrap().unwrap().block_hash, + block_hash + ); + } + serve_task.await.unwrap(); +} From 19b1ab844169f1c9fa20f7c7bfe0515d6a9340ec Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Tue, 11 Apr 2023 02:09:54 +0900 Subject: [PATCH 22/24] Update client finalization behavior --- simperby/src/lib.rs | 133 ++++++++++++++++++++++++-------------------- 1 file changed, 74 insertions(+), 59 deletions(-) diff --git a/simperby/src/lib.rs b/simperby/src/lib.rs index 3c482673..eb748112 100644 --- a/simperby/src/lib.rs +++ b/simperby/src/lib.rs @@ -15,8 +15,8 @@ use tokio::sync::RwLock; pub use crate::types::*; -/// An instance of Simperby client (a.k.a. a 'node'). -pub struct Client { +/// A client for a single height. +struct ClientInner { config: types::Config, auth: Auth, path: String, @@ -25,6 +25,11 @@ pub struct Client { consensus: Consensus, } +/// An instance of Simperby client (a.k.a. a 'node'). +pub struct Client { + inner: Option, +} + impl Client { pub async fn genesis(path: &str) -> Result<()> { let repository = RawRepository::open(path).await?; @@ -43,32 +48,35 @@ impl Client { let lfi = repository.read_last_finalization_info().await?; Ok(Self { - config, - auth: auth.clone(), - path: path.to_string(), - repository, - governance: Governance::new(Arc::new(RwLock::new(governance_dms)), lfi.clone()).await?, - consensus: Consensus::new( - Arc::new(RwLock::new(consensus_dms)), - consensus_state, - lfi.header, - ConsensusParams { - timeout_ms: 10000000, - repeat_round_for_first_leader: 100, - }, - get_timestamp(), - Some(auth.private_key), - ) - .await?, + inner: Some(ClientInner { + config, + auth: auth.clone(), + path: path.to_string(), + repository, + governance: Governance::new(Arc::new(RwLock::new(governance_dms)), lfi.clone()) + .await?, + consensus: Consensus::new( + Arc::new(RwLock::new(consensus_dms)), + consensus_state, + lfi.header, + ConsensusParams { + timeout_ms: 10000000, + repeat_round_for_first_leader: 100, + }, + get_timestamp(), + Some(auth.private_key), + ) + .await?, + }), }) } pub fn config(&self) -> &types::Config { - &self.config + &self.inner.as_ref().unwrap().config } pub fn auth(&self) -> &Auth { - &self.auth + &self.inner.as_ref().unwrap().auth } pub async fn clean(&mut self, _hard: bool) -> Result<()> { @@ -76,25 +84,26 @@ impl Client { } pub fn repository(&self) -> &DistributedRepository { - &self.repository + &self.inner.as_ref().unwrap().repository } pub fn repository_mut(&mut self) -> &mut DistributedRepository { - &mut self.repository + &mut self.inner.as_mut().unwrap().repository } /// Makes a progress for the consensus, returning the result. /// /// TODO: it has to consume the object if finalized. pub async fn progress_for_consensus(&mut self) -> Result { - let result = self.consensus.progress(get_timestamp()).await?; + let mut this = self.inner.take().unwrap(); + let result = this.consensus.progress(get_timestamp()).await?; let report = format!("{result:?}"); for result in result { if let ProgressResult::Finalized(Finalization { block_hash, proof, .. }) = result { - let commit_hash = self + let commit_hash = this .repository .read_blocks() .await? @@ -102,14 +111,22 @@ impl Client { .find(|(_, h)| *h == block_hash) .ok_or_else(|| eyre::eyre!("finalized block can't be found in repository"))? .0; - self.repository.finalize(commit_hash, proof).await?; + this.repository.finalize(commit_hash, proof).await?; + let path = this.path.clone(); + let config = this.config.clone(); + let auth = this.auth.clone(); + drop(this); + self.inner = Some(Self::open(&path, config, auth).await?.inner.unwrap()); + return Ok(report); } } + self.inner = Some(this); Ok(report) } pub async fn vote(&mut self, agenda_commit: CommitHash) -> Result<()> { - let agendas = self.repository.read_agendas().await?; + let this = self.inner.as_mut().unwrap(); + let agendas = this.repository.read_agendas().await?; let agenda_hash = if let Some(x) = agendas.iter().find(|(x, _)| *x == agenda_commit) { x.1 } else { @@ -118,8 +135,8 @@ impl Client { agenda_commit )); }; - self.repository.vote(agenda_commit).await?; - self.governance.vote(agenda_hash).await?; + this.repository.vote(agenda_commit).await?; + this.governance.vote(agenda_hash).await?; Ok(()) } @@ -140,14 +157,15 @@ impl Client { /// Adds remote repositories according to current peer information. pub async fn add_remote_repositories(&mut self) -> Result<()> { - for peer in &self.config.peers { + let this = self.inner.as_mut().unwrap(); + for peer in &this.config.peers { let port = if let Some(p) = peer.ports.get("repository") { p } else { continue; }; let url = format!("git://{}:{port}/", peer.address.ip()); - self.repository + this.repository .get_raw() .write() .await @@ -162,26 +180,21 @@ impl Client { config: ServerConfig, git_hook_verifier: simperby_repository::server::PushVerifier, ) -> Result>> { + let this = self.inner.unwrap(); let network_config = ServerNetworkConfig { port: config.governance_port, }; - let t1 = async move { - Dms::serve(self.governance.get_dms(), network_config) - .await - .unwrap() - }; + let dms = this.governance.get_dms(); + let t1 = async move { Dms::serve(dms, network_config).await.unwrap() }; let network_config = ServerNetworkConfig { port: config.consensus_port, }; - let t2 = async move { - Dms::serve(self.consensus.get_dms(), network_config) - .await - .unwrap() - }; + let dms = this.consensus.get_dms(); + let t2 = async move { Dms::serve(dms, network_config).await.unwrap() }; let t3 = async move { let _server = simperby_repository::server::run_server( - &self.path, + &this.path, config.repository_port, git_hook_verifier, ) @@ -196,26 +209,27 @@ impl Client { } pub async fn update(&mut self) -> Result<()> { + let this = self.inner.as_mut().unwrap(); let network_config = ClientNetworkConfig { - peers: self.config.peers.clone(), + peers: this.config.peers.clone(), }; - Dms::fetch(self.governance.get_dms(), &network_config).await?; - Dms::fetch(self.consensus.get_dms(), &network_config).await?; - self.repository.get_raw().write().await.fetch_all().await?; - self.repository.sync_all().await?; + Dms::fetch(this.governance.get_dms(), &network_config).await?; + Dms::fetch(this.consensus.get_dms(), &network_config).await?; + this.repository.get_raw().write().await.fetch_all().await?; + this.repository.sync_all().await?; // Update governance - self.governance.update().await?; - for (agenda_hash, agenda_proof) in self.governance.get_eligible_agendas().await? { - self.repository + this.governance.update().await?; + for (agenda_hash, agenda_proof) in this.governance.get_eligible_agendas().await? { + this.repository .approve(&agenda_hash, agenda_proof.proof, get_timestamp()) .await?; } // Update consensus - self.consensus.update().await?; - for (_, block_hash) in self.repository.read_blocks().await? { - self.consensus + this.consensus.update().await?; + for (_, block_hash) in this.repository.read_blocks().await? { + this.consensus .register_verified_block_hash(block_hash) .await?; } @@ -223,14 +237,15 @@ impl Client { } pub async fn broadcast(&mut self) -> Result<()> { + let this = self.inner.as_mut().unwrap(); let network_config = ClientNetworkConfig { - peers: self.config.peers.clone(), + peers: this.config.peers.clone(), }; - self.governance.flush().await?; - Dms::broadcast(self.governance.get_dms(), &network_config).await?; - self.consensus.flush().await?; - Dms::broadcast(self.consensus.get_dms(), &network_config).await?; - self.repository.broadcast().await?; + this.governance.flush().await?; + Dms::broadcast(this.governance.get_dms(), &network_config).await?; + this.consensus.flush().await?; + Dms::broadcast(this.consensus.get_dms(), &network_config).await?; + this.repository.broadcast().await?; Ok(()) } } From f9899adf44156e1fa94800f563154e9d2a30ad1b Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sat, 22 Apr 2023 23:39:05 +0900 Subject: [PATCH 23/24] Clear storage before finalization --- simperby/src/lib.rs | 2 ++ simperby/src/storage.rs | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/simperby/src/lib.rs b/simperby/src/lib.rs index eb748112..306df697 100644 --- a/simperby/src/lib.rs +++ b/simperby/src/lib.rs @@ -116,6 +116,8 @@ impl Client { let config = this.config.clone(); let auth = this.auth.clone(); drop(this); + storage::clear(&path).await?; + storage::init(&path, config.clone()).await?; self.inner = Some(Self::open(&path, config, auth).await?.inner.unwrap()); return Ok(report); } diff --git a/simperby/src/storage.rs b/simperby/src/storage.rs index ed358a06..e6ba402f 100644 --- a/simperby/src/storage.rs +++ b/simperby/src/storage.rs @@ -84,3 +84,10 @@ pub(crate) async fn open( let consensus_state = StorageImpl::open(&consensus_state_path(path)).await?; Ok((governance_dms, consensus_dms, consensus_state, repository)) } + +pub(crate) async fn clear(path: &str) -> Result<()> { + let _ = tokio::fs::remove_dir_all(&governance_dms_path(path)).await; + let _ = tokio::fs::remove_dir_all(&consensus_dms_path(path)).await; + let _ = tokio::fs::remove_dir_all(&consensus_state_path(path)).await; + Ok(()) +} From d619320d6cbb410d53a7e5974502dfa721b537c9 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Sat, 29 Apr 2023 17:24:59 +0900 Subject: [PATCH 24/24] Remove unnecessary clone --- network/src/dms/server.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/network/src/dms/server.rs b/network/src/dms/server.rs index 842984bd..f8414075 100644 --- a/network/src/dms/server.rs +++ b/network/src/dms/server.rs @@ -8,7 +8,6 @@ impl DistributedMessageSet { ) -> Result<(), Error> { let rpc_task = async move { let wrapped_dms = Arc::new(parking_lot::RwLock::new(Some(dms))); - let wrapped_dms_ = Arc::clone(&wrapped_dms); struct DropHelper { wrapped_dms: Arc>>>>, } @@ -17,12 +16,14 @@ impl DistributedMessageSet { self.wrapped_dms.write().take().unwrap(); } } - let _drop_helper = DropHelper { wrapped_dms }; + let _drop_helper = DropHelper { + wrapped_dms: Arc::clone(&wrapped_dms), + }; run_server( network_config.port, [( "dms".to_owned(), - create_http_object(Arc::new(DmsWrapper { dms: wrapped_dms_ }) + create_http_object(Arc::new(DmsWrapper { dms: wrapped_dms }) as Arc), )] .iter()