diff --git a/examples/exception_handler/Cargo.toml b/examples/exception_handler/Cargo.toml index e38ba03..688c3c2 100644 --- a/examples/exception_handler/Cargo.toml +++ b/examples/exception_handler/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent", features = ["full"] } -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } diff --git a/examples/form/Cargo.toml b/examples/form/Cargo.toml index e196fdf..0498975 100644 --- a/examples/form/Cargo.toml +++ b/examples/form/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } diff --git a/examples/grpc/Cargo.toml b/examples/grpc/Cargo.toml new file mode 100644 index 0000000..40b2942 --- /dev/null +++ b/examples/grpc/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "example-grpc" +edition.workspace = true +authors.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +version.workspace = true + +[[bin]] +name = "example-grpc-client" +path = "src/client.rs" + +[dependencies] +tonic = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } +tonic-reflection = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } +prost = "0.12" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +silent = { path = "../../silent", features = ["grpc"] } +axum = "0.7" +async-trait = "0.1.80" +hyper = "1.3.1" +hyper-util = "0.1.3" +bytes = "1.6.0" +pin-project-lite = "0.2.13" +http-body = "1.0.0" +http = "1.1.0" +http-body-util = "0.1.1" + +[build-dependencies] +tonic-build = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } diff --git a/examples/grpc/README.md b/examples/grpc/README.md new file mode 100644 index 0000000..e7f0230 --- /dev/null +++ b/examples/grpc/README.md @@ -0,0 +1,11 @@ +### Run server + +```bash +cargo run -p example-grpc --bin example-grpc +``` + +### Run client + +```bash +cargo run -p example-grpc --bin example-grpc-server +``` \ No newline at end of file diff --git a/examples/grpc/build.rs b/examples/grpc/build.rs new file mode 100644 index 0000000..78e5e44 --- /dev/null +++ b/examples/grpc/build.rs @@ -0,0 +1,5 @@ +fn main() { + tonic_build::configure() + .compile(&["proto/helloworld.proto"], &["/proto"]) + .unwrap(); +} diff --git a/examples/grpc/proto/helloworld.proto b/examples/grpc/proto/helloworld.proto new file mode 100644 index 0000000..8de5d08 --- /dev/null +++ b/examples/grpc/proto/helloworld.proto @@ -0,0 +1,37 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} \ No newline at end of file diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs new file mode 100644 index 0000000..b559b1b --- /dev/null +++ b/examples/grpc/src/client.rs @@ -0,0 +1,33 @@ +use hello_world::greeter_client::GreeterClient; +use hello_world::HelloRequest; + +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = GreeterClient::connect("http://0.0.0.0:50051").await?; + + let request = tonic::Request::new(HelloRequest { + name: "Tonic".into(), + }); + + let response = client.say_hello(request).await?; + + println!("RESPONSE={:?}", response); + + println!("MESSAGE={:?}", response.into_inner()); + + let request = tonic::Request::new(HelloRequest { + name: "Tonic".into(), + }); + + let response = client.say_hello(request).await?; + + println!("RESPONSE={:?}", response); + + println!("MESSAGE={:?}", response.into_inner()); + + Ok(()) +} diff --git a/examples/grpc/src/main.rs b/examples/grpc/src/main.rs new file mode 100644 index 0000000..020b55e --- /dev/null +++ b/examples/grpc/src/main.rs @@ -0,0 +1,49 @@ +use async_trait::async_trait; +use hello_world::greeter_server::{Greeter, GreeterServer}; +use hello_world::{HelloReply, HelloRequest}; +use silent::prelude::{logger, HandlerAppend, Level, Route, RouteService, Server}; +use tonic::{transport::Server as TonicServer, Request, Response, Status}; + +mod client; + +pub mod hello_world { + tonic::include_proto!("helloworld"); // The string specified here must match the proto package name +} + +#[derive(Debug, Default)] +pub struct MyGreeter {} + +#[async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, // Accept request of type HelloRequest + ) -> Result, Status> { + // Return an instance of type HelloReply + println!("Got a request: {:?}", request); + + let reply = HelloReply { + message: format!("Hello {}!", request.into_inner().name), // We must use .into_inner() as the fields of gRPC requests and responses are private + }; + + Ok(Response::new(reply)) // Send back our formatted greeting + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let greeter = MyGreeter::default(); + logger::fmt().with_max_level(Level::INFO).init(); + let greeter_server = GreeterServer::new(greeter); + let grpc = TonicServer::builder() + // Wrap all services in the middleware stack + .add_service(greeter_server) + .into_router(); + let route = Route::new("").get(|_req| async { Ok("hello world") }); + let root = route.route().with_grpc(grpc.into()); + Server::new() + .bind("0.0.0.0:50051".parse().unwrap()) + .serve(root) + .await; + Ok(()) +} diff --git a/examples/grpc_h2c/Cargo.toml b/examples/grpc_h2c/Cargo.toml new file mode 100644 index 0000000..bd121de --- /dev/null +++ b/examples/grpc_h2c/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "example-grpc-h2c" +edition.workspace = true +authors.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +version.workspace = true + +[[bin]] +name = "example-grpc-client" +path = "src/client.rs" + +[dependencies] +tonic = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } +tonic-reflection = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } +prost = "0.12" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +silent = { path = "../../silent", features = ["grpc"] } +axum = "0.7" +async-trait = "0.1.80" +hyper = "1.3.1" +hyper-util = "0.1.3" +bytes = "1.6.0" +pin-project-lite = "0.2.13" +http-body = "1.0.0" +http = "1.1.0" +http-body-util = "0.1.1" +tower = "0.4.13" + +[build-dependencies] +tonic-build = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } diff --git a/examples/grpc_h2c/README.md b/examples/grpc_h2c/README.md new file mode 100644 index 0000000..76a2355 --- /dev/null +++ b/examples/grpc_h2c/README.md @@ -0,0 +1,11 @@ +### Run server + +```bash +cargo run -p example-grpc --bin example-grpc +``` + +### Run client + +```bash +cargo run -p example-grpc --bin example-grpc-server +``` diff --git a/examples/grpc_h2c/build.rs b/examples/grpc_h2c/build.rs new file mode 100644 index 0000000..3b3886e --- /dev/null +++ b/examples/grpc_h2c/build.rs @@ -0,0 +1,10 @@ +use std::{env, path::PathBuf}; + +fn main() { + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + + tonic_build::configure() + .file_descriptor_set_path(out_dir.join("helloworld_descriptor.bin")) + .compile(&["proto/helloworld.proto"], &["/proto"]) + .unwrap(); +} diff --git a/examples/grpc_h2c/proto/helloworld.proto b/examples/grpc_h2c/proto/helloworld.proto new file mode 100644 index 0000000..d79a6a0 --- /dev/null +++ b/examples/grpc_h2c/proto/helloworld.proto @@ -0,0 +1,37 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/examples/grpc_h2c/src/client.rs b/examples/grpc_h2c/src/client.rs new file mode 100644 index 0000000..a5cb030 --- /dev/null +++ b/examples/grpc_h2c/src/client.rs @@ -0,0 +1,92 @@ +use hello_world::greeter_client::GreeterClient; +use hello_world::HelloRequest; +use http::Uri; +use hyper_util::client::legacy::Client; +use hyper_util::rt::TokioExecutor; + +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let origin = Uri::from_static("http://[::1]:50051"); + let h2c_client = h2c::H2cChannel { + client: Client::builder(TokioExecutor::new()).build_http(), + }; + + let mut client = GreeterClient::with_origin(h2c_client, origin); + + let request = tonic::Request::new(HelloRequest { + name: "Tonic".into(), + }); + + let response = client.say_hello(request).await?; + + println!("RESPONSE={:?}", response); + + Ok(()) +} + +mod h2c { + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + use hyper::body::Incoming; + use hyper_util::{ + client::legacy::{connect::HttpConnector, Client}, + rt::TokioExecutor, + }; + use tonic::body::{empty_body, BoxBody}; + use tower::Service; + + pub struct H2cChannel { + pub client: Client, + } + + impl Service> for H2cChannel { + type Response = http::Response; + type Error = hyper::Error; + type Future = + Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + let client = self.client.clone(); + + Box::pin(async move { + let origin = request.uri(); + + let h2c_req = hyper::Request::builder() + .uri(origin) + .method(request.method()) + .header(http::header::UPGRADE, "h2c") + .body(empty_body()) + .unwrap(); + + let res = client.request(h2c_req).await.unwrap(); + + if res.status() != http::StatusCode::SWITCHING_PROTOCOLS { + panic!("Our server didn't upgrade: {}", res.status()); + } + + let upgraded_io = hyper::upgrade::on(res).await.unwrap(); + + // In an ideal world you would somehow cache this connection + let (mut h2_client, conn) = + hyper::client::conn::http2::Builder::new(TokioExecutor::new()) + .handshake(upgraded_io) + .await + .unwrap(); + tokio::spawn(conn); + + h2_client.send_request(request).await + }) + } + } +} diff --git a/examples/grpc_h2c/src/main.rs b/examples/grpc_h2c/src/main.rs new file mode 100644 index 0000000..19e31fb --- /dev/null +++ b/examples/grpc_h2c/src/main.rs @@ -0,0 +1,45 @@ +// use tonic::transport::server::TowerToHyperService; +use tonic::{transport::Server, Request, Response, Status}; + +use hello_world::greeter_server::{Greeter, GreeterServer}; +use hello_world::{HelloReply, HelloRequest}; +use silent::prelude::{logger, HandlerAppend, Level, Route, RouteService}; + +pub mod hello_world { + tonic::include_proto!("helloworld"); +} + +#[derive(Default)] +pub struct MyGreeter {} + +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + println!("Got a request from {:?}", request.remote_addr()); + + let reply = hello_world::HelloReply { + message: format!("Hello {}!", request.into_inner().name), + }; + Ok(Response::new(reply)) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let greeter = MyGreeter::default(); + logger::fmt().with_max_level(Level::INFO).init(); + + let grpc = Server::builder() + .add_service(GreeterServer::new(greeter)) + .into_router(); + let route = Route::new("").get(|_req| async { Ok("hello world") }); + let root = route.route().with_grpc(grpc.into()); + silent::prelude::Server::new() + .bind("0.0.0.0:50051".parse().unwrap()) + .serve(root) + .await; + Ok(()) +} diff --git a/examples/grpc_streaming/Cargo.toml b/examples/grpc_streaming/Cargo.toml new file mode 100644 index 0000000..3056b7d --- /dev/null +++ b/examples/grpc_streaming/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "example-grpc-streaming" +edition.workspace = true +authors.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +version.workspace = true + +[[bin]] +name = "example-grpc-client" +path = "src/client.rs" + +[dependencies] +tonic = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } +tonic-reflection = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } +prost = "0.12" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +silent = { path = "../../silent", features = ["grpc"] } +axum = "0.7" +async-trait = "0.1.80" +hyper = "1.3.1" +hyper-util = "0.1.3" +bytes = "1.6.0" +pin-project-lite = "0.2.13" +http-body = "1.0.0" +http = "1.1.0" +http-body-util = "0.1.1" +tokio-stream = "0.1.15" +h2 = "0.4.4" + +[build-dependencies] +tonic-build = { git = "https://github.com/alexrudy/tonic", branch = "hyper-1.0" } diff --git a/examples/grpc_streaming/README.md b/examples/grpc_streaming/README.md new file mode 100644 index 0000000..76a2355 --- /dev/null +++ b/examples/grpc_streaming/README.md @@ -0,0 +1,11 @@ +### Run server + +```bash +cargo run -p example-grpc --bin example-grpc +``` + +### Run client + +```bash +cargo run -p example-grpc --bin example-grpc-server +``` diff --git a/examples/grpc_streaming/build.rs b/examples/grpc_streaming/build.rs new file mode 100644 index 0000000..4657bea --- /dev/null +++ b/examples/grpc_streaming/build.rs @@ -0,0 +1,5 @@ +fn main() { + tonic_build::configure() + .compile(&["proto/echo.proto"], &["/proto"]) + .unwrap(); +} diff --git a/examples/grpc_streaming/proto/echo.proto b/examples/grpc_streaming/proto/echo.proto new file mode 100644 index 0000000..1fe2e9f --- /dev/null +++ b/examples/grpc_streaming/proto/echo.proto @@ -0,0 +1,43 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + syntax = "proto3"; + + package grpc.examples.echo; + + // EchoRequest is the request for echo. + message EchoRequest { + string message = 1; + } + + // EchoResponse is the response for echo. + message EchoResponse { + string message = 1; + } + + // Echo is the echo service. + service Echo { + // UnaryEcho is unary echo. + rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} + // ServerStreamingEcho is server side streaming. + rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} + // ClientStreamingEcho is client side streaming. + rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} + // BidirectionalStreamingEcho is bidi streaming. + rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} + } diff --git a/examples/grpc_streaming/src/client.rs b/examples/grpc_streaming/src/client.rs new file mode 100644 index 0000000..5d19b43 --- /dev/null +++ b/examples/grpc_streaming/src/client.rs @@ -0,0 +1,85 @@ +pub mod pb { + tonic::include_proto!("grpc.examples.echo"); +} + +use std::time::Duration; +use tokio_stream::{Stream, StreamExt}; +use tonic::transport::Channel; + +use pb::{echo_client::EchoClient, EchoRequest}; + +fn echo_requests_iter() -> impl Stream { + tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { + message: format!("msg {:02}", i), + }) +} + +async fn streaming_echo(client: &mut EchoClient, num: usize) { + let stream = client + .server_streaming_echo(EchoRequest { + message: "foo".into(), + }) + .await + .unwrap() + .into_inner(); + + // stream is infinite - take just 5 elements and then disconnect + let mut stream = stream.take(num); + while let Some(item) = stream.next().await { + println!("\treceived: {}", item.unwrap().message); + } + // stream is droped here and the disconnect info is send to server +} + +async fn bidirectional_streaming_echo(client: &mut EchoClient, num: usize) { + let in_stream = echo_requests_iter().take(num); + + let response = client + .bidirectional_streaming_echo(in_stream) + .await + .unwrap(); + + let mut resp_stream = response.into_inner(); + + while let Some(received) = resp_stream.next().await { + let received = received.unwrap(); + println!("\treceived message: `{}`", received.message); + } +} + +async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient, dur: Duration) { + let in_stream = echo_requests_iter().throttle(dur); + + let response = client + .bidirectional_streaming_echo(in_stream) + .await + .unwrap(); + + let mut resp_stream = response.into_inner(); + + while let Some(received) = resp_stream.next().await { + let received = received.unwrap(); + println!("\treceived message: `{}`", received.message); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = EchoClient::connect("http://0.0.0.0:50051").await.unwrap(); + + println!("Streaming echo:"); + streaming_echo(&mut client, 5).await; + tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions + + // Echo stream that sends 17 requests then graceful end that connection + println!("\r\nBidirectional stream echo:"); + bidirectional_streaming_echo(&mut client, 17).await; + + // Echo stream that sends up to `usize::MAX` requests. One request each 2s. + // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from + // graceful client disconnection (above example) on the server side. + println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); + bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await; + + Ok(()) +} diff --git a/examples/grpc_streaming/src/main.rs b/examples/grpc_streaming/src/main.rs new file mode 100644 index 0000000..7feba8f --- /dev/null +++ b/examples/grpc_streaming/src/main.rs @@ -0,0 +1,164 @@ +use async_trait::async_trait; +use silent::prelude::{logger, HandlerAppend, Level, Route, RouteService, Server}; + +mod client; + +pub mod pb { + tonic::include_proto!("grpc.examples.echo"); +} + +use std::{error::Error, io::ErrorKind, pin::Pin, time::Duration}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tonic::{transport::Server as TonicServer, Request, Response, Status, Streaming}; + +use pb::{EchoRequest, EchoResponse}; + +type EchoResult = Result, Status>; +type ResponseStream = Pin> + Send>>; + +fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { + let mut err: &(dyn Error + 'static) = err_status; + + loop { + if let Some(io_err) = err.downcast_ref::() { + return Some(io_err); + } + + // h2::Error do not expose std::io::Error with `source()` + // https://github.com/hyperium/h2/pull/462 + if let Some(h2_err) = err.downcast_ref::() { + if let Some(io_err) = h2_err.get_io() { + return Some(io_err); + } + } + + err = match err.source() { + Some(err) => err, + None => return None, + }; + } +} + +#[derive(Debug)] +pub struct EchoServer {} + +#[tonic::async_trait] +impl pb::echo_server::Echo for EchoServer { + async fn unary_echo(&self, _: Request) -> EchoResult { + Err(Status::unimplemented("not implemented")) + } + + type ServerStreamingEchoStream = ResponseStream; + + async fn server_streaming_echo( + &self, + req: Request, + ) -> EchoResult { + println!("EchoServer::server_streaming_echo"); + println!("\tclient connected from: {:?}", req.remote_addr()); + + // creating infinite stream with requested message + let repeat = std::iter::repeat(EchoResponse { + message: req.into_inner().message, + }); + let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200))); + + // spawn and channel are required if you want handle "disconnect" functionality + // the `out_stream` will not be polled after client disconnect + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + while let Some(item) = stream.next().await { + match tx.send(Result::<_, Status>::Ok(item)).await { + Ok(_) => { + // item (server response) was queued to be send to client + } + Err(_item) => { + // output_stream was build from rx and both are dropped + break; + } + } + } + println!("\tclient disconnected"); + }); + + let output_stream = ReceiverStream::new(rx); + Ok(Response::new( + Box::pin(output_stream) as Self::ServerStreamingEchoStream + )) + } + + async fn client_streaming_echo( + &self, + _: Request>, + ) -> EchoResult { + Err(Status::unimplemented("not implemented")) + } + + type BidirectionalStreamingEchoStream = ResponseStream; + + async fn bidirectional_streaming_echo( + &self, + req: Request>, + ) -> EchoResult { + println!("EchoServer::bidirectional_streaming_echo"); + + let mut in_stream = req.into_inner(); + let (tx, rx) = mpsc::channel(128); + + // this spawn here is required if you want to handle connection error. + // If we just map `in_stream` and write it back as `out_stream` the `out_stream` + // will be drooped when connection error occurs and error will never be propagated + // to mapped version of `in_stream`. + tokio::spawn(async move { + while let Some(result) = in_stream.next().await { + match result { + Ok(v) => tx + .send(Ok(EchoResponse { message: v.message })) + .await + .expect("working rx"), + Err(err) => { + if let Some(io_err) = match_for_io_error(&err) { + if io_err.kind() == ErrorKind::BrokenPipe { + // here you can handle special case when client + // disconnected in unexpected way + eprintln!("\tclient disconnected: broken pipe"); + break; + } + } + + match tx.send(Err(err)).await { + Ok(_) => (), + Err(_err) => break, // response was droped + } + } + } + } + println!("\tstream ended"); + }); + + // echo just write the same data that was received + let out_stream = ReceiverStream::new(rx); + + Ok(Response::new( + Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream + )) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + logger::fmt().with_max_level(Level::INFO).init(); + let server = EchoServer {}; + let grpc = TonicServer::builder() + // Wrap all services in the middleware stack + .add_service(pb::echo_server::EchoServer::new(server)) + .into_router(); + let route = Route::new("").get(|_req| async { Ok("hello world") }); + let root = route.route().with_grpc(grpc.into()); + Server::new() + .bind("0.0.0.0:50051".parse().unwrap()) + .serve(root) + .await; + Ok(()) +} diff --git a/examples/llma_chat/Cargo.toml b/examples/llma_chat/Cargo.toml index 63656e8..032d51b 100644 --- a/examples/llma_chat/Cargo.toml +++ b/examples/llma_chat/Cargo.toml @@ -10,6 +10,6 @@ async-trait = "0.1.80" llm = "0.1.1" once_cell = "1.19.0" rand = "0.8.5" -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } silent = { path = "../../silent", features = ["ws"] } tokio = { version = "1.37.0", features = ["full"] } diff --git a/examples/multipart-form/Cargo.toml b/examples/multipart-form/Cargo.toml index 346b459..bb2807d 100644 --- a/examples/multipart-form/Cargo.toml +++ b/examples/multipart-form/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } diff --git a/examples/silent-db-macros/Cargo.toml b/examples/silent-db-macros/Cargo.toml index 0b21f24..4f317fb 100644 --- a/examples/silent-db-macros/Cargo.toml +++ b/examples/silent-db-macros/Cargo.toml @@ -12,4 +12,4 @@ version = "0.1.0" [dependencies] silent-db = { path = "../../silent-db", features = ["mysql"] } -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } diff --git a/examples/silent_db/Cargo.toml b/examples/silent_db/Cargo.toml index 2e46ff5..bfffcf5 100644 --- a/examples/silent_db/Cargo.toml +++ b/examples/silent_db/Cargo.toml @@ -11,7 +11,7 @@ version = "0.1.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } silent-db = { path = "../../silent-db", features = ["mysql"] } tokio = { version = "1.37", features = ["full"] } anyhow = "1.0.82" diff --git a/examples/sse-chat/Cargo.toml b/examples/sse-chat/Cargo.toml index 22d2e7a..8e941a0 100644 --- a/examples/sse-chat/Cargo.toml +++ b/examples/sse-chat/Cargo.toml @@ -11,4 +11,4 @@ once_cell = "1" parking_lot = "0.12" tokio = { version = "1", features = ["macros"] } tokio-stream = { version = "0.1", features = ["net"] } -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } diff --git a/examples/templates/Cargo.toml b/examples/templates/Cargo.toml index 8c8d168..c6014fc 100644 --- a/examples/templates/Cargo.toml +++ b/examples/templates/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } silent = { path = "../../silent", features = ["template"] } diff --git a/examples/todo/Cargo.toml b/examples/todo/Cargo.toml index e2f3cf9..4d15a6d 100644 --- a/examples/todo/Cargo.toml +++ b/examples/todo/Cargo.toml @@ -7,6 +7,6 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } uuid = { version = "1.8.0", features = ["serde", "v4"] } async-trait = "0.1.80" diff --git a/silent-db/Cargo.toml b/silent-db/Cargo.toml index 0621f3f..a7c1763 100644 --- a/silent-db/Cargo.toml +++ b/silent-db/Cargo.toml @@ -32,4 +32,4 @@ silent-db-macros = { path = "../silent-db-macros", version = "0.2.0" } regex = "1.10.4" [dev-dependencies] -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } diff --git a/silent/Cargo.toml b/silent/Cargo.toml index 10a02a7..bad937a 100644 --- a/silent/Cargo.toml +++ b/silent/Cargo.toml @@ -16,7 +16,7 @@ version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = ["server", "test", "static"] -full = ["admin", "server", "multipart", "ws", "sse", "security", "static", "session", "cookie", "template", "test", "scheduler"] +full = ["admin", "server", "multipart", "ws", "sse", "security", "static", "session", "cookie", "template", "test", "scheduler", "grpc"] admin = ["server", "sse", "template", "session"] server = ["tokio/fs", "tokio/net", "tokio/rt-multi-thread", "tokio/signal"] ws = [] @@ -30,17 +30,19 @@ template = [] #wasi = ["tokio/sync"] test = ["tokio/macros", "tokio/rt"] scheduler = [] +grpc = ["axum", "tower-service"] [dependencies] thiserror = "1.0.59" hyper = { version = "1.3.1", features = ["full"] } +hyper-util = { version = "0.1.3", features = ["full"] } tokio = { version = "1.37.0", optional = true } bytes = "1.6.0" http-body-util = "0.1.1" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["local-time"] } async-trait = "0.1.80" -serde = { version = "1.0.198", features = ["derive"] } +serde = { version = "1.0.200", features = ["derive"] } serde_json = "1.0.116" uuid = "1.8.0" url = "2.5.0" @@ -49,7 +51,7 @@ multimap = { version = "0.10.0", features = ["serde"] } mime = "0.3.17" tempfile = "3.10.1" textnonce = "1.0.0" -multer = "3.0.0" +multer = "3.1.0" futures-util = "0.3.30" chrono = { version = "0.4.38", default-features = false, features = ["clock"] } tokio-tungstenite = "0.21.0" @@ -69,6 +71,8 @@ tera = "1.19.1" http = "1.1.0" http-body = "1.0.0" futures = "0.3.30" -tokio-util = "0.7.10" +tokio-util = "0.7.11" anyhow = "1.0.82" cron = "0.12.1" +axum = { version = "0.7.5", optional = true } +tower-service = { version = "0.3.2", optional = true } \ No newline at end of file diff --git a/silent/src/conn/mod.rs b/silent/src/conn/mod.rs deleted file mode 100644 index 2d96611..0000000 --- a/silent/src/conn/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -pub mod support; - -use crate::rt::RtExecutor; -use hyper::server::conn::{http1, http2}; - -#[doc(hidden)] -#[allow(dead_code)] -pub struct SilentConnection { - pub(crate) http1: http1::Builder, - pub(crate) http2: http2::Builder, -} - -impl Default for SilentConnection { - fn default() -> Self { - Self { - http1: http1::Builder::new(), - http2: http2::Builder::new(RtExecutor), - } - } -} diff --git a/silent/src/conn/support/mod.rs b/silent/src/conn/support/mod.rs deleted file mode 100644 index cd8e496..0000000 --- a/silent/src/conn/support/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod tokiort; - -#[allow(unused_imports)] -pub use tokiort::{TokioExecutor, TokioIo, TokioTimer}; diff --git a/silent/src/conn/support/tokiort.rs b/silent/src/conn/support/tokiort.rs deleted file mode 100644 index 05b117a..0000000 --- a/silent/src/conn/support/tokiort.rs +++ /dev/null @@ -1,235 +0,0 @@ -#![allow(dead_code)] -//! Various runtimes for hyper -use std::{ - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use futures_util::Future; -use hyper::rt::{Sleep, Timer}; -use pin_project_lite::pin_project; - -#[derive(Clone)] -/// An Executor that uses the tokio runtime. -pub struct TokioExecutor; - -impl hyper::rt::Executor for TokioExecutor -where - F: std::future::Future + Send + 'static, - F::Output: Send + 'static, -{ - fn execute(&self, fut: F) { - tokio::task::spawn(fut); - } -} - -/// A Timer that uses the tokio runtime. -#[derive(Clone, Debug)] -pub struct TokioTimer; - -impl Timer for TokioTimer { - fn sleep(&self, duration: Duration) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep(duration), - }) - } - - fn sleep_until(&self, deadline: Instant) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep_until(deadline.into()), - }) - } - - fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { - if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { - sleep.reset(new_deadline) - } - } -} - -struct TokioTimeout { - inner: Pin>>, -} - -impl Future for TokioTimeout -where - T: Future, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll { - self.inner.as_mut().poll(context) - } -} - -// Use TokioSleep to get tokio::time::Sleep to implement Unpin. -// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html -pin_project! { - pub(crate) struct TokioSleep { - #[pin] - pub(crate) inner: tokio::time::Sleep, - } -} - -impl Future for TokioSleep { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) - } -} - -impl Sleep for TokioSleep {} - -impl TokioSleep { - pub fn reset(self: Pin<&mut Self>, deadline: Instant) { - self.project().inner.as_mut().reset(deadline.into()); - } -} - -pin_project! { - #[derive(Debug)] - pub struct TokioIo { - #[pin] - inner: T, - } -} - -impl TokioIo { - pub fn new(inner: T) -> Self { - Self { inner } - } - - pub fn inner(self) -> T { - self.inner - } -} - -impl hyper::rt::Read for TokioIo -where - T: tokio::io::AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut buf: hyper::rt::ReadBufCursor<'_>, - ) -> Poll> { - let n = unsafe { - let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); - match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { - Poll::Ready(Ok(())) => tbuf.filled().len(), - other => return other, - } - }; - - unsafe { - buf.advance(n); - } - Poll::Ready(Ok(())) - } -} - -impl hyper::rt::Write for TokioIo -where - T: tokio::io::AsyncWrite, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) - } - - fn is_write_vectored(&self) -> bool { - tokio::io::AsyncWrite::is_write_vectored(&self.inner) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) - } -} - -impl tokio::io::AsyncRead for TokioIo -where - T: hyper::rt::Read, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - tbuf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - //let init = tbuf.initialized().len(); - let filled = tbuf.filled().len(); - let sub_filled = unsafe { - let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); - - match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { - Poll::Ready(Ok(())) => buf.filled().len(), - other => return other, - } - }; - - let n_filled = filled + sub_filled; - // At least sub_filled bytes had to have been initialized. - let n_init = sub_filled; - unsafe { - tbuf.assume_init(n_init); - tbuf.set_filled(n_filled); - } - - Poll::Ready(Ok(())) - } -} - -impl tokio::io::AsyncWrite for TokioIo -where - T: hyper::rt::Write, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - hyper::rt::Write::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - hyper::rt::Write::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - hyper::rt::Write::poll_shutdown(self.project().inner, cx) - } - - fn is_write_vectored(&self) -> bool { - hyper::rt::Write::is_write_vectored(&self.inner) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) - } -} diff --git a/silent/src/core/adapt.rs b/silent/src/core/adapt.rs index 093e4bc..7c9b5f0 100644 --- a/silent/src/core/adapt.rs +++ b/silent/src/core/adapt.rs @@ -1,4 +1,5 @@ use crate::core::req_body::ReqBody; +use crate::prelude::ResBody; #[cfg(feature = "cookie")] use crate::SilentError; use crate::{Request, Response}; @@ -7,13 +8,13 @@ use cookie::{Cookie, CookieJar}; #[cfg(feature = "cookie")] use http::{header, StatusCode}; use hyper::Request as HyperRequest; +use hyper::Response as HyperResponse; pub trait RequestAdapt { fn tran_to_request(self) -> Request; } -#[allow(dead_code)] -pub trait ResponseAdapt { +pub trait ResponseAdapt { fn tran_from_response(res: Response) -> Self; } @@ -55,3 +56,36 @@ impl RequestAdapt for HyperRequest { Request::from_parts(parts, body) } } + +impl ResponseAdapt for HyperResponse { + fn tran_from_response(res: Response) -> Self { + #[cfg(feature = "cookie")] + let Response { + status_code, + headers, + body, + cookies, + .. + } = res; + #[cfg(not(feature = "cookie"))] + let Response { + status_code, + headers, + body, + .. + } = res; + + let mut res = hyper::Response::new(body); + *res.headers_mut() = headers; + #[cfg(feature = "cookie")] + for cookie in cookies.delta() { + if let Ok(hv) = cookie.encoded().to_string().parse() { + res.headers_mut().append(header::SET_COOKIE, hv); + } + } + // Default to a 404 if no response code was set + *res.status_mut() = status_code; + + res + } +} diff --git a/silent/src/core/req_body.rs b/silent/src/core/req_body.rs index 363d09f..2e85b2e 100644 --- a/silent/src/core/req_body.rs +++ b/silent/src/core/req_body.rs @@ -1,14 +1,20 @@ +use std::io::{Error as IoError, ErrorKind}; +use std::pin::Pin; +use std::task::{Context, Poll}; + use bytes::Bytes; use futures_util::Stream; use http_body::{Body, Frame, SizeHint}; use hyper::body::Incoming; -use std::io::{Error as IoError, ErrorKind}; -use std::pin::Pin; -use std::task::{Context, Poll}; #[derive(Debug)] +/// 请求体 pub enum ReqBody { + /// Empty body. Empty, + /// Once bytes body. + Once(Bytes), + /// Incoming default body. Incoming(Incoming), } @@ -34,6 +40,7 @@ impl Body for ReqBody { ) -> Poll, Self::Error>>> { match &mut *self { ReqBody::Empty => Poll::Ready(None), + ReqBody::Once(bytes) => Poll::Ready(Some(Ok(Frame::data(bytes.clone())))), ReqBody::Incoming(body) => Pin::new(body) .poll_frame(cx) .map_err(|e| IoError::new(ErrorKind::Other, e)), @@ -43,6 +50,7 @@ impl Body for ReqBody { fn is_end_stream(&self) -> bool { match self { ReqBody::Empty => true, + ReqBody::Once(bytes) => bytes.is_empty(), ReqBody::Incoming(body) => body.is_end_stream(), } } @@ -50,6 +58,7 @@ impl Body for ReqBody { fn size_hint(&self) -> SizeHint { match self { ReqBody::Empty => SizeHint::with_exact(0), + ReqBody::Once(bytes) => SizeHint::with_exact(bytes.len() as u64), ReqBody::Incoming(body) => body.size_hint(), } } diff --git a/silent/src/core/request.rs b/silent/src/core/request.rs index 4191c8d..5423458 100644 --- a/silent/src/core/request.rs +++ b/silent/src/core/request.rs @@ -8,13 +8,15 @@ use crate::header::CONTENT_TYPE; #[cfg(feature = "scheduler")] use crate::Scheduler; use crate::{Configs, SilentError}; +use bytes::Bytes; #[cfg(feature = "cookie")] use cookie::{Cookie, CookieJar}; use http::request::Parts; -use http::Request as BaseRequest; use http::{Extensions, HeaderMap, HeaderValue, Method, Uri, Version}; +use http::{Request as BaseRequest, StatusCode}; use http_body_util::BodyExt; use mime::Mime; +use serde::de::StdError; use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; @@ -46,6 +48,60 @@ pub struct Request { pub(crate) configs: Configs, } +impl Request { + /// 从http请求体创建请求 + pub fn into_http(self) -> http::Request { + http::Request::from_parts(self.parts, self.body) + } + /// Strip the request to [`hyper::Request`]. + #[doc(hidden)] + pub fn strip_to_hyper(&mut self) -> Result, SilentError> + where + QB: TryFrom, + >::Error: StdError + Send + Sync + 'static, + { + let mut builder = http::request::Builder::new() + .method(self.method().clone()) + .uri(self.uri().clone()) + .version(self.version()); + if let Some(headers) = builder.headers_mut() { + *headers = std::mem::take(self.headers_mut()); + } + if let Some(extensions) = builder.extensions_mut() { + *extensions = std::mem::take(self.extensions_mut()); + } + + let body = self.take_body(); + builder + .body(body.try_into().map_err(|e| { + SilentError::business_error( + StatusCode::INTERNAL_SERVER_ERROR, + format!("request strip to hyper failed: {e}"), + ) + })?) + .map_err(|e| SilentError::business_error(StatusCode::BAD_REQUEST, e.to_string())) + } + /// Strip the request to [`hyper::Request`]. + #[doc(hidden)] + pub async fn strip_to_bytes_hyper(&mut self) -> Result, SilentError> { + let mut builder = http::request::Builder::new() + .method(self.method().clone()) + .uri(self.uri().clone()) + .version(self.version()); + if let Some(headers) = builder.headers_mut() { + *headers = std::mem::take(self.headers_mut()); + } + if let Some(extensions) = builder.extensions_mut() { + *extensions = std::mem::take(self.extensions_mut()); + } + + let mut body = self.take_body(); + builder + .body(body.frame().await.unwrap().unwrap().into_data().unwrap()) + .map_err(|e| SilentError::business_error(StatusCode::BAD_REQUEST, e.to_string())) + } +} + impl Default for Request { fn default() -> Self { Self::empty() @@ -309,6 +365,13 @@ impl Request { .await?; Ok(serde_json::from_value(value.to_owned())?) } + ReqBody::Once(bytes) => match content_type.subtype() { + mime::WWW_FORM_URLENCODED => { + serde_urlencoded::from_bytes(&bytes).map_err(SilentError::from) + } + mime::JSON => serde_json::from_slice(&bytes).map_err(|e| e.into()), + _ => Err(SilentError::JsonEmpty), + }, ReqBody::Empty => Err(SilentError::BodyEmpty), } } diff --git a/silent/src/core/res_body.rs b/silent/src/core/res_body.rs index cf3fbf7..bc096bc 100644 --- a/silent/src/core/res_body.rs +++ b/silent/src/core/res_body.rs @@ -22,6 +22,8 @@ pub enum ResBody { Incoming(Incoming), /// Stream body. Stream(BoxStream<'static, Result>), + /// Boxed body. + Boxed(Pin + Send + Sync + 'static>>), } /// 转换数据为响应Body @@ -68,6 +70,14 @@ impl Stream for ResBody { .as_mut() .poll_next(cx) .map_err(|e| IoError::new(ErrorKind::Other, e)), + ResBody::Boxed(body) => match Body::poll_frame(Pin::new(body), cx) { + Poll::Ready(Some(Ok(frame))) => Poll::Ready(frame.into_data().map(Ok).ok()), + Poll::Ready(Some(Err(e))) => { + Poll::Ready(Some(Err(IoError::new(ErrorKind::Other, e)))) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }, } } } @@ -94,6 +104,7 @@ impl Body for ResBody { ResBody::Once(bytes) => bytes.is_empty(), ResBody::Chunks(chunks) => chunks.is_empty(), ResBody::Incoming(body) => body.is_end_stream(), + ResBody::Boxed(body) => body.is_end_stream(), ResBody::Stream(_) => false, } } @@ -107,6 +118,7 @@ impl Body for ResBody { SizeHint::with_exact(size) } ResBody::Incoming(recv) => recv.size_hint(), + ResBody::Boxed(recv) => recv.size_hint(), ResBody::Stream(_) => SizeHint::default(), } } diff --git a/silent/src/core/response.rs b/silent/src/core/response.rs index 62368e3..9a54b32 100644 --- a/silent/src/core/response.rs +++ b/silent/src/core/response.rs @@ -1,9 +1,12 @@ use crate::core::res_body::{full, ResBody}; use crate::headers::{ContentType, Header, HeaderMap, HeaderMapExt}; +#[cfg(feature = "grpc")] +use crate::prelude::stream_body; use crate::{header, Configs, Result, SilentError, StatusCode}; use bytes::Bytes; #[cfg(feature = "cookie")] use cookie::{Cookie, CookieJar}; +use http::response::Parts; use http::Extensions; use http_body::{Body, SizeHint}; use serde::Serialize; @@ -16,18 +19,61 @@ use std::fmt::{Display, Formatter}; /// use silent::Response; /// let req = Response::empty(); /// ``` -pub struct Response { +pub struct Response { /// The HTTP status code. pub(crate) status_code: StatusCode, /// The HTTP headers. pub(crate) headers: HeaderMap, - pub(crate) body: ResBody, + pub(crate) body: B, #[cfg(feature = "cookie")] pub(crate) cookies: CookieJar, pub(crate) extensions: Extensions, pub(crate) configs: Configs, } +impl Response { + #[cfg(feature = "grpc")] + /// 合并axum响应 + #[inline] + pub async fn merge_axum(&mut self, res: axum::response::Response) { + let (parts, body) = res.into_parts(); + let Parts { + status, + headers, + extensions, + .. + } = parts; + self.status_code = status; + headers.iter().for_each(|(key, value)| { + self.headers.insert(key.clone(), value.clone()); + }); + self.extensions.extend(extensions); + self.body = stream_body(body.into_data_stream()); + } + + /// 合并hyper响应 + #[inline] + pub fn merge_hyper(&mut self, hyper_res: hyper::Response) + where + B: Into, + { + let ( + Parts { + status, + headers, + extensions, + .. + }, + body, + ) = hyper_res.into_parts(); + + self.status_code = status; + self.headers = headers; + self.extensions = extensions; + self.body = body.into(); + } +} + impl fmt::Debug for Response { #[inline] fn fmt(&self, f: &mut Formatter) -> fmt::Result { @@ -56,19 +102,40 @@ impl Response { } } /// 设置响应状态 + #[inline] pub fn set_status(&mut self, status: StatusCode) { self.status_code = status; } + /// 包含响应状态 + #[inline] + pub fn with_status(mut self, status: StatusCode) -> Self { + self.status_code = status; + self + } /// 设置响应body + #[inline] pub fn set_body(&mut self, body: ResBody) { self.body = body; } + /// 包含响应body + #[inline] + pub fn with_body(mut self, body: ResBody) -> Self { + self.body = body; + self + } /// 获取响应体 + #[inline] pub fn body(&self) -> &ResBody { &self.body } /// 设置响应header - pub fn set_header(mut self, key: header::HeaderName, value: header::HeaderValue) -> Self { + #[inline] + pub fn set_header(&mut self, key: header::HeaderName, value: header::HeaderValue) { + self.headers.insert(key, value); + } + /// 包含响应header + #[inline] + pub fn with_header(mut self, key: header::HeaderName, value: header::HeaderValue) -> Self { self.headers.insert(key, value); self } @@ -145,37 +212,14 @@ impl Response { { self.headers.typed_insert(header); } - #[inline] - pub(crate) fn into_hyper(self) -> hyper::Response { - #[cfg(feature = "cookie")] - let Self { - status_code, - headers, - body, - cookies, - .. - } = self; - #[cfg(not(feature = "cookie"))] - let Self { - status_code, - headers, - body, - .. - } = self; - - let mut res = hyper::Response::new(body); - *res.headers_mut() = headers; - #[cfg(feature = "cookie")] - for cookie in cookies.delta() { - if let Ok(hv) = cookie.encoded().to_string().parse() { - res.headers_mut().append(header::SET_COOKIE, hv); - } - } - // Default to a 404 if no response code was set - *res.status_mut() = status_code; - - res + /// 包含响应header + pub fn with_typed_header(mut self, header: H) -> Self + where + H: Header, + { + self.headers.typed_insert(header); + self } #[cfg(feature = "cookie")] diff --git a/silent/src/grpc/handler.rs b/silent/src/grpc/handler.rs new file mode 100644 index 0000000..92b742d --- /dev/null +++ b/silent/src/grpc/handler.rs @@ -0,0 +1,69 @@ +use crate::grpc::service::GrpcService; +use crate::{Handler, Response, SilentError}; +use async_trait::async_trait; +use http::{header, HeaderValue, StatusCode}; +use hyper::upgrade::OnUpgrade; +use hyper_util::rt::TokioExecutor; +use tower_service::Service; +use tracing::error; + +#[derive(Clone)] +pub struct GrpcHandler(axum::Router<()>); + +impl From> for GrpcHandler { + fn from(router: axum::Router<()>) -> Self { + Self(router) + } +} + +#[async_trait] +impl Handler for GrpcHandler { + async fn call(&self, mut req: crate::Request) -> crate::Result { + if let Some(on_upgrade) = req.extensions_mut().remove::() { + let handler = self.0.clone(); + tokio::spawn(async move { + let conn = on_upgrade.await; + if conn.is_err() { + eprintln!("upgrade error: {:?}", conn.err()); + return; + } + let upgraded_io = conn.unwrap(); + + let http = hyper::server::conn::http2::Builder::new(TokioExecutor::new()); + match http + .serve_connection(upgraded_io, GrpcService::new(handler)) + .await + { + Ok(_) => eprintln!("finished gracefully"), + Err(err) => println!("ERROR: {err}"), + } + }); + let mut res = Response::empty(); + res.set_status(StatusCode::SWITCHING_PROTOCOLS); + res.headers_mut() + .insert(header::UPGRADE, HeaderValue::from_static("h2c")); + Ok(res) + } else { + let mut handler = self.0.clone(); + let req = req.into_http(); + + let axum_res = handler.call(req).await.map_err(|e| { + error!(error = ?e, "call axum router failed: {}", e); + SilentError::business_error( + StatusCode::INTERNAL_SERVER_ERROR, + format!("call axum router failed: {}", e), + ) + })?; + let mut res = Response::empty(); + res.merge_axum(axum_res).await; + res.headers_mut() + .insert(header::CONTENT_TYPE, "application/grpc".parse().unwrap()); + res.headers_mut().insert( + header::HeaderName::from_static("grpc-status"), + "0".parse().unwrap(), + ); + + Ok(res) + } + } +} diff --git a/silent/src/grpc/mod.rs b/silent/src/grpc/mod.rs new file mode 100644 index 0000000..d0f4da9 --- /dev/null +++ b/silent/src/grpc/mod.rs @@ -0,0 +1,5 @@ +mod handler; +mod service; +// mod stream; + +pub use handler::GrpcHandler; diff --git a/silent/src/grpc/service.rs b/silent/src/grpc/service.rs new file mode 100644 index 0000000..9a097a0 --- /dev/null +++ b/silent/src/grpc/service.rs @@ -0,0 +1,43 @@ +use std::future::Future; +use std::pin::Pin; + +use hyper::body::Incoming; +use hyper::service::Service as HyperService; +use tower_service::Service; + +use crate::log::error; + +#[doc(hidden)] +#[derive(Clone)] +pub struct GrpcService { + pub(crate) handler: axum::Router<()>, +} + +impl GrpcService { + #[inline] + #[allow(dead_code)] + pub fn new(handler: axum::Router<()>) -> Self { + Self { handler } + } +} + +impl HyperService> for GrpcService { + type Response = axum::response::Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + #[inline] + fn call(&self, req: hyper::Request) -> Self::Future { + let mut handler = self.handler.clone(); + Box::pin(async move { + let res = handler + .call(req) + .await + .map_err(|e| { + error!(error = ?e, "call grpc router failed: {}", e); + }) + .unwrap(); + Ok(res) + }) + } +} diff --git a/silent/src/grpc/stream.rs b/silent/src/grpc/stream.rs new file mode 100644 index 0000000..0255e2a --- /dev/null +++ b/silent/src/grpc/stream.rs @@ -0,0 +1,85 @@ +use crate::prelude::ReqBody; +use crate::SilentError; +use bytes::Bytes; +use futures::Stream; +use http::request::Parts; +use http::StatusCode; +use http_body::Body; +use http_body_util::BodyExt; +use hyper::body::Incoming; +use pin_project_lite::pin_project; +use std::task::Poll; +use tokio::runtime::Handle; +use tower_service::Service; +use tracing::error; + +pin_project! { + pub(crate) struct GrpcStream{ + #[pin] + incoming: Incoming, + handler: axum::Router<()>, + parts: Parts, + } +} + +impl GrpcStream { + pub fn new(incoming: Incoming, handler: axum::Router<()>, parts: Parts) -> Self { + Self { + incoming, + handler, + parts, + } + } +} + +impl Stream for GrpcStream { + type Item = crate::Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let mut stream = self.project(); + match stream.incoming.as_mut().poll_frame(cx) { + Poll::Ready(Some(Ok(chunk))) => match chunk.into_data() { + Ok(chunk) => { + let body = ReqBody::Once(chunk); + let req = hyper::Request::from_parts(stream.parts.clone(), body); + let mut handler = stream.handler.clone(); + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + let axum_res = handler.call(req).await.map_err(|e| { + error!(error = ?e, "call axum router failed: {}", e); + SilentError::business_error( + StatusCode::INTERNAL_SERVER_ERROR, + format!("call axum router failed: {}", e), + ) + })?; + let mut body = axum_res.into_body(); + if let Some(Ok(chunk)) = body.frame().await { + if let Ok(chunk) = chunk.into_data() { + eprintln!("stream chunk: {:#?}", chunk); + Poll::Ready(Some(Ok(chunk))) + } else { + Poll::Ready(None) + } + } else { + Poll::Ready(None) + } + }) + }) + } + Err(_) => Poll::Ready(None), + }, + Poll::Pending => Poll::Pending, + _ => Poll::Ready(Some(Ok(Bytes::new()))), + } + } + fn size_hint(&self) -> (usize, Option) { + let size_hint = self.incoming.size_hint(); + ( + size_hint.lower() as usize, + size_hint.upper().map(|x| x as usize), + ) + } +} diff --git a/silent/src/lib.rs b/silent/src/lib.rs index e09ffb8..3435e60 100644 --- a/silent/src/lib.rs +++ b/silent/src/lib.rs @@ -1,10 +1,10 @@ mod configs; /// The `silent` library. -#[cfg(feature = "server")] -mod conn; #[warn(missing_docs)] mod core; mod error; +#[cfg(feature = "grpc")] +mod grpc; mod handler; mod log; pub mod middleware; @@ -51,8 +51,8 @@ pub mod prelude { #[cfg(feature = "multipart")] pub use crate::core::form::{FilePart, FormData}; pub use crate::core::{ - path_param::PathParam, request::Request, res_body::full, res_body::stream_body, - res_body::ResBody, response::Response, + path_param::PathParam, req_body::ReqBody, request::Request, res_body::full, + res_body::stream_body, res_body::ResBody, response::Response, }; pub use crate::error::{SilentError, SilentResult as Result}; #[cfg(feature = "static")] @@ -65,7 +65,7 @@ pub mod prelude { #[cfg(feature = "ws")] pub use crate::route::handler_append::WSHandlerAppend; pub use crate::route::handler_append::{HandlerAppend, HandlerGetter}; - pub use crate::route::{Route, RouteService}; + pub use crate::route::{Route, RouteService, RouterAdapt}; #[cfg(feature = "scheduler")] pub use crate::scheduler::Task; #[cfg(feature = "security")] diff --git a/silent/src/route/mod.rs b/silent/src/route/mod.rs index 7979221..f1bd076 100644 --- a/silent/src/route/mod.rs +++ b/silent/src/route/mod.rs @@ -15,6 +15,10 @@ pub use root::RootRoute; use crate::prelude::HandlerGetter; pub use route_service::RouteService; +pub trait RouterAdapt { + fn into_router(self) -> Route; +} + #[derive(Clone)] pub struct Route { pub path: String, @@ -25,6 +29,12 @@ pub struct Route { create_path: String, } +impl RouterAdapt for Route { + fn into_router(self) -> Route { + self + } +} + impl Default for Route { fn default() -> Self { Self::new("") @@ -95,7 +105,8 @@ impl Route { route.get_append_real_route(last_path) } } - pub fn append(mut self, mut route: Route) -> Self { + pub fn append(mut self, route: R) -> Self { + let mut route = route.into_router(); self.middlewares .iter() .cloned() diff --git a/silent/src/route/root.rs b/silent/src/route/root.rs index 77f593e..221a32d 100644 --- a/silent/src/route/root.rs +++ b/silent/src/route/root.rs @@ -7,15 +7,20 @@ use crate::session::SessionMiddleware; use crate::templates::TemplateMiddleware; #[cfg(feature = "scheduler")] use crate::Scheduler; +#[cfg(feature = "grpc")] +use crate::{grpc::GrpcHandler, Handler}; use crate::{ Configs, MiddleWareHandler, MiddlewareResult, Request, Response, SilentError, StatusCode, }; #[cfg(feature = "session")] use async_session::{Session, SessionStore}; use chrono::Utc; +use http::{header, HeaderValue}; +use mime::Mime; use std::fmt; use std::future::Future; use std::net::SocketAddr; +use std::str::FromStr; use std::sync::Arc; #[cfg(feature = "scheduler")] use tokio::sync::Mutex; @@ -30,6 +35,8 @@ pub struct RootRoute { pub(crate) configs: Option, #[cfg(feature = "scheduler")] pub(crate) scheduler: Arc>, + #[cfg(feature = "grpc")] + pub(crate) grpc: Option, } impl fmt::Debug for RootRoute { @@ -55,9 +62,22 @@ impl RootRoute { configs: None, #[cfg(feature = "scheduler")] scheduler: Arc::new(Mutex::new(Scheduler::new())), + #[cfg(feature = "grpc")] + grpc: None, } } + #[cfg(feature = "grpc")] + pub fn grpc(&mut self, grpc: GrpcHandler) { + self.grpc = Some(grpc); + } + + #[cfg(feature = "grpc")] + pub fn with_grpc(mut self, grpc: GrpcHandler) -> Self { + self.grpc = Some(grpc); + self + } + pub fn push(&mut self, route: Route) { self.middlewares.clone_from(&route.middlewares); self.children.push(route); @@ -114,6 +134,28 @@ impl RootRoute { MiddlewareResult::Error(err) => return Err(err), } } + if req.content_type() == Some(Mime::from_str("application/grpc").unwrap()) + || req.headers().get(header::UPGRADE) == Some(&HeaderValue::from_static("h2c")) + { + #[cfg(feature = "grpc")] + { + return if let Some(grpc) = self.grpc.clone() { + grpc.call(req).await + } else { + Err(SilentError::business_error( + StatusCode::NOT_IMPLEMENTED, + "grpc handler not set".to_string(), + )) + }; + } + #[cfg(not(feature = "grpc"))] + { + return Err(SilentError::business_error( + StatusCode::NOT_IMPLEMENTED, + "grpc not enabled".to_string(), + )); + } + } match self.handler_match(&mut req, path.as_str()) { RouteMatched::Matched(route) => match route.handler.get(req.method()) { None => { diff --git a/silent/src/scheduler/storage/traits.rs b/silent/src/scheduler/storage/traits.rs index 1c54d90..4bf1b32 100644 --- a/silent/src/scheduler/storage/traits.rs +++ b/silent/src/scheduler/storage/traits.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; -// 定时任务存储Trait #[allow(dead_code)] +/// 定时任务存储Trait #[async_trait] trait Storage { async fn load(&mut self); diff --git a/silent/src/service/hyper_service.rs b/silent/src/service/hyper_service.rs index b38164f..7912dc8 100644 --- a/silent/src/service/hyper_service.rs +++ b/silent/src/service/hyper_service.rs @@ -2,11 +2,11 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use hyper::body::Incoming; use hyper::service::Service as HyperService; use hyper::{Request as HyperRequest, Response as HyperResponse}; -use crate::core::{adapt::RequestAdapt, res_body::ResBody}; +use crate::core::{adapt::RequestAdapt, adapt::ResponseAdapt, res_body::ResBody}; +use crate::prelude::ReqBody; use crate::route::RootRoute; use crate::{Request, Response}; @@ -36,16 +36,38 @@ impl HyperServiceHandler { } } -impl HyperService> for HyperServiceHandler { +impl HyperService> for HyperServiceHandler +where + B: Into, +{ type Response = HyperResponse; type Error = hyper::Error; type Future = Pin> + Send>>; #[inline] - fn call(&self, req: HyperRequest) -> Self::Future { + fn call(&self, req: HyperRequest) -> Self::Future { let (parts, body) = req.into_parts(); let req = HyperRequest::from_parts(parts, body.into()).tran_to_request(); let response = self.handle(req); - Box::pin(async move { Ok(response.await.into_hyper()) }) + Box::pin(async move { + let res = response.await; + Ok(ResponseAdapt::tran_from_response(res)) + }) + } +} +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_handle_request() { + // Arrange + let remote_addr = "127.0.0.1:8080".parse().unwrap(); + let routes = RootRoute::new(); // Assuming RootRoute::new() creates a new instance of RootRoute + let hsh = HyperServiceHandler::new(remote_addr, routes); + let req = hyper::Request::builder().body(()).unwrap(); // Assuming Request::new() creates a new instance of Request + + // Act + let _ = hsh.call(req).await; } } diff --git a/silent/src/service/mod.rs b/silent/src/service/mod.rs index 926e181..885c3da 100644 --- a/silent/src/service/mod.rs +++ b/silent/src/service/mod.rs @@ -1,21 +1,18 @@ mod hyper_service; mod serve; -use crate::conn::SilentConnection; use crate::route::RouteService; use crate::service::serve::Serve; use crate::Configs; #[cfg(feature = "scheduler")] use crate::Scheduler; use std::net::SocketAddr; -use std::sync::Arc; use tokio::net::TcpListener; use tokio::signal; - +use tokio::task::JoinSet; pub struct Server { addr: Option, listener: Option, - conn: Arc, shutdown_callback: Option>, configs: Option, } @@ -31,7 +28,6 @@ impl Server { Self { addr: None, listener: None, - conn: Arc::new(SilentConnection::default()), shutdown_callback: None, configs: None, } @@ -74,7 +70,6 @@ impl Server { S: RouteService, { let Self { - conn, listener, configs, addr, @@ -107,7 +102,7 @@ impl Server { tokio::spawn(async move { Scheduler::schedule(scheduler).await; }); - + let mut join_set = JoinSet::new(); loop { #[cfg(unix)] let terminate = async { @@ -135,9 +130,8 @@ impl Server { Ok((stream, peer_addr)) => { tracing::info!("Accepting from: {}", stream.peer_addr().unwrap()); let routes = root_route.clone(); - let conn = conn.clone(); - tokio::task::spawn(async move { - if let Err(err) = Serve::new(routes, conn).call(stream,peer_addr).await { + join_set.spawn(async move { + if let Err(err) = Serve::new(routes).call(stream,peer_addr).await { tracing::error!("Failed to serve connection: {:?}", err); } }); diff --git a/silent/src/service/serve.rs b/silent/src/service/serve.rs index 8c0dc0c..7f94c34 100644 --- a/silent/src/service/serve.rs +++ b/silent/src/service/serve.rs @@ -1,30 +1,33 @@ -use crate::conn::support::TokioIo; -use crate::conn::SilentConnection; -use crate::route::RootRoute; -use crate::service::hyper_service::HyperServiceHandler; +use std::error::Error as StdError; use std::net::SocketAddr; -use std::sync::Arc; + +use hyper_util::rt::{TokioExecutor, TokioIo}; +use hyper_util::server::conn::auto::Builder; use tokio::net::TcpStream; -pub(crate) struct Serve { +use crate::route::RootRoute; +use crate::service::hyper_service::HyperServiceHandler; + +pub(crate) struct Serve { pub(crate) routes: RootRoute, - pub(crate) conn: Arc, + pub(crate) builder: Builder, } impl Serve { - pub(crate) fn new(routes: RootRoute, conn: Arc) -> Self { - Self { routes, conn } + pub(crate) fn new(routes: RootRoute) -> Self { + Self { + routes, + builder: Builder::new(TokioExecutor::new()), + } } pub(crate) async fn call( &self, stream: TcpStream, peer_addr: SocketAddr, - ) -> Result<(), hyper::Error> { + ) -> Result<(), Box> { let io = TokioIo::new(stream); - self.conn - .http1 + self.builder .serve_connection(io, HyperServiceHandler::new(peer_addr, self.routes.clone())) - .with_upgrades() .await } } diff --git a/silent/src/ws/websocket.rs b/silent/src/ws/websocket.rs index d89e515..40ceae6 100644 --- a/silent/src/ws/websocket.rs +++ b/silent/src/ws/websocket.rs @@ -1,4 +1,3 @@ -use crate::conn::support::TokioIo; use crate::log::debug; use crate::tokio_tungstenite::tungstenite::protocol; use crate::tokio_tungstenite::WebSocketStream; @@ -11,6 +10,7 @@ use futures_util::sink::{Sink, SinkExt}; use futures_util::stream::{Stream, StreamExt}; use futures_util::{future, ready}; use hyper::upgrade::Upgraded as HyperUpgraded; +use hyper_util::rt::TokioIo; use std::future::Future; use std::pin::Pin; use std::sync::Arc;