Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Spawn embeded Container chain node on data preservers assignment #666

Merged
merged 30 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5533d82
wip
nanocryk Aug 9, 2024
9a1f79d
setup new moonwall profile (not working yet)
nanocryk Aug 13, 2024
bcc1545
get non-modified test suite working
nanocryk Aug 19, 2024
4fc4e5b
wip spawner
nanocryk Aug 19, 2024
6c0a964
fix runtime api call
nanocryk Aug 20, 2024
22186f4
hack to ignore absence of metrics in rpc mode
nanocryk Aug 20, 2024
98aad02
watcher properly reacts to assignment
nanocryk Aug 22, 2024
977850e
working RPC endpoint spawning with test
nanocryk Aug 23, 2024
c3822a7
Merge remote-tracking branch 'origin/master' into jeremy-spawn-cc-on-…
nanocryk Aug 26, 2024
ee7ba6c
rename interface fn
nanocryk Aug 26, 2024
91a9b93
update lock + data preservers flag in spawner
nanocryk Aug 27, 2024
fd9eb96
get rid of script
nanocryk Aug 28, 2024
51de92b
cleanup
nanocryk Aug 28, 2024
2016586
Merge remote-tracking branch 'origin/master' into jeremy-spawn-cc-on-…
nanocryk Aug 28, 2024
e2f02d2
cleanup
nanocryk Aug 28, 2024
302b0a9
Improve docs
nanocryk Aug 28, 2024
92c7757
add suite to CI
nanocryk Aug 29, 2024
3e34c36
cleanup and comments
nanocryk Aug 29, 2024
055e7ce
zepter
nanocryk Aug 29, 2024
4e22f63
api augment
nanocryk Aug 29, 2024
22e510b
Merge remote-tracking branch 'origin/master' into jeremy-spawn-cc-on-…
nanocryk Aug 29, 2024
59f96d3
add back log
nanocryk Aug 29, 2024
cd35c26
Merge remote-tracking branch 'origin/master' into jeremy-spawn-cc-on-…
nanocryk Sep 2, 2024
20fdcf7
spawner mock
nanocryk Sep 3, 2024
97f6ffa
Merge remote-tracking branch 'origin/master' into jeremy-spawn-cc-on-…
nanocryk Sep 3, 2024
b22442f
test + PR feedback
nanocryk Sep 4, 2024
a2efc47
lints
nanocryk Sep 5, 2024
02a5170
Merge remote-tracking branch 'origin/master' into jeremy-spawn-cc-on-…
nanocryk Sep 5, 2024
1f1b95e
improve test
nanocryk Sep 5, 2024
cf2bd36
Merge remote-tracking branch 'origin/master' into jeremy-spawn-cc-on-…
nanocryk Sep 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pallet-collator-assignment = { path = "pallets/collator-assignment", default-fea
pallet-collator-assignment-runtime-api = { path = "pallets/collator-assignment/runtime-api", default-features = false }
pallet-configuration = { path = "pallets/configuration", default-features = false }
pallet-data-preservers = { path = "pallets/data-preservers", default-features = false }
pallet-data-preservers-runtime-api = { path = "pallets/data-preservers/runtime-api", default-features = false }
pallet-inflation-rewards = { path = "pallets/inflation-rewards", default-features = false }
pallet-initializer = { path = "pallets/initializer", default-features = false }
pallet-invulnerables = { path = "pallets/invulnerables", default-features = false }
Expand Down
26 changes: 20 additions & 6 deletions client/orchestrator-chain-rpc-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use {
async_trait::async_trait,
core::pin::Pin,
dc_orchestrator_chain_interface::{
BlockNumber, ContainerChainGenesisData, OrchestratorChainError, OrchestratorChainInterface,
OrchestratorChainResult, PHash, PHeader,
BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash,
PHeader,
},
dp_core::ParaId,
futures::{Stream, StreamExt},
Expand Down Expand Up @@ -114,7 +115,7 @@ impl OrchestratorChainRpcClient {
};
let res = self
.request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
tracing::trace!(
tracing::debug!(
target: LOG_TARGET,
%method_name,
%hash,
Expand Down Expand Up @@ -289,7 +290,7 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient {
orchestrator_parent: PHash,
para_id: ParaId,
) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
self.call_remote_runtime_function("genesis_data", orchestrator_parent, Some(para_id))
self.call_remote_runtime_function("RegistrarApi_genesis_data", orchestrator_parent, Some(para_id))
.await
}

Expand All @@ -298,7 +299,7 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient {
orchestrator_parent: PHash,
para_id: ParaId,
) -> OrchestratorChainResult<Vec<Vec<u8>>> {
self.call_remote_runtime_function("boot_nodes", orchestrator_parent, Some(para_id))
self.call_remote_runtime_function("RegistrarApi_boot_nodes", orchestrator_parent, Some(para_id))
.await
}

Expand All @@ -307,7 +308,7 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient {
orchestrator_parent: PHash,
para_id: ParaId,
) -> OrchestratorChainResult<Option<BlockNumber>> {
self.call_remote_runtime_function("latest_block_number", orchestrator_parent, Some(para_id))
self.call_remote_runtime_function("AuthorNotingApi_latest_block_number", orchestrator_parent, Some(para_id))
.await
}

Expand All @@ -318,4 +319,17 @@ impl OrchestratorChainInterface for OrchestratorChainRpcClient {
async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
self.request("chain_getFinalizedHead", rpc_params![]).await
}

async fn data_preserver_active_assignment(
nanocryk marked this conversation as resolved.
Show resolved Hide resolved
&self,
orchestrator_parent: PHash,
profile_id: DataPreserverProfileId,
) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
self.call_remote_runtime_function(
"DataPreserversApi_get_active_assignment",
orchestrator_parent,
Some(profile_id),
)
.await
}
}
1 change: 1 addition & 0 deletions client/service-container-chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dancebox-runtime = { workspace = true, features = [ "std" ] }
manual-xcm-rpc = { workspace = true }
node-common = { workspace = true }
pallet-author-noting-runtime-api = { workspace = true, features = [ "std" ] }
pallet-data-preservers = { workspace = true, features = [ "std" ] }
services-payment-rpc = { workspace = true }
stream-payment-rpc = { workspace = true }
tc-consensus = { workspace = true }
Expand Down
90 changes: 90 additions & 0 deletions client/service-container-chain/src/data_preservers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi. If not, see <http://www.gnu.org/licenses/>

use {
crate::spawner::{ContainerChainSpawner, TSelectSyncMode},
dc_orchestrator_chain_interface::{
DataPreserverAssignment, OrchestratorChainInterface, OrchestratorChainResult,
},
futures::stream::StreamExt,
std::{future::Future},
tc_consensus::ParaId,
};

pub type ProfileId = <dancebox_runtime::Runtime as pallet_data_preservers::Config>::ProfileId;
girazoki marked this conversation as resolved.
Show resolved Hide resolved

async fn try_fut<T, E>(fut: impl Future<Output = Result<T, E>>) -> Result<T, E> {
fut.await
}

/// Watch assignements by indefinitly listening to finalized block notifications and switching to
/// the chain the profile is assigned to.
pub async fn task_watch_assignment<S: TSelectSyncMode>(
girazoki marked this conversation as resolved.
Show resolved Hide resolved
spawner: ContainerChainSpawner<S>,
profile_id: ProfileId,
) {
use dc_orchestrator_chain_interface::DataPreserverAssignment as Assignment;

if let OrchestratorChainResult::Err(e) = try_fut(async move {
let orchestrator_chain_interface = spawner.params.orchestrator_chain_interface.clone();

let mut current_assignment = DataPreserverAssignment::<ParaId>::NotAssigned;

let mut stream = orchestrator_chain_interface
girazoki marked this conversation as resolved.
Show resolved Hide resolved
.finality_notification_stream()
.await?;

while let Some(header) = stream.next().await {
let hash = header.hash();

let new_assignment = orchestrator_chain_interface
.data_preserver_active_assignment(hash, profile_id)
.await?;

log::info!("Assignement for block {hash}: {new_assignment:?}");

match (current_assignment, new_assignment) {
// no change
(x, y) if x == y => continue,
(
Assignment::NotAssigned | Assignment::Inactive(_),
Assignment::Active(para_id),
) => {
spawner.spawn(para_id, false).await;
}
(Assignment::Active(para_id), Assignment::Inactive(x)) if para_id == x => {
girazoki marked this conversation as resolved.
Show resolved Hide resolved
spawner.stop(para_id, true); // keep db
}
(Assignment::Active(para_id), _) => {
spawner.stop(para_id, false); // don't keep db
}
// don't do anything yet
(
Assignment::NotAssigned | Assignment::Inactive(_),
Assignment::NotAssigned | Assignment::Inactive(_),
) => (),
}

current_assignment = new_assignment;
}

Ok(())
})
.await
{
log::error!("Error in data preservers assignement watching task: {e:?}");
}
}
1 change: 1 addition & 0 deletions client/service-container-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

pub mod chain_spec;
pub mod cli;
pub mod data_preservers;
pub mod monitor;
pub mod rpc;
pub mod service;
Expand Down
15 changes: 9 additions & 6 deletions client/service-container-chain/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,19 @@ async fn try_spawn<SelectSyncMode: TSelectSyncMode>(
if !start_collation {
nanocryk marked this conversation as resolved.
Show resolved Hide resolved
collation_params = None;

log::info!("This is a syncing container chain, using random ports");
log::info!("This is a syncing container chain, using random ports unless they are explicitly provided as CLI args");
// Use random ports to avoid conflicts with the other running container chain
// Don't override provided port.
// TODO: How does this prevent conflicts? All containers will have the same ports.
nanocryk marked this conversation as resolved.
Show resolved Hide resolved
let random_ports = [23456, 23457, 23458];

container_chain_cli
.base
.base
.prometheus_params
.prometheus_port = Some(random_ports[0]);
container_chain_cli.base.base.network_params.port = Some(random_ports[1]);
container_chain_cli.base.base.rpc_port = Some(random_ports[2]);
.prometheus_port.get_or_insert(random_ports[0]);
container_chain_cli.base.base.network_params.port.get_or_insert(random_ports[1]);
container_chain_cli.base.base.rpc_port.get_or_insert(random_ports[2]);
}

let validator = collation_params.is_some();
Expand Down Expand Up @@ -535,7 +538,7 @@ impl<SelectSyncMode: TSelectSyncMode> ContainerChainSpawner<SelectSyncMode> {
/// because the chain has not stopped yet, because `stop` does not wait for the chain to stop,
/// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in
/// `handle_update_assignment`.
async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
pub async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
let try_spawn_params = self.params.clone();
let state = self.state.clone();
let state2 = state.clone();
Expand Down Expand Up @@ -566,7 +569,7 @@ impl<SelectSyncMode: TSelectSyncMode> ContainerChainSpawner<SelectSyncMode> {
/// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
/// to ensure that the container chain has fully stopped. The database path can be `None` if the
/// chain was not running.
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
pub fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
let mut state = self.state.lock().expect("poison error");
let stop_handle = state
.spawned_container_chains
Expand Down
60 changes: 39 additions & 21 deletions container-chains/nodes/simple/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ pub enum Subcommand {

/// Precompile the WASM runtime into native code
PrecompileWasm(sc_cli::PrecompileWasmCmd),

/// Starts in RPC provider mode, watching orchestrator chain for assignements to provide
/// RPC services for container chains.
RpcProvider(RpcProviderSubcommand),
}

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -115,13 +111,48 @@ pub struct Cli {
#[arg(long)]
pub no_hardware_benchmarks: bool,

/// Relay chain arguments
#[arg(raw = true)]
pub relay_chain_args: Vec<String>,

/// Optional parachain id that should be used to build chain spec.
#[arg(long)]
pub para_id: Option<u32>,

/// Profile id associated with the node, whose assignements will be followed to provide RPC services.
#[arg(long)]
pub rpc_provider_profile_id: Option<u64>,

/// Endpoints to connect to orchestrator nodes, avoiding to start a local orchestrator node.
/// If this list is empty, a local embeded orchestrator node is started.
#[arg(long)]
pub orchestrator_endpoints: Vec<Url>,

/// Relay chain arguments, optionally followed by "--" and container chain arguments
#[arg(raw = true)]
extra_args: Vec<String>,
}

impl Cli {
pub fn relaychain_args(&self) -> &[String] {
let (relay_chain_args, _) = self.split_extra_args_at_first_dashdash();

relay_chain_args
}

pub fn container_chain_args(&self) -> &[String] {
let (_, container_chain_args) = self.split_extra_args_at_first_dashdash();

container_chain_args
}

fn split_extra_args_at_first_dashdash(&self) -> (&[String], &[String]) {
let index_of_dashdash = self.extra_args.iter().position(|x| *x == "--");

if let Some(i) = index_of_dashdash {
let (container_chain_args, extra_extra) = self.extra_args.split_at(i);
(&extra_extra[1..], container_chain_args)
} else {
// Only relay chain args
(&self.extra_args, &[])
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -178,16 +209,3 @@ impl CliConfiguration for BuildSpecCmd {
Some(&self.base.node_key_params)
}
}

#[derive(Debug, clap::Parser)]
#[group(skip)]
pub struct RpcProviderSubcommand {
/// Endpoints to connect to orchestrator nodes, avoiding to start a local orchestrator node.
/// If this list is empty, a local embeded orchestrator node is started.
#[arg(long)]
pub orchestrator_endpoints: Vec<Url>,

/// Account associated with the node, whose assignements will be followed to provide RPC services.
#[arg(long)]
pub assignement_account: dp_core::AccountId,
}
Loading
Loading