diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index b296467f4..72dbad78e 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -141,7 +141,7 @@ impl ConnectionH1 { kawa.prepare(&mut kawa::h1::BlockConverter); debug_kawa(kawa); let bufs = kawa.as_io_slice(); - if bufs.is_empty() { + if bufs.is_empty() && !self.socket.socket_wants_write() { self.readiness.interest.remove(Ready::WRITABLE); return MuxResult::Continue; } @@ -157,7 +157,9 @@ impl ConnectionH1 { parts.metrics.bout += size; } } - if update_readiness_after_write(size, status, &mut self.readiness) { + if update_readiness_after_write(size, status, &mut self.readiness) + || self.socket.socket_wants_write() + { return MuxResult::Continue; } @@ -272,7 +274,12 @@ impl ConnectionH1 { } Position::Client(_, _, BackendStatus::Connecting(_)) | Position::Client(_, _, BackendStatus::Connected) => {} - Position::Server => unreachable!(), + Position::Server => { + println_!("H1 SENDING CLOSE NOTIFY"); + self.socket.socket_close(); + let _ = self.socket.socket_write_vectored(&[]); + return; + } } // reconnection is handled by the server let StreamState::Linked(token) = context.streams[self.stream].state else { diff --git a/lib/src/protocol/mux/h2.rs b/lib/src/protocol/mux/h2.rs index 411fc4d49..22a7b33a4 100644 --- a/lib/src/protocol/mux/h2.rs +++ b/lib/src/protocol/mux/h2.rs @@ -605,6 +605,7 @@ impl ConnectionH2 { priorities.sort(); println_!("PRIORITIES: {priorities:?}"); + let mut socket_write = false; 'outer: for stream_id in priorities { let global_stream_id = *self.streams.get(stream_id).unwrap(); let stream = &mut context.streams[global_stream_id]; @@ -621,6 +622,7 @@ impl ConnectionH2 { debug_kawa(kawa); } while !kawa.out.is_empty() { + socket_write = true; let bufs = kawa.as_io_slice(); let (size, status) = self.socket.socket_write_vectored(&bufs); kawa.consume(size); @@ -670,7 +672,11 @@ impl ConnectionH2 { self.streams.remove(&stream_id).unwrap(); } - if self.expect_write.is_none() { + if self.socket.socket_wants_write() { + if !socket_write { + self.socket.socket_write(&[]); + } + } else if self.expect_write.is_none() { // We wrote everything self.readiness.interest.remove(Ready::WRITABLE); } @@ -1045,7 +1051,12 @@ impl ConnectionH2 { match self.position { Position::Client(_, _, BackendStatus::KeepAlive) => unreachable!(), Position::Client(..) => {} - Position::Server => unreachable!(), + Position::Server => { + println_!("H2 SENDING CLOSE NOTIFY"); + self.socket.socket_close(); + let _ = self.socket.socket_write_vectored(&[]); + return; + } } // reconnection is handled by the server for each stream separately for global_stream_id in self.streams.values() { diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 377ee1a27..f41e8fa37 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -151,6 +151,7 @@ fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16) fill_default_301_answer(kawa, host, uri); } else { fill_default_answer(kawa, code); + context.keep_alive_frontend = false; } context.status = Some(code); stream.state = StreamState::Unlinked; @@ -450,7 +451,7 @@ impl Connection { ); println_!("--------------- CONNECTION CLOSE: {backend_borrow:#?}"); } - Position::Server => todo!(), + Position::Server => {} } match self { Connection::H1(c) => c.close(context, endpoint), @@ -1586,6 +1587,8 @@ impl