diff --git a/test-programs/src/bin/client_post_with_body.rs b/test-programs/src/bin/client_post_with_body.rs index ba552fa..2cbb3eb 100644 --- a/test-programs/src/bin/client_post_with_body.rs +++ b/test-programs/src/bin/client_post_with_body.rs @@ -8,14 +8,15 @@ struct Data { } fn main() { + let buffer = [0; 5000]; let resp = Client::new() .post("https://httpbin.org/post") - .body("hello") + .body(buffer) .connect_timeout(Duration::from_secs(5)) .send() .unwrap(); assert_eq!(resp.status_code(), 200); let data = resp.json::().unwrap(); - assert_eq!(data.data, "hello"); + assert_eq!(data.data.len(), 5000); } diff --git a/waki/src/body.rs b/waki/src/body.rs index bc68184..eaf6f7a 100644 --- a/waki/src/body.rs +++ b/waki/src/body.rs @@ -1,5 +1,5 @@ use crate::bindings::wasi::{ - http::types::{IncomingBody, InputStream}, + http::types::{IncomingBody, InputStream, OutgoingBody}, io::streams::StreamError, }; @@ -59,3 +59,30 @@ impl Body { } } } + +pub(crate) fn write_to_outgoing_body(outgoing_body: &OutgoingBody, mut buf: &[u8]) -> Result<()> { + if buf.is_empty() { + return Ok(()); + } + + let out = outgoing_body + .write() + .map_err(|_| anyhow!("outgoing request write failed"))?; + + let pollable = out.subscribe(); + while !buf.is_empty() { + pollable.block(); + + let permit = out.check_write()?; + let len = buf.len().min(permit as usize); + let (chunk, rest) = buf.split_at(len); + buf = rest; + + out.write(chunk)?; + } + + out.flush()?; + pollable.block(); + let _ = out.check_write()?; + Ok(()) +} diff --git a/waki/src/request.rs b/waki/src/request.rs index 255244e..eb57bb6 100644 --- a/waki/src/request.rs +++ b/waki/src/request.rs @@ -3,7 +3,7 @@ use crate::{ outgoing_handler, types::{IncomingRequest, OutgoingBody, OutgoingRequest, RequestOptions}, }, - body::Body, + body::{write_to_outgoing_body, Body}, header::HeaderMap, ErrorCode, Method, Response, }; @@ -241,12 +241,7 @@ impl Request { .body() .map_err(|_| anyhow!("outgoing request write failed"))?; let body = self.body.bytes()?; - if !body.is_empty() { - let request_body = outgoing_body - .write() - .map_err(|_| anyhow!("outgoing request write failed"))?; - request_body.blocking_write_and_flush(&body)?; - } + write_to_outgoing_body(&outgoing_body, body.as_slice())?; OutgoingBody::finish(outgoing_body, None)?; let future_response = outgoing_handler::handle(req, Some(options))?; diff --git a/waki/src/response.rs b/waki/src/response.rs index 0be9546..6f63a81 100644 --- a/waki/src/response.rs +++ b/waki/src/response.rs @@ -2,7 +2,7 @@ use crate::{ bindings::wasi::http::types::{ IncomingResponse, OutgoingBody, OutgoingResponse, ResponseOutparam, }, - body::Body, + body::{write_to_outgoing_body, Body}, header::HeaderMap, ErrorCode, }; @@ -112,14 +112,6 @@ pub fn handle_response(response_out: ResponseOutparam, response: Response) { ResponseOutparam::set(response_out, Ok(outgoing_response)); let body = response.body.bytes().unwrap(); - if !body.is_empty() { - let out = outgoing_body.write().unwrap(); - // `blocking-write-and-flush` writes up to 4096 bytes - let chunks = body.chunks(4096); - for chunk in chunks { - out.blocking_write_and_flush(chunk).unwrap(); - } - } - + write_to_outgoing_body(&outgoing_body, body.as_slice()).unwrap(); OutgoingBody::finish(outgoing_body, None).unwrap(); }