Skip to content

Commit

Permalink
test: add cross-product test and enable more explain options (#50)
Browse files Browse the repository at this point in the history
* test: add cross-product test and enable more options

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>

* add tasks to use datafusion optimizer

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>

* change join_enumerate task types

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>

* restore planner tests

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>

---------

Signed-off-by: Yuchen Liang <yuchenl3@andrew.cmu.edu>
  • Loading branch information
yliang412 authored Feb 11, 2024
1 parent 2694b9a commit d36dfd5
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 45 deletions.
27 changes: 27 additions & 0 deletions optd-sqlplannertest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
= Usage

**Update the test cases**

```shell
cargo run -p optd-sqlplannertest --bin planner_test_apply
```

**Verify the test cases**

```shell
cargo test -p optd-sqlplannertest
# or use nextest
cargo nextest run -p optd-sqlplannertest
```

The `explain` and `execute` task will be run with datafusion's logical optimizer disabled. To keep using datafusion's logical optimizer, you could use the `execute_with_logical` and `explain_with_logical` tasks instead.

Currently we have the following options for the explain task:

- `logical_datafusion`: datafusion's logical plan.
- `logical_optd`: optd's logical plan before optimization.
- `physical_optd`: optd's physical plan after optimization.
- `physical_datafusion`: datafusion's physical plan.
- `join_orders`: physical join orders.
- `logical_join_orders`: logical join orders.

177 changes: 137 additions & 40 deletions optd-sqlplannertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,26 @@ use async_trait::async_trait;
#[derive(Default)]
pub struct DatafusionDb {
ctx: SessionContext,
/// Context enabling datafusion's logical optimizer.
with_logical_ctx: SessionContext,
}

impl DatafusionDb {
pub async fn new() -> Result<Self> {
let ctx = DatafusionDb::new_session_ctx(false).await?;
let with_logical_ctx = DatafusionDb::new_session_ctx(true).await?;
Ok(Self {
ctx,
with_logical_ctx,
})
}

/// Creates a new session context. If the `with_logical` flag is set, datafusion's logical optimizer will be used.
async fn new_session_ctx(with_logical: bool) -> Result<SessionContext> {
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
session_config.options_mut().optimizer.max_passes = 0;
if !with_logical {
session_config.options_mut().optimizer.max_passes = 0;
}

let rn_config = RuntimeConfig::new();
let runtime_env = RuntimeEnv::new(rn_config.clone())?;
Expand All @@ -36,26 +50,37 @@ impl DatafusionDb {
let optimizer = DatafusionOptimizer::new_physical(Box::new(DatafusionCatalog::new(
state.catalog_list(),
)));
// clean up optimizer rules so that we can plug in our own optimizer
state = state.with_optimizer_rules(vec![]);
state = state.with_physical_optimizer_rules(vec![]);
if !with_logical {
// clean up optimizer rules so that we can plug in our own optimizer
state = state.with_optimizer_rules(vec![]);
state = state.with_physical_optimizer_rules(vec![]);
}
// use optd-bridge query planner
state = state.with_query_planner(Arc::new(OptdQueryPlanner::new(optimizer)));
SessionContext::new_with_state(state)
};
ctx.refresh_catalogs().await?;
Ok(Self { ctx })
Ok(ctx)
}

async fn execute(&self, sql: &str) -> Result<Vec<Vec<String>>> {
async fn execute(&self, sql: &str, with_logical: bool) -> Result<Vec<Vec<String>>> {
let sql = unescape_input(sql)?;
let dialect = Box::new(GenericDialect);
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
let mut result = Vec::new();
for statement in statements {
let plan = self.ctx.state().statement_to_plan(statement).await?;
let df = if with_logical {
let plan = self
.with_logical_ctx
.state()
.statement_to_plan(statement)
.await?;
self.with_logical_ctx.execute_logical_plan(plan).await?
} else {
let plan = self.ctx.state().statement_to_plan(statement).await?;
self.ctx.execute_logical_plan(plan).await?
};

let df = self.ctx.execute_logical_plan(plan).await?;
let batches = df.collect().await?;

let options = FormatOptions::default();
Expand All @@ -79,53 +104,125 @@ impl DatafusionDb {
}
Ok(result)
}

/// Executes the `execute` task.
async fn task_execute(&mut self, r: &mut String, sql: &str, with_logical: bool) -> Result<()> {
use std::fmt::Write;
let result = self.execute(&sql, with_logical).await?;
writeln!(r, "{}", result.into_iter().map(|x| x.join(" ")).join("\n"))?;
writeln!(r)?;
Ok(())
}

/// Executes the `explain` task.
async fn task_explain(
&mut self,
r: &mut String,
sql: &str,
task: &str,
with_logical: bool,
) -> Result<()> {
use std::fmt::Write;

let result = self
.execute(&format!("explain {}", &sql), with_logical)
.await?;
let subtask_start_pos = if with_logical {
"explain_with_logical:".len()
} else {
"explain:".len()
};
for subtask in task[subtask_start_pos..].split(",") {
let subtask = subtask.trim();
if subtask == "logical_datafusion" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "logical_plan after datafusion")
.map(|x| &x[1])
.unwrap()
)?;
} else if subtask == "logical_optd" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "logical_plan after optd")
.map(|x| &x[1])
.unwrap()
)?;
} else if subtask == "physical_optd" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "physical_plan after optd")
.map(|x| &x[1])
.unwrap()
)?;
} else if subtask == "join_orders" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "physical_plan after optd-all-join-orders")
.map(|x| &x[1])
.unwrap()
)?;
writeln!(r)?;
} else if subtask == "logical_join_orders" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "physical_plan after optd-all-logical-join-orders")
.map(|x| &x[1])
.unwrap()
)?;
writeln!(r)?;
} else if subtask == "physical_datafusion" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "physical_plan")
.map(|x| &x[1])
.unwrap()
)?;
}
}

Ok(())
}
}

#[async_trait]
impl sqlplannertest::PlannerTestRunner for DatafusionDb {
async fn run(&mut self, test_case: &sqlplannertest::ParsedTestCase) -> Result<String> {
for before in &test_case.before_sql {
self.execute(before)
self.execute(before, true)
.await
.context("before execution error")?;
}

use std::fmt::Write;
let mut result = String::new();
let r = &mut result;
for task in &test_case.tasks {
if task == "execute" {
let result = self.execute(&test_case.sql).await?;
writeln!(r, "{}", result.into_iter().map(|x| x.join(" ")).join("\n"))?;
writeln!(r)?;
self.task_execute(r, &test_case.sql, false).await?;
} else if task == "execute_with_logical" {
self.task_execute(r, &test_case.sql, true).await?;
} else if task.starts_with("explain:") {
let result = self.execute(&format!("explain {}", test_case.sql)).await?;
for subtask in task["explain:".len()..].split(",") {
let subtask = subtask.trim();
if subtask == "join_orders" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "physical_plan after optd-all-join-orders")
.map(|x| &x[1])
.unwrap()
)?;
writeln!(r)?;
} else if subtask == "logical_join_orders" {
writeln!(
r,
"{}",
result
.iter()
.find(|x| x[0] == "physical_plan after optd-all-logical-join-orders")
.map(|x| &x[1])
.unwrap()
)?;
writeln!(r)?;
}
}
self.task_explain(r, &test_case.sql, task, false).await?;
} else if task.starts_with("explain_with_logical:") {
self.task_explain(r, &test_case.sql, task, true).await?;
}
}
Ok(result)
Expand Down
34 changes: 34 additions & 0 deletions optd-sqlplannertest/tests/cross_product.planner.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- (no id or description)
create table t1(t1v1 int);
create table t2(t2v1 int);
insert into t1 values (0), (1), (2);
insert into t2 values (0), (1), (2);

/*
3
3
*/

-- Test optimizer logical for a cross product.
select * from t1, t2;

/*
LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalJoin { join_type: Cross, cond: true }
├── LogicalScan { table: t1 }
└── LogicalScan { table: t2 }
PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalNestedLoopJoin { join_type: Cross, cond: true }
├── PhysicalScan { table: t1 }
└── PhysicalScan { table: t2 }
0 0
0 1
0 2
1 0
1 1
1 2
2 0
2 1
2 2
*/

14 changes: 14 additions & 0 deletions optd-sqlplannertest/tests/cross_product.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
- sql: |
create table t1(t1v1 int);
create table t2(t2v1 int);
insert into t1 values (0), (1), (2);
insert into t2 values (0), (1), (2);
tasks:
- execute
- sql: |
select * from t1, t2;
desc: Test optimizer logical for a cross product.
tasks:
- explain:logical_optd,physical_optd
- execute

46 changes: 46 additions & 0 deletions optd-sqlplannertest/tests/empty_relation.planner.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ select 64 + 1;
select 64 + 1 from t1;

/*
LogicalProjection
├── exprs:Add
│ ├── 64
│ └── 1
└── LogicalEmptyRelation { produce_one_row: true }
PhysicalProjection
├── exprs:Add
│ ├── 64
│ └── 1
└── PhysicalEmptyRelation { produce_one_row: true }
65
65
65
Expand All @@ -26,5 +36,41 @@ select 64+1 from t1 inner join t2 on false;
select 64+1 from t1 inner join t2 on 1=0;

/*
LogicalProjection { exprs: [ #0, #1, #2, #3 ] }
└── LogicalJoin { join_type: Inner, cond: true }
├── LogicalScan { table: t1 }
└── LogicalScan { table: t2 }
PhysicalProjection { exprs: [ #0, #1, #2, #3 ] }
└── PhysicalProjection { exprs: [ #2, #3, #0, #1 ] }
└── PhysicalNestedLoopJoin { join_type: Inner, cond: true }
├── PhysicalScan { table: t2 }
└── PhysicalScan { table: t1 }
0 0 0 200
0 0 1 201
0 0 2 202
1 1 0 200
1 1 1 201
1 1 2 202
2 2 0 200
2 2 1 201
2 2 2 202
65
65
65
65
65
65
65
65
65
65
65
65
65
65
65
65
65
65
*/

14 changes: 14 additions & 0 deletions optd-sqlplannertest/tests/join_enumerate.planner.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ insert into t3 values (0, 300), (1, 301), (2, 302);
select * from t2, t1, t3 where t1v1 = t2v1 and t1v2 = t3v2;

/*
(Join t2 (Join t1 t3))
(Join t2 (Join t3 t1))
(Join t3 (Join t1 t2))
(Join t3 (Join t2 t1))
(Join (Join t1 t2) t3)
(Join (Join t1 t3) t2)
(Join (Join t2 t1) t3)
(Join (Join t3 t1) t2)
0 200 0 0 0 300
1 201 1 1 1 301
Expand All @@ -27,7 +34,14 @@ select * from t2, t1, t3 where t1v1 = t2v1 and t1v2 = t3v2;
select * from t1, t2, t3 where t1v1 = t2v1 and t1v2 = t3v2;

/*
(Join t2 (Join t1 t3))
(Join t2 (Join t3 t1))
(Join t3 (Join t1 t2))
(Join t3 (Join t2 t1))
(Join (Join t1 t2) t3)
(Join (Join t1 t3) t2)
(Join (Join t2 t1) t3)
(Join (Join t3 t1) t2)
0 0 0 200 0 300
1 1 1 201 1 301
Expand Down
Loading

0 comments on commit d36dfd5

Please sign in to comment.