From c922507746b3df6d2af0c3ef144d1cc68c03dcb1 Mon Sep 17 00:00:00 2001 From: wendrul Date: Mon, 6 May 2024 10:00:04 +0200 Subject: [PATCH] Add concurrency plot to the runs page --- ...0240428204638_job_concurrency_key.down.sql | 1 - .../20240428204638_job_concurrency_key.up.sql | 11 +- backend/windmill-api/openapi.yaml | 66 +++++- .../windmill-api/src/concurrency_groups.rs | 200 ++++++++++++++--- backend/windmill-api/src/jobs.rs | 39 +--- backend/windmill-api/src/lib.rs | 1 + backend/windmill-queue/src/jobs.rs | 44 ++-- .../lib/components/ConcurrentJobsChart.svelte | 212 ++++++++++++++++++ .../src/lib/components/FlowMetadata.svelte | 4 +- .../src/lib/components/runs/JobLoader.svelte | 24 +- .../(logged)/runs/[...path]/+page.svelte | 52 +++-- 11 files changed, 530 insertions(+), 124 deletions(-) create mode 100644 frontend/src/lib/components/ConcurrentJobsChart.svelte diff --git a/backend/migrations/20240428204638_job_concurrency_key.down.sql b/backend/migrations/20240428204638_job_concurrency_key.down.sql index 48c39d9130b29..a35fe6b139e07 100644 --- a/backend/migrations/20240428204638_job_concurrency_key.down.sql +++ b/backend/migrations/20240428204638_job_concurrency_key.down.sql @@ -9,7 +9,6 @@ ALTER TABLE concurrency_key ALTER TABLE custom_concurrency_key_ended DROP COLUMN job_id, - DROP COLUMN concurrency_time_window_s, ADD PRIMARY KEY (key, ended_at); diff --git a/backend/migrations/20240428204638_job_concurrency_key.up.sql b/backend/migrations/20240428204638_job_concurrency_key.up.sql index 965b1c5c619c5..d40bd00063891 100644 --- a/backend/migrations/20240428204638_job_concurrency_key.up.sql +++ b/backend/migrations/20240428204638_job_concurrency_key.up.sql @@ -2,17 +2,10 @@ ALTER TABLE custom_concurrency_key_ended RENAME TO concurrency_key; -DROP EXTENSION "uuid-ossp"; -CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; - ALTER TABLE concurrency_key DROP CONSTRAINT custom_concurrency_key_ended_pkey, - -- uuid_generate_v4 was not present until i dropped and reloaded uuid-ossp extension - ADD COLUMN job_id UUID DEFAULT uuid_generate_v4() NOT NULL PRIMARY KEY, - -- ADD COLUMN job_id UUID DEFAULT (SELECT md5(random()::text || clock_timestamp()::text)::uuid) NOT NULL PRIMARY KEY, - ADD COLUMN concurrency_time_window_s INTEGER DEFAULT 0 NOT NULL, + ADD COLUMN job_id UUID DEFAULT gen_random_uuid() NOT NULL PRIMARY KEY, ALTER COLUMN ended_at DROP NOT NULL, ALTER COLUMN ended_at SET DEFAULT NULL; - -CREATE INDEX concurrency_key_ended_at_idx ON concurrency_key (key, ended_at); +CREATE INDEX concurrency_key_ended_at_idx ON concurrency_key (key, ended_at DESC); diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 89934c51279b4..0a126a6c9e25b 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -7936,9 +7936,9 @@ paths: schema: type: object properties: {} - /concurrency_groups/w/{workspace}/{id}: + /w/{workspace}/concurrency_groups/job_concurrency_key/{id}: get: - summary: Get the concurrency key for a job that's active in the concurrency_group + summary: Get the concurrency key for a job that has concurrency limits enabled operationId: getConcurrencyKey tags: - concurrencyGroups @@ -7947,11 +7947,31 @@ paths: - $ref: "#/components/parameters/JobId" responses: "200": - description: concurrency key returned + description: concurrency key for given job content: application/json: schema: type: string + /w/{workspace}/concurrency_groups/intervals: + get: + summary: Get intervals of job runtime concurrency + operationId: getConcurrencyIntervals + tags: + - concurrencyGroups + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - in: query + name: concurrency_key + required: false + schema: + type: string + responses: + "200": + description: time + content: + application/json: + schema: + $ref: "#/components/schemas/ConcurrencyIntervals" components: securitySchemes: @@ -10339,12 +10359,42 @@ components: ConcurrencyGroup: type: object properties: - concurrency_id: + concurrency_key: + type: string + total_running: + type: number + total_completed_within_time_window: + type: number + required: + - concurrency_key + - total_running + - total_completed_within_time_window + + ConcurrencyIntervals: + type: object + properties: + concurrency_key: type: string - job_uuids: + running_jobs: type: array items: - type: string + type: object + properties: + started_at: + type: string + format: date-time + completed_jobs: + type: array + items: + type: object + properties: + started_at: + type: string + format: date-time + ended_at: + type: string + format: date-time required: - - concurrency_id - - job_uuids + - concurrency_key + - running_jobs + - completed_jobs diff --git a/backend/windmill-api/src/concurrency_groups.rs b/backend/windmill-api/src/concurrency_groups.rs index ce033023479a2..2704102d6f178 100644 --- a/backend/windmill-api/src/concurrency_groups.rs +++ b/backend/windmill-api/src/concurrency_groups.rs @@ -5,7 +5,8 @@ use axum::extract::Path; #[cfg(feature = "enterprise")] use axum::routing::{delete, get}; #[cfg(feature = "enterprise")] -use axum::{Extension, Json}; +use axum::{extract::Query, Extension, Json}; +use serde::Deserialize; use axum::Router; @@ -14,18 +15,17 @@ use serde::Serialize; #[cfg(feature = "enterprise")] use std::collections::HashMap; #[cfg(feature = "enterprise")] +use uuid::Uuid; +#[cfg(feature = "enterprise")] use windmill_common::error::Error::{InternalErr, PermissionDenied}; #[cfg(feature = "enterprise")] use windmill_common::error::JsonResult; -#[cfg(feature = "enterprise")] -use uuid::Uuid; #[cfg(feature = "enterprise")] pub fn global_service() -> Router { Router::new() .route("/list", get(list_concurrency_groups)) .route("/*id", delete(delete_concurrency_group)) - .route("/w/:workspace_id/:job_id", get(get_concurrency_key)) } #[cfg(not(feature = "enterprise"))] @@ -33,11 +33,18 @@ pub fn global_service() -> Router { Router::new() } +pub fn workspaced_service() -> Router { + Router::new() + .route("/intervals", get(get_concurrent_intervals)) + .route("/job_concurrency_key/:job_id", get(get_concurrency_key)) +} + #[cfg(feature = "enterprise")] #[derive(Serialize)] pub struct ConcurrencyGroups { - concurrency_id: String, - job_uuids: Vec, + concurrency_key: String, + total_running: usize, + total_completed_within_time_window: usize, } #[cfg(feature = "enterprise")] @@ -46,44 +53,57 @@ async fn list_concurrency_groups( Extension(db): Extension, ) -> JsonResult> { if !authed.is_admin { - return Err(PermissionDenied( - "Only administrators can see concurrency groups".to_string(), + return Err(PermissionDenied( "Only administrators can see concurrency groups".to_string(), )); } + + // let concurrency_time_window_s = concurrency.time_window; + let concurrency_time_window_s = 51; let concurrency_groups_raw = sqlx::query_as::<_, (String, serde_json::Value)>( - "SELECT * FROM concurrency_counter ORDER BY concurrency_id ASC", + "SELECT concurrency_id, job_uuids FROM concurrency_counter ORDER BY concurrency_id ASC", ) .fetch_all(&db) .await?; - // let completed_count = sqlx::query!( - // "SELECT COUNT(*) as count, COALESCE(MAX(ended_at), now() - INTERVAL '1 second' * $2) as max_ended_at FROM concurrency_key WHERE key = $1 AND ended_at >= (now() - INTERVAL '1 second' * $2)", - // job_concurrency_key, - // f64::from(job_custom_concurrency_time_window_s), - // ).fetch_one(&mut tx).await.map_err(|e| { - // Error::InternalErr(format!( - // "Error getting completed count for key {job_concurrency_key}: {e}" - // )) - // })?; + let completed_count = sqlx::query!( + "SELECT key, COUNT(*) as count FROM concurrency_key + WHERE ended_at IS NOT NULL AND ended_at >= (now() - INTERVAL '1 second' * $1) GROUP BY key", + f64::from(concurrency_time_window_s) + ) + .fetch_all(&db) + .await + .map_err(|e| { + InternalErr(format!( + "Error getting concurrency limited completed jobs count: {e}" + )) + })?; + let completed_by_key = completed_count + .iter() + .fold(HashMap::new(), |mut acc, entry| { + *acc.entry(entry.key.clone()).or_insert(0) += entry.count.unwrap_or(0); + acc + }); let mut concurrency_groups: Vec = vec![]; - for (concurrency_id, job_uuids_json) in concurrency_groups_raw { + for (concurrency_key, job_uuids_json) in concurrency_groups_raw { let job_uuids_map = serde_json::from_value::>( job_uuids_json, ) .map_err(|err| { - tracing::error!( - "Error deserializing concurrency_counter table content: {:?}", - err - ); InternalErr(format!( "Error deserializing concurrency_counter table content: {}", err.to_string() )) })?; concurrency_groups.push(ConcurrencyGroups { - concurrency_id: concurrency_id.clone(), - job_uuids: job_uuids_map.keys().cloned().collect(), + concurrency_key: concurrency_key.clone(), + total_running: job_uuids_map.len().into(), + total_completed_within_time_window: completed_by_key + .get(&concurrency_key) + .cloned() + .unwrap_or(0) + .try_into() + .unwrap_or(0), }) } @@ -94,7 +114,7 @@ async fn list_concurrency_groups( async fn delete_concurrency_group( authed: ApiAuthed, Extension(db): Extension, - Path(concurrency_id): Path, + Path(concurrency_key): Path, ) -> JsonResult<()> { if !authed.is_admin { return Err(PermissionDenied( @@ -106,7 +126,7 @@ async fn delete_concurrency_group( let concurrency_group = sqlx::query_as::<_, (String, i64)>( "SELECT concurrency_id, (select COUNT(*) from jsonb_object_keys(job_uuids)) as n_job_uuids FROM concurrency_counter WHERE concurrency_id = $1 FOR UPDATE", ) - .bind(concurrency_id.clone()) + .bind(concurrency_key.clone()) .fetch_optional(&mut *tx) .await?; @@ -121,14 +141,14 @@ async fn delete_concurrency_group( sqlx::query!( "DELETE FROM concurrency_counter WHERE concurrency_id = $1", - concurrency_id.clone(), + concurrency_key.clone(), ) .execute(&mut *tx) .await?; sqlx::query!( "DELETE FROM concurrency_key WHERE key = $1", - concurrency_id.clone(), + concurrency_key.clone(), ) .execute(&mut *tx) .await?; @@ -137,14 +157,130 @@ async fn delete_concurrency_group( Ok(Json(())) } -#[cfg(feature = "enterprise")] + +#[derive(Serialize)] +struct ConcurrencyIntervals { + concurrency_key: Option, + running_jobs: Vec, + completed_jobs: Vec, +} + +#[derive(Serialize)] +struct CompletedJobDuration { + started_at: chrono::DateTime, + ended_at: chrono::DateTime, +} + +#[derive(Serialize)] +struct RunningJobDuration { + started_at: chrono::DateTime, +} + +#[derive(Deserialize)] +struct ConcurrentIntervalsParams { + concurrency_key: Option, + row_limit: Option, +} + +async fn get_concurrent_intervals( + authed: ApiAuthed, + Extension(db): Extension, + Path(w_id): Path, + Query(iq): Query, +) -> JsonResult { + let row_limit = iq.row_limit.unwrap_or(1000); + let concurrency_key = iq.concurrency_key; + + let running_jobs = match &concurrency_key { + Some(key) => { + sqlx::query!( + "SELECT id, started_at FROM queue + JOIN concurrency_key ON concurrency_key.job_id = queue.id + WHERE started_at IS NOT NULL AND workspace_id = $2 AND key = $1 + LIMIT $3", + // "SELECT started_at FROM queue JOIN ( + // SELECT uuid(jsonb_object_keys(job_uuids)) AS job_id + // FROM concurrency_counter WHERE concurrency_id = $1 + // ) + // AS expanded_concurrency_table + // ON queue.id = expanded_concurrency_table.job_id + // WHERE started_at IS NOT NULL + // LIMIT $2", + key, + w_id, + row_limit, + ) + .fetch_all(&db) + .await? + .iter() + .map(|row| RunningJobDuration { started_at: row.started_at.unwrap() }) + .collect() + } + + None => sqlx::query!( + "SELECT started_at FROM queue WHERE started_at IS NOT NULL AND workspace_id = $1 AND script_path IS NOT NULL LIMIT $2", + w_id, + row_limit, + ) + .fetch_all(&db) + .await? + .iter() + .map(|row| RunningJobDuration { started_at: row.started_at.unwrap() }) + .collect(), + }; + + let completed_jobs = match &concurrency_key { + Some(key) => { + sqlx::query!( + "SELECT job_id, ended_at, started_at FROM concurrency_key JOIN completed_job ON concurrency_key.job_id = completed_job.id + WHERE workspace_id = $1 AND key = $2 AND ended_at IS NOT NULL LIMIT $3", + w_id, + key, + row_limit, + ) + .fetch_all(&db) + .await? + .iter() + .map(|row| CompletedJobDuration{started_at: row.started_at, ended_at: row.ended_at.unwrap()}) + .collect() + } + None => { + sqlx::query!( + "SELECT started_at, duration_ms FROM completed_job WHERE workspace_id = $1 LIMIT $2", + w_id, + row_limit, + ) + .fetch_all(&db) + .await? + .iter() + .map(|row| CompletedJobDuration{started_at: row.started_at, ended_at: row.started_at + std::time::Duration::from_millis(row.duration_ms.try_into().unwrap()) }) + .collect() + } + }; + + Ok(Json(ConcurrencyIntervals { + concurrency_key, + running_jobs, + completed_jobs, + })) +} + async fn get_concurrency_key( authed: ApiAuthed, Extension(db): Extension, Path((w_id, job_id)): Path<(String, Uuid)>, -) -> JsonResult { - +) -> JsonResult> { let job = crate::jobs::get_job_internal(&db, &w_id, job_id, true).await?; Ok(Json(job.concurrency_key(&db).await?)) } + +// async fn get_concurrency_keys_for_jobs( +// authed: ApiAuthed, +// Extension(db): Extension, +// Path((w_id, job_id)): Path<(String, Uuid)>, +// ) -> JsonResult> { +// +// +// let ret = vec![] +// } diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 652147aabe869..5acb9f0092df5 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -1991,38 +1991,13 @@ impl Job { ) } - pub async fn concurrency_key(&self, db: &Pool) -> Result { - let concurrency_key = windmill_queue::custom_concurrency_key(db, self.id()).await?; - let k = match concurrency_key { - Some(custom_concurrency_key) => { - let workspaced = - custom_concurrency_key.replace("$workspace", self.workspace_id().as_str()); - if RE_ARG_TAG.is_match(&workspaced) { - let mut interpolated = workspaced.clone(); - for cap in RE_ARG_TAG.captures_iter(&workspaced) { - let arg_name = cap.get(1).unwrap().as_str(); - let arg_value = match self.args() { - Some(sqlx::types::Json(args_map_json)) => { - match args_map_json.get(arg_name) { - Some(arg_value_raw) => { - serde_json::to_string(arg_value_raw).unwrap_or_default() - } - None => "".to_string(), - } - } - None => "".to_string(), - }; - interpolated = interpolated - .replace(format!("$args[{}]", arg_name).as_str(), arg_value.as_str()); - } - interpolated - } else { - workspaced - } - } - None => self.full_path_with_workspace(), - }; - Ok(k) + pub async fn concurrency_key(&self, db: &Pool) -> Result, sqlx::Error> { + sqlx::query_scalar!( + "SELECT key FROM concurrency_key WHERE job_id = $1", + self.id() + ) + .fetch_optional(db) + .await } } diff --git a/backend/windmill-api/src/lib.rs b/backend/windmill-api/src/lib.rs index 29ad9354474b2..7623d0ce66354 100644 --- a/backend/windmill-api/src/lib.rs +++ b/backend/windmill-api/src/lib.rs @@ -196,6 +196,7 @@ pub async fn run_server( .nest("/apps", apps::workspaced_service()) .nest("/audit", audit::workspaced_service()) .nest("/capture", capture::workspaced_service()) + .nest("/concurrency_groups", concurrency_groups::workspaced_service()) .nest("/embeddings", embeddings::workspaced_service()) .nest("/drafts", drafts::workspaced_service()) .nest("/favorites", favorite::workspaced_service()) diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index a3f6b8ab3896d..e2108e56426f2 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -710,8 +710,8 @@ pub async fn add_completed_job< } if let Err(e) = sqlx::query_scalar!( - "UPDATE concurrency_key SET ended_at = now() WHERE key = $1", - concurrency_key, + "UPDATE concurrency_key SET ended_at = now() WHERE job_id = $1", + queued_job.id, ) .execute(&mut tx) .await @@ -1935,12 +1935,9 @@ pub async fn custom_concurrency_key( db: &Pool, job_id: Uuid, ) -> Result, sqlx::Error> { - sqlx::query_scalar!( - "SELECT key FROM concurrency_key WHERE job_id = $1", - job_id - ) - .fetch_optional(db) // this should no longer be fetch optional - .await + sqlx::query_scalar!("SELECT key FROM concurrency_key WHERE job_id = $1", job_id) + .fetch_optional(db) // this should no longer be fetch optional + .await } async fn concurrency_key( @@ -3439,24 +3436,19 @@ pub async fn push<'c, T: Serialize + Send + Sync, R: rsmq_async::RsmqConnection .await .map_err(|e| Error::InternalErr(format!("Could not insert into queue {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}: {e}")))?; - tracing::error!("concurrency jeje {:?}", concurrent_limit); - - if let Some(concurrent_limit_count) = concurrent_limit { - if concurrent_limit_count > 0 { - // let concurrency_key = process_custom_concurrency_key(queued_job, custom_concurrency_key) - let concurrency_key = custom_concurrency_key.unwrap_or("process concurrency key fn not yet implemented".to_string()); - tracing::error!("concurrency key jeje {:?}", concurrency_key); - sqlx::query!( - "INSERT INTO concurrency_key(key, job_id, concurrency_time_window_s) VALUES ($1, $2, $3)", - concurrency_key, - job_id, - concurrency_time_window_s.unwrap_or(0), - ) - .execute(&mut tx) - .await - .map_err(|e| Error::InternalErr(format!("Could not insert concurrency_key={concurrency_key} for job_id={job_id} script_path={script_path:?} workspace_id={workspace_id}: {e}")))?; - } - }; + if concurrent_limit.is_some() { + // let concurrency_key = process_custom_concurrency_key(queued_job, custom_concurrency_key) + let concurrency_key = custom_concurrency_key + .unwrap_or("process concurrency key fn not yet implemented".to_string()); + sqlx::query!( + "INSERT INTO concurrency_key(key, job_id) VALUES ($1, $2)", + concurrency_key, + job_id, + ) + .execute(&mut tx) + .await + .map_err(|e| Error::InternalErr(format!("Could not insert concurrency_key={concurrency_key} for job_id={job_id} script_path={script_path:?} workspace_id={workspace_id}: {e}")))?; + } // TODO: technically the job isn't queued yet, as the transaction can be rolled back. Should be solved when moving these metrics to the queue abstraction. #[cfg(feature = "prometheus")] diff --git a/frontend/src/lib/components/ConcurrentJobsChart.svelte b/frontend/src/lib/components/ConcurrentJobsChart.svelte new file mode 100644 index 0000000000000..c944d888a8c7a --- /dev/null +++ b/frontend/src/lib/components/ConcurrentJobsChart.svelte @@ -0,0 +1,212 @@ + + +
+ +
diff --git a/frontend/src/lib/components/FlowMetadata.svelte b/frontend/src/lib/components/FlowMetadata.svelte index c85662eef6aa3..cca487a71d917 100644 --- a/frontend/src/lib/components/FlowMetadata.svelte +++ b/frontend/src/lib/components/FlowMetadata.svelte @@ -39,11 +39,11 @@ {job?.created_at} - {#if job && "concurrent_limit" in job && job.concurrent_limit > 0} + {#if concurrencyKey}
- {concurrencyKey} + {concurrencyKey} This job has a concurrency limit
diff --git a/frontend/src/lib/components/runs/JobLoader.svelte b/frontend/src/lib/components/runs/JobLoader.svelte index ef338055fb0bc..dd72a954f5191 100644 --- a/frontend/src/lib/components/runs/JobLoader.svelte +++ b/frontend/src/lib/components/runs/JobLoader.svelte @@ -1,6 +1,12 @@