Skip to content

Commit

Permalink
feat: enhance explain
Browse files Browse the repository at this point in the history
  • Loading branch information
kysshsy committed Oct 9, 2024
1 parent 5b14368 commit cc4262f
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 44 deletions.
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pgrx = "0.12.5"
serde = "1.0.210"
serde_json = "1.0.128"
signal-hook = "0.3.17"
sqlparser = "0.50.0"
sqlparser = { path = "/home/kyss/labs/sqlparser-rs" }
strum = { version = "0.26.3", features = ["derive"] }
supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "c32abb7" }
thiserror = "1.0.63"
Expand Down
18 changes: 18 additions & 0 deletions src/duckdb/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,21 @@ pub fn set_search_path(search_path: Vec<String>) -> Result<()> {

Ok(())
}

pub fn execute_explain(query: &str) -> Result<String> {
let conn = unsafe { &*get_global_connection().get() };
let mut stmt = conn.prepare(query)?;
let rows = stmt.query_row([], |row| {
let mut r = vec![];

let mut col_index = 1;
while let Ok(value) = row.get::<_, String>(col_index) {
r.push(value);
col_index += 1;
}

Ok(r)
})?;

Ok(rows.join(""))
}
42 changes: 3 additions & 39 deletions src/hooks/utility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

#![allow(clippy::too_many_arguments)]

use std::ffi::CString;
mod explain;

use anyhow::{bail, Result};
use pg_sys::NodeTag;
use pgrx::*;
use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};

use super::query::*;
use explain::explain_query;

#[allow(deprecated)]
type ProcessUtilityHook = fn(
Expand Down Expand Up @@ -95,43 +95,6 @@ fn is_support_utility(stmt_type: NodeTag) -> bool {
stmt_type == pg_sys::NodeTag::T_ExplainStmt
}

fn explain_query(
query_string: &core::ffi::CStr,
stmt: *mut pg_sys::ExplainStmt,
dest: *mut pg_sys::DestReceiver,
) -> Result<bool> {
let query = unsafe { (*stmt).query as *mut pg_sys::Query };

let query_relations = get_query_relations(unsafe { (*query).rtable });
if unsafe { (*query).commandType } != pg_sys::CmdType::CMD_SELECT
|| !is_duckdb_query(&query_relations)
{
return Ok(true);
}

if unsafe { !(*stmt).options.is_null() } {
error!("the EXPLAIN options provided are not supported for DuckDB pushdown queries.");
}

unsafe {
let tstate = pg_sys::begin_tup_output_tupdesc(
dest,
pg_sys::ExplainResultDesc(stmt),
&pg_sys::TTSOpsVirtual,
);
let query = format!(
"DuckDB Scan: {}",
parse_query_from_utility_stmt(query_string)?
);
let query_c_str = CString::new(query)?;

pg_sys::do_text_output_multiline(tstate, query_c_str.as_ptr());
pg_sys::end_tup_output(tstate);
}

Ok(false)
}

fn parse_query_from_utility_stmt(query_string: &core::ffi::CStr) -> Result<String> {
let query_string = query_string.to_str()?;

Expand All @@ -146,6 +109,7 @@ fn parse_query_from_utility_stmt(query_string: &core::ffi::CStr) -> Result<Strin
verbose: _,
statement,
format: _,
options: _,
} => Ok(statement.to_string()),
_ => bail!("unexpected utility statement: {}", query_string),
}
Expand Down
155 changes: 155 additions & 0 deletions src/hooks/utility/explain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (c) 2023-2024 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::ffi::{CStr, CString};
use std::time::Instant;

use anyhow::Result;
use pgrx::{error, pg_sys};

use super::parse_query_from_utility_stmt;
use crate::{
duckdb::connection,
hooks::query::{get_query_relations, is_duckdb_query, set_search_path_by_pg},
};

enum Style {
Postgres,
Duckdb,
}
struct ExplainState {
analyze: bool,
style: Style,
}

pub fn explain_query(
query_string: &core::ffi::CStr,
stmt: *mut pg_sys::ExplainStmt,
dest: *mut pg_sys::DestReceiver,
) -> Result<bool> {
let query = unsafe { (*stmt).query as *mut pg_sys::Query };

let query_relations = get_query_relations(unsafe { (*query).rtable });
if unsafe { (*query).commandType } != pg_sys::CmdType::CMD_SELECT
|| !is_duckdb_query(&query_relations)
{
return Ok(true);
}

let state = parse_explain_options(unsafe { (*stmt).options });
let query = parse_query_from_utility_stmt(query_string)?;

let output = match state.style {
Style::Postgres => {
let mut output = format!("DuckDB Scan: {}\n", query);
if state.analyze {
let start_time = Instant::now();
set_search_path_by_pg()?;
connection::execute(&query, [])?;
let duration = start_time.elapsed();
output += &format!(
"Execution Time: {:.3} ms\n",
duration.as_micros() as f64 / 1_000.0
);
}
output
}
Style::Duckdb => {
set_search_path_by_pg()?;
let explain_query = if state.analyze {
format!("EXPLAIN ANALYZE {query}")
} else {
format!("EXPLAIN {query}")
};
connection::execute_explain(&explain_query)?
}
};

unsafe {
let tstate = pg_sys::begin_tup_output_tupdesc(
dest,
pg_sys::ExplainResultDesc(stmt),
&pg_sys::TTSOpsVirtual,
);

let output_cstr = CString::new(output)?;

pg_sys::do_text_output_multiline(tstate, output_cstr.as_ptr());
pg_sys::end_tup_output(tstate);
}

Ok(false)
}

fn parse_explain_options(options: *const pg_sys::List) -> ExplainState {
let mut explain_state = ExplainState {
analyze: false,
style: Style::Postgres,
};

if options.is_null() {
return explain_state;
}

unsafe {
let elements = (*options).elements;

for i in 0..(*options).length as isize {
let opt = (*elements.offset(i)).ptr_value as *mut pg_sys::DefElem;

let opt_name = match CStr::from_ptr((*opt).defname).to_str() {
Ok(opt) => opt,
Err(e) => {
error!("failed to parse EXPLAIN option name: {e}");
}
};
match opt_name {
"analyze" => {
explain_state.analyze = pg_sys::defGetBoolean(opt);
}
"style" => {
let style = match CStr::from_ptr(pg_sys::defGetString(opt)).to_str() {
Ok(style) => style,

Err(e) => {
error!("failed to parse STYLE option: {e}");
}
};

explain_state.style = match parse_explain_style(style) {
Some(s) => s,
None => {
error!("unrecognized STYLE option: {style}")
}
};
}
_ => error!("unrecognized EXPLAIN option \"{opt_name}\""),
}
}
}

explain_state
}

fn parse_explain_style(style: &str) -> Option<Style> {
match style {
"pg" => Some(Style::Postgres),
"postgres" => Some(Style::Postgres),
"duckdb" => Some(Style::Duckdb),
_ => None,
}
}

0 comments on commit cc4262f

Please sign in to comment.