From 49aebb2b001dbb0acefd8482755013363d16c580 Mon Sep 17 00:00:00 2001 From: erhant Date: Mon, 3 Feb 2025 18:40:42 +0300 Subject: [PATCH] fix bad error return in node, some log fixes & version fix --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- compute/src/main.rs | 1 + compute/src/node/core.rs | 4 +++- p2p/src/behaviour.rs | 9 ++++++--- p2p/src/client.rs | 19 +++---------------- p2p/src/commands.rs | 2 +- 7 files changed, 19 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1554547..786cdbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1031,7 +1031,7 @@ dependencies = [ [[package]] name = "dkn-compute" -version = "0.3.0" +version = "0.3.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -1066,7 +1066,7 @@ dependencies = [ [[package]] name = "dkn-p2p" -version = "0.3.0" +version = "0.3.1" dependencies = [ "dkn-utils", "env_logger 0.11.6", @@ -1082,11 +1082,11 @@ dependencies = [ [[package]] name = "dkn-utils" -version = "0.3.0" +version = "0.3.1" [[package]] name = "dkn-workflows" -version = "0.3.0" +version = "0.3.1" dependencies = [ "dkn-utils", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index 0863fbb..af34c68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ default-members = ["compute"] [workspace.package] edition = "2021" -version = "0.3.0" +version = "0.3.1" license = "Apache-2.0" readme = "README.md" diff --git a/compute/src/main.rs b/compute/src/main.rs index 7a3ac82..3f04834 100644 --- a/compute/src/main.rs +++ b/compute/src/main.rs @@ -15,6 +15,7 @@ async fn main() -> Result<()> { .filter_module("dkn_compute", log::LevelFilter::Info) .filter_module("dkn_p2p", log::LevelFilter::Info) .filter_module("dkn_workflows", log::LevelFilter::Info) + .filter_module("libp2p", log::LevelFilter::Error) .parse_default_env() // reads RUST_LOG variable .init(); diff --git a/compute/src/node/core.rs b/compute/src/node/core.rs index 235f448..1847e24 100644 --- a/compute/src/node/core.rs +++ b/compute/src/node/core.rs @@ -32,7 +32,9 @@ impl DriaComputeNode { let task_response_msg = task_response_msg_opt.ok_or( eyre!("Publish channel closed unexpectedly, we still have {} batch and {} single tasks.", self.pending_tasks_batch.len(), self.pending_tasks_single.len()) )?; { - self.handle_task_response(task_response_msg).await?; + if let Err(e) = self.handle_task_response(task_response_msg).await { + log::error!("Error responding to task: {:?}", e); + } } }, diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index a1cecac..f5f6da7 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -54,10 +54,13 @@ fn create_request_response_behaviour( protocol_name: StreamProtocol, ) -> request_response::cbor::Behaviour, Vec> { use request_response::{Behaviour, Config, ProtocolSupport}; - const REQUEST_RESPONSE_TIMEOUT: u64 = 180; - Behaviour::new([(protocol_name, ProtocolSupport::Full)], - Config::default().with_request_timeout(Duration::from_secs(REQUEST_RESPONSE_TIMEOUT))) + const REQUEST_RESPONSE_TIMEOUT_SECS: u64 = 180; + + Behaviour::new( + [(protocol_name, ProtocolSupport::Full)], + Config::default().with_request_timeout(Duration::from_secs(REQUEST_RESPONSE_TIMEOUT_SECS)), + ) } /// Configures the connection limits. diff --git a/p2p/src/client.rs b/p2p/src/client.rs index 55c08eb..ca61e2c 100644 --- a/p2p/src/client.rs +++ b/p2p/src/client.rs @@ -263,21 +263,9 @@ impl DriaP2PClient { ); } DriaP2PCommand::Peers { sender } => { - let mesh = self - .swarm - .behaviour() - .gossipsub - .all_mesh_peers() - .cloned() - .collect(); - let all = self - .swarm - .behaviour() - .gossipsub - .all_peers() - .map(|(p, _)| p) - .cloned() - .collect(); + let gossipsub = &self.swarm.behaviour().gossipsub; + let mesh = gossipsub.all_mesh_peers().cloned().collect(); + let all = gossipsub.all_peers().map(|(p, _)| p).cloned().collect(); let _ = sender.send((mesh, all)); } DriaP2PCommand::PeerCounts { sender } => { @@ -340,7 +328,6 @@ impl DriaP2PClient { response, } => { // while we support the protocol, we dont really make any requests - // TODO: should p2p crate support this? log::warn!( "Unexpected response message with request_id {}: {:?}", request_id, diff --git a/p2p/src/commands.rs b/p2p/src/commands.rs index 1a25ef8..ce971e1 100644 --- a/p2p/src/commands.rs +++ b/p2p/src/commands.rs @@ -183,7 +183,7 @@ impl DriaP2PCommander { receiver .await .wrap_err("could not receive")? - .wrap_err("could not publish") + .wrap_err("could not respond") } pub async fn request(