Skip to content

Commit aa6115b

Browse files
committed
store: Asyncify deployment::schema
1 parent 185d52a commit aa6115b

File tree

8 files changed

+110
-81
lines changed

8 files changed

+110
-81
lines changed

store/postgres/src/deployment.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,10 @@ pub async fn debug_fork(
324324
}
325325
}
326326

327-
pub fn schema(conn: &mut PgConnection, site: &Site) -> Result<(InputSchema, bool), StoreError> {
327+
pub async fn schema(
328+
conn: &mut PgConnection,
329+
site: &Site,
330+
) -> Result<(InputSchema, bool), StoreError> {
328331
use subgraph_manifest as sm;
329332
let (s, spec_ver, use_bytea_prefix) = sm::table
330333
.select((sm::schema, sm::spec_version, sm::use_bytea_prefix))

store/postgres/src/deployment_store.rs

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -258,12 +258,12 @@ impl DeploymentStore {
258258
.await
259259
}
260260

261-
pub(crate) fn load_deployment(
261+
pub(crate) async fn load_deployment(
262262
&self,
263263
site: Arc<Site>,
264264
) -> Result<SubgraphDeploymentEntity, StoreError> {
265265
let mut conn = self.get_conn()?;
266-
let layout = self.layout(&mut conn, site.clone())?;
266+
let layout = self.layout(&mut conn, site.clone()).await?;
267267
Ok(
268268
detail::deployment_entity(&mut conn, &site, &layout.input_schema)
269269
.with_context(|| format!("Deployment details not found for {}", site.deployment))?,
@@ -293,7 +293,7 @@ impl DeploymentStore {
293293
site: Arc<Site>,
294294
query: EntityQuery,
295295
) -> Result<(Vec<T>, Trace), QueryExecutionError> {
296-
let layout = self.layout(conn, site)?;
296+
let layout = self.layout(conn, site).await?;
297297

298298
let logger = query
299299
.logger
@@ -452,24 +452,24 @@ impl DeploymentStore {
452452
/// the Store. Layout objects with a pending migration can not be
453453
/// cached for longer than a transaction since they might change
454454
/// without us knowing
455-
pub(crate) fn layout(
455+
pub(crate) async fn layout(
456456
&self,
457457
conn: &mut PgConnection,
458458
site: Arc<Site>,
459459
) -> Result<Arc<Layout>, StoreError> {
460-
self.layout_cache.get(&self.logger, conn, site)
460+
self.layout_cache.get(&self.logger, conn, site).await
461461
}
462462

463463
/// Return the layout for a deployment. This might use a database
464464
/// connection for the lookup and should only be called if the caller
465465
/// does not have a connection currently. If it does, use `layout`
466-
pub(crate) fn find_layout(&self, site: Arc<Site>) -> Result<Arc<Layout>, StoreError> {
466+
pub(crate) async fn find_layout(&self, site: Arc<Site>) -> Result<Arc<Layout>, StoreError> {
467467
if let Some(layout) = self.layout_cache.find(site.as_ref()) {
468468
return Ok(layout);
469469
}
470470

471471
let mut conn = self.get_conn()?;
472-
self.layout(&mut conn, site)
472+
self.layout(&mut conn, site).await
473473
}
474474

475475
async fn subgraph_info_with_conn(
@@ -481,7 +481,7 @@ impl DeploymentStore {
481481
return Ok(info.clone());
482482
}
483483

484-
let layout = self.layout(conn, site.cheap_clone())?;
484+
let layout = self.layout(conn, site.cheap_clone()).await?;
485485
let manifest_info = deployment::ManifestInfo::load(conn, &site)?;
486486

487487
let graft_block = deployment::graft_point(conn, &site.deployment)
@@ -644,9 +644,13 @@ impl DeploymentStore {
644644
}
645645

646646
/// Runs the SQL `ANALYZE` command in a table.
647-
pub(crate) fn analyze(&self, site: Arc<Site>, entity: Option<&str>) -> Result<(), StoreError> {
647+
pub(crate) async fn analyze(
648+
&self,
649+
site: Arc<Site>,
650+
entity: Option<&str>,
651+
) -> Result<(), StoreError> {
648652
let mut conn = self.get_conn()?;
649-
let layout = self.layout(&mut conn, site)?;
653+
let layout = self.layout(&mut conn, site).await?;
650654
let tables = entity
651655
.map(|entity| resolve_table_name(&layout, entity))
652656
.transpose()?
@@ -677,7 +681,7 @@ impl DeploymentStore {
677681
target: i32,
678682
) -> Result<(), StoreError> {
679683
let mut conn = self.get_conn()?;
680-
let layout = self.layout(&mut conn, site.clone())?;
684+
let layout = self.layout(&mut conn, site.clone()).await?;
681685

682686
let tables = entity
683687
.map(|entity| resolve_table_name(&layout, entity))
@@ -706,15 +710,15 @@ impl DeploymentStore {
706710
}
707711

708712
/// Runs the SQL `ANALYZE` command in a table, with a shared connection.
709-
pub(crate) fn analyze_with_conn(
713+
pub(crate) async fn analyze_with_conn(
710714
&self,
711715
site: Arc<Site>,
712716
entity_name: &str,
713717
conn: &mut PgConnection,
714718
) -> Result<(), StoreError> {
715719
let store = self.clone();
716720
let entity_name = entity_name.to_owned();
717-
let layout = store.layout(conn, site)?;
721+
let layout = store.layout(conn, site).await?;
718722
let table = resolve_table_name(&layout, &entity_name)?;
719723
table.analyze(conn)
720724
}
@@ -734,7 +738,7 @@ impl DeploymentStore {
734738
let entity_name = entity_name.to_owned();
735739
self.with_conn(async move |conn, _| {
736740
let schema_name = site.namespace.clone();
737-
let layout = store.layout(conn, site)?;
741+
let layout = store.layout(conn, site).await?;
738742
let (index_name, sql) = generate_index_creation_sql(
739743
layout,
740744
&entity_name,
@@ -772,7 +776,7 @@ impl DeploymentStore {
772776
let entity_name = entity_name.to_owned();
773777
self.with_conn(async move |conn, _| {
774778
let schema_name = site.namespace.clone();
775-
let layout = store.layout(conn, site)?;
779+
let layout = store.layout(conn, site).await?;
776780
let table = resolve_table_name(&layout, &entity_name)?;
777781
let table_name = &table.name;
778782
let indexes =
@@ -783,11 +787,11 @@ impl DeploymentStore {
783787
.await
784788
}
785789

786-
pub(crate) fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
790+
pub(crate) async fn load_indexes(&self, site: Arc<Site>) -> Result<IndexList, StoreError> {
787791
let store = self.clone();
788792
let mut binding = self.get_conn()?;
789793
let conn = binding.deref_mut();
790-
IndexList::load(conn, site, store)
794+
IndexList::load(conn, site, store).await
791795
}
792796

793797
/// Drops an index for a given deployment, concurrently.
@@ -813,7 +817,7 @@ impl DeploymentStore {
813817
let store = self.clone();
814818
let table = table.to_string();
815819
self.with_conn(async move |mut conn, _| {
816-
let layout = store.layout(&mut conn, site.clone())?;
820+
let layout = store.layout(&mut conn, site.clone()).await?;
817821
let table = resolve_table_name(&layout, &table)?;
818822
catalog::set_account_like(&mut conn, &site, &table.name, is_account_like)
819823
.map_err(Into::into)
@@ -858,7 +862,7 @@ impl DeploymentStore {
858862
req: PruneRequest,
859863
mut reporter: Box<dyn PruneReporter>,
860864
) -> Result<Box<dyn PruneReporter>, CancelableError<StoreError>> {
861-
let layout = store.layout(&mut conn, site.clone())?;
865+
let layout = store.layout(&mut conn, site.clone()).await?;
862866
cancel.check_cancel()?;
863867
let state = deployment::state(&mut conn, &site)?;
864868

@@ -910,7 +914,9 @@ impl DeploymentStore {
910914
let store = self.cheap_clone();
911915
let layout = self
912916
.pool
913-
.with_conn(async move |conn, _| store.layout(conn, site.clone()).map_err(|e| e.into()))
917+
.with_conn(async move |conn, _| {
918+
store.layout(conn, site.clone()).await.map_err(|e| e.into())
919+
})
914920
.await?;
915921

916922
Ok(relational::prune::Viewer::new(self.pool.clone(), layout))
@@ -943,11 +949,14 @@ impl DeploymentStore {
943949
.await
944950
}
945951

946-
pub(crate) fn block_time(&self, site: Arc<Site>) -> Result<Option<BlockTime>, StoreError> {
952+
pub(crate) async fn block_time(
953+
&self,
954+
site: Arc<Site>,
955+
) -> Result<Option<BlockTime>, StoreError> {
947956
let store = self.cheap_clone();
948957

949958
let mut conn = self.get_conn()?;
950-
let layout = store.layout(&mut conn, site.cheap_clone())?;
959+
let layout = store.layout(&mut conn, site.cheap_clone()).await?;
951960
layout.last_rollup(&mut conn)
952961
}
953962

@@ -960,7 +969,7 @@ impl DeploymentStore {
960969
let indexer = *indexer;
961970
let site2 = site.cheap_clone();
962971
let store = self.cheap_clone();
963-
let layout = self.find_layout(site.cheap_clone())?;
972+
let layout = self.find_layout(site.cheap_clone()).await?;
964973
let info = self.subgraph_info(site.cheap_clone()).await?;
965974
let poi_digest = layout.input_schema.poi_digest();
966975

@@ -969,7 +978,7 @@ impl DeploymentStore {
969978
let site = site.clone();
970979
cancel.check_cancel()?;
971980

972-
let layout = store.layout(conn, site.cheap_clone())?;
981+
let layout = store.layout(conn, site.cheap_clone()).await?;
973982

974983
let mut block_ptr = block.cheap_clone();
975984
let latest_block_ptr = match Self::block_ptr_with_conn(conn, site.cheap_clone())? {
@@ -1049,20 +1058,20 @@ impl DeploymentStore {
10491058

10501059
/// Get the entity matching `key` from the deployment `site`. Only
10511060
/// consider entities as of the given `block`
1052-
pub(crate) fn get(
1061+
pub(crate) async fn get(
10531062
&self,
10541063
site: Arc<Site>,
10551064
key: &EntityKey,
10561065
block: BlockNumber,
10571066
) -> Result<Option<Entity>, StoreError> {
10581067
let mut conn = self.get_conn()?;
1059-
let layout = self.layout(&mut conn, site)?;
1068+
let layout = self.layout(&mut conn, site).await?;
10601069
layout.find(&mut conn, key, block)
10611070
}
10621071

10631072
/// Retrieve all the entities matching `ids_for_type`, both the type and causality region, from
10641073
/// the deployment `site`. Only consider entities as of the given `block`
1065-
pub(crate) fn get_many(
1074+
pub(crate) async fn get_many(
10661075
&self,
10671076
site: Arc<Site>,
10681077
ids_for_type: &BTreeMap<(EntityType, CausalityRegion), IdList>,
@@ -1072,42 +1081,42 @@ impl DeploymentStore {
10721081
return Ok(BTreeMap::new());
10731082
}
10741083
let mut conn = self.get_conn()?;
1075-
let layout = self.layout(&mut conn, site)?;
1084+
let layout = self.layout(&mut conn, site).await?;
10761085

10771086
layout.find_many(&mut conn, ids_for_type, block)
10781087
}
10791088

1080-
pub(crate) fn get_range(
1089+
pub(crate) async fn get_range(
10811090
&self,
10821091
site: Arc<Site>,
10831092
entity_types: Vec<EntityType>,
10841093
causality_region: CausalityRegion,
10851094
block_range: Range<BlockNumber>,
10861095
) -> Result<BTreeMap<BlockNumber, Vec<EntitySourceOperation>>, StoreError> {
10871096
let mut conn = self.get_conn()?;
1088-
let layout = self.layout(&mut conn, site)?;
1097+
let layout = self.layout(&mut conn, site).await?;
10891098
layout.find_range(&mut conn, entity_types, causality_region, block_range)
10901099
}
10911100

1092-
pub(crate) fn get_derived(
1101+
pub(crate) async fn get_derived(
10931102
&self,
10941103
site: Arc<Site>,
10951104
derived_query: &DerivedEntityQuery,
10961105
block: BlockNumber,
10971106
excluded_keys: &Vec<EntityKey>,
10981107
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
10991108
let mut conn = self.get_conn()?;
1100-
let layout = self.layout(&mut conn, site)?;
1109+
let layout = self.layout(&mut conn, site).await?;
11011110
layout.find_derived(&mut conn, derived_query, block, excluded_keys)
11021111
}
11031112

1104-
pub(crate) fn get_changes(
1113+
pub(crate) async fn get_changes(
11051114
&self,
11061115
site: Arc<Site>,
11071116
block: BlockNumber,
11081117
) -> Result<Vec<EntityOperation>, StoreError> {
11091118
let mut conn = self.get_conn()?;
1110-
let layout = self.layout(&mut conn, site)?;
1119+
let layout = self.layout(&mut conn, site).await?;
11111120
let changes = layout.find_changes(&mut conn, block)?;
11121121

11131122
Ok(changes)
@@ -1144,7 +1153,7 @@ impl DeploymentStore {
11441153
conn.transaction_async(|conn| {
11451154
async {
11461155
// Make the changes
1147-
let layout = self.layout(conn, site.clone())?;
1156+
let layout = self.layout(conn, site.clone()).await?;
11481157

11491158
let section = stopwatch.start_section("apply_entity_modifications");
11501159
let count = self.apply_entity_modifications(
@@ -1334,7 +1343,7 @@ impl DeploymentStore {
13341343
deployment::revert_block_ptr(conn, &site, block_ptr_to, firehose_cursor)?;
13351344

13361345
// Revert the data
1337-
let layout = self.layout(conn, site.clone())?;
1346+
let layout = self.layout(conn, site.clone()).await?;
13381347

13391348
if truncate {
13401349
layout.truncate_tables(conn)?;
@@ -1550,7 +1559,7 @@ impl DeploymentStore {
15501559
site: Arc<Site>,
15511560
graft_src: Option<(Arc<Layout>, BlockPtr, SubgraphDeploymentEntity, IndexList)>,
15521561
) -> Result<(), StoreError> {
1553-
let dst = self.find_layout(site.cheap_clone())?;
1562+
let dst = self.find_layout(site.cheap_clone()).await?;
15541563

15551564
// If `graft_src` is `Some`, then there is a pending graft.
15561565
if let Some((src, block, src_deployment, index_list)) = graft_src {
@@ -1563,7 +1572,8 @@ impl DeploymentStore {
15631572

15641573
let src_manifest_idx_and_name = src_deployment.manifest.template_idx_and_name()?;
15651574
let dst_manifest_idx_and_name = self
1566-
.load_deployment(dst.site.clone())?
1575+
.load_deployment(dst.site.clone())
1576+
.await?
15671577
.manifest
15681578
.template_idx_and_name()?;
15691579

@@ -1612,7 +1622,8 @@ impl DeploymentStore {
16121622
// Analyze all tables for this deployment
16131623
info!(logger, "Analyzing all {} tables", dst.tables.len());
16141624
for entity_name in dst.tables.keys() {
1615-
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)?;
1625+
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), conn)
1626+
.await?;
16161627
}
16171628

16181629
// Rewind the subgraph so that entity versions that are
@@ -1662,7 +1673,7 @@ impl DeploymentStore {
16621673
let mut conn = self.get_conn()?;
16631674
if ENV_VARS.postpone_attribute_index_creation {
16641675
// check if all indexes are valid and recreate them if they aren't
1665-
self.load_indexes(site.clone())?
1676+
self.load_indexes(site.clone()).await?
16661677
.recreate_invalid_indexes(&mut conn, &dst)
16671678
.await?;
16681679
}

store/postgres/src/query_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl QueryStoreTrait for QueryStore {
128128
}
129129

130130
async fn input_schema(&self) -> Result<InputSchema, QueryExecutionError> {
131-
let layout = self.store.find_layout(self.site.cheap_clone())?;
131+
let layout = self.store.find_layout(self.site.cheap_clone()).await?;
132132
Ok(layout.input_schema.cheap_clone())
133133
}
134134

store/postgres/src/relational.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1764,8 +1764,8 @@ impl LayoutCache {
17641764
}
17651765
}
17661766

1767-
fn load(conn: &mut PgConnection, site: Arc<Site>) -> Result<Arc<Layout>, StoreError> {
1768-
let (subgraph_schema, use_bytea_prefix) = deployment::schema(conn, site.as_ref())?;
1767+
async fn load(conn: &mut PgConnection, site: Arc<Site>) -> Result<Arc<Layout>, StoreError> {
1768+
let (subgraph_schema, use_bytea_prefix) = deployment::schema(conn, site.as_ref()).await?;
17691769
let has_causality_region =
17701770
deployment::entities_with_causality_region(conn, site.id, &subgraph_schema)?;
17711771
let catalog = Catalog::load(conn, site.clone(), use_bytea_prefix, has_causality_region)?;
@@ -1797,7 +1797,7 @@ impl LayoutCache {
17971797
/// Get the layout for `site`. If it's not in cache, load it. If it is
17981798
/// expired, try to refresh it if there isn't another refresh happening
17991799
/// already
1800-
pub fn get(
1800+
pub async fn get(
18011801
&self,
18021802
logger: &Logger,
18031803
conn: &mut PgConnection,
@@ -1827,7 +1827,7 @@ impl LayoutCache {
18271827
}
18281828
}
18291829
None => {
1830-
let layout = Self::load(conn, site)?;
1830+
let layout = Self::load(conn, site).await?;
18311831
self.cache(layout.cheap_clone());
18321832
layout
18331833
}

store/postgres/src/relational/index.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,7 @@ pub fn load_indexes_from_table(
763763
}
764764

765765
impl IndexList {
766-
pub fn load(
766+
pub async fn load(
767767
conn: &mut PgConnection,
768768
site: Arc<Site>,
769769
store: DeploymentStore,
@@ -772,7 +772,7 @@ impl IndexList {
772772
indexes: HashMap::new(),
773773
};
774774
let schema_name = site.namespace.clone();
775-
let layout = store.layout(conn, site)?;
775+
let layout = store.layout(conn, site).await?;
776776
for (_, table) in &layout.tables {
777777
let indexes = load_indexes_from_table(conn, table, schema_name.as_str())?;
778778
list.indexes.insert(table.name.to_string(), indexes);

0 commit comments

Comments
 (0)