diff --git a/Cargo.toml b/Cargo.toml index 02634b6..74c24f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ kube = { version = "0.98.0", features = ["runtime", "derive"] } k8s-openapi = { version = "0.24.0", features = ["latest"] } base64 = "0.22.1" aes-gcm = "0.10.3" +sha2 = "0.10.8" [build-dependencies] cynic-codegen = { version = "3" } diff --git a/Dockerfile b/Dockerfile index 92e075d..025fb52 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.85.0-alpine AS builder +FROM rust:1.92-alpine AS builder WORKDIR /opt/xtm-composer COPY . . diff --git a/config/default.yaml b/config/default.yaml index 4b2633b..367b018 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -47,7 +47,7 @@ opencti: env_type: docker api_version: v1.41 -openbas: +openaev: enable: false url: http://host.docker.internal:4000 token: ChangeMe diff --git a/src/api/decrypt_value.rs b/src/api/decrypt_value.rs new file mode 100644 index 0000000..237dfc8 --- /dev/null +++ b/src/api/decrypt_value.rs @@ -0,0 +1,45 @@ +use base64::{engine::general_purpose, Engine as _}; +use aes_gcm::{ + aead::{Aead, KeyInit}, + Aes256Gcm, Nonce +}; +use rsa::{Oaep, Pkcs1v15Encrypt, RsaPrivateKey}; +use tracing::warn; +use sha2::Sha256; + +pub fn parse_aes_encrypted_value( + private_key: &RsaPrivateKey, + encrypted_value: String +) -> Result> { + let encrypted_bytes = general_purpose::STANDARD.decode(encrypted_value)?; + + let version = *encrypted_bytes.get(0) + .ok_or("Encrypted value is empty")?; + + let aes_key_iv_encrypted_bytes = &encrypted_bytes[1..=512]; + let aes_key_iv_decrypted_bytes = match version { + 1 => private_key.decrypt(Pkcs1v15Encrypt, aes_key_iv_encrypted_bytes)?, + 2 => private_key.decrypt(Oaep::new::(), aes_key_iv_encrypted_bytes)?, + _ => { + warn!(version, "Encryption version not handled"); + return Ok(String::new()); + } + }; + let aes_key_bytes = &aes_key_iv_decrypted_bytes[0..32]; + let aes_iv_bytes = &aes_key_iv_decrypted_bytes[32..44]; + let encrypted_value_bytes = &encrypted_bytes[513..]; + + let cipher = Aes256Gcm::new_from_slice(&aes_key_bytes)?; + let nonce = Nonce::from_slice(&aes_iv_bytes); + let plaintext_result = cipher.decrypt(&nonce, encrypted_value_bytes.as_ref()); + match plaintext_result { + Ok(plaintext) => { + let decoded_value = str::from_utf8(&plaintext)?.to_string(); + Ok(decoded_value) + }, + Err(e) => { + warn!(error = e.to_string(), "Fail to decode value"); + Ok(String::from("")) + } + } +} \ No newline at end of file diff --git a/src/api/mod.rs b/src/api/mod.rs index 20c46ec..d4952b3 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -7,8 +7,9 @@ use std::str::FromStr; use std::time::Duration; use tracing::info; -pub mod openbas; +pub mod openaev; pub mod opencti; +mod decrypt_value; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct ContractsManifest { @@ -99,11 +100,20 @@ impl ApiConnector { is_sensitive: config.is_sensitive, }) .collect::>(); - envs.push(EnvVariable { - key: "OPENCTI_URL".into(), - value: settings.opencti.url.clone(), - is_sensitive: false, - }); + if settings.opencti.enable { + envs.push(EnvVariable { + key: "OPENCTI_URL".into(), + value: settings.opencti.url.clone(), + is_sensitive: false, + }); + } + if settings.openaev.enable { + envs.push(EnvVariable { + key: "OPENAEV_URL".into(), + value: settings.openaev.url.clone(), + is_sensitive: false, + }); + } envs.push(EnvVariable { key: "OPENCTI_CONFIG_HASH".into(), value: self.contract_hash.clone(), @@ -171,7 +181,9 @@ pub trait ComposerApi { async fn patch_status(&self, id: String, status: ConnectorStatus) -> Option; - async fn patch_logs(&self, id: String, logs: Vec) -> Option; + async fn patch_logs(&self, id: String, logs: Vec) -> Option; + + async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option; - async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option; + async fn container_removed_successfully(&self, id: String) -> (); } diff --git a/src/api/openaev/api_handler.rs b/src/api/openaev/api_handler.rs new file mode 100644 index 0000000..f24317d --- /dev/null +++ b/src/api/openaev/api_handler.rs @@ -0,0 +1,64 @@ +use serde::de::DeserializeOwned; +use tracing::error; + +pub async fn handle_api_response( + response: Result, + operation_name: &str, +) -> Option +where + T: DeserializeOwned +{ + match response { + Ok(resp) if resp.status().is_success() => { + match resp.json::().await { + Ok(data) => Some(data), + Err(err) => { + error!("Failed to parse JSON for {}: {}", operation_name, err.to_string()); + None + } + } + } + Ok(resp) => { + error!( + status = resp.status().as_u16(), + "Failed to {}: non-success status code", operation_name + ); + None + } + Err(err) => { + error!( + error = err.to_string(), + "Failed to {}, check your configuration", operation_name + ); + None + } + } +} + +pub async fn handle_api_text_response( + response: Result, + operation_name: &str, +) -> Option { + match response { + Ok(resp) if resp.status().is_success() => { + resp.text().await.ok().or_else(|| { + error!("Failed to read response body for {}", operation_name); + None + }) + } + Ok(resp) => { + error!( + status = resp.status().as_u16(), + "Failed to {}: non-success status code", operation_name + ); + None + } + Err(err) => { + error!( + error = err.to_string(), + "Failed to {}, check your configuration", operation_name + ); + None + } + } +} diff --git a/src/api/openaev/connector/get_connector_instances.rs b/src/api/openaev/connector/get_connector_instances.rs new file mode 100644 index 0000000..4c59cd8 --- /dev/null +++ b/src/api/openaev/connector/get_connector_instances.rs @@ -0,0 +1,18 @@ +use crate::api::ApiConnector; +use crate::api::openaev::api_handler::handle_api_response; +use crate::api::openaev::connector::ConnectorInstances; + +pub async fn get_connector_instances(api: &crate::api::openaev::ApiOpenAEV) -> Option> { + let settings = crate::settings(); + let get_connectors = api.get(&format!("/xtm-composer/{}/connector-instances", settings.manager.id)) + .send() + .await; + + handle_api_response::>(get_connectors, "fetch connector instances") + .await.map(|connectors| { + connectors + .into_iter() + .map(|connector| connector.to_api_connector(&api.private_key)) + .collect() + }) +} \ No newline at end of file diff --git a/src/api/openaev/connector/mod.rs b/src/api/openaev/connector/mod.rs new file mode 100644 index 0000000..74ff8c0 --- /dev/null +++ b/src/api/openaev/connector/mod.rs @@ -0,0 +1,78 @@ +use rsa::{RsaPrivateKey}; +use serde::Deserialize; +use tracing::warn; +use crate::api::{ApiConnector, ApiContractConfig}; +use crate::api::decrypt_value::parse_aes_encrypted_value; + +pub mod get_connector_instances; +pub mod patch_health; +pub mod patch_status; +pub mod post_logs; +pub mod notify_container_removed; + +#[derive(Debug, Deserialize)] +pub struct ConnectorContractConfiguration { + pub configuration_key: String, + pub configuration_value: Option, + pub configuration_is_encrypted: bool, +} + +#[derive(Debug, Deserialize)] +pub struct ConnectorInstances { + pub connector_instance_id: String, + pub connector_instance_name: String, + pub connector_instance_hash: Option, + pub connector_image: Option, + pub connector_instance_current_status: Option, + pub connector_instance_requested_status: Option, + pub connector_instance_configurations: Option>, +} + +impl ConnectorInstances { + + pub fn to_api_connector(&self, private_key: &RsaPrivateKey )->ApiConnector { + let contract_configuration = self + .connector_instance_configurations + .as_ref() + .unwrap() + .into_iter() + .map(|c| { + let is_sensitive = c.configuration_is_encrypted; + if is_sensitive { + let encrypted_value = c.configuration_value.clone().unwrap_or_default(); + let decoded_value_result = parse_aes_encrypted_value(private_key, encrypted_value); + match decoded_value_result { + Ok(decoded_value) => ApiContractConfig { + key: c.configuration_key.clone(), + value: decoded_value, + is_sensitive: true, + }, + Err(e) => { + warn!(error = e.to_string(), "Fail to decode value"); + ApiContractConfig { + key: c.configuration_key.clone(), + value: String::new(), + is_sensitive: true, + } + } + } + } else { + ApiContractConfig { + key: c.configuration_key.clone(), + value: c.configuration_value.clone().unwrap_or_default(), + is_sensitive: false, + } + } + }) + .collect(); + ApiConnector { + id: self.connector_instance_id.clone(), + name: self.connector_instance_name.clone(), + image: self.connector_image.clone().unwrap(), + contract_hash: self.connector_instance_hash.clone().unwrap(), + current_status: self.connector_instance_current_status.clone(), + requested_status: self.connector_instance_requested_status.clone().unwrap(), + contract_configuration, + } + } +} \ No newline at end of file diff --git a/src/api/openaev/connector/notify_container_removed.rs b/src/api/openaev/connector/notify_container_removed.rs new file mode 100644 index 0000000..6423743 --- /dev/null +++ b/src/api/openaev/connector/notify_container_removed.rs @@ -0,0 +1,14 @@ +use crate::api::openaev::api_handler::{handle_api_text_response}; +use crate::api::openaev::ApiOpenAEV; + +pub async fn notify_container_removed(id: String, api: &ApiOpenAEV) { + let settings = crate::settings(); + let response = api.delete(&format!("/xtm-composer/{}/connector-instances/{}", settings.manager.id, id)) + .send() + .await; + + let _ = handle_api_text_response( + response, + "Notify OpenAEV that the container has been successfully removed" + ).await; +} \ No newline at end of file diff --git a/src/api/openaev/connector/patch_health.rs b/src/api/openaev/connector/patch_health.rs new file mode 100644 index 0000000..ad92ece --- /dev/null +++ b/src/api/openaev/connector/patch_health.rs @@ -0,0 +1,38 @@ +use serde::Serialize; +use crate::api::openaev::api_handler::handle_api_response; +use crate::api::openaev::ApiOpenAEV; +use crate::api::openaev::connector::ConnectorInstances; + +#[derive(Serialize)] +struct ConnectorInstanceHealthInput { + connector_instance_restart_count: u32, + connector_instance_started_at: String, + connector_instance_is_in_reboot_loop: bool +} + +pub async fn update_health( + id: String, + restart_count: u32, + started_at: String, + is_in_reboot_loop: bool, + api: &ApiOpenAEV, +)-> Option { + let settings = crate::settings(); + let health_check_input = ConnectorInstanceHealthInput { + connector_instance_restart_count: restart_count, + connector_instance_started_at: started_at, + connector_instance_is_in_reboot_loop: is_in_reboot_loop + }; + + let health_check_response = api.put(&format!("/xtm-composer/{}/connector-instances/{}/health-check", settings.manager.id, id)) + .json(&health_check_input) + .send() + .await; + + let _ = handle_api_response::( + health_check_response, + "push health metrics" + ).await; + + Some(id) +} \ No newline at end of file diff --git a/src/api/openaev/connector/patch_status.rs b/src/api/openaev/connector/patch_status.rs new file mode 100644 index 0000000..1680251 --- /dev/null +++ b/src/api/openaev/connector/patch_status.rs @@ -0,0 +1,32 @@ +use serde::Serialize; +use crate::api::{ApiConnector, ConnectorStatus}; +use crate::api::openaev::api_handler::handle_api_response; +use crate::api::openaev::ApiOpenAEV; +use crate::api::openaev::connector::ConnectorInstances; +use crate::api::opencti::connector::post_status::ConnectorCurrentStatus; + +#[derive(Serialize)] +struct UpdateConnectorInstanceStatusInput { + connector_instance_current_status: ConnectorCurrentStatus, +} + +pub async fn update_status(id: String, status: ConnectorStatus, api: &ApiOpenAEV) -> Option { + let update_status = match status { + ConnectorStatus::Started => ConnectorCurrentStatus::Started, + _ => ConnectorCurrentStatus::Stopped, + }; + + let status_input = UpdateConnectorInstanceStatusInput { + connector_instance_current_status: update_status + }; + + let settings = crate::settings(); + let update_status_response = api.put(&format!("/xtm-composer/{}/connector-instances/{}/status", settings.manager.id, id)) + .json(&status_input) + .send() + .await; + + handle_api_response::(update_status_response, "patch connector instance status") + .await + .map(|connector| connector.to_api_connector(&api.private_key)) +} \ No newline at end of file diff --git a/src/api/openaev/connector/post_logs.rs b/src/api/openaev/connector/post_logs.rs new file mode 100644 index 0000000..ada3b34 --- /dev/null +++ b/src/api/openaev/connector/post_logs.rs @@ -0,0 +1,28 @@ +use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::JSON; +use serde::Serialize; +use crate::api::openaev::api_handler::handle_api_response; +use crate::api::openaev::ApiOpenAEV; + +#[derive(Serialize)] +struct InstanceConnectorLogsInput { + connector_instance_logs: Vec, +} + +pub async fn add_logs(id: String, logs: Vec, api: &ApiOpenAEV)-> Option { + let logs_input = InstanceConnectorLogsInput{ + connector_instance_logs: logs + }; + let settings = crate::settings(); + let add_logs_response = api.post(&format!("/xtm-composer/{}/connector-instances/{}/logs",settings.manager.id, id)) + .json(&logs_input) + .send() + .await; + + // Discard the result + let _ = handle_api_response::( + add_logs_response, + "push logs for connector instance" + ).await; + + Some(id) +} \ No newline at end of file diff --git a/src/api/openaev/manager/get_version.rs b/src/api/openaev/manager/get_version.rs new file mode 100644 index 0000000..a51e9bb --- /dev/null +++ b/src/api/openaev/manager/get_version.rs @@ -0,0 +1,7 @@ +use crate::api::openaev::api_handler::{handle_api_text_response}; +use crate::api::openaev::ApiOpenAEV; + +pub async fn get_version(api: &ApiOpenAEV) -> Option { + let response = api.get("/settings/version").send().await; + handle_api_text_response(response, "fetch version").await +} \ No newline at end of file diff --git a/src/api/openaev/manager/mod.rs b/src/api/openaev/manager/mod.rs new file mode 100644 index 0000000..ff91b55 --- /dev/null +++ b/src/api/openaev/manager/mod.rs @@ -0,0 +1,11 @@ +pub mod get_version; +pub mod post_register; +pub mod ping_alive; + +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +pub struct ConnectorManager { + pub xtm_composer_id: String, + pub xtm_composer_version: String, +} \ No newline at end of file diff --git a/src/api/openaev/manager/ping_alive.rs b/src/api/openaev/manager/ping_alive.rs new file mode 100644 index 0000000..243dbf2 --- /dev/null +++ b/src/api/openaev/manager/ping_alive.rs @@ -0,0 +1,14 @@ +use crate::api::openaev::api_handler::{handle_api_response}; +use crate::api::openaev::ApiOpenAEV; +use crate::api::openaev::manager::ConnectorManager; + +pub async fn ping_alive(api: &ApiOpenAEV) -> Option { + let settings = crate::settings(); + let response = api.put(&format!("/xtm-composer/{}/refresh-connectivity", settings.manager.id)) + .send() + .await; + + handle_api_response::(response, "ping OpenAEV backend") + .await + .map(|manager| manager.xtm_composer_version) +} \ No newline at end of file diff --git a/src/api/openaev/manager/post_register.rs b/src/api/openaev/manager/post_register.rs new file mode 100644 index 0000000..4014013 --- /dev/null +++ b/src/api/openaev/manager/post_register.rs @@ -0,0 +1,41 @@ +use rsa::{RsaPublicKey}; +use rsa::pkcs1::LineEnding; +use rsa::pkcs8::EncodePublicKey; +use serde::Serialize; +use crate::api::openaev::api_handler::handle_api_response; +use crate::api::openaev::ApiOpenAEV; +use crate::api::openaev::manager::ConnectorManager; + +#[derive(Serialize)] +struct RegisterInput { + id: String, + name: String, + public_key: String, +} + +pub async fn register(api: &ApiOpenAEV) { + let settings = crate::settings(); + let priv_key = crate::private_key(); + let pub_key = RsaPublicKey::from(priv_key); + let public_key: String = pub_key + .to_public_key_pem(LineEnding::LF) + .expect("Failed to encode public key as PKCS#8"); + + let register_input = RegisterInput { + id: settings.manager.id.clone(), + name: settings.manager.name.clone(), + public_key, + }; + + let register_response = api.post("/xtm-composer/register") + .json(®ister_input) + .send() + .await; + + // Discard the result + let _ = handle_api_response::( + register_response, + "register into OpenAEV backend" + ).await; +} + diff --git a/src/api/openaev/mod.rs b/src/api/openaev/mod.rs new file mode 100644 index 0000000..a6ff316 --- /dev/null +++ b/src/api/openaev/mod.rs @@ -0,0 +1,136 @@ +mod connector; +mod manager; +mod api_handler; + +use crate::api::{ApiConnector, ComposerApi, ConnectorStatus}; +use crate::config::settings::Daemon; +use async_trait::async_trait; +use std::time::Duration; +use rsa::RsaPrivateKey; + +const BEARER: &str = "Bearer"; +const AUTHORIZATION_HEADER: &str = "Authorization"; + +pub struct ApiOpenAEV { + api_uri: String, + http_client: reqwest::Client, + bearer: String, + daemon: Daemon, + logs_schedule: u64, + private_key: RsaPrivateKey, + // TODO: Implement timeout configuration when OpenBAS API methods are implemented + // These fields are stored for future use when the todo!() macros are replaced with actual implementations + #[allow(dead_code)] + request_timeout: u64, + #[allow(dead_code)] + connect_timeout: u64, +} + +impl ApiOpenAEV { + pub fn new() -> Self { + let settings = crate::settings(); + let bearer = format!("{} {}", BEARER, settings.openaev.token); + let api_uri = format!("{}/api", &settings.openaev.url); + let daemon = settings.openaev.daemon.clone(); + let logs_schedule = settings.openaev.logs_schedule; + let request_timeout = settings.openaev.request_timeout; + let connect_timeout = settings.openaev.connect_timeout; + + let http_client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(connect_timeout)) + .timeout(Duration::from_secs(request_timeout)) + .build() + .unwrap(); // or handle the error appropriately + + let private_key = crate::private_key().clone(); + + Self { + api_uri, + http_client, + bearer, + daemon, + logs_schedule, + private_key, + request_timeout, + connect_timeout, + } + } + + pub fn post(&self, route: &str) -> reqwest::RequestBuilder { + let api_route = format!("{}{}", self.api_uri, route); + + self.http_client + .post(&api_route) + .header("Content-Type", "application/json") + .header(AUTHORIZATION_HEADER, self.bearer.as_str()) + } + + pub fn put(&self, route: &str) -> reqwest::RequestBuilder { + let api_route = format!("{}{}", self.api_uri, route); + + self.http_client + .put(&api_route) + .header("Content-Type", "application/json") + .header(AUTHORIZATION_HEADER, self.bearer.as_str()) + } + + pub fn get(&self, route: &str) -> reqwest::RequestBuilder { + let api_route = format!("{}{}", self.api_uri, route); + + self.http_client + .get(&api_route) + .header(AUTHORIZATION_HEADER, self.bearer.as_str()) + } + + pub fn delete(&self, route: &str) -> reqwest::RequestBuilder { + let api_route = format!("{}{}", self.api_uri, route); + + self.http_client + .delete(&api_route) + .header(AUTHORIZATION_HEADER, self.bearer.as_str()) + } + +} + +#[async_trait] +impl ComposerApi for ApiOpenAEV { + fn daemon(&self) -> &Daemon { + &self.daemon + } + + fn post_logs_schedule(&self) -> Duration { + Duration::from_secs(self.logs_schedule) + } + + async fn version(&self) -> Option { + manager::get_version::get_version(self).await + } + + async fn ping_alive(&self) -> Option { + manager::ping_alive::ping_alive(self).await + } + + async fn register(&self) { + manager::post_register::register(self).await + } + + async fn connectors(&self) -> Option> { + connector::get_connector_instances::get_connector_instances(self).await + } + + async fn patch_status(&self, id: String, status: ConnectorStatus) -> Option { + connector::patch_status::update_status(id, status, self).await + } + + async fn patch_logs(&self, id: String, logs: Vec) -> Option { + connector::post_logs::add_logs(id, logs, self).await + } + + async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool ) -> Option { + connector::patch_health::update_health(id, restart_count, started_at, is_in_reboot_loop, self).await + } + + async fn container_removed_successfully(&self, id: String) -> () { + connector::notify_container_removed::notify_container_removed(id, self).await + } +} diff --git a/src/api/openbas/mod.rs b/src/api/openbas/mod.rs deleted file mode 100644 index ed297d7..0000000 --- a/src/api/openbas/mod.rs +++ /dev/null @@ -1,87 +0,0 @@ -// TODO Remove macro after implementation -#![allow(unused_variables)] - -use crate::api::{ApiConnector, ComposerApi, ConnectorStatus}; -use crate::config::settings::Daemon; -use async_trait::async_trait; -use std::time::Duration; -use tracing::debug; - -const BEARER: &str = "Bearer"; - -pub struct ApiOpenBAS { - api_uri: String, - bearer: String, - daemon: Daemon, - logs_schedule: u64, - // TODO: Implement timeout configuration when OpenBAS API methods are implemented - // These fields are stored for future use when the todo!() macros are replaced with actual implementations - #[allow(dead_code)] - request_timeout: u64, - #[allow(dead_code)] - connect_timeout: u64, -} - -impl ApiOpenBAS { - pub fn new() -> Self { - let settings = crate::settings(); - let bearer = format!("{} {}", BEARER, settings.openbas.token); - let api_uri = format!("{}/api", &settings.openbas.url); - let daemon = settings.openbas.daemon.clone(); - let logs_schedule = settings.openbas.logs_schedule; - let request_timeout = settings.openbas.request_timeout; - let connect_timeout = settings.openbas.connect_timeout; - Self { - api_uri, - bearer, - daemon, - logs_schedule, - request_timeout, - connect_timeout, - } - } -} - -#[async_trait] -impl ComposerApi for ApiOpenBAS { - fn daemon(&self) -> &Daemon { - &self.daemon - } - - fn post_logs_schedule(&self) -> Duration { - Duration::from_secs(self.logs_schedule) - } - - async fn version(&self) -> Option { - todo!() - } - - async fn ping_alive(&self) -> Option { - todo!() - } - - async fn register(&self) { - debug!( - api_uri = self.api_uri, - bearer = self.bearer, - "OpenBAS register" - ); - todo!() - } - - async fn connectors(&self) -> Option> { - todo!() - } - - async fn patch_status(&self, id: String, status: ConnectorStatus) -> Option { - todo!() - } - - async fn patch_logs(&self, id: String, logs: Vec) -> Option { - todo!() - } - - async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option { - todo!() - } -} diff --git a/src/api/opencti/connector/mod.rs b/src/api/opencti/connector/mod.rs index cd7a3a3..df36c02 100644 --- a/src/api/opencti/connector/mod.rs +++ b/src/api/opencti/connector/mod.rs @@ -1,6 +1,6 @@ use serde::Serialize; use crate::api::{ApiConnector, ApiContractConfig}; -use rsa::{Pkcs1v15Encrypt, RsaPrivateKey}; +use rsa::{RsaPrivateKey}; use tracing::{warn}; use std::str; @@ -10,12 +10,8 @@ pub mod post_logs; pub mod post_health; use cynic; -use base64::{Engine as _, engine::general_purpose}; use crate::api::opencti::opencti as schema; -use aes_gcm::{ - aead::{Aead, KeyInit}, - Aes256Gcm, Nonce -}; +use crate::api::decrypt_value::parse_aes_encrypted_value; #[derive(cynic::QueryFragment, Debug, Clone, Serialize)] pub struct ConnectorContractConfiguration { @@ -42,36 +38,6 @@ pub struct ManagedConnector { impl ManagedConnector { - pub fn parse_aes_encrypted_value(&self, private_key: &RsaPrivateKey, encrypted_value: String) -> Result> { - let encrypted_bytes = general_purpose::STANDARD.decode(encrypted_value)?; - - let version = encrypted_bytes[0]; - if version != 1 { - warn!(version, "Encryption version not handled"); - Ok(String::from("")) - } else { - let aes_key_iv_encrypted_bytes = &encrypted_bytes[1..=512]; - let aes_key_iv_decrypted_bytes = private_key.decrypt(Pkcs1v15Encrypt, &aes_key_iv_encrypted_bytes)?; - let aes_key_bytes = &aes_key_iv_decrypted_bytes[0..32]; - let aes_iv_bytes = &aes_key_iv_decrypted_bytes[32..44]; - let encrypted_value_bytes = &encrypted_bytes[513..]; - - let cipher = Aes256Gcm::new_from_slice(&aes_key_bytes)?; - let nonce = Nonce::from_slice(&aes_iv_bytes); - let plaintext_result = cipher.decrypt(&nonce, encrypted_value_bytes.as_ref()); - match plaintext_result { - Ok(plaintext) => { - let decoded_value = str::from_utf8(&plaintext).unwrap().to_string(); - Ok(decoded_value) - }, - Err(e) => { - warn!(error = e.to_string(), "Fail to decode value"); - Ok(String::from("")) - } - } - } - } - pub fn to_api_connector(&self, private_key: &RsaPrivateKey) -> ApiConnector { let contract_configuration = self .manager_contract_configuration @@ -82,7 +48,7 @@ impl ManagedConnector { let is_sensitive = c.encrypted.unwrap_or_default(); if is_sensitive { let encrypted_value = c.value.unwrap_or_default(); - let decoded_value_result = self.parse_aes_encrypted_value(private_key, encrypted_value); + let decoded_value_result = parse_aes_encrypted_value(private_key, encrypted_value); match decoded_value_result { Ok(decoded_value) => ApiContractConfig { key: c.key, diff --git a/src/api/opencti/connector/post_health.rs b/src/api/opencti/connector/post_health.rs index c3ce5b9..f8cbbe9 100644 --- a/src/api/opencti/connector/post_health.rs +++ b/src/api/opencti/connector/post_health.rs @@ -34,7 +34,7 @@ pub async fn health( started_at: String, is_in_reboot_loop: bool, api: &ApiOpenCTI, -) -> Option { +) -> Option { use cynic::MutationBuilder; let vars = UpdateConnectorHealthVariables { @@ -53,7 +53,7 @@ pub async fn health( response, "update_connector_health", "OpenCTI backend does not support XTM composer health updates. The connector will continue to run but health metrics won't be sent to OpenCTI." - ).map(|data| data.update_connector_health) + ).map(|data| data.update_connector_health.inner().to_string()) } Err(e) => { error!(error = e.to_string(), "Fail to push health metrics"); diff --git a/src/api/opencti/connector/post_logs.rs b/src/api/opencti/connector/post_logs.rs index 51a193c..4d3540d 100644 --- a/src/api/opencti/connector/post_logs.rs +++ b/src/api/opencti/connector/post_logs.rs @@ -25,7 +25,7 @@ pub struct LogsConnectorStatusInput<'a> { } // endregion -pub async fn logs(id: String, logs: Vec, api: &ApiOpenCTI) -> Option { +pub async fn logs(id: String, logs: Vec, api: &ApiOpenCTI) -> Option { use cynic::MutationBuilder; let str_logs = logs.iter().map(|c| c.as_str()).collect(); let vars = ReportConnectorLogsVariables { @@ -42,7 +42,7 @@ pub async fn logs(id: String, logs: Vec, api: &ApiOpenCTI) -> Option { error!(error = e.to_string(), "Fail to push logs"); diff --git a/src/api/opencti/mod.rs b/src/api/opencti/mod.rs index 520f98d..07911ca 100644 --- a/src/api/opencti/mod.rs +++ b/src/api/opencti/mod.rs @@ -101,11 +101,15 @@ impl ComposerApi for ApiOpenCTI { connector::post_status::status(id, status, self).await } - async fn patch_logs(&self, id: String, logs: Vec) -> Option { + async fn patch_logs(&self, id: String, logs: Vec) -> Option { connector::post_logs::logs(id, logs, self).await } - async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option { + async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option { connector::post_health::health(id, restart_count, started_at, is_in_reboot_loop, self).await } + + async fn container_removed_successfully(&self, id: String) -> () { + // Do nothing + } } diff --git a/src/config/settings.rs b/src/config/settings.rs index 65f8191..687fd43 100644 --- a/src/config/settings.rs +++ b/src/config/settings.rs @@ -66,7 +66,7 @@ pub struct OpenCTI { #[derive(Debug, Deserialize, Clone)] #[allow(unused)] -pub struct OpenBAS { +pub struct OpenAEV { pub enable: bool, pub url: String, pub token: String, @@ -124,7 +124,7 @@ pub struct Docker { pub struct Settings { pub manager: Manager, pub opencti: OpenCTI, - pub openbas: OpenBAS, + pub openaev: OpenAEV, } impl Settings { diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 5a563c9..0f39049 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -1,4 +1,4 @@ -pub mod openbas; +pub mod openaev; pub mod opencti; use crate::api::ComposerApi; diff --git a/src/engine/openbas.rs b/src/engine/openaev.rs similarity index 58% rename from src/engine/openbas.rs rename to src/engine/openaev.rs index a7a4a9c..4eadda6 100644 --- a/src/engine/openbas.rs +++ b/src/engine/openaev.rs @@ -1,21 +1,21 @@ use tokio::task::JoinHandle; use tracing::info; use crate::api::ComposerApi; -use crate::api::openbas::ApiOpenBAS; +use crate::api::openaev::ApiOpenAEV; use crate::engine::{alive, orchestration}; -pub fn openbas_orchestration() -> JoinHandle<()> { - info!("Starting OpenBAS connectors orchestration"); +pub fn openaev_orchestration() -> JoinHandle<()> { + info!("Starting OpenAEV connectors orchestration"); tokio::spawn(async move { - let api: Box = Box::new(ApiOpenBAS::new()); + let api: Box = Box::new(ApiOpenAEV::new()); orchestration(api).await; }) } -pub fn openbas_alive() -> JoinHandle<()> { - info!("Starting OpenBAS Composer ping alive"); +pub fn openaev_alive() -> JoinHandle<()> { + info!("Starting OpenAEV Composer ping alive"); tokio::spawn(async move { - let api: Box = Box::new(ApiOpenBAS::new()); + let api: Box = Box::new(ApiOpenAEV::new()); alive(api).await; }) } diff --git a/src/main.rs b/src/main.rs index d280caf..56f5e4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ mod orchestrator; mod system; use crate::config::settings::Settings; -use crate::engine::openbas::{openbas_alive, openbas_orchestration}; +use crate::engine::openaev::{openaev_alive, openaev_orchestration}; use crate::engine::opencti::{opencti_alive, opencti_orchestration}; use futures::future::join_all; use rolling_file::{BasicRollingFileAppender, RollingConditionBasic}; @@ -153,16 +153,16 @@ fn opencti_orchestrate(orchestrations: &mut Vec>) { } } -// Init openbas -fn openbas_orchestrate(orchestrations: &mut Vec>) { +// Init openaev +fn openaev_orchestrate(orchestrations: &mut Vec>) { let setting = settings(); - if setting.openbas.enable { - let openbas_alive = openbas_alive(); - orchestrations.push(openbas_alive); - let openbas_orchestration = openbas_orchestration(); - orchestrations.push(openbas_orchestration); + if setting.openaev.enable { + let openaev_alive = openaev_alive(); + orchestrations.push(openaev_alive); + let openaev_orchestration = openaev_orchestration(); + orchestrations.push(openaev_orchestration); } else { - info!("OpenBAS connectors orchestration disabled"); + info!("OpenAEV connectors orchestration disabled"); } } @@ -176,7 +176,7 @@ async fn main() { // Start orchestration threads let mut orchestrations = Vec::new(); opencti_orchestrate(&mut orchestrations); - openbas_orchestrate(&mut orchestrations); + openaev_orchestrate(&mut orchestrations); // Wait for threads to terminate join_all(orchestrations).await; } diff --git a/src/orchestrator/composer.rs b/src/orchestrator/composer.rs index f39708b..d807ed2 100644 --- a/src/orchestrator/composer.rs +++ b/src/orchestrator/composer.rs @@ -165,7 +165,9 @@ pub async fn orchestrate( for container in existing_containers { let connector_id = container.extract_opencti_id(); if !connectors_by_id.contains_key(&connector_id) { - orchestrator.remove(&container).await; + if orchestrator.remove(&container).await { + api.container_removed_successfully(connector_id).await; + } } } } diff --git a/src/orchestrator/docker/docker.rs b/src/orchestrator/docker/docker.rs index f0ba0a5..9c1c910 100644 --- a/src/orchestrator/docker/docker.rs +++ b/src/orchestrator/docker/docker.rs @@ -127,7 +127,7 @@ impl Orchestrator for DockerOrchestrator { .await; } - async fn remove(&self, container: &OrchestratorContainer) -> () { + async fn remove(&self, container: &OrchestratorContainer) -> bool { let container_name = container.name.as_str(); let remove_response = self .docker @@ -143,6 +143,7 @@ impl Orchestrator for DockerOrchestrator { match remove_response { Ok(_) => { info!(name = container_name, "Removed container"); + true } Err(err) => { error!( @@ -150,6 +151,7 @@ impl Orchestrator for DockerOrchestrator { error = err.to_string(), "Could not remove container" ); + false } } } diff --git a/src/orchestrator/kubernetes/kubernetes.rs b/src/orchestrator/kubernetes/kubernetes.rs index 1f0d27d..48ead4b 100644 --- a/src/orchestrator/kubernetes/kubernetes.rs +++ b/src/orchestrator/kubernetes/kubernetes.rs @@ -252,7 +252,7 @@ impl Orchestrator for KubeOrchestrator { self.set_deployment_scale(connector, 0).await; } - async fn remove(&self, container: &OrchestratorContainer) -> () { + async fn remove(&self, container: &OrchestratorContainer) -> bool { let lp = &ListParams::default().labels(&format!( "opencti-connector-id={}", container.extract_opencti_id() @@ -260,11 +260,14 @@ impl Orchestrator for KubeOrchestrator { let dp = &DeleteParams::default(); let delete_response = self.deployments.delete_collection(dp, lp).await; match delete_response { - Ok(_) => info!( - id = container.extract_opencti_id(), - "Deployment successfully deleted" - ), - Err(err) => error!(error = err.to_string(), "Fail removing the deployments"), + Ok(_) => { + info!(id = container.extract_opencti_id(),"Deployment successfully deleted"); + true + } + Err(err) => { + error!(error = err.to_string(), "Fail removing the deployments"); + false + }, } } diff --git a/src/orchestrator/mod.rs b/src/orchestrator/mod.rs index 081db34..41928b1 100644 --- a/src/orchestrator/mod.rs +++ b/src/orchestrator/mod.rs @@ -65,7 +65,7 @@ pub trait Orchestrator { async fn stop(&self, container: &OrchestratorContainer, connector: &ApiConnector) -> (); - async fn remove(&self, container: &OrchestratorContainer) -> (); + async fn remove(&self, container: &OrchestratorContainer) -> bool; async fn refresh(&self, connector: &ApiConnector) -> Option; diff --git a/src/orchestrator/portainer/docker/portainer.rs b/src/orchestrator/portainer/docker/portainer.rs index b3c1eea..443e288 100644 --- a/src/orchestrator/portainer/docker/portainer.rs +++ b/src/orchestrator/portainer/docker/portainer.rs @@ -146,7 +146,7 @@ impl Orchestrator for PortainerDockerOrchestrator { self.client.post(start_container_uri).send().await.unwrap(); } - async fn remove(&self, container: &OrchestratorContainer) -> () { + async fn remove(&self, container: &OrchestratorContainer) -> bool { let container_name = container.name.as_str(); let delete_container_uri = format!("{}/{}?v=0&force=true", self.container_uri, container.id); @@ -154,6 +154,7 @@ impl Orchestrator for PortainerDockerOrchestrator { match remove_response { Ok(_) => { info!(name = container_name, "Removed container"); + true } Err(err) => { error!( @@ -161,6 +162,7 @@ impl Orchestrator for PortainerDockerOrchestrator { error = err.to_string(), "Could not remove container" ); + false } } }