Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions web-transport-quiche/src/ez/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,28 @@ impl<M: Metrics> ServerBuilder<M, ServerWithListener> {
}
}

/// A pre-accepted QUIC connection with the TLS handshake already complete.
///
/// The peer address is available before calling [Incoming::accept].
pub struct Incoming {
connection: Connection,
}

impl Incoming {
/// Returns the peer's socket address.
pub fn peer_addr(&self) -> SocketAddr {
self.connection.peer_addr()
}

/// Accept the connection, starting the post-handshake driver.
pub fn accept(self) -> Connection {
self.connection
}
}

/// A QUIC server that accepts new connections.
pub struct Server<M: Metrics = DefaultMetrics> {
accept: mpsc::Receiver<Connection>,
accept: mpsc::Receiver<Incoming>,
local_addrs: Vec<SocketAddr>,
// Cancels socket tasks when dropped.
#[allow(dead_code)]
Expand Down Expand Up @@ -231,7 +250,7 @@ impl<M: Metrics> Server<M> {

async fn run_socket(
socket: tokio_quiche::QuicConnectionStream<M>,
accept: mpsc::Sender<Connection>,
accept: mpsc::Sender<Incoming>,
) -> io::Result<()> {
let mut rx = socket.into_inner();
while let Some(initial) = rx.recv().await {
Expand All @@ -245,19 +264,20 @@ impl<M: Metrics> Server<M> {

let inner = initial.start(session);
let connection = Connection::new(inner, state, accept_bi.1, accept_uni.1);
let incoming = Incoming { connection };

if accept.send(connection).await.is_err() {
if accept.send(incoming).await.is_err() {
return Ok(());
}
}

Ok(())
}

/// Accept a new QUIC [Connection] from a client.
/// Accept a new QUIC [Incoming] from a client.
///
/// Returns `None` when the server is shutting down.
pub async fn accept(&mut self) -> Option<Connection> {
pub async fn accept(&mut self) -> Option<Incoming> {
self.accept.recv().await
}

Expand Down
5 changes: 4 additions & 1 deletion web-transport-quiche/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ impl<M: ez::Metrics> Server<M> {
pub async fn accept(&mut self) -> Option<h3::Request> {
loop {
tokio::select! {
Some(conn) = self.inner.accept() => self.accept.push(Box::pin(h3::Request::accept(conn))),
Some(incoming) = self.inner.accept() => {
let conn = incoming.accept();
self.accept.push(Box::pin(h3::Request::accept(conn)));
}
Some(res) = self.accept.next() => {
match res {
Ok(session) => return Some(session),
Expand Down