Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{error, info};
use crate::{
backend::{
databases::{databases, User as DatabaseUser},
replication::{ReplicationConfig, ShardedColumn, ShardedSchemas},
replication::{ReplicationConfig, ShardedSchemas},
Schema, ShardedTables,
},
config::{
Expand Down Expand Up @@ -369,11 +369,6 @@ impl Cluster {
self.pub_sub_channel_size > 0
}

/// Find sharded column position, if the table and columns match the configuration.
pub fn sharded_column(&self, table: &str, columns: &[&str]) -> Option<ShardedColumn> {
self.sharded_tables.sharded_column(table, columns)
}

/// A cluster is read_only if zero shards have a primary.
pub fn read_only(&self) -> bool {
for shard in &self.shards {
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/pool/connection/lazy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Lazy connection guard.
//!
//! Handles server synchronization and lazy connection creation.
35 changes: 34 additions & 1 deletion pgdog/src/backend/replication/sharded_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pgdog_config::OmnishardedTable;

use crate::{
config::{DataType, ShardedTable},
frontend::router::sharding::Mapping,
frontend::router::{parser::Column, sharding::Mapping},
net::messages::Vector,
};
use std::{
Expand Down Expand Up @@ -103,6 +103,39 @@ impl ShardedTables {
.find(|t| t.name.as_deref() == Some(name))
}

/// Determine if the column is sharded and return its data type,
/// as declared in the schema.
pub fn get_table(&self, column: Column<'_>) -> Option<&ShardedTable> {
// Only fully-qualified columns can be matched.
let table = if let Some(table) = column.table() {
table
} else {
return None;
};

for candidate in &self.inner.tables {
if let Some(table_name) = candidate.name.as_ref() {
if !table.name_match(table_name) {
continue;
}
}

if let Some(schema_name) = candidate.schema.as_ref() {
if let Some(schema) = table.schema() {
if schema.name != schema_name {
continue;
}
}
}

if column.name == candidate.column {
return Some(candidate);
}
}

None
}

/// Find out which column (if any) is sharded in the given table.
pub fn sharded_column(&self, table: &str, columns: &[&str]) -> Option<ShardedColumn> {
let with_names = self
Expand Down
49 changes: 32 additions & 17 deletions pgdog/src/frontend/router/parser/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use pg_query::{
};
use std::fmt::{Display, Formatter, Result as FmtResult};

use super::Table;
use super::{Error, Table};
use crate::util::escape_identifier;

/// Column name extracted from a query.
Expand Down Expand Up @@ -44,9 +44,7 @@ impl<'a> Column<'a> {
pub fn to_owned(&self) -> OwnedColumn {
OwnedColumn::from(*self)
}
}

impl<'a> Column<'a> {
pub fn from_string(string: &'a Node) -> Result<Self, ()> {
match &string.node {
Some(NodeEnum::String(protobuf::String { sval })) => Ok(Self {
Expand All @@ -57,6 +55,14 @@ impl<'a> Column<'a> {
_ => Err(()),
}
}

/// Fully-qualify this column with a table.
pub fn qualify(&mut self, table: Table<'a>) {
if self.table.is_none() {
self.table = Some(table.name);
self.schema = table.schema;
}
}
}

impl<'a> Display for Column<'a> {
Expand Down Expand Up @@ -114,15 +120,15 @@ impl<'a> From<&'a OwnedColumn> for Column<'a> {
}

impl<'a> TryFrom<&'a Node> for Column<'a> {
type Error = ();
type Error = Error;

fn try_from(value: &'a Node) -> Result<Self, Self::Error> {
Column::try_from(&value.node)
}
}

impl<'a> TryFrom<&'a Option<NodeEnum>> for Column<'a> {
type Error = ();
type Error = Error;

fn try_from(value: &'a Option<NodeEnum>) -> Result<Self, Self::Error> {
fn from_node(node: &Node) -> Option<&str> {
Expand All @@ -133,12 +139,15 @@ impl<'a> TryFrom<&'a Option<NodeEnum>> for Column<'a> {
}
}

fn from_slice<'a>(nodes: &'a [Node]) -> Result<Column<'a>, ()> {
fn from_slice<'a>(nodes: &'a [Node]) -> Result<Column<'a>, Error> {
match nodes.len() {
3 => {
let schema = nodes.first().and_then(from_node);
let table = nodes.get(1).and_then(from_node);
let name = nodes.get(2).and_then(from_node).ok_or(())?;
let name = nodes
.get(2)
.and_then(from_node)
.ok_or(Error::ColumnDecode)?;

Ok(Column {
schema,
Expand All @@ -149,7 +158,10 @@ impl<'a> TryFrom<&'a Option<NodeEnum>> for Column<'a> {

2 => {
let table = nodes.first().and_then(from_node);
let name = nodes.get(1).and_then(from_node).ok_or(())?;
let name = nodes
.get(1)
.and_then(from_node)
.ok_or(Error::ColumnDecode)?;

Ok(Column {
schema: None,
Expand All @@ -159,15 +171,18 @@ impl<'a> TryFrom<&'a Option<NodeEnum>> for Column<'a> {
}

1 => {
let name = nodes.first().and_then(from_node).ok_or(())?;
let name = nodes
.first()
.and_then(from_node)
.ok_or(Error::ColumnDecode)?;

Ok(Column {
name,
..Default::default()
})
}

_ => Err(()),
_ => Err(Error::ColumnDecode),
}
}

Expand All @@ -186,26 +201,26 @@ impl<'a> TryFrom<&'a Option<NodeEnum>> for Column<'a> {
if let Some(ref node) = list.arg {
Ok(Column::try_from(&node.node)?)
} else {
Err(())
Err(Error::ColumnDecode)
}
} else {
Err(())
Err(Error::ColumnDecode)
}
}

_ => Err(()),
_ => Err(Error::ColumnDecode),
}
}
}

impl<'a> TryFrom<&Option<&'a Node>> for Column<'a> {
type Error = ();
type Error = Error;

fn try_from(value: &Option<&'a Node>) -> Result<Self, Self::Error> {
if let Some(value) = value {
(*value).try_into()
} else {
Err(())
Err(Error::ColumnDecode)
}
}
}
Expand All @@ -224,7 +239,7 @@ impl<'a> From<&'a str> for Column<'a> {
mod test {
use pg_query::{parse, NodeEnum};

use super::Column;
use super::{Column, Error};

#[test]
fn test_column() {
Expand All @@ -236,7 +251,7 @@ mod test {
.cols
.iter()
.map(Column::try_from)
.collect::<Result<Vec<Column>, ()>>()
.collect::<Result<Vec<Column>, Error>>()
.unwrap();
assert_eq!(
columns,
Expand Down
12 changes: 12 additions & 0 deletions pgdog/src/frontend/router/parser/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,16 @@ pub enum Error {

#[error("prepared statement \"{0}\" doesn't exist")]
PreparedStatementDoesntExist(String),

#[error("column decode error")]
ColumnDecode,

#[error("table decode error")]
TableDecode,

#[error("parameter ${0} not in bind")]
BindParameterMissing(i32),

#[error("statement is not a SELECT")]
NotASelect,
}
2 changes: 1 addition & 1 deletion pgdog/src/frontend/router/parser/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<'a> Insert<'a> {
.cols
.iter()
.map(Column::try_from)
.collect::<Result<Vec<Column<'a>>, ()>>()
.collect::<Result<Vec<Column<'a>>, Error>>()
.ok()
.unwrap_or(vec![])
}
Expand Down
2 changes: 2 additions & 0 deletions pgdog/src/frontend/router/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod rewrite_plan;
pub mod route;
pub mod schema;
pub mod sequence;
pub mod statement;
pub mod table;
pub mod tuple;
pub mod value;
Expand Down Expand Up @@ -59,6 +60,7 @@ pub use rewrite_plan::{HelperKind, HelperMapping, QueryRewriter, RewriteOutput,
pub use route::{Route, Shard};
pub use schema::Schema;
pub use sequence::{OwnedSequence, Sequence};
pub use statement::StatementParser;
pub use table::{OwnedTable, Table};
pub use tuple::Tuple;
pub use value::Value;
Expand Down
53 changes: 18 additions & 35 deletions pgdog/src/frontend/router/parser/query/delete.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::frontend::router::parser::where_clause::TablesSource;

use super::shared::ConvergeAlgorithm;
use super::StatementParser;
use super::*;

impl QueryParser {
Expand All @@ -9,47 +7,32 @@ impl QueryParser {
stmt: &DeleteStmt,
context: &QueryParserContext,
) -> Result<Command, Error> {
let table = stmt.relation.as_ref().map(Table::from);

if let Some(table) = table {
// Schema-based sharding.
if let Some(schema) = context.sharding_schema.schemas.get(table.schema()) {
let shard: Shard = schema.shard().into();

let shard = StatementParser::from_delete(
stmt,
context.router_context.bind,
&context.sharding_schema,
self.recorder_mut(),
)
.shard()?;

let shard = match shard {
Some(shard) => {
if let Some(recorder) = self.recorder_mut() {
recorder.record_entry(
Some(shard.clone()),
format!("DELETE matched schema {}", schema.name()),
"DELETE matched WHERE clause for sharding key",
);
}

return Ok(Command::Query(Route::write(shard)));
shard
}

let source = TablesSource::from(table);
let where_clause = WhereClause::new(&source, &stmt.where_clause);

if let Some(where_clause) = where_clause {
let shards = Self::where_clause(
&context.sharding_schema,
&where_clause,
context.router_context.bind,
&mut self.explain_recorder,
)?;
let shard = Self::converge(&shards, ConvergeAlgorithm::default());
None => {
if let Some(recorder) = self.recorder_mut() {
recorder.record_entry(
Some(shard.clone()),
"DELETE matched WHERE clause for sharding key",
);
recorder.record_entry(None, "DELETE fell back to broadcast");
}
return Ok(Command::Query(Route::write(shard)));
Shard::default()
}
}
};

if let Some(recorder) = self.recorder_mut() {
recorder.record_entry(None, "DELETE fell back to broadcast");
}
Ok(Command::Query(Route::write(None)))
Ok(Command::Query(Route::write(shard)))
}
}
26 changes: 14 additions & 12 deletions pgdog/src/frontend/router/parser/query/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,29 @@ impl QueryParser {
));
}

let mut shards = HashSet::new();

let shard = StatementParser::from_select(
stmt,
context.router_context.bind,
&context.sharding_schema,
self.recorder_mut(),
)
.shard()?;
if let Some(shard) = shard {
shards.insert(shard);
}

// `SELECT NOW()`, `SELECT 1`, etc.
if stmt.from_clause.is_empty() {
if shards.is_empty() && stmt.from_clause.is_empty() {
return Ok(Command::Query(
Route::read(Some(round_robin::next() % context.shards)).set_write(writes),
));
}

let order_by = Self::select_sort(&stmt.sort_clause, context.router_context.bind);
let mut shards = HashSet::new();

let from_clause = TablesSource::from(FromClause::new(&stmt.from_clause));
let where_clause = WhereClause::new(&from_clause, &stmt.where_clause);

if let Some(ref where_clause) = where_clause {
shards = Self::where_clause(
&context.sharding_schema,
where_clause,
context.router_context.bind,
&mut self.explain_recorder,
)?;
}

// Schema-based sharding.
let mut schema_sharder = SchemaSharder::default();
Expand Down
Loading
Loading