diff --git a/.github/workflows/backend-test.yml b/.github/workflows/backend-test.yml index 12d7d0b3760f6..8cb445496d428 100644 --- a/.github/workflows/backend-test.yml +++ b/.github/workflows/backend-test.yml @@ -67,4 +67,4 @@ jobs: DISABLE_EMBEDDING=true RUST_LOG=info PYTHON_PATH=$(which python) DENO_PATH=$(which deno) BUN_PATH=$(which bun) GO_PATH=$(which go) UV_PATH=$(which uv) cargo test --features - enterprise,deno_core,license,python,rust --all -- --nocapture + enterprise,deno_core,license,python,rust,scoped_cache --all -- --nocapture diff --git a/backend/.sqlx/query-0bf123446bebbc357c58a53a9319f4954dbf3225e91cbe999e5b264c1a747664.json b/backend/.sqlx/query-0bf123446bebbc357c58a53a9319f4954dbf3225e91cbe999e5b264c1a747664.json new file mode 100644 index 0000000000000..2ae7c8df909f4 --- /dev/null +++ b/backend/.sqlx/query-0bf123446bebbc357c58a53a9319f4954dbf3225e91cbe999e5b264c1a747664.json @@ -0,0 +1,72 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT content AS \"content!: String\",\n lock AS \"lock: String\", language AS \"language: Option\", envs AS \"envs: Vec\", codebase AS \"codebase: String\" FROM script WHERE hash = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "content!: String", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "lock: String", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "language: Option", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "envs: Vec", + "type_info": "VarcharArray" + }, + { + "ordinal": 4, + "name": "codebase: String", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + true, + false, + true, + true + ] + }, + "hash": "0bf123446bebbc357c58a53a9319f4954dbf3225e91cbe999e5b264c1a747664" +} diff --git a/backend/.sqlx/query-3ee2d60ca93eeaf02ab0ee96aca399ec055b044a06284c0ab19b67d97f803894.json b/backend/.sqlx/query-3ee2d60ca93eeaf02ab0ee96aca399ec055b044a06284c0ab19b67d97f803894.json new file mode 100644 index 0000000000000..89b87e585f75c --- /dev/null +++ b/backend/.sqlx/query-3ee2d60ca93eeaf02ab0ee96aca399ec055b044a06284c0ab19b67d97f803894.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT code AS \"raw_code: String\", lock AS \"raw_lock: String\", flow AS \"raw_flow: Json>\" FROM flow_node WHERE id = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "raw_code: String", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "raw_lock: String", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "raw_flow: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "3ee2d60ca93eeaf02ab0ee96aca399ec055b044a06284c0ab19b67d97f803894" +} diff --git a/backend/.sqlx/query-b7f2ed32e933b65fa5455928c71f61068ad7dfee8352a76122f5af893ffe6517.json b/backend/.sqlx/query-b7f2ed32e933b65fa5455928c71f61068ad7dfee8352a76122f5af893ffe6517.json new file mode 100644 index 0000000000000..e894cf350732e --- /dev/null +++ b/backend/.sqlx/query-b7f2ed32e933b65fa5455928c71f61068ad7dfee8352a76122f5af893ffe6517.json @@ -0,0 +1,72 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT content AS \"content!: String\",\n lock AS \"lock: String\", 
language AS \"language: Option\", envs AS \"envs: Vec\", codebase AS \"codebase: String\" FROM script WHERE hash = $1 LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "content!: String", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "lock: String", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "language: Option", + "type_info": { + "Custom": { + "name": "script_lang", + "kind": { + "Enum": [ + "python3", + "deno", + "go", + "bash", + "postgresql", + "nativets", + "bun", + "mysql", + "bigquery", + "snowflake", + "graphql", + "powershell", + "mssql", + "php", + "bunnative", + "rust", + "ansible", + "csharp" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "envs: Vec", + "type_info": "VarcharArray" + }, + { + "ordinal": 4, + "name": "codebase: String", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + true, + false, + true, + true + ] + }, + "hash": "b7f2ed32e933b65fa5455928c71f61068ad7dfee8352a76122f5af893ffe6517" +} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 2d65bb572d1d4..27553ac63369b 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -10871,6 +10871,7 @@ dependencies = [ "serde_json", "sha2 0.10.8", "sqlx", + "tempfile", "thiserror 2.0.8", "tikv-jemalloc-ctl", "tokio", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 6ff3b76ddd21d..1706c19d86709 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -79,6 +79,7 @@ oauth2 = ["windmill-api/oauth2"] http_trigger = ["windmill-api/http_trigger"] zip = ["windmill-api/zip"] static_frontend = ["windmill-api/static_frontend"] +scoped_cache = ["windmill-common/scoped_cache"] [dependencies] diff --git a/backend/tests/worker.rs b/backend/tests/worker.rs index c9a162635b656..f189536c16a99 100644 --- a/backend/tests/worker.rs +++ b/backend/tests/worker.rs @@ -3814,7 +3814,7 @@ mod job_payload { let flow_data = cache::flow::fetch_version_lite(&db, 1443253234253454) .await .unwrap(); - let flow_value = flow_data.value().unwrap(); + let flow_value = flow_data.value(); let flow_scripts = { async fn load(db: &Pool, modules: &[FlowModule]) -> Vec { let mut res = vec![]; @@ -3825,9 +3825,7 @@ mod job_payload { FlowModuleValue::FlowScript { id, .. } => res.push(id), FlowModuleValue::ForloopFlow { modules_node: Some(flow_node), .. } => { let flow_data = cache::flow::fetch_flow(db, flow_node).await.unwrap(); - res.extend( - Box::pin(load(db, &flow_data.value().unwrap().modules)).await, - ); + res.extend(Box::pin(load(db, &flow_data.value().modules)).await); } _ => {} } @@ -3904,7 +3902,7 @@ mod job_payload { let flow_data = cache::flow::fetch_version_lite(&db, 1443253234253454) .await .unwrap(); - let flow_value = flow_data.value().unwrap(); + let flow_value = flow_data.value(); let forloop_module = serde_json::from_str::(flow_value.modules[0].value.get()).unwrap(); let FlowModuleValue::ForloopFlow { modules_node: Some(id), .. } = forloop_module else { diff --git a/backend/windmill-api/src/apps.rs b/backend/windmill-api/src/apps.rs index 814fd58cb02de..a29883053c18e 100644 --- a/backend/windmill-api/src/apps.rs +++ b/backend/windmill-api/src/apps.rs @@ -1322,10 +1322,7 @@ async fn execute_component( // 2. Otherwise, always fetch the policy from the database. let policy = if let Some(id) = payload.version { let cache = cache::anon!({ u64 => Arc } in "policy" <= 1000); - arc_policy = policy_fut - .map_ok(sqlx::types::Json) // cache as json. 
- .cached(cache, id as u64, |sqlx::types::Json(x)| Arc::new(x)) - .await?; + arc_policy = policy_fut.map_ok(Arc::new).cached(cache, id as u64).await?; &*arc_policy } else { policy = policy_fut.await?; @@ -1352,8 +1349,8 @@ async fn execute_component( ) .fetch_one(&db) .map_err(Into::::into) - .map_ok(sqlx::types::Json) // cache as json. - .cached(cache, *id as u64, |sqlx::types::Json(x)| Arc::new(x)) + .map_ok(Arc::new) + .cached(cache, *id as u64) .await? } _ => unreachable!(), diff --git a/backend/windmill-common/Cargo.toml b/backend/windmill-common/Cargo.toml index 9c35ef7ea1db8..2c689cecfbbbd 100644 --- a/backend/windmill-common/Cargo.toml +++ b/backend/windmill-common/Cargo.toml @@ -15,6 +15,7 @@ parquet = ["dep:object_store", "dep:aws-config", "dep:aws-sdk-sts"] otel = ["dep:opentelemetry-semantic-conventions", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk", "dep:opentelemetry", "dep:tracing-opentelemetry", "dep:opentelemetry-appender-tracing", "dep:tonic"] smtp = ["dep:mail-send"] +scoped_cache = [] [lib] name = "windmill_common" @@ -65,6 +66,7 @@ croner = "2.0.6" quick_cache.workspace = true pin-project-lite.workspace = true futures.workspace = true +tempfile.workspace = true opentelemetry-semantic-conventions = { workspace = true, optional = true } opentelemetry-otlp = { workspace = true, optional = true } diff --git a/backend/windmill-common/src/cache.rs b/backend/windmill-common/src/cache.rs index 81043025c2a63..622718433c0a8 100644 --- a/backend/windmill-common/src/cache.rs +++ b/backend/windmill-common/src/cache.rs @@ -1,74 +1,180 @@ -use crate::apps::AppScriptId; -use crate::error; -use crate::flows::FlowNodeId; -use crate::flows::FlowValue; -use crate::scripts::ScriptHash; -use crate::scripts::ScriptLang; - -use std::future::Future; -use std::hash::Hash; -use std::panic::Location; -use std::path::{Path, PathBuf}; -use std::sync::Arc; +//! Windmill cache system. +//! +//! # Features +//! - `scoped_cache`: Use scoped cache instead of global cache. +//! 1. The cache is made thread-local, so each thread has its own entries. +//! 2. The cache is made temporary, so it is deleted when the program exits. +//! This shall only be used for testing, e.g. [`sqlx::test`] spawn a database per test, +//! and there is only one test per thread, so using thread-local cache avoid unexpected results. + +use crate::{ + apps::AppScriptId, error, flows::FlowNodeId, flows::FlowValue, scripts::ScriptHash, + scripts::ScriptLang, +}; + +#[cfg(feature = "scoped_cache")] +use std::thread::ThreadId; +use std::{ + future::Future, + hash::Hash, + panic::Location, + path::{Path, PathBuf}, + sync::Arc, +}; +use futures::TryFutureExt; use serde::{Deserialize, Serialize}; -use sqlx::types::{Json, JsonRawValue as RawValue}; -use sqlx::PgExecutor; +use sqlx::{ + types::{Json, JsonRawValue as RawValue}, + PgExecutor, +}; +use uuid::Uuid; pub use const_format::concatcp; pub use lazy_static::lazy_static; pub use quick_cache::sync::Cache; -/// Cache directory for windmill server/worker(s). -pub const CACHE_DIR: &str = "/tmp/windmill/cache/"; +#[cfg(not(feature = "scoped_cache"))] +lazy_static! { + /// Cache directory for windmill server/worker(s). + /// 1. If `XDG_CACHE_HOME` is set, use `"${XDG_CACHE_HOME}/windmill"`. + /// 2. If `HOME` is set, use `"${HOME}/.cache/windmill"`. + /// 3. Otherwise, use `"{std::env::temp_dir()}/windmill/cache"`. 
+ pub static ref CACHE_PATH: PathBuf = { + std::env::var("XDG_CACHE_HOME") + .map(PathBuf::from) + .or_else(|_| std::env::var("HOME").map(|home| PathBuf::from(home).join(".cache"))) + .map(|cache| cache.join("windmill")) + .unwrap_or_else(|_| std::env::temp_dir().join("windmill/cache")) + }; +} + +#[cfg(feature = "scoped_cache")] +lazy_static! { + /// Temporary directory for thread-local cache. + pub static ref CACHE_PATH_TMP: tempfile::TempDir = { + tempfile::tempdir().expect("Failed to create temporary directory") + }; + + /// Cache directory for windmill server/worker(s). + pub static ref CACHE_PATH: PathBuf = CACHE_PATH_TMP.as_ref().to_path_buf(); +} + +/// An item that can be imported/exported from/into the file-system. +pub trait Item: Sized { + /// Returns the path of the item within the given `root` path. + fn path(&self, root: impl AsRef) -> PathBuf; +} + +/// Bytes storage. +pub trait Storage { + /// Get bytes for `item`. + fn get(&self, item: impl Item) -> std::io::Result>; + /// Put bytes for `item`. + fn put(&self, item: impl Item, data: impl AsRef<[u8]>) -> std::io::Result<()>; + + /// Get utf8 string for `item`. + #[inline(always)] + fn get_utf8(&self, item: impl Item) -> error::Result { + Ok(String::from_utf8(self.get(item)?)?) + } + + /// Get json for `item`. + #[inline(always)] + fn get_json Deserialize<'de>>(&self, item: impl Item) -> error::Result { + Ok(serde_json::from_slice(&self.get(item)?)?) + } + + /// Get json raw value for `item`. + #[inline(always)] + fn get_json_raw(&self, item: impl Item) -> error::Result> { + Ok(RawValue::from_string(self.get_utf8(item)?)?) + } +} + +/// A type that can be imported from [`Storage`]. +pub trait Import: Sized { + fn import(src: &impl Storage) -> error::Result; +} + +/// A type that can be exported to [`Storage`]. +pub trait Export: Clone { + /// The untrusted type that can be imported from [`Storage`]. + type Untrusted: Import; + + /// Resolve the untrusted type into the trusted type. + fn resolve(src: Self::Untrusted) -> error::Result; + /// Export the trusted type into storage. + fn export(&self, dst: &impl Storage) -> error::Result<()>; +} /// A file-system backed concurrent cache. -pub struct FsBackedCache { +pub struct FsBackedCache { + #[cfg(not(feature = "scoped_cache"))] cache: Cache, - root: &'static str, + #[cfg(feature = "scoped_cache")] + cache: Cache<(ThreadId, Key), Val>, + root: Root, } -impl FsBackedCache { +impl> FsBackedCache { /// Create a new file-system backed cache with `items_capacity` capacity. /// The cache will be stored in the `root` directory. - pub fn new(root: &'static str, items_capacity: usize) -> Self { + pub fn new(root: Root, items_capacity: usize) -> Self { Self { cache: Cache::new(items_capacity), root } } /// Build a path for the given `key`. pub fn path(&self, key: &Key) -> PathBuf { - key.path(self.root) + #[cfg(feature = "scoped_cache")] + let key = &(std::thread::current().id(), key.clone()); + key.path(&self.root) } /// Remove the item with the given `key` from the cache. pub fn remove(&self, key: &Key) -> Option<(Key, Val)> { let _ = std::fs::remove_dir_all(self.path(key)); - self.cache.remove(key) + #[cfg(feature = "scoped_cache")] + let key = &(std::thread::current().id(), key.clone()); + let res = self.cache.remove(key); + #[cfg(feature = "scoped_cache")] + let res = res.map(|(k, v)| (k.1, v)); + res } /// Gets or inserts an item in the cache with key `key`. 
- pub async fn get_or_insert_async<'a, T: fs::Bundle, F>( - &'a self, - key: Key, - map: impl Fn(T) -> Val, - with: F, - ) -> error::Result + pub async fn get_or_insert_async<'a, F>(&'a self, key: Key, with: F) -> error::Result where - Key: Clone, - F: Future>, + F: Future>, { - if cfg!(test) { - // Disable caching in tests: since `#[sqlx::test]` spawn a database per test, the cache - // could yield unexpected results. - return with.await.map(map); - } - self.cache - .get_or_insert_async(&key, async { - fs::import_or_insert_with(self.path(&key), with) - .await - .map(map) - }) - .await + let import_or_fetch = async { + let path = &self.path(&key); + // Retrieve the data from the cache directory or the database. + if std::fs::metadata(path).is_ok() { + // Cache path exists, read its contents. + match ::Untrusted::import(path).and_then(Val::resolve) { + Ok(data) => return Ok(data), + Err(err) => tracing::warn!( + "Failed to import from file-system, fetch source: {path:?}: {err:?}" + ), + } + } + // Cache path doesn't exist or import failed, generate the content. + let data = Val::resolve(with.await?)?; + // Try to export data to the file-system. + // If failed, remove the directory but still return the data. + if let Err(err) = std::fs::create_dir_all(path) + .map_err(Into::into) + .and_then(|_| data.export(&path)) + { + tracing::warn!("Failed to export to file-system: {path:?}: {err:?}"); + let _ = std::fs::remove_dir_all(path); + } + Ok(data) + }; + #[cfg(feature = "scoped_cache")] + let key = (std::thread::current().id(), key.clone()); + self.cache.get_or_insert_async(&key, import_or_fetch).await } } @@ -92,9 +198,9 @@ macro_rules! make_static { $crate::cache::lazy_static! { $( $(#[$attr])* - static ref $name: $crate::cache::FsBackedCache<$Key, $Val> = + static ref $name: $crate::cache::FsBackedCache<$Key, $Val, ::std::path::PathBuf> = $crate::cache::FsBackedCache::new( - $crate::cache::concatcp!($crate::cache::CACHE_DIR, $root), + $crate::cache::CACHE_PATH.join($root), $cap ); )+ @@ -130,7 +236,7 @@ pub mod future { use super::*; /// Extension trait for futures that can be cached. - pub trait FutureCachedExt: Future> + Sized { + pub trait FutureCachedExt: Future> + Sized { /// Get or insert the future result in the cache. /// /// # Example @@ -140,27 +246,23 @@ pub mod future { /// /// #[allow(unused)] /// async { - /// let result = std::future::ready(Ok(Json(42))) - /// .cached(cache::anon!({ u64 => Json } in "test" <= 1), 42, |x| x) + /// let result = std::future::ready(Ok(42u64)) + /// .cached(cache::anon!({ u64 => u64 } in "test" <= 1), 42u64) /// .await; /// - /// assert_eq!(result.unwrap(), Json(42)); + /// assert_eq!(result.unwrap(), 42u64); /// }; /// ``` - fn cached( + fn cached, Root: AsRef>( self, - cache: &FsBackedCache, + cache: &FsBackedCache, key: Key, - map: impl Fn(T) -> Val, - ) -> impl Future> - where - Key: Clone, - { - cache.get_or_insert_async(key.to_owned(), map, self) + ) -> impl Future> { + cache.get_or_insert_async(key.to_owned(), self) } } - impl> + Sized> FutureCachedExt for F {} + impl> + Sized> FutureCachedExt for F {} } /// Flow data: i.e. a cached `raw_flow`. 
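// Illustrative sketch (not part of this diff): the new `FutureCachedExt::cached` signature
// drops the `map` closure and the `Json` round-trip; cached values now go through the
// `Export`/`Import` traits instead. This mirrors the updated rustdoc example above and the
// `execute_component` call sites in `backend/windmill-api/src/apps.rs`; `demo` and the
// literal values are made up for the sketch.
use windmill_common::cache::{self, future::FutureCachedExt};
use windmill_common::error;

async fn demo() -> error::Result<u64> {
    // Anonymous file-system backed cache (u64 keys and values, capacity 1) rooted at
    // "test" under the cache directory; with `scoped_cache` enabled it lands in a
    // per-thread temporary directory instead.
    std::future::ready(Ok(42u64))
        .cached(cache::anon!({ u64 => u64 } in "test" <= 1), 42u64)
        .await
}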
@@ -168,62 +270,67 @@ pub mod future { #[derive(Debug, Clone)] pub struct FlowData { pub raw_flow: Box, - pub flow: Result, + pub flow: FlowValue, } impl FlowData { - pub fn from_utf8(vec: Vec) -> error::Result { - Ok(Self::from_raw(RawValue::from_string(String::from_utf8( - vec, - )?)?)) - } - - pub fn from_raw(raw_flow: Box) -> Self { - let flow = serde_json::from_str(raw_flow.get()) - .map_err(|e| format!("Invalid flow value: {:?}", e)); - Self { raw_flow, flow } - } - - pub fn value(&self) -> error::Result<&FlowValue> { - self.flow - .as_ref() - .map_err(|err| error::Error::InternalErr(err.clone())) + pub fn from_raw(raw_flow: Box) -> error::Result { + let flow = serde_json::from_str(raw_flow.get())?; + Ok(Self { raw_flow, flow }) } -} -impl Default for FlowData { - fn default() -> Self { - Self { raw_flow: Default::default(), flow: Err(Default::default()) } + pub fn value(&self) -> &FlowValue { + &self.flow } } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct ScriptData { pub lock: Option, pub code: String, } -impl ScriptData { - pub fn from_raw(lock: Option, code: Option) -> Self { - let lock = lock.and_then(|x| if x.is_empty() { None } else { Some(x) }); - let code = code.unwrap_or_default(); - Self { lock, code } - } -} - #[derive(Debug, Clone)] pub enum RawData { Flow(Arc), Script(Arc), } -#[derive(Serialize, Deserialize, Debug, Clone, Default)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ScriptMetadata { pub language: Option, pub envs: Option>, pub codebase: Option, } +#[derive(Debug)] +pub struct RawScript { + pub content: String, + pub lock: Option, + pub meta: Option, +} + +#[derive(Debug)] +pub struct RawFlow { + pub raw_flow: Box, +} + +#[derive(Debug)] +pub struct RawNode { + pub raw_code: Option, + pub raw_lock: Option, + pub raw_flow: Option>, +} + +#[derive(Debug, Clone)] +struct Entry(Arc); + +#[derive(Debug, Clone)] +struct ScriptFull { + pub data: Arc, + pub meta: Arc, +} + fn unwrap_or_error( at: &'static Location, entity: &'static str, @@ -235,70 +342,6 @@ fn unwrap_or_error( } } -const _: () = { - impl fs::Bundle for FlowData { - type Item = &'static str; - - fn items() -> impl Iterator { - ["flow.json"].into_iter() - } - - fn import(&mut self, _: Self::Item, data: Vec) -> error::Result<()> { - *self = Self::from_utf8(data)?; - Ok(()) - } - - fn export(&self, _: Self::Item) -> error::Result>> { - match self.raw_flow.get().is_empty() { - false => Ok(Some(self.raw_flow.get().as_bytes().to_vec())), - true => Ok(None), - } - } - } - - impl fs::Bundle for ScriptData { - type Item = &'static str; - - fn items() -> impl Iterator { - ["lock.txt", "code.txt"].into_iter() - } - - fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { - match item { - "lock.txt" => self.lock = Some(String::from_utf8(data)?), - "code.txt" => self.code = String::from_utf8(data)?, - _ => {} - } - Ok(()) - } - - fn export(&self, item: Self::Item) -> error::Result>> { - match item { - "lock.txt" => Ok(self.lock.as_ref().map(|s| s.as_bytes().to_vec())), - "code.txt" if !self.code.is_empty() => Ok(Some(self.code.as_bytes().to_vec())), - _ => Ok(None), - } - } - } - - impl fs::Bundle for ScriptMetadata { - type Item = &'static str; - - fn items() -> impl Iterator { - ["info.json"].into_iter() - } - - fn import(&mut self, _: Self::Item, data: Vec) -> error::Result<()> { - *self = serde_json::from_slice(&data)?; - Ok(()) - } - - fn export(&self, _: Self::Item) -> error::Result>> { - Ok(Some(serde_json::to_vec(self)?)) - } - } -}; - pub mod flow { 
use super::*; @@ -307,9 +350,9 @@ pub mod flow { /// FIXME: Use `Arc` for cheap cloning. static ref NODES: { FlowNodeId => RawData } in "flow" <= 1000; /// Flow version value cache (version id => value). - static ref FLOWS: { i64 => Arc } in "flows" <= 1000; + static ref FLOWS: { i64 => Entry } in "flows" <= 1000; /// Flow version lite value cache (version id => value). - static ref FLOWS_LITE: { i64 => Arc } in "flowslite" <= 1000; + static ref FLOWS_LITE: { i64 => Entry } in "flowslite" <= 1000; } /// Fetch the flow node script referenced by `node` from the cache. @@ -367,33 +410,25 @@ pub mod flow { // If not present, `get_or_insert_async` will lock the key until the future completes, // so only one thread will be able to fetch the data from the database and write it to // the file system and cache, hence no race on the file system. - NODES.get_or_insert_async( - node, - |(script, flow)| match flow { - Some(flow) => RawData::Flow(Arc::new(flow)), - _ => RawData::Script(Arc::new(script)), - }, - async move { - sqlx::query!( - "SELECT \ - lock AS \"lock: String\", \ - code AS \"code: String\", \ - flow AS \"flow: Json>\" \ - FROM flow_node WHERE id = $1 LIMIT 1", - node.0, - ) - .fetch_optional(e) - .await - .map_err(Into::into) - .and_then(unwrap_or_error(&loc, "Flow node", node)) - .map(|r| { - ( - ScriptData::from_raw(r.lock, r.code), - r.flow.map(|Json(raw_flow)| FlowData::from_raw(raw_flow)), - ) - }) - }, - ) + NODES.get_or_insert_async(node, async move { + sqlx::query!( + "SELECT \ + code AS \"raw_code: String\", \ + lock AS \"raw_lock: String\", \ + flow AS \"raw_flow: Json>\" \ + FROM flow_node WHERE id = $1 LIMIT 1", + node.0, + ) + .fetch_optional(e) + .await + .map_err(Into::into) + .and_then(unwrap_or_error(&loc, "Flow node", node)) + .map(|r| RawNode { + raw_code: r.raw_code, + raw_lock: r.raw_lock, + raw_flow: r.raw_flow.map(|Json(raw_flow)| raw_flow), + }) + }) } #[track_caller] @@ -402,7 +437,7 @@ pub mod flow { id: i64, ) -> impl Future>> { let loc = Location::caller(); - FLOWS.get_or_insert_async(id, Arc::new, async move { + let fut = FLOWS.get_or_insert_async(id, async move { sqlx::query_scalar!( "SELECT value AS \"value!: Json>\" FROM flow_version WHERE id = $1 LIMIT 1", @@ -412,8 +447,9 @@ pub mod flow { .await .map_err(Into::into) .and_then(unwrap_or_error(&loc, "Flow version", id)) - .map(|Json(raw_flow)| FlowData::from_raw(raw_flow)) - }) + .map(|Json(raw_flow)| RawFlow { raw_flow }) + }); + fut.map_ok(|Entry(data)| data) } #[track_caller] @@ -422,7 +458,7 @@ pub mod flow { id: i64, ) -> impl Future>> { let loc = Location::caller(); - FLOWS_LITE.get_or_insert_async(id, Arc::new, async move { + let fut = FLOWS_LITE.get_or_insert_async(id, async move { sqlx::query_scalar!( "SELECT value AS \"value!: Json>\" FROM flow_version_lite WHERE id = $1 LIMIT 1", @@ -432,8 +468,9 @@ pub mod flow { .await .map_err(Into::into) .and_then(unwrap_or_error(&loc, "Flow version \"lite\"", id)) - .map(|Json(raw_flow)| FlowData::from_raw(raw_flow)) - }) + .map(|Json(raw_flow)| RawFlow { raw_flow }) + }); + fut.map_ok(|Entry(data)| data) } } @@ -443,7 +480,7 @@ pub mod script { make_static! { /// Scripts cache. /// FIXME: Use `Arc` for cheap cloning. - static ref CACHE: { ScriptHash => (Arc, Arc) } in "script" <= 1000; + static ref CACHE: { ScriptHash => ScriptFull } in "script" <= 1000; } /// Fetch the script referenced by `hash` from the cache. 
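// Illustrative sketch (not part of this diff): what the `FlowData::value()` change means for
// callers. Flows are now parsed in `FlowData::from_raw` when the cache entry is created, so
// `value()` returns `&FlowValue` directly and the `?` disappears at every call site (see
// `backend/windmill-queue/src/jobs.rs` and `backend/tests/worker.rs` elsewhere in this diff).
// `first_flow_len` and `version_id` are placeholders.
use sqlx::{Pool, Postgres};
use windmill_common::{cache, error};

async fn first_flow_len(db: &Pool<Postgres>, version_id: i64) -> error::Result<usize> {
    let flow_data = cache::flow::fetch_version_lite(db, version_id).await?;
    // Previously `flow_data.value()?`; parsing can no longer fail at this point.
    Ok(flow_data.value().modules.len())
}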
@@ -459,32 +496,32 @@ pub mod script { // so only one thread will be able to fetch the data from the database and write it to // the file system and cache, hence no race on the file system. let loc = Location::caller(); - CACHE.get_or_insert_async( - hash, - |(data, metadata)| (Arc::new(data), Arc::new(metadata)), - async move { - sqlx::query!( - "SELECT \ - lock AS \"lock: String\", \ - content AS \"code!: String\", - language AS \"language: Option\", \ - envs AS \"envs: Vec\", \ - codebase AS \"codebase: String\" \ - FROM script WHERE hash = $1 LIMIT 1", - hash.0 - ) - .fetch_optional(e) - .await - .map_err(Into::into) - .and_then(unwrap_or_error(&loc, "Script", hash)) - .map(|r| { - ( - ScriptData::from_raw(r.lock, Some(r.code)), - ScriptMetadata { language: r.language, envs: r.envs, codebase: r.codebase }, - ) - }) - }, - ) + let fut = CACHE.get_or_insert_async(hash, async move { + sqlx::query!( + "SELECT \ + content AS \"content!: String\", + lock AS \"lock: String\", \ + language AS \"language: Option\", \ + envs AS \"envs: Vec\", \ + codebase AS \"codebase: String\" \ + FROM script WHERE hash = $1 LIMIT 1", + hash.0 + ) + .fetch_optional(e) + .await + .map_err(Into::into) + .and_then(unwrap_or_error(&loc, "Script", hash)) + .map(|r| RawScript { + content: r.content, + lock: r.lock, + meta: Some(ScriptMetadata { + language: r.language, + envs: r.envs, + codebase: r.codebase, + }), + }) + }); + fut.map_ok(|ScriptFull { data, meta }| (data, meta)) } /// Invalidate the script cache for the given `hash`. @@ -498,7 +535,7 @@ pub mod app { make_static! { /// App scripts cache. - static ref CACHE: { AppScriptId => Arc } in "app" <= 1000; + static ref CACHE: { AppScriptId => Entry } in "app" <= 1000; } /// Fetch the app script referenced by `id` from the cache. @@ -514,7 +551,7 @@ pub mod app { // so only one thread will be able to fetch the data from the database and write it to // the file system and cache, hence no race on the file system. let loc = Location::caller(); - CACHE.get_or_insert_async(id, Arc::new, async move { + let fut = CACHE.get_or_insert_async(id, async move { sqlx::query!( "SELECT lock, code FROM app_script WHERE id = $1 LIMIT 1", id.0, @@ -523,8 +560,9 @@ pub mod app { .await .map_err(Into::into) .and_then(unwrap_or_error(&loc, "Application script", id)) - .map(|r| ScriptData::from_raw(r.lock, Some(r.code))) - }) + .map(|r| RawScript { content: r.code, lock: r.lock, meta: None }) + }); + fut.map_ok(|Entry(data)| data) } } @@ -532,13 +570,6 @@ pub mod job { use super::*; use crate::jobs::JobKind; - use uuid::Uuid; - - lazy_static! { - /// Very small in-memory cache for "preview" jobs raw data. - static ref PREVIEWS: Cache = Cache::new(50); - } - #[track_caller] pub fn fetch_preview_flow<'a, 'c>( e: impl PgExecutor<'c> + 'a, @@ -588,6 +619,18 @@ pub mod job { raw_code: Option, raw_flow: Option>>, ) -> impl Future> + 'a { + #[cfg(not(feature = "scoped_cache"))] + lazy_static! { + /// Very small in-memory cache for "preview" jobs raw data. + static ref PREVIEWS: Cache = Cache::new(50); + } + + #[cfg(feature = "scoped_cache")] + lazy_static! { + /// Very small in-memory cache for "preview" jobs raw data. 
+ static ref PREVIEWS: Cache<(ThreadId, Uuid), RawData> = Cache::new(50); + } + let loc = Location::caller(); let fetch = async move { match (raw_lock, raw_code, raw_flow) { @@ -603,17 +646,21 @@ pub mod job { .map(|r| (r.raw_lock, r.raw_code, r.raw_flow)), (lock, code, flow) => Ok((lock, code, flow)), } - .map(|(lock, code, flow)| match flow { - Some(Json(flow)) => RawData::Flow(Arc::new(FlowData::from_raw(flow))), - _ => RawData::Script(Arc::new(ScriptData::from_raw(lock, code))), + .and_then(|(lock, code, flow)| match flow { + Some(Json(flow)) => FlowData::from_raw(flow).map(Arc::new).map(RawData::Flow), + _ => Ok(RawData::Script(Arc::new(ScriptData { + code: code.unwrap_or_default(), + lock, + }))), }) }; - // Disable caching in tests: as `#[sqlx::test]` spawn a database per test, the cache - // could yield unexpected results. - #[cfg(test)] - return fetch; - #[cfg(not(test))] - PREVIEWS.get_or_insert_async(job, fetch) + #[cfg(not(feature = "scoped_cache"))] + return PREVIEWS.get_or_insert_async(job, fetch); + #[cfg(feature = "scoped_cache")] + async move { + let job = &(std::thread::current().id(), job.clone()); + PREVIEWS.get_or_insert_async(job, fetch).await + } } #[track_caller] @@ -629,7 +676,7 @@ pub mod job { (FlowScript, Some(id)) => flow::fetch_script(e, FlowNodeId(id)).await, (Script | Dependencies, Some(hash)) => script::fetch(e, ScriptHash(hash)) .await - .map(|(raw_script, _metadata)| raw_script), + .map(|(data, _meta)| data), (AppScript, Some(id)) => app::fetch_script(e, AppScriptId(id)).await, _ => Err(error::Error::InternalErr(format!( "Isn't a script job: {:?}", @@ -666,173 +713,161 @@ pub mod job { } } -mod fs { - use super::*; +const _: () = { + impl Import for RawFlow { + fn import(src: &impl Storage) -> error::Result { + Ok(Self { raw_flow: src.get_json_raw("flow.json")? }) + } + } - use std::fs::{self, OpenOptions}; - use std::io::{Read, Write}; + impl Export for FlowData { + type Untrusted = RawFlow; - use uuid::Uuid; + fn resolve(src: Self::Untrusted) -> error::Result { + FlowData::from_raw(src.raw_flow) + } - /// A bundle of items that can be imported/exported from/into the file-system. - pub trait Bundle: Default { - /// Item type of the bundle. - type Item: Item + Copy; - /// Returns a slice of all items than **can** exists within the bundle. - fn items() -> impl Iterator; - /// Import the given `data` into the `item`. - fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()>; - /// Export the `item` into a `Vec`. - fn export(&self, item: Self::Item) -> error::Result>>; + fn export(&self, dst: &impl Storage) -> error::Result<()> { + Ok(dst.put("flow.json", self.raw_flow.get().as_bytes())?) + } } - /// An item that can be imported/exported from/into the file-system. - pub trait Item: Sized { - /// Returns the path of the item within the given `root` path. - fn path(&self, root: impl AsRef) -> PathBuf; + impl Import for RawScript { + fn import(src: &impl Storage) -> error::Result { + let content = src.get_utf8("code.txt")?; + let lock = src.get_utf8("lock.txt").ok(); + let meta = src.get_json("info.json").ok(); + Ok(Self { content, lock, meta }) + } } - /// Import or insert a bundle within the given combination of `{root}/{key}/`. - pub async fn import_or_insert_with(path: impl AsRef, f: F) -> error::Result - where - T: Bundle, - F: Future>, - { - let path = path.as_ref(); - // Retrieve the data from the cache directory or the database. - if fs::metadata(path).is_ok() { - // Cache path exists, read its contents. 
- let import = || -> error::Result { - let mut data = T::default(); - for item in T::items() { - let mut buf = vec![]; - let Ok(mut file) = OpenOptions::new().read(true).open(item.path(path)) else { - continue; - }; - file.read_to_end(&mut buf)?; - data.import(item, buf)?; - } - tracing::debug!("Imported from file-system: {:?}", path); - Ok(data) - }; - match import() { - Ok(data) => return Ok(data), - Err(err) => tracing::warn!( - "Failed to import from file-system, fetch source..: {path:?}: {err:?}" - ), - } + impl Export for ScriptData { + type Untrusted = RawScript; + + fn resolve(src: Self::Untrusted) -> error::Result { + Ok(ScriptData { code: src.content, lock: src.lock }) } - // Cache path doesn't exist or import failed, generate the content. - let data = f.await?; - let export = |data: &T| -> error::Result<()> { - fs::create_dir_all(path)?; - // Write the generated data to the file. - for item in T::items() { - let Some(buf) = data.export(item)? else { - continue; - }; - let mut file = OpenOptions::new() - .write(true) - .create(true) - .open(item.path(path))?; - file.write_all(&buf)?; + + fn export(&self, dst: &impl Storage) -> error::Result<()> { + dst.put("code.txt", self.code.as_bytes())?; + if let Some(lock) = self.lock.as_ref() { + dst.put("lock.txt", lock.as_bytes())?; } - tracing::debug!("Exported to file-system: {:?}", path); Ok(()) - }; - // Try to export data to the file-system. - // If failed, remove the directory but still return the data. - if let Err(err) = export(&data) { - tracing::warn!("Failed to export to file-system: {path:?}: {err:?}"); - let _ = fs::remove_dir_all(path); } - Ok(data) } - // Implement `Bundle`. - - // Empty bundle. - impl Bundle for () { - type Item = &'static str; + impl Export for ScriptFull { + type Untrusted = RawScript; - fn items() -> impl Iterator { - [].into_iter() + fn resolve(mut src: Self::Untrusted) -> error::Result { + let Some(meta) = src.meta.take() else { + return Err(error::Error::InternalErr("Invalid script src".to_string())); + }; + Ok(ScriptFull { + data: Arc::new(ScriptData { code: src.content, lock: src.lock }), + meta: Arc::new(meta), + }) } - fn import(&mut self, _: Self::Item, _: Vec) -> error::Result<()> { + fn export(&self, dst: &impl Storage) -> error::Result<()> { + self.data.export(dst)?; + self.meta.export(dst)?; Ok(()) } + } - fn export(&self, _: Self::Item) -> error::Result>> { - Ok(None) + impl Import for RawNode { + fn import(src: &impl Storage) -> error::Result { + let code = src.get_utf8("code.txt").ok(); + let lock = src.get_utf8("lock.txt").ok(); + let flow = src.get_json_raw("flow.json").ok(); + Ok(Self { raw_code: code, raw_lock: lock, raw_flow: flow }) } } - // JSON bundle. - impl Deserialize<'de> + Serialize + Default> Bundle for Json { - type Item = &'static str; + impl Export for RawData { + type Untrusted = RawNode; - fn items() -> impl Iterator { - ["self.json"].into_iter() - } - - fn import(&mut self, _: Self::Item, data: Vec) -> error::Result<()> { - self.0 = serde_json::from_slice(&data)?; - Ok(()) + fn resolve(src: Self::Untrusted) -> error::Result { + match src { + RawNode { raw_flow: Some(flow), .. } => { + FlowData::from_raw(flow).map(Arc::new).map(Self::Flow) + } + RawNode { raw_code: Some(code), raw_lock: lock, .. 
} => { + Ok(Self::Script(Arc::new(ScriptData { code, lock }))) + } + _ => Err(error::Error::InternalErr( + "Invalid raw data src".to_string(), + )), + } } - fn export(&self, _: Self::Item) -> error::Result>> { - Ok(Some(serde_json::to_vec(&self.0)?)) + fn export(&self, dst: &impl Storage) -> error::Result<()> { + match self { + RawData::Flow(data) => data.export(dst), + RawData::Script(data) => data.export(dst), + } } } - // Optional bundle. - impl Bundle for Option { - type Item = T::Item; + impl Export for Entry { + type Untrusted = T::Untrusted; - fn items() -> impl Iterator { - T::items() + fn resolve(src: Self::Untrusted) -> error::Result { + Ok(Entry(Arc::new(T::resolve(src)?))) } - fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { - let mut x = T::default(); - x.import(item, data)?; - *self = Some(x); - Ok(()) + fn export(&self, dst: &impl Storage) -> error::Result<()> { + self.0.export(dst) } + } - fn export(&self, item: Self::Item) -> error::Result>> { - match self { - Some(x) => x.export(item), - _ => Ok(None), - } + impl Deserialize<'de> + Serialize> Import for T { + fn import(src: &impl Storage) -> error::Result { + let data = src.get("self.json")?; + Ok(serde_json::from_slice(&data)?) } } - // Bundle pair. - impl, B: Bundle> Bundle for (A, B) { - type Item = I; + impl Deserialize<'de> + Serialize> Export for T { + type Untrusted = Self; - fn items() -> impl Iterator { - A::items().chain(B::items()) + fn resolve(src: Self::Untrusted) -> error::Result { + Ok(src) } - fn import(&mut self, item: Self::Item, data: Vec) -> error::Result<()> { - match A::items().any(|i| i == item) { - true => self.0.import(item, data), - _ => self.1.import(item, data), - } + fn export(&self, dst: &impl Storage) -> error::Result<()> { + Ok(dst.put("self.json", serde_json::to_vec(self)?)?) } + } - fn export(&self, item: Self::Item) -> error::Result>> { - match A::items().any(|i| i == item) { - true => self.0.export(item), - _ => self.1.export(item), - } + impl> Storage for T { + fn get(&self, item: impl Item) -> std::io::Result> { + use std::fs::OpenOptions; + use std::io::Read; + + OpenOptions::new() + .read(true) + .open(item.path(self)) + .and_then(|mut file| { + let mut buf = vec![]; + file.read_to_end(&mut buf)?; + Ok(buf) + }) } - } - // Implement `Item`. + fn put(&self, item: impl Item, data: impl AsRef<[u8]>) -> std::io::Result<()> { + use std::fs::OpenOptions; + use std::io::Write; + + OpenOptions::new() + .write(true) + .create(true) + .open(item.path(self)) + .and_then(|mut file| file.write_all(data.as_ref())) + } + } macro_rules! 
impl_item { ($( ($t:ty, |$x:ident| $join:expr) ),*) => { @@ -857,22 +892,11 @@ mod fs { (AppScriptId, |x| format!("{:016x}", x.0)) } - #[cfg(test)] - #[test] - fn test_items() { - let p = "test".path("/tmp"); - assert_eq!(p, PathBuf::from("/tmp/test")); - let p = i64::MAX.path("/tmp"); - assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); - let p = u64::MAX.path("/tmp"); - assert_eq!(p, PathBuf::from("/tmp/ffffffffffffffff")); - let p = Uuid::from_u128(u128::MAX).path("/tmp"); - assert_eq!(p, PathBuf::from("/tmp/ffffffffffffffffffffffffffffffff")); - let p = ScriptHash(i64::MAX).path("/tmp"); - assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); - let p = FlowNodeId(i64::MAX).path("/tmp"); - assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); - let p = AppScriptId(i64::MAX).path("/tmp"); - assert_eq!(p, PathBuf::from("/tmp/7fffffffffffffff")); + #[cfg(feature = "scoped_cache")] + impl Item for (ThreadId, T) { + fn path(&self, root: impl AsRef) -> PathBuf { + let (id, item) = self; + item.path(root.as_ref().join(format!("{id:?}"))) + } } -} +}; diff --git a/backend/windmill-common/src/flows.rs b/backend/windmill-common/src/flows.rs index 4beca757f48b0..370b8a69ecf45 100644 --- a/backend/windmill-common/src/flows.rs +++ b/backend/windmill-common/src/flows.rs @@ -840,7 +840,7 @@ pub async fn resolve_modules( if let Some(id) = modules_node { *modules = cache::flow::fetch_flow(e, id) .await - .and_then(|data| Ok(data.value()?.modules.clone()))?; + .map(|data| data.value().modules.clone())?; } for module in modules.iter_mut() { Box::pin(resolve_module( diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 88d53e50db5fa..a99f9a3fcfae5 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -2868,7 +2868,7 @@ pub async fn push<'c, 'd>( ), JobPayload::FlowNode { id, path } => { let data = cache::flow::fetch_flow(_db, id).await?; - let value = data.value()?; + let value = data.value(); let status = Some(FlowStatus::new(value)); // Keep inserting `value` if not all workers are updated. // Starting at `v1.440`, the value is fetched on pull from the flow node id. @@ -3020,7 +3020,7 @@ pub async fn push<'c, 'd>( // The version has been inserted only within the transaction. let data = cache::flow::fetch_version(&mut *ntx, version).await?; tx = PushIsolationLevel::Transaction(ntx); - Some(data.value()?.clone()) + Some(data.value().clone()) } else { // `raw_flow` is fetched on pull. 
None @@ -3218,7 +3218,7 @@ pub async fn push<'c, 'd>( }?; tx = PushIsolationLevel::Transaction(ntx); - let value = data.value()?.clone(); + let value = data.value().clone(); let priority = value.priority; let cache_ttl = value.cache_ttl.map(|x| x as i32); let custom_concurrency_key = value.concurrency_key.clone(); @@ -3311,7 +3311,7 @@ pub async fn push<'c, 'd>( user_states, preprocessor_module: None, }; - let value = flow_data.value()?; + let value = flow_data.value(); let priority = value.priority; let concurrency_key = value.concurrency_key.clone(); let concurrent_limit = value.concurrent_limit; @@ -3781,7 +3781,7 @@ async fn restarted_flows_resolution( let flow_data = cache::job::fetch_flow(db, row.job_kind, row.script_hash) .or_else(|_| cache::job::fetch_preview_flow(db, &completed_flow_id, row.raw_flow)) .await?; - let flow_value = flow_data.value()?; + let flow_value = flow_data.value(); let flow_status = row .flow_status .as_ref() diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 9f71a7d80725c..40978527da9f6 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -250,7 +250,7 @@ pub async fn update_flow_status_after_job_completion_internal( let flow_data = cache::job::fetch_flow(db, job_kind, script_hash) .or_else(|_| cache::job::fetch_preview_flow(db, &flow, raw_flow)) .await?; - let flow_value = flow_data.value()?; + let flow_value = flow_data.value(); let module_step = Step::from_i32_and_len(old_status.step, old_status.modules.len()); let current_module = match module_step { @@ -1499,7 +1499,7 @@ pub async fn handle_flow( worker_dir: &str, job_completed_tx: Sender, ) -> anyhow::Result<()> { - let flow = flow_data.value()?; + let flow = flow_data.value(); let status = flow_job .parse_flow_status() .with_context(|| "Unable to parse flow status")?; diff --git a/backend/windmill-worker/src/worker_lockfiles.rs b/backend/windmill-worker/src/worker_lockfiles.rs index 38611e2b0b4d2..8d45ec9ca295b 100644 --- a/backend/windmill-worker/src/worker_lockfiles.rs +++ b/backend/windmill-worker/src/worker_lockfiles.rs @@ -603,7 +603,7 @@ pub async fn handle_flow_dependency_job( _ => return Err(Error::InternalErr("expected script hash".into())), }, } - .value()? + .value() .clone(); let mut tx = db.begin().await?;
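// Illustrative sketch (not part of this diff): the `Storage` trait added in
// `windmill-common/src/cache.rs` appears to be implemented for anything path-like, so a cache
// entry directory can be read and written with the same helpers the cache uses internally
// (`get`/`put`, plus `get_utf8`, `get_json`, `get_json_raw`). `roundtrip`, `entry_dir` and the
// file contents are hypothetical; `"code.txt"` addresses an item because `&'static str`
// implements `Item`.
use std::path::PathBuf;
use windmill_common::{cache::Storage, error};

fn roundtrip(entry_dir: PathBuf) -> error::Result<String> {
    // Assumes `entry_dir` already exists (the cache itself calls `create_dir_all`
    // before exporting an entry).
    entry_dir.put("code.txt", b"print('hello')")?;
    entry_dir.get_utf8("code.txt")
}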