Skip to content

Commit

Permalink
Merge branch 'revamp-api-access-methods' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
dlon committed Oct 9, 2023
2 parents 524a64d + fbefa3f commit bb2caa8
Show file tree
Hide file tree
Showing 22 changed files with 2,275 additions and 113 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mullvad-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ serde = "1"
serde_json = "1.0"
tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread", "net", "io-std", "io-util", "fs"] }
tokio-rustls = "0.24.1"
tokio-socks = "0.5.1"
rustls-pemfile = "1.0.3"
once_cell = "1.13"

Expand Down
240 changes: 183 additions & 57 deletions mullvad-api/src/https_client_with_sni.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::{
use talpid_types::ErrorExt;

use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpSocket, TcpStream},
time::timeout,
};
Expand Down Expand Up @@ -73,8 +74,131 @@ enum HttpsConnectorRequest {
enum InnerConnectionMode {
/// Connect directly to the target.
Direct,
/// Connect to the destination via a proxy.
Proxied(ParsedShadowsocksConfig),
/// Connect to the destination via a Shadowsocks proxy.
Shadowsocks(ShadowsocksConfig),
/// Connect to the destination via a Socks proxy.
Socks5(SocksConfig),
}

impl InnerConnectionMode {
async fn connect(
self,
hostname: &str,
addr: &SocketAddr,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
) -> Result<ApiConnection, std::io::Error> {
match self {
// Set up a TCP-socket connection.
InnerConnectionMode::Direct => {
let first_hop = *addr;
let make_proxy_stream = |tcp_stream| async { Ok(tcp_stream) };
Self::connect_proxied(
first_hop,
hostname,
make_proxy_stream,
#[cfg(target_os = "android")]
socket_bypass_tx,
)
.await
}
// Set up a Shadowsocks-connection.
InnerConnectionMode::Shadowsocks(shadowsocks) => {
let first_hop = shadowsocks.params.peer;
let make_proxy_stream = |tcp_stream| async {
Ok(ProxyClientStream::from_stream(
shadowsocks.proxy_context,
tcp_stream,
&ServerConfig::from(shadowsocks.params),
*addr,
))
};
Self::connect_proxied(
first_hop,
hostname,
make_proxy_stream,
#[cfg(target_os = "android")]
socket_bypass_tx,
)
.await
}
// Set up a SOCKS5-connection.
InnerConnectionMode::Socks5(socks) => {
let first_hop = socks.peer;
let make_proxy_stream = |tcp_stream| async {
match socks.authentication {
SocksAuth::None => {
tokio_socks::tcp::Socks5Stream::connect_with_socket(tcp_stream, addr)
.await
}
SocksAuth::Password { username, password } => {
tokio_socks::tcp::Socks5Stream::connect_with_password_and_socket(
tcp_stream, addr, &username, &password,
)
.await
}
}
.map_err(|error| {
io::Error::new(io::ErrorKind::Other, format!("SOCKS error: {error}"))
})
};
Self::connect_proxied(
first_hop,
hostname,
make_proxy_stream,
#[cfg(target_os = "android")]
socket_bypass_tx,
)
.await
}
}
}

/// Create an [`ApiConnection`] from a [`TcpStream`].
///
/// The `make_proxy_stream` closure receives a [`TcpStream`] and produces a
/// stream which can send to and receive data from some server using any
/// proxy protocol. The only restriction is that this stream must implement
/// [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`], as well as
/// [`Unpin`] and [`Send`].
///
/// If a direct connection is to be established (i.e. the stream will not be
/// using any proxy protocol) `make_proxy_stream` may return the
/// [`TcpStream`] itself. See for example how a connection is established
/// from connection mode [`InnerConnectionMode::Direct`].
async fn connect_proxied<ProxyFactory, ProxyFuture, Proxy>(
first_hop: SocketAddr,
hostname: &str,
make_proxy_stream: ProxyFactory,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
) -> Result<ApiConnection, io::Error>
where
ProxyFactory: FnOnce(TcpStream) -> ProxyFuture,
ProxyFuture: Future<Output = io::Result<Proxy>>,
Proxy: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let socket = HttpsConnectorWithSni::open_socket(
first_hop,
#[cfg(target_os = "android")]
socket_bypass_tx,
)
.await?;

let proxy = make_proxy_stream(socket).await?;

#[cfg(feature = "api-override")]
if API.disable_tls {
return Ok(ApiConnection::new(Box::new(ConnectionDecorator(proxy))));
}

let tls_stream = TlsStream::connect_https(proxy, hostname).await?;
Ok(ApiConnection::new(Box::new(tls_stream)))
}
}

#[derive(Clone)]
struct ShadowsocksConfig {
proxy_context: SharedContext,
params: ParsedShadowsocksConfig,
}

#[derive(Clone)]
Expand All @@ -90,6 +214,18 @@ impl From<ParsedShadowsocksConfig> for ServerConfig {
}
}

#[derive(Clone)]
struct SocksConfig {
peer: SocketAddr,
authentication: SocksAuth,
}

#[derive(Clone)]
pub enum SocksAuth {
None,
Password { username: String, password: String },
}

#[derive(err_derive::Error, Debug)]
enum ProxyConfigError {
#[error(display = "Unrecognized cipher selected: {}", _0)]
Expand All @@ -100,16 +236,43 @@ impl TryFrom<ApiConnectionMode> for InnerConnectionMode {
type Error = ProxyConfigError;

fn try_from(config: ApiConnectionMode) -> Result<Self, Self::Error> {
use mullvad_types::access_method;
use std::net::Ipv4Addr;
Ok(match config {
ApiConnectionMode::Direct => InnerConnectionMode::Direct,
ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(config)) => {
InnerConnectionMode::Proxied(ParsedShadowsocksConfig {
peer: config.peer,
password: config.password,
cipher: CipherKind::from_str(&config.cipher)
.map_err(|_| ProxyConfigError::InvalidCipher(config.cipher))?,
})
}
ApiConnectionMode::Proxied(proxy_settings) => match proxy_settings {
ProxyConfig::Shadowsocks(config) => {
InnerConnectionMode::Shadowsocks(ShadowsocksConfig {
params: ParsedShadowsocksConfig {
peer: config.peer,
password: config.password,
cipher: CipherKind::from_str(&config.cipher)
.map_err(|_| ProxyConfigError::InvalidCipher(config.cipher))?,
},
proxy_context: SsContext::new_shared(ServerType::Local),
})
}
ProxyConfig::Socks(config) => match config {
access_method::Socks5::Local(config) => {
InnerConnectionMode::Socks5(SocksConfig {
peer: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), config.port),
authentication: SocksAuth::None,
})
}
access_method::Socks5::Remote(config) => {
let authentication = match config.authentication {
Some(access_method::SocksAuth { username, password }) => {
SocksAuth::Password { username, password }
}
None => SocksAuth::None,
};
InnerConnectionMode::Socks5(SocksConfig {
peer: config.peer,
authentication,
})
}
},
},
})
}
}
Expand All @@ -121,7 +284,6 @@ pub struct HttpsConnectorWithSni {
sni_hostname: Option<String>,
address_cache: AddressCache,
abort_notify: Arc<tokio::sync::Notify>,
proxy_context: SharedContext,
#[cfg(target_os = "android")]
socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
}
Expand Down Expand Up @@ -186,14 +348,16 @@ impl HttpsConnectorWithSni {
sni_hostname,
address_cache,
abort_notify,
proxy_context: SsContext::new_shared(ServerType::Local),
#[cfg(target_os = "android")]
socket_bypass_tx,
},
HttpsConnectorWithSniHandle { tx },
)
}

/// Establishes a TCP connection with a peer at the specified socket address.
///
/// Will timeout after [`CONNECT_TIMEOUT`] seconds.
async fn open_socket(
addr: SocketAddr,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
Expand Down Expand Up @@ -281,7 +445,6 @@ impl Service<Uri> for HttpsConnectorWithSni {
});
let inner = self.inner.clone();
let abort_notify = self.abort_notify.clone();
let proxy_context = self.proxy_context.clone();
#[cfg(target_os = "android")]
let socket_bypass_tx = self.socket_bypass_tx.clone();
let address_cache = self.address_cache.clone();
Expand All @@ -301,50 +464,13 @@ impl Service<Uri> for HttpsConnectorWithSni {
// is selected while connecting.
let stream = loop {
let notify = abort_notify.notified();
let config = { inner.lock().unwrap().proxy_config.clone() };
let stream_fut = async {
match config {
InnerConnectionMode::Direct => {
let socket = Self::open_socket(
addr,
#[cfg(target_os = "android")]
socket_bypass_tx.clone(),
)
.await?;
#[cfg(feature = "api-override")]
if API.disable_tls {
return Ok::<_, io::Error>(ApiConnection::new(Box::new(socket)));
}

let tls_stream = TlsStream::connect_https(socket, &hostname).await?;
Ok::<_, io::Error>(ApiConnection::new(Box::new(tls_stream)))
}
InnerConnectionMode::Proxied(proxy_config) => {
let socket = Self::open_socket(
proxy_config.peer,
#[cfg(target_os = "android")]
socket_bypass_tx.clone(),
)
.await?;
let proxy = ProxyClientStream::from_stream(
proxy_context.clone(),
socket,
&ServerConfig::from(proxy_config),
addr,
);

#[cfg(feature = "api-override")]
if API.disable_tls {
return Ok(ApiConnection::new(Box::new(ConnectionDecorator(
proxy,
))));
}

let tls_stream = TlsStream::connect_https(proxy, &hostname).await?;
Ok(ApiConnection::new(Box::new(tls_stream)))
}
}
};
let proxy_config = { inner.lock().unwrap().proxy_config.clone() };
let stream_fut = proxy_config.connect(
&hostname,
&addr,
#[cfg(target_os = "android")]
socket_bypass_tx.clone(),
);

pin_mut!(stream_fut);
pin_mut!(notify);
Expand Down
Loading

0 comments on commit bb2caa8

Please sign in to comment.