feat: allow Rego policies to be context aware
Allow Rego policies, both OPA and Gatekeeper ones, to leverage
Kubernetes data at evaluation time.

This change doesn't break the API of policy-evaluator, nor does it
require any special action from consumers of this crate.

Signed-off-by: Flavio Castelli <fcastelli@suse.com>
flavio committed Oct 25, 2023
1 parent 5113ecf commit 49d5726
Showing 9 changed files with 906 additions and 17 deletions.
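
Taken together, these changes let a consumer of policy-evaluator opt a Rego policy into context-aware evaluation by handing the builder a callback channel and an allow list of resource types. The sketch below shows the intended wiring; the setter and finalizer names are assumptions inferred from the fields this commit touches, not confirmed API.

// Sketch only: `execution_mode`, `callback_channel`,
// `ctx_aware_resources_allow_list` and `build` are assumed builder
// methods, inferred from the fields this commit touches.
use std::collections::HashSet;
use tokio::sync::mpsc;

fn evaluator_with_cluster_context(
    builder: PolicyEvaluatorBuilder,
    callback_tx: mpsc::Sender<CallbackRequest>,
) -> anyhow::Result<PolicyEvaluator> {
    // Only the resource types listed here are fetched from the cluster
    // and exposed to the policy at evaluation time.
    let mut allow_list: HashSet<ContextAwareResource> = HashSet::new();
    allow_list.insert(ContextAwareResource {
        api_version: "v1".to_string(),
        kind: "Service".to_string(),
    });

    builder
        .execution_mode(PolicyExecutionMode::Opa)
        .callback_channel(callback_tx)
        .ctx_aware_resources_allow_list(allow_list)
        .build()
}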
2 changes: 1 addition & 1 deletion src/callback_handler/mod.rs
@@ -307,7 +307,7 @@ impl CallbackHandler {
                 handle_callback!(
                     req,
                     format!("{api_version}/{kind}"),
-                    "Is Kubernetes resource namespaced",
+                    "Get Kubernetes resource plural name",
                     {
                         kubernetes::get_resource_plural_name(
                             self.kubernetes_client.as_mut(),
13 changes: 12 additions & 1 deletion src/policy_evaluator.rs
@@ -110,7 +110,18 @@ impl Evaluator for PolicyEvaluator {
                 WapcRuntime(wapc_host).validate(&self.settings, &request)
             }
             Runtime::Burrego(ref mut burrego_evaluator) => {
-                BurregoRuntime(burrego_evaluator).validate(&self.settings, &request)
+                let kube_ctx = burrego_evaluator.build_kubernetes_context(
+                    self.policy.callback_channel.as_ref(),
+                    &self.policy.ctx_aware_resources_allow_list,
+                );
+                match kube_ctx {
+                    Ok(ctx) => {
+                        BurregoRuntime(burrego_evaluator).validate(&self.settings, &request, &ctx)
+                    }
+                    Err(e) => {
+                        AdmissionResponse::reject(request.uid().to_string(), e.to_string(), 500)
+                    }
+                }
             }
             Runtime::Cli(ref mut cli_stack) => {
                 WasiRuntime(cli_stack).validate(&self.settings, &request)
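
The runtime builds the Kubernetes context before evaluation and turns any failure into a rejection with code 500, so `Evaluator::validate` keeps its existing signature for callers. As a rough sketch (not the actual implementation), `build_kubernetes_context` plausibly behaves like this:

// Hedged sketch of the behaviour suggested by the call site above;
// the real method lives on the Burrego evaluator and may differ.
fn build_kubernetes_context(
    callback_channel: Option<&mpsc::Sender<CallbackRequest>>,
    allow_list: &HashSet<ContextAwareResource>,
) -> Result<KubernetesContext> {
    match callback_channel {
        // No channel wired in: the policy gets no cluster data.
        None => Ok(KubernetesContext::Empty),
        Some(_channel) if allow_list.is_empty() => Ok(KubernetesContext::Empty),
        Some(_channel) => {
            // Fetch the allowed resources over the channel and wrap them in
            // the inventory matching the policy flavor (OPA vs Gatekeeper);
            // elided here.
            todo!("build KubernetesContext::Opa or KubernetesContext::Gatekeeper")
        }
    }
}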
4 changes: 2 additions & 2 deletions src/policy_evaluator_builder.rs
@@ -279,8 +279,8 @@ impl PolicyEvaluatorBuilder {
             PolicyExecutionMode::Opa | PolicyExecutionMode::OpaGatekeeper => {
                 let policy = Self::from_contents_internal(
                     self.policy_id.clone(),
-                    None, // callback_channel is not used by Rego policies
-                    None,
+                    self.callback_channel.clone(),
+                    Some(self.ctx_aware_resources_allow_list.clone()),
                     || None,
                     Policy::new,
                     execution_mode,
247 changes: 247 additions & 0 deletions src/runtimes/rego/context_aware.rs
@@ -0,0 +1,247 @@
use std::collections::{HashMap, HashSet};

use crate::{
    callback_requests::{CallbackRequest, CallbackRequestType, CallbackResponse},
    policy_metadata::ContextAwareResource,
    runtimes::rego::{gatekeeper_inventory::GatekeeperInventory, opa_inventory::OpaInventory},
};
use anyhow::{anyhow, Result};
use kube::api::ObjectList;
use tokio::sync::{mpsc, oneshot};

#[derive(serde::Serialize)]
#[serde(untagged)]
pub(crate) enum KubernetesContext {
    Empty,
    Opa(OpaInventory),
    Gatekeeper(GatekeeperInventory),
}

/// Uses the callback channel to fetch all the Kubernetes resources defined in
/// the cluster whose type is listed in `allowed_resources`.
///
/// The resources returned depend on the actual RBAC privileges of the client
/// used by the runtime.
pub(crate) fn get_allowed_resources(
    callback_channel: &mpsc::Sender<CallbackRequest>,
    allowed_resources: &HashSet<ContextAwareResource>,
) -> Result<HashMap<ContextAwareResource, ObjectList<kube::core::DynamicObject>>> {
    let mut kube_resources: HashMap<ContextAwareResource, ObjectList<kube::core::DynamicObject>> =
        HashMap::new();

    for resource in allowed_resources {
        let resource_list = get_all_resources_by_type(callback_channel, resource)?;
        kube_resources.insert(resource.to_owned(), resource_list);
    }

    Ok(kube_resources)
}

fn get_all_resources_by_type(
    callback_channel: &mpsc::Sender<CallbackRequest>,
    resource_type: &ContextAwareResource,
) -> Result<ObjectList<kube::core::DynamicObject>> {
    let req_type = CallbackRequestType::KubernetesListResourceAll {
        api_version: resource_type.api_version.to_owned(),
        kind: resource_type.kind.to_owned(),
        label_selector: None,
        field_selector: None,
    };

    let response = make_request_via_callback_channel(req_type, callback_channel)?;
    serde_json::from_slice::<ObjectList<kube::core::DynamicObject>>(&response.payload).map_err(
        |e| anyhow!("cannot convert callback response into a list of kubernetes objects: {e}"),
    )
}

/// Creates a map with `ContextAwareResource` as key and its plural name as value.
/// For example, the key {`apps/v1`, `Deployment`} will have `deployments` as value.
/// The map is built by making requests via the given callback channel.
pub(crate) fn get_plural_names(
    callback_channel: &mpsc::Sender<CallbackRequest>,
    allowed_resources: &HashSet<ContextAwareResource>,
) -> Result<HashMap<ContextAwareResource, String>> {
    let mut plural_names_by_resource: HashMap<ContextAwareResource, String> = HashMap::new();

    for resource in allowed_resources {
        let req_type = CallbackRequestType::KubernetesGetResourcePluralName {
            api_version: resource.api_version.to_owned(),
            kind: resource.kind.to_owned(),
        };

        let response = make_request_via_callback_channel(req_type, callback_channel)?;
        let plural_name = serde_json::from_slice::<String>(&response.payload).map_err(|e| {
            anyhow!("get plural name failure, cannot convert callback response: {e}")
        })?;

        plural_names_by_resource.insert(resource.to_owned(), plural_name);
    }

    Ok(plural_names_by_resource)
}

/// Internal helper function that sends a request over the callback channel and
/// returns the response
fn make_request_via_callback_channel(
    request_type: CallbackRequestType,
    callback_channel: &mpsc::Sender<CallbackRequest>,
) -> Result<CallbackResponse> {
    let (tx, mut rx) = oneshot::channel::<Result<CallbackResponse>>();
    let req = CallbackRequest {
        request: request_type,
        response_channel: tx,
    };
    callback_channel
        .try_send(req)
        .map_err(|e| anyhow!("error sending request over callback channel: {e}"))?;

    loop {
        // Note: we cannot use `rx.blocking_recv`. The code would compile, but at
        // runtime we would have a panic because this function is used inside of
        // an async block. The `blocking_recv` method causes the tokio reactor to
        // stop, which leads to a panic
        match rx.try_recv() {
            Ok(msg) => return msg,
            Err(oneshot::error::TryRecvError::Empty) => {
                // do nothing, keep waiting for a reply
            }
            Err(e) => {
                return Err(anyhow!(
                    "error obtaining response from callback channel: {e}"
                ));
            }
        }
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use assert_json_diff::assert_json_eq;
    use serde_json::json;
    use std::path::Path;

    pub fn dynamic_object_from_fixture(
        resource_type: &str,
        namespace: Option<&str>,
        name: &str,
    ) -> Result<kube::core::DynamicObject> {
        let path = Path::new("test_data/fixtures/kube_context")
            .join(resource_type)
            .join(namespace.unwrap_or_default())
            .join(format!("{name}.json"));
        let contents = std::fs::read(path.clone())
            .map_err(|e| anyhow!("cannot read fixture from path: {path:?}: {e}"))?;
        serde_json::from_slice::<kube::core::DynamicObject>(&contents)
            .map_err(|e| anyhow!("json conversion error: {e}"))
    }

    pub fn object_list_from_dynamic_objects(
        objs: &[kube::core::DynamicObject],
    ) -> Result<ObjectList<kube::core::DynamicObject>> {
        let raw_json = json!(
            {
                "items": objs,
                "metadata": {
                    "resourceVersion": ""
                }
            }
        );

        let res: ObjectList<kube::core::DynamicObject> = serde_json::from_value(raw_json)
            .map_err(|e| anyhow!("cannot create ObjectList because of json error: {e}"))?;
        Ok(res)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn get_all_resources_success() {
        let (callback_tx, mut callback_rx) = mpsc::channel::<CallbackRequest>(10);
        let resource = ContextAwareResource {
            api_version: "v1".to_string(),
            kind: "Service".to_string(),
        };
        let expected_resource = resource.clone();
        let services = [
            dynamic_object_from_fixture("services", Some("kube-system"), "kube-dns").unwrap(),
            dynamic_object_from_fixture("services", Some("kube-system"), "metrics-server")
                .unwrap(),
        ];
        let services_list = object_list_from_dynamic_objects(&services).unwrap();

        tokio::spawn(async move {
            let req = match callback_rx.recv().await {
                Some(r) => r,
                None => return,
            };
            match req.request {
                CallbackRequestType::KubernetesListResourceAll {
                    api_version,
                    kind,
                    label_selector,
                    field_selector,
                } => {
                    assert_eq!(api_version, expected_resource.api_version);
                    assert_eq!(kind, expected_resource.kind);
                    assert!(label_selector.is_none());
                    assert!(field_selector.is_none());
                }
                _ => {
                    panic!("not the expected request type");
                }
            };

            let services_list = object_list_from_dynamic_objects(&services).unwrap();
            let callback_response = CallbackResponse {
                payload: serde_json::to_vec(&services_list).unwrap(),
            };

            req.response_channel.send(Ok(callback_response)).unwrap();
        });

        let actual = get_all_resources_by_type(&callback_tx, &resource).unwrap();
        let actual_json = serde_json::to_value(&actual).unwrap();
        let expected_json = serde_json::to_value(&services_list).unwrap();
        assert_json_eq!(actual_json, expected_json);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn get_resource_plural_name_success() {
        let (callback_tx, mut callback_rx) = mpsc::channel::<CallbackRequest>(10);
        let resource = ContextAwareResource {
            api_version: "v1".to_string(),
            kind: "Service".to_string(),
        };
        let plural_name = "services";

        let mut resources: HashSet<ContextAwareResource> = HashSet::new();
        resources.insert(resource.clone());

        let mut expected_names: HashMap<ContextAwareResource, String> = HashMap::new();
        expected_names.insert(resource.clone(), plural_name.to_string());

        let expected_resource = resource.clone();

        tokio::spawn(async move {
            let req = match callback_rx.recv().await {
                Some(r) => r,
                None => return,
            };
            match req.request {
                CallbackRequestType::KubernetesGetResourcePluralName { api_version, kind } => {
                    assert_eq!(api_version, expected_resource.api_version);
                    assert_eq!(kind, expected_resource.kind);
                }
                _ => {
                    panic!("not the expected request type");
                }
            };

            let callback_response = CallbackResponse {
                payload: serde_json::to_vec(&plural_name).unwrap(),
            };

            req.response_channel.send(Ok(callback_response)).unwrap();
        });

        let actual = get_plural_names(&callback_tx, &resources).unwrap();
        assert_eq!(actual, expected_names);
    }
}
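
For a sense of how the helpers in this file compose, here is a hypothetical glue function. `build_raw_inventory` is not part of this commit; the real `OpaInventory` and `GatekeeperInventory` constructors presumably do something along these lines internally.

// Hypothetical glue code, only to illustrate how get_allowed_resources
// and get_plural_names compose; not part of this commit.
fn build_raw_inventory(
    callback_channel: &mpsc::Sender<CallbackRequest>,
    allow_list: &HashSet<ContextAwareResource>,
) -> Result<HashMap<String, ObjectList<kube::core::DynamicObject>>> {
    // Fetch the cluster objects for every allowed resource type...
    let resources = get_allowed_resources(callback_channel, allow_list)?;
    // ...and the plural name of each type (e.g. "Deployment" -> "deployments").
    let plural_names = get_plural_names(callback_channel, allow_list)?;

    let mut inventory = HashMap::new();
    for (resource, objects) in resources {
        let plural = plural_names.get(&resource).ok_or_else(|| {
            anyhow!(
                "no plural name for {}/{}",
                resource.api_version,
                resource.kind
            )
        })?;
        // Key the object lists by plural name, the form under which Rego
        // code usually addresses collections of resources.
        inventory.insert(plural.to_owned(), objects);
    }
    Ok(inventory)
}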