Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade grpc example deps, fixing #180 #181

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions examples/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ publish = false


[dependencies]
tonic = "0.8"
prost = "0.11"
hyper = "0.14"
tonic = "0.12"
prost = "0.13"
hyper = "1.4"
async-stream = "0.3"
turmoil = { path = "../.." }
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1"
tower = "0.4"
tower = "0.5"
hyper-util = "0.1"

[build-dependencies]
tonic-build = "0.8"
prost = "0.11"
prost-build = "0.11"
protox = "0.2"
tonic-build = "0.12"
prost = "0.13"
prost-build = "0.13"
protox = "0.7.0"
120 changes: 69 additions & 51 deletions examples/grpc/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use hyper::server::accept::from_stream;
use hyper::Server;
use std::net::{IpAddr, Ipv4Addr};
use tonic::transport::Endpoint;
use tonic::transport::{Endpoint, Server};
use tonic::Status;
use tonic::{Request, Response};
use tower::make::Shared;
use tracing::{info_span, Instrument};
use turmoil::{net, Builder};
use turmoil::Builder;

#[allow(non_snake_case)]
mod proto {
tonic::include_proto!("helloworld");
}

use crate::connector::{TurmoilTcpConnector, TurmoilTcpStream};
use crate::proto::greeter_client::GreeterClient;
use proto::greeter_server::{Greeter, GreeterServer};
use proto::{HelloReply, HelloRequest};

use crate::proto::greeter_client::GreeterClient;
use turmoil::net::TcpListener;

fn main() {
configure_tracing();
Expand All @@ -30,15 +28,16 @@ fn main() {
sim.host("server", move || {
let greeter = greeter.clone();
async move {
Server::builder(from_stream(async_stream::stream! {
let listener = net::TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| s);
}
}))
.serve(Shared::new(greeter))
.await
.unwrap();
Server::builder()
.add_service(greeter)
.serve_with_incoming(async_stream::stream! {
let listener = TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| TurmoilTcpStream(s));
}
})
.await
.unwrap();

Ok(())
}
Expand All @@ -49,7 +48,7 @@ fn main() {
"client",
async move {
let ch = Endpoint::new("http://server:9999")?
.connect_with_connector(connector::connector())
.connect_with_connector(TurmoilTcpConnector)
.await?;
let mut greeter_client = GreeterClient::new(ch);

Expand Down Expand Up @@ -111,68 +110,87 @@ impl Greeter for MyGreeter {
}

mod connector {
use hyper::Uri;
use hyper_util::client::legacy::connect::Connected;
use hyper_util::rt::TokioIo;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};

use hyper::{
client::connect::{Connected, Connection},
Uri,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::server::TcpConnectInfo;
use tower::Service;
use turmoil::net::TcpStream;

type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, std::io::Error>> + Send>>;
#[derive(Clone)]
pub struct TurmoilTcpConnector;

impl Service<Uri> for TurmoilTcpConnector {
type Response = TokioIo<TurmoilTcpStream>;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

pub fn connector(
) -> impl Service<Uri, Response = TurmoilConnection, Error = std::io::Error, Future = Fut> + Clone
{
tower::service_fn(|uri: Uri| {
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, uri: Uri) -> Self::Future {
Box::pin(async move {
let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok::<_, std::io::Error>(TurmoilConnection(conn))
}) as Fut
})
let stream = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok(TokioIo::new(TurmoilTcpStream(stream)))
})
}
}

pub struct TurmoilConnection(turmoil::net::TcpStream);
pub struct TurmoilTcpStream(pub TcpStream);

impl AsyncRead for TurmoilConnection {
impl hyper_util::client::legacy::connect::Connection for TurmoilTcpStream {
fn connected(&self) -> Connected {
Connected::new()
}
}

impl tonic::transport::server::Connected for TurmoilTcpStream {
type ConnectInfo = TcpConnectInfo;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that TokioIo brings us pretty close to not having to implement all these traits for turmoil types, other than this one.

Copy link
Contributor Author

@LeonHartley LeonHartley Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need the AsyncRead / AsyncWrite implementations, or implementations for hyper::rt::Read/Write as TokioIo<T> requires T to impl either of those


fn connect_info(&self) -> Self::ConnectInfo {
TcpConnectInfo {
local_addr: self.0.local_addr().ok(),
remote_addr: self.0.peer_addr().ok(),
}
}
}

impl AsyncRead for TurmoilTcpStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}

impl AsyncWrite for TurmoilConnection {
impl AsyncWrite for TurmoilTcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
cx: &mut Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}

impl Connection for TurmoilConnection {
fn connected(&self) -> hyper::client::connect::Connected {
Connected::new()
}
}
}
Loading