Skip to content

Commit

Permalink
feat(client): add SendRequest::try_send_request() method
Browse files Browse the repository at this point in the history
This method returns a `TrySendError` type, which allows for returning
the request back to the caller if an error occured between queuing and
trying to write the request.

This method is added for both `http1` and `http2`.
  • Loading branch information
seanmonstar committed Jun 21, 2024
1 parent aa7ff60 commit 450bfa7
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 74 deletions.
51 changes: 28 additions & 23 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures_util::ready;
use http::{Request, Response};
use httparse::ParserConfig;

use super::super::dispatch;
use super::super::dispatch::{self, TrySendError};
use crate::body::{Body, Incoming as IncomingBody};
use crate::proto;

Expand Down Expand Up @@ -200,33 +200,38 @@ where
}
}

/*
pub(super) fn send_request_retryable(
/// Sends a `Request` on the associated connection.
///
/// Returns a future that if successful, yields the `Response`.
///
/// # Error
///
/// If there was an error before trying to serialize the request to the
/// connection, the message will be returned as part of this error.
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
where
B: Send,
{
match self.dispatch.try_send(req) {
Ok(rx) => {
Either::Left(rx.then(move |res| {
match res {
Ok(Ok(res)) => future::ok(res),
Ok(Err(err)) => future::err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
}
Err(req) => {
debug!("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
Either::Right(future::err((err, Some(req))))
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = crate::Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
}
*/
}

impl<B> fmt::Debug for SendRequest<B> {
Expand Down
51 changes: 28 additions & 23 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::rt::{Read, Write};
use futures_util::ready;
use http::{Request, Response};

use super::super::dispatch;
use super::super::dispatch::{self, TrySendError};
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::time::Time;
use crate::proto;
Expand Down Expand Up @@ -152,33 +152,38 @@ where
}
}

/*
pub(super) fn send_request_retryable(
/// Sends a `Request` on the associated connection.
///
/// Returns a future that if successful, yields the `Response`.
///
/// # Error
///
/// If there was an error before trying to serialize the request to the
/// connection, the message will be returned as part of this error.
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
where
B: Send,
{
match self.dispatch.try_send(req) {
Ok(rx) => {
Either::Left(rx.then(move |res| {
match res {
Ok(Ok(res)) => future::ok(res),
Ok(Err(err)) => future::err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
}
Err(req) => {
debug!("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
Either::Right(future::err((err, Some(req))))
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = crate::Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
}
*/
}

impl<B> fmt::Debug for SendRequest<B> {
Expand Down
2 changes: 2 additions & 0 deletions src/client/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@
pub mod http1;
#[cfg(feature = "http2")]
pub mod http2;

pub use super::dispatch::TrySendError;
61 changes: 45 additions & 16 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "http2")]
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};

#[cfg(test)]
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;

/// An error when calling `try_send_request`.
///
/// There is a possibility of an error occuring on a connection in-between the
/// time that a request is queued and when it is actually written to the IO
/// transport. If that happens, it is safe to return the request back to the
/// caller, as it was never fully sent.
#[derive(Debug)]
pub struct TrySendError<T> {
pub(crate) error: crate::Error,
pub(crate) message: Option<T>,
}

pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded_channel();
let (giver, taker) = want::new();
Expand Down Expand Up @@ -92,7 +103,7 @@ impl<T, U> Sender<T, U> {
}
}

#[cfg(test)]
#[cfg(feature = "http1")]
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
if !self.can_send() {
return Err(val);
Expand Down Expand Up @@ -135,7 +146,6 @@ impl<T, U> UnboundedSender<T, U> {
self.giver.is_canceled()
}

#[cfg(test)]
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
Expand Down Expand Up @@ -210,17 +220,17 @@ struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
impl<T, U> Drop for Envelope<T, U> {
fn drop(&mut self) {
if let Some((val, cb)) = self.0.take() {
cb.send(Err((
crate::Error::new_canceled().with("connection closed"),
Some(val),
)));
cb.send(Err(TrySendError {
error: crate::Error::new_canceled().with("connection closed"),
message: Some(val),
}));
}
}
}

pub(crate) enum Callback<T, U> {
#[allow(unused)]
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
}

Expand All @@ -229,7 +239,10 @@ impl<T, U> Drop for Callback<T, U> {
match self {
Callback::Retry(tx) => {
if let Some(tx) = tx.take() {
let _ = tx.send(Err((dispatch_gone(), None)));
let _ = tx.send(Err(TrySendError {
error: dispatch_gone(),
message: None,
}));
}
}
Callback::NoRetry(tx) => {
Expand Down Expand Up @@ -269,18 +282,34 @@ impl<T, U> Callback<T, U> {
}
}

pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
match self {
Callback::Retry(ref mut tx) => {
let _ = tx.take().unwrap().send(val);
}
Callback::NoRetry(ref mut tx) => {
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
let _ = tx.take().unwrap().send(val.map_err(|e| e.error));
}
}
}
}

impl<T> TrySendError<T> {
/// Take the message from this error.
///
/// The message will not always have been recovered. If an error occurs
/// after the message has been serialized onto the connection, it will not
/// be available here.
pub fn take_message(&mut self) -> Option<T> {
self.message.take()
}

/// Consumes this to return the inner error.
pub fn into_error(self) -> crate::Error {
self.error
}
}

#[cfg(feature = "http2")]
pin_project! {
pub struct SendWhen<B>
Expand Down Expand Up @@ -325,8 +354,8 @@ where
trace!("send_when canceled");
Poll::Ready(())
}
Poll::Ready(Err(err)) => {
call_back.send(Err(err));
Poll::Ready(Err((error, message))) => {
call_back.send(Err(TrySendError { error, message }));
Poll::Ready(())
}
}
Expand Down Expand Up @@ -389,8 +418,8 @@ mod tests {
let err = fulfilled
.expect("fulfilled")
.expect_err("promise should error");
match (err.0.kind(), err.1) {
(&crate::error::Kind::Canceled, Some(_)) => (),
match (err.error.is_canceled(), err.message) {
(true, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}
}
Expand Down
18 changes: 13 additions & 5 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use http::Request;

use super::{Http1Transaction, Wants};
use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
#[cfg(feature = "client")]
use crate::client::dispatch::TrySendError;
use crate::common::task;
use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
use crate::upgrade::OnUpgrade;
Expand Down Expand Up @@ -655,15 +657,21 @@ cfg_client! {
}
Err(err) => {
if let Some(cb) = self.callback.take() {
cb.send(Err((err, None)));
cb.send(Err(TrySendError {
error: err,
message: None,
}));
Ok(())
} else if !self.rx_closed {
self.rx.close();
if let Some((req, cb)) = self.rx.try_recv() {
trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled
cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
cb.send(Err(TrySendError {
error: crate::Error::new_canceled().with(err),
message: Some(req),
}));
Ok(())
} else {
Err(err)
Expand Down Expand Up @@ -729,9 +737,9 @@ mod tests {
let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
.expect_err("callback should send error");

match (err.0.kind(), err.1) {
(&crate::error::Kind::Canceled, Some(_)) => (),
other => panic!("expected Canceled, got {:?}", other),
match (err.error.is_canceled(), err.message.as_ref()) {
(true, Some(_)) => (),
_ => panic!("expected Canceled, got {:?}", err),
}
});
}
Expand Down
20 changes: 13 additions & 7 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use pin_project_lite::pin_project;
use super::ping::{Ponger, Recorder};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::{Body, Incoming as IncomingBody};
use crate::client::dispatch::{Callback, SendWhen};
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
use crate::common::io::Compat;
use crate::common::time::Time;
use crate::ext::Protocol;
Expand Down Expand Up @@ -662,10 +662,10 @@ where
.map_or(false, |len| len != 0)
{
warn!("h2 connect request with non-zero body not supported");
cb.send(Err((
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
None,
)));
cb.send(Err(TrySendError {
error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
message: None,
}));
continue;
}

Expand All @@ -677,7 +677,10 @@ where
Ok(ok) => ok,
Err(err) => {
debug!("client send request error: {}", err);
cb.send(Err((crate::Error::new_h2(err), None)));
cb.send(Err(TrySendError {
error: crate::Error::new_h2(err),
message: None,
}));
continue;
}
};
Expand All @@ -702,7 +705,10 @@ where
}
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(err)) => {
f.cb.send(Err((crate::Error::new_h2(err), None)));
f.cb.send(Err(TrySendError {
error: crate::Error::new_h2(err),
message: None,
}));
continue;
}
}
Expand Down

0 comments on commit 450bfa7

Please sign in to comment.