Skip to content

Commit d88d160

Browse files
committed
Add http request api
1 parent 1ab4e9c commit d88d160

File tree

9 files changed

+122
-45
lines changed

9 files changed

+122
-45
lines changed

.pre-commit-config.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,18 @@ repos:
1313
- id: trailing-whitespace
1414
- repo: local
1515
hooks:
16+
- id: buf-format
17+
name: protobuf format
18+
description: Format protobuf files with buf.
19+
entry: bash -c 'buf format --write'
20+
language: system
21+
files: \.proto$
22+
- id: buf-lint
23+
name: protobuf lint
24+
description: Lint protobuf files for errors.
25+
entry: bash -c 'buf lint'
26+
language: system
27+
files: \.proto$
1628
- id: taplo
1729
name: taplo format
1830
description: Format Cargo.toml files with taplo.

buf.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
version: v1
2+
lint:
3+
except:
4+
- PACKAGE_DIRECTORY_MATCH

build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
fn main() -> Result<(), Box<dyn std::error::Error>> {
2-
tonic_build::compile_protos("proto/command.proto")?;
2+
tonic_build::compile_protos("proto/command_v1.proto")?;
33
Ok(())
44
}

proto/command.proto

Lines changed: 0 additions & 13 deletions
This file was deleted.

proto/command_v1.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
syntax = "proto3";
2+
package command.v1;
3+
4+
message ConnectNodeRequest {
5+
string address = 1;
6+
}
7+
8+
message ConnectNodeResponse {}
9+
10+
message RequestHttpServerRequest {
11+
string address = 1;
12+
bytes data = 2;
13+
}
14+
15+
message RequestHttpServerResponse {
16+
bytes data = 1;
17+
}
18+
19+
service CommandService {
20+
rpc ConnectNode(ConnectNodeRequest) returns (ConnectNodeResponse);
21+
rpc RequestHttpServer(RequestHttpServerRequest) returns (RequestHttpServerResponse);
22+
}

src/command.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ use tonic::Request;
22
use tonic::Response;
33
use tonic::Status;
44
use tracing::trace;
5-
use tracing::warn;
65

76
use crate::PProxyHandle;
87

98
pub mod proto {
10-
tonic::include_proto!("command");
9+
tonic::include_proto!("command.v1");
1110
}
1211

1312
pub struct PProxyCommander {
@@ -19,24 +18,36 @@ impl PProxyCommander {
1918
Self { handle }
2019
}
2120

22-
pub fn into_tonic_service(self) -> proto::command_server::CommandServer<Self> {
23-
proto::command_server::CommandServer::new(self)
21+
pub fn into_tonic_service(self) -> proto::command_service_server::CommandServiceServer<Self> {
22+
proto::command_service_server::CommandServiceServer::new(self)
2423
}
2524
}
2625

2726
#[tonic::async_trait]
28-
impl proto::command_server::Command for PProxyCommander {
27+
impl proto::command_service_server::CommandService for PProxyCommander {
2928
async fn connect_node(
3029
&self,
3130
request: Request<proto::ConnectNodeRequest>,
32-
) -> Result<Response<proto::ConnectNodeReply>, Status> {
31+
) -> Result<Response<proto::ConnectNodeResponse>, Status> {
3332
trace!("handle request: {:?}", request);
3433

35-
if let Err(e) = self.handle.connect_node(request.into_inner()).await {
36-
warn!("failed to handle request: {:?}", e);
37-
}
34+
self.handle
35+
.connect_node(request.into_inner())
36+
.await
37+
.map(Response::new)
38+
.map_err(|e| tonic::Status::internal(format!("{:?}", e)))
39+
}
40+
41+
async fn request_http_server(
42+
&self,
43+
request: tonic::Request<proto::RequestHttpServerRequest>,
44+
) -> std::result::Result<tonic::Response<proto::RequestHttpServerResponse>, tonic::Status> {
45+
trace!("handle request: {:?}", request);
3846

39-
let reply = proto::ConnectNodeReply {};
40-
Ok(Response::new(reply))
47+
self.handle
48+
.request_http_server(request.into_inner())
49+
.await
50+
.map(Response::new)
51+
.map_err(|e| tonic::Status::internal(format!("{:?}", e)))
4152
}
4253
}

src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,9 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
1313
Error::EssentialTaskClosed
1414
}
1515
}
16+
17+
impl From<tokio::sync::oneshot::error::RecvError> for Error {
18+
fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
19+
Error::EssentialTaskClosed
20+
}
21+
}

src/lib.rs

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use std::net::SocketAddr;
22

3-
use tokio::sync::mpsc::channel;
4-
use tokio::sync::mpsc::Receiver;
5-
use tokio::sync::mpsc::Sender;
3+
use tokio::sync::mpsc;
4+
use tokio::sync::oneshot;
65
use tracing::trace;
76
use tracing::warn;
87

9-
use crate::command::proto::ConnectNodeReply;
108
use crate::command::proto::ConnectNodeRequest;
9+
use crate::command::proto::ConnectNodeResponse;
10+
use crate::command::proto::RequestHttpServerRequest;
11+
use crate::command::proto::RequestHttpServerResponse;
1112
pub use crate::error::Error;
1213
use crate::server::*;
1314

@@ -26,31 +27,35 @@ pub type Result<T> = std::result::Result<T, error::Error>;
2627

2728
#[derive(Debug)]
2829
pub enum PProxyEvent {
29-
ConnectNodeReply(ConnectNodeReply),
30+
ConnectNodeResponse(ConnectNodeResponse),
3031
}
3132

3233
#[derive(Debug)]
3334
pub enum PProxyCommand {
34-
ConnectNodeRequest(ConnectNodeRequest),
35+
ConnectNode(ConnectNodeRequest),
36+
RequestHttpServer(
37+
RequestHttpServerRequest,
38+
oneshot::Sender<RequestHttpServerResponse>,
39+
),
3540
}
3641

3742
pub struct PProxy {
3843
#[allow(dead_code)]
39-
event_tx: Sender<PProxyEvent>,
40-
command_rx: Receiver<PProxyCommand>,
44+
event_tx: mpsc::Sender<PProxyEvent>,
45+
command_rx: mpsc::Receiver<PProxyCommand>,
4146
p2p_server: P2pServer,
4247
}
4348

4449
pub struct PProxyHandle {
4550
#[allow(dead_code)]
46-
event_rx: Receiver<PProxyEvent>,
47-
command_tx: Sender<PProxyCommand>,
51+
event_rx: mpsc::Receiver<PProxyEvent>,
52+
command_tx: mpsc::Sender<PProxyCommand>,
4853
}
4954

5055
impl PProxy {
5156
pub fn new(server_addr: SocketAddr) -> (Self, PProxyHandle) {
52-
let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE);
53-
let (command_tx, command_rx) = channel(DEFAULT_CHANNEL_SIZE);
57+
let (event_tx, event_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
58+
let (command_tx, command_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
5459

5560
(
5661
Self {
@@ -82,11 +87,16 @@ impl PProxy {
8287
command = self.command_rx.recv() => match command {
8388
None => return,
8489
Some(ref command) => match command {
85-
PProxyCommand::ConnectNodeRequest(request) => {
90+
PProxyCommand::ConnectNode(request) => {
8691
if let Err(error) = self.on_connect_node(request).await {
8792
warn!("failed to handle command {:?}: {:?}", command, error);
8893
}
8994
}
95+
PProxyCommand::RequestHttpServer(request, _rx) => {
96+
if let Err(error) = self.on_request_http_server(request).await {
97+
warn!("failed to handle command {:?}: {:?}", command, error);
98+
}
99+
}
90100
}
91101
}
92102
}
@@ -109,13 +119,32 @@ impl PProxy {
109119
.await
110120
.map_err(From::from)
111121
}
122+
123+
async fn on_request_http_server(&mut self, _request: &RequestHttpServerRequest) -> Result<()> {
124+
Ok(())
125+
}
112126
}
113127

114128
impl PProxyHandle {
115-
pub async fn connect_node(&self, request: ConnectNodeRequest) -> Result<()> {
129+
pub async fn connect_node(&self, request: ConnectNodeRequest) -> Result<ConnectNodeResponse> {
116130
self.command_tx
117-
.send(PProxyCommand::ConnectNodeRequest(request))
118-
.await
119-
.map_err(From::from)
131+
.send(PProxyCommand::ConnectNode(request))
132+
.await?;
133+
134+
Ok(ConnectNodeResponse {})
135+
}
136+
137+
pub async fn request_http_server(
138+
&self,
139+
request: RequestHttpServerRequest,
140+
) -> Result<RequestHttpServerResponse> {
141+
let (tx, rx) = oneshot::channel();
142+
143+
self.command_tx
144+
.send(PProxyCommand::RequestHttpServer(request, tx))
145+
.await?;
146+
147+
let response = rx.await?;
148+
Ok(response)
120149
}
121150
}

src/main.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use clap::Arg;
22
use clap::ArgAction;
33
use clap::Command;
4-
use pproxy::command::PProxyCommander;
5-
use pproxy::PProxy;
4+
use dephy_pproxy::command::PProxyCommander;
5+
use dephy_pproxy::PProxy;
66
use tonic::transport::Server;
77

88
fn parse_args() -> Command {
@@ -13,7 +13,6 @@ fn parse_args() -> Command {
1313
app = app
1414
.arg(
1515
Arg::new("SERVER_ADDR")
16-
.short('s')
1716
.long("server-addr")
1817
.num_args(1)
1918
.default_value("127.0.0.1:10010")
@@ -27,6 +26,13 @@ fn parse_args() -> Command {
2726
.default_value("127.0.0.1:10086")
2827
.action(ArgAction::Set)
2928
.help("Commander server address"),
29+
)
30+
.arg(
31+
Arg::new("PROXY_ADDR")
32+
.long("proxy-addr")
33+
.num_args(1)
34+
.action(ArgAction::Set)
35+
.help("Will proxy to this address if set"),
3036
);
3137

3238
app

0 commit comments

Comments
 (0)