Skip to content

Commit

Permalink
Merge pull request #32 from UnstoppableSwap/fix/announce-address-immed
Browse files Browse the repository at this point in the history
fix: Immediately announce onion address to swarm
  • Loading branch information
umgefahren authored Dec 17, 2024
2 parents 87f0e41 + 3b227a6 commit e127d90
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 81 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ jobs:

- uses: Swatinem/rust-cache@v2

- name: Run all tests
- name: Run tests with all features enabled
run: cargo test --all-features

- name: Run tests without features enabled
run: cargo test

clippy:
name: Clippy
runs-on: ubuntu-latest
Expand All @@ -57,9 +60,12 @@ jobs:

- uses: Swatinem/rust-cache@v2

- name: Run cargo clippy
- name: Run cargo clippy with all features enabled
run: cargo clippy --all-features

- name: Run cargo clippy without any features enabled
run: cargo clippy

rustfmt:
name: Rust format
runs-on: ubuntu-latest
Expand Down
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tracing = "0.1.40"
tor-hsservice = { version = "0.24.0", optional = true }
tor-cell = { version = "0.24.0", optional = true }
tor-proto = { version = "0.24.0", optional = true }
data-encoding = { version = "2.6.0", optional = true }
data-encoding = { version = "2.6.0" }

[dev-dependencies]
libp2p = { version = "0.53", default-features = false, features = ["tokio", "noise", "yamux", "ping", "macros", "tcp", "tls"] }
Expand All @@ -35,8 +35,7 @@ listen-onion-service = [
"arti-client/onion-service-service",
"dep:tor-hsservice",
"dep:tor-cell",
"dep:tor-proto",
"dep:data-encoding"
"dep:tor-proto"
]

[[example]]
Expand Down
4 changes: 0 additions & 4 deletions examples/ping-onion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.dial(remote)?;
println!("Dialed {addr}")
} else {
// TODO: We need to do this because otherwise the status of the onion service is gonna be [`Shutdown`]
// when we first poll it and then the swarm will not pull it again (?). I don't know why this is the case.
tokio::time::sleep(std::time::Duration::from_secs(20)).await;

// If we are not dialing, we need to listen
// Tell the swarm to listen on a specific onion address
swarm.listen_on(onion_listen_address).unwrap();
Expand Down
8 changes: 4 additions & 4 deletions src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub fn safe_extract(multiaddr: &Multiaddr) -> Option<TorAddr> {

fn libp2p_onion_address_to_domain_and_port<'a>(
onion_address: &'a Onion3Addr<'_>,
) -> Option<(&'a str, u16)> {
) -> (&'a str, u16) {
// Here we convert from Onion3Addr to TorAddr
// We need to leak the string because it's a temporary string that would otherwise be freed
let hash = data_encoding::BASE32.encode(onion_address.hash());
let onion_domain = format!("{}.onion", hash);
let onion_domain = format!("{hash}.onion");
let onion_domain = Box::leak(onion_domain.into_boxed_str());

Some((onion_domain, onion_address.port()))
(onion_domain, onion_address.port())
}

fn try_to_domain_and_port<'a>(
Expand All @@ -72,7 +72,7 @@ fn try_to_domain_and_port<'a>(
Protocol::Dns(domain) | Protocol::Dns4(domain) | Protocol::Dns6(domain),
Some(Protocol::Tcp(port)),
) => Some((domain.as_ref(), *port)),
(Protocol::Onion3(domain), _) => libp2p_onion_address_to_domain_and_port(domain),
(Protocol::Onion3(domain), _) => Some(libp2p_onion_address_to_domain_and_port(domain)),
_ => None,
}
}
Expand Down
141 changes: 73 additions & 68 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,28 @@
use arti_client::{TorClient, TorClientBuilder};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use libp2p::multiaddr::Protocol;
use libp2p::{
core::transport::{ListenerId, TransportEvent},
Multiaddr, Transport, TransportError,
};
use std::collections::HashMap;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{collections::HashSet, pin::Pin};
use std::{collections::VecDeque, sync::Arc};
use thiserror::Error;
use tor_hsservice::handle_rend_requests;
use tor_hsservice::status::OnionServiceStatus;
use tor_hsservice::StreamRequest;
use tor_rtcompat::tokio::TokioRustlsRuntime;

// We only need these imports if the `listen-onion-service` feature is enabled
#[cfg(feature = "listen-onion-service")]
use std::collections::HashMap;
#[cfg(feature = "listen-onion-service")]
use std::str::FromStr;
#[cfg(feature = "listen-onion-service")]
use tor_cell::relaycell::msg::{Connected, End, EndReason};
#[cfg(feature = "listen-onion-service")]
use tor_hsservice::{HsId, OnionServiceConfig, RunningOnionService};
use tor_hsservice::{
handle_rend_requests, status::OnionServiceStatus, HsId, OnionServiceConfig,
RunningOnionService, StreamRequest,
};
#[cfg(feature = "listen-onion-service")]
use tor_proto::stream::IncomingStreamRequest;

Expand All @@ -88,9 +88,9 @@ pub type TorError = arti_client::Error;

type PendingUpgrade = BoxFuture<'static, Result<TokioTorStream, TorTransportError>>;
#[cfg(feature = "listen-onion-service")]
type OnionServiceStream = BoxStream<'static, StreamRequest>;
type OnionServiceStream = futures::stream::BoxStream<'static, StreamRequest>;
#[cfg(feature = "listen-onion-service")]
type OnionServiceStatusStream = BoxStream<'static, OnionServiceStatus>;
type OnionServiceStatusStream = futures::stream::BoxStream<'static, OnionServiceStatus>;

/// Struct representing an onion address we are listening on for libp2p connections.
#[cfg(feature = "listen-onion-service")]
Expand All @@ -107,6 +107,8 @@ struct TorListener {
port: u16,
/// The onion address we are listening on
onion_address: Multiaddr,
/// Whether we have already announced this address
announced: bool,
}

/// Mode of address conversion.
Expand Down Expand Up @@ -176,8 +178,8 @@ impl TorTransport {
conversion_mode: AddressConversion,
) -> Self {
Self {
client,
conversion_mode,
client,
#[cfg(feature = "listen-onion-service")]
listeners: HashMap::new(),
#[cfg(feature = "listen-onion-service")]
Expand Down Expand Up @@ -206,6 +208,9 @@ impl TorTransport {
/// # Returns
/// Returns the Multiaddr of the onion address that the transport can be instructed to listen on
/// To actually listen on the address, you need to call [`listen_on`] with the returned address
///
/// # Errors
/// Returns an error if we cannot get the onion address of the service
#[cfg(feature = "listen-onion-service")]
pub fn add_onion_service(
&mut self,
Expand All @@ -217,7 +222,7 @@ impl TorTransport {

let multiaddr = service
.onion_name()
.ok_or_else(|| anyhow::anyhow!("Onion service has no nickname"))?
.ok_or_else(|| anyhow::anyhow!("Onion service has no onion address"))?
.to_multiaddr(port);

self.services.push((service, request_stream));
Expand Down Expand Up @@ -251,63 +256,50 @@ trait HsIdExt {

#[cfg(feature = "listen-onion-service")]
impl HsIdExt for HsId {
/// Convert an HsId to a Multiaddr
/// Convert an `HsId` to a `Multiaddr`
fn to_multiaddr(&self, port: u16) -> Multiaddr {
let onion_domain = self.to_string();
let onion_without_dot_onion = onion_domain
.split(".")
.split('.')
.nth(0)
.expect("Display formatting of HsId to contain .onion suffix");
let multiaddress_string = format!("/onion3/{}:{}", onion_without_dot_onion, port);
let multiaddress_string = format!("/onion3/{onion_without_dot_onion}:{port}");

Multiaddr::from_str(&multiaddress_string)
.expect("A valid onion address to be convertible to a Multiaddr")
}
}

trait StatusExt {
fn is_reachable(&self) -> bool;
fn is_broken(&self) -> bool;
}

impl StatusExt for OnionServiceStatus {
/// Returns true if the onion service is reachable
fn is_reachable(&self) -> bool {
match self.state() {
tor_hsservice::status::State::Running => true,
tor_hsservice::status::State::DegradedReachable => true,
_ => false,
}
}

fn is_broken(&self) -> bool {
matches!(self.state(), tor_hsservice::status::State::Broken)
}
}

impl Transport for TorTransport {
type Output = TokioTorStream;
type Error = TorTransportError;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type ListenerUpgrade = PendingUpgrade;

#[cfg(not(feature = "listen-onion-service"))]
fn listen_on(
&mut self,
id: ListenerId,
_id: ListenerId,
onion_address: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
// If the `listen-onion-service` feature is not enabled, immediately return an error
#[cfg(not(feature = "listen-onion-service"))]
return Err(TransportError::MultiaddrNotSupported(onion_address.clone()));
// If the `listen-onion-service` feature is not enabled, we do not support listening
Err(TransportError::MultiaddrNotSupported(onion_address.clone()))
}

#[cfg(feature = "listen-onion-service")]
fn listen_on(
&mut self,
id: ListenerId,
onion_address: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
// If the address is not an onion3 address, return an error
let Some(Protocol::Onion3(address)) = onion_address.into_iter().nth(0) else {
let Some(libp2p::multiaddr::Protocol::Onion3(address)) = onion_address.into_iter().nth(0)
else {
return Err(TransportError::MultiaddrNotSupported(onion_address.clone()));
};

// Find the running onion service that matches the requested address
// If we find it, remove it from [`services`] and insert it into [`listeners`]

let position = self
.services
.iter()
Expand All @@ -330,17 +322,21 @@ impl Transport for TorTransport {
onion_address: onion_address.clone(),
port: address.port(),
status_stream,
announced: false,
},
);

return Ok(());
Ok(())
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
// If the `listen-onion-service` feature is not enabled, we do not support listening
#[cfg(not(feature = "listen-onion-service"))]
return false;
// We do not support removing listeners if the `listen-onion-service` feature is not enabled
#[cfg(not(feature = "listen-onion-service"))]
fn remove_listener(&mut self, _id: ListenerId) -> bool {
false
}

#[cfg(feature = "listen-onion-service")]
fn remove_listener(&mut self, id: ListenerId) -> bool {
// Take the listener out of the map. This will stop listening on onion service for libp2p connections (we will not poll it anymore)
// However, we will not stop the onion service itself because we might want to reuse it later
// The onion service will be stopped when the transport is dropped
Expand Down Expand Up @@ -383,37 +379,45 @@ impl Transport for TorTransport {
None
}

#[cfg(not(feature = "listen-onion-service"))]
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
// If the `listen-onion-service` feature is not enabled, we do not support listening
#[cfg(not(feature = "listen-onion-service"))]
return Poll::Pending;
Poll::Pending
}

for (listener_id, listener) in self.listeners.iter_mut() {
#[cfg(feature = "listen-onion-service")]
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
for (listener_id, listener) in &mut self.listeners {
// Check if the service has any new statuses
if let Poll::Ready(Some(status)) = listener.status_stream.as_mut().poll_next(cx) {
tracing::debug!(
status = ?status.state(),
address = listener.onion_address.to_string(),
"Onion service status changed"
);
}

if status.is_reachable() {
// TODO: We might report the address here multiple time to the swarm. Is this a problem?
return Poll::Ready(TransportEvent::NewAddress {
listener_id: *listener_id,
listen_addr: listener.onion_address.clone(),
});
}

if status.is_broken() {
return Poll::Ready(TransportEvent::ListenerError {
listener_id: *listener_id,
error: TorTransportError::Broken,
});
}
// Check if we have already announced this address, if not, do it now
if !listener.announced {
listener.announced = true;

// We announce the address here to the swarm even though we technically cannot guarantee
// that the address is reachable yet from the outside. We might not have registered the
// onion service fully yet (introduction points, hsdir, ...)
//
// However, we need to announce it now because otherwise libp2p might not poll the listener
// again and we will not be able to announce it later.
// TODO: Find out why this is the case, if this is intended behaviour or a bug
return Poll::Ready(TransportEvent::NewAddress {
listener_id: *listener_id,
listen_addr: listener.onion_address.clone(),
});
}

match listener.request_stream.as_mut().poll_next(cx) {
Expand Down Expand Up @@ -445,7 +449,8 @@ impl Transport for TorTransport {
});
}

// The stream has ended. Most likely because the service was shut down
// The stream has ended
// This means that the onion service was shut down, and we will not receive any more connections on it
Poll::Ready(None) => {
return Poll::Ready(TransportEvent::ListenerClosed {
listener_id: *listener_id,
Expand Down

0 comments on commit e127d90

Please sign in to comment.