feat: docker runtime
rubenfiszel committed Dec 9, 2024
1 parent 81f35e7 commit e36281c
Showing 11 changed files with 292 additions and 39 deletions.
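
This commit adds an opt-in Docker runtime for bash jobs behind a new `dind` cargo feature: a script annotated with a leading `# docker` comment is expected to start a container named after the job id, and the worker then attaches to that container to stream its logs, sample its memory, honor timeouts and cancellation, and report its exit status. A minimal sketch of the opt-in from the worker's side (the exact annotation syntax accepted by the `annotations` macro and the `WM_JOB_ID` environment variable are assumptions, not shown in this diff):

use windmill_common::worker::BashAnnotations;

fn main() {
    // A user script opting into docker mode. The handler below requires the
    // container to be named after the job id and to outlive the script,
    // hence `--name` and no `--rm`. `$WM_JOB_ID` is assumed to be the job-id
    // environment variable Windmill exposes to scripts.
    let content = r#"# docker
docker run --name $WM_JOB_ID -d alpine:3 sh -c 'echo hello'
"#;
    let annotations = BashAnnotations::parse(content);
    assert!(annotations.docker);
}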
6 changes: 3 additions & 3 deletions .github/workflows/docker-image.yml
@@ -76,7 +76,7 @@ jobs:
          platforms: linux/amd64,linux/arm64
          push: true
          build-args: |
-           features=embedding,parquet,openidconnect,jemalloc,deno_core
+           features=embedding,parquet,openidconnect,jemalloc,deno_core,dind
          tags: |
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ env.DEV_SHA }}
            ${{ steps.meta-public.outputs.tags }}
@@ -138,7 +138,7 @@ jobs:
          platforms: linux/amd64,linux/arm64
          push: true
          build-args: |
-           features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel
+           features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel,dind
          tags: |
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-ee:${{ env.DEV_SHA }}
            ${{ steps.meta-ee-public.outputs.tags }}
@@ -200,7 +200,7 @@ jobs:
          platforms: linux/amd64
          push: true
          build-args: |
-           features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel
+           features=enterprise,enterprise_saml,stripe,embedding,parquet,prometheus,openidconnect,cloud,jemalloc,tantivy,deno_core,kafka,otel,dind
            PYTHON_IMAGE=python:3.12.2-slim-bookworm
          tags: |
            ${{ steps.meta-ee-public-py312.outputs.tags }}
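
All three published images (community, EE, and the EE py312 variant) get the new `dind` feature appended to the `features` build-arg. When the feature is disabled, every `#[cfg(feature = "dind")]` item below compiles away and a `# docker`-annotated script simply runs as an ordinary bash job. A sketch of the gating pattern, assuming `dind` is declared as a cargo feature of the worker crate that pulls in `bollard`, `futures` and `uuid`, and that the Dockerfile forwards `features` to `cargo build --features`:

// Illustrative only: mirrors how the worker's docker-specific code paths
// vanish from builds compiled without the `dind` feature.
#[cfg(feature = "dind")]
use bollard::Docker;

pub fn docker_runtime_available() -> bool {
    // Evaluated at compile time.
    cfg!(feature = "dind")
}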
15 changes: 12 additions & 3 deletions backend/windmill-common/src/worker.rs
@@ -17,7 +17,9 @@ use std::{
use tokio::sync::RwLock;
use windmill_macros::annotations;

-use crate::{error, global_settings::CUSTOM_TAGS_SETTING, indexer::TantivyIndexerSettings, server::Smtp, DB};
+use crate::{
+    error, global_settings::CUSTOM_TAGS_SETTING, indexer::TantivyIndexerSettings, server::Smtp, DB,
+};

lazy_static::lazy_static! {
    pub static ref WORKER_GROUP: String = std::env::var("WORKER_GROUP").unwrap_or_else(|_| "default".to_string());
@@ -352,6 +354,11 @@ pub struct SqlAnnotations {
    pub return_last_result: bool,
}

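// Opt-in flags parsed from the leading `#` comment lines of a bash script
// (e.g. a first line of `# docker`; the exact syntax comes from the
// `annotations` macro, the same mechanism as SqlAnnotations above).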
#[annotations("#")]
pub struct BashAnnotations {
    pub docker: bool,
}

pub async fn load_cache(bin_path: &str, _remote_path: &str) -> (bool, String) {
    if tokio::fs::metadata(&bin_path).await.is_ok() {
        (true, format!("loaded from local cache: {}\n", bin_path))
@@ -577,8 +584,10 @@ pub fn get_windmill_memory_usage() -> Option<i64> {
    }
}

-pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postgres>>(executor: E) -> bool {
-    use crate::utils::{GIT_VERSION, GIT_SEM_VERSION};
+pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postgres>>(
+    executor: E,
+) -> bool {
+    use crate::utils::{GIT_SEM_VERSION, GIT_VERSION};

    // fetch all pings with a different version than self from the last 5 minutes.
    let pings = sqlx::query_scalar!(
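
`BashAnnotations` reuses the existing `#[annotations]` macro from windmill_macros, whose expansion is not part of this diff. A plausible hand-written equivalent of the generated `parse`, assuming it scans only the script's leading `#` comment lines for boolean flag names:

pub struct BashAnnotations {
    pub docker: bool,
}

impl BashAnnotations {
    pub fn parse(code: &str) -> Self {
        let mut docker = false;
        for line in code.lines() {
            let line = line.trim();
            if let Some(rest) = line.strip_prefix('#') {
                // A comment line naming a flag switches it on.
                if rest.trim() == "docker" {
                    docker = true;
                }
            } else if !line.is_empty() {
                // Annotations must precede the first line of code.
                break;
            }
        }
        Self { docker }
    }
}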
228 changes: 227 additions & 1 deletion backend/windmill-worker/src/bash_executor.rs
@@ -1,14 +1,30 @@
use std::{collections::HashMap, fs, process::Stdio};

#[cfg(feature = "dind")]
use bollard::container::{
    KillContainerOptions, RemoveContainerOptions, StatsOptions, StopContainerOptions,
};
#[cfg(feature = "dind")]
use futures::{stream, StreamExt, TryStreamExt};
use regex::Regex;
use serde_json::{json, value::RawValue};
use sqlx::types::Json;
use tokio::process::Command;

#[cfg(feature = "dind")]
use uuid::Uuid;
use windmill_common::{
    error::Error,
    jobs::QueuedJob,
    worker::{to_raw_value, write_file},
};

#[cfg(feature = "dind")]
use windmill_common::DB;

#[cfg(feature = "dind")]
use windmill_common::error::to_anyhow;

use windmill_queue::{append_logs, CanceledBy};

lazy_static::lazy_static! {
@@ -22,6 +38,9 @@ lazy_static::lazy_static! {
    static ref RE_POWERSHELL_IMPORTS: Regex = Regex::new(r#"^Import-Module\s+(?:-Name\s+)?"?([^-\s"]+)"?"#).unwrap();
}

#[cfg(feature = "dind")]
use crate::handle_child::run_future_with_polling_update_job_poller;

use crate::{
    common::{
        build_args_map, get_reserved_variables, read_file, read_file_content, start_child_process,
@@ -54,8 +73,14 @@ pub async fn handle_bash_job(
    worker_name: &str,
    envs: HashMap<String, String>,
    occupancy_metrics: &mut OccupancyMetrics,
    _killpill_rx: &mut tokio::sync::broadcast::Receiver<()>,
) -> Result<Box<RawValue>, Error> {
-    let logs1 = "\n\n--- BASH CODE EXECUTION ---\n".to_string();
+    let annotation = windmill_common::worker::BashAnnotations::parse(&content);
+
+    let mut logs1 = "\n\n--- BASH CODE EXECUTION ---\n".to_string();
+    if annotation.docker {
+        logs1.push_str("docker mode\n");
+    }
    append_logs(&job.id, &job.workspace_id, logs1, db).await;

    write_file(job_dir, "main.sh", &format!("set -e\n{content}"))?;
@@ -193,6 +218,22 @@ exit $exit_status
    )
    .await?;

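    // By now the wrapper script has run to completion; in docker mode it is
    // expected to have left behind a container named after the job id, which
    // the dind handler below supervises.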
    #[cfg(feature = "dind")]
    if annotation.docker {
        return handle_docker_job(
            job.id,
            &job.workspace_id,
            db,
            job.timeout,
            mem_peak,
            canceled_by,
            worker_name,
            occupancy_metrics,
            _killpill_rx,
        )
        .await;
    }

    let result_json_path = format!("{job_dir}/result.json");
    if let Ok(metadata) = tokio::fs::metadata(&result_json_path).await {
        if metadata.len() > 0 {
@@ -214,6 +255,7 @@ exit $exit_status
                .await?
                .trim()
                .to_string();

            return Ok(to_raw_value(&json!(result)));
        }

@@ -222,6 +264,190 @@ exit $exit_status
    )))
}

#[cfg(feature = "dind")]
async fn handle_docker_job(
    job_id: Uuid,
    workspace_id: &str,
    db: &DB,
    job_timeout: Option<i32>,
    mem_peak: &mut i32,
    canceled_by: &mut Option<CanceledBy>,
    worker_name: &str,
    occupancy_metrics: &mut OccupancyMetrics,
    killpill_rx: &mut tokio::sync::broadcast::Receiver<()>,
) -> Result<Box<RawValue>, Error> {
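    // Connect to the local daemon over the default unix socket (assumes
    // /var/run/docker.sock is reachable from inside the worker).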
    let client = bollard::Docker::connect_with_unix_defaults().map_err(to_anyhow)?;

    let container_id = job_id.to_string();
    let inspected = client.inspect_container(&container_id, None).await;

    if inspected.is_err() || inspected.unwrap().state.is_none() {
        return Ok(to_raw_value(&format!(
            "Container not found at {job_id}, you must use --name and not --rm it"
        )));
    }

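    // Future that resolves when the container exits, yielding its status
    // code; this is the unit of work the job poller drives below.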
    let wait_f = async {
        let wait = client
            .wait_container::<String>(&container_id, None)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| {
                tracing::error!("Error waiting for container: {:?}", e);
                anyhow::anyhow!("Error waiting for container")
            })?;
        let waited = wait.first().map(|x| x.status_code);
        Ok(waited)
    };

    let ncontainer_id = container_id.to_string();
    let w_id = workspace_id.to_string();
    let j_id = job_id.clone();
    let db2 = db.clone();
    let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1);

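    // Background task: follow the container's stdout/stderr into the job
    // logs until the stream ends, a killpill arrives (then stop the
    // container), or the main path signals completion via `tx`.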
    let mut killpill_rx = killpill_rx.resubscribe();
    let logs = tokio::spawn(async move {
        let client = bollard::Docker::connect_with_unix_defaults().map_err(to_anyhow);
        if let Ok(client) = client {
            let mut log_stream = client.logs(
                &ncontainer_id,
                Some(bollard::container::LogsOptions {
                    follow: true,
                    stdout: true,
                    stderr: true,
                    tail: "all",
                    ..Default::default()
                }),
            );
            loop {
                tokio::select! {
                    log = log_stream.next() => {
                        match log {
                            Some(Ok(log)) => {
                                append_logs(&j_id, w_id.clone(), log.to_string(), db2.clone()).await;
                            }
                            Some(Err(e)) => {
                                tracing::error!("Error getting logs: {:?}", e);
                            }
                            _ => {
                                tracing::error!("End of stream");
                                return
                            }
                        };
                    },
                    _ = killpill_rx.recv() => {
                        tracing::error!("killing container after receiving killpill");
                        if let Err(e) = client
                            .stop_container(&ncontainer_id, Some(StopContainerOptions { t: 3 }))
                            .await
                        {
                            tracing::error!("Error stopping container: {:?}", e);
                        }
                        return
                    },
                    _ = rx.recv() => {
                        return
                    }
                }
            }
        }
    });

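    // Drive `wait_f` under the standard job poller, which enforces timeout
    // and cancellation and samples memory from docker stats (usage / 1024,
    // matching the unit of `mem_peak`).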
    let mem_client = bollard::Docker::connect_with_unix_defaults().map_err(to_anyhow);
    let ncontainer_id = container_id.clone();
    let result = run_future_with_polling_update_job_poller(
        job_id,
        job_timeout,
        db,
        mem_peak,
        canceled_by,
        wait_f,
        worker_name,
        workspace_id,
        &mut Some(occupancy_metrics),
        Box::pin(match mem_client {
            Ok(client) => client
                .stats(
                    &ncontainer_id,
                    Some(StatsOptions { stream: true, one_shot: false }),
                )
                .map(|x| {
                    x.map(|x| x.memory_stats.usage.map(|m| m / 1024).unwrap_or_default() as i32)
                        .unwrap_or_default()
                })
                .boxed(),
            _ => stream::once(async { 0 }).boxed(),
        }),
    )
    .await;

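    // Cancellation/timeout path: stop the log follower, then try a graceful
    // SIGINT before escalating to a stop with a 3-second grace period.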
    if let Err(e) = result {
        if !logs.is_finished() {
            let _ = tx.send(());
            let _ = logs.await;
        }
        if container_is_alive(&client, &container_id).await {
            kill_container(&client, &container_id, "SIGINT").await;
            if container_is_alive(&client, &container_id).await {
                tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
                if let Err(e) = client
                    .stop_container(&container_id, Some(StopContainerOptions { t: 3 }))
                    .await
                {
                    tracing::error!("Error stopping container: {:?}", e);
                }
            }
        }

        return Err(e);
    }

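    // Success path: the container was started without `--rm`, so remove it
    // here to avoid leaking stopped containers.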
    if let Err(e) = client
        .remove_container(
            &container_id,
            Some(RemoveContainerOptions { force: true, ..Default::default() }),
        )
        .await
    {
        tracing::error!("Error removing container: {:?}", e);
    }

    let result = result.unwrap();

    return Ok(to_raw_value(&json!(format!(
        "Docker exit status: {}",
        result
            .map(|x| x.to_string())
            .unwrap_or_else(|| "none".to_string())
    ))));
}

#[cfg(feature = "dind")]
async fn kill_container(client: &bollard::Docker, container_id: &str, signal: &str) {
    if let Err(e) = client
        .kill_container(&container_id, Some(KillContainerOptions { signal }))
        .await
    {
        tracing::error!("Error killing container with signal {signal}: {:?}", e);
    }
}

#[cfg(feature = "dind")]
async fn container_is_alive(client: &bollard::Docker, container_id: &str) -> bool {
    let inspect = client.inspect_container(container_id, None).await;
    if let Ok(inspect) = inspect {
        let r = inspect
            .state
            .map(|x| x.running.unwrap_or_default())
            .unwrap_or_default();
        tracing::error!("Container {container_id} is alive: {r}");
        r
    } else {
        false
    }
}

fn raw_to_string(x: &str) -> String {
    match serde_json::from_str::<serde_json::Value>(x) {
        Ok(serde_json::Value::String(x)) => x,
3 changes: 2 additions & 1 deletion backend/windmill-worker/src/graphql_executor.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use anyhow::anyhow;
-use futures::TryStreamExt;
+use futures::{stream, TryStreamExt};
use serde_json::{json, value::RawValue};
use sqlx::types::Json;
use windmill_common::jobs::QueuedJob;
@@ -154,6 +154,7 @@ pub async fn do_graphql(
        worker_name,
        &job.workspace_id,
        &mut Some(occupation_metrics),
        Box::pin(stream::once(async { 0 })),
    )
    .await?;

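
`run_future_with_polling_update_job_poller` gained a trailing memory-stream parameter, so every caller was updated: executors with nothing to measure pass a constant zero stream (as `do_graphql` does above), while the docker path adapts bollard's stats. A minimal sketch of the two shapes, assuming the parameter accepts any pinned `Stream<Item = i32>` of KiB readings:

use futures::{stream, Stream};

// Executors with no native memory metric:
fn zero_memory() -> impl Stream<Item = i32> {
    stream::once(async { 0 })
}

// The docker path maps each stats sample to a KiB reading instead:
fn usage_to_kib(usage_bytes: u64) -> i32 {
    (usage_bytes / 1024) as i32
}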
