Skip to content

Commit

Permalink
Bind outbound proxy to the IPv6 loopback (#2854)
Browse files Browse the repository at this point in the history
In order for the proxy to support IPv6 traffic we need to bind the outbound proxy to the IPv6 loopback (`::1`), in addition to the existing binding to the IPv4 loopback (`127.0.0.1`).

Note, this is not necessary for inbound; on the proxy side we'll set `LINKERD2_PROXY_INBOUND_LISTEN_ADDR` (and the other `*_LISTEN_ADDR`) vars to the IPv6 wildcard address (`::`) which also works for IPv4.

This change adds the `LINKERD2_PROXY_OUTBOUND_LISTEN_ADDRS` var, which is supposed to be set as `127.0.0.1:4140,[::1]:4140` (maximum two entries). If not set, we default to the value in `LINKERD2_PROXY_OUTBOUND_LISTEN_ADDR`.

The field `ServerConfig.addr` is now of the new type `DualListenAddr` which holds a primary `SocketAddr` and an optional one.
The outbound listener is now bound via the `DualBind` implementation of `Bind` which also binds to the second `SocketAddr` if set, merging the two streams using Tokio's `merge` stream extension method.

Co-authored-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
alpeb and olix0r authored Apr 22, 2024
1 parent e9b8246 commit 5268124
Show file tree
Hide file tree
Showing 14 changed files with 202 additions and 39 deletions.
2 changes: 1 addition & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Config {
) -> Result<Task>
where
R: FmtMetrics + Clone + Send + Sync + Unpin + 'static,
B: Bind<ServerConfig>,
B: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>>,
B::Addrs: svc::Param<Remote<ClientAddr>>,
B::Addrs: svc::Param<Local<ServerAddr>>,
B::Addrs: svc::Param<AddrPair>,
Expand Down
12 changes: 9 additions & 3 deletions linkerd/app/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ pub use crate::exp_backoff::ExponentialBackoff;
use crate::{
proxy::http::{self, h1, h2},
svc::{queue, CloneParam, ExtractParam, Param},
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive, ListenAddr},
};
use std::time::Duration;

#[derive(Clone, Debug)]
pub struct ServerConfig {
pub addr: ListenAddr,
pub addr: DualListenAddr,
pub keepalive: Keepalive,
pub h2_settings: h2::Settings,
}
Expand Down Expand Up @@ -67,9 +67,15 @@ impl ProxyConfig {

// === impl ServerConfig ===

impl Param<DualListenAddr> for ServerConfig {
fn param(&self) -> DualListenAddr {
self.addr
}
}

impl Param<ListenAddr> for ServerConfig {
fn param(&self) -> ListenAddr {
self.addr
ListenAddr(self.addr.0)
}
}

Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive},
ProxyRuntime,
};
pub use linkerd_app_test as support;
Expand Down Expand Up @@ -57,7 +57,7 @@ pub fn default_config() -> Config {
allow_discovery: Some(cluster_local).into_iter().collect(),
proxy: config::ProxyConfig {
server: config::ServerConfig {
addr: ListenAddr(([0, 0, 0, 0], 0).into()),
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
},
Expand Down
51 changes: 43 additions & 8 deletions linkerd/app/integration/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::*;
use linkerd_app_core::{
svc::Param,
transport::OrigDstAddr,
transport::{listen, orig_dst, Keepalive, ListenAddr},
transport::{listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr},
Result,
};
use std::{collections::HashSet, thread};
Expand All @@ -21,7 +20,7 @@ pub struct Proxy {
/// Inbound/outbound addresses helpful for mocking connections that do not
/// implement `server::Listener`.
inbound: MockOrigDst,
outbound: MockOrigDst,
outbound: MockDualOrigDst,

/// Inbound/outbound addresses for mocking connections that implement
/// `server::Listener`.
Expand Down Expand Up @@ -60,18 +59,24 @@ enum MockOrigDst {
None,
}

#[derive(Copy, Clone, Debug, Default)]
struct MockDualOrigDst {
inner: MockOrigDst,
}

// === impl MockOrigDst ===

impl<T> listen::Bind<T> for MockOrigDst
where
T: Param<Keepalive> + Param<ListenAddr>,
{
type Addrs = orig_dst::Addrs;
type BoundAddrs = Local<ServerAddr>;
type Io = tokio::net::TcpStream;
type Incoming =
Pin<Box<dyn Stream<Item = Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>>;

fn bind(self, params: &T) -> Result<listen::Bound<Self::Incoming>> {
fn bind(self, params: &T) -> Result<(Self::BoundAddrs, Self::Incoming)> {
let (bound, incoming) = listen::BindTcp::default().bind(params)?;
let incoming = Box::pin(incoming.map(move |res| {
let (inner, tcp) = res?;
Expand Down Expand Up @@ -109,6 +114,24 @@ impl fmt::Debug for MockOrigDst {
}
}

// === impl MockDualOrigDst ===

impl<T> listen::Bind<T> for MockDualOrigDst
where
T: Param<Keepalive> + Param<ListenAddr>,
{
type Addrs = orig_dst::Addrs;
type BoundAddrs = (Local<ServerAddr>, Option<Local<ServerAddr>>);
type Io = tokio::net::TcpStream;
type Incoming =
Pin<Box<dyn Stream<Item = Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>>;

fn bind(self, params: &T) -> Result<(Self::BoundAddrs, Self::Incoming)> {
let (bound, incoming) = self.inner.bind(params)?;
Ok(((bound, None), incoming))
}
}

// === impl Proxy ===

impl Proxy {
Expand Down Expand Up @@ -163,13 +186,17 @@ impl Proxy {
}

pub fn outbound(mut self, s: server::Listening) -> Self {
self.outbound = MockOrigDst::Addr(s.addr);
self.outbound = MockDualOrigDst {
inner: MockOrigDst::Addr(s.addr),
};
self.outbound_server = Some(s);
self
}

pub fn outbound_ip(mut self, s: SocketAddr) -> Self {
self.outbound = MockOrigDst::Addr(s);
self.outbound = MockDualOrigDst {
inner: MockOrigDst::Addr(s),
};
self
}

Expand Down Expand Up @@ -476,6 +503,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
identity_addr,
main.inbound_addr(),
main.outbound_addr(),
main.outbound_addr_additional(),
main.admin_addr(),
);
let mut running = Some((running_tx, addrs));
Expand Down Expand Up @@ -531,15 +559,22 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
})
.expect("spawn");

let (tap_addr, identity_addr, inbound_addr, outbound_addr, admin_addr) =
running_rx.await.unwrap();
let (
tap_addr,
identity_addr,
inbound_addr,
outbound_addr,
outbound_addr_additional,
admin_addr,
) = running_rx.await.unwrap();

tracing::info!(
tap.addr = ?tap_addr,
identity.addr = ?identity_addr,
inbound.addr = ?inbound_addr,
inbound.orig_dst = ?inbound,
outbound.addr = ?outbound_addr,
outbound.addr.additional = ?outbound_addr_additional,
outbound.orig_dst = ?outbound,
metrics.addr = ?admin_addr,
);
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive},
IpMatch, IpNet, ProxyRuntime,
};
pub use linkerd_app_test as support;
Expand All @@ -24,7 +24,7 @@ pub(crate) fn default_config() -> Config {
allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(),
proxy: config::ProxyConfig {
server: config::ServerConfig {
addr: ListenAddr(([0, 0, 0, 0], 0).into()),
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
},
Expand Down
37 changes: 28 additions & 9 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use linkerd_app_core::{
control::{Config as ControlConfig, ControlAddr},
proxy::http::{h1, h2},
tls,
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive, ListenAddr},
Addr, AddrMatch, Conditional, IpNet,
};
use linkerd_tonic_stream::ReceiveLimits;
Expand Down Expand Up @@ -81,6 +81,8 @@ pub enum ParseError {
NotAPortRange,
#[error(transparent)]
AddrError(addr::Error),
#[error("only two addresses are supported")]
TooManyAddrs,
#[error("not a valid identity name")]
NameError,
#[error("could not read token source")]
Expand All @@ -93,6 +95,7 @@ pub enum ParseError {

// Environment variables to look at when loading the configuration
pub const ENV_OUTBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_LISTEN_ADDR";
pub const ENV_OUTBOUND_LISTEN_ADDRS: &str = "LINKERD2_PROXY_OUTBOUND_LISTEN_ADDRS";
pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR";
pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR";
pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR";
Expand Down Expand Up @@ -340,6 +343,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
// Parse all the environment variables. `parse` will log any errors so
// defer returning any errors until all of them have been parsed.
let outbound_listener_addr = parse(strings, ENV_OUTBOUND_LISTEN_ADDR, parse_socket_addr);
let outbound_listener_addrs = parse(strings, ENV_OUTBOUND_LISTEN_ADDRS, parse_socket_addrs);
let inbound_listener_addr = parse(strings, ENV_INBOUND_LISTEN_ADDR, parse_socket_addr);
let admin_listener_addr = parse(strings, ENV_ADMIN_LISTEN_ADDR, parse_socket_addr);

Expand Down Expand Up @@ -463,10 +467,16 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
)?
.unwrap_or(ingress_mode);

let addr = ListenAddr(
outbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()),
);
let addr = match outbound_listener_addrs? {
Some(addrs) if addrs.len() == 1 => DualListenAddr(addrs[0], None),
Some(addrs) if addrs.len() == 2 => DualListenAddr(addrs[0], Some(addrs[1])),
_ => {
let addr = outbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap());
DualListenAddr(addr, None)
}
};

let keepalive = Keepalive(outbound_accept_keepalive?);
let server = ServerConfig {
addr,
Expand Down Expand Up @@ -544,9 +554,10 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
.unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap());

let inbound = {
let addr = ListenAddr(
let addr = DualListenAddr(
inbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()),
None,
);
let keepalive = Keepalive(inbound_accept_keepalive?);
let server = ServerConfig {
Expand Down Expand Up @@ -586,7 +597,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
// Ensure that connections that directly target the inbound port are secured (unless
// identity is disabled).
let policy = {
let inbound_port = server.addr.port();
let inbound_port = ListenAddr(server.addr.0).port();

let cluster_nets = parse(strings, ENV_POLICY_CLUSTER_NETWORKS, parse_networks)?
.unwrap_or_else(|| {
Expand Down Expand Up @@ -745,7 +756,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let admin = super::admin::Config {
metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE),
server: ServerConfig {
addr: ListenAddr(admin_listener_addr),
addr: DualListenAddr(admin_listener_addr, None),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
},
Expand Down Expand Up @@ -804,7 +815,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
.map(|(addr, ids)| super::tap::Config::Enabled {
permitted_client_ids: ids,
config: ServerConfig {
addr: ListenAddr(addr),
addr: DualListenAddr(addr, None),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
},
Expand Down Expand Up @@ -1036,6 +1047,14 @@ fn parse_socket_addr(s: &str) -> Result<SocketAddr, ParseError> {
}
}

fn parse_socket_addrs(s: &str) -> Result<Vec<SocketAddr>, ParseError> {
let addrs: Vec<&str> = s.split(',').collect();
if addrs.len() > 2 {
return Err(ParseError::TooManyAddrs);
}
addrs.iter().map(|s| parse_socket_addr(s)).collect()
}

fn parse_ip_set(s: &str) -> Result<HashSet<IpAddr>, ParseError> {
s.split(',')
.map(|s| s.parse::<IpAddr>().map_err(Into::into))
Expand Down
14 changes: 10 additions & 4 deletions linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct App {
inbound_addr: Local<ServerAddr>,
oc_collector: oc_collector::OcCollector,
outbound_addr: Local<ServerAddr>,
outbound_addr_additional: Option<Local<ServerAddr>>,
start_proxy: Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
tap: tap::Tap,
}
Expand All @@ -102,17 +103,17 @@ impl Config {
mut registry: prom::Registry,
) -> Result<App, Error>
where
BIn: Bind<ServerConfig> + 'static,
BIn: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>> + 'static,
BIn::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BOut: Bind<ServerConfig> + 'static,
BOut: Bind<ServerConfig, BoundAddrs = DualLocal<ServerAddr>> + 'static,
BOut::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BAdmin: Bind<ServerConfig> + Clone + 'static,
BAdmin: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>> + Clone + 'static,
BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<AddrPair>,
{
let Config {
Expand Down Expand Up @@ -250,7 +251,7 @@ impl Config {
gateway.into_inner(),
);

let (outbound_addr, outbound_listen) = bind_out
let ((outbound_addr, outbound_addr_additional), outbound_listen) = bind_out
.bind(&outbound.config().proxy.server)
.expect("Failed to bind outbound listener");
let outbound_metrics = outbound.metrics();
Expand Down Expand Up @@ -310,6 +311,7 @@ impl Config {
inbound_addr,
oc_collector,
outbound_addr,
outbound_addr_additional,
start_proxy,
tap,
})
Expand Down Expand Up @@ -344,6 +346,10 @@ impl App {
self.outbound_addr
}

pub fn outbound_addr_additional(&self) -> Option<Local<ServerAddr>> {
self.outbound_addr_additional
}

pub fn tap_addr(&self) -> Option<Local<ServerAddr>> {
match self.tap {
tap::Tap::Disabled { .. } => None,
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Config {
drain: drain::Watch,
) -> Result<Tap, Error>
where
B: Bind<ServerConfig>,
B: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>>,
B::Addrs: Param<Remote<ClientAddr>>,
B::Addrs: Param<AddrPair>,
{
Expand Down
7 changes: 7 additions & 0 deletions linkerd/proxy/transport/src/addrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub struct ClientAddr(pub SocketAddr);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct ListenAddr(pub SocketAddr);

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct DualListenAddr(pub SocketAddr, pub Option<SocketAddr>);

/// The address of a server.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct ServerAddr(pub SocketAddr);
Expand All @@ -24,6 +27,10 @@ pub struct OrigDstAddr(pub SocketAddr);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Local<T>(pub T);

/// Pair wrapping a local address and optionally a second one, used when binding to both IPv4 and
/// IPv6
pub type DualLocal<T> = (Local<T>, Option<Local<T>>);

/// Wraps an address type to indicate it describes another process.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Remote<T>(pub T);
Expand Down
Loading

0 comments on commit 5268124

Please sign in to comment.