diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 240baea11..94008426b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -555,7 +555,7 @@ jobs: strategy: fail-fast: false matrix: - test_name: [zombie_tanssi, zombie_tanssi_parathreads, zombie_tanssi_rotation, zombie_tanssi_warp_sync, zombie_tanssi_relay] + test_name: [zombie_tanssi, zombie_tanssi_parathreads, zombie_tanssi_rotation, zombie_tanssi_warp_sync, zombie_tanssi_relay, zombie_data_preservers] steps: - name: Checkout uses: actions/checkout@v4 diff --git a/Cargo.lock b/Cargo.lock index 51ada918d..dc2e61248 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1375,7 +1375,7 @@ dependencies = [ [[package]] name = "ccp-authorities-noting-inherent" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "async-trait", "cumulus-primitives-core", @@ -1403,7 +1403,7 @@ dependencies = [ [[package]] name = "ccp-xcm" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "frame-support", "frame-system", @@ -3115,6 +3115,7 @@ dependencies = [ "pallet-collator-assignment-runtime-api", "pallet-configuration", "pallet-data-preservers", + "pallet-data-preservers-runtime-api", "pallet-foreign-asset-creator", "pallet-identity", "pallet-inflation-rewards", @@ -3241,7 +3242,7 @@ dependencies = [ [[package]] name = "dc-orchestrator-chain-interface" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "async-trait", "cumulus-primitives-core", @@ -3252,6 +3253,7 @@ dependencies = [ "parity-scale-codec", "polkadot-overseer", "sc-client-api", + "serde", "sp-api", "sp-blockchain", "sp-state-machine", @@ -3486,7 +3488,7 @@ checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" [[package]] name = "dp-chain-state-snapshot" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "cumulus-primitives-core", "parity-scale-codec", @@ -3498,7 +3500,7 @@ dependencies = [ [[package]] name = "dp-collator-assignment" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "cumulus-primitives-core", "frame-support", @@ -3518,7 +3520,7 @@ dependencies = [ [[package]] name = "dp-consensus" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "cumulus-primitives-core", "frame-support", @@ -3535,7 +3537,7 @@ dependencies = [ [[package]] name = "dp-container-chain-genesis-data" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "cumulus-primitives-core", "frame-support", @@ -3557,7 +3559,7 @@ dependencies = [ [[package]] name = "dp-core" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "cumulus-primitives-core", "frame-support", @@ -3572,7 +3574,7 @@ dependencies = [ [[package]] name = "dp-impl-tanssi-pallets-config" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "dp-consensus", "frame-support", @@ -3587,7 +3589,7 @@ dependencies = [ [[package]] name = "dp-slot-duration-runtime-api" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "cumulus-primitives-core", "frame-support", @@ -4456,6 +4458,7 @@ dependencies = [ "pallet-collator-assignment-runtime-api", "pallet-configuration", "pallet-data-preservers", + "pallet-data-preservers-runtime-api", "pallet-identity", "pallet-inflation-rewards", "pallet-initializer", @@ -8692,7 +8695,7 @@ dependencies = [ [[package]] name = "pallet-cc-authorities-noting" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "ccp-authorities-noting-inherent", "cumulus-pallet-parachain-system", @@ -8861,6 +8864,17 @@ dependencies = [ "tp-traits", ] +[[package]] +name = "pallet-data-preservers-runtime-api" +version = "0.1.0" +dependencies = [ + "parity-scale-codec", + "scale-info", + "serde", + "sp-api", + "thiserror", +] + [[package]] name = "pallet-delegated-staking" version = "4.0.0" @@ -10253,7 +10267,7 @@ dependencies = [ [[package]] name = "pallet-xcm-executor-utils" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "frame-benchmarking", "frame-support", @@ -17084,6 +17098,7 @@ dependencies = [ "pallet-author-noting-runtime-api", "pallet-collator-assignment-runtime-api", "pallet-configuration", + "pallet-data-preservers-runtime-api", "pallet-registrar-runtime-api", "parity-scale-codec", "polkadot-cli", @@ -17505,6 +17520,8 @@ dependencies = [ "nimbus-primitives", "node-common", "pallet-author-noting-runtime-api", + "pallet-data-preservers", + "polkadot-overseer", "polkadot-primitives", "sc-basic-authorship", "sc-chain-spec", @@ -17538,6 +17555,7 @@ dependencies = [ "substrate-prometheus-endpoint", "tc-consensus", "tokio", + "tokio-stream", "tokio-util", ] @@ -17581,7 +17599,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "test-relay-sproof-builder" version = "0.1.0" -source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#cf501fc2ad446b06256084b75a39bdb3c276eae4" +source = "git+https://github.com/moondance-labs/dancekit?branch=tanssi-polkadot-stable2407#9d06202ad00322741f0556138ce5a2a4b2e12c69" dependencies = [ "cumulus-primitives-core", "dp-collator-assignment", diff --git a/Cargo.toml b/Cargo.toml index 18b84eb6d..563cc962f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ pallet-collator-assignment = { path = "pallets/collator-assignment", default-fea pallet-collator-assignment-runtime-api = { path = "pallets/collator-assignment/runtime-api", default-features = false } pallet-configuration = { path = "pallets/configuration", default-features = false } pallet-data-preservers = { path = "pallets/data-preservers", default-features = false } +pallet-data-preservers-runtime-api = { path = "pallets/data-preservers/runtime-api", default-features = false } pallet-inflation-rewards = { path = "pallets/inflation-rewards", default-features = false } pallet-initializer = { path = "pallets/initializer", default-features = false } pallet-invulnerables = { path = "pallets/invulnerables", default-features = false } diff --git a/client/consensus/src/mocks.rs b/client/consensus/src/mocks.rs index 0e5ddc213..265c48faa 100644 --- a/client/consensus/src/mocks.rs +++ b/client/consensus/src/mocks.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Tanssi. If not, see . -use polkadot_node_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest}; use { crate::{ collators::lookahead::Params as LookAheadParams, OrchestratorAuraWorkerAuxData, @@ -29,14 +28,14 @@ use { CommittedCandidateReceipt, OverseerHandle, RelayChainInterface, RelayChainResult, StorageValue, }, - futures::channel::oneshot, - futures::prelude::*, + futures::{channel::oneshot, prelude::*}, nimbus_primitives::{ CompatibleDigestItem, NimbusId, NimbusPair, NIMBUS_ENGINE_ID, NIMBUS_KEY_ID, }, pallet_xcm_core_buyer_runtime_api::BuyingError, parity_scale_codec::Encode, polkadot_core_primitives::{Header as PHeader, InboundDownwardMessage, InboundHrmpMessage}, + polkadot_node_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest}, polkadot_overseer::dummy::dummy_overseer_builder, polkadot_parachain_primitives::primitives::HeadData, polkadot_primitives::{ @@ -513,8 +512,10 @@ impl sc_consensus::Verifier for SealExtractorVerfier { } } -use cumulus_primitives_core::relay_chain::ValidationCodeHash; -use polkadot_node_subsystem::{overseer, OverseerSignal}; +use { + cumulus_primitives_core::relay_chain::ValidationCodeHash, + polkadot_node_subsystem::{overseer, OverseerSignal}, +}; pub struct DummyCodeHashProvider; impl ValidationCodeHashProvider for DummyCodeHashProvider { diff --git a/client/orchestrator-chain-rpc-interface/src/lib.rs b/client/orchestrator-chain-rpc-interface/src/lib.rs index de91a71ff..b0385dbbf 100644 --- a/client/orchestrator-chain-rpc-interface/src/lib.rs +++ b/client/orchestrator-chain-rpc-interface/src/lib.rs @@ -20,8 +20,9 @@ use { async_trait::async_trait, core::pin::Pin, dc_orchestrator_chain_interface::{ - BlockNumber, ContainerChainGenesisData, OrchestratorChainError, OrchestratorChainInterface, - OrchestratorChainResult, PHash, PHeader, + BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId, + OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash, + PHeader, }, dp_core::ParaId, futures::{Stream, StreamExt}, @@ -114,7 +115,7 @@ impl OrchestratorChainRpcClient { }; let res = self .request_tracing::("state_call", params, |err| { - tracing::trace!( + tracing::debug!( target: LOG_TARGET, %method_name, %hash, @@ -289,8 +290,12 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient { orchestrator_parent: PHash, para_id: ParaId, ) -> OrchestratorChainResult> { - self.call_remote_runtime_function("genesis_data", orchestrator_parent, Some(para_id)) - .await + self.call_remote_runtime_function( + "RegistrarApi_genesis_data", + orchestrator_parent, + Some(para_id), + ) + .await } async fn boot_nodes( @@ -298,8 +303,12 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient { orchestrator_parent: PHash, para_id: ParaId, ) -> OrchestratorChainResult>> { - self.call_remote_runtime_function("boot_nodes", orchestrator_parent, Some(para_id)) - .await + self.call_remote_runtime_function( + "RegistrarApi_boot_nodes", + orchestrator_parent, + Some(para_id), + ) + .await } async fn latest_block_number( @@ -307,8 +316,12 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient { orchestrator_parent: PHash, para_id: ParaId, ) -> OrchestratorChainResult> { - self.call_remote_runtime_function("latest_block_number", orchestrator_parent, Some(para_id)) - .await + self.call_remote_runtime_function( + "AuthorNotingApi_latest_block_number", + orchestrator_parent, + Some(para_id), + ) + .await } async fn best_block_hash(&self) -> OrchestratorChainResult { @@ -318,4 +331,17 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient { async fn finalized_block_hash(&self) -> OrchestratorChainResult { self.request("chain_getFinalizedHead", rpc_params![]).await } + + async fn data_preserver_active_assignment( + &self, + orchestrator_parent: PHash, + profile_id: DataPreserverProfileId, + ) -> OrchestratorChainResult> { + self.call_remote_runtime_function( + "DataPreserversApi_get_active_assignment", + orchestrator_parent, + Some(profile_id), + ) + .await + } } diff --git a/client/service-container-chain/Cargo.toml b/client/service-container-chain/Cargo.toml index d144a5c49..33e9db891 100644 --- a/client/service-container-chain/Cargo.toml +++ b/client/service-container-chain/Cargo.toml @@ -22,11 +22,12 @@ tokio = { workspace = true } tokio-util = { workspace = true } # Local -ccp-authorities-noting-inherent = { workspace = true } +ccp-authorities-noting-inherent = { workspace = true, features = [ "std" ] } dancebox-runtime = { workspace = true, features = [ "std" ] } manual-xcm-rpc = { workspace = true } node-common = { workspace = true } pallet-author-noting-runtime-api = { workspace = true, features = [ "std" ] } +pallet-data-preservers = { workspace = true, features = [ "std" ] } services-payment-rpc = { workspace = true } stream-payment-rpc = { workspace = true } tc-consensus = { workspace = true } @@ -81,6 +82,10 @@ cumulus-relay-chain-interface = { workspace = true } nimbus-consensus = { workspace = true } nimbus-primitives = { workspace = true } +[dev-dependencies] +polkadot-overseer = { workspace = true } +tokio-stream = { workspace = true } + [build-dependencies] substrate-build-script-utils = { workspace = true } @@ -90,6 +95,7 @@ runtime-benchmarks = [ "cumulus-primitives-core/runtime-benchmarks", "dancebox-runtime/runtime-benchmarks", "nimbus-primitives/runtime-benchmarks", + "pallet-data-preservers/runtime-benchmarks", "polkadot-primitives/runtime-benchmarks", "sc-service/runtime-benchmarks", "sp-runtime/runtime-benchmarks", @@ -97,6 +103,7 @@ runtime-benchmarks = [ try-runtime = [ "dancebox-runtime/try-runtime", "nimbus-primitives/try-runtime", + "pallet-data-preservers/try-runtime", "sp-runtime/try-runtime", ] diff --git a/client/service-container-chain/src/data_preservers.rs b/client/service-container-chain/src/data_preservers.rs new file mode 100644 index 000000000..44997cb59 --- /dev/null +++ b/client/service-container-chain/src/data_preservers.rs @@ -0,0 +1,429 @@ +// Copyright (C) Moondance Labs Ltd. +// This file is part of Tanssi. + +// Tanssi is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Tanssi is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Tanssi. If not, see + +use { + crate::spawner::{wait_for_paritydb_lock, Spawner}, + dc_orchestrator_chain_interface::{ + DataPreserverAssignment, OrchestratorChainError, OrchestratorChainInterface, + OrchestratorChainResult, + }, + futures::stream::StreamExt, + std::{future::Future, time::Duration}, + tc_consensus::ParaId, +}; + +pub type ProfileId = ::ProfileId; + +async fn try_fut(fut: impl Future>) -> Result { + fut.await +} + +/// Watch assignements by indefinitly listening to finalized block notifications and switching to +/// the chain the profile is assigned to. +pub async fn task_watch_assignment(spawner: impl Spawner, profile_id: ProfileId) { + use dc_orchestrator_chain_interface::DataPreserverAssignment as Assignment; + + if let OrchestratorChainResult::Err(e) = try_fut(async move { + let orchestrator_chain_interface = spawner.orchestrator_chain_interface(); + + let mut current_assignment = DataPreserverAssignment::::NotAssigned; + + let mut stream = orchestrator_chain_interface + .finality_notification_stream() + .await?; + + while let Some(header) = stream.next().await { + let hash = header.hash(); + + let new_assignment = orchestrator_chain_interface + .data_preserver_active_assignment(hash, profile_id) + .await?; + + log::info!("Assignement for block {hash}: {new_assignment:?}"); + + match (current_assignment, new_assignment) { + // no change + (x, y) if x == y => continue, + // switch from not assigned/inactive to active, start embeded node + ( + Assignment::NotAssigned | Assignment::Inactive(_), + Assignment::Active(para_id), + ) => { + spawner.spawn(para_id, false).await; + } + // Assignement switches from active to inactive for same para_id, we stop the + // embeded node but keep db + (Assignment::Active(para_id), Assignment::Inactive(x)) if para_id == x => { + let db_path = spawner.stop(para_id, true); // keep db + if let Some(db_path) = db_path { + wait_for_paritydb_lock(&db_path, Duration::from_secs(10)) + .await + .map_err(OrchestratorChainError::GenericError)?; + } + } + // No longer assigned or assigned inactive to other para id, remove previous node + ( + Assignment::Active(para_id), + Assignment::Inactive(_) | Assignment::NotAssigned, + ) => { + spawner.stop(para_id, false); // don't keep db + } + // Changed para id, remove previous node and start new one + (Assignment::Active(previous_para_id), Assignment::Active(para_id)) => { + let db_path = spawner.stop(previous_para_id, false); // don't keep db + if let Some(db_path) = db_path { + wait_for_paritydb_lock(&db_path, Duration::from_secs(10)) + .await + .map_err(OrchestratorChainError::GenericError)?; + } + + spawner.spawn(para_id, false).await; + } + // don't do anything yet + ( + Assignment::NotAssigned | Assignment::Inactive(_), + Assignment::NotAssigned | Assignment::Inactive(_), + ) => (), + } + + current_assignment = new_assignment; + } + + Ok(()) + }) + .await + { + log::error!("Error in data preservers assignement watching task: {e:?}"); + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + dc_orchestrator_chain_interface::{ + BlockNumber, DataPreserverProfileId, OrchestratorChainError, PHash, PHeader, + }, + dp_container_chain_genesis_data::ContainerChainGenesisData, + futures::Stream, + polkadot_overseer::Handle, + sc_client_api::StorageProof, + sp_core::H256, + std::{ + collections::BTreeMap, + ops::DerefMut, + path::PathBuf, + pin::Pin, + sync::{Arc, Mutex}, + time::Duration, + }, + tokio::sync::broadcast, + }; + + struct MockChainInterface { + state: Mutex, + notification_sender: broadcast::Sender, + } + + struct MockChainInterfaceState { + next_block_number: BlockNumber, + blocks: BTreeMap, + } + + struct BlockAssignment { + assignments: BTreeMap>, + } + + impl MockChainInterface { + fn new() -> Self { + Self { + state: Mutex::new(MockChainInterfaceState { + next_block_number: 0, + blocks: BTreeMap::new(), + }), + + notification_sender: broadcast::Sender::new(100), + } + } + + fn mock_block(&self, assignments: BTreeMap>) { + let mut state = self.state.lock().unwrap(); + state.next_block_number += 1; + + let header = PHeader { + parent_hash: H256::zero(), + number: state.next_block_number, + state_root: H256::zero(), + extrinsics_root: H256::zero(), + digest: Default::default(), + }; + let hash = header.hash(); + + state.blocks.insert(hash, BlockAssignment { assignments }); + + self.notification_sender + .send(header) + .expect("to properly send block header"); + } + } + + #[async_trait::async_trait] + impl OrchestratorChainInterface for MockChainInterface { + fn overseer_handle(&self) -> OrchestratorChainResult { + unimplemented!("not used in test") + } + + async fn get_storage_by_key( + &self, + _orchestrator_parent: PHash, + _key: &[u8], + ) -> OrchestratorChainResult>> { + unimplemented!("not used in test") + } + + async fn prove_read( + &self, + _orchestrator_parent: PHash, + _relevant_keys: &Vec>, + ) -> OrchestratorChainResult { + unimplemented!("not used in test") + } + + async fn import_notification_stream( + &self, + ) -> OrchestratorChainResult + Send>>> { + unimplemented!("not used in test") + } + + async fn new_best_notification_stream( + &self, + ) -> OrchestratorChainResult + Send>>> { + unimplemented!("not used in test") + } + + async fn finality_notification_stream( + &self, + ) -> OrchestratorChainResult + Send>>> { + let receiver = self.notification_sender.subscribe(); + let stream = tokio_stream::wrappers::BroadcastStream::new(receiver) + .filter_map(|x| async { x.ok() }); + let stream = Box::pin(stream); + Ok(stream) + } + + async fn genesis_data( + &self, + _orchestrator_parent: PHash, + _para_id: ParaId, + ) -> OrchestratorChainResult> { + unimplemented!("not used in test") + } + + async fn boot_nodes( + &self, + _orchestrator_parent: PHash, + _para_id: ParaId, + ) -> OrchestratorChainResult>> { + unimplemented!("not used in test") + } + + async fn latest_block_number( + &self, + _orchestrator_parent: PHash, + _para_id: ParaId, + ) -> OrchestratorChainResult> { + unimplemented!("not used in test") + } + + async fn best_block_hash(&self) -> OrchestratorChainResult { + unimplemented!("not used in test") + } + + async fn finalized_block_hash(&self) -> OrchestratorChainResult { + unimplemented!("not used in test") + } + + async fn data_preserver_active_assignment( + &self, + orchestrator_parent: PHash, + profile_id: DataPreserverProfileId, + ) -> OrchestratorChainResult> { + let mut state = self.state.lock().unwrap(); + let block = state.blocks.get_mut(&orchestrator_parent).ok_or_else(|| { + OrchestratorChainError::GenericError("this block is not mocked".into()) + })?; + + Ok(block + .assignments + .get(&profile_id) + .cloned() + .unwrap_or(DataPreserverAssignment::NotAssigned)) + } + } + + #[derive(Debug, PartialEq, Eq, Hash)] + enum SpawnerEvent { + Started(ParaId, bool), + Stopped(ParaId, bool), + } + + #[derive(Clone)] + struct MockSpawner { + state: Arc>>, + chain_interface: Arc, + } + + impl MockSpawner { + fn new() -> Self { + Self { + state: Arc::new(Mutex::new(Vec::new())), + chain_interface: Arc::new(MockChainInterface::new()), + } + } + + fn collect_events(&self) -> Vec { + let mut events = vec![]; + let mut state = self.state.lock().unwrap(); + std::mem::swap(state.deref_mut(), &mut events); + events + } + } + + impl Spawner for MockSpawner { + fn orchestrator_chain_interface(&self) -> Arc { + self.chain_interface.clone() + } + + /// Try to start a new container chain. In case of an error, this does not stop the node, and + /// the container chain will be attempted to spawn again when the collator is reassigned to it. + /// + /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails + /// because the chain has not stopped yet, because `stop` does not wait for the chain to stop, + /// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in + /// `handle_update_assignment`. + fn spawn( + &self, + container_chain_para_id: ParaId, + start_collation: bool, + ) -> impl std::future::Future + Send { + let mut set = self.state.lock().unwrap(); + set.push(SpawnerEvent::Started( + container_chain_para_id, + start_collation, + )); + + async {} + } + + /// Stop a container chain. Prints a warning if the container chain was not running. + /// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock` + /// to ensure that the container chain has fully stopped. The database path can be `None` if the + /// chain was not running. + fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option { + let mut set = self.state.lock().unwrap(); + set.push(SpawnerEvent::Stopped(container_chain_para_id, keep_db)); + + None + } + } + + #[tokio::test] + async fn task_logic_works() { + let spawner = MockSpawner::new(); + + let profile_id = 0; + let para_id1 = ParaId::from(1); + let para_id2 = ParaId::from(2); + + tokio::spawn(task_watch_assignment(spawner.clone(), profile_id)); + // Wait for task to start and subscribe to block stream. + tokio::time::sleep(Duration::from_millis(100)).await; + + spawner.chain_interface.mock_block({ + let mut map = BTreeMap::new(); + map.insert(profile_id, DataPreserverAssignment::Active(para_id1)); + map + }); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!( + spawner.collect_events(), + vec![SpawnerEvent::Started(para_id1, false)] + ); + + spawner.chain_interface.mock_block({ + let mut map = BTreeMap::new(); + map.insert(profile_id, DataPreserverAssignment::NotAssigned); + map + }); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!( + spawner.collect_events(), + vec![SpawnerEvent::Stopped(para_id1, false)] + ); + + spawner.chain_interface.mock_block({ + let mut map = BTreeMap::new(); + map.insert(profile_id, DataPreserverAssignment::Active(para_id2)); + map + }); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!( + spawner.collect_events(), + vec![SpawnerEvent::Started(para_id2, false)] + ); + + spawner.chain_interface.mock_block({ + let mut map = BTreeMap::new(); + map.insert(profile_id, DataPreserverAssignment::Active(para_id1)); + map + }); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!( + spawner.collect_events(), + vec![ + SpawnerEvent::Stopped(para_id2, false), + SpawnerEvent::Started(para_id1, false) + ] + ); + + spawner.chain_interface.mock_block({ + let mut map = BTreeMap::new(); + map.insert(profile_id, DataPreserverAssignment::Inactive(para_id1)); + map + }); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!( + spawner.collect_events(), + vec![SpawnerEvent::Stopped(para_id1, true)] + ); + + spawner.chain_interface.mock_block({ + let mut map = BTreeMap::new(); + map.insert(profile_id, DataPreserverAssignment::Inactive(para_id2)); + map + }); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(spawner.collect_events(), vec![]); + + spawner.chain_interface.mock_block({ + let mut map = BTreeMap::new(); + map.insert(profile_id, DataPreserverAssignment::NotAssigned); + map + }); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(spawner.collect_events(), vec![]); + } +} diff --git a/client/service-container-chain/src/lib.rs b/client/service-container-chain/src/lib.rs index 3d3b14ae3..e024fbb55 100644 --- a/client/service-container-chain/src/lib.rs +++ b/client/service-container-chain/src/lib.rs @@ -16,6 +16,7 @@ pub mod chain_spec; pub mod cli; +pub mod data_preservers; pub mod monitor; pub mod rpc; pub mod service; diff --git a/client/service-container-chain/src/spawner.rs b/client/service-container-chain/src/spawner.rs index bfa0834d8..0607969f1 100644 --- a/client/service-container-chain/src/spawner.rs +++ b/client/service-container-chain/src/spawner.rs @@ -118,6 +118,7 @@ pub struct ContainerChainSpawnParams { pub spawn_handle: SpawnTaskHandle, pub collation_params: Option, pub sync_mode: SelectSyncMode, + pub data_preserver: bool, } /// Params specific to collation. This struct can contain types obtained through running an @@ -187,6 +188,7 @@ async fn try_spawn( spawn_handle, mut collation_params, sync_mode, + data_preserver, .. } = try_spawn_params; // Preload genesis data from orchestrator chain storage. @@ -253,12 +255,14 @@ async fn try_spawn( container_chain_para_id ); - if !start_collation { + if !data_preserver && !start_collation { + log::info!("This is a syncing container chain, using random ports"); + collation_params = None; - log::info!("This is a syncing container chain, using random ports"); // Use random ports to avoid conflicts with the other running container chain let random_ports = [23456, 23457, 23458]; + container_chain_cli .base .base @@ -527,7 +531,37 @@ async fn try_spawn( Ok(()) } -impl ContainerChainSpawner { +/// Interface for spawning and stopping container chain embeded nodes. +pub trait Spawner { + /// Access to the Orchestrator Chain Interface + fn orchestrator_chain_interface(&self) -> Arc; + + /// Try to start a new container chain. In case of an error, this does not stop the node, and + /// the container chain will be attempted to spawn again when the collator is reassigned to it. + /// + /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails + /// because the chain has not stopped yet, because `stop` does not wait for the chain to stop, + /// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in + /// `handle_update_assignment`. + fn spawn( + &self, + container_chain_para_id: ParaId, + start_collation: bool, + ) -> impl std::future::Future + Send; + + /// Stop a container chain. Prints a warning if the container chain was not running. + /// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock` + /// to ensure that the container chain has fully stopped. The database path can be `None` if the + /// chain was not running. + fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option; +} + +impl Spawner for ContainerChainSpawner { + /// Access to the Orchestrator Chain Interface + fn orchestrator_chain_interface(&self) -> Arc { + self.params.orchestrator_chain_interface.clone() + } + /// Try to start a new container chain. In case of an error, this does not stop the node, and /// the container chain will be attempted to spawn again when the collator is reassigned to it. /// @@ -600,7 +634,9 @@ impl ContainerChainSpawner { } } } +} +impl ContainerChainSpawner { /// Receive and process `CcSpawnMsg`s indefinitely pub async fn rx_loop(mut self, mut rx: mpsc::UnboundedReceiver, validator: bool) { // The node always starts as an orchestrator chain collator. @@ -1019,7 +1055,7 @@ fn parse_boot_nodes_ignore_invalid( .collect() } -async fn wait_for_paritydb_lock(db_path: &Path, max_timeout: Duration) -> Result<(), String> { +pub async fn wait_for_paritydb_lock(db_path: &Path, max_timeout: Duration) -> Result<(), String> { let now = Instant::now(); while now.elapsed() < max_timeout { diff --git a/container-chains/nodes/simple/src/cli.rs b/container-chains/nodes/simple/src/cli.rs index 67190cb84..dac607619 100644 --- a/container-chains/nodes/simple/src/cli.rs +++ b/container-chains/nodes/simple/src/cli.rs @@ -61,10 +61,6 @@ pub enum Subcommand { /// Precompile the WASM runtime into native code PrecompileWasm(sc_cli::PrecompileWasmCmd), - - /// Starts in RPC provider mode, watching orchestrator chain for assignements to provide - /// RPC services for container chains. - RpcProvider(RpcProviderSubcommand), } #[derive(Debug, Parser)] @@ -115,13 +111,48 @@ pub struct Cli { #[arg(long)] pub no_hardware_benchmarks: bool, - /// Relay chain arguments - #[arg(raw = true)] - pub relay_chain_args: Vec, - /// Optional parachain id that should be used to build chain spec. #[arg(long)] pub para_id: Option, + + /// Profile id associated with the node, whose assignements will be followed to provide RPC services. + #[arg(long)] + pub rpc_provider_profile_id: Option, + + /// Endpoints to connect to orchestrator nodes, avoiding to start a local orchestrator node. + /// If this list is empty, a local embeded orchestrator node is started. + #[arg(long)] + pub orchestrator_endpoints: Vec, + + /// Relay chain arguments, optionally followed by "--" and container chain arguments + #[arg(raw = true)] + extra_args: Vec, +} + +impl Cli { + pub fn relaychain_args(&self) -> &[String] { + let (relay_chain_args, _) = self.split_extra_args_at_first_dashdash(); + + relay_chain_args + } + + pub fn container_chain_args(&self) -> &[String] { + let (_, container_chain_args) = self.split_extra_args_at_first_dashdash(); + + container_chain_args + } + + fn split_extra_args_at_first_dashdash(&self) -> (&[String], &[String]) { + let index_of_dashdash = self.extra_args.iter().position(|x| *x == "--"); + + if let Some(i) = index_of_dashdash { + let (container_chain_args, extra_extra) = self.extra_args.split_at(i); + (&extra_extra[1..], container_chain_args) + } else { + // Only relay chain args + (&self.extra_args, &[]) + } + } } #[derive(Debug)] @@ -178,16 +209,3 @@ impl CliConfiguration for BuildSpecCmd { Some(&self.base.node_key_params) } } - -#[derive(Debug, clap::Parser)] -#[group(skip)] -pub struct RpcProviderSubcommand { - /// Endpoints to connect to orchestrator nodes, avoiding to start a local orchestrator node. - /// If this list is empty, a local embeded orchestrator node is started. - #[arg(long)] - pub orchestrator_endpoints: Vec, - - /// Account associated with the node, whose assignements will be followed to provide RPC services. - #[arg(long)] - pub assignement_account: dp_core::AccountId, -} diff --git a/container-chains/nodes/simple/src/command.rs b/container-chains/nodes/simple/src/command.rs index 7469afea3..139c3e202 100644 --- a/container-chains/nodes/simple/src/command.rs +++ b/container-chains/nodes/simple/src/command.rs @@ -27,7 +27,6 @@ use { cumulus_primitives_core::ParaId, dc_orchestrator_chain_interface::OrchestratorChainInterface, frame_benchmarking_cli::{BenchmarkCmd, SUBSTRATE_REFERENCE_HARDWARE}, - futures::stream::StreamExt, log::{info, warn}, node_common::{command::generate_genesis_block, service::NodeBuilderConfig as _}, parity_scale_codec::Encode, @@ -36,11 +35,18 @@ use { ChainSpec, CliConfiguration, DefaultConfigurationValues, ImportParams, KeystoreParams, NetworkParams, Result, SharedParams, SubstrateCli, }, - sc_service::config::{BasePath, PrometheusConfig}, + sc_service::{ + config::{BasePath, PrometheusConfig}, + KeystoreContainer, + }, sc_telemetry::TelemetryWorker, sp_core::hexdisplay::HexDisplay, sp_runtime::traits::{AccountIdConversion, Block as BlockT}, std::{net::SocketAddr, sync::Arc}, + tc_service_container_chain::{ + cli::ContainerChainCli, + spawner::{ContainerChainSpawnParams, ContainerChainSpawner}, + }, }; fn load_spec(id: &str, para_id: ParaId) -> std::result::Result, String> { @@ -200,7 +206,7 @@ pub fn run() -> Result<()> { &config, [RelayChainCli::executable_name()] .iter() - .chain(cli.relay_chain_args.iter()), + .chain(cli.relaychain_args().iter()), ); let polkadot_config = SubstrateCli::create_configuration( @@ -281,110 +287,11 @@ pub fn run() -> Result<()> { )) }) } - Some(Subcommand::RpcProvider(cmd)) => { - let runner = cli.create_runner(&cli.run.normalize())?; - - runner.run_node_until_exit(|config| async move { - let client: Arc; - let mut task_manager; - - if cmd.orchestrator_endpoints.is_empty() { - todo!("Start in process node") - } else { - task_manager = TaskManager::new(tokio::runtime::Handle::current(), None) - .map_err(|e| sc_cli::Error::Application(Box::new(e)))?; - - client = tc_orchestrator_chain_rpc_interface::create_client_and_start_worker( - cmd.orchestrator_endpoints.clone(), - &mut task_manager, - None, - ) - .await - .map(Arc::new) - .map_err(|e| sc_cli::Error::Application(Box::new(e)))?; - }; - - // POC: Try to fetch some data through the interface. - { - let client = client.clone(); - - task_manager - .spawn_handle() - .spawn("rpc_provider_exemple", None, async move { - let mut stream = client.new_best_notification_stream().await.unwrap(); - - while let Some(header) = stream.next().await { - log::info!("New best block: {}", header.hash()); - } - }); - } - - // POC: Spawn container chain embed node - { - let collator_options = cli.run.collator_options(); - - let polkadot_cli = RelayChainCli::new( - &config, - [RelayChainCli::executable_name()] - .iter() - .chain(cli.relay_chain_args.iter()), - ); - - let tokio_handle = config.tokio_handle.clone(); - let polkadot_config = SubstrateCli::create_configuration( - &polkadot_cli, - &polkadot_cli, - tokio_handle, - ) - .map_err(|err| format!("Relay chain argument error: {}", err))?; - - let telemetry = config - .telemetry_endpoints - .clone() - .filter(|x| !x.is_empty()) - .map(|endpoints| -> std::result::Result<_, sc_telemetry::Error> { - let worker = TelemetryWorker::new(16)?; - let telemetry = worker.handle().new_telemetry(endpoints); - Ok((worker, telemetry)) - }) - .transpose() - .map_err(sc_service::Error::Telemetry)?; - - let telemetry_worker_handle = - telemetry.as_ref().map(|(worker, _)| worker.handle()); - - let (relay_chain_interface, _collation_pair) = build_relay_chain_interface( - polkadot_config, - &config, - telemetry_worker_handle, - &mut task_manager, - collator_options, - None, - ) - .await - .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; - - let keystore_container = sc_service::KeystoreContainer::new( - &sc_service::config::KeystoreConfig::InMemory, - )?; - let keystore = keystore_container.keystore(); - - let (_cc_task_manager, _cc_client, _cc_backend) = - tc_service_container_chain::service::start_node_impl_container( - config, - relay_chain_interface, - client, - keystore, - ParaId::from(2001), - None, - ) - .await?; - } - - Ok(task_manager) - }) - } None => { + if let Some(profile_id) = cli.rpc_provider_profile_id { + return rpc_provider_mode(cli, profile_id); + } + let runner = cli.create_runner(&cli.run.normalize())?; let collator_options = cli.run.collator_options(); @@ -401,7 +308,7 @@ pub fn run() -> Result<()> { let polkadot_cli = RelayChainCli::new( &config, - [RelayChainCli::executable_name()].iter().chain(cli.relay_chain_args.iter()), + [RelayChainCli::executable_name()].iter().chain(cli.relaychain_args().iter()), ); let extension = chain_spec::Extensions::try_get(&*config.chain_spec); @@ -443,7 +350,7 @@ pub fn run() -> Result<()> { if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) = collator_options.clone().relay_chain_mode { - if !rpc_target_urls.is_empty() && !cli.relay_chain_args.is_empty() { + if !rpc_target_urls.is_empty() && !cli.relaychain_args().is_empty() { warn!("Detected relay chain node arguments together with --relay-chain-rpc-url. This command starts a minimal Polkadot node that only uses a network-related subset of all relay chain CLI options."); } } @@ -592,3 +499,152 @@ impl CliConfiguration for RelayChainCli { self.base.base.node_name() } } + +fn rpc_provider_mode(cli: Cli, profile_id: u64) -> Result<()> { + log::info!("Starting in RPC provider mode!"); + + let runner = cli.create_runner(&cli.run.normalize())?; + + runner.run_node_until_exit(|config| async move { + let orchestrator_chain_interface: Arc; + let mut task_manager; + + if cli.orchestrator_endpoints.is_empty() { + todo!("Start in process node") + } else { + task_manager = TaskManager::new(config.tokio_handle.clone(), None) + .map_err(|e| sc_cli::Error::Application(Box::new(e)))?; + + orchestrator_chain_interface = + tc_orchestrator_chain_rpc_interface::create_client_and_start_worker( + cli.orchestrator_endpoints.clone(), + &mut task_manager, + None, + ) + .await + .map(Arc::new) + .map_err(|e| sc_cli::Error::Application(Box::new(e)))?; + }; + + // Spawn assignment watcher + { + let mut container_chain_cli = ContainerChainCli::new( + &config, + [ContainerChainCli::executable_name()] + .iter() + .chain(cli.container_chain_args().iter()), + ); + + // If the container chain args have no --wasmtime-precompiled flag, use the same as the orchestrator + if container_chain_cli + .base + .base + .import_params + .wasmtime_precompiled + .is_none() + { + container_chain_cli + .base + .base + .import_params + .wasmtime_precompiled + .clone_from(&config.wasmtime_precompiled); + } + + log::info!("Container chain CLI: {container_chain_cli:?}"); + + let para_id = chain_spec::Extensions::try_get(&*config.chain_spec) + .map(|e| e.para_id) + .ok_or("Could not find parachain ID in chain-spec.")?; + + let para_id = ParaId::from(para_id); + + // TODO: Once there is an embeded node this should use it. + let keystore_container = KeystoreContainer::new(&config.keystore)?; + + let collator_options = cli.run.collator_options(); + + let polkadot_cli = RelayChainCli::new( + &config, + [RelayChainCli::executable_name()] + .iter() + .chain(cli.relaychain_args().iter()), + ); + + let tokio_handle = config.tokio_handle.clone(); + let polkadot_config = + SubstrateCli::create_configuration(&polkadot_cli, &polkadot_cli, tokio_handle) + .map_err(|err| format!("Relay chain argument error: {}", err))?; + + let telemetry = config + .telemetry_endpoints + .clone() + .filter(|x| !x.is_empty()) + .map(|endpoints| -> std::result::Result<_, sc_telemetry::Error> { + let worker = TelemetryWorker::new(16)?; + let telemetry = worker.handle().new_telemetry(endpoints); + Ok((worker, telemetry)) + }) + .transpose() + .map_err(sc_service::Error::Telemetry)?; + + let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle()); + + let (relay_chain_interface, _collation_pair) = build_relay_chain_interface( + polkadot_config, + &config, + telemetry_worker_handle, + &mut task_manager, + collator_options, + None, + ) + .await + .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; + + let relay_chain = crate::chain_spec::Extensions::try_get(&*config.chain_spec) + .map(|e| e.relay_chain.clone()) + .ok_or("Could not find relay_chain extension in chain-spec.")?; + + let container_chain_spawner = ContainerChainSpawner { + params: ContainerChainSpawnParams { + orchestrator_chain_interface, + container_chain_cli, + tokio_handle: config.tokio_handle.clone(), + chain_type: config.chain_spec.chain_type(), + relay_chain, + relay_chain_interface, + sync_keystore: keystore_container.keystore(), + orchestrator_para_id: para_id, + collation_params: None, + spawn_handle: task_manager.spawn_handle().clone(), + // We can use warp sync because the warp sync bug only affects collators + sync_mode: { move |_db_exists, _para_id| Ok(sc_cli::SyncMode::Warp) }, + data_preserver: true, + }, + state: Default::default(), + collate_on_tanssi: Arc::new(|| { + panic!("Called collate_on_tanssi outside of Tanssi node") + }), + collation_cancellation_constructs: None, + }; + let state = container_chain_spawner.state.clone(); + + task_manager.spawn_essential_handle().spawn( + "container-chain-assignment-watcher", + None, + tc_service_container_chain::data_preservers::task_watch_assignment( + container_chain_spawner, + profile_id, + ), + ); + + task_manager.spawn_essential_handle().spawn( + "container-chain-spawner-debug-state", + None, + tc_service_container_chain::monitor::monitor_task(state), + ); + } + + Ok(task_manager) + }) +} diff --git a/node/Cargo.toml b/node/Cargo.toml index 343eb8c24..428ad2083 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -38,6 +38,7 @@ node-common = { workspace = true } pallet-author-noting-runtime-api = { workspace = true, features = [ "std" ] } pallet-collator-assignment-runtime-api = { workspace = true, features = [ "std" ] } pallet-configuration = { workspace = true, features = [ "std" ] } +pallet-data-preservers-runtime-api = { workspace = true, features = [ "std" ] } pallet-registrar-runtime-api = { workspace = true, features = [ "std" ] } services-payment-rpc = { workspace = true } stream-payment-rpc = { workspace = true } diff --git a/node/src/cli.rs b/node/src/cli.rs index 9b7c60a9c..b7c6171da 100644 --- a/node/src/cli.rs +++ b/node/src/cli.rs @@ -184,7 +184,7 @@ pub struct Cli { #[arg(long)] pub para_id: Option, - /// Relay chain arguments, optionally followed by "--" and orchestrator chain arguments + /// Extra arguments, `container-args -- relay-args` or `relay-args` if no `--` #[arg(raw = true)] extra_args: Vec, } diff --git a/node/src/service.rs b/node/src/service.rs index fce4441ee..a8703ebac 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -34,13 +34,15 @@ use { AccountId, RuntimeApi, }, dc_orchestrator_chain_interface::{ - BlockNumber, ContainerChainGenesisData, OrchestratorChainError, OrchestratorChainInterface, - OrchestratorChainResult, PHash, PHeader, + BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId, + OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash, + PHeader, }, futures::{Stream, StreamExt}, nimbus_primitives::{NimbusId, NimbusPair}, node_common::service::{ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing}, pallet_author_noting_runtime_api::AuthorNotingApi, + pallet_data_preservers_runtime_api::DataPreserversApi, pallet_registrar_runtime_api::RegistrarApi, parity_scale_codec::Encode, polkadot_cli::ProvideRuntimeApi, @@ -439,6 +441,7 @@ async fn start_node_impl( relay_chain_interface, sync_keystore, orchestrator_para_id: para_id, + data_preserver: false, collation_params: if validator { Some(spawner::CollationParams { orchestrator_client: orchestrator_client.clone(), @@ -938,7 +941,8 @@ where Client::Api: TanssiAuthorityAssignmentApi + OnDemandBlockProductionApi + RegistrarApi - + AuthorNotingApi, + + AuthorNotingApi + + DataPreserversApi, { async fn get_storage_by_key( &self, @@ -1038,4 +1042,25 @@ where async fn finalized_block_hash(&self) -> OrchestratorChainResult { Ok(self.backend.blockchain().info().finalized_hash) } + + async fn data_preserver_active_assignment( + &self, + orchestrator_parent: PHash, + profile_id: DataPreserverProfileId, + ) -> OrchestratorChainResult> { + let runtime_api = self.full_client.runtime_api(); + + use { + dc_orchestrator_chain_interface::DataPreserverAssignment as InterfaceAssignment, + pallet_data_preservers_runtime_api::Assignment as RuntimeAssignment, + }; + + Ok( + match runtime_api.get_active_assignment(orchestrator_parent, profile_id)? { + RuntimeAssignment::NotAssigned => InterfaceAssignment::NotAssigned, + RuntimeAssignment::Active(para_id) => InterfaceAssignment::Active(para_id), + RuntimeAssignment::Inactive(para_id) => InterfaceAssignment::Inactive(para_id), + }, + ) + } } diff --git a/pallets/data-preservers/runtime-api/Cargo.toml b/pallets/data-preservers/runtime-api/Cargo.toml new file mode 100644 index 000000000..74bb52217 --- /dev/null +++ b/pallets/data-preservers/runtime-api/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "pallet-data-preservers-runtime-api" +authors = { workspace = true } +description = "Runtime API definition of pallet-data-preservers" +edition = "2021" +license = "GPL-3.0-only" +version = "0.1.0" + +[package.metadata.docs.rs] +targets = [ "x86_64-unknown-linux-gnu" ] + +[lints] +workspace = true + +[dependencies] +parity-scale-codec = { workspace = true } +scale-info = { workspace = true } +serde = { workspace = true, features = [ "derive" ] } +sp-api = { workspace = true } +thiserror = { workspace = true, optional = true } + +[features] +default = [ "std" ] +std = [ + "parity-scale-codec/std", + "scale-info/std", + "serde/std", + "sp-api/std", + "thiserror", +] diff --git a/pallets/data-preservers/runtime-api/src/lib.rs b/pallets/data-preservers/runtime-api/src/lib.rs new file mode 100644 index 000000000..77d2a94ed --- /dev/null +++ b/pallets/data-preservers/runtime-api/src/lib.rs @@ -0,0 +1,53 @@ +// Copyright (C) Moondance Labs Ltd. +// This file is part of Tanssi. + +// Tanssi is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Tanssi is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Tanssi. If not, see + +//! Runtime API for DataPreservers pallet + +#![cfg_attr(not(feature = "std"), no_std)] + +extern crate alloc; + +use { + parity_scale_codec::{Decode, Encode}, + serde::{Deserialize, Serialize}, +}; + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, Encode, Decode, scale_info::TypeInfo, Serialize, Deserialize, +)] +pub enum Assignment { + /// Profile is not currently assigned. + NotAssigned, + /// Profile is activly assigned to this ParaId. + Active(ParaId), + /// Profile is assigned to this ParaId but is inactive for some reason. + /// It may be causes by conditions defined in the assignement configuration, + /// such as lacking payment. + Inactive(ParaId), +} + +sp_api::decl_runtime_apis! { + pub trait DataPreserversApi + where + ProfileId: parity_scale_codec::Codec, + ParaId: parity_scale_codec::Codec, + { + /// Get the active assignment for this profile id. + fn get_active_assignment( + profile_id: ProfileId, + ) -> Assignment; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cfbfab59d..283c13c97 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -6,15 +6,6 @@ settings: importers: - .: - dependencies: - api-augment:0.400.0: - specifier: link:@tanssi/api-augment:0.400.0 - version: link:@tanssi/api-augment:0.400.0 - api-augment:latest: - specifier: link:@tanssi/api-augment:latest - version: link:@tanssi/api-augment:latest - test: dependencies: '@zombienet/orchestrator': diff --git a/runtime/dancebox/Cargo.toml b/runtime/dancebox/Cargo.toml index 5509ef701..d93db612c 100644 --- a/runtime/dancebox/Cargo.toml +++ b/runtime/dancebox/Cargo.toml @@ -32,6 +32,7 @@ pallet-collator-assignment = { workspace = true } pallet-collator-assignment-runtime-api = { workspace = true } pallet-configuration = { workspace = true } pallet-data-preservers = { workspace = true } +pallet-data-preservers-runtime-api = { workspace = true } pallet-inflation-rewards = { workspace = true } pallet-initializer = { workspace = true } pallet-pooled-staking = { workspace = true } @@ -197,6 +198,7 @@ std = [ "pallet-collator-assignment-runtime-api/std", "pallet-collator-assignment/std", "pallet-configuration/std", + "pallet-data-preservers-runtime-api/std", "pallet-data-preservers/std", "pallet-foreign-asset-creator/std", "pallet-identity/std", diff --git a/runtime/dancebox/src/lib.rs b/runtime/dancebox/src/lib.rs index 0b76eb621..6f4e99ee9 100644 --- a/runtime/dancebox/src/lib.rs +++ b/runtime/dancebox/src/lib.rs @@ -1051,13 +1051,15 @@ impl pallet_data_preservers::AssignmentPayment for PreserversAssignem } } +pub type DataPreserversProfileId = u64; + impl pallet_data_preservers::Config for Runtime { type RuntimeEvent = RuntimeEvent; type RuntimeHoldReason = RuntimeHoldReason; type Currency = Balances; type WeightInfo = weights::pallet_data_preservers::SubstrateWeight; - type ProfileId = u64; + type ProfileId = DataPreserversProfileId; type ProfileDeposit = tp_traits::BytesDeposit; type AssignmentPayment = PreserversAssignementPayment; @@ -2564,6 +2566,25 @@ impl_runtime_apis! { } } + impl pallet_data_preservers_runtime_api::DataPreserversApi for Runtime { + fn get_active_assignment( + profile_id: DataPreserversProfileId, + ) -> pallet_data_preservers_runtime_api::Assignment { + use pallet_data_preservers_runtime_api::Assignment; + + let Some((para_id, witness)) = pallet_data_preservers::Profiles::::get(profile_id) + .and_then(|x| x.assignment) else + { + return Assignment::NotAssigned; + }; + + match witness { + PreserversAssignementPaymentWitness::Free => Assignment::Active(para_id), + // TODO: Add Stream Payment. Stalled stream should return Inactive. + } + } + } + impl dp_slot_duration_runtime_api::TanssiSlotDurationApi for Runtime { fn slot_duration() -> u64 { SLOT_DURATION diff --git a/runtime/flashbox/Cargo.toml b/runtime/flashbox/Cargo.toml index 936de4952..9da1e2a72 100644 --- a/runtime/flashbox/Cargo.toml +++ b/runtime/flashbox/Cargo.toml @@ -31,6 +31,7 @@ pallet-collator-assignment = { workspace = true } pallet-collator-assignment-runtime-api = { workspace = true } pallet-configuration = { workspace = true } pallet-data-preservers = { workspace = true } +pallet-data-preservers-runtime-api = { workspace = true } pallet-inflation-rewards = { workspace = true } pallet-initializer = { workspace = true } pallet-proxy = { workspace = true } @@ -157,6 +158,7 @@ std = [ "pallet-collator-assignment-runtime-api/std", "pallet-collator-assignment/std", "pallet-configuration/std", + "pallet-data-preservers-runtime-api/std", "pallet-data-preservers/std", "pallet-identity/std", "pallet-inflation-rewards/std", diff --git a/runtime/flashbox/src/lib.rs b/runtime/flashbox/src/lib.rs index 938e5d7ec..41283f419 100644 --- a/runtime/flashbox/src/lib.rs +++ b/runtime/flashbox/src/lib.rs @@ -897,13 +897,15 @@ impl pallet_data_preservers::AssignmentPayment for PreserversAssignem } } +pub type DataPreserversProfileId = u64; + impl pallet_data_preservers::Config for Runtime { type RuntimeEvent = RuntimeEvent; type RuntimeHoldReason = RuntimeHoldReason; type Currency = Balances; type WeightInfo = weights::pallet_data_preservers::SubstrateWeight; - type ProfileId = u64; + type ProfileId = DataPreserversProfileId; type ProfileDeposit = tp_traits::BytesDeposit; type AssignmentPayment = PreserversAssignementPayment; @@ -2076,6 +2078,25 @@ impl_runtime_apis! { } } + impl pallet_data_preservers_runtime_api::DataPreserversApi for Runtime { + fn get_active_assignment( + profile_id: DataPreserversProfileId, + ) -> pallet_data_preservers_runtime_api::Assignment { + use pallet_data_preservers_runtime_api::Assignment; + + let Some((para_id, witness)) = pallet_data_preservers::Profiles::::get(profile_id) + .and_then(|x| x.assignment) else + { + return Assignment::NotAssigned; + }; + + match witness { + PreserversAssignementPaymentWitness::Free => Assignment::Active(para_id), + // TODO: Add Stream Payment. Stalled stream should return Inactive. + } + } + } + impl async_backing_primitives::UnincludedSegmentApi for Runtime { fn can_build_upon( included_hash: ::Hash, diff --git a/test/configs/zombieDataPreservers.json b/test/configs/zombieDataPreservers.json new file mode 100644 index 000000000..403440385 --- /dev/null +++ b/test/configs/zombieDataPreservers.json @@ -0,0 +1,115 @@ +{ + "settings": { + "timeout": 1000, + "provider": "native" + }, + "relaychain": { + "chain": "rococo-local", + "default_command": "tmp/polkadot", + "default_args": ["--no-hardware-benchmarks", "-lparachain=debug", "--database=paritydb", "--no-beefy"], + "genesis": { + "runtimeGenesis": { + "patch": { + "configuration": { + "config": { + "async_backing_params": { + "allowed_ancestry_len": 2, + "max_candidate_depth": 3 + }, + "scheduler_params": { + "scheduling_lookahead": 2 + } + } + } + } + } + }, + "nodes": [ + { + "name": "alice", + "ws_port": "9947", + "validator": true + }, + { + "name": "bob", + "validator": true + } + ] + }, + "parachains": [ + { + "id": 1000, + "chain_spec_path": "specs/single-container-tanssi-1000.json", + "COMMENT": "Important: these collators will not be injected to pallet-invulnerables because zombienet does not support that. When changing the collators list, make sure to update `scripts/build-spec-single-container.sh`", + "collators": [ + { + "name": "FullNode-1000", + "validator": false, + "command": "../target/release/tanssi-node", + "args": ["--no-hardware-benchmarks", "--database=paritydb", "--wasmtime-precompiled=wasm"], + "ws_port": 9948 + }, + { + "name": "Collator1000-01", + "command": "../target/release/tanssi-node", + "args": ["--no-hardware-benchmarks", "--database=paritydb", "--wasmtime-precompiled=wasm"] + }, + { + "name": "Collator1000-02", + "command": "../target/release/tanssi-node", + "args": ["--no-hardware-benchmarks", "--database=paritydb", "--wasmtime-precompiled=wasm"] + }, + { + "name": "Collator2000-01", + "command": "../target/release/tanssi-node", + "args": ["--no-hardware-benchmarks", "--database=paritydb", "--wasmtime-precompiled=wasm"] + }, + { + "name": "Collator2000-02", + "command": "../target/release/tanssi-node", + "args": ["--no-hardware-benchmarks", "--database=paritydb", "--wasmtime-precompiled=wasm"] + }, + { + "name": "Collator1000-03", + "command": "../target/release/tanssi-node", + "args": ["--no-hardware-benchmarks", "--database=paritydb", "--wasmtime-precompiled=wasm"] + } + ] + }, + { + "id": 2000, + "chain_spec_path": "specs/single-container-template-container-2000.json", + "collators": [ + { + "name": "FullNode-2000", + "validator": false, + "command": "../target/release/container-chain-simple-node", + "args": ["--no-hardware-benchmarks", "--database=paritydb", "--wasmtime-precompiled=wasm"], + "ws_port": 9949, + "p2p_port": 33049, + "prometheus_port": 33102 + }, + { + "name": "DataPreserver", + "validator": false, + "command": "../target/release/container-chain-simple-node", + "args": [ + "--rpc-provider-profile-id=0", + "--orchestrator-endpoints=ws://127.0.0.1:9948/", + "--database=paritydb", + "--wasmtime-precompiled=wasm", + "-- --rpc-port 9950" + ], + "prometheus_port": 33102 + } + ] + } + ], + "types": { + "Header": { + "number": "u64", + "parent_hash": "Hash", + "post_state": "Hash" + } + } +} diff --git a/test/moonwall.config.json b/test/moonwall.config.json index e134d2789..6770fe57b 100644 --- a/test/moonwall.config.json +++ b/test/moonwall.config.json @@ -707,6 +707,46 @@ } ] }, + { + "name": "zombie_data_preservers", + "testFileDir": ["suites/data-preservers"], + "runScripts": [ + "build-spec-single-container.sh", + "download-polkadot.sh", + "compile-wasm.ts compile -b ../target/release/tanssi-node -o wasm -c specs/single-container-tanssi-1000.json", + "compile-wasm.ts compile -b ../target/release/container-chain-simple-node -o wasm -c specs/single-container-template-container-2000.json" + ], + "timeout": 600000, + "foundation": { + "type": "zombie", + "zombieSpec": { + "configPath": "./configs/zombieDataPreservers.json", + "skipBlockCheck": ["DataPreserver"] + } + }, + "connections": [ + { + "name": "Relay", + "type": "polkadotJs", + "endpoints": ["ws://127.0.0.1:9947"] + }, + { + "name": "Tanssi", + "type": "polkadotJs", + "endpoints": ["ws://127.0.0.1:9948"] + }, + { + "name": "Container2000", + "type": "polkadotJs", + "endpoints": ["ws://127.0.0.1:9949"] + }, + { + "name": "DataPreserver", + "type": "polkadotJs", + "endpoints": ["ws://127.0.0.1:9950"] + } + ] + }, { "name": "dancebox_smoke", "testFileDir": ["suites/smoke-test-dancebox", "suites/smoke-test-common"], diff --git a/test/suites/data-preservers/test_data_preservers.ts b/test/suites/data-preservers/test_data_preservers.ts new file mode 100644 index 000000000..f587ef5ca --- /dev/null +++ b/test/suites/data-preservers/test_data_preservers.ts @@ -0,0 +1,142 @@ +import { beforeAll, describeSuite, expect } from "@moonwall/cli"; +import { ApiPromise, Keyring } from "@polkadot/api"; +import { signAndSendAndInclude } from "../../util/block"; +import { getHeaderFromRelay } from "../../util/relayInterface"; +import fs from "fs/promises"; + +describeSuite({ + id: "DP01", + title: "Data Preservers Test", + foundationMethods: "zombie", + testCases: function ({ it, context }) { + let paraApi: ApiPromise; + let relayApi: ApiPromise; + let container2000Api: ApiPromise; + + beforeAll(async () => { + paraApi = context.polkadotJs("Tanssi"); + relayApi = context.polkadotJs("Relay"); + container2000Api = context.polkadotJs("Container2000"); + + const relayNetwork = relayApi.consts.system.version.specName.toString(); + expect(relayNetwork, "Relay API incorrect").to.contain("rococo"); + + const paraNetwork = paraApi.consts.system.version.specName.toString(); + const paraId1000 = (await paraApi.query.parachainInfo.parachainId()).toString(); + expect(paraNetwork, "Para API incorrect").to.contain("dancebox"); + expect(paraId1000, "Para API incorrect").to.be.equal("1000"); + + const container2000Network = container2000Api.consts.system.version.specName.toString(); + const paraId2000 = (await container2000Api.query.parachainInfo.parachainId()).toString(); + expect(container2000Network, "Container2000 API incorrect").to.contain("container-chain-template"); + expect(paraId2000, "Container2000 API incorrect").to.be.equal("2000"); + + // Test block numbers in relay are 0 yet + const header2000 = await getHeaderFromRelay(relayApi, 2000); + + expect(header2000.number.toNumber()).to.be.equal(0); + }, 120000); + + it({ + id: "T01", + title: "Blocks are being produced on parachain", + test: async function () { + const blockNum = (await paraApi.rpc.chain.getBlock()).block.header.number.toNumber(); + expect(blockNum).to.be.greaterThan(0); + }, + }); + + it({ + id: "T02", + title: "Data preservers watcher properly starts", + test: async function () { + const logFilePath = getTmpZombiePath() + "/DataPreserver.log"; + await waitForLogs(logFilePath, 300, ["Assignement for block"]); + }, + }); + + it({ + id: "T03", + title: "Change assignment", + test: async function () { + const logFilePath = getTmpZombiePath() + "/DataPreserver.log"; + const keyring = new Keyring({ type: "sr25519" }); + const alice = keyring.addFromUri("//Alice", { name: "Alice default" }); + + const profile = { + url: "exemple", + paraIds: "AnyParaId", + mode: { rpc: { supportsEthereumRpc: false } }, + }; + + { + const tx = paraApi.tx.dataPreservers.forceCreateProfile(profile, alice.address); + await signAndSendAndInclude(paraApi.tx.sudo.sudo(tx), alice); + await context.waitBlock(1, "Tanssi"); + } + + { + const tx = paraApi.tx.dataPreservers.forceStartAssignment(0, 2000, "Free"); + await signAndSendAndInclude(paraApi.tx.sudo.sudo(tx), alice); + await context.waitBlock(1, "Tanssi"); + } + + await waitForLogs(logFilePath, 300, ["Active(Id(2000))"]); + }, + }); + + it({ + id: "T04", + title: "RPC endpoint is properly started", + test: async function () { + const preserverApi = context.polkadotJs("DataPreserver"); + const container2000Network = preserverApi.consts.system.version.specName.toString(); + const paraId2000 = (await preserverApi.query.parachainInfo.parachainId()).toString(); + expect(container2000Network, "Container2000 API incorrect").to.contain("container-chain-template"); + expect(paraId2000, "Container2000 API incorrect").to.be.equal("2000"); + }, + }); + }, +}); + +// Checks every second the log file to find the watcher best block notification until it is found or +// timeout is reached. +async function waitForLogs(logFilePath: string, timeout: number, logs: string[]): Promise { + for (let i = 0; i < timeout; i++) { + if (checkLogsNoFail(logFilePath, logs)) { + return; + } + + await delay(1000); + } + + expect.fail(`RPC Assignment Watch log was not found after ${timeout} seconds.`); +} + +// Read log file path and check that all the logs are found in order. +// Only supports single-line logs. +async function checkLogsNoFail(logFilePath: string, logs: string[]): Promise { + const fileContent = await fs.readFile(logFilePath, "utf8"); + const lines = fileContent.split("\n"); + + let logIndex = 0; + + for (let i = 0; i < lines.length; i++) { + if (logIndex < logs.length && lines[i].includes(logs[logIndex])) { + logIndex++; + } + + if (logIndex === logs.length) { + break; + } + } + + return logIndex === logs.length; +} + +/// Returns the /tmp/zombie-52234... path +function getTmpZombiePath() { + return process.env.MOON_ZOMBIE_DIR; +} + +const delay = (ms) => new Promise((res) => setTimeout(res, ms));