Skip to content

Commit

Permalink
Run a separated gateway sever
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Jun 13, 2024
1 parent 49942f4 commit 683517d
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 38 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
dephy-pproxy is a http proxy over litep2p.

## Usage
Run the server:
Run the server pproxy:
```shell
cargo run -- --proxy-addr 127.0.0.1:8000
cargo run -- serve --server-addr 127.0.0.1:6666 --commander-server-addr 127.0.0.1:7777 --proxy-addr 127.0.0.1:11434
```

Run the client:
Run the client pproxy:
```shell
cargo run -- --server-addr 127.0.0.1:6666 --commander-server-addr 127.0.0.1:7777 --proxy-gateway-addr 127.0.0.1:8080
cargo run -- serve
```

Add server peer to client:
Run a gateway on client:
```shell
grpcurl -plaintext -import-path ./proto -proto command_v1.proto -d '{"address": "<The litep2p multiaddr>"}' '127.0.0.1:7777' command.v1.CommandService/AddPeer
cargo run -- gateway --proxy-gateway-addr 127.0.0.1:8080 --peer-multiaddr <The litep2p multiaddr>
```

Access server via the gateway address:
Expand Down
33 changes: 20 additions & 13 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,43 @@ use hyper::service::service_fn;
use hyper::Request;
use hyper::Response;
use hyper_util::rt::TokioIo;
use multiaddr::Multiaddr;
use tokio::net::TcpListener;

use crate::command::proto::command_service_client::CommandServiceClient;
use crate::command::proto::AddPeerRequest;
use crate::command::proto::RequestHttpServerRequest;

pub async fn proxy_gateway(
addr: SocketAddr,
commander_addr: SocketAddr,
peer_multiaddr: Multiaddr,
) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(addr).await?;

let mut client = CommandServiceClient::connect(format!("http://{}", commander_addr)).await?;
let pp_response = client
.add_peer(AddPeerRequest {
address: peer_multiaddr.to_string(),
peer_id: None,
})
.await?
.into_inner();

loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let peer_id = pp_response.peer_id.clone();

tokio::task::spawn(async move {
let peer_id = peer_id.clone();
if let Err(err) = http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(io, service_fn(move |req| gateway(req, commander_addr)))
.serve_connection(
io,
service_fn(move |req| gateway(req, commander_addr, peer_id.clone())),
)
.await
{
println!("Failed to serve connection: {:?}", err);
Expand All @@ -40,20 +57,10 @@ pub async fn proxy_gateway(
}

async fn gateway(
mut req: Request<hyper::body::Incoming>,
req: Request<hyper::body::Incoming>,
commander_addr: SocketAddr,
peer_id: String,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let headers = req.headers_mut();
let Some(Ok(peer_id)) = headers
.remove("peerid")
.map(|x| x.to_str().map(|x| x.to_string()))
else {
return Ok(error_response(
"Invalid peerid in header",
http::StatusCode::BAD_REQUEST,
));
};

let Ok(mut data) = request_header_to_vec(&req) else {
return Ok(error_response(
"Failed to dump headers",
Expand Down
82 changes: 63 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::Arg;
use clap::ArgAction;
use clap::ArgMatches;
use clap::Command;
use dephy_pproxy::command::PProxyCommander;
use dephy_pproxy::gateway::proxy_gateway;
Expand All @@ -12,7 +13,8 @@ fn parse_args() -> Command {
.about("A proxy tool based on p2p network")
.version("0.1.0");

app = app
let serve = Command::new("serve")
.about("Start a pproxy server")
.arg(
Arg::new("KEY")
.long("key")
Expand Down Expand Up @@ -42,6 +44,17 @@ fn parse_args() -> Command {
.num_args(1)
.action(ArgAction::Set)
.help("Will reverse proxy this address if set"),
);

let gateway = Command::new("gateway")
.about("Set up a local server that allows users proxy data to remote peer")
.arg(
Arg::new("COMMANDER_SERVER_ADDR")
.long("commander-server-addr")
.num_args(1)
.default_value("127.0.0.1:10086")
.action(ArgAction::Set)
.help("Commander server address"),
)
.arg(
Arg::new("PROXY_GATEWAY_ADDR")
Expand All @@ -50,18 +63,21 @@ fn parse_args() -> Command {
.default_value("127.0.0.1:10000")
.action(ArgAction::Set)
.help("Set up a local HTTP server that allows users use peerid header to proxy requests to remote peer"),
)
.arg(
Arg::new("PEER_MULTIADDR")
.long("peer-multiaddr")
.num_args(1)
.action(ArgAction::Set)
.required(true)
.help("The multiaddr of the remote peer"),
);

app = app.subcommand(serve).subcommand(gateway);
app
}

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

let args = parse_args().get_matches();
async fn serve(args: &ArgMatches) {
let key = args
.get_one::<String>("KEY")
.map(|key| {
Expand All @@ -85,15 +101,9 @@ async fn main() {
let proxy_addr = args
.get_one::<String>("PROXY_ADDR")
.map(|addr| addr.parse().expect("Invalid proxy address"));
let proxy_gateway_addr = args
.get_one::<String>("PROXY_GATEWAY_ADDR")
.unwrap()
.parse()
.expect("Invalid proxy gateway address");

println!("server_addr: {}", server_addr);
println!("commander_server_addr: {}", commander_server_addr);
println!("proxy_gateway_addr: {}", proxy_gateway_addr);

let (pproxy, pproxy_handle) = PProxy::new(key, server_addr, proxy_addr);

Expand All @@ -109,11 +119,45 @@ async fn main() {
.await
.expect("Commander server failed")
},
async move {
proxy_gateway(proxy_gateway_addr, commander_server_addr)
.await
.expect("Gateway server failed")
},
async move { pproxy.run().await }
);
}

async fn gateway(args: &ArgMatches) {
let commander_server_addr = args
.get_one::<String>("COMMANDER_SERVER_ADDR")
.unwrap()
.parse()
.expect("Invalid command server address");
let proxy_gateway_addr = args
.get_one::<String>("PROXY_GATEWAY_ADDR")
.unwrap()
.parse()
.expect("Invalid proxy gateway address");
let peer_multiaddr = args
.get_one::<String>("PEER_MULTIADDR")
.unwrap()
.parse()
.expect("Missing peer multiaddr");
println!("proxy_gateway_addr: {}", proxy_gateway_addr);
proxy_gateway(proxy_gateway_addr, commander_server_addr, peer_multiaddr)
.await
.expect("Gateway server failed")
}

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

let args = parse_args().get_matches();

if let Some(args) = args.subcommand_matches("serve") {
serve(args).await;
}

if let Some(args) = args.subcommand_matches("gateway") {
gateway(args).await;
}
}

0 comments on commit 683517d

Please sign in to comment.