Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Adding start up commands for airflow #530

Merged
merged 15 commits into from
Nov 11, 2024
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

134 changes: 125 additions & 9 deletions rust/crd/src/authentication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt, Snafu};
use stackable_operator::{
client::Client,
commons::authentication::{
ldap,
oidc::{self, IdentityProviderHint},
AuthenticationClass, AuthenticationClassProvider, ClientAuthenticationDetails,
commons::{
authentication::{
ldap,
oidc::{self, IdentityProviderHint},
AuthenticationClass, AuthenticationClassProvider, ClientAuthenticationDetails,
},
tls_verification::TlsClientDetails,
},
schemars::{self, JsonSchema},
};
Expand Down Expand Up @@ -78,6 +81,8 @@ pub enum Error {
supported: String,
auth_class_name: String,
},
#[snafu(display("Currently only one CA certificate is supported."))]
MultipleCaCertsNotSupported,
Maleware marked this conversation as resolved.
Show resolved Hide resolved
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -130,6 +135,7 @@ pub struct AirflowClientAuthenticationDetailsResolved {
pub user_registration: bool,
pub user_registration_role: String,
pub sync_roles_at: FlaskRolesSyncMoment,
pub tls_ca_cert_mount_path: Option<String>,
Maleware marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Clone, Debug, Eq, PartialEq)]
Expand All @@ -143,6 +149,24 @@ pub enum AirflowAuthenticationClassResolved {
},
}

impl AirflowAuthenticationClassResolved {
pub fn tls_ca_cert_mount_path(&self) -> Option<String> {
self.tls_client_details().tls_ca_cert_mount_path()
}

pub fn tls_client_details(&self) -> &TlsClientDetails {
match self {
AirflowAuthenticationClassResolved::Ldap {
provider: ldap::AuthenticationProvider { tls, .. },
} => tls,
AirflowAuthenticationClassResolved::Oidc {
provider: oidc::AuthenticationProvider { tls, .. },
..
} => tls,
}
}
}

Maleware marked this conversation as resolved.
Show resolved Hide resolved
impl AirflowClientAuthenticationDetailsResolved {
pub async fn from(
auth_details: &[AirflowClientAuthenticationDetails],
Expand Down Expand Up @@ -254,12 +278,24 @@ impl AirflowClientAuthenticationDetailsResolved {
None => sync_roles_at = Some(entry.sync_roles_at.to_owned()),
}
}

let mut tls_ca_cert_mount_paths = resolved_auth_classes
.iter()
.filter_map(AirflowAuthenticationClassResolved::tls_ca_cert_mount_path)
.collect::<BTreeSet<_>>();
let tls_ca_cert_mount_path = tls_ca_cert_mount_paths.pop_first();
ensure!(
tls_ca_cert_mount_paths.is_empty(),
MultipleCaCertsNotSupportedSnafu
);

Maleware marked this conversation as resolved.
Show resolved Hide resolved
Ok(AirflowClientAuthenticationDetailsResolved {
authentication_classes_resolved: resolved_auth_classes,
user_registration: user_registration.unwrap_or_else(default_user_registration),
user_registration_role: user_registration_role
.unwrap_or_else(default_user_registration_role),
sync_roles_at: sync_roles_at.unwrap_or_else(FlaskRolesSyncMoment::default),
tls_ca_cert_mount_path,
Maleware marked this conversation as resolved.
Show resolved Hide resolved
})
}

Expand Down Expand Up @@ -336,7 +372,8 @@ mod tests {
authentication_classes_resolved: Vec::default(),
user_registration: default_user_registration(),
user_registration_role: default_user_registration_role(),
sync_roles_at: FlaskRolesSyncMoment::default()
sync_roles_at: FlaskRolesSyncMoment::default(),
tls_ca_cert_mount_path: None
Maleware marked this conversation as resolved.
Show resolved Hide resolved
},
auth_details_resolved
);
Expand All @@ -362,18 +399,32 @@ mod tests {
provider:
ldap:
hostname: my.ldap.server
tls:
verification:
server:
caCert:
secretClass: tls-keycloak
Maleware marked this conversation as resolved.
Show resolved Hide resolved
"},
)
.await;

assert_eq!(
AirflowClientAuthenticationDetailsResolved {
authentication_classes_resolved: vec![AirflowAuthenticationClassResolved::Ldap {
provider: serde_yaml::from_str("hostname: my.ldap.server").unwrap()
provider: serde_yaml::from_str(indoc! {"
hostname: my.ldap.server
tls:
verification:
server:
caCert:
secretClass: tls-keycloak
"})
.unwrap()
Maleware marked this conversation as resolved.
Show resolved Hide resolved
}],
user_registration: false,
user_registration_role: "Gamma".into(),
sync_roles_at: FlaskRolesSyncMoment::Login
sync_roles_at: FlaskRolesSyncMoment::Login,
tls_ca_cert_mount_path: Some("/stackable/secrets/tls-keycloak/ca.crt".into()),
Maleware marked this conversation as resolved.
Show resolved Hide resolved
},
auth_details_resolved
);
Expand Down Expand Up @@ -437,6 +488,11 @@ mod tests {
- openid
- email
- profile
tls:
verification:
server:
caCert:
secretClass: tls
Maleware marked this conversation as resolved.
Show resolved Hide resolved
"},
)
.await;
Expand Down Expand Up @@ -471,7 +527,13 @@ mod tests {
HostName::try_from("second.oidc.server".to_string()).unwrap(),
None,
"/realms/test".into(),
TlsClientDetails { tls: None },
TlsClientDetails {
tls: Some(Tls {
verification: TlsVerification::Server(TlsServerVerification {
ca_cert: CaCert::SecretClass("tls".into())
})
})
},
Maleware marked this conversation as resolved.
Show resolved Hide resolved
"preferred_username".into(),
vec!["openid".into(), "email".into(), "profile".into()],
None
Expand All @@ -485,7 +547,8 @@ mod tests {
],
user_registration: false,
user_registration_role: "Gamma".into(),
sync_roles_at: FlaskRolesSyncMoment::Login
sync_roles_at: FlaskRolesSyncMoment::Login,
tls_ca_cert_mount_path: Some("/stackable/secrets/tls/ca.crt".into()),
Maleware marked this conversation as resolved.
Show resolved Hide resolved
},
auth_details_resolved
);
Expand Down Expand Up @@ -765,6 +828,59 @@ mod tests {
error_message
);
}
#[tokio::test]
async fn reject_different_tls_ca_certs() {
let error_message = test_resolve_and_expect_error(
indoc! {"
- authenticationClass: oidc1
oidc:
clientCredentialsSecret: airflow-oidc-client1
- authenticationClass: oidc2
oidc:
clientCredentialsSecret: airflow-oidc-client2
"},
indoc! {"
---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
name: oidc1
spec:
provider:
oidc:
hostname: first.oidc.server
principalClaim: preferred_username
scopes: []
tls:
verification:
server:
caCert:
secretClass: tls1
---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
name: oidc2
spec:
provider:
oidc:
hostname: second.oidc.server
principalClaim: preferred_username
scopes: []
tls:
verification:
server:
caCert:
secretClass: tls2
"},
)
.await;

assert_eq!(
"Currently only one CA certificate is supported.",
error_message
);
}
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved

siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved
#[tokio::test]
async fn reject_wrong_principal_claim() {
Expand Down
61 changes: 55 additions & 6 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};

use authentication::{
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
};
use git_sync::GitSync;
use product_config::flask_app_config_writer::{FlaskAppConfigOptions, PythonType};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -322,7 +325,12 @@ impl AirflowRole {
/// components to have the same image/configuration (e.g. DAG folder location), even if not all
/// configuration settings are used everywhere. For this reason we ensure that the webserver
/// config file is in the Airflow home directory on all pods.
pub fn get_commands(&self) -> Vec<String> {
/// Only the webserver needs to know about authentication CA's which is added via python's certify
/// if authentication is enabled.
pub fn get_commands(
&self,
auth_config: &AirflowClientAuthenticationDetailsResolved,
) -> Vec<String> {
let mut command = vec![
format!("cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} {AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}"),
// graceful shutdown part
Expand All @@ -331,10 +339,16 @@ impl AirflowRole {
];

match &self {
AirflowRole::Webserver => command.extend(vec![
"prepare_signal_handlers".to_string(),
"airflow webserver &".to_string(),
]),
AirflowRole::Webserver => {
// Getting auth commands for AuthClass
let auth_commands = vec![Self::authentication_start_commands(auth_config)];
command.extend(vec![
"prepare_signal_handlers".to_string(),
"airflow webserver &".to_string(),
]);
command.extend(auth_commands);
}
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved

AirflowRole::Scheduler => command.extend(vec![
// Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
"airflow db init".to_string(),
Expand Down Expand Up @@ -363,6 +377,41 @@ impl AirflowRole {

command
}
fn authentication_start_commands(
auth_config: &AirflowClientAuthenticationDetailsResolved,
) -> String {
let mut commands = Vec::new();
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved

let mut tls_client_credentials = BTreeSet::new();

for auth_class_resolved in &auth_config.authentication_classes_resolved {
match auth_class_resolved {
AirflowAuthenticationClassResolved::Oidc { provider, .. } => {
tls_client_credentials.insert(&provider.tls);

// WebPKI will be handled implicitly
}
AirflowAuthenticationClassResolved::Ldap { .. } => {}
}
}

for tls in tls_client_credentials {
commands.push(tls.tls_ca_cert_mount_path().map(|tls_ca_cert_mount_path| {
Self::add_cert_to_python_certifi_command(&tls_ca_cert_mount_path)
}));
}

commands
.iter()
.flatten()
.cloned()
.collect::<Vec<_>>()
.join("\n")
}
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved
// Adding certificate to the mount path for airflow startup commands
fn add_cert_to_python_certifi_command(cert_file: &str) -> String {
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved
format!("cat {cert_file} >> \"$(python -c 'import certifi; print(certifi.where())')\"")
}

/// Will be used to expose service ports and - by extension - which roles should be
/// created as services.
Expand Down
2 changes: 1 addition & 1 deletion rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ fn build_server_rolegroup_statefulset(
.context(GracefulShutdownSnafu)?;

let mut airflow_container_args = Vec::new();
airflow_container_args.extend(airflow_role.get_commands());
airflow_container_args.extend(airflow_role.get_commands(authentication_config));

airflow_container
.image_from_product_image(resolved_product_image)
Expand Down
3 changes: 3 additions & 0 deletions rust/operator-binary/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ mod tests {
user_registration: true,
user_registration_role: "User".to_string(),
sync_roles_at: FlaskRolesSyncMoment::Registration,
tls_ca_cert_mount_path: None,
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved
};

let mut result = BTreeMap::new();
Expand Down Expand Up @@ -314,6 +315,7 @@ mod tests {
user_registration: true,
user_registration_role: "Admin".to_string(),
sync_roles_at: FlaskRolesSyncMoment::Registration,
tls_ca_cert_mount_path: Some("stackable/secrets/openldap-tls".to_string()),
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved
};

let mut result = BTreeMap::new();
Expand Down Expand Up @@ -394,6 +396,7 @@ mod tests {
user_registration: default_user_registration(),
user_registration_role: "Admin".to_string(),
sync_roles_at: default_sync_roles_at(),
tls_ca_cert_mount_path: None,
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved
};

let mut result = BTreeMap::new();
Expand Down
8 changes: 0 additions & 8 deletions rust/operator-binary/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,6 @@ pub fn build_airflow_statefulset_envs(
AirflowRole::Webserver => {
let auth_vars = authentication_env_vars(auth_config);
env.extend(auth_vars.into_iter().map(|var| (var.name.to_owned(), var)));
env.insert(
"REQUESTS_CA_BUNDLE".into(),
EnvVar {
name: "REQUESTS_CA_BUNDLE".to_string(),
value: Some("/stackable/secrets/tls/ca.crt".to_string()),
..Default::default()
},
);
}
_ => {}
}
Expand Down
Loading