Skip to content

Commit

Permalink
feat: update tce config addresses (#415)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd authored Jan 8, 2024
1 parent 7503fa7 commit 476948f
Show file tree
Hide file tree
Showing 18 changed files with 323 additions and 116 deletions.
7 changes: 0 additions & 7 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ pub struct NetworkClient {
}

impl NetworkClient {
pub async fn start_listening(&self, peer_addr: libp2p::Multiaddr) -> Result<(), P2PError> {
let (sender, receiver) = oneshot::channel();
let command = Command::StartListening { peer_addr, sender };

Self::send_command_with_receiver(&self.sender, command, receiver).await
}

pub async fn connected_peers(&self) -> Result<Vec<PeerId>, P2PError> {
let (sender, receiver) = oneshot::channel();
Self::send_command_with_receiver(&self.sender, Command::ConnectedPeers { sender }, receiver)
Expand Down
7 changes: 0 additions & 7 deletions crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ use crate::{

#[derive(Debug)]
pub enum Command {
/// Executed when the node is starting
StartListening {
peer_addr: Multiaddr,
sender: oneshot::Sender<Result<(), P2PError>>,
},

/// Command to ask for the current connected peer id list
ConnectedPeers {
sender: oneshot::Sender<Result<Vec<PeerId>, P2PError>>,
Expand Down Expand Up @@ -56,7 +50,6 @@ pub enum Command {
impl Display for Command {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Command::StartListening { .. } => write!(f, "StartListening"),
Command::ConnectedPeers { .. } => write!(f, "ConnectedPeers"),
Command::RandomKnownPeer { .. } => write!(f, "RandomKnownPeer"),
Command::Disconnect { .. } => write!(f, "Disconnect"),
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub enum P2PError {

#[error("Unable to create gRPC client")]
UnableToCreateGrpcClient(#[from] OutboundConnectionError),

#[error("Public addresses is empty")]
MissingPublicAddresses,
}

#[derive(Error, Debug)]
Expand Down
38 changes: 24 additions & 14 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ const TWO_HOURS: Duration = Duration::from_secs(60 * 60 * 2);
pub struct NetworkBuilder<'a> {
discovery_protocol: Option<&'static str>,
peer_key: Option<Keypair>,
listen_addr: Option<Multiaddr>,
exposed_addresses: Option<Multiaddr>,
listen_addresses: Option<Vec<Multiaddr>>,
public_addresses: Option<Vec<Multiaddr>>,
store: Option<MemoryStore>,
known_peers: &'a [(PeerId, Multiaddr)],
local_port: Option<u8>,
Expand Down Expand Up @@ -79,14 +79,14 @@ impl<'a> NetworkBuilder<'a> {
self
}

pub fn exposed_addresses(mut self, addr: Multiaddr) -> Self {
self.exposed_addresses = Some(addr);
pub fn public_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.public_addresses = Some(addresses);

self
}

pub fn listen_addr(mut self, addr: Multiaddr) -> Self {
self.listen_addr = Some(addr);
pub fn listen_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.listen_addresses = Some(addresses);

self
}
Expand Down Expand Up @@ -171,6 +171,22 @@ impl<'a> NetworkBuilder<'a> {

let grpc_over_p2p = GrpcOverP2P::new(command_sender.clone());

let listen_addr = self
.listen_addresses
.take()
.expect("Node requires at least one address to listen for incoming connections");

let public_addresses = self
.public_addresses
.map(|addresses| {
if addresses.is_empty() {
listen_addr.clone()
} else {
addresses
}
})
.unwrap_or(listen_addr.clone());

Ok((
NetworkClient {
retry_ttl: self.config.client_retry_ttl,
Expand All @@ -188,14 +204,8 @@ impl<'a> NetworkBuilder<'a> {
command_receiver,
event_sender,
local_peer_id: peer_id,
listening_on: self
.listen_addr
.take()
.expect("P2P runtime expect a MultiAddr"),
addresses: self
.exposed_addresses
.take()
.expect("P2P runtime expect a MultiAddr"),
listening_on: listen_addr,
public_addresses,
bootstrapped: false,
active_listeners: HashSet::new(),
pending_record_requests: HashMap::new(),
Expand Down
5 changes: 0 additions & 5 deletions crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ impl Runtime {

_ = response.send(connection);
}
Command::StartListening { peer_addr, sender } => {
if sender.send(self.start_listening(peer_addr)).is_err() {
warn!("Unable to notify StartListening response: initiator is dropped");
}
}

Command::ConnectedPeers { sender } => {
if sender
Expand Down
40 changes: 20 additions & 20 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ pub struct Runtime {
pub(crate) command_receiver: mpsc::Receiver<Command>,
pub(crate) event_sender: mpsc::Sender<Event>,
pub(crate) local_peer_id: PeerId,
pub(crate) listening_on: Multiaddr,
#[allow(unused)]
pub(crate) addresses: Multiaddr,
pub(crate) listening_on: Vec<Multiaddr>,
pub(crate) public_addresses: Vec<Multiaddr>,
pub(crate) bootstrapped: bool,
pub(crate) is_boot_node: bool,

Expand All @@ -46,13 +45,6 @@ mod handle_command;
mod handle_event;

impl Runtime {
fn start_listening(&mut self, peer_addr: Multiaddr) -> Result<(), P2PError> {
self.swarm
.listen_on(peer_addr)
.map(|_| ())
.map_err(Into::into)
}

pub async fn bootstrap(mut self) -> Result<Self, Box<dyn std::error::Error>> {
if self.bootstrapped {
return Err(Box::new(P2PError::BootstrapError(
Expand All @@ -62,16 +54,24 @@ impl Runtime {

self.bootstrapped = true;

self.swarm.add_external_address(self.addresses.clone());
debug!("Added public addresses: {:?}", self.public_addresses);
for address in &self.public_addresses {
self.swarm.add_external_address(address.clone());
}

let addr = self.listening_on.clone();
if let Err(error) = self.swarm.listen_on(addr) {
error!(
"Couldn't start listening on {} because of {error:?}",
self.listening_on
);
let dht_address = self
.public_addresses
.first()
.map(Multiaddr::to_vec)
.ok_or(P2PError::MissingPublicAddresses)?;

return Err(Box::new(error));
debug!("Starting to listen on {:?}", self.listening_on);
for addr in &self.listening_on {
if let Err(error) = self.swarm.listen_on(addr.clone()) {
error!("Couldn't start listening on {} because of {error:?}", addr);

return Err(Box::new(error));
}
}

debug!("Starting a boot node ? {:?}", self.is_boot_node);
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Runtime {
let key = Key::new(&self.local_peer_id.to_string());
addr_query_id = if let Ok(query_id_record) =
self.swarm.behaviour_mut().discovery.inner.put_record(
Record::new(key, self.addresses.to_vec()),
Record::new(key, dht_address.clone()),
Quorum::Majority,
) {
Some(query_id_record)
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Runtime {
let key = Key::new(&self.local_peer_id.to_string());
if let Ok(query_id_record) =
self.swarm.behaviour_mut().discovery.inner.put_record(
Record::new(key, self.addresses.to_vec()),
Record::new(key, dht_address.clone()),
Quorum::Majority,
)
{
Expand Down
12 changes: 6 additions & 6 deletions crates/topos-p2p/src/tests/command/random_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ async fn no_random_peer() {

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.build()
.await
.expect("Unable to create p2p network");
Expand Down Expand Up @@ -46,8 +46,8 @@ async fn return_a_peer() {

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.build()
.await
.expect("Unable to create p2p network");
Expand Down Expand Up @@ -75,8 +75,8 @@ async fn return_a_random_peer_among_100() {

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.build()
.await
.expect("Unable to create p2p network");
Expand Down
14 changes: 11 additions & 3 deletions crates/topos-p2p/src/tests/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::StreamExt;
use libp2p::{
kad::{record::Key, KademliaEvent, PutRecordOk, QueryResult, Record},
swarm::SwarmEvent,
Multiaddr,
};
use rstest::rstest;
use test_log::test;
Expand All @@ -23,8 +24,8 @@ async fn put_value_in_dht() {
let (_, _, runtime) = crate::network::builder()
.peer_key(peer_2.keypair.clone())
.known_peers(&[(peer_1.peer_id(), peer_1.addr.clone())])
.exposed_addresses(peer_2.addr.clone())
.listen_addr(peer_2.addr.clone())
.public_addresses(vec![peer_2.addr.clone()])
.listen_addresses(vec![peer_2.addr.clone()])
.minimum_cluster_size(1)
.discovery_config(
DiscoveryConfig::default().with_replication_factor(NonZeroUsize::new(1).unwrap()),
Expand All @@ -40,7 +41,14 @@ async fn put_value_in_dht() {
_ = kad
.inner
.put_record(
Record::new(input_key.clone(), runtime.addresses.to_vec()),
Record::new(
input_key.clone(),
runtime
.public_addresses
.first()
.map(Multiaddr::to_vec)
.unwrap(),
),
libp2p::kad::Quorum::One,
)
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub async fn dummy_peer() -> (NetworkClient, PeerAddr) {

let (client, _stream, runtime): (_, _, Runtime) = NetworkBuilder::default()
.peer_key(key)
.listen_addr(addr_dummy.clone())
.exposed_addresses(addr_dummy)
.listen_addresses(vec![addr_dummy.clone()])
.public_addresses(vec![addr_dummy])
.build()
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ pub struct TceConfiguration {
pub api_addr: SocketAddr,
pub graphql_api_addr: SocketAddr,
pub metrics_api_addr: SocketAddr,
pub tce_addr: String,
pub tce_local_port: u16,
pub storage: StorageConfiguration,
pub network_bootstrap_timeout: Duration,
pub minimum_cluster_size: usize,
pub version: &'static str,
pub listen_addresses: Vec<Multiaddr>,
pub public_addresses: Vec<Multiaddr>,
}

#[derive(Debug)]
Expand Down
9 changes: 2 additions & 7 deletions crates/topos-tce/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ pub async fn run(

tracing::Span::current().record("peer_id", &peer_id.to_string());

let external_addr: Multiaddr =
format!("{}/tcp/{}", config.tce_addr, config.tce_local_port).parse()?;

let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", config.tce_local_port).parse()?;

let mut boot_peers = config.boot_peers.clone();

// Remove myself from the bootnode list
Expand Down Expand Up @@ -144,9 +139,9 @@ pub async fn run(

let (network_client, event_stream, unbootstrapped_runtime) = topos_p2p::network::builder()
.peer_key(key)
.listen_addr(addr)
.listen_addresses(config.listen_addresses.clone())
.minimum_cluster_size(config.minimum_cluster_size)
.exposed_addresses(external_addr)
.public_addresses(config.public_addresses.clone())
.known_peers(&boot_peers)
.grpc_context(grpc_context)
.build()
Expand Down
37 changes: 36 additions & 1 deletion crates/topos-test-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,19 @@ pub mod sequencer;
pub mod storage;
pub mod tce;

use std::{collections::HashSet, net::SocketAddr, sync::Mutex};
use std::{
collections::HashSet,
net::SocketAddr,
path::PathBuf,
str::FromStr,
sync::Mutex,
thread,
time::{SystemTime, UNIX_EPOCH},
};

use lazy_static::lazy_static;
use rand::Rng;
use rstest::fixture;

lazy_static! {
pub static ref PORT_MAPPING: Mutex<HashSet<u16>> = Mutex::new(HashSet::new());
Expand Down Expand Up @@ -92,3 +102,28 @@ fn next_available_port() -> SocketAddr {

addr
}

#[fixture]
fn folder_name() -> &'static str {
Box::leak(Box::new(
thread::current().name().unwrap().replace("::", "_"),
))
}

#[fixture]
pub fn create_folder(folder_name: &str) -> PathBuf {
let dir = env!("TOPOS_TEST_SDK_TMP");
let mut temp_dir =
std::path::PathBuf::from_str(dir).expect("Unable to read CARGO_TARGET_TMPDIR");
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let mut rng = rand::thread_rng();

temp_dir.push(format!(
"{}/data_{}_{}",
folder_name,
time.as_nanos(),
rng.gen::<u64>()
));

temp_dir
}
Loading

0 comments on commit 476948f

Please sign in to comment.