Skip to content

Commit

Permalink
Merge pull request #33 from wacker-dev/write_body
Browse files Browse the repository at this point in the history
Fix the issue when the request/response body exceeds 4096 bytes
  • Loading branch information
iawia002 authored Nov 20, 2024
2 parents 4bc997d + a439808 commit e86297c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 20 deletions.
5 changes: 3 additions & 2 deletions test-programs/src/bin/client_post_with_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ struct Data {
}

fn main() {
let buffer = [0; 5000];
let resp = Client::new()
.post("https://httpbin.org/post")
.body("hello")
.body(buffer)
.connect_timeout(Duration::from_secs(5))
.send()
.unwrap();
assert_eq!(resp.status_code(), 200);

let data = resp.json::<Data>().unwrap();
assert_eq!(data.data, "hello");
assert_eq!(data.data.len(), 5000);
}
29 changes: 28 additions & 1 deletion waki/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::bindings::wasi::{
http::types::{IncomingBody, InputStream},
http::types::{IncomingBody, InputStream, OutgoingBody},
io::streams::StreamError,
};

Expand Down Expand Up @@ -59,3 +59,30 @@ impl Body {
}
}
}

pub(crate) fn write_to_outgoing_body(outgoing_body: &OutgoingBody, mut buf: &[u8]) -> Result<()> {
if buf.is_empty() {
return Ok(());
}

let out = outgoing_body
.write()
.map_err(|_| anyhow!("outgoing request write failed"))?;

let pollable = out.subscribe();
while !buf.is_empty() {
pollable.block();

let permit = out.check_write()?;
let len = buf.len().min(permit as usize);
let (chunk, rest) = buf.split_at(len);
buf = rest;

out.write(chunk)?;
}

out.flush()?;
pollable.block();
let _ = out.check_write()?;
Ok(())
}
9 changes: 2 additions & 7 deletions waki/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
outgoing_handler,
types::{IncomingRequest, OutgoingBody, OutgoingRequest, RequestOptions},
},
body::Body,
body::{write_to_outgoing_body, Body},
header::HeaderMap,
ErrorCode, Method, Response,
};
Expand Down Expand Up @@ -241,12 +241,7 @@ impl Request {
.body()
.map_err(|_| anyhow!("outgoing request write failed"))?;
let body = self.body.bytes()?;
if !body.is_empty() {
let request_body = outgoing_body
.write()
.map_err(|_| anyhow!("outgoing request write failed"))?;
request_body.blocking_write_and_flush(&body)?;
}
write_to_outgoing_body(&outgoing_body, body.as_slice())?;
OutgoingBody::finish(outgoing_body, None)?;

let future_response = outgoing_handler::handle(req, Some(options))?;
Expand Down
12 changes: 2 additions & 10 deletions waki/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
bindings::wasi::http::types::{
IncomingResponse, OutgoingBody, OutgoingResponse, ResponseOutparam,
},
body::Body,
body::{write_to_outgoing_body, Body},
header::HeaderMap,
ErrorCode,
};
Expand Down Expand Up @@ -112,14 +112,6 @@ pub fn handle_response(response_out: ResponseOutparam, response: Response) {
ResponseOutparam::set(response_out, Ok(outgoing_response));

let body = response.body.bytes().unwrap();
if !body.is_empty() {
let out = outgoing_body.write().unwrap();
// `blocking-write-and-flush` writes up to 4096 bytes
let chunks = body.chunks(4096);
for chunk in chunks {
out.blocking_write_and_flush(chunk).unwrap();
}
}

write_to_outgoing_body(&outgoing_body, body.as_slice()).unwrap();
OutgoingBody::finish(outgoing_body, None).unwrap();
}

0 comments on commit e86297c

Please sign in to comment.