-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: allow Rego policies to be context aware
Allow Rego policies, both OPA and Gatekeeper ones, to leverage Kubernetes data at evaluation time. This change doesn't break the API of policy-evaluator, nor requires any special action by the consumers of this crate. Signed-off-by: Flavio Castelli <fcastelli@suse.com>
- Loading branch information
Showing
8 changed files
with
909 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
use std::collections::{HashMap, HashSet}; | ||
|
||
use crate::{ | ||
callback_requests::{CallbackRequest, CallbackRequestType, CallbackResponse}, | ||
policy_metadata::ContextAwareResource, | ||
runtimes::rego::{gatekeeper_inventory::GatekeeperInventory, opa_inventory::OpaInventory}, | ||
}; | ||
use anyhow::{anyhow, Result}; | ||
use kube::api::ObjectList; | ||
use tokio::sync::{mpsc, oneshot}; | ||
|
||
#[derive(serde::Serialize)] | ||
pub(crate) struct EmptyContext(serde_json::Value); | ||
|
||
impl Default for EmptyContext { | ||
fn default() -> Self { | ||
EmptyContext(serde_json::Value::Null) | ||
} | ||
} | ||
|
||
#[derive(serde::Serialize)] | ||
pub(crate) enum KubernetesContext { | ||
Empty(EmptyContext), | ||
Opa(OpaInventory), | ||
Gatekeeper(GatekeeperInventory), | ||
} | ||
|
||
/// Uses the callback channel to get all the Kubernetes resources defined inside of | ||
/// the cluster whose type is mentioned inside of `allowed_resources`. | ||
/// | ||
/// The resources are returned based on the actual RBAC privileges of the client | ||
/// used by the runtime. | ||
pub(crate) fn get_allowed_resources( | ||
callback_channel: &mpsc::Sender<CallbackRequest>, | ||
allowed_resources: &HashSet<ContextAwareResource>, | ||
) -> Result<HashMap<ContextAwareResource, ObjectList<kube::core::DynamicObject>>> { | ||
let mut kube_resources: HashMap<ContextAwareResource, ObjectList<kube::core::DynamicObject>> = | ||
HashMap::new(); | ||
|
||
for resource in allowed_resources { | ||
let resource_list = get_all_resources_by_type(callback_channel, resource)?; | ||
kube_resources.insert(resource.to_owned(), resource_list); | ||
} | ||
|
||
Ok(kube_resources) | ||
} | ||
|
||
fn get_all_resources_by_type( | ||
callback_channel: &mpsc::Sender<CallbackRequest>, | ||
resource_type: &ContextAwareResource, | ||
) -> Result<ObjectList<kube::core::DynamicObject>> { | ||
let req_type = CallbackRequestType::KubernetesListResourceAll { | ||
api_version: resource_type.api_version.to_owned(), | ||
kind: resource_type.kind.to_owned(), | ||
label_selector: None, | ||
field_selector: None, | ||
}; | ||
|
||
let response = make_request_via_callback_channel(req_type, callback_channel)?; | ||
serde_json::from_slice::<ObjectList<kube::core::DynamicObject>>(&response.payload).map_err( | ||
|e| anyhow!("cannot convert callback response into a list of kubernetes objects: {e}"), | ||
) | ||
} | ||
|
||
/// Creates a map that has ContextAwareResource as key, and its plural name as value. | ||
/// For example, the key for {`apps/v1`, `Deployment`} will have `deployments` as value. | ||
/// The map is built by making request via the given callback channel. | ||
pub(crate) fn get_plural_names( | ||
callback_channel: &mpsc::Sender<CallbackRequest>, | ||
allowed_resources: &HashSet<ContextAwareResource>, | ||
) -> Result<HashMap<ContextAwareResource, String>> { | ||
let mut plural_names_by_resource: HashMap<ContextAwareResource, String> = HashMap::new(); | ||
|
||
for resource in allowed_resources { | ||
let req_type = CallbackRequestType::KubernetesGetResourcePluralName { | ||
api_version: resource.api_version.to_owned(), | ||
kind: resource.kind.to_owned(), | ||
}; | ||
|
||
let response = make_request_via_callback_channel(req_type, callback_channel)?; | ||
let plural_name = serde_json::from_slice::<String>(&response.payload).map_err(|e| { | ||
anyhow!("get plural name failure, cannot convert callback response: {e}") | ||
})?; | ||
|
||
plural_names_by_resource.insert(resource.to_owned(), plural_name); | ||
} | ||
|
||
Ok(plural_names_by_resource) | ||
} | ||
|
||
/// Internal helper function that sends a request over the callback channel and returns the | ||
/// response | ||
fn make_request_via_callback_channel( | ||
request_type: CallbackRequestType, | ||
callback_channel: &mpsc::Sender<CallbackRequest>, | ||
) -> Result<CallbackResponse> { | ||
let (tx, mut rx) = oneshot::channel::<Result<CallbackResponse>>(); | ||
let req = CallbackRequest { | ||
request: request_type, | ||
response_channel: tx, | ||
}; | ||
callback_channel | ||
.try_send(req) | ||
.map_err(|e| anyhow!("error sending request over callback channel: {e}"))?; | ||
|
||
loop { | ||
// Note: we cannot use `rx.blocking_recv`. The code would compile, but at runtime we would | ||
// have a panic because this function is used inside of an async block. The `blocking_recv` | ||
// method causes the tokio reactor to stop, which leads to a panic | ||
match rx.try_recv() { | ||
Ok(msg) => return msg, | ||
Err(oneshot::error::TryRecvError::Empty) => { | ||
// do nothing, keep waiting for a reply | ||
} | ||
Err(e) => { | ||
return Err(anyhow!( | ||
"error obtaining response from callback channel: {e}" | ||
)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
pub(crate) mod tests { | ||
use super::*; | ||
use assert_json_diff::assert_json_eq; | ||
use serde_json::json; | ||
use std::path::Path; | ||
|
||
pub fn dynamic_object_from_fixture( | ||
resource_type: &str, | ||
namespace: Option<&str>, | ||
name: &str, | ||
) -> Result<kube::core::DynamicObject> { | ||
let path = Path::new("test_data/fixtures/kube_context") | ||
.join(resource_type) | ||
.join(namespace.unwrap_or_default()) | ||
.join(format!("{name}.json")); | ||
let contents = std::fs::read(path.clone()) | ||
.map_err(|e| anyhow!("canont read fixture from path: {path:?}: {e}"))?; | ||
serde_json::from_slice::<kube::core::DynamicObject>(&contents) | ||
.map_err(|e| anyhow!("json conversion error: {e}")) | ||
} | ||
|
||
pub fn object_list_from_dynamic_objects( | ||
objs: &[kube::core::DynamicObject], | ||
) -> Result<ObjectList<kube::core::DynamicObject>> { | ||
let raw_json = json!( | ||
{ | ||
"items": objs, | ||
"metadata": { | ||
"resourceVersion": "" | ||
} | ||
} | ||
); | ||
|
||
let res: ObjectList<kube::core::DynamicObject> = serde_json::from_value(raw_json) | ||
.map_err(|e| anyhow!("cannot create ObjectList because of json error: {e}"))?; | ||
Ok(res) | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn get_all_resources_success() { | ||
let (callback_tx, mut callback_rx) = mpsc::channel::<CallbackRequest>(10); | ||
let resource = ContextAwareResource { | ||
api_version: "v1".to_string(), | ||
kind: "Service".to_string(), | ||
}; | ||
let expected_resource = resource.clone(); | ||
let services = [ | ||
dynamic_object_from_fixture("services", Some("kube-system"), "kube-dns").unwrap(), | ||
dynamic_object_from_fixture("services", Some("kube-system"), "metrics-server").unwrap(), | ||
]; | ||
let services_list = object_list_from_dynamic_objects(&services).unwrap(); | ||
|
||
tokio::spawn(async move { | ||
let req = match callback_rx.recv().await { | ||
Some(r) => r, | ||
None => return, | ||
}; | ||
match req.request { | ||
CallbackRequestType::KubernetesListResourceAll { | ||
api_version, | ||
kind, | ||
label_selector, | ||
field_selector, | ||
} => { | ||
assert_eq!(api_version, expected_resource.api_version); | ||
assert_eq!(kind, expected_resource.kind); | ||
assert!(label_selector.is_none()); | ||
assert!(field_selector.is_none()); | ||
} | ||
_ => { | ||
panic!("not the expected request type"); | ||
} | ||
}; | ||
|
||
let services_list = object_list_from_dynamic_objects(&services).unwrap(); | ||
let callback_response = CallbackResponse { | ||
payload: serde_json::to_vec(&services_list).unwrap(), | ||
}; | ||
|
||
req.response_channel.send(Ok(callback_response)).unwrap(); | ||
}); | ||
|
||
let actual = get_all_resources_by_type(&callback_tx, &resource).unwrap(); | ||
let actual_json = serde_json::to_value(&actual).unwrap(); | ||
let expected_json = serde_json::to_value(&services_list).unwrap(); | ||
assert_json_eq!(actual_json, expected_json); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn get_resource_plural_name_success() { | ||
let (callback_tx, mut callback_rx) = mpsc::channel::<CallbackRequest>(10); | ||
let resource = ContextAwareResource { | ||
api_version: "v1".to_string(), | ||
kind: "Service".to_string(), | ||
}; | ||
let plural_name = "services"; | ||
|
||
let mut resources: HashSet<ContextAwareResource> = HashSet::new(); | ||
resources.insert(resource.clone()); | ||
|
||
let mut expected_names: HashMap<ContextAwareResource, String> = HashMap::new(); | ||
expected_names.insert(resource.clone(), plural_name.to_string()); | ||
|
||
let expected_resource = resource.clone(); | ||
|
||
tokio::spawn(async move { | ||
let req = match callback_rx.recv().await { | ||
Some(r) => r, | ||
None => return, | ||
}; | ||
match req.request { | ||
CallbackRequestType::KubernetesGetResourcePluralName { api_version, kind } => { | ||
assert_eq!(api_version, expected_resource.api_version); | ||
assert_eq!(kind, expected_resource.kind); | ||
} | ||
_ => { | ||
panic!("not the expected request type"); | ||
} | ||
}; | ||
|
||
let callback_response = CallbackResponse { | ||
payload: serde_json::to_vec(&plural_name).unwrap(), | ||
}; | ||
|
||
req.response_channel.send(Ok(callback_response)).unwrap(); | ||
}); | ||
|
||
let actual = get_plural_names(&callback_tx, &resources).unwrap(); | ||
assert_eq!(actual, expected_names); | ||
} | ||
} |
Oops, something went wrong.