Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Adding start up commands for airflow #530

Merged
merged 15 commits into from
Nov 11, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Use the env var `KUBERNETES_CLUSTER_DOMAIN` or the operator Helm chart property `kubernetesClusterDomain` to set a non-default cluster domain ([#518]).
- Support for `2.9.3` ([#494]).
- Experimental Support for `2.10.2` ([#512]).
- Add support for OpenID Connect ([#524])
- Add support for OpenID Connect ([#524], [#530])

### Changed

Expand All @@ -32,6 +32,7 @@
[#518]: https://github.com/stackabletech/airflow-operator/pull/518
[#520]: https://github.com/stackabletech/airflow-operator/pull/520
[#524]: https://github.com/stackabletech/airflow-operator/pull/524
[#530]: https://github.com/stackabletech/airflow-operator/pull/530

## [24.7.0] - 2024-07-24

Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 51 additions & 6 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};

use authentication::{
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
};
use git_sync::GitSync;
use product_config::flask_app_config_writer::{FlaskAppConfigOptions, PythonType};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -322,7 +325,12 @@ impl AirflowRole {
/// components to have the same image/configuration (e.g. DAG folder location), even if not all
/// configuration settings are used everywhere. For this reason we ensure that the webserver
/// config file is in the Airflow home directory on all pods.
pub fn get_commands(&self) -> Vec<String> {
/// Only the webserver needs to know about authentication CA's which is added via python's certify
/// if authentication is enabled.
pub fn get_commands(
&self,
auth_config: &AirflowClientAuthenticationDetailsResolved,
) -> Vec<String> {
let mut command = vec![
format!("cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} {AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}"),
// graceful shutdown part
Expand All @@ -331,10 +339,15 @@ impl AirflowRole {
];

match &self {
AirflowRole::Webserver => command.extend(vec![
"prepare_signal_handlers".to_string(),
"airflow webserver &".to_string(),
]),
AirflowRole::Webserver => {
// Getting auth commands for AuthClass
command.extend(Self::authentication_start_commands(auth_config));
command.extend(vec![
"prepare_signal_handlers".to_string(),
"airflow webserver &".to_string(),
]);
}

AirflowRole::Scheduler => command.extend(vec![
// Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
"airflow db init".to_string(),
Expand Down Expand Up @@ -364,6 +377,38 @@ impl AirflowRole {
command
}

fn authentication_start_commands(
auth_config: &AirflowClientAuthenticationDetailsResolved,
) -> Vec<String> {
let mut commands = Vec::new();

let mut tls_client_credentials = BTreeSet::new();

for auth_class_resolved in &auth_config.authentication_classes_resolved {
match auth_class_resolved {
AirflowAuthenticationClassResolved::Oidc { provider, .. } => {
tls_client_credentials.insert(&provider.tls);

// WebPKI will be handled implicitly
}
AirflowAuthenticationClassResolved::Ldap { .. } => {}
}
}

for tls in tls_client_credentials {
commands.push(tls.tls_ca_cert_mount_path().map(|tls_ca_cert_mount_path| {
Self::add_cert_to_python_certifi_command(&tls_ca_cert_mount_path)
}));
}

commands.iter().flatten().cloned().collect::<Vec<_>>()
}

// Adding certificate to the mount path for airflow startup commands
fn add_cert_to_python_certifi_command(cert_file: &str) -> String {
siegfriedweber marked this conversation as resolved.
Show resolved Hide resolved
format!("cat {cert_file} >> \"$(python -c 'import certifi; print(certifi.where())')\"")
}

/// Will be used to expose service ports and - by extension - which roles should be
/// created as services.
pub fn get_http_port(&self) -> Option<u16> {
Expand Down
2 changes: 1 addition & 1 deletion rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ fn build_server_rolegroup_statefulset(
.context(GracefulShutdownSnafu)?;

let mut airflow_container_args = Vec::new();
airflow_container_args.extend(airflow_role.get_commands());
airflow_container_args.extend(airflow_role.get_commands(authentication_config));

airflow_container
.image_from_product_image(resolved_product_image)
Expand Down
8 changes: 0 additions & 8 deletions rust/operator-binary/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,6 @@ pub fn build_airflow_statefulset_envs(
AirflowRole::Webserver => {
let auth_vars = authentication_env_vars(auth_config);
env.extend(auth_vars.into_iter().map(|var| (var.name.to_owned(), var)));
env.insert(
"REQUESTS_CA_BUNDLE".into(),
EnvVar {
name: "REQUESTS_CA_BUNDLE".to_string(),
value: Some("/stackable/secrets/tls/ca.crt".to_string()),
..Default::default()
},
);
}
_ => {}
}
Expand Down
Loading