Skip to content

Commit

Permalink
feat: remove dht publication (#449)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd authored Feb 15, 2024
1 parent 90405f3 commit 7030341
Show file tree
Hide file tree
Showing 22 changed files with 861 additions and 880 deletions.
3 changes: 2 additions & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[profile.default]
slow-timeout = { period = "60s", terminate-after = 2 }
slow-timeout = { period = "60s", terminate-after = 1 }
leak-timeout = "10s"

[test-groups]
serial-integration = { max-threads = 1 }
Expand Down
178 changes: 172 additions & 6 deletions crates/topos-p2p/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
use std::borrow::Cow;
use std::pin::Pin;
use std::task::Poll;

use crate::error::P2PError;
use crate::{config::DiscoveryConfig, error::CommandExecutionError};
use libp2p::kad::Event;

use libp2p::kad::{
BootstrapOk, BootstrapResult, Event as KademliaEvent, ProgressStep, QueryId, QueryResult,
};
use libp2p::swarm::ToSwarm;
use libp2p::{
identity::Keypair,
kad::{store::MemoryStore, Behaviour, BucketInserts, Config},
swarm::NetworkBehaviour,
Multiaddr, PeerId,
};
use tokio::sync::oneshot;
use tracing::info;
use tracing::{debug, error, info};

pub type PendingRecordRequest = oneshot::Sender<Result<Vec<Multiaddr>, CommandExecutionError>>;

/// DiscoveryBehaviour is responsible to discover and manage connections with peers
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
pub(crate) struct DiscoveryBehaviour {
/// The inner kademlia behaviour
pub(crate) inner: Behaviour<MemoryStore>,
/// The current bootstrap query id used to track the progress of the bootstrap
/// and to avoid to start a new bootstrap query if the previous one is still in progress
pub(crate) current_bootstrap_query_id: Option<QueryId>,
/// The next bootstrap query interval used to schedule the next bootstrap query
pub(crate) next_bootstrap_query: Option<Pin<Box<tokio::time::Interval>>>,
}

impl DiscoveryBehaviour {
Expand Down Expand Up @@ -48,10 +59,41 @@ impl DiscoveryBehaviour {
"Adding the known peer:{} reachable at {}",
&known_peer.0, &known_peer.1
);
kademlia.add_address(&known_peer.0, known_peer.1.clone());
let x = kademlia.add_address(&known_peer.0, known_peer.1.clone());
info!(
"Adding the known peer:{} reachable at {} - {:?}",
&known_peer.0, &known_peer.1, x
);
}

Self {
inner: kademlia,
current_bootstrap_query_id: None,
next_bootstrap_query: Some(Box::pin(tokio::time::interval(config.bootstrap_interval))),
}
}

/// Start the kademlia bootstrap process if it is not already in progress.
/// The bootstrap process is used to discover new peers in the network.
/// The bootstrap process starts by sending a `FIND_NODE` query of the local PeerId in the DHT.
/// Then multiple random PeerId are created in order to randomly walk the network.
pub fn bootstrap(&mut self) -> Result<(), P2PError> {
if self.current_bootstrap_query_id.is_none() {
match self.inner.bootstrap() {
Ok(query_id) => {
info!("Started kademlia bootstrap with query_id: {query_id:?}");
self.current_bootstrap_query_id = Some(query_id);
}
Err(error) => {
error!("Unable to start kademlia bootstrap: {error:?}");
return Err(P2PError::BootstrapError(
"Unable to start kademlia bootstrap",
));
}
}
}

Self { inner: kademlia }
Ok(())
}

pub fn get_addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
Expand All @@ -66,3 +108,127 @@ impl DiscoveryBehaviour {
}
}
}

impl NetworkBehaviour for DiscoveryBehaviour {
type ConnectionHandler = <Behaviour<MemoryStore> as NetworkBehaviour>::ConnectionHandler;

type ToSwarm = KademliaEvent;

fn handle_established_inbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
self.inner.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: libp2p::core::Endpoint,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
self.inner
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}

fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<Self::ConnectionHandler>) {
self.inner.on_swarm_event(event)
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: libp2p::swarm::ConnectionId,
event: libp2p::swarm::THandlerOutEvent<Self>,
) {
self.inner
.on_connection_handler_event(peer_id, connection_id, event)
}

fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
params: &mut impl libp2p::swarm::PollParameters,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
// Poll the kademlia bootstrap interval future in order to define if we need to call the
// `bootstrap`
if let Some(next_bootstrap_query) = self.next_bootstrap_query.as_mut() {
if next_bootstrap_query.poll_tick(cx).is_ready() {
if let Err(error) = self.bootstrap() {
error!("Error while create bootstrap query: {error:?}");
}
}
}

if let Poll::Ready(event) = self.inner.poll(cx, params) {
match event {
// When a Bootstrap query ends, we reset the `query_id`
ToSwarm::GenerateEvent(KademliaEvent::OutboundQueryProgressed {
id,
result:
result @ QueryResult::Bootstrap(BootstrapResult::Ok(BootstrapOk {
num_remaining: 0,
..
})),
step: step @ ProgressStep { last: true, .. },
stats,
}) if Some(&id) == self.current_bootstrap_query_id.as_ref() => {
if let Some(interval) = self.next_bootstrap_query.as_mut() {
interval.reset();
};

self.current_bootstrap_query_id = None;
debug!("Kademlia bootstrap completed with query_id: {id:?}");

return Poll::Ready(ToSwarm::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id,
result,
stats,
step,
},
));
}
event => {
return Poll::Ready(event);
}
}
}

Poll::Pending
}

fn handle_pending_inbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), libp2p::swarm::ConnectionDenied> {
self.inner
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}

fn handle_pending_outbound_connection(
&mut self,
connection_id: libp2p::swarm::ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: libp2p::core::Endpoint,
) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
self.inner.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}
}
11 changes: 6 additions & 5 deletions crates/topos-p2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
use std::{num::NonZeroUsize, time::Duration};

pub struct NetworkConfig {
pub publish_retry: usize,
pub minimum_cluster_size: usize,
pub client_retry_ttl: u64,
pub discovery: DiscoveryConfig,
pub yamux_max_buffer_size: usize,
pub yamux_window_size: Option<u32>,
pub is_bootnode: bool,
}

impl Default for NetworkConfig {
fn default() -> Self {
Self {
publish_retry: Self::PUBLISH_RETRY,
minimum_cluster_size: Self::MINIMUM_CLUSTER_SIZE,
client_retry_ttl: Self::CLIENT_RETRY_TTL,
discovery: Default::default(),
yamux_max_buffer_size: usize::MAX,
yamux_window_size: None,
is_bootnode: false,
}
}
}

impl NetworkConfig {
pub const MINIMUM_CLUSTER_SIZE: usize = 5;
pub const PUBLISH_RETRY: usize = 10;
pub const CLIENT_RETRY_TTL: u64 = 200;
}

Expand All @@ -35,6 +30,8 @@ pub struct DiscoveryConfig {
pub replication_interval: Option<Duration>,
pub publication_interval: Option<Duration>,
pub provider_publication_interval: Option<Duration>,
/// Interval at which the node will send bootstrap query to the network
pub bootstrap_interval: Duration,
}

impl Default for DiscoveryConfig {
Expand All @@ -44,11 +41,15 @@ impl Default for DiscoveryConfig {
replication_interval: Some(Duration::from_secs(10)),
publication_interval: Some(Duration::from_secs(10)),
provider_publication_interval: Some(Duration::from_secs(10)),
bootstrap_interval: Duration::from_secs(Self::BOOTSTRAP_INTERVAL),
}
}
}

impl DiscoveryConfig {
/// Default bootstrap interval in seconds
pub const BOOTSTRAP_INTERVAL: u64 = 60;

pub fn with_replication_factor(mut self, replication_factor: NonZeroUsize) -> Self {
self.replication_factor = replication_factor;

Expand Down
13 changes: 0 additions & 13 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ impl<'a> NetworkBuilder<'a> {
self
}

pub fn publish_retry(mut self, retry: usize) -> Self {
self.config.publish_retry = retry;

self
}

pub fn minimum_cluster_size(mut self, size: usize) -> Self {
self.config.minimum_cluster_size = size;

Expand Down Expand Up @@ -204,17 +198,10 @@ impl<'a> NetworkBuilder<'a> {
local_peer_id: peer_id,
listening_on: listen_addr,
public_addresses,
bootstrapped: false,
active_listeners: HashSet::new(),
pending_record_requests: HashMap::new(),
shutdown,
},
))
}

pub fn is_bootnode(mut self, is_bootnode: bool) -> Self {
self.config.is_bootnode = is_bootnode;

self
}
}
1 change: 1 addition & 0 deletions crates/topos-p2p/src/runtime/handle_event/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl EventHandler<Box<Event>> for Runtime {
Event::UnroutablePeer { peer } => {
// Ignored
}

Event::OutboundQueryProgressed {
result: QueryResult::Bootstrap(res),
id,
Expand Down
Loading

0 comments on commit 7030341

Please sign in to comment.