From c78f3fcb15b88c9153c52a5dc9e96e89110de8c3 Mon Sep 17 00:00:00 2001 From: Sebastian Lorenz Date: Sun, 7 Sep 2025 18:45:50 +0200 Subject: [PATCH] feat: add the CID to IPFS retry logs --- graph/src/ipfs/client.rs | 6 +-- graph/src/ipfs/gateway_client.rs | 82 ++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 3 deletions(-) diff --git a/graph/src/ipfs/client.rs b/graph/src/ipfs/client.rs index 90da991152a..65b5a74a9f6 100644 --- a/graph/src/ipfs/client.rs +++ b/graph/src/ipfs/client.rs @@ -37,7 +37,7 @@ pub trait IpfsClient: Send + Sync + 'static { retry_policy: RetryPolicy, ) -> IpfsResult>> { let fut = retry_policy - .create("IPFS.cat_stream", self.logger()) + .create(format!("IPFS.cat_stream[{}]", path), self.logger()) .no_timeout() .run({ let path = path.to_owned(); @@ -67,7 +67,7 @@ pub trait IpfsClient: Send + Sync + 'static { retry_policy: RetryPolicy, ) -> IpfsResult { let fut = retry_policy - .create("IPFS.cat", self.logger()) + .create(format!("IPFS.cat[{}]", path), self.logger()) .no_timeout() .run({ let path = path.to_owned(); @@ -100,7 +100,7 @@ pub trait IpfsClient: Send + Sync + 'static { retry_policy: RetryPolicy, ) -> IpfsResult { let fut = retry_policy - .create("IPFS.get_block", self.logger()) + .create(format!("IPFS.get_block[{}]", path), self.logger()) .no_timeout() .run({ let path = path.to_owned(); diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs index d2ac9f0c8b1..4b190cb0133 100644 --- a/graph/src/ipfs/gateway_client.rs +++ b/graph/src/ipfs/gateway_client.rs @@ -489,4 +489,86 @@ mod tests { assert_eq!(bytes.as_ref(), b"some data"); } + + #[tokio::test] + async fn operation_names_include_cid_for_debugging() { + use slog::{o, Drain, Logger, Record}; + use std::sync::{Arc, Mutex}; + + // Custom drain to capture log messages + struct LogCapture { + messages: Arc>>, + } + + impl Drain for LogCapture { + type Ok = (); + type Err = std::io::Error; + + fn log( + &self, + record: &Record, + _: &slog::OwnedKVList, + ) -> std::result::Result { + let message = format!("{}", record.msg()); + self.messages.lock().unwrap().push(message); + Ok(()) + } + } + + let captured_messages = Arc::new(Mutex::new(Vec::new())); + let drain = LogCapture { + messages: captured_messages.clone(), + }; + let logger = Logger::root(drain.fuse(), o!()); + + let server = mock_server().await; + let client = Arc::new(IpfsGatewayClient::new_unchecked(server.uri(), &logger).unwrap()); + + // Set up mock to fail twice then succeed to trigger retry with warning logs + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR)) + .up_to_n_times(2) + .expect(2) + .mount(&server) + .await; + + mock_get() + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"data")) + .expect(1) + .mount(&server) + .await; + + let path = make_path(); + + // This should trigger retry logs because we set up failures first + let _result = client + .cat(&path, usize::MAX, None, RetryPolicy::NonDeterministic) + .await + .unwrap(); + + // Check that the captured log messages include the CID + let messages = captured_messages.lock().unwrap(); + let retry_messages: Vec<_> = messages + .iter() + .filter(|msg| msg.contains("Trying again after")) + .collect(); + + assert!( + !retry_messages.is_empty(), + "Expected retry messages but found none. All messages: {:?}", + *messages + ); + + // Verify that the operation name includes the CID + let expected_cid = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"; + let has_cid_in_operation = retry_messages + .iter() + .any(|msg| msg.contains(&format!("IPFS.cat[{}]", expected_cid))); + + assert!( + has_cid_in_operation, + "Expected operation name to include CID [{}] in retry messages: {:?}", + expected_cid, retry_messages + ); + } }