Skip to content

Commit

Permalink
Pvf refactor execute worker errors follow up (#4071)
Browse files Browse the repository at this point in the history
follow up of #2604
closes #2604

- [x] take relevant changes from Marcin's PR 
- [x] extract common duplicate code for workers (low-hanging fruits)

~Some unpassed ci problems are more general and should be fixed in
master (see #4074

Proposed labels: **T0-node**, **R0-silent**, **I4-refactor**

-----

kusama address: FZXVQLqLbFV2otNXs6BMnNch54CFJ1idpWwjMb3Z8fTLQC6

---------

Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com>
  • Loading branch information
maksimryndin and s0me0ne-unkn0wn authored Apr 19, 2024
1 parent eba3dec commit 4eabe5e
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 287 deletions.
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 14 additions & 8 deletions polkadot/node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ cfg-if = "1.0"
futures = "0.3.30"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
is_executable = "1.0.1"
libc = "0.2.152"
is_executable = { version = "1.0.1", optional = true }
pin-project = "1.0.9"
rand = "0.8.5"
slotmap = "1.0"
tempfile = "3.3.0"
thiserror = { workspace = true }
tokio = { version = "1.24.2", features = ["fs", "process"] }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
parity-scale-codec = { version = "3.6.1", default-features = false, features = [
"derive",
] }

polkadot-parachain-primitives = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
Expand All @@ -37,14 +38,16 @@ polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-primitives = { path = "../../../primitives" }

sp-core = { path = "../../../../substrate/primitives/core" }
sp-wasm-interface = { path = "../../../../substrate/primitives/wasm-interface" }
sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob" }
sp-maybe-compressed-blob = { path = "../../../../substrate/primitives/maybe-compressed-blob", optional = true }
polkadot-node-core-pvf-prepare-worker = { path = "prepare-worker", optional = true }
polkadot-node-core-pvf-execute-worker = { path = "execute-worker", optional = true }

[dev-dependencies]
assert_matches = "1.4.0"
criterion = { version = "0.4.0", default-features = false, features = ["async_tokio", "cargo_bench_support"] }
criterion = { version = "0.4.0", default-features = false, features = [
"async_tokio",
"cargo_bench_support",
] }
hex-literal = "0.4.1"

polkadot-node-core-pvf-common = { path = "common", features = ["test-utils"] }
Expand All @@ -57,6 +60,7 @@ adder = { package = "test-parachain-adder", path = "../../../parachain/test-para
halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" }

[target.'cfg(target_os = "linux")'.dev-dependencies]
libc = "0.2.153"
procfs = "0.16.0"
rusty-fork = "0.3.0"
sc-sysinfo = { path = "../../../../substrate/client/sysinfo" }
Expand All @@ -70,6 +74,8 @@ ci-only-tests = []
jemalloc-allocator = ["polkadot-node-core-pvf-common/jemalloc-allocator"]
# This feature is used to export test code to other crates without putting it in the production build.
test-utils = [
"polkadot-node-core-pvf-execute-worker",
"polkadot-node-core-pvf-prepare-worker",
"dep:is_executable",
"dep:polkadot-node-core-pvf-execute-worker",
"dep:polkadot-node-core-pvf-prepare-worker",
"dep:sp-maybe-compressed-blob",
]
7 changes: 4 additions & 3 deletions polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ license.workspace = true
workspace = true

[dependencies]
cfg-if = "1.0"
cpu-time = "1.0.0"
futures = "0.3.30"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.152"
nix = { version = "0.27.1", features = ["resource", "sched"] }
thiserror = { workspace = true }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
parity-scale-codec = { version = "3.6.1", default-features = false, features = [
"derive",
] }

polkadot-parachain-primitives = { path = "../../../../parachain" }
polkadot-primitives = { path = "../../../../primitives" }
Expand All @@ -34,7 +36,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }

[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.3.0"
nix = { version = "0.27.1", features = ["sched"] }

[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
seccompiler = "0.4.0"
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/core/pvf/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ pub enum InternalValidationError {
/// Could not find or open compiled artifact file.
#[error("validation: could not find or open compiled artifact file: {0}")]
CouldNotOpenFile(String),
/// Could not create a pipe between the worker and a child process.
#[error("validation: could not create pipe: {0}")]
CouldNotCreatePipe(String),
/// Host could not clear the worker cache after a job.
#[error("validation: host could not clear the worker cache ({path:?}) after a job: {err}")]
CouldNotClearWorkerDir {
Expand Down
36 changes: 20 additions & 16 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,36 @@ pub struct Handshake {

/// The response from the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum WorkerResponse {
/// The job completed successfully.
Ok {
/// The result of parachain validation.
result_descriptor: ValidationResult,
/// The amount of CPU time taken by the job.
duration: Duration,
},
/// The candidate is invalid.
InvalidCandidate(String),
/// Instantiation of the WASM module instance failed during an execution.
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
RuntimeConstruction(String),
pub struct WorkerResponse {
/// The response from the execute job process.
pub job_response: JobResponse,
/// The amount of CPU time taken by the job.
pub duration: Duration,
}

/// An error occurred in the worker process.
#[derive(thiserror::Error, Debug, Clone, Encode, Decode)]
pub enum WorkerError {
/// The job timed out.
#[error("The job timed out")]
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have killed the job.
/// We still retry it, because in the non-malicious case it is likely spurious.
#[error("The job process (pid {job_pid}) has died: {err}")]
JobDied { err: String, job_pid: i32 },
/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
/// etc.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error. We
/// still retry it, because in the non-malicious case it is likely spurious.
JobError(String),
#[error("An unexpected error occurred in the job process: {0}")]
JobError(#[from] JobError),

/// Some internal error occurred.
InternalError(InternalValidationError),
#[error("An internal error occurred: {0}")]
InternalError(#[from] InternalValidationError),
}

/// The result of a job on the execution worker.
Expand Down Expand Up @@ -101,7 +102,7 @@ impl JobResponse {
/// An unexpected error occurred in the execution job process. Because this comes from the job,
/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we
/// cannot raise an internal error based on this.
#[derive(thiserror::Error, Debug, Encode, Decode)]
#[derive(thiserror::Error, Clone, Debug, Encode, Decode)]
pub enum JobError {
#[error("The job timed out")]
TimedOut,
Expand All @@ -114,4 +115,7 @@ pub enum JobError {
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
/// Since the job can return any exit status it wants, we have to treat this as untrusted.
#[error("Unexpected exit status: {0}")]
UnexpectedExitStatus(i32),
}
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Contains functionality related to PVFs that is shared by the PVF host and the PVF workers.
#![deny(unused_crate_dependencies)]

pub mod error;
pub mod execute;
Expand Down
7 changes: 1 addition & 6 deletions polkadot/node/core/pvf/common/src/pvf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@ use crate::prepare::PrepareJobKind;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParams;
use std::{
cmp::{Eq, PartialEq},
fmt,
sync::Arc,
time::Duration,
};
use std::{fmt, sync::Arc, time::Duration};

/// A struct that carries the exhaustive set of data to prepare an artifact out of plain
/// Wasm binary
Expand Down
84 changes: 80 additions & 4 deletions polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

pub mod security;

use crate::{framed_recv_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET};
use crate::{
framed_recv_blocking, framed_send_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET,
};
use cpu_time::ProcessTime;
use futures::never::Never;
use parity_scale_codec::Decode;
use nix::{errno::Errno, sys::resource::Usage};
use parity_scale_codec::{Decode, Encode};
use std::{
any::Any,
fmt::{self},
Expand Down Expand Up @@ -58,8 +61,6 @@ macro_rules! decl_worker_main {

$crate::sp_tracing::try_init_simple();

let worker_pid = std::process::id();

let args = std::env::args().collect::<Vec<_>>();
if args.len() == 1 {
print_help($expected_command);
Expand Down Expand Up @@ -548,6 +549,81 @@ fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake>
Ok(worker_handshake)
}

/// Calculate the total CPU time from the given `usage` structure, returned from
/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
/// and system time.
///
/// # Arguments
///
/// - `rusage`: Contains resource usage information.
///
/// # Returns
///
/// Returns a `Duration` representing the total CPU time.
pub fn get_total_cpu_usage(rusage: Usage) -> Duration {
let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;

return Duration::from_micros(micros)
}

/// Get a job response.
pub fn recv_child_response<T>(
received_data: &mut io::BufReader<&[u8]>,
context: &'static str,
) -> io::Result<T>
where
T: Decode,
{
let response_bytes = framed_recv_blocking(received_data)?;
T::decode(&mut response_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("{} pvf recv_child_response: decode error: {}", context, e),
)
})
}

pub fn send_result<T, E>(
stream: &mut UnixStream,
result: Result<T, E>,
worker_info: &WorkerInfo,
) -> io::Result<()>
where
T: std::fmt::Debug,
E: std::fmt::Debug + std::fmt::Display,
Result<T, E>: Encode,
{
if let Err(ref err) = result {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred: {}",
err
);
}
gum::trace!(
target: LOG_TARGET,
?worker_info,
"worker: sending result to host: {:?}",
result
);

framed_send_blocking(stream, &result.encode()).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred sending result to host: {}",
err
);
err
})
}

pub fn stringify_errno(context: &'static str, errno: Errno) -> String {
format!("{}: {}: {}", context, errno, io::Error::last_os_error())
}

/// Functionality related to threads spawned by the workers.
///
/// The motivation for this module is to coordinate worker threads without using async Rust.
Expand Down
Loading

0 comments on commit 4eabe5e

Please sign in to comment.