Skip to content
This repository was archived by the owner on Jun 21, 2024. It is now read-only.

Commit 250082c

Browse files
committed
hook-worker: deny traffic to internal IPs
1 parent e2ce466 commit 250082c

File tree

5 files changed

+82
-2
lines changed

5 files changed

+82
-2
lines changed

hook-worker/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ pub struct Config {
3737

3838
#[envconfig(default = "1")]
3939
pub dequeue_batch_size: u32,
40+
41+
#[envconfig(default = "false")]
42+
pub allow_internal_ips: bool,
4043
}
4144

4245
impl Config {

hook-worker/src/dns.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::error::Error as StdError;
2+
use std::io;
3+
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
4+
5+
use futures::FutureExt;
6+
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
7+
use tokio::task::spawn_blocking;
8+
9+
/// Internal reqwest type, copied here as part of Resolving
10+
pub(crate) type BoxError = Box<dyn StdError + Send + Sync>;
11+
12+
/// Returns [`true`] if the address appears to be a globally reachable IPv4.
13+
///
14+
/// Trimmed down version of the unstable IpAddr::is_global, move to it when it's stable.
15+
fn is_global_ipv4(addr: &SocketAddr) -> bool {
16+
match addr.ip() {
17+
IpAddr::V4(ip) => {
18+
!(ip.octets()[0] == 0 // "This network"
19+
|| ip.is_private()
20+
|| ip.is_loopback()
21+
|| ip.is_link_local()
22+
|| ip.is_broadcast())
23+
}
24+
IpAddr::V6(_) => false, // Our network does not currently support ipv6, let's ignore for now
25+
}
26+
}
27+
28+
/// DNS resolver using the stdlib resolver, but filtering results to only pass public IPv4 results.
29+
///
30+
/// Private and broadcast addresses are filtered out, so are IPv6 results for now (as our infra
31+
/// does not currently support IPv6 routing anyway).
32+
/// This is adapted from the GaiResolver in hyper and reqwest.
33+
pub struct PublicIPv4Resolver {}
34+
35+
impl Resolve for PublicIPv4Resolver {
36+
fn resolve(&self, name: Name) -> Resolving {
37+
// Closure to call the system's resolver (blocking call) through the ToSocketAddrs trait.
38+
let resolve_host = move || (name.as_str(), 0).to_socket_addrs();
39+
40+
// Execute the blocking call in a separate worker thread then process its result asynchronously.
41+
// spawn_blocking returns a JoinHandle that implements Future<Result<(closure result), JoinError>>.
42+
let future_result = spawn_blocking(resolve_host).map(|result| match result {
43+
Ok(Ok(addr)) => {
44+
// Resolution succeeded, pass the IPs in a Box after filtering
45+
let addrs: Addrs = Box::new(addr.filter(is_global_ipv4));
46+
Ok(addrs)
47+
}
48+
Ok(Err(err)) => {
49+
// Resolution failed, pass error through in a Box
50+
let err: BoxError = Box::new(err);
51+
Err(err)
52+
}
53+
Err(join_err) => {
54+
// The tokio task failed, error handled copied from hyper's GaiResolver
55+
if join_err.is_cancelled() {
56+
let err: BoxError =
57+
Box::new(io::Error::new(io::ErrorKind::Interrupted, join_err));
58+
Err(err)
59+
} else {
60+
panic!("background task failed: {:?}", join_err)
61+
}
62+
}
63+
});
64+
65+
// Box the Future to satisfy the Resolving interface.
66+
Box::pin(future_result)
67+
}
68+
}

hook-worker/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod config;
2+
pub mod dns;
23
pub mod error;
34
pub mod util;
45
pub mod worker;

hook-worker/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async fn main() -> Result<(), WorkerError> {
5252
config.request_timeout.0,
5353
config.max_concurrent_jobs,
5454
retry_policy_builder.provide(),
55+
config.allow_internal_ips,
5556
worker_liveness,
5657
);
5758

hook-worker/src/worker.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use reqwest::header;
1818
use tokio::sync;
1919
use tracing::error;
2020

21+
use crate::dns::PublicIPv4Resolver;
2122
use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError};
2223
use crate::util::first_n_bytes_of_response;
2324

@@ -84,6 +85,7 @@ impl<'p> WebhookWorker<'p> {
8485
request_timeout: time::Duration,
8586
max_concurrent_jobs: usize,
8687
retry_policy: RetryPolicy,
88+
allow_internal_ips: bool,
8789
liveness: HealthHandle,
8890
) -> Self {
8991
let mut headers = header::HeaderMap::new();
@@ -92,10 +94,14 @@ impl<'p> WebhookWorker<'p> {
9294
header::HeaderValue::from_static("application/json"),
9395
);
9496

95-
let client = reqwest::Client::builder()
97+
let mut client_builder = reqwest::Client::builder()
9698
.default_headers(headers)
9799
.user_agent("PostHog Webhook Worker")
98-
.timeout(request_timeout)
100+
.timeout(request_timeout);
101+
if !allow_internal_ips {
102+
client_builder = client_builder.dns_resolver(Arc::new(PublicIPv4Resolver {}))
103+
}
104+
let client = client_builder
99105
.build()
100106
.expect("failed to construct reqwest client for webhook worker");
101107

@@ -569,6 +575,7 @@ mod tests {
569575
time::Duration::from_millis(5000),
570576
10,
571577
RetryPolicy::default(),
578+
false,
572579
liveness,
573580
);
574581

0 commit comments

Comments
 (0)