Skip to content

Commit 8102cbf

Browse files
fix(store): handle orphaned sites in graft compatibility check
Move graft compatibility validation out of primary transaction to avoid holding primary and shard connections simultaneously, which could exhaust connection pools when they share the same database. - Detect orphaned sites (site exists but deployment doesn't) and re-run the graft `can_copy_from` check on redeploy - Only insert into `active_copies` when a copy is actually needed, avoiding spurious records for already-copied deployments - Maintain idempotency: failed deployments leave state that will be properly validated on the next attempt Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>
1 parent dac49e6 commit 8102cbf

File tree

1 file changed

+55
-49
lines changed

1 file changed

+55
-49
lines changed

store/postgres/src/subgraph_store.rs

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -354,57 +354,63 @@ impl SubgraphStore {
354354
let (shard, node_id) = self.place(&name, &network_name, node_id).await?;
355355

356356
let mut pconn = self.primary_conn().await?;
357-
pconn
358-
.transaction(|pconn| {
359-
async {
360-
let (site, site_was_created) = pconn
361-
.allocate_site(shard, schema.id(), network_name, graft_base)
362-
.await?;
363-
let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id);
364-
let site = Arc::new(site);
365-
let deployment_store = self
366-
.stores
367-
.get(&site.shard)
368-
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;
369-
370-
if site_was_created {
371-
if let Some(graft_base) = graft_base {
372-
// Ensure that the graft base exists
373-
let base_layout = self.layout(graft_base).await?;
374-
let mut shard_conn =
375-
deployment_store.get_replica_conn(ReplicaId::Main).await?;
376-
let entities_with_causality_region =
377-
deployment.manifest.entities_with_causality_region.clone();
378-
let catalog = Catalog::for_creation(
379-
&mut shard_conn,
380-
site.cheap_clone(),
381-
entities_with_causality_region.into_iter().collect(),
382-
)
383-
.await?;
384-
let layout = Layout::new(site.cheap_clone(), schema, catalog)?;
385-
386-
let errors = layout.can_copy_from(&base_layout);
387-
if !errors.is_empty() {
388-
return Err(StoreError::Unknown(anyhow!(
389-
"The subgraph `{}` cannot be used as the graft base \
357+
358+
let (site, site_was_created) = pconn
359+
.allocate_site(shard, schema.id(), network_name, graft_base)
360+
.await?;
361+
let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id);
362+
let site = Arc::new(site);
363+
let deployment_store = self
364+
.stores
365+
.get(&site.shard)
366+
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;
367+
368+
let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?;
369+
let needs_check = if site_was_created {
370+
true
371+
} else {
372+
// If deployment does not exist, but site exists it means
373+
// that we are recovering from a failed deployment creation with an orphaned site.
374+
// In that case, we should check graft compatibility again.
375+
let exists = crate::deployment::exists(&mut shard_conn, &site).await?;
376+
!exists
377+
};
378+
379+
if let Some(graft_base) = graft_base {
380+
let base_layout = self.layout(graft_base).await?;
381+
382+
if needs_check {
383+
let entities_with_causality_region =
384+
deployment.manifest.entities_with_causality_region.clone();
385+
let catalog = Catalog::for_creation(
386+
&mut shard_conn,
387+
site.cheap_clone(),
388+
entities_with_causality_region.into_iter().collect(),
389+
)
390+
.await?;
391+
let layout = Layout::new(site.cheap_clone(), schema, catalog)?;
392+
393+
let errors = layout.can_copy_from(&base_layout);
394+
if !errors.is_empty() {
395+
return Err(StoreError::Unknown(anyhow!(
396+
"The subgraph `{}` cannot be used as the graft base \
390397
for `{}` because the schemas are incompatible:\n - {}",
391-
&base_layout.catalog.site.namespace,
392-
&layout.catalog.site.namespace,
393-
errors.join("\n - ")
394-
)));
395-
}
396-
397-
pconn
398-
.record_active_copy(base_layout.site.as_ref(), site.as_ref())
399-
.await?;
400-
}
401-
}
402-
403-
Ok((site, deployment_store, node_id))
398+
&base_layout.catalog.site.namespace,
399+
&layout.catalog.site.namespace,
400+
errors.join("\n - ")
401+
)));
404402
}
405-
.scope_boxed()
406-
})
407-
.await?
403+
404+
// Only record active copy when the graft check passes and a copy is needed.
405+
// If deployment already exists, the copy has either completed (no active_copies
406+
// record) or is in progress (active_copies record already exists).
407+
pconn
408+
.record_active_copy(base_layout.site.as_ref(), site.as_ref())
409+
.await?;
410+
}
411+
}
412+
413+
(site, deployment_store, node_id)
408414
};
409415

410416
// Create the actual databases schema and metadata entries

0 commit comments

Comments
 (0)