Skip to content

Commit

Permalink
Merge pull request #396 from flavio/on-demand
Browse files Browse the repository at this point in the history
feat: introduce `PolicyEvaluatorPre`
flavio authored Dec 13, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents cc76232 + 8752bbd commit 130f6ec
Showing 21 changed files with 680 additions and 600 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -46,12 +46,16 @@ tracing = "0.1"
tracing-futures = "0.2"
url = { version = "2.2", features = ["serde"] }
validator = { version = "0.16", features = ["derive"] }
wapc = "1.1"
#wapc = "1.1"
wapc = { git = "https://github.com/flavio/wapc-rs/", branch = "expose-wasmtime-provider-pre" }
wasi-cap-std-sync = { workspace = true }
wasi-common = { workspace = true }
wasmparser = "0.118"
wasmtime = { workspace = true }
wasmtime-provider = { version = "1.12", features = ["cache"] }
#wasmtime-provider = { version = "1.12", features = ["cache"] }
wasmtime-provider = { git = "https://github.com/flavio/wapc-rs/", branch = "expose-wasmtime-provider-pre", features = [
"cache",
] }
wasmtime-wasi = { workspace = true }

[workspace.dependencies]
6 changes: 3 additions & 3 deletions crates/burrego/Cargo.toml
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ edition = "2021"

[dependencies]
base64 = "0.21.5"
chrono = { version = "0.4", default-features = false }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
chrono-tz = "0.8.4"
gtmpl = "0.7.1"
gtmpl_value = "0.5.1"
@@ -22,11 +22,11 @@ serde_json = "1.0.108"
serde_yaml = "0.9.27"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version= "0.3", features = ["fmt", "env-filter"] }
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
url = "2.2.2"
wasmtime = { workspace = true }

[dev-dependencies]
anyhow = "1.0"
assert-json-diff = "2.0.2"
clap = { version = "4.0", features = [ "derive" ] }
clap = { version = "4.0", features = ["derive"] }
12 changes: 0 additions & 12 deletions src/evaluation_context.rs
Original file line number Diff line number Diff line change
@@ -36,18 +36,6 @@ impl EvaluationContext {
self.ctx_aware_resources_allow_list
.contains(&wanted_resource)
}

/// Copy data from another `EvaluationContext` instance
pub(crate) fn copy_from(&mut self, other: &EvaluationContext) {
if self.policy_id == other.policy_id {
// The current evaluation context is about the very same policy
// There's nothing to be done
return;
}
self.policy_id = other.policy_id.clone();
self.callback_channel = other.callback_channel.clone();
self.ctx_aware_resources_allow_list = other.ctx_aware_resources_allow_list.clone();
}
}

impl fmt::Debug for EvaluationContext {
33 changes: 9 additions & 24 deletions src/policy_evaluator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod policy_evaluator_builder;
pub mod policy_evaluator_pre;

use anyhow::{anyhow, Result};
use kubewarden_policy_sdk::metadata::ProtocolVersion;
@@ -15,6 +16,8 @@ use crate::runtimes::wapc::Runtime as WapcRuntime;
use crate::runtimes::wasi_cli::Runtime as WasiRuntime;
use crate::runtimes::Runtime;

pub use policy_evaluator_pre::PolicyEvaluatorPre;

#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
pub enum PolicyExecutionMode {
#[serde(rename = "kubewarden-wapc")]
@@ -81,21 +84,11 @@ pub type PolicySettings = serde_json::Map<String, serde_json::Value>;

pub struct PolicyEvaluator {
runtime: Runtime,
worker_id: u64,
policy_id: String,
}

impl PolicyEvaluator {
pub(crate) fn new(policy_id: &str, worker_id: u64, runtime: Runtime) -> Self {
Self {
runtime,
worker_id,
policy_id: policy_id.to_owned(),
}
}

pub fn policy_id(&self) -> String {
self.policy_id.clone()
pub(crate) fn new(runtime: Runtime) -> Self {
Self { runtime }
}

#[tracing::instrument(skip(request, eval_ctx))]
@@ -107,10 +100,9 @@ impl PolicyEvaluator {
) -> AdmissionResponse {
match self.runtime {
Runtime::Wapc(ref mut wapc_stack) => {
wapc_stack.set_eval_ctx(eval_ctx);
WapcRuntime(wapc_stack).validate(settings, &request)
}
Runtime::Burrego(ref mut burrego_evaluator) => {
Runtime::Rego(ref mut burrego_evaluator) => {
let kube_ctx = burrego_evaluator.build_kubernetes_context(
eval_ctx.callback_channel.as_ref(),
&eval_ctx.ctx_aware_resources_allow_list,
@@ -126,12 +118,8 @@ impl PolicyEvaluator {
}
}

#[tracing::instrument(skip(eval_ctx))]
pub fn validate_settings(
&mut self,
settings: &PolicySettings,
eval_ctx: &EvaluationContext,
) -> SettingsValidationResponse {
#[tracing::instrument]
pub fn validate_settings(&mut self, settings: &PolicySettings) -> SettingsValidationResponse {
let settings_str = match serde_json::to_string(settings) {
Ok(settings) => settings,
Err(err) => {
@@ -144,10 +132,9 @@ impl PolicyEvaluator {

match self.runtime {
Runtime::Wapc(ref mut wapc_stack) => {
wapc_stack.set_eval_ctx(eval_ctx);
WapcRuntime(wapc_stack).validate_settings(settings_str)
}
Runtime::Burrego(ref mut burrego_evaluator) => {
Runtime::Rego(ref mut burrego_evaluator) => {
BurregoRuntime(burrego_evaluator).validate_settings(settings_str)
}
Runtime::Cli(ref mut cli_stack) => {
@@ -171,8 +158,6 @@ impl fmt::Debug for PolicyEvaluator {
let runtime = self.runtime.to_string();

f.debug_struct("PolicyEvaluator")
.field("policy_id", &self.policy_id)
.field("worker_id", &self.worker_id)
.field("runtime", &runtime)
.finish()
}
184 changes: 56 additions & 128 deletions src/policy_evaluator/policy_evaluator_builder.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use anyhow::{anyhow, Result};
use std::collections::BTreeSet;
use std::path::Path;
use tokio::sync::mpsc;
use wasmtime_provider::wasmtime;

use crate::callback_requests::CallbackRequest;
use crate::evaluation_context::EvaluationContext;
use crate::policy_evaluator::{PolicyEvaluator, PolicyExecutionMode};
use crate::policy_metadata::ContextAwareResource;
use crate::runtimes::{rego::BurregoStack, wapc::WapcStack, wasi_cli, Runtime};
use crate::policy_evaluator::{
policy_evaluator_pre::StackPre, PolicyEvaluatorPre, PolicyExecutionMode,
};
use crate::runtimes::{rego, wapc, wasi_cli};

/// Configure behavior of wasmtime [epoch-based interruptions](https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.epoch_interruption)
///
@@ -30,31 +27,18 @@ pub(crate) struct EpochDeadlines {
#[derive(Default)]
pub struct PolicyEvaluatorBuilder {
engine: Option<wasmtime::Engine>,
policy_id: String,
worker_id: u64,
policy_file: Option<String>,
policy_contents: Option<Vec<u8>>,
policy_module: Option<wasmtime::Module>,
execution_mode: Option<PolicyExecutionMode>,
callback_channel: Option<mpsc::Sender<CallbackRequest>>,
wasmtime_cache: bool,
epoch_deadlines: Option<EpochDeadlines>,
ctx_aware_resources_allow_list: BTreeSet<ContextAwareResource>,
}

impl PolicyEvaluatorBuilder {
/// Create a new PolicyEvaluatorBuilder object.
/// * `policy_id`: unique identifier of the policy. This is mostly relevant for PolicyServer.
/// In this case, this is the value used to identify the policy inside of the `policies.yml`
/// file
/// * `worker_id`: unique identifier of the worker that is going to evaluate the policy. This
/// is mostly relevant for PolicyServer
pub fn new(policy_id: String, worker_id: u64) -> PolicyEvaluatorBuilder {
PolicyEvaluatorBuilder {
policy_id,
worker_id,
..Default::default()
}
pub fn new() -> PolicyEvaluatorBuilder {
PolicyEvaluatorBuilder::default()
}

/// [`wasmtime::Engine`] instance to be used when creating the
@@ -63,6 +47,7 @@ impl PolicyEvaluatorBuilder {
/// **Warning:** when used, all the [`wasmtime::Engine`] specific settings
/// must be set by the caller when creating the engine.
/// This includes options like: cache, epoch counter
#[must_use]
pub fn engine(mut self, engine: wasmtime::Engine) -> Self {
self.engine = Some(engine);
self
@@ -81,6 +66,7 @@ impl PolicyEvaluatorBuilder {

/// Build the policy by using the Wasm object given via the `data` array.
/// Cannot be used at the same time as `policy_file`
#[must_use]
pub fn policy_contents(mut self, data: &[u8]) -> PolicyEvaluatorBuilder {
self.policy_contents = Some(data.to_owned());
self
@@ -89,32 +75,26 @@ impl PolicyEvaluatorBuilder {
/// Use a pre-built [`wasmtime::Module`] instance.
/// **Warning:** you must provide also the [`wasmtime::Engine`] used
/// to allocate the `Module`, otherwise the code will panic at runtime
#[must_use]
pub fn policy_module(mut self, module: wasmtime::Module) -> Self {
self.policy_module = Some(module);
self
}

/// Sets the policy execution mode
#[must_use]
pub fn execution_mode(mut self, mode: PolicyExecutionMode) -> PolicyEvaluatorBuilder {
self.execution_mode = Some(mode);
self
}

/// Enable Wasmtime cache feature
#[must_use]
pub fn enable_wasmtime_cache(mut self) -> PolicyEvaluatorBuilder {
self.wasmtime_cache = true;
self
}

/// Set the list of Kubernetes resources the policy can have access to
pub fn context_aware_resources_allowed(
mut self,
allowed_resources: BTreeSet<ContextAwareResource>,
) -> PolicyEvaluatorBuilder {
self.ctx_aware_resources_allow_list = allowed_resources;
self
}

/// Enable Wasmtime [epoch-based interruptions](wasmtime::Config::epoch_interruption) and set
/// the deadlines to be enforced
///
@@ -147,22 +127,6 @@ impl PolicyEvaluatorBuilder {
self
}

/// Specify the channel that is used by the synchronous world (the waPC `host_callback`
/// function) to obtain information that can be computed only from within a
/// tokio runtime.
///
/// Note well: if no channel is given, the policy will still be created, but
/// some waPC functions exposed by the host will not be available at runtime.
/// The policy evaluation will not fail because of that, but the guest will
/// get an error instead of the expected result.
pub fn callback_channel(
mut self,
channel: mpsc::Sender<CallbackRequest>,
) -> PolicyEvaluatorBuilder {
self.callback_channel = Some(channel);
self
}

/// Ensure the configuration provided to the build is correct
fn validate_user_input(&self) -> Result<()> {
if self.policy_file.is_some() && self.policy_contents.is_some() {
@@ -203,12 +167,41 @@ impl PolicyEvaluatorBuilder {
Ok(())
}

/// Create the instance of `PolicyEvaluator` to be used
pub fn build(&self) -> Result<PolicyEvaluator> {
/// Create the instance of `PolicyEvaluatorPre` to be used
pub fn build_pre(&self) -> Result<PolicyEvaluatorPre> {
self.validate_user_input()?;

let engine = self
.engine
let engine = self.build_engine()?;
let module = self.build_module(&engine)?;

let execution_mode = self.execution_mode.unwrap();

let stack_pre = match execution_mode {
PolicyExecutionMode::KubewardenWapc => {
let wapc_stack_pre = wapc::StackPre::new(engine, module, self.epoch_deadlines)?;
StackPre::from(wapc_stack_pre)
}
PolicyExecutionMode::Wasi => {
let wasi_stack_pre = wasi_cli::StackPre::new(engine, module, self.epoch_deadlines)?;
StackPre::from(wasi_stack_pre)
}
PolicyExecutionMode::Opa | PolicyExecutionMode::OpaGatekeeper => {
let rego_stack_pre = rego::StackPre::new(
engine,
module,
self.epoch_deadlines,
0, // currently the entrypoint is hard coded to this value
execution_mode.try_into()?,
);
StackPre::from(rego_stack_pre)
}
};

Ok(PolicyEvaluatorPre { stack_pre })
}

fn build_engine(&self) -> Result<wasmtime::Engine> {
self.engine
.as_ref()
.map_or_else(
|| {
@@ -224,105 +217,40 @@ impl PolicyEvaluatorBuilder {
},
|e| Ok(e.clone()),
)
.map_err(|e| anyhow!("cannot create wasmtime engine: {:?}", e))?;
.map_err(|e| anyhow!("cannot create wasmtime engine: {:?}", e))
}

let module: wasmtime::Module = if let Some(m) = &self.policy_module {
fn build_module(&self, engine: &wasmtime::Engine) -> Result<wasmtime::Module> {
if let Some(m) = &self.policy_module {
// it's fine to clone a Module, this is a cheap operation that just
// copies its internal reference. See wasmtime docs
m.clone()
Ok(m.clone())
} else {
match &self.policy_file {
Some(file) => wasmtime::Module::from_file(&engine, file),
None => wasmtime::Module::new(&engine, self.policy_contents.as_ref().unwrap()),
}?
};

let execution_mode = self.execution_mode.unwrap();

let runtime = match execution_mode {
PolicyExecutionMode::KubewardenWapc => create_wapc_runtime(
&self.policy_id,
engine,
module,
self.epoch_deadlines,
self.callback_channel.clone(),
&self.ctx_aware_resources_allow_list,
)?,
PolicyExecutionMode::Wasi => {
let cli_stack = wasi_cli::Stack::new(engine, module, self.epoch_deadlines)?;
Runtime::Cli(cli_stack)
Some(file) => wasmtime::Module::from_file(engine, file),
None => wasmtime::Module::new(engine, self.policy_contents.as_ref().unwrap()),
}
PolicyExecutionMode::Opa | PolicyExecutionMode::OpaGatekeeper => {
let mut builder = burrego::EvaluatorBuilder::default()
.engine(&engine)
.module(module)
.host_callbacks(crate::runtimes::rego::new_host_callbacks());

if let Some(deadlines) = self.epoch_deadlines {
builder = builder.enable_epoch_interruptions(deadlines.wapc_func);
}
let evaluator = builder.build()?;

Runtime::Burrego(BurregoStack {
evaluator,
entrypoint_id: 0, // currently hardcoded to this value
policy_execution_mode: execution_mode.try_into()?,
})
}
};

Ok(PolicyEvaluator::new(
&self.policy_id,
self.worker_id,
runtime,
))
}
}
}

fn create_wapc_runtime(
policy_id: &str,
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
callback_channel: Option<mpsc::Sender<CallbackRequest>>,
ctx_aware_resources_allow_list: &BTreeSet<ContextAwareResource>,
) -> Result<Runtime> {
let eval_ctx = EvaluationContext {
policy_id: policy_id.to_owned(),
callback_channel,
ctx_aware_resources_allow_list: ctx_aware_resources_allow_list.to_owned(),
};

let wapc_stack = WapcStack::new(engine, module, epoch_deadlines, eval_ctx)?;

Ok(Runtime::Wapc(wapc_stack))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn build_test() {
fn build_policy_evaluator_pre() {
let engine = wasmtime::Engine::default();
let wat = include_bytes!("../../test_data/endless_wasm/wapc_endless_loop.wat");
let module = wasmtime::Module::new(&engine, wat).expect("cannot compile WAT to wasm");

let policy_evaluator_builder = PolicyEvaluatorBuilder::new("test".to_string(), 1)
let policy_evaluator_builder = PolicyEvaluatorBuilder::new()
.execution_mode(PolicyExecutionMode::KubewardenWapc)
.policy_module(module)
.engine(engine)
.enable_wasmtime_cache()
.enable_epoch_interruptions(1, 2)
.callback_channel(mpsc::channel(1).0)
.context_aware_resources_allowed(BTreeSet::new());

let policy_evaluator = policy_evaluator_builder
.build()
.expect("cannot build policy");
.enable_epoch_interruptions(1, 2);

assert_eq!(policy_evaluator.policy_id(), "test");
assert_eq!(policy_evaluator.worker_id, 1);
assert!(matches!(policy_evaluator.runtime, Runtime::Wapc(_)));
_ = policy_evaluator_builder.build_pre().unwrap();
}
}
71 changes: 71 additions & 0 deletions src/policy_evaluator/policy_evaluator_pre.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use anyhow::Result;

use crate::evaluation_context::EvaluationContext;
use crate::policy_evaluator::PolicyEvaluator;
use crate::runtimes::{rego, wapc, wasi_cli, Runtime};

/// Holds pre-initialized stacks for all the types of policies we run
///
/// Pre-initialized instances are key to reduce the evaluation time when
/// using on-demand PolicyEvaluator instances; where on-demand means that
/// each validation request has a brand new PolicyEvaluator that is discarded
/// once the evaluation is done.
pub(crate) enum StackPre {
Wapc(crate::runtimes::wapc::StackPre),
Wasi(crate::runtimes::wasi_cli::StackPre),
Rego(crate::runtimes::rego::StackPre),
}

impl From<wapc::StackPre> for StackPre {
fn from(wapc_stack_pre: wapc::StackPre) -> Self {
StackPre::Wapc(wapc_stack_pre)
}
}

impl From<wasi_cli::StackPre> for StackPre {
fn from(wasi_stack_pre: wasi_cli::StackPre) -> Self {
StackPre::Wasi(wasi_stack_pre)
}
}

impl From<rego::StackPre> for StackPre {
fn from(rego_stack_pre: rego::StackPre) -> Self {
StackPre::Rego(rego_stack_pre)
}
}

/// This struct provides a way to quickly allocate a `PolicyEvaluator`
/// object.
///
/// See the [`rehydrate`](PolicyEvaluatorPre::rehydrate) method.
pub struct PolicyEvaluatorPre {
pub(crate) stack_pre: StackPre,
}

impl PolicyEvaluatorPre {
/// Create a `PolicyEvaluator` instance. The creation of the instance is achieved by
/// using wasmtime low level primitives (like `wasmtime::InstancePre`) to make the operation
/// as fast as possible.
///
/// Warning: the Rego stack cannot make use of these low level primitives, but its
/// instantiation times are negligible. More details inside of the
/// documentation of [`rego::StackPre`](crate::rego::stack_pre::StackPre).
pub fn rehydrate(&self, eval_ctx: &EvaluationContext) -> Result<PolicyEvaluator> {
let runtime = match &self.stack_pre {
StackPre::Wapc(stack_pre) => {
let wapc_stack = wapc::WapcStack::new_from_pre(stack_pre, eval_ctx)?;
Runtime::Wapc(wapc_stack)
}
StackPre::Wasi(stack_pre) => {
let wasi_stack = wasi_cli::Stack::new_from_pre(stack_pre);
Runtime::Cli(wasi_stack)
}
StackPre::Rego(stack_pre) => {
let rego_stack = rego::Stack::new_from_pre(stack_pre)?;
Runtime::Rego(rego_stack)
}
};

Ok(PolicyEvaluator::new(runtime))
}
}
4 changes: 2 additions & 2 deletions src/runtimes.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ pub(crate) mod wasi_cli;

pub(crate) enum Runtime {
Wapc(wapc::WapcStack),
Burrego(rego::BurregoStack),
Rego(rego::Stack),
Cli(wasi_cli::Stack),
}

@@ -17,7 +17,7 @@ impl Display for Runtime {
match self {
Runtime::Cli(_) => write!(f, "wasi"),
Runtime::Wapc(_) => write!(f, "wapc"),
Runtime::Burrego(stack) => match stack.policy_execution_mode {
Runtime::Rego(stack) => match stack.policy_execution_mode {
RegoPolicyExecutionMode::Opa => {
write!(f, "OPA")
}
3 changes: 3 additions & 0 deletions src/runtimes/rego/errors.rs
Original file line number Diff line number Diff line change
@@ -48,4 +48,7 @@ pub enum RegoRuntimeError {

#[error("invalid response from policy: {0}")]
InvalidResponseWithError(#[source] serde_json::Error),

#[error("cannot allocate Rego evaluator: {0}")]
EvaluatorError(String),
}
4 changes: 3 additions & 1 deletion src/runtimes/rego/mod.rs
Original file line number Diff line number Diff line change
@@ -4,10 +4,12 @@ mod gatekeeper_inventory;
mod opa_inventory;
mod runtime;
mod stack;
mod stack_pre;

use burrego::host_callbacks::HostCallbacks;
pub(crate) use runtime::Runtime;
pub(crate) use stack::BurregoStack;
pub(crate) use stack::Stack;
pub(crate) use stack_pre::StackPre;

#[tracing::instrument(level = "error")]
fn opa_abort(msg: &str) {}
4 changes: 2 additions & 2 deletions src/runtimes/rego/runtime.rs
Original file line number Diff line number Diff line change
@@ -7,10 +7,10 @@ use crate::admission_response::{AdmissionResponse, AdmissionResponseStatus};
use crate::policy_evaluator::RegoPolicyExecutionMode;
use crate::policy_evaluator::{PolicySettings, ValidateRequest};
use crate::runtimes::rego::{
context_aware, context_aware::KubernetesContext, errors::RegoRuntimeError, BurregoStack,
context_aware, context_aware::KubernetesContext, errors::RegoRuntimeError, Stack,
};

pub(crate) struct Runtime<'a>(pub(crate) &'a mut BurregoStack);
pub(crate) struct Runtime<'a>(pub(crate) &'a mut Stack);

impl<'a> Runtime<'a> {
pub fn validate(
17 changes: 15 additions & 2 deletions src/runtimes/rego/stack.rs
Original file line number Diff line number Diff line change
@@ -10,16 +10,29 @@ use crate::{
errors::{RegoRuntimeError, Result},
gatekeeper_inventory::GatekeeperInventory,
opa_inventory::OpaInventory,
stack_pre::StackPre,
},
};

pub(crate) struct BurregoStack {
pub(crate) struct Stack {
pub evaluator: burrego::Evaluator,
pub entrypoint_id: i32,
pub policy_execution_mode: RegoPolicyExecutionMode,
}

impl BurregoStack {
impl Stack {
/// Create a new `Stack` using a `StackPre` object
pub fn new_from_pre(stack_pre: &StackPre) -> Result<Self> {
let evaluator = stack_pre
.rehydrate()
.map_err(|e| RegoRuntimeError::EvaluatorError(e.to_string()))?;
Ok(Self {
evaluator,
entrypoint_id: stack_pre.entrypoint_id,
policy_execution_mode: stack_pre.policy_execution_mode.clone(),
})
}

pub fn build_kubernetes_context(
&self,
callback_channel: Option<&mpsc::Sender<CallbackRequest>>,
56 changes: 56 additions & 0 deletions src/runtimes/rego/stack_pre.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use anyhow::Result;

use crate::policy_evaluator::RegoPolicyExecutionMode;
use crate::policy_evaluator_builder::EpochDeadlines;

/// This struct allows to follow the `StackPre -> Stack`
/// "pattern" also for Rego policies.
///
/// However, Rego policies cannot make use of `wasmtime::InstancePre`
/// to reduce the instantiation times. That happens because all
/// Rego WebAssembly policies import their Wasm Memory from the host.
/// The Wasm Memory is defined inside of a `wasmtime::Store`, which is
/// something that `wasmtime::InstnacePre` objects do not have (rightfully!).
///
/// However, Rego Wasm modules are so small thta instantiating them from scratch
/// is already a cheap operation.
#[derive(Clone)]
pub(crate) struct StackPre {
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
pub entrypoint_id: i32,
pub policy_execution_mode: RegoPolicyExecutionMode,
}

impl StackPre {
pub(crate) fn new(
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
entrypoint_id: i32,
policy_execution_mode: RegoPolicyExecutionMode,
) -> Self {
Self {
engine,
module,
epoch_deadlines,
entrypoint_id,
policy_execution_mode,
}
}

/// Create a fresh `burrego::Evaluator`
pub(crate) fn rehydrate(&self) -> Result<burrego::Evaluator> {
let mut builder = burrego::EvaluatorBuilder::default()
.engine(&self.engine)
.module(self.module.clone())
.host_callbacks(crate::runtimes::rego::new_host_callbacks());

if let Some(deadlines) = self.epoch_deadlines {
builder = builder.enable_epoch_interruptions(deadlines.wapc_func);
}
let evaluator = builder.build()?;
Ok(evaluator)
}
}
487 changes: 240 additions & 247 deletions src/runtimes/wapc/callback.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/runtimes/wapc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod callback;
mod runtime;
mod stack;
mod stack_pre;

pub(crate) use runtime::Runtime;
pub(crate) use stack::WapcStack;
pub(crate) use stack_pre::StackPre;
4 changes: 2 additions & 2 deletions src/runtimes/wapc/runtime.rs
Original file line number Diff line number Diff line change
@@ -168,7 +168,7 @@ mod tests {
evaluation_context::EvaluationContext, runtimes::wapc::callback::new_host_callback,
};
use std::{
sync::{self, Arc, Mutex},
sync::{self, Arc},
thread, time,
};

@@ -206,7 +206,7 @@ mod tests {
ctx_aware_resources_allow_list: Default::default(),
};

let eval_ctx = Arc::new(Mutex::new(eval_ctx));
let eval_ctx = Arc::new(eval_ctx);

let wapc_engine = wapc_engine_builder
.build()
69 changes: 17 additions & 52 deletions src/runtimes/wapc/stack.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,26 @@
use anyhow::Result;
use std::sync::{Arc, Mutex};
use wasmtime_provider::wasmtime;
use std::sync::Arc;

use crate::evaluation_context::EvaluationContext;
use crate::policy_evaluator_builder::EpochDeadlines;
use crate::runtimes::wapc::callback::new_host_callback;

use super::StackPre;

pub(crate) struct WapcStack {
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
wapc_host: wapc::WapcHost,
eval_ctx: Arc<Mutex<EvaluationContext>>,
stack_pre: StackPre,
eval_ctx: Arc<EvaluationContext>,
}

impl WapcStack {
pub(crate) fn new(
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
eval_ctx: EvaluationContext,
) -> Result<Self> {
let eval_ctx = Arc::new(Mutex::new(eval_ctx));

let wapc_host = Self::setup_wapc_host(
// Using `clone` on an `Engine` is a cheap operation
engine.clone(),
// Using `clone` on a `Module` is a cheap operation
module.clone(),
epoch_deadlines,
// Using `clone` on an `Arc` is a cheap operation
eval_ctx.clone(),
)?;
pub(crate) fn new_from_pre(stack_pre: &StackPre, eval_ctx: &EvaluationContext) -> Result<Self> {
let eval_ctx = Arc::new(eval_ctx.to_owned());
let wapc_host = Self::wapc_host_from_pre(stack_pre, eval_ctx.clone())?;

Ok(Self {
engine,
module,
epoch_deadlines,
wapc_host,
eval_ctx,
stack_pre: stack_pre.to_owned(),
eval_ctx: eval_ctx.to_owned(),
})
}

@@ -50,12 +32,7 @@ impl WapcStack {
/// variable.
pub(crate) fn reset(&mut self) -> Result<()> {
// Create a new wapc_host
let new_wapc_host = Self::setup_wapc_host(
self.engine.clone(),
self.module.clone(),
self.epoch_deadlines,
self.eval_ctx.clone(),
)?;
let new_wapc_host = Self::wapc_host_from_pre(&self.stack_pre, self.eval_ctx.clone())?;

self.wapc_host = new_wapc_host;

@@ -71,25 +48,13 @@ impl WapcStack {
self.wapc_host.call(op, payload)
}

pub(crate) fn set_eval_ctx(&mut self, eval_ctx: &EvaluationContext) {
let mut eval_ctx_orig = self.eval_ctx.lock().unwrap();
eval_ctx_orig.copy_from(eval_ctx);
}

fn setup_wapc_host(
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
eval_ctx: Arc<Mutex<EvaluationContext>>,
/// Create a new `WapcHost` by rehydrating the `StackPre`. This is faster than creating the
/// `WasmtimeEngineProvider` from scratch
fn wapc_host_from_pre(
pre: &StackPre,
eval_ctx: Arc<EvaluationContext>,
) -> Result<wapc::WapcHost> {
let mut builder = wasmtime_provider::WasmtimeEngineProviderBuilder::new()
.engine(engine)
.module(module);
if let Some(deadlines) = epoch_deadlines {
builder = builder.enable_epoch_interruptions(deadlines.wapc_init, deadlines.wapc_func);
}

let engine_provider = builder.build()?;
let engine_provider = pre.rehydrate()?;
let wapc_host =
wapc::WapcHost::new(Box::new(engine_provider), Some(new_host_callback(eval_ctx)))?;
Ok(wapc_host)
36 changes: 36 additions & 0 deletions src/runtimes/wapc/stack_pre.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use anyhow::Result;
use wasmtime_provider::wasmtime;

use crate::policy_evaluator_builder::EpochDeadlines;

/// Reduce allocation time of new `WasmtimeProviderEngine`, see the `rehydrate` method
#[derive(Clone)]
pub(crate) struct StackPre {
engine_provider_pre: wasmtime_provider::WasmtimeEngineProviderPre,
}

impl StackPre {
pub(crate) fn new(
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
) -> Result<Self> {
let mut builder = wasmtime_provider::WasmtimeEngineProviderBuilder::new()
.engine(engine)
.module(module);
if let Some(deadlines) = epoch_deadlines {
builder = builder.enable_epoch_interruptions(deadlines.wapc_init, deadlines.wapc_func);
}

let engine_provider_pre = builder.build_pre()?;
Ok(Self {
engine_provider_pre,
})
}

/// Allocate a new `WasmtimeEngineProvider` instance by using a pre-allocated instance
pub(crate) fn rehydrate(&self) -> Result<wasmtime_provider::WasmtimeEngineProvider> {
let engine = self.engine_provider_pre.rehydrate()?;
Ok(engine)
}
}
2 changes: 2 additions & 0 deletions src/runtimes/wasi_cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod errors;
mod runtime;
mod stack;
mod stack_pre;

pub(crate) use runtime::Runtime;
pub(crate) use stack::Stack;
pub(crate) use stack_pre::StackPre;
112 changes: 10 additions & 102 deletions src/runtimes/wasi_cli/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,90 +1,17 @@
use kubewarden_policy_sdk::response::ValidationResponse as PolicyValidationResponse;
use kubewarden_policy_sdk::settings::SettingsValidationResponse;
use serde_json::json;
use std::io::Cursor;
use tracing::{error, warn};
use wasi_common::pipe::{ReadPipe, WritePipe};
use wasmtime_wasi::sync::WasiCtxBuilder;

use super::{errors::WasiRuntimeError, stack};
use crate::admission_response::AdmissionResponse;
use crate::policy_evaluator::{PolicySettings, ValidateRequest};
use crate::runtimes::wasi_cli::stack::{RunResult, Stack};

const EXIT_SUCCESS: i32 = 0;

pub(crate) struct Runtime<'a>(pub(crate) &'a mut stack::Stack);

struct ExcutionResult {
stdout: String,
stderr: String,
}
pub(crate) struct Runtime<'a>(pub(crate) &'a Stack);

impl<'a> Runtime<'a> {
/// executes the wasi cli program
fn execute(
&mut self,
input: Vec<u8>,
args: &[String],
) -> std::result::Result<ExcutionResult, WasiRuntimeError> {
let stdout_pipe = WritePipe::new_in_memory();
let stderr_pipe = WritePipe::new_in_memory();
let stdin_pipe = ReadPipe::new(Cursor::new(input));

let wasi_ctx = WasiCtxBuilder::new()
.args(args)?
.stdin(Box::new(stdin_pipe))
.stdout(Box::new(stdout_pipe.clone()))
.stderr(Box::new(stderr_pipe.clone()))
.build();
let ctx = stack::Context { wasi_ctx };

let mut store = wasmtime::Store::new(&self.0.engine, ctx);
if let Some(deadline) = self.0.epoch_deadlines {
store.set_epoch_deadline(deadline.wapc_func);
}

let instance = self
.0
.instance_pre
.instantiate(&mut store)
.map_err(WasiRuntimeError::WasmInstantiate)?;
let start_fn = instance
.get_typed_func::<(), ()>(&mut store, "_start")
.map_err(WasiRuntimeError::WasmMissingStartFn)?;
let evaluation_result = start_fn.call(&mut store, ());

// Dropping the store, this is no longer needed, plus it's keeping
// references to the WritePipe(s) that we need exclusive access to.
drop(store);

let stderr = pipe_to_string("stderr", stderr_pipe)?;

if let Err(err) = evaluation_result {
if let Some(exit_error) = err.downcast_ref::<wasmtime_wasi::I32Exit>() {
if exit_error.0 == EXIT_SUCCESS {
let stdout = pipe_to_string("stdout", stdout_pipe)?;
return Ok(ExcutionResult { stdout, stderr });
} else {
return Err(WasiRuntimeError::WasiEvaluation {
code: Some(exit_error.0),
stderr,
error: err,
});
}
}
return Err(WasiRuntimeError::WasiEvaluation {
code: None,
stderr,
error: err,
});
}

let stdout = pipe_to_string("stdout", stdout_pipe)?;
Ok(ExcutionResult { stdout, stderr })
}

pub fn validate(
&mut self,
&self,
settings: &PolicySettings,
request: &ValidateRequest,
) -> AdmissionResponse {
@@ -106,10 +33,10 @@ impl<'a> Runtime<'a> {
);
}
};
let args = vec!["policy.wasm".to_string(), "validate".to_string()];
let args = ["policy.wasm", "validate"];

match self.execute(input, &args) {
Ok(ExcutionResult { stdout, stderr }) => {
match self.0.run(&input, &args) {
Ok(RunResult { stdout, stderr }) => {
if !stderr.is_empty() {
warn!(
request = request.uid().to_string(),
@@ -152,11 +79,11 @@ impl<'a> Runtime<'a> {
}
}

pub fn validate_settings(&mut self, settings: String) -> SettingsValidationResponse {
let args = vec!["policy.wasm".to_string(), "validate-settings".to_string()];
pub fn validate_settings(&self, settings: String) -> SettingsValidationResponse {
let args = ["policy.wasm", "validate-settings"];

match self.execute(settings.as_bytes().to_owned(), &args) {
Ok(ExcutionResult { stdout, stderr }) => {
match self.0.run(settings.as_bytes(), &args) {
Ok(RunResult { stdout, stderr }) => {
if !stderr.is_empty() {
warn!(operation = "validate-settings", "stderr: {:?}", stderr)
}
@@ -175,22 +102,3 @@ impl<'a> Runtime<'a> {
}
}
}

fn pipe_to_string(
name: &str,
pipe: WritePipe<Cursor<Vec<u8>>>,
) -> std::result::Result<String, WasiRuntimeError> {
match pipe.try_into_inner() {
Ok(cursor) => {
let buf = cursor.into_inner();
String::from_utf8(buf).map_err(|e| WasiRuntimeError::PipeConversion {
name: name.to_string(),
error: format!("Cannot convert buffer to UTF8 string: {e}"),
})
}
Err(_) => Err(WasiRuntimeError::PipeConversion {
name: name.to_string(),
error: "cannot convert pipe into inner".to_string(),
}),
}
}
116 changes: 95 additions & 21 deletions src/runtimes/wasi_cli/stack.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,108 @@
use anyhow::Result;
use std::io::Cursor;
use wasi_common::pipe::{ReadPipe, WritePipe};
use wasi_common::WasiCtx;
use wasmtime::{Engine, InstancePre, Linker, Module};
use wasmtime_wasi::sync::WasiCtxBuilder;

use crate::policy_evaluator_builder::EpochDeadlines;
use crate::runtimes::wasi_cli::{errors::WasiRuntimeError, stack_pre::StackPre};

const EXIT_SUCCESS: i32 = 0;

pub(crate) struct Context {
pub(crate) wasi_ctx: WasiCtx,
}

pub(crate) struct Stack {
pub(crate) engine: Engine,
pub(crate) epoch_deadlines: Option<EpochDeadlines>,
pub(crate) instance_pre: InstancePre<Context>,
stack_pre: StackPre,
}

pub(crate) struct RunResult {
pub stdout: String,
pub stderr: String,
}

impl Stack {
pub(crate) fn new(
engine: Engine,
module: Module,
epoch_deadlines: Option<EpochDeadlines>,
) -> Result<Self> {
let mut linker = Linker::<Context>::new(&engine);
wasmtime_wasi::add_to_linker(&mut linker, |c: &mut Context| &mut c.wasi_ctx)?;

let instance_pre = linker.instantiate_pre(&module)?;

Ok(Stack {
engine,
instance_pre,
epoch_deadlines,
})
pub(crate) fn new_from_pre(stack_pre: &StackPre) -> Self {
Self {
stack_pre: stack_pre.to_owned(),
}
}

/// Run a WASI program with the given input and args
pub(crate) fn run(
&self,
input: &[u8],
args: &[&str],
) -> std::result::Result<RunResult, WasiRuntimeError> {
let stdout_pipe = WritePipe::new_in_memory();
let stderr_pipe = WritePipe::new_in_memory();
let stdin_pipe = ReadPipe::new(Cursor::new(input.to_owned()));

let args: Vec<String> = args.iter().map(|s| s.to_string()).collect();

let wasi_ctx = WasiCtxBuilder::new()
.args(&args)?
.stdin(Box::new(stdin_pipe))
.stdout(Box::new(stdout_pipe.clone()))
.stderr(Box::new(stderr_pipe.clone()))
.build();
let ctx = Context { wasi_ctx };

let mut store = self.stack_pre.build_store(ctx);
let instance = self
.stack_pre
.rehydrate(&mut store)
.map_err(WasiRuntimeError::WasmInstantiate)?;
let start_fn = instance
.get_typed_func::<(), ()>(&mut store, "_start")
.map_err(WasiRuntimeError::WasmMissingStartFn)?;
let evaluation_result = start_fn.call(&mut store, ());

// Dropping the store, this is no longer needed, plus it's keeping
// references to the WritePipe(s) that we need exclusive access to.
drop(store);

let stderr = pipe_to_string("stderr", stderr_pipe)?;

if let Err(err) = evaluation_result {
if let Some(exit_error) = err.downcast_ref::<wasmtime_wasi::I32Exit>() {
if exit_error.0 == EXIT_SUCCESS {
let stdout = pipe_to_string("stdout", stdout_pipe)?;
return Ok(RunResult { stdout, stderr });
} else {
return Err(WasiRuntimeError::WasiEvaluation {
code: Some(exit_error.0),
stderr,
error: err,
});
}
}
return Err(WasiRuntimeError::WasiEvaluation {
code: None,
stderr,
error: err,
});
}

let stdout = pipe_to_string("stdout", stdout_pipe)?;
Ok(RunResult { stdout, stderr })
}
}

fn pipe_to_string(
name: &str,
pipe: WritePipe<Cursor<Vec<u8>>>,
) -> std::result::Result<String, WasiRuntimeError> {
match pipe.try_into_inner() {
Ok(cursor) => {
let buf = cursor.into_inner();
String::from_utf8(buf).map_err(|e| WasiRuntimeError::PipeConversion {
name: name.to_string(),
error: format!("Cannot convert buffer to UTF8 string: {e}"),
})
}
Err(_) => Err(WasiRuntimeError::PipeConversion {
name: name.to_string(),
error: "cannot convert pipe into inner".to_string(),
}),
}
}
50 changes: 50 additions & 0 deletions src/runtimes/wasi_cli/stack_pre.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use anyhow::Result;

use crate::policy_evaluator_builder::EpochDeadlines;
use crate::runtimes::wasi_cli::stack::Context;

/// Reduce the allocation time of a Wasi Stack. This is done by leveraging `wasmtime::InstancePre`.
#[derive(Clone)]
pub(crate) struct StackPre {
engine: wasmtime::Engine,
instance_pre: wasmtime::InstancePre<Context>,
epoch_deadlines: Option<EpochDeadlines>,
}

impl StackPre {
pub(crate) fn new(
engine: wasmtime::Engine,
module: wasmtime::Module,
epoch_deadlines: Option<EpochDeadlines>,
) -> Result<Self> {
let mut linker = wasmtime::Linker::<Context>::new(&engine);
wasmtime_wasi::add_to_linker(&mut linker, |c: &mut Context| &mut c.wasi_ctx)?;

let instance_pre = linker.instantiate_pre(&module)?;
Ok(Self {
engine,
instance_pre,
epoch_deadlines,
})
}

/// Create a brand new `wasmtime::Store` to be used during an evaluation
pub(crate) fn build_store(&self, ctx: Context) -> wasmtime::Store<Context> {
let mut store = wasmtime::Store::new(&self.engine, ctx);
if let Some(deadline) = self.epoch_deadlines {
store.set_epoch_deadline(deadline.wapc_func);
}

store
}

/// Allocate a new `wasmtime::Instance` that is bound to the given `wasmtime::Store`.
/// It's recommended to provide a brand new `wasmtime::Store` created by the
/// `build_store` method
pub(crate) fn rehydrate(
&self,
store: &mut wasmtime::Store<Context>,
) -> Result<wasmtime::Instance> {
self.instance_pre.instantiate(store)
}
}

0 comments on commit 130f6ec

Please sign in to comment.