From 683517d94b369dca25519fcb05b341ff07fbee81 Mon Sep 17 00:00:00 2001 From: magine Date: Wed, 12 Jun 2024 23:31:38 +0800 Subject: [PATCH] Run a separated gateway sever --- README.md | 12 ++++---- src/gateway.rs | 33 ++++++++++++-------- src/main.rs | 82 ++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 89 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index ea106a2..bcbc879 100644 --- a/README.md +++ b/README.md @@ -3,19 +3,19 @@ dephy-pproxy is a http proxy over litep2p. ## Usage -Run the server: +Run the server pproxy: ```shell -cargo run -- --proxy-addr 127.0.0.1:8000 +cargo run -- serve --server-addr 127.0.0.1:6666 --commander-server-addr 127.0.0.1:7777 --proxy-addr 127.0.0.1:11434 ``` -Run the client: +Run the client pproxy: ```shell -cargo run -- --server-addr 127.0.0.1:6666 --commander-server-addr 127.0.0.1:7777 --proxy-gateway-addr 127.0.0.1:8080 +cargo run -- serve ``` -Add server peer to client: +Run a gateway on client: ```shell -grpcurl -plaintext -import-path ./proto -proto command_v1.proto -d '{"address": ""}' '127.0.0.1:7777' command.v1.CommandService/AddPeer +cargo run -- gateway --proxy-gateway-addr 127.0.0.1:8080 --peer-multiaddr ``` Access server via the gateway address: diff --git a/src/gateway.rs b/src/gateway.rs index 7b53dd6..2247736 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -11,26 +11,43 @@ use hyper::service::service_fn; use hyper::Request; use hyper::Response; use hyper_util::rt::TokioIo; +use multiaddr::Multiaddr; use tokio::net::TcpListener; use crate::command::proto::command_service_client::CommandServiceClient; +use crate::command::proto::AddPeerRequest; use crate::command::proto::RequestHttpServerRequest; pub async fn proxy_gateway( addr: SocketAddr, commander_addr: SocketAddr, + peer_multiaddr: Multiaddr, ) -> Result<(), Box> { let listener = TcpListener::bind(addr).await?; + let mut client = CommandServiceClient::connect(format!("http://{}", commander_addr)).await?; + let pp_response = client + .add_peer(AddPeerRequest { + address: peer_multiaddr.to_string(), + peer_id: None, + }) + .await? + .into_inner(); + loop { let (stream, _) = listener.accept().await?; let io = TokioIo::new(stream); + let peer_id = pp_response.peer_id.clone(); tokio::task::spawn(async move { + let peer_id = peer_id.clone(); if let Err(err) = http1::Builder::new() .preserve_header_case(true) .title_case_headers(true) - .serve_connection(io, service_fn(move |req| gateway(req, commander_addr))) + .serve_connection( + io, + service_fn(move |req| gateway(req, commander_addr, peer_id.clone())), + ) .await { println!("Failed to serve connection: {:?}", err); @@ -40,20 +57,10 @@ pub async fn proxy_gateway( } async fn gateway( - mut req: Request, + req: Request, commander_addr: SocketAddr, + peer_id: String, ) -> Result>, hyper::Error> { - let headers = req.headers_mut(); - let Some(Ok(peer_id)) = headers - .remove("peerid") - .map(|x| x.to_str().map(|x| x.to_string())) - else { - return Ok(error_response( - "Invalid peerid in header", - http::StatusCode::BAD_REQUEST, - )); - }; - let Ok(mut data) = request_header_to_vec(&req) else { return Ok(error_response( "Failed to dump headers", diff --git a/src/main.rs b/src/main.rs index 2a5c5ad..fc84b6b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use clap::Arg; use clap::ArgAction; +use clap::ArgMatches; use clap::Command; use dephy_pproxy::command::PProxyCommander; use dephy_pproxy::gateway::proxy_gateway; @@ -12,7 +13,8 @@ fn parse_args() -> Command { .about("A proxy tool based on p2p network") .version("0.1.0"); - app = app + let serve = Command::new("serve") + .about("Start a pproxy server") .arg( Arg::new("KEY") .long("key") @@ -42,6 +44,17 @@ fn parse_args() -> Command { .num_args(1) .action(ArgAction::Set) .help("Will reverse proxy this address if set"), + ); + + let gateway = Command::new("gateway") + .about("Set up a local server that allows users proxy data to remote peer") + .arg( + Arg::new("COMMANDER_SERVER_ADDR") + .long("commander-server-addr") + .num_args(1) + .default_value("127.0.0.1:10086") + .action(ArgAction::Set) + .help("Commander server address"), ) .arg( Arg::new("PROXY_GATEWAY_ADDR") @@ -50,18 +63,21 @@ fn parse_args() -> Command { .default_value("127.0.0.1:10000") .action(ArgAction::Set) .help("Set up a local HTTP server that allows users use peerid header to proxy requests to remote peer"), + ) + .arg( + Arg::new("PEER_MULTIADDR") + .long("peer-multiaddr") + .num_args(1) + .action(ArgAction::Set) + .required(true) + .help("The multiaddr of the remote peer"), ); + app = app.subcommand(serve).subcommand(gateway); app } -#[tokio::main] -async fn main() { - let _ = tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let args = parse_args().get_matches(); +async fn serve(args: &ArgMatches) { let key = args .get_one::("KEY") .map(|key| { @@ -85,15 +101,9 @@ async fn main() { let proxy_addr = args .get_one::("PROXY_ADDR") .map(|addr| addr.parse().expect("Invalid proxy address")); - let proxy_gateway_addr = args - .get_one::("PROXY_GATEWAY_ADDR") - .unwrap() - .parse() - .expect("Invalid proxy gateway address"); println!("server_addr: {}", server_addr); println!("commander_server_addr: {}", commander_server_addr); - println!("proxy_gateway_addr: {}", proxy_gateway_addr); let (pproxy, pproxy_handle) = PProxy::new(key, server_addr, proxy_addr); @@ -109,11 +119,45 @@ async fn main() { .await .expect("Commander server failed") }, - async move { - proxy_gateway(proxy_gateway_addr, commander_server_addr) - .await - .expect("Gateway server failed") - }, async move { pproxy.run().await } ); } + +async fn gateway(args: &ArgMatches) { + let commander_server_addr = args + .get_one::("COMMANDER_SERVER_ADDR") + .unwrap() + .parse() + .expect("Invalid command server address"); + let proxy_gateway_addr = args + .get_one::("PROXY_GATEWAY_ADDR") + .unwrap() + .parse() + .expect("Invalid proxy gateway address"); + let peer_multiaddr = args + .get_one::("PEER_MULTIADDR") + .unwrap() + .parse() + .expect("Missing peer multiaddr"); + println!("proxy_gateway_addr: {}", proxy_gateway_addr); + proxy_gateway(proxy_gateway_addr, commander_server_addr, peer_multiaddr) + .await + .expect("Gateway server failed") +} + +#[tokio::main] +async fn main() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let args = parse_args().get_matches(); + + if let Some(args) = args.subcommand_matches("serve") { + serve(args).await; + } + + if let Some(args) = args.subcommand_matches("gateway") { + gateway(args).await; + } +}