Skip to content

Commit

Permalink
Merge pull request #364 from flavio/rego-context-aware
Browse files Browse the repository at this point in the history
rego context aware
  • Loading branch information
flavio authored Oct 25, 2023
2 parents 086640e + 2c19206 commit 22d424a
Show file tree
Hide file tree
Showing 19 changed files with 1,059 additions and 34 deletions.
26 changes: 26 additions & 0 deletions src/callback_handler/kubernetes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ impl Client {
.map_err(anyhow::Error::new)?
.ok_or_else(|| anyhow!("Cannot find {api_version}/{kind} named '{name}' inside of namespace '{namespace:?}'"))
}

async fn get_resource_plural_name(&mut self, api_version: &str, kind: &str) -> Result<String> {
let resource = self.build_kube_resource(api_version, kind).await?;
Ok(resource.resource.plural)
}
}

#[cached(
Expand Down Expand Up @@ -287,3 +292,24 @@ pub(crate) async fn get_resource_cached(
) -> Result<cached::Return<kube::core::DynamicObject>> {
get_resource(client, api_version, kind, name, namespace).await
}

pub(crate) async fn get_resource_plural_name(
client: Option<&mut Client>,
api_version: &str,
kind: &str,
) -> Result<cached::Return<String>> {
if client.is_none() {
return Err(anyhow!("kube::Client was not initialized properly"));
}

client
.unwrap()
.get_resource_plural_name(api_version, kind)
.await
.map(|value| cached::Return {
// this is always cached, because the client builds an overview of
// the cluster resources at bootstrap time
was_cached: true,
value,
})
}
18 changes: 18 additions & 0 deletions src/callback_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,24 @@ impl CallbackHandler {
)
}
}
CallbackRequestType::KubernetesGetResourcePluralName {
api_version,
kind,
} => {
handle_callback!(
req,
format!("{api_version}/{kind}"),
"Get Kubernetes resource plural name",
{
kubernetes::get_resource_plural_name(
self.kubernetes_client.as_mut(),
&api_version,
&kind,
)
}
)
}

}
}
},
Expand Down
8 changes: 8 additions & 0 deletions src/callback_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ pub enum CallbackRequestType {
/// might cause issues to the cluster
disable_cache: bool,
},

/// Get the plural name of a Kubernetes resource. E.g. `v1/Service` -> `services`
KubernetesGetResourcePluralName {
/// apiVersion of the resource (v1 for core group, groupName/groupVersions for other).
api_version: String,
/// Singular PascalCase name of the resource
kind: String,
},
}

impl From<SigstoreVerificationInputV2> for CallbackRequestType {
Expand Down
15 changes: 13 additions & 2 deletions src/policy_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{convert::TryFrom, fmt};
use crate::admission_request::AdmissionRequest;
use crate::admission_response::AdmissionResponse;
use crate::policy::Policy;
use crate::runtimes::burrego::Runtime as BurregoRuntime;
use crate::runtimes::rego::Runtime as BurregoRuntime;
use crate::runtimes::wapc::Runtime as WapcRuntime;
use crate::runtimes::wasi_cli::Runtime as WasiRuntime;
use crate::runtimes::Runtime;
Expand Down Expand Up @@ -110,7 +110,18 @@ impl Evaluator for PolicyEvaluator {
WapcRuntime(wapc_host).validate(&self.settings, &request)
}
Runtime::Burrego(ref mut burrego_evaluator) => {
BurregoRuntime(burrego_evaluator).validate(&self.settings, &request)
let kube_ctx = burrego_evaluator.build_kubernetes_context(
self.policy.callback_channel.as_ref(),
&self.policy.ctx_aware_resources_allow_list,
);
match kube_ctx {
Ok(ctx) => {
BurregoRuntime(burrego_evaluator).validate(&self.settings, &request, &ctx)
}
Err(e) => {
AdmissionResponse::reject(request.uid().to_string(), e.to_string(), 500)
}
}
}
Runtime::Cli(ref mut cli_stack) => {
WasiRuntime(cli_stack).validate(&self.settings, &request)
Expand Down
8 changes: 4 additions & 4 deletions src/policy_evaluator_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::policy::Policy;
use crate::policy_evaluator::{PolicyEvaluator, PolicyExecutionMode};
use crate::policy_metadata::ContextAwareResource;
use crate::runtimes::wapc::WAPC_POLICY_MAPPING;
use crate::runtimes::{burrego::BurregoStack, wapc::WapcStack, wasi_cli, Runtime};
use crate::runtimes::{rego::BurregoStack, wapc::WapcStack, wasi_cli, Runtime};

/// Configure behavior of wasmtime [epoch-based interruptions](https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.epoch_interruption)
///
Expand Down Expand Up @@ -279,8 +279,8 @@ impl PolicyEvaluatorBuilder {
PolicyExecutionMode::Opa | PolicyExecutionMode::OpaGatekeeper => {
let policy = Self::from_contents_internal(
self.policy_id.clone(),
None, // callback_channel is not used by Rego policies
None,
self.callback_channel.clone(),
Some(self.ctx_aware_resources_allow_list.clone()),
|| None,
Policy::new,
execution_mode,
Expand All @@ -289,7 +289,7 @@ impl PolicyEvaluatorBuilder {
let mut builder = burrego::EvaluatorBuilder::default()
.engine(&engine)
.module(module)
.host_callbacks(crate::runtimes::burrego::new_host_callbacks());
.host_callbacks(crate::runtimes::rego::new_host_callbacks());

if let Some(deadlines) = self.epoch_deadlines {
builder = builder.enable_epoch_interruptions(deadlines.wapc_func);
Expand Down
4 changes: 2 additions & 2 deletions src/runtimes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
pub mod burrego;
pub(crate) mod rego;
pub(crate) mod wapc;
pub(crate) mod wasi_cli;

pub(crate) enum Runtime {
Wapc(wapc::WapcStack),
Burrego(burrego::BurregoStack),
Burrego(rego::BurregoStack),
Cli(wasi_cli::Stack),
}
247 changes: 247 additions & 0 deletions src/runtimes/rego/context_aware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
use std::collections::{HashMap, HashSet};

use crate::{
callback_requests::{CallbackRequest, CallbackRequestType, CallbackResponse},
policy_metadata::ContextAwareResource,
runtimes::rego::{gatekeeper_inventory::GatekeeperInventory, opa_inventory::OpaInventory},
};
use anyhow::{anyhow, Result};
use kube::api::ObjectList;
use tokio::sync::{mpsc, oneshot};

#[derive(serde::Serialize)]
#[serde(untagged)]
pub(crate) enum KubernetesContext {
Empty,
Opa(OpaInventory),
Gatekeeper(GatekeeperInventory),
}

/// Uses the callback channel to get all the Kubernetes resources defined inside of
/// the cluster whose type is mentioned inside of `allowed_resources`.
///
/// The resources are returned based on the actual RBAC privileges of the client
/// used by the runtime.
pub(crate) fn get_allowed_resources(
callback_channel: &mpsc::Sender<CallbackRequest>,
allowed_resources: &HashSet<ContextAwareResource>,
) -> Result<HashMap<ContextAwareResource, ObjectList<kube::core::DynamicObject>>> {
let mut kube_resources: HashMap<ContextAwareResource, ObjectList<kube::core::DynamicObject>> =
HashMap::new();

for resource in allowed_resources {
let resource_list = get_all_resources_by_type(callback_channel, resource)?;
kube_resources.insert(resource.to_owned(), resource_list);
}

Ok(kube_resources)
}

fn get_all_resources_by_type(
callback_channel: &mpsc::Sender<CallbackRequest>,
resource_type: &ContextAwareResource,
) -> Result<ObjectList<kube::core::DynamicObject>> {
let req_type = CallbackRequestType::KubernetesListResourceAll {
api_version: resource_type.api_version.to_owned(),
kind: resource_type.kind.to_owned(),
label_selector: None,
field_selector: None,
};

let response = make_request_via_callback_channel(req_type, callback_channel)?;
serde_json::from_slice::<ObjectList<kube::core::DynamicObject>>(&response.payload).map_err(
|e| anyhow!("cannot convert callback response into a list of kubernetes objects: {e}"),
)
}

/// Creates a map that has ContextAwareResource as key, and its plural name as value.
/// For example, the key for {`apps/v1`, `Deployment`} will have `deployments` as value.
/// The map is built by making request via the given callback channel.
pub(crate) fn get_plural_names(
callback_channel: &mpsc::Sender<CallbackRequest>,
allowed_resources: &HashSet<ContextAwareResource>,
) -> Result<HashMap<ContextAwareResource, String>> {
let mut plural_names_by_resource: HashMap<ContextAwareResource, String> = HashMap::new();

for resource in allowed_resources {
let req_type = CallbackRequestType::KubernetesGetResourcePluralName {
api_version: resource.api_version.to_owned(),
kind: resource.kind.to_owned(),
};

let response = make_request_via_callback_channel(req_type, callback_channel)?;
let plural_name = serde_json::from_slice::<String>(&response.payload).map_err(|e| {
anyhow!("get plural name failure, cannot convert callback response: {e}")
})?;

plural_names_by_resource.insert(resource.to_owned(), plural_name);
}

Ok(plural_names_by_resource)
}

/// Internal helper function that sends a request over the callback channel and returns the
/// response
fn make_request_via_callback_channel(
request_type: CallbackRequestType,
callback_channel: &mpsc::Sender<CallbackRequest>,
) -> Result<CallbackResponse> {
let (tx, mut rx) = oneshot::channel::<Result<CallbackResponse>>();
let req = CallbackRequest {
request: request_type,
response_channel: tx,
};
callback_channel
.try_send(req)
.map_err(|e| anyhow!("error sending request over callback channel: {e}"))?;

loop {
// Note: we cannot use `rx.blocking_recv`. The code would compile, but at runtime we would
// have a panic because this function is used inside of an async block. The `blocking_recv`
// method causes the tokio reactor to stop, which leads to a panic
match rx.try_recv() {
Ok(msg) => return msg,
Err(oneshot::error::TryRecvError::Empty) => {
// do nothing, keep waiting for a reply
}
Err(e) => {
return Err(anyhow!(
"error obtaining response from callback channel: {e}"
));
}
}
}
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
use assert_json_diff::assert_json_eq;
use serde_json::json;
use std::path::Path;

pub fn dynamic_object_from_fixture(
resource_type: &str,
namespace: Option<&str>,
name: &str,
) -> Result<kube::core::DynamicObject> {
let path = Path::new("test_data/fixtures/kube_context")
.join(resource_type)
.join(namespace.unwrap_or_default())
.join(format!("{name}.json"));
let contents = std::fs::read(path.clone())
.map_err(|e| anyhow!("canont read fixture from path: {path:?}: {e}"))?;
serde_json::from_slice::<kube::core::DynamicObject>(&contents)
.map_err(|e| anyhow!("json conversion error: {e}"))
}

pub fn object_list_from_dynamic_objects(
objs: &[kube::core::DynamicObject],
) -> Result<ObjectList<kube::core::DynamicObject>> {
let raw_json = json!(
{
"items": objs,
"metadata": {
"resourceVersion": ""
}
}
);

let res: ObjectList<kube::core::DynamicObject> = serde_json::from_value(raw_json)
.map_err(|e| anyhow!("cannot create ObjectList because of json error: {e}"))?;
Ok(res)
}

#[tokio::test(flavor = "multi_thread")]
async fn get_all_resources_success() {
let (callback_tx, mut callback_rx) = mpsc::channel::<CallbackRequest>(10);
let resource = ContextAwareResource {
api_version: "v1".to_string(),
kind: "Service".to_string(),
};
let expected_resource = resource.clone();
let services = [
dynamic_object_from_fixture("services", Some("kube-system"), "kube-dns").unwrap(),
dynamic_object_from_fixture("services", Some("kube-system"), "metrics-server").unwrap(),
];
let services_list = object_list_from_dynamic_objects(&services).unwrap();

tokio::spawn(async move {
let req = match callback_rx.recv().await {
Some(r) => r,
None => return,
};
match req.request {
CallbackRequestType::KubernetesListResourceAll {
api_version,
kind,
label_selector,
field_selector,
} => {
assert_eq!(api_version, expected_resource.api_version);
assert_eq!(kind, expected_resource.kind);
assert!(label_selector.is_none());
assert!(field_selector.is_none());
}
_ => {
panic!("not the expected request type");
}
};

let services_list = object_list_from_dynamic_objects(&services).unwrap();
let callback_response = CallbackResponse {
payload: serde_json::to_vec(&services_list).unwrap(),
};

req.response_channel.send(Ok(callback_response)).unwrap();
});

let actual = get_all_resources_by_type(&callback_tx, &resource).unwrap();
let actual_json = serde_json::to_value(&actual).unwrap();
let expected_json = serde_json::to_value(&services_list).unwrap();
assert_json_eq!(actual_json, expected_json);
}

#[tokio::test(flavor = "multi_thread")]
async fn get_resource_plural_name_success() {
let (callback_tx, mut callback_rx) = mpsc::channel::<CallbackRequest>(10);
let resource = ContextAwareResource {
api_version: "v1".to_string(),
kind: "Service".to_string(),
};
let plural_name = "services";

let mut resources: HashSet<ContextAwareResource> = HashSet::new();
resources.insert(resource.clone());

let mut expected_names: HashMap<ContextAwareResource, String> = HashMap::new();
expected_names.insert(resource.clone(), plural_name.to_string());

let expected_resource = resource.clone();

tokio::spawn(async move {
let req = match callback_rx.recv().await {
Some(r) => r,
None => return,
};
match req.request {
CallbackRequestType::KubernetesGetResourcePluralName { api_version, kind } => {
assert_eq!(api_version, expected_resource.api_version);
assert_eq!(kind, expected_resource.kind);
}
_ => {
panic!("not the expected request type");
}
};

let callback_response = CallbackResponse {
payload: serde_json::to_vec(&plural_name).unwrap(),
};

req.response_channel.send(Ok(callback_response)).unwrap();
});

let actual = get_plural_names(&callback_tx, &resources).unwrap();
assert_eq!(actual, expected_names);
}
}
Loading

0 comments on commit 22d424a

Please sign in to comment.