diff --git a/etc/test-data/osv/GHSA-gqp3-2cvr-x8m3.json.bz2 b/etc/test-data/osv/GHSA-gqp3-2cvr-x8m3.json.bz2 new file mode 100644 index 000000000..a9acb0f56 Binary files /dev/null and b/etc/test-data/osv/GHSA-gqp3-2cvr-x8m3.json.bz2 differ diff --git a/etc/test-data/osv/GHSA-h3gc-qfqq-6h8f.json.bz2 b/etc/test-data/osv/GHSA-h3gc-qfqq-6h8f.json.bz2 new file mode 100644 index 000000000..e17e633aa Binary files /dev/null and b/etc/test-data/osv/GHSA-h3gc-qfqq-6h8f.json.bz2 differ diff --git a/modules/ingestor/src/graph/purl/creator.rs b/modules/ingestor/src/graph/purl/creator.rs index 41a4d9670..35912f81d 100644 --- a/modules/ingestor/src/graph/purl/creator.rs +++ b/modules/ingestor/src/graph/purl/creator.rs @@ -1,11 +1,10 @@ -use crate::graph::error::Error; +use crate::graph::{error::Error, purl}; use sea_orm::{ActiveValue::Set, ConnectionTrait, EntityTrait}; use sea_query::OnConflict; use std::collections::{BTreeMap, HashSet}; use tracing::instrument; use trustify_common::{db::chunk::EntityChunkedIter, purl::Purl}; use trustify_entity::{ - base_purl, qualified_purl::{self, Qualifiers}, versioned_purl, }; @@ -35,23 +34,15 @@ impl PurlCreator { return Ok(()); } - // insert all packages + // Use the shared helper to batch create base PURLs + purl::batch_create_base_purls(self.purls.iter().cloned(), db).await?; - let mut packages = BTreeMap::new(); let mut versions = BTreeMap::new(); let mut qualifieds = BTreeMap::new(); for purl in self.purls { let cp = purl.clone().into(); let (package, version, qualified) = purl.uuids(); - packages - .entry(package) - .or_insert_with(|| base_purl::ActiveModel { - id: Set(package), - r#type: Set(purl.ty), - namespace: Set(purl.namespace), - name: Set(purl.name), - }); versions .entry(version) @@ -71,16 +62,6 @@ impl PurlCreator { }); } - // insert packages - - for batch in &packages.into_values().chunked() { - base_purl::Entity::insert_many(batch) - .on_conflict(OnConflict::new().do_nothing().to_owned()) - .do_nothing() - 
.exec_without_returning(db) - .await?; - } - // insert all package versions for batch in &versions.into_values().chunked() { diff --git a/modules/ingestor/src/graph/purl/mod.rs b/modules/ingestor/src/graph/purl/mod.rs index f8f50f9ff..5969cbadd 100644 --- a/modules/ingestor/src/graph/purl/mod.rs +++ b/modules/ingestor/src/graph/purl/mod.rs @@ -3,6 +3,7 @@ pub mod creator; pub mod package_version; pub mod qualified_package; +pub mod status_creator; use crate::graph::{Graph, error::Error}; use package_version::PackageVersionContext; @@ -10,11 +11,14 @@ use qualified_package::QualifiedPackageContext; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set, prelude::Uuid, }; -use sea_query::SelectStatement; -use std::fmt::{Debug, Formatter}; +use sea_query::{OnConflict, SelectStatement}; +use std::{ + collections::BTreeMap, + fmt::{Debug, Formatter}, +}; use tracing::instrument; use trustify_common::{ - db::limiter::LimiterTrait, + db::{chunk::EntityChunkedIter, limiter::LimiterTrait}, model::{Paginated, PaginatedResults}, purl::{Purl, PurlErr}, }; @@ -327,6 +331,41 @@ impl<'g> PackageContext<'g> { } } +/// Batch create base PURLs (without versions or qualifiers). +/// +/// This helper function efficiently creates multiple base_purl entries in batches, +/// handling duplicates via ON CONFLICT DO NOTHING. It's used by both PurlCreator +/// and advisory loaders to avoid code duplication. 
+pub async fn batch_create_base_purls( + purls: impl IntoIterator, + connection: &C, +) -> Result<(), Error> { + let mut packages = BTreeMap::new(); + + for purl in purls { + let package = purl.package_uuid(); + packages + .entry(package) + .or_insert_with(|| entity::base_purl::ActiveModel { + id: Set(package), + r#type: Set(purl.ty), + namespace: Set(purl.namespace), + name: Set(purl.name), + }); + } + + // Batch insert packages + for batch in &packages.into_values().chunked() { + entity::base_purl::Entity::insert_many(batch) + .on_conflict(OnConflict::new().do_nothing().to_owned()) + .do_nothing() + .exec_without_returning(connection) + .await?; + } + + Ok(()) +} + #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { diff --git a/modules/ingestor/src/graph/purl/status_creator.rs b/modules/ingestor/src/graph/purl/status_creator.rs new file mode 100644 index 000000000..a0eacc326 --- /dev/null +++ b/modules/ingestor/src/graph/purl/status_creator.rs @@ -0,0 +1,135 @@ +use crate::graph::{ + advisory::{purl_status::PurlStatus, version::VersionInfo}, + error::Error, +}; +use sea_orm::{ActiveValue::Set, ConnectionTrait, EntityTrait, QueryFilter}; +use sea_query::{Expr, OnConflict, PgFunc}; +use std::collections::{BTreeMap, BTreeSet}; +use tracing::instrument; +use trustify_common::{cpe::Cpe, db::chunk::EntityChunkedIter, purl::Purl}; +use trustify_entity::{purl_status, status, version_range}; +use uuid::Uuid; + +/// Input data for creating a PURL status entry +#[derive(Clone, Debug)] +pub struct PurlStatusEntry { + pub advisory_id: Uuid, + pub vulnerability_id: String, + pub purl: Purl, + pub status: String, + pub version_info: VersionInfo, + pub context_cpe: Option, +} + +/// Creator for batch insertion of PURL statuses +/// +/// Follows the Creator pattern used by PurlCreator, CpeCreator, etc. +/// Collects PURL status entries and creates them in batches to avoid +/// N+1 query problems and race conditions. 
+#[derive(Default)] +pub struct PurlStatusCreator { + entries: Vec, +} + +impl PurlStatusCreator { + pub fn new() -> Self { + Self::default() + } + + /// Add a PURL status entry to be created + pub fn add(&mut self, entry: &PurlStatusEntry) { + self.entries.push(entry.clone()); + } + + /// Create all collected PURL statuses in batches + #[instrument(skip_all, fields(num = self.entries.len()), err(level=tracing::Level::INFO))] + pub async fn create(self, connection: &C) -> Result<(), Error> + where + C: ConnectionTrait, + { + if self.entries.is_empty() { + return Ok(()); + } + + // 1. Batch lookup all unique status slugs + let unique_statuses: Vec = self + .entries + .iter() + .map(|e| e.status.clone()) + .collect::>() + .into_iter() + .collect(); + + let status_models = status::Entity::find() + .filter(Expr::col(status::Column::Slug).eq(PgFunc::any(unique_statuses))) + .all(connection) + .await?; + + let status_map: BTreeMap = status_models + .into_iter() + .map(|s| (s.slug.clone(), s.id)) + .collect(); + + // 2. 
Deduplicate and build ActiveModels + let mut version_ranges = BTreeMap::new(); + let mut purl_statuses = BTreeMap::new(); + + for entry in self.entries { + // Validate status exists + let status_id = *status_map + .get(&entry.status) + .ok_or_else(|| Error::InvalidStatus(entry.status.clone()))?; + + // Create PurlStatus and use its uuid() method + let purl_status = PurlStatus { + cpe: entry.context_cpe.clone(), + purl: entry.purl.clone(), + status: status_id, + info: entry.version_info.clone(), + }; + + let uuid = purl_status.uuid(entry.advisory_id, entry.vulnerability_id.clone()); + let base_purl_id = entry.purl.package_uuid(); + let version_range_id = entry.version_info.uuid(); + let context_cpe_id = entry.context_cpe.as_ref().map(|cpe| cpe.uuid()); + + // Deduplicate version ranges + version_ranges + .entry(version_range_id) + .or_insert_with(|| entry.version_info.clone().into_active_model()); + + // Deduplicate purl_statuses by UUID + purl_statuses + .entry(uuid) + .or_insert_with(|| purl_status::ActiveModel { + id: Set(uuid), + advisory_id: Set(entry.advisory_id), + vulnerability_id: Set(entry.vulnerability_id.clone()), + status_id: Set(status_id), + base_purl_id: Set(base_purl_id), + version_range_id: Set(version_range_id), + context_cpe_id: Set(context_cpe_id), + }); + } + + // 3. Batch insert version ranges + for batch in &version_ranges.into_values().chunked() { + version_range::Entity::insert_many(batch) + .on_conflict(OnConflict::new().do_nothing().to_owned()) + .do_nothing() + .exec_without_returning(connection) + .await?; + } + + // 4. 
Batch insert purl_statuses + for batch in &purl_statuses.into_values().chunked() { + purl_status::Entity::insert_many(batch) + .on_conflict(OnConflict::new().do_nothing().to_owned()) + .do_nothing() + .exec_without_returning(connection) + .await?; + } + + Ok(()) + } +} diff --git a/modules/ingestor/src/service/advisory/cve/loader.rs b/modules/ingestor/src/service/advisory/cve/loader.rs index 2c6206270..a8ed0131b 100644 --- a/modules/ingestor/src/service/advisory/cve/loader.rs +++ b/modules/ingestor/src/service/advisory/cve/loader.rs @@ -5,6 +5,10 @@ use crate::{ AdvisoryInformation, AdvisoryVulnerabilityInformation, version::{Version, VersionInfo, VersionSpec}, }, + purl::{ + self, + status_creator::{PurlStatusCreator, PurlStatusEntry}, + }, vulnerability::VulnerabilityInformation, }, model::IngestResult, @@ -114,9 +118,16 @@ impl<'g> CveLoader<'g> { } } + // Initialize batch creator for efficient status ingestion + let mut purl_status_creator = PurlStatusCreator::new(); + let mut base_purls = std::collections::HashSet::new(); + if let Some(affected) = affected { for product in affected { if let Some(purl) = divine_purl(product) { + // Collect base PURL for batch creation + base_purls.insert(purl.clone()); + // okay! we have a purl, now // sort out version bounds & status for version in &product.versions { @@ -146,30 +157,39 @@ impl<'g> CveLoader<'g> { }, }; - advisory_vuln - .ingest_package_status( - None, - &purl, - match status { - Status::Affected => "affected", - Status::Unaffected => "not_affected", - Status::Unknown => "unknown", - }, - VersionInfo { - scheme: version_type - .as_deref() - .map(VersionScheme::from) - .unwrap_or(VersionScheme::Generic), - spec: version_spec, - }, - &tx, - ) - .await? 
+ // Add package status entry to batch creator + purl_status_creator.add(&PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: match status { + Status::Affected => "affected".to_string(), + Status::Unaffected => "not_affected".to_string(), + Status::Unknown => "unknown".to_string(), + }, + version_info: VersionInfo { + scheme: version_type + .as_deref() + .map(VersionScheme::from) + .unwrap_or(VersionScheme::Generic), + spec: version_spec, + }, + context_cpe: None, + }); } } } } + // Batch create base PURLs (without versions/qualifiers) + purl::batch_create_base_purls(base_purls, &tx).await?; + + // Batch create statuses + purl_status_creator.create(&tx).await?; + vulnerability .drop_descriptions_for_advisory(advisory.advisory.id, &tx) .await?; diff --git a/modules/ingestor/src/service/advisory/osv/loader.rs b/modules/ingestor/src/service/advisory/osv/loader.rs index 206ba4dac..6e1b06f90 100644 --- a/modules/ingestor/src/service/advisory/osv/loader.rs +++ b/modules/ingestor/src/service/advisory/osv/loader.rs @@ -6,7 +6,11 @@ use crate::{ advisory_vulnerability::AdvisoryVulnerabilityContext, version::{Version, VersionInfo, VersionSpec}, }, - purl::creator::PurlCreator, + purl::{ + self, + creator::PurlCreator, + status_creator::{PurlStatusCreator, PurlStatusEntry}, + }, }, model::IngestResult, service::{ @@ -16,8 +20,8 @@ use crate::{ }; use osv::schema::{Ecosystem, Event, Range, RangeType, ReferenceType, SeverityType, Vulnerability}; use sbom_walker::report::ReportSink; -use sea_orm::{ConnectionTrait, TransactionTrait}; -use std::{fmt::Debug, str::FromStr}; +use sea_orm::TransactionTrait; +use std::{collections::HashSet, fmt::Debug, str::FromStr}; use tracing::instrument; use trustify_common::{hashing::Digests, id::Id, purl::Purl, time::ChronoExt}; use trustify_cvss::cvss3::Cvss3Base; @@ -78,6 +82,8 @@ impl<'g> OsvLoader<'g> { 
} let mut purl_creator = PurlCreator::new(); + let mut purl_status_creator = PurlStatusCreator::new(); + let mut base_purls = HashSet::new(); for cve_id in cve_ids { self.graph.ingest_vulnerability(&cve_id, (), &tx).await?; @@ -135,144 +141,157 @@ impl<'g> OsvLoader<'g> { // iterate through the known versions, apply the version, and create them for version in affected.versions.iter().flatten() { purl_creator.add(purl.with_version(version)); - } - - // Process explicit versions for advisory linking - for version in affected.versions.iter().flatten() { - ingest_exact(&advisory_vuln, &purl, "affected", version, &tx).await?; + // Process explicit versions for advisory linking + purl_status_creator.add(&PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "affected".to_string(), + version_info: VersionInfo { + scheme: VersionScheme::Generic, + spec: VersionSpec::Exact(version.to_string()), + }, + context_cpe: None, + }); } for range in affected.ranges.iter().flatten() { + // Collect base PURL for range-based status entries + base_purls.insert(purl.clone()); + match (&range.range_type, &package.ecosystem) { (RangeType::Semver, _) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Semver, - &tx, - ) - .await?; + VersionScheme::Semver, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Git, _) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Git, - &tx, - ) - .await?; + VersionScheme::Git, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Maven(_)) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Maven, - &tx, - ) - .await?; + VersionScheme::Maven, + ) { + purl_status_creator.add(&entry); + } } 
(RangeType::Ecosystem, Ecosystem::PyPI | Ecosystem::Python) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Python, - &tx, - ) - .await?; + VersionScheme::Python, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Go) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Golang, - &tx, - ) - .await?; + VersionScheme::Golang, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Npm) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Npm, - &tx, - ) - .await?; + VersionScheme::Npm, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Packagist) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Packagist, - &tx, - ) - .await?; + VersionScheme::Packagist, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::NuGet) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::NuGet, - &tx, - ) - .await?; + VersionScheme::NuGet, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::RubyGems) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Gem, - &tx, - ) - .await?; + VersionScheme::Gem, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Hex) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Hex, - &tx, - ) - .await?; + VersionScheme::Hex, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::SwiftURL) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Swift, - &tx, - ) - .await?; 
+ VersionScheme::Swift, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Pub) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Pub, - &tx, - ) - .await?; + VersionScheme::Pub, + ) { + purl_status_creator.add(&entry); + } } (_, _) => { - create_package_status_versions( + for entry in build_package_status_versions( &advisory_vuln, &purl, range, affected.versions.iter().flatten(), - &tx, - ) - .await? + ) { + purl_status_creator.add(&entry); + } } } } @@ -282,6 +301,11 @@ impl<'g> OsvLoader<'g> { purl_creator.create(&tx).await?; + // Create base PURLs for range-based status entries + purl::batch_create_base_purls(base_purls, &tx).await?; + + purl_status_creator.create(&tx).await?; + tx.commit().await?; Ok(IngestResult { @@ -292,16 +316,87 @@ impl<'g> OsvLoader<'g> { } } -/// create package statues based on listed versions -async fn create_package_status_versions( +/// Build package status entries from range events +fn build_package_status( advisory_vuln: &AdvisoryVulnerabilityContext<'_>, purl: &Purl, range: &Range, - versions: impl IntoIterator, - connection: &C, -) -> Result<(), Error> { + version_scheme: VersionScheme, +) -> Vec { + let mut entries = Vec::new(); + let parsed_range = events_to_range(&range.events); + + let spec = match &parsed_range { + (Some(start), None) => Some(VersionSpec::Range( + Version::Inclusive(start.clone()), + Version::Unbounded, + )), + (None, Some((end, false))) => Some(VersionSpec::Range( + Version::Unbounded, + Version::Exclusive(end.clone()), + )), + (None, Some((end, true))) => Some(VersionSpec::Range( + Version::Unbounded, + Version::Inclusive(end.clone()), + )), + (Some(start), Some((end, false))) => Some(VersionSpec::Range( + Version::Inclusive(start.clone()), + Version::Exclusive(end.clone()), + )), + (Some(start), Some((end, true))) => Some(VersionSpec::Range( + Version::Inclusive(start.clone()), + Version::Inclusive(end.clone()), 
+ )), + (None, None) => None, + }; + + if let Some(spec) = spec { + entries.push(PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "affected".to_string(), + version_info: VersionInfo { + scheme: version_scheme, + spec, + }, + context_cpe: None, + }); + } + + if let (_, Some((fixed, false))) = &parsed_range { + entries.push(PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "fixed".to_string(), + version_info: VersionInfo { + scheme: version_scheme, + spec: VersionSpec::Exact(fixed.clone()), + }, + context_cpe: None, + }); + } + + entries +} + +/// Build package status entries based on listed versions +fn build_package_status_versions<'a>( + advisory_vuln: &AdvisoryVulnerabilityContext<'_>, + purl: &Purl, + range: &Range, + versions: impl IntoIterator, +) -> Vec { // the list of versions, sorted by the range type let versions = versions.into_iter().cloned().collect::>(); + let mut entries = Vec::new(); let mut start = None; for event in &range.events { @@ -311,19 +406,31 @@ async fn create_package_status_versions( } Event::Fixed(version) | Event::LastAffected(version) => { if let Some(start) = start.take() { - ingest_range_from( + entries.extend(build_range_from( advisory_vuln, purl, "affected", start, Some(version), &versions, - connection, - ) - .await?; + )); } - ingest_exact(advisory_vuln, purl, "fixed", version, connection).await?; + // Add "fixed" status + entries.push(PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "fixed".to_string(), + version_info: VersionInfo { + scheme: VersionScheme::Generic, + spec: 
VersionSpec::Exact(version.to_string()), + }, + context_cpe: None, + }); } Event::Limit(_) => {} // for non_exhaustive @@ -332,23 +439,21 @@ async fn create_package_status_versions( } if let Some(start) = start { - ingest_range_from( + entries.extend(build_range_from( advisory_vuln, purl, "affected", start, None, &versions, - connection, - ) - .await?; + )); } - Ok(()) + entries } -/// Ingest all from a start to an end -async fn ingest_range_from( +/// Build status entries for all versions from a start to an end +fn build_range_from( advisory_vuln: &AdvisoryVulnerabilityContext<'_>, purl: &Purl, status: &str, @@ -356,15 +461,26 @@ async fn ingest_range_from( // exclusive end end: Option<&str>, versions: &[impl AsRef], - connection: &C, -) -> Result<(), Error> { - let versions = match_versions(versions, start, end); - - for version in versions { - ingest_exact(advisory_vuln, purl, status, version, connection).await?; - } - - Ok(()) +) -> Vec { + let matched_versions = match_versions(versions, start, end); + + matched_versions + .into_iter() + .map(|version| PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: status.to_string(), + version_info: VersionInfo { + scheme: VersionScheme::Generic, + spec: VersionSpec::Exact(version.to_string()), + }, + context_cpe: None, + }) + .collect() } /// Extract a list of versions according to OSV @@ -401,95 +517,6 @@ fn match_versions<'v>( matches.unwrap_or_default() } -/// Ingest an exact version -async fn ingest_exact( - advisory_vuln: &AdvisoryVulnerabilityContext<'_>, - purl: &Purl, - status: &str, - version: &str, - connection: &C, -) -> Result<(), Error> { - Ok(advisory_vuln - .ingest_package_status( - None, - purl, - status, - VersionInfo { - scheme: VersionScheme::Generic, - spec: VersionSpec::Exact(version.to_string()), - }, - connection, - ) - .await?) 
-} - -/// create a package/purl status -async fn create_package_status( - advisory_vuln: &AdvisoryVulnerabilityContext<'_>, - purl: &Purl, - range: &Range, - version_scheme: &VersionScheme, - connection: &C, -) -> Result<(), Error> { - let parsed_range = events_to_range(&range.events); - - let spec = match &parsed_range { - (Some(start), None) => Some(VersionSpec::Range( - Version::Inclusive(start.clone()), - Version::Unbounded, - )), - (None, Some((end, false))) => Some(VersionSpec::Range( - Version::Unbounded, - Version::Exclusive(end.clone()), - )), - (None, Some((end, true))) => Some(VersionSpec::Range( - Version::Unbounded, - Version::Inclusive(end.clone()), - )), - (Some(start), Some((end, false))) => Some(VersionSpec::Range( - Version::Inclusive(start.clone()), - Version::Exclusive(end.clone()), - )), - (Some(start), Some((end, true))) => Some(VersionSpec::Range( - Version::Inclusive(start.clone()), - Version::Inclusive(end.clone()), - )), - (None, None) => None, - }; - - if let Some(spec) = spec { - advisory_vuln - .ingest_package_status( - None, - purl, - "affected", - VersionInfo { - scheme: *version_scheme, - spec, - }, - connection, - ) - .await?; - } - - if let (_, Some((fixed, false))) = &parsed_range { - advisory_vuln - .ingest_package_status( - None, - purl, - "fixed", - VersionInfo { - scheme: *version_scheme, - spec: VersionSpec::Exact(fixed.clone()), - }, - connection, - ) - .await? - } - - Ok(()) -} - fn detect_organization(osv: &Vulnerability) -> Option { if let Some(references) = &osv.references { let advisory_location = references diff --git a/modules/ingestor/tests/parallel.rs b/modules/ingestor/tests/parallel.rs index a42819082..42fc9f4e3 100644 --- a/modules/ingestor/tests/parallel.rs +++ b/modules/ingestor/tests/parallel.rs @@ -89,7 +89,7 @@ fn duplicate_sbom(spdx: &SPDX) -> anyhow::Result { /// Assert that all results are ok. Logs errors other of failures. 
fn assert_all_ok(num: usize, result: Vec>) { - assert_eq!(result.len(), num); + assert_eq!(num, result.len()); let ok = result .iter() @@ -102,7 +102,7 @@ fn assert_all_ok(num: usize, result: Vec>) { }) .count(); - assert_eq!(ok, num); + assert_eq!(num, ok); } /// Ingest x CSAF documents in parallel @@ -273,3 +273,33 @@ async fn license_creator(ctx: &TrustifyContext) -> Result<(), anyhow::Error> { Ok(()) } + +/// Ingest advisories in parallel +#[test_context(TrustifyContext)] +#[instrument] +#[test(tokio::test(flavor = "multi_thread", worker_threads = 4))] +async fn advisories_parallel(ctx: &TrustifyContext) -> Result<(), anyhow::Error> { + let mut data = vec![]; + data.push(document_bytes("osv/GHSA-gqp3-2cvr-x8m3.json.bz2").await?); + data.push(document_bytes("osv/GHSA-h3gc-qfqq-6h8f.json.bz2").await?); + let mut tasks = vec![]; + data.iter().for_each(|advisory| { + let next = advisory.clone(); + let service = ctx.ingestor.clone(); + tasks.push(async move { + service + .ingest(&next, Format::Advisory, (), None, Cache::Skip) + .await?; + Ok::<_, anyhow::Error>(()) + }); + }); + + // progress ingestion tasks + let result = futures::future::join_all(tasks).await; + + // now test + assert_all_ok(data.len(), result); + + // done + Ok(()) +}