From ba31f33e0d5bda1b1c0e26d5b369dfe5af167fed Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Wed, 20 Aug 2025 13:20:41 -0400 Subject: [PATCH 01/15] wip --- pgdog/src/admin/backend.rs | 4 +-- pgdog/src/backend/pool/connection/binding.rs | 2 +- .../src/backend/pool/connection/mirror/mod.rs | 4 +-- pgdog/src/backend/pool/connection/mod.rs | 4 +-- .../logical/subscriber/parallel_connection.rs | 4 +-- pgdog/src/backend/server.rs | 4 +-- pgdog/src/frontend/client/mod.rs | 26 +++++++++---------- .../frontend/client/query_engine/context.rs | 8 +++--- pgdog/src/frontend/client/query_engine/mod.rs | 3 +-- pgdog/src/frontend/client/test/mod.rs | 6 ++--- pgdog/src/frontend/client/timeouts.rs | 4 +-- .../frontend/{buffer.rs => client_request.rs} | 20 +++++++------- pgdog/src/frontend/mod.rs | 4 +-- pgdog/src/frontend/query_logger.rs | 6 ++--- pgdog/src/frontend/router/context.rs | 4 +-- pgdog/src/frontend/router/mod.rs | 4 +-- pgdog/src/frontend/router/parser/command.rs | 2 +- pgdog/src/frontend/router/parser/context.rs | 2 +- .../frontend/router/parser/query/explain.rs | 6 ++--- pgdog/src/frontend/router/parser/query/mod.rs | 2 +- .../src/frontend/router/parser/query/show.rs | 6 ++--- .../src/frontend/router/parser/query/test.rs | 19 +++++++------- 22 files changed, 72 insertions(+), 72 deletions(-) rename pgdog/src/frontend/{buffer.rs => client_request.rs} (92%) diff --git a/pgdog/src/admin/backend.rs b/pgdog/src/admin/backend.rs index 1773c38c6..33237bf9a 100644 --- a/pgdog/src/admin/backend.rs +++ b/pgdog/src/admin/backend.rs @@ -6,7 +6,7 @@ use std::time::Duration; use tokio::time::sleep; use tracing::debug; -use crate::frontend::Buffer; +use crate::frontend::ClientRequest; use crate::net::messages::command_complete::CommandComplete; use crate::net::messages::{ErrorResponse, FromBytes, Protocol, Query, ReadyForQuery}; use crate::net::ProtocolMessage; @@ -37,7 +37,7 @@ impl Backend { } /// Handle command. - pub async fn send(&mut self, messages: &Buffer) -> Result<(), Error> { + pub async fn send(&mut self, messages: &ClientRequest) -> Result<(), Error> { let message = messages.first().ok_or(Error::Empty)?; let message: ProtocolMessage = message.clone(); diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 9bbea4fe4..c5a538706 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -113,7 +113,7 @@ impl Binding { } /// Send an entire buffer of messages to the servers(s). - pub async fn send(&mut self, messages: &crate::frontend::Buffer) -> Result<(), Error> { + pub async fn send(&mut self, messages: &crate::frontend::ClientRequest) -> Result<(), Error> { match self { Binding::Server(server) => { if let Some(server) = server { diff --git a/pgdog/src/backend/pool/connection/mirror/mod.rs b/pgdog/src/backend/pool/connection/mirror/mod.rs index 6a560498a..d0b7c4d13 100644 --- a/pgdog/src/backend/pool/connection/mirror/mod.rs +++ b/pgdog/src/backend/pool/connection/mirror/mod.rs @@ -16,7 +16,7 @@ use crate::frontend::comms::comms; use crate::frontend::PreparedStatements; use crate::net::{Parameter, Parameters, Stream}; -use crate::frontend::Buffer; +use crate::frontend::ClientRequest; use super::Error; @@ -27,7 +27,7 @@ pub use handler::*; #[derive(Clone, Debug)] pub struct BufferWithDelay { delay: Duration, - buffer: Buffer, + buffer: ClientRequest, } #[derive(Clone, Debug)] diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index 21e0d3c9e..dbe0e6f02 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -117,7 +117,7 @@ impl Connection { } /// Send client request to mirrors. - pub fn mirror(&mut self, buffer: &crate::frontend::Buffer) { + pub fn mirror(&mut self, buffer: &crate::frontend::ClientRequest) { for mirror in &mut self.mirrors { mirror.send(buffer); } @@ -274,7 +274,7 @@ impl Connection { /// Send buffer in a potentially sharded context. pub(crate) async fn handle_buffer( &mut self, - messages: &crate::frontend::Buffer, + messages: &crate::frontend::ClientRequest, router: &mut Router, streaming: bool, ) -> Result<(), Error> { diff --git a/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs b/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs index f93af5602..144495de2 100644 --- a/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs +++ b/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs @@ -12,7 +12,7 @@ use tokio::sync::{ use crate::{ backend::Server, - frontend::Buffer, + frontend::ClientRequest, net::{Message, ProtocolMessage}, }; @@ -57,7 +57,7 @@ impl ParallelConnection { } // Queue up the contents of the buffer. - pub async fn send(&mut self, buffer: &Buffer) -> Result<(), Error> { + pub async fn send(&mut self, buffer: &ClientRequest) -> Result<(), Error> { for message in buffer.iter() { self.tx .send(ParallelMessage::ProtocolMessage(message.clone())) diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index b5dd99a1a..5e312aac7 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -17,7 +17,7 @@ use super::{ }; use crate::{ auth::{md5, scram::Client}, - frontend::Buffer, + frontend::ClientRequest, net::{ messages::{ hello::SslReply, Authentication, BackendKeyData, ErrorResponse, FromBytes, Message, @@ -282,7 +282,7 @@ impl Server { } /// Send messages to the server and flush the buffer. - pub async fn send(&mut self, messages: &Buffer) -> Result<(), Error> { + pub async fn send(&mut self, messages: &ClientRequest) -> Result<(), Error> { self.stats.state(State::Active); for message in messages.iter() { diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 2a512633c..fe5d78693 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -9,7 +9,7 @@ use tokio::time::timeout; use tokio::{select, spawn}; use tracing::{debug, enabled, error, info, trace, Level as LogLevel}; -use super::{Buffer, Comms, Error, PreparedStatements}; +use super::{ClientRequest, Comms, Error, PreparedStatements}; use crate::auth::{md5, scram::Server}; use crate::backend::{ databases, @@ -45,7 +45,7 @@ pub struct Client { prepared_statements: PreparedStatements, transaction: Option, timeouts: Timeouts, - request_buffer: Buffer, + client_request: ClientRequest, stream_buffer: BytesMut, cross_shard_disabled: bool, passthrough_password: Option, @@ -70,7 +70,7 @@ impl MemoryUsage for Client { + self.prepared_statements.memory_used() + std::mem::size_of::() + self.stream_buffer.memory_usage() - + self.request_buffer.memory_usage() + + self.client_request.memory_usage() + self .passthrough_password .as_ref() @@ -215,7 +215,7 @@ impl Client { prepared_statements: PreparedStatements::new(), transaction: None, timeouts: Timeouts::from_config(&config.config.general), - request_buffer: Buffer::new(), + client_request: ClientRequest::new(), stream_buffer: BytesMut::new(), shutdown: false, cross_shard_disabled: false, @@ -256,7 +256,7 @@ impl Client { admin: false, transaction: None, timeouts: Timeouts::from_config(&config().config.general), - request_buffer: Buffer::new(), + client_request: ClientRequest::new(), stream_buffer: BytesMut::new(), shutdown: false, cross_shard_disabled: false, @@ -312,7 +312,7 @@ impl Client { buffer = self.buffer(client_state) => { let event = buffer?; - if !self.request_buffer.is_empty() { + if !self.client_request.is_empty() { self.client_messages(&mut query_engine).await?; } @@ -366,7 +366,7 @@ impl Client { /// This ensures we don't check out a connection from the pool until the client /// sent a complete request. async fn buffer(&mut self, state: State) -> Result { - self.request_buffer.clear(); + self.client_request.clear(); // Only start timer once we receive the first message. let mut timer = None; @@ -379,10 +379,10 @@ impl Client { self.timeouts = Timeouts::from_config(&config.config.general); self.cross_shard_disabled = config.config.general.cross_shard_disabled; - while !self.request_buffer.full() { + while !self.client_request.full() { let idle_timeout = self .timeouts - .client_idle_timeout(&state, &self.request_buffer); + .client_idle_timeout(&state, &self.client_request); let message = match timeout(idle_timeout, self.stream.read_buf(&mut self.stream_buffer)).await { @@ -408,10 +408,10 @@ impl Client { } else { let message = ProtocolMessage::from_bytes(message.to_bytes()?)?; if message.extended() && self.prepared_statements.enabled { - self.request_buffer + self.client_request .push(self.prepared_statements.maybe_rewrite(message)?); } else { - self.request_buffer.push(message); + self.client_request.push(message); } } } @@ -420,7 +420,7 @@ impl Client { debug!( "request buffered [{:.4}ms] {:?}", timer.unwrap().elapsed().as_secs_f64() * 1000.0, - self.request_buffer + self.client_request .iter() .map(|m| m.code()) .collect::>(), @@ -429,7 +429,7 @@ impl Client { trace!( "request buffered [{:.4}ms]\n{:#?}", timer.unwrap().elapsed().as_secs_f64() * 1000.0, - self.request_buffer, + self.client_request, ); } diff --git a/pgdog/src/frontend/client/query_engine/context.rs b/pgdog/src/frontend/client/query_engine/context.rs index 6c075a6d8..5641dda9c 100644 --- a/pgdog/src/frontend/client/query_engine/context.rs +++ b/pgdog/src/frontend/client/query_engine/context.rs @@ -2,7 +2,7 @@ use crate::{ backend::pool::connection::mirror::Mirror, frontend::{ client::{timeouts::Timeouts, TransactionType}, - Buffer, Client, PreparedStatements, + Client, ClientRequest, PreparedStatements, }, net::{Parameters, Stream}, stats::memory::MemoryUsage, @@ -15,7 +15,7 @@ pub struct QueryEngineContext<'a> { /// Client session parameters. pub(super) params: &'a mut Parameters, /// Request - pub(super) buffer: &'a mut Buffer, + pub(super) buffer: &'a mut ClientRequest, /// Client's socket to send responses to. pub(super) stream: &'a mut Stream, /// Client in transaction? @@ -35,7 +35,7 @@ impl<'a> QueryEngineContext<'a> { Self { prepared_statements: &mut client.prepared_statements, params: &mut client.params, - buffer: &mut client.request_buffer, + buffer: &mut client.client_request, stream: &mut client.stream, transaction: client.transaction, timeouts: client.timeouts, @@ -45,7 +45,7 @@ impl<'a> QueryEngineContext<'a> { } /// Create context from mirror. - pub fn new_mirror(mirror: &'a mut Mirror, buffer: &'a mut Buffer) -> Self { + pub fn new_mirror(mirror: &'a mut Mirror, buffer: &'a mut ClientRequest) -> Self { Self { prepared_statements: &mut mirror.prepared_statements, params: &mut mirror.params, diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index 312facef4..e27a3da9a 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -1,9 +1,8 @@ use crate::{ backend::pool::{Connection, Request}, frontend::{ - buffer::BufferedQuery, router::{parser::Shard, Route}, - Client, Command, Comms, Error, Router, RouterContext, Stats, + BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, Stats, }, net::{BackendKeyData, ErrorResponse, Message, Parameters}, state::State, diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index ae908019f..3f52a75e8 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -124,13 +124,13 @@ async fn test_test_client() { conn.write_all(&query).await.unwrap(); client.buffer(State::Idle).await.unwrap(); - assert_eq!(client.request_buffer.total_message_len(), query.len()); + assert_eq!(client.client_request.total_message_len(), query.len()); client.client_messages(&mut engine).await.unwrap(); assert!(client.transaction.is_none()); assert_eq!(engine.stats().state, State::Active); // Buffer not cleared yet. - assert_eq!(client.request_buffer.total_message_len(), query.len()); + assert_eq!(client.client_request.total_message_len(), query.len()); assert!(engine.backend().connected()); // let command = engine @@ -399,7 +399,7 @@ async fn test_abrupt_disconnect() { let event = client.buffer(State::Idle).await.unwrap(); assert_eq!(event, BufferEvent::DisconnectAbrupt); - assert!(client.request_buffer.is_empty()); + assert!(client.client_request.is_empty()); // Client disconnects and returns gracefully. let (conn, mut client, _) = new_client!(false); diff --git a/pgdog/src/frontend/client/timeouts.rs b/pgdog/src/frontend/client/timeouts.rs index 2d2d6b287..57b75b7b0 100644 --- a/pgdog/src/frontend/client/timeouts.rs +++ b/pgdog/src/frontend/client/timeouts.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::{config::General, frontend::Buffer, state::State}; +use crate::{config::General, frontend::ClientRequest, state::State}; #[derive(Debug, Clone, Copy)] pub struct Timeouts { @@ -35,7 +35,7 @@ impl Timeouts { } #[inline] - pub(crate) fn client_idle_timeout(&self, state: &State, buffer: &Buffer) -> Duration { + pub(crate) fn client_idle_timeout(&self, state: &State, buffer: &ClientRequest) -> Duration { match state { State::Idle => { if buffer.is_empty() { diff --git a/pgdog/src/frontend/buffer.rs b/pgdog/src/frontend/client_request.rs similarity index 92% rename from pgdog/src/frontend/buffer.rs rename to pgdog/src/frontend/client_request.rs index be7c4008c..e739ed544 100644 --- a/pgdog/src/frontend/buffer.rs +++ b/pgdog/src/frontend/client_request.rs @@ -14,11 +14,11 @@ use super::PreparedStatements; /// Message buffer. #[derive(Debug, Clone)] -pub struct Buffer { +pub struct ClientRequest { buffer: Vec, } -impl MemoryUsage for Buffer { +impl MemoryUsage for ClientRequest { #[inline] fn memory_usage(&self) -> usize { // ProtocolMessage uses memory allocated by BytesMut (mostly). @@ -26,13 +26,13 @@ impl MemoryUsage for Buffer { } } -impl Default for Buffer { +impl Default for ClientRequest { fn default() -> Self { Self::new() } } -impl Buffer { +impl ClientRequest { /// Create new buffer. pub fn new() -> Self { Self { @@ -159,19 +159,19 @@ impl Buffer { } } -impl From for Vec { - fn from(val: Buffer) -> Self { +impl From for Vec { + fn from(val: ClientRequest) -> Self { val.buffer } } -impl From> for Buffer { +impl From> for ClientRequest { fn from(value: Vec) -> Self { - Buffer { buffer: value } + ClientRequest { buffer: value } } } -impl Deref for Buffer { +impl Deref for ClientRequest { type Target = Vec; fn deref(&self) -> &Self::Target { @@ -179,7 +179,7 @@ impl Deref for Buffer { } } -impl DerefMut for Buffer { +impl DerefMut for ClientRequest { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.buffer } diff --git a/pgdog/src/frontend/mod.rs b/pgdog/src/frontend/mod.rs index 2d2a66643..bff961d8c 100644 --- a/pgdog/src/frontend/mod.rs +++ b/pgdog/src/frontend/mod.rs @@ -1,8 +1,8 @@ //! pgDog frontend manages connections to clients. -pub mod buffer; pub mod buffered_query; pub mod client; +pub mod client_request; pub mod comms; pub mod connected_client; pub mod error; @@ -15,9 +15,9 @@ pub mod query_logger; pub mod router; pub mod stats; -pub use buffer::Buffer; pub use buffered_query::BufferedQuery; pub use client::Client; +pub use client_request::ClientRequest; pub use comms::Comms; pub use connected_client::ConnectedClient; pub use error::Error; diff --git a/pgdog/src/frontend/query_logger.rs b/pgdog/src/frontend/query_logger.rs index a5c9b0c3d..413a7aaf1 100644 --- a/pgdog/src/frontend/query_logger.rs +++ b/pgdog/src/frontend/query_logger.rs @@ -6,16 +6,16 @@ use tokio::{fs::OpenOptions, io::AsyncWriteExt}; use crate::config::config; -use super::{Buffer, Error}; +use super::{ClientRequest, Error}; /// Log queries. pub struct QueryLogger<'a> { - buffer: &'a Buffer, + buffer: &'a ClientRequest, } impl<'a> QueryLogger<'a> { /// Create new query logger. - pub fn new(buffer: &'a Buffer) -> Self { + pub fn new(buffer: &'a ClientRequest) -> Self { Self { buffer } } diff --git a/pgdog/src/frontend/router/context.rs b/pgdog/src/frontend/router/context.rs index 302c8c70b..1f4e4b53f 100644 --- a/pgdog/src/frontend/router/context.rs +++ b/pgdog/src/frontend/router/context.rs @@ -1,7 +1,7 @@ use super::Error; use crate::{ backend::Cluster, - frontend::{buffer::BufferedQuery, client::TransactionType, Buffer, PreparedStatements}, + frontend::{client::TransactionType, BufferedQuery, ClientRequest, PreparedStatements}, net::{Bind, Parameters}, }; @@ -27,7 +27,7 @@ pub struct RouterContext<'a> { impl<'a> RouterContext<'a> { pub fn new( - buffer: &'a Buffer, + buffer: &'a ClientRequest, cluster: &'a Cluster, stmt: &'a mut PreparedStatements, params: &'a Parameters, diff --git a/pgdog/src/frontend/router/mod.rs b/pgdog/src/frontend/router/mod.rs index 399d00df5..f9a451956 100644 --- a/pgdog/src/frontend/router/mod.rs +++ b/pgdog/src/frontend/router/mod.rs @@ -14,7 +14,7 @@ use lazy_static::lazy_static; use parser::Shard; pub use parser::{Command, QueryParser, Route}; -use super::Buffer; +use super::ClientRequest; pub use context::RouterContext; pub use search_path::SearchPath; pub use sharding::{Lists, Ranges}; @@ -62,7 +62,7 @@ impl Router { } /// Parse CopyData messages and shard them. - pub fn copy_data(&mut self, buffer: &Buffer) -> Result, Error> { + pub fn copy_data(&mut self, buffer: &ClientRequest) -> Result, Error> { match self.latest_command { Command::Copy(ref mut copy) => Ok(copy.shard(&buffer.copy_data()?)?), _ => Ok(buffer diff --git a/pgdog/src/frontend/router/parser/command.rs b/pgdog/src/frontend/router/parser/command.rs index 4bf495445..40c2fd28a 100644 --- a/pgdog/src/frontend/router/parser/command.rs +++ b/pgdog/src/frontend/router/parser/command.rs @@ -1,5 +1,5 @@ use super::*; -use crate::{frontend::buffer::BufferedQuery, net::parameter::ParameterValue}; +use crate::{frontend::BufferedQuery, net::parameter::ParameterValue}; use lazy_static::lazy_static; #[derive(Debug, Clone)] diff --git a/pgdog/src/frontend/router/parser/context.rs b/pgdog/src/frontend/router/parser/context.rs index db5b1ab18..977c255f5 100644 --- a/pgdog/src/frontend/router/parser/context.rs +++ b/pgdog/src/frontend/router/parser/context.rs @@ -6,7 +6,7 @@ use pgdog_plugin::{PdRouterContext, PdStatement}; use crate::{ backend::ShardingSchema, config::{config, MultiTenant, ReadWriteStrategy}, - frontend::{buffer::BufferedQuery, PreparedStatements, RouterContext}, + frontend::{BufferedQuery, PreparedStatements, RouterContext}, }; use super::Error; diff --git a/pgdog/src/frontend/router/parser/query/explain.rs b/pgdog/src/frontend/router/parser/query/explain.rs index 4f761ce91..25a37bd13 100644 --- a/pgdog/src/frontend/router/parser/query/explain.rs +++ b/pgdog/src/frontend/router/parser/query/explain.rs @@ -28,13 +28,13 @@ mod tests { use super::*; use crate::backend::Cluster; - use crate::frontend::{Buffer, PreparedStatements, RouterContext}; + use crate::frontend::{ClientRequest, PreparedStatements, RouterContext}; use crate::net::messages::{Bind, Parameter, Parse, Query}; use crate::net::Parameters; // Helper function to route a plain SQL statement and return its `Route`. fn route(sql: &str) -> Route { - let buffer = Buffer::from(vec![Query::new(sql).into()]); + let buffer = ClientRequest::from(vec![Query::new(sql).into()]); let cluster = Cluster::new_test(); let mut stmts = PreparedStatements::default(); @@ -60,7 +60,7 @@ mod tests { .collect::>(); let bind = Bind::new_params("", ¶meters); - let buffer: Buffer = vec![parse_msg.into(), bind.into()].into(); + let buffer: ClientRequest = vec![parse_msg.into(), bind.into()].into(); let cluster = Cluster::new_test(); let mut stmts = PreparedStatements::default(); diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index d1ed4124c..06a297420 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -4,13 +4,13 @@ use std::collections::HashSet; use crate::{ backend::{databases::databases, ShardingSchema}, frontend::{ - buffer::BufferedQuery, router::{ context::RouterContext, parser::{rewrite::Rewrite, OrderBy, Shard}, round_robin, sharding::{Centroids, ContextBuilder, Value as ShardingValue}, }, + BufferedQuery, }, net::{ messages::{Bind, Vector}, diff --git a/pgdog/src/frontend/router/parser/query/show.rs b/pgdog/src/frontend/router/parser/query/show.rs index 7ad7836c8..1e34bc90a 100644 --- a/pgdog/src/frontend/router/parser/query/show.rs +++ b/pgdog/src/frontend/router/parser/query/show.rs @@ -24,7 +24,7 @@ mod test_show { use crate::backend::Cluster; use crate::frontend::router::parser::Shard; use crate::frontend::router::QueryParser; - use crate::frontend::{Buffer, PreparedStatements, RouterContext}; + use crate::frontend::{ClientRequest, PreparedStatements, RouterContext}; use crate::net::messages::Query; use crate::net::Parameters; @@ -37,7 +37,7 @@ mod test_show { // First call let query = "SHOW TRANSACTION ISOLATION LEVEL"; - let buffer = Buffer::from(vec![Query::new(query).into()]); + let buffer = ClientRequest::from(vec![Query::new(query).into()]); let context = RouterContext::new(&buffer, &c, &mut ps, &p, None).unwrap(); let first = parser.parse(context).unwrap().clone(); @@ -46,7 +46,7 @@ mod test_show { // Second call let query = "SHOW TRANSACTION ISOLATION LEVEL"; - let buffer = Buffer::from(vec![Query::new(query).into()]); + let buffer = ClientRequest::from(vec![Query::new(query).into()]); let context = RouterContext::new(&buffer, &c, &mut ps, &p, None).unwrap(); let second = parser.parse(context).unwrap().clone(); diff --git a/pgdog/src/frontend/router/parser/query/test.rs b/pgdog/src/frontend/router/parser/query/test.rs index b3834866a..31efc5b24 100644 --- a/pgdog/src/frontend/router/parser/query/test.rs +++ b/pgdog/src/frontend/router/parser/query/test.rs @@ -9,7 +9,7 @@ use crate::{ use super::{super::Shard, *}; use crate::backend::Cluster; use crate::config::ReadWriteStrategy; -use crate::frontend::{Buffer, PreparedStatements, RouterContext}; +use crate::frontend::{ClientRequest, PreparedStatements, RouterContext}; use crate::net::messages::Query; use crate::net::Parameters; @@ -17,11 +17,12 @@ macro_rules! command { ($query:expr) => {{ let query = $query; let mut query_parser = QueryParser::default(); - let buffer = Buffer::from(vec![Query::new(query).into()]); + let client_request = ClientRequest::from(vec![Query::new(query).into()]); let cluster = Cluster::new_test(); let mut stmt = PreparedStatements::default(); let params = Parameters::default(); - let context = RouterContext::new(&buffer, &cluster, &mut stmt, ¶ms, None).unwrap(); + let context = + RouterContext::new(&client_request, &cluster, &mut stmt, ¶ms, None).unwrap(); let command = query_parser.parse(context).unwrap().clone(); (command, query_parser) @@ -46,7 +47,7 @@ macro_rules! query_parser { let cluster = $cluster; let mut prep_stmts = PreparedStatements::default(); let params = Parameters::default(); - let buffer: Buffer = vec![$query.into()].into(); + let client_request: ClientRequest = vec![$query.into()].into(); let maybe_transaction = if $in_transaction { Some(TransactionType::ReadWrite) @@ -55,7 +56,7 @@ macro_rules! query_parser { }; let router_context = RouterContext::new( - &buffer, + &client_request, &cluster, &mut prep_stmts, ¶ms, @@ -89,7 +90,7 @@ macro_rules! parse { let route = QueryParser::default() .parse( RouterContext::new( - &Buffer::from(vec![parse.into(), bind.into()]), + &ClientRequest::from(vec![parse.into(), bind.into()]), &Cluster::new_test(), &mut PreparedStatements::default(), &Parameters::default(), @@ -253,7 +254,7 @@ fn test_set() { _ => panic!("search path"), } - let buffer: Buffer = vec![Query::new(r#"SET statement_timeout TO 1"#).into()].into(); + let buffer: ClientRequest = vec![Query::new(r#"SET statement_timeout TO 1"#).into()].into(); let cluster = Cluster::new_test(); let mut prep_stmts = PreparedStatements::default(); let params = Parameters::default(); @@ -376,7 +377,7 @@ fn test_function_begin() { let cluster = Cluster::new_test(); let mut prep_stmts = PreparedStatements::default(); let params = Parameters::default(); - let buffer: Buffer = vec![Query::new( + let buffer: ClientRequest = vec![Query::new( "SELECT ROW(t1.*) AS tt1, ROW(t2.*) AS tt2 @@ -448,7 +449,7 @@ fn test_close_direct_one_shard() { let cluster = Cluster::new_test_single_shard(); let mut qp = QueryParser::default(); - let buf: Buffer = vec![Close::named("test").into(), Sync.into()].into(); + let buf: ClientRequest = vec![Close::named("test").into(), Sync.into()].into(); let mut pp = PreparedStatements::default(); let params = Parameters::default(); let transaction = None; From 18efdff5b62ea5d7083f11fe5ad98b5a312cc70b Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Wed, 20 Aug 2025 17:25:22 -0400 Subject: [PATCH 02/15] wip --- pgdog/src/backend/pool/connection/binding.rs | 18 +++++++-- .../backend/pool/connection/mirror/handler.rs | 2 +- pgdog/src/backend/pool/connection/mod.rs | 14 +++---- .../frontend/client/query_engine/context.rs | 6 +-- .../query_engine/incomplete_requests.rs | 6 +-- pgdog/src/frontend/client/query_engine/mod.rs | 7 ++-- .../src/frontend/client/query_engine/query.rs | 6 +-- .../client/query_engine/route_query.rs | 4 +- pgdog/src/frontend/client_request.rs | 38 +++++++++---------- 9 files changed, 56 insertions(+), 45 deletions(-) diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index c5a538706..12894ad35 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -1,6 +1,7 @@ //! Binding between frontend client and a connection on the backend. use crate::{ + frontend::ClientRequest, net::{parameter::Parameters, ProtocolMessage}, state::State, }; @@ -113,20 +114,29 @@ impl Binding { } /// Send an entire buffer of messages to the servers(s). - pub async fn send(&mut self, messages: &crate::frontend::ClientRequest) -> Result<(), Error> { + pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { + println!(); + println!(); + println!(); + println!("ClientRequesto {:#?}", client_request); + println!(); + println!(); + println!(); + match self { + Binding::Admin(backend) => Ok(backend.send(client_request).await?), + Binding::Server(server) => { if let Some(server) = server { - server.send(messages).await + server.send(client_request).await } else { Err(Error::NotConnected) } } - Binding::Admin(backend) => Ok(backend.send(messages).await?), Binding::MultiShard(servers, _state) => { for server in servers.iter_mut() { - server.send(messages).await?; + server.send(client_request).await?; } Ok(()) diff --git a/pgdog/src/backend/pool/connection/mirror/handler.rs b/pgdog/src/backend/pool/connection/mirror/handler.rs index fa3ab03eb..4e809af0d 100644 --- a/pgdog/src/backend/pool/connection/mirror/handler.rs +++ b/pgdog/src/backend/pool/connection/mirror/handler.rs @@ -33,7 +33,7 @@ impl MirrorHandler { } /// Maybe send request to handler. - pub fn send(&mut self, buffer: &Buffer) -> bool { + pub fn send(&mut self, buffer: &ClientRequest) -> bool { match self.state { MirrorHandlerState::Dropping => { debug!("mirror dropping request"); diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index dbe0e6f02..b93cf0a78 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -272,25 +272,25 @@ impl Connection { } /// Send buffer in a potentially sharded context. - pub(crate) async fn handle_buffer( + pub(crate) async fn handle_client_request( &mut self, - messages: &crate::frontend::ClientRequest, + client_request: &crate::frontend::ClientRequest, router: &mut Router, streaming: bool, ) -> Result<(), Error> { - if messages.copy() && !streaming { + if client_request.copy() && !streaming { let rows = router - .copy_data(messages) + .copy_data(client_request) .map_err(|e| Error::Router(e.to_string()))?; if !rows.is_empty() { self.send_copy(rows).await?; - self.send(&messages.without_copy_data()).await?; + self.send(&client_request.without_copy_data()).await?; } else { - self.send(messages).await?; + self.send(client_request).await?; } } else { // Send query to server. - self.send(messages).await?; + self.send(client_request).await?; } Ok(()) diff --git a/pgdog/src/frontend/client/query_engine/context.rs b/pgdog/src/frontend/client/query_engine/context.rs index 5641dda9c..b9816391e 100644 --- a/pgdog/src/frontend/client/query_engine/context.rs +++ b/pgdog/src/frontend/client/query_engine/context.rs @@ -15,7 +15,7 @@ pub struct QueryEngineContext<'a> { /// Client session parameters. pub(super) params: &'a mut Parameters, /// Request - pub(super) buffer: &'a mut ClientRequest, + pub(super) client_request: &'a mut ClientRequest, /// Client's socket to send responses to. pub(super) stream: &'a mut Stream, /// Client in transaction? @@ -35,7 +35,7 @@ impl<'a> QueryEngineContext<'a> { Self { prepared_statements: &mut client.prepared_statements, params: &mut client.params, - buffer: &mut client.client_request, + client_request: &mut client.client_request, stream: &mut client.stream, transaction: client.transaction, timeouts: client.timeouts, @@ -49,7 +49,7 @@ impl<'a> QueryEngineContext<'a> { Self { prepared_statements: &mut mirror.prepared_statements, params: &mut mirror.params, - buffer, + client_request: buffer, stream: &mut mirror.stream, transaction: mirror.transaction, timeouts: mirror.timeouts, diff --git a/pgdog/src/frontend/client/query_engine/incomplete_requests.rs b/pgdog/src/frontend/client/query_engine/incomplete_requests.rs index 24a78bb08..2d1699359 100644 --- a/pgdog/src/frontend/client/query_engine/incomplete_requests.rs +++ b/pgdog/src/frontend/client/query_engine/incomplete_requests.rs @@ -12,16 +12,16 @@ impl QueryEngine { context: &mut QueryEngineContext<'_>, ) -> Result { // Client sent Sync only - let only_sync = context.buffer.iter().all(|m| m.code() == 'S'); + let only_sync = context.client_request.iter().all(|m| m.code() == 'S'); // Client sent only Close. let only_close = context - .buffer + .client_request .iter() .all(|m| ['C', 'S'].contains(&m.code())) && !only_sync; let mut bytes_sent = 0; - for msg in context.buffer.iter() { + for msg in context.client_request.iter() { match msg.code() { 'C' => { let close = Close::from_bytes(msg.to_bytes()?)?; diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index e27a3da9a..2bfae1521 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -91,7 +91,8 @@ impl<'a> QueryEngine { /// Handle client request. pub async fn handle(&mut self, context: &mut QueryEngineContext<'_>) -> Result<(), Error> { - self.stats.received(context.buffer.total_message_len()); + self.stats + .received(context.client_request.total_message_len()); // Intercept commands we don't have to forward to a server. if self.intercept_incomplete(context).await? { @@ -109,7 +110,7 @@ impl<'a> QueryEngine { // Queue up request to mirrors, if any. // Do this before sending query to actual server // to have accurate timings between queries. - self.backend.mirror(&context.buffer); + self.backend.mirror(&context.client_request); let command = self.router.command(); let route = command.route().clone(); @@ -156,7 +157,7 @@ impl<'a> QueryEngine { } Command::Copy(_) => self.execute(context, &route).await?, Command::Rewrite(query) => { - context.buffer.rewrite(query)?; + context.client_request.rewrite(query)?; self.execute(context, &route).await?; } Command::Deallocate => self.deallocate(context).await?, diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 185a48e63..6a6c71186 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -35,21 +35,21 @@ impl QueryEngine { } // We need to run a query now. - if context.buffer.executable() { + if context.client_request.executable() { if let Some(begin_stmt) = self.begin_stmt.take() { self.backend.execute(begin_stmt.query()).await?; } } // Set response format. - for msg in context.buffer.iter() { + for msg in context.client_request.iter() { if let ProtocolMessage::Bind(bind) = msg { self.backend.bind(bind)? } } self.backend - .handle_buffer(context.buffer, &mut self.router, self.streaming) + .handle_client_request(context.client_request, &mut self.router, self.streaming) .await?; while self.backend.has_more_messages() diff --git a/pgdog/src/frontend/client/query_engine/route_query.rs b/pgdog/src/frontend/client/query_engine/route_query.rs index 37d970e2f..f7f318e07 100644 --- a/pgdog/src/frontend/client/query_engine/route_query.rs +++ b/pgdog/src/frontend/client/query_engine/route_query.rs @@ -21,7 +21,7 @@ impl QueryEngine { }; let router_context = RouterContext::new( - context.buffer, + context.client_request, cluster, context.prepared_statements, context.params, @@ -29,7 +29,7 @@ impl QueryEngine { )?; match self.router.query(router_context) { Ok(cmd) => { - trace!("routing {:#?} to {:#?}", context.buffer, cmd); + trace!("routing {:#?} to {:#?}", context.client_request, cmd); } Err(err) => { if err.empty_query() { diff --git a/pgdog/src/frontend/client_request.rs b/pgdog/src/frontend/client_request.rs index e739ed544..6f67b6437 100644 --- a/pgdog/src/frontend/client_request.rs +++ b/pgdog/src/frontend/client_request.rs @@ -15,14 +15,14 @@ use super::PreparedStatements; /// Message buffer. #[derive(Debug, Clone)] pub struct ClientRequest { - buffer: Vec, + messages: Vec, } impl MemoryUsage for ClientRequest { #[inline] fn memory_usage(&self) -> usize { // ProtocolMessage uses memory allocated by BytesMut (mostly). - self.buffer.capacity() * std::mem::size_of::() + self.messages.capacity() * std::mem::size_of::() } } @@ -36,14 +36,14 @@ impl ClientRequest { /// Create new buffer. pub fn new() -> Self { Self { - buffer: Vec::with_capacity(5), + messages: Vec::with_capacity(5), } } /// The buffer is full and the client won't send any more messages /// until it gets a reply, or we don't want to buffer the data in memory. pub fn full(&self) -> bool { - if let Some(message) = self.buffer.last() { + if let Some(message) = self.messages.last() { // Flush (F) | Sync (F) | Query (F) | CopyDone (F) | CopyFail (F) if matches!(message.code(), 'H' | 'S' | 'Q' | 'c' | 'f') { return true; @@ -66,12 +66,12 @@ impl ClientRequest { /// Number of bytes in the buffer. pub fn total_message_len(&self) -> usize { - self.buffer.iter().map(|b| b.len()).sum() + self.messages.iter().map(|b| b.len()).sum() } /// If this buffer contains a query, retrieve it. pub fn query(&self) -> Result, Error> { - for message in &self.buffer { + for message in &self.messages { match message { ProtocolMessage::Query(query) => { return Ok(Some(BufferedQuery::Query(query.clone()))) @@ -104,7 +104,7 @@ impl ClientRequest { /// If this buffer contains bound parameters, retrieve them. pub fn parameters(&self) -> Result, Error> { - for message in &self.buffer { + for message in &self.messages { if let ProtocolMessage::Bind(bind) = message { return Ok(Some(bind)); } @@ -116,7 +116,7 @@ impl ClientRequest { /// Get all CopyData messages. pub fn copy_data(&self) -> Result, Error> { let mut rows = vec![]; - for message in &self.buffer { + for message in &self.messages { if let ProtocolMessage::CopyData(copy_data) = message { rows.push(copy_data.clone()) } @@ -127,14 +127,14 @@ impl ClientRequest { /// Remove all CopyData messages and return the rest. pub fn without_copy_data(&self) -> Self { - let mut buffer = self.buffer.clone(); + let mut buffer = self.messages.clone(); buffer.retain(|m| m.code() != 'd'); - Self { buffer } + Self { messages: buffer } } /// The buffer has COPY messages. pub fn copy(&self) -> bool { - self.buffer + self.messages .last() .map(|m| m.code() == 'd' || m.code() == 'c') .unwrap_or(false) @@ -143,31 +143,31 @@ impl ClientRequest { /// The client is setting state on the connection /// which we can no longer ignore. pub(crate) fn executable(&self) -> bool { - self.buffer + self.messages .iter() .any(|m| ['E', 'Q', 'B'].contains(&m.code())) } /// Rewrite query in buffer. pub fn rewrite(&mut self, query: &str) -> Result<(), Error> { - if self.buffer.iter().any(|c| c.code() != 'Q') { + if self.messages.iter().any(|c| c.code() != 'Q') { return Err(Error::OnlySimpleForRewrites); } - self.buffer.clear(); - self.buffer.push(Query::new(query).into()); + self.messages.clear(); + self.messages.push(Query::new(query).into()); Ok(()) } } impl From for Vec { fn from(val: ClientRequest) -> Self { - val.buffer + val.messages } } impl From> for ClientRequest { fn from(value: Vec) -> Self { - ClientRequest { buffer: value } + ClientRequest { messages: value } } } @@ -175,12 +175,12 @@ impl Deref for ClientRequest { type Target = Vec; fn deref(&self) -> &Self::Target { - &self.buffer + &self.messages } } impl DerefMut for ClientRequest { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.buffer + &mut self.messages } } From 2c59105457737506f865fb668d74a45f04cdefe5 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Wed, 20 Aug 2025 18:44:30 -0400 Subject: [PATCH 03/15] wip --- pgdog/src/admin/backend.rs | 4 +- .../logical/subscriber/parallel_connection.rs | 4 +- pgdog/src/backend/server.rs | 4 +- pgdog/src/frontend/client/mod.rs | 8 +-- .../query_engine/incomplete_requests.rs | 11 +++- pgdog/src/frontend/client/query_engine/mod.rs | 3 ++ .../src/frontend/client/query_engine/query.rs | 4 +- pgdog/src/frontend/client/test/mod.rs | 2 +- pgdog/src/frontend/client/timeouts.rs | 8 ++- pgdog/src/frontend/client_request.rs | 53 +++++++++++-------- 10 files changed, 64 insertions(+), 37 deletions(-) diff --git a/pgdog/src/admin/backend.rs b/pgdog/src/admin/backend.rs index 33237bf9a..3b13ef5f5 100644 --- a/pgdog/src/admin/backend.rs +++ b/pgdog/src/admin/backend.rs @@ -37,8 +37,8 @@ impl Backend { } /// Handle command. - pub async fn send(&mut self, messages: &ClientRequest) -> Result<(), Error> { - let message = messages.first().ok_or(Error::Empty)?; + pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { + let message = client_request.messages.first().ok_or(Error::Empty)?; let message: ProtocolMessage = message.clone(); if message.code() != 'Q' { diff --git a/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs b/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs index 144495de2..8e48f7ba4 100644 --- a/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs +++ b/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs @@ -57,8 +57,8 @@ impl ParallelConnection { } // Queue up the contents of the buffer. - pub async fn send(&mut self, buffer: &ClientRequest) -> Result<(), Error> { - for message in buffer.iter() { + pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { + for message in client_request.messages.iter() { self.tx .send(ParallelMessage::ProtocolMessage(message.clone())) .await diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 5e312aac7..cefbb9dd6 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -282,10 +282,10 @@ impl Server { } /// Send messages to the server and flush the buffer. - pub async fn send(&mut self, messages: &ClientRequest) -> Result<(), Error> { + pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { self.stats.state(State::Active); - for message in messages.iter() { + for message in client_request.messages.iter() { self.send_one(message).await?; } self.flush().await?; diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index fe5d78693..9b012b1b4 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -312,7 +312,7 @@ impl Client { buffer = self.buffer(client_state) => { let event = buffer?; - if !self.client_request.is_empty() { + if !self.client_request.messages.is_empty() { self.client_messages(&mut query_engine).await?; } @@ -366,7 +366,7 @@ impl Client { /// This ensures we don't check out a connection from the pool until the client /// sent a complete request. async fn buffer(&mut self, state: State) -> Result { - self.client_request.clear(); + self.client_request.messages.clear(); // Only start timer once we receive the first message. let mut timer = None; @@ -409,9 +409,10 @@ impl Client { let message = ProtocolMessage::from_bytes(message.to_bytes()?)?; if message.extended() && self.prepared_statements.enabled { self.client_request + .messages .push(self.prepared_statements.maybe_rewrite(message)?); } else { - self.client_request.push(message); + self.client_request.messages.push(message); } } } @@ -421,6 +422,7 @@ impl Client { "request buffered [{:.4}ms] {:?}", timer.unwrap().elapsed().as_secs_f64() * 1000.0, self.client_request + .messages .iter() .map(|m| m.code()) .collect::>(), diff --git a/pgdog/src/frontend/client/query_engine/incomplete_requests.rs b/pgdog/src/frontend/client/query_engine/incomplete_requests.rs index 2d1699359..20e852424 100644 --- a/pgdog/src/frontend/client/query_engine/incomplete_requests.rs +++ b/pgdog/src/frontend/client/query_engine/incomplete_requests.rs @@ -12,16 +12,23 @@ impl QueryEngine { context: &mut QueryEngineContext<'_>, ) -> Result { // Client sent Sync only - let only_sync = context.client_request.iter().all(|m| m.code() == 'S'); + let only_sync = context + .client_request + .messages + .iter() + .all(|m| m.code() == 'S'); + // Client sent only Close. let only_close = context .client_request + .messages .iter() .all(|m| ['C', 'S'].contains(&m.code())) && !only_sync; + let mut bytes_sent = 0; - for msg in context.client_request.iter() { + for msg in context.client_request.messages.iter() { match msg.code() { 'C' => { let close = Close::from_bytes(msg.to_bytes()?)?; diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index 2bfae1521..d101ba463 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -115,6 +115,9 @@ impl<'a> QueryEngine { let command = self.router.command(); let route = command.route().clone(); + // FIXME, we should not to copy route twice. + context.client_request.route = route.clone(); + match command { Command::Shards(shards) => self.show_shards(context, *shards).await?, Command::StartTransaction(begin) => { diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 6a6c71186..6dbbb951d 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -30,7 +30,7 @@ impl QueryEngine { return Ok(()); } - if !self.connect(context, route).await? { + if !self.connect(context, &route).await? { return Ok(()); } @@ -42,7 +42,7 @@ impl QueryEngine { } // Set response format. - for msg in context.client_request.iter() { + for msg in context.client_request.messages.iter() { if let ProtocolMessage::Bind(bind) = msg { self.backend.bind(bind)? } diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 3f52a75e8..00b4c1c7f 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -399,7 +399,7 @@ async fn test_abrupt_disconnect() { let event = client.buffer(State::Idle).await.unwrap(); assert_eq!(event, BufferEvent::DisconnectAbrupt); - assert!(client.client_request.is_empty()); + assert!(client.client_request.messages.is_empty()); // Client disconnects and returns gracefully. let (conn, mut client, _) = new_client!(false); diff --git a/pgdog/src/frontend/client/timeouts.rs b/pgdog/src/frontend/client/timeouts.rs index 57b75b7b0..dea4962f2 100644 --- a/pgdog/src/frontend/client/timeouts.rs +++ b/pgdog/src/frontend/client/timeouts.rs @@ -35,10 +35,14 @@ impl Timeouts { } #[inline] - pub(crate) fn client_idle_timeout(&self, state: &State, buffer: &ClientRequest) -> Duration { + pub(crate) fn client_idle_timeout( + &self, + state: &State, + client_request: &ClientRequest, + ) -> Duration { match state { State::Idle => { - if buffer.is_empty() { + if client_request.messages.is_empty() { self.client_idle_timeout } else { Duration::MAX diff --git a/pgdog/src/frontend/client_request.rs b/pgdog/src/frontend/client_request.rs index 6f67b6437..415129e19 100644 --- a/pgdog/src/frontend/client_request.rs +++ b/pgdog/src/frontend/client_request.rs @@ -1,5 +1,4 @@ -//! Message buffer. - +//! ClientRequest (messages buffer). use crate::{ net::{ messages::{Bind, CopyData, Protocol, Query}, @@ -7,15 +6,19 @@ use crate::{ }, stats::memory::MemoryUsage, }; -use std::ops::{Deref, DerefMut}; + +use super::{ + router::{parser::Shard, Route}, + PreparedStatements, +}; pub use super::BufferedQuery; -use super::PreparedStatements; /// Message buffer. #[derive(Debug, Clone)] pub struct ClientRequest { - messages: Vec, + pub messages: Vec, + pub route: Route, } impl MemoryUsage for ClientRequest { @@ -37,6 +40,7 @@ impl ClientRequest { pub fn new() -> Self { Self { messages: Vec::with_capacity(5), + route: Route::write(Shard::All), } } @@ -127,9 +131,13 @@ impl ClientRequest { /// Remove all CopyData messages and return the rest. pub fn without_copy_data(&self) -> Self { - let mut buffer = self.messages.clone(); - buffer.retain(|m| m.code() != 'd'); - Self { messages: buffer } + let mut messages = self.messages.clone(); + messages.retain(|m| m.code() != 'd'); + + Self { + messages, + route: self.route.clone(), + } } /// The buffer has COPY messages. @@ -166,21 +174,24 @@ impl From for Vec { } impl From> for ClientRequest { - fn from(value: Vec) -> Self { - ClientRequest { messages: value } + fn from(messages: Vec) -> Self { + ClientRequest { + messages, + route: Route::write(Shard::All), + } } } -impl Deref for ClientRequest { - type Target = Vec; +// impl Deref for ClientRequest { +// type Target = Vec; - fn deref(&self) -> &Self::Target { - &self.messages - } -} +// fn deref(&self) -> &Self::Target { +// &self.messages +// } +// } -impl DerefMut for ClientRequest { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.messages - } -} +// impl DerefMut for ClientRequest { +// fn deref_mut(&mut self) -> &mut Self::Target { +// &mut self.messages +// } +// } From 7eec2f70efddc1ab21aad30db9058cdf55bd070d Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Thu, 21 Aug 2025 10:01:13 -0400 Subject: [PATCH 04/15] wip --- pgdog/src/backend/pool/connection/binding.rs | 8 -------- pgdog/src/frontend/client/query_engine/query.rs | 8 +++++--- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 12894ad35..583ad91a5 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -115,14 +115,6 @@ impl Binding { /// Send an entire buffer of messages to the servers(s). pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { - println!(); - println!(); - println!(); - println!("ClientRequesto {:#?}", client_request); - println!(); - println!(); - println!(); - match self { Binding::Admin(backend) => Ok(backend.send(client_request).await?), diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 6dbbb951d..61ae693c1 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -98,9 +98,11 @@ impl QueryEngine { let in_transaction = message.in_transaction() || self.begin_stmt.is_some(); if !in_transaction { context.transaction = None; - } else if context.transaction.is_none() { - // Query parser is disabled, so the server is responsible for telling us - // we started a transaction. + } + + // Query parser is disabled, so the server is responsible for telling + // us we started a transaction. Do not override an existing transaction state. + if in_transaction && context.transaction.is_none() { context.transaction = Some(TransactionType::ReadWrite); } From de848772b4147e4d9c5fe319e8fa6f954e725ae6 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Thu, 21 Aug 2025 10:06:50 -0400 Subject: [PATCH 05/15] fix-rename --- pgdog/src/backend/pool/connection/mirror/buffer_with_delay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pgdog/src/backend/pool/connection/mirror/buffer_with_delay.rs b/pgdog/src/backend/pool/connection/mirror/buffer_with_delay.rs index 8c0541f86..3e23e0439 100644 --- a/pgdog/src/backend/pool/connection/mirror/buffer_with_delay.rs +++ b/pgdog/src/backend/pool/connection/mirror/buffer_with_delay.rs @@ -3,11 +3,11 @@ use std::time::Duration; -use crate::frontend::Buffer; +use crate::frontend::ClientRequest; /// Simulate original delay between requests. #[derive(Clone, Debug)] pub struct BufferWithDelay { pub(super) delay: Duration, - pub(super) buffer: Buffer, + pub(super) buffer: ClientRequest, } From 757d3d8e595f0782d90d5a9d26ea6392dcd96f74 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Thu, 21 Aug 2025 10:20:11 -0400 Subject: [PATCH 06/15] wip --- pgdog/src/frontend/client_request.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/pgdog/src/frontend/client_request.rs b/pgdog/src/frontend/client_request.rs index 415129e19..cf8edb5de 100644 --- a/pgdog/src/frontend/client_request.rs +++ b/pgdog/src/frontend/client_request.rs @@ -181,17 +181,3 @@ impl From> for ClientRequest { } } } - -// impl Deref for ClientRequest { -// type Target = Vec; - -// fn deref(&self) -> &Self::Target { -// &self.messages -// } -// } - -// impl DerefMut for ClientRequest { -// fn deref_mut(&mut self) -> &mut Self::Target { -// &mut self.messages -// } -// } From a7f1b657ef1205a9f617a9409688cbb30c9c292e Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Thu, 21 Aug 2025 11:38:54 -0400 Subject: [PATCH 07/15] wip --- .../frontend/client/query_engine/connect.rs | 2 +- pgdog/src/frontend/client/query_engine/mod.rs | 20 ++++++++----------- .../src/frontend/client/query_engine/query.rs | 7 ++++--- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pgdog/src/frontend/client/query_engine/connect.rs b/pgdog/src/frontend/client/query_engine/connect.rs index b5ff0f968..7c6aa5ab7 100644 --- a/pgdog/src/frontend/client/query_engine/connect.rs +++ b/pgdog/src/frontend/client/query_engine/connect.rs @@ -11,13 +11,13 @@ impl QueryEngine { pub(super) async fn connect( &mut self, context: &mut QueryEngineContext<'_>, - route: &Route, ) -> Result { if self.backend.connected() { return Ok(true); } let request = Request::new(self.client_id); + let route = &context.client_request.route; self.stats.waiting(request.created_at); self.comms.stats(self.stats); diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index d101ba463..75a4c712c 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -1,8 +1,8 @@ use crate::{ backend::pool::{Connection, Request}, frontend::{ - router::{parser::Shard, Route}, - BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, Stats, + router::parser::Shard, BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, + Stats, }, net::{BackendKeyData, ErrorResponse, Message, Parameters}, state::State, @@ -113,10 +113,6 @@ impl<'a> QueryEngine { self.backend.mirror(&context.client_request); let command = self.router.command(); - let route = command.route().clone(); - - // FIXME, we should not to copy route twice. - context.client_request.route = route.clone(); match command { Command::Shards(shards) => self.show_shards(context, *shards).await?, @@ -125,19 +121,19 @@ impl<'a> QueryEngine { } Command::CommitTransaction => { if self.backend.connected() { - self.execute(context, &route).await? + self.execute(context).await? } else { self.end_transaction(context, false).await? } } Command::RollbackTransaction => { if self.backend.connected() { - self.execute(context, &route).await? + self.execute(context).await? } else { self.end_transaction(context, true).await? } } - Command::Query(_) => self.execute(context, &route).await?, + Command::Query(_) => self.execute(context).await?, Command::Listen { channel, shard } => { self.listen(context, &channel.clone(), shard.clone()) .await? @@ -153,15 +149,15 @@ impl<'a> QueryEngine { Command::Unlisten(channel) => self.unlisten(context, &channel.clone()).await?, Command::Set { name, value } => { if self.backend.connected() { - self.execute(context, &route).await? + self.execute(context).await? } else { self.set(context, name.clone(), value.clone()).await? } } - Command::Copy(_) => self.execute(context, &route).await?, + Command::Copy(_) => self.execute(context).await?, Command::Rewrite(query) => { context.client_request.rewrite(query)?; - self.execute(context, &route).await?; + self.execute(context).await?; } Command::Deallocate => self.deallocate(context).await?, command => self.unknown_command(context, command.clone()).await?, diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 61ae693c1..e0973fdcd 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -15,8 +15,9 @@ impl QueryEngine { pub(super) async fn execute( &mut self, context: &mut QueryEngineContext<'_>, - route: &Route, ) -> Result<(), Error> { + let route = &context.client_request.route; + // Check for cross-shard quries. if context.cross_shard_disabled && route.is_cross_shard() { let bytes_sent = context @@ -30,11 +31,11 @@ impl QueryEngine { return Ok(()); } - if !self.connect(context, &route).await? { + if !self.connect(context).await? { return Ok(()); } - // We need to run a query now. + // We need to run a query now, execute any buffered/queued `BEGIN` statements if context.client_request.executable() { if let Some(begin_stmt) = self.begin_stmt.take() { self.backend.execute(begin_stmt.query()).await?; From 06f2de8e510c3d419db54bda8ea1f775fa19a741 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Thu, 21 Aug 2025 18:47:43 -0400 Subject: [PATCH 08/15] wip --- pgdog-plugin/src/bindings.rs | 1 + pgdog/src/backend/pool/connection/binding.rs | 5 +++-- pgdog/src/backend/pool/connection/mod.rs | 2 +- pgdog/src/backend/pool/guard.rs | 2 +- pgdog/src/backend/server.rs | 3 ++- .../src/frontend/client/query_engine/connect.rs | 16 +++++++++++++--- pgdog/src/frontend/client/query_engine/query.rs | 2 +- pgdog/src/frontend/router/parser/query/mod.rs | 2 ++ pgdog/src/frontend/router/parser/query/select.rs | 2 ++ pgdog/src/frontend/router/parser/route.rs | 7 +++++++ 10 files changed, 33 insertions(+), 9 deletions(-) diff --git a/pgdog-plugin/src/bindings.rs b/pgdog-plugin/src/bindings.rs index 6f47703df..fdabc97b7 100644 --- a/pgdog-plugin/src/bindings.rs +++ b/pgdog-plugin/src/bindings.rs @@ -21,6 +21,7 @@ pub const _DARWIN_FEATURE_ONLY_VERS_1050: u32 = 1; pub const _DARWIN_FEATURE_ONLY_UNIX_CONFORMANCE: u32 = 1; pub const _DARWIN_FEATURE_UNIX_CONFORMANCE: u32 = 3; pub const __has_ptrcheck: u32 = 0; +pub const __has_bounds_safety_attributes: u32 = 0; pub const USE_CLANG_TYPES: u32 = 0; pub const __PTHREAD_SIZE__: u32 = 8176; pub const __PTHREAD_ATTR_SIZE__: u32 = 56; diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 583ad91a5..042d6e3a3 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -59,6 +59,8 @@ impl Binding { pub(super) async fn read(&mut self) -> Result { match self { + Binding::Admin(backend) => Ok(backend.read().await?), + Binding::Server(guard) => { if let Some(guard) = guard.as_mut() { guard.read().await @@ -70,7 +72,6 @@ impl Binding { } } - Binding::Admin(backend) => Ok(backend.read().await?), Binding::MultiShard(shards, state) => { if shards.is_empty() { loop { @@ -271,7 +272,7 @@ impl Binding { } } - pub(super) fn dirty(&mut self) { + pub(super) fn mark_dirty(&mut self) { match self { Binding::Server(Some(ref mut server)) => server.mark_dirty(true), Binding::MultiShard(ref mut servers, _state) => { diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index b93cf0a78..d970b394c 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -353,7 +353,7 @@ impl Connection { pub(crate) fn lock(&mut self, lock: bool) { self.locked = lock; if lock { - self.binding.dirty(); + self.binding.mark_dirty(); } } diff --git a/pgdog/src/backend/pool/guard.rs b/pgdog/src/backend/pool/guard.rs index e121f3d70..0b47d8bd3 100644 --- a/pgdog/src/backend/pool/guard.rs +++ b/pgdog/src/backend/pool/guard.rs @@ -128,7 +128,7 @@ impl Guard { error!("server reset error [{}]", server.addr()); } Ok(_) => { - server.cleaned(); + server.mark_clean(); } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index cefbb9dd6..87ac4c53d 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -338,6 +338,7 @@ impl Server { if let Some(message) = self.prepared_statements.state_mut().get_simulated() { return Ok(message.backend()); } + match self .stream .as_mut() @@ -823,7 +824,7 @@ impl Server { /// Server has been cleaned. #[inline] - pub(super) fn cleaned(&mut self) { + pub(super) fn mark_clean(&mut self) { self.dirty = false; } diff --git a/pgdog/src/frontend/client/query_engine/connect.rs b/pgdog/src/frontend/client/query_engine/connect.rs index 7c6aa5ab7..acbe31ebf 100644 --- a/pgdog/src/frontend/client/query_engine/connect.rs +++ b/pgdog/src/frontend/client/query_engine/connect.rs @@ -1,9 +1,8 @@ use tokio::time::timeout; +use tracing::error; use super::*; -use tracing::error; - impl QueryEngine { /// Connect to backend, if necessary. /// @@ -17,7 +16,7 @@ impl QueryEngine { } let request = Request::new(self.client_id); - let route = &context.client_request.route; + let route = context.client_request.route.clone(); self.stats.waiting(request.created_at); self.comms.stats(self.stats); @@ -26,12 +25,23 @@ impl QueryEngine { Ok(_) => { self.stats.connected(); self.stats.locked(route.lock_session()); + + println!("scuba :: lock_session {:#?}", route.lock_session()); + // This connection will be locked to this client // until they disconnect. // // Used in case the client runs an advisory lock // or another leaky transaction mode abstraction. self.backend.lock(route.lock_session()); + println!("lock_session {:#?}", route.lock_session()); + + println!(""); + println!(""); + println!(""); + println!("backend: {:#?}", self.backend); + println!(""); + println!(""); if let Ok(addr) = self.backend.addr() { debug!( diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index e0973fdcd..cc3e39300 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -16,7 +16,7 @@ impl QueryEngine { &mut self, context: &mut QueryEngineContext<'_>, ) -> Result<(), Error> { - let route = &context.client_request.route; + let route = context.client_request.route.clone(); // Check for cross-shard quries. if context.cross_shard_disabled && route.is_cross_shard() { diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index 06a297420..767eb3555 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -288,6 +288,8 @@ impl QueryParser { // Set plugin-specified route, if available. // Plugins override what we calculated above. if let Command::Query(ref mut route) = command { + println!("fuuuuu"); + if let Some(read) = self.plugin_output.read { route.set_read_mut(read); } diff --git a/pgdog/src/frontend/router/parser/query/select.rs b/pgdog/src/frontend/router/parser/query/select.rs index 5a59f58a7..b226f312b 100644 --- a/pgdog/src/frontend/router/parser/query/select.rs +++ b/pgdog/src/frontend/router/parser/query/select.rs @@ -26,6 +26,7 @@ impl QueryParser { } if matches!(self.shard, Shard::Direct(_)) { + println!("2"); return Ok(Command::Query( Route::read(self.shard.clone()).set_write(writes), )); @@ -33,6 +34,7 @@ impl QueryParser { // `SELECT NOW()`, `SELECT 1`, etc. if stmt.from_clause.is_empty() { + println!("1"); return Ok(Command::Query( Route::read(Some(round_robin::next() % context.shards)).set_write(writes), )); diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index e96bbc18e..fdb0335c8 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -170,6 +170,7 @@ impl Route { } pub fn set_write(mut self, write: FunctionBehavior) -> Self { + println!("i am setting writes"); self.set_write_mut(write); self } @@ -179,8 +180,14 @@ impl Route { writes, locking_behavior, } = write; + + println!("locking behavior: {:?}", &locking_behavior); + println!("write: {:?}", &write); + self.read = !writes; self.lock_session = matches!(locking_behavior, LockingBehavior::Lock); + + println!("1.1 self {:#?}", self); } pub fn set_lock_session(mut self) -> Self { From 9d8ab4ab5004fb3f80876647ba9609f1e94ec7f3 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 22 Aug 2025 10:21:19 -0400 Subject: [PATCH 09/15] wip --- pgdog/src/frontend/client/query_engine/mod.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index 31c086ce7..75a4c712c 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -3,8 +3,6 @@ use crate::{ frontend::{ router::parser::Shard, BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, Stats, - router::{parser::Shard, Route}, - BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, Stats, }, net::{BackendKeyData, ErrorResponse, Message, Parameters}, state::State, @@ -116,9 +114,6 @@ impl<'a> QueryEngine { let command = self.router.command(); - // FIXME, we should not to copy route twice. - context.client_request.route = route.clone(); - match command { Command::Shards(shards) => self.show_shards(context, *shards).await?, Command::StartTransaction(begin) => { From dcf42ada7a42856f38bf5302c6f1fc3f4f196440 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 22 Aug 2025 11:43:15 -0400 Subject: [PATCH 10/15] wip --- pgdog/src/frontend/client/query_engine/mod.rs | 1 + pgdog/src/frontend/client/query_engine/query.rs | 4 ++++ pgdog/src/frontend/router/parser/query/select.rs | 14 +++++++++++--- pgdog/src/frontend/router/parser/route.rs | 3 ++- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index 75a4c712c..2ffe35927 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -113,6 +113,7 @@ impl<'a> QueryEngine { self.backend.mirror(&context.client_request); let command = self.router.command(); + context.client_request.route = command.route().clone(); match command { Command::Shards(shards) => self.show_shards(context, *shards).await?, diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index cc3e39300..04d84b267 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -18,6 +18,8 @@ impl QueryEngine { ) -> Result<(), Error> { let route = context.client_request.route.clone(); + println!("-- commando.routo X: {:#?}", &route); + // Check for cross-shard quries. if context.cross_shard_disabled && route.is_cross_shard() { let bytes_sent = context @@ -31,6 +33,8 @@ impl QueryEngine { return Ok(()); } + println!("-- commando.routo 2: {:#?}", &route); + if !self.connect(context).await? { return Ok(()); } diff --git a/pgdog/src/frontend/router/parser/query/select.rs b/pgdog/src/frontend/router/parser/query/select.rs index b226f312b..8fc222526 100644 --- a/pgdog/src/frontend/router/parser/query/select.rs +++ b/pgdog/src/frontend/router/parser/query/select.rs @@ -34,10 +34,18 @@ impl QueryParser { // `SELECT NOW()`, `SELECT 1`, etc. if stmt.from_clause.is_empty() { - println!("1"); - return Ok(Command::Query( + let command = Command::Query( Route::read(Some(round_robin::next() % context.shards)).set_write(writes), - )); + ); + + println!(""); + println!(""); + println!(""); + println!("-- commando.routo 1: {:#?}", &command.route()); + println!(""); + println!(""); + + return Ok(command); } let order_by = Self::select_sort(&stmt.sort_clause, context.router_context.bind); diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index fdb0335c8..feecdaed8 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -172,6 +172,7 @@ impl Route { pub fn set_write(mut self, write: FunctionBehavior) -> Self { println!("i am setting writes"); self.set_write_mut(write); + println!("yolokoko: {}", self.lock_session()); self } @@ -182,7 +183,7 @@ impl Route { } = write; println!("locking behavior: {:?}", &locking_behavior); - println!("write: {:?}", &write); + println!(">> write: {:?}", &write); self.read = !writes; self.lock_session = matches!(locking_behavior, LockingBehavior::Lock); From 5a83b1781af1b9a28ff64a9857542942a4a915dd Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 22 Aug 2025 11:59:08 -0400 Subject: [PATCH 11/15] wip --- pgdog-plugin/src/bindings.rs | 1 - pgdog/src/frontend/client/query_engine/connect.rs | 10 ---------- pgdog/src/frontend/client/query_engine/query.rs | 12 +++--------- pgdog/src/frontend/router/parser/query/mod.rs | 2 -- pgdog/src/frontend/router/parser/query/select.rs | 8 -------- pgdog/src/frontend/router/parser/route.rs | 5 ----- 6 files changed, 3 insertions(+), 35 deletions(-) diff --git a/pgdog-plugin/src/bindings.rs b/pgdog-plugin/src/bindings.rs index fdabc97b7..6f47703df 100644 --- a/pgdog-plugin/src/bindings.rs +++ b/pgdog-plugin/src/bindings.rs @@ -21,7 +21,6 @@ pub const _DARWIN_FEATURE_ONLY_VERS_1050: u32 = 1; pub const _DARWIN_FEATURE_ONLY_UNIX_CONFORMANCE: u32 = 1; pub const _DARWIN_FEATURE_UNIX_CONFORMANCE: u32 = 3; pub const __has_ptrcheck: u32 = 0; -pub const __has_bounds_safety_attributes: u32 = 0; pub const USE_CLANG_TYPES: u32 = 0; pub const __PTHREAD_SIZE__: u32 = 8176; pub const __PTHREAD_ATTR_SIZE__: u32 = 56; diff --git a/pgdog/src/frontend/client/query_engine/connect.rs b/pgdog/src/frontend/client/query_engine/connect.rs index acbe31ebf..33b694089 100644 --- a/pgdog/src/frontend/client/query_engine/connect.rs +++ b/pgdog/src/frontend/client/query_engine/connect.rs @@ -26,22 +26,12 @@ impl QueryEngine { self.stats.connected(); self.stats.locked(route.lock_session()); - println!("scuba :: lock_session {:#?}", route.lock_session()); - // This connection will be locked to this client // until they disconnect. // // Used in case the client runs an advisory lock // or another leaky transaction mode abstraction. self.backend.lock(route.lock_session()); - println!("lock_session {:#?}", route.lock_session()); - - println!(""); - println!(""); - println!(""); - println!("backend: {:#?}", self.backend); - println!(""); - println!(""); if let Ok(addr) = self.backend.addr() { debug!( diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 04d84b267..968d76ecd 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -18,8 +18,6 @@ impl QueryEngine { ) -> Result<(), Error> { let route = context.client_request.route.clone(); - println!("-- commando.routo X: {:#?}", &route); - // Check for cross-shard quries. if context.cross_shard_disabled && route.is_cross_shard() { let bytes_sent = context @@ -33,8 +31,6 @@ impl QueryEngine { return Ok(()); } - println!("-- commando.routo 2: {:#?}", &route); - if !self.connect(context).await? { return Ok(()); } @@ -103,11 +99,9 @@ impl QueryEngine { let in_transaction = message.in_transaction() || self.begin_stmt.is_some(); if !in_transaction { context.transaction = None; - } - - // Query parser is disabled, so the server is responsible for telling - // us we started a transaction. Do not override an existing transaction state. - if in_transaction && context.transaction.is_none() { + } else if in_transaction && context.transaction.is_none() { + // Query parser is disabled, so the server is responsible for telling us + // we started a transaction. context.transaction = Some(TransactionType::ReadWrite); } diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index 767eb3555..06a297420 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -288,8 +288,6 @@ impl QueryParser { // Set plugin-specified route, if available. // Plugins override what we calculated above. if let Command::Query(ref mut route) = command { - println!("fuuuuu"); - if let Some(read) = self.plugin_output.read { route.set_read_mut(read); } diff --git a/pgdog/src/frontend/router/parser/query/select.rs b/pgdog/src/frontend/router/parser/query/select.rs index 8fc222526..786e7774f 100644 --- a/pgdog/src/frontend/router/parser/query/select.rs +++ b/pgdog/src/frontend/router/parser/query/select.rs @@ -26,7 +26,6 @@ impl QueryParser { } if matches!(self.shard, Shard::Direct(_)) { - println!("2"); return Ok(Command::Query( Route::read(self.shard.clone()).set_write(writes), )); @@ -38,13 +37,6 @@ impl QueryParser { Route::read(Some(round_robin::next() % context.shards)).set_write(writes), ); - println!(""); - println!(""); - println!(""); - println!("-- commando.routo 1: {:#?}", &command.route()); - println!(""); - println!(""); - return Ok(command); } diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index feecdaed8..683fe5154 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -170,9 +170,7 @@ impl Route { } pub fn set_write(mut self, write: FunctionBehavior) -> Self { - println!("i am setting writes"); self.set_write_mut(write); - println!("yolokoko: {}", self.lock_session()); self } @@ -182,9 +180,6 @@ impl Route { locking_behavior, } = write; - println!("locking behavior: {:?}", &locking_behavior); - println!(">> write: {:?}", &write); - self.read = !writes; self.lock_session = matches!(locking_behavior, LockingBehavior::Lock); From 895cfd63e8bef59cc8f8682068837cc1dbef37d4 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 22 Aug 2025 12:08:52 -0400 Subject: [PATCH 12/15] wip --- pgdog/src/frontend/client/query_engine/query.rs | 2 +- pgdog/src/frontend/router/parser/query/select.rs | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 968d76ecd..eba2c6e59 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -16,7 +16,7 @@ impl QueryEngine { &mut self, context: &mut QueryEngineContext<'_>, ) -> Result<(), Error> { - let route = context.client_request.route.clone(); + let route = &context.client_request.route; // Check for cross-shard quries. if context.cross_shard_disabled && route.is_cross_shard() { diff --git a/pgdog/src/frontend/router/parser/query/select.rs b/pgdog/src/frontend/router/parser/query/select.rs index 786e7774f..5a59f58a7 100644 --- a/pgdog/src/frontend/router/parser/query/select.rs +++ b/pgdog/src/frontend/router/parser/query/select.rs @@ -33,11 +33,9 @@ impl QueryParser { // `SELECT NOW()`, `SELECT 1`, etc. if stmt.from_clause.is_empty() { - let command = Command::Query( + return Ok(Command::Query( Route::read(Some(round_robin::next() % context.shards)).set_write(writes), - ); - - return Ok(command); + )); } let order_by = Self::select_sort(&stmt.sort_clause, context.router_context.bind); From 8043cce61ef88d60c3848e371a64b63ca23efe8e Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 22 Aug 2025 12:09:48 -0400 Subject: [PATCH 13/15] wip --- pgdog/src/frontend/client/query_engine/connect.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgdog/src/frontend/client/query_engine/connect.rs b/pgdog/src/frontend/client/query_engine/connect.rs index 33b694089..3886151ce 100644 --- a/pgdog/src/frontend/client/query_engine/connect.rs +++ b/pgdog/src/frontend/client/query_engine/connect.rs @@ -16,7 +16,7 @@ impl QueryEngine { } let request = Request::new(self.client_id); - let route = context.client_request.route.clone(); + let route = &context.client_request.route; self.stats.waiting(request.created_at); self.comms.stats(self.stats); From 33acaf225a43226051894e4899461d86e9e172d5 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 22 Aug 2025 12:11:05 -0400 Subject: [PATCH 14/15] reduce-diff --- pgdog/src/frontend/client/query_engine/query.rs | 2 +- pgdog/src/frontend/router/parser/route.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index eba2c6e59..6036fe32d 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -99,7 +99,7 @@ impl QueryEngine { let in_transaction = message.in_transaction() || self.begin_stmt.is_some(); if !in_transaction { context.transaction = None; - } else if in_transaction && context.transaction.is_none() { + } else if context.transaction.is_none() { // Query parser is disabled, so the server is responsible for telling us // we started a transaction. context.transaction = Some(TransactionType::ReadWrite); diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index 683fe5154..4b8335043 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -182,8 +182,6 @@ impl Route { self.read = !writes; self.lock_session = matches!(locking_behavior, LockingBehavior::Lock); - - println!("1.1 self {:#?}", self); } pub fn set_lock_session(mut self) -> Self { From 303e5fc50ade41b36c258d93956afdfe0ccefb35 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Tue, 26 Aug 2025 14:53:36 -0400 Subject: [PATCH 15/15] wip --- pgdog/src/backend/pool/connection/binding.rs | 6 ++++++ pgdog/src/backend/pool/connection/mod.rs | 2 ++ pgdog/src/backend/pool/connection/multi_shard/mod.rs | 2 ++ 3 files changed, 10 insertions(+) diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 042d6e3a3..8ab6ab19f 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -116,6 +116,8 @@ impl Binding { /// Send an entire buffer of messages to the servers(s). pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { + println!("\n\nsending.... \n{:#?}\n\n", client_request); + match self { Binding::Admin(backend) => Ok(backend.send(client_request).await?), @@ -129,6 +131,7 @@ impl Binding { Binding::MultiShard(servers, _state) => { for server in servers.iter_mut() { + println!("\nserver\n${:#?}\n", server); server.send(client_request).await?; } @@ -223,6 +226,7 @@ impl Binding { /// Execute a query on all servers. pub async fn execute(&mut self, query: &str) -> Result<(), Error> { + println!("\n\nexecuting...\n{:#?}", query); match self { Binding::Server(Some(ref mut server)) => { server.execute(query).await?; @@ -244,6 +248,8 @@ impl Binding { match self { Binding::Server(Some(ref mut server)) => server.link_client(params).await, Binding::MultiShard(ref mut servers, _) => { + println!("\nLinking... {:#?}", servers); + let mut max = 0; for server in servers { let synced = server.link_client(params).await?; diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index d970b394c..934f5095c 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -278,6 +278,8 @@ impl Connection { router: &mut Router, streaming: bool, ) -> Result<(), Error> { + println!("\nhandle_client_request: \n\n{:#?}\n \n", client_request); + if client_request.copy() && !streaming { let rows = router .copy_data(client_request) diff --git a/pgdog/src/backend/pool/connection/multi_shard/mod.rs b/pgdog/src/backend/pool/connection/multi_shard/mod.rs index e3c442331..3390b18f4 100644 --- a/pgdog/src/backend/pool/connection/multi_shard/mod.rs +++ b/pgdog/src/backend/pool/connection/multi_shard/mod.rs @@ -40,6 +40,7 @@ struct Counters { pub struct MultiShard { /// Number of shards we are connected to. shards: usize, + /// Route the query is taking. route: Route, @@ -48,6 +49,7 @@ pub struct MultiShard { /// Sorting/aggregate buffer. buffer: Buffer, + decoder: Decoder, }