Skip to content

Commit

Permalink
Add PKI config dir logic and restructure DSH module
Browse files Browse the repository at this point in the history
  • Loading branch information
toelo3 committed Jun 11, 2024
1 parent 76928f6 commit 13bc862
Show file tree
Hide file tree
Showing 10 changed files with 1,061 additions and 794 deletions.
1 change: 1 addition & 0 deletions dsh_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ hyper = { version = "1.3", features = ["server", "http1"], optional = true }
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
lazy_static = { version = "1.4", optional = true }
log = "0.4"
pem = "3"
prometheus = { version = "0.13", features = ["process"], optional = true }
rcgen = { version = "0.13", optional = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "blocking"], optional = true }
Expand Down
206 changes: 72 additions & 134 deletions dsh_sdk/src/dsh/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,101 +5,82 @@
//! used in the get_signed_certificates_json.sh script.
//!
//! ## Note
//!
//! This module is not intended to be used directly, but through the `Properties` struct. It will
//! always be used when getting a `Properties` struct vja dsh::Properties::get().
//!
//! If this module returns an error, it defaults to the local_datastreams.json file, so it can be used
//! in a local environment. (when feature `local` is enabled)
//! always be used when getting a `Properties` struct via dsh::Properties::get().
//!
//! ## Example
//! ```
//! use dsh_sdk::Properties;
//!
//! let dsh_properties = Properties::get();
//! ```
//! If this module returns an error, it defaults to the local_datastreams.json file or the
//! default properties. This means it can be used in you local development environment without
//! the need to connect to DSH.
use log::{debug, info, warn};
use log::{info, warn};
use reqwest::blocking::Client;

use std::env;

use crate::error::DshError;

use super::{certificates::Cert, datastream::Datastream, Properties};
use super::{
certificates::Cert, datastream::Datastream, utils, VAR_DSH_CA_CERTIFICATE,
VAR_DSH_SECRET_TOKEN, VAR_DSH_SECRET_TOKEN_PATH, VAR_KAFKA_CONFIG_HOST,
};

impl Properties {
/// Connect to DSH and retrieve the certificates and datastreams.json to create the properties struct
pub(crate) fn new_dsh() -> Result<Self, DshError> {
let dsh_config = DshConfig::new()?;
let client = Properties::reqwest_client(dsh_config.dsh_ca_certificate.as_bytes())?;
let dn = DshCall::Dn(&dsh_config).perform_call(&client)?;
let dn = Dn::parse_string(&dn)?;
let certificates = Cert::new(dn, &dsh_config, &client)?;
let client_with_cert = certificates.reqwest_blocking_client_config()?.build()?;
let datastreams_string =
DshCall::Datastream(&dsh_config).perform_call(&client_with_cert)?;
let datastream: Datastream = serde_json::from_str(&datastreams_string)?;
Ok(Self {
client_id: dsh_config.task_id.to_string(),
tenant_name: dsh_config.tenant_name.to_string(),
task_id: dsh_config.task_id.to_string(),
datastream,
certificates: Some(certificates),
})
}
/// Connect to DSH and retrieve the certificates and datastreams.json to create the properties struct
pub(crate) fn bootstrap(tenant_name: &str, task_id: &str) -> Result<(Cert, Datastream), DshError> {
let dsh_config = DshConfig::new(tenant_name, task_id)?;
let client = reqwest_ca_client(dsh_config.dsh_ca_certificate.as_bytes())?;
let dn = DshCall::Dn(&dsh_config).perform_call(&client)?;
let dn = Dn::parse_string(&dn)?;
let certificates = Cert::bootstrap(dn, &dsh_config, &client)?;
let client_with_cert = certificates.reqwest_blocking_client_config()?.build()?;
let datastreams_string = DshCall::Datastream(&dsh_config).perform_call(&client_with_cert)?;
let datastream: Datastream = serde_json::from_str(&datastreams_string)?;
Ok((certificates, datastream))
}

/// Build a request client with the DSH CA certificate.
fn reqwest_client(dsh_ca_certificate: &[u8]) -> Result<Client, reqwest::Error> {
let reqwest_cert = reqwest::tls::Certificate::from_pem(dsh_ca_certificate)?;
let client = Client::builder()
.add_root_certificate(reqwest_cert)
.build()?;
Ok(client)
}
/// Build a request client with the DSH CA certificate.
fn reqwest_ca_client(dsh_ca_certificate: &[u8]) -> Result<Client, reqwest::Error> {
let reqwest_cert = reqwest::tls::Certificate::from_pem(dsh_ca_certificate)?;
let client = Client::builder()
.add_root_certificate(reqwest_cert)
.build()?;
Ok(client)
}

#[derive(Debug)]
pub(crate) struct DshConfig {
pub(crate) struct DshConfig<'a> {
config_host: String,
tenant_name: String,
task_id: String,
tenant_name: &'a str,
task_id: &'a str,
dsh_secret_token: String,
dsh_ca_certificate: String,
}

/// Helper struct to store the config needed for bootstrapping to DSH
impl DshConfig {
fn new() -> Result<Self, DshError> {
let config_host = Self::get_env_var("KAFKA_CONFIG_HOST")
impl<'a> DshConfig<'a> {
fn new(tenant_name: &'a str, task_id: &'a str) -> Result<Self, DshError> {
let config_host = utils::get_env_var(VAR_KAFKA_CONFIG_HOST)
.map(|host| format!("https://{}", host))
.unwrap_or_else(|_| {
let default = "https://pikachu.dsh.marathon.mesos:4443".to_string();
let default = "https://pikachu.dsh.marathon.mesos:4443";
warn!(
"KAFKA_CONFIG_HOST is not set, using default value {}",
default
"{} is not set, using default value {}",
VAR_KAFKA_CONFIG_HOST, default
);
default
default.to_string()
});

let task_id = Self::get_env_var("MESOS_TASK_ID")?;
let app_id = Self::get_env_var("MARATHON_APP_ID")?;
let dsh_secret_token = match Self::get_env_var("DSH_SECRET_TOKEN") {
let dsh_secret_token = match utils::get_env_var(VAR_DSH_SECRET_TOKEN) {
Ok(token) => token,
Err(_) => {
// if DSH_SECRET_TOKEN is not set, try to read it from a file (for system space applications)
info!("trying to read DSH_SECRET_TOKEN from file");
let secret_token_path = Self::get_env_var("DSH_SECRET_TOKEN_PATH")?;
let secret_token_path = utils::get_env_var(VAR_DSH_SECRET_TOKEN_PATH)?;
let path = std::path::PathBuf::from(secret_token_path);
std::fs::read_to_string(path)?
}
};
let dsh_ca_certificate = Self::get_env_var("DSH_CA_CERTIFICATE")?;
let tenant_name = DshConfig::tenant_name(&app_id);
let dsh_ca_certificate = utils::get_env_var(VAR_DSH_CA_CERTIFICATE)?;
Ok(DshConfig {
config_host,
task_id,
tenant_name: tenant_name.to_string(),
tenant_name,
dsh_secret_token,
dsh_ca_certificate,
})
Expand All @@ -108,41 +89,18 @@ impl DshConfig {
pub(crate) fn dsh_ca_certificate(&self) -> &str {
&self.dsh_ca_certificate
}

/// Derive the tenant name from the app id.
fn tenant_name(app_id: &str) -> &str {
let tenant_name = app_id.split('/').nth(1);
match tenant_name {
Some(tenant_name) => tenant_name,
None => {
warn!(
"MARATHON_APP_ID is not as expected, missing expected slashes, using \"{}\" as tenant name",
app_id
);
app_id
}
}
}

fn get_env_var(var_name: &str) -> Result<String, DshError> {
debug!("Reading {} from environment variable", var_name);
match env::var(var_name) {
Ok(value) => Ok(value),
Err(e) => {
warn!("{} is not set", var_name);
Err(e.into())
}
}
}
}

pub(crate) enum DshCall<'a> {
/// Call to retreive distinguished name.
Dn(&'a DshConfig),
Dn(&'a DshConfig<'a>),
/// Call to retreive datastreams.json.
Datastream(&'a DshConfig),
Datastream(&'a DshConfig<'a>),
/// Call to post the certificate signing request.
CertificateSignRequest { config: &'a DshConfig, csr: &'a str },
CertificateSignRequest {
config: &'a DshConfig<'a>,
csr: &'a str,
},
}

impl DshCall<'_> {
Expand Down Expand Up @@ -248,35 +206,17 @@ impl Dn {

#[cfg(test)]
mod tests {
use std::str::from_utf8;

use super::*;

#[test]
fn test_dsh_config_tenant_name() {
let app_id = "/greenbox-dev/app-name";
let result = DshConfig::tenant_name(app_id);
assert_eq!(
result,
"greenbox-dev".to_string(),
"{} is not parsed correctly",
app_id
);
let app_id = "greenbox-dev";
let result = DshConfig::tenant_name(app_id);
assert_eq!(
result, app_id,
"{} is not parsed correctly, should be the same",
app_id
);
}
use serial_test::serial;
use std::env;
use std::str::from_utf8;

#[test]
fn test_dsh_call_request_builder() {
let dsh_config = DshConfig {
config_host: "https://test_host".to_string(),
tenant_name: "test_tenant_name".to_string(),
task_id: "test_task_id".to_string(),
tenant_name: "test_tenant_name",
task_id: "test_task_id",
dsh_secret_token: "test_token".to_string(),
dsh_ca_certificate: "test_ca_certificate".to_string(),
};
Expand Down Expand Up @@ -324,8 +264,8 @@ mod tests {
// create a DshConfig struct
let dsh_config = DshConfig {
config_host: dsh.url(),
tenant_name: "tenant".to_string(),
task_id: "test_task_id".to_string(),
tenant_name: "tenant",
task_id: "test_task_id",
dsh_secret_token: "test_token".to_string(),
dsh_ca_certificate: "test_ca_certificate".to_string(),
};
Expand All @@ -344,41 +284,39 @@ mod tests {
}

#[test]
fn test_get_env_var() {
env::set_var("TEST_ENV_VAR", "test_value");
let result = DshConfig::get_env_var("TEST_ENV_VAR").unwrap();
assert_eq!(result, "test_value");
}

#[test]
#[serial(env_depencency)]
fn test_dsh_config_new() {
// normal situation where DSH variables are set
env::set_var("KAFKA_CONFIG_HOST", "test_host");
env::set_var("MESOS_TASK_ID", "test_task_id");
env::set_var("MARATHON_APP_ID", "/test_tenant/test_app");
env::set_var("DSH_SECRET_TOKEN", "test_token");
env::set_var("DSH_CA_CERTIFICATE", "test_ca_certificate");
let dsh_config = DshConfig::new().unwrap();
env::set_var(VAR_KAFKA_CONFIG_HOST, "test_host");
env::set_var(VAR_DSH_SECRET_TOKEN, "test_token");
env::set_var(VAR_DSH_CA_CERTIFICATE, "test_ca_certificate");
let tenant_name = "test_tenant";
let task_id = "test_task_id";
let dsh_config = DshConfig::new(tenant_name, task_id).unwrap();
assert_eq!(dsh_config.config_host, "https://test_host");
assert_eq!(dsh_config.task_id, "test_task_id");
assert_eq!(dsh_config.tenant_name, "test_tenant");
assert_eq!(dsh_config.dsh_secret_token, "test_token");
assert_eq!(dsh_config.dsh_ca_certificate, "test_ca_certificate");
// DSH_SECRET_TOKEN is not set, but DSH_SECRET_TOKEN_PATH is set
env::remove_var("DSH_SECRET_TOKEN");
env::remove_var(VAR_DSH_SECRET_TOKEN);
let test_token_dir = "test_files";
std::fs::create_dir_all(test_token_dir).unwrap();
let test_token_dir = format!("{}/test_token", test_token_dir);
let _ = std::fs::remove_file(&test_token_dir);
env::set_var("DSH_SECRET_TOKEN_PATH", &test_token_dir);
let result = DshConfig::new();
env::set_var(VAR_DSH_SECRET_TOKEN_PATH, &test_token_dir);
let result = DshConfig::new(tenant_name, task_id);
assert!(result.is_err());
std::fs::write(test_token_dir.as_str(), "test_token_from_file").unwrap();
let dsh_config = DshConfig::new().unwrap();
let dsh_config = DshConfig::new(tenant_name, task_id).unwrap();
assert_eq!(dsh_config.dsh_secret_token, "test_token_from_file");
// fail if DSH_CA_CERTIFICATE is not set
env::remove_var("DSH_CA_CERTIFICATE");
let result = DshConfig::new();
env::remove_var(VAR_DSH_CA_CERTIFICATE);
let result = DshConfig::new(tenant_name, task_id);
assert!(result.is_err());
env::remove_var(VAR_KAFKA_CONFIG_HOST);
env::remove_var(VAR_DSH_SECRET_TOKEN);
env::remove_var(VAR_DSH_CA_CERTIFICATE);
env::remove_var(VAR_DSH_SECRET_TOKEN_PATH);
}
}
38 changes: 27 additions & 11 deletions dsh_sdk/src/dsh/certificates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,41 @@ use rcgen::{CertificateParams, CertificateSigningRequest, DnType, KeyPair};
#[derive(Debug, Clone)]
pub struct Cert {
dsh_ca_certificate_pem: String,
dsh_kafka_certificate_pem: String,
dsh_client_certificate_pem: String,
key_pair: Arc<KeyPair>,
}

impl Cert {
/// Create a new certificate struct.
pub(crate) fn new(dn: Dn, dsh_config: &DshConfig, client: &Client) -> Result<Self, DshError> {
/// Create new `Cert` struct
pub(crate) fn new(
dsh_ca_certificate_pem: String,
dsh_client_certificate_pem: String,
key_pair: KeyPair,
) -> Cert {
Self {
dsh_ca_certificate_pem,
dsh_client_certificate_pem,
key_pair: Arc::new(key_pair),
}
}
///
pub(crate) fn bootstrap(
dn: Dn,
dsh_config: &DshConfig,
client: &Client,
) -> Result<Self, DshError> {
let key_pair = KeyPair::generate_for(&rcgen::PKCS_ECDSA_P384_SHA384)?;
let csr = Self::generate_csr(&key_pair, dn)?;
let dsh_kafka_certificate_pem = DshCall::CertificateSignRequest {
let client_certificate = DshCall::CertificateSignRequest {
config: dsh_config,
csr: &csr.pem()?,
}
.perform_call(client)?;
Ok(Self {
dsh_ca_certificate_pem: dsh_config.dsh_ca_certificate().to_string(),
dsh_kafka_certificate_pem,
key_pair: Arc::new(key_pair),
})
Ok(Self::new(
dsh_config.dsh_ca_certificate().to_string(),
client_certificate,
key_pair,
))
}

/// Build an async reqwest client with the DSH Kafka certificate included.
Expand Down Expand Up @@ -108,7 +124,7 @@ impl Cert {

/// Get the kafka certificate as PEM string. Equivalent to client.pem.
pub fn dsh_kafka_certificate_pem(&self) -> &str {
self.dsh_kafka_certificate_pem.as_str()
&self.dsh_client_certificate_pem.as_str()
}

/// Get the private key as PKCS8 and return bytes based on asn1 DER format.
Expand Down Expand Up @@ -200,7 +216,7 @@ mod tests {
fn set_test_cert() -> Cert {
Cert {
dsh_ca_certificate_pem: CA_CERT.to_string(),
dsh_kafka_certificate_pem: KAFKA_CERT.to_string(),
dsh_client_certificate_pem: KAFKA_CERT.to_string(),
key_pair: Arc::new(KeyPair::generate().unwrap()),
}
}
Expand Down
Loading

0 comments on commit 13bc862

Please sign in to comment.