Skip to content

Commit 53e8e39

Browse files
committed
graph: Log the path we are accessing in IPFS errors
1 parent 12a7f50 commit 53e8e39

File tree

1 file changed

+49
-47
lines changed

1 file changed

+49
-47
lines changed

graph/src/ipfs/client.rs

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,17 @@ use crate::ipfs::ContentPath;
1414
use crate::ipfs::IpfsError;
1515
use crate::ipfs::IpfsResult;
1616
use crate::ipfs::RetryPolicy;
17+
use crate::util::futures::RetryConfigNoTimeout;
18+
19+
fn retry_no_timeout<O: Send + Sync + 'static>(
20+
retry_policy: RetryPolicy,
21+
opname: &str,
22+
logger: &Logger,
23+
path: &ContentPath,
24+
) -> RetryConfigNoTimeout<O, IpfsError> {
25+
let logger = logger.new(slog::o!("path" => path.to_string()));
26+
retry_policy.create(opname, &logger).no_timeout()
27+
}
1728

1829
/// A read-only connection to an IPFS server.
1930
#[async_trait]
@@ -36,19 +47,16 @@ pub trait IpfsClient: Send + Sync + 'static {
3647
timeout: Option<Duration>,
3748
retry_policy: RetryPolicy,
3849
) -> IpfsResult<BoxStream<'static, IpfsResult<Bytes>>> {
39-
let fut = retry_policy
40-
.create("IPFS.cat_stream", self.logger())
41-
.no_timeout()
42-
.run({
43-
let path = path.to_owned();
50+
let fut = retry_no_timeout(retry_policy, "IPFS.cat_stream", self.logger(), path).run({
51+
let path = path.to_owned();
4452

45-
move || {
46-
let path = path.clone();
47-
let client = self.clone();
53+
move || {
54+
let path = path.clone();
55+
let client = self.clone();
4856

49-
async move { client.call(IpfsRequest::Cat(path)).await }
50-
}
51-
});
57+
async move { client.call(IpfsRequest::Cat(path)).await }
58+
}
59+
});
5260

5361
let resp = run_with_optional_timeout(path, fut, timeout).await?;
5462

@@ -66,25 +74,22 @@ pub trait IpfsClient: Send + Sync + 'static {
6674
timeout: Option<Duration>,
6775
retry_policy: RetryPolicy,
6876
) -> IpfsResult<Bytes> {
69-
let fut = retry_policy
70-
.create("IPFS.cat", self.logger())
71-
.no_timeout()
72-
.run({
73-
let path = path.to_owned();
74-
75-
move || {
76-
let path = path.clone();
77-
let client = self.clone();
78-
79-
async move {
80-
client
81-
.call(IpfsRequest::Cat(path))
82-
.await?
83-
.bytes(Some(max_size))
84-
.await
85-
}
77+
let fut = retry_no_timeout(retry_policy, "IPFS.cat", self.logger(), path).run({
78+
let path = path.to_owned();
79+
80+
move || {
81+
let path = path.clone();
82+
let client = self.clone();
83+
84+
async move {
85+
client
86+
.call(IpfsRequest::Cat(path))
87+
.await?
88+
.bytes(Some(max_size))
89+
.await
8690
}
87-
});
91+
}
92+
});
8893

8994
run_with_optional_timeout(path, fut, timeout).await
9095
}
@@ -99,25 +104,22 @@ pub trait IpfsClient: Send + Sync + 'static {
99104
timeout: Option<Duration>,
100105
retry_policy: RetryPolicy,
101106
) -> IpfsResult<Bytes> {
102-
let fut = retry_policy
103-
.create("IPFS.get_block", self.logger())
104-
.no_timeout()
105-
.run({
106-
let path = path.to_owned();
107-
108-
move || {
109-
let path = path.clone();
110-
let client = self.clone();
111-
112-
async move {
113-
client
114-
.call(IpfsRequest::GetBlock(path))
115-
.await?
116-
.bytes(None)
117-
.await
118-
}
107+
let fut = retry_no_timeout(retry_policy, "IPFS.get_block", self.logger(), path).run({
108+
let path = path.to_owned();
109+
110+
move || {
111+
let path = path.clone();
112+
let client = self.clone();
113+
114+
async move {
115+
client
116+
.call(IpfsRequest::GetBlock(path))
117+
.await?
118+
.bytes(None)
119+
.await
119120
}
120-
});
121+
}
122+
});
121123

122124
run_with_optional_timeout(path, fut, timeout).await
123125
}

0 commit comments

Comments
 (0)