From dba91740c793d67cfa02dfee69460ccfabe8b230 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 22 Mar 2024 20:32:50 +0100 Subject: [PATCH 1/6] progress --- h3-quinn/src/lib.rs | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 573fa823..b21e6e5f 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -150,7 +150,7 @@ impl From for SendDatagramError { impl quic::Connection for Connection where - B: Buf, + B: Buf + Send, { type SendStream = SendStream; type RecvStream = RecvStream; @@ -277,7 +277,7 @@ pub struct OpenStreams { impl quic::OpenStreams for OpenStreams where - B: Buf, + B: Buf + Send, { type RecvStream = RecvStream; type SendStream = SendStream; @@ -348,7 +348,7 @@ where impl quic::BidiStream for BidiStream where - B: Buf, + B: Buf + Send, { type SendStream = SendStream; type RecvStream = RecvStream; @@ -380,7 +380,7 @@ impl quic::RecvStream for BidiStream { impl quic::SendStream for BidiStream where - B: Buf, + B: Buf + Send, { type Error = SendStreamError; @@ -406,7 +406,7 @@ where } impl quic::SendStreamUnframed for BidiStream where - B: Buf, + B: Buf + Send, { fn poll_send( &mut self, @@ -543,11 +543,11 @@ impl Error for ReadError { pub struct SendStream { stream: Option, writing: Option>, - write_fut: WriteFuture, + write_fut: WriteFuture, } -type WriteFuture = - ReusableBoxFuture<'static, (quinn::SendStream, Result)>; +type WriteFuture = + ReusableBoxFuture<'static, (quinn::SendStream, Result, WriteBuf)>; impl SendStream where @@ -564,29 +564,31 @@ where impl quic::SendStream for SendStream where - B: Buf, + B: Buf + Send, { type Error = SendStreamError; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - if let Some(ref mut data) = self.writing { + if let Some(mut data) = self.writing.take() { while data.has_remaining() { if let Some(mut stream) = self.stream.take() { - let chunk = data.chunk().to_owned(); // FIXME - avoid copy + let chunk = data.chunk();//.to_owned(); // FIXME - avoid copy self.write_fut.set(async move { let ret = stream.write(&chunk).await; - (stream, ret) + (stream, ret, data) }); } - let (stream, res) = ready!(self.write_fut.poll(cx)); + let (stream, res, mut data2) = ready!(self.write_fut.poll(cx)); self.stream = Some(stream); - match res { - Ok(cnt) => data.advance(cnt), + let x = match res { + Ok(cnt) => data2.advance(cnt), Err(err) => { return Poll::Ready(Err(SendStreamError::Write(err))); } - } + }; + self.writing = Some(data2); + x } } self.writing = None; @@ -630,7 +632,7 @@ where impl quic::SendStreamUnframed for SendStream where - B: Buf, + B: Buf + Send, { fn poll_send( &mut self, From 9c25ecc9582f4ab7d6af3d06e50623191431ea96 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 22 Mar 2024 20:58:48 +0100 Subject: [PATCH 2/6] do not copy the buffer --- h3-quinn/src/lib.rs | 53 +++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index b21e6e5f..062f9c64 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -150,7 +150,7 @@ impl From for SendDatagramError { impl quic::Connection for Connection where - B: Buf + Send, + B: Buf + Send + 'static, { type SendStream = SendStream; type RecvStream = RecvStream; @@ -277,7 +277,7 @@ pub struct OpenStreams { impl quic::OpenStreams for OpenStreams where - B: Buf + Send, + B: Buf + Send + 'static, { type RecvStream = RecvStream; type SendStream = SendStream; @@ -348,7 +348,7 @@ where impl quic::BidiStream for BidiStream where - B: Buf + Send, + B: Buf + Send + 'static, { type SendStream = SendStream; type RecvStream = RecvStream; @@ -380,7 +380,7 @@ impl quic::RecvStream for BidiStream { impl quic::SendStream for BidiStream where - B: Buf + Send, + B: Buf + Send + 'static, { type Error = SendStreamError; @@ -406,7 +406,7 @@ where } impl quic::SendStreamUnframed for BidiStream where - B: Buf + Send, + B: Buf + Send + 'static, { fn poll_send( &mut self, @@ -546,8 +546,14 @@ pub struct SendStream { write_fut: WriteFuture, } -type WriteFuture = - ReusableBoxFuture<'static, (quinn::SendStream, Result, WriteBuf)>; +type WriteFuture = ReusableBoxFuture< + 'static, + ( + quinn::SendStream, + Result, + WriteBuf, + ), +>; impl SendStream where @@ -564,31 +570,36 @@ where impl quic::SendStream for SendStream where - B: Buf + Send, + B: Buf + Send + 'static, { type Error = SendStreamError; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - if let Some(mut data) = self.writing.take() { - while data.has_remaining() { + loop { + if let Some(data) = self.writing.take() { if let Some(mut stream) = self.stream.take() { - let chunk = data.chunk();//.to_owned(); // FIXME - avoid copy self.write_fut.set(async move { + let chunk = data.chunk(); let ret = stream.write(&chunk).await; (stream, ret, data) }); } - let (stream, res, mut data2) = ready!(self.write_fut.poll(cx)); - self.stream = Some(stream); - let x = match res { - Ok(cnt) => data2.advance(cnt), - Err(err) => { - return Poll::Ready(Err(SendStreamError::Write(err))); - } - }; + } + let (stream, res, mut data2) = ready!(self.write_fut.poll(cx)); + self.stream = Some(stream); + match res { + Ok(cnt) => data2.advance(cnt), + Err(err) => { + return Poll::Ready(Err(SendStreamError::Write(err))); + } + }; + if !data2.has_remaining() { + self.writing = Some(data2); + break; + } else { self.writing = Some(data2); - x + continue; } } self.writing = None; @@ -632,7 +643,7 @@ where impl quic::SendStreamUnframed for SendStream where - B: Buf + Send, + B: Buf + Send + 'static, { fn poll_send( &mut self, From b1322d849c5181ac3f3c144bb68d6015fea365fb Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 22 Mar 2024 20:59:36 +0100 Subject: [PATCH 3/6] fmt --- h3-quinn/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 062f9c64..886e969b 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -584,7 +584,6 @@ where (stream, ret, data) }); } - } let (stream, res, mut data2) = ready!(self.write_fut.poll(cx)); self.stream = Some(stream); From 360aec0d93d26eb7c99cae82558a5dc4b63941da Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 22 Mar 2024 21:12:14 +0100 Subject: [PATCH 4/6] fix --- h3-quinn/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 886e969b..a1f3cee6 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -590,11 +590,11 @@ where match res { Ok(cnt) => data2.advance(cnt), Err(err) => { + self.writing = Some(data2); return Poll::Ready(Err(SendStreamError::Write(err))); } }; if !data2.has_remaining() { - self.writing = Some(data2); break; } else { self.writing = Some(data2); @@ -622,7 +622,8 @@ where } fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { - if self.writing.is_some() { + // writing is moved into the write_fut, so stream should be present + if self.writing.is_some() || self.stream.is_none() { return Err(Self::Error::NotReady); } self.writing = Some(data.into()); From a97fa995835083e03452b38cf802460805cdef0f Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 22 Mar 2024 21:26:32 +0100 Subject: [PATCH 5/6] remove unnecessary variable --- h3-quinn/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index a1f3cee6..c534eba0 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -579,8 +579,7 @@ where if let Some(data) = self.writing.take() { if let Some(mut stream) = self.stream.take() { self.write_fut.set(async move { - let chunk = data.chunk(); - let ret = stream.write(&chunk).await; + let ret = stream.write(data.chunk()).await; (stream, ret, data) }); } From 8ef41bb2f4aaad583dc844444448c842c2b3a9d0 Mon Sep 17 00:00:00 2001 From: ruben Date: Fri, 22 Mar 2024 21:31:03 +0100 Subject: [PATCH 6/6] bring back variable --- h3-quinn/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index c534eba0..1430a34a 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -579,7 +579,8 @@ where if let Some(data) = self.writing.take() { if let Some(mut stream) = self.stream.take() { self.write_fut.set(async move { - let ret = stream.write(data.chunk()).await; + let chunk = data.chunk(); + let ret = stream.write(chunk).await; (stream, ret, data) }); }