Skip to content

Commit

Permalink
fix bad error return in node, some log fixes & version fix
Browse files Browse the repository at this point in the history
  • Loading branch information
erhant committed Feb 3, 2025
1 parent 624a60a commit 49aebb2
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 26 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ default-members = ["compute"]

[workspace.package]
edition = "2021"
version = "0.3.0"
version = "0.3.1"
license = "Apache-2.0"
readme = "README.md"

Expand Down
1 change: 1 addition & 0 deletions compute/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async fn main() -> Result<()> {
.filter_module("dkn_compute", log::LevelFilter::Info)
.filter_module("dkn_p2p", log::LevelFilter::Info)
.filter_module("dkn_workflows", log::LevelFilter::Info)
.filter_module("libp2p", log::LevelFilter::Error)
.parse_default_env() // reads RUST_LOG variable
.init();

Expand Down
4 changes: 3 additions & 1 deletion compute/src/node/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ impl DriaComputeNode {
let task_response_msg = task_response_msg_opt.ok_or(
eyre!("Publish channel closed unexpectedly, we still have {} batch and {} single tasks.", self.pending_tasks_batch.len(), self.pending_tasks_single.len())
)?; {
self.handle_task_response(task_response_msg).await?;
if let Err(e) = self.handle_task_response(task_response_msg).await {
log::error!("Error responding to task: {:?}", e);
}
}
},

Expand Down
9 changes: 6 additions & 3 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ fn create_request_response_behaviour(
protocol_name: StreamProtocol,
) -> request_response::cbor::Behaviour<Vec<u8>, Vec<u8>> {
use request_response::{Behaviour, Config, ProtocolSupport};
const REQUEST_RESPONSE_TIMEOUT: u64 = 180;

Behaviour::new([(protocol_name, ProtocolSupport::Full)],
Config::default().with_request_timeout(Duration::from_secs(REQUEST_RESPONSE_TIMEOUT)))
const REQUEST_RESPONSE_TIMEOUT_SECS: u64 = 180;

Behaviour::new(
[(protocol_name, ProtocolSupport::Full)],
Config::default().with_request_timeout(Duration::from_secs(REQUEST_RESPONSE_TIMEOUT_SECS)),
)
}

/// Configures the connection limits.
Expand Down
19 changes: 3 additions & 16 deletions p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,9 @@ impl DriaP2PClient {
);
}
DriaP2PCommand::Peers { sender } => {
let mesh = self
.swarm
.behaviour()
.gossipsub
.all_mesh_peers()
.cloned()
.collect();
let all = self
.swarm
.behaviour()
.gossipsub
.all_peers()
.map(|(p, _)| p)
.cloned()
.collect();
let gossipsub = &self.swarm.behaviour().gossipsub;
let mesh = gossipsub.all_mesh_peers().cloned().collect();
let all = gossipsub.all_peers().map(|(p, _)| p).cloned().collect();
let _ = sender.send((mesh, all));
}
DriaP2PCommand::PeerCounts { sender } => {
Expand Down Expand Up @@ -340,7 +328,6 @@ impl DriaP2PClient {
response,
} => {
// while we support the protocol, we dont really make any requests
// TODO: should p2p crate support this?
log::warn!(
"Unexpected response message with request_id {}: {:?}",
request_id,
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl DriaP2PCommander {
receiver
.await
.wrap_err("could not receive")?
.wrap_err("could not publish")
.wrap_err("could not respond")
}

pub async fn request(
Expand Down

0 comments on commit 49aebb2

Please sign in to comment.