Skip to content

Commit

Permalink
feat: add transport type to outbound service
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Jan 15, 2025
1 parent 24cf5f0 commit 4c828e4
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 79 deletions.
3 changes: 2 additions & 1 deletion benches/benches/benchmarks/overall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ckb_chain::{start_chain_services, ChainController};
use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::JsonBytes;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_types::{
Expand Down Expand Up @@ -77,6 +77,7 @@ fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
3 changes: 2 additions & 1 deletion chain/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ckb_app_config::{BlockAssemblerConfig, NetworkConfig};
use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_test_chain_utils::{always_success_cell, create_always_success_tx};
Expand Down Expand Up @@ -123,6 +123,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
41 changes: 31 additions & 10 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,7 @@ impl NetworkService {
required_protocol_ids: Vec<ProtocolId>,
// name, version, flags
identify_announce: (String, String, Flags),
transport_type: TransportType,
) -> Self {
let config = &network_state.config;

Expand Down Expand Up @@ -1017,7 +1018,7 @@ impl NetworkService {
service_builder = service_builder.tcp_config(bind_fn);
}
}
TransportType::Ws => {
TransportType::Ws | TransportType::Wss => {
// only bind once
if matches!(init, BindType::Ws) {
continue;
Expand Down Expand Up @@ -1074,6 +1075,7 @@ impl NetworkService {
Arc::clone(&network_state),
p2p_service.control().to_owned().into(),
Duration::from_secs(config.connect_outbound_interval_secs),
transport_type,
);
bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
};
Expand Down Expand Up @@ -1520,19 +1522,38 @@ pub(crate) async fn async_disconnect_with_message(
control.disconnect(peer_index).await
}

/// Transport type on ckb
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) enum TransportType {
pub enum TransportType {
/// Tcp
Tcp,
/// Ws
Ws,
/// Wss only on wasm
Wss,
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
if addr
.iter()
.any(|proto| matches!(proto, Protocol::Ws | Protocol::Wss))
{
TransportType::Ws
} else {
TransportType::Tcp
impl<'a> From<TransportType> for p2p::multiaddr::Protocol<'a> {
fn from(value: TransportType) -> Self {
match value {
TransportType::Ws => Protocol::Ws,
TransportType::Wss => Protocol::Wss,
_ => unreachable!(),
}
}
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
let mut iter = addr.iter();

iter.find_map(|proto| {
if let Protocol::Ws = proto {
Some(TransportType::Ws)
} else if let Protocol::Wss = proto {
Some(TransportType::Wss)
} else {
None
}
})
.unwrap_or(TransportType::Tcp)
}
104 changes: 50 additions & 54 deletions network/src/peer_store/addr_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,38 @@ use crate::peer_store::types::AddrInfo;
use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr};
use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

/// Address manager
#[derive(Default)]
pub struct AddrManager {
next_id: u64,
addr_to_id: HashMap<SocketAddr, u64>,
addr_to_id: HashMap<Multiaddr, u64>,
id_to_info: HashMap<u64, AddrInfo>,
random_ids: Vec<u64>,
}

impl AddrManager {
/// Add an address information to address manager
pub fn add(&mut self, mut addr_info: AddrInfo) {
if let Some(key) = multiaddr_to_socketaddr(&addr_info.addr) {
if let Some(&id) = self.addr_to_id.get(&key) {
let (exist_last_connected_at_ms, random_id_pos) = {
let info = self.id_to_info.get(&id).expect("must exists");
(info.last_connected_at_ms, info.random_id_pos)
};
// Get time earlier than record time, return directly
if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
addr_info.random_id_pos = random_id_pos;
self.id_to_info.insert(id, addr_info);
}
return;
if let Some(&id) = self.addr_to_id.get(&addr_info.addr) {
let (exist_last_connected_at_ms, random_id_pos) = {
let info = self.id_to_info.get(&id).expect("must exists");
(info.last_connected_at_ms, info.random_id_pos)
};
// Get time earlier than record time, return directly
if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
addr_info.random_id_pos = random_id_pos;
self.id_to_info.insert(id, addr_info);
}

let id = self.next_id;
self.addr_to_id.insert(key, id);
addr_info.random_id_pos = self.random_ids.len();
self.id_to_info.insert(id, addr_info);
self.random_ids.push(id);
self.next_id += 1;
return;
}

let id = self.next_id;
self.addr_to_id.insert(addr_info.addr.clone(), id);
addr_info.random_id_pos = self.random_ids.len();
self.id_to_info.insert(id, addr_info);
self.random_ids.push(id);
self.next_id += 1;
}

/// Randomly return addrs that worth to try or connect.
Expand All @@ -55,23 +52,30 @@ impl AddrManager {
let j = rng.gen_range(i..self.random_ids.len());
self.swap_random_id(j, i);
let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned();
if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) {
let ip = socket_addr.ip();
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
match multiaddr_to_socketaddr(&addr_info.addr) {
Some(socket_addr) => {
let ip = socket_addr.ip();
let is_unique_ip = !duplicate_ips.contains(&ip);
// A trick to make our tests work
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
duplicate_ips.insert(ip);
addr_infos.push(addr_info);
}
}
if addr_infos.len() == count {
break;
None => {
if addr_info.is_connectable(now_ms) && filter(&addr_info) {
addr_infos.push(addr_info);
}
}
}
if addr_infos.len() == count {
break;
}
}
addr_infos
}
Expand All @@ -88,34 +92,26 @@ impl AddrManager {

/// Remove an address by ip and port
pub fn remove(&mut self, addr: &Multiaddr) -> Option<AddrInfo> {
multiaddr_to_socketaddr(addr).and_then(|addr| {
self.addr_to_id.remove(&addr).and_then(|id| {
let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
// swap with last index, then remove the last index
self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
self.random_ids.pop();
self.id_to_info.remove(&id)
})
self.addr_to_id.remove(addr).and_then(|id| {
let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
// swap with last index, then remove the last index
self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
self.random_ids.pop();
self.id_to_info.remove(&id)
})
}

/// Get an address information by ip and port
pub fn get(&self, addr: &Multiaddr) -> Option<&AddrInfo> {
multiaddr_to_socketaddr(addr).and_then(|addr| {
self.addr_to_id
.get(&addr)
.and_then(|id| self.id_to_info.get(id))
})
self.addr_to_id
.get(addr)
.and_then(|id| self.id_to_info.get(id))
}

/// Get a mutable address information by ip and port
pub fn get_mut(&mut self, addr: &Multiaddr) -> Option<&mut AddrInfo> {
if let Some(addr) = multiaddr_to_socketaddr(addr) {
if let Some(id) = self.addr_to_id.get(&addr) {
self.id_to_info.get_mut(id)
} else {
None
}
if let Some(id) = self.addr_to_id.get(addr) {
self.id_to_info.get_mut(id)
} else {
None
}
Expand Down
1 change: 0 additions & 1 deletion network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::network::{find_type, TransportType};
use crate::{
errors::{PeerStoreError, Result},
extract_peer_id, multiaddr_to_socketaddr,
Expand Down
25 changes: 21 additions & 4 deletions network/src/services/outbound_peer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
network::TransportType,
peer_store::{types::AddrInfo, PeerStore},
NetworkState,
};
Expand Down Expand Up @@ -27,20 +28,26 @@ pub struct OutboundPeerService {
interval: Option<Interval>,
try_connect_interval: Duration,
try_identify_count: u8,
transport_type: TransportType,
}

impl OutboundPeerService {
pub fn new(
network_state: Arc<NetworkState>,
p2p_control: ServiceControl,
try_connect_interval: Duration,
_transport_type: TransportType,
) -> Self {
OutboundPeerService {
network_state,
p2p_control,
interval: None,
try_connect_interval,
try_identify_count: 0,
#[cfg(not(target_family = "wasm"))]
transport_type: TransportType::Tcp,
#[cfg(target_family = "wasm")]
transport_type: _transport_type,
}
}

Expand All @@ -63,8 +70,13 @@ impl OutboundPeerService {
attempt_peers,
);

for addr in attempt_peers.into_iter().map(|info| info.addr) {
self.network_state.dial_feeler(&self.p2p_control, addr);
for mut addr in attempt_peers.into_iter().map(|info| info.addr) {
self.network_state.dial_feeler(&self.p2p_control, {
if !matches!(self.transport_type, TransportType::Tcp) {
addr.push(self.transport_type.into());
}
addr
});
}
}

Expand Down Expand Up @@ -132,8 +144,13 @@ impl OutboundPeerService {
Box::new(attempt_peers.into_iter().map(|info| info.addr))
};

for addr in peers {
self.network_state.dial_identify(&self.p2p_control, addr);
for mut addr in peers {
self.network_state.dial_identify(&self.p2p_control, {
if !matches!(self.transport_type, TransportType::Tcp) {
addr.push(self.transport_type.into());
}
addr
});
}
}

Expand Down
2 changes: 1 addition & 1 deletion network/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn random_addr_v6() -> crate::multiaddr::Multiaddr {

multi_addr.push(crate::multiaddr::Protocol::Tcp(43));
multi_addr.push(crate::multiaddr::Protocol::P2P(
crate::PeerId::random().to_base58().into_bytes().into(),
crate::PeerId::random().into_bytes().into(),
));
multi_addr
}
18 changes: 18 additions & 0 deletions network/src/tests/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,3 +603,21 @@ fn test_only_tcp_store() {
addr
});
}

#[test]
fn test_support_dns_store() {
let mut peer_store = PeerStore::default();
let addr: Multiaddr = format!(
"/dns4/www.abc.com/tcp/{}/p2p/{}",
rand::random::<u16>(),
crate::PeerId::random().to_base58()
)
.parse()
.unwrap();

peer_store
.add_addr(addr.clone(), Flags::COMPATIBILITY)
.unwrap();
assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, addr);
}
3 changes: 2 additions & 1 deletion rpc/src/tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ckb_chain::start_chain_services;
use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
use ckb_chain_spec::versionbits::{ActiveMode, Deployment, DeploymentPos};
use ckb_dao_utils::genesis_dao_data;
use ckb_network::{Flags, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkService, NetworkState};
use ckb_network_alert::alert_relayer::AlertRelayer;
use ckb_notify::NotifyService;
use ckb_shared::SharedBuilder;
Expand Down Expand Up @@ -112,6 +112,7 @@ pub(crate) fn setup_rpc_test_suite(height: u64, consensus: Option<Consensus>) ->
"0.1.0".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
7 changes: 4 additions & 3 deletions sync/src/relayer/tests/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder};
use ckb_dao::DaoCalculator;
use ckb_dao_utils::genesis_dao_data;
use ckb_network::{
async_trait, bytes::Bytes as P2pBytes, Behaviour, CKBProtocolContext, Error, Flags,
NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, SupportProtocols,
TargetSession,
async_trait, bytes::Bytes as P2pBytes, network::TransportType, Behaviour, CKBProtocolContext,
Error, Flags, NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId,
SupportProtocols, TargetSession,
};
use ckb_reward_calculator::RewardCalculator;
use ckb_shared::{Shared, SharedBuilder, Snapshot};
Expand Down Expand Up @@ -127,6 +127,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
Loading

0 comments on commit 4c828e4

Please sign in to comment.