Skip to content

Commit 6f9fa54

Browse files
Bugfix: Ensure can_copy_from failures fully roll back deployment creation (#6228)
* store: Reject incompatible graft schemas during site allocation

Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>

* store: Move recording the active_copy in the transaction

Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>

* fix(store): handle orphaned sites in graft compatibility check

Move graft compatibility validation out of the primary transaction to avoid
holding primary and shard connections simultaneously, which could exhaust
connection pools when they share the same database.

- Detect orphaned sites (site exists but deployment doesn't) and re-run
  the graft `can_copy_from` check on redeploy
- Only insert into `active_copies` when a copy is actually needed,
  avoiding spurious records for already-copied deployments
- Maintain idempotency: failed deployments leave state that will be
  properly validated on the next attempt

Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>

* store: Address comments and move evaluations until they are needed.

Move the deployment exists check into the graft_base branch so the DB exists
call is only executed when we actually are creating a graft. Rename
needs_check → should_validate and add a clear comment describing the
validation cases. Avoid calling layout unless validation is required and
keep shard connections short-lived (acquire/drop per use) to reduce
connection deadlock risk.

Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>

---------

Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>
1 parent 8c87348 commit 6f9fa54

File tree

2 files changed

+60
-55
lines changed

2 files changed

+60
-55
lines changed

store/postgres/src/deployment_store.rs

Lines changed: 1 addition & 15 deletions
Original file line number · Diff line number · Diff line change
@@ -184,7 +184,6 @@ impl DeploymentStore {
184184
schema: &InputSchema,
185185
deployment: DeploymentCreate,
186186
site: Arc<Site>,
187-
graft_base: Option<Arc<Layout>>,
188187
replace: bool,
189188
on_sync: OnSync,
190189
index_def: Option<IndexList>,
@@ -217,27 +216,14 @@ impl DeploymentStore {
217216
let query = format!("create schema {}", &site.namespace);
218217
conn.batch_execute(&query).await?;
219218

220-
let layout = Layout::create_relational_schema(
219+
let _ = Layout::create_relational_schema(
221220
conn,
222221
site.clone(),
223222
schema,
224223
entities_with_causality_region.into_iter().collect(),
225224
index_def,
226225
)
227226
.await?;
228-
// See if we are grafting and check that the graft is permissible
229-
if let Some(base) = graft_base {
230-
let errors = layout.can_copy_from(&base);
231-
if !errors.is_empty() {
232-
return Err(StoreError::Unknown(anyhow!(
233-
"The subgraph `{}` cannot be used as the graft base \
234-
for `{}` because the schemas are incompatible:\n - {}",
235-
&base.catalog.site.namespace,
236-
&layout.catalog.site.namespace,
237-
errors.join("\n - ")
238-
)));
239-
}
240-
}
241227

242228
// Create data sources table
243229
if site.schema_version.private_data_sources() {

store/postgres/src/subgraph_store.rs

Lines changed: 59 additions & 40 deletions
Original file line number · Diff line number · Diff line change
@@ -39,6 +39,7 @@ use graph::{
3939
use graph::{derive::CheapClone, futures03::future::join_all, prelude::alloy::primitives::Address};
4040

4141
use crate::{
42+
catalog::Catalog,
4243
deployment::{OnSync, SubgraphHealth},
4344
primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site},
4445
relational::{
@@ -88,7 +89,7 @@ impl Shard {
8889
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
8990
{
9091
return Err(StoreError::InvalidIdentifier(format!(
91-
"shard name `{}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'", name
92+
"shard name `{name}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'"
9293
)));
9394
}
9495
Ok(Shard(name))
@@ -339,7 +340,7 @@ impl SubgraphStore {
339340
self.evict(schema.id())?;
340341
let graft_base = deployment.graft_base.as_ref();
341342

342-
let (site, exists, node_id) = {
343+
let (site, deployment_store, node_id) = {
343344
// We need to deal with two situations:
344345
// (1) We are really creating a new subgraph; it therefore needs
345346
// to go in the shard and onto the node that the placement
@@ -351,42 +352,67 @@ impl SubgraphStore {
351352
// assignment that we used last time to avoid creating
352353
// the same deployment in another shard
353354
let (shard, node_id) = self.place(&name, &network_name, node_id).await?;
354-
let mut conn = self.primary_conn().await?;
355-
let (site, site_was_created) = conn
356-
.allocate_site(shard, schema.id(), network_name, graft_base)
357-
.await?;
358-
let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id);
359-
(site, !site_was_created, node_id)
360-
};
361-
let site = Arc::new(site);
362355

363-
// if the deployment already exists, we don't need to perform any copying
364-
// so we can set graft_base to None
365-
// if it doesn't exist, we need to copy the graft base to the new deployment
366-
let graft_base_layout = if !exists {
367-
let graft_base = match deployment.graft_base.as_ref() {
368-
Some(base) => Some(self.layout(base).await?),
369-
None => None,
370-
};
356+
let mut pconn = self.primary_conn().await?;
371357

372-
if let Some(graft_base) = &graft_base {
373-
self.primary_conn()
374-
.await?
375-
.record_active_copy(graft_base.site.as_ref(), site.as_ref())
358+
let (site, site_was_created) = pconn
359+
.allocate_site(shard, schema.id(), network_name, graft_base)
360+
.await?;
361+
let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id);
362+
let site = Arc::new(site);
363+
let deployment_store = self
364+
.stores
365+
.get(&site.shard)
366+
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;
367+
368+
if let Some(graft_base) = graft_base {
369+
// Perform schema compatibility validation when:
370+
// - the site was just created (first-time deployment), or
371+
// - the site exists but the deployment is missing (recovering from a failed create).
372+
// The `exists` DB call is only made if `site_was_created` is false.
373+
let should_validate = {
374+
let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?;
375+
site_was_created || !crate::deployment::exists(&mut shard_conn, &site).await?
376+
};
377+
378+
if should_validate {
379+
let base_layout = self.layout(graft_base).await?;
380+
let entities_with_causality_region =
381+
deployment.manifest.entities_with_causality_region.clone();
382+
let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?;
383+
let catalog = Catalog::for_creation(
384+
&mut shard_conn,
385+
site.cheap_clone(),
386+
entities_with_causality_region.into_iter().collect(),
387+
)
376388
.await?;
389+
let layout = Layout::new(site.cheap_clone(), schema, catalog)?;
390+
391+
let errors = layout.can_copy_from(&base_layout);
392+
if !errors.is_empty() {
393+
return Err(StoreError::Unknown(anyhow!(
394+
"The subgraph `{}` cannot be used as the graft base \
395+
for `{}` because the schemas are incompatible:\n - {}",
396+
&base_layout.catalog.site.namespace,
397+
&layout.catalog.site.namespace,
398+
errors.join("\n - ")
399+
)));
400+
}
401+
402+
// Only record active copy when the graft check passes and a copy is needed.
403+
// If deployment already exists, the copy has either completed (no active_copies
404+
// record) or is in progress (active_copies record already exists).
405+
pconn
406+
.record_active_copy(base_layout.site.as_ref(), site.as_ref())
407+
.await?;
408+
}
377409
}
378-
graft_base
379-
} else {
380-
None
410+
411+
(site, deployment_store, node_id)
381412
};
382413

383414
// Create the actual databases schema and metadata entries
384-
let deployment_store = self
385-
.stores
386-
.get(&site.shard)
387-
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;
388-
389-
let index_def = if let Some(graft) = &graft_base.clone() {
415+
let index_def = if let Some(graft) = graft_base {
390416
if let Some(site) = self.sites.get(graft) {
391417
let store = self
392418
.stores
@@ -406,7 +432,6 @@ impl SubgraphStore {
406432
schema,
407433
deployment,
408434
site.clone(),
409-
graft_base_layout,
410435
replace,
411436
OnSync::None,
412437
index_def,
@@ -731,18 +756,15 @@ impl Inner {
731756

732757
if src.id == dst.id {
733758
return Err(StoreError::Unknown(anyhow!(
734-
"can not copy deployment {} onto itself",
735-
src_loc
759+
"can not copy deployment {src_loc} onto itself"
736760
)));
737761
}
738762
// The very last thing we do when we set up a copy here is assign it
739763
// to a node. Therefore, if `dst` is already assigned, this function
740764
// should not have been called.
741765
if let Some(node) = self.mirror.assigned_node(dst.as_ref()).await? {
742766
return Err(StoreError::Unknown(anyhow!(
743-
"can not copy into deployment {} since it is already assigned to node `{}`",
744-
dst_loc,
745-
node
767+
"can not copy into deployment {dst_loc} since it is already assigned to node `{node}`"
746768
)));
747769
}
748770
let deployment = src_store.load_deployment(src.clone()).await?;
@@ -758,8 +780,6 @@ impl Inner {
758780
history_blocks_override: None,
759781
};
760782

761-
let graft_base = self.layout(&src.deployment).await?;
762-
763783
self.primary_conn()
764784
.await?
765785
.record_active_copy(src.as_ref(), dst.as_ref())
@@ -776,7 +796,6 @@ impl Inner {
776796
&src_layout.input_schema,
777797
deployment,
778798
dst.clone(),
779-
Some(graft_base),
780799
false,
781800
on_sync,
782801
Some(index_def),

0 commit comments

Comments (0)