diff --git a/.github/workflows/docker_build.yml b/.github/workflows/docker_build.yml index c9c62632f8..21ac373bca 100644 --- a/.github/workflows/docker_build.yml +++ b/.github/workflows/docker_build.yml @@ -92,5 +92,5 @@ jobs: chmod 600 private_key.pem sudo apt update sudo apt install -y --no-install-recommends openssh-server - ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && docker image prune -a -f && docker ps | grep main_debug | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep main_debug | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:main_debug' && docker run --rm -v /root:/root ghcr.io/rooch-network/rooch:main_debug server clean -n dev -f && docker run -d -v /root:/root -p 6767:6767 -p 9184:9184 'ghcr.io/rooch-network/rooch:main_debug' server start -n dev --btc-rpc-url '${{secrets.BTC_REGTEST_RPC_URL}}' --btc-rpc-username rooch-regtest --btc-rpc-password '${{secrets.BTC_REGTEST_RPC_PWD}}' --da '{\"internal-da-server\": {\"servers\": [{\"open-da\": {\"scheme\": \"fs\"}}]}}' --traffic-burst-size 100000 --traffic-per-second 1" + ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST bash -c "'sleep 30' && docker image prune -a -f && docker ps | grep main_debug | awk '{print \$1}' | xargs -r docker stop && docker ps -a | grep main_debug | awk '{print \$1}' | xargs -r docker rm -f && docker pull 'ghcr.io/rooch-network/rooch:main_debug' && docker run --rm -v /root:/root ghcr.io/rooch-network/rooch:main_debug server clean -n dev -f && docker run -d -v /root:/root -p 6767:6767 -p 9184:9184 'ghcr.io/rooch-network/rooch:main_debug' server start -n dev --btc-rpc-url '${{secrets.BTC_REGTEST_RPC_URL}}' --btc-rpc-username rooch-regtest --btc-rpc-password '${{secrets.BTC_REGTEST_RPC_PWD}}' --da '{\"da-backend\": {\"backends\": [{\"open-da\": {\"scheme\": \"fs\"}}]}}' --traffic-burst-size 100000 --traffic-per-second 1" ssh -o StrictHostKeyChecking=no -i private_key.pem $USER@$HOST "cd /root/rooch && git pull origin main && bash scripts/check_dev_deploy_status.sh main_debug '${{ secrets.TESTNET_MNEMONIC_PHRASE }}'" diff --git a/Cargo.lock b/Cargo.lock index 1365be738b..3c23fbc475 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9558,10 +9558,13 @@ name = "rooch-config" version = "0.7.3" dependencies = [ "anyhow", + "celestia-types", "clap 4.5.17", "dirs", "dirs-next", + "hex", "moveos-config", + "moveos-types", "once_cell", "rooch-types", "serde 1.0.210", @@ -9601,15 +9604,15 @@ dependencies = [ "celestia-rpc", "celestia-types", "coerce", - "futures", "log", - "lz4", "moveos-types", "opendal", "rooch-config", + "rooch-store", + "rooch-types", "serde 1.0.210", "serde_yaml 0.9.34+deprecated", - "xxhash-rust", + "tokio", ] [[package]] @@ -9647,7 +9650,6 @@ dependencies = [ "async-trait", "coerce", "function_name", - "hex", "log", "metrics", "move-core-types", @@ -10211,6 +10213,7 @@ version = "0.7.3" dependencies = [ "accumulator", "anyhow", + "moveos-common", "moveos-config", "moveos-types", "once_cell", @@ -10256,6 +10259,7 @@ dependencies = [ "framework-builder", "framework-types", "hex", + "lz4", "move-binary-format", "move-command-line-common", "move-core-types", @@ -10276,6 +10280,7 @@ dependencies = [ "strum_macros", "thiserror", "tracing", + "xxhash-rust", ] [[package]] diff --git a/crates/rooch-config/Cargo.toml b/crates/rooch-config/Cargo.toml index dd9cb0ded1..d432b3297e 100644 --- a/crates/rooch-config/Cargo.toml +++ b/crates/rooch-config/Cargo.toml @@ -22,6 +22,9 @@ dirs = { workspace = true } 
once_cell = { workspace = true } clap = { workspace = true } dirs-next = { workspace = true } +celestia-types = { workspace = true } +hex = { workspace = true } rooch-types = { workspace = true } moveos-config = { workspace = true } +moveos-types = { workspace = true } diff --git a/crates/rooch-config/src/config.rs b/crates/rooch-config/src/config.rs index db639c1311..24d31d38c9 100644 --- a/crates/rooch-config/src/config.rs +++ b/crates/rooch-config/src/config.rs @@ -4,17 +4,9 @@ use anyhow::Context; use serde::de::DeserializeOwned; use serde::Serialize; -use std::collections::HashMap; -use std::error::Error; use std::fs; use std::path::{Path, PathBuf}; -pub enum MapConfigValueSource { - ConfigKey, // Value came from the presence of a key in the configuration - Environment, // Value came from the environment - Default, // Value came from a defined default value -} - pub trait Config where Self: DeserializeOwned + Serialize, @@ -82,113 +74,3 @@ impl std::ops::DerefMut for PersistedConfig { &mut self.inner } } - -pub fn parse_hashmap( - s: &str, -) -> Result, Box> { - s.split(',') - .filter(|kv| !kv.is_empty()) - .map(|kv| { - let mut parts = kv.splitn(2, '='); - match (parts.next(), parts.next()) { - (Some(key), Some(value)) if !key.trim().is_empty() => { - Ok((key.to_string(), value.to_string())) - } - (Some(""), Some(_)) => Err("key is missing before '='".into()), - _ => { - Err("each key=value pair must be separated by a comma and contain a key".into()) - } - } - }) - .collect() -} - -// value order: -// 1. key value -// 2. env value -// 3. default value -pub fn retrieve_map_config_value( - config: &mut HashMap, - key: &str, - env_var: Option<&str>, - default_var: &str, -) -> MapConfigValueSource { - if config.contains_key(key) { - return MapConfigValueSource::ConfigKey; - } - - if let Some(env_var) = env_var { - if let Ok(env_var_value) = std::env::var(env_var) { - // env_var exists - config.insert(key.to_string(), env_var_value.clone()); - return MapConfigValueSource::Environment; - } - } - - // Use the default - config.insert(key.to_string(), default_var.to_string()); - MapConfigValueSource::Default -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_hashmap_ok() { - let input = "key1=VALUE1,key2=value2"; - let output = parse_hashmap(input).unwrap(); - - let mut expected = HashMap::new(); - expected.insert("key1".to_string(), "VALUE1".to_string()); - expected.insert("key2".to_string(), "value2".to_string()); - - assert_eq!(output, expected); - } - - #[test] - fn test_parse_hashmap_empty_value() { - let input = "key1=,key2=value2"; - let output = parse_hashmap(input).unwrap(); - - let mut expected = HashMap::new(); - expected.insert("key1".to_string(), "".to_string()); - expected.insert("key2".to_string(), "value2".to_string()); - - assert_eq!(output, expected); - } - - #[test] - fn test_parse_hashmap_empty_string() { - let input = ""; - let output = parse_hashmap(input).unwrap(); - - let expected = HashMap::new(); - - assert_eq!(output, expected); - } - - #[test] - fn test_parse_hashmap_missing_value() { - let input = "key1,key2=value2"; - let output = parse_hashmap(input); - - assert!(output.is_err()); - } - - #[test] - fn test_parse_hashmap_missing_key() { - let input = "=value1,key2=value2"; - let output = parse_hashmap(input); - - assert!(output.is_err()); - } - - #[test] - fn test_parse_hashmap_no_equals_sign() { - let input = "key1value1,key2=value2"; - let output = parse_hashmap(input); - - assert!(output.is_err()); - } -} diff --git 
a/crates/rooch-config/src/da_config.rs b/crates/rooch-config/src/da_config.rs index bc6fbc36a4..1ed292582f 100644 --- a/crates/rooch-config/src/da_config.rs +++ b/crates/rooch-config/src/da_config.rs @@ -2,19 +2,19 @@ // SPDX-License-Identifier: Apache-2.0 use crate::config::Config; -use crate::config::{parse_hashmap, retrieve_map_config_value, MapConfigValueSource}; -use crate::BaseConfig; -use clap::Parser; +use crate::{retrieve_map_config_value, BaseConfig, MapConfigValueSource}; +use celestia_types::nmt::Namespace; +use hex::encode; +use moveos_types::h256::sha2_256_of; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use serde_json::Value; use std::collections::HashMap; use std::fmt::Display; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; -static R_DEFAULT_OPENDA_FS_DIR: Lazy<PathBuf> = Lazy::new(|| PathBuf::from("openda_fs")); +static R_DEFAULT_OPENDA_FS_DIR: Lazy<PathBuf> = Lazy::new(|| PathBuf::from("openda-fs")); #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(rename_all = "lowercase")] @@ -26,20 +26,46 @@ pub enum DAServerSubmitStrategy { Number(usize), } +impl FromStr for DAServerSubmitStrategy { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "all" => Ok(DAServerSubmitStrategy::All), + "quorum" => Ok(DAServerSubmitStrategy::Quorum), + _ => { + if let Ok(n) = s.parse::<usize>() { + Ok(DAServerSubmitStrategy::Number(n)) + } else { + Err(format!("invalid da server submit strategy: {}", s)) + } + } + } + } +} + +impl Display for DAServerSubmitStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DAServerSubmitStrategy::All => write!(f, "all"), + DAServerSubmitStrategy::Quorum => write!(f, "quorum"), + DAServerSubmitStrategy::Number(n) => write!(f, "{}", n), + } + } +} + #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] #[serde(rename_all = "lowercase")] pub enum OpenDAScheme { - // local filesystem, for developing only, config: - // root: file path to the root directory + // local filesystem, main config: + // root: file path #[default] Fs, - // gcs(Google Could Service) main config: + // gcs (Google Cloud Storage), main config: // bucket - // root - // credential (it's okay to pass credential file path here, it'll be handled it automatically) + // credential/credential_path (a credential file path may be supplied via credential_path) Gcs, - // s3 config: - // root + // s3, main config: // bucket // region // endpoint @@ -48,24 +74,37 @@ pub enum OpenDAScheme { S3, } -#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] -#[serde(rename_all = "kebab-case")] -pub enum InternalDAServerConfigType { - Celestia(DAServerCelestiaConfig), - OpenDa(DAServerOpenDAConfig), +impl Display for OpenDAScheme { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OpenDAScheme::Fs => write!(f, "fs"), + OpenDAScheme::Gcs => write!(f, "gcs"), + OpenDAScheme::S3 => write!(f, "s3"), + } + } +} + +impl FromStr for OpenDAScheme { + type Err = &'static str; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "gcs" => Ok(OpenDAScheme::Gcs), + "s3" => Ok(OpenDAScheme::S3), + "fs" => Ok(OpenDAScheme::Fs), + _ => Err("open-da scheme no match"), + } + } } -#[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize, Parser)] +#[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize)] #[serde(deny_unknown_fields)] #[serde(rename_all = "kebab-case")] pub struct DAConfig { #[serde(skip_serializing_if = "Option::is_none")] -
#[clap(name = "internal-da-server", long, help = "internal da server config")] - pub internal_da_server: Option, + pub da_backend: Option, #[serde(skip)] - #[clap(skip)] base: Option>, - // TODO external da server config } impl Display for DAConfig { @@ -96,20 +135,37 @@ impl DAConfig { let default_fs_root = self.get_openda_fs_dir(); - if let Some(InternalDAServerConfig { servers, .. }) = &mut self.internal_da_server { - for server in servers { - if let InternalDAServerConfigType::OpenDa(open_da_config) = server { + if let Some(backend_config) = &mut self.da_backend { + backend_config.adjust_submit_strategy(); + let backends = &mut backend_config.backends; + + for backend in backends { + if let DABackendConfigType::OpenDa(open_da_config) = backend { if matches!(open_da_config.scheme, OpenDAScheme::Fs) { - let var_source = retrieve_map_config_value( - &mut open_da_config.config, - "root", - None, - default_fs_root.to_str().unwrap(), - ); - if let MapConfigValueSource::Default = var_source { - if !default_fs_root.exists() { - std::fs::create_dir_all(default_fs_root.clone())?; + if let Some(fs_str) = default_fs_root.to_str() { + let var_source = retrieve_map_config_value( + &mut open_da_config.config, + "root", + None, + Some(fs_str), + ); + if let MapConfigValueSource::Default = var_source { + if !default_fs_root.exists() { + std::fs::create_dir_all(default_fs_root.clone()).map_err( + |e| { + anyhow::anyhow!( + "Failed to create OpenDA fs dir: {:?}", + e + ) + }, + )?; + } } + } else { + return Err(anyhow::anyhow!( + "Invalid UTF-8 path: {:?}", + default_fs_root + )); } } } @@ -132,43 +188,20 @@ impl DAConfig { } } -#[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize, Parser)] +#[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] #[serde(deny_unknown_fields)] -pub struct InternalDAServerConfig { +pub struct DABackendConfig { #[serde(skip_serializing_if = "Option::is_none")] - #[clap( - name = "submit-strategy", - long, - help = "specifies the submission strategy of internal DA servers to be used. 'all' with all servers, 'quorum' with quorum servers, 'n' with n servers, etc." - )] - pub submit_strategy: Option, - #[clap( - name = "servers", - long, - help = "specifies the type of internal DA servers to be used. 'celestia' with corresponding Celestia server configuration, 'xxx' with corresponding xxx server configuration, etc." - )] - pub servers: Vec, + pub submit_strategy: Option, // specifies the submission strategy of DA. 'all' with all backends, 'quorum' with quorum backends, 'n' with n backends, etc. + pub backends: Vec, // specifies the type of DA backends to be used. 'celestia' with corresponding Celestia backend configuration, 'foo' with corresponding foo backend configuration, etc. } -impl InternalDAServerConfig { - pub fn adjust_submit_strategy(&mut self) { - let servers_count = self.servers.len(); - - // Set default strategy to All if it's None. - let strategy = self - .submit_strategy - .get_or_insert(DAServerSubmitStrategy::All); - - // If it's a Number, adjust the value to be within [1, n]. - if let DAServerSubmitStrategy::Number(ref mut num) = strategy { - *num = std::cmp::max(1, std::cmp::min(*num, servers_count)); - } - } - +impl DABackendConfig { pub fn calculate_submit_threshold(&mut self) -> usize { self.adjust_submit_strategy(); // Make sure submit_strategy is adjusted before calling this function. 
- let servers_count = self.servers.len(); + let servers_count = self.backends.len(); match self.submit_strategy { Some(DAServerSubmitStrategy::All) => servers_count, Some(DAServerSubmitStrategy::Quorum) => servers_count / 2 + 1, @@ -176,282 +209,117 @@ impl InternalDAServerConfig { None => servers_count, // Default to 'All' if submit_strategy is None } } -} - -impl FromStr for InternalDAServerConfig { - type Err = anyhow::Error; - fn from_str(s: &str) -> Result { - let deserialized = serde_json::from_str(s)?; - Ok(deserialized) - } -} + fn adjust_submit_strategy(&mut self) { + let servers_count = self.backends.len(); -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, Parser)] -#[serde(deny_unknown_fields)] -pub struct DAServerCelestiaConfig { - #[serde(skip_serializing_if = "Option::is_none")] - #[clap(name = "namespace", long, help = "celestia namespace")] - pub namespace: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[clap(name = "conn", long, help = "celestia node connection")] - pub conn: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[clap(name = "auth-token", long, help = "celestia node auth token")] - pub auth_token: Option, - // for celestia: - // support for up to 8 MB blocks, starting with 2MB at genesis and upgradeable through onchain governance. - #[serde(skip_serializing_if = "Option::is_none")] - #[clap( - name = "max-segment-size", - long, - help = "max segment size, striking a balance between throughput and the constraints on blob size." - )] - pub max_segment_size: Option, -} + // Set default strategy to All if it's None. + let strategy = self + .submit_strategy + .get_or_insert(DAServerSubmitStrategy::All); -impl Default for DAServerCelestiaConfig { - fn default() -> Self { - Self { - namespace: None, - conn: None, - auth_token: None, - max_segment_size: Some(1024 * 1024), + // If it's a Number, adjust the value to be within [1, n]. 
+ if let DAServerSubmitStrategy::Number(ref mut num) = strategy { + *num = std::cmp::max(1, std::cmp::min(*num, servers_count)); } } } -impl DAServerCelestiaConfig { - pub fn new_with_defaults(mut self) -> Self { - let default = DAServerCelestiaConfig::default(); - if self.max_segment_size.is_none() { - self.max_segment_size = default.max_segment_size; - } - self - } +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum DABackendConfigType { + Celestia(DABackendCelestiaConfig), + OpenDa(DABackendOpenDAConfig), } -impl FromStr for InternalDAServerConfigType { - type Err = String; - - fn from_str(s: &str) -> Result { - let v: Value = - serde_json::from_str(s).map_err(|e| format!("Error parsing JSON: {}, {}", e, s))?; - - if let Some(obj) = v.as_object() { - if let Some(celestia) = obj.get("celestia") { - let celestia_config: DAServerCelestiaConfig = - serde_json::from_value(celestia.clone()).map_err(|e| { - format!( - "invalid celestia config: {} error: {}, original: {}", - celestia, e, s - ) - })?; - Ok(InternalDAServerConfigType::Celestia(celestia_config)) - } else if let Some(openda) = obj.get("open-da") { - let openda_config: DAServerOpenDAConfig = serde_json::from_value(openda.clone()) - .map_err(|e| { - format!( - "invalid open-da config: {}, error: {}, original: {}", - openda, e, s - ) - })?; - Ok(InternalDAServerConfigType::OpenDa(openda_config)) - } else { - Err(format!("Invalid value: {}", s)) - } - } else { - Err(format!("Invalid value: {}", s)) - } - } +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +#[serde(deny_unknown_fields)] +/// Celestia provides ability to access Celestia node +pub struct DABackendCelestiaConfig { + /// celestia namespace + pub namespace: Namespace, + /// celestia node connection + pub conn: String, + /// celestia node auth token + pub auth_token: String, + #[serde(skip_serializing_if = "Option::is_none")] + /// max segment size, striking a balance between throughput and the constraints on blob size in celestia network. + /// Set at crates/rooch-da/src/backend/celestia if None. + pub max_segment_size: Option, } -// Open DA provides ability to access various storage services -#[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize, Parser)] +#[derive(Clone, Default, Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] #[serde(deny_unknown_fields)] -pub struct DAServerOpenDAConfig { - #[clap( - name = "scheme", - long, - value_enum, - default_value = "fs", - help = "specifies the type of storage service to be used. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc." - )] +/// Open DA provides ability to access various storage services +pub struct DABackendOpenDAConfig { + /// specifies the type of storage service to be used. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc #[serde(default)] pub scheme: OpenDAScheme, - #[clap( - name = "config", - long, - value_parser = parse_hashmap, - help = "specifies the configuration of the storage service. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc." - )] - #[serde(default)] + /// specifies the configuration of the storage service. 'gcs' with corresponding GCS server configuration, 's3' with corresponding S3 server configuration, etc. 
pub config: HashMap<String, String>, - #[serde(skip_serializing_if = "Option::is_none")] - #[clap( - name = "max-segment-size", - long, - help = "max segment size, striking a balance between throughput and the constraints on blob size." - )] + /// `<namespace>/<segment_id>` is the path to store the segment. If namespace is not set, a namespace derived from the genesis config is used, making `<derived_namespace>/<segment_id>` the full path + pub namespace: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + /// max segment size. + /// Set at crates/rooch-da/src/backend/openda if None. pub max_segment_size: Option<u64>, } -impl FromStr for DAServerSubmitStrategy { - type Err = String; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s.to_lowercase().as_str() { - "all" => Ok(DAServerSubmitStrategy::All), - "quorum" => Ok(DAServerSubmitStrategy::Quorum), - _ => { - if let Ok(n) = s.parse::<usize>() { - Ok(DAServerSubmitStrategy::Number(n)) - } else { - Err(format!("invalid da server submit strategy: {}", s)) - } - } - } - } -} - -impl Display for OpenDAScheme { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - OpenDAScheme::Fs => write!(f, "fs"), - OpenDAScheme::Gcs => write!(f, "gcs"), - OpenDAScheme::S3 => write!(f, "s3"), - } - } -} - -impl FromStr for OpenDAScheme { - type Err = &'static str; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s.to_lowercase().as_str() { - "gcs" => Ok(OpenDAScheme::Gcs), - "s3" => Ok(OpenDAScheme::S3), - "fs" => Ok(OpenDAScheme::Fs), - _ => Err("open-da scheme no match"), - } - } +/// Derive a namespace from genesis config for DA backend (could be used for open-da backend) +pub fn derive_genesis_namespace(genesis: &[u8]) -> String { + let raw = encode(sha2_256_of(genesis).0); + // the first 7 hex chars are enough to avoid collisions between genesis configs + raw.chars().take(7).collect() }
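Because the namespace is a pure function of the genesis bytes, every node sharing a genesis config derives the same open-da path prefix. A minimal usage sketch, assuming `derive_genesis_namespace` is reachable at the module path below (the genesis bytes are illustrative):

```rust
use rooch_config::da_config::derive_genesis_namespace;

fn main() {
    // Real callers pass the serialized genesis config; any byte slice works.
    let genesis = b"example-genesis-bytes";
    let ns = derive_genesis_namespace(genesis);
    // First 7 hex chars of sha2-256(genesis), e.g. "3f9a01c".
    assert_eq!(ns.chars().count(), 7);
    println!("open-da namespace: {}", ns);
}
```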
("credential".to_string(), "test-credential".to_string()), + ] + .into_iter() + .collect(), + namespace: None, + max_segment_size: None, }; - config.adjust_submit_strategy(); - assert_eq!( - config.submit_strategy, - Some(DAServerSubmitStrategy::Number(3)) - ); - } - - #[test] - fn test_adjust_submit_strategy_number_within_range() { - let mut config = InternalDAServerConfig { - submit_strategy: Some(DAServerSubmitStrategy::Number(2)), - servers: vec![ - InternalDAServerConfigType::Celestia(DAServerCelestiaConfig::default()); - 4 - ], // Four servers for this test + let exp_da_config = DAConfig { + da_backend: Some(DABackendConfig { + submit_strategy: Some(DAServerSubmitStrategy::All), + backends: vec![ + DABackendConfigType::Celestia(exp_celestia_config.clone()), + DABackendConfigType::OpenDa(exp_openda_config.clone()), + ], + }), + base: None, }; - config.adjust_submit_strategy(); - assert_eq!( - config.submit_strategy, - Some(DAServerSubmitStrategy::Number(2)) - ); - } - - #[test] - fn test_internal_da_server_config_str() { - let celestia_config_str = r#"{"celestia": {"namespace": "test_namespace", "conn": "test_conn", "auth_token": "test_token", "max_segment_size": 2048}}"#; - let openda_config_str = r#"{"open-da": {"scheme": "gcs", "config": {"Param1": "value1", "param2": "Value2"}, "max_segment_size": 2048}}"#; - let invalid_config_str = r#"{"unknown": {...}}"#; - - match InternalDAServerConfigType::from_str(celestia_config_str) { - Ok(InternalDAServerConfigType::Celestia(celestia_config)) => { - assert_eq!( - celestia_config, - DAServerCelestiaConfig { - namespace: Some("test_namespace".to_string()), - conn: Some("test_conn".to_string()), - auth_token: Some("test_token".to_string()), - max_segment_size: Some(2048), - } - ); - } - Ok(_) => { - panic!("Expected Celestia Config"); - } - Err(e) => { - panic!("Error parsing Celestia Config: {}", e) - } - } - - let mut config: HashMap = HashMap::new(); - config.insert("Param1".to_string(), "value1".to_string()); - config.insert("param2".to_string(), "Value2".to_string()); - - match InternalDAServerConfigType::from_str(openda_config_str) { - Ok(InternalDAServerConfigType::OpenDa(openda_config)) => { - assert_eq!( - openda_config, - DAServerOpenDAConfig { - scheme: OpenDAScheme::Gcs, - config, - max_segment_size: Some(2048), - } - ); - } - Ok(_) => { - panic!("Expected OpenDA Config"); + println!("exp_da_config: {}", exp_da_config); + match DAConfig::from_str(da_config_str) { + Ok(da_config) => { + assert_eq!(da_config, exp_da_config); } Err(e) => { - panic!("Error parsing OpenDA Config: {}", e) + panic!("Error parsing DA Config: {}", e) } } - - if InternalDAServerConfigType::from_str(invalid_config_str).is_err() { - } else { - panic!("Expected Error for invalid config"); - } } } diff --git a/crates/rooch-config/src/lib.rs b/crates/rooch-config/src/lib.rs index a57b163151..2ba11f8f69 100644 --- a/crates/rooch-config/src/lib.rs +++ b/crates/rooch-config/src/lib.rs @@ -13,6 +13,7 @@ use rooch_types::rooch_network::{BuiltinChainID, RoochChainID, RoochNetwork}; use rooch_types::service_status::ServiceStatus; use rooch_types::service_type::ServiceType; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fs::create_dir_all; use std::str::FromStr; use std::sync::Arc; @@ -34,6 +35,13 @@ pub static R_DEFAULT_BASE_DATA_DIR: Lazy = Lazy::new(|| { .join(".rooch") }); +pub enum MapConfigValueSource { + MapConfig, // Value came from the presence of a key in the map configuration + Environment, // Value came from the environment + 
Default, // Value came from a defined default value + None, // Value is not present in the map configuration, environment, or default value +} + pub fn rooch_config_dir() -> Result<PathBuf, anyhow::Error> { get_rooch_config_dir().and_then(|dir| { if !dir.exists() { @@ -373,3 +381,33 @@ impl ServerOpt { .unwrap_or_else(|| RoochChainID::default().chain_name()) } } + +// value order: +// 1. config map +// 2. env value +// 3. default value +pub fn retrieve_map_config_value( + map_config: &mut HashMap<String, String>, + key: &str, + env_var: Option<&str>, + default_var: Option<&str>, +) -> MapConfigValueSource { + if map_config.contains_key(key) { + return MapConfigValueSource::MapConfig; + } + + if let Some(env_var) = env_var { + if let Ok(env_var_value) = std::env::var(env_var) { + // env_var exists + map_config.insert(key.to_string(), env_var_value.clone()); + return MapConfigValueSource::Environment; + } + } + + // Use the default + if let Some(default_var) = default_var { + map_config.insert(key.to_string(), default_var.to_string()); + return MapConfigValueSource::Default; + } + MapConfigValueSource::None +}
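The lookup order (explicit map entry first, then environment variable, then default) is what lets operators override open-da settings without editing the config map. A minimal usage sketch; the env var name and default value mirror the GCS handling in `crates/rooch-da`:

```rust
use std::collections::HashMap;

use rooch_config::{retrieve_map_config_value, MapConfigValueSource};

fn main() {
    let mut config: HashMap<String, String> = HashMap::new();
    // No "bucket" key in the map, so the env var (when set) wins over the default.
    std::env::set_var("OPENDA_GCS_BUCKET", "my-bucket");
    let source = retrieve_map_config_value(
        &mut config,
        "bucket",
        Some("OPENDA_GCS_BUCKET"),
        Some("rooch-openda-dev"),
    );
    assert!(matches!(source, MapConfigValueSource::Environment));
    assert_eq!(config["bucket"], "my-bucket");
}
```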
diff --git a/crates/rooch-da/Cargo.toml b/crates/rooch-da/Cargo.toml index 675d69c60a..daca30afde 100644 --- a/crates/rooch-da/Cargo.toml +++ b/crates/rooch-da/Cargo.toml @@ -24,10 +24,11 @@ bcs = { workspace = true } log = { workspace = true } coerce = { workspace = true } async-trait = { workspace = true } -futures = { workspace = true } opendal = { workspace = true } +serde_yaml = { workspace = true } +tokio = { workspace = true } rooch-config = { workspace = true } -serde_yaml = { workspace = true } -xxhash-rust = { workspace = true, features = ["xxh3"] } -lz4 = { workspace = true } +rooch-types = { workspace = true } +rooch-store = { workspace = true } + diff --git a/crates/rooch-da/README.md b/crates/rooch-da/README.md index dd1f71d125..6827306891 100644 --- a/crates/rooch-da/README.md +++ b/crates/rooch-da/README.md @@ -7,20 +7,24 @@ Merge to specification. ## Overview -In Rooch, DA (Data Availability) is a mechanism to ensure that the data(tx lists batch) is avail for async verification: -DA must ensure that within the challenge period, every batch is accessible to at least one honest node. +In Rooch, DA (Data Availability) is the `input` in the async state verification process: + +verification: + +1. `input` + `state transition function` = `actual output` +2. compare `actual output` with `expected output` ### Target -For serving the async verification, according I/O model, we have: +1. **Equivalent to Bitcoin Consensus**: Pack DA data into Bitcoin blocks. +2. **Self-Verifying**: Anyone can verify DA data by its checksum and signature. +3. **Open**: DA data can live anywhere; anyone can access it without permission. -Data producer is sequencer: DA provides the producer non-blocking writes as possible. +## Key Concepts -Data consumer is verifier: DA provides the consumer the most immediate data reads as possible. +### DA Stream -To meet our objectives, we require DA to render data visibility as rapidly as possible, -thus enabling verifiers to carry out asynchronous batch data verification after the data digests being documented on L1. -In other words, we don't have to wait for the data/digests to be written to L1 to start the verification. +[DA Stream](./docs/stream.md) is a continuous flow of data from sequencer to verifier. It is a sequence of DA Batches. ## Data Flow @@ -32,61 +36,44 @@ In Verifier's perspective (verifier verifies tx), the data flow is as follows: 2. sequencer batch maker buffer transactions to batch for lower average latency & gas cost 3. sequencer batch maker put to DA server 4. verifier get batch from DA server by: - 1. pull stream from DA server (after booking) - 2. get batch from DA server by batch number - 3. get segment from DA backend (after being submitted to DA backend) - -### Put - -Put includes three actions: - -1. Sequencer batch maker put data to DA server -2. Sequencer batch maker put batch meta to L1 -3. DA server put data to DA backend + 1. pull DA stream from DA server (after subscribing) + 2. get batch from DA server by batch hash + 3. get segments from DA backend (after being submitted to DA backend) -#### Sequencer to DA Server +## Roles -Sequencer put batch to DA server, blocking until DA server return response. If failed, sequencer will retry: +### Sequencer -In present, sequencer will keep retrying until success. After more DA servers(decentralized) are deployed, majority voting will be introduced. +The tx batch maker. Each sequencer maintains its own DA server. -##### Put Policy (TODO) +### DA Server -Put Policy is a policy to determine data persistence behaviour: +Has these responsibilities: -1. lifecycle -2. hot/warm/cold storage -3. encode or not -4. ... +1. Publish DA backend info so that anyone can access DA data easily. +2. Provide the Put/Get interface for DA data. -Put Policy is a part of Sequencer configuration. Its principal objective is to provide a more balanced storage solution in terms of both performance and cost. +Each DA server can connect to multiple DA backends. -#### Sequencer to L1 +## DA Backend -After batch finalized, sequencer will put batch meta to L1 immediately. +The purpose of DA backend is to mitigate the single point of risk associated with DA server. DA server, +not the backend, remains the principal party responsible for data publication. Therefore, the DA server may elect to +submit data to DA backend asynchronously. -#### DA Server to DA Backend +## DA Server APIs -DA server must register on L2 with backend information and update the backend information on L2 prior to each change of backend. - -The purpose of DA backend is to mitigate the single point of risk associated with DA server. DA server, -not the backend, remains the principal party responsible for data publication. Therefore, the DA server may elect to submit data to DA backend asynchronously. - -##### Stream +### Put -DA server is obligated to pay fees to DA backend and is subject to its interface restrictions. -Particularly in the forthcoming decentralized DA server cluster, faced with a variety of different DA backend implementations, -we require the DA server to maintain flexibility and low cost in its implementation while providing a unified interface. -Rooch Network accomplishes our objectives by treating the transaction sequence as a stream and flexibly dividing it into segments: +Put includes these actions: -1. Each network has its own stream. -2. Several batch form an chunk for better compression ratio. -3. Every chunk, once compressed, will be partitioned into numerous segments to comply with the block size restrictions of the DA backend. -Simultaneously, this approach aids in augmenting parallelism. +1. Sequencer puts data to DA server +2. Sequencer packs data meta into Bitcoin +3. DA server puts data to DA backends ### Get -There are various ways to get batch data. DA Batch could be verified by meta on L1. +There are various ways to get batch data. A DA Batch can be verified by its meta on Bitcoin.
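This Put path is driven by the `da-backend` section of the node config, the same JSON passed via `--da` in the deploy workflow above. A hedged example of its shape, mirroring the `da_config_from_str` test in this PR (the connection string, token, and root path are placeholders):

```json
{
  "da-backend": {
    "submit-strategy": "quorum",
    "backends": [
      {"celestia": {"namespace": "//////////////////////////////////////8=", "conn": "ws://localhost:26658", "auth-token": "<auth-token>"}},
      {"open-da": {"scheme": "fs", "config": {"root": "/var/rooch/openda-fs"}}}
    ]
  }
}
```

With `submit-strategy` set to `quorum` and two backends, `calculate_submit_threshold` yields 2/2 + 1 = 2, so both backends must accept a batch before the submission counts as done; `all`, or a number clamped to [1, backend count], are the other options.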
#### Bypass DA server accessing DA Backend directly @@ -94,7 +81,8 @@ Verifier could access DA backend directly to get data. However, it's not recomme 1. DA backend might lag behind the most recent data, given the likelihood of its data being uploaded asynchronously. 2. DA backend might be slow to respond to requests, DA server is the professional storage node. -3. DA server, accountable for data accessibility, risks forfeiture of its deposit via arbitration if it fails to meet the conditions of data availability. +3. DA server, accountable for data accessibility, risks forfeiture of its deposit via arbitration if it fails to meet + the conditions of data availability. This methodology may be employed to access data in the event that all DA servers are unable to respond appropriately. @@ -102,44 +90,6 @@ This methodology may be employed to access data in the event that all DA servers Verifier subscribe to a data stream from the DA server. -#### Get DA Batch by DA server - -DA server maintains a batch index, which is updated in real time as new batches are added. Anyone could get batch by batch number. - -#### Scaling - -Anyone can become an unpledged, non-liability DA server to facilitate horizontal scaling of data access, -with each DA server being homogeneous. Pledged nodes will receive data pushed by the sequencer, garner community rewards, -and be supervised by the community. Unpledged nodes can fetch data from other nodes; -these may be nodes operated by the Rooch network to enhance network throughput, -backup nodes established by other community participants for various needs, -or efforts made with the intention of joining the pledged network in the future. - -#### Data Integrity (TODO) - -Each batch is recorded on L1 via an accumulator. Given that the segments, post-partitioning, -are ultimately written onto the DA backend, the batches can be sufficiently large to compensate for the speed difference between L1 and L2. -Verifiers can initially trust the data from DA server optimistically, and perform verification once L1 completes the synchronization of the accumulator. -DA server will sign the data summary, and if it does not match with L1, a challenge can be initiated against DA server. - -## Roadmap - -DA is not merely a coding project, but also a community venture. Rooch Network will gradually achieve decentralized DA services in unison with the community. - -### Phase 1: Single DA Server - -Operated by the Rooch Network. - -#### As a component of Sequencer: PoC - -Using RPC to communicate with light client of DA backend directly: - -1. Offload DA details from Sequencer - -2. Scaling independently of Sequencer -3. Fault isolation - -#### Independent from Sequencer - -### Phase 2: Decentralized DA Servers +#### Get DA Batch by DA server (TODO) -Data Visibility (DV) will be the acceleration component of DA. \ No newline at end of file +DA server maintains a batch index, which is updated in real time as new batches are added. Anyone can get a batch. diff --git a/crates/rooch-da/docs/stream.md b/crates/rooch-da/docs/stream.md index 3fb0e184f4..62201fdfd7 100644 --- a/crates/rooch-da/docs/stream.md +++ b/crates/rooch-da/docs/stream.md @@ -3,11 +3,25 @@ DA Stream DA Stream is a continuous flow of data from sequencer to verifier. It is a sequence of DA Batch. +All DA efforts aim to maintain a single trustworthy DA data stream with high performance and low cost. + +DA server is obligated to pay fees to DA backend and is subject to its interface restrictions. +Particularly in the forthcoming decentralized DA server cluster, faced with a variety of different DA backend +implementations, +we require the DA server to maintain flexibility and low cost in its implementation while providing a unified interface. +Rooch Network accomplishes our objectives by treating the transaction sequence as a stream and flexibly dividing it into +segments: + +1. Each network (e.g., main/test) has its own stream. +2. Several batches form a chunk for a better compression ratio. +3. Every chunk, once compressed, will be partitioned into numerous segments to comply with the block size restrictions + of the DA backend. Simultaneously, this approach aids in augmenting parallelism; a sketch of this partitioning follows below. +
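A minimal sketch of the chunk-to-segment partitioning just described, using toy types in place of the real `Chunk`/`Segment` types in `rooch-types` (field names and sizes here are illustrative):

```rust
/// Toy stand-in for a DA segment: one bounded slice of a compressed chunk.
#[derive(Debug)]
struct ToySegment {
    chunk_id: u128,
    segment_number: u64,
    is_last: bool,
    data: Vec<u8>,
}

/// Split one compressed chunk into segments of at most `max_segment_size` bytes.
fn to_segments(chunk_id: u128, compressed: &[u8], max_segment_size: usize) -> Vec<ToySegment> {
    let parts: Vec<&[u8]> = compressed.chunks(max_segment_size).collect();
    let last = parts.len().saturating_sub(1);
    parts
        .iter()
        .enumerate()
        .map(|(i, part)| ToySegment {
            chunk_id,
            segment_number: i as u64,
            is_last: i == last,
            data: part.to_vec(),
        })
        .collect()
}

fn main() {
    // 10 KB of pretend compressed data with a 4 KB segment cap -> 3 segments.
    let compressed = vec![0u8; 10 * 1024];
    let segments = to_segments(1, &compressed, 4 * 1024);
    assert_eq!(segments.len(), 3);
    assert!(segments.last().unwrap().is_last);
}
```

Each backend then only ever receives blobs within its block-size limit, and independent segments can be uploaded in parallel.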
## Batch A batch is a collection of transactions. It is the unit of data flow in DA Stream. -Each batch maps to a L2 block. +Each batch maps to a range of txs. ## Chunk diff --git a/crates/rooch-da/src/actor/da.rs b/crates/rooch-da/src/actor/da.rs deleted file mode 100644 index 3a8a482405..0000000000 --- a/crates/rooch-da/src/actor/da.rs +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use std::sync::{Arc, RwLock}; - -use anyhow::{anyhow, Result}; -use async_trait::async_trait; -use coerce::actor::context::ActorContext; -use coerce::actor::message::Handler; -use coerce::actor::system::ActorSystem; -use coerce::actor::{Actor, IntoActor}; -use futures::stream::FuturesUnordered; -use futures::StreamExt; - -use rooch_config::da_config::{DAConfig, InternalDAServerConfigType}; - -use crate::messages::{Batch, PutBatchInternalDAMessage}; -use crate::server::celestia::actor::server::DAServerCelestiaActor; -use crate::server::celestia::proxy::DAServerCelestiaProxy; -use crate::server::openda::actor::server::DAServerOpenDAActor; -use crate::server::openda::proxy::DAServerOpenDAProxy; -use crate::server::serverproxy::DAServerProxy; - -// TODO tx buffer for building batch -pub struct DAActor { - internal_servers: InternalServers, -} - -struct InternalServers { - servers: Arc<RwLock<Vec<Arc<dyn DAServerProxy>>>>, - submit_threshold: usize, -} - -impl Actor for DAActor {} - -impl DAActor { - pub async fn new(da_config: DAConfig, actor_system: &ActorSystem) -> Result<Self> { - // internal servers - let mut servers: Vec<Arc<dyn DAServerProxy>> = Vec::new(); - let mut submit_threshold = 1; - let mut success_count = 0; - - // server config has higher priority than submit threshold - if let Some(internal_da_server_config) = &da_config.internal_da_server { - let mut server_config = internal_da_server_config.clone(); - submit_threshold = server_config.calculate_submit_threshold(); - - for server_config_type in &server_config.servers { - if let InternalDAServerConfigType::Celestia(celestia_config) = server_config_type { - let da_server = DAServerCelestiaActor::new(celestia_config) - .await - .into_actor(Some("DAServerCelestia"), actor_system) - .await?; - servers.push(Arc::new(DAServerCelestiaProxy::new( - da_server.clone().into(), - ))); - success_count += 1; - } - if let InternalDAServerConfigType::OpenDa(openda_config) = server_config_type { - let da_server = DAServerOpenDAActor::new(openda_config) - .await?
- .into_actor( - Some(format!("DAServerOpenDA-{}", openda_config.scheme)), - actor_system, - ) - .await?; - servers.push(Arc::new(DAServerOpenDAProxy::new(da_server.clone().into()))); - success_count += 1; - } - } - } else { - servers.push(Arc::new(crate::server::serverproxy::DAServerNopProxy {})); - success_count += 1; - } - - if success_count < submit_threshold { - return Err(anyhow!( - "failed to start da: not enough servers for future submissions. exp>= {} act: {}", - submit_threshold, - success_count - )); - } - - Ok(Self { - internal_servers: InternalServers { - servers: Arc::new(RwLock::new(servers)), - submit_threshold, - }, - }) - } - - pub async fn submit_batch(&self, batch: Batch) -> Result<()> { - // TODO calc checksum - // TODO richer policy for multi servers - // TODO verify checksum - // TODO retry policy & log - - let servers = self.internal_servers.servers.read().unwrap().to_vec(); - let submit_threshold = self.internal_servers.submit_threshold; - - let mut futures_unordered = FuturesUnordered::new(); - for server in servers { - let server = Arc::clone(&server); - let batch = batch.clone(); - futures_unordered.push(async move { - server - .public_batch(PutBatchInternalDAMessage { - batch: batch.clone(), - }) - .await - }); - } - - let mut success_count = 0; - while let Some(result) = futures_unordered.next().await { - match result { - Ok(_) => { - success_count += 1; - if success_count >= submit_threshold { - return Ok(()); - } - } - Err(e) => { - log::warn!("{:?}, fail to submit batch to da server.", e); // TODO add da server name - } - } - } - - if success_count < submit_threshold { - Err(anyhow::Error::msg(format!( - "not enough successful submissions. exp>= {} act: {}", - submit_threshold, success_count - ))) - } else { - Ok(()) - } - } -} - -#[async_trait] -impl Handler for DAActor { - async fn handle(&mut self, msg: Batch, _ctx: &mut ActorContext) -> Result<()> { - self.submit_batch(msg).await - } -} diff --git a/crates/rooch-da/src/actor/messages.rs b/crates/rooch-da/src/actor/messages.rs new file mode 100644 index 0000000000..4819848524 --- /dev/null +++ b/crates/rooch-da/src/actor/messages.rs @@ -0,0 +1,28 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use coerce::actor::message::Message; +use rooch_types::da::batch::SignedDABatchMeta; +use rooch_types::transaction::LedgerTransaction; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PutDABatchMessage { + pub tx_order_start: u64, + pub tx_order_end: u64, + pub tx_list: Vec, +} + +impl Message for PutDABatchMessage { + type Result = anyhow::Result; +} + +impl PutDABatchMessage { + pub fn new(tx_order_start: u64, tx_order_end: u64, tx_list: Vec) -> Self { + Self { + tx_order_start, + tx_order_end, + tx_list, + } + } +} diff --git a/crates/rooch-da/src/actor/mod.rs b/crates/rooch-da/src/actor/mod.rs index a166495ec2..e20404f8fa 100644 --- a/crates/rooch-da/src/actor/mod.rs +++ b/crates/rooch-da/src/actor/mod.rs @@ -1,4 +1,5 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -pub mod da; +pub mod messages; +pub mod server; diff --git a/crates/rooch-da/src/actor/server.rs b/crates/rooch-da/src/actor/server.rs new file mode 100644 index 0000000000..7a7a7b1197 --- /dev/null +++ b/crates/rooch-da/src/actor/server.rs @@ -0,0 +1,262 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::actor::messages::PutDABatchMessage; +use crate::backend::celestia::CelestiaBackend; +use 
crate::backend::openda::OpenDABackend; +use crate::backend::DABackend; +use anyhow::anyhow; +use async_trait::async_trait; +use coerce::actor::context::ActorContext; +use coerce::actor::message::Handler; +use coerce::actor::Actor; +use moveos_types::h256::H256; +use rooch_config::da_config::{DABackendConfigType, DAConfig}; +use rooch_store::da_store::DAMetaStore; +use rooch_store::transaction_store::TransactionStore; +use rooch_store::RoochStore; +use rooch_types::crypto::RoochKeyPair; +use rooch_types::da::batch::{BlockRange, DABatch, SignedDABatchMeta}; +use rooch_types::transaction::LedgerTransaction; +use std::sync::Arc; + +pub struct DAServerActor { + sequencer_key: RoochKeyPair, + rooch_store: RoochStore, + last_block_number: Option, + + backends: Vec>, + submit_threshold: usize, +} + +impl Actor for DAServerActor {} + +impl DAServerActor { + pub async fn new( + da_config: DAConfig, + sequencer_key: RoochKeyPair, + rooch_store: RoochStore, + last_tx_order: Option, + genesis_namespace: String, + ) -> anyhow::Result { + let mut backends: Vec> = Vec::new(); + let mut submit_threshold = 1; + let mut act_backends = 0; + + // backend config has higher priority than submit threshold + if let Some(mut backend_config) = da_config.da_backend { + submit_threshold = backend_config.calculate_submit_threshold(); + + for backend_type in &backend_config.backends { + if let DABackendConfigType::Celestia(celestia_config) = backend_type { + let backend = CelestiaBackend::new(celestia_config).await?; + backends.push(Arc::new(backend)); + act_backends += 1; + } + if let DABackendConfigType::OpenDa(openda_config) = backend_type { + let backend = + OpenDABackend::new(openda_config, genesis_namespace.clone()).await?; + backends.push(Arc::new(backend)); + act_backends += 1; + } + } + } else { + backends.push(Arc::new(crate::backend::DABackendNopProxy {})); + act_backends += 1; + } + + if act_backends < submit_threshold { + return Err(anyhow!( + "failed to start da: not enough backends for future submissions. 
exp>= {} act: {}", + submit_threshold, + act_backends + )); + } + + rooch_store.catchup_submitting_blocks(last_tx_order)?; + let last_block_number = rooch_store.get_last_block_number()?; + + if let Some(last_block_number) = last_block_number { + let background_da_server = DAServerActor { + sequencer_key: sequencer_key.copy(), + rooch_store: rooch_store.clone(), + last_block_number: Some(last_block_number), + backends: backends.clone(), + submit_threshold, + }; + + tokio::spawn(async move { + if let Err(e) = background_da_server + .start_background_submit(last_block_number) + .await + { + log::error!("{:?}, fail to start background da submit.", e); + } + }); + } + + Ok(DAServerActor { + sequencer_key, + rooch_store, + last_block_number, + backends, + submit_threshold, + }) + } + + pub async fn submit_batch( + &mut self, + msg: PutDABatchMessage, + ) -> anyhow::Result { + let tx_order_start = msg.tx_order_start; + let tx_order_end = msg.tx_order_end; + let block_number = self + .rooch_store + .append_submitting_block(self.last_block_number, tx_order_start, tx_order_end) + .unwrap_or_else(|_| panic!("fail to append submitting block: last_block_number: {:?}; new block: tx_order_start: {:?}, tx_order_end: {:?}", + self.last_block_number, tx_order_start, tx_order_end)); + + let signed_meta = self + .submit_batch_raw( + BlockRange { + block_number, + tx_order_start, + tx_order_end, + }, + msg.tx_list, + ) + .await?; + self.last_block_number = Some(block_number); + + Ok(signed_meta) + } + + // should be idempotent + async fn submit_batch_raw( + &self, + block_range: BlockRange, + tx_list: Vec, + ) -> anyhow::Result { + let block_number = block_range.block_number; + let tx_order_start = block_range.tx_order_start; + let tx_order_end = block_range.tx_order_end; + + // create batch + let batch = DABatch::new( + block_number, + tx_order_start, + tx_order_end, + &tx_list, + self.sequencer_key.copy(), + ); + let batch_meta = batch.meta.clone(); + let meta_signature = batch.meta_signature.clone(); + + // submit batch + self.submit_batch_to_backends(batch).await?; + + // update block submitting state + match self + .rooch_store + .set_submitting_block_done(block_number, tx_order_start, tx_order_end) + { + Ok(_) => {} + Err(e) => { + log::warn!("{:?}, fail to set submitting block done.", e); + } + }; + Ok(SignedDABatchMeta { + meta: batch_meta, + signature: meta_signature, + }) + } + + async fn submit_batch_to_backends(&self, batch: DABatch) -> anyhow::Result<()> { + let backends = self.backends.clone(); + let submit_threshold = self.submit_threshold; + // submit to backend in order until meet submit_threshold + let mut success_count = 0; + for backend in &backends { + let submit_fut = backend.submit_batch(batch.clone()); + match submit_fut.await { + Ok(_) => { + success_count += 1; + if success_count >= submit_threshold { + break; + } + } + Err(e) => { + log::warn!("{:?}, fail to submit batch to backend.", e); + } + } + } + + if success_count < submit_threshold { + return Err(anyhow!( + "not enough successful submissions. 
exp>= {} act: {}", + submit_threshold, + success_count + )); + }; + Ok(()) + } + + // TODO: continue to submit blocks in the background even after all blocks <= init last_block_number have been submitted + async fn start_background_submit(&self, stop_block_number: u128) -> anyhow::Result<()> { + let cursor = self.rooch_store.get_background_submit_block_cursor()?; + let unsubmitted_blocks = self + .rooch_store + .get_submitting_blocks(cursor.unwrap_or(0), None)?; + + if unsubmitted_blocks.is_empty() { + return Ok(()); // nothing to do + } + + let mut submit_count = 0; + for block in unsubmitted_blocks { + let block_number = block.block_number; + if block_number > stop_block_number { + break; + } + let tx_order_start = block.tx_order_start; + let tx_order_end = block.tx_order_end; + // collect tx from start to end for rooch_store + let tx_orders: Vec = (tx_order_start..=tx_order_end).collect(); + let tx_hashes = self.rooch_store.get_tx_hashes(tx_orders)?; + let tx_hashes: Vec = tx_hashes + .into_iter() + .map(|tx_hash| tx_hash.unwrap()) + .collect(); + let mut tx_list: Vec = Vec::new(); + for tx_hash in tx_hashes { + let tx = self + .rooch_store + .get_transaction_by_hash(tx_hash)? + .unwrap_or_else(|| panic!("tx not found for hash: {:?}", tx_hash)); + tx_list.push(tx); + } + self.submit_batch_raw(block, tx_list).await?; + submit_count += 1; + if submit_count % 1024 == 0 { + // it's okay to set cursor a bit behind: submit_batch_raw set submitting block done, so it won't be submitted again after restart + self.rooch_store + .set_background_submit_block_cursor(block_number)?; + log::info!("da: submitted {} blocks in background", submit_count); + } + } + self.rooch_store + .set_background_submit_block_cursor(stop_block_number)?; + Ok(()) + } +} + +#[async_trait] +impl Handler for DAServerActor { + async fn handle( + &mut self, + msg: PutDABatchMessage, + _ctx: &mut ActorContext, + ) -> anyhow::Result { + self.submit_batch(msg).await + } +} diff --git a/crates/rooch-da/src/backend/celestia/mod.rs b/crates/rooch-da/src/backend/celestia/mod.rs new file mode 100644 index 0000000000..d04266f10f --- /dev/null +++ b/crates/rooch-da/src/backend/celestia/mod.rs @@ -0,0 +1,113 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::backend::DABackend; +use async_trait::async_trait; +use celestia_rpc::{BlobClient, Client}; +use celestia_types::blob::SubmitOptions; +use celestia_types::nmt::Namespace; +use celestia_types::{Blob, Commitment}; +use rooch_config::da_config::DABackendCelestiaConfig; +use rooch_types::da::batch::DABatch; +use rooch_types::da::chunk::{Chunk, ChunkV0}; +use rooch_types::da::segment::{Segment, SegmentID}; + +// In present, celestia supports for up to 8 MB blocks, starting with 2MB at genesis and upgradeable through onchain governance. +// The max segment size is set to 1MB for now. 
+const DEFAULT_CELESTIA_MAX_SEGMENT_SIZE: usize = 1024 * 1024; + +pub struct CelestiaBackend { + max_segment_size: usize, + client: CelestiaClient, +} + +#[async_trait] +impl DABackend for CelestiaBackend { + async fn submit_batch(&self, batch: DABatch) -> anyhow::Result<()> { + let chunk: ChunkV0 = batch.into(); + let segments = chunk.to_segments(self.max_segment_size); + for segment in segments { + let result = self.client.submit(segment).await?; + log::info!( + "submitted segment to celestia node, segment_id: {:?}, namespace: {:?}, commitment: {:?}, height: {}", + result.segment_id, + result.namespace, + result.commitment, + result.height, + ); + } + + Ok(()) + } +} + +impl CelestiaBackend { + pub async fn new(cfg: &DABackendCelestiaConfig) -> anyhow::Result { + let max_segment_size = cfg + .max_segment_size + .unwrap_or(DEFAULT_CELESTIA_MAX_SEGMENT_SIZE); + let client = CelestiaClient::new(cfg.namespace, &cfg.conn, &cfg.auth_token).await?; + + Ok(CelestiaBackend { + max_segment_size, + client, + }) + } +} + +struct CelestiaClient { + namespace: Namespace, + client: Client, +} + +pub struct SubmitBackendResult { + pub segment_id: SegmentID, + pub namespace: Namespace, + pub height: u64, + pub commitment: Commitment, +} + +impl CelestiaClient { + pub async fn new( + namespace: Namespace, + conn_str: &str, + auth_token: &str, + ) -> anyhow::Result { + let celestia_client = Client::new(conn_str, Option::from(auth_token)).await?; + Ok(CelestiaClient { + namespace, + client: celestia_client, + }) + } + + pub async fn submit( + &self, + segment: Box, + ) -> anyhow::Result { + let data = segment.to_bytes(); + let blob = Blob::new(self.namespace, data)?; + let segment_id = segment.get_id(); + // TODO backoff retry + match self + .client + .blob_submit(&[blob.clone()], SubmitOptions::default()) + .await + { + Ok(height) => Ok(SubmitBackendResult { + segment_id, + namespace: self.namespace, + height, + commitment: blob.commitment, + }), + Err(e) => { + log::warn!( + "failed to submit segment to celestia node, segment_id: {:?}, commitment: {:?}, error:{:?}", + segment_id, + blob.commitment, + e, + ); + Err(e.into()) + } + } + } +} diff --git a/crates/rooch-da/src/backend/mod.rs b/crates/rooch-da/src/backend/mod.rs new file mode 100644 index 0000000000..45d288d43e --- /dev/null +++ b/crates/rooch-da/src/backend/mod.rs @@ -0,0 +1,23 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use async_trait::async_trait; +use rooch_types::da::batch::DABatch; + +pub mod celestia; +pub mod openda; + +#[async_trait] +pub trait DABackend: Sync + Send { + async fn submit_batch(&self, batch: DABatch) -> anyhow::Result<()>; +} + +// DABackendNopProxy is a no-op implementation of DABackendProxy +pub struct DABackendNopProxy; + +#[async_trait] +impl DABackend for DABackendNopProxy { + async fn submit_batch(&self, _batch: DABatch) -> anyhow::Result<()> { + Ok(()) + } +} diff --git a/crates/rooch-da/src/server/openda/actor/server.rs b/crates/rooch-da/src/backend/openda/mod.rs similarity index 63% rename from crates/rooch-da/src/server/openda/actor/server.rs rename to crates/rooch-da/src/backend/openda/mod.rs index 06b0a19d64..9367cc60a3 100644 --- a/crates/rooch-da/src/server/openda/actor/server.rs +++ b/crates/rooch-da/src/backend/openda/mod.rs @@ -1,61 +1,56 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use anyhow::{anyhow, Result}; +use crate::backend::DABackend; +use anyhow::anyhow; use async_trait::async_trait; -use coerce::actor::context::ActorContext; -use 
coerce::actor::message::Handler; -use coerce::actor::Actor; use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::{Operator, Scheme}; -use rooch_config::config::retrieve_map_config_value; +use rooch_config::da_config::{DABackendOpenDAConfig, OpenDAScheme}; +use rooch_config::retrieve_map_config_value; +use rooch_types::da::batch::DABatch; +use rooch_types::da::chunk::{Chunk, ChunkV0}; +use rooch_types::da::segment::SegmentID; use std::collections::HashMap; use std::path::Path; -use crate::chunk::{Chunk, ChunkV0}; -use rooch_config::da_config::{DAServerOpenDAConfig, OpenDAScheme}; +pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024; +pub const DEFAULT_MAX_RETRY_TIMES: usize = 4; -use crate::messages::PutBatchInternalDAMessage; -use crate::segment::SegmentID; +#[async_trait] +impl DABackend for OpenDABackend { + async fn submit_batch(&self, batch: DABatch) -> anyhow::Result<()> { + self.pub_batch(batch).await + } +} -pub struct DAServerOpenDAActor { +pub struct OpenDABackend { + prefix: String, + scheme: OpenDAScheme, max_segment_size: usize, operator: Operator, } -pub const CHUNK_V0_PREFIX: &str = "chunk_v0"; -pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024; -pub const DEFAULT_MAX_RETRY_TIMES: usize = 4; - -impl Actor for DAServerOpenDAActor {} - -impl DAServerOpenDAActor { - pub async fn new(cfg: &DAServerOpenDAConfig) -> Result { +impl OpenDABackend { + pub async fn new( + cfg: &DABackendOpenDAConfig, + genesis_namespace: String, + ) -> anyhow::Result { let mut config = cfg.clone(); let op: Operator = match config.scheme { OpenDAScheme::Fs => { // root must be existed - if !config.config.contains_key("root") { - return Err(anyhow!( - "key 'root' must be existed in config for scheme {:?}", - OpenDAScheme::Fs - )); - } + check_config_exist(OpenDAScheme::Fs, &config.config, "root")?; new_retry_operator(Scheme::Fs, config.config, None).await? } OpenDAScheme::Gcs => { - // If certain keys don't exist in the map, set them from environment - if !config.config.contains_key("bucket") { - if let Ok(bucket) = std::env::var("OPENDA_GCS_BUCKET") { - config.config.insert("bucket".to_string(), bucket); - } - } - if !config.config.contains_key("root") { - if let Ok(root) = std::env::var("OPENDA_GCS_ROOT") { - config.config.insert("root".to_string(), root); - } - } + retrieve_map_config_value( + &mut config.config, + "bucket", + Some("OPENDA_GCS_BUCKET"), + Some("rooch-openda-dev"), + ); if !config.config.contains_key("credential") { if let Ok(credential) = std::env::var("OPENDA_GCS_CREDENTIAL") { config.config.insert("credential".to_string(), credential); @@ -84,7 +79,7 @@ impl DAServerOpenDAActor { &mut config.config, "default_storage_class", Some("OPENDA_GCS_DEFAULT_STORAGE_CLASS"), - "STANDARD", + Some("STANDARD"), ); check_config_exist(OpenDAScheme::Gcs, &config.config, "bucket")?; @@ -100,41 +95,53 @@ impl DAServerOpenDAActor { (Err(_), Ok(_)) => (), (Err(_), Err(_)) => { - return Err(anyhow!("either 'credential' or 'credential_path' must exist in config for scheme {:?}", OpenDAScheme::Gcs)); + return Err(anyhow!( + "credential no found in config for scheme {:?}", + OpenDAScheme::Gcs + )); } } // After setting defaults, proceed with creating Operator new_retry_operator(Scheme::Gcs, config.config, None).await? 
diff --git a/crates/rooch-da/src/server/openda/actor/server.rs b/crates/rooch-da/src/backend/openda/mod.rs similarity index 63% rename from crates/rooch-da/src/server/openda/actor/server.rs rename to crates/rooch-da/src/backend/openda/mod.rs index 06b0a19d64..9367cc60a3 100644 --- a/crates/rooch-da/src/server/openda/actor/server.rs +++ b/crates/rooch-da/src/backend/openda/mod.rs @@ -1,61 +1,56 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use anyhow::{anyhow, Result}; +use crate::backend::DABackend; +use anyhow::anyhow; use async_trait::async_trait; -use coerce::actor::context::ActorContext; -use coerce::actor::message::Handler; -use coerce::actor::Actor; use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::{Operator, Scheme}; -use rooch_config::config::retrieve_map_config_value; +use rooch_config::da_config::{DABackendOpenDAConfig, OpenDAScheme}; +use rooch_config::retrieve_map_config_value; +use rooch_types::da::batch::DABatch; +use rooch_types::da::chunk::{Chunk, ChunkV0}; +use rooch_types::da::segment::SegmentID; use std::collections::HashMap; use std::path::Path; -use crate::chunk::{Chunk, ChunkV0}; -use rooch_config::da_config::{DAServerOpenDAConfig, OpenDAScheme}; +pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024; +pub const DEFAULT_MAX_RETRY_TIMES: usize = 4; -use crate::messages::PutBatchInternalDAMessage; -use crate::segment::SegmentID; +#[async_trait] +impl DABackend for OpenDABackend { + async fn submit_batch(&self, batch: DABatch) -> anyhow::Result<()> { + self.pub_batch(batch).await + } +} -pub struct DAServerOpenDAActor { +pub struct OpenDABackend { + prefix: String, + scheme: OpenDAScheme, max_segment_size: usize, operator: Operator, } -pub const CHUNK_V0_PREFIX: &str = "chunk_v0"; -pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 4 * 1024 * 1024; -pub const DEFAULT_MAX_RETRY_TIMES: usize = 4; - -impl Actor for DAServerOpenDAActor {} - -impl DAServerOpenDAActor { - pub async fn new(cfg: &DAServerOpenDAConfig) -> Result<Self> { +impl OpenDABackend { + pub async fn new( + cfg: &DABackendOpenDAConfig, + genesis_namespace: String, + ) -> anyhow::Result<OpenDABackend> { let mut config = cfg.clone(); let op: Operator = match config.scheme { OpenDAScheme::Fs => { // root must be existed - if !config.config.contains_key("root") { - return Err(anyhow!( - "key 'root' must be existed in config for scheme {:?}", - OpenDAScheme::Fs - )); - } + check_config_exist(OpenDAScheme::Fs, &config.config, "root")?; new_retry_operator(Scheme::Fs, config.config, None).await? } OpenDAScheme::Gcs => { - // If certain keys don't exist in the map, set them from environment - if !config.config.contains_key("bucket") { - if let Ok(bucket) = std::env::var("OPENDA_GCS_BUCKET") { - config.config.insert("bucket".to_string(), bucket); - } - } - if !config.config.contains_key("root") { - if let Ok(root) = std::env::var("OPENDA_GCS_ROOT") { - config.config.insert("root".to_string(), root); - } - } + retrieve_map_config_value( + &mut config.config, + "bucket", + Some("OPENDA_GCS_BUCKET"), + Some("rooch-openda-dev"), + ); if !config.config.contains_key("credential") { if let Ok(credential) = std::env::var("OPENDA_GCS_CREDENTIAL") { config.config.insert("credential".to_string(), credential); @@ -84,7 +79,7 @@ impl DAServerOpenDAActor { &mut config.config, "default_storage_class", Some("OPENDA_GCS_DEFAULT_STORAGE_CLASS"), - "STANDARD", + Some("STANDARD"), ); check_config_exist(OpenDAScheme::Gcs, &config.config, "bucket")?; @@ -100,41 +95,53 @@ impl DAServerOpenDAActor { (Err(_), Ok(_)) => (), (Err(_), Err(_)) => { - return Err(anyhow!("either 'credential' or 'credential_path' must exist in config for scheme {:?}", OpenDAScheme::Gcs)); + return Err(anyhow!( + "credential not found in config for scheme {:?}", + OpenDAScheme::Gcs + )); } } // After setting defaults, proceed with creating Operator new_retry_operator(Scheme::Gcs, config.config, None).await? } - _ => Err(anyhow!("unsupported open-da scheme: {:?}", config.scheme))?, + OpenDAScheme::S3 => { + todo!("s3 backend is not implemented yet"); + } }; Ok(Self { + prefix: config.namespace.unwrap_or(genesis_namespace), + scheme: config.scheme, max_segment_size: cfg.max_segment_size.unwrap_or(DEFAULT_MAX_SEGMENT_SIZE) as usize, operator: op, }) } - pub async fn pub_batch(&self, batch_msg: PutBatchInternalDAMessage) -> Result<()> { - let chunk: ChunkV0 = batch_msg.batch.into(); - let segments = chunk.to_segments(self.max_segment_size); + pub async fn pub_batch(&self, batch: DABatch) -> anyhow::Result<()> { + let chunk: ChunkV0 = batch.into(); + + let prefix = self.prefix.clone(); + let max_segment_size = self.max_segment_size; + let segments = chunk.to_segments(max_segment_size); for segment in segments { let bytes = segment.to_bytes(); match self - .write_segment(segment.get_id(), bytes, CHUNK_V0_PREFIX.to_string()) + .write_segment(segment.get_id(), bytes, Some(prefix.clone())) .await { Ok(_) => { log::info!( - "submitted segment to open-da node, segment: {:?}", + "submitted segment to open-da scheme: {:?}, segment_id: {:?}", + self.scheme, segment.get_id(), ); } Err(e) => { log::warn!( - "failed to submit segment to open-da node, segment_id: {:?}, error:{:?}", + "failed to submit segment to open-da scheme: {:?}, segment_id: {:?}, error:{:?}", + self.scheme, segment.get_id(), e, ); @@ -150,9 +157,12 @@ impl DAServerOpenDAActor { &self, segment_id: SegmentID, segment_bytes: Vec<u8>, - prefix: String, - ) -> Result<()> { - let path = format!("{}/{}", prefix, segment_id); + prefix: Option<String>, + ) -> anyhow::Result<()> { + let path = match prefix { + Some(prefix) => format!("{}/{}", prefix, segment_id), + None => segment_id.to_string(), + }; let mut w = self.operator.writer(&path).await?; w.write(segment_bytes).await?; w.close().await?; @@ -164,7 +174,7 @@ fn check_config_exist( scheme: OpenDAScheme, config: &HashMap<String, String>, key: &str, -) -> Result<()> { +) -> anyhow::Result<()> { if config.contains_key(key) { Ok(()) } else { @@ -180,7 +190,7 @@ async fn new_retry_operator( scheme: Scheme, config: HashMap<String, String>, max_retry_times: Option<usize>, -) -> Result<Operator> { +) -> anyhow::Result<Operator> { let mut op = Operator::via_map(scheme, config)?; let max_times = max_retry_times.unwrap_or(DEFAULT_MAX_RETRY_TIMES); op = op @@ -189,14 +199,3 @@ async fn new_retry_operator( op.check().await?; Ok(op) } - -#[async_trait] -impl Handler<PutBatchInternalDAMessage> for DAServerOpenDAActor { - async fn handle( - &mut self, - msg: PutBatchInternalDAMessage, - _ctx: &mut ActorContext, - ) -> Result<()> { - self.pub_batch(msg).await - } -}
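With the change above, object keys become "{prefix}/{segment_id}" (the prefix defaulting to the genesis namespace) instead of the fixed "chunk_v0" prefix. A sketch of the resulting key layout (not part of the diff), assuming SegmentID displays as "{chunk_id}_{segment_number}":

// Assumed layout for chunk (block) 7 split into two segments under namespace "ns0":
//   ns0/7_0
//   ns0/7_1
fn segment_path(prefix: Option<&str>, segment_id: &SegmentID) -> String {
    match prefix {
        Some(prefix) => format!("{}/{}", prefix, segment_id),
        None => segment_id.to_string(),
    }
}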
diff --git a/crates/rooch-da/src/lib.rs b/crates/rooch-da/src/lib.rs index 18af348be9..0edb2d3233 100644 --- a/crates/rooch-da/src/lib.rs +++ b/crates/rooch-da/src/lib.rs @@ -2,8 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 pub mod actor; -pub mod chunk; -pub mod messages; +pub mod backend; pub mod proxy; -pub mod segment; -pub mod server; diff --git a/crates/rooch-da/src/messages.rs b/crates/rooch-da/src/messages.rs deleted file mode 100644 index 624684f136..0000000000 --- a/crates/rooch-da/src/messages.rs +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Result; -use coerce::actor::message::Message; -use serde::{Deserialize, Serialize}; - -use moveos_types::h256::H256; - -/// The batch in Rooch is constructed by the batch submitter, representing a batch of transactions, mapping to a L2 block -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] -pub struct Batch { - /// each batch maps to a L2 block - pub block_number: u128, - /// How many transactions in the batch - pub tx_count: u64, - /// The previous tx accumulator root of the block - pub prev_tx_accumulator_root: H256, - /// The tx accumulator root after the last transaction append to the accumulator - pub tx_accumulator_root: H256, - - /// sha256h of data - pub batch_hash: H256, - /// encoded tx(LedgerTransaction) list - pub data: Vec<u8>, -} - -impl Message for Batch { - type Result = Result<()>; -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct PutBatchInternalDAMessage { - pub batch: Batch, - // TODO add put policy -} - -impl Message for PutBatchInternalDAMessage { - type Result = Result<()>; -} diff --git a/crates/rooch-da/src/proxy/mod.rs b/crates/rooch-da/src/proxy/mod.rs index c779b13f7c..676c23c580 100644 --- a/crates/rooch-da/src/proxy/mod.rs +++ b/crates/rooch-da/src/proxy/mod.rs @@ -1,22 +1,22 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use crate::actor::messages::PutDABatchMessage; +use crate::actor::server::DAServerActor; use coerce::actor::ActorRef; - -use crate::actor::da::DAActor; -use crate::messages::Batch; +use rooch_types::da::batch::SignedDABatchMeta; #[derive(Clone)] -pub struct DAProxy { - pub actor: ActorRef<DAActor>, +pub struct DAServerProxy { + pub actor: ActorRef<DAServerActor>, } -impl DAProxy { - pub fn new(actor: ActorRef<DAActor>) -> Self { +impl DAServerProxy { + pub fn new(actor: ActorRef<DAServerActor>) -> Self { Self { actor } } - pub async fn submit_batch(&self, batch: Batch) -> anyhow::Result<()> { - self.actor.send(batch).await? + pub async fn pub_batch(&self, msg: PutDABatchMessage) -> anyhow::Result<SignedDABatchMeta> { + self.actor.send(msg).await? } } diff --git a/crates/rooch-da/src/server/README.md b/crates/rooch-da/src/server/README.md deleted file mode 100644 index 4f5de70cfd..0000000000 --- a/crates/rooch-da/src/server/README.md +++ /dev/null @@ -1,4 +0,0 @@ -Internal DA Server -================== - -This crate contains the internal DA server which is inside the Rooch server. \ No newline at end of file diff --git a/crates/rooch-da/src/server/celestia/README.md b/crates/rooch-da/src/server/celestia/README.md deleted file mode 100644 index bff30a9e5c..0000000000 --- a/crates/rooch-da/src/server/celestia/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Celestia Server - -DA Server implementation of using Celestia as DA backend.
\ No newline at end of file diff --git a/crates/rooch-da/src/server/celestia/actor/server.rs b/crates/rooch-da/src/server/celestia/actor/server.rs deleted file mode 100644 index dad211ae30..0000000000 --- a/crates/rooch-da/src/server/celestia/actor/server.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Result; -use async_trait::async_trait; -use celestia_types::nmt::Namespace; -use coerce::actor::context::ActorContext; -use coerce::actor::message::Handler; -use coerce::actor::Actor; - -use crate::chunk::{Chunk, ChunkV0}; -use rooch_config::da_config::DAServerCelestiaConfig; - -use crate::messages::PutBatchInternalDAMessage; -use crate::server::celestia::backend::Backend; - -pub struct DAServerCelestiaActor { - max_segment_size: usize, - backend: Backend, -} - -impl Actor for DAServerCelestiaActor {} - -impl DAServerCelestiaActor { - pub async fn new(cfg: &DAServerCelestiaConfig) -> Self { - let namespace_str = cfg.namespace.as_ref().unwrap().clone(); - let namespace: Namespace = serde_yaml::from_str(&namespace_str).unwrap(); - let conn_str = cfg.conn.as_ref().unwrap().clone(); - let token = cfg.auth_token.as_ref().unwrap().clone(); - - Self { - max_segment_size: cfg.max_segment_size.unwrap() as usize, - backend: Backend::new(namespace, &conn_str, &token).await, - } - } - - pub async fn public_batch(&self, batch_msg: PutBatchInternalDAMessage) -> Result<()> { - let chunk: ChunkV0 = batch_msg.batch.into(); - let segments = chunk.to_segments(self.max_segment_size); - for segment in segments { - let result = self.backend.submit(segment).await?; - log::info!( - "submitted segment to celestia node, segment_id: {:?}, namespace: {:?}, commitment: {:?}, height: {}", - result.segment_id, - result.namespace, - result.commitment, - result.height, - ); - } - - Ok(()) - } -} - -#[async_trait] -impl Handler<PutBatchInternalDAMessage> for DAServerCelestiaActor { - async fn handle( - &mut self, - msg: PutBatchInternalDAMessage, - _ctx: &mut ActorContext, - ) -> Result<()> { - self.public_batch(msg).await - } -} diff --git a/crates/rooch-da/src/server/celestia/backend.rs b/crates/rooch-da/src/server/celestia/backend.rs deleted file mode 100644 index ae1df33d3d..0000000000 --- a/crates/rooch-da/src/server/celestia/backend.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Result; -use celestia_rpc::{BlobClient, Client}; -use celestia_types::blob::SubmitOptions; -use celestia_types::nmt::Namespace; -use celestia_types::{Blob, Commitment}; - -use crate::segment::{Segment, SegmentID}; - -pub struct Backend { - namespace: Namespace, - client: Client, -} - -pub struct SubmitBackendResult { - pub segment_id: SegmentID, - pub namespace: Namespace, - pub height: u64, - pub commitment: Commitment, -} - -impl Backend { - pub async fn new(namespace: Namespace, conn_str: &str, auth_token: &str) -> Self { - let celestia_client = Client::new(conn_str, Option::from(auth_token)) - .await - .unwrap(); - Self { - namespace, - client: celestia_client, - } - } - - pub async fn submit(&self, segment: Box<dyn Segment>) -> Result<SubmitBackendResult> { - let data = segment.to_bytes(); - let blob = Blob::new(self.namespace, data).unwrap(); - let segment_id = segment.get_id(); - - // TODO tx manager - // TODO backoff retry - match self - .client - .blob_submit(&[blob.clone()], SubmitOptions::default()) - .await - { - Ok(height) => Ok(SubmitBackendResult { - segment_id, - namespace: self.namespace, - height, - commitment: blob.commitment, - }), - Err(e)
=> { - log::warn!( - "failed to submit segment to celestia node, segment_id: {:?}, commitment: {:?}, error:{:?}", - segment_id, - blob.commitment, - e, - ); - Err(e.into()) - } - } - } -} diff --git a/crates/rooch-da/src/server/celestia/mod.rs b/crates/rooch-da/src/server/celestia/mod.rs deleted file mode 100644 index cdb4ad6f9e..0000000000 --- a/crates/rooch-da/src/server/celestia/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -pub mod actor; -mod backend; -pub mod proxy; diff --git a/crates/rooch-da/src/server/celestia/proxy/mod.rs b/crates/rooch-da/src/server/celestia/proxy/mod.rs deleted file mode 100644 index 8fc59e1b39..0000000000 --- a/crates/rooch-da/src/server/celestia/proxy/mod.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use async_trait::async_trait; -use coerce::actor::ActorRef; - -use crate::messages::PutBatchInternalDAMessage; -use crate::server::celestia::actor::server::DAServerCelestiaActor; -use crate::server::serverproxy::DAServerProxy; - -#[derive(Clone)] -pub struct DAServerCelestiaProxy { - pub actor: ActorRef<DAServerCelestiaActor>, -} - -impl DAServerCelestiaProxy { - pub fn new(actor: ActorRef<DAServerCelestiaActor>) -> Self { - Self { actor } - } - - pub async fn submit_batch(&self, msg: PutBatchInternalDAMessage) -> anyhow::Result<()> { - self.actor.send(msg).await? - } -} - -#[async_trait] -impl DAServerProxy for DAServerCelestiaProxy { - async fn public_batch(&self, msg: PutBatchInternalDAMessage) -> anyhow::Result<()> { - self.submit_batch(msg).await - } -} diff --git a/crates/rooch-da/src/server/mod.rs b/crates/rooch-da/src/server/mod.rs deleted file mode 100644 index db57631e4f..0000000000 --- a/crates/rooch-da/src/server/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -pub mod celestia; -pub mod openda; -pub mod serverproxy; diff --git a/crates/rooch-da/src/server/openda/actor/mod.rs b/crates/rooch-da/src/server/openda/actor/mod.rs deleted file mode 100644 index 83afd4d573..0000000000 --- a/crates/rooch-da/src/server/openda/actor/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -pub mod server; diff --git a/crates/rooch-da/src/server/openda/mod.rs b/crates/rooch-da/src/server/openda/mod.rs deleted file mode 100644 index c6dbf6b1fd..0000000000 --- a/crates/rooch-da/src/server/openda/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -pub mod actor; -pub mod proxy; diff --git a/crates/rooch-da/src/server/openda/proxy/mod.rs b/crates/rooch-da/src/server/openda/proxy/mod.rs deleted file mode 100644 index 068e271ce9..0000000000 --- a/crates/rooch-da/src/server/openda/proxy/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use crate::messages::PutBatchInternalDAMessage; -use crate::server::openda::actor::server::DAServerOpenDAActor; -use crate::server::serverproxy::DAServerProxy; -use async_trait::async_trait; -use coerce::actor::ActorRef; - -#[derive(Clone)] -pub struct DAServerOpenDAProxy { - pub actor: ActorRef<DAServerOpenDAActor>, -} - -impl DAServerOpenDAProxy { - pub fn new(actor: ActorRef<DAServerOpenDAActor>) -> Self { - Self { actor } - } - - pub async fn submit_batch(&self, msg: PutBatchInternalDAMessage) -> anyhow::Result<()> { - self.actor.send(msg).await?
- } -} - -#[async_trait] -impl DAServerProxy for DAServerOpenDAProxy { - async fn public_batch(&self, msg: PutBatchInternalDAMessage) -> anyhow::Result<()> { - self.submit_batch(msg).await - } -} diff --git a/crates/rooch-da/src/server/serverproxy.rs b/crates/rooch-da/src/server/serverproxy.rs deleted file mode 100644 index 590b4ba0ec..0000000000 --- a/crates/rooch-da/src/server/serverproxy.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) RoochNetwork -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Result; -use async_trait::async_trait; - -use crate::messages::PutBatchInternalDAMessage; - -#[async_trait] -pub trait DAServerProxy: Sync + Send { - async fn public_batch(&self, request: PutBatchInternalDAMessage) -> Result<()>; -} - -// DAServerNopProxy is a no-op implementation of DAServerProxy -pub struct DAServerNopProxy; - -#[async_trait] -impl DAServerProxy for DAServerNopProxy { - async fn public_batch(&self, _request: PutBatchInternalDAMessage) -> Result<()> { - Ok(()) - } -} diff --git a/crates/rooch-executor/Cargo.toml b/crates/rooch-executor/Cargo.toml index e41dfdd3be..33d1a84186 100644 --- a/crates/rooch-executor/Cargo.toml +++ b/crates/rooch-executor/Cargo.toml @@ -35,4 +35,3 @@ rooch-types = { workspace = true } rooch-genesis = { workspace = true } rooch-event = { workspace = true } rooch-store = { workspace = true } -hex = "0.4.3" diff --git a/crates/rooch-proposer/src/actor/proposer.rs b/crates/rooch-proposer/src/actor/proposer.rs index 7dce391c32..c830bb31ed 100644 --- a/crates/rooch-proposer/src/actor/proposer.rs +++ b/crates/rooch-proposer/src/actor/proposer.rs @@ -1,15 +1,14 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use crate::metrics::ProposerMetrics; use anyhow::Result; use async_trait::async_trait; use coerce::actor::{context::ActorContext, message::Handler, Actor}; use prometheus::Registry; -use std::sync::Arc; - -use crate::metrics::ProposerMetrics; -use rooch_da::proxy::DAProxy; +use rooch_da::proxy::DAServerProxy; use rooch_types::crypto::RoochKeyPair; +use std::sync::Arc; use crate::scc::StateCommitmentChain; @@ -25,7 +24,7 @@ pub struct ProposerActor { } impl ProposerActor { - pub fn new(proposer_key: RoochKeyPair, da_proxy: DAProxy, registry: &Registry) -> Self { + pub fn new(proposer_key: RoochKeyPair, da_proxy: DAServerProxy, registry: &Registry) -> Self { Self { proposer_key, scc: StateCommitmentChain::new(da_proxy), diff --git a/crates/rooch-proposer/src/scc/mod.rs b/crates/rooch-proposer/src/scc/mod.rs index e59a8e8a5c..661fe41f94 100644 --- a/crates/rooch-proposer/src/scc/mod.rs +++ b/crates/rooch-proposer/src/scc/mod.rs @@ -2,11 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use crate::actor::messages::TransactionProposeMessage; -use moveos_types::h256; use moveos_types::h256::H256; -use rooch_da::messages::Batch; -use rooch_da::proxy::DAProxy; +use rooch_da::actor::messages::PutDABatchMessage; +use rooch_da::proxy::DAServerProxy; use rooch_types::block::Block; +use rooch_types::transaction::LedgerTransaction; /// State Commitment Chain(SCC) is a chain of transaction state root /// This SCC is a mirror of the on-chain SCC @@ -14,12 +14,12 @@ pub struct StateCommitmentChain { //TODO save to the storage last_block: Option<Block>, buffer: Vec<TransactionProposeMessage>, - da: DAProxy, + da: DAServerProxy, } impl StateCommitmentChain { /// Create a new SCC - pub fn new(da_proxy: DAProxy) -> Self { + pub fn new(da_proxy: DAServerProxy) -> Self { Self { last_block: None, buffer: Vec::new(), @@ -77,24 +77,30 @@ impl StateCommitmentChain { // submit batch
to DA server // TODO move batch submit out of proposer - let batch_data: Vec<u8> = self.buffer.iter().flat_map(|tx| tx.tx.encode()).collect(); - let batch_hash = h256::sha2_256_of(&batch_data); - if let Err(e) = self + let tx_list: Vec<LedgerTransaction> = self.buffer.iter().map(|tx| tx.tx.clone()).collect(); + let batch_meta = self .da - .submit_batch(Batch { - block_number, - tx_count: batch_size, - prev_tx_accumulator_root, - tx_accumulator_root, - batch_hash, - data: batch_data, + .pub_batch(PutDABatchMessage { + tx_order_start: tx_list + .first() + .expect("tx list must not be empty") + .sequence_info + .tx_order, + tx_order_end: latest_transaction.tx.sequence_info.tx_order, + tx_list, }) - .await - { - log::error!("submit batch to DA server failed: {}", e); - return None; + .await; + match batch_meta { + Ok(batch_meta) => { + log::info!("submit batch to DA success: {:?}", batch_meta); + } + Err(e) => { + log::error!("submit batch to DA failed: {:?}", e); + } } + // even if the batch submission failed, the new block must still be created (otherwise panic) + // TODO update block struct and add propose logic let new_block = Block::new( block_number, batch_size, diff --git a/crates/rooch-rpc-server/src/lib.rs b/crates/rooch-rpc-server/src/lib.rs index 6a02e84d08..01bc3f9ef9 100644 --- a/crates/rooch-rpc-server/src/lib.rs +++ b/crates/rooch-rpc-server/src/lib.rs @@ -16,10 +16,11 @@ use coerce::actor::{system::ActorSystem, IntoActor}; use jsonrpsee::RpcModule; use moveos_eventbus::bus::EventBus; use raw_store::errors::RawStoreError; +use rooch_config::da_config::derive_genesis_namespace; use rooch_config::server_config::ServerConfig; use rooch_config::{RoochOpt, ServerOpt}; -use rooch_da::actor::da::DAActor; -use rooch_da::proxy::DAProxy; +use rooch_da::actor::server::DAServerActor; +use rooch_da::proxy::DAServerProxy; use rooch_db::RoochDB; use rooch_event::actor::EventActor; use rooch_executor::actor::executor::ExecutorActor; @@ -283,7 +284,7 @@ pub async fn run_start_server(opt: RoochOpt, server_opt: ServerOpt) -> Result Result>, ) -> Result { - // The sequencer info would be inited when genesis, so the sequencer info should not be None + // The sequencer info is initialized at genesis, so the sequencer info should not be None let last_sequencer_info = rooch_store .get_meta_store() .get_sequencer_info()?
@@ -155,6 +155,7 @@ impl SequencerActor { ); let sequencer_info = SequencerInfo::new(tx.sequence_info.tx_order, tx_accumulator_info); + // TODO sequencer_info & tx should be saved in a transaction self.rooch_store .save_sequencer_info(sequencer_info.clone())?; self.rooch_store.save_transaction(tx.clone())?; diff --git a/crates/rooch-store/Cargo.toml b/crates/rooch-store/Cargo.toml index 186f8796c0..9eac03e330 100644 --- a/crates/rooch-store/Cargo.toml +++ b/crates/rooch-store/Cargo.toml @@ -20,8 +20,10 @@ prometheus = { workspace = true } tokio = { workspace = true } raw-store = { workspace = true } +accumulator = { workspace = true } + moveos-config = { workspace = true } moveos-types = { workspace = true } -accumulator = { workspace = true } +moveos-common = { workspace = true } rooch-types = { workspace = true } \ No newline at end of file diff --git a/crates/rooch-store/src/da_store/mod.rs b/crates/rooch-store/src/da_store/mod.rs new file mode 100644 index 0000000000..314af1c63a --- /dev/null +++ b/crates/rooch-store/src/da_store/mod.rs @@ -0,0 +1,277 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::{DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME, DA_BLOCK_SUBMIT_STATE_COLUMN_FAMILY_NAME}; +use raw_store::{derive_store, CodecKVStore, CodecWriteBatch}; +use rooch_types::da::batch::{BlockRange, BlockSubmitState}; +use std::cmp::min; + +pub const SUBMITTING_BLOCKS_PAGE_SIZE: usize = 1024; +pub const MAX_TXS_PER_BLOCK_IN_FIX: usize = 8192; // avoid OOM when fixing submitting blocks after a crash + +pub const BACKGROUND_SUBMIT_BLOCK_CURSOR_KEY: &str = "background_submit_block_cursor"; +pub const LAST_BLOCK_NUMBER_KEY: &str = "last_block_number"; + +derive_store!( + DABlockSubmitStateStore, + u128, + BlockSubmitState, + DA_BLOCK_SUBMIT_STATE_COLUMN_FAMILY_NAME +); + +derive_store!( + DABlockCursorStore, + String, + u128, + DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME +); + +pub trait DAMetaStore { + fn get_submitting_blocks( + &self, + start_block: u128, + exp_count: Option<usize>, + ) -> anyhow::Result<Vec<BlockRange>>; + fn append_submitting_block( + &self, + last_block_number: Option<u128>, + tx_order_start: u64, + tx_order_end: u64, + ) -> anyhow::Result<u128>; + fn set_submitting_block_done( + &self, + block_number: u128, + tx_order_start: u64, + tx_order_end: u64, + ) -> anyhow::Result<()>; + + fn get_background_submit_block_cursor(&self) -> anyhow::Result<Option<u128>>; + fn set_background_submit_block_cursor(&self, cursor: u128) -> anyhow::Result<()>; + + fn get_last_block_number(&self) -> anyhow::Result<Option<u128>>; + + // try to fix submitting blocks by the last tx order (if not None) at startup; + // may only be called after try_fix_last_block_number (which is invoked in new) + fn catchup_submitting_blocks(&self, last_order: Option<u64>) -> anyhow::Result<()>; +} + +#[derive(Clone)] +pub struct DAMetaDBStore { + block_submit_state_store: DABlockSubmitStateStore, + block_cursor_store: DABlockCursorStore, +} + +impl DAMetaDBStore { + pub fn new(instance: raw_store::StoreInstance) -> anyhow::Result<Self> { + let store = DAMetaDBStore { + block_submit_state_store: DABlockSubmitStateStore::new(instance.clone()), + block_cursor_store: DABlockCursorStore::new(instance), + }; + store.try_fix_last_block_number()?; + Ok(store) + } + + // updating the last block number and updating the submitting blocks are not done in one transaction, + // so the submitting blocks may have been updated while the last block number was not + pub(crate) fn try_fix_last_block_number(&self) -> anyhow::Result<()> { + let last_block_number = self.get_last_block_number()?; + if
let Some(last_block_number) = last_block_number { + let blocks = self.get_submitting_blocks(last_block_number + 1, None)?; + if let Some(block) = blocks.last() { + self.set_last_block_number(block.block_number)?; + } + } + Ok(()) + } + + pub(crate) fn set_last_block_number(&self, block_number: u128) -> anyhow::Result<()> { + self.block_cursor_store + .put_sync(LAST_BLOCK_NUMBER_KEY.to_string(), block_number) + } + + #[allow(dead_code)] + fn add_submitting_block( + &self, + block_number: u128, + tx_order_start: u64, + tx_order_end: u64, + ) -> anyhow::Result<()> { + self.block_submit_state_store.put_sync( + block_number, + BlockSubmitState::new(block_number, tx_order_start, tx_order_end), + )?; + + self.set_last_block_number(block_number)?; + Ok(()) + } + + fn add_submitting_blocks(&self, ranges: Vec<BlockRange>) -> anyhow::Result<()> { + if ranges.is_empty() { + return Ok(()); + } + + let last_block_number = ranges.last().unwrap().block_number; + let kvs: Vec<(u128, BlockSubmitState)> = ranges + .into_iter() + .map(|range| { + ( + range.block_number, + BlockSubmitState::new( + range.block_number, + range.tx_order_start, + range.tx_order_end, + ), + ) + }) + .collect(); + let batch = CodecWriteBatch::new_puts(kvs); + self.block_submit_state_store.write_batch_sync(batch)?; + + self.set_last_block_number(last_block_number)?; + Ok(()) + } + + pub(crate) fn calc_needed_block_for_fix_submitting( + &self, + last_block_number: Option<u128>, + last_order: u64, + ) -> anyhow::Result<Vec<BlockRange>> { + // each block has n txs, n = [1, MAX_TXS_PER_BLOCK_IN_FIX], so we need to split txs into multiple blocks + let mut blocks = Vec::new(); + let mut block_number: u128 = 0; + let mut tx_order_start: u64 = 1; // tx_order_start starts from 1 (bypass genesis_tx) + let mut tx_order_end: u64 = min(MAX_TXS_PER_BLOCK_IN_FIX as u64, last_order); + if let Some(last_block_number) = last_block_number { + let last_block_state = self + .block_submit_state_store + .kv_get(last_block_number)?
+ .unwrap_or_else(|| { + panic!( + "submitting block not found for existing last block: {}", + last_block_number + ) + }); + let last_range = last_block_state.block_range; + block_number = last_block_number + 1; + tx_order_start = last_range.tx_order_end + 1; + tx_order_end = min( + tx_order_start + MAX_TXS_PER_BLOCK_IN_FIX as u64 - 1, + last_order, + ); + } + while tx_order_start <= last_order { + blocks.push(BlockRange { + block_number, + tx_order_start, + tx_order_end, + }); + tx_order_start = tx_order_end + 1; + tx_order_end = min( + tx_order_start + MAX_TXS_PER_BLOCK_IN_FIX as u64 - 1, + last_order, + ); + block_number += 1; + } + Ok(blocks) + } + + // try to fix submitting blocks by the last tx order (if not None) at startup; + // may only be called after try_fix_last_block_number + fn try_fix_submitting_blocks(&self, last_order: Option<u64>) -> anyhow::Result<()> { + if let Some(last_order) = last_order { + if last_order == 0 { + // only has genesis_tx + return Ok(()); + } + let last_block_number = self.get_last_block_number()?; + let ranges = + self.calc_needed_block_for_fix_submitting(last_block_number, last_order)?; + self.add_submitting_blocks(ranges)?; + } + Ok(()) + } +} + +impl DAMetaStore for DAMetaDBStore { + fn get_submitting_blocks( + &self, + start_block: u128, + exp_count: Option<usize>, + ) -> anyhow::Result<Vec<BlockRange>> { + let exp_count = exp_count.unwrap_or(SUBMITTING_BLOCKS_PAGE_SIZE); + // try to get exp_count unsubmitted blocks + let mut blocks = Vec::with_capacity(exp_count); + let mut blocks_count = 0; + let mut block_number = start_block; + while blocks_count < exp_count { + let state = self.block_submit_state_store.kv_get(block_number)?; + if let Some(state) = state { + if !state.done { + blocks.push(BlockRange { + block_number, + tx_order_start: state.block_range.tx_order_start, + tx_order_end: state.block_range.tx_order_end, + }); + blocks_count += 1; + } + } else { + break; + } + block_number += 1; + } + + Ok(blocks) + } + + // TODO use rocksdb txn directly (atomic writes across column families are supported), replacing derive_store + fn append_submitting_block( + &self, + last_block_number: Option<u128>, + tx_order_start: u64, + tx_order_end: u64, + ) -> anyhow::Result<u128> { + let block_number = match last_block_number { + Some(last_block_number) => last_block_number + 1, + None => 0, + }; + + self.block_submit_state_store.put_sync( + block_number, + BlockSubmitState::new(block_number, tx_order_start, tx_order_end), + )?; + + self.set_last_block_number(block_number)?; + Ok(block_number) + } + + fn set_submitting_block_done( + &self, + block_number: u128, + tx_order_start: u64, + tx_order_end: u64, + ) -> anyhow::Result<()> { + self.block_submit_state_store.kv_put( + block_number, + BlockSubmitState::new_done(block_number, tx_order_start, tx_order_end), + ) + } + + fn get_background_submit_block_cursor(&self) -> anyhow::Result<Option<u128>> { + self.block_cursor_store + .kv_get(BACKGROUND_SUBMIT_BLOCK_CURSOR_KEY.to_string()) + } + + fn set_background_submit_block_cursor(&self, cursor: u128) -> anyhow::Result<()> { + self.block_cursor_store + .kv_put(BACKGROUND_SUBMIT_BLOCK_CURSOR_KEY.to_string(), cursor) + } + + fn get_last_block_number(&self) -> anyhow::Result<Option<u128>> { + self.block_cursor_store + .kv_get(LAST_BLOCK_NUMBER_KEY.to_string()) + } + + fn catchup_submitting_blocks(&self, last_order: Option<u64>) -> anyhow::Result<()> { + self.try_fix_submitting_blocks(last_order) + } +}
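A worked example of the catch-up arithmetic in calc_needed_block_for_fix_submitting (not part of the diff; the test name is hypothetical and mirrors test_da_store.rs below). With MAX_TXS_PER_BLOCK_IN_FIX = 8192, no existing block, and last_order = 20000, three blocks are produced:

#[tokio::test]
async fn test_calc_blocks_sketch() {
    let (rooch_store, _) = RoochStore::mock_rooch_store().unwrap();
    let ranges = rooch_store
        .get_da_meta_store()
        .calc_needed_block_for_fix_submitting(None, 20000)
        .unwrap();
    // block 0: tx_order 1..=8192 (order 0 is the genesis tx and is skipped)
    // block 1: tx_order 8193..=16384
    // block 2: tx_order 16385..=20000
    assert_eq!(ranges.len(), 3);
    assert_eq!(ranges[0].tx_order_start, 1);
    assert_eq!(ranges[2].tx_order_end, 20000);
}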
diff --git a/crates/rooch-store/src/lib.rs b/crates/rooch-store/src/lib.rs index d52add70be..b2f6e6e590 100644 --- a/crates/rooch-store/src/lib.rs +++ b/crates/rooch-store/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::accumulator_store::{AccumulatorStore, TransactionAccumulatorStore}; +use crate::da_store::{DAMetaDBStore, DAMetaStore}; use crate::meta_store::{MetaDBStore, MetaStore}; use crate::state_store::{StateDBStore, StateStore}; use crate::transaction_store::{TransactionDBStore, TransactionStore}; @@ -16,6 +17,7 @@ use prometheus::Registry; use raw_store::metrics::DBMetrics; use raw_store::rocks::RocksDB; use raw_store::{ColumnFamilyName, StoreInstance}; +use rooch_types::da::batch::BlockRange; use rooch_types::sequencer::SequencerInfo; use rooch_types::transaction::LedgerTransaction; use std::fmt::{Debug, Display, Formatter}; @@ -23,11 +25,13 @@ use std::path::Path; use std::sync::Arc; pub mod accumulator_store; +pub mod da_store; pub mod meta_store; pub mod state_store; +pub mod transaction_store; + #[cfg(test)] mod tests; -pub mod transaction_store; // pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default"; pub const TRANSACTION_COLUMN_FAMILY_NAME: ColumnFamilyName = "transaction"; @@ -38,6 +42,9 @@ pub const TX_ACCUMULATOR_NODE_COLUMN_FAMILY_NAME: ColumnFamilyName = "transactio pub const STATE_CHANGE_SET_COLUMN_FAMILY_NAME: ColumnFamilyName = "state_change_set"; +pub const DA_BLOCK_SUBMIT_STATE_COLUMN_FAMILY_NAME: ColumnFamilyName = "da_block_submit_state"; +pub const DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME: ColumnFamilyName = "da_last_block_number"; + ///db store use cf_name vec to init /// Please note that adding a column family needs to be added in vec simultaneously, remember!! static VEC_COLUMN_FAMILY_NAME: Lazy<Vec<ColumnFamilyName>> = Lazy::new(|| { @@ -47,6 +54,8 @@ static VEC_COLUMN_FAMILY_NAME: Lazy<Vec<ColumnFamilyName>> = Lazy::new(|| { META_SEQUENCER_INFO_COLUMN_FAMILY_NAME, TX_ACCUMULATOR_NODE_COLUMN_FAMILY_NAME, STATE_CHANGE_SET_COLUMN_FAMILY_NAME, + DA_BLOCK_SUBMIT_STATE_COLUMN_FAMILY_NAME, + DA_BLOCK_CURSOR_COLUMN_FAMILY_NAME, ] }); @@ -65,6 +74,7 @@ pub struct RoochStore { pub meta_store: MetaDBStore, pub transaction_accumulator_store: AccumulatorStore<TransactionAccumulatorStore>, pub state_store: StateDBStore, + pub da_meta_store: DAMetaDBStore, } impl RoochStore { @@ -82,13 +92,15 @@ impl RoochStore { } pub fn new_with_instance(instance: StoreInstance, _registry: &Registry) -> Result<Self> { + let da_meta_store = DAMetaDBStore::new(instance.clone())?; let store = Self { transaction_store: TransactionDBStore::new(instance.clone()), meta_store: MetaDBStore::new(instance.clone()), transaction_accumulator_store: AccumulatorStore::new_transaction_accumulator_store( instance.clone(), ), - state_store: StateDBStore::new(instance), + state_store: StateDBStore::new(instance.clone()), + da_meta_store, }; Ok(store) } @@ -116,6 +128,10 @@ impl RoochStore { pub fn get_state_store(&self) -> &StateDBStore { &self.state_store } + + pub fn get_da_meta_store(&self) -> &DAMetaDBStore { + &self.da_meta_store + } } impl Display for RoochStore { @@ -197,3 +213,59 @@ impl StateStore for RoochStore { self.get_state_store().remove_state_change_set(tx_order) } } + +impl DAMetaStore for RoochStore { + fn get_submitting_blocks( + &self, + start_block: u128, + exp_count: Option<usize>, + ) -> Result<Vec<BlockRange>> { + self.get_da_meta_store() + .get_submitting_blocks(start_block, exp_count) + } + + fn append_submitting_block( + &self, + last_block_number: Option<u128>, + tx_order_start: u64, + tx_order_end: u64, + ) -> Result<u128> { + self.get_da_meta_store().append_submitting_block( + last_block_number, + tx_order_start, + tx_order_end, + ) + } + + fn
set_submitting_block_done( + &self, + block_number: u128, + tx_order_start: u64, + tx_order_end: u64, + ) -> Result<()> { + self.get_da_meta_store().set_submitting_block_done( + block_number, + tx_order_start, + tx_order_end, + ) + } + + fn get_background_submit_block_cursor(&self) -> Result<Option<u128>> { + self.get_da_meta_store() + .get_background_submit_block_cursor() + } + + fn set_background_submit_block_cursor(&self, cursor: u128) -> Result<()> { + self.get_da_meta_store() + .set_background_submit_block_cursor(cursor) + } + + fn get_last_block_number(&self) -> Result<Option<u128>> { + self.get_da_meta_store().get_last_block_number() + } + + fn catchup_submitting_blocks(&self, last_order: Option<u64>) -> Result<()> { + self.get_da_meta_store() + .catchup_submitting_blocks(last_order) + } +} diff --git a/crates/rooch-store/src/tests/mod.rs b/crates/rooch-store/src/tests/mod.rs index cb1dbc2af8..22e4a2649a 100644 --- a/crates/rooch-store/src/tests/mod.rs +++ b/crates/rooch-store/src/tests/mod.rs @@ -2,3 +2,4 @@ // SPDX-License-Identifier: Apache-2.0 mod test_accumulator; +mod test_da_store; diff --git a/crates/rooch-store/src/tests/test_da_store.rs b/crates/rooch-store/src/tests/test_da_store.rs new file mode 100644 index 0000000000..3b4e9873a3 --- /dev/null +++ b/crates/rooch-store/src/tests/test_da_store.rs @@ -0,0 +1,259 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::da_store::{DAMetaStore, MAX_TXS_PER_BLOCK_IN_FIX}; +use crate::RoochStore; +use rooch_types::da::batch::BlockRange; + +#[tokio::test] +async fn test_append_submitting_blocks() { + let (rooch_store, _) = RoochStore::mock_rooch_store().unwrap(); + let da_meta_store = rooch_store.get_da_meta_store(); + + da_meta_store.append_submitting_block(None, 1, 6).unwrap(); + + let last_block_number = 0; + let tx_order_start = 7; + let tx_order_end = 7; + da_meta_store + .append_submitting_block(Some(last_block_number), tx_order_start, tx_order_end) + .unwrap(); + + let submitting_blocks = da_meta_store.get_submitting_blocks(0, None).unwrap(); + assert_eq!(submitting_blocks.len(), 2); + assert_eq!(submitting_blocks[0].block_number, 0); + assert_eq!(submitting_blocks[0].tx_order_start, 1); + assert_eq!(submitting_blocks[0].tx_order_end, 6); + assert_eq!(submitting_blocks[1].block_number, 1); + assert_eq!(submitting_blocks[1].tx_order_start, 7); + assert_eq!(submitting_blocks[1].tx_order_end, 7); + + let submitting_blocks = da_meta_store.get_submitting_blocks(1, None).unwrap(); + assert_eq!(submitting_blocks.len(), 1); + assert_eq!(submitting_blocks[0].block_number, 1); + assert_eq!(submitting_blocks[0].tx_order_start, 7); + assert_eq!(submitting_blocks[0].tx_order_end, 7); +} + +#[tokio::test] +async fn test_try_fix_last_block_number() { + let (rooch_store, _) = RoochStore::mock_rooch_store().unwrap(); + let da_meta_store = rooch_store.get_da_meta_store(); + + let tx_order_start = 7; + let tx_order_end = 7; + let last_block_number = da_meta_store + .append_submitting_block(None, tx_order_start, tx_order_end) + .unwrap(); + da_meta_store + .append_submitting_block(Some(last_block_number), tx_order_start, tx_order_end) + .unwrap(); + da_meta_store.set_last_block_number(0).unwrap(); + assert_eq!(da_meta_store.get_last_block_number().unwrap().unwrap(), 0); + let submitting_blocks = da_meta_store.get_submitting_blocks(0, None).unwrap(); + assert_eq!(submitting_blocks.len(), 2); + assert_eq!(submitting_blocks[1].block_number, 1); + assert_eq!(submitting_blocks[1].tx_order_start, 7); + assert_eq!(submitting_blocks[1].tx_order_end,
7); + da_meta_store.try_fix_last_block_number().unwrap(); + assert_eq!(da_meta_store.get_last_block_number().unwrap().unwrap(), 1); +} + +#[tokio::test] +async fn test_calc_needed_block_for_fix_submitting() { + let (rooch_store, _) = RoochStore::mock_rooch_store().unwrap(); + + test_calc_need_submitting_case(0, None, 0, rooch_store.clone(), None); + test_calc_need_submitting_case(1, None, 1, rooch_store.clone(), Some(0)); + test_calc_need_submitting_case(2, None, 10, rooch_store.clone(), Some(0)); + test_calc_need_submitting_case( + 3, + None, + MAX_TXS_PER_BLOCK_IN_FIX as u64, + rooch_store.clone(), + Some(0), + ); + test_calc_need_submitting_case( + 4, + None, + MAX_TXS_PER_BLOCK_IN_FIX as u64 + 1, + rooch_store.clone(), + Some(0), + ); + test_calc_need_submitting_case( + 5, + None, + MAX_TXS_PER_BLOCK_IN_FIX as u64 + 2, + rooch_store.clone(), + Some(0), + ); + test_calc_need_submitting_case( + 6, + None, + 9 + 2 * MAX_TXS_PER_BLOCK_IN_FIX as u64, + rooch_store.clone(), + Some(0), + ); + + let tx_order_end = 3 * MAX_TXS_PER_BLOCK_IN_FIX as u64 + 1; + let last_block_number = rooch_store.append_submitting_block(None, 1, 3).unwrap(); + let last_block_number = rooch_store + .append_submitting_block(Some(last_block_number), 4, tx_order_end) + .unwrap(); + test_calc_need_submitting_case( + 7, + Some(last_block_number), + tx_order_end, + rooch_store.clone(), + None, + ); + for i in 1..2 * MAX_TXS_PER_BLOCK_IN_FIX + 2 { + test_calc_need_submitting_case( + (7 + i) as u64, + Some(last_block_number), + tx_order_end + i as u64, + rooch_store.clone(), + Some(last_block_number + 1), + ); + } +} + +fn test_calc_need_submitting_case( + test_case: u64, + last_block_number: Option<u128>, + last_order: u64, + rooch_store: RoochStore, + exp_first_block_number: Option<u128>, +) { + let da_meta_store = rooch_store.get_da_meta_store(); + + let block_ranges = da_meta_store + .calc_needed_block_for_fix_submitting(last_block_number, last_order) + .unwrap(); + + let origin_tx_order_end = if let Some(last_block_number) = last_block_number { + let submitting_blocks = da_meta_store + .get_submitting_blocks(last_block_number, None) + .unwrap(); + Some(submitting_blocks.first().unwrap().tx_order_end) + } else { + None + }; + + check_block_ranges( + test_case, + block_ranges, + exp_first_block_number, + last_order, + origin_tx_order_end, + ); +} + +fn check_block_ranges( + test_case: u64, + block_ranges: Vec<BlockRange>, + exp_first_block_number: Option<u128>, + last_order: u64, + origin_tx_order_end: Option<u64>, +) { + if exp_first_block_number.is_none() { + assert_eq!( + block_ranges.len(), + 0, + "Test case {}: expected no block ranges, but got {:#?}", + test_case, + block_ranges, + ); + return; + } + assert!( + !block_ranges.is_empty(), + "Test case {}: expected some block ranges, but got none", + test_case, + ); + + let exp_first_block_number = exp_first_block_number.unwrap(); + let act_first_block_number = block_ranges.first().unwrap().block_number; + assert_eq!( + act_first_block_number, exp_first_block_number, + "Test case {}: first block number mismatch, expected {}, got {}", + test_case, exp_first_block_number, act_first_block_number, + ); + + let tx_start = block_ranges.first().unwrap().tx_order_start; + assert!( + tx_start > 0, + "Test case {}: first order mismatch, expected > 0, got {}", + test_case, + tx_start, + ); + + let tx_end = block_ranges.last().unwrap().tx_order_end; + assert_eq!( + tx_end, last_order, + "Test case {}: last order mismatch, expected {}, got {}", + test_case, last_order, tx_end, + ); + + for block_range in
&block_ranges { + let txs = block_range.tx_order_end - block_range.tx_order_start + 1; + assert!( + txs <= MAX_TXS_PER_BLOCK_IN_FIX as u64, + "Test case {}: too many txs in block range {:#?}, max allowed {}", + test_case, + block_range, + MAX_TXS_PER_BLOCK_IN_FIX + ); + assert!( + block_range.tx_order_start <= block_range.tx_order_end, + "Test case {}: tx_order_start > tx_order_end in block range {:#?}", + test_case, + block_range, + ); + } + + for i in 0..block_ranges.len() - 1 { + let tx_order_end = block_ranges[i].tx_order_end; + let tx_order_start = block_ranges[i + 1].tx_order_start; + assert_eq!( + tx_order_end + 1, + tx_order_start, + "Test case {}: tx_order continuity issue between blocks {:#?} and {:#?}", + test_case, + block_ranges[i], + block_ranges[i + 1] + ); + + let block_number = block_ranges[i].block_number; + let next_block_number = block_ranges[i + 1].block_number; + assert_eq!( + block_number + 1, + next_block_number, + "Test case {}: block number continuity issue between blocks {:#?} and {:#?}", + test_case, + block_ranges[i], + block_ranges[i + 1] + ); + } + + let total_txs: u64 = block_ranges + .iter() + .map(|block_range| block_range.tx_order_end - block_range.tx_order_start + 1) + .sum(); + let exp_txs = if let Some(origin_tx_order_end) = origin_tx_order_end { + last_order - origin_tx_order_end + } else { + last_order + }; + assert_eq!( + exp_txs, + total_txs, + "Test case {}: total txs in new blocks mismatch, expected {}, got {}. last order: {}; origin tx order end: {:?}. block ranges: {:#?}", + test_case, + exp_txs, + total_txs, + last_order, + origin_tx_order_end, + block_ranges, + ); +} diff --git a/crates/rooch-types/Cargo.toml b/crates/rooch-types/Cargo.toml index 3d8e6a266e..eca447d429 100644 --- a/crates/rooch-types/Cargo.toml +++ b/crates/rooch-types/Cargo.toml @@ -41,6 +41,8 @@ bs58 = { workspace = true, features = ["check"] } chacha20poly1305 = { workspace = true } argon2 = { workspace = true } tracing = { workspace = true } +xxhash-rust = { workspace = true, features = ["xxh3"] } +lz4 = { workspace = true } move-core-types = { workspace = true } move-vm-types = { workspace = true } diff --git a/crates/rooch-types/src/da/batch.rs b/crates/rooch-types/src/da/batch.rs new file mode 100644 index 0000000000..0216faa808 --- /dev/null +++ b/crates/rooch-types/src/da/batch.rs @@ -0,0 +1,120 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::crypto::{RoochKeyPair, Signature}; +use crate::transaction::LedgerTransaction; +use moveos_types::h256::{sha2_256_of, H256}; +use serde::{Deserialize, Serialize}; + +#[derive(Eq, PartialEq, Hash, Deserialize, Serialize, Clone, Debug)] +/// The tx order range of the block. +pub struct BlockRange { + /// The Rooch block number for DA, each batch maps to a block + pub block_number: u128, + /// The start tx order of the block (inclusive) + pub tx_order_start: u64, + /// The end tx order of the block (inclusive) + pub tx_order_end: u64, +} + +#[derive(Eq, PartialEq, Hash, Deserialize, Serialize, Clone, Debug)] +/// The state of the block submission. 
+pub struct BlockSubmitState { + /// tx order range of the block + pub block_range: BlockRange, + /// submitted or not + pub done: bool, +} + +impl BlockSubmitState { + /// Create a new BlockSubmitState + pub fn new(block_number: u128, tx_order_start: u64, tx_order_end: u64) -> Self { + Self { + block_range: BlockRange { + block_number, + tx_order_start, + tx_order_end, + }, + done: false, + } + } + pub fn new_done(block_number: u128, tx_order_start: u64, tx_order_end: u64) -> Self { + Self { + block_range: BlockRange { + block_number, + tx_order_start, + tx_order_end, + }, + done: true, + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +/// Meta of DA batch +pub struct DABatchMeta { + /// tx order range of the block + pub block_range: BlockRange, + /// sha256 hash of the encoded tx_list + pub tx_list_hash: H256, +} + +impl DABatchMeta { + pub fn new( + block_number: u128, + tx_order_start: u64, + tx_order_end: u64, + tx_list_hash: H256, + ) -> Self { + Self { + block_range: BlockRange { + block_number, + tx_order_start, + tx_order_end, + }, + tx_list_hash, + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct SignedDABatchMeta { + pub meta: DABatchMeta, + pub signature: Vec<u8>, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +/// A batch is a collection of transactions. It is the unit of data flow in DA Stream +pub struct DABatch { + /// The metadata of the batch + pub meta: DABatchMeta, + /// meta signature, signed by the sequencer + pub meta_signature: Vec<u8>, + /// encoded Vec<LedgerTransaction> + pub tx_list_bytes: Vec<u8>, +} + +impl DABatch { + pub fn new( + block_number: u128, + tx_order_start: u64, + tx_order_end: u64, + tx_list: &Vec<LedgerTransaction>, + sequencer_key: RoochKeyPair, + ) -> Self { + let tx_list_bytes = bcs::to_bytes(tx_list).expect("encode tx_list should success"); + let tx_list_hash = sha2_256_of(&tx_list_bytes); + let batch_meta = DABatchMeta::new(block_number, tx_order_start, tx_order_end, tx_list_hash); + let meta_bytes = bcs::to_bytes(&batch_meta).expect("encode batch_meta should success"); + let meta_hash = sha2_256_of(&meta_bytes); + let meta_signature = Signature::sign(&meta_hash.0, &sequencer_key) + .as_ref() + .to_vec(); + + Self { + meta: batch_meta, + meta_signature, + tx_list_bytes, + } + } +}
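A sketch of how a reader can check batch integrity against the meta (not part of the diff; the helper name is hypothetical). Only the hash recomputation defined by DABatch::new above is shown; verifying meta_signature against the sequencer key is omitted because that API is not part of this diff.

// Recompute the tx_list hash and compare it with the hash carried in the signed meta.
fn check_batch_integrity(batch: &DABatch) -> anyhow::Result<()> {
    let tx_list_hash = sha2_256_of(&batch.tx_list_bytes);
    anyhow::ensure!(
        tx_list_hash == batch.meta.tx_list_hash,
        "tx_list hash mismatch"
    );
    Ok(())
}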
diff --git a/crates/rooch-da/src/chunk.rs b/crates/rooch-types/src/da/chunk.rs similarity index 77% rename from crates/rooch-da/src/chunk.rs rename to crates/rooch-types/src/da/chunk.rs index 3f5789d113..f73873e219 100644 --- a/crates/rooch-da/src/chunk.rs +++ b/crates/rooch-types/src/da/chunk.rs @@ -1,8 +1,8 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::messages::Batch; -use crate::segment::{Segment, SegmentID, SegmentV0}; +use crate::da::batch::DABatch; +use crate::da::segment::{Segment, SegmentID, SegmentV0}; use lz4::EncoderBuilder; use serde::{Deserialize, Serialize}; use std::io; @@ -36,30 +36,24 @@ pub trait Chunk { fn to_bytes(&self) -> Vec<u8>; fn get_version(&self) -> ChunkVersion; fn to_segments(&self, max_segment_size: usize) -> Vec<Box<dyn Segment>>; - fn get_batch(&self) -> Batch; + fn get_batches(&self) -> Vec<DABatch>; + fn get_chunk_id(&self) -> u128; } // ChunkV0: -// 1. each chunk maps to a batch +// 1. each chunk maps to a batch (block number is chunk_id) // 2. batch_data compressed by lz4 #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct ChunkV0 { pub version: ChunkVersion, - pub batch: Batch, + pub batch: DABatch, } -impl From<Batch> for ChunkV0 { - fn from(batch: Batch) -> Self { +impl From<DABatch> for ChunkV0 { + fn from(batch: DABatch) -> Self { Self { version: ChunkVersion::V0, - batch: Batch { - block_number: batch.block_number, - tx_count: batch.tx_count, - prev_tx_accumulator_root: batch.prev_tx_accumulator_root, - tx_accumulator_root: batch.tx_accumulator_root, - batch_hash: batch.batch_hash, - data: batch.data, - }, + batch, } } } @@ -87,7 +81,7 @@ impl Chunk for ChunkV0 { let segments_data = bytes.chunks(max_segment_size); let segments_count = segments_data.len(); - let chunk_id = self.batch.block_number; + let chunk_id = self.get_chunk_id(); segments_data .enumerate() .map(|(i, data)| { @@ -107,8 +101,13 @@ impl Chunk for ChunkV0 { .collect::<Vec<_>>() } - fn get_batch(&self) -> Batch { - self.batch.clone() + fn get_batches(&self) -> Vec<DABatch> { + vec![self.batch.clone()] + } + + /// using batch.meta.block_number as chunk_id + fn get_chunk_id(&self) -> u128 { + self.batch.meta.block_range.block_number } } @@ -175,29 +174,30 @@ impl ChunkV0 { #[cfg(test)] mod tests { use super::*; - use moveos_types::h256; + use crate::crypto::RoochKeyPair; + use crate::test_utils::random_ledger_transaction; + use crate::transaction::LedgerTransaction; #[test] fn test_chunk_v0() { - let batch = Batch { - block_number: 1, - tx_count: 1, - prev_tx_accumulator_root: Default::default(), - tx_accumulator_root: Default::default(), - batch_hash: h256::sha2_256_of(&[1, 2, 3, 4, 5]), - data: vec![1, 2, 3, 4, 5], - }; + let tx_cnt = 128; + let keypair = RoochKeyPair::generate_secp256k1(); + + let tx_list = (0..tx_cnt) + .map(|_| random_ledger_transaction()) + .collect::<Vec<_>>(); + let batch = DABatch::new(123, 56, 78, &tx_list, keypair); let chunk = ChunkV0::from(batch.clone()); - let segments = chunk.to_segments(3); - assert_eq!(segments.len(), 39); + let segments = chunk.to_segments(1023); let chunk = chunk_from_segments(segments).unwrap(); - assert_eq!(chunk.get_batch(), batch); + let batches = chunk.get_batches(); + let act_batch = batches.first().unwrap(); + assert_eq!(act_batch, &batch); - assert_eq!( - chunk.get_batch().batch_hash, - h256::sha2_256_of(&[1, 2, 3, 4, 5]) - ); + let act_tx_list: Vec<LedgerTransaction> = + bcs::from_bytes(&act_batch.tx_list_bytes).expect("decode tx_list should success"); + assert_eq!(tx_list, act_tx_list); } }
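A roundtrip sketch (not part of the diff): a ChunkV0 lz4-compresses its batch and splits the bytes into ceil(len / max_segment_size) segments, and chunk_from_segments reassembles them losslessly; `batch` is assumed to be a DABatch as in the test above.

// Split into segments (one per blob/object on the backend side), then reassemble.
let chunk = ChunkV0::from(batch);
let segments = chunk.to_segments(4 * 1024 * 1024);
let restored = chunk_from_segments(segments).unwrap();
assert_eq!(restored.get_chunk_id(), chunk.get_chunk_id());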
diff --git a/crates/rooch-da/src/server/celestia/actor/mod.rs b/crates/rooch-types/src/da/mod.rs similarity index 59% rename from crates/rooch-da/src/server/celestia/actor/mod.rs rename to crates/rooch-types/src/da/mod.rs index 83afd4d573..ee7e48696d 100644 --- a/crates/rooch-da/src/server/celestia/actor/mod.rs +++ b/crates/rooch-types/src/da/mod.rs @@ -1,4 +1,6 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -pub mod server; +pub mod batch; +pub mod chunk; +pub mod segment; diff --git a/crates/rooch-da/src/segment.rs b/crates/rooch-types/src/da/segment.rs similarity index 99% rename from crates/rooch-da/src/segment.rs rename to crates/rooch-types/src/da/segment.rs index 61cd75a285..d570a2bab6 100644 --- a/crates/rooch-da/src/segment.rs +++ b/crates/rooch-types/src/da/segment.rs @@ -1,7 +1,7 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::chunk::ChunkVersion; +use crate::da::chunk::ChunkVersion; use serde::Serialize; use std::fmt; use std::str::FromStr; diff --git a/crates/rooch-types/src/lib.rs b/crates/rooch-types/src/lib.rs index b1dce6f5af..6612bd4aeb 100644 --- a/crates/rooch-types/src/lib.rs +++ b/crates/rooch-types/src/lib.rs @@ -8,6 +8,7 @@ pub mod bitcoin; pub mod block; pub mod coin_type; pub mod crypto; +pub mod da; pub mod error; pub mod framework; pub mod function_arg; diff --git a/scripts/deploy_rooch_mainnet.sh b/scripts/deploy_rooch_mainnet.sh index 80ff632519..9a6e65c17d 100644 --- a/scripts/deploy_rooch_mainnet.sh +++ b/scripts/deploy_rooch_mainnet.sh @@ -17,5 +17,6 @@ docker run -d --name rooch-mainnet --restart unless-stopped -v /data:/root -p 67 --btc-rpc-url "$BTC_MAIN_RPC_URL" \ --btc-rpc-username rooch-main \ --btc-rpc-password "$BTC_MAIN_RPC_PWD" \ + --da "{\"da-backend\": {\"backends\": [{\"open-da\": {\"scheme\": \"gcs\", \"config\": {\"bucket\": \"$OPENDA_GCP_MAINNET_BUCKET\", \"credential\": \"$OPENDA_GCP_MAINNET_CREDENTIAL\"}}}]}}" \ --traffic-burst-size 100000 \ --traffic-per-second 1 diff --git a/scripts/deploy_rooch_testnet.sh b/scripts/deploy_rooch_testnet.sh index a89b97748c..61c8dac0ee 100644 --- a/scripts/deploy_rooch_testnet.sh +++ b/scripts/deploy_rooch_testnet.sh @@ -21,4 +21,4 @@ docker run -d --name rooch-testnet --restart unless-stopped -v /data:/root -p 67 --btc-rpc-password "$BTC_TEST_RPC_PWD" \ --traffic-burst-size 100000 \ --traffic-per-second 1 \ - --da "{\"internal-da-server\": {\"servers\": [{\"open-da\": {\"scheme\": \"gcs\", \"config\": {\"bucket\": \"$OPENDA_GCP_TESTNET_BUCKET\", \"credential\": \"$OPENDA_GCP_TESTNET_CREDENTIAL\"}}}]}}" + --da "{\"da-backend\": {\"backends\": [{\"open-da\": {\"scheme\": \"gcs\", \"config\": {\"bucket\": \"$OPENDA_GCP_TESTNET_BUCKET\", \"credential\": \"$OPENDA_GCP_TESTNET_CREDENTIAL\"}}}]}}"
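For reference, the JSON passed to --da in the deploy scripts above, shown unescaped for readability (the shell-quoted one-liner is equivalent):

{
  "da-backend": {
    "backends": [
      { "open-da": { "scheme": "gcs", "config": { "bucket": "$OPENDA_GCP_TESTNET_BUCKET", "credential": "$OPENDA_GCP_TESTNET_CREDENTIAL" } } }
    ]
  }
}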