From ad04e216b0b57a44b0cb1b64f4af26696a43c238 Mon Sep 17 00:00:00 2001 From: Julien Richard Date: Wed, 31 Dec 2025 14:48:02 +0100 Subject: [PATCH 1/2] feat: Add Prometheus metrics for connector lifecycle events and global refactor. + Remove lazy static for lazyLock + RSA Key extraction to specific mod + Improve code quality --- Cargo.toml | 2 + config/default.yaml | 4 +- src/api/mod.rs | 5 +- src/api/openbas/mod.rs | 10 +- src/api/opencti/manager/post_ping.rs | 6 +- src/api/opencti/manager/post_register.rs | 13 +- src/api/opencti/mod.rs | 18 +- src/config/mod.rs | 1 + src/config/rsa.rs | 58 +++++ src/config/settings.rs | 12 + src/engine/mod.rs | 6 +- src/engine/openbas.rs | 5 +- src/engine/opencti.rs | 6 +- src/main.rs | 90 ++------ src/orchestrator/composer.rs | 35 +-- src/orchestrator/docker/docker.rs | 215 +++++++----------- src/orchestrator/kubernetes/kubernetes.rs | 4 +- src/orchestrator/mod.rs | 2 +- .../portainer/docker/portainer.rs | 2 +- src/prometheus/mod.rs | 87 +++++++ 20 files changed, 340 insertions(+), 241 deletions(-) create mode 100644 src/config/rsa.rs create mode 100644 src/prometheus/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 3c9e07e..4cbbcf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ kube = { version = "0.98.0", features = ["runtime", "derive"] } k8s-openapi = { version = "0.24.0", features = ["latest"] } base64 = "0.22.1" aes-gcm = "0.10.3" +axum = "0.8.8" +prometheus = "0.14.0" [build-dependencies] cynic-codegen = { version = "3" } diff --git a/config/default.yaml b/config/default.yaml index 14d954a..4975833 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -15,7 +15,9 @@ manager: # credentials_key_filepath: /path/to/private_key.pem # Note: If both are set, filepath takes priority with a warning - + prometheus: + enable: false + port: 14270 logger: level: info format: json diff --git a/src/api/mod.rs b/src/api/mod.rs index 90169a1..1bb41ae 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -82,7 +82,7 @@ 
impl ApiConnector { } pub fn container_envs(&self) -> Vec { - let settings = crate::settings(); + let settings = &crate::config::settings::SETTINGS; let mut envs = self .contract_configuration .iter() @@ -107,8 +107,7 @@ impl ApiConnector { /// Display environment variables with sensitive values masked (if configured) pub fn display_env_variables(&self) { - let settings = crate::settings(); - + let settings = &crate::config::settings::SETTINGS; // Check if display is enabled in configuration let should_display = settings .manager diff --git a/src/api/openbas/mod.rs b/src/api/openbas/mod.rs index ed297d7..b5c0c5f 100644 --- a/src/api/openbas/mod.rs +++ b/src/api/openbas/mod.rs @@ -24,7 +24,7 @@ pub struct ApiOpenBAS { impl ApiOpenBAS { pub fn new() -> Self { - let settings = crate::settings(); + let settings = &crate::config::settings::SETTINGS; let bearer = format!("{} {}", BEARER, settings.openbas.token); let api_uri = format!("{}/api", &settings.openbas.url); let daemon = settings.openbas.daemon.clone(); @@ -81,7 +81,13 @@ impl ComposerApi for ApiOpenBAS { todo!() } - async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option { + async fn patch_health( + &self, + id: String, + restart_count: u32, + started_at: String, + is_in_reboot_loop: bool, + ) -> Option { todo!() } } diff --git a/src/api/opencti/manager/post_ping.rs b/src/api/opencti/manager/post_ping.rs index 4d72ca9..7bf00f4 100644 --- a/src/api/opencti/manager/post_ping.rs +++ b/src/api/opencti/manager/post_ping.rs @@ -1,7 +1,7 @@ use crate::api::opencti::ApiOpenCTI; +use crate::api::opencti::error_handler::{extract_optional_field, handle_graphql_response}; use crate::api::opencti::manager::ConnectorManager; -use crate::api::opencti::error_handler::{handle_graphql_response, extract_optional_field}; -use crate::settings; + use tracing::error; use crate::api::opencti::opencti as schema; @@ -32,7 +32,7 @@ pub struct 
UpdateConnectorManagerStatusInput<'a> { pub async fn ping(api: &ApiOpenCTI) -> Option { use cynic::MutationBuilder; - let settings = settings(); + let settings = &crate::config::settings::SETTINGS; let vars = UpdateConnectorManagerStatusVariables { input: UpdateConnectorManagerStatusInput { id: &cynic::Id::new(&settings.manager.id), diff --git a/src/api/opencti/manager/post_register.rs b/src/api/opencti/manager/post_register.rs index 3a05deb..ed713ab 100644 --- a/src/api/opencti/manager/post_register.rs +++ b/src/api/opencti/manager/post_register.rs @@ -1,10 +1,10 @@ use crate::api::opencti::ApiOpenCTI; +use crate::api::opencti::error_handler::{extract_optional_field, handle_graphql_response}; use crate::api::opencti::manager::ConnectorManager; -use crate::api::opencti::error_handler::{handle_graphql_response, extract_optional_field}; use crate::api::opencti::opencti as schema; use cynic; -use tracing::{error, info}; use rsa::{RsaPublicKey, pkcs1::EncodeRsaPublicKey}; +use tracing::{error, info}; // region schema #[derive(cynic::QueryVariables, Debug)] @@ -34,9 +34,8 @@ pub struct RegisterConnectorsManagerInput<'a> { pub async fn register(api: &ApiOpenCTI) { use cynic::MutationBuilder; - let settings = crate::settings(); - // Use the singleton private key - let priv_key = crate::private_key(); + let settings = &crate::config::settings::SETTINGS; + let priv_key = &*crate::config::rsa::PRIVATE_KEY; let pub_key = RsaPublicKey::from(priv_key); let public_key = RsaPublicKey::to_pkcs1_pem(&pub_key, Default::default()).unwrap(); @@ -54,12 +53,12 @@ pub async fn register(api: &ApiOpenCTI) { if let Some(data) = handle_graphql_response( response, "register_connectors_manager", - "OpenCTI backend does not support XTM composer manager registration. The composer will continue to run but won't be registered in OpenCTI." + "OpenCTI backend does not support XTM composer manager registration. 
The composer will continue to run but won't be registered in OpenCTI.", ) { if let Some(manager) = extract_optional_field( data.register_connectors_manager, "register_connectors_manager", - "register_connectors_manager" + "register_connectors_manager", ) { info!(manager_id = manager.id.into_inner(), "Manager registered"); } diff --git a/src/api/opencti/mod.rs b/src/api/opencti/mod.rs index 520f98d..26d762d 100644 --- a/src/api/opencti/mod.rs +++ b/src/api/opencti/mod.rs @@ -3,14 +3,14 @@ use crate::config::settings::Daemon; use async_trait::async_trait; use cynic::Operation; use cynic::http::CynicReqwestError; +use rsa::RsaPrivateKey; use serde::Serialize; use serde::de::DeserializeOwned; use std::time::Duration; -use rsa::RsaPrivateKey; pub mod connector; -pub mod manager; pub mod error_handler; +pub mod manager; const BEARER: &str = "Bearer"; const AUTHORIZATION_HEADER: &str = "Authorization"; @@ -30,7 +30,7 @@ pub struct ApiOpenCTI { impl ApiOpenCTI { pub fn new() -> Self { - let settings = crate::settings(); + let settings = &crate::config::settings::SETTINGS; let bearer = format!("{} {}", BEARER, settings.opencti.token); let api_uri = format!("{}/graphql", &settings.opencti.url); let daemon = settings.opencti.daemon.clone(); @@ -38,7 +38,7 @@ impl ApiOpenCTI { let request_timeout = settings.opencti.request_timeout; let connect_timeout = settings.opencti.connect_timeout; // Use the singleton private key - let private_key = crate::private_key().clone(); + let private_key = crate::config::rsa::PRIVATE_KEY.clone(); Self { api_uri, bearer, @@ -46,7 +46,7 @@ impl ApiOpenCTI { logs_schedule, request_timeout, connect_timeout, - private_key + private_key, } } @@ -105,7 +105,13 @@ impl ComposerApi for ApiOpenCTI { connector::post_logs::logs(id, logs, self).await } - async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option { + async fn patch_health( + &self, + id: String, + restart_count: u32, + started_at: 
String, + is_in_reboot_loop: bool, + ) -> Option { connector::post_health::health(id, restart_count, started_at, is_in_reboot_loop, self).await } } diff --git a/src/config/mod.rs b/src/config/mod.rs index 6e98cef..980f6d5 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1 +1,2 @@ +pub mod rsa; pub mod settings; diff --git a/src/config/rsa.rs b/src/config/rsa.rs new file mode 100644 index 0000000..23f6473 --- /dev/null +++ b/src/config/rsa.rs @@ -0,0 +1,58 @@ +use rsa::{RsaPrivateKey, pkcs8::DecodePrivateKey}; +use std::fs; +use std::sync::LazyLock; +use tracing::{info, warn}; + +// Singleton RSA private key for all application +pub static PRIVATE_KEY: LazyLock = + LazyLock::new(|| load_and_verify_credentials_key()); + +// Load and verify RSA private key from configuration +fn load_and_verify_credentials_key() -> RsaPrivateKey { + let setting = &crate::config::settings::SETTINGS; + + // Priority: file > environment variable + let key_content = if let Some(filepath) = &setting.manager.credentials_key_filepath { + // Warning if both are set + if setting.manager.credentials_key.is_some() { + warn!( + "Both credentials_key and credentials_key_filepath are set. Using filepath (priority)." + ); + } + + // Read key from file + match fs::read_to_string(filepath) { + Ok(content) => content, + Err(e) => panic!("Failed to read credentials key file '{}': {}", filepath, e), + } + } else if let Some(key) = &setting.manager.credentials_key { + // Use environment variable or config value + key.clone() + } else { + panic!( + "No credentials key provided! Set either 'manager.credentials_key' or 'manager.credentials_key_filepath' in configuration." 
+ ); + }; + + // Validate key format (trim to handle trailing whitespace) + // Check for presence of RSA PRIVATE KEY markers for PKCS#8 format + let trimmed_content = key_content.trim(); + if !trimmed_content.contains("BEGIN PRIVATE KEY") + || !trimmed_content.contains("END PRIVATE KEY") + { + panic!( + "Invalid private key format. Expected PKCS#8 PEM format with 'BEGIN PRIVATE KEY' and 'END PRIVATE KEY' markers." + ); + } + + // Parse and validate RSA private key using PKCS#8 format + match RsaPrivateKey::from_pkcs8_pem(&key_content) { + Ok(key) => { + info!("Successfully loaded RSA private key (PKCS#8 format)"); + key + } + Err(e) => { + panic!("Failed to decode RSA private key: {}", e); + } + } +} diff --git a/src/config/settings.rs b/src/config/settings.rs index cb2b205..7b51db5 100644 --- a/src/config/settings.rs +++ b/src/config/settings.rs @@ -3,9 +3,13 @@ use k8s_openapi::api::apps::v1::Deployment; use k8s_openapi::api::core::v1::ResourceRequirements; use serde::Deserialize; use std::env; +use std::sync::LazyLock; const ENV_PRODUCTION: &str = "production"; +// Singleton settings for all application +pub static SETTINGS: LazyLock = LazyLock::new(|| Settings::new().unwrap()); + #[derive(Debug, Deserialize, Clone)] #[allow(unused)] pub struct Logger { @@ -40,6 +44,7 @@ pub struct Manager { pub credentials_key: Option, pub credentials_key_filepath: Option, pub debug: Option, + pub prometheus: Option, } #[derive(Debug, Deserialize, Clone)] @@ -131,6 +136,13 @@ pub struct Docker { pub ulimits: Option>>, } +#[derive(Debug, Deserialize, Clone)] +#[allow(unused)] +pub struct Prometheus { + pub enable: bool, + pub port: u16, +} + #[derive(Debug, Deserialize, Clone)] #[allow(unused)] pub struct Settings { diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 5a563c9..e9f7a9b 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -6,14 +6,14 @@ use crate::orchestrator::docker::DockerOrchestrator; use crate::orchestrator::kubernetes::KubeOrchestrator; use 
crate::orchestrator::portainer::docker::PortainerDockerOrchestrator; use crate::orchestrator::{Orchestrator, composer}; -use crate::settings; + use crate::system::signals; use std::time::{Duration, Instant}; use tokio::task::JoinHandle; use tokio::time::interval; async fn orchestration(api: Box) { - let settings = settings(); + let settings = &crate::config::settings::SETTINGS; // Get current deployment in target orchestrator let daemon_configuration = api.daemon(); let orchestrator: Box = @@ -51,7 +51,7 @@ async fn orchestration(api: Box) { } pub async fn alive(api: Box) -> JoinHandle<()> { - let settings = settings(); + let settings = &crate::config::settings::SETTINGS; let mut interval = interval(Duration::from_secs(settings.manager.ping_alive_schedule)); tokio::spawn(async move { // Start scheduling diff --git a/src/engine/openbas.rs b/src/engine/openbas.rs index a7a4a9c..36bac1d 100644 --- a/src/engine/openbas.rs +++ b/src/engine/openbas.rs @@ -1,8 +1,8 @@ -use tokio::task::JoinHandle; -use tracing::info; use crate::api::ComposerApi; use crate::api::openbas::ApiOpenBAS; use crate::engine::{alive, orchestration}; +use tokio::task::JoinHandle; +use tracing::info; pub fn openbas_orchestration() -> JoinHandle<()> { info!("Starting OpenBAS connectors orchestration"); @@ -19,4 +19,3 @@ pub fn openbas_alive() -> JoinHandle<()> { alive(api).await; }) } - diff --git a/src/engine/opencti.rs b/src/engine/opencti.rs index 4f96b1c..3ba10bd 100644 --- a/src/engine/opencti.rs +++ b/src/engine/opencti.rs @@ -1,8 +1,8 @@ -use tokio::task::JoinHandle; -use tracing::info; use crate::api::ComposerApi; use crate::api::opencti::ApiOpenCTI; use crate::engine::{alive, orchestration}; +use tokio::task::JoinHandle; +use tracing::info; pub fn opencti_alive() -> JoinHandle<()> { info!("Starting OpenCTI Composer ping alive"); @@ -18,4 +18,4 @@ pub fn opencti_orchestration() -> JoinHandle<()> { let api: Box = Box::new(ApiOpenCTI::new()); orchestration(api).await; }) -} \ No newline at end 
of file +} diff --git a/src/main.rs b/src/main.rs index d280caf..6de6e5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod api; mod config; mod engine; mod orchestrator; +mod prometheus; mod system; use crate::config::settings::Settings; @@ -10,15 +11,13 @@ use crate::engine::opencti::{opencti_alive, opencti_orchestration}; use futures::future::join_all; use rolling_file::{BasicRollingFileAppender, RollingConditionBasic}; use std::str::FromStr; -use std::sync::OnceLock; use std::{env, fs}; use tokio::task::JoinHandle; -use tracing::{Level, info, warn}; +use tracing::{Level, info}; use tracing_subscriber::fmt::Layer; use tracing_subscriber::fmt::writer::MakeWriterExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{Registry, layer::SubscriberExt}; -use rsa::{RsaPrivateKey, pkcs8::DecodePrivateKey}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -26,21 +25,9 @@ const BASE_DIRECTORY_LOG: &str = "logs"; const BASE_DIRECTORY_SIZE: usize = 5; const PREFIX_LOG_NAME: &str = "xtm-composer.log"; -// Singleton settings for all application -fn settings() -> &'static Settings { - static CONFIG: OnceLock = OnceLock::new(); - CONFIG.get_or_init(|| Settings::new().unwrap()) -} - -// Singleton RSA private key for all application -pub fn private_key() -> &'static RsaPrivateKey { - static KEY: OnceLock = OnceLock::new(); - KEY.get_or_init(|| load_and_verify_credentials_key()) -} - // Global init logger fn init_logger() { - let setting = Settings::new().unwrap(); + let setting = &crate::config::settings::SETTINGS; let logger_config = &setting.manager.logger; // Validate log level @@ -49,7 +36,7 @@ fn init_logger() { Err(_) => panic!( "Invalid log level: '{}'. 
Valid values are: trace, debug, info, warn, error", logger_config.level - ) + ), }; // Validate log format @@ -95,55 +82,12 @@ fn init_logger() { } } -// Load and verify RSA private key from configuration -pub fn load_and_verify_credentials_key() -> RsaPrivateKey { - let setting = settings(); - - // Priority: file > environment variable - let key_content = if let Some(filepath) = &setting.manager.credentials_key_filepath { - // Warning if both are set - if setting.manager.credentials_key.is_some() { - warn!("Both credentials_key and credentials_key_filepath are set. Using filepath (priority)."); - } - - // Read key from file - match fs::read_to_string(filepath) { - Ok(content) => content, - Err(e) => panic!("Failed to read credentials key file '{}': {}", filepath, e) - } - } else if let Some(key) = &setting.manager.credentials_key { - // Use environment variable or config value - key.clone() - } else { - panic!( - "No credentials key provided! Set either 'manager.credentials_key' or 'manager.credentials_key_filepath' in configuration." - ); - }; - - // Validate key format (trim to handle trailing whitespace) - // Check for presence of RSA PRIVATE KEY markers for PKCS#8 format - let trimmed_content = key_content.trim(); - if !trimmed_content.contains("BEGIN PRIVATE KEY") || !trimmed_content.contains("END PRIVATE KEY") { - panic!("Invalid private key format. 
Expected PKCS#8 PEM format with 'BEGIN PRIVATE KEY' and 'END PRIVATE KEY' markers."); - } - - // Parse and validate RSA private key using PKCS#8 format - match RsaPrivateKey::from_pkcs8_pem(&key_content) { - Ok(key) => { - info!("Successfully loaded RSA private key (PKCS#8 format)"); - key - }, - Err(e) => { - panic!("Failed to decode RSA private key: {}", e); - } - } -} - +// Init opencti fn opencti_orchestrate(orchestrations: &mut Vec>) { - let setting = settings(); + let setting = &crate::config::settings::SETTINGS; if setting.opencti.enable { // Initialize private key singleton - let _ = private_key(); + let _ = &crate::config::rsa::PRIVATE_KEY; let opencti_alive = opencti_alive(); orchestrations.push(opencti_alive); let opencti_orchestration = opencti_orchestration(); @@ -155,7 +99,7 @@ fn opencti_orchestrate(orchestrations: &mut Vec>) { // Init openbas fn openbas_orchestrate(orchestrations: &mut Vec>) { - let setting = settings(); + let setting = &crate::config::settings::SETTINGS; if setting.openbas.enable { let openbas_alive = openbas_alive(); orchestrations.push(openbas_alive); @@ -166,6 +110,21 @@ fn openbas_orchestrate(orchestrations: &mut Vec>) { } } +// Init prometheus metrics server +fn prometheus_orchestrate(orchestrations: &mut Vec>) { + let setting = &crate::config::settings::SETTINGS; + if let Some(prometheus_config) = &setting.manager.prometheus { + if prometheus_config.enable { + let port = prometheus_config.port; + let handle = tokio::spawn(async move { + crate::prometheus::start_metrics_server(port).await; + }); + orchestrations.push(handle); + } + } +} + +// Main function #[tokio::main] async fn main() { // Initialize the global logging system @@ -173,8 +132,9 @@ async fn main() { // Log the start let env = Settings::mode(); info!(version = VERSION, env, "Starting XTM composer"); - // Start orchestration threads + // Start threads let mut orchestrations = Vec::new(); + prometheus_orchestrate(&mut orchestrations); opencti_orchestrate(&mut 
orchestrations); openbas_orchestrate(&mut orchestrations); // Wait for threads to terminate diff --git a/src/orchestrator/composer.rs b/src/orchestrator/composer.rs index f39708b..bc3f101 100644 --- a/src/orchestrator/composer.rs +++ b/src/orchestrator/composer.rs @@ -18,6 +18,7 @@ async fn orchestrate_missing( // Update the connector status Some(_) => { api.patch_status(id, ConnectorStatus::Stopped).await; + crate::prometheus::CONNECTOR_INITIALIZED.inc(); } None => { warn!(id = id, "Deployment canceled"); @@ -53,23 +54,23 @@ async fn orchestrate_existing( } else { container_status }; - + // Update the connector status if needed let container_status_not_aligned = final_status != connector_status; - + // Detect if connector just started - let just_started = container_status_not_aligned && - final_status == ConnectorStatus::Started && - connector_status == ConnectorStatus::Stopped; - + let just_started = container_status_not_aligned + && final_status == ConnectorStatus::Started + && connector_status == ConnectorStatus::Stopped; + // Send health metrics if: // - Connector just started (immediate reporting) // - OR connector is running and 30 seconds have elapsed let now = Instant::now(); - let should_send_health = just_started || - (final_status == ConnectorStatus::Started && - now.duration_since(health_tick.clone()) >= Duration::from_secs(30)); - + let should_send_health = just_started + || (final_status == ConnectorStatus::Started + && now.duration_since(health_tick.clone()) >= Duration::from_secs(30)); + if should_send_health { if let Some(started_at) = &container.started_at { info!(id = connector_id, "Reporting health metrics"); @@ -78,7 +79,8 @@ async fn orchestrate_existing( container.restart_count, started_at.clone(), is_in_reboot_loop, - ).await; + ) + .await; } // Reset timer only for running connectors if final_status == ConnectorStatus::Started { @@ -86,8 +88,7 @@ async fn orchestrate_existing( } } if container_status_not_aligned { - 
api.patch_status(connector.id.clone(), final_status) - .await; + api.patch_status(connector.id.clone(), final_status).await; info!(id = connector_id, "Patch status"); } // In case of platform upgrade, we need to align all deployed connectors @@ -101,6 +102,7 @@ async fn orchestrate_existing( "Refreshing" ); orchestrator.refresh(connector).await; + crate::prometheus::CONNECTOR_UPDATED.inc(); } // Align existing and requested status let requested_status = RequestedStatus::from_str(requested_status_fetch.as_str()).unwrap(); @@ -108,10 +110,12 @@ async fn orchestrate_existing( (RequestedStatus::Stopping, ConnectorStatus::Started) => { info!(id = connector_id, "Stopping"); orchestrator.stop(&container, connector).await; + crate::prometheus::CONNECTOR_STOPPED.inc(); } (RequestedStatus::Starting, ConnectorStatus::Stopped) => { info!(id = connector_id, "Starting"); orchestrator.start(&container, connector).await; + crate::prometheus::CONNECTOR_STARTED.inc(); } _ => { info!(id = connector_id, "Nothing to execute"); @@ -145,13 +149,16 @@ pub async fn orchestrate( if connectors_response.is_some() { // First round trip to instantiate and control if needed let connectors = connectors_response.unwrap(); + crate::prometheus::MANAGED_CONNECTORS.set(connectors.len() as i64); + // Iter on each definition and check alignment between the status and the container for connector in &connectors { // Get current containers in the orchestrator let container_get = orchestrator.get(connector).await; match container_get { Some(container) => { - orchestrate_existing(tick, health_tick, orchestrator, api, connector, container).await + orchestrate_existing(tick, health_tick, orchestrator, api, connector, container) + .await } None => orchestrate_missing(orchestrator, api, connector).await, } diff --git a/src/orchestrator/docker/docker.rs b/src/orchestrator/docker/docker.rs index 293c117..db78568 100644 --- a/src/orchestrator/docker/docker.rs +++ b/src/orchestrator/docker/docker.rs @@ -34,6 +34,47 
@@ impl DockerOrchestrator { pub fn normalize_name(name: Option) -> String { name.unwrap().strip_prefix("/").unwrap().into() } + + fn build_host_config() -> HostConfig { + let settings = &crate::config::settings::SETTINGS; + let Some(docker_opts) = &settings.opencti.daemon.docker else { + return HostConfig::default(); + }; + + let ulimits = docker_opts.ulimits.as_ref().and_then(|ulimits| { + let vec: Vec = ulimits + .iter() + .filter_map(|ulimit_map| { + Some(bollard::models::ResourcesUlimits { + name: ulimit_map.get("name")?.as_str()?.to_string().into(), + soft: ulimit_map.get("soft")?.as_i64(), + hard: ulimit_map.get("hard")?.as_i64(), + }) + }) + .collect(); + if vec.is_empty() { None } else { Some(vec) } + }); + + HostConfig { + network_mode: docker_opts.network_mode.clone(), + extra_hosts: docker_opts.extra_hosts.clone(), + dns: docker_opts.dns.clone(), + dns_search: docker_opts.dns_search.clone(), + privileged: docker_opts.privileged, + cap_add: docker_opts.cap_add.clone(), + cap_drop: docker_opts.cap_drop.clone(), + security_opt: docker_opts.security_opt.clone(), + userns_mode: docker_opts.userns_mode.clone(), + pid_mode: docker_opts.pid_mode.clone(), + ipc_mode: docker_opts.ipc_mode.clone(), + uts_mode: docker_opts.uts_mode.clone(), + runtime: docker_opts.runtime.clone(), + shm_size: docker_opts.shm_size, + sysctls: docker_opts.sysctls.clone(), + ulimits, + ..Default::default() + } + } } #[async_trait] @@ -71,7 +112,7 @@ impl Orchestrator for DockerOrchestrator { } async fn list(&self) -> Vec { - let settings = crate::settings(); + let settings = &crate::config::settings::SETTINGS; let manager_label = format!("opencti-manager={}", settings.manager.id.clone()); let list_container_filters: HashMap> = HashMap::from([("label".to_string(), Vec::from([manager_label]))]); @@ -166,7 +207,7 @@ impl Orchestrator for DockerOrchestrator { } async fn deploy(&self, connector: &ApiConnector) -> Option { - let settings = crate::settings(); + let settings = 
&crate::config::settings::SETTINGS; let registry_config = settings.opencti.daemon.registry.clone(); let resolver = Image::new(registry_config); let auth = resolver.get_credentials(); @@ -193,140 +234,60 @@ impl Orchestrator for DockerOrchestrator { }) .await; - match deploy_response { - Ok(_) => { - // Create the container - let container_env_variables = connector - .container_envs() - .into_iter() - .map(|config| format!("{}={}", config.key, config.value)) - .collect::>(); - let labels = self.labels(connector); + if let Err(e) = deploy_response { + error!( + image = image, + error = e.to_string(), + "Error fetching container image" + ); + return None; + } - // Build host config with Docker options - let mut host_config = HostConfig::default(); + // Create the container + let container_env_variables = connector + .container_envs() + .into_iter() + .map(|config| format!("{}={}", config.key, config.value)) + .collect::>(); + let labels = self.labels(connector); - // Get settings and check for Docker options - let settings = crate::settings(); - let docker_options = settings.opencti.daemon.docker.as_ref(); + // Build host config with Docker options + let host_config = DockerOrchestrator::build_host_config(); - if let Some(docker_opts) = docker_options { - // Apply Docker options to host config - if let Some(network_mode) = &docker_opts.network_mode { - host_config.network_mode = Some(network_mode.clone()); - } - if let Some(extra_hosts) = &docker_opts.extra_hosts { - host_config.extra_hosts = Some(extra_hosts.clone()); - } - if let Some(dns) = &docker_opts.dns { - host_config.dns = Some(dns.clone()); - } - if let Some(dns_search) = &docker_opts.dns_search { - host_config.dns_search = Some(dns_search.clone()); - } - if let Some(privileged) = docker_opts.privileged { - host_config.privileged = Some(privileged); - } - if let Some(cap_add) = &docker_opts.cap_add { - host_config.cap_add = Some(cap_add.clone()); - } - if let Some(cap_drop) = &docker_opts.cap_drop { - 
host_config.cap_drop = Some(cap_drop.clone()); - } - if let Some(security_opt) = &docker_opts.security_opt { - host_config.security_opt = Some(security_opt.clone()); - } - if let Some(userns_mode) = &docker_opts.userns_mode { - host_config.userns_mode = Some(userns_mode.clone()); - } - if let Some(pid_mode) = &docker_opts.pid_mode { - host_config.pid_mode = Some(pid_mode.clone()); - } - if let Some(ipc_mode) = &docker_opts.ipc_mode { - host_config.ipc_mode = Some(ipc_mode.clone()); - } - if let Some(uts_mode) = &docker_opts.uts_mode { - host_config.uts_mode = Some(uts_mode.clone()); - } - if let Some(runtime) = &docker_opts.runtime { - host_config.runtime = Some(runtime.clone()); - } - if let Some(shm_size) = docker_opts.shm_size { - host_config.shm_size = Some(shm_size); - } - if let Some(sysctls) = &docker_opts.sysctls { - host_config.sysctls = Some(sysctls.clone()); - } - if let Some(ulimits) = &docker_opts.ulimits { - // Convert ulimits from HashMap to bollard's expected format - let ulimits_vec: Vec = ulimits - .iter() - .filter_map(|ulimit_map| { - if let (Some(name), Some(soft), Some(hard)) = ( - ulimit_map.get("name").and_then(|v| v.as_str()), - ulimit_map.get("soft").and_then(|v| v.as_i64()), - ulimit_map.get("hard").and_then(|v| v.as_i64()), - ) { - Some(bollard::models::ResourcesUlimits { - name: Some(name.to_string()), - soft: Some(soft), - hard: Some(hard), - }) - } else { - None - } - }) - .collect(); - if !ulimits_vec.is_empty() { - host_config.ulimits = Some(ulimits_vec); - } - } - } + let config = Config { + image: Some(image), + env: Some(container_env_variables), + labels: Some(labels), + host_config: Some(host_config), + ..Default::default() + }; - let config = Config { - image: Some(image), - env: Some(container_env_variables), - labels: Some(labels), - host_config: Some(host_config), + let create_response = self + .docker + .create_container::( + Some(CreateContainerOptions { + name: connector.container_name(), ..Default::default() - }; + }), 
+ config, + ) + .await; - let create_response = self - .docker - .create_container::( - Some(CreateContainerOptions { - name: connector.container_name(), - platform: None, - }), - config, - ) - .await; - match create_response { - Ok(_) => {} - Err(err) => { - error!(error = err.to_string(), "Error creating container"); - } - } + if let Err(err) = create_response { + error!(error = err.to_string(), "Error creating container"); + } - // Get the created connector - let created = self.get(connector).await; - // Start the container if needed - let is_starting = connector.requested_status.clone().eq("starting"); - if is_starting { - self.start(&created.clone().unwrap(), connector).await; - } - // Return the created container - created - } - Err(e) => { - error!( - image = image, - error = e.to_string(), - "Error fetching container image" - ); - None + // Get the created connector + let created = self.get(connector).await; + // Start the container if needed + let is_starting = connector.requested_status.clone().eq("starting"); + if is_starting { + if let Some(container) = &created { + self.start(container, connector).await; } } + // Return the created container + created } async fn logs( diff --git a/src/orchestrator/kubernetes/kubernetes.rs b/src/orchestrator/kubernetes/kubernetes.rs index c78d20a..1076f61 100644 --- a/src/orchestrator/kubernetes/kubernetes.rs +++ b/src/orchestrator/kubernetes/kubernetes.rs @@ -168,7 +168,7 @@ impl KubeOrchestrator { let deployment_labels: BTreeMap = labels.into_iter().collect(); let pod_env = self.container_envs(connector); let is_starting = &connector.requested_status == "starting"; - let settings = crate::settings(); +let settings = crate::settings(); let registry_config = settings.opencti.daemon.registry.clone(); let resolver = Image::new(registry_config); let auth = resolver.get_credentials(); @@ -286,7 +286,7 @@ impl Orchestrator for KubeOrchestrator { } async fn list(&self) -> Vec { - let settings = crate::settings(); + let 
settings = &crate::config::settings::SETTINGS; let lp = &ListParams::default() .labels(&format!("opencti-manager={}", settings.manager.id.clone())); let get_deployments = self.deployments.list(lp).await.unwrap(); diff --git a/src/orchestrator/mod.rs b/src/orchestrator/mod.rs index 2cd0db5..7290595 100644 --- a/src/orchestrator/mod.rs +++ b/src/orchestrator/mod.rs @@ -51,7 +51,7 @@ impl OrchestratorContainer { #[async_trait] pub trait Orchestrator { fn labels(&self, connector: &ApiConnector) -> HashMap { - let settings = crate::settings(); + let settings = &crate::config::settings::SETTINGS; let mut labels: HashMap = HashMap::new(); labels.insert("opencti-manager".into(), settings.manager.id.clone()); labels.insert("opencti-connector-id".into(), connector.id.clone()); diff --git a/src/orchestrator/portainer/docker/portainer.rs b/src/orchestrator/portainer/docker/portainer.rs index c834368..d3ad2e3 100644 --- a/src/orchestrator/portainer/docker/portainer.rs +++ b/src/orchestrator/portainer/docker/portainer.rs @@ -92,7 +92,7 @@ impl Orchestrator for PortainerDockerOrchestrator { } async fn list(&self) -> Vec { - let settings = crate::settings(); + let settings = &crate::config::settings::SETTINGS; let mut label_filters = Vec::new(); label_filters.push(format!("opencti-manager={}", settings.manager.id.clone())); let filter: HashMap> = HashMap::from([("label".into(), label_filters)]); diff --git a/src/prometheus/mod.rs b/src/prometheus/mod.rs new file mode 100644 index 0000000..2efccbe --- /dev/null +++ b/src/prometheus/mod.rs @@ -0,0 +1,87 @@ +use axum::{Router, routing::get}; +use prometheus::{Encoder, IntCounter, IntGauge, Registry, TextEncoder}; +use std::net::SocketAddr; +use std::sync::LazyLock; +use tracing::info; + +// Registry initialization +pub static REGISTRY: LazyLock = LazyLock::new(|| Registry::new()); + +// region Metrics initialization +pub static MANAGED_CONNECTORS: LazyLock = LazyLock::new(|| { + let gauge = IntGauge::new( + "xtm_managed_connectors", 
+        "Number of protected connectors managed by the composer",
+    )
+    .expect("metric can be created");
+    REGISTRY
+        .register(Box::new(gauge.clone()))
+        .expect("collector can be registered");
+    gauge
+});
+
+pub static CONNECTOR_INITIALIZED: LazyLock<IntCounter> = LazyLock::new(|| {
+    let counter = IntCounter::new(
+        "xtm_connectors_initialized_total",
+        "Number of connectors initialized",
+    )
+    .expect("metric can be created");
+    REGISTRY
+        .register(Box::new(counter.clone()))
+        .expect("collector can be registered");
+    counter
+});
+
+pub static CONNECTOR_STARTED: LazyLock<IntCounter> = LazyLock::new(|| {
+    let counter = IntCounter::new(
+        "xtm_connectors_started_total",
+        "Number of connectors started",
+    )
+    .expect("metric can be created");
+    REGISTRY
+        .register(Box::new(counter.clone()))
+        .expect("collector can be registered");
+    counter
+});
+
+pub static CONNECTOR_STOPPED: LazyLock<IntCounter> = LazyLock::new(|| {
+    let counter = IntCounter::new(
+        "xtm_connectors_stopped_total",
+        "Number of connectors stopped",
+    )
+    .expect("metric can be created");
+    REGISTRY
+        .register(Box::new(counter.clone()))
+        .expect("collector can be registered");
+    counter
+});
+
+pub static CONNECTOR_UPDATED: LazyLock<IntCounter> = LazyLock::new(|| {
+    let counter = IntCounter::new(
+        "xtm_connectors_updated_total",
+        "Number of connectors updated",
+    )
+    .expect("metric can be created");
+    REGISTRY
+        .register(Box::new(counter.clone()))
+        .expect("collector can be registered");
+    counter
+});
+// endregion
+
+// Functions
+async fn metrics_handler() -> String {
+    let mut buffer = Vec::new();
+    let encoder = TextEncoder::new();
+    let metric_families = REGISTRY.gather();
+    encoder.encode(&metric_families, &mut buffer).unwrap();
+    String::from_utf8(buffer).unwrap()
+}
+
+pub async fn start_metrics_server(port: u16) {
+    let app = Router::new().route("/metrics", get(metrics_handler));
+    let addr = SocketAddr::from(([0, 0, 0, 0], port));
+    info!("Prometheus server listening on {}/metrics", addr);
+    let listener =
tokio::net::TcpListener::bind(addr).await.unwrap();
+    axum::serve(listener, app).await.unwrap();
+}

From 9d6982ee46a6ad35742e0d1191669644998b1682 Mon Sep 17 00:00:00 2001
From: Julien Richard
Date: Fri, 2 Jan 2026 20:15:21 +0100
Subject: [PATCH 2/2] [composer] Adapt settings after rebase (#36)

---
 src/orchestrator/kubernetes/kubernetes.rs      | 7 ++++---
 src/orchestrator/portainer/docker/portainer.rs | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/orchestrator/kubernetes/kubernetes.rs b/src/orchestrator/kubernetes/kubernetes.rs
index 1076f61..3d158c0 100644
--- a/src/orchestrator/kubernetes/kubernetes.rs
+++ b/src/orchestrator/kubernetes/kubernetes.rs
@@ -7,7 +7,8 @@ use async_trait::async_trait;
 use k8s_openapi::DeepMerge;
 use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
 use k8s_openapi::api::core::v1::{
-    Container, LocalObjectReference, Secret, ContainerStatus, EnvVar, Pod, PodSpec, PodTemplateSpec, ResourceRequirements,
+    Container, ContainerStatus, EnvVar, LocalObjectReference, Pod, PodSpec, PodTemplateSpec,
+    ResourceRequirements, Secret,
 };
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
 use kube::api::{DeleteParams, LogParams, Patch, PatchParams};
@@ -38,7 +39,7 @@ impl KubeOrchestrator {
     // Validate and return image pull policy
     async fn register_secret(secrets: &Api<Secret>) {
-        let settings = crate::settings();
+        let settings = &crate::config::settings::SETTINGS;
         let registry_config = settings.opencti.daemon.registry.clone();
         let resolver = Image::new(registry_config);
         let registry_secret = resolver.get_kubernetes_registry_secret();
@@ -168,7 +169,7 @@ impl KubeOrchestrator {
         let deployment_labels: BTreeMap<String, String> = labels.into_iter().collect();
         let pod_env = self.container_envs(connector);
         let is_starting = &connector.requested_status == "starting";
-let settings = crate::settings();
+        let settings = &crate::config::settings::SETTINGS;
         let registry_config = settings.opencti.daemon.registry.clone();
         let
resolver = Image::new(registry_config); let auth = resolver.get_credentials(); diff --git a/src/orchestrator/portainer/docker/portainer.rs b/src/orchestrator/portainer/docker/portainer.rs index d3ad2e3..29c3868 100644 --- a/src/orchestrator/portainer/docker/portainer.rs +++ b/src/orchestrator/portainer/docker/portainer.rs @@ -179,7 +179,7 @@ impl Orchestrator for PortainerDockerOrchestrator { } async fn deploy(&self, connector: &ApiConnector) -> Option { - let settings = crate::settings(); + let settings = &crate::config::settings::SETTINGS; let registry_config = settings.opencti.daemon.registry.clone(); let resolver = Image::new(registry_config); let auth = resolver.get_credentials();