Skip to content

Commit

Permalink
Add optimizer for index(wip).
Browse files Browse the repository at this point in the history
  • Loading branch information
gtnao committed May 6, 2024
1 parent 9055071 commit 9e02b58
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 2 deletions.
56 changes: 56 additions & 0 deletions src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
buffer::BufferPoolManager,
common::{PageID, TransactionID},
concurrency::TransactionManager,
index::Index,
lock::LockManager,
log::{LogManager, LogRecordBody, NewBPlusTreeLeafPage, NewTablePage},
page::{
Expand Down Expand Up @@ -300,6 +301,61 @@ impl Catalog {
}
Ok(schema)
}
pub fn get_index(&self, index_id: i64, txn_id: TransactionID) -> Result<Index> {
let system_indexes_table =
self.system_table_heap(PageID(SYSTEM_INDEXES_FIRST_PAGE_ID.0), txn_id);
for tuple in system_indexes_table.iter() {
let values = tuple.values(&Self::system_indexes_schema());
if let Value::Integer(IntegerValue(id)) = values[0] {
if id == index_id {
return Ok(Index::from_system_table(values)?);
}
}
}
Err(anyhow::anyhow!("index not found"))
}
pub fn get_indexes_by_table_name(
&self,
table_name: &str,
txn_id: TransactionID,
) -> Result<Vec<Index>> {
let system_index_columns_table =
self.system_table_heap(PageID(SYSTEM_INDEX_COLUMNS_FIRST_PAGE_ID.0), txn_id);
let mut columns = Vec::new();
for tuple in system_index_columns_table.iter() {
let values = tuple.values(&Self::system_index_columns_schema());
let id = if let Value::Integer(id) = &values[0] {
id.0
} else {
return Err(anyhow::anyhow!("Invalid id"));
};
let column_name = if let Value::Varchar(column_name) = &values[1] {
column_name.0.clone()
} else {
return Err(anyhow::anyhow!("Invalid column_name"));
};
columns.push((id, column_name));
}

let system_indexes_table =
self.system_table_heap(PageID(SYSTEM_INDEXES_FIRST_PAGE_ID.0), txn_id);
let mut indexes = Vec::new();
for tuple in system_indexes_table.iter() {
let values = tuple.values(&Self::system_indexes_schema());
if let Value::Varchar(VarcharValue(name)) = &values[2] {
if name == table_name {
let mut index = Index::from_system_table(values)?;
for (id, column_name) in columns.iter() {
if *id == index.id {
index.add_columns(column_name.clone());
}
}
indexes.push(index);
}
}
}
Ok(indexes)
}

fn create_empty_system_table(&self, txn_id: TransactionID) -> Result<()> {
let page = self
Expand Down
11 changes: 10 additions & 1 deletion src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use self::{
delete_executor::DeleteExecutor,
empty_row_executor::EmptyRowExecutor,
filter_executor::FilterExecutor,
index_scan_executor::IndexScanExecutor,
insert_executor::InsertExecutor,
limit_executor::LimitExecutor,
nested_loop_join_executor::NestedLoopJoinExecutor,
Expand All @@ -26,6 +27,7 @@ mod aggregate_executor;
mod delete_executor;
mod empty_row_executor;
mod filter_executor;
mod index_scan_executor;
mod insert_executor;
mod limit_executor;
mod nested_loop_join_executor;
Expand Down Expand Up @@ -77,7 +79,11 @@ impl ExecutorEngine {
)
.iter(),
}),
Plan::IndexScan(_) => unimplemented!(),
Plan::IndexScan(index_plan) => Executor::IndexScan(IndexScanExecutor {
plan: index_plan.clone(),
executor_context: &self.context,
index_id: index_plan.index_id,
}),
Plan::Filter(filter_plan) => Executor::Filter(FilterExecutor {
plan: filter_plan.clone(),
child: Box::new(self.create_executor(&plan.children()[0])),
Expand Down Expand Up @@ -193,6 +199,7 @@ impl ExecutorEngine {

pub enum Executor<'a> {
SeqScan(SeqScanExecutor<'a>),
IndexScan(IndexScanExecutor<'a>),
Filter(FilterExecutor<'a>),
Project(ProjectExecutor<'a>),
NestedLoopJoin(NestedLoopJoinExecutor<'a>),
Expand All @@ -208,6 +215,7 @@ impl Executor<'_> {
pub fn init(&mut self) -> Result<()> {
match self {
Executor::SeqScan(executor) => executor.init(),
Executor::IndexScan(executor) => executor.init(),
Executor::Filter(executor) => executor.init(),
Executor::Project(executor) => executor.init(),
Executor::NestedLoopJoin(executor) => executor.init(),
Expand All @@ -223,6 +231,7 @@ impl Executor<'_> {
pub(crate) fn next(&mut self) -> Result<Option<Tuple>> {
match self {
Executor::SeqScan(executor) => executor.next(),
Executor::IndexScan(executor) => executor.next(),
Executor::Filter(executor) => executor.next(),
Executor::Project(executor) => executor.next(),
Executor::NestedLoopJoin(executor) => executor.next(),
Expand Down
27 changes: 27 additions & 0 deletions src/executor/index_scan_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use anyhow::Result;

use crate::{plan::IndexScanPlan, tuple::Tuple};

use super::ExecutorContext;

pub struct IndexScanExecutor<'a> {
pub plan: IndexScanPlan,
pub executor_context: &'a ExecutorContext,
pub index_id: i64,
}

impl IndexScanExecutor<'_> {
pub fn init(&mut self) -> Result<()> {
let index = self
.executor_context
.catalog
.lock()
.map_err(|_| anyhow::anyhow!("Catalog lock error"))?
.get_index(self.index_id, self.executor_context.transaction_id)?;
println!("Index: {:?}", index);
Ok(())
}
pub fn next(&mut self) -> Result<Option<Tuple>> {
Ok(None)
}
}
47 changes: 47 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use anyhow::Result;

use crate::{common::PageID, value::Value};

#[derive(Debug)]
pub struct Index {
pub id: i64,
pub name: String,
pub table_name: String,
pub first_page_id: PageID,
pub columns: Vec<String>,
}

impl Index {
pub fn from_system_table(values: Vec<Value>) -> Result<Self> {
let id = if let Value::Integer(id) = &values[0] {
id.0
} else {
return Err(anyhow::anyhow!("Invalid id"));
};
let name = if let Value::Varchar(name) = &values[1] {
name.0.clone()
} else {
return Err(anyhow::anyhow!("Invalid name"));
};
let table_name = if let Value::Varchar(table_name) = &values[2] {
table_name.0.clone()
} else {
return Err(anyhow::anyhow!("Invalid table_name"));
};
let first_page_id = if let Value::Integer(first_page_id) = &values[3] {
PageID(first_page_id.0 as u32)
} else {
return Err(anyhow::anyhow!("Invalid first_page_id"));
};
Ok(Self {
id,
name,
table_name,
first_page_id,
columns: Vec::new(),
})
}
pub fn add_columns(&mut self, column: String) {
self.columns.push(column);
}
}
3 changes: 3 additions & 0 deletions src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
executor::{ExecutorContext, ExecutorEngine},
lock::LockManager,
log::LogManager,
optimizer::Optimizer,
parser::{CreateIndexStatementAST, CreateTableStatementAST, StatementAST},
plan::Planner,
recovery::RecoveryManager,
Expand Down Expand Up @@ -131,6 +132,8 @@ impl Instance {
let bound_statement = binder.bind_statement(statement)?;
let planner = Planner::new(bound_statement);
let plan = planner.plan();
let optimizer = Optimizer::new(self.catalog.clone(), txn_id);
let plan = optimizer.optimize(plan)?;
let schema = plan.schema().clone();
let executor_context = ExecutorContext {
transaction_id: txn_id,
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ pub mod common;
pub mod concurrency;
pub mod disk;
pub mod executor;
pub mod index;
pub mod instance;
pub mod lexer;
pub mod lock;
pub mod log;
pub mod optimizer;
pub mod page;
pub mod parser;
pub mod plan;
Expand Down
58 changes: 58 additions & 0 deletions src/optimizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::{Arc, Mutex};

use anyhow::Result;

use crate::{
binder::BoundExpressionAST,
catalog::Catalog,
common::TransactionID,
plan::{IndexScanPlan, Plan},
};

pub struct Optimizer {
catalog: Arc<Mutex<Catalog>>,
txn_id: TransactionID,
}
impl Optimizer {
pub fn new(catalog: Arc<Mutex<Catalog>>, txn_id: TransactionID) -> Self {
Self { catalog, txn_id }
}
pub fn optimize(&self, plan: Plan) -> Result<Plan> {
let mut optimized_plan = plan;
optimized_plan = self.optimize_filter_index_scan(optimized_plan)?;
Ok(optimized_plan)
}
fn optimize_filter_index_scan(&self, plan: Plan) -> Result<Plan> {
let mut children = Vec::new();
for child in plan.children() {
let optimized_child = self.optimize_filter_index_scan(*child)?;
children.push(optimized_child);
}
let mut source_plan = plan.clone();
source_plan.set_children(children);
if let Plan::Filter(filter_plan) = plan {
if let BoundExpressionAST::Binary(binary_expression) = filter_plan.condition {
if let BoundExpressionAST::Path(path_expression) = *binary_expression.left {
let indexes = self
.catalog
.lock()
.map_err(|_| anyhow::anyhow!("Catalog lock error"))?
.get_indexes_by_table_name(&path_expression.table_name, self.txn_id)?;
for index in indexes {
if index.columns.len() == 1
&& index.columns[0] == path_expression.column_name
{
if let BoundExpressionAST::Literal(_) = *binary_expression.right {
return Ok(Plan::IndexScan(IndexScanPlan {
index_id: index.id,
schema: filter_plan.schema,
}));
}
}
}
}
}
}
Ok(source_plan)
}
}
20 changes: 19 additions & 1 deletion src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ impl Plan {
Plan::Update(plan) => vec![plan.child.clone()],
}
}
pub fn set_children(&mut self, children: Vec<Plan>) {
match self {
Plan::SeqScan(_) => {}
Plan::IndexScan(_) => {}
Plan::Filter(plan) => plan.child = Box::new(children[0].clone()),
Plan::Project(plan) => plan.child = Box::new(children[0].clone()),
Plan::NestedLoopJoin(plan) => {
plan.children = children.into_iter().map(Box::new).collect()
}
Plan::Aggregate(plan) => plan.child = Box::new(children[0].clone()),
Plan::Sort(plan) => plan.child = Box::new(children[0].clone()),
Plan::Limit(plan) => plan.child = Box::new(children[0].clone()),
Plan::EmptyRow(_) => {}
Plan::Insert(_) => {}
Plan::Delete(plan) => plan.child = Box::new(children[0].clone()),
Plan::Update(plan) => plan.child = Box::new(children[0].clone()),
}
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SeqScanPlan {
Expand All @@ -67,7 +85,7 @@ pub struct SeqScanPlan {
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct IndexScanPlan {
pub root_page_id: PageID,
pub index_id: i64,
pub schema: Schema,
}
#[derive(Debug, PartialEq, Eq, Clone)]
Expand Down

0 comments on commit 9e02b58

Please sign in to comment.