Skip to content

Commit dac49e6

Browse files
store: Move recording the active_copy in the transaction
Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>
1 parent 3ee81d6 commit dac49e6

File tree

1 file changed

+51
-57
lines changed

1 file changed

+51
-57
lines changed

store/postgres/src/subgraph_store.rs

Lines changed: 51 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ impl SubgraphStore {
340340
self.evict(schema.id())?;
341341
let graft_base = deployment.graft_base.as_ref();
342342

343-
let (site, exists, node_id) = {
343+
let (site, deployment_store, node_id) = {
344344
// We need to deal with two situations:
345345
// (1) We are really creating a new subgraph; it therefore needs
346346
// to go in the shard and onto the node that the placement
@@ -353,67 +353,61 @@ impl SubgraphStore {
353353
// the same deployment in another shard
354354
let (shard, node_id) = self.place(&name, &network_name, node_id).await?;
355355

356-
let mut conn = self.primary_conn().await?;
357-
conn.transaction(|conn| {
358-
async {
359-
let (site, site_was_created) = conn
360-
.allocate_site(shard, schema.id(), network_name, graft_base)
361-
.await?;
362-
let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id);
363-
let site = Arc::new(site);
364-
365-
if let Some(graft_base) = graft_base {
366-
// Ensure that the graft base exists
367-
let base_layout = self.layout(graft_base).await?;
368-
let entities_with_causality_region =
369-
deployment.manifest.entities_with_causality_region.clone();
370-
let catalog = Catalog::for_tests(
371-
site.cheap_clone(),
372-
entities_with_causality_region.into_iter().collect(),
373-
)?;
374-
let layout = Layout::new(site.cheap_clone(), schema, catalog)?;
375-
376-
let errors = layout.can_copy_from(&base_layout);
377-
if !errors.is_empty() {
378-
return Err(StoreError::Unknown(anyhow!(
379-
"The subgraph `{}` cannot be used as the graft base \
380-
for `{}` because the schemas are incompatible:\n - {}",
381-
&base_layout.catalog.site.namespace,
382-
&layout.catalog.site.namespace,
383-
errors.join("\n - ")
384-
)));
356+
let mut pconn = self.primary_conn().await?;
357+
pconn
358+
.transaction(|pconn| {
359+
async {
360+
let (site, site_was_created) = pconn
361+
.allocate_site(shard, schema.id(), network_name, graft_base)
362+
.await?;
363+
let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id);
364+
let site = Arc::new(site);
365+
let deployment_store = self
366+
.stores
367+
.get(&site.shard)
368+
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;
369+
370+
if site_was_created {
371+
if let Some(graft_base) = graft_base {
372+
// Ensure that the graft base exists
373+
let base_layout = self.layout(graft_base).await?;
374+
let mut shard_conn =
375+
deployment_store.get_replica_conn(ReplicaId::Main).await?;
376+
let entities_with_causality_region =
377+
deployment.manifest.entities_with_causality_region.clone();
378+
let catalog = Catalog::for_creation(
379+
&mut shard_conn,
380+
site.cheap_clone(),
381+
entities_with_causality_region.into_iter().collect(),
382+
)
383+
.await?;
384+
let layout = Layout::new(site.cheap_clone(), schema, catalog)?;
385+
386+
let errors = layout.can_copy_from(&base_layout);
387+
if !errors.is_empty() {
388+
return Err(StoreError::Unknown(anyhow!(
389+
"The subgraph `{}` cannot be used as the graft base \
390+
for `{}` because the schemas are incompatible:\n - {}",
391+
&base_layout.catalog.site.namespace,
392+
&layout.catalog.site.namespace,
393+
errors.join("\n - ")
394+
)));
395+
}
396+
397+
pconn
398+
.record_active_copy(base_layout.site.as_ref(), site.as_ref())
399+
.await?;
400+
}
385401
}
386-
}
387-
388-
Ok((site, !site_was_created, node_id))
389-
}
390-
.scope_boxed()
391-
})
392-
.await?
393-
};
394402

395-
// If the deployment already exists, we don't need to perform any copying
396-
// If it doesn't exist, we need to copy the graft base to the new deployment
397-
if !exists {
398-
let graft_base_layout = match graft_base {
399-
Some(base) => Some(self.layout(base).await?),
400-
None => None,
401-
};
402-
403-
if let Some(graft_base_layout) = &graft_base_layout {
404-
self.primary_conn()
405-
.await?
406-
.record_active_copy(graft_base_layout.site.as_ref(), site.as_ref())
407-
.await?;
408-
}
403+
Ok((site, deployment_store, node_id))
404+
}
405+
.scope_boxed()
406+
})
407+
.await?
409408
};
410409

411410
// Create the actual databases schema and metadata entries
412-
let deployment_store = self
413-
.stores
414-
.get(&site.shard)
415-
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;
416-
417411
let index_def = if let Some(graft) = graft_base {
418412
if let Some(site) = self.sites.get(graft) {
419413
let store = self

0 commit comments

Comments
 (0)