Skip to content

Commit 5171eb4

Browse files
committed
Handle chunked http response
1 parent 67305be commit 5171eb4

File tree

4 files changed

+80
-10
lines changed

4 files changed

+80
-10
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ base64 = "0.22.1"
88
bytes = "1.6.0"
99
clap = "4.5.4"
1010
futures = "0.3.30"
11+
futures-util = "0.3.30"
1112
hex = "0.4.3"
1213
http = "1.1.0"
14+
http-body = "1.0.0"
1315
http-body-util = "0.1.1"
1416
httparse = "1.8.0"
1517
hyper = { version = "1.3.1", features = ["server", "http1"] }
1618
hyper-util = { version = "0.1.5", features = ["tokio"] }
17-
litep2p = "0.6.0"
19+
litep2p = { git = "https://github.com/Ma233/litep2p.git", branch = "keepalive" }
1820
# Do not upgrade multiaddr, see: https://github.com/paritytech/litep2p/pull/91
1921
multiaddr = "0.17.1"
2022
percent-encoding = "2.3.1"

src/gateway.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
use std::convert::Infallible;
12
use std::net::SocketAddr;
23
use std::str::FromStr;
34

45
use base64::prelude::*;
56
use bytes::Bytes;
7+
use http_body::Frame;
68
use http_body_util::combinators::BoxBody;
79
use http_body_util::BodyExt;
810
use http_body_util::Full;
11+
use http_body_util::StreamBody;
912
use hyper::server::conn::http1;
1013
use hyper::service::service_fn;
1114
use hyper::Request;
@@ -99,6 +102,16 @@ async fn gateway(
99102
};
100103

101104
let (parts, _) = resp.into_parts();
105+
106+
// Handle chunked encoding
107+
if let Some(true) = parts.headers.get(http::header::TRANSFER_ENCODING).map(|v| {
108+
let Ok(v) = v.to_str() else { return false };
109+
v.to_lowercase().contains("chunked")
110+
}) {
111+
let chunks = split_chunked_body(trailer);
112+
return Ok(Response::from_parts(parts, stream_body(chunks)));
113+
}
114+
102115
let t = trailer.to_vec();
103116
Ok(Response::from_parts(parts, full(t)))
104117
}
@@ -109,6 +122,12 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
109122
.boxed()
110123
}
111124

125+
fn stream_body(chunks: Vec<Result<Frame<Bytes>, Infallible>>) -> BoxBody<Bytes, hyper::Error> {
126+
StreamBody::new(futures_util::stream::iter(chunks))
127+
.map_err(|never| match never {})
128+
.boxed()
129+
}
130+
112131
fn error_response(
113132
msg: &'static str,
114133
status: http::StatusCode,
@@ -228,3 +247,33 @@ fn parse_response_header_easy(
228247
let mut h = [httparse::EMPTY_HEADER; 50];
229248
parse_response_header(buf, h.as_mut())
230249
}
250+
251+
fn split_chunked_body(input: &[u8]) -> Vec<Result<Frame<Bytes>, Infallible>> {
252+
use std::io::BufRead;
253+
use std::io::BufReader;
254+
use std::io::Cursor;
255+
use std::io::Read;
256+
257+
let mut reader = BufReader::new(Cursor::new(input));
258+
let mut chunks = Vec::new();
259+
260+
loop {
261+
let mut chunk_size = String::new();
262+
let read = reader.read_line(&mut chunk_size).unwrap();
263+
if read == 0 {
264+
break;
265+
}
266+
267+
let chunk_size = usize::from_str_radix(chunk_size.trim_end(), 16).unwrap();
268+
269+
let mut chunk = vec![0; chunk_size];
270+
reader.read_exact(&mut chunk).unwrap();
271+
272+
let mut crlf = [0; 2];
273+
reader.read_exact(&mut crlf).unwrap();
274+
275+
chunks.push(Ok(Frame::data(Bytes::from(chunk))));
276+
}
277+
278+
chunks
279+
}

src/lib.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub struct PProxyHandle {
6666
pub enum FullLength {
6767
NotParsed,
6868
NotSet,
69+
Chunked,
6970
Parsed(usize),
7071
}
7172

@@ -74,8 +75,8 @@ impl FullLength {
7475
matches!(self, FullLength::NotParsed)
7576
}
7677

77-
pub fn not_set(&self) -> bool {
78-
matches!(self, FullLength::NotSet)
78+
pub fn chunked(&self) -> bool {
79+
matches!(self, FullLength::Chunked)
7980
}
8081
}
8182

@@ -139,7 +140,7 @@ impl PProxy {
139140
return Err(Error::IncompleteHttpRequest);
140141
}
141142

142-
let mut stream = tcp_connect_with_timeout(&proxy_addr, 1).await?;
143+
let mut stream = tcp_connect_with_timeout(&proxy_addr, 2).await?;
143144
stream.write_all(request).await?;
144145

145146
let mut response = Vec::new();
@@ -149,7 +150,7 @@ impl PProxy {
149150
let mut buf = [0u8; 30000];
150151

151152
let Ok(Ok(n)) =
152-
timeout(std::time::Duration::from_secs(5), stream.read(&mut buf)).await
153+
timeout(std::time::Duration::from_secs(2), stream.read(&mut buf)).await
153154
else {
154155
break;
155156
};
@@ -175,12 +176,25 @@ impl PProxy {
175176
value.parse::<usize>().ok()
176177
});
177178

178-
match content_length {
179-
Some(content_length) => {
179+
let transfor_encoding = resp_checker.headers.iter().find_map(|h| {
180+
if h.name.to_lowercase() != "transfer-encoding" {
181+
return None;
182+
}
183+
let Ok(value) = std::str::from_utf8(h.value) else {
184+
return None;
185+
};
186+
Some(value)
187+
});
188+
189+
match (content_length, transfor_encoding) {
190+
(Some(content_length), _) => {
180191
let header_length = res.unwrap();
181192
full_length = FullLength::Parsed(header_length + content_length)
182193
}
183-
None => {
194+
(None, Some(value)) if value.to_lowercase().contains("chunked") => {
195+
full_length = FullLength::Chunked;
196+
}
197+
_ => {
184198
full_length = FullLength::NotSet;
185199
}
186200
}
@@ -192,6 +206,10 @@ impl PProxy {
192206
break;
193207
}
194208
}
209+
210+
if full_length.chunked() && response.ends_with(b"0\r\n\r\n") {
211+
break;
212+
}
195213
}
196214

197215
if response.is_empty() {

0 commit comments

Comments
 (0)