Skip to content

Commit f085602

Browse files
committed
add ipfs context and use metrics
1 parent 7c9bc77 commit f085602

File tree

35 files changed

+797
-310
lines changed

35 files changed

+797
-310
lines changed

chain/ethereum/src/data_source.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use anyhow::{anyhow, Error};
22
use anyhow::{ensure, Context};
33
use graph::blockchain::{BlockPtr, TriggerWithHandler};
4+
use graph::components::link_resolver::LinkResolverContext;
45
use graph::components::metrics::subgraph::SubgraphInstanceMetrics;
56
use graph::components::store::{EthereumCallCache, StoredDynamicDataSource};
67
use graph::components::subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError};
78
use graph::components::trigger_processor::RunnableTriggers;
9+
use graph::data::subgraph::DeploymentHash;
810
use graph::data_source::common::{
911
CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedMappingABI,
1012
};
@@ -1197,6 +1199,7 @@ pub struct UnresolvedDataSource {
11971199
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
11981200
async fn resolve(
11991201
self,
1202+
deployment_hash: &DeploymentHash,
12001203
resolver: &Arc<dyn LinkResolver>,
12011204
logger: &Logger,
12021205
manifest_idx: u32,
@@ -1210,7 +1213,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
12101213
context,
12111214
} = self;
12121215

1213-
let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
1216+
let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
12141217
format!(
12151218
"failed to resolve data source {} with source_address {:?} and source_start_block {}",
12161219
name, source.address, source.start_block
@@ -1244,6 +1247,7 @@ pub struct DataSourceTemplate {
12441247
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
12451248
async fn resolve(
12461249
self,
1250+
deployment_hash: &DeploymentHash,
12471251
resolver: &Arc<dyn LinkResolver>,
12481252
logger: &Logger,
12491253
manifest_idx: u32,
@@ -1257,7 +1261,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
12571261
} = self;
12581262

12591263
let mapping = mapping
1260-
.resolve(resolver, logger)
1264+
.resolve(deployment_hash, resolver, logger)
12611265
.await
12621266
.with_context(|| format!("failed to resolve data source template {}", name))?;
12631267

@@ -1355,6 +1359,7 @@ impl FindMappingABI for Mapping {
13551359
impl UnresolvedMapping {
13561360
pub async fn resolve(
13571361
self,
1362+
deployment_hash: &DeploymentHash,
13581363
resolver: &Arc<dyn LinkResolver>,
13591364
logger: &Logger,
13601365
) -> Result<Mapping, anyhow::Error> {
@@ -1377,13 +1382,17 @@ impl UnresolvedMapping {
13771382
abis.into_iter()
13781383
.map(|unresolved_abi| async {
13791384
Result::<_, Error>::Ok(Arc::new(
1380-
unresolved_abi.resolve(resolver, logger).await?,
1385+
unresolved_abi
1386+
.resolve(deployment_hash, resolver, logger)
1387+
.await?,
13811388
))
13821389
})
13831390
.collect::<FuturesOrdered<_>>()
13841391
.try_collect::<Vec<_>>(),
13851392
async {
1386-
let module_bytes = resolver.cat(logger, &link).await?;
1393+
let module_bytes = resolver
1394+
.cat(LinkResolverContext::new(deployment_hash, logger), &link)
1395+
.await?;
13871396
Ok(Arc::new(module_bytes))
13881397
},
13891398
)

chain/near/src/data_source.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use graph::anyhow::Context;
22
use graph::blockchain::{Block, TriggerWithHandler};
3+
use graph::components::link_resolver::LinkResolverContext;
34
use graph::components::store::StoredDynamicDataSource;
45
use graph::components::subgraph::InstanceDSTemplateInfo;
5-
use graph::data::subgraph::DataSourceContext;
6+
use graph::data::subgraph::{DataSourceContext, DeploymentHash};
67
use graph::prelude::SubgraphManifestValidationError;
78
use graph::{
89
anyhow::{anyhow, Error},
@@ -330,6 +331,7 @@ pub struct UnresolvedDataSource {
330331
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
331332
async fn resolve(
332333
self,
334+
deployment_hash: &DeploymentHash,
333335
resolver: &Arc<dyn LinkResolver>,
334336
logger: &Logger,
335337
_manifest_idx: u32,
@@ -343,7 +345,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
343345
context,
344346
} = self;
345347

346-
let mapping = mapping.resolve(resolver, logger).await.with_context(|| {
348+
let mapping = mapping.resolve(deployment_hash, resolver, logger).await.with_context(|| {
347349
format!(
348350
"failed to resolve data source {} with source_account {:?} and source_start_block {}",
349351
name, source.account, source.start_block
@@ -369,6 +371,7 @@ pub type DataSourceTemplate = BaseDataSourceTemplate<Mapping>;
369371
impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTemplate {
370372
async fn resolve(
371373
self,
374+
deployment_hash: &DeploymentHash,
372375
resolver: &Arc<dyn LinkResolver>,
373376
logger: &Logger,
374377
_manifest_idx: u32,
@@ -381,7 +384,7 @@ impl blockchain::UnresolvedDataSourceTemplate<Chain> for UnresolvedDataSourceTem
381384
} = self;
382385

383386
let mapping = mapping
384-
.resolve(resolver, logger)
387+
.resolve(deployment_hash, resolver, logger)
385388
.await
386389
.with_context(|| format!("failed to resolve data source template {}", name))?;
387390

@@ -432,6 +435,7 @@ pub struct UnresolvedMapping {
432435
impl UnresolvedMapping {
433436
pub async fn resolve(
434437
self,
438+
deployment_hash: &DeploymentHash,
435439
resolver: &Arc<dyn LinkResolver>,
436440
logger: &Logger,
437441
) -> Result<Mapping, Error> {
@@ -447,7 +451,7 @@ impl UnresolvedMapping {
447451
let api_version = semver::Version::parse(&api_version)?;
448452

449453
let module_bytes = resolver
450-
.cat(logger, &link)
454+
.cat(LinkResolverContext::new(deployment_hash, logger), &link)
451455
.await
452456
.with_context(|| format!("failed to resolve mapping {}", link.link))?;
453457

chain/substreams/src/data_source.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use anyhow::{anyhow, Context, Error};
44
use graph::{
55
blockchain,
66
cheap_clone::CheapClone,
7-
components::{link_resolver::LinkResolver, subgraph::InstanceDSTemplateInfo},
7+
components::{
8+
link_resolver::{LinkResolver, LinkResolverContext},
9+
subgraph::InstanceDSTemplateInfo,
10+
},
11+
data::subgraph::DeploymentHash,
812
prelude::{async_trait, BlockNumber, Link},
913
slog::Logger,
1014
};
@@ -184,11 +188,17 @@ pub struct UnresolvedMapping {
184188
impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
185189
async fn resolve(
186190
self,
191+
deployment_hash: &DeploymentHash,
187192
resolver: &Arc<dyn LinkResolver>,
188193
logger: &Logger,
189194
_manifest_idx: u32,
190195
) -> Result<DataSource, Error> {
191-
let content = resolver.cat(logger, &self.source.package.file).await?;
196+
let content = resolver
197+
.cat(
198+
LinkResolverContext::new(deployment_hash, logger),
199+
&self.source.package.file,
200+
)
201+
.await?;
192202

193203
let mut package = graph::substreams::Package::decode(content.as_ref())?;
194204

@@ -234,7 +244,7 @@ impl blockchain::UnresolvedDataSource<Chain> for UnresolvedDataSource {
234244
let handler = match (self.mapping.handler, self.mapping.file) {
235245
(Some(handler), Some(file)) => {
236246
let module_bytes = resolver
237-
.cat(logger, &file)
247+
.cat(LinkResolverContext::new(deployment_hash, logger), &file)
238248
.await
239249
.with_context(|| format!("failed to resolve mapping {}", file.link))?;
240250

@@ -314,6 +324,7 @@ impl blockchain::DataSourceTemplate<Chain> for NoopDataSourceTemplate {
314324
impl blockchain::UnresolvedDataSourceTemplate<Chain> for NoopDataSourceTemplate {
315325
async fn resolve(
316326
self,
327+
_deployment_hash: &DeploymentHash,
317328
_resolver: &Arc<dyn LinkResolver>,
318329
_logger: &Logger,
319330
_manifest_idx: u32,
@@ -329,7 +340,7 @@ mod test {
329340
use anyhow::Error;
330341
use graph::{
331342
blockchain::{DataSource as _, UnresolvedDataSource as _},
332-
components::link_resolver::LinkResolver,
343+
components::link_resolver::{LinkResolver, LinkResolverContext},
333344
data::subgraph::LATEST_VERSION,
334345
prelude::{async_trait, serde_yaml, JsonValueStream, Link},
335346
slog::{o, Discard, Logger},
@@ -433,7 +444,10 @@ mod test {
433444
let ds: UnresolvedDataSource = serde_yaml::from_str(TEMPLATE_DATA_SOURCE).unwrap();
434445
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
435446
let logger = Logger::root(Discard, o!());
436-
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
447+
let ds: DataSource = ds
448+
.resolve(&Default::default(), &link_resolver, &logger, 0)
449+
.await
450+
.unwrap();
437451
let expected = DataSource {
438452
kind: SUBSTREAMS_KIND.into(),
439453
network: Some("mainnet".into()),
@@ -470,7 +484,10 @@ mod test {
470484
serde_yaml::from_str(TEMPLATE_DATA_SOURCE_WITH_PARAMS).unwrap();
471485
let link_resolver: Arc<dyn LinkResolver> = Arc::new(NoopLinkResolver {});
472486
let logger = Logger::root(Discard, o!());
473-
let ds: DataSource = ds.resolve(&link_resolver, &logger, 0).await.unwrap();
487+
let ds: DataSource = ds
488+
.resolve(&Default::default(), &link_resolver, &logger, 0)
489+
.await
490+
.unwrap();
474491
let expected = DataSource {
475492
kind: SUBSTREAMS_KIND.into(),
476493
network: Some("mainnet".into()),
@@ -705,17 +722,21 @@ mod test {
705722
unimplemented!()
706723
}
707724

708-
async fn cat(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
725+
async fn cat(&self, _ctx: LinkResolverContext, _link: &Link) -> Result<Vec<u8>, Error> {
709726
Ok(gen_package().encode_to_vec())
710727
}
711728

712-
async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result<Vec<u8>, Error> {
729+
async fn get_block(
730+
&self,
731+
_ctx: LinkResolverContext,
732+
_link: &Link,
733+
) -> Result<Vec<u8>, Error> {
713734
unimplemented!()
714735
}
715736

716737
async fn json_stream(
717738
&self,
718-
_logger: &Logger,
739+
_ctx: LinkResolverContext,
719740
_link: &Link,
720741
) -> Result<JsonValueStream, Error> {
721742
unimplemented!()

core/src/polling_monitor/ipfs_service.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@ use anyhow::anyhow;
55
use anyhow::Error;
66
use bytes::Bytes;
77
use graph::futures03::future::BoxFuture;
8-
use graph::ipfs::ContentPath;
9-
use graph::ipfs::IpfsClient;
10-
use graph::ipfs::RetryPolicy;
8+
use graph::ipfs::{ContentPath, IpfsClient, IpfsContext, RetryPolicy};
119
use graph::{derive::CheapClone, prelude::CheapClone};
1210
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
1311

14-
pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
12+
pub type IpfsService = Buffer<IpfsRequest, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
13+
14+
#[derive(Clone, Debug)]
15+
pub struct IpfsRequest {
16+
pub ctx: IpfsContext,
17+
pub path: ContentPath,
18+
}
1519

1620
pub fn ipfs_service(
1721
client: Arc<dyn IpfsClient>,
@@ -43,7 +47,10 @@ struct IpfsServiceInner {
4347
}
4448

4549
impl IpfsServiceInner {
46-
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
50+
async fn call_inner(
51+
self,
52+
IpfsRequest { ctx, path }: IpfsRequest,
53+
) -> Result<Option<Bytes>, Error> {
4754
let multihash = path.cid().hash().code();
4855
if !SAFE_MULTIHASHES.contains(&multihash) {
4956
return Err(anyhow!("CID multihash {} is not allowed", multihash));
@@ -52,6 +59,7 @@ impl IpfsServiceInner {
5259
let res = self
5360
.client
5461
.cat(
62+
ctx,
5563
&path,
5664
self.max_file_size,
5765
Some(self.timeout),
@@ -126,14 +134,24 @@ mod test {
126134

127135
let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;
128136

129-
let client =
130-
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
131-
.unwrap();
137+
let client = IpfsRpcClient::new_unchecked(
138+
ServerAddress::local_rpc_api(),
139+
Default::default(),
140+
&graph::log::discard(),
141+
)
142+
.unwrap();
132143

133144
let svc = ipfs_service(Arc::new(client), 100000, Duration::from_secs(30), 10);
134145

135146
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
136-
let content = svc.oneshot(path).await.unwrap().unwrap();
147+
let content = svc
148+
.oneshot(IpfsRequest {
149+
ctx: Default::default(),
150+
path,
151+
})
152+
.await
153+
.unwrap()
154+
.unwrap();
137155

138156
assert_eq!(content.to_vec(), random_bytes);
139157
}
@@ -157,7 +175,8 @@ mod test {
157175
const CID: &str = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
158176

159177
let server = MockServer::start().await;
160-
let ipfs_client = IpfsRpcClient::new_unchecked(server.uri(), &discard()).unwrap();
178+
let ipfs_client =
179+
IpfsRpcClient::new_unchecked(server.uri(), Default::default(), &discard()).unwrap();
161180
let ipfs_service = ipfs_service(Arc::new(ipfs_client), 10, Duration::from_secs(1), 1);
162181
let path = ContentPath::new(CID).unwrap();
163182

@@ -179,6 +198,12 @@ mod test {
179198
.await;
180199

181200
// This means that we never reached the successful response.
182-
ipfs_service.oneshot(path).await.unwrap_err();
201+
ipfs_service
202+
.oneshot(IpfsRequest {
203+
ctx: Default::default(),
204+
path,
205+
})
206+
.await
207+
.unwrap_err();
183208
}
184209
}

0 commit comments

Comments
 (0)