From fe29823a6f21e549bfb5fc081cc30a041a7943e4 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 1 Apr 2022 14:13:31 +0200 Subject: [PATCH] Retry runtime calls if the proof is invalid --- bin/light-base/src/json_rpc_service.rs | 105 +++++++------- .../src/json_rpc_service/chain_head.rs | 16 ++- bin/light-base/src/runtime_service.rs | 114 +++++++++------ bin/light-base/src/sync_service/parachain.rs | 97 +++++++------ bin/light-base/src/transactions_service.rs | 136 +++++++++--------- 5 files changed, 264 insertions(+), 204 deletions(-) diff --git a/bin/light-base/src/json_rpc_service.rs b/bin/light-base/src/json_rpc_service.rs index 9023f479be..e2d49e1495 100644 --- a/bin/light-base/src/json_rpc_service.rs +++ b/bin/light-base/src/json_rpc_service.rs @@ -1345,7 +1345,7 @@ impl Background { } }; - let (runtime_call_lock, virtual_machine) = precall + let (mut runtime_call_lock, mut virtual_machine) = precall .start( function_to_call, call_parameters.clone(), @@ -1356,56 +1356,65 @@ impl Background { .await .unwrap(); // TODO: don't unwrap - // Now that we have obtained the virtual machine, we can perform the call. - // This is a CPU-only operation that executes the virtual machine. - // The virtual machine might access the storage. - // TODO: finish doc - - let mut runtime_call = match read_only_runtime_host::run(read_only_runtime_host::Config { - virtual_machine, - function_to_call, - parameter: call_parameters, - }) { - Ok(vm) => vm, - Err((err, prototype)) => { - runtime_call_lock.unlock(prototype); - return Err(RuntimeCallError::StartError(err)); - } - }; - loop { - match runtime_call { - read_only_runtime_host::RuntimeHostVm::Finished(Ok(success)) => { - let output = success.virtual_machine.value().as_ref().to_vec(); - runtime_call_lock.unlock(success.virtual_machine.into_prototype()); - break Ok(output); - } - read_only_runtime_host::RuntimeHostVm::Finished(Err(error)) => { - runtime_call_lock.unlock(error.prototype); - break Err(RuntimeCallError::ReadOnlyRuntime(error.detail)); - } - read_only_runtime_host::RuntimeHostVm::StorageGet(get) => { - let storage_value = match runtime_call_lock.storage_entry(&get.key_as_vec()) { - Ok(v) => v, - Err(err) => { - runtime_call_lock.unlock( + // Now that we have obtained the virtual machine, we can perform the call. + // This is a CPU-only operation that executes the virtual machine. + // The virtual machine might access the storage. + // TODO: finish doc + + let mut runtime_call = + match read_only_runtime_host::run(read_only_runtime_host::Config { + virtual_machine, + function_to_call, + parameter: call_parameters.clone(), + }) { + Ok(vm) => vm, + Err((err, prototype)) => { + runtime_call_lock.unlock_finish(prototype); + return Err(RuntimeCallError::StartError(err)); + } + }; + + loop { + match runtime_call { + read_only_runtime_host::RuntimeHostVm::Finished(Ok(success)) => { + let output = success.virtual_machine.value().as_ref().to_vec(); + runtime_call_lock.unlock_finish(success.virtual_machine.into_prototype()); + return Ok(output); + } + read_only_runtime_host::RuntimeHostVm::Finished(Err(error)) => { + runtime_call_lock.unlock_finish(error.prototype); + return Err(RuntimeCallError::ReadOnlyRuntime(error.detail)); + } + read_only_runtime_host::RuntimeHostVm::StorageGet(get) => { + if let Ok(storage_value) = + runtime_call_lock.storage_entry(&get.key_as_vec()) + { + runtime_call = get.inject_value(storage_value.map(iter::once)); + continue; + } + + let (new_lock, new_vm) = runtime_call_lock + .unlock_retry( read_only_runtime_host::RuntimeHostVm::StorageGet(get) .into_prototype(), - ); - break Err(RuntimeCallError::Call(err)); - } - }; - runtime_call = get.inject_value(storage_value.map(iter::once)); - } - read_only_runtime_host::RuntimeHostVm::NextKey(nk) => { - // TODO: - runtime_call_lock.unlock( - read_only_runtime_host::RuntimeHostVm::NextKey(nk).into_prototype(), - ); - break Err(RuntimeCallError::NextKeyForbidden); - } - read_only_runtime_host::RuntimeHostVm::StorageRoot(storage_root) => { - runtime_call = storage_root.resume(runtime_call_lock.block_storage_root()); + ) + .await + .unwrap(); // TODO: don't unwrap + runtime_call_lock = new_lock; + virtual_machine = new_vm; + break; + } + read_only_runtime_host::RuntimeHostVm::NextKey(nk) => { + // TODO: + runtime_call_lock.unlock_finish( + read_only_runtime_host::RuntimeHostVm::NextKey(nk).into_prototype(), + ); + return Err(RuntimeCallError::NextKeyForbidden); + } + read_only_runtime_host::RuntimeHostVm::StorageRoot(storage_root) => { + runtime_call = storage_root.resume(runtime_call_lock.block_storage_root()); + } } } } diff --git a/bin/light-base/src/json_rpc_service/chain_head.rs b/bin/light-base/src/json_rpc_service/chain_head.rs index 778f09349b..e54b069bca 100644 --- a/bin/light-base/src/json_rpc_service/chain_head.rs +++ b/bin/light-base/src/json_rpc_service/chain_head.rs @@ -188,7 +188,7 @@ impl Background { storage_top_trie_changes: Default::default(), }) { Err((error, prototype)) => { - runtime_call_lock.unlock(prototype); + runtime_call_lock.unlock_finish(prototype); methods::ServerToClient::chainHead_unstable_callEvent { subscription: (&subscription_id).into(), result: methods::ChainHeadCallEvent::Error { @@ -203,8 +203,9 @@ impl Background { runtime_host::RuntimeHostVm::Finished(Ok(success)) => { let output = success.virtual_machine.value().as_ref().to_owned(); - runtime_call_lock - .unlock(success.virtual_machine.into_prototype()); + runtime_call_lock.unlock_finish( + success.virtual_machine.into_prototype(), + ); break methods::ServerToClient::chainHead_unstable_callEvent { subscription: (&subscription_id).into(), result: methods::ChainHeadCallEvent::Done { @@ -214,7 +215,7 @@ impl Background { .to_json_call_object_parameters(None); } runtime_host::RuntimeHostVm::Finished(Err(error)) => { - runtime_call_lock.unlock(error.prototype); + runtime_call_lock.unlock_finish(error.prototype); break methods::ServerToClient::chainHead_unstable_callEvent { subscription: (&subscription_id).into(), result: methods::ChainHeadCallEvent::Error { @@ -230,7 +231,8 @@ impl Background { { Ok(v) => v, Err(error) => { - runtime_call_lock.unlock( + // TODO: call unlock_retry instead of ending with error + runtime_call_lock.unlock_finish( runtime_host::RuntimeHostVm::StorageGet( get, ) @@ -250,7 +252,7 @@ impl Background { } runtime_host::RuntimeHostVm::NextKey(nk) => { // TODO: implement somehow - runtime_call_lock.unlock( + runtime_call_lock.unlock_finish( runtime_host::RuntimeHostVm::NextKey(nk) .into_prototype(), ); @@ -264,7 +266,7 @@ impl Background { } runtime_host::RuntimeHostVm::PrefixKeys(nk) => { // TODO: implement somehow - runtime_call_lock.unlock( + runtime_call_lock.unlock_finish( runtime_host::RuntimeHostVm::PrefixKeys(nk) .into_prototype(), ); diff --git a/bin/light-base/src/runtime_service.rs b/bin/light-base/src/runtime_service.rs index 5d3727f73f..0e9e5b96f6 100644 --- a/bin/light-base/src/runtime_service.rs +++ b/bin/light-base/src/runtime_service.rs @@ -725,46 +725,55 @@ impl<'a, TPlat: Platform> RuntimeLock<'a, TPlat> { ) -> Result<(RuntimeCallLock<'a>, executor::host::HostVmPrototype), RuntimeCallError> { // TODO: DRY :-/ this whole thing is messy - // Perform the call proof request. - // Note that `guarded` is not locked. - // TODO: there's no way to verify that the call proof is actually correct; we have to ban the peer and restart the whole call process if it turns out that it's not - // TODO: also, an empty proof will be reported as an error right now, which is weird - let call_proof = self - .service - .sync_service - .clone() - .call_proof_query( - self.block_number, - protocol::CallProofRequestConfig { - block_hash: self.hash, - method, - parameter_vectored: parameter_vectored.clone(), - }, - total_attempts, - timeout_per_request, - max_parallel, - ) - .await - .map_err(RuntimeCallError::CallProof); - - let (guarded, virtual_machine) = match self.runtime.runtime.as_ref() { - Ok(r) => { - let mut lock = r.virtual_machine.lock().await; - let vm = lock.take().unwrap(); - (lock, vm) - } - Err(err) => { - return Err(RuntimeCallError::InvalidRuntime(err.clone())); - } - }; + let mut attempts_remaining = total_attempts; + + while attempts_remaining > 0 { + attempts_remaining -= 1; + + // Perform the call proof request. + // Note that `guarded` is not locked. + // TODO: there's no way to verify that the call proof is actually correct; we have to ban the peer and restart the whole call process if it turns out that it's not + // TODO: also, an empty proof will be reported as an error right now, which is weird + let call_proof = self + .service + .sync_service + .clone() + .call_proof_query( + self.block_number, + protocol::CallProofRequestConfig { + block_hash: self.hash, + method, + parameter_vectored: parameter_vectored.clone(), + }, + 1, // TODO: we do this as `total_attempts` is enforced manually; this can only be done properly after a refactoring of `call_proof_query` + timeout_per_request, + max_parallel, + ) + .await + .map_err(RuntimeCallError::CallProof); + + let (guarded, virtual_machine) = match self.runtime.runtime.as_ref() { + Ok(r) => { + let mut lock = r.virtual_machine.lock().await; + let vm = lock.take().unwrap(); + (lock, vm) + } + Err(err) => { + return Err(RuntimeCallError::InvalidRuntime(err.clone())); + } + }; - let lock = RuntimeCallLock { - guarded, - block_state_root_hash: self.block_state_root_hash, - call_proof, - }; + let lock = RuntimeCallLock { + guarded, + block_state_root_hash: self.block_state_root_hash, + call_proof, + attempts_remaining, + }; - Ok((lock, virtual_machine)) + return Ok((lock, virtual_machine)); + } + + todo!() // Err(RuntimeCallError::) } } @@ -774,6 +783,7 @@ pub struct RuntimeCallLock<'a> { guarded: MutexGuard<'a, Option>, block_state_root_hash: [u8; 32], call_proof: Result>, RuntimeCallError>, + attempts_remaining: u32, } impl<'a> RuntimeCallLock<'a> { @@ -862,10 +872,34 @@ impl<'a> RuntimeCallLock<'a> { Ok(output.into_iter()) } - /// End the runtime call. + /// End the runtime call, then . + /// + /// The provided runtime is put back into the state of the [`RuntimeService`] while the + /// networking request is in progress, then extracted again. + pub async fn unlock_retry( + mut self, + vm: executor::host::HostVmPrototype, + ) -> Result<(RuntimeCallLock<'a>, executor::host::HostVmPrototype), RuntimeCallError> { + // We're destroying `self` below. Let's first extract the fields we need. + let block_state_root_hash = self.block_state_root_hash; + let attempts_remaining = self.attempts_remaining; + + // Unlock everything. + debug_assert!(self.guarded.is_none()); + *self.guarded = Some(vm); + drop(self); + + if attempts_remaining == 0 { + // TODO: return Err() + } + + todo!() + } + + /// End the runtime call, either after a success or because we are giving up. /// /// This method **must** be called. - pub fn unlock(mut self, vm: executor::host::HostVmPrototype) { + pub fn unlock_finish(mut self, vm: executor::host::HostVmPrototype) { debug_assert!(self.guarded.is_none()); *self.guarded = Some(vm); } diff --git a/bin/light-base/src/sync_service/parachain.rs b/bin/light-base/src/sync_service/parachain.rs index a603928f62..03f5699b09 100644 --- a/bin/light-base/src/sync_service/parachain.rs +++ b/bin/light-base/src/sync_service/parachain.rs @@ -520,7 +520,7 @@ async fn parahead( let precall = relay_chain_sync .pinned_block_runtime_lock(subscription_id, block_hash) .await; - let (runtime_call_lock, virtual_machine) = precall + let (mut runtime_call_lock, mut virtual_machine) = precall .start( para::PERSISTED_VALIDATION_FUNCTION_NAME, para::persisted_validation_data_parameters( @@ -536,52 +536,59 @@ async fn parahead( // TODO: move the logic below in the `para` module - let mut runtime_call = match read_only_runtime_host::run(read_only_runtime_host::Config { - virtual_machine, - function_to_call: para::PERSISTED_VALIDATION_FUNCTION_NAME, - parameter: para::persisted_validation_data_parameters( - parachain_id, - para::OccupiedCoreAssumption::TimedOut, - ), - }) { - Ok(vm) => vm, - Err((err, prototype)) => { - runtime_call_lock.unlock(prototype); - return Err(ParaheadError::StartError(err)); - } - }; - - let output = loop { - match runtime_call { - read_only_runtime_host::RuntimeHostVm::Finished(Ok(success)) => { - let output = success.virtual_machine.value().as_ref().to_owned(); - runtime_call_lock.unlock(success.virtual_machine.into_prototype()); - break output; - } - read_only_runtime_host::RuntimeHostVm::Finished(Err(error)) => { - runtime_call_lock.unlock(error.prototype); - return Err(ParaheadError::ReadOnlyRuntime(error.detail)); + let output = 'outer: loop { + let mut runtime_call = match read_only_runtime_host::run(read_only_runtime_host::Config { + virtual_machine, + function_to_call: para::PERSISTED_VALIDATION_FUNCTION_NAME, + parameter: para::persisted_validation_data_parameters( + parachain_id, + para::OccupiedCoreAssumption::TimedOut, + ), + }) { + Ok(vm) => vm, + Err((err, prototype)) => { + runtime_call_lock.unlock_finish(prototype); + return Err(ParaheadError::StartError(err)); } - read_only_runtime_host::RuntimeHostVm::StorageGet(get) => { - let storage_value = match runtime_call_lock.storage_entry(&get.key_as_vec()) { - Ok(v) => v, - Err(err) => { - runtime_call_lock.unlock( - read_only_runtime_host::RuntimeHostVm::StorageGet(get).into_prototype(), - ); - return Err(ParaheadError::Call(err)); + }; + + loop { + match runtime_call { + read_only_runtime_host::RuntimeHostVm::Finished(Ok(success)) => { + let output = success.virtual_machine.value().as_ref().to_owned(); + runtime_call_lock.unlock_finish(success.virtual_machine.into_prototype()); + break 'outer output; + } + read_only_runtime_host::RuntimeHostVm::Finished(Err(error)) => { + runtime_call_lock.unlock_finish(error.prototype); + return Err(ParaheadError::ReadOnlyRuntime(error.detail)); + } + read_only_runtime_host::RuntimeHostVm::StorageGet(get) => { + if let Ok(storage_value) = runtime_call_lock.storage_entry(&get.key_as_vec()) { + runtime_call = get.inject_value(storage_value.map(iter::once)); + continue; } - }; - runtime_call = get.inject_value(storage_value.map(iter::once)); - } - read_only_runtime_host::RuntimeHostVm::NextKey(nk) => { - // TODO: - runtime_call_lock - .unlock(read_only_runtime_host::RuntimeHostVm::NextKey(nk).into_prototype()); - return Err(ParaheadError::NextKeyForbidden); - } - read_only_runtime_host::RuntimeHostVm::StorageRoot(storage_root) => { - runtime_call = storage_root.resume(runtime_call_lock.block_storage_root()); + + let (new_lock, new_vm) = runtime_call_lock + .unlock_retry( + read_only_runtime_host::RuntimeHostVm::StorageGet(get).into_prototype(), + ) + .await + .map_err(ParaheadError::Call)?; + runtime_call_lock = new_lock; + virtual_machine = new_vm; + break; + } + read_only_runtime_host::RuntimeHostVm::NextKey(nk) => { + // TODO: + runtime_call_lock.unlock_finish( + read_only_runtime_host::RuntimeHostVm::NextKey(nk).into_prototype(), + ); + return Err(ParaheadError::NextKeyForbidden); + } + read_only_runtime_host::RuntimeHostVm::StorageRoot(storage_root) => { + runtime_call = storage_root.resume(runtime_call_lock.block_storage_root()); + } } } }; diff --git a/bin/light-base/src/transactions_service.rs b/bin/light-base/src/transactions_service.rs index f6f7887a63..24663fd6e8 100644 --- a/bin/light-base/src/transactions_service.rs +++ b/bin/light-base/src/transactions_service.rs @@ -1079,7 +1079,7 @@ async fn validate_transaction( ); let block_hash = *runtime_lock.block_hash(); - let (runtime_call_lock, runtime) = runtime_lock + let (mut runtime_call_lock, mut runtime) = runtime_lock .start( validate::VALIDATION_FUNCTION_NAME, // TODO: don't hardcode v3 but determine parameters dynamically from the runtime @@ -1096,72 +1096,80 @@ async fn validate_transaction( .map_err(ValidateTransactionError::Call) .map_err(InvalidOrError::ValidateError)?; - let mut validation_in_progress = validate::validate_transaction(validate::Config { - runtime, - scale_encoded_header: block_scale_encoded_header, - scale_encoded_transaction: iter::once(scale_encoded_transaction), - source, - }); - loop { - match validation_in_progress { - validate::Query::Finished { - result: Ok(Ok(success)), - virtual_machine, - } => { - runtime_call_lock.unlock(virtual_machine); - break Ok(success); - } - validate::Query::Finished { - result: Ok(Err(invalid)), - virtual_machine, - } => { - runtime_call_lock.unlock(virtual_machine); - break Err(InvalidOrError::Invalid(invalid)); - } - validate::Query::Finished { - result: Err(error), - virtual_machine, - } => { - runtime_call_lock.unlock(virtual_machine); - break Err(InvalidOrError::ValidateError( - ValidateTransactionError::Validation(error), - )); - } - validate::Query::StorageGet(get) => { - let storage_value = match runtime_call_lock.storage_entry(&get.key_as_vec()) { - Ok(v) => v, - Err(err) => { - runtime_call_lock.unlock(validate::Query::StorageGet(get).into_prototype()); - return Err(InvalidOrError::ValidateError( - ValidateTransactionError::Call(err), - )); + let mut validation_in_progress = validate::validate_transaction(validate::Config { + runtime, + scale_encoded_header: block_scale_encoded_header, + scale_encoded_transaction: iter::once(scale_encoded_transaction.as_ref()), + source, + }); + + loop { + match validation_in_progress { + validate::Query::Finished { + result: Ok(Ok(success)), + virtual_machine, + } => { + runtime_call_lock.unlock_finish(virtual_machine); + return Ok(success); + } + validate::Query::Finished { + result: Ok(Err(invalid)), + virtual_machine, + } => { + runtime_call_lock.unlock_finish(virtual_machine); + return Err(InvalidOrError::Invalid(invalid)); + } + validate::Query::Finished { + result: Err(error), + virtual_machine, + } => { + runtime_call_lock.unlock_finish(virtual_machine); + return Err(InvalidOrError::ValidateError( + ValidateTransactionError::Validation(error), + )); + } + validate::Query::StorageGet(get) => { + if let Ok(storage_value) = runtime_call_lock.storage_entry(&get.key_as_vec()) { + validation_in_progress = get.inject_value(storage_value.map(iter::once)); + continue; } - }; - validation_in_progress = get.inject_value(storage_value.map(iter::once)); - } - validate::Query::NextKey(nk) => { - // TODO: - runtime_call_lock.unlock(validate::Query::NextKey(nk).into_prototype()); - break Err(InvalidOrError::ValidateError( - ValidateTransactionError::NextKeyForbidden, - )); - } - validate::Query::PrefixKeys(prefix) => { - // TODO: lots of allocations because I couldn't figure how to make this annoying borrow checker happy - let rq_prefix = prefix.prefix().as_ref().to_owned(); - let result = runtime_call_lock - .storage_prefix_keys_ordered(&rq_prefix) - .map(|i| i.map(|v| v.as_ref().to_owned()).collect::>()); - match result { - Ok(v) => validation_in_progress = prefix.inject_keys_ordered(v.into_iter()), - Err(err) => { - runtime_call_lock - .unlock(validate::Query::PrefixKeys(prefix).into_prototype()); - return Err(InvalidOrError::ValidateError( - ValidateTransactionError::Call(err), - )); + + let (new_lock, new_vm) = runtime_call_lock + .unlock_retry(validate::Query::StorageGet(get).into_prototype()) + .await + .map_err(ValidateTransactionError::Call) + .map_err(InvalidOrError::ValidateError)?; + runtime_call_lock = new_lock; + runtime = new_vm; + break; + } + validate::Query::NextKey(nk) => { + // TODO: + runtime_call_lock.unlock_finish(validate::Query::NextKey(nk).into_prototype()); + return Err(InvalidOrError::ValidateError( + ValidateTransactionError::NextKeyForbidden, + )); + } + validate::Query::PrefixKeys(prefix) => { + // TODO: lots of allocations because I couldn't figure how to make this annoying borrow checker happy + let rq_prefix = prefix.prefix().as_ref().to_owned(); + let result = runtime_call_lock + .storage_prefix_keys_ordered(&rq_prefix) + .map(|i| i.map(|v| v.as_ref().to_owned()).collect::>()); + if let Ok(v) = result { + validation_in_progress = prefix.inject_keys_ordered(v.into_iter()); + continue; } + + let (new_lock, new_vm) = runtime_call_lock + .unlock_retry(validate::Query::PrefixKeys(prefix).into_prototype()) + .await + .map_err(ValidateTransactionError::Call) + .map_err(InvalidOrError::ValidateError)?; + runtime_call_lock = new_lock; + runtime = new_vm; + break; } } }