Skip to content

Commit

Permalink
Implement close notify in server close
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Jan 27, 2025
1 parent ad8fb54 commit e24fcb5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
13 changes: 10 additions & 3 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
kawa.prepare(&mut kawa::h1::BlockConverter);
debug_kawa(kawa);
let bufs = kawa.as_io_slice();
if bufs.is_empty() {
if bufs.is_empty() && !self.socket.socket_wants_write() {
self.readiness.interest.remove(Ready::WRITABLE);
return MuxResult::Continue;
}
Expand All @@ -157,7 +157,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
parts.metrics.bout += size;
}
}
if update_readiness_after_write(size, status, &mut self.readiness) {
if update_readiness_after_write(size, status, &mut self.readiness)
|| self.socket.socket_wants_write()
{
return MuxResult::Continue;
}

Expand Down Expand Up @@ -272,7 +274,12 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
}
Position::Client(_, _, BackendStatus::Connecting(_))
| Position::Client(_, _, BackendStatus::Connected) => {}
Position::Server => unreachable!(),
Position::Server => {
println_!("H1 SENDING CLOSE NOTIFY");
self.socket.socket_close();
let _ = self.socket.socket_write_vectored(&[]);
return;
}
}
// reconnection is handled by the server
let StreamState::Linked(token) = context.streams[self.stream].state else {
Expand Down
15 changes: 13 additions & 2 deletions lib/src/protocol/mux/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
priorities.sort();

println_!("PRIORITIES: {priorities:?}");
let mut socket_write = false;
'outer: for stream_id in priorities {
let global_stream_id = *self.streams.get(stream_id).unwrap();
let stream = &mut context.streams[global_stream_id];
Expand All @@ -621,6 +622,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
debug_kawa(kawa);
}
while !kawa.out.is_empty() {
socket_write = true;
let bufs = kawa.as_io_slice();
let (size, status) = self.socket.socket_write_vectored(&bufs);
kawa.consume(size);
Expand Down Expand Up @@ -670,7 +672,11 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
self.streams.remove(&stream_id).unwrap();
}

if self.expect_write.is_none() {
if self.socket.socket_wants_write() {
if !socket_write {
self.socket.socket_write(&[]);
}
} else if self.expect_write.is_none() {
// We wrote everything
self.readiness.interest.remove(Ready::WRITABLE);
}
Expand Down Expand Up @@ -1045,7 +1051,12 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
match self.position {
Position::Client(_, _, BackendStatus::KeepAlive) => unreachable!(),
Position::Client(..) => {}
Position::Server => unreachable!(),
Position::Server => {
println_!("H2 SENDING CLOSE NOTIFY");
self.socket.socket_close();
let _ = self.socket.socket_write_vectored(&[]);
return;
}
}
// reconnection is handled by the server for each stream separately
for global_stream_id in self.streams.values() {
Expand Down
5 changes: 4 additions & 1 deletion lib/src/protocol/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16)
fill_default_301_answer(kawa, host, uri);
} else {
fill_default_answer(kawa, code);
context.keep_alive_frontend = false;
}
context.status = Some(code);
stream.state = StreamState::Unlinked;
Expand Down Expand Up @@ -450,7 +451,7 @@ impl<Front: SocketHandler> Connection<Front> {
);
println_!("--------------- CONNECTION CLOSE: {backend_borrow:#?}");
}
Position::Server => todo!(),
Position::Server => {}
}
match self {
Connection::H1(c) => c.close(context, endpoint),
Expand Down Expand Up @@ -1586,6 +1587,8 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
println_!("FRONTEND: {:#?}", self.frontend);
println_!("BACKENDS: {:#?}", self.router.backends);

self.frontend.close(&mut self.context, EndpointClient(&mut self.router));

for (token, client) in &mut self.router.backends {
let proxy_borrow = proxy.borrow();
client.timeout_container().cancel();
Expand Down

0 comments on commit e24fcb5

Please sign in to comment.