Skip to content

Commit

Permalink
Merge branch 'domain-support'
Browse files Browse the repository at this point in the history
Closes #5.
  • Loading branch information
mbr committed Dec 30, 2023
2 parents 67e201f + ce0ca4c commit 7df2651
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 66 deletions.
2 changes: 1 addition & 1 deletion quick-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

export REGISTRY_ADDR=127.0.0.1:3000

if [ $PODMAN_IS_REMOTE == "true" ]; then
if [ "x$PODMAN_IS_REMOTE" == "xtrue" ]; then
export REGISTRY_ADDR=$(dig +short $(hostname)):3000
fi

Expand Down
222 changes: 157 additions & 65 deletions src/reverse_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,157 @@ use std::{
fmt::{self, Display},
mem,
net::SocketAddr,
str::{self, FromStr},
sync::Arc,
};

use axum::{
body::Body,
extract::{Request, State},
http::StatusCode,
http::{
header::HOST,
uri::{Authority, Parts, PathAndQuery, Scheme},
StatusCode, Uri,
},
response::{IntoResponse, Response},
routing::any,
Router,
};
use itertools::Itertools;
use tokio::sync::RwLock;
use tracing::{trace, warn};

use crate::registry::storage::ImageLocation;

pub(crate) struct ReverseProxy {
client: reqwest::Client,
containers: RwLock<HashMap<ImageLocation, PublishedContainer>>,
routing_table: RwLock<RoutingTable>,
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub(crate) struct PublishedContainer {
host_addr: SocketAddr,
image_location: ImageLocation,
}

#[derive(Debug, Default)]
pub(crate) struct RoutingTable {
path_maps: HashMap<ImageLocation, PublishedContainer>,
domain_maps: HashMap<Domain, PublishedContainer>,
}

impl RoutingTable {
#[inline(always)]
fn get_path_route(&self, image_location: &ImageLocation) -> Option<&PublishedContainer> {
self.path_maps.get(image_location)
}

#[inline(always)]
fn get_domain_route(&self, domain: &Domain) -> Option<&PublishedContainer> {
self.domain_maps.get(domain)
}
}

#[derive(Debug, Hash, Eq, PartialEq)]
struct Domain(String);

impl Domain {
fn new(raw: &str) -> Option<Self> {
let domain_name = raw.to_lowercase();
if !domain_name.contains('.') {
return None;
}

Some(Self(domain_name))
}
}

impl PartialEq<String> for Domain {
fn eq(&self, other: &String) -> bool {
other.to_lowercase() == self.0
}
}

impl RoutingTable {
fn from_containers(containers: impl IntoIterator<Item = PublishedContainer>) -> Self {
let mut path_maps = HashMap::new();
let mut domain_maps = HashMap::new();

for container in containers {
if let Some(domain) = Domain::new(&container.image_location.repository()) {
domain_maps.insert(domain, container.clone());
}

path_maps.insert(container.image_location.clone(), container);
}

Self {
path_maps,
domain_maps,
}
}

// TODO: Consider return a `Uri`` instead.
fn get_destination_uri_from_request(&self, request: &Request) -> Option<Uri> {
let req_uri = request.uri();

// First, attempt to match a domain.
let opt_domain = if let Some(host_header) = request
.headers()
.get(HOST)
.and_then(|h| str::from_utf8(h.as_bytes()).ok())
{
let candidate = if let Some(colon) = host_header.rfind(':') {
&host_header[..colon]
} else {
host_header
};

Domain::new(candidate)
} else {
None
};

if let Some(pc) = opt_domain.and_then(|domain| self.get_domain_route(&domain)) {
// We only need to swap the protocol and domain and we're good to go.
let mut parts = req_uri.clone().into_parts();
parts.scheme = Some(Scheme::HTTP);
parts.authority = Some(
Authority::from_str(&pc.host_addr.to_string())
.expect("SocketAddr should never fail to convert to Authority"),
);
return Some(Uri::from_parts(parts).expect("should not have invalidated Uri"));
}

// Matching a domain did not succeed, let's try with a path.
// Reconstruct image location from path segments, keeping remainder intact.
if let Some((image_location, remainder)) = split_path_base_url(req_uri) {
if let Some(pc) = self.get_path_route(&image_location) {
let container_addr = pc.host_addr;

let mut dest_path_and_query = remainder;

if req_uri.path().ends_with('/') {
dest_path_and_query.push('/');
}

if let Some(query) = req_uri.query() {
dest_path_and_query.push('?');
dest_path_and_query += query;
}

let mut parts = Parts::default();
parts.scheme = Some(Scheme::HTTP);
parts.authority = Some(Authority::from_str(&container_addr.to_string()).unwrap());
parts.path_and_query = Some(PathAndQuery::from_str(&dest_path_and_query).unwrap());

return Some(Uri::from_parts(parts).unwrap());
}
}

None
}
}

#[derive(Debug)]
enum AppError {
NoSuchContainer,
Expand Down Expand Up @@ -86,88 +211,57 @@ impl ReverseProxy {
pub(crate) fn new() -> Arc<Self> {
Arc::new(ReverseProxy {
client: reqwest::Client::new(),
containers: RwLock::new(HashMap::new()),
routing_table: RwLock::new(Default::default()),
})
}

pub(crate) fn make_router(self: Arc<Self>) -> Router {
Router::new()
.route("/:repository/:image", any(reverse_proxy))
.route("/:repository/:image/", any(reverse_proxy))
.route("/:repository/:image/*remainder", any(reverse_proxy))
.with_state(self)
Router::new().fallback(route_request).with_state(self)
}

pub(crate) async fn update_containers(
&self,
containers: impl Iterator<Item = PublishedContainer>,
) {
let mut new_mapping = containers
.map(|pc| (pc.image_location.clone(), pc))
.collect();
let mut routing_table = RoutingTable::from_containers(containers);

let mut guard = self.containers.write().await;
mem::swap(&mut *guard, &mut new_mapping);
let mut guard = self.routing_table.write().await;
mem::swap(&mut *guard, &mut routing_table);
}
}

async fn reverse_proxy(
State(rp): State<Arc<ReverseProxy>>,
request: Request,
) -> Result<Response<Body>, AppError> {
// Determine rewritten URL.
let req_uri = request.uri();

let mut segments = req_uri
.path()
.split('/')
.filter(|segment| !segment.is_empty());

let image_location = ImageLocation::new(
segments
.next()
.ok_or(AppError::AssertionFailed("repository segment disappeared"))?
.to_owned(),
segments
.next()
.ok_or(AppError::AssertionFailed("image segment disappeared"))?
.to_owned(),
);

// TODO: Return better error (404?).
let dest_addr = rp
.containers
.read()
.await
.get(&image_location)
.ok_or(AppError::NoSuchContainer)?
.host_addr;
let base_url = format!("http://{dest_addr}");

// Format is: '' / repository / image / ...
// Need to skip the first three.
let cleaned_path = segments.join("/");
fn split_path_base_url(uri: &Uri) -> Option<(ImageLocation, String)> {
// Reconstruct image location from path segments, keeping remainder intact.
let mut segments = uri.path().split('/').filter(|segment| !segment.is_empty());

let mut dest_path_and_query = cleaned_path;
let image_location =
ImageLocation::new(segments.next()?.to_owned(), segments.next()?.to_owned());

if req_uri.path().ends_with('/') {
dest_path_and_query.push('/');
}
// Now create the path, format is: '' / repository / image / ...
// Need to skip the first three.
let remainder = segments.join("/");

if let Some(query) = req_uri.query() {
dest_path_and_query.push('?');
dest_path_and_query += query;
}
Some((image_location, remainder))
}

let dest_uri = format!("{base_url}/{dest_path_and_query}");
trace!(%dest_uri, "reverse proxying");
async fn route_request(
State(rp): State<Arc<ReverseProxy>>,
request: Request,
) -> Result<Response, AppError> {
let dest_uri = {
let routing_table = rp.routing_table.read().await;
routing_table.get_destination_uri_from_request(&request)
};

let dest = dest_uri.ok_or(AppError::NoSuchContainer)?;
trace!(%dest, "reverse proxying");

// Note: `reqwest` and `axum` currently use different versions of `http`
let method =
request.method().to_string().parse().map_err(|_| {
AppError::AssertionFailed("method http version mismatch workaround failed")
})?;
let response = rp.client.request(method, &dest_uri).send().await;
let response = rp.client.request(method, dest.to_string()).send().await;

match response {
Ok(response) => {
Expand All @@ -187,7 +281,7 @@ async fn reverse_proxy(
.map_err(|_| AppError::AssertionFailed("should not fail to construct response"))?)
}
Err(err) => {
warn!(%err, %dest_uri, "failed request");
warn!(%err, %dest, "failed request");
Ok(Response::builder()
.status(500)
.body(Body::empty())
Expand All @@ -213,5 +307,3 @@ mod hop_by_hop {
];
}
use hop_by_hop::HOP_BY_HOP;

use crate::registry::storage::ImageLocation;

0 comments on commit 7df2651

Please sign in to comment.