diff --git a/Cargo.lock b/Cargo.lock index a51680b..b9b92f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,9 +182,9 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "itoa", "matchit", "memchr", @@ -208,8 +208,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -249,6 +249,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "1.3.2" @@ -547,12 +553,19 @@ dependencies = [ name = "dephy-pproxy" version = "0.1.0" dependencies = [ + "base64 0.22.1", + "bytes", "clap", "futures", "hex", + "http 1.1.0", + "http-body-util", "httparse", + "hyper 1.3.1", + "hyper-util", "litep2p", "multiaddr", + "percent-encoding", "prost 0.12.4", "thiserror", "tokio", @@ -887,7 +900,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -971,6 +984,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -978,7 +1002,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1011,8 +1058,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1024,18 +1071,52 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-timeout" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "tokio", +] + [[package]] name = "idna" version = "0.4.0" @@ -2738,9 +2819,9 @@ dependencies = [ "base64 0.21.7", "bytes", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-timeout", "percent-encoding", "pin-project", @@ -2774,9 +2855,9 @@ checksum = "dc3b0e1cedbf19fdfb78ef3d672cb9928e0a91a9cb4629cc0c916e8cff8aaaa1" dependencies = [ "base64 0.21.7", "bytes", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "pin-project", "tokio-stream", "tonic", @@ -2816,8 +2897,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "http-range-header", "pin-project-lite", "tower-layer", @@ -2959,7 +3040,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.12", "httparse", "log", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 65d2355..e2e88d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,13 +4,20 @@ version = "0.1.0" edition = "2021" [dependencies] +base64 = "0.22.1" +bytes = "1.6.0" clap = "4.5.4" futures = "0.3.30" hex = "0.4.3" +http = "1.1.0" +http-body-util = "0.1.1" httparse = "1.8.0" +hyper = { version = "1.3.1", features = ["server", "http1"] } +hyper-util = { version = "0.1.5", features = ["tokio"] } litep2p = "0.5.0" # Do not upgrade multiaddr, see: https://github.com/paritytech/litep2p/pull/91 multiaddr = "0.17.1" +percent-encoding = "2.3.1" prost = "0.12.4" thiserror = "1.0.60" tokio = { version = "1.37.0", features = ["rt-multi-thread"] } diff --git a/README.md b/README.md index 0fc5cdc..ea106a2 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,24 @@ # dephy-pproxy dephy-pproxy is a http proxy over litep2p. + +## Usage +Run the server: +```shell +cargo run -- --proxy-addr 127.0.0.1:8000 +``` + +Run the client: +```shell +cargo run -- --server-addr 127.0.0.1:6666 --commander-server-addr 127.0.0.1:7777 --proxy-gateway-addr 127.0.0.1:8080 +``` + +Add server peer to client: +```shell +grpcurl -plaintext -import-path ./proto -proto command_v1.proto -d '{"address": ""}' '127.0.0.1:7777' command.v1.CommandService/AddPeer +``` + +Access server via the gateway address: +```shell +curl -H 'peerid: ' 127.0.0.1:8080 +``` diff --git a/src/gateway.rs b/src/gateway.rs new file mode 100644 index 0000000..7b53dd6 --- /dev/null +++ b/src/gateway.rs @@ -0,0 +1,223 @@ +use std::net::SocketAddr; +use std::str::FromStr; + +use base64::prelude::*; +use bytes::Bytes; +use http_body_util::combinators::BoxBody; +use http_body_util::BodyExt; +use http_body_util::Full; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::Request; +use hyper::Response; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; + +use crate::command::proto::command_service_client::CommandServiceClient; +use crate::command::proto::RequestHttpServerRequest; + +pub async fn proxy_gateway( + addr: SocketAddr, + commander_addr: SocketAddr, +) -> Result<(), Box> { + let listener = TcpListener::bind(addr).await?; + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .preserve_header_case(true) + .title_case_headers(true) + .serve_connection(io, service_fn(move |req| gateway(req, commander_addr))) + .await + { + println!("Failed to serve connection: {:?}", err); + } + }); + } +} + +async fn gateway( + mut req: Request, + commander_addr: SocketAddr, +) -> Result>, hyper::Error> { + let headers = req.headers_mut(); + let Some(Ok(peer_id)) = headers + .remove("peerid") + .map(|x| x.to_str().map(|x| x.to_string())) + else { + return Ok(error_response( + "Invalid peerid in header", + http::StatusCode::BAD_REQUEST, + )); + }; + + let Ok(mut data) = request_header_to_vec(&req) else { + return Ok(error_response( + "Failed to dump headers", + http::StatusCode::INTERNAL_SERVER_ERROR, + )); + }; + + let body = req.into_body().collect().await?; + data.extend(body.to_bytes()); + + let Ok(mut client) = CommandServiceClient::connect(format!("http://{}", commander_addr)).await + else { + return Ok(error_response( + "Failed to connect to pproxy", + http::StatusCode::BAD_GATEWAY, + )); + }; + + let pp_request = RequestHttpServerRequest { peer_id, data }; + let Ok(pp_response) = client + .request_http_server(pp_request) + .await + .map(|r| r.into_inner()) + else { + return Ok(error_response( + "Failed to request pproxy", + http::StatusCode::BAD_GATEWAY, + )); + }; + + let Ok(Some((resp, trailer))) = parse_response_header_easy(&pp_response.data) else { + return Ok(error_response( + "Failed to parse response headers from pproxy", + http::StatusCode::INTERNAL_SERVER_ERROR, + )); + }; + + let (parts, _) = resp.into_parts(); + let t = trailer.to_vec(); + Ok(Response::from_parts(parts, full(t))) +} + +fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()) + .map_err(|never| match never {}) + .boxed() +} + +fn error_response( + msg: &'static str, + status: http::StatusCode, +) -> Response> { + let mut resp = Response::new(full(msg)); + *resp.status_mut() = status; + resp +} + +fn io_other_error(msg: &'static str) -> std::io::Error { + let e: Box = msg.into(); + std::io::Error::new(std::io::ErrorKind::Other, e) +} + +fn write_request_header( + r: &http::Request, + mut io: impl std::io::Write, +) -> std::io::Result { + let mut len = 0; + let verb = r.method().as_str(); + let path = r + .uri() + .path_and_query() + .ok_or_else(|| io_other_error("Invalid URI"))?; + + let need_to_insert_host = + r.uri().host().is_some() && !r.headers().contains_key(http::header::HOST); + + macro_rules! w { + ($x:expr) => { + io.write_all($x)?; + len += $x.len(); + }; + } + w!(verb.as_bytes()); + w!(b" "); + w!(path.as_str().as_bytes()); + w!(b" HTTP/1.1\r\n"); + + if need_to_insert_host { + w!(b"Host: "); + let host = r.uri().host().unwrap(); + w!(host.as_bytes()); + if let Some(p) = r.uri().port() { + w!(b":"); + w!(p.as_str().as_bytes()); + } + w!(b"\r\n"); + } + + let already_present = r.headers().get(http::header::AUTHORIZATION).is_some(); + let at_sign = r + .uri() + .authority() + .map_or(false, |x| x.as_str().contains('@')); + if !already_present && at_sign { + w!(b"Authorization: Basic "); + let a = r.uri().authority().unwrap().as_str(); + let a = &a[0..(a.find('@').unwrap())]; + let a = a + .as_bytes() + .split(|v| *v == b':') + .map(|v| percent_encoding::percent_decode(v).collect::>()) + .collect::>>() + .join(&b':'); + let a = BASE64_STANDARD.encode(a); + w!(a.as_bytes()); + w!(b"\r\n"); + } + + for (hn, hv) in r.headers() { + w!(hn.as_str().as_bytes()); + w!(b": "); + w!(hv.as_bytes()); + w!(b"\r\n"); + } + + w!(b"\r\n"); + + Ok(len) +} + +fn request_header_to_vec(r: &http::Request) -> std::io::Result> { + let v = Vec::with_capacity(120); + let mut c = std::io::Cursor::new(v); + write_request_header(r, &mut c)?; + Ok(c.into_inner()) +} + +#[allow(clippy::type_complexity)] +fn parse_response_header<'a>( + buf: &'a [u8], + headers_buffer: &mut [httparse::Header<'a>], +) -> Result, &'a [u8])>, Box> { + let mut x = httparse::Response::new(headers_buffer); + let n = match x.parse(buf)? { + httparse::Status::Partial => return Ok(None), + httparse::Status::Complete(size) => size, + }; + let trailer = &buf[n..]; + let mut r = Response::new(()); + *r.status_mut() = http::StatusCode::from_u16(x.code.unwrap())?; + // x.reason goes to nowhere + *r.version_mut() = http::Version::HTTP_11; // FIXME? + for h in x.headers { + let n = http::HeaderName::from_str(h.name)?; + let v = http::HeaderValue::from_bytes(h.value)?; + r.headers_mut().append(n, v); + } + Ok(Some((r, trailer))) +} + +#[allow(clippy::type_complexity)] +fn parse_response_header_easy( + buf: &[u8], +) -> Result, &[u8])>, Box> { + let mut h = [httparse::EMPTY_HEADER; 50]; + parse_response_header(buf, h.as_mut()) +} diff --git a/src/lib.rs b/src/lib.rs index a80b9a9..f6406bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,12 +21,12 @@ use crate::command::proto::AddPeerRequest; use crate::command::proto::AddPeerResponse; use crate::command::proto::RequestHttpServerRequest; use crate::command::proto::RequestHttpServerResponse; -pub use crate::error::Error; use crate::server::*; pub mod command; pub mod error; -pub mod server; +pub mod gateway; +mod server; /// pproxy version pub const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -34,7 +34,8 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION"); /// Default channel size. const DEFAULT_CHANNEL_SIZE: usize = 4096; -/// Public result type used by the crate. +/// Public result type error type used by the crate. +pub use crate::error::Error; pub type Result = std::result::Result; type CommandNotification = Result; diff --git a/src/main.rs b/src/main.rs index 5e4be40..2a5c5ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use clap::Arg; use clap::ArgAction; use clap::Command; use dephy_pproxy::command::PProxyCommander; +use dephy_pproxy::gateway::proxy_gateway; use dephy_pproxy::PProxy; use litep2p::crypto::ed25519::SecretKey; use tonic::transport::Server; @@ -40,7 +41,15 @@ fn parse_args() -> Command { .long("proxy-addr") .num_args(1) .action(ArgAction::Set) - .help("Will proxy to this address if set"), + .help("Will reverse proxy this address if set"), + ) + .arg( + Arg::new("PROXY_GATEWAY_ADDR") + .long("proxy-gateway-addr") + .num_args(1) + .default_value("127.0.0.1:10000") + .action(ArgAction::Set) + .help("Set up a local HTTP server that allows users use peerid header to proxy requests to remote peer"), ); app @@ -76,9 +85,15 @@ async fn main() { let proxy_addr = args .get_one::("PROXY_ADDR") .map(|addr| addr.parse().expect("Invalid proxy address")); + let proxy_gateway_addr = args + .get_one::("PROXY_GATEWAY_ADDR") + .unwrap() + .parse() + .expect("Invalid proxy gateway address"); println!("server_addr: {}", server_addr); println!("commander_server_addr: {}", commander_server_addr); + println!("proxy_gateway_addr: {}", proxy_gateway_addr); let (pproxy, pproxy_handle) = PProxy::new(key, server_addr, proxy_addr); @@ -94,6 +109,11 @@ async fn main() { .await .expect("Commander server failed") }, + async move { + proxy_gateway(proxy_gateway_addr, commander_server_addr) + .await + .expect("Gateway server failed") + }, async move { pproxy.run().await } ); }