Skip to content

Commit

Permalink
spawner mock
Browse files Browse the repository at this point in the history
  • Loading branch information
nanocryk committed Sep 3, 2024
1 parent cd35c26 commit 20fdcf7
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions client/service-container-chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ cumulus-relay-chain-interface = { workspace = true }
nimbus-consensus = { workspace = true }
nimbus-primitives = { workspace = true }

[dev-dependencies]
polkadot-overseer = { workspace = true }
tokio-stream = { workspace = true }

[build-dependencies]
substrate-build-script-utils = { workspace = true }

Expand Down
270 changes: 264 additions & 6 deletions client/service-container-chain/src/data_preservers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Tanssi. If not, see <http://www.gnu.org/licenses/>

use {
crate::spawner::{ContainerChainSpawner, TSelectSyncMode},
crate::spawner::{ContainerChainSpawner, Spawner, TSelectSyncMode},
dc_orchestrator_chain_interface::{
DataPreserverAssignment, OrchestratorChainInterface, OrchestratorChainResult,
},
Expand All @@ -32,14 +32,11 @@ async fn try_fut<T, E>(fut: impl Future<Output = Result<T, E>>) -> Result<T, E>

/// Watch assignements by indefinitly listening to finalized block notifications and switching to
/// the chain the profile is assigned to.
pub async fn task_watch_assignment<S: TSelectSyncMode>(
spawner: ContainerChainSpawner<S>,
profile_id: ProfileId,
) {
pub async fn task_watch_assignment(spawner: impl Spawner, profile_id: ProfileId) {
use dc_orchestrator_chain_interface::DataPreserverAssignment as Assignment;

if let OrchestratorChainResult::Err(e) = try_fut(async move {
let orchestrator_chain_interface = spawner.params.orchestrator_chain_interface.clone();
let orchestrator_chain_interface = spawner.orchestrator_chain_interface();

let mut current_assignment = DataPreserverAssignment::<ParaId>::NotAssigned;

Expand Down Expand Up @@ -100,3 +97,264 @@ pub async fn task_watch_assignment<S: TSelectSyncMode>(
log::error!("Error in data preservers assignement watching task: {e:?}");
}
}

#[cfg(test)]
mod tests {
use {
super::*,
dc_orchestrator_chain_interface::{
BlockNumber, DataPreserverProfileId, OrchestratorChainError, PHash, PHeader,
},
dp_container_chain_genesis_data::ContainerChainGenesisData,
futures::{FutureExt, Stream},
polkadot_overseer::Handle,
sc_client_api::StorageProof,
sp_core::H256,
sp_runtime::offchain::storage::StorageValue,
std::{
collections::{BTreeMap, HashSet},
path::PathBuf,
pin::Pin,
sync::{Arc, Mutex},
},
tokio::sync::{broadcast, oneshot},
};

struct MockChainInterface {
state: Mutex<MockChainInterfaceState>,
notification_sender: broadcast::Sender<PHeader>,
}

struct MockChainInterfaceState {
next_block_number: BlockNumber,
blocks: BTreeMap<H256, BlockAssignment>,
}

struct BlockAssignment {
assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>,
}

impl MockChainInterface {
fn new() -> Self {
Self {
state: Mutex::new(MockChainInterfaceState {
next_block_number: 0,
blocks: BTreeMap::new(),
}),

notification_sender: broadcast::Sender::new(100),
}
}

fn mock_block(&self, assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>) {
let mut state = self.state.lock().unwrap();
state.next_block_number += 1;

let header = PHeader {
parent_hash: H256::zero(),
number: state.next_block_number,
state_root: H256::zero(),
extrinsics_root: H256::zero(),
digest: Default::default(),
};
let hash = header.hash();

state.blocks.insert(hash, BlockAssignment { assignments });

self.notification_sender
.send(header)
.expect("to properly send block header");
}
}

#[async_trait::async_trait]
impl OrchestratorChainInterface for MockChainInterface {
fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
unimplemented!("not used in test")
}

async fn get_storage_by_key(
&self,
_orchestrator_parent: PHash,
_key: &[u8],
) -> OrchestratorChainResult<Option<Vec<u8>>> {
unimplemented!("not used in test")
}

async fn prove_read(
&self,
_orchestrator_parent: PHash,
_relevant_keys: &Vec<Vec<u8>>,
) -> OrchestratorChainResult<StorageProof> {
unimplemented!("not used in test")
}

async fn import_notification_stream(
&self,
) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
unimplemented!("not used in test")
}

async fn new_best_notification_stream(
&self,
) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
unimplemented!("not used in test")
}

async fn finality_notification_stream(
&self,
) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
let receiver = self.notification_sender.subscribe();
let stream = tokio_stream::wrappers::BroadcastStream::new(receiver)
.filter_map(|x| async { x.ok() });
let stream = Box::pin(stream);
Ok(stream)
}

async fn genesis_data(
&self,
_orchestrator_parent: PHash,
_para_id: ParaId,
) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
unimplemented!("not used in test")
}

async fn boot_nodes(
&self,
_orchestrator_parent: PHash,
_para_id: ParaId,
) -> OrchestratorChainResult<Vec<Vec<u8>>> {
unimplemented!("not used in test")
}

async fn latest_block_number(
&self,
_orchestrator_parent: PHash,
_para_id: ParaId,
) -> OrchestratorChainResult<Option<BlockNumber>> {
unimplemented!("not used in test")
}

async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
unimplemented!("not used in test")
}

async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
unimplemented!("not used in test")
}

async fn data_preserver_active_assignment(
&self,
orchestrator_parent: PHash,
profile_id: DataPreserverProfileId,
) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
let mut state = self.state.lock().unwrap();
let block = state.blocks.get_mut(&orchestrator_parent).ok_or_else(|| {
OrchestratorChainError::GenericError("this block is not mocked".into())
})?;

Ok(block
.assignments
.get(&profile_id)
.cloned()
.unwrap_or(DataPreserverAssignment::NotAssigned))
}
}

#[derive(Debug, PartialEq, Eq, Hash)]
enum SpawnerEvent {
Started(ParaId, bool),
Stopped(ParaId, bool),
}

#[derive(Clone)]
struct MockSpawner {
state: Arc<Mutex<HashSet<SpawnerEvent>>>,
chain_interface: Arc<MockChainInterface>,
}

impl MockSpawner {
fn new() -> Self {
Self {
state: Arc::new(Mutex::new(HashSet::new())),
chain_interface: Arc::new(MockChainInterface::new()),
}
}

fn set_expectations(&self, events: Vec<SpawnerEvent>) {
let mut set = self.state.lock().unwrap();

set.clear();

for e in events {
set.insert(e);
}
}

fn ensure_all_events_were_emitted(&self) {
let set = self.state.lock().unwrap();

assert!(set.is_empty(), "Not all events were emitted: {set:?}");
}
}

impl Spawner for MockSpawner {
fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
self.chain_interface.clone()
}

/// Try to start a new container chain. In case of an error, this does not stop the node, and
/// the container chain will be attempted to spawn again when the collator is reassigned to it.
///
/// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
/// because the chain has not stopped yet, because `stop` does not wait for the chain to stop,
/// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in
/// `handle_update_assignment`.
fn spawn(
&self,
container_chain_para_id: ParaId,
start_collation: bool,
) -> impl std::future::Future<Output = ()> + Send {
let mut set = self.state.lock().unwrap();

let event = SpawnerEvent::Started(container_chain_para_id, start_collation);

assert!(set.remove(&event), "Unexpected event {event:?}");

async {}
}

/// Stop a container chain. Prints a warning if the container chain was not running.
/// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
/// to ensure that the container chain has fully stopped. The database path can be `None` if the
/// chain was not running.
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
let mut set = self.state.lock().unwrap();

let event = SpawnerEvent::Stopped(container_chain_para_id, keep_db);

assert!(set.remove(&event), "Unexpected event {event:?}");

None
}
}

#[tokio::test]
async fn task_logic_works() {
let spawner = MockSpawner::new();

let profile_id = 0;
let para_id1 = ParaId::from(1);
let para_id2 = ParaId::from(2);

tokio::spawn(task_watch_assignment(spawner.clone(), profile_id));

spawner.set_expectations(vec![SpawnerEvent::Started(para_id1, false)]);
spawner.chain_interface.mock_block({
let mut map = BTreeMap::new();
map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
map
});
spawner.ensure_all_events_were_emitted();
}
}
38 changes: 35 additions & 3 deletions client/service-container-chain/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,45 @@ async fn try_spawn<SelectSyncMode: TSelectSyncMode>(
Ok(())
}

impl<SelectSyncMode: TSelectSyncMode> ContainerChainSpawner<SelectSyncMode> {
/// Interface for spawning and stopping container chain embeded nodes.
pub trait Spawner {
/// Access to the Orchestrator Chain Interface
fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface>;

/// Try to start a new container chain. In case of an error, this does not stop the node, and
/// the container chain will be attempted to spawn again when the collator is reassigned to it.
///
/// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
/// because the chain has not stopped yet, because `stop` does not wait for the chain to stop,
/// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in
/// `handle_update_assignment`.
fn spawn(
&self,
container_chain_para_id: ParaId,
start_collation: bool,
) -> impl std::future::Future<Output = ()> + Send;

/// Stop a container chain. Prints a warning if the container chain was not running.
/// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
/// to ensure that the container chain has fully stopped. The database path can be `None` if the
/// chain was not running.
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf>;
}

impl<SelectSyncMode: TSelectSyncMode> Spawner for ContainerChainSpawner<SelectSyncMode> {
/// Access to the Orchestrator Chain Interface
fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
self.params.orchestrator_chain_interface.clone()
}

/// Try to start a new container chain. In case of an error, this does not stop the node, and
/// the container chain will be attempted to spawn again when the collator is reassigned to it.
///
/// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
/// because the chain has not stopped yet, because `stop` does not wait for the chain to stop,
/// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in
/// `handle_update_assignment`.
pub async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
let try_spawn_params = self.params.clone();
let state = self.state.clone();
let state2 = state.clone();
Expand Down Expand Up @@ -570,7 +600,7 @@ impl<SelectSyncMode: TSelectSyncMode> ContainerChainSpawner<SelectSyncMode> {
/// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
/// to ensure that the container chain has fully stopped. The database path can be `None` if the
/// chain was not running.
pub fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
let mut state = self.state.lock().expect("poison error");
let stop_handle = state
.spawned_container_chains
Expand Down Expand Up @@ -604,7 +634,9 @@ impl<SelectSyncMode: TSelectSyncMode> ContainerChainSpawner<SelectSyncMode> {
}
}
}
}

impl<SelectSyncMode: TSelectSyncMode> ContainerChainSpawner<SelectSyncMode> {
/// Receive and process `CcSpawnMsg`s indefinitely
pub async fn rx_loop(mut self, mut rx: mpsc::UnboundedReceiver<CcSpawnMsg>, validator: bool) {
// The node always starts as an orchestrator chain collator.
Expand Down

0 comments on commit 20fdcf7

Please sign in to comment.