From a1ae9c37449ec816bdb42cf9e5dee3d137138418 Mon Sep 17 00:00:00 2001 From: shamb0 Date: Thu, 29 Aug 2024 22:57:14 +0530 Subject: [PATCH] feat: Support multilevel partition tables and foreign tables in Executor hook for DuckDB Signed-off-by: shamb0 --- src/hooks/executor.rs | 51 ++++++++++--------------------------------- src/hooks/query.rs | 43 +++++------------------------------- 2 files changed, 16 insertions(+), 78 deletions(-) diff --git a/src/hooks/executor.rs b/src/hooks/executor.rs index 8560200f..ff2cd678 100644 --- a/src/hooks/executor.rs +++ b/src/hooks/executor.rs @@ -49,31 +49,20 @@ pub async fn executor_run( prev_hook(query_desc, direction, count, execute_once); return Ok(()); } - + let ps = query_desc.plannedstmt; - let rtable = unsafe { (*ps).rtable }; - let query = get_current_query(ps, unsafe { CStr::from_ptr(query_desc.sourceText) })?; - let query_relations = get_query_relations(ps); - let is_duckdb_query = !query_relations.is_empty() - && query_relations.iter().all(|pg_relation| { - if pg_relation.is_foreign_table() { - let foreign_table = unsafe { pg_sys::GetForeignTable(pg_relation.oid()) }; - let foreign_server = unsafe { pg_sys::GetForeignServer((*foreign_table).serverid) }; - let fdw_handler = FdwHandler::from(foreign_server); - fdw_handler != FdwHandler::Other - } else { - false - } - }); + let query = + QueryInterceptor::get_current_query(ps, unsafe { CStr::from_ptr(query_desc.sourceText) })?; + let query_relations = QueryInterceptor::get_query_relations(ps); - if rtable.is_null() - || query_desc.operation != pg_sys::CmdType::CMD_SELECT - || !is_duckdb_query - // Tech Debt: Find a less hacky way to let COPY/CREATE go through - || query.to_lowercase().starts_with("copy") - || query.to_lowercase().starts_with("create") - { + pgrx::warning!( + "query_relations.is_empty() :: {:#?}", + query_relations.is_empty() + ); + + if !QueryExecutor::should_use_duckdb(&query_desc, &query, &query_relations) { prev_hook(query_desc, direction, count, execute_once); + pgrx::warning!("pga:: *** ExtensionHook::executor_run() Y Stg-01 ***"); return Ok(()); } @@ -81,24 +70,6 @@ pub async fn executor_run( // Make sure it could find unqualified relations. set_search_path_by_pg()?; - match connection::create_arrow(query.as_str()) { - Err(err) => { - connection::clear_arrow(); - fallback_warning!(err.to_string()); - prev_hook(query_desc, direction, count, execute_once); - return Ok(()); - } - Ok(false) => { - connection::clear_arrow(); - return Ok(()); - } - _ => {} - } - - // Set DuckDB search path according search path in Postgres - // Make sure it could find unqualified relations. - set_search_path_by_pg()?; - match QueryTransformer::transform_query_for_duckdb(&query, &query_relations) { Ok(transformed_queries) => { if transformed_queries.len() > 1 { diff --git a/src/hooks/query.rs b/src/hooks/query.rs index 5362190f..3ca0e77f 100644 --- a/src/hooks/query.rs +++ b/src/hooks/query.rs @@ -15,7 +15,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use anyhow::Result; +use crate::duckdb::connection; +use crate::fdw::handler::FdwHandler; +use crate::schema::cell::*; +use anyhow::{anyhow, Error, Result}; +use duckdb::arrow::array::RecordBatch; use pgrx::*; use sqlparser::ast::{BinaryOperator, Expr, Query, SetExpr, Statement, TableFactor, Value}; use sqlparser::dialect::PostgreSqlDialect; @@ -566,40 +570,3 @@ fn get_postgres_search_path() -> Vec { schema_vec } -pub fn set_search_path_by_pg() -> Result<()> { - let mut search_path = get_postgres_search_path(); - let duckdb_schemas = connection::get_available_schemas()?; - - // Filter schemas. If one of schemas doesn't exist, it will cause the DuckDB 'SET search_path' to fail. - search_path.retain(|schema| duckdb_schemas.contains(schema)); - - // Set duckdb catalog search path - connection::set_search_path(search_path)?; - - Ok(()) -} - -fn get_postgres_search_path() -> Vec { - let active_schemas = - unsafe { PgList::::from_pg(pg_sys::fetch_search_path(false)) }; - - let mut schema_vec: Vec = Vec::with_capacity(active_schemas.len()); - for schema_oid in active_schemas.iter_oid() { - let tuple = unsafe { - pg_sys::SearchSysCache1( - pg_sys::SysCacheIdentifier::NAMESPACEOID as i32, - schema_oid.into_datum().unwrap(), - ) - }; - - if !tuple.is_null() { - let pg_namespace = unsafe { pg_sys::GETSTRUCT(tuple) as pg_sys::Form_pg_namespace }; - let name = pg_sys::name_data_to_str(unsafe { &(*pg_namespace).nspname }); - schema_vec.push(name.to_string()); - - unsafe { pg_sys::ReleaseSysCache(tuple) }; - } - } - - schema_vec -}