From 9114a952b698360ced989cb0df698357ea43c6a2 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 12 Feb 2026 12:45:49 +0100 Subject: [PATCH 01/22] test: create a test for m0002010 using DS4 --- Cargo.lock | 34 ++++- Cargo.toml | 2 + common/Cargo.toml | 3 + common/db/Cargo.toml | 2 +- common/db/src/embedded.rs | 20 +-- common/src/decompress.rs | 37 ++++- migration/tests/data/m0002010.rs | 23 ++- test-context/Cargo.toml | 3 + test-context/src/ctx/migration.rs | 122 ++++++++++++--- test-context/src/ctx/mod.rs | 2 +- test-context/src/migration.rs | 237 ++++++++++++++++++++---------- 11 files changed, 366 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58388e948..d6bcc71f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,6 +535,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "async-tar" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e9933aa2da420042c67e2ea83eec765919347d9742592744dc97cc42ef20c5d" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall 0.7.0", + "tokio", + "tokio-stream", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -1621,7 +1635,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7" dependencies = [ "compression-core", + "flate2", "liblzma", + "memchr", "zstd", "zstd-safe", ] @@ -4244,6 +4260,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "lzma-rust2" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d673a11333485e7d8b93d62a9a5b07b22daf5e8a8655a44c1bb18aa4bf3d1524" +dependencies = [ + "crc", + "sha2", +] + [[package]] name = "matchers" version = "0.2.0" @@ -8135,12 +8161,14 @@ version = "0.5.0-alpha.1" dependencies = [ "actix-web", "anyhow", + "async-compression", "bytes", "bytesize", "chrono", "clap", "cpe", "deepsize", + "flate2", "hex", "hide", "human-date-parser", @@ 
-8148,6 +8176,7 @@ dependencies = [ "itertools 0.14.0", "lenient_semver", "log", + "lzma-rust2 0.16.1", "native-tls", "packageurl", "pem", @@ -8678,10 +8707,12 @@ dependencies = [ "actix-http", "actix-web", "anyhow", + "async-tar", "bytes", "futures", "git2", "hex", + "indicatif", "log", "osv", "peak_alloc", @@ -8708,6 +8739,7 @@ dependencies = [ "trustify-migration", "trustify-module-ingestor", "trustify-module-storage", + "urlencoding", "utoipa-actix-web", "walkdir", "zip 8.1.0", @@ -10125,7 +10157,7 @@ dependencies = [ "getrandom 0.4.1", "hmac", "indexmap 2.13.0", - "lzma-rust2", + "lzma-rust2 0.13.0", "memchr", "pbkdf2", "ppmd-rust", diff --git a/Cargo.toml b/Cargo.toml index 3ca4d4ab1..06ea3dfe8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ actix-web-static-files = "4.0.1" anyhow = "1.0.72" async-compression = "0.4.13" async-recursion = "1" +async-tar = { version = "0.6", default-features = false, features = ["runtime-tokio"] } async-trait = "0.1.74" aws-config = { version = "1.8.14", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.124.0", features = ["behavior-version-latest"] } @@ -88,6 +89,7 @@ json-merge-patch = "0.0.1" jsonpath-rust = "1.0.1" lenient_semver = "0.4.2" liblzma = "0.4" +lzma-rust2 = "0.16.1" libz-sys = "*" log = "0.4.19" mime = "0.3.17" diff --git a/common/Cargo.toml b/common/Cargo.toml index 74f7a937b..56f830cc7 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,12 +9,14 @@ rust-version.workspace = true [dependencies] actix-web = { workspace = true } anyhow = { workspace = true } +async-compression = { workspace = true, features = ["gzip", "lzma"] } bytes = { workspace = true } bytesize = { workspace = true, features = ["serde"] } chrono = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } cpe = { workspace = true } deepsize = { workspace = true } +flate2 = { workspace = true } hex = { workspace = true } hide = { workspace = true } human-date-parser = { workspace = true 
} @@ -22,6 +24,7 @@ humantime = { workspace = true } itertools = { workspace = true } lenient_semver = { workspace = true } log = { workspace = true } +lzma-rust2 = { workspace = true } native-tls = { workspace = true } packageurl = { workspace = true } pem = { workspace = true } diff --git a/common/db/Cargo.toml b/common/db/Cargo.toml index 22ac4a734..bcad42c67 100644 --- a/common/db/Cargo.toml +++ b/common/db/Cargo.toml @@ -11,7 +11,7 @@ trustify-migration = { workspace = true } trustify-common = { workspace = true } anyhow = { workspace = true } -async-compression = { workspace = true, features = ["tokio", "lzma"] } +async-compression = { workspace = true, features = ["tokio", "lzma", "gzip"] } log = { workspace = true } postgresql_commands = { workspace = true, features = ["tokio"] } postgresql_embedded = { workspace = true, features = ["blocking", "tokio"] } diff --git a/common/db/src/embedded.rs b/common/db/src/embedded.rs index c9d9f822e..8a370987f 100644 --- a/common/db/src/embedded.rs +++ b/common/db/src/embedded.rs @@ -1,12 +1,8 @@ use anyhow::Context; use postgresql_embedded::{PostgreSQL, Settings, VersionReq}; -use std::{ - path::{Path, PathBuf}, - pin::Pin, -}; -use tokio::io::{AsyncRead, BufReader}; +use std::path::{Path, PathBuf}; use tracing::{Instrument, info_span}; -use trustify_common::db::Database; +use trustify_common::{db::Database, decompress::decompress_async_read}; /// Create common default settings for the embedded database pub fn default_settings() -> anyhow::Result { @@ -92,17 +88,7 @@ pub async fn create_for( Source::Import(path) => { log::info!("Importing from: {}", path.display()); - let source = tokio::fs::File::open(&path).await?; - let source = BufReader::new(source); - - let source: Pin> = match path - .extension() - .and_then(|ext| ext.to_str()) - { - None | Some("sql") => Box::pin(source), - Some("xz") => Box::pin(async_compression::tokio::bufread::LzmaDecoder::new(source)), - Some(ext) => anyhow::bail!("Unsupported file type 
({ext})"), - }; + let source = decompress_async_read(path).await?; super::Database::import(&config, source) .await diff --git a/common/src/decompress.rs b/common/src/decompress.rs index 2b568ca76..84809a70d 100644 --- a/common/src/decompress.rs +++ b/common/src/decompress.rs @@ -1,7 +1,12 @@ use actix_web::http::header; use anyhow::anyhow; use bytes::Bytes; -use tokio::{runtime::Handle, task::JoinError}; +use std::{io::Read, path::Path, pin::Pin}; +use tokio::{ + io::{AsyncRead, BufReader}, + runtime::Handle, + task::JoinError, +}; use tracing::instrument; use walker_common::compression::{Compression, DecompressionOptions, Detector}; @@ -95,6 +100,36 @@ fn detect(content_type: Option, bytes: &[u8]) -> Result, +) -> anyhow::Result>> { + let path = path.as_ref(); + let source = tokio::fs::File::open(path).await?; + let source = BufReader::new(source); + + Ok(match path.extension().and_then(|ext| ext.to_str()) { + None | Some("sql") => Box::pin(source), + Some("xz") => Box::pin(async_compression::tokio::bufread::LzmaDecoder::new(source)), + Some("gz") => Box::pin(async_compression::tokio::bufread::GzipDecoder::new(source)), + Some(ext) => anyhow::bail!("Unsupported file type ({ext})"), + }) +} + +/// Take a file, return a wrapped [`Read`], and wrap that with the required compression decoder. 
+pub fn decompress_read(path: impl AsRef) -> anyhow::Result> { + let path = path.as_ref(); + let source = std::fs::File::open(path)?; + let source = std::io::BufReader::new(source); + + Ok(match path.extension().and_then(|ext| ext.to_str()) { + None | Some("sql") => Box::new(source), + Some("xz") => Box::new(lzma_rust2::XzReader::new(source, false)), + Some("gz") => Box::new(flate2::read::GzDecoder::new(source)), + Some(ext) => anyhow::bail!("Unsupported file type ({ext})"), + }) +} + #[cfg(test)] mod test { use crate::decompress::decompress_async; diff --git a/migration/tests/data/m0002010.rs b/migration/tests/data/m0002010.rs index c85fd52cd..96fe5bc82 100644 --- a/migration/tests/data/m0002010.rs +++ b/migration/tests/data/m0002010.rs @@ -4,7 +4,7 @@ use migration::data::{Database, Direction, MigrationWithData, Options, Runner}; use sea_orm_migration::MigratorTrait; use test_context::test_context; use test_log::test; -use trustify_test_context::{TrustifyMigrationContext, commit, ctx::DumpId}; +use trustify_test_context::{TrustifyMigrationContext, commit, dump}; commit!(Commit("6d3ea814b4b44fe16ea8f21724dda5abb0fc7932")); @@ -42,3 +42,24 @@ async fn examples( Ok(()) } + +// A dump created before merging the SBOM CVSS enhancements +dump!(Ds4( + "https://trustify-dumps-ds4.s3.eu-west-1.amazonaws.com/20251104T095645Z", + storage = "dump.tar.gz", + no_digests, +)); + +/// Test the performance of applying the data migration of `m0002010`. 
+#[test_context(TrustifyMigrationContext)] +#[test(tokio::test)] +async fn performance(ctx: &TrustifyMigrationContext) -> Result<(), anyhow::Error> { + MigrationWithData::run_with_test(ctx.storage.clone(), Options::default(), async { + MigratorTest::up(&ctx.db, None).await + }) + .await?; + + // done + + Ok(()) +} diff --git a/test-context/Cargo.toml b/test-context/Cargo.toml index 83f13a1d6..d34afb3f4 100644 --- a/test-context/Cargo.toml +++ b/test-context/Cargo.toml @@ -18,9 +18,11 @@ trustify-module-storage = { workspace = true } actix-http = { workspace = true } actix-web = { workspace = true } anyhow = { workspace = true } +async-tar = { workspace = true } bytes = { workspace = true } futures = { workspace = true } git2 = { workspace = true } +indicatif = { workspace = true } log = { workspace = true } peak_alloc = { workspace = true } postgresql_embedded = { workspace = true } @@ -38,6 +40,7 @@ tracing = { workspace = true } tracing-flame = { workspace = true } tracing-log = { workspace = true } tracing-subscriber = { workspace = true } +urlencoding = { workspace = true } utoipa-actix-web = { workspace = true } walkdir = { workspace = true } zip = { workspace = true } diff --git a/test-context/src/ctx/migration.rs b/test-context/src/ctx/migration.rs index 93e309715..8cbc848f3 100644 --- a/test-context/src/ctx/migration.rs +++ b/test-context/src/ctx/migration.rs @@ -1,11 +1,14 @@ -use crate::{TrustifyTestContext, migration::Migration}; +use crate::{ + TrustifyTestContext, + migration::Migration, + migration::{Dump, Dumps}, +}; use anyhow::Context; -use std::borrow::Cow; -use std::marker::PhantomData; -use std::ops::Deref; +use std::{borrow::Cow, marker::PhantomData, ops::Deref}; use tar::Archive; use test_context::AsyncTestContext; -use trustify_db::embedded::{Options, Source, default_settings}; +use trustify_common::decompress::decompress_read; +use trustify_db::embedded::{Options, default_settings}; use 
trustify_module_storage::service::fs::FileSystemBackend; #[macro_export] @@ -13,21 +16,71 @@ macro_rules! commit { ($t:ident($id:literal)) => { pub struct $t; - impl DumpId for $t { - fn dump_id() -> Option<&'static str> { - Some($id) + impl $crate::ctx::DumpId for $t { + fn dump_id() -> $crate::ctx::MigrationSource { + $crate::ctx::MigrationSource::Migration(Some($id)) } } }; } +#[macro_export] +macro_rules! dump { + ($t:ident($url:literal $(, $($rest:tt)*)? )) => { + $crate::dump!(@parse $t, $url, db = "dump.sql.gz", storage = "dump.tar", digests = true, $($($rest)*)?); + }; + + (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr, db = $new_db:literal, $($rest:tt)*) => { + $crate::dump!(@parse $t, $url, db = $new_db, storage = $storage, digests = $digests, $($rest)*); + }; + (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr, storage = $new_storage:literal, $($rest:tt)*) => { + $crate::dump!(@parse $t, $url, db = $db, storage = $new_storage, digests = $digests, $($rest)*); + }; + (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr, no_digests, $($rest:tt)*) => { + $crate::dump!(@parse $t, $url, db = $db, storage = $storage, digests = false, $($rest)*); + }; + + (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr,) => { + $crate::dump!(@emit $t, $url, $db, $storage, $digests); + }; + + (@emit $t:ident, $url:literal, $db:literal, $storage:literal, $digests:expr) => { + pub struct $t; + + impl $crate::ctx::DumpId for $t { + fn dump_id() -> $crate::ctx::Source { + $crate::ctx::Source::Dump { + base_url: $url, + db_file: $db, + storage_file: $storage, + digests: $digests, + } + } + } + }; +} + +pub enum Source { + Migration(Option<&'static str>), + Dump { + /// base URL to the dump files + base_url: &'static str, + /// DB file name + db_file: &'static str, + /// storage 
archive + storage_file: &'static str, + /// if there are digests for the files + digests: bool, + }, +} + pub trait DumpId { - fn dump_id() -> Option<&'static str>; + fn dump_id() -> Source; } impl DumpId for () { - fn dump_id() -> Option<&'static str> { - None + fn dump_id() -> Source { + Source::Migration(None) } } @@ -47,12 +100,37 @@ impl Deref for TrustifyMigrationContext { impl TrustifyMigrationContext { pub async fn new() -> anyhow::Result { - let migration = Migration::new().expect("failed to create migration manager"); - let id: Cow<'static, str> = match ID::dump_id() { - Some(id) => format!("commit-{id}").into(), - None => "latest".into(), + let (base, db_file, storage_file) = match ID::dump_id() { + Source::Migration(migration) => { + let id: Cow<'static, str> = match migration { + Some(id) => format!("commit-{id}").into(), + None => "latest".into(), + }; + let migration = Migration::new().context("failed to create migration manager")?; + let base = migration.provide(&id).await?; + (base, "dump.sql.xz", "dump.tar") + } + + Source::Dump { + base_url, + db_file, + storage_file, + digests, + } => { + let base = Dumps::new()? 
+ .provide(Dump { + url: base_url, + files: &[db_file, storage_file], + digests, + }) + .await?; + + (base, db_file, storage_file) + } }; - let base = migration.provide(&id).await?; + + let storage_file = base.join(storage_file); + log::info!("Importing dump: {}", storage_file.display()); // create storage @@ -60,13 +138,15 @@ impl TrustifyMigrationContext { .await .expect("Unable to create storage backend"); - let mut archive = Archive::new( - std::fs::File::open(base.join("dump.tar")).context("failed to open storage dump")?, - ); + let source = decompress_read(storage_file).context("failed to open storage dump")?; + + let mut archive = Archive::new(source); archive .unpack(tmp.path()) .context("failed to unpack storage dump")?; + log::info!("Storage unpacked"); + // create DB let settings = default_settings().context("unable to create default settings")?; @@ -74,12 +154,14 @@ impl TrustifyMigrationContext { let (db, postgresql) = trustify_db::embedded::create_for( settings, Options { - source: Source::Import(base.join("dump.sql.xz")), + source: trustify_db::embedded::Source::Import(base.join(db_file)), }, ) .await .context("failed to create an embedded database")?; + log::info!("Database imported"); + Ok(Self( TrustifyTestContext::new(db, storage, tmp, postgresql).await, Default::default(), diff --git a/test-context/src/ctx/mod.rs b/test-context/src/ctx/mod.rs index b2f56979a..08dc48c1d 100644 --- a/test-context/src/ctx/mod.rs +++ b/test-context/src/ctx/mod.rs @@ -3,5 +3,5 @@ mod migration; mod read_only; pub use default::*; -pub use migration::*; +pub use migration::{Source as MigrationSource, *}; pub use read_only::*; diff --git a/test-context/src/migration.rs b/test-context/src/migration.rs index 64a5be59c..6153f0147 100644 --- a/test-context/src/migration.rs +++ b/test-context/src/migration.rs @@ -1,6 +1,7 @@ use anyhow::{Context, anyhow}; use futures::StreamExt; use git2::{BranchType, Repository}; +use indicatif::{ProgressBar, ProgressStyle}; use 
sha2::Digest; use std::{env, fs::File, path::Path, path::PathBuf}; use tokio::{ @@ -12,7 +13,7 @@ use tokio::{ /// Manage the download of migration dumps #[derive(Debug)] pub struct Migration { - base: PathBuf, + dump: Dumps, branch: String, region: String, @@ -22,18 +23,6 @@ pub struct Migration { impl Migration { /// Create a new instance, detecting paths and the branch pub fn new() -> anyhow::Result { - // base for storing dumps, does not include the branch name - let base = env::var_os("TRUSTIFY_MIGRATION_DUMPS") - .map(PathBuf::from) - .or_else(|| { - env::current_dir() - .ok() - .map(|path| path.join(".trustify").join("migration-dumps")) - }) - .ok_or_else(|| anyhow!("unable to determine migration dumps directory"))?; - - log::info!("Using migration base: '{}'", base.display()); - // get the base of the source code let cwd: PathBuf = match option_env!("CARGO_MANIFEST_DIR") { @@ -58,7 +47,7 @@ impl Migration { // done Ok(Self { - base, + dump: Dumps::new()?, branch, region, bucket, @@ -68,56 +57,22 @@ impl Migration { /// Provide the base dump path, for this branch. /// /// This may include downloading content from S3. 
- pub async fn provide(&self, id: &str) -> anyhow::Result { - let base = self.base.join(&self.branch).join(id); - - log::info!("branch base path: '{}'", base.display()); - - fs::create_dir_all(&base).await?; - - // lock file, we can't lock directories cross-platform - - let lock = task::spawn_blocking({ - let base = base.clone(); - move || { - let lock = File::create(base.join(".lock"))?; - // the existence of the lock file means nothing, only the lock on it - lock.lock()?; - - Ok::<_, anyhow::Error>(lock) - } - }) - .await??; - - // holding the lock - - let files = ["dump.sql.xz", "dump.tar"]; - - if files.iter().any(|file| !base.join(file).exists()) { - let client = reqwest::Client::new(); - download_artifacts( - client, - &base, - &self.bucket, - &self.region, - &self.branch, - id, - files, + pub async fn provide(&self, commit: &str) -> anyhow::Result { + self.dump + .provide_raw( + "migration", + Dump { + url: &format!( + "https://{bucket}.s3.{region}.amazonaws.com/{branch}/{commit}", + bucket = self.bucket, + region = self.region, + branch = self.branch, + ), + files: &["dump.sql.xz", "dump.tar"], + digests: true, + }, ) - .await? 
- } else { - log::debug!("dump files already exist"); - } - - // validate checksums - - validate_checksums(&base, files).await?; - - // unlock - - lock.unlock()?; - - Ok(base) + .await } } @@ -257,6 +212,7 @@ async fn validate_checksums( for file in files { let file = file.as_ref(); + log::info!("validating checksum: {file}"); // open content file @@ -303,44 +259,171 @@ async fn validate_checksums( } /// just download artifacts (and their digest files) from the dump bucket -async fn download_artifacts( +async fn download_artifacts_raw( client: reqwest::Client, base: impl AsRef, - bucket: &str, - region: &str, - branch: &str, - commit: &str, + base_url: &str, + digests: bool, files: impl IntoIterator>, ) -> anyhow::Result<()> { let base = base.as_ref(); for file in files.into_iter().flat_map(|file| { let file = file.as_ref(); - vec![file.to_string(), format!("{file}.sha256")] + let mut files = vec![file.to_string()]; + if digests { + files.push(format!("{file}.sha256")); + } + files }) { - let url = format!("https://{bucket}.s3.{region}.amazonaws.com/{branch}/{commit}/{file}",); + let url = format!("{base_url}/{file}"); log::info!("downloading file: '{url}'"); - let mut dest = fs::File::create(base.join(file)).await?; - let mut stream = client - .get(&url) - .send() - .await? - .error_for_status()? - .bytes_stream(); + let response = client.get(&url).send().await?.error_for_status()?; + + let total_size = response.content_length(); + log::info!("total size: {total_size:?}"); + + let pb = if let Some(size) = total_size { + let pb = ProgressBar::new(size); + pb.set_style( + ProgressStyle::with_template( + "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})", + )? 
+ .progress_chars("##-"), + ); + pb + } else { + let pb = ProgressBar::new_spinner(); + pb.set_style(ProgressStyle::with_template( + "{spinner:.green} [{elapsed_precise}] {bytes} ({bytes_per_sec})", + )?); + pb + }; + + pb.set_message(file.clone()); + + let mut dest = fs::File::create(base.join(&file)).await?; + let mut stream = response.bytes_stream(); while let Some(chunk) = stream.next().await { let chunk = chunk?; + pb.inc(chunk.len() as u64); io::copy(&mut chunk.as_ref(), &mut dest).await?; } + pb.finish_with_message(format!("{file} done")); dest.shutdown().await?; } Ok(()) } +pub struct Dump<'a, S: AsRef + 'a> { + pub url: &'a str, + pub files: &'a [S], + pub digests: bool, +} + +/// Manage raw dump downloads +#[derive(Debug)] +pub struct Dumps { + base: PathBuf, +} + +impl Dumps { + pub fn new() -> anyhow::Result { + // base for storing dumps, does not include the branch name + let base = env::var_os("TRUSTIFY_MIGRATION_DUMPS") + .map(PathBuf::from) + .or_else(|| { + env::current_dir() + .ok() + .map(|path| path.join(".trustify").join("migration-dumps")) + }) + .ok_or_else(|| anyhow!("unable to determine migration dumps directory"))?; + + log::info!("Using migration base: '{}'", base.display()); + + Ok(Self { base }) + } + + /// Provide the base dump path, for this branch. + /// + /// This may include downloading content from S3. + pub async fn provide<'a, S>(&self, dump: Dump<'a, S>) -> anyhow::Result + where + S: AsRef + 'a, + { + self.provide_raw("url", dump).await + } + + /// Provide the base dump path, for this branch. + /// + /// This may include downloading content from S3. 
+ async fn provide_raw<'a, S>( + &self, + r#rtype: &str, + Dump { + url, + files, + digests, + }: Dump<'a, S>, + ) -> anyhow::Result + where + S: AsRef + 'a, + { + let dir = urlencoding::encode(url); + let files = files.iter().map(|s| s.as_ref()).collect::>(); + + let base = self.base.join(r#rtype).join(&*dir); + + log::debug!("base path: '{}'", base.display()); + + fs::create_dir_all(&base).await?; + + // lock file, we can't lock directories cross-platform + + let lock = task::spawn_blocking({ + let base = base.clone(); + move || { + let lock = File::create(base.join(".lock"))?; + log::debug!("Waiting for lock file"); + // the existence of the lock file means nothing, only the lock on it + lock.lock()?; + log::debug!("Lock acquired"); + Ok::<_, anyhow::Error>(lock) + } + }) + .await??; + + // holding the lock + + if files.iter().any(|file| !base.join(file).exists()) { + let client = reqwest::Client::new(); + download_artifacts_raw(client, &base, url, digests, &files).await? + } else { + log::debug!("dump files already exist"); + } + + // validate checksums + + if digests { + validate_checksums(&base, files).await?; + } else { + log::debug!("Skip checking digests"); + } + + // unlock + + log::debug!("releasing lock"); + lock.unlock()?; + + Ok(base) + } +} + #[cfg(test)] mod tests { use super::*; From aa4cca83e960bf29d3868e5556fec08564685e55 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 24 Feb 2026 10:35:51 +0100 Subject: [PATCH 02/22] chore: refresh deps --- Cargo.lock | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6bcc71f4..6bfa4d967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -544,7 +544,7 @@ dependencies = [ "filetime", "futures-core", "libc", - "redox_syscall 0.7.0", + "redox_syscall 0.7.1", "tokio", "tokio-stream", ] @@ -4251,15 +4251,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" -[[package]] -name = "lzma-rust2" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47bb1e988e6fb779cf720ad431242d3f03167c1b3f2b1aae7f1a94b2495b36ae" -dependencies = [ - "sha2", -] - [[package]] name = "lzma-rust2" version = "0.16.1" @@ -8176,7 +8167,7 @@ dependencies = [ "itertools 0.14.0", "lenient_semver", "log", - "lzma-rust2 0.16.1", + "lzma-rust2", "native-tls", "packageurl", "pem", @@ -10157,7 +10148,7 @@ dependencies = [ "getrandom 0.4.1", "hmac", "indexmap 2.13.0", - "lzma-rust2 0.13.0", + "lzma-rust2", "memchr", "pbkdf2", "ppmd-rust", From 49747cee4b562bb70d27f665416a182541bebffe Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Fri, 27 Feb 2026 13:09:51 +0100 Subject: [PATCH 03/22] chore: refactor the dump macro, and allow appending zstd suffixes --- Cargo.lock | 2 +- migration/tests/data/m0002010.rs | 15 ++- modules/storage/src/service/fs.rs | 2 + test-context/src/ctx/migration.rs | 155 +++++++++++++++++++++--------- 4 files changed, 124 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6bfa4d967..3b144f8b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -544,7 +544,7 @@ dependencies = [ "filetime", "futures-core", "libc", - "redox_syscall 0.7.1", + "redox_syscall 0.7.2", "tokio", "tokio-stream", ] diff --git a/migration/tests/data/m0002010.rs b/migration/tests/data/m0002010.rs index 96fe5bc82..7d8f96abe 100644 --- a/migration/tests/data/m0002010.rs +++ b/migration/tests/data/m0002010.rs @@ -44,13 +44,18 @@ async fn examples( } // A dump created before merging the SBOM CVSS enhancements -dump!(Ds4( - "https://trustify-dumps-ds4.s3.eu-west-1.amazonaws.com/20251104T095645Z", - storage = "dump.tar.gz", - no_digests, -)); +dump!( + Ds4("https://trustify-dumps-ds4.s3.eu-west-1.amazonaws.com/20251104T095645Z") + .storage_file("dump.tar.gz") + .no_digests() + .strip(5) + .fix_zstd() +); /// Test the performance of applying the 
data migration of `m0002010`. +/// +/// **NOTE:** If this test runs out of disk space, you can set `TMPDIR` to a directory with more +/// space. #[test_context(TrustifyMigrationContext)] #[test(tokio::test)] async fn performance(ctx: &TrustifyMigrationContext) -> Result<(), anyhow::Error> { diff --git a/modules/storage/src/service/fs.rs b/modules/storage/src/service/fs.rs index f5edb9d3a..5eb7a926f 100644 --- a/modules/storage/src/service/fs.rs +++ b/modules/storage/src/service/fs.rs @@ -111,9 +111,11 @@ impl FileSystemBackend { for compression in &self.read_compressions { target.set_extension(compression.extension()); if try_exists(&target).await? { + log::debug!("Located: {}", target.display()); return Ok(Some((target, *compression))); } } + log::info!("Missing - base: {target:?}"); Ok(None) } } diff --git a/test-context/src/ctx/migration.rs b/test-context/src/ctx/migration.rs index 8cbc848f3..7372ada9f 100644 --- a/test-context/src/ctx/migration.rs +++ b/test-context/src/ctx/migration.rs @@ -4,7 +4,8 @@ use crate::{ migration::{Dump, Dumps}, }; use anyhow::Context; -use std::{borrow::Cow, marker::PhantomData, ops::Deref}; +use std::{borrow::Cow, fs::OpenOptions, io::Write, marker::PhantomData, ops::Deref, path::PathBuf}; +use walkdir::WalkDir; use tar::Archive; use test_context::AsyncTestContext; use trustify_common::decompress::decompress_read; @@ -26,52 +27,76 @@ macro_rules! commit { #[macro_export] macro_rules! dump { - ($t:ident($url:literal $(, $($rest:tt)*)? 
)) => { - $crate::dump!(@parse $t, $url, db = "dump.sql.gz", storage = "dump.tar", digests = true, $($($rest)*)?); - }; - - (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr, db = $new_db:literal, $($rest:tt)*) => { - $crate::dump!(@parse $t, $url, db = $new_db, storage = $storage, digests = $digests, $($rest)*); - }; - (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr, storage = $new_storage:literal, $($rest:tt)*) => { - $crate::dump!(@parse $t, $url, db = $db, storage = $new_storage, digests = $digests, $($rest)*); - }; - (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr, no_digests, $($rest:tt)*) => { - $crate::dump!(@parse $t, $url, db = $db, storage = $storage, digests = false, $($rest)*); - }; - - (@parse $t:ident, $url:literal, db = $db:literal, storage = $storage:literal, digests = $digests:expr,) => { - $crate::dump!(@emit $t, $url, $db, $storage, $digests); - }; - - (@emit $t:ident, $url:literal, $db:literal, $storage:literal, $digests:expr) => { + ($t:ident($url:literal) $($chain:tt)*) => { pub struct $t; impl $crate::ctx::DumpId for $t { fn dump_id() -> $crate::ctx::Source { - $crate::ctx::Source::Dump { - base_url: $url, - db_file: $db, - storage_file: $storage, - digests: $digests, - } + $crate::ctx::Source::Dump( + $crate::ctx::DumpSource::new($url) $($chain)* + ) } } }; } +pub struct DumpSource { + pub base_url: &'static str, + pub db_file: &'static str, + pub storage_file: &'static str, + pub digests: bool, + pub strip: usize, + pub fix_zstd: bool, +} + +impl DumpSource { + pub fn new(base_url: &'static str) -> Self { + Self { + base_url, + db_file: "dump.sql.gz", + storage_file: "dump.tar", + digests: true, + strip: 0, + fix_zstd: false, + } + } + + pub fn db_file(mut self, v: &'static str) -> Self { + self.db_file = v; + self + } + + pub fn storage_file(mut self, v: &'static str) -> Self { + 
self.storage_file = v; + self + } + + pub fn digests(mut self, v: bool) -> Self { + self.digests = v; + self + } + + pub fn no_digests(self) -> Self { + self.digests(false) + } + + pub fn strip(mut self, v: usize) -> Self { + self.strip = v; + self + } + + /// Appends the zstd EOF marker (`[0x01, 0x00, 0x00]`) to all `.zstd` files in the storage + /// directory after unpacking. Older dump generation did not properly close the zstd stream, + /// leaving the EOF marker unwritten. + pub fn fix_zstd(mut self) -> Self { + self.fix_zstd = true; + self + } +} + pub enum Source { Migration(Option<&'static str>), - Dump { - /// base URL to the dump files - base_url: &'static str, - /// DB file name - db_file: &'static str, - /// storage archive - storage_file: &'static str, - /// if there are digests for the files - digests: bool, - }, + Dump(DumpSource), } pub trait DumpId { @@ -100,7 +125,7 @@ impl Deref for TrustifyMigrationContext { impl TrustifyMigrationContext { pub async fn new() -> anyhow::Result { - let (base, db_file, storage_file) = match ID::dump_id() { + let (base, db_file, storage_file, strip, fix_zstd) = match ID::dump_id() { Source::Migration(migration) => { let id: Cow<'static, str> = match migration { Some(id) => format!("commit-{id}").into(), @@ -108,15 +133,17 @@ impl TrustifyMigrationContext { }; let migration = Migration::new().context("failed to create migration manager")?; let base = migration.provide(&id).await?; - (base, "dump.sql.xz", "dump.tar") + (base, "dump.sql.xz", "dump.tar", 0usize, false) } - Source::Dump { + Source::Dump(DumpSource { base_url, db_file, storage_file, digests, - } => { + strip, + fix_zstd, + }) => { let base = Dumps::new()? 
.provide(Dump { url: base_url, @@ -125,7 +152,7 @@ impl TrustifyMigrationContext { }) .await?; - (base, db_file, storage_file) + (base, db_file, storage_file, strip, fix_zstd) } }; @@ -141,12 +168,52 @@ impl TrustifyMigrationContext { let source = decompress_read(storage_file).context("failed to open storage dump")?; let mut archive = Archive::new(source); - archive - .unpack(tmp.path()) - .context("failed to unpack storage dump")?; + if strip == 0 { + archive + .unpack(tmp.path()) + .context("failed to unpack storage dump")?; + } else { + for entry in archive + .entries() + .context("failed to read storage archive entries")? + { + let mut entry = entry.context("failed to read storage archive entry")?; + let path = entry + .path() + .context("failed to get entry path")? + .into_owned(); + let stripped: PathBuf = path.components().skip(strip).collect(); + if stripped.as_os_str().is_empty() { + continue; + } + // NOTE: `unpack` (vs `unpack_in`) has no path traversal protection, but + // this is test-only code and the archive content is generated by us and trusted. 
+ entry + .unpack(tmp.path().join(stripped)) + .context("failed to unpack storage archive entry")?; + } + } log::info!("Storage unpacked"); + if fix_zstd { + const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; + for entry in WalkDir::new(tmp.path()) { + let entry = entry.context("failed to walk storage directory")?; + if entry.file_type().is_file() + && entry.path().extension().and_then(|e| e.to_str()) == Some("zstd") + { + let mut file = OpenOptions::new() + .append(true) + .open(entry.path()) + .with_context(|| format!("failed to open zstd file: {}", entry.path().display()))?; + file.write_all(&ZSTD_EOF_BYTES) + .with_context(|| format!("failed to append EOF bytes to: {}", entry.path().display()))?; + } + } + log::info!("Fixed zstd EOF bytes"); + } + // create DB let settings = default_settings().context("unable to create default settings")?; From 3e4cdebf77197a0aca8533646271dc9c3192f96d Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 2 Mar 2026 08:55:23 +0100 Subject: [PATCH 04/22] chore: run cargo fmt --- test-context/src/ctx/migration.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/test-context/src/ctx/migration.rs b/test-context/src/ctx/migration.rs index 7372ada9f..a56099ea8 100644 --- a/test-context/src/ctx/migration.rs +++ b/test-context/src/ctx/migration.rs @@ -4,13 +4,15 @@ use crate::{ migration::{Dump, Dumps}, }; use anyhow::Context; -use std::{borrow::Cow, fs::OpenOptions, io::Write, marker::PhantomData, ops::Deref, path::PathBuf}; -use walkdir::WalkDir; +use std::{ + borrow::Cow, fs::OpenOptions, io::Write, marker::PhantomData, ops::Deref, path::PathBuf, +}; use tar::Archive; use test_context::AsyncTestContext; use trustify_common::decompress::decompress_read; use trustify_db::embedded::{Options, default_settings}; use trustify_module_storage::service::fs::FileSystemBackend; +use walkdir::WalkDir; #[macro_export] macro_rules! 
commit { @@ -206,9 +208,12 @@ impl TrustifyMigrationContext { let mut file = OpenOptions::new() .append(true) .open(entry.path()) - .with_context(|| format!("failed to open zstd file: {}", entry.path().display()))?; - file.write_all(&ZSTD_EOF_BYTES) - .with_context(|| format!("failed to append EOF bytes to: {}", entry.path().display()))?; + .with_context(|| { + format!("failed to open zstd file: {}", entry.path().display()) + })?; + file.write_all(&ZSTD_EOF_BYTES).with_context(|| { + format!("failed to append EOF bytes to: {}", entry.path().display()) + })?; } } log::info!("Fixed zstd EOF bytes"); From 6a3458e14358e3200838df60b211d9c2d74fb22c Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 2 Mar 2026 09:15:30 +0100 Subject: [PATCH 05/22] test: add a way to deal with long running tests --- docs/design/tests.md | 38 +++++++++++++++++++++ migration/Cargo.toml | 3 ++ migration/tests/data/m0002010.rs | 4 +++ modules/fundamental/Cargo.toml | 3 ++ modules/fundamental/tests/sbom/spdx/perf.rs | 5 ++- 5 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 docs/design/tests.md diff --git a/docs/design/tests.md b/docs/design/tests.md new file mode 100644 index 000000000..eee36db88 --- /dev/null +++ b/docs/design/tests.md @@ -0,0 +1,38 @@ +# Unit and integration tests + +This section features a bit of information about unit and integration tests. Test which you can run using `cargo test`. + +## panic, unwrap, expect, and println + +In most cases, calls that panic are not allowed in this project. However, for tests, that is different. It is ok +to have tests panic. Ideally, there is a proper error message attached to that panic. That is why using `expect` is +preferable over `unwrap`. + +It is also ok to dump information via `println` and others. It might still make more sense to use logging/tracing. + +Also see: [log_tracing.md#println-and-panic](log_tracing.md#println-and-panic) + +## Long-running tests + +Some tests are expected to be long-running. 
Such tests might not fit into the test run for GitHub CI workflows which tests PRs. + +Those tests may be skipped by default, but should be enabled using the `long_running` feature flag. This +should be done using: + +```rust +#[test] +#[cfg_attr(not(feature = "long_running"), ignore = "enable with: cargo test --features long_running")] +fn long_running_test() { + // runs for several hours +} +``` + +This will keep the tests available, show them as "ignored", and give the user instructions on how to enable them. + +The feature needs to be declared once for each module. If it hasn't been in the module you are using it, +you can do this using: + +```toml +[features] +long_running = [] # enable long-running tests +``` diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 198d182cc..0c1a0c86f 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -10,6 +10,9 @@ rust-version.workspace = true name = "migration" path = "src/lib.rs" +[features] +long_running = [] # enable long-running tests + [dependencies] trustify-common = { workspace = true } trustify-entity = { workspace = true } diff --git a/migration/tests/data/m0002010.rs b/migration/tests/data/m0002010.rs index 7d8f96abe..7f6aa2739 100644 --- a/migration/tests/data/m0002010.rs +++ b/migration/tests/data/m0002010.rs @@ -58,6 +58,10 @@ dump!( /// space. 
#[test_context(TrustifyMigrationContext)] #[test(tokio::test)] +#[cfg_attr( + not(feature = "long_running"), + ignore = "enable with: cargo test --features long_running" +)] async fn performance(ctx: &TrustifyMigrationContext) -> Result<(), anyhow::Error> { MigrationWithData::run_with_test(ctx.storage.clone(), Options::default(), async { MigratorTest::up(&ctx.db, None).await diff --git a/modules/fundamental/Cargo.toml b/modules/fundamental/Cargo.toml index 973ffcf4b..fa1b3e223 100644 --- a/modules/fundamental/Cargo.toml +++ b/modules/fundamental/Cargo.toml @@ -6,6 +6,9 @@ publish.workspace = true license.workspace = true rust-version.workspace = true +[features] +long_running = [] # enable long-running tests + [dependencies] trustify-auth = { workspace = true } trustify-common = { workspace = true} diff --git a/modules/fundamental/tests/sbom/spdx/perf.rs b/modules/fundamental/tests/sbom/spdx/perf.rs index a1ae60505..6a7b0d35d 100644 --- a/modules/fundamental/tests/sbom/spdx/perf.rs +++ b/modules/fundamental/tests/sbom/spdx/perf.rs @@ -68,8 +68,11 @@ async fn ingest_spdx_medium(ctx: &TrustifyContext) -> Result<(), anyhow::Error> // ignore because it's a slow slow slow test. #[test_context(TrustifyContext)] -#[ignore] #[test(tokio::test)] +#[cfg_attr( + not(feature = "long_running"), + ignore = "enable with: cargo test --features long_running" +)] async fn ingest_spdx_large(ctx: &TrustifyContext) -> Result<(), anyhow::Error> { test_with_spdx( ctx, From 39127d2be944ab3847a40a11a404e11a23f31aee Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 2 Mar 2026 09:59:43 +0100 Subject: [PATCH 06/22] chore: allow other uncompressed files too The logic treated .sql special, as being uncompressed. However, there are more extensions (.tar) which indicate an uncompressed file. We can't distinguish that properly. So whatever we understand, we process. But everything else, we take as plain data, and let the next step fail. 
--- common/src/decompress.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/common/src/decompress.rs b/common/src/decompress.rs index 84809a70d..194d2dd56 100644 --- a/common/src/decompress.rs +++ b/common/src/decompress.rs @@ -109,10 +109,11 @@ pub async fn decompress_async_read( let source = BufReader::new(source); Ok(match path.extension().and_then(|ext| ext.to_str()) { - None | Some("sql") => Box::pin(source), Some("xz") => Box::pin(async_compression::tokio::bufread::LzmaDecoder::new(source)), Some("gz") => Box::pin(async_compression::tokio::bufread::GzipDecoder::new(source)), - Some(ext) => anyhow::bail!("Unsupported file type ({ext})"), + // Anything else could be .sql, .tar, or an unsupported compression format. + // In that case, the following code would fail to understand the compressed content. + None | Some(_) => Box::pin(source), }) } @@ -123,10 +124,11 @@ pub fn decompress_read(path: impl AsRef) -> anyhow::Result Box::new(source), Some("xz") => Box::new(lzma_rust2::XzReader::new(source, false)), Some("gz") => Box::new(flate2::read::GzDecoder::new(source)), - Some(ext) => anyhow::bail!("Unsupported file type ({ext})"), + // Anything else could be .sql, .tar, or an unsupported compression format. + // In that case, the following code would fail to understand the compressed content. 
+ None | Some(_) => Box::new(source), }) } From 52a809994e801bc2ca25c6cdb0344b77ca0b8187 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 2 Mar 2026 15:39:09 +0100 Subject: [PATCH 07/22] ci: run "all features" for codecov too --- .github/workflows/codecov.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/codecov.yaml b/.github/workflows/codecov.yaml index 0ad88eefa..a766d506f 100644 --- a/.github/workflows/codecov.yaml +++ b/.github/workflows/codecov.yaml @@ -70,7 +70,7 @@ jobs: - name: Test # use only one job, trying to reduce memory usage - run: cargo llvm-cov --codecov --jobs 1 --features _test-s3 --output-path codecov.json + run: cargo llvm-cov --codecov --jobs 1 --all-features --output-path codecov.json env: GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" # for embedded postgresql From 8dd21b739bde81157a9fb16f42f4939c34ddf281 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 3 Mar 2026 10:28:46 +0100 Subject: [PATCH 08/22] chore: tweak output a bit for testing migrations --- Cargo.lock | 2 +- common/db/src/embedded.rs | 2 +- migration/src/data/mod.rs | 2 +- test-context/src/ctx/migration.rs | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b144f8b3..8ed89d6f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -544,7 +544,7 @@ dependencies = [ "filetime", "futures-core", "libc", - "redox_syscall 0.7.2", + "redox_syscall 0.7.3", "tokio", "tokio-stream", ] diff --git a/common/db/src/embedded.rs b/common/db/src/embedded.rs index 8a370987f..3a48ad7f2 100644 --- a/common/db/src/embedded.rs +++ b/common/db/src/embedded.rs @@ -86,7 +86,7 @@ pub async fn create_for( .await .context("Bootstrapping the test database")?, Source::Import(path) => { - log::info!("Importing from: {}", path.display()); + log::info!("Importing database from: {}", path.display()); let source = decompress_async_read(path).await?; diff --git a/migration/src/data/mod.rs b/migration/src/data/mod.rs index 
33be9616b..97ef9de2b 100644 --- a/migration/src/data/mod.rs +++ b/migration/src/data/mod.rs @@ -187,7 +187,7 @@ impl<'c> DocumentProcessor for SchemaManager<'c> { let pb = Arc::new(ProgressBar::new(count as u64)); pb.set_style( ProgressStyle::with_template( - "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})", + "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {human_pos}/{human_len} ({per_sec}) ({eta})", ) .map_err(|err| DbErr::Migration(err.to_string()))? .progress_chars("##-"), diff --git a/test-context/src/ctx/migration.rs b/test-context/src/ctx/migration.rs index a56099ea8..ab2e877af 100644 --- a/test-context/src/ctx/migration.rs +++ b/test-context/src/ctx/migration.rs @@ -159,7 +159,7 @@ impl TrustifyMigrationContext { }; let storage_file = base.join(storage_file); - log::info!("Importing dump: {}", storage_file.display()); + log::info!("Importing storage dump: {}", storage_file.display()); // create storage @@ -216,6 +216,7 @@ impl TrustifyMigrationContext { })?; } } + log::info!("Fixed zstd EOF bytes"); } From ed99c42ee34794458832c9bd790be6fecad35993 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 3 Mar 2026 13:25:22 +0100 Subject: [PATCH 09/22] chore: try running with more concurrent workers --- migration/tests/data/m0002010.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/migration/tests/data/m0002010.rs b/migration/tests/data/m0002010.rs index 7f6aa2739..bf4c5e125 100644 --- a/migration/tests/data/m0002010.rs +++ b/migration/tests/data/m0002010.rs @@ -1,7 +1,10 @@ use crate::MigratorTest; -use migration::Migrator; -use migration::data::{Database, Direction, MigrationWithData, Options, Runner}; +use migration::{ + Migrator, + data::{Database, Direction, MigrationWithData, Options, Runner}, +}; use sea_orm_migration::MigratorTrait; +use std::num::NonZeroUsize; use test_context::test_context; use test_log::test; use trustify_test_context::{TrustifyMigrationContext, 
commit, dump}; @@ -63,9 +66,14 @@ dump!( ignore = "enable with: cargo test --features long_running" )] async fn performance(ctx: &TrustifyMigrationContext) -> Result<(), anyhow::Error> { - MigrationWithData::run_with_test(ctx.storage.clone(), Options::default(), async { - MigratorTest::up(&ctx.db, None).await - }) + MigrationWithData::run_with_test( + ctx.storage.clone(), + Options { + concurrent: NonZeroUsize::new(32).unwrap(), + ..Options::default() + }, + async { MigratorTest::up(&ctx.db, None).await }, + ) .await?; // done From 6f2f3f1e3bf6979bc926910973670f530020bd9b Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Wed, 4 Mar 2026 15:59:46 +0100 Subject: [PATCH 10/22] test: leverage btrfs to speed up database test setups --- Cargo.lock | 38 +++ Cargo.toml | 1 + common/db/src/embedded.rs | 9 +- common/src/config.rs | 11 + docs/env-vars.md | 1 + docs/test-snapshots.md | 32 ++ modules/fundamental/benches/common.rs | 8 +- modules/storage/src/service/fs.rs | 8 +- server/src/profile/api.rs | 2 +- test-context/Cargo.toml | 5 + test-context/src/ctx/default.rs | 25 +- .../ctx/{migration.rs => migration/mod.rs} | 148 +++------ .../src/ctx/migration/snapshot/btrfs.rs | 314 ++++++++++++++++++ .../src/ctx/migration/snapshot/mod.rs | 206 ++++++++++++ test-context/src/lib.rs | 24 +- test-context/src/resource.rs | 96 ++++++ 16 files changed, 798 insertions(+), 130 deletions(-) create mode 100644 docs/test-snapshots.md rename test-context/src/ctx/{migration.rs => migration/mod.rs} (51%) create mode 100644 test-context/src/ctx/migration/snapshot/btrfs.rs create mode 100644 test-context/src/ctx/migration/snapshot/mod.rs create mode 100644 test-context/src/resource.rs diff --git a/Cargo.lock b/Cargo.lock index 8ed89d6f9..7d3294430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2568,6 +2568,12 @@ dependencies = [ "log", ] +[[package]] +name = "env_home" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" + [[package]] name = "env_logger" version = "0.11.9" @@ -4382,6 +4388,18 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "nix" +version = "0.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -8705,6 +8723,7 @@ dependencies = [ "hex", "indicatif", "log", + "nix", "osv", "peak_alloc", "postgresql_embedded", @@ -8732,7 +8751,9 @@ dependencies = [ "trustify-module-storage", "urlencoding", "utoipa-actix-web", + "uuid", "walkdir", + "which", "zip 8.1.0", ] @@ -9344,6 +9365,17 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "which" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fabb953106c3c8eea8306e4393700d7657561cb43122571b172bbfb7c7ba1d" +dependencies = [ + "env_home", + "rustix", + "winsafe", +] + [[package]] name = "whoami" version = "1.6.1" @@ -9821,6 +9853,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winsafe" +version = "0.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" + [[package]] name = "wiremock" version = "0.6.5" diff --git a/Cargo.toml b/Cargo.toml index 06ea3dfe8..afeab4b5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,6 +161,7 @@ utoipa-swagger-ui = "9.0.0" uuid = "1.7.0" walkdir = "2.5" walker-common = "0.15.0" +which = "8" wiremock = "0.6" zip = "8" diff --git a/common/db/src/embedded.rs b/common/db/src/embedded.rs index 3a48ad7f2..5b3525832 100644 --- a/common/db/src/embedded.rs +++ b/common/db/src/embedded.rs @@ -72,14 +72,7 @@ pub async 
fn create_for( .instrument(info_span!("start database")) .await?; - let config = crate::config::Database { - username: "postgres".into(), - password: "trustify".into(), - host: "localhost".into(), - name: "test".into(), - port: postgresql.settings().port, - ..crate::config::Database::from_env()? - }; + let config = crate::config::Database::from_port(postgresql.settings().port)?; let db = match options.source { Source::Bootstrap => super::Database::bootstrap(&config) diff --git a/common/src/config.rs b/common/src/config.rs index 7d1bc9dcf..97d134f9c 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -151,6 +151,17 @@ impl Database { sslmode = &self.sslmode, ) } + + pub fn from_port(port: u16) -> anyhow::Result { + Ok(Self { + username: "postgres".into(), + password: "trustify".into(), + host: "localhost".into(), + name: "test".into(), + port, + ..Self::from_env()? + }) + } } #[cfg(test)] diff --git a/docs/env-vars.md b/docs/env-vars.md index b50aeb244..2d761e20c 100644 --- a/docs/env-vars.md +++ b/docs/env-vars.md @@ -61,3 +61,4 @@ | `EXTERNAL_TEST_DB` | Run tests against external test database if set | | | `EXTERNAL_TEST_DB_BOOTSTRAP` | Run tests against external test database if set | | | `MEM_LIMIT_MB` | Set memory limit for tests that use TrustifyContext, shows the memory usage when the test reaches the limit | `500 MiB` | +| `TRUST_TEST_BTRFS_STORE` | Path to a BTRFS-backed directory for using snapshots for tests | User's home | diff --git a/docs/test-snapshots.md b/docs/test-snapshots.md new file mode 100644 index 000000000..aec830294 --- /dev/null +++ b/docs/test-snapshots.md @@ -0,0 +1,32 @@ +# Snapshots for tests + +Some tests require not only a database, but also an imported dataset. Depending on the dataset, the import can run +for several minutes, even hours, before the actual test runs. In order to speed this up, it is possible to leverage +BTRFS' subvolumes and snapshots. 
+
+The idea is to run the import once, but then, before running tests, create a snapshot of it. Tests then run against
+a freshly instantiated snapshot from that import. This takes seconds rather than minutes or hours.
+
+## Requirements
+
+* Run on Linux, with BTRFS available
+* Have the `btrfs` command line tool installed
+* Have a BTRFS volume mounted, with the following options: `defaults,user,exec,user_subvol_rm_allowed`
+* Set `TRUST_TEST_BTRFS_STORE` to a directory which is on such a volume; otherwise, the current working directory is used, which must be on a BTRFS volume with those options
+
+## Maintenance
+
+It may happen that, at the end of a run, subvolumes don't get cleaned up. You can check using the following command:
+
+```bash
+sudo btrfs subvolume list .
+```
+
+Subvolumes in `templates` are expected to be kept, while the ones in `running` and `prepare` are expected to be
+short-lived and removed after a test has run.
+
+## Alternatives
+
+If the requirements are not met (non-Linux platform, missing `btrfs` tool, or the store path is not on a BTRFS
+filesystem), tests will automatically fall back to creating a temporary directory per test and running import
+operations every time. This is slower, but does run tests.
diff --git a/modules/fundamental/benches/common.rs b/modules/fundamental/benches/common.rs index d5f4a0563..1fb2cbbe0 100644 --- a/modules/fundamental/benches/common.rs +++ b/modules/fundamental/benches/common.rs @@ -12,7 +12,7 @@ use csaf::product_tree::ProductTree; use csaf::vulnerability::Vulnerability; use packageurl::PackageUrl; use std::io::Error; -use std::sync::Arc; +use std::rc::Rc; use uuid::Uuid; use std::str::FromStr; @@ -20,13 +20,13 @@ use std::str::FromStr; use csaf::definitions::{BranchesT, ProductIdentificationHelper}; use sea_orm::ConnectionTrait; -pub fn setup_runtime_and_ctx() -> (Runtime, Arc) { +pub fn setup_runtime_and_ctx() -> (Runtime, Rc) { let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); let ctx = runtime.block_on(async { TrustifyContext::setup().await }); - (runtime, Arc::new(ctx)) + (runtime, Rc::new(ctx)) } pub async fn document_generated_from(path: &str, rev: u64) -> Result { @@ -107,7 +107,7 @@ pub async fn document_generated_from(path: &str, rev: u64) -> Result) { +pub async fn reset_db(ctx: &Rc) { // reset DB tables to a clean state... 
for table in [ "advisory", diff --git a/modules/storage/src/service/fs.rs b/modules/storage/src/service/fs.rs index 5eb7a926f..c9bda3b7e 100644 --- a/modules/storage/src/service/fs.rs +++ b/modules/storage/src/service/fs.rs @@ -88,12 +88,16 @@ impl FileSystemBackend { /// Create a new storage for testing pub async fn for_test() -> anyhow::Result<(Self, TempDir)> { let dir = tempdir()?; - - Self::new(dir.path(), Compression::None) + Self::for_test_in(dir.path()) .await .map(|result| (result, dir)) } + /// Create a new storage for testing, with an existing directory + pub async fn for_test_in(dir: impl Into) -> anyhow::Result { + Self::new(dir, Compression::None).await + } + /// Create a new storage for testing pub async fn for_test_with(compression: Compression) -> anyhow::Result<(Self, TempDir)> { let dir = tempdir()?; diff --git a/server/src/profile/api.rs b/server/src/profile/api.rs index c0b856572..d70d32fa1 100644 --- a/server/src/profile/api.rs +++ b/server/src/profile/api.rs @@ -447,7 +447,7 @@ mod test { "--db-name", "test", "--db-port", - &ctx.postgresql.as_ref().expect("database").settings().port.to_string(), + &ctx.port.to_string(), ], ))?; InitData::new(context, run).await.map(|_| ()) diff --git a/test-context/Cargo.toml b/test-context/Cargo.toml index d34afb3f4..af4fd4f41 100644 --- a/test-context/Cargo.toml +++ b/test-context/Cargo.toml @@ -43,7 +43,12 @@ tracing-subscriber = { workspace = true } urlencoding = { workspace = true } utoipa-actix-web = { workspace = true } walkdir = { workspace = true } +which = { workspace = true } zip = { workspace = true } +uuid = { version = "1.21.0", features = ["v4"] } + +[target.'cfg(target_os = "linux")'.dependencies] +nix = { version = "0.31.2", features = ["fs"] } [dev-dependencies] hex = { workspace = true } diff --git a/test-context/src/ctx/default.rs b/test-context/src/ctx/default.rs index b8130217b..51958506e 100644 --- a/test-context/src/ctx/default.rs +++ b/test-context/src/ctx/default.rs @@ -1,7 +1,8 @@ 
-use crate::TrustifyTestContext; -use postgresql_embedded::PostgreSQL; +use crate::{ + TrustifyTestContext, + resource::{ResourceStack, TestResourceExt, defer}, +}; use std::{env, ops::Deref}; -use tempfile::TempDir; use test_context::AsyncTestContext; use tracing::instrument; use trustify_common::{config, db}; @@ -12,11 +13,11 @@ pub struct TrustifyContext(pub(crate) TrustifyTestContext); impl TrustifyContext { pub async fn new( db: db::Database, + port: u16, storage: FileSystemBackend, - tmp: TempDir, - postgresql: impl Into>, + resources: impl Into, ) -> Self { - Self(TrustifyTestContext::new(db, storage, tmp, postgresql).await) + Self(TrustifyTestContext::new(db, port, storage, resources)) } } @@ -56,17 +57,23 @@ impl AsyncTestContext for TrustifyContext { } .expect("Configuring the database"); - return TrustifyContext::new(db, storage, tmp, None).await; + return TrustifyContext::new(db, config.port, storage, defer(tmp)).await; } let (db, postgresql) = trustify_db::embedded::create() .await .expect("Create an embedded database"); - TrustifyContext::new(db, storage, tmp, postgresql).await + TrustifyContext::new( + db, + postgresql.settings().port, + storage, + defer(tmp).then(defer(postgresql)), + ) + .await } async fn teardown(self) { - self.0.teardown(); + self.0.teardown().await; } } diff --git a/test-context/src/ctx/migration.rs b/test-context/src/ctx/migration/mod.rs similarity index 51% rename from test-context/src/ctx/migration.rs rename to test-context/src/ctx/migration/mod.rs index ab2e877af..eb76ce27f 100644 --- a/test-context/src/ctx/migration.rs +++ b/test-context/src/ctx/migration/mod.rs @@ -1,18 +1,14 @@ +mod snapshot; + use crate::{ TrustifyTestContext, - migration::Migration, - migration::{Dump, Dumps}, + ctx::migration::snapshot::Snapshot, + migration::{Dump, Dumps, Migration}, }; use anyhow::Context; -use std::{ - borrow::Cow, fs::OpenOptions, io::Write, marker::PhantomData, ops::Deref, path::PathBuf, -}; -use tar::Archive; +use 
std::{borrow::Cow, marker::PhantomData, ops::Deref}; use test_context::AsyncTestContext; -use trustify_common::decompress::decompress_read; -use trustify_db::embedded::{Options, default_settings}; -use trustify_module_storage::service::fs::FileSystemBackend; -use walkdir::WalkDir; +use uuid::Uuid; #[macro_export] macro_rules! commit { @@ -42,6 +38,7 @@ macro_rules! dump { }; } +#[derive(Debug)] pub struct DumpSource { pub base_url: &'static str, pub db_file: &'static str, @@ -96,11 +93,29 @@ impl DumpSource { } } +#[derive(Debug)] pub enum Source { Migration(Option<&'static str>), Dump(DumpSource), } +impl Source { + /// generate a reproducible, unique ID for a source + /// + /// We do this by generating a debug string, which contains all the necessary information, and + /// then creating a v5 UUID, which is basically a SHA-1 digest. + pub fn id(&self) -> String { + const NAMESPACE: Uuid = Uuid::from_bytes([ + 0x2c, 0x84, 0x27, 0x45, 0xb6, 0xc8, 0x4a, 0xf7, 0x9a, 0xdb, 0x28, 0x76, 0x8b, 0x45, + 0x6e, 0x95, + ]); + + let debug_str = format!("{self:?}"); + + Uuid::new_v5(&NAMESPACE, debug_str.as_bytes()).to_string() + } +} + pub trait DumpId { fn dump_id() -> Source; } @@ -127,7 +142,10 @@ impl Deref for TrustifyMigrationContext { impl TrustifyMigrationContext { pub async fn new() -> anyhow::Result { - let (base, db_file, storage_file, strip, fix_zstd) = match ID::dump_id() { + let source = ID::dump_id(); + let source_id = source.id(); + + let snapshot = match source { Source::Migration(migration) => { let id: Cow<'static, str> = match migration { Some(id) => format!("commit-{id}").into(), @@ -135,7 +153,14 @@ impl TrustifyMigrationContext { }; let migration = Migration::new().context("failed to create migration manager")?; let base = migration.provide(&id).await?; - (base, "dump.sql.xz", "dump.tar", 0usize, false) + Snapshot { + id: source_id, + base, + db_file: "dump.sql.xz".to_string(), + storage_file: "dump.tar".to_string(), + strip: 0, + fix_zstd: false, + } } 
Source::Dump(DumpSource { @@ -153,92 +178,18 @@ impl TrustifyMigrationContext { digests, }) .await?; - - (base, db_file, storage_file, strip, fix_zstd) - } - }; - - let storage_file = base.join(storage_file); - log::info!("Importing storage dump: {}", storage_file.display()); - - // create storage - - let (storage, tmp) = FileSystemBackend::for_test() - .await - .expect("Unable to create storage backend"); - - let source = decompress_read(storage_file).context("failed to open storage dump")?; - - let mut archive = Archive::new(source); - if strip == 0 { - archive - .unpack(tmp.path()) - .context("failed to unpack storage dump")?; - } else { - for entry in archive - .entries() - .context("failed to read storage archive entries")? - { - let mut entry = entry.context("failed to read storage archive entry")?; - let path = entry - .path() - .context("failed to get entry path")? - .into_owned(); - let stripped: PathBuf = path.components().skip(strip).collect(); - if stripped.as_os_str().is_empty() { - continue; - } - // NOTE: `unpack` (vs `unpack_in`) has no path traversal protection, but - // this is test-only code and the archive content is generated by us and trusted. 
- entry - .unpack(tmp.path().join(stripped)) - .context("failed to unpack storage archive entry")?; - } - } - - log::info!("Storage unpacked"); - - if fix_zstd { - const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; - for entry in WalkDir::new(tmp.path()) { - let entry = entry.context("failed to walk storage directory")?; - if entry.file_type().is_file() - && entry.path().extension().and_then(|e| e.to_str()) == Some("zstd") - { - let mut file = OpenOptions::new() - .append(true) - .open(entry.path()) - .with_context(|| { - format!("failed to open zstd file: {}", entry.path().display()) - })?; - file.write_all(&ZSTD_EOF_BYTES).with_context(|| { - format!("failed to append EOF bytes to: {}", entry.path().display()) - })?; + Snapshot { + id: source_id, + base, + db_file: db_file.to_string(), + storage_file: storage_file.to_string(), + strip, + fix_zstd, } } + }; - log::info!("Fixed zstd EOF bytes"); - } - - // create DB - - let settings = default_settings().context("unable to create default settings")?; - - let (db, postgresql) = trustify_db::embedded::create_for( - settings, - Options { - source: trustify_db::embedded::Source::Import(base.join(db_file)), - }, - ) - .await - .context("failed to create an embedded database")?; - - log::info!("Database imported"); - - Ok(Self( - TrustifyTestContext::new(db, storage, tmp, postgresql).await, - Default::default(), - )) + Ok(Self(snapshot.materialize().await?, Default::default())) } } @@ -248,4 +199,11 @@ impl AsyncTestContext for TrustifyMigrationContext { .await .expect("failed to create migration context") } + + #[allow(clippy::manual_async_fn)] + fn teardown(self) -> impl Future { + async { + self.0.teardown().await; + } + } } diff --git a/test-context/src/ctx/migration/snapshot/btrfs.rs b/test-context/src/ctx/migration/snapshot/btrfs.rs new file mode 100644 index 000000000..881699f35 --- /dev/null +++ b/test-context/src/ctx/migration/snapshot/btrfs.rs @@ -0,0 +1,314 @@ +use crate::resource::TestResource; +use 
anyhow::{Context, anyhow, bail}; +use futures::future::BoxFuture; +use postgresql_embedded::{PostgreSQL, Settings}; +use std::{ + ffi::OsStr, + fmt::Debug, + io, + path::{Path, PathBuf}, +}; +use tempfile::TempDir; +use tokio::fs; +use trustify_common::{config, db::Database}; +use trustify_module_storage::service::fs::FileSystemBackend; + +#[derive(Clone, Debug)] +struct Command { + pub btrfs: PathBuf, + pub store: PathBuf, +} + +#[cfg(not(target_os = "linux"))] +impl Command { + pub fn new() -> anyhow::Result { + bail!("btrfs is only supported on Linux"); + } +} + +#[cfg(target_os = "linux")] +impl Command { + pub fn is_btrfs(path: impl AsRef) -> io::Result { + const BTRFS_SUPER_MAGIC: nix::libc::c_long = 0x9123683E; + + let stat = nix::sys::statfs::statfs(path.as_ref()).map_err(io::Error::from)?; + + Ok(stat.filesystem_type().0 == BTRFS_SUPER_MAGIC) + } + + pub fn new() -> anyhow::Result { + let btrfs = which::which("btrfs").context( + r#"unable to locate btrfs: + +You can install `btrfs`: + * On Fedora using: sudo dnf install btrfs-progs + +"#, + )?; + + let store = match std::env::var_os("TRUST_TEST_BTRFS_STORE") { + Some(store) => Some(PathBuf::from(store)), + None => std::env::current_dir().ok(), + } + .ok_or_else(|| anyhow!("unable to locate btrfs store"))?; + + if !Self::is_btrfs(store.as_path())? { + bail!( + r#"btrfs store ({}) is not on a btrfs volume. 
+ +You can set `TRUST_TEST_BTRFS_STORE` to a directory on a BTRFS volume mounted with: defaults,user,exec,user_subvol_rm_allowed +"#, + store.display() + ); + } + + Ok(Self { btrfs, store }) + } + + async fn execute( + &self, + args: impl IntoIterator + Debug>, + ) -> anyhow::Result<()> { + let args = args.into_iter().collect::>(); + log::info!("{} {args:?}", self.btrfs.display()); + + let mut command = tokio::process::Command::new(&self.btrfs); + + command.args(args); + + let status = command.status().await?; + + if !status.success() { + bail!("btrfs exited with status {}", status); + } + + Ok(()) + } +} + +/// A running content instance +#[derive(Debug)] +pub enum Running { + /// Plain simple temp dir + Temporary(TempDir), + /// Using an existing, ready to use, snapshot + Existing(BtrfsSnapshot), + /// A preparation step to create a snapshot + Collecting(Collect), +} + +impl Running { + pub async fn new(id: impl Into) -> anyhow::Result { + let btrfs = match Command::new() { + Ok(btrfs) => btrfs, + Err(err) => { + log::warn!("failed to detect btrfs support: {}", err); + return Ok(Running::Temporary(TempDir::new()?)); + } + }; + + let id = id.into(); + + // detect existing + + let template = btrfs.store.join("templates").join(&id); + if template.is_dir() { + return Ok(Running::Existing(BtrfsSnapshot { btrfs, id })); + } + + // return new collecting + + Ok(Running::Collecting(Collect::new(btrfs, id).await?)) + } +} + +/// A mounted snapshot, ready to use +#[derive(Debug)] +pub struct BtrfsSnapshot { + btrfs: Command, + id: String, +} + +impl BtrfsSnapshot { + pub async fn start(&self) -> anyhow::Result { + // mount + + let snapshot = self.btrfs.store.join("templates").join(&self.id); + + let running = self.btrfs.store.join("running"); + fs::create_dir_all(&running).await?; + let running = running.join(uuid::Uuid::new_v4().to_string()); + + // create volume (running) from snapshot + + self.btrfs + .execute([ + OsStr::new("subvolume"), + OsStr::new("snapshot"), + 
snapshot.as_os_str(), + running.as_os_str(), + ]) + .await?; + + // create instances + + let storage = FileSystemBackend::for_test_in(running.join("storage")).await?; + + let db_base = running.join("db"); + let settings = Settings { + data_dir: db_base.join("data"), + temporary: false, + ..Default::default() + }; + let mut psql = PostgreSQL::new(settings); + psql.setup().await?; + psql.start().await?; + + let db = config::Database::from_port(psql.settings().port)?; + + // done + + Ok(BtrfsStarted { + btrfs: self.btrfs.clone(), + storage, + db: Database::new(&db).await?, + psql, + path: running, + }) + } +} + +pub struct BtrfsStarted { + storage: FileSystemBackend, + psql: PostgreSQL, + db: Database, + path: PathBuf, + btrfs: Command, +} + +impl BtrfsStarted { + pub fn storage(&self) -> &FileSystemBackend { + &self.storage + } + + pub fn db(&self) -> &Database { + &self.db + } + + pub fn settings(&self) -> &Settings { + self.psql.settings() + } + + pub async fn stop(self) -> anyhow::Result<()> { + // stop the database + + let _ = self.psql.stop().await; + + // delete the subvolume + + self.btrfs + .execute([ + OsStr::new("subvolume"), + OsStr::new("delete"), + self.path.as_os_str(), + ]) + .await?; + + // done + + Ok(()) + } +} + +impl TestResource for BtrfsStarted { + fn drop(self: Box) -> BoxFuture<'static, ()> { + Box::pin(async move { + let _ = self.stop().await; + }) + } +} + +#[derive(Debug)] +pub struct Collect { + // directory to prepare in + path: PathBuf, + btrfs: Command, + id: String, +} + +impl Collect { + async fn new(btrfs: Command, id: String) -> anyhow::Result { + let path = btrfs.store.join("prepare").join(&id); + + if path.exists() { + log::info!( + "Deleting existing preparation directory: {}", + path.display() + ); + + btrfs + .execute([ + OsStr::new("subvolume"), + OsStr::new("delete"), + path.as_os_str(), + ]) + .await?; + } + + btrfs + .execute([ + OsStr::new("subvolume"), + OsStr::new("create"), + path.as_os_str(), + ]) + .await?; + + 
Ok(Self { path, btrfs, id }) + } + + pub async fn create(self, psql: PostgreSQL) -> anyhow::Result { + log::info!("Collecting snapshot"); + + // stop the instance to allow creating a consistent snapshot + + psql.stop().await?; + + // take the snapshot + + let target = self.btrfs.store.join("templates"); + fs::create_dir_all(&target).await?; + let target = target.join(&self.id); + + self.btrfs + .execute([ + OsStr::new("subvolume"), + OsStr::new("snapshot"), + self.path.as_os_str(), + target.as_os_str(), + ]) + .await?; + + // now delete the prepared volume + + self.btrfs + .execute([ + OsStr::new("subvolume"), + OsStr::new("delete"), + self.path.as_os_str(), + ]) + .await?; + + // return the result + + Ok(BtrfsSnapshot { + btrfs: self.btrfs, + id: self.id, + }) + } +} + +impl AsRef for Collect { + fn as_ref(&self) -> &Path { + &self.path + } +} diff --git a/test-context/src/ctx/migration/snapshot/mod.rs b/test-context/src/ctx/migration/snapshot/mod.rs new file mode 100644 index 000000000..bfc86df08 --- /dev/null +++ b/test-context/src/ctx/migration/snapshot/mod.rs @@ -0,0 +1,206 @@ +#[cfg(target_os = "linux")] +pub mod btrfs; + +use crate::{TrustifyTestContext, resource::TestResourceExt, resource::defer}; +use anyhow::Context; +use postgresql_embedded::{PostgreSQL, Settings}; +use std::{ + fs::OpenOptions, + io::Write, + path::{Path, PathBuf}, + time::Duration, +}; +use tar::Archive; +use trustify_common::{db::Database, decompress::decompress_read}; +use trustify_db::embedded::{Options, default_settings}; +use trustify_module_storage::service::fs::FileSystemBackend; +use walkdir::WalkDir; + +pub struct Snapshot { + pub id: String, + pub base: PathBuf, + pub db_file: String, + pub storage_file: String, + pub strip: usize, + pub fix_zstd: bool, +} + +impl Snapshot { + #[cfg(not(target_os = "linux"))] + pub async fn materialize(self) -> anyhow::Result { + let tmp = tempfile::TempDir::new()?; + let (db, storage, psql) = self.setup(&tmp).await?; + + 
Ok(TrustifyTestContext::new( + db, + storage, + defer(psql).then(defer(tmp)), + )) + } + + /// Ensure that a snapshot, with data, is available + /// + /// Either by running with a fresh import in a temporary directory. Or, if available, with + /// a BTRFS snapshot of an import. If such a snapshot doesn't exist yet, it will be created + /// first. + #[cfg(target_os = "linux")] + pub async fn materialize(self) -> anyhow::Result { + let running = btrfs::Running::new(&self.id).await?; + + log::info!("Snapshot state: {running:?}"); + + Ok(match running { + // we are running with a normal, temporary directory + btrfs::Running::Temporary(tmp) => { + // set up the content in the target directory + let (db, storage, psql) = self.setup(&tmp).await?; + + TrustifyTestContext::new( + db, + psql.settings().port, + storage, + defer(psql).then(defer(tmp)), + ) + } + // We are running with an existing snapshot, just enable it + btrfs::Running::Existing(snapshot) => { + // activate the snapshot, starts the database + let started = snapshot.start().await?; + + TrustifyTestContext::new( + started.db().clone(), + started.settings().port, + started.storage().clone(), + started, + ) + } + // We need to create the snapshot first, then run it + btrfs::Running::Collecting(collect) => { + // set up the content in preparation directory + let (_, _, psql) = self.setup(&collect).await?; + + // create the snapshot + let snapshot = collect.create(psql).await?; + // and activate it + let started = snapshot.start().await?; + + TrustifyTestContext::new( + started.db().clone(), + started.settings().port, + started.storage().clone(), + started, + ) + } + }) + } + + /// This performs the actual DB and storage preparation + async fn setup( + self, + tmp: impl AsRef, + ) -> anyhow::Result<(Database, FileSystemBackend, PostgreSQL)> { + let tmp = tmp.as_ref(); + log::info!("Setting up context in: {}", tmp.display()); + + // create content + + let Self { + id: _, + base, + db_file, + storage_file, + strip, 
+ fix_zstd, + } = self; + + let storage_file = base.join(storage_file); + log::info!("Importing storage dump: {}", storage_file.display()); + + // create storage + + let tmp_storage = tmp.join("storage"); + let storage = FileSystemBackend::for_test_in(&tmp_storage) + .await + .expect("Unable to create storage backend"); + + let source = decompress_read(storage_file).context("failed to open storage dump")?; + + let mut archive = Archive::new(source); + if strip == 0 { + archive + .unpack(&tmp_storage) + .context("failed to unpack storage dump")?; + } else { + for entry in archive + .entries() + .context("failed to read storage archive entries")? + { + let mut entry = entry.context("failed to read storage archive entry")?; + let path = entry + .path() + .context("failed to get entry path")? + .into_owned(); + let stripped: PathBuf = path.components().skip(strip).collect(); + if stripped.as_os_str().is_empty() { + continue; + } + // NOTE: `unpack` (vs `unpack_in`) has no path traversal protection, but + // this is test-only code and the archive content is generated by us and trusted. 
+ entry + .unpack(tmp_storage.join(stripped)) + .context("failed to unpack storage archive entry")?; + } + } + + log::info!("Storage unpacked"); + + if fix_zstd { + log::info!("Fixing zstd EOF markers"); + + const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; + for entry in WalkDir::new(tmp_storage) { + let entry = entry.context("failed to walk storage directory")?; + if entry.file_type().is_file() + && entry.path().extension().and_then(|e| e.to_str()) == Some("zstd") + { + let mut file = OpenOptions::new() + .append(true) + .open(entry.path()) + .with_context(|| { + format!("failed to open zstd file: {}", entry.path().display()) + })?; + file.write_all(&ZSTD_EOF_BYTES).with_context(|| { + format!("failed to append EOF bytes to: {}", entry.path().display()) + })?; + } + } + + log::info!("Fixed zstd EOF bytes"); + } + + // create DB + + let db_tmp = tmp.join("db"); + log::info!("Creating DB instance: {}", db_tmp.display()); + + let settings = Settings { + data_dir: db_tmp.join("data"), + installation_dir: db_tmp.join("instance"), + timeout: Some(Duration::from_secs(30)), + ..default_settings().context("unable to create default settings")? 
+ }; + + let (db, postgresql) = trustify_db::embedded::create_for( + settings, + Options { + source: trustify_db::embedded::Source::Import(base.join(db_file)), + }, + ) + .await + .context("failed to create an embedded database")?; + + log::info!("Database imported"); + + Ok((db, storage, postgresql)) + } +} diff --git a/test-context/src/lib.rs b/test-context/src/lib.rs index 2d3029dcc..60940b00e 100644 --- a/test-context/src/lib.rs +++ b/test-context/src/lib.rs @@ -8,18 +8,19 @@ pub mod ctx; pub mod flame; pub mod migration; pub mod q; +mod resource; pub mod spdx; pub mod subset; pub use ctx::{ReadOnly, TrustifyContext, TrustifyMigrationContext}; +use crate::resource::ResourceStack; use ::migration::{ ConnectionTrait, DbErr, sea_orm::{RuntimeErr, Statement, prelude::Uuid, sqlx}, }; use futures::Stream; use peak_alloc::PeakAlloc; -use postgresql_embedded::PostgreSQL; use serde::Serialize; use std::{ env, @@ -27,7 +28,6 @@ use std::{ io::{Cursor, Read, Seek, Write}, path::{Path, PathBuf}, }; -use tempfile::TempDir; use tokio_util::{bytes::Bytes, io::ReaderStream}; use trustify_common::{db::Database, decompress::decompress_async, hashing::Digests}; use trustify_entity::labels::Labels; @@ -58,24 +58,24 @@ impl AsRef for Dataset { #[allow(dead_code)] pub struct TrustifyTestContext { pub db: Database, + /// The PostgreSQL database port + pub port: u16, pub graph: Graph, pub storage: FileSystemBackend, pub ingestor: IngestorService, pub mem_limit_mb: f32, - pub postgresql: Option, - /// Temp directory resource, will be deleted when dropped - _tmp: TempDir, + resources: ResourceStack, } #[global_allocator] static PEAK_ALLOC: PeakAlloc = PeakAlloc; impl TrustifyTestContext { - async fn new( + fn new( db: Database, + port: u16, storage: FileSystemBackend, - tmp: TempDir, - postgresql: impl Into>, + resources: impl Into, ) -> Self { let graph = Graph::new(db.clone()); let ingestor = IngestorService::new(graph.clone(), storage.clone(), Default::default()); @@ -86,12 +86,12 @@ 
impl TrustifyTestContext { Self { db, + port, graph, storage, ingestor, mem_limit_mb, - postgresql: postgresql.into(), - _tmp: tmp, + resources: resources.into(), } } @@ -257,7 +257,9 @@ $$; .await?) } - pub(crate) fn teardown(&self) { + pub(crate) async fn teardown(self) { + self.resources.drop().await; + let peak_mem = PEAK_ALLOC.peak_usage_as_mb(); let args: Vec = env::args().collect(); // Prints the error message when running the tests with threads=1 diff --git a/test-context/src/resource.rs b/test-context/src/resource.rs new file mode 100644 index 000000000..728f8a065 --- /dev/null +++ b/test-context/src/resource.rs @@ -0,0 +1,96 @@ +use futures::future::BoxFuture; + +#[derive(Default)] +pub struct ResourceStack { + resources: Vec>, +} + +pub trait TestResource: Send + 'static { + fn drop(self: Box) -> BoxFuture<'static, ()>; +} + +pub trait TestResourceExt { + fn then(self, other: impl TestResource) -> Box; +} + +impl TestResourceExt for R { + fn then(self, other: impl TestResource) -> Box { + Box::new(vec![ + Box::new(self) as Box, + Box::new(other) as Box, + ]) + } +} + +impl TestResourceExt for Box { + fn then(self, other: impl TestResource) -> Box { + Box::new(vec![self, Box::new(other) as Box]) + } +} + +impl ResourceStack { + pub fn new() -> Self { + Self::default() + } + + pub async fn drop(mut self) { + while let Some(r) = self.resources.pop() { + TestResource::drop(r).await; + } + } + + pub fn then(mut self, r: impl TestResource) -> Self { + self.resources.push(Box::new(r)); + self + } +} + +impl TestResource for Vec> { + fn drop(mut self: Box) -> BoxFuture<'static, ()> { + Box::pin(async move { + while let Some(r) = self.pop() { + r.drop().await; + } + }) + } +} + +impl From for ResourceStack { + fn from(value: R) -> Self { + ResourceStack { + resources: vec![Box::new(value)], + } + } +} + +impl From> for ResourceStack { + fn from(value: Box) -> Self { + ResourceStack { + resources: vec![value], + } + } +} + +impl From<()> for ResourceStack { + fn 
from(_: ()) -> Self { + ResourceStack::new() + } +} + +#[allow(drop_bounds)] +pub fn defer(d: impl Drop + Send + 'static) -> impl TestResource { + struct Defer(D); + + impl TestResource for Defer { + fn drop(self: Box) -> BoxFuture<'static, ()> { + let value = self; + Box::pin(async move { + tokio::task::spawn_blocking(move || drop(value)) + .await + .expect("failed to await spawn_blocking"); + }) + } + } + + Defer(d) +} From d7e4b97b9f7b32bb9164a2534f0678c95fb152ca Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Wed, 4 Mar 2026 16:29:58 +0100 Subject: [PATCH 11/22] ci: run ci check on all main OSes --- .github/workflows/ci.yaml | 53 +++++++++++++++---- .../src/ctx/migration/snapshot/mod.rs | 1 + 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e2adb49c3..8ca9d04aa 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -18,7 +18,50 @@ env: jobs: + check: + name: Check + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: + - windows-2025 + - ubuntu-24.04 + - macos-15 + include: + - os: windows-2025 + install: | + git config --system core.longpaths true + echo "VCPKG_ROOT=$env:VCPKG_INSTALLATION_ROOT" | Out-File -FilePath $env:GITHUB_ENV -Append + vcpkg install openssl:x64-windows-static-md + + steps: + - uses: actions/checkout@v5 + - uses: Swatinem/rust-cache@v2 + + - name: Cache Theseus Postgresql Installation + uses: actions/cache@v4 + with: + path: ~/.theseus/postgresql + key: ${{ runner.os }}-theseus-postgresql-${{ hashFiles('**/Cargo.lock') }} + + - name: Install dependencies + if: matrix.install != '' + run: ${{ matrix.install }} + + - name: Check + env: + GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" # for embedded postgresql + run: cargo check + - name: Clippy + env: + GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" # for embedded postgresql + run: cargo clippy --all-targets --all-features -- -D warnings -D clippy::unwrap_used -D 
clippy::expect_used + ci: + needs: + - check runs-on: ubuntu-24.04 steps: @@ -62,16 +105,6 @@ jobs: - name: Format run: cargo fmt --check - - name: Check - env: - GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" # for embedded postgresql - run: cargo check - - - name: Clippy - env: - GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}" # for embedded postgresql - run: cargo clippy --all-targets --all-features -- -D warnings -D clippy::unwrap_used -D clippy::expect_used - - name: Start minio run: | docker compose -f etc/deploy/compose/compose-minio.yaml up -d --wait diff --git a/test-context/src/ctx/migration/snapshot/mod.rs b/test-context/src/ctx/migration/snapshot/mod.rs index bfc86df08..b054b4e82 100644 --- a/test-context/src/ctx/migration/snapshot/mod.rs +++ b/test-context/src/ctx/migration/snapshot/mod.rs @@ -33,6 +33,7 @@ impl Snapshot { Ok(TrustifyTestContext::new( db, + psql.settings().port, storage, defer(psql).then(defer(tmp)), )) From 07f8ed1007106d3835bc5568c6b47d05ee653875 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Wed, 4 Mar 2026 17:28:54 +0100 Subject: [PATCH 12/22] chore: ignore warning on non-linux OSes --- test-context/src/ctx/migration/snapshot/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/test-context/src/ctx/migration/snapshot/mod.rs b/test-context/src/ctx/migration/snapshot/mod.rs index b054b4e82..95b7cdedd 100644 --- a/test-context/src/ctx/migration/snapshot/mod.rs +++ b/test-context/src/ctx/migration/snapshot/mod.rs @@ -17,6 +17,7 @@ use trustify_module_storage::service::fs::FileSystemBackend; use walkdir::WalkDir; pub struct Snapshot { + #[allow(unused)] pub id: String, pub base: PathBuf, pub db_file: String, From ce422836ddf0b0f0c42a7a1c80cb35788e3ab547 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 10 Mar 2026 17:00:20 +0100 Subject: [PATCH 13/22] chore: work towards snapshot loading --- Cargo.lock | 1 + test-context/Cargo.toml | 1 + test-context/src/ctx/migration/mod.rs | 14 ++- .../src/ctx/migration/snapshot/btrfs.rs | 97 
+++++++++++++++++-- .../src/ctx/migration/snapshot/mod.rs | 27 +++++- test-context/src/migration.rs | 84 +++++++++++----- 6 files changed, 185 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d3294430..d55078d79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8716,6 +8716,7 @@ dependencies = [ "actix-http", "actix-web", "anyhow", + "async-compression", "async-tar", "bytes", "futures", diff --git a/test-context/Cargo.toml b/test-context/Cargo.toml index af4fd4f41..58ee2b049 100644 --- a/test-context/Cargo.toml +++ b/test-context/Cargo.toml @@ -48,6 +48,7 @@ zip = { workspace = true } uuid = { version = "1.21.0", features = ["v4"] } [target.'cfg(target_os = "linux")'.dependencies] +async-compression = { workspace = true, features = ["tokio", "zstd", "xz"] } nix = { version = "0.31.2", features = ["fs"] } [dev-dependencies] diff --git a/test-context/src/ctx/migration/mod.rs b/test-context/src/ctx/migration/mod.rs index eb76ce27f..f15fe4e48 100644 --- a/test-context/src/ctx/migration/mod.rs +++ b/test-context/src/ctx/migration/mod.rs @@ -6,6 +6,7 @@ use crate::{ migration::{Dump, Dumps, Migration}, }; use anyhow::Context; +use migration::Iden; use std::{borrow::Cow, marker::PhantomData, ops::Deref}; use test_context::AsyncTestContext; use uuid::Uuid; @@ -145,19 +146,24 @@ impl TrustifyMigrationContext { let source = ID::dump_id(); let source_id = source.id(); + let dumps = Dumps::new()?; + let snapshot = match source { Source::Migration(migration) => { let id: Cow<'static, str> = match migration { Some(id) => format!("commit-{id}").into(), None => "latest".into(), }; - let migration = Migration::new().context("failed to create migration manager")?; + let migration = + Migration::new(dumps.clone()).context("failed to create migration manager")?; let base = migration.provide(&id).await?; Snapshot { id: source_id, + dumps, base, db_file: "dump.sql.xz".to_string(), storage_file: "dump.tar".to_string(), + snapshot_file: None, strip: 0, fix_zstd: 
false, } @@ -171,7 +177,9 @@ impl TrustifyMigrationContext { strip, fix_zstd, }) => { - let base = Dumps::new()? + let snapshot_file = Snapshot::is_supported().then(|| "snapshot.tar.xz".to_string()); + + let base = dumps .provide(Dump { url: base_url, files: &[db_file, storage_file], @@ -179,10 +187,12 @@ impl TrustifyMigrationContext { }) .await?; Snapshot { + dumps, id: source_id, base, db_file: db_file.to_string(), storage_file: storage_file.to_string(), + snapshot_file, strip, fix_zstd, } diff --git a/test-context/src/ctx/migration/snapshot/btrfs.rs b/test-context/src/ctx/migration/snapshot/btrfs.rs index 881699f35..396d65ca1 100644 --- a/test-context/src/ctx/migration/snapshot/btrfs.rs +++ b/test-context/src/ctx/migration/snapshot/btrfs.rs @@ -1,5 +1,7 @@ +use crate::migration::Dumps; use crate::resource::TestResource; use anyhow::{Context, anyhow, bail}; +use async_compression::tokio::bufread::XzDecoder; use futures::future::BoxFuture; use postgresql_embedded::{PostgreSQL, Settings}; use std::{ @@ -13,6 +15,10 @@ use tokio::fs; use trustify_common::{config, db::Database}; use trustify_module_storage::service::fs::FileSystemBackend; +pub fn is_supported() -> bool { + Command::new().is_ok() +} + #[derive(Clone, Debug)] struct Command { pub btrfs: PathBuf, @@ -86,6 +92,82 @@ You can set `TRUST_TEST_BTRFS_STORE` to a directory on a BTRFS volume mounted wi } } +pub struct SnapshotProvider { + pub id: String, + /// Migration download base + pub base: PathBuf, + pub dumps: Dumps, +} + +impl SnapshotProvider { + async fn provide(&self, btrfs: &Command) -> anyhow::Result> { + let template = btrfs.store.join("templates").join(&self.id); + if template.is_dir() { + return Ok(Some(BtrfsSnapshot { + btrfs: btrfs.clone(), + id: self.id.to_string(), + })); + } + + // detect existing snapshot file + + if self.load_snapshot(&btrfs).await? 
{ + log::info!("Imported new snapshot"); + return Ok(Some(BtrfsSnapshot { + btrfs: btrfs.clone(), + id: self.id.to_string(), + })); + } + + Ok(None) + } + + /// if the snapshot file exists, do load it + async fn load_snapshot(&self, btrfs: &Command) -> anyhow::Result { + let snapshot_file = self.base.join("archives").join(&self.id); + + // get remote file to snapshot_file + if !snapshot_file.is_file() && !download_file("", &snapshot_file) { + // file wasn't there and couldn't be downloaded + log::info!( + "Snapshot file doesn't exist and couldn't be downloaded: {}", + snapshot_file.display() + ); + return Ok(false); + } + + log::info!("Extracting snapshot from: {}", snapshot_file.display()); + + let templates = btrfs.store.join("templates"); + fs::create_dir_all(&templates).await?; + + let target = templates.join(&self.id); + + btrfs + .execute([ + OsStr::new("subvolume"), + OsStr::new("create"), + target.as_os_str(), + ]) + .await?; + + let file = fs::File::open(&snapshot_file).await.with_context(|| { + format!("failed to open snapshot file: {}", snapshot_file.display()) + })?; + + let decoder = XzDecoder::new(tokio::io::BufReader::new(file)); + let archive = async_tar::Archive::new(decoder); + archive + .unpack(&target) + .await + .context("failed to extract snapshot into subvolume")?; + + log::info!("Snapshot extracted into templates/{}", self.id); + + Ok(true) + } +} + /// A running content instance #[derive(Debug)] pub enum Running { @@ -97,8 +179,10 @@ pub enum Running { Collecting(Collect), } +const BTRFS_SNAPSHOT_FILE: &str = "snapshot.tar.xz"; + impl Running { - pub async fn new(id: impl Into) -> anyhow::Result { + pub async fn new(provider: SnapshotProvider) -> anyhow::Result { let btrfs = match Command::new() { Ok(btrfs) => btrfs, Err(err) => { @@ -107,18 +191,15 @@ impl Running { } }; - let id = id.into(); + // detect existing template - // detect existing - - let template = btrfs.store.join("templates").join(&id); - if template.is_dir() { - return 
Ok(Running::Existing(BtrfsSnapshot { btrfs, id })); + if let Some(snapshot) = provider.provide(&btrfs).await? { + return Ok(Running::Existing(snapshot)); } // return new collecting - Ok(Running::Collecting(Collect::new(btrfs, id).await?)) + Ok(Running::Collecting(Collect::new(btrfs, provider.id).await?)) } } diff --git a/test-context/src/ctx/migration/snapshot/mod.rs b/test-context/src/ctx/migration/snapshot/mod.rs index 95b7cdedd..0ccb34f6b 100644 --- a/test-context/src/ctx/migration/snapshot/mod.rs +++ b/test-context/src/ctx/migration/snapshot/mod.rs @@ -1,6 +1,8 @@ #[cfg(target_os = "linux")] pub mod btrfs; +use crate::ctx::migration::snapshot::btrfs::SnapshotProvider; +use crate::migration::Dumps; use crate::{TrustifyTestContext, resource::TestResourceExt, resource::defer}; use anyhow::Context; use postgresql_embedded::{PostgreSQL, Settings}; @@ -19,15 +21,17 @@ use walkdir::WalkDir; pub struct Snapshot { #[allow(unused)] pub id: String, + pub dumps: Dumps, pub base: PathBuf, pub db_file: String, pub storage_file: String, + pub snapshot_file: Option, pub strip: usize, pub fix_zstd: bool, } +#[cfg(not(target_os = "linux"))] impl Snapshot { - #[cfg(not(target_os = "linux"))] pub async fn materialize(self) -> anyhow::Result { let tmp = tempfile::TempDir::new()?; let (db, storage, psql) = self.setup(&tmp).await?; @@ -40,14 +44,29 @@ impl Snapshot { )) } + pub fn is_supported() -> bool { + false + } +} + +#[cfg(target_os = "linux")] +impl Snapshot { + pub fn is_supported() -> bool { + btrfs::is_supported() + } + /// Ensure that a snapshot, with data, is available /// /// Either by running with a fresh import in a temporary directory. Or, if available, with /// a BTRFS snapshot of an import. If such a snapshot doesn't exist yet, it will be created /// first. 
- #[cfg(target_os = "linux")] pub async fn materialize(self) -> anyhow::Result { - let running = btrfs::Running::new(&self.id).await?; + let running = btrfs::Running::new(SnapshotProvider { + id: self.id.clone(), + base: self.base.clone(), + dumps: self.dumps.clone(), + }) + .await?; log::info!("Snapshot state: {running:?}"); @@ -108,9 +127,11 @@ impl Snapshot { let Self { id: _, + dumps: _, base, db_file, storage_file, + snapshot_file: _, // we don't work with the snapshot file strip, fix_zstd, } = self; diff --git a/test-context/src/migration.rs b/test-context/src/migration.rs index 6153f0147..5a3002d10 100644 --- a/test-context/src/migration.rs +++ b/test-context/src/migration.rs @@ -2,6 +2,7 @@ use anyhow::{Context, anyhow}; use futures::StreamExt; use git2::{BranchType, Repository}; use indicatif::{ProgressBar, ProgressStyle}; +use reqwest::{StatusCode, Url}; use sha2::Digest; use std::{env, fs::File, path::Path, path::PathBuf}; use tokio::{ @@ -13,16 +14,15 @@ use tokio::{ /// Manage the download of migration dumps #[derive(Debug)] pub struct Migration { - dump: Dumps, + dumps: Dumps, branch: String, - region: String, bucket: String, } impl Migration { /// Create a new instance, detecting paths and the branch - pub fn new() -> anyhow::Result { + pub fn new(dumps: Dumps) -> anyhow::Result { // get the base of the source code let cwd: PathBuf = match option_env!("CARGO_MANIFEST_DIR") { @@ -47,7 +47,7 @@ impl Migration { // done Ok(Self { - dump: Dumps::new()?, + dumps, branch, region, bucket, @@ -58,7 +58,7 @@ impl Migration { /// /// This may include downloading content from S3. 
pub async fn provide(&self, commit: &str) -> anyhow::Result { - self.dump + self.dumps .provide_raw( "migration", Dump { @@ -260,11 +260,12 @@ async fn validate_checksums( /// just download artifacts (and their digest files) from the dump bucket async fn download_artifacts_raw( - client: reqwest::Client, + client: &reqwest::Client, base: impl AsRef, base_url: &str, digests: bool, files: impl IntoIterator>, + ignore_missing: bool, ) -> anyhow::Result<()> { let base = base.as_ref(); @@ -280,7 +281,14 @@ async fn download_artifacts_raw( log::info!("downloading file: '{url}'"); - let response = client.get(&url).send().await?.error_for_status()?; + let response = client.get(&url).send().await?; + + if ignore_missing && response.status() == StatusCode::NOT_FOUND { + log::info!("Ignoring missing file: {url}"); + continue; + } + + let response = response.error_for_status()?; let total_size = response.content_length(); log::info!("total size: {total_size:?}"); @@ -327,7 +335,7 @@ pub struct Dump<'a, S: AsRef + 'a> { } /// Manage raw dump downloads -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Dumps { base: PathBuf, } @@ -381,12 +389,49 @@ impl Dumps { log::debug!("base path: '{}'", base.display()); - fs::create_dir_all(&base).await?; + let client = reqwest::Client::new(); + + // holding the lock + + self.with_lock(&base, async || { + // download files + + if files.iter().any(|file| !base.join(file).exists()) { + download_artifacts_raw(&client, &base, url, digests, &files, false).await? 
+ } else { + log::debug!("dump files already exist"); + } + + // validate checksums + + if digests { + validate_checksums(&base, files).await?; + } else { + log::debug!("Skip checking digests"); + } + + Ok(()) + }) + .await?; + + Ok(base) + } + + async fn with_lock( + &self, + base: &Path, + f: impl AsyncFnOnce() -> anyhow::Result, + ) -> anyhow::Result { + // lock + + log::debug!("base path: '{}'", base.display()); + + fs::create_dir_all(base).await?; // lock file, we can't lock directories cross-platform let lock = task::spawn_blocking({ - let base = base.clone(); + let base = base.to_owned(); move || { let lock = File::create(base.join(".lock"))?; log::debug!("Waiting for lock file"); @@ -398,29 +443,16 @@ impl Dumps { }) .await??; - // holding the lock + // with lock - if files.iter().any(|file| !base.join(file).exists()) { - let client = reqwest::Client::new(); - download_artifacts_raw(client, &base, url, digests, &files).await? - } else { - log::debug!("dump files already exist"); - } - - // validate checksums - - if digests { - validate_checksums(&base, files).await?; - } else { - log::debug!("Skip checking digests"); - } + let result = f().await; // unlock log::debug!("releasing lock"); lock.unlock()?; - Ok(base) + result } } From 346be295ffb6e49afb786a2cbea158ea0d9ec9c8 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Thu, 12 Mar 2026 10:23:04 +0100 Subject: [PATCH 14/22] feat: work with snapshot archives This speeds things up, starting a new database. 
Assisted-by: Claude Code --- test-context/src/ctx/migration/mod.rs | 21 ++++--- .../src/ctx/migration/snapshot/btrfs.rs | 50 ++++++++++------- .../src/ctx/migration/snapshot/mod.rs | 15 +++-- test-context/src/migration.rs | 55 ++++++++----------- 4 files changed, 74 insertions(+), 67 deletions(-) diff --git a/test-context/src/ctx/migration/mod.rs b/test-context/src/ctx/migration/mod.rs index f15fe4e48..fca63ef42 100644 --- a/test-context/src/ctx/migration/mod.rs +++ b/test-context/src/ctx/migration/mod.rs @@ -6,7 +6,6 @@ use crate::{ migration::{Dump, Dumps, Migration}, }; use anyhow::Context; -use migration::Iden; use std::{borrow::Cow, marker::PhantomData, ops::Deref}; use test_context::AsyncTestContext; use uuid::Uuid; @@ -155,11 +154,12 @@ impl TrustifyMigrationContext { None => "latest".into(), }; let migration = - Migration::new(dumps.clone()).context("failed to create migration manager")?; - let base = migration.provide(&id).await?; + Migration::new(&id).context("failed to create migration manager")?; + + let base = dumps.provide_raw("migration", migration.as_dump()).await?; + Snapshot { id: source_id, - dumps, base, db_file: "dump.sql.xz".to_string(), storage_file: "dump.tar".to_string(), @@ -177,22 +177,27 @@ impl TrustifyMigrationContext { strip, fix_zstd, }) => { - let snapshot_file = Snapshot::is_supported().then(|| "snapshot.tar.xz".to_string()); + let snapshot_file = Snapshot::is_supported().then(|| "snapshot.tar.xz"); + + let files: Vec<_> = [db_file, storage_file] + .into_iter() + .chain(snapshot_file) + .collect(); let base = dumps .provide(Dump { url: base_url, - files: &[db_file, storage_file], + files: files.as_slice(), digests, }) .await?; + Snapshot { - dumps, id: source_id, base, db_file: db_file.to_string(), storage_file: storage_file.to_string(), - snapshot_file, + snapshot_file: snapshot_file.map(ToOwned::to_owned), strip, fix_zstd, } diff --git a/test-context/src/ctx/migration/snapshot/btrfs.rs 
b/test-context/src/ctx/migration/snapshot/btrfs.rs index 396d65ca1..b87535852 100644 --- a/test-context/src/ctx/migration/snapshot/btrfs.rs +++ b/test-context/src/ctx/migration/snapshot/btrfs.rs @@ -1,4 +1,3 @@ -use crate::migration::Dumps; use crate::resource::TestResource; use anyhow::{Context, anyhow, bail}; use async_compression::tokio::bufread::XzDecoder; @@ -8,11 +7,14 @@ use std::{ ffi::OsStr, fmt::Debug, io, + os::unix::fs::PermissionsExt, path::{Path, PathBuf}, + time::Duration, }; use tempfile::TempDir; use tokio::fs; use trustify_common::{config, db::Database}; +use trustify_db::embedded::default_settings; use trustify_module_storage::service::fs::FileSystemBackend; pub fn is_supported() -> bool { @@ -94,9 +96,8 @@ You can set `TRUST_TEST_BTRFS_STORE` to a directory on a BTRFS volume mounted wi pub struct SnapshotProvider { pub id: String, - /// Migration download base - pub base: PathBuf, - pub dumps: Dumps, + /// snapshot archive + pub file: Option, } impl SnapshotProvider { @@ -124,17 +125,13 @@ impl SnapshotProvider { /// if the snapshot file exists, do load it async fn load_snapshot(&self, btrfs: &Command) -> anyhow::Result { - let snapshot_file = self.base.join("archives").join(&self.id); + // check if snapshot archive is present - // get remote file to snapshot_file - if !snapshot_file.is_file() && !download_file("", &snapshot_file) { - // file wasn't there and couldn't be downloaded - log::info!( - "Snapshot file doesn't exist and couldn't be downloaded: {}", - snapshot_file.display() - ); + let Some(snapshot_file) = &self.file else { + // file wasn't there + log::info!("Snapshot file wasn't available"); return Ok(false); - } + }; log::info!("Extracting snapshot from: {}", snapshot_file.display()); @@ -179,8 +176,6 @@ pub enum Running { Collecting(Collect), } -const BTRFS_SNAPSHOT_FILE: &str = "snapshot.tar.xz"; - impl Running { pub async fn new(provider: SnapshotProvider) -> anyhow::Result { let btrfs = match Command::new() { @@ -236,16 +231,33 
@@ impl BtrfsSnapshot { let storage = FileSystemBackend::for_test_in(running.join("storage")).await?; let db_base = running.join("db"); + fs::set_permissions(&db_base, std::fs::Permissions::from_mode(0o700)).await?; + let settings = Settings { - data_dir: db_base.join("data"), + // data_dir: db_base.join("data"), + data_dir: db_base, temporary: false, - ..Default::default() + timeout: Some(Duration::from_mins(2)), + username: "trustify".to_string(), + ..default_settings()? }; let mut psql = PostgreSQL::new(settings); psql.setup().await?; - psql.start().await?; + psql.start().await.inspect_err(|_| { + let log = std::fs::read_to_string(psql.settings().data_dir.join("start.log")) + .unwrap_or_default(); + log::info!("{}", log); + })?; - let db = config::Database::from_port(psql.settings().port)?; + let db = config::Database { + url: None, + username: psql.settings().username.clone(), + password: psql.settings().password.clone().into(), + host: psql.settings().host.clone(), + port: psql.settings().port, + name: "trustify".into(), + ..config::Database::from_env()? 
+ }; // done diff --git a/test-context/src/ctx/migration/snapshot/mod.rs b/test-context/src/ctx/migration/snapshot/mod.rs index 0ccb34f6b..6e1ead65c 100644 --- a/test-context/src/ctx/migration/snapshot/mod.rs +++ b/test-context/src/ctx/migration/snapshot/mod.rs @@ -1,9 +1,11 @@ #[cfg(target_os = "linux")] pub mod btrfs; -use crate::ctx::migration::snapshot::btrfs::SnapshotProvider; -use crate::migration::Dumps; -use crate::{TrustifyTestContext, resource::TestResourceExt, resource::defer}; +use crate::{ + TrustifyTestContext, + ctx::migration::snapshot::btrfs::SnapshotProvider, + resource::{TestResourceExt, defer}, +}; use anyhow::Context; use postgresql_embedded::{PostgreSQL, Settings}; use std::{ @@ -21,7 +23,6 @@ use walkdir::WalkDir; pub struct Snapshot { #[allow(unused)] pub id: String, - pub dumps: Dumps, pub base: PathBuf, pub db_file: String, pub storage_file: String, @@ -63,15 +64,14 @@ impl Snapshot { pub async fn materialize(self) -> anyhow::Result { let running = btrfs::Running::new(SnapshotProvider { id: self.id.clone(), - base: self.base.clone(), - dumps: self.dumps.clone(), + file: self.snapshot_file.as_ref().map(|file| self.base.join(file)), }) .await?; log::info!("Snapshot state: {running:?}"); Ok(match running { - // we are running with a normal, temporary directory + // We are running with a normal, temporary directory btrfs::Running::Temporary(tmp) => { // set up the content in the target directory let (db, storage, psql) = self.setup(&tmp).await?; @@ -127,7 +127,6 @@ impl Snapshot { let Self { id: _, - dumps: _, base, db_file, storage_file, diff --git a/test-context/src/migration.rs b/test-context/src/migration.rs index 5a3002d10..4a7cc59d3 100644 --- a/test-context/src/migration.rs +++ b/test-context/src/migration.rs @@ -2,7 +2,7 @@ use anyhow::{Context, anyhow}; use futures::StreamExt; use git2::{BranchType, Repository}; use indicatif::{ProgressBar, ProgressStyle}; -use reqwest::{StatusCode, Url}; +use reqwest::StatusCode; use sha2::Digest; 
use std::{env, fs::File, path::Path, path::PathBuf}; use tokio::{ @@ -14,15 +14,12 @@ use tokio::{ /// Manage the download of migration dumps #[derive(Debug)] pub struct Migration { - dumps: Dumps, - branch: String, - region: String, - bucket: String, + url: String, } impl Migration { /// Create a new instance, detecting paths and the branch - pub fn new(dumps: Dumps) -> anyhow::Result { + pub fn new(commit: &str) -> anyhow::Result { // get the base of the source code let cwd: PathBuf = match option_env!("CARGO_MANIFEST_DIR") { @@ -47,32 +44,24 @@ impl Migration { // done Ok(Self { - dumps, - branch, - region, - bucket, + url: format!( + "https://{bucket}.s3.{region}.amazonaws.com/{branch}/{commit}", + bucket = bucket, + region = region, + branch = branch, + ), }) } /// Provide the base dump path, for this branch. /// /// This may include downloading content from S3. - pub async fn provide(&self, commit: &str) -> anyhow::Result { - self.dumps - .provide_raw( - "migration", - Dump { - url: &format!( - "https://{bucket}.s3.{region}.amazonaws.com/{branch}/{commit}", - bucket = self.bucket, - region = self.region, - branch = self.branch, - ), - files: &["dump.sql.xz", "dump.tar"], - digests: true, - }, - ) - .await + pub fn as_dump(&self) -> Dump<'_, &'static str> { + Dump { + url: &self.url, + files: &["dump.sql.xz", "dump.tar"], + digests: true, + } } } @@ -370,7 +359,7 @@ impl Dumps { /// Provide the base dump path, for this branch. /// /// This may include downloading content from S3. - async fn provide_raw<'a, S>( + pub async fn provide_raw<'a, S>( &self, r#rtype: &str, Dump { @@ -396,11 +385,13 @@ impl Dumps { self.with_lock(&base, async || { // download files - if files.iter().any(|file| !base.join(file).exists()) { - download_artifacts_raw(&client, &base, url, digests, &files, false).await? 
- } else { - log::debug!("dump files already exist"); - } + let (existing, missing) = files + .iter() + .partition::, _>(|file| base.join(file).exists()); + + download_artifacts_raw(&client, &base, url, digests, &missing, false).await?; + + log::info!("Already existed: {}", existing.join(", ")); // validate checksums From 9e2fb82f8b5fd717962c29fb4f46d0d919d6920a Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 16 Mar 2026 13:35:08 +0100 Subject: [PATCH 15/22] chore: fix clippy warnings --- test-context/src/ctx/migration/mod.rs | 2 +- test-context/src/ctx/migration/snapshot/btrfs.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test-context/src/ctx/migration/mod.rs b/test-context/src/ctx/migration/mod.rs index fca63ef42..ec0471162 100644 --- a/test-context/src/ctx/migration/mod.rs +++ b/test-context/src/ctx/migration/mod.rs @@ -177,7 +177,7 @@ impl TrustifyMigrationContext { strip, fix_zstd, }) => { - let snapshot_file = Snapshot::is_supported().then(|| "snapshot.tar.xz"); + let snapshot_file = Snapshot::is_supported().then_some("snapshot.tar.xz"); let files: Vec<_> = [db_file, storage_file] .into_iter() diff --git a/test-context/src/ctx/migration/snapshot/btrfs.rs b/test-context/src/ctx/migration/snapshot/btrfs.rs index b87535852..59042aca9 100644 --- a/test-context/src/ctx/migration/snapshot/btrfs.rs +++ b/test-context/src/ctx/migration/snapshot/btrfs.rs @@ -112,7 +112,7 @@ impl SnapshotProvider { // detect existing snapshot file - if self.load_snapshot(&btrfs).await? { + if self.load_snapshot(btrfs).await? 
{ log::info!("Imported new snapshot"); return Ok(Some(BtrfsSnapshot { btrfs: btrfs.clone(), From 2f9e1ce689a2774c712395e2e3a7a36492bc98ba Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 16 Mar 2026 13:35:22 +0100 Subject: [PATCH 16/22] chore: raise recursion limit --- modules/ingestor/tests/parallel.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/ingestor/tests/parallel.rs b/modules/ingestor/tests/parallel.rs index b23b30088..82735cf47 100644 --- a/modules/ingestor/tests/parallel.rs +++ b/modules/ingestor/tests/parallel.rs @@ -1,4 +1,6 @@ +//! Testing parallel operations #![recursion_limit = "512"] + use bytes::Bytes; use csaf::Csaf; use serde_json::Value; From b70894a21f819b9c0fb5831436ce35538aff294e Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 16 Mar 2026 13:38:22 +0100 Subject: [PATCH 17/22] fix: use a dedicated DB pool only for out-of-band data migrations In case of in-band migrations, the sea orm migration manager will execute all migrations inside a single transaction. This leads to the problem, that concurrent operations are not possible. However, there can be the strategy to run data migrations upfront, using concurrency. And then re-run the migrations, but skipping the data migration. 
--- Cargo.lock | 1 + Cargo.toml | 1 + migration/Cargo.toml | 1 + migration/src/data/migration.rs | 31 +++++++++++--------- migration/src/data/mod.rs | 34 ++++++++++++++-------- migration/src/data/run.rs | 50 ++++++++++++++++++++++---------- migration/tests/data/m0002010.rs | 29 ++++++++++++++++-- 7 files changed, 104 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d55078d79..172f09e43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8322,6 +8322,7 @@ dependencies = [ "futures-util", "humantime", "indicatif", + "num_cpus", "osv", "sea-orm", "sea-orm-migration", diff --git a/Cargo.toml b/Cargo.toml index afeab4b5f..757046541 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,6 +96,7 @@ mime = "0.3.17" moka = "0.12.10" native-tls = "0.2" num-traits = "0.2" +num_cpus = "1" oci-client = "0.16.0" openid = "0.22.0" openssl = "0.10" diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 0c1a0c86f..7682d646a 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -28,6 +28,7 @@ futures = { workspace = true } futures-util = { workspace = true } humantime = { workspace = true } indicatif = { workspace = true, features = ["tokio", "futures"] } +num_cpus = { workspace = true } osv = { workspace = true, features = ["schema"] } sea-orm = { workspace = true } sea-orm-migration = { workspace = true, features = ["runtime-tokio-rustls", "sqlx-postgres", "with-uuid"] } diff --git a/migration/src/data/migration.rs b/migration/src/data/migration.rs index ccc1bb6bb..c01ef9d3f 100644 --- a/migration/src/data/migration.rs +++ b/migration/src/data/migration.rs @@ -4,7 +4,7 @@ use crate::{ }; use clap::Parser; use futures::executor::block_on; -use sea_orm::DbErr; +use sea_orm::{DatabaseConnection, DbErr}; use sea_orm_migration::{MigrationName, MigrationTrait, SchemaManager}; use std::{ffi::OsString, ops::Deref, sync::LazyLock}; use tokio::task_local; @@ -79,18 +79,10 @@ impl MigrationWithData { } } -impl From for MigrationWithData -where - M: 
MigrationTraitWithData + 'static, -{ - fn from(value: M) -> Self { - MigrationWithData::new(Box::new(value)) - } -} - /// A [`SchemaManager`], extended with data migration features. pub struct SchemaDataManager<'c> { pub manager: &'c SchemaManager<'c>, + pub db: Option<&'c DatabaseConnection>, storage: &'c DispatchBackend, options: &'c Options, } @@ -106,11 +98,13 @@ impl<'a> Deref for SchemaDataManager<'a> { impl<'c> SchemaDataManager<'c> { pub fn new( manager: &'c SchemaManager<'c>, + db: Option<&'c DatabaseConnection>, storage: &'c DispatchBackend, options: &'c Options, ) -> Self { Self { manager, + db, storage, options, } @@ -126,7 +120,18 @@ impl<'c> SchemaDataManager<'c> { return Ok(()); } - self.manager.process(self.storage, self.options, f).await + match self.db { + Some(db) => { + self.manager + .process(db, self.storage, self.options, f) + .await + } + None => { + self.manager + .process(self.manager.get_connection(), self.storage, self.options, f) + .await + } + } } } @@ -141,7 +146,7 @@ impl MigrationTrait for MigrationWithData { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { MigrationTraitWithData::up( &*self.migration, - &SchemaDataManager::new(manager, &self.storage, &self.options), + &SchemaDataManager::new(manager, None, &self.storage, &self.options), ) .await .inspect_err(|err| tracing::warn!("Migration failed: {err}")) @@ -150,7 +155,7 @@ impl MigrationTrait for MigrationWithData { async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { MigrationTraitWithData::down( &*self.migration, - &SchemaDataManager::new(manager, &self.storage, &self.options), + &SchemaDataManager::new(manager, None, &self.storage, &self.options), ) .await } diff --git a/migration/src/data/mod.rs b/migration/src/data/mod.rs index 97ef9de2b..56b2efcbd 100644 --- a/migration/src/data/mod.rs +++ b/migration/src/data/mod.rs @@ -13,12 +13,10 @@ use futures_util::{ stream::{self, TryStreamExt}, }; use indicatif::{ProgressBar, ProgressStyle}; -use 
sea_orm::{DatabaseTransaction, DbErr, TransactionTrait}; +use sea_orm::{ConnectionTrait, DatabaseTransaction, DbErr, TransactionTrait}; use sea_orm_migration::{MigrationTrait, SchemaManager}; -use std::{ - num::{NonZeroU64, NonZeroUsize}, - sync::Arc, -}; +use std::{num::NonZeroU64, sync::Arc}; +use tracing::log; use trustify_module_storage::service::dispatch::DispatchBackend; /// A handler for processing a [`Document`] data migration. @@ -43,8 +41,10 @@ where #[derive(Clone, Debug, PartialEq, Eq, clap::Parser)] pub struct Options { /// Number of concurrent documents being processes - #[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "5")] - pub concurrent: NonZeroUsize, + /// + /// If the value is zero, use the number of logical CPUs + #[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "0")] + pub concurrent: usize, /// The instance number of the current runner (zero based) #[arg(long, env = "MIGRATION_DATA_CURRENT_RUNNER", default_value = "0")] @@ -70,7 +70,7 @@ pub struct Options { impl Default for Options { fn default() -> Self { Self { - concurrent: unsafe { NonZeroUsize::new_unchecked(5) }, + concurrent: 5, current: 0, total: unsafe { NonZeroU64::new_unchecked(1) }, skip_all: false, @@ -118,6 +118,7 @@ impl From<&Options> for Partition { pub trait DocumentProcessor { fn process( &self, + db: &(impl ConnectionTrait + TransactionTrait), storage: &DispatchBackend, options: &Options, f: impl Handler, @@ -165,6 +166,7 @@ impl<'c> DocumentProcessor for SchemaManager<'c> { /// actual system is still running from the read-only clone of the original data. 
async fn process( &self, + db: &(impl ConnectionTrait + TransactionTrait), storage: &DispatchBackend, options: &Options, f: impl Handler, @@ -173,7 +175,6 @@ impl<'c> DocumentProcessor for SchemaManager<'c> { D: Document, { let partition: Partition = options.into(); - let db = self.get_connection(); let tx = db.begin().await?; let all: Vec<_> = D::all(&tx) @@ -189,12 +190,21 @@ impl<'c> DocumentProcessor for SchemaManager<'c> { ProgressStyle::with_template( "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {human_pos}/{human_len} ({per_sec}) ({eta})", ) - .map_err(|err| DbErr::Migration(err.to_string()))? - .progress_chars("##-"), + .map_err(|err| DbErr::Migration(err.to_string()))? + .progress_chars("#>-"), ); let pb = Some(pb); + // get concurrency value + let mut concurrent = options.concurrent; + if concurrent == 0 { + // if zero, use number of logical CPUs + concurrent = num_cpus::get(); + } + + log::info!("Running {concurrent} parallel operations"); + stream::iter(all) .map(async |model| { let tx = db.begin().await?; @@ -220,7 +230,7 @@ impl<'c> DocumentProcessor for SchemaManager<'c> { Ok::<_, DbErr>(()) }) - .buffer_unordered(options.concurrent.into()) + .buffer_unordered(concurrent) .try_collect::>() .await?; diff --git a/migration/src/data/run.rs b/migration/src/data/run.rs index b576870fa..8ece1d40d 100644 --- a/migration/src/data/run.rs +++ b/migration/src/data/run.rs @@ -1,6 +1,6 @@ use crate::data::{MigratorWithData, Options, SchemaDataManager}; use anyhow::bail; -use sea_orm::ConnectOptions; +use sea_orm::{ConnectOptions, DatabaseConnection, DbErr}; use sea_orm_migration::{IntoSchemaManagerConnection, SchemaManager}; use std::{collections::HashMap, time::SystemTime}; use trustify_module_storage::service::dispatch::DispatchBackend; @@ -20,9 +20,39 @@ pub struct Runner { pub options: Options, } +#[derive(Clone)] pub enum Database { Config { url: String, schema: Option }, - Provided(sea_orm::DatabaseConnection), + 
Provided(DatabaseConnection), +} + +impl From for Database { + fn from(value: DatabaseConnection) -> Self { + Self::Provided(value) + } +} + +impl From for Database { + fn from(value: trustify_common::db::Database) -> Self { + Self::Provided(value.into_connection()) + } +} + +impl Database { + pub async fn try_into_connection(self) -> Result { + Ok(match self { + Self::Config { url, schema } => { + let schema = schema.clone().unwrap_or_else(|| "public".to_owned()); + + let connect_options = ConnectOptions::new(url) + .set_schema_search_path(schema) + .to_owned(); + + sea_orm::Database::connect(connect_options).await? + } + Self::Provided(database) => database.clone(), + }) + } } impl Runner { @@ -41,21 +71,11 @@ impl Runner { running.push(migration); } - let database = match self.database { - Database::Config { url, schema } => { - let schema = schema.unwrap_or_else(|| "public".to_owned()); - - let connect_options = ConnectOptions::new(url) - .set_schema_search_path(schema) - .to_owned(); - - sea_orm::Database::connect(connect_options).await? 
- } - Database::Provided(database) => database, - }; + let database = self.database.clone().try_into_connection().await?; let manager = SchemaManager::new(database.into_schema_manager_connection()); - let manager = SchemaDataManager::new(&manager, &self.storage, &self.options); + let manager = + SchemaDataManager::new(&manager, Some(&database), &self.storage, &self.options); for run in running { tracing::info!(name = run.name(), "Running data migration"); diff --git a/migration/tests/data/m0002010.rs b/migration/tests/data/m0002010.rs index bf4c5e125..e9dc273eb 100644 --- a/migration/tests/data/m0002010.rs +++ b/migration/tests/data/m0002010.rs @@ -4,9 +4,9 @@ use migration::{ data::{Database, Direction, MigrationWithData, Options, Runner}, }; use sea_orm_migration::MigratorTrait; -use std::num::NonZeroUsize; use test_context::test_context; use test_log::test; +use tracing::log; use trustify_test_context::{TrustifyMigrationContext, commit, dump}; commit!(Commit("6d3ea814b4b44fe16ea8f21724dda5abb0fc7932")); @@ -66,13 +66,36 @@ dump!( ignore = "enable with: cargo test --features long_running" )] async fn performance(ctx: &TrustifyMigrationContext) -> Result<(), anyhow::Error> { + let migrations = vec![ + "m0002000_add_sbom_properties".into(), + "m0002010_add_advisory_scores".into(), + ]; + + // we simulate running the migrations out-of-band + + log::info!("Running data migrations out-of-band"); + + Runner { + database: ctx.db.clone().into(), + storage: ctx.storage.clone().into(), + direction: Default::default(), + migrations: migrations.clone(), + options: Default::default(), + } + .run::() + .await?; + + // now run the standard migration, skipping the out-of-band ones + + log::info!("Running migrations"); + MigrationWithData::run_with_test( ctx.storage.clone(), Options { - concurrent: NonZeroUsize::new(32).unwrap(), + skip: migrations, ..Options::default() }, - async { MigratorTest::up(&ctx.db, None).await }, + async { Migrator::up(&ctx.db, None).await }, ) .await?; 
From 929d3659c7d564feb8f437fcd3be10e459b955db Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 16 Mar 2026 16:29:44 +0100 Subject: [PATCH 18/22] chore: fix conditional compilation issues --- test-context/src/ctx/migration/mod.rs | 2 + .../src/ctx/migration/snapshot/mod.rs | 175 +++++++++--------- 2 files changed, 92 insertions(+), 85 deletions(-) diff --git a/test-context/src/ctx/migration/mod.rs b/test-context/src/ctx/migration/mod.rs index ec0471162..965553640 100644 --- a/test-context/src/ctx/migration/mod.rs +++ b/test-context/src/ctx/migration/mod.rs @@ -163,6 +163,7 @@ impl TrustifyMigrationContext { base, db_file: "dump.sql.xz".to_string(), storage_file: "dump.tar".to_string(), + #[cfg(target_os = "linux")] snapshot_file: None, strip: 0, fix_zstd: false, @@ -197,6 +198,7 @@ impl TrustifyMigrationContext { base, db_file: db_file.to_string(), storage_file: storage_file.to_string(), + #[cfg(target_os = "linux")] snapshot_file: snapshot_file.map(ToOwned::to_owned), strip, fix_zstd, diff --git a/test-context/src/ctx/migration/snapshot/mod.rs b/test-context/src/ctx/migration/snapshot/mod.rs index 6e1ead65c..d8ea299b8 100644 --- a/test-context/src/ctx/migration/snapshot/mod.rs +++ b/test-context/src/ctx/migration/snapshot/mod.rs @@ -3,7 +3,6 @@ pub mod btrfs; use crate::{ TrustifyTestContext, - ctx::migration::snapshot::btrfs::SnapshotProvider, resource::{TestResourceExt, defer}, }; use anyhow::Context; @@ -26,95 +25,13 @@ pub struct Snapshot { pub base: PathBuf, pub db_file: String, pub storage_file: String, + #[cfg(target_os = "linux")] pub snapshot_file: Option, pub strip: usize, pub fix_zstd: bool, } -#[cfg(not(target_os = "linux"))] -impl Snapshot { - pub async fn materialize(self) -> anyhow::Result { - let tmp = tempfile::TempDir::new()?; - let (db, storage, psql) = self.setup(&tmp).await?; - - Ok(TrustifyTestContext::new( - db, - psql.settings().port, - storage, - defer(psql).then(defer(tmp)), - )) - } - - pub fn is_supported() -> bool { - false - } -} 
- -#[cfg(target_os = "linux")] impl Snapshot { - pub fn is_supported() -> bool { - btrfs::is_supported() - } - - /// Ensure that a snapshot, with data, is available - /// - /// Either by running with a fresh import in a temporary directory. Or, if available, with - /// a BTRFS snapshot of an import. If such a snapshot doesn't exist yet, it will be created - /// first. - pub async fn materialize(self) -> anyhow::Result { - let running = btrfs::Running::new(SnapshotProvider { - id: self.id.clone(), - file: self.snapshot_file.as_ref().map(|file| self.base.join(file)), - }) - .await?; - - log::info!("Snapshot state: {running:?}"); - - Ok(match running { - // We are running with a normal, temporary directory - btrfs::Running::Temporary(tmp) => { - // set up the content in the target directory - let (db, storage, psql) = self.setup(&tmp).await?; - - TrustifyTestContext::new( - db, - psql.settings().port, - storage, - defer(psql).then(defer(tmp)), - ) - } - // We are running with an existing snapshot, just enable it - btrfs::Running::Existing(snapshot) => { - // activate the snapshot, starts the database - let started = snapshot.start().await?; - - TrustifyTestContext::new( - started.db().clone(), - started.settings().port, - started.storage().clone(), - started, - ) - } - // We need to create the snapshot first, then run it - btrfs::Running::Collecting(collect) => { - // set up the content in preparation directory - let (_, _, psql) = self.setup(&collect).await?; - - // create the snapshot - let snapshot = collect.create(psql).await?; - // and activate it - let started = snapshot.start().await?; - - TrustifyTestContext::new( - started.db().clone(), - started.settings().port, - started.storage().clone(), - started, - ) - } - }) - } - /// This performs the actual DB and storage preparation async fn setup( self, @@ -130,7 +47,8 @@ impl Snapshot { base, db_file, storage_file, - snapshot_file: _, // we don't work with the snapshot file + #[cfg(target_os = "linux")] + 
snapshot_file: _, // we don't work with the snapshot file strip, fix_zstd, } = self; @@ -226,3 +144,90 @@ impl Snapshot { Ok((db, storage, postgresql)) } } + +#[cfg(not(target_os = "linux"))] +impl Snapshot { + pub async fn materialize(self) -> anyhow::Result { + let tmp = tempfile::TempDir::new()?; + let (db, storage, psql) = self.setup(&tmp).await?; + + Ok(TrustifyTestContext::new( + db, + psql.settings().port, + storage, + defer(psql).then(defer(tmp)), + )) + } + + pub fn is_supported() -> bool { + false + } +} + +#[cfg(target_os = "linux")] +impl Snapshot { + pub fn is_supported() -> bool { + btrfs::is_supported() + } + + /// Ensure that a snapshot, with data, is available + /// + /// Either by running with a fresh import in a temporary directory. Or, if available, with + /// a BTRFS snapshot of an import. If such a snapshot doesn't exist yet, it will be created + /// first. + pub async fn materialize(self) -> anyhow::Result { + use crate::ctx::migration::snapshot::btrfs::SnapshotProvider; + + let running = btrfs::Running::new(SnapshotProvider { + id: self.id.clone(), + file: self.snapshot_file.as_ref().map(|file| self.base.join(file)), + }) + .await?; + + log::info!("Snapshot state: {running:?}"); + + Ok(match running { + // We are running with a normal, temporary directory + btrfs::Running::Temporary(tmp) => { + // set up the content in the target directory + let (db, storage, psql) = self.setup(&tmp).await?; + + TrustifyTestContext::new( + db, + psql.settings().port, + storage, + defer(psql).then(defer(tmp)), + ) + } + // We are running with an existing snapshot, just enable it + btrfs::Running::Existing(snapshot) => { + // activate the snapshot, starts the database + let started = snapshot.start().await?; + + TrustifyTestContext::new( + started.db().clone(), + started.settings().port, + started.storage().clone(), + started, + ) + } + // We need to create the snapshot first, then run it + btrfs::Running::Collecting(collect) => { + // set up the content in 
preparation directory + let (_, _, psql) = self.setup(&collect).await?; + + // create the snapshot + let snapshot = collect.create(psql).await?; + // and activate it + let started = snapshot.start().await?; + + TrustifyTestContext::new( + started.db().clone(), + started.settings().port, + started.storage().clone(), + started, + ) + } + }) + } +} From 030658fb2361a5b08803709a8f57694fb19cc6db Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 16 Mar 2026 16:43:40 +0100 Subject: [PATCH 19/22] chore: drop num_cpus for Rust provided feature --- Cargo.lock | 1 - Cargo.toml | 1 - migration/Cargo.toml | 1 - migration/src/data/mod.rs | 4 ++-- 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 172f09e43..d55078d79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8322,7 +8322,6 @@ dependencies = [ "futures-util", "humantime", "indicatif", - "num_cpus", "osv", "sea-orm", "sea-orm-migration", diff --git a/Cargo.toml b/Cargo.toml index 757046541..afeab4b5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,6 @@ mime = "0.3.17" moka = "0.12.10" native-tls = "0.2" num-traits = "0.2" -num_cpus = "1" oci-client = "0.16.0" openid = "0.22.0" openssl = "0.10" diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 7682d646a..0c1a0c86f 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -28,7 +28,6 @@ futures = { workspace = true } futures-util = { workspace = true } humantime = { workspace = true } indicatif = { workspace = true, features = ["tokio", "futures"] } -num_cpus = { workspace = true } osv = { workspace = true, features = ["schema"] } sea-orm = { workspace = true } sea-orm-migration = { workspace = true, features = ["runtime-tokio-rustls", "sqlx-postgres", "with-uuid"] } diff --git a/migration/src/data/mod.rs b/migration/src/data/mod.rs index 56b2efcbd..4ba138544 100644 --- a/migration/src/data/mod.rs +++ b/migration/src/data/mod.rs @@ -15,7 +15,7 @@ use futures_util::{ use indicatif::{ProgressBar, ProgressStyle}; 
use sea_orm::{ConnectionTrait, DatabaseTransaction, DbErr, TransactionTrait}; use sea_orm_migration::{MigrationTrait, SchemaManager}; -use std::{num::NonZeroU64, sync::Arc}; +use std::{num::NonZeroU64, num::NonZeroUsize, sync::Arc, thread::available_parallelism}; use tracing::log; use trustify_module_storage::service::dispatch::DispatchBackend; @@ -200,7 +200,7 @@ impl<'c> DocumentProcessor for SchemaManager<'c> { let mut concurrent = options.concurrent; if concurrent == 0 { // if zero, use number of logical CPUs - concurrent = num_cpus::get(); + concurrent = available_parallelism().map(NonZeroUsize::get).unwrap_or(1); } log::info!("Running {concurrent} parallel operations"); From a57666251d7ae08f5cc67b4ceb83373a9233515b Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 17 Mar 2026 09:13:23 +0100 Subject: [PATCH 20/22] chore: put TMPDIR on /mnt Tests with larger dataset run out of disk space. --- .github/workflows/ci.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8ca9d04aa..4232c9191 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -109,6 +109,11 @@ jobs: run: | docker compose -f etc/deploy/compose/compose-minio.yaml up -d --wait + - name: Create tmp dir + run: | + sudo mkdir /mnt/trustify + sudo chmod a+rwx /mnt/trustify + - name: Test run: cargo test --all-features env: @@ -116,6 +121,7 @@ jobs: RUST_LOG: info,sqlx=error,sea_orm=error TRUSTIFY_S3_AWS_REGION: eu-west-1 TRUSTIFY_S3_AWS_BUCKET: guacsec-migration-dumps + TMPDIR: /mnt/trustify - name: Export and Validate Generated Openapi Spec run: | From 9640b5a242801fedcfca8119a4e026e901177bf2 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 17 Mar 2026 09:16:05 +0100 Subject: [PATCH 21/22] chore: align defaults --- migration/src/data/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/migration/src/data/mod.rs b/migration/src/data/mod.rs index 4ba138544..ac39f07a6 100644 --- 
a/migration/src/data/mod.rs +++ b/migration/src/data/mod.rs @@ -40,7 +40,7 @@ where #[derive(Clone, Debug, PartialEq, Eq, clap::Parser)] pub struct Options { - /// Number of concurrent documents being processes + /// Number of concurrent documents being processed /// /// If the value is zero, use the number of logical CPUs #[arg(long, env = "MIGRATION_DATA_CONCURRENT", default_value = "0")] @@ -70,7 +70,7 @@ pub struct Options { impl Default for Options { fn default() -> Self { Self { - concurrent: 5, + concurrent: 0, current: 0, total: unsafe { NonZeroU64::new_unchecked(1) }, skip_all: false, From 5f2702e955ca3275e5a384fbd73e3df1956811ae Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Tue, 17 Mar 2026 09:18:54 +0100 Subject: [PATCH 22/22] docs: document some design decisions --- test-context/src/ctx/migration/snapshot/mod.rs | 2 ++ test-context/src/migration.rs | 7 ++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/test-context/src/ctx/migration/snapshot/mod.rs b/test-context/src/ctx/migration/snapshot/mod.rs index d8ea299b8..98195cc1c 100644 --- a/test-context/src/ctx/migration/snapshot/mod.rs +++ b/test-context/src/ctx/migration/snapshot/mod.rs @@ -98,6 +98,8 @@ impl Snapshot { log::info!("Fixing zstd EOF markers"); const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00]; + // this is using sync FS operations, which is not perfect, but ok considering this is + // test only code for entry in WalkDir::new(tmp_storage) { let entry = entry.context("failed to walk storage directory")?; if entry.file_type().is_file() diff --git a/test-context/src/migration.rs b/test-context/src/migration.rs index 4a7cc59d3..c6d35a76f 100644 --- a/test-context/src/migration.rs +++ b/test-context/src/migration.rs @@ -385,9 +385,10 @@ impl Dumps { self.with_lock(&base, async || { // download files - let (existing, missing) = files - .iter() - .partition::, _>(|file| base.join(file).exists()); + let (existing, missing) = files.iter().partition::, _>(|file| { + // we're 
using a block call here, this keeps things simple, and it's only test code + base.join(file).exists() + }); download_artifacts_raw(&client, &base, url, digests, &missing, false).await?;