From 3d45a55e7ad53a2805710bc119ba3f13fd195c5e Mon Sep 17 00:00:00 2001 From: mrizzi Date: Tue, 4 Nov 2025 22:03:46 +0100 Subject: [PATCH 1/3] fix: package ingestion race condition (TC-3152) Signed-off-by: mrizzi Assisted-by: Claude Code --- .../osv/GHSA-gqp3-2cvr-x8m3.json.bz2 | Bin 0 -> 1170 bytes .../osv/GHSA-h3gc-qfqq-6h8f.json.bz2 | Bin 0 -> 1391 bytes modules/ingestor/src/graph/purl/mod.rs | 99 +++++++++++++----- .../src/graph/purl/package_version.rs | 69 +++++++++--- modules/ingestor/tests/parallel.rs | 34 +++++- 5 files changed, 161 insertions(+), 41 deletions(-) create mode 100644 etc/test-data/osv/GHSA-gqp3-2cvr-x8m3.json.bz2 create mode 100644 etc/test-data/osv/GHSA-h3gc-qfqq-6h8f.json.bz2 diff --git a/etc/test-data/osv/GHSA-gqp3-2cvr-x8m3.json.bz2 b/etc/test-data/osv/GHSA-gqp3-2cvr-x8m3.json.bz2 new file mode 100644 index 0000000000000000000000000000000000000000..a9acb0f56c085c6d2b441c9b84dbc8874f11c8f1 GIT binary patch literal 1170 zcmV;D1a135T4*^jL0KkKSWA1dPx(Ln(Kap#tKuDwM6nWdg)b4_hTUS4tNPa#A5_(opobcuQ?_ju2{ zX4)Sa0a3A_=D;X-UT%H;f!xla=NmQSbo+TATbLX15S!P)5|+*}pcx?ons+-wj8Ya! zV=y!d8Ne(ez9|wB0&*14Da6>A;s_@bKv`+wz>q+Y#SqvtHG6XD3?_w`Y>s&OMTvSv z!W=K@(=!Y*sWM<^2&Iy87(8g9WVlxZK|HfdK#hnbi-{lWL|shW3rh1kg_sn=YxaWs zV46)K_6uKrYq9e&u*>Qd**t{4S@3nBsZf@u1DG~MFvT!`22sWgKFd1jOrwjkh(LoN z-Qs@!mSvU9w2Ymz@4NY7%LbaL(U#FVf{4`i8BKu0Sm*37ke@c!+HB3Pl{DhQi8G?O zJ=YYqC)TbuLbiIbG0!oB46L#SyKK7WUm!?0Fb!xZnWVaM2Hx-9cRJ~5S`)9AVDO3D z#`LLsI8Jy);Zev3i&&Vb88{&7?V0ivMF^T2 zGJ$nCEvk>+S0AP;(#rvCrJC{r1!oa`#rTXSwCZicy7l&MzZ<3B)9_pn5`nUmjJD2! zw4C?GEFJMRgzmi4{`H3u3KqwZsp}6%RY#lC+kvDx>X5lTK)U@}Pp48y*fzaF=AIf5 zIk<3|<`T{HHcVvn$Zy%AYUqLJ(WjD$OP2B#ysD++i#VzUvST)#HNj4l^;SgGHr?oH zEirfsM1-=L{;OoBcRMVbvKJLfzN6qUVP?t-3~k9ZiWG0J#6VBpxGsT1oua_EHG_;8 znSNr`VCqd`XyX}XJi2p@o1(2)RlN%ZCafZ5lL(2W7cjJ}0Wr^#%Pp~D48sXnHFoi6 zqX;X=94*EzSnHE)T1aH|Xd603f|Wyz+t^tNrKqte%_t`z(ms1P7PQ*~a78uu8%6_` zTgKE#XsKY+ava<$8FQHIFdF657L75A)tK>2xN16W&uQD>9WeGowA2D2kYQLVNtj$X zgol(W;AgO_*Uaw=fZhRi2Q@dBAdd;fYg{(9Is%b6hMM0N$asz}DFs;Zv@dED!;)MY zr>?_P4U2jkh3>beR#3pu;X`(gp^Zhm!+o%UO38u~YKMEUTIO6i8?fLz?3w#U9wr!Z zLx5|1hW)$H!FK>LnB;H`xkX+OmNPMLLh^W+k6;Y0BweO#rb|1F^!M1Q+q+d}gGHLO kDx?8$QO*Fx@Gsz4;7}kxmpOGpU$T0MOhZwg3PC literal 0 HcmV?d00001 diff --git a/etc/test-data/osv/GHSA-h3gc-qfqq-6h8f.json.bz2 b/etc/test-data/osv/GHSA-h3gc-qfqq-6h8f.json.bz2 new file mode 100644 index 0000000000000000000000000000000000000000..e17e633aaace4c7c0ae20680541b579aa9fe3679 GIT binary patch literal 1391 zcmV-#1(5neT4*^jL0KkKS!`m5X8;Nb-+)z6PzV3^KJVYYzwiJ0PzD}5mMS$yD_W#P zKslyHLna7l!eEUwXv7UNWMBXw(KRqCdrb*8f;2QV1JpDC0BN9T5=1}|kisS!MDmHF z(W&YU4^tD!k7`3f%`!3>FhfQY1Zks2AZe2$009D|h$TNlG-XdFMEz4V)M#kQp_3t? z4Fe`fMDu&JG^o8PNVDtrs*58G6A?(FQZN`Kx=V}(BEsVlWWZApz$_}5OR@`6LL*`l zPP8PP7()yLBd3=G<3u1saRbV3Q86vp=V|WX5Z}P#ro>x66zwdB7bik;@B7i2kDNv zDAdcdb`+&gOerldQ0>N9Fq3jx8unDv$UUQkQC!qd_09{lMr+Mf_OESKeDld8F4h+? zoINf3A`PPZ!%R~ztNQxw&_M$*odYD9nP1$OR0Zx+_qi_08)ok1K0}GKDv)}dPT6ya zz{Y5xga%9+RSeQO>tQ8-xl<*|ihvv;hJ2P0Scy9!0iBmlW_GT{u&T>BcD>J|2+TxH zIG%y7|8Cd1g{D@_X{=H-j~SePAdn3JNK0&eQ?tDt_0;{(tLRW{_}ad!tJIujwbuSpo<5-HUT)61n-zO?$3Om_u@Sn*D8egB%_|a zLr-zh-SzFWRun{?9Wf68ErBFO-sg5cPy`Do7_nZafCdaQsEGA+NwVX;7t(DqTyUWH zX`Lb5sXq(ybMJCQq+vYCJ~Lfz+a|jF><5MHwV1gB7q3)9V7OUQ84HZrhc`FVk2=Fg8O44f$&?OoyX z_Z=e^*GrFI9**sI?aSZU*=v%m#F}dzoT>U+s&qFK*<{oxb`BvqzsAk`1FI}J!G7*I z2H~G5x@J`>#5lRumt20xb`+(){%uc>QCL{jfo!aSv&59v=QZ%aPYuG-mW-GRE*Jee zm_+<-0*y$v<9RH9DRMYcODu7oAyHd*SX~pJJjO+~%~>3rbiPXJ)-AK%V3$lyLnc>j zNyp13Yz9N0RjuFUo{es9fxF|*+%TPS*`z9wR9NJZ&6?@TE^$vpx{je2s*sCITQ{ z#}pPS64;lYUcB>K1ae#ACr)>O&lsmE^QsRGKO}WL$L5RcFs>WURMv z>6lEtLZ=d?7^A3TvrCPRZ4rc= xvrxw_c{`i;Qh4Z&QCmCO{UlOPr0D_W;F%^uTcBpVo^*}%z0oQePd literal 0 HcmV?d00001 diff --git a/modules/ingestor/src/graph/purl/mod.rs b/modules/ingestor/src/graph/purl/mod.rs index f8f50f9ff..d7430adc0 100644 --- a/modules/ingestor/src/graph/purl/mod.rs +++ b/modules/ingestor/src/graph/purl/mod.rs @@ -8,7 +8,8 @@ use crate::graph::{Graph, error::Error}; use package_version::PackageVersionContext; use qualified_package::QualifiedPackageContext; use sea_orm::{ - ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set, prelude::Uuid, + ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, FromQueryResult, QueryFilter, + Statement, prelude::Uuid, }; use sea_query::SelectStatement; use std::fmt::{Debug, Formatter}; @@ -64,18 +65,45 @@ impl Graph { purl: &Purl, connection: &C, ) -> Result, Error> { - if let Some(found) = self.get_package(purl, connection).await? { - Ok(found) + let id = purl.package_uuid(); + + // Raw SQL required: SeaORM's .exec() with ON CONFLICT DO NOTHING doesn't support RETURNING, + // forcing a SELECT after every INSERT attempt (2 queries always). This approach uses + // RETURNING to get the row in 1 query on success, only doing a SELECT on conflict. + let sql_insert = r#" + INSERT INTO base_purl (id, type, namespace, name) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + RETURNING * + "#; + + let result = entity::base_purl::Model::find_by_statement(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + sql_insert, + vec![ + id.into(), + purl.ty.clone().into(), + purl.namespace.as_deref().into(), + purl.name.clone().into(), + ], + )) + .one(connection) + .await?; + + // If INSERT returned None (conflict occurred), fetch the existing row + let result = if let Some(model) = result { + model } else { - let model = entity::base_purl::ActiveModel { - id: Set(purl.package_uuid()), - r#type: Set(purl.ty.clone()), - namespace: Set(purl.namespace.clone()), - name: Set(purl.name.clone()), - }; + // Use the deterministic id to fetch the exact row + entity::base_purl::Entity::find_by_id(id) + .one(connection) + .await? + .ok_or_else(|| { + Error::Any(anyhow::anyhow!("Failed to find base_purl after conflict")) + })? + }; - Ok(PackageContext::new(self, model.insert(connection).await?)) - } + Ok(PackageContext::new(self, result)) } /// Retrieve a *fully-qualified* package entry, if it exists. @@ -252,20 +280,43 @@ impl<'g> PackageContext<'g> { connection: &C, ) -> Result, Error> { if let Some(version) = &purl.version { - if let Some(found) = self.get_package_version(purl, connection).await? { - Ok(found) - } else { - let model = entity::versioned_purl::ActiveModel { - id: Set(purl.version_uuid()), - base_purl_id: Set(self.base_purl.id), - version: Set(version.clone()), - }; - - Ok(PackageVersionContext::new( - self, - model.insert(connection).await?, + let id = purl.version_uuid(); + + // Raw SQL required: SeaORM's .exec() with ON CONFLICT DO NOTHING doesn't support RETURNING, + // forcing a SELECT after every INSERT attempt (2 queries always). This approach uses + // RETURNING to get the row in 1 query on success, only doing a SELECT on conflict. + let sql_insert = r#" + INSERT INTO versioned_purl (id, base_purl_id, version) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING + RETURNING * + "#; + + let result = + entity::versioned_purl::Model::find_by_statement(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + sql_insert, + vec![id.into(), self.base_purl.id.into(), version.clone().into()], )) - } + .one(connection) + .await?; + + // If INSERT returned None (conflict occurred), fetch the existing row + let result = if let Some(model) = result { + model + } else { + // Use the deterministic id to fetch the exact row + entity::versioned_purl::Entity::find_by_id(id) + .one(connection) + .await? + .ok_or_else(|| { + Error::Any(anyhow::anyhow!( + "Failed to find versioned_purl after conflict" + )) + })? + }; + + Ok(PackageVersionContext::new(self, result)) } else { Err(Error::Purl(PurlErr::MissingVersion(purl.to_string()))) } diff --git a/modules/ingestor/src/graph/purl/package_version.rs b/modules/ingestor/src/graph/purl/package_version.rs index fcffe9cc8..897bde08d 100644 --- a/modules/ingestor/src/graph/purl/package_version.rs +++ b/modules/ingestor/src/graph/purl/package_version.rs @@ -4,10 +4,17 @@ use crate::graph::{ error::Error, purl::{PackageContext, qualified_package::QualifiedPackageContext}, }; -use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set}; +use sea_orm::{ + ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, FromQueryResult, QueryFilter, + Statement, +}; use std::fmt::{Debug, Formatter}; use trustify_common::purl::Purl; -use trustify_entity::{self as entity, qualified_purl::Qualifiers, versioned_purl}; +use trustify_entity::{ + self as entity, + qualified_purl::{CanonicalPurl, Qualifiers}, + versioned_purl, +}; /// Live context for a package version. #[derive(Clone)] @@ -35,21 +42,53 @@ impl<'g> PackageVersionContext<'g> { purl: &Purl, connection: &C, ) -> Result, Error> { - if let Some(found) = self.get_qualified_package(purl, connection).await? { - return Ok(found); - } - let cp = purl.clone().into(); - // No appropriate qualified package, create one. - let qualified_package = entity::qualified_purl::ActiveModel { - id: Set(purl.qualifier_uuid()), - versioned_purl_id: Set(self.package_version.id), - qualifiers: Set(Qualifiers(purl.qualifiers.clone())), - purl: Set(cp), - }; + let id = purl.qualifier_uuid(); + let cp: CanonicalPurl = purl.clone().into(); + let qualifiers_json = serde_json::to_value(Qualifiers(purl.qualifiers.clone())) + .map_err(|e| Error::Any(anyhow::anyhow!("Failed to serialize qualifiers: {}", e)))?; + let purl_json = serde_json::to_value(&cp) + .map_err(|e| Error::Any(anyhow::anyhow!("Failed to serialize purl: {}", e)))?; + + // Raw SQL required: SeaORM's .exec() with ON CONFLICT DO NOTHING doesn't support RETURNING, + // forcing a SELECT after every INSERT attempt (2 queries always). This approach uses + // RETURNING to get the row in 1 query on success, only doing a SELECT on conflict. + let sql_insert = r#" + INSERT INTO qualified_purl (id, versioned_purl_id, qualifiers, purl) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + RETURNING * + "#; - let qualified_package = qualified_package.insert(connection).await?; + let result = + entity::qualified_purl::Model::find_by_statement(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + sql_insert, + vec![ + id.into(), + self.package_version.id.into(), + sea_query::Value::Json(Some(Box::new(qualifiers_json.clone()))), + sea_query::Value::Json(Some(Box::new(purl_json.clone()))), + ], + )) + .one(connection) + .await?; + + // If INSERT returned None (conflict occurred), fetch the existing row + let result = if let Some(model) = result { + model + } else { + // Use the deterministic id to fetch the exact row + entity::qualified_purl::Entity::find_by_id(id) + .one(connection) + .await? + .ok_or_else(|| { + Error::Any(anyhow::anyhow!( + "Failed to find qualified_purl after conflict" + )) + })? + }; - Ok(QualifiedPackageContext::new(self, qualified_package)) + Ok(QualifiedPackageContext::new(self, result)) } pub async fn get_qualified_package( diff --git a/modules/ingestor/tests/parallel.rs b/modules/ingestor/tests/parallel.rs index a42819082..42fc9f4e3 100644 --- a/modules/ingestor/tests/parallel.rs +++ b/modules/ingestor/tests/parallel.rs @@ -89,7 +89,7 @@ fn duplicate_sbom(spdx: &SPDX) -> anyhow::Result { /// Assert that all results are ok. Logs errors other of failures. fn assert_all_ok(num: usize, result: Vec>) { - assert_eq!(result.len(), num); + assert_eq!(num, result.len()); let ok = result .iter() @@ -102,7 +102,7 @@ fn assert_all_ok(num: usize, result: Vec>) { }) .count(); - assert_eq!(ok, num); + assert_eq!(num, ok); } /// Ingest x CSAF documents in parallel @@ -273,3 +273,33 @@ async fn license_creator(ctx: &TrustifyContext) -> Result<(), anyhow::Error> { Ok(()) } + +/// Ingest advisories in parallel +#[test_context(TrustifyContext)] +#[instrument] +#[test(tokio::test(flavor = "multi_thread", worker_threads = 4))] +async fn advisories_parallel(ctx: &TrustifyContext) -> Result<(), anyhow::Error> { + let mut data = vec![]; + data.push(document_bytes("osv/GHSA-gqp3-2cvr-x8m3.json.bz2").await?); + data.push(document_bytes("osv/GHSA-h3gc-qfqq-6h8f.json.bz2").await?); + let mut tasks = vec![]; + data.iter().for_each(|advisory| { + let next = advisory.clone(); + let service = ctx.ingestor.clone(); + tasks.push(async move { + service + .ingest(&next, Format::Advisory, (), None, Cache::Skip) + .await?; + Ok::<_, anyhow::Error>(()) + }); + }); + + // progress ingestion tasks + let result = futures::future::join_all(tasks).await; + + // now test + assert_all_ok(data.len(), result); + + // done + Ok(()) +} From 8d1df91833dc49543fa3ff96e25c483c2ab40ed1 Mon Sep 17 00:00:00 2001 From: mrizzi Date: Fri, 7 Nov 2025 18:40:33 +0100 Subject: [PATCH 2/3] fix: PurlStatusCreator to avoid package ingestion race condition (TC-3152) Signed-off-by: mrizzi Assisted-by: Claude Code --- modules/ingestor/src/graph/purl/creator.rs | 25 +- modules/ingestor/src/graph/purl/mod.rs | 144 +++---- .../src/graph/purl/package_version.rs | 69 +-- .../ingestor/src/graph/purl/status_creator.rs | 135 ++++++ .../src/service/advisory/cve/loader.rs | 58 ++- .../src/service/advisory/osv/loader.rs | 401 ++++++++++-------- 6 files changed, 472 insertions(+), 360 deletions(-) create mode 100644 modules/ingestor/src/graph/purl/status_creator.rs diff --git a/modules/ingestor/src/graph/purl/creator.rs b/modules/ingestor/src/graph/purl/creator.rs index 41a4d9670..35912f81d 100644 --- a/modules/ingestor/src/graph/purl/creator.rs +++ b/modules/ingestor/src/graph/purl/creator.rs @@ -1,11 +1,10 @@ -use crate::graph::error::Error; +use crate::graph::{error::Error, purl}; use sea_orm::{ActiveValue::Set, ConnectionTrait, EntityTrait}; use sea_query::OnConflict; use std::collections::{BTreeMap, HashSet}; use tracing::instrument; use trustify_common::{db::chunk::EntityChunkedIter, purl::Purl}; use trustify_entity::{ - base_purl, qualified_purl::{self, Qualifiers}, versioned_purl, }; @@ -35,23 +34,15 @@ impl PurlCreator { return Ok(()); } - // insert all packages + // Use the shared helper to batch create base PURLs + purl::batch_create_base_purls(self.purls.iter().cloned(), db).await?; - let mut packages = BTreeMap::new(); let mut versions = BTreeMap::new(); let mut qualifieds = BTreeMap::new(); for purl in self.purls { let cp = purl.clone().into(); let (package, version, qualified) = purl.uuids(); - packages - .entry(package) - .or_insert_with(|| base_purl::ActiveModel { - id: Set(package), - r#type: Set(purl.ty), - namespace: Set(purl.namespace), - name: Set(purl.name), - }); versions .entry(version) @@ -71,16 +62,6 @@ impl PurlCreator { }); } - // insert packages - - for batch in &packages.into_values().chunked() { - base_purl::Entity::insert_many(batch) - .on_conflict(OnConflict::new().do_nothing().to_owned()) - .do_nothing() - .exec_without_returning(db) - .await?; - } - // insert all package versions for batch in &versions.into_values().chunked() { diff --git a/modules/ingestor/src/graph/purl/mod.rs b/modules/ingestor/src/graph/purl/mod.rs index d7430adc0..5969cbadd 100644 --- a/modules/ingestor/src/graph/purl/mod.rs +++ b/modules/ingestor/src/graph/purl/mod.rs @@ -3,19 +3,22 @@ pub mod creator; pub mod package_version; pub mod qualified_package; +pub mod status_creator; use crate::graph::{Graph, error::Error}; use package_version::PackageVersionContext; use qualified_package::QualifiedPackageContext; use sea_orm::{ - ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, FromQueryResult, QueryFilter, - Statement, prelude::Uuid, + ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set, prelude::Uuid, +}; +use sea_query::{OnConflict, SelectStatement}; +use std::{ + collections::BTreeMap, + fmt::{Debug, Formatter}, }; -use sea_query::SelectStatement; -use std::fmt::{Debug, Formatter}; use tracing::instrument; use trustify_common::{ - db::limiter::LimiterTrait, + db::{chunk::EntityChunkedIter, limiter::LimiterTrait}, model::{Paginated, PaginatedResults}, purl::{Purl, PurlErr}, }; @@ -65,45 +68,18 @@ impl Graph { purl: &Purl, connection: &C, ) -> Result, Error> { - let id = purl.package_uuid(); - - // Raw SQL required: SeaORM's .exec() with ON CONFLICT DO NOTHING doesn't support RETURNING, - // forcing a SELECT after every INSERT attempt (2 queries always). This approach uses - // RETURNING to get the row in 1 query on success, only doing a SELECT on conflict. - let sql_insert = r#" - INSERT INTO base_purl (id, type, namespace, name) - VALUES ($1, $2, $3, $4) - ON CONFLICT DO NOTHING - RETURNING * - "#; - - let result = entity::base_purl::Model::find_by_statement(Statement::from_sql_and_values( - DatabaseBackend::Postgres, - sql_insert, - vec![ - id.into(), - purl.ty.clone().into(), - purl.namespace.as_deref().into(), - purl.name.clone().into(), - ], - )) - .one(connection) - .await?; - - // If INSERT returned None (conflict occurred), fetch the existing row - let result = if let Some(model) = result { - model + if let Some(found) = self.get_package(purl, connection).await? { + Ok(found) } else { - // Use the deterministic id to fetch the exact row - entity::base_purl::Entity::find_by_id(id) - .one(connection) - .await? - .ok_or_else(|| { - Error::Any(anyhow::anyhow!("Failed to find base_purl after conflict")) - })? - }; + let model = entity::base_purl::ActiveModel { + id: Set(purl.package_uuid()), + r#type: Set(purl.ty.clone()), + namespace: Set(purl.namespace.clone()), + name: Set(purl.name.clone()), + }; - Ok(PackageContext::new(self, result)) + Ok(PackageContext::new(self, model.insert(connection).await?)) + } } /// Retrieve a *fully-qualified* package entry, if it exists. @@ -280,43 +256,20 @@ impl<'g> PackageContext<'g> { connection: &C, ) -> Result, Error> { if let Some(version) = &purl.version { - let id = purl.version_uuid(); - - // Raw SQL required: SeaORM's .exec() with ON CONFLICT DO NOTHING doesn't support RETURNING, - // forcing a SELECT after every INSERT attempt (2 queries always). This approach uses - // RETURNING to get the row in 1 query on success, only doing a SELECT on conflict. - let sql_insert = r#" - INSERT INTO versioned_purl (id, base_purl_id, version) - VALUES ($1, $2, $3) - ON CONFLICT DO NOTHING - RETURNING * - "#; - - let result = - entity::versioned_purl::Model::find_by_statement(Statement::from_sql_and_values( - DatabaseBackend::Postgres, - sql_insert, - vec![id.into(), self.base_purl.id.into(), version.clone().into()], - )) - .one(connection) - .await?; - - // If INSERT returned None (conflict occurred), fetch the existing row - let result = if let Some(model) = result { - model + if let Some(found) = self.get_package_version(purl, connection).await? { + Ok(found) } else { - // Use the deterministic id to fetch the exact row - entity::versioned_purl::Entity::find_by_id(id) - .one(connection) - .await? - .ok_or_else(|| { - Error::Any(anyhow::anyhow!( - "Failed to find versioned_purl after conflict" - )) - })? - }; - - Ok(PackageVersionContext::new(self, result)) + let model = entity::versioned_purl::ActiveModel { + id: Set(purl.version_uuid()), + base_purl_id: Set(self.base_purl.id), + version: Set(version.clone()), + }; + + Ok(PackageVersionContext::new( + self, + model.insert(connection).await?, + )) + } } else { Err(Error::Purl(PurlErr::MissingVersion(purl.to_string()))) } @@ -378,6 +331,41 @@ impl<'g> PackageContext<'g> { } } +/// Batch create base PURLs (without versions or qualifiers). +/// +/// This helper function efficiently creates multiple base_purl entries in batches, +/// handling duplicates via ON CONFLICT DO NOTHING. It's used by both PurlCreator +/// and advisory loaders to avoid code duplication. +pub async fn batch_create_base_purls( + purls: impl IntoIterator, + connection: &C, +) -> Result<(), Error> { + let mut packages = BTreeMap::new(); + + for purl in purls { + let package = purl.package_uuid(); + packages + .entry(package) + .or_insert_with(|| entity::base_purl::ActiveModel { + id: Set(package), + r#type: Set(purl.ty), + namespace: Set(purl.namespace), + name: Set(purl.name), + }); + } + + // Batch insert packages + for batch in &packages.into_values().chunked() { + entity::base_purl::Entity::insert_many(batch) + .on_conflict(OnConflict::new().do_nothing().to_owned()) + .do_nothing() + .exec(connection) + .await?; + } + + Ok(()) +} + #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { diff --git a/modules/ingestor/src/graph/purl/package_version.rs b/modules/ingestor/src/graph/purl/package_version.rs index 897bde08d..fcffe9cc8 100644 --- a/modules/ingestor/src/graph/purl/package_version.rs +++ b/modules/ingestor/src/graph/purl/package_version.rs @@ -4,17 +4,10 @@ use crate::graph::{ error::Error, purl::{PackageContext, qualified_package::QualifiedPackageContext}, }; -use sea_orm::{ - ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, FromQueryResult, QueryFilter, - Statement, -}; +use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set}; use std::fmt::{Debug, Formatter}; use trustify_common::purl::Purl; -use trustify_entity::{ - self as entity, - qualified_purl::{CanonicalPurl, Qualifiers}, - versioned_purl, -}; +use trustify_entity::{self as entity, qualified_purl::Qualifiers, versioned_purl}; /// Live context for a package version. #[derive(Clone)] @@ -42,53 +35,21 @@ impl<'g> PackageVersionContext<'g> { purl: &Purl, connection: &C, ) -> Result, Error> { - let id = purl.qualifier_uuid(); - let cp: CanonicalPurl = purl.clone().into(); - let qualifiers_json = serde_json::to_value(Qualifiers(purl.qualifiers.clone())) - .map_err(|e| Error::Any(anyhow::anyhow!("Failed to serialize qualifiers: {}", e)))?; - let purl_json = serde_json::to_value(&cp) - .map_err(|e| Error::Any(anyhow::anyhow!("Failed to serialize purl: {}", e)))?; - - // Raw SQL required: SeaORM's .exec() with ON CONFLICT DO NOTHING doesn't support RETURNING, - // forcing a SELECT after every INSERT attempt (2 queries always). This approach uses - // RETURNING to get the row in 1 query on success, only doing a SELECT on conflict. - let sql_insert = r#" - INSERT INTO qualified_purl (id, versioned_purl_id, qualifiers, purl) - VALUES ($1, $2, $3, $4) - ON CONFLICT DO NOTHING - RETURNING * - "#; - - let result = - entity::qualified_purl::Model::find_by_statement(Statement::from_sql_and_values( - DatabaseBackend::Postgres, - sql_insert, - vec![ - id.into(), - self.package_version.id.into(), - sea_query::Value::Json(Some(Box::new(qualifiers_json.clone()))), - sea_query::Value::Json(Some(Box::new(purl_json.clone()))), - ], - )) - .one(connection) - .await?; - - // If INSERT returned None (conflict occurred), fetch the existing row - let result = if let Some(model) = result { - model - } else { - // Use the deterministic id to fetch the exact row - entity::qualified_purl::Entity::find_by_id(id) - .one(connection) - .await? - .ok_or_else(|| { - Error::Any(anyhow::anyhow!( - "Failed to find qualified_purl after conflict" - )) - })? + if let Some(found) = self.get_qualified_package(purl, connection).await? { + return Ok(found); + } + let cp = purl.clone().into(); + // No appropriate qualified package, create one. + let qualified_package = entity::qualified_purl::ActiveModel { + id: Set(purl.qualifier_uuid()), + versioned_purl_id: Set(self.package_version.id), + qualifiers: Set(Qualifiers(purl.qualifiers.clone())), + purl: Set(cp), }; - Ok(QualifiedPackageContext::new(self, result)) + let qualified_package = qualified_package.insert(connection).await?; + + Ok(QualifiedPackageContext::new(self, qualified_package)) } pub async fn get_qualified_package( diff --git a/modules/ingestor/src/graph/purl/status_creator.rs b/modules/ingestor/src/graph/purl/status_creator.rs new file mode 100644 index 000000000..a0eacc326 --- /dev/null +++ b/modules/ingestor/src/graph/purl/status_creator.rs @@ -0,0 +1,135 @@ +use crate::graph::{ + advisory::{purl_status::PurlStatus, version::VersionInfo}, + error::Error, +}; +use sea_orm::{ActiveValue::Set, ConnectionTrait, EntityTrait, QueryFilter}; +use sea_query::{Expr, OnConflict, PgFunc}; +use std::collections::{BTreeMap, BTreeSet}; +use tracing::instrument; +use trustify_common::{cpe::Cpe, db::chunk::EntityChunkedIter, purl::Purl}; +use trustify_entity::{purl_status, status, version_range}; +use uuid::Uuid; + +/// Input data for creating a PURL status entry +#[derive(Clone, Debug)] +pub struct PurlStatusEntry { + pub advisory_id: Uuid, + pub vulnerability_id: String, + pub purl: Purl, + pub status: String, + pub version_info: VersionInfo, + pub context_cpe: Option, +} + +/// Creator for batch insertion of PURL statuses +/// +/// Follows the Creator pattern used by PurlCreator, CpeCreator, etc. +/// Collects PURL status entries and creates them in batches to avoid +/// N+1 query problems and race conditions. +#[derive(Default)] +pub struct PurlStatusCreator { + entries: Vec, +} + +impl PurlStatusCreator { + pub fn new() -> Self { + Self::default() + } + + /// Add a PURL status entry to be created + pub fn add(&mut self, entry: &PurlStatusEntry) { + self.entries.push(entry.clone()); + } + + /// Create all collected PURL statuses in batches + #[instrument(skip_all, fields(num = self.entries.len()), err(level=tracing::Level::INFO))] + pub async fn create(self, connection: &C) -> Result<(), Error> + where + C: ConnectionTrait, + { + if self.entries.is_empty() { + return Ok(()); + } + + // 1. Batch lookup all unique status slugs + let unique_statuses: Vec = self + .entries + .iter() + .map(|e| e.status.clone()) + .collect::>() + .into_iter() + .collect(); + + let status_models = status::Entity::find() + .filter(Expr::col(status::Column::Slug).eq(PgFunc::any(unique_statuses))) + .all(connection) + .await?; + + let status_map: BTreeMap = status_models + .into_iter() + .map(|s| (s.slug.clone(), s.id)) + .collect(); + + // 2. Deduplicate and build ActiveModels + let mut version_ranges = BTreeMap::new(); + let mut purl_statuses = BTreeMap::new(); + + for entry in self.entries { + // Validate status exists + let status_id = *status_map + .get(&entry.status) + .ok_or_else(|| Error::InvalidStatus(entry.status.clone()))?; + + // Create PurlStatus and use its uuid() method + let purl_status = PurlStatus { + cpe: entry.context_cpe.clone(), + purl: entry.purl.clone(), + status: status_id, + info: entry.version_info.clone(), + }; + + let uuid = purl_status.uuid(entry.advisory_id, entry.vulnerability_id.clone()); + let base_purl_id = entry.purl.package_uuid(); + let version_range_id = entry.version_info.uuid(); + let context_cpe_id = entry.context_cpe.as_ref().map(|cpe| cpe.uuid()); + + // Deduplicate version ranges + version_ranges + .entry(version_range_id) + .or_insert_with(|| entry.version_info.clone().into_active_model()); + + // Deduplicate purl_statuses by UUID + purl_statuses + .entry(uuid) + .or_insert_with(|| purl_status::ActiveModel { + id: Set(uuid), + advisory_id: Set(entry.advisory_id), + vulnerability_id: Set(entry.vulnerability_id.clone()), + status_id: Set(status_id), + base_purl_id: Set(base_purl_id), + version_range_id: Set(version_range_id), + context_cpe_id: Set(context_cpe_id), + }); + } + + // 3. Batch insert version ranges + for batch in &version_ranges.into_values().chunked() { + version_range::Entity::insert_many(batch) + .on_conflict(OnConflict::new().do_nothing().to_owned()) + .do_nothing() + .exec_without_returning(connection) + .await?; + } + + // 4. Batch insert purl_statuses + for batch in &purl_statuses.into_values().chunked() { + purl_status::Entity::insert_many(batch) + .on_conflict(OnConflict::new().do_nothing().to_owned()) + .do_nothing() + .exec_without_returning(connection) + .await?; + } + + Ok(()) + } +} diff --git a/modules/ingestor/src/service/advisory/cve/loader.rs b/modules/ingestor/src/service/advisory/cve/loader.rs index 2c6206270..a8ed0131b 100644 --- a/modules/ingestor/src/service/advisory/cve/loader.rs +++ b/modules/ingestor/src/service/advisory/cve/loader.rs @@ -5,6 +5,10 @@ use crate::{ AdvisoryInformation, AdvisoryVulnerabilityInformation, version::{Version, VersionInfo, VersionSpec}, }, + purl::{ + self, + status_creator::{PurlStatusCreator, PurlStatusEntry}, + }, vulnerability::VulnerabilityInformation, }, model::IngestResult, @@ -114,9 +118,16 @@ impl<'g> CveLoader<'g> { } } + // Initialize batch creator for efficient status ingestion + let mut purl_status_creator = PurlStatusCreator::new(); + let mut base_purls = std::collections::HashSet::new(); + if let Some(affected) = affected { for product in affected { if let Some(purl) = divine_purl(product) { + // Collect base PURL for batch creation + base_purls.insert(purl.clone()); + // okay! we have a purl, now // sort out version bounds & status for version in &product.versions { @@ -146,30 +157,39 @@ impl<'g> CveLoader<'g> { }, }; - advisory_vuln - .ingest_package_status( - None, - &purl, - match status { - Status::Affected => "affected", - Status::Unaffected => "not_affected", - Status::Unknown => "unknown", - }, - VersionInfo { - scheme: version_type - .as_deref() - .map(VersionScheme::from) - .unwrap_or(VersionScheme::Generic), - spec: version_spec, - }, - &tx, - ) - .await? + // Add package status entry to batch creator + purl_status_creator.add(&PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: match status { + Status::Affected => "affected".to_string(), + Status::Unaffected => "not_affected".to_string(), + Status::Unknown => "unknown".to_string(), + }, + version_info: VersionInfo { + scheme: version_type + .as_deref() + .map(VersionScheme::from) + .unwrap_or(VersionScheme::Generic), + spec: version_spec, + }, + context_cpe: None, + }); } } } } + // Batch create base PURLs (without versions/qualifiers) + purl::batch_create_base_purls(base_purls, &tx).await?; + + // Batch create statuses + purl_status_creator.create(&tx).await?; + vulnerability .drop_descriptions_for_advisory(advisory.advisory.id, &tx) .await?; diff --git a/modules/ingestor/src/service/advisory/osv/loader.rs b/modules/ingestor/src/service/advisory/osv/loader.rs index 206ba4dac..6e1b06f90 100644 --- a/modules/ingestor/src/service/advisory/osv/loader.rs +++ b/modules/ingestor/src/service/advisory/osv/loader.rs @@ -6,7 +6,11 @@ use crate::{ advisory_vulnerability::AdvisoryVulnerabilityContext, version::{Version, VersionInfo, VersionSpec}, }, - purl::creator::PurlCreator, + purl::{ + self, + creator::PurlCreator, + status_creator::{PurlStatusCreator, PurlStatusEntry}, + }, }, model::IngestResult, service::{ @@ -16,8 +20,8 @@ use crate::{ }; use osv::schema::{Ecosystem, Event, Range, RangeType, ReferenceType, SeverityType, Vulnerability}; use sbom_walker::report::ReportSink; -use sea_orm::{ConnectionTrait, TransactionTrait}; -use std::{fmt::Debug, str::FromStr}; +use sea_orm::TransactionTrait; +use std::{collections::HashSet, fmt::Debug, str::FromStr}; use tracing::instrument; use trustify_common::{hashing::Digests, id::Id, purl::Purl, time::ChronoExt}; use trustify_cvss::cvss3::Cvss3Base; @@ -78,6 +82,8 @@ impl<'g> OsvLoader<'g> { } let mut purl_creator = PurlCreator::new(); + let mut purl_status_creator = PurlStatusCreator::new(); + let mut base_purls = HashSet::new(); for cve_id in cve_ids { self.graph.ingest_vulnerability(&cve_id, (), &tx).await?; @@ -135,144 +141,157 @@ impl<'g> OsvLoader<'g> { // iterate through the known versions, apply the version, and create them for version in affected.versions.iter().flatten() { purl_creator.add(purl.with_version(version)); - } - - // Process explicit versions for advisory linking - for version in affected.versions.iter().flatten() { - ingest_exact(&advisory_vuln, &purl, "affected", version, &tx).await?; + // Process explicit versions for advisory linking + purl_status_creator.add(&PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "affected".to_string(), + version_info: VersionInfo { + scheme: VersionScheme::Generic, + spec: VersionSpec::Exact(version.to_string()), + }, + context_cpe: None, + }); } for range in affected.ranges.iter().flatten() { + // Collect base PURL for range-based status entries + base_purls.insert(purl.clone()); + match (&range.range_type, &package.ecosystem) { (RangeType::Semver, _) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Semver, - &tx, - ) - .await?; + VersionScheme::Semver, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Git, _) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Git, - &tx, - ) - .await?; + VersionScheme::Git, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Maven(_)) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Maven, - &tx, - ) - .await?; + VersionScheme::Maven, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::PyPI | Ecosystem::Python) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Python, - &tx, - ) - .await?; + VersionScheme::Python, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Go) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Golang, - &tx, - ) - .await?; + VersionScheme::Golang, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Npm) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Npm, - &tx, - ) - .await?; + VersionScheme::Npm, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Packagist) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Packagist, - &tx, - ) - .await?; + VersionScheme::Packagist, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::NuGet) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::NuGet, - &tx, - ) - .await?; + VersionScheme::NuGet, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::RubyGems) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Gem, - &tx, - ) - .await?; + VersionScheme::Gem, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Hex) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Hex, - &tx, - ) - .await?; + VersionScheme::Hex, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::SwiftURL) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Swift, - &tx, - ) - .await?; + VersionScheme::Swift, + ) { + purl_status_creator.add(&entry); + } } (RangeType::Ecosystem, Ecosystem::Pub) => { - create_package_status( + for entry in build_package_status( &advisory_vuln, &purl, range, - &VersionScheme::Pub, - &tx, - ) - .await?; + VersionScheme::Pub, + ) { + purl_status_creator.add(&entry); + } } (_, _) => { - create_package_status_versions( + for entry in build_package_status_versions( &advisory_vuln, &purl, range, affected.versions.iter().flatten(), - &tx, - ) - .await? + ) { + purl_status_creator.add(&entry); + } } } } @@ -282,6 +301,11 @@ impl<'g> OsvLoader<'g> { purl_creator.create(&tx).await?; + // Create base PURLs for range-based status entries + purl::batch_create_base_purls(base_purls, &tx).await?; + + purl_status_creator.create(&tx).await?; + tx.commit().await?; Ok(IngestResult { @@ -292,16 +316,87 @@ impl<'g> OsvLoader<'g> { } } -/// create package statues based on listed versions -async fn create_package_status_versions( +/// Build package status entries from range events +fn build_package_status( advisory_vuln: &AdvisoryVulnerabilityContext<'_>, purl: &Purl, range: &Range, - versions: impl IntoIterator, - connection: &C, -) -> Result<(), Error> { + version_scheme: VersionScheme, +) -> Vec { + let mut entries = Vec::new(); + let parsed_range = events_to_range(&range.events); + + let spec = match &parsed_range { + (Some(start), None) => Some(VersionSpec::Range( + Version::Inclusive(start.clone()), + Version::Unbounded, + )), + (None, Some((end, false))) => Some(VersionSpec::Range( + Version::Unbounded, + Version::Exclusive(end.clone()), + )), + (None, Some((end, true))) => Some(VersionSpec::Range( + Version::Unbounded, + Version::Inclusive(end.clone()), + )), + (Some(start), Some((end, false))) => Some(VersionSpec::Range( + Version::Inclusive(start.clone()), + Version::Exclusive(end.clone()), + )), + (Some(start), Some((end, true))) => Some(VersionSpec::Range( + Version::Inclusive(start.clone()), + Version::Inclusive(end.clone()), + )), + (None, None) => None, + }; + + if let Some(spec) = spec { + entries.push(PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "affected".to_string(), + version_info: VersionInfo { + scheme: version_scheme, + spec, + }, + context_cpe: None, + }); + } + + if let (_, Some((fixed, false))) = &parsed_range { + entries.push(PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "fixed".to_string(), + version_info: VersionInfo { + scheme: version_scheme, + spec: VersionSpec::Exact(fixed.clone()), + }, + context_cpe: None, + }); + } + + entries +} + +/// Build package status entries based on listed versions +fn build_package_status_versions<'a>( + advisory_vuln: &AdvisoryVulnerabilityContext<'_>, + purl: &Purl, + range: &Range, + versions: impl IntoIterator, +) -> Vec { // the list of versions, sorted by the range type let versions = versions.into_iter().cloned().collect::>(); + let mut entries = Vec::new(); let mut start = None; for event in &range.events { @@ -311,19 +406,31 @@ async fn create_package_status_versions( } Event::Fixed(version) | Event::LastAffected(version) => { if let Some(start) = start.take() { - ingest_range_from( + entries.extend(build_range_from( advisory_vuln, purl, "affected", start, Some(version), &versions, - connection, - ) - .await?; + )); } - ingest_exact(advisory_vuln, purl, "fixed", version, connection).await?; + // Add "fixed" status + entries.push(PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: "fixed".to_string(), + version_info: VersionInfo { + scheme: VersionScheme::Generic, + spec: VersionSpec::Exact(version.to_string()), + }, + context_cpe: None, + }); } Event::Limit(_) => {} // for non_exhaustive @@ -332,23 +439,21 @@ async fn create_package_status_versions( } if let Some(start) = start { - ingest_range_from( + entries.extend(build_range_from( advisory_vuln, purl, "affected", start, None, &versions, - connection, - ) - .await?; + )); } - Ok(()) + entries } -/// Ingest all from a start to an end -async fn ingest_range_from( +/// Build status entries for all versions from a start to an end +fn build_range_from( advisory_vuln: &AdvisoryVulnerabilityContext<'_>, purl: &Purl, status: &str, @@ -356,15 +461,26 @@ async fn ingest_range_from( // exclusive end end: Option<&str>, versions: &[impl AsRef], - connection: &C, -) -> Result<(), Error> { - let versions = match_versions(versions, start, end); - - for version in versions { - ingest_exact(advisory_vuln, purl, status, version, connection).await?; - } - - Ok(()) +) -> Vec { + let matched_versions = match_versions(versions, start, end); + + matched_versions + .into_iter() + .map(|version| PurlStatusEntry { + advisory_id: advisory_vuln.advisory.advisory.id, + vulnerability_id: advisory_vuln + .advisory_vulnerability + .vulnerability_id + .clone(), + purl: purl.clone(), + status: status.to_string(), + version_info: VersionInfo { + scheme: VersionScheme::Generic, + spec: VersionSpec::Exact(version.to_string()), + }, + context_cpe: None, + }) + .collect() } /// Extract a list of versions according to OSV @@ -401,95 +517,6 @@ fn match_versions<'v>( matches.unwrap_or_default() } -/// Ingest an exact version -async fn ingest_exact( - advisory_vuln: &AdvisoryVulnerabilityContext<'_>, - purl: &Purl, - status: &str, - version: &str, - connection: &C, -) -> Result<(), Error> { - Ok(advisory_vuln - .ingest_package_status( - None, - purl, - status, - VersionInfo { - scheme: VersionScheme::Generic, - spec: VersionSpec::Exact(version.to_string()), - }, - connection, - ) - .await?) -} - -/// create a package/purl status -async fn create_package_status( - advisory_vuln: &AdvisoryVulnerabilityContext<'_>, - purl: &Purl, - range: &Range, - version_scheme: &VersionScheme, - connection: &C, -) -> Result<(), Error> { - let parsed_range = events_to_range(&range.events); - - let spec = match &parsed_range { - (Some(start), None) => Some(VersionSpec::Range( - Version::Inclusive(start.clone()), - Version::Unbounded, - )), - (None, Some((end, false))) => Some(VersionSpec::Range( - Version::Unbounded, - Version::Exclusive(end.clone()), - )), - (None, Some((end, true))) => Some(VersionSpec::Range( - Version::Unbounded, - Version::Inclusive(end.clone()), - )), - (Some(start), Some((end, false))) => Some(VersionSpec::Range( - Version::Inclusive(start.clone()), - Version::Exclusive(end.clone()), - )), - (Some(start), Some((end, true))) => Some(VersionSpec::Range( - Version::Inclusive(start.clone()), - Version::Inclusive(end.clone()), - )), - (None, None) => None, - }; - - if let Some(spec) = spec { - advisory_vuln - .ingest_package_status( - None, - purl, - "affected", - VersionInfo { - scheme: *version_scheme, - spec, - }, - connection, - ) - .await?; - } - - if let (_, Some((fixed, false))) = &parsed_range { - advisory_vuln - .ingest_package_status( - None, - purl, - "fixed", - VersionInfo { - scheme: *version_scheme, - spec: VersionSpec::Exact(fixed.clone()), - }, - connection, - ) - .await? - } - - Ok(()) -} - fn detect_organization(osv: &Vulnerability) -> Option { if let Some(references) = &osv.references { let advisory_location = references From 0ea919f147b0be4459b38228b7dcf59a86c5254f Mon Sep 17 00:00:00 2001 From: mrizzi Date: Mon, 10 Nov 2025 12:18:19 +0100 Subject: [PATCH 3/3] fix: PurlStatusCreator to take PurlStatusEntry ownership (TC-3152) Signed-off-by: mrizzi Assisted-by: Claude Code --- .../ingestor/src/graph/purl/status_creator.rs | 4 +-- .../src/service/advisory/cve/loader.rs | 7 ++--- .../src/service/advisory/osv/loader.rs | 28 +++++++++---------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/modules/ingestor/src/graph/purl/status_creator.rs b/modules/ingestor/src/graph/purl/status_creator.rs index a0eacc326..adfa60acd 100644 --- a/modules/ingestor/src/graph/purl/status_creator.rs +++ b/modules/ingestor/src/graph/purl/status_creator.rs @@ -37,8 +37,8 @@ impl PurlStatusCreator { } /// Add a PURL status entry to be created - pub fn add(&mut self, entry: &PurlStatusEntry) { - self.entries.push(entry.clone()); + pub fn add(&mut self, entry: PurlStatusEntry) { + self.entries.push(entry); } /// Create all collected PURL statuses in batches diff --git a/modules/ingestor/src/service/advisory/cve/loader.rs b/modules/ingestor/src/service/advisory/cve/loader.rs index a8ed0131b..273443107 100644 --- a/modules/ingestor/src/service/advisory/cve/loader.rs +++ b/modules/ingestor/src/service/advisory/cve/loader.rs @@ -20,8 +20,7 @@ use cve::{ }; use sea_orm::TransactionTrait; use serde_json::Value; -use std::fmt::Debug; -use std::str::FromStr; +use std::{collections::HashSet, fmt::Debug, str::FromStr}; use time::OffsetDateTime; use tracing::instrument; use trustify_common::{hashing::Digests, id::Id}; @@ -120,7 +119,7 @@ impl<'g> CveLoader<'g> { // Initialize batch creator for efficient status ingestion let mut purl_status_creator = PurlStatusCreator::new(); - let mut base_purls = std::collections::HashSet::new(); + let mut base_purls = HashSet::new(); if let Some(affected) = affected { for product in affected { @@ -158,7 +157,7 @@ impl<'g> CveLoader<'g> { }; // Add package status entry to batch creator - purl_status_creator.add(&PurlStatusEntry { + purl_status_creator.add(PurlStatusEntry { advisory_id: advisory_vuln.advisory.advisory.id, vulnerability_id: advisory_vuln .advisory_vulnerability diff --git a/modules/ingestor/src/service/advisory/osv/loader.rs b/modules/ingestor/src/service/advisory/osv/loader.rs index 6e1b06f90..c87c485df 100644 --- a/modules/ingestor/src/service/advisory/osv/loader.rs +++ b/modules/ingestor/src/service/advisory/osv/loader.rs @@ -142,7 +142,7 @@ impl<'g> OsvLoader<'g> { for version in affected.versions.iter().flatten() { purl_creator.add(purl.with_version(version)); // Process explicit versions for advisory linking - purl_status_creator.add(&PurlStatusEntry { + purl_status_creator.add(PurlStatusEntry { advisory_id: advisory_vuln.advisory.advisory.id, vulnerability_id: advisory_vuln .advisory_vulnerability @@ -170,7 +170,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Semver, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Git, _) => { @@ -180,7 +180,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Git, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::Maven(_)) => { @@ -190,7 +190,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Maven, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::PyPI | Ecosystem::Python) => { @@ -200,7 +200,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Python, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::Go) => { @@ -210,7 +210,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Golang, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::Npm) => { @@ -220,7 +220,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Npm, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::Packagist) => { @@ -230,7 +230,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Packagist, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::NuGet) => { @@ -240,7 +240,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::NuGet, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::RubyGems) => { @@ -250,7 +250,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Gem, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::Hex) => { @@ -260,7 +260,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Hex, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::SwiftURL) => { @@ -270,7 +270,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Swift, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (RangeType::Ecosystem, Ecosystem::Pub) => { @@ -280,7 +280,7 @@ impl<'g> OsvLoader<'g> { range, VersionScheme::Pub, ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } (_, _) => { @@ -290,7 +290,7 @@ impl<'g> OsvLoader<'g> { range, affected.versions.iter().flatten(), ) { - purl_status_creator.add(&entry); + purl_status_creator.add(entry); } } }