Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: critical error side channel #3625

Merged
merged 13 commits into from
May 2, 2024
2 changes: 1 addition & 1 deletion backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
66d9cbb158ab9a5869a45ba253bf57f2cbb5ecb6
f59f9c1e55e5e93f9eb6c081847cc566611029d2
24 changes: 15 additions & 9 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use tokio::fs::DirBuilder;
use windmill_api::HTTP_CLIENT;
use windmill_common::{
global_settings::{
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CUSTOM_TAGS_SETTING,
DEFAULT_TAGS_PER_WORKSPACE_SETTING, ENV_SETTINGS, EXPOSE_DEBUG_METRICS_SETTING,
EXPOSE_METRICS_SETTING, EXTRA_PIP_INDEX_URL_SETTING, HUB_BASE_URL_SETTING,
JOB_DEFAULT_TIMEOUT_SECS_SETTING, KEEP_JOB_DIR_SETTING, LICENSE_KEY_SETTING,
NPM_CONFIG_REGISTRY_SETTING, OAUTH_SETTING, PIP_INDEX_URL_SETTING,
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CRITICAL_ERROR_CHANNELS_SETTING,
CUSTOM_TAGS_SETTING, DEFAULT_TAGS_PER_WORKSPACE_SETTING, ENV_SETTINGS,
EXPOSE_DEBUG_METRICS_SETTING, EXPOSE_METRICS_SETTING, EXTRA_PIP_INDEX_URL_SETTING,
HUB_BASE_URL_SETTING, JOB_DEFAULT_TIMEOUT_SECS_SETTING, KEEP_JOB_DIR_SETTING,
LICENSE_KEY_SETTING, NPM_CONFIG_REGISTRY_SETTING, OAUTH_SETTING, PIP_INDEX_URL_SETTING,
REQUEST_SIZE_LIMIT_SETTING, REQUIRE_PREEXISTING_USER_FOR_OAUTH_SETTING,
RETENTION_PERIOD_SECS_SETTING, SAML_METADATA_SETTING, SCIM_TOKEN_SETTING,
},
Expand All @@ -47,10 +47,11 @@ use windmill_worker::{
use crate::monitor::{
initial_load, load_keep_job_dir, load_require_preexisting_user, load_tag_per_workspace_enabled,
monitor_db, monitor_pool, reload_base_url_setting, reload_bunfig_install_scopes_setting,
reload_extra_pip_index_url_setting, reload_hub_base_url_setting,
reload_job_default_timeout_setting, reload_license_key, reload_npm_config_registry_setting,
reload_pip_index_url_setting, reload_retention_period_setting, reload_scim_token_setting,
reload_server_config, reload_worker_config,
reload_critical_error_channels_setting, reload_extra_pip_index_url_setting,
reload_hub_base_url_setting, reload_job_default_timeout_setting, reload_license_key,
reload_npm_config_registry_setting, reload_pip_index_url_setting,
reload_retention_period_setting, reload_scim_token_setting, reload_server_config,
reload_worker_config,
};

#[cfg(feature = "parquet")]
Expand Down Expand Up @@ -503,6 +504,11 @@ Windmill Community Edition {GIT_VERSION}
tracing::error!(error = %e, "Could not reload hub base url setting");
}
},
CRITICAL_ERROR_CHANNELS_SETTING => {
if let Err(e) = reload_critical_error_channels_setting(&db).await {
tracing::error!(error = %e, "Could not reload critical error emails setting");
}
},
a @_ => {
tracing::info!("Unrecognized Global Setting Change Payload: {:?}", a);
}
Expand Down
45 changes: 38 additions & 7 deletions backend/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ use windmill_api::{
DEFAULT_BODY_LIMIT, IS_SECURE, OAUTH_CLIENTS, REQUEST_SIZE_LIMIT, SAML_METADATA, SCIM_TOKEN,
};
use windmill_common::{
ee::CriticalErrorChannel,
error,
flow_status::FlowStatusModule,
global_settings::{
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, DEFAULT_TAGS_PER_WORKSPACE_SETTING,
EXPOSE_DEBUG_METRICS_SETTING, EXPOSE_METRICS_SETTING, EXTRA_PIP_INDEX_URL_SETTING,
HUB_BASE_URL_SETTING, JOB_DEFAULT_TIMEOUT_SECS_SETTING, KEEP_JOB_DIR_SETTING,
LICENSE_KEY_SETTING, NPM_CONFIG_REGISTRY_SETTING, OAUTH_SETTING, PIP_INDEX_URL_SETTING,
REQUEST_SIZE_LIMIT_SETTING, REQUIRE_PREEXISTING_USER_FOR_OAUTH_SETTING,
RETENTION_PERIOD_SECS_SETTING, SAML_METADATA_SETTING, SCIM_TOKEN_SETTING,
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CRITICAL_ERROR_CHANNELS_SETTING,
DEFAULT_TAGS_PER_WORKSPACE_SETTING, EXPOSE_DEBUG_METRICS_SETTING, EXPOSE_METRICS_SETTING,
EXTRA_PIP_INDEX_URL_SETTING, HUB_BASE_URL_SETTING, JOB_DEFAULT_TIMEOUT_SECS_SETTING,
KEEP_JOB_DIR_SETTING, LICENSE_KEY_SETTING, NPM_CONFIG_REGISTRY_SETTING, OAUTH_SETTING,
PIP_INDEX_URL_SETTING, REQUEST_SIZE_LIMIT_SETTING,
REQUIRE_PREEXISTING_USER_FOR_OAUTH_SETTING, RETENTION_PERIOD_SECS_SETTING,
SAML_METADATA_SETTING, SCIM_TOKEN_SETTING,
},
jobs::QueuedJob,
oauth2::REQUIRE_PREEXISTING_USER_FOR_OAUTH,
Expand All @@ -40,7 +42,8 @@ use windmill_common::{
load_worker_config, reload_custom_tags_setting, DEFAULT_TAGS_PER_WORKSPACE, SERVER_CONFIG,
WORKER_CONFIG,
},
BASE_URL, DB, DEFAULT_HUB_BASE_URL, HUB_BASE_URL, METRICS_DEBUG_ENABLED, METRICS_ENABLED,
BASE_URL, CRITICAL_ERROR_CHANNELS, DB, DEFAULT_HUB_BASE_URL, HUB_BASE_URL,
METRICS_DEBUG_ENABLED, METRICS_ENABLED,
};
use windmill_queue::cancel_job;
use windmill_worker::{
Expand Down Expand Up @@ -143,6 +146,10 @@ pub async fn initial_load(
tracing::error!("Error reloading hub base url: {:?}", e)
}

if let Err(e) = reload_critical_error_channels_setting(&db).await {
tracing::error!("Could not reload critical error emails setting: {:?}", e);
}

#[cfg(feature = "parquet")]
if !_is_agent {
reload_s3_cache_setting(&db).await;
Expand Down Expand Up @@ -1074,3 +1081,27 @@ pub async fn reload_hub_base_url_setting(db: &DB, server_mode: bool) -> error::R

Ok(())
}

pub async fn reload_critical_error_channels_setting(db: &DB) -> error::Result<()> {
let critical_error_channels =
load_value_from_global_settings(db, CRITICAL_ERROR_CHANNELS_SETTING).await?;

let critical_error_channels = if let Some(q) = critical_error_channels {
if let Ok(v) = serde_json::from_value::<Vec<CriticalErrorChannel>>(q.clone()) {
v
} else {
tracing::error!(
"Could not parse critical_error_emails setting as an array of channels, found: {:#?}",
&q
);
vec![]
}
} else {
vec![]
};

let mut l = CRITICAL_ERROR_CHANNELS.write().await;
*l = critical_error_channels;

Ok(())
}
1 change: 0 additions & 1 deletion backend/windmill-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ async_zip.workspace = true
rsmq_async.workspace = true
regex.workspace = true
bytes.workspace = true
mail-send.workspace = true
samael = { workspace = true, optional = true }
async-recursion.workspace = true
rsa.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-api/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ async fn update_flow(
clear_schedule(tx.transaction_mut(), &schedule.path, &w_id).await?;

if schedule.enabled {
tx = push_scheduled_job(&db, tx, schedule).await?;
tx = push_scheduled_job(&db, tx, &schedule).await?;
}
}

Expand Down
8 changes: 4 additions & 4 deletions backend/windmill-api/src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async fn create_schedule(
.await?;

if ns.enabled.unwrap_or(true) {
tx = push_scheduled_job(&db, tx, schedule).await?
tx = push_scheduled_job(&db, tx, &schedule).await?
}
tx.commit().await?;

Expand Down Expand Up @@ -303,7 +303,7 @@ async fn edit_schedule(
.await?;

if schedule.enabled {
tx = push_scheduled_job(&db, tx, schedule).await?;
tx = push_scheduled_job(&db, tx, &schedule).await?;
}
tx.commit().await?;

Expand Down Expand Up @@ -512,7 +512,7 @@ pub async fn set_enabled(
.await?;

if payload.enabled {
tx = push_scheduled_job(&db, tx, schedule).await?;
tx = push_scheduled_job(&db, tx, &schedule).await?;
}
tx.commit().await?;

Expand Down Expand Up @@ -560,7 +560,7 @@ pub async fn set_enabled(
// .await?;

// if payload.enabled {
// tx = push_scheduled_job(&db, tx, schedule).await?;
// tx = push_scheduled_job(&db, tx, &schedule).await?;
// }
// tx.commit().await?;

Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-api/src/scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ async fn create_script(
clear_schedule(tx.transaction_mut(), &schedule.path, &w_id).await?;

if schedule.enabled {
tx = push_scheduled_job(&db, tx, schedule).await?;
tx = push_scheduled_job(&db, tx, &schedule).await?;
}
}
} else {
Expand Down
47 changes: 16 additions & 31 deletions backend/windmill-api/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ use axum::{
Json, Router,
};

use mail_send::{mail_builder::MessageBuilder, SmtpClientBuilder};
use serde::Deserialize;
use tokio::time::timeout;
use windmill_common::{
error::{self, to_anyhow, JsonResult, Result},
error::{self, JsonResult, Result},
global_settings::{AUTOMATE_USERNAME_CREATION_SETTING, ENV_SETTINGS, HUB_BASE_URL_SETTING},
server::Smtp,
utils::send_email,
};

#[cfg(feature = "parquet")]
use windmill_common::error::to_anyhow;

pub fn global_service() -> Router {
#[warn(unused_mut)]
let r = Router::new()
Expand Down Expand Up @@ -67,34 +69,17 @@ pub async fn test_email(
require_super_admin(&db, &authed.email).await?;
let smtp = test_email.smtp;
let to = test_email.to;
let mut client = SmtpClientBuilder::new(smtp.host, smtp.port)
.implicit_tls(smtp.tls_implicit.unwrap_or(false));
if std::env::var("ACCEPT_INVALID_CERTS").is_ok() {
client = client.allow_invalid_certs();
}
let client = if let (Some(username), Some(password)) = (smtp.username, smtp.password) {
if !username.is_empty() {
client.credentials((username, password))
} else {
client
}
} else {
client
};
let message = MessageBuilder::new()
.from(("Windmill", smtp.from.as_str()))
.to(to.clone())
.subject("Test email from Windmill")
.text_body("Test email content");
let dur = Duration::from_secs(3);
timeout(dur, client.connect())
.await
.map_err(to_anyhow)?
.map_err(to_anyhow)?
.send(message)
.await
.map_err(to_anyhow)?;
tracing::info!("Sent test email to {to}");

let client_timeout = Duration::from_secs(3);
send_email(
"Test email from Windmill",
"Test email content",
vec![to],
smtp,
Some(client_timeout),
)
.await?;

Ok("Sent test email".to_string())
}

Expand Down
39 changes: 6 additions & 33 deletions backend/windmill-api/src/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ use axum::{
};
use hyper::{header::LOCATION, StatusCode};
use lazy_static::lazy_static;
use mail_send::mail_builder::MessageBuilder;
use mail_send::SmtpClientBuilder;
use quick_cache::sync::Cache;
use rand::rngs::OsRng;
use regex::Regex;
Expand All @@ -46,10 +44,11 @@ use windmill_audit::audit_ee::audit_log;
use windmill_audit::ActionKind;
use windmill_common::global_settings::AUTOMATE_USERNAME_CREATION_SETTING;
use windmill_common::users::truncate_token;
use windmill_common::utils::send_email;
use windmill_common::worker::{CLOUD_HOSTED, SERVER_CONFIG};
use windmill_common::{
db::UserDB,
error::{self, to_anyhow, Error, JsonResult, Result},
error::{self, Error, JsonResult, Result},
users::SUPERADMIN_SECRET_EMAIL,
utils::{not_found_if_none, rd_string, require_admin, Pagination, StripPath},
};
Expand Down Expand Up @@ -1824,41 +1823,15 @@ pub fn send_email_if_possible(subject: &str, content: &str, to: &str) {
let content = content.to_string();
let to = to.to_string();
tokio::spawn(async move {
if let Err(e) = send_email_if_possible_intern(&subject, &content, &to).await {
tracing::error!("Failed to send email to {}: {}", &to, e);
if let Err(e) = send_email_if_possible_intern(&subject, &content, to.clone()).await {
tracing::error!("Failed to send email to {}: {}", to, e);
}
});
}

pub async fn send_email_if_possible_intern(subject: &str, content: &str, to: &str) -> Result<()> {
pub async fn send_email_if_possible_intern(subject: &str, content: &str, to: String) -> Result<()> {
if let Some(smtp) = SERVER_CONFIG.read().await.smtp.clone() {
let mut client = SmtpClientBuilder::new(smtp.host, smtp.port)
.implicit_tls(smtp.tls_implicit.unwrap_or(false));
if std::env::var("ACCEPT_INVALID_CERTS").is_ok() {
client = client.allow_invalid_certs();
}
let client = if let (Some(username), Some(password)) = (smtp.username, smtp.password) {
if !username.is_empty() {
client.credentials((username, password))
} else {
client
}
} else {
client
};
let message = MessageBuilder::new()
.from(("Windmill", smtp.from.as_str()))
.to(to)
.subject(subject)
.text_body(content);
client
.connect()
.await
.map_err(to_anyhow)?
.send(message)
.await
.map_err(to_anyhow)?;
tracing::info!("Sent email to {to}: {subject}");
send_email(subject, content, vec![to], smtp, None).await?;
}
return Ok(());
}
Expand Down
17 changes: 8 additions & 9 deletions backend/windmill-api/src/workspaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use windmill_common::{
variables::ExportableListableVariable,
};
use windmill_git_sync::handle_deployment_metadata;
use windmill_queue::QueueTransaction;

use crate::oauth2_ee::InstanceEvent;
use crate::variables::{decrypt, encrypt};
Expand Down Expand Up @@ -488,25 +487,25 @@ async fn run_slack_message_test_job(
json!(format!("$res:{WORKSPACE_SLACK_BOT_TOKEN_PATH}")),
);

let tx: QueueTransaction<'_, _> = (rsmq.clone(), db.begin().await?).into();
let (uuid, tx) = windmill_queue::handle_on_failure(
let uuid = windmill_queue::push_error_handler(
&db,
tx,
rsmq,
Uuid::parse_str("00000000-0000-0000-0000-000000000000")?,
"slack_message_test",
"slack_message_test",
None,
Some("slack_message_test".to_string()),
false,
w_id.as_str(),
&format!("script/{}", req.hub_script_path.as_str()),
sqlx::types::Json(&fake_result),
0,
Utc::now(),
None,
Some(Utc::now()),
Some(json!(extra_args)),
authed.email.as_str(),
false,
false,
None, // Note: we could mark it as high priority to return result quickly to the user
)
.await?;
tx.commit().await?;

Ok(Json(RunSlackMessageTestJobResponse {
job_uuid: uuid.to_string(),
Expand Down
3 changes: 2 additions & 1 deletion backend/windmill-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ object_store = { workspace = true, optional = true }
prometheus = { workspace = true, optional = true }
aws-config = { workspace = true, optional = true }
aws-sdk-sts = { workspace = true, optional = true }
indexmap.workspace = true
indexmap.workspace = true
mail-send.workspace = true
Loading
Loading