Skip to content

Commit

Permalink
chore(codec): Replace tonic::codec::encode::fuse with tokio-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Nov 13, 2023
1 parent 522a8d7 commit 5417c6e
Showing 1 changed file with 5 additions and 60 deletions.
65 changes: 5 additions & 60 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use std::{
};
use tokio_stream::{Stream, StreamExt};

use fuse::Fuse;

pub(super) const BUFFER_SIZE: usize = 8 * 1024;
const YIELD_THRESHOLD: usize = 32 * 1024;

Expand All @@ -29,7 +27,7 @@ where
{
let stream = EncodedBytes::new(
encoder,
source,
source.fuse(),
compression_encoding,
compression_override,
max_message_size,
Expand All @@ -50,7 +48,7 @@ where
{
let stream = EncodedBytes::new(
encoder,
source.map(Ok),
source.fuse().map(Ok),
compression_encoding,
SingleMessageCompressionOverride::default(),
max_message_size,
Expand All @@ -71,7 +69,7 @@ where
U: Stream<Item = Result<T::Item, Status>>,
{
#[pin]
source: Fuse<U>,
source: U,
encoder: T,
compression_encoding: Option<CompressionEncoding>,
max_message_size: Option<usize>,
Expand All @@ -84,6 +82,7 @@ where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
{
// `source` should be fused stream.
fn new(
encoder: T,
source: U,
Expand All @@ -107,7 +106,7 @@ where
};

Self {
source: Fuse::new(source),
source,
encoder,
compression_encoding,
max_message_size,
Expand Down Expand Up @@ -345,57 +344,3 @@ where
Poll::Ready(self.project().state.trailers())
}
}

mod fuse {
use std::{
pin::Pin,
task::{ready, Context, Poll},
};

use tokio_stream::Stream;

/// Stream for the [`fuse`](super::StreamExt::fuse) method.
#[derive(Debug)]
#[pin_project::pin_project]
#[must_use = "streams do nothing unless polled"]
pub(crate) struct Fuse<St> {
#[pin]
stream: St,
done: bool,
}

impl<St> Fuse<St> {
pub(crate) fn new(stream: St) -> Self {
Self {
stream,
done: false,
}
}
}

impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();

if *this.done {
return Poll::Ready(None);
}

let item = ready!(this.stream.poll_next(cx));
if item.is_none() {
*this.done = true;
}
Poll::Ready(item)
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.done {
(0, Some(0))
} else {
self.stream.size_hint()
}
}
}
}

0 comments on commit 5417c6e

Please sign in to comment.