Skip to content

Commit

Permalink
feat: Support multilevel partition tables and foreign tables in Execu…
Browse files Browse the repository at this point in the history
…tor hook for DuckDB

Signed-off-by: shamb0 <r.raajey@gmail.com>
  • Loading branch information
shamb0 committed Aug 29, 2024
1 parent 387c744 commit a1ae9c3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 78 deletions.
51 changes: 11 additions & 40 deletions src/hooks/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,56 +49,27 @@ pub async fn executor_run(
prev_hook(query_desc, direction, count, execute_once);
return Ok(());
}

let ps = query_desc.plannedstmt;
let rtable = unsafe { (*ps).rtable };
let query = get_current_query(ps, unsafe { CStr::from_ptr(query_desc.sourceText) })?;
let query_relations = get_query_relations(ps);
let is_duckdb_query = !query_relations.is_empty()
&& query_relations.iter().all(|pg_relation| {
if pg_relation.is_foreign_table() {
let foreign_table = unsafe { pg_sys::GetForeignTable(pg_relation.oid()) };
let foreign_server = unsafe { pg_sys::GetForeignServer((*foreign_table).serverid) };
let fdw_handler = FdwHandler::from(foreign_server);
fdw_handler != FdwHandler::Other
} else {
false
}
});
let query =
QueryInterceptor::get_current_query(ps, unsafe { CStr::from_ptr(query_desc.sourceText) })?;
let query_relations = QueryInterceptor::get_query_relations(ps);

if rtable.is_null()
|| query_desc.operation != pg_sys::CmdType::CMD_SELECT
|| !is_duckdb_query
// Tech Debt: Find a less hacky way to let COPY/CREATE go through
|| query.to_lowercase().starts_with("copy")
|| query.to_lowercase().starts_with("create")
{
pgrx::warning!(
"query_relations.is_empty() :: {:#?}",
query_relations.is_empty()
);

if !QueryExecutor::should_use_duckdb(&query_desc, &query, &query_relations) {
prev_hook(query_desc, direction, count, execute_once);
pgrx::warning!("pga:: *** ExtensionHook::executor_run() Y Stg-01 ***");
return Ok(());
}

// Set DuckDB search path according search path in Postgres
// Make sure it could find unqualified relations.
set_search_path_by_pg()?;

match connection::create_arrow(query.as_str()) {
Err(err) => {
connection::clear_arrow();
fallback_warning!(err.to_string());
prev_hook(query_desc, direction, count, execute_once);
return Ok(());
}
Ok(false) => {
connection::clear_arrow();
return Ok(());
}
_ => {}
}

// Set DuckDB search path according search path in Postgres
// Make sure it could find unqualified relations.
set_search_path_by_pg()?;

match QueryTransformer::transform_query_for_duckdb(&query, &query_relations) {
Ok(transformed_queries) => {
if transformed_queries.len() > 1 {
Expand Down
43 changes: 5 additions & 38 deletions src/hooks/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use anyhow::Result;
use crate::duckdb::connection;
use crate::fdw::handler::FdwHandler;
use crate::schema::cell::*;
use anyhow::{anyhow, Error, Result};
use duckdb::arrow::array::RecordBatch;
use pgrx::*;
use sqlparser::ast::{BinaryOperator, Expr, Query, SetExpr, Statement, TableFactor, Value};
use sqlparser::dialect::PostgreSqlDialect;
Expand Down Expand Up @@ -566,40 +570,3 @@ fn get_postgres_search_path() -> Vec<String> {
schema_vec
}

pub fn set_search_path_by_pg() -> Result<()> {
let mut search_path = get_postgres_search_path();
let duckdb_schemas = connection::get_available_schemas()?;

// Filter schemas. If one of schemas doesn't exist, it will cause the DuckDB 'SET search_path' to fail.
search_path.retain(|schema| duckdb_schemas.contains(schema));

// Set duckdb catalog search path
connection::set_search_path(search_path)?;

Ok(())
}

fn get_postgres_search_path() -> Vec<String> {
let active_schemas =
unsafe { PgList::<pg_sys::Oid>::from_pg(pg_sys::fetch_search_path(false)) };

let mut schema_vec: Vec<String> = Vec::with_capacity(active_schemas.len());
for schema_oid in active_schemas.iter_oid() {
let tuple = unsafe {
pg_sys::SearchSysCache1(
pg_sys::SysCacheIdentifier::NAMESPACEOID as i32,
schema_oid.into_datum().unwrap(),
)
};

if !tuple.is_null() {
let pg_namespace = unsafe { pg_sys::GETSTRUCT(tuple) as pg_sys::Form_pg_namespace };
let name = pg_sys::name_data_to_str(unsafe { &(*pg_namespace).nspname });
schema_vec.push(name.to_string());

unsafe { pg_sys::ReleaseSysCache(tuple) };
}
}

schema_vec
}

0 comments on commit a1ae9c3

Please sign in to comment.