diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index f8039cc77..e26528d5c 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -208,6 +208,11 @@ impl ProtocolState { &self.queue } + #[cfg(test)] + pub(crate) fn queue_mut(&mut self) -> &mut VecDeque { + &mut self.queue + } + pub(crate) fn done(&self) -> bool { self.is_empty() && !self.out_of_sync } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index a7f67030b..55a63486b 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -363,6 +363,13 @@ impl Server { } } Err(err) => { + match err { + Error::ProtocolOutOfSync => { + // conservatively, we do not know for sure if this is recoverable + self.stats.state(State::Error); + } + _ => {} + } error!( "{:?} got: {}, extended buffer: {:?}, state: {}", err, @@ -986,7 +993,7 @@ impl Drop for Server { pub mod test { use crate::{config::Memory, frontend::PreparedStatements, net::*}; - use super::*; + use super::{Error, *}; impl Default for Server { fn default() -> Self { @@ -2395,4 +2402,36 @@ pub mod test { "sync_prepared flag should remain false after regular queries" ); } + + #[tokio::test] + async fn test_protocol_out_of_sync_sets_error_state() { + let mut server = test_server().await; + + server + .send(&vec![Query::new("SELECT 1").into()].into()) + .await + .unwrap(); + + for c in ['T', 'D'] { + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), c); + } + + // simulate an unlikely, but existent out-of-sync state + server + .prepared_statements_mut() + .state_mut() + .queue_mut() + .clear(); + + let res = server.read().await; + assert!( + matches!(res, Err(Error::ProtocolOutOfSync)), + "protocol should be out of sync" + ); + assert!( + server.stats().state == State::Error, + "state should be Error after detecting desync" + ) + } }