diff --git a/src/catalog.rs b/src/catalog.rs
index 5816926..2efe5bb 100644
--- a/src/catalog.rs
+++ b/src/catalog.rs
@@ -6,6 +6,7 @@ use crate::{
     buffer::BufferPoolManager,
     common::{PageID, TransactionID},
     concurrency::TransactionManager,
+    index::Index,
     lock::LockManager,
     log::{LogManager, LogRecordBody, NewBPlusTreeLeafPage, NewTablePage},
     page::{
@@ -300,6 +301,85 @@ impl Catalog {
         }
         Ok(schema)
     }
+    /// Looks up the index whose id is `index_id` in the system indexes table.
+    ///
+    /// Rows are scanned in order; the first row whose first value is an integer
+    /// equal to `index_id` is decoded with [`Index::from_system_table`]. This
+    /// method does not read the system index-columns table, so the returned
+    /// index has an empty `columns` list.
+    ///
+    /// # Errors
+    ///
+    /// Fails with "index not found" when no row matches, or with the decoding
+    /// error when the matching row is malformed.
+    pub fn get_index(&self, index_id: i64, txn_id: TransactionID) -> Result<Index> {
+        let system_indexes_table =
+            self.system_table_heap(PageID(SYSTEM_INDEXES_FIRST_PAGE_ID.0), txn_id);
+        for tuple in system_indexes_table.iter() {
+            let values = tuple.values(&Self::system_indexes_schema());
+            let is_requested = matches!(
+                values.first(),
+                Some(Value::Integer(IntegerValue(id))) if *id == index_id
+            );
+            if is_requested {
+                return Index::from_system_table(values);
+            }
+        }
+        Err(anyhow::anyhow!("index not found"))
+    }
+    /// Returns every index defined on `table_name`, with its column names filled in.
+    ///
+    /// All `(index id, column name)` pairs are first read from the system
+    /// index-columns table. The system indexes table is then scanned; each row
+    /// whose third value is the varchar `table_name` is decoded and given the
+    /// column names recorded under its id, in the order they were read.
+    ///
+    /// # Errors
+    ///
+    /// Fails when an index-columns row is not `(integer, varchar, ..)` or when a
+    /// matching indexes row cannot be decoded.
+    pub fn get_indexes_by_table_name(
+        &self,
+        table_name: &str,
+        txn_id: TransactionID,
+    ) -> Result<Vec<Index>> {
+        let system_index_columns_table =
+            self.system_table_heap(PageID(SYSTEM_INDEX_COLUMNS_FIRST_PAGE_ID.0), txn_id);
+        let mut columns = Vec::new();
+        for tuple in system_index_columns_table.iter() {
+            let values = tuple.values(&Self::system_index_columns_schema());
+            let id = if let Value::Integer(id) = &values[0] {
+                id.0
+            } else {
+                return Err(anyhow::anyhow!("Invalid id"));
+            };
+            let column_name = if let Value::Varchar(column_name) = &values[1] {
+                column_name.0.clone()
+            } else {
+                return Err(anyhow::anyhow!("Invalid column_name"));
+            };
+            columns.push((id, column_name));
+        }
+
+        let system_indexes_table =
+            self.system_table_heap(PageID(SYSTEM_INDEXES_FIRST_PAGE_ID.0), txn_id);
+        let mut indexes = Vec::new();
+        for tuple in system_indexes_table.iter() {
+            let values = tuple.values(&Self::system_indexes_schema());
+            if let Value::Varchar(VarcharValue(name)) = &values[2] {
+                if name == table_name {
+                    let mut index = Index::from_system_table(values)?;
+                    for (id, column_name) in columns.iter() {
+                        if *id == index.id {
+                            index.add_columns(column_name.clone());
+                        }
+                    }
+                    indexes.push(index);
+                }
+            }
+        }
+        Ok(indexes)
+    }
 
     fn create_empty_system_table(&self, txn_id: TransactionID) -> Result<()> {
         let page = self
diff --git a/src/executor.rs b/src/executor.rs
index 837d12d..fa4bc21 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -13,6 +13,7 @@ use self::{
     delete_executor::DeleteExecutor,
     empty_row_executor::EmptyRowExecutor,
     filter_executor::FilterExecutor,
+    index_scan_executor::IndexScanExecutor,
     insert_executor::InsertExecutor,
     limit_executor::LimitExecutor,
     nested_loop_join_executor::NestedLoopJoinExecutor,
@@ -26,6 +27,7 @@ mod aggregate_executor;
 mod delete_executor;
 mod empty_row_executor;
 mod filter_executor;
+mod index_scan_executor;
 mod insert_executor;
 mod limit_executor;
 mod nested_loop_join_executor;
@@ -77,7 +79,11 @@ impl ExecutorEngine {
                 )
                 .iter(),
             }),
-            Plan::IndexScan(_) => unimplemented!(),
+            Plan::IndexScan(index_plan) => Executor::IndexScan(IndexScanExecutor {
+                plan: index_plan.clone(),
+                executor_context: &self.context,
+                index_id: index_plan.index_id,
+            }),
             Plan::Filter(filter_plan) => Executor::Filter(FilterExecutor {
                 plan: filter_plan.clone(),
                 child: Box::new(self.create_executor(&plan.children()[0])),
@@ -193,6 +199,7 @@ impl ExecutorEngine {
 
 pub enum Executor<'a> {
     SeqScan(SeqScanExecutor<'a>),
+    IndexScan(IndexScanExecutor<'a>),
     Filter(FilterExecutor<'a>),
     Project(ProjectExecutor<'a>),
     NestedLoopJoin(NestedLoopJoinExecutor<'a>),
@@ -208,6 +215,7 @@ impl Executor<'_> {
     pub fn init(&mut self) -> Result<()> {
         match self {
             Executor::SeqScan(executor) => executor.init(),
+            Executor::IndexScan(executor) => executor.init(),
             Executor::Filter(executor) => executor.init(),
             Executor::Project(executor) => executor.init(),
             Executor::NestedLoopJoin(executor) => executor.init(),
@@ -223,6 +231,7 @@ impl Executor<'_> {
     pub(crate) fn next(&mut self) -> Result<Option<Tuple>> {
         match self {
             Executor::SeqScan(executor) => executor.next(),
+            Executor::IndexScan(executor) => executor.next(),
             Executor::Filter(executor) => executor.next(),
             Executor::Project(executor) => executor.next(),
             Executor::NestedLoopJoin(executor) => executor.next(),
diff --git a/src/executor/index_scan_executor.rs b/src/executor/index_scan_executor.rs
new file mode 100644
index 0000000..08b8ecd
--- /dev/null
+++ b/src/executor/index_scan_executor.rs
@@ -0,0 +1,35 @@
+use anyhow::Result;
+
+use crate::{plan::IndexScanPlan, tuple::Tuple};
+
+use super::ExecutorContext;
+
+/// Executor for [`IndexScanPlan`]. Scanning is not implemented yet: `next`
+/// never yields a tuple.
+pub struct IndexScanExecutor<'a> {
+    pub plan: IndexScanPlan,
+    pub executor_context: &'a ExecutorContext,
+    pub index_id: i64,
+}
+
+impl IndexScanExecutor<'_> {
+    /// Checks that the index named by `index_id` exists in the catalog.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the catalog lock cannot be taken or the index is not found.
+    pub fn init(&mut self) -> Result<()> {
+        // The lookup result is not needed yet; it is performed only so a plan
+        // that refers to a missing index fails here instead of later.
+        self.executor_context
+            .catalog
+            .lock()
+            .map_err(|_| anyhow::anyhow!("Catalog lock error"))?
+            .get_index(self.index_id, self.executor_context.transaction_id)?;
+        Ok(())
+    }
+    /// Always returns `Ok(None)`: index scanning is still a stub.
+    pub fn next(&mut self) -> Result<Option<Tuple>> {
+        Ok(None)
+    }
+}
diff --git a/src/index.rs b/src/index.rs
new file mode 100644
index 0000000..2e6b0f1
--- /dev/null
+++ b/src/index.rs
@@ -0,0 +1,61 @@
+use std::convert::TryFrom;
+
+use anyhow::Result;
+
+use crate::{common::PageID, value::Value};
+
+/// Catalog metadata describing one index.
+#[derive(Debug)]
+pub struct Index {
+    pub id: i64,
+    pub name: String,
+    pub table_name: String,
+    pub first_page_id: PageID,
+    /// Column names, in insertion order; empty right after decoding.
+    pub columns: Vec<String>,
+}
+
+impl Index {
+    /// Decodes one row of the system indexes table.
+    ///
+    /// `values` must hold, in order: integer id, varchar index name, varchar
+    /// table name, integer first page id. `columns` is left empty.
+    ///
+    /// # Errors
+    ///
+    /// Fails if a value is missing or has the wrong type, or if the first page
+    /// id does not fit in a `u32`.
+    pub fn from_system_table(values: Vec<Value>) -> Result<Self> {
+        // Consume the row so the strings are moved out instead of cloned.
+        let mut values = values.into_iter();
+        let id = match values.next() {
+            Some(Value::Integer(id)) => id.0,
+            _ => return Err(anyhow::anyhow!("Invalid id")),
+        };
+        let name = match values.next() {
+            Some(Value::Varchar(name)) => name.0,
+            _ => return Err(anyhow::anyhow!("Invalid name")),
+        };
+        let table_name = match values.next() {
+            Some(Value::Varchar(table_name)) => table_name.0,
+            _ => return Err(anyhow::anyhow!("Invalid table_name")),
+        };
+        let first_page_id = match values.next() {
+            // A negative or oversized value would silently wrap under an `as`
+            // cast and point at an unrelated page, so convert with a check.
+            Some(Value::Integer(raw)) => u32::try_from(raw.0)
+                .map(PageID)
+                .map_err(|_| anyhow::anyhow!("Invalid first_page_id"))?,
+            _ => return Err(anyhow::anyhow!("Invalid first_page_id")),
+        };
+        Ok(Self {
+            id,
+            name,
+            table_name,
+            first_page_id,
+            columns: Vec::new(),
+        })
+    }
+    /// Appends a single column name to `columns`.
+    pub fn add_columns(&mut self, column: String) {
+        self.columns.push(column);
+    }
+}
diff --git a/src/instance.rs b/src/instance.rs
index dcf5f20..6fae08c 100644
--- a/src/instance.rs
+++ b/src/instance.rs
@@ -15,6 +15,7 @@ use crate::{
     executor::{ExecutorContext, ExecutorEngine},
     lock::LockManager,
     log::LogManager,
+    optimizer::Optimizer,
     parser::{CreateIndexStatementAST, CreateTableStatementAST, StatementAST},
     plan::Planner,
     recovery::RecoveryManager,
@@ -131,6 +132,8 @@ impl Instance {
         let bound_statement = binder.bind_statement(statement)?;
         let planner = Planner::new(bound_statement);
         let plan = planner.plan();
+        let optimizer = Optimizer::new(self.catalog.clone(), txn_id);
+        let plan = optimizer.optimize(plan)?;
         let schema = plan.schema().clone();
         let executor_context = ExecutorContext {
             transaction_id: txn_id,
diff --git a/src/lib.rs b/src/lib.rs
index c723c64..96c557a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,10 +6,12 @@ pub mod common;
 pub mod concurrency;
 pub mod disk;
 pub mod executor;
+pub mod index;
 pub mod instance;
 pub mod lexer;
 pub mod lock;
 pub mod log;
+pub mod optimizer;
 pub mod page;
 pub mod parser;
 pub mod plan;
diff --git a/src/optimizer.rs b/src/optimizer.rs
new file mode 100644
index 0000000..e87f837
--- /dev/null
+++ b/src/optimizer.rs
@@ -0,0 +1,88 @@
+use std::sync::{Arc, Mutex};
+
+use anyhow::Result;
+
+use crate::{
+    binder::BoundExpressionAST,
+    catalog::Catalog,
+    common::TransactionID,
+    plan::{IndexScanPlan, Plan},
+};
+
+/// Rewrites plans using index metadata read from the catalog.
+pub struct Optimizer {
+    catalog: Arc<Mutex<Catalog>>,
+    txn_id: TransactionID,
+}
+impl Optimizer {
+    /// Creates an optimizer that reads the catalog on behalf of `txn_id`.
+    pub fn new(catalog: Arc<Mutex<Catalog>>, txn_id: TransactionID) -> Self {
+        Self { catalog, txn_id }
+    }
+    /// Applies every rewrite rule to `plan` and returns the resulting plan.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the catalog cannot be locked or read.
+    pub fn optimize(&self, plan: Plan) -> Result<Plan> {
+        self.optimize_filter_index_scan(plan)
+    }
+    /// Rewrites the tree bottom-up, replacing each filter accepted by
+    /// `filter_as_index_scan` with an index scan.
+    fn optimize_filter_index_scan(&self, mut plan: Plan) -> Result<Plan> {
+        let children = plan
+            .children()
+            .into_iter()
+            .map(|child| self.optimize_filter_index_scan(*child))
+            .collect::<Result<Vec<_>>>()?;
+        plan.set_children(children);
+        Ok(match self.filter_as_index_scan(&plan)? {
+            Some(index_scan) => Plan::IndexScan(index_scan),
+            None => plan,
+        })
+    }
+    /// Returns the index scan that can replace `plan`, if there is one.
+    ///
+    /// `plan` qualifies when it is a filter placed directly on a sequential
+    /// scan, its condition is a binary expression comparing a column path with
+    /// a literal, and some index covers exactly that one column.
+    ///
+    /// NOTE(review): neither the operator nor the literal is carried into the
+    /// returned `IndexScanPlan`, so the executor cannot yet apply the
+    /// predicate; the operator should also be restricted once that is added.
+    fn filter_as_index_scan(&self, plan: &Plan) -> Result<Option<IndexScanPlan>> {
+        let filter_plan = match plan {
+            Plan::Filter(filter_plan) => filter_plan,
+            _ => return Ok(None),
+        };
+        // Replacing a filter that sits on anything but a plain scan (a join,
+        // an aggregate, ...) would drop that whole subtree from the plan.
+        if !matches!(*filter_plan.child, Plan::SeqScan(_)) {
+            return Ok(None);
+        }
+        let binary_expression = match &filter_plan.condition {
+            BoundExpressionAST::Binary(binary_expression) => binary_expression,
+            _ => return Ok(None),
+        };
+        let path_expression = match (&*binary_expression.left, &*binary_expression.right) {
+            (BoundExpressionAST::Path(path_expression), BoundExpressionAST::Literal(_)) => {
+                path_expression
+            }
+            _ => return Ok(None),
+        };
+        let indexes = self
+            .catalog
+            .lock()
+            .map_err(|_| anyhow::anyhow!("Catalog lock error"))?
+            .get_indexes_by_table_name(&path_expression.table_name, self.txn_id)?;
+        Ok(indexes
+            .into_iter()
+            .find(|index| {
+                index.columns.len() == 1 && index.columns[0] == path_expression.column_name
+            })
+            .map(|index| IndexScanPlan {
+                index_id: index.id,
+                schema: filter_plan.schema.clone(),
+            }))
+    }
+}
diff --git a/src/plan.rs b/src/plan.rs
index 0d4c425..e57c7c5 100644
--- a/src/plan.rs
+++ b/src/plan.rs
@@ -59,6 +59,33 @@ impl Plan {
             Plan::Update(plan) => vec![plan.child.clone()],
         }
     }
+    /// Replaces this node's child plan(s) with `children`.
+    ///
+    /// `SeqScan`, `IndexScan`, `EmptyRow` and `Insert` ignore the argument.
+    /// `NestedLoopJoin` takes every element. All other variants take the first
+    /// element and keep their current child when `children` is empty.
+    pub fn set_children(&mut self, children: Vec<Plan>) {
+        let mut children = children.into_iter();
+        // Single-child variants only differ in their concrete type, so pick
+        // the slot here and move (rather than clone) the new child in below.
+        let child_slot = match self {
+            Plan::SeqScan(_) | Plan::IndexScan(_) | Plan::EmptyRow(_) | Plan::Insert(_) => return,
+            Plan::NestedLoopJoin(plan) => {
+                plan.children = children.map(Box::new).collect();
+                return;
+            }
+            Plan::Filter(plan) => &mut plan.child,
+            Plan::Project(plan) => &mut plan.child,
+            Plan::Aggregate(plan) => &mut plan.child,
+            Plan::Sort(plan) => &mut plan.child,
+            Plan::Limit(plan) => &mut plan.child,
+            Plan::Delete(plan) => &mut plan.child,
+            Plan::Update(plan) => &mut plan.child,
+        };
+        if let Some(child) = children.next() {
+            *child_slot = Box::new(child);
+        }
+    }
 }
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub struct SeqScanPlan {
@@ -67,7 +94,7 @@ pub struct SeqScanPlan {
 }
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub struct IndexScanPlan {
-    pub root_page_id: PageID,
+    pub index_id: i64,
     pub schema: Schema,
 }
 #[derive(Debug, PartialEq, Eq, Clone)]