Add index scan(wip)
gtnao committed May 6, 2024
1 parent 9e02b58 commit e4ce48f
Showing 10 changed files with 246 additions and 20 deletions.
17 changes: 16 additions & 1 deletion src/catalog.rs
@@ -301,14 +301,29 @@ impl Catalog {
}
Ok(schema)
}

// TODO: store indexes in catalog
pub fn get_index(&self, index_id: i64, txn_id: TransactionID) -> Result<Index> {
let system_indexes_table =
self.system_table_heap(PageID(SYSTEM_INDEXES_FIRST_PAGE_ID.0), txn_id);
for tuple in system_indexes_table.iter() {
let values = tuple.values(&Self::system_indexes_schema());
if let Value::Integer(IntegerValue(id)) = values[0] {
if id == index_id {
return Ok(Index::from_system_table(values)?);
let mut index = Index::from_system_table(values)?;
let system_index_columns_table = self
.system_table_heap(PageID(SYSTEM_INDEX_COLUMNS_FIRST_PAGE_ID.0), txn_id);
for tuple in system_index_columns_table.iter() {
let values = tuple.values(&Self::system_index_columns_schema());
if let Value::Integer(IntegerValue(columns_table_index_id)) = &values[0] {
if *columns_table_index_id == index.id {
if let Value::Varchar(VarcharValue(column_name)) = &values[1] {
index.add_columns(column_name.clone());
}
}
}
}
return Ok(index);
}
}
}
3 changes: 3 additions & 0 deletions src/executor.rs
@@ -83,6 +83,8 @@ impl ExecutorEngine {
plan: index_plan.clone(),
executor_context: &self.context,
index_id: index_plan.index_id,
rids: None,
cursor: 0,
}),
Plan::Filter(filter_plan) => Executor::Filter(FilterExecutor {
plan: filter_plan.clone(),
@@ -160,6 +162,7 @@ impl ExecutorEngine {
self.context.log_manager.clone(),
self.context.transaction_id,
),
table_schema: insert_plan.table_schema.clone(),
count: 0,
executed: false,
}),
42 changes: 38 additions & 4 deletions src/executor/index_scan_executor.rs
@@ -1,27 +1,61 @@
use anyhow::Result;

use crate::{plan::IndexScanPlan, tuple::Tuple};
use crate::{common::RID, index::IndexManager, plan::IndexScanPlan, tuple::Tuple};

use super::ExecutorContext;

pub struct IndexScanExecutor<'a> {
pub plan: IndexScanPlan,
pub executor_context: &'a ExecutorContext,
pub index_id: i64,
pub rids: Option<Vec<RID>>,
pub cursor: usize,
}

impl IndexScanExecutor<'_> {
pub fn init(&mut self) -> Result<()> {
let index = self
let mut index = self
.executor_context
.catalog
.lock()
.map_err(|_| anyhow::anyhow!("Catalog lock error"))?
.get_index(self.index_id, self.executor_context.transaction_id)?;
println!("Index: {:?}", index);
index.set_schema(self.plan.table_schema.clone());
let index_manager = IndexManager::new(
index,
self.executor_context.catalog.clone(),
self.executor_context.buffer_pool_manager.clone(),
);
let right_value = self
.plan
.binary_expression
.right
.eval(&vec![&Tuple::new(None, &[])], &vec![])
.map_err(|_| anyhow::anyhow!("eval error"))?;
self.rids = index_manager.lookup(&right_value)?;
Ok(())
}
pub fn next(&mut self) -> Result<Option<Tuple>> {
Ok(None)
if let Some(rids) = &self.rids {
if self.cursor >= rids.len() {
return Ok(None);
}
let rid = rids[self.cursor];
self.cursor += 1;
let page = self
.executor_context
.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.fetch_page(rid.0)?;
let tuple_data = page
.read()
.map_err(|_| anyhow::anyhow!("read error"))?
.with_table_page(|table_page| table_page.get_tuple(rid.1 as usize));
let tuple = Tuple::new(None, &tuple_data);
Ok(Some(tuple))
} else {
Ok(None)
}
}
}
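
The new executor follows the engine's init/next protocol: init resolves the index through the catalog, attaches the table schema, evaluates the literal on the right-hand side of the filter condition, and collects the matching RIDs; next then returns one tuple per RID by fetching the owning table page. A minimal sketch of how such an executor could be driven, assuming only the interface shown in this diff (the drain helper and the module path are illustrative, not part of the commit):

```rust
use anyhow::Result;

use crate::{executor::index_scan_executor::IndexScanExecutor, tuple::Tuple};

// Illustrative only: pull every matching tuple out of an IndexScanExecutor
// that has already been wired up with its plan and executor context.
fn drain(executor: &mut IndexScanExecutor) -> Result<Vec<Tuple>> {
    let mut out = Vec::new();
    executor.init()?; // looks up the index and collects the matching RIDs
    while let Some(tuple) = executor.next()? {
        // one tuple per RID, read back from the owning table page
        out.push(tuple);
    }
    Ok(out)
}
```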
32 changes: 31 additions & 1 deletion src/executor/insert_executor.rs
@@ -3,6 +3,7 @@ use anyhow::Result;
use crate::{
catalog::Schema,
common::INVALID_TRANSACTION_ID,
index::IndexManager,
plan::InsertPlan,
table::TableHeap,
tuple::Tuple,
@@ -15,6 +16,7 @@ pub struct InsertExecutor<'a> {
pub plan: InsertPlan,
pub executor_context: &'a ExecutorContext,
pub table_heap: TableHeap,
pub table_schema: Schema,
pub count: u32,
pub executed: bool,
}
@@ -38,8 +40,36 @@ impl InsertExecutor<'_> {
raw_value.convert_to(&c.data_type)
})
.collect::<Result<Vec<_>>>()?;
self.table_heap.insert(&values)?;
let rid = self.table_heap.insert(&values)?;
self.count += 1;
let mut indexes = self
.executor_context
.catalog
.lock()
.map_err(|_| anyhow::anyhow!("Catalog lock error"))?
.get_indexes_by_table_name(
&self.plan.table_name,
self.executor_context.transaction_id,
)?;
for index in indexes.iter_mut() {
index.set_schema(self.plan.table_schema.clone());
}
for index in indexes {
// TODO: only support single column index
let column_name = index.columns[0].clone();
let index_manager = IndexManager::new(
index,
self.executor_context.catalog.clone(),
self.executor_context.buffer_pool_manager.clone(),
);
for (i, column) in self.plan.table_schema.columns.iter().enumerate() {
if column.name == column_name {
let right_value = values[i].clone();
index_manager.insert(&right_value, rid)?;
break;
}
}
}
Ok(())
}
pub fn next(&mut self) -> Result<Option<Tuple>> {
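
Index maintenance on the write path mirrors the scan path: the RID returned by TableHeap::insert is paired with the value of the indexed column and inserted into every index on the table, and only single-column indexes are handled for now. A small sketch of the key-extraction step; index_key_for_row is a hypothetical helper, while Schema and Value are the crate's own types:

```rust
use crate::{catalog::Schema, value::Value};

// Illustrative only: find the value of the indexed column in a freshly
// inserted row. Returns None if the schema has no column with that name.
fn index_key_for_row(index_column: &str, table_schema: &Schema, values: &[Value]) -> Option<Value> {
    table_schema
        .columns
        .iter()
        .position(|c| c.name == index_column)
        .map(|i| values[i].clone())
}
```

With the key in hand, the loop above boils down to one index_manager.insert(&key, rid)? call per index.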
112 changes: 111 additions & 1 deletion src/index.rs
@@ -1,6 +1,13 @@
use std::sync::{Arc, Mutex};

use anyhow::Result;

use crate::{common::PageID, value::Value};
use crate::{
buffer::BufferPoolManager,
catalog::{Catalog, Schema},
common::{PageID, RID},
value::Value,
};

#[derive(Debug)]
pub struct Index {
Expand All @@ -9,6 +16,7 @@ pub struct Index {
pub table_name: String,
pub first_page_id: PageID,
pub columns: Vec<String>,
pub schema: Schema,
}

impl Index {
@@ -39,9 +47,111 @@ impl Index {
table_name,
first_page_id,
columns: Vec::new(),
schema: Schema {
columns: Vec::new(),
},
})
}
// TODO: set by catalog
pub fn add_columns(&mut self, column: String) {
self.columns.push(column);
}
// TODO: set by catalog
pub fn set_schema(&mut self, schema: Schema) {
let columns = self.columns.iter().map(|column| {
schema
.columns
.iter()
.find(|c| c.name == *column)
.expect("column not found")
.clone()
});
self.schema = Schema {
columns: columns.collect(),
};
}
}

pub struct IndexManager {
index: Index,
catalog: Arc<Mutex<Catalog>>,
buffer_pool_manager: Arc<Mutex<BufferPoolManager>>,
}
impl IndexManager {
pub fn new(
index: Index,
catalog: Arc<Mutex<Catalog>>,
buffer_pool_manager: Arc<Mutex<BufferPoolManager>>,
) -> Self {
Self {
index,
catalog,
buffer_pool_manager,
}
}
pub fn lookup(&self, key: &Value) -> Result<Option<Vec<RID>>> {
let leaf_page_id = self.find_leaf_page(key)?;
let page = self
.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.fetch_page(leaf_page_id)?;
let rid = page
.read()
.map_err(|_| anyhow::anyhow!("read error"))?
.with_b_plus_tree_leaf_page(|leaf_page| {
leaf_page.lookup(&[key.clone()], &self.index.schema)
});
Ok(rid)
}
pub fn insert(&self, key: &Value, rid: RID) -> Result<()> {
self.insert_into_leaf_page(key, rid)?;
Ok(())
}
fn insert_into_leaf_page(&self, key: &Value, rid: RID) -> Result<()> {
let leaf_page_id = self.find_leaf_page(key)?;
let page = self
.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.fetch_page(leaf_page_id)?;
// TODO: write WAL
page.write()
.map_err(|_| anyhow::anyhow!("write error"))?
.with_b_plus_tree_leaf_page_mut(|leaf_page| {
// TODO: full check
leaf_page.insert(&[key.clone()], rid, &self.index.schema)
});
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.unpin_page(leaf_page_id, true)?;
Ok(())
}
fn find_leaf_page(&self, key: &Value) -> Result<PageID> {
let page_id = self.index.first_page_id;
loop {
let page = self
.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.fetch_page(page_id)?;
if page
.read()
.map_err(|_| anyhow::anyhow!("read error"))?
.is_b_plus_tree_leaf()
{
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.unpin_page(page_id, false)?;
return Ok(page_id);
}
// TODO: lookup internal page
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.unpin_page(page_id, false)?;
}
}
}
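
IndexManager ties an Index to the catalog and the buffer pool: find_leaf_page walks from the index's first page until it reaches a B+ tree leaf (descending through internal pages is still a TODO), lookup reads matching RIDs out of that leaf, and insert writes a key/RID pair into it and unpins the page as dirty. A minimal usage sketch; the module paths, the probe_index wrapper, and the IntegerValue key are assumptions for illustration only:

```rust
use std::sync::{Arc, Mutex};

use anyhow::Result;

use crate::{
    buffer::BufferPoolManager,
    catalog::Catalog,
    common::RID,
    index::{Index, IndexManager},
    value::{IntegerValue, Value},
};

// Illustrative only: insert a key for an existing heap tuple, then look it up.
// The Index is assumed to already have its schema attached via set_schema.
fn probe_index(
    index: Index,
    catalog: Arc<Mutex<Catalog>>,
    buffer_pool_manager: Arc<Mutex<BufferPoolManager>>,
    rid: RID,
) -> Result<()> {
    let index_manager = IndexManager::new(index, catalog, buffer_pool_manager);
    let key = Value::Integer(IntegerValue(42));

    // maps the key to the RID that TablePage::insert handed back
    index_manager.insert(&key, rid)?;

    // Some(rids) when the leaf page contains the key
    if let Some(rids) = index_manager.lookup(&key)? {
        for found in rids {
            // found.0 is the heap PageID, found.1 the slot index within it
            let _ = (found.0, found.1);
        }
    }
    Ok(())
}
```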
11 changes: 11 additions & 0 deletions src/optimizer.rs
@@ -32,6 +32,7 @@ impl Optimizer {
source_plan.set_children(children);
if let Plan::Filter(filter_plan) = plan {
if let BoundExpressionAST::Binary(binary_expression) = filter_plan.condition {
let binary_expression_clone = binary_expression.clone();
if let BoundExpressionAST::Path(path_expression) = *binary_expression.left {
let indexes = self
.catalog
@@ -43,9 +44,19 @@
&& index.columns[0] == path_expression.column_name
{
if let BoundExpressionAST::Literal(_) = *binary_expression.right {
let table_schema = self
.catalog
.lock()
.map_err(|_| anyhow::anyhow!("Catalog lock error"))?
.get_schema_by_table_name(
&path_expression.table_name,
self.txn_id,
)?;
return Ok(Plan::IndexScan(IndexScanPlan {
index_id: index.id,
schema: filter_plan.schema,
binary_expression: binary_expression_clone,
table_schema,
}));
}
}
18 changes: 17 additions & 1 deletion src/page.rs
@@ -70,7 +70,20 @@
}
}
pub fn from_data(data: &[u8]) -> Self {
Page::Table(TablePage::from_data(data))
let mut bytes = [0u8; 4];
bytes.copy_from_slice(&data[PAGE_TYPE_OFFSET..(PAGE_TYPE_OFFSET + PAGE_TYPE_SIZE)]);
let page_type = match u32::from_le_bytes(bytes) {
1 => TABLE_PAGE_PAGE_TYPE,
2 => B_PLUS_TREE_LEAF_PAGE_PAGE_TYPE,
_ => panic!("page type not supported"),
};
match page_type {
TABLE_PAGE_PAGE_TYPE => Page::Table(TablePage::from_data(data)),
B_PLUS_TREE_LEAF_PAGE_PAGE_TYPE => {
Page::BPlusTreeLeaf(BPlusTreeLeafPage::from_data(data))
}
_ => panic!("page type not supported"),
}
}
pub fn data(&self) -> &[u8] {
match self {
@@ -84,4 +97,7 @@
Page::BPlusTreeLeaf(b_plus_tree_leaf_page) => b_plus_tree_leaf_page.page_id(),
}
}
pub fn is_b_plus_tree_leaf(&self) -> bool {
matches!(self, Page::BPlusTreeLeaf(_))
}
}
9 changes: 6 additions & 3 deletions src/page/table_page.rs
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Result};

use crate::{
common::{PageID, TransactionID, INVALID_PAGE_ID, LSN, PAGE_SIZE},
common::{PageID, TransactionID, INVALID_PAGE_ID, LSN, PAGE_SIZE, RID},
tuple::Tuple,
};

@@ -50,7 +50,7 @@ impl TablePage {
pub fn from_data(data: &[u8]) -> Self {
TablePage { data: data.into() }
}
pub fn insert(&mut self, data: &[u8]) -> Result<()> {
pub fn insert(&mut self, data: &[u8]) -> Result<RID> {
// TODO: too large for one page
if self.free_space() < data.len() + LINE_POINTER_SIZE {
return Err(anyhow!("free space not enough"));
@@ -72,7 +72,10 @@ impl TablePage {
.copy_from_slice(&data_size.to_le_bytes());
self.data[(next_upper_offset as usize)..(upper_offset as usize)].copy_from_slice(data);

Ok(())
Ok(RID(
self.page_id(),
(lower_offset - HEADER_SIZE as u32) / LINE_POINTER_SIZE as u32,
))
}
pub fn delete(&mut self, index: u32, txn_id: TransactionID) {
let offset = self.line_pointer_offset(index as usize) as usize;