Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions pgdog/src/backend/pool/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,65 @@ async fn test_lsn_monitor() {

pool.shutdown();
}

/// If a client disconnects while the backend still has protocol messages
/// queued for it, but the high-level stats state is Idle, the connection must
/// still be treated as needing a drain. Otherwise the connection is checked
/// back into the pool with a non-empty protocol queue and will be "tainted"
/// for the next client.
#[tokio::test]
async fn test_abrupt_disconnect_with_pending_protocol_state_requires_drain() {
crate::logger();

let pool = pool();
let mut guard = pool.get(&Request::default()).await.unwrap();

guard
.send(
&vec![
ProtocolMessage::from(Query::new("SELECT 1")),
ProtocolMessage::from(Query::new("SELECT 1")),
Comment on lines +672 to +673
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Holding off on this PR because stuffing the ClientRequest like this doesn't seem to be a realistic reproduction. Addressing some of the out-of-sync side effects in #668 instead.

]
.into(),
)
.await
.unwrap();

// Consume only the RowDescription ('T'), leaving the remainder of the
// response (D, C, Z) unread. At this point the protocol state still
// expects a ReadyForQuery, so has_more_messages() is true.
for c in ['T', 'D', 'C', 'Z'] {
let msg = guard.read().await.unwrap();
assert_eq!(msg.code(), c);
}
assert!(
guard.prepared_statements().state().has_more_messages(),
"protocol state should have pending messages after partial read"
);
assert_eq!(guard.stats().state, State::Idle);

// Client disconnects, guard is dropped, runs Guard::cleanup. needs_drain()
// must report that there are pending protocol messages it's expecting,
// otherwise the connection is checked back into the pool with a non-empty
// protocol queue.
drop(guard);

// Re-acquire the same connection from the pool. If this
// assertion ever fails, the connection has been returned to the pool
// in a tainted state.
let mut guard = pool.get(&Request::default()).await.unwrap();
assert!(
!guard.prepared_statements().state().has_more_messages(),
"connection should not have pending protocol messages after cleanup"
);

// Run a fresh query and assert we get a valid result back; if the
// connection had been returned to the pool without a drain, this is
// where stray CommandComplete/ReadyForQuery messages from the previous
// query would surface and break the protocol. Each query would receive the
// prior query's results (off by 1).
let rows: Vec<i32> = guard.fetch_all("SELECT 2").await.unwrap();
assert_eq!(rows, vec![2]);
let rows: Vec<i32> = guard.fetch_all("SELECT 3").await.unwrap();
assert_eq!(rows, vec![3]);
}
31 changes: 29 additions & 2 deletions pgdog/src/backend/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ impl Server {
)
}

/// Server is done executing all queries and isz
/// Server is done executing all queries and is
/// not inside a transaction.
pub fn can_check_in(&self) -> bool {
self.stats().state == State::Idle
Expand Down Expand Up @@ -602,7 +602,17 @@ impl Server {

/// Connection was left with an unfinished query.
pub fn needs_drain(&self) -> bool {
!self.in_sync()
// We need to drain whenever the protocol state indicates that there
// are still tracked messages expected from the server *or* the high‑
// level connection state is not one of the "in sync" states.
//
// This is intentionally more conservative than just checking
// `in_sync()`: if a client disappears after we've enqueued expected
// backend responses (via `PreparedStatements::handle`) but before we
// see a new ReadyForQuery, the stats state can still be Idle while the
// protocol queue has pending items. In that case we must treat the
// connection as needing a drain before it is safe to reuse.
self.has_more_messages() || !self.in_sync()
}

/// Close the connection, don't do any recovery.
Expand Down Expand Up @@ -1771,6 +1781,23 @@ pub mod test {
}
}

#[test]
fn test_needs_drain_when_protocol_queue_not_empty_even_if_idle() {
use crate::state::State;

let mut server = Server::default();
assert_eq!(server.stats().state, State::Idle);
assert!(server.in_sync());
assert!(!server.has_more_messages());
assert!(!server.needs_drain());

server.prepared_statements_mut().state_mut().add('Z'); // expect a ReadyForQuery at some point

assert_eq!(server.stats().state, State::Idle);
assert!(server.prepared_statements().state().has_more_messages());
assert!(server.needs_drain());
}

#[tokio::test]
async fn test_partial_state() -> Result<(), Box<dyn std::error::Error>> {
crate::logger();
Expand Down