Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bind outbound proxy to the IPv6 loopback #2854

Merged
merged 7 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Config {
) -> Result<Task>
where
R: FmtMetrics + Clone + Send + Sync + Unpin + 'static,
B: Bind<ServerConfig>,
B: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>>,
B::Addrs: svc::Param<Remote<ClientAddr>>,
B::Addrs: svc::Param<Local<ServerAddr>>,
B::Addrs: svc::Param<AddrPair>,
Expand Down
12 changes: 9 additions & 3 deletions linkerd/app/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ pub use crate::exp_backoff::ExponentialBackoff;
use crate::{
proxy::http::{self, h1, h2},
svc::{queue, CloneParam, ExtractParam, Param},
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive, ListenAddr},
};
use std::time::Duration;

#[derive(Clone, Debug)]
pub struct ServerConfig {
pub addr: ListenAddr,
pub addr: DualListenAddr,
pub keepalive: Keepalive,
pub h2_settings: h2::Settings,
}
Expand Down Expand Up @@ -67,9 +67,15 @@ impl ProxyConfig {

// === impl ServerConfig ===

impl Param<DualListenAddr> for ServerConfig {
fn param(&self) -> DualListenAddr {
self.addr
}
}

impl Param<ListenAddr> for ServerConfig {
fn param(&self) -> ListenAddr {
self.addr
ListenAddr(self.addr.0)
}
}

Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive},
ProxyRuntime,
};
pub use linkerd_app_test as support;
Expand Down Expand Up @@ -57,7 +57,7 @@ pub fn default_config() -> Config {
allow_discovery: Some(cluster_local).into_iter().collect(),
proxy: config::ProxyConfig {
server: config::ServerConfig {
addr: ListenAddr(([0, 0, 0, 0], 0).into()),
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
},
Expand Down
51 changes: 43 additions & 8 deletions linkerd/app/integration/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::*;
use linkerd_app_core::{
svc::Param,
transport::OrigDstAddr,
transport::{listen, orig_dst, Keepalive, ListenAddr},
transport::{listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr},
Result,
};
use std::{collections::HashSet, thread};
Expand All @@ -21,7 +20,7 @@ pub struct Proxy {
/// Inbound/outbound addresses helpful for mocking connections that do not
/// implement `server::Listener`.
inbound: MockOrigDst,
outbound: MockOrigDst,
outbound: MockDualOrigDst,

/// Inbound/outbound addresses for mocking connections that implement
/// `server::Listener`.
Expand Down Expand Up @@ -60,18 +59,24 @@ enum MockOrigDst {
None,
}

#[derive(Copy, Clone, Debug, Default)]
struct MockDualOrigDst {
inner: MockOrigDst,
}

// === impl MockOrigDst ===

impl<T> listen::Bind<T> for MockOrigDst
where
T: Param<Keepalive> + Param<ListenAddr>,
{
type Addrs = orig_dst::Addrs;
type BoundAddrs = Local<ServerAddr>;
type Io = tokio::net::TcpStream;
type Incoming =
Pin<Box<dyn Stream<Item = Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>>;

fn bind(self, params: &T) -> Result<listen::Bound<Self::Incoming>> {
fn bind(self, params: &T) -> Result<(Self::BoundAddrs, Self::Incoming)> {
let (bound, incoming) = listen::BindTcp::default().bind(params)?;
let incoming = Box::pin(incoming.map(move |res| {
let (inner, tcp) = res?;
Expand Down Expand Up @@ -109,6 +114,24 @@ impl fmt::Debug for MockOrigDst {
}
}

// === impl MockDualOrigDst ===

impl<T> listen::Bind<T> for MockDualOrigDst
where
T: Param<Keepalive> + Param<ListenAddr>,
{
type Addrs = orig_dst::Addrs;
type BoundAddrs = (Local<ServerAddr>, Option<Local<ServerAddr>>);
type Io = tokio::net::TcpStream;
type Incoming =
Pin<Box<dyn Stream<Item = Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>>;

fn bind(self, params: &T) -> Result<(Self::BoundAddrs, Self::Incoming)> {
let (bound, incoming) = self.inner.bind(params)?;
Ok(((bound, None), incoming))
}
}

// === impl Proxy ===

impl Proxy {
Expand Down Expand Up @@ -163,13 +186,17 @@ impl Proxy {
}

pub fn outbound(mut self, s: server::Listening) -> Self {
self.outbound = MockOrigDst::Addr(s.addr);
self.outbound = MockDualOrigDst {
inner: MockOrigDst::Addr(s.addr),
};
self.outbound_server = Some(s);
self
}

pub fn outbound_ip(mut self, s: SocketAddr) -> Self {
self.outbound = MockOrigDst::Addr(s);
self.outbound = MockDualOrigDst {
inner: MockOrigDst::Addr(s),
};
self
}

Expand Down Expand Up @@ -476,6 +503,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
identity_addr,
main.inbound_addr(),
main.outbound_addr(),
main.outbound_addr_additional(),
main.admin_addr(),
);
let mut running = Some((running_tx, addrs));
Expand Down Expand Up @@ -531,15 +559,22 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
})
.expect("spawn");

let (tap_addr, identity_addr, inbound_addr, outbound_addr, admin_addr) =
running_rx.await.unwrap();
let (
tap_addr,
identity_addr,
inbound_addr,
outbound_addr,
outbound_addr_additional,
admin_addr,
) = running_rx.await.unwrap();

tracing::info!(
tap.addr = ?tap_addr,
identity.addr = ?identity_addr,
inbound.addr = ?inbound_addr,
inbound.orig_dst = ?inbound,
outbound.addr = ?outbound_addr,
outbound.addr.additional = ?outbound_addr_additional,
outbound.orig_dst = ?outbound,
metrics.addr = ?admin_addr,
);
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive},
IpMatch, IpNet, ProxyRuntime,
};
pub use linkerd_app_test as support;
Expand All @@ -24,7 +24,7 @@ pub(crate) fn default_config() -> Config {
allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(),
proxy: config::ProxyConfig {
server: config::ServerConfig {
addr: ListenAddr(([0, 0, 0, 0], 0).into()),
addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
},
Expand Down
37 changes: 28 additions & 9 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use linkerd_app_core::{
control::{Config as ControlConfig, ControlAddr},
proxy::http::{h1, h2},
tls,
transport::{Keepalive, ListenAddr},
transport::{DualListenAddr, Keepalive, ListenAddr},
Addr, AddrMatch, Conditional, IpNet,
};
use linkerd_tonic_stream::ReceiveLimits;
Expand Down Expand Up @@ -81,6 +81,8 @@ pub enum ParseError {
NotAPortRange,
#[error(transparent)]
AddrError(addr::Error),
#[error("only two addresses are supported")]
TooManyAddrs,
#[error("not a valid identity name")]
NameError,
#[error("could not read token source")]
Expand All @@ -93,6 +95,7 @@ pub enum ParseError {

// Environment variables to look at when loading the configuration
pub const ENV_OUTBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_LISTEN_ADDR";
pub const ENV_OUTBOUND_LISTEN_ADDRS: &str = "LINKERD2_PROXY_OUTBOUND_LISTEN_ADDRS";
pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR";
pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR";
pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR";
Expand Down Expand Up @@ -340,6 +343,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
// Parse all the environment variables. `parse` will log any errors so
// defer returning any errors until all of them have been parsed.
let outbound_listener_addr = parse(strings, ENV_OUTBOUND_LISTEN_ADDR, parse_socket_addr);
let outbound_listener_addrs = parse(strings, ENV_OUTBOUND_LISTEN_ADDRS, parse_socket_addrs);
let inbound_listener_addr = parse(strings, ENV_INBOUND_LISTEN_ADDR, parse_socket_addr);
let admin_listener_addr = parse(strings, ENV_ADMIN_LISTEN_ADDR, parse_socket_addr);

Expand Down Expand Up @@ -463,10 +467,16 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
)?
.unwrap_or(ingress_mode);

let addr = ListenAddr(
outbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()),
);
let addr = match outbound_listener_addrs {
Ok(Some(addrs)) if addrs.len() == 1 => DualListenAddr(addrs[0], None),
Ok(Some(addrs)) if addrs.len() == 2 => DualListenAddr(addrs[0], Some(addrs[1])),
olix0r marked this conversation as resolved.
Show resolved Hide resolved
_ => {
let addr = outbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap());
DualListenAddr(addr, None)
}
};

let keepalive = Keepalive(outbound_accept_keepalive?);
let server = ServerConfig {
addr,
Expand Down Expand Up @@ -544,9 +554,10 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
.unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap());

let inbound = {
let addr = ListenAddr(
let addr = DualListenAddr(
inbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()),
None,
);
let keepalive = Keepalive(inbound_accept_keepalive?);
let server = ServerConfig {
Expand Down Expand Up @@ -586,7 +597,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
// Ensure that connections that directly target the inbound port are secured (unless
// identity is disabled).
let policy = {
let inbound_port = server.addr.port();
let inbound_port = ListenAddr(server.addr.0).port();

let cluster_nets = parse(strings, ENV_POLICY_CLUSTER_NETWORKS, parse_networks)?
.unwrap_or_else(|| {
Expand Down Expand Up @@ -745,7 +756,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let admin = super::admin::Config {
metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE),
server: ServerConfig {
addr: ListenAddr(admin_listener_addr),
addr: DualListenAddr(admin_listener_addr, None),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
},
Expand Down Expand Up @@ -804,7 +815,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
.map(|(addr, ids)| super::tap::Config::Enabled {
permitted_client_ids: ids,
config: ServerConfig {
addr: ListenAddr(addr),
addr: DualListenAddr(addr, None),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
},
Expand Down Expand Up @@ -1036,6 +1047,14 @@ fn parse_socket_addr(s: &str) -> Result<SocketAddr, ParseError> {
}
}

fn parse_socket_addrs(s: &str) -> Result<Vec<SocketAddr>, ParseError> {
let addrs: Vec<&str> = s.split(',').collect();
if addrs.len() > 2 {
return Err(ParseError::TooManyAddrs);
}
addrs.iter().map(|s| parse_socket_addr(s)).collect()
}

fn parse_ip_set(s: &str) -> Result<HashSet<IpAddr>, ParseError> {
s.split(',')
.map(|s| s.parse::<IpAddr>().map_err(Into::into))
Expand Down
14 changes: 10 additions & 4 deletions linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct App {
inbound_addr: Local<ServerAddr>,
oc_collector: oc_collector::OcCollector,
outbound_addr: Local<ServerAddr>,
outbound_addr_additional: Option<Local<ServerAddr>>,
start_proxy: Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
tap: tap::Tap,
}
Expand All @@ -102,17 +103,17 @@ impl Config {
mut registry: prom::Registry,
) -> Result<App, Error>
where
BIn: Bind<ServerConfig> + 'static,
BIn: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>> + 'static,
BIn::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BOut: Bind<ServerConfig> + 'static,
BOut: Bind<ServerConfig, BoundAddrs = DualLocal<ServerAddr>> + 'static,
BOut::Addrs: Param<Remote<ClientAddr>>
+ Param<Local<ServerAddr>>
+ Param<OrigDstAddr>
+ Param<AddrPair>,
BAdmin: Bind<ServerConfig> + Clone + 'static,
BAdmin: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>> + Clone + 'static,
BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<AddrPair>,
{
let Config {
Expand Down Expand Up @@ -250,7 +251,7 @@ impl Config {
gateway.into_inner(),
);

let (outbound_addr, outbound_listen) = bind_out
let ((outbound_addr, outbound_addr_additional), outbound_listen) = bind_out
.bind(&outbound.config().proxy.server)
.expect("Failed to bind outbound listener");
let outbound_metrics = outbound.metrics();
Expand Down Expand Up @@ -310,6 +311,7 @@ impl Config {
inbound_addr,
oc_collector,
outbound_addr,
outbound_addr_additional,
start_proxy,
tap,
})
Expand Down Expand Up @@ -344,6 +346,10 @@ impl App {
self.outbound_addr
}

pub fn outbound_addr_additional(&self) -> Option<Local<ServerAddr>> {
self.outbound_addr_additional
}

pub fn tap_addr(&self) -> Option<Local<ServerAddr>> {
match self.tap {
tap::Tap::Disabled { .. } => None,
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Config {
drain: drain::Watch,
) -> Result<Tap, Error>
where
B: Bind<ServerConfig>,
B: Bind<ServerConfig, BoundAddrs = Local<ServerAddr>>,
B::Addrs: Param<Remote<ClientAddr>>,
B::Addrs: Param<AddrPair>,
{
Expand Down
7 changes: 7 additions & 0 deletions linkerd/proxy/transport/src/addrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub struct ClientAddr(pub SocketAddr);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct ListenAddr(pub SocketAddr);

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct DualListenAddr(pub SocketAddr, pub Option<SocketAddr>);

/// The address of a server.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct ServerAddr(pub SocketAddr);
Expand All @@ -24,6 +27,10 @@ pub struct OrigDstAddr(pub SocketAddr);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Local<T>(pub T);

/// Pair wrapping a local address and optionally a second one, used when binding to both IPv4 and
/// IPv6
pub type DualLocal<T> = (Local<T>, Option<Local<T>>);

/// Wraps an address type to indicate it describes another process.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Remote<T>(pub T);
Expand Down
Loading
Loading