Skip to content

Commit d3704cc

Browse files
committed
Do http request and response
1 parent 1f824cf commit d3704cc

File tree

5 files changed

+81
-7
lines changed

5 files changed

+81
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = "2021"
66
[dependencies]
77
clap = "4.5.4"
88
futures = "0.3.30"
9+
httparse = "1.8.0"
910
litep2p = "0.5.0"
1011
# Do not upgrade multiaddr, see: https://github.com/paritytech/litep2p/pull/91
1112
multiaddr = "0.17.1"

src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ pub enum Error {
1212
Litep2p(#[from] litep2p::Error),
1313
#[error("Litep2p request response error: {0:?}")]
1414
Litep2pRequestResponseError(litep2p::protocol::request_response::RequestResponseError),
15+
#[error("Httparse error: {0}")]
16+
Httparse(#[from] httparse::Error),
17+
#[error("Incomplete http request")]
18+
IncompleteHttpRequest,
19+
#[error("Protocol not support")]
20+
ProtocolNotSupport,
21+
#[error("Io error: {0}")]
22+
Io(#[from] std::io::Error),
1523
#[error("Unexpected response type")]
1624
UnexpectedResponseType,
1725
}

src/lib.rs

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
use std::collections::HashMap;
22
use std::net::SocketAddr;
3+
use std::time::Duration;
34

45
use litep2p::protocol::request_response::DialOptions;
56
use litep2p::protocol::request_response::RequestResponseEvent;
67
use litep2p::types::RequestId;
78
use litep2p::PeerId;
89
use multiaddr::Multiaddr;
910
use multiaddr::Protocol;
11+
use tokio::io::AsyncReadExt;
12+
use tokio::io::AsyncWriteExt;
13+
use tokio::net::TcpStream;
1014
use tokio::sync::mpsc;
1115
use tokio::sync::oneshot;
16+
use tokio::time::timeout;
1217
use tracing::warn;
1318

1419
use crate::command::proto::AddPeerRequest;
@@ -49,21 +54,23 @@ pub struct PProxy {
4954
command_rx: mpsc::Receiver<(PProxyCommand, CommandNotifier)>,
5055
tunnel_notifier: HashMap<RequestId, CommandNotifier>,
5156
p2p_server: P2pServer,
57+
proxy_addr: Option<SocketAddr>,
5258
}
5359

5460
pub struct PProxyHandle {
5561
command_tx: mpsc::Sender<(PProxyCommand, CommandNotifier)>,
5662
}
5763

5864
impl PProxy {
59-
pub fn new(server_addr: SocketAddr) -> (Self, PProxyHandle) {
65+
pub fn new(server_addr: SocketAddr, proxy_addr: Option<SocketAddr>) -> (Self, PProxyHandle) {
6066
let (command_tx, command_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
6167

6268
(
6369
Self {
6470
command_rx,
6571
tunnel_notifier: HashMap::new(),
6672
p2p_server: P2pServer::new(server_addr),
73+
proxy_addr,
6774
},
6875
PProxyHandle { command_tx },
6976
)
@@ -97,11 +104,42 @@ impl PProxy {
97104
match event {
98105
P2pServerEvent::TunnelEvent(RequestResponseEvent::RequestReceived {
99106
request_id,
107+
request,
100108
..
101-
}) => self
102-
.p2p_server
103-
.tunnel_handle
104-
.send_response(*request_id, vec![1, 2, 3, 4]),
109+
}) => {
110+
let Some(proxy_addr) = self.proxy_addr else {
111+
return Err(Error::ProtocolNotSupport);
112+
};
113+
114+
let mut headers = [httparse::EMPTY_HEADER; 64];
115+
let mut req = httparse::Request::new(&mut headers);
116+
if req.parse(request)?.is_partial() {
117+
return Err(Error::IncompleteHttpRequest);
118+
}
119+
120+
let mut stream = tcp_connect_with_timeout(&proxy_addr, 5).await?;
121+
stream.write_all(request).await?;
122+
123+
let mut response = Vec::new();
124+
loop {
125+
let mut buf = [0u8; 30000];
126+
match stream.read(&mut buf).await {
127+
Err(_) => break,
128+
Ok(0) => break,
129+
Ok(n) => {
130+
response.extend_from_slice(&buf[..n]);
131+
}
132+
}
133+
}
134+
135+
if response.is_empty() {
136+
response = b"HTTP/1.1 500 Internal Server Error\r\n\r\n".to_vec();
137+
}
138+
139+
self.p2p_server
140+
.tunnel_handle
141+
.send_response(*request_id, response);
142+
}
105143

106144
P2pServerEvent::TunnelEvent(RequestResponseEvent::RequestFailed {
107145
request_id,
@@ -212,6 +250,12 @@ impl PProxyHandle {
212250
&self,
213251
request: RequestHttpServerRequest,
214252
) -> Result<RequestHttpServerResponse> {
253+
let mut headers = [httparse::EMPTY_HEADER; 64];
254+
let mut req = httparse::Request::new(&mut headers);
255+
if req.parse(&request.data)?.is_partial() {
256+
return Err(Error::IncompleteHttpRequest);
257+
}
258+
215259
let (tx, rx) = oneshot::channel();
216260

217261
self.command_tx
@@ -239,3 +283,20 @@ fn extract_peer_id_from_multiaddr(multiaddr: &Multiaddr) -> Result<PeerId> {
239283
PeerId::from_multihash(multihash)
240284
.map_err(|_| Error::FailedToExtractPeerIdFromMultiaddr(multiaddr.to_string()))
241285
}
286+
287+
pub async fn tcp_connect_with_timeout(
288+
addr: &SocketAddr,
289+
request_timeout_s: u64,
290+
) -> Result<TcpStream> {
291+
match timeout(
292+
Duration::from_secs(request_timeout_s),
293+
TcpStream::connect(addr),
294+
)
295+
.await
296+
{
297+
Ok(result) => result.map_err(Error::Io),
298+
Err(_) => Err(Error::Io(std::io::Error::from(
299+
std::io::ErrorKind::TimedOut,
300+
))),
301+
}
302+
}

src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,15 @@ async fn main() {
5454
.get_one::<String>("COMMANDER_SERVER_ADDR")
5555
.unwrap()
5656
.parse()
57-
.expect("Invalid server address");
57+
.expect("Invalid command server address");
58+
let proxy_addr = args
59+
.get_one::<String>("PROXY_ADDR")
60+
.map(|addr| addr.parse().expect("Invalid proxy address"));
5861

5962
println!("server_addr: {:?}", server_addr);
6063
println!("commander_server_addr: {:?}", commander_server_addr);
6164

62-
let (pproxy, pproxy_handle) = PProxy::new(server_addr);
65+
let (pproxy, pproxy_handle) = PProxy::new(server_addr, proxy_addr);
6366

6467
let commander = PProxyCommander::new(pproxy_handle);
6568
let commander_server = Server::builder()

0 commit comments

Comments
 (0)