Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added etc/test-data/osv/GHSA-gqp3-2cvr-x8m3.json.bz2
Binary file not shown.
Binary file added etc/test-data/osv/GHSA-h3gc-qfqq-6h8f.json.bz2
Binary file not shown.
25 changes: 3 additions & 22 deletions modules/ingestor/src/graph/purl/creator.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::graph::error::Error;
use crate::graph::{error::Error, purl};
use sea_orm::{ActiveValue::Set, ConnectionTrait, EntityTrait};
use sea_query::OnConflict;
use std::collections::{BTreeMap, HashSet};
use tracing::instrument;
use trustify_common::{db::chunk::EntityChunkedIter, purl::Purl};
use trustify_entity::{
base_purl,
qualified_purl::{self, Qualifiers},
versioned_purl,
};
Expand Down Expand Up @@ -35,23 +34,15 @@ impl PurlCreator {
return Ok(());
}

// insert all packages
// Use the shared helper to batch create base PURLs
purl::batch_create_base_purls(self.purls.iter().cloned(), db).await?;

let mut packages = BTreeMap::new();
let mut versions = BTreeMap::new();
let mut qualifieds = BTreeMap::new();

for purl in self.purls {
let cp = purl.clone().into();
let (package, version, qualified) = purl.uuids();
packages
.entry(package)
.or_insert_with(|| base_purl::ActiveModel {
id: Set(package),
r#type: Set(purl.ty),
namespace: Set(purl.namespace),
name: Set(purl.name),
});

versions
.entry(version)
Expand All @@ -71,16 +62,6 @@ impl PurlCreator {
});
}

// insert packages

for batch in &packages.into_values().chunked() {
base_purl::Entity::insert_many(batch)
.on_conflict(OnConflict::new().do_nothing().to_owned())
.do_nothing()
.exec_without_returning(db)
.await?;
}

// insert all package versions

for batch in &versions.into_values().chunked() {
Expand Down
45 changes: 42 additions & 3 deletions modules/ingestor/src/graph/purl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@
pub mod creator;
pub mod package_version;
pub mod qualified_package;
pub mod status_creator;

use crate::graph::{Graph, error::Error};
use package_version::PackageVersionContext;
use qualified_package::QualifiedPackageContext;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set, prelude::Uuid,
};
use sea_query::SelectStatement;
use std::fmt::{Debug, Formatter};
use sea_query::{OnConflict, SelectStatement};
use std::{
collections::BTreeMap,
fmt::{Debug, Formatter},
};
use tracing::instrument;
use trustify_common::{
db::limiter::LimiterTrait,
db::{chunk::EntityChunkedIter, limiter::LimiterTrait},
model::{Paginated, PaginatedResults},
purl::{Purl, PurlErr},
};
Expand Down Expand Up @@ -327,6 +331,41 @@ impl<'g> PackageContext<'g> {
}
}

/// Batch create base PURLs (without versions or qualifiers).
///
/// This helper function efficiently creates multiple base_purl entries in batches,
/// handling duplicates via ON CONFLICT DO NOTHING. It's used by both PurlCreator
/// and advisory loaders to avoid code duplication.
pub async fn batch_create_base_purls<C: ConnectionTrait>(
purls: impl IntoIterator<Item = Purl>,
connection: &C,
) -> Result<(), Error> {
let mut packages = BTreeMap::new();

for purl in purls {
let package = purl.package_uuid();
packages
.entry(package)
.or_insert_with(|| entity::base_purl::ActiveModel {
id: Set(package),
r#type: Set(purl.ty),
namespace: Set(purl.namespace),
name: Set(purl.name),
});
}

// Batch insert packages
for batch in &packages.into_values().chunked() {
entity::base_purl::Entity::insert_many(batch)
.on_conflict(OnConflict::new().do_nothing().to_owned())
.do_nothing()
.exec(connection)
.await?;
}

Ok(())
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
Expand Down
135 changes: 135 additions & 0 deletions modules/ingestor/src/graph/purl/status_creator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::graph::{
advisory::{purl_status::PurlStatus, version::VersionInfo},
error::Error,
};
use sea_orm::{ActiveValue::Set, ConnectionTrait, EntityTrait, QueryFilter};
use sea_query::{Expr, OnConflict, PgFunc};
use std::collections::{BTreeMap, BTreeSet};
use tracing::instrument;
use trustify_common::{cpe::Cpe, db::chunk::EntityChunkedIter, purl::Purl};
use trustify_entity::{purl_status, status, version_range};
use uuid::Uuid;

/// Input data for creating a PURL status entry
#[derive(Clone, Debug)]
pub struct PurlStatusEntry {
pub advisory_id: Uuid,
pub vulnerability_id: String,
pub purl: Purl,
pub status: String,
pub version_info: VersionInfo,
pub context_cpe: Option<Cpe>,
}

/// Creator for batch insertion of PURL statuses
///
/// Follows the Creator pattern used by PurlCreator, CpeCreator, etc.
/// Collects PURL status entries and creates them in batches to avoid
/// N+1 query problems and race conditions.
#[derive(Default)]
pub struct PurlStatusCreator {
entries: Vec<PurlStatusEntry>,
}

impl PurlStatusCreator {
pub fn new() -> Self {
Self::default()
}

/// Add a PURL status entry to be created
pub fn add(&mut self, entry: &PurlStatusEntry) {
self.entries.push(entry.clone());
}

/// Create all collected PURL statuses in batches
#[instrument(skip_all, fields(num = self.entries.len()), err(level=tracing::Level::INFO))]
pub async fn create<C>(self, connection: &C) -> Result<(), Error>
where
C: ConnectionTrait,
{
if self.entries.is_empty() {
return Ok(());
}

// 1. Batch lookup all unique status slugs
let unique_statuses: Vec<String> = self
.entries
.iter()
.map(|e| e.status.clone())
.collect::<BTreeSet<_>>()
.into_iter()
.collect();

let status_models = status::Entity::find()
.filter(Expr::col(status::Column::Slug).eq(PgFunc::any(unique_statuses)))
.all(connection)
.await?;

let status_map: BTreeMap<String, Uuid> = status_models
.into_iter()
.map(|s| (s.slug.clone(), s.id))
.collect();

// 2. Deduplicate and build ActiveModels
let mut version_ranges = BTreeMap::new();
let mut purl_statuses = BTreeMap::new();

for entry in self.entries {
// Validate status exists
let status_id = *status_map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Error variant Error::InvalidStatus may benefit from more context.

Consider adding relevant fields (e.g., advisory_id or purl) to Error::InvalidStatus to make debugging easier.

Suggested implementation:

                .get(&entry.status)
                .ok_or_else(|| Error::InvalidStatus {
                    status: entry.status.clone(),
                    advisory_id: entry.advisory_id.clone(),
                    purl: entry.purl.clone(),
                })?;

You will also need to:

  1. Update the definition of Error::InvalidStatus in your error enum (likely in a file like modules/ingestor/src/error.rs or similar) to be a struct variant with fields: status: String, advisory_id: AdvisoryIdType, purl: PurlType (replace types as appropriate).
  2. Update any Display, Debug, or error handling implementations to format and use these new fields.
  3. Update any other code that constructs or matches on Error::InvalidStatus to use the new struct variant.

.get(&entry.status)
.ok_or_else(|| Error::InvalidStatus(entry.status.clone()))?;

// Create PurlStatus and use its uuid() method
let purl_status = PurlStatus {
cpe: entry.context_cpe.clone(),
purl: entry.purl.clone(),
status: status_id,
info: entry.version_info.clone(),
};

let uuid = purl_status.uuid(entry.advisory_id, entry.vulnerability_id.clone());
let base_purl_id = entry.purl.package_uuid();
let version_range_id = entry.version_info.uuid();
let context_cpe_id = entry.context_cpe.as_ref().map(|cpe| cpe.uuid());

// Deduplicate version ranges
version_ranges
.entry(version_range_id)
.or_insert_with(|| entry.version_info.clone().into_active_model());

// Deduplicate purl_statuses by UUID
purl_statuses
.entry(uuid)
.or_insert_with(|| purl_status::ActiveModel {
id: Set(uuid),
advisory_id: Set(entry.advisory_id),
vulnerability_id: Set(entry.vulnerability_id.clone()),
status_id: Set(status_id),
base_purl_id: Set(base_purl_id),
version_range_id: Set(version_range_id),
context_cpe_id: Set(context_cpe_id),
});
}

// 3. Batch insert version ranges
for batch in &version_ranges.into_values().chunked() {
version_range::Entity::insert_many(batch)
.on_conflict(OnConflict::new().do_nothing().to_owned())
.do_nothing()
.exec_without_returning(connection)
.await?;
}

// 4. Batch insert purl_statuses
for batch in &purl_statuses.into_values().chunked() {
purl_status::Entity::insert_many(batch)
.on_conflict(OnConflict::new().do_nothing().to_owned())
.do_nothing()
.exec_without_returning(connection)
.await?;
}

Ok(())
}
}
58 changes: 39 additions & 19 deletions modules/ingestor/src/service/advisory/cve/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use crate::{
AdvisoryInformation, AdvisoryVulnerabilityInformation,
version::{Version, VersionInfo, VersionSpec},
},
purl::{
self,
status_creator::{PurlStatusCreator, PurlStatusEntry},
},
vulnerability::VulnerabilityInformation,
},
model::IngestResult,
Expand Down Expand Up @@ -114,9 +118,16 @@ impl<'g> CveLoader<'g> {
}
}

// Initialize batch creator for efficient status ingestion
let mut purl_status_creator = PurlStatusCreator::new();
let mut base_purls = std::collections::HashSet::new();

if let Some(affected) = affected {
for product in affected {
if let Some(purl) = divine_purl(product) {
// Collect base PURL for batch creation
base_purls.insert(purl.clone());

// okay! we have a purl, now
// sort out version bounds & status
for version in &product.versions {
Expand Down Expand Up @@ -146,30 +157,39 @@ impl<'g> CveLoader<'g> {
},
};

advisory_vuln
.ingest_package_status(
None,
&purl,
match status {
Status::Affected => "affected",
Status::Unaffected => "not_affected",
Status::Unknown => "unknown",
},
VersionInfo {
scheme: version_type
.as_deref()
.map(VersionScheme::from)
.unwrap_or(VersionScheme::Generic),
spec: version_spec,
},
&tx,
)
.await?
// Add package status entry to batch creator
purl_status_creator.add(&PurlStatusEntry {
advisory_id: advisory_vuln.advisory.advisory.id,
vulnerability_id: advisory_vuln
.advisory_vulnerability
.vulnerability_id
.clone(),
purl: purl.clone(),
status: match status {
Status::Affected => "affected".to_string(),
Status::Unaffected => "not_affected".to_string(),
Status::Unknown => "unknown".to_string(),
},
version_info: VersionInfo {
scheme: version_type
.as_deref()
.map(VersionScheme::from)
.unwrap_or(VersionScheme::Generic),
spec: version_spec,
},
context_cpe: None,
});
}
}
}
}

// Batch create base PURLs (without versions/qualifiers)
purl::batch_create_base_purls(base_purls, &tx).await?;

// Batch create statuses
purl_status_creator.create(&tx).await?;

vulnerability
.drop_descriptions_for_advisory(advisory.advisory.id, &tx)
.await?;
Expand Down
Loading