From d7fd868dca93255dbd12ca84505a2a4c4fee34c8 Mon Sep 17 00:00:00 2001
From: driftluo <driftluo@foxmail.com>
Date: Wed, 15 Jan 2025 15:04:41 +0800
Subject: [PATCH] feat: add transport type to outbound service

---
 benches/benches/benchmarks/overall.rs         |   3 +-
 chain/src/tests/util.rs                       |   3 +-
 network/src/network.rs                        |  37 +++++--
 network/src/peer_store/addr_manager.rs        | 104 +++++++++---------
 network/src/peer_store/peer_store_impl.rs     |   1 -
 network/src/peer_store/types.rs               |   1 +
 network/src/services/outbound_peer.rs         |  29 ++++-
 network/src/tests/mod.rs                      |   2 +-
 network/src/tests/peer_store.rs               |  18 +++
 rpc/src/tests/setup.rs                        |   3 +-
 sync/src/relayer/tests/helper.rs              |   7 +-
 util/launcher/src/lib.rs                      |   5 +-
 .../src/tests/utils/chain.rs                  |   3 +-
 13 files changed, 137 insertions(+), 79 deletions(-)

diff --git a/benches/benches/benchmarks/overall.rs b/benches/benches/benchmarks/overall.rs
index 103cab0893a..17b8c91ce86 100644
--- a/benches/benches/benchmarks/overall.rs
+++ b/benches/benches/benchmarks/overall.rs
@@ -5,7 +5,7 @@ use ckb_chain::{start_chain_services, ChainController};
 use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow};
 use ckb_dao_utils::genesis_dao_data;
 use ckb_jsonrpc_types::JsonBytes;
-use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
+use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
 use ckb_shared::{Shared, SharedBuilder};
 use ckb_store::ChainStore;
 use ckb_types::{
@@ -77,6 +77,7 @@ fn dummy_network(shared: &Shared) -> NetworkController {
             "test".to_string(),
             Flags::COMPATIBILITY,
         ),
+        TransportType::Tcp,
     )
     .start(shared.async_handle())
     .expect("Start network service failed")
diff --git a/chain/src/tests/util.rs b/chain/src/tests/util.rs
index f29cd97ad72..2d1cde059fb 100644
--- a/chain/src/tests/util.rs
+++ b/chain/src/tests/util.rs
@@ -4,7 +4,7 @@ use ckb_app_config::{BlockAssemblerConfig, NetworkConfig};
 use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
 use ckb_dao_utils::genesis_dao_data;
 use ckb_jsonrpc_types::ScriptHashType;
-use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
+use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
 use ckb_shared::{Shared, SharedBuilder};
 use ckb_store::ChainStore;
 use ckb_test_chain_utils::{always_success_cell, create_always_success_tx};
@@ -123,6 +123,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
             "test".to_string(),
             Flags::COMPATIBILITY,
         ),
+        TransportType::Tcp,
     )
     .start(shared.async_handle())
     .expect("Start network service failed")
diff --git a/network/src/network.rs b/network/src/network.rs
index d51fe87916f..630d82a1986 100644
--- a/network/src/network.rs
+++ b/network/src/network.rs
@@ -831,6 +831,7 @@ impl NetworkService {
         required_protocol_ids: Vec<ProtocolId>,
         // name, version, flags
         identify_announce: (String, String, Flags),
+        transport_type: TransportType,
     ) -> Self {
         let config = &network_state.config;
 
@@ -1017,7 +1018,7 @@ impl NetworkService {
                                 service_builder = service_builder.tcp_config(bind_fn);
                             }
                         }
-                        TransportType::Ws => {
+                        TransportType::Ws | TransportType::Wss => {
                             // only bind once
                             if matches!(init, BindType::Ws) {
                                 continue;
@@ -1074,6 +1075,7 @@ impl NetworkService {
                 Arc::clone(&network_state),
                 p2p_service.control().to_owned().into(),
                 Duration::from_secs(config.connect_outbound_interval_secs),
+                transport_type,
             );
             bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
         };
@@ -1521,18 +1523,33 @@ pub(crate) async fn async_disconnect_with_message(
 }
 
 #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
-pub(crate) enum TransportType {
+pub enum TransportType {
     Tcp,
     Ws,
+    Wss,
 }
 
-pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
-    if addr
-        .iter()
-        .any(|proto| matches!(proto, Protocol::Ws | Protocol::Wss))
-    {
-        TransportType::Ws
-    } else {
-        TransportType::Tcp
+impl<'a> From<TransportType> for p2p::multiaddr::Protocol<'a> {
+    fn from(value: TransportType) -> Self {
+        match value {
+            TransportType::Ws => Protocol::Ws,
+            TransportType::Wss => Protocol::Wss,
+            _ => unreachable!(),
+        }
     }
 }
+
+pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
+    let mut iter = addr.iter();
+
+    iter.find_map(|proto| {
+        if let Protocol::Ws = proto {
+            Some(TransportType::Ws)
+        } else if let Protocol::Wss = proto {
+            Some(TransportType::Wss)
+        } else {
+            None
+        }
+    })
+    .unwrap_or(TransportType::Tcp)
+}
diff --git a/network/src/peer_store/addr_manager.rs b/network/src/peer_store/addr_manager.rs
index b4b5179ba3b..a94ee35a7b4 100644
--- a/network/src/peer_store/addr_manager.rs
+++ b/network/src/peer_store/addr_manager.rs
@@ -3,13 +3,12 @@ use crate::peer_store::types::AddrInfo;
 use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr};
 use rand::Rng;
 use std::collections::{HashMap, HashSet};
-use std::net::SocketAddr;
 
 /// Address manager
 #[derive(Default)]
 pub struct AddrManager {
     next_id: u64,
-    addr_to_id: HashMap<SocketAddr, u64>,
+    addr_to_id: HashMap<Multiaddr, u64>,
     id_to_info: HashMap<u64, AddrInfo>,
     random_ids: Vec<u64>,
 }
@@ -17,27 +16,25 @@ pub struct AddrManager {
 impl AddrManager {
     /// Add an address information to address manager
     pub fn add(&mut self, mut addr_info: AddrInfo) {
-        if let Some(key) = multiaddr_to_socketaddr(&addr_info.addr) {
-            if let Some(&id) = self.addr_to_id.get(&key) {
-                let (exist_last_connected_at_ms, random_id_pos) = {
-                    let info = self.id_to_info.get(&id).expect("must exists");
-                    (info.last_connected_at_ms, info.random_id_pos)
-                };
-                // Get time earlier than record time, return directly
-                if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
-                    addr_info.random_id_pos = random_id_pos;
-                    self.id_to_info.insert(id, addr_info);
-                }
-                return;
+        if let Some(&id) = self.addr_to_id.get(&addr_info.addr) {
+            let (exist_last_connected_at_ms, random_id_pos) = {
+                let info = self.id_to_info.get(&id).expect("must exists");
+                (info.last_connected_at_ms, info.random_id_pos)
+            };
+            // Get time earlier than record time, return directly
+            if addr_info.last_connected_at_ms >= exist_last_connected_at_ms {
+                addr_info.random_id_pos = random_id_pos;
+                self.id_to_info.insert(id, addr_info);
             }
-
-            let id = self.next_id;
-            self.addr_to_id.insert(key, id);
-            addr_info.random_id_pos = self.random_ids.len();
-            self.id_to_info.insert(id, addr_info);
-            self.random_ids.push(id);
-            self.next_id += 1;
+            return;
         }
+
+        let id = self.next_id;
+        self.addr_to_id.insert(addr_info.addr.clone(), id);
+        addr_info.random_id_pos = self.random_ids.len();
+        self.id_to_info.insert(id, addr_info);
+        self.random_ids.push(id);
+        self.next_id += 1;
     }
 
     /// Randomly return addrs that worth to try or connect.
@@ -55,23 +52,30 @@ impl AddrManager {
             let j = rng.gen_range(i..self.random_ids.len());
             self.swap_random_id(j, i);
             let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned();
-            if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) {
-                let ip = socket_addr.ip();
-                let is_unique_ip = !duplicate_ips.contains(&ip);
-                // A trick to make our tests work
-                // TODO remove this after fix the network tests.
-                let is_test_ip = ip.is_unspecified() || ip.is_loopback();
-                if (is_test_ip || is_unique_ip)
-                    && addr_info.is_connectable(now_ms)
-                    && filter(&addr_info)
-                {
-                    duplicate_ips.insert(ip);
-                    addr_infos.push(addr_info);
+            match multiaddr_to_socketaddr(&addr_info.addr) {
+                Some(socket_addr) => {
+                    let ip = socket_addr.ip();
+                    let is_unique_ip = !duplicate_ips.contains(&ip);
+                    // A trick to make our tests work
+                    // TODO remove this after fix the network tests.
+                    let is_test_ip = ip.is_unspecified() || ip.is_loopback();
+                    if (is_test_ip || is_unique_ip)
+                        && addr_info.is_connectable(now_ms)
+                        && filter(&addr_info)
+                    {
+                        duplicate_ips.insert(ip);
+                        addr_infos.push(addr_info);
+                    }
                 }
-                if addr_infos.len() == count {
-                    break;
+                None => {
+                    if addr_info.is_connectable(now_ms) && filter(&addr_info) {
+                        addr_infos.push(addr_info);
+                    }
                 }
             }
+            if addr_infos.len() == count {
+                break;
+            }
         }
         addr_infos
     }
@@ -88,34 +92,26 @@ impl AddrManager {
 
     /// Remove an address by ip and port
     pub fn remove(&mut self, addr: &Multiaddr) -> Option<AddrInfo> {
-        multiaddr_to_socketaddr(addr).and_then(|addr| {
-            self.addr_to_id.remove(&addr).and_then(|id| {
-                let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
-                // swap with last index, then remove the last index
-                self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
-                self.random_ids.pop();
-                self.id_to_info.remove(&id)
-            })
+        self.addr_to_id.remove(&addr).and_then(|id| {
+            let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos;
+            // swap with last index, then remove the last index
+            self.swap_random_id(random_id_pos, self.random_ids.len() - 1);
+            self.random_ids.pop();
+            self.id_to_info.remove(&id)
         })
     }
 
     /// Get an address information by ip and port
     pub fn get(&self, addr: &Multiaddr) -> Option<&AddrInfo> {
-        multiaddr_to_socketaddr(addr).and_then(|addr| {
-            self.addr_to_id
-                .get(&addr)
-                .and_then(|id| self.id_to_info.get(id))
-        })
+        self.addr_to_id
+            .get(&addr)
+            .and_then(|id| self.id_to_info.get(id))
     }
 
     /// Get a mutable address information by ip and port
     pub fn get_mut(&mut self, addr: &Multiaddr) -> Option<&mut AddrInfo> {
-        if let Some(addr) = multiaddr_to_socketaddr(addr) {
-            if let Some(id) = self.addr_to_id.get(&addr) {
-                self.id_to_info.get_mut(id)
-            } else {
-                None
-            }
+        if let Some(id) = self.addr_to_id.get(&addr) {
+            self.id_to_info.get_mut(id)
         } else {
             None
         }
diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs
index 3169cb45f79..a8078b88775 100644
--- a/network/src/peer_store/peer_store_impl.rs
+++ b/network/src/peer_store/peer_store_impl.rs
@@ -1,4 +1,3 @@
-use crate::network::{find_type, TransportType};
 use crate::{
     errors::{PeerStoreError, Result},
     extract_peer_id, multiaddr_to_socketaddr,
diff --git a/network/src/peer_store/types.rs b/network/src/peer_store/types.rs
index 2db34c306bb..ef089bdb633 100644
--- a/network/src/peer_store/types.rs
+++ b/network/src/peer_store/types.rs
@@ -66,6 +66,7 @@ impl AddrInfo {
             addr: addr
                 .iter()
                 .filter_map(|p| {
+                    let p = p;
                     if matches!(
                         p,
                         Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_)
diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs
index 5a59d766845..a9ddccf2073 100644
--- a/network/src/services/outbound_peer.rs
+++ b/network/src/services/outbound_peer.rs
@@ -1,4 +1,5 @@
 use crate::{
+    network::TransportType,
     peer_store::{types::AddrInfo, PeerStore},
     NetworkState,
 };
@@ -27,6 +28,7 @@ pub struct OutboundPeerService {
     interval: Option<Interval>,
     try_connect_interval: Duration,
     try_identify_count: u8,
+    transport_type: TransportType,
 }
 
 impl OutboundPeerService {
@@ -34,6 +36,7 @@ impl OutboundPeerService {
         network_state: Arc<NetworkState>,
         p2p_control: ServiceControl,
         try_connect_interval: Duration,
+        _transport_type: TransportType,
     ) -> Self {
         OutboundPeerService {
             network_state,
@@ -41,6 +44,10 @@ impl OutboundPeerService {
             interval: None,
             try_connect_interval,
             try_identify_count: 0,
+            #[cfg(not(target_family = "wasm"))]
+            transport_type: TransportType::Tcp,
+            #[cfg(target_family = "wasm")]
+            transport_type: _transport_type,
         }
     }
 
@@ -63,8 +70,15 @@ impl OutboundPeerService {
             attempt_peers,
         );
 
-        for addr in attempt_peers.into_iter().map(|info| info.addr) {
-            self.network_state.dial_feeler(&self.p2p_control, addr);
+        for mut addr in attempt_peers.into_iter().map(|info| info.addr) {
+            self.network_state.dial_feeler(&self.p2p_control, {
+                if !matches!(self.transport_type, TransportType::Tcp) {
+                    addr.push(self.transport_type.into());
+                    addr
+                } else {
+                    addr
+                }
+            });
         }
     }
 
@@ -132,8 +146,15 @@ impl OutboundPeerService {
             Box::new(attempt_peers.into_iter().map(|info| info.addr))
         };
 
-        for addr in peers {
-            self.network_state.dial_identify(&self.p2p_control, addr);
+        for mut addr in peers {
+            self.network_state.dial_identify(&self.p2p_control, {
+                if !matches!(self.transport_type, TransportType::Tcp) {
+                    addr.push(self.transport_type.into());
+                    addr
+                } else {
+                    addr
+                }
+            });
         }
     }
 
diff --git a/network/src/tests/mod.rs b/network/src/tests/mod.rs
index dde66545f05..d40867f40f2 100644
--- a/network/src/tests/mod.rs
+++ b/network/src/tests/mod.rs
@@ -20,7 +20,7 @@ fn random_addr_v6() -> crate::multiaddr::Multiaddr {
 
     multi_addr.push(crate::multiaddr::Protocol::Tcp(43));
     multi_addr.push(crate::multiaddr::Protocol::P2P(
-        crate::PeerId::random().to_base58().into_bytes().into(),
+        crate::PeerId::random().into_bytes().into(),
     ));
     multi_addr
 }
diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs
index e23db5c4129..85881b278ee 100644
--- a/network/src/tests/peer_store.rs
+++ b/network/src/tests/peer_store.rs
@@ -603,3 +603,21 @@ fn test_only_tcp_store() {
         addr
     });
 }
+
+#[test]
+fn test_support_dns_store() {
+    let mut peer_store = PeerStore::default();
+    let addr: Multiaddr = format!(
+        "/dns4/www.abc.com/tcp/{}/p2p/{}",
+        rand::random::<u16>(),
+        crate::PeerId::random().to_base58()
+    )
+    .parse()
+    .unwrap();
+
+    peer_store
+        .add_addr(addr.clone(), Flags::COMPATIBILITY)
+        .unwrap();
+    assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1);
+    assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, addr);
+}
diff --git a/rpc/src/tests/setup.rs b/rpc/src/tests/setup.rs
index 888186fe57a..4afea1fa6d9 100644
--- a/rpc/src/tests/setup.rs
+++ b/rpc/src/tests/setup.rs
@@ -9,7 +9,7 @@ use ckb_chain::start_chain_services;
 use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
 use ckb_chain_spec::versionbits::{ActiveMode, Deployment, DeploymentPos};
 use ckb_dao_utils::genesis_dao_data;
-use ckb_network::{Flags, NetworkService, NetworkState};
+use ckb_network::{network::TransportType, Flags, NetworkService, NetworkState};
 use ckb_network_alert::alert_relayer::AlertRelayer;
 use ckb_notify::NotifyService;
 use ckb_shared::SharedBuilder;
@@ -112,6 +112,7 @@ pub(crate) fn setup_rpc_test_suite(height: u64, consensus: Option<Consensus>) ->
                 "0.1.0".to_string(),
                 Flags::COMPATIBILITY,
             ),
+            TransportType::Tcp,
         )
         .start(shared.async_handle())
         .expect("Start network service failed")
diff --git a/sync/src/relayer/tests/helper.rs b/sync/src/relayer/tests/helper.rs
index a11ceb1fe82..ed872ac550e 100644
--- a/sync/src/relayer/tests/helper.rs
+++ b/sync/src/relayer/tests/helper.rs
@@ -5,9 +5,9 @@ use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder};
 use ckb_dao::DaoCalculator;
 use ckb_dao_utils::genesis_dao_data;
 use ckb_network::{
-    async_trait, bytes::Bytes as P2pBytes, Behaviour, CKBProtocolContext, Error, Flags,
-    NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, SupportProtocols,
-    TargetSession,
+    async_trait, bytes::Bytes as P2pBytes, network::TransportType, Behaviour, CKBProtocolContext,
+    Error, Flags, NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId,
+    SupportProtocols, TargetSession,
 };
 use ckb_reward_calculator::RewardCalculator;
 use ckb_shared::{Shared, SharedBuilder, Snapshot};
@@ -127,6 +127,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
             "test".to_string(),
             Flags::COMPATIBILITY,
         ),
+        TransportType::Tcp,
     )
     .start(shared.async_handle())
     .expect("Start network service failed")
diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs
index 87e87a2a2f5..8bf7105b162 100644
--- a/util/launcher/src/lib.rs
+++ b/util/launcher/src/lib.rs
@@ -15,8 +15,8 @@ use ckb_light_client_protocol_server::LightClientProtocol;
 use ckb_logger::info;
 use ckb_logger::internal::warn;
 use ckb_network::{
-    observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, NetworkService,
-    NetworkState, SupportProtocols,
+    network::TransportType, observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController,
+    NetworkService, NetworkState, SupportProtocols,
 };
 use ckb_network_alert::alert_relayer::AlertRelayer;
 use ckb_resource::Resource;
@@ -384,6 +384,7 @@ impl Launcher {
                 self.version.to_string(),
                 flags,
             ),
+            TransportType::Tcp,
         )
         .start(shared.async_handle())
         .expect("Start network service failed");
diff --git a/util/light-client-protocol-server/src/tests/utils/chain.rs b/util/light-client-protocol-server/src/tests/utils/chain.rs
index 03e37e704bf..94777dfa670 100644
--- a/util/light-client-protocol-server/src/tests/utils/chain.rs
+++ b/util/light-client-protocol-server/src/tests/utils/chain.rs
@@ -8,7 +8,7 @@ use ckb_chain::{start_chain_services, ChainController};
 use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder};
 use ckb_dao_utils::genesis_dao_data;
 use ckb_jsonrpc_types::ScriptHashType;
-use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
+use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
 use ckb_shared::{Shared, SharedBuilder};
 use ckb_systemtime::unix_time_as_millis;
 use ckb_test_chain_utils::always_success_cell;
@@ -240,6 +240,7 @@ fn dummy_network(shared: &Shared) -> NetworkController {
             "test".to_string(),
             Flags::all(),
         ),
+        TransportType::Tcp,
     )
     .start(shared.async_handle())
     .expect("Start network service failed")