diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index f1446210cc401..1f263e26968cb 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -76,7 +76,7 @@ jobs: platforms: linux/amd64,linux/arm64 push: true build-args: | - features=embedding,parquet,openidconnect,jemalloc,deno_core + features=embedding,parquet,openidconnect,jemalloc,deno_core,dind tags: | ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ env.DEV_SHA }} ${{ steps.meta-public.outputs.tags }} @@ -138,7 +138,7 @@ jobs: platforms: linux/amd64,linux/arm64 push: true build-args: | - features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel + features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel,dind tags: | ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-ee:${{ env.DEV_SHA }} ${{ steps.meta-ee-public.outputs.tags }} @@ -200,7 +200,7 @@ jobs: platforms: linux/amd64 push: true build-args: | - features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel + features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel,dind PYTHON_IMAGE=python:3.12.2-slim-bookworm tags: | ${{ steps.meta-ee-public-py312.outputs.tags }} diff --git a/backend/windmill-common/src/worker.rs b/backend/windmill-common/src/worker.rs index 9a4064ff1d80c..73205f6620037 100644 --- a/backend/windmill-common/src/worker.rs +++ b/backend/windmill-common/src/worker.rs @@ -17,7 +17,9 @@ use std::{ use tokio::sync::RwLock; use windmill_macros::annotations; -use crate::{error, global_settings::CUSTOM_TAGS_SETTING, indexer::TantivyIndexerSettings, server::Smtp, DB}; +use crate::{ + error, global_settings::CUSTOM_TAGS_SETTING, indexer::TantivyIndexerSettings, server::Smtp, DB, +}; lazy_static::lazy_static! { pub static ref WORKER_GROUP: String = std::env::var("WORKER_GROUP").unwrap_or_else(|_| "default".to_string()); @@ -352,6 +354,11 @@ pub struct SqlAnnotations { pub return_last_result: bool, } +#[annotations("#")] +pub struct BashAnnotations { + pub docker: bool, +} + pub async fn load_cache(bin_path: &str, _remote_path: &str) -> (bool, String) { if tokio::fs::metadata(&bin_path).await.is_ok() { (true, format!("loaded from local cache: {}\n", bin_path)) @@ -577,8 +584,10 @@ pub fn get_windmill_memory_usage() -> Option { } } -pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postgres>>(executor: E) -> bool { - use crate::utils::{GIT_VERSION, GIT_SEM_VERSION}; +pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postgres>>( + executor: E, +) -> bool { + use crate::utils::{GIT_SEM_VERSION, GIT_VERSION}; // fetch all pings with a different version than self from the last 5 minutes. let pings = sqlx::query_scalar!( diff --git a/backend/windmill-worker/src/bash_executor.rs b/backend/windmill-worker/src/bash_executor.rs index 680831e990b63..b6279ccf97dfb 100644 --- a/backend/windmill-worker/src/bash_executor.rs +++ b/backend/windmill-worker/src/bash_executor.rs @@ -1,14 +1,30 @@ use std::{collections::HashMap, fs, process::Stdio}; +#[cfg(feature = "dind")] +use bollard::container::{ + KillContainerOptions, RemoveContainerOptions, StatsOptions, StopContainerOptions, +}; +#[cfg(feature = "dind")] +use futures::{stream, StreamExt, TryStreamExt}; use regex::Regex; use serde_json::{json, value::RawValue}; use sqlx::types::Json; use tokio::process::Command; + +#[cfg(feature = "dind")] +use uuid::Uuid; use windmill_common::{ error::Error, jobs::QueuedJob, worker::{to_raw_value, write_file}, }; + +#[cfg(feature = "dind")] +use windmill_common::DB; + +#[cfg(feature = "dind")] +use windmill_common::error::to_anyhow; + use windmill_queue::{append_logs, CanceledBy}; lazy_static::lazy_static! { @@ -22,6 +38,9 @@ lazy_static::lazy_static! { static ref RE_POWERSHELL_IMPORTS: Regex = Regex::new(r#"^Import-Module\s+(?:-Name\s+)?"?([^-\s"]+)"?"#).unwrap(); } +#[cfg(feature = "dind")] +use crate::handle_child::run_future_with_polling_update_job_poller; + use crate::{ common::{ build_args_map, get_reserved_variables, read_file, read_file_content, start_child_process, @@ -54,8 +73,14 @@ pub async fn handle_bash_job( worker_name: &str, envs: HashMap, occupancy_metrics: &mut OccupancyMetrics, + _killpill_rx: &mut tokio::sync::broadcast::Receiver<()>, ) -> Result, Error> { - let logs1 = "\n\n--- BASH CODE EXECUTION ---\n".to_string(); + let annotation = windmill_common::worker::BashAnnotations::parse(&content); + + let mut logs1 = "\n\n--- BASH CODE EXECUTION ---\n".to_string(); + if annotation.docker { + logs1.push_str("docker mode\n"); + } append_logs(&job.id, &job.workspace_id, logs1, db).await; write_file(job_dir, "main.sh", &format!("set -e\n{content}"))?; @@ -193,6 +218,22 @@ exit $exit_status ) .await?; + #[cfg(feature = "dind")] + if annotation.docker { + return handle_docker_job( + job.id, + &job.workspace_id, + db, + job.timeout, + mem_peak, + canceled_by, + worker_name, + occupancy_metrics, + _killpill_rx, + ) + .await; + } + let result_json_path = format!("{job_dir}/result.json"); if let Ok(metadata) = tokio::fs::metadata(&result_json_path).await { if metadata.len() > 0 { @@ -214,6 +255,7 @@ exit $exit_status .await? .trim() .to_string(); + return Ok(to_raw_value(&json!(result))); } @@ -222,6 +264,190 @@ exit $exit_status ))) } +#[cfg(feature = "dind")] +async fn handle_docker_job( + job_id: Uuid, + workspace_id: &str, + db: &DB, + job_timeout: Option, + mem_peak: &mut i32, + canceled_by: &mut Option, + worker_name: &str, + occupancy_metrics: &mut OccupancyMetrics, + killpill_rx: &mut tokio::sync::broadcast::Receiver<()>, +) -> Result, Error> { + let client = bollard::Docker::connect_with_unix_defaults().map_err(to_anyhow)?; + + let container_id = job_id.to_string(); + let inspected = client.inspect_container(&container_id, None).await; + + if inspected.is_err() || inspected.unwrap().state.is_none() { + return Ok(to_raw_value(&format!( + "Container not found at {job_id}, you must use --name and not --rm it" + ))); + } + + let wait_f = async { + let wait = client + .wait_container::(&container_id, None) + .try_collect::>() + .await + .map_err(|e| { + tracing::error!("Error waiting for container: {:?}", e); + anyhow::anyhow!("Error waiting for container") + })?; + let waited = wait.first().map(|x| x.status_code); + Ok(waited) + }; + + let ncontainer_id = container_id.to_string(); + let w_id = workspace_id.to_string(); + let j_id = job_id.clone(); + let db2 = db.clone(); + let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1); + + let mut killpill_rx = killpill_rx.resubscribe(); + let logs = tokio::spawn(async move { + let client = bollard::Docker::connect_with_unix_defaults().map_err(to_anyhow); + if let Ok(client) = client { + let mut log_stream = client.logs( + &ncontainer_id, + Some(bollard::container::LogsOptions { + follow: true, + stdout: true, + stderr: true, + tail: "all", + ..Default::default() + }), + ); + loop { + tokio::select! { + log = log_stream.next() => { + match log { + Some(Ok(log)) => { + append_logs(&j_id, w_id.clone(), log.to_string(), db2.clone()).await; + } + Some(Err(e)) => { + tracing::error!("Error getting logs: {:?}", e); + } + _ => { + tracing::error!("End of stream"); + return + } + }; + }, + _ = killpill_rx.recv() => { + tracing::error!("killing container after receving killpill"); + if let Err(e) = client + .stop_container(&ncontainer_id, Some(StopContainerOptions { t: 3 })) + .await + { + tracing::error!("Error stopping container: {:?}", e); + } + return + }, + _ = rx.recv() => { + return + } + } + } + } + }); + + let mem_client = bollard::Docker::connect_with_unix_defaults().map_err(to_anyhow); + let ncontainer_id = container_id.clone(); + let result = run_future_with_polling_update_job_poller( + job_id, + job_timeout, + db, + mem_peak, + canceled_by, + wait_f, + worker_name, + workspace_id, + &mut Some(occupancy_metrics), + Box::pin(match mem_client { + Ok(client) => client + .stats( + &ncontainer_id, + Some(StatsOptions { stream: true, one_shot: false }), + ) + .map(|x| { + x.map(|x| x.memory_stats.usage.map(|m| m / 1024).unwrap_or_default() as i32) + .unwrap_or_default() + }) + .boxed(), + _ => stream::once(async { 0 }).boxed(), + }), + ) + .await; + + if let Err(e) = result { + if !logs.is_finished() { + let _ = tx.send(()); + let _ = logs.await; + } + if container_is_alive(&client, &container_id).await { + kill_container(&client, &container_id, "SIGINT").await; + if container_is_alive(&client, &container_id).await { + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + if let Err(e) = client + .stop_container(&container_id, Some(StopContainerOptions { t: 3 })) + .await + { + tracing::error!("Error stopping container: {:?}", e); + } + } + } + + return Err(e); + } + + if let Err(e) = client + .remove_container( + &container_id, + Some(RemoveContainerOptions { force: true, ..Default::default() }), + ) + .await + { + tracing::error!("Error removing container: {:?}", e); + } + + let result = result.unwrap(); + + return Ok(to_raw_value(&json!(format!( + "Docker exit status: {}", + result + .map(|x| x.to_string()) + .unwrap_or_else(|| "none".to_string()) + )))); +} + +#[cfg(feature = "dind")] +async fn kill_container(client: &bollard::Docker, container_id: &str, signal: &str) { + if let Err(e) = client + .kill_container(&container_id, Some(KillContainerOptions { signal })) + .await + { + tracing::error!("Error killing container with signal {signal}: {:?}", e); + } +} + +#[cfg(feature = "dind")] +async fn container_is_alive(client: &bollard::Docker, container_id: &str) -> bool { + let inspect = client.inspect_container(container_id, None).await; + if let Ok(inspect) = inspect { + let r = inspect + .state + .map(|x| x.running.unwrap_or_default()) + .unwrap_or_default(); + tracing::error!("Container {container_id} is alive: {r}"); + r + } else { + false + } +} + fn raw_to_string(x: &str) -> String { match serde_json::from_str::(x) { Ok(serde_json::Value::String(x)) => x, diff --git a/backend/windmill-worker/src/graphql_executor.rs b/backend/windmill-worker/src/graphql_executor.rs index 8fb8cf13b9f9d..67bb6a2c7ba64 100644 --- a/backend/windmill-worker/src/graphql_executor.rs +++ b/backend/windmill-worker/src/graphql_executor.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use anyhow::anyhow; -use futures::TryStreamExt; +use futures::{stream, TryStreamExt}; use serde_json::{json, value::RawValue}; use sqlx::types::Json; use windmill_common::jobs::QueuedJob; @@ -154,6 +154,7 @@ pub async fn do_graphql( worker_name, &job.workspace_id, &mut Some(occupation_metrics), + Box::pin(stream::once(async { 0 })), ) .await?; diff --git a/backend/windmill-worker/src/handle_child.rs b/backend/windmill-worker/src/handle_child.rs index 54eda3b1ba872..f2fead39c9cf9 100644 --- a/backend/windmill-worker/src/handle_child.rs +++ b/backend/windmill-worker/src/handle_child.rs @@ -137,7 +137,9 @@ pub async fn handle_child( db, mem_peak, canceled_by_ref, - || get_mem_peak(pid, nsjail), + Box::pin(stream::unfold((), move |_| async move { + Some((get_mem_peak(pid, nsjail).await, ())) + })), worker, w_id, rx, @@ -486,7 +488,7 @@ async fn get_mem_peak(pid: Option, nsjail: bool) -> i32 { } } -pub async fn run_future_with_polling_update_job_poller( +pub async fn run_future_with_polling_update_job_poller( job_id: Uuid, timeout: Option, db: &DB, @@ -496,9 +498,11 @@ pub async fn run_future_with_polling_update_job_poller( worker_name: &str, w_id: &str, occupancy_metrics: &mut Option<&mut OccupancyMetrics>, + get_mem: S, ) -> error::Result where Fut: Future>, + S: stream::Stream + Unpin, { let (tx, rx) = broadcast::channel::<()>(3); @@ -507,7 +511,7 @@ where db, mem_peak, canceled_by_ref, - || async { 0 }, + get_mem, worker_name, w_id, rx, @@ -548,20 +552,19 @@ pub enum UpdateJobPollingExit { AlreadyCompleted, } -pub async fn update_job_poller( +pub async fn update_job_poller( job_id: Uuid, db: &DB, mem_peak: &mut i32, canceled_by_ref: &mut Option, - get_mem: F, + mut get_mem: S, worker_name: &str, w_id: &str, mut rx: broadcast::Receiver<()>, occupancy_metrics: &mut Option<&mut OccupancyMetrics>, ) -> UpdateJobPollingExit where - F: Fn() -> Fut, - Fut: Future, + S: stream::Stream + Unpin, { let update_job_interval = Duration::from_millis(500); @@ -606,7 +609,7 @@ where .expect("update worker ping"); } } - let current_mem = get_mem().await; + let current_mem = get_mem.next().await.unwrap_or(0); if current_mem > *mem_peak { *mem_peak = current_mem } diff --git a/backend/windmill-worker/src/js_eval.rs b/backend/windmill-worker/src/js_eval.rs index 8dd1f8d252e0f..4519638fef133 100644 --- a/backend/windmill-worker/src/js_eval.rs +++ b/backend/windmill-worker/src/js_eval.rs @@ -73,23 +73,23 @@ pub struct ContainerRootCertStoreProvider { root_cert_store: RootCertStore, } -#[cfg(feature = "deno_core")] -impl ContainerRootCertStoreProvider { - fn new() -> ContainerRootCertStoreProvider { - return ContainerRootCertStoreProvider { - root_cert_store: deno_tls::create_default_root_cert_store(), - }; - } - - fn add_certificate(&mut self, cert_path: String) -> io::Result<()> { - let cert_file = std::fs::File::open(cert_path)?; - let mut reader = BufReader::new(cert_file); - let pem_file = rustls_pemfile::certs(&mut reader).collect::, _>>()?; - - self.root_cert_store.add_parsable_certificates(pem_file); - Ok(()) - } -} +// #[cfg(feature = "deno_core")] +// impl ContainerRootCertStoreProvider { +// fn new() -> ContainerRootCertStoreProvider { +// return ContainerRootCertStoreProvider { +// root_cert_store: deno_tls::create_default_root_cert_store(), +// }; +// } + +// fn add_certificate(&mut self, cert_path: String) -> io::Result<()> { +// let cert_file = std::fs::File::open(cert_path)?; +// let mut reader = BufReader::new(cert_file); +// let pem_file = rustls_pemfile::certs(&mut reader).collect::, _>>()?; + +// self.root_cert_store.add_parsable_certificates(pem_file); +// Ok(()) +// } +// } #[cfg(feature = "deno_core")] impl deno_tls::RootCertStoreProvider for ContainerRootCertStoreProvider { diff --git a/backend/windmill-worker/src/mysql_executor.rs b/backend/windmill-worker/src/mysql_executor.rs index 3ed6a9bf34be4..b98ec365eec9c 100644 --- a/backend/windmill-worker/src/mysql_executor.rs +++ b/backend/windmill-worker/src/mysql_executor.rs @@ -298,6 +298,7 @@ pub async fn do_mysql( worker_name, &job.workspace_id, &mut Some(occupancy_metrics), + Box::pin(futures::stream::once(async { 0 })), ) .await?; diff --git a/backend/windmill-worker/src/pg_executor.rs b/backend/windmill-worker/src/pg_executor.rs index 57d7f48905fb6..2ad6eb403863a 100644 --- a/backend/windmill-worker/src/pg_executor.rs +++ b/backend/windmill-worker/src/pg_executor.rs @@ -355,6 +355,7 @@ pub async fn do_postgresql( worker_name, &job.workspace_id, &mut Some(occupancy_metrics), + Box::pin(futures::stream::once(async { 0 })), ) .await?; diff --git a/backend/windmill-worker/src/python_executor.rs b/backend/windmill-worker/src/python_executor.rs index 3d1ba79def398..6df2c0bc0bd03 100644 --- a/backend/windmill-worker/src/python_executor.rs +++ b/backend/windmill-worker/src/python_executor.rs @@ -318,7 +318,7 @@ pub async fn uv_pip_compile( if let Some(cert_path) = PIP_INDEX_CERT.as_ref() { args.extend(["--cert", cert_path]); } - tracing::debug!("uv args: {:?}", args); + tracing::error!("uv args: {:?}", args); #[cfg(windows)] let uv_cmd = "uv"; @@ -329,7 +329,7 @@ pub async fn uv_pip_compile( let mut child_cmd = Command::new(uv_cmd); child_cmd .current_dir(job_dir) - .args(args) + .args(&args) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let child_process = start_child_process(child_cmd, uv_cmd).await?; @@ -350,7 +350,7 @@ pub async fn uv_pip_compile( occupancy_metrics, ) .await - .map_err(|e| Error::ExecutionErr(format!("Lock file generation failed: {e:?}")))?; + .map_err(|e| Error::ExecutionErr(format!("Lock file generation failed.\n\ncommand: {uv_cmd} {}\n\n{e:?}", args.join(" "))))?; } let path_lock = format!("{job_dir}/requirements.txt"); diff --git a/backend/windmill-worker/src/worker.rs b/backend/windmill-worker/src/worker.rs index f1793961232fa..4af815cd7694e 100644 --- a/backend/windmill-worker/src/worker.rs +++ b/backend/windmill-worker/src/worker.rs @@ -1081,6 +1081,7 @@ pub async fn run_worker( let mut last_suspend_first = Instant::now(); let mut killed_but_draining_same_worker_jobs = false; + let mut killpill_rx2 = killpill_rx.resubscribe(); loop { #[cfg(feature = "enterprise")] { @@ -1575,6 +1576,7 @@ pub async fn run_worker( base_internal_url, job_completed_tx.clone(), &mut occupancy_metrics, + &mut killpill_rx2, #[cfg(feature = "benchmark")] &mut bench, ) @@ -1867,6 +1869,7 @@ async fn handle_queued_job( base_internal_url: &str, job_completed_tx: JobCompletedSender, occupancy_metrics: &mut OccupancyMetrics, + killpill_rx: &mut tokio::sync::broadcast::Receiver<()>, #[cfg(feature = "benchmark")] bench: &mut BenchmarkIter, ) -> windmill_common::error::Result { // Extract the active span from the context @@ -2177,6 +2180,7 @@ async fn handle_queued_job( &mut column_order, &mut new_args, occupancy_metrics, + killpill_rx, ) .await; occupancy_metrics.total_duration_of_running_jobs += @@ -2321,6 +2325,8 @@ async fn handle_code_execution_job( column_order: &mut Option>, new_args: &mut Option>>, occupancy_metrics: &mut OccupancyMetrics, + killpill_rx: &mut tokio::sync::broadcast::Receiver<()>, + ) -> error::Result> { let ContentReqLangEnvs { content: inner_content, @@ -2664,6 +2670,7 @@ mount {{ worker_name, envs, occupancy_metrics, + killpill_rx, ) .await } diff --git a/frontend/src/lib/script_helpers.ts b/frontend/src/lib/script_helpers.ts index 9f78127b45347..3cac89df8c5ec 100644 --- a/frontend/src/lib/script_helpers.ts +++ b/frontend/src/lib/script_helpers.ts @@ -630,8 +630,11 @@ def preprocessor( ` const DOCKER_INIT_CODE = `# shellcheck shell=bash -# Bash script that calls docker as a client to the host daemon -# See documentation: https://www.windmill.dev/docs/advanced/docker +# docker +# The annotation "docker" above is important, it tells windmill that after +# the end of the bash script, it should manage the container at id $WM_JOB_ID: +# pipe logs, monitor memory usage, kill container if job is cancelled. + msg="\${1:-world}" IMAGE="alpine:latest" @@ -639,7 +642,9 @@ COMMAND="/bin/echo Hello $msg" # ensure that the image is up-to-date docker pull $IMAGE -docker run --rm $IMAGE $COMMAND + +# if using the 'docker' mode, name it with $WM_JOB_ID for windmill to monitor it +docker run --name $WM_JOB_ID -it -d $IMAGE $COMMAND ` const POWERSHELL_INIT_CODE = `param($Msg, $Dflt = "default value", [int]$Nb = 3)