Skip to content

Commit

Permalink
Add recovery.
Browse files Browse the repository at this point in the history
  • Loading branch information
gtnao committed Jan 20, 2024
1 parent 91b45b6 commit 8d32bf2
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 13 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ The following shows REPEATABLE READ implemented via snapshot isolation.
- [ ] Lock
- [x] Row-level Exclusive Locking: prevent dirty write.
- [ ] Deadlock Detection
- [ ] Recovery (ARIES-based)
- [x] Recovery (ARIES-based)
- [ ] Checkpoint
- [ ] Indexing
- [ ] B+ Tree
- [x] Buffer Pool
Expand Down
22 changes: 21 additions & 1 deletion src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,31 @@ impl BufferPoolManager {
}
unreachable!("page not found")
}
pub fn init_page_for_recovery(&mut self, page_id: PageID, page: Page) -> Result<()> {
    // Installs `page` into the pool for `page_id` during crash recovery,
    // but only when the page is not already resident AND its on-disk image
    // is all zeroes (i.e. the allocation never reached disk before the crash).
    if self.page_table.contains_key(&page_id) {
        // Already buffered: nothing to do.
        return Ok(());
    }
    // Inspect the on-disk image; a non-zero page was persisted before the
    // crash and must not be clobbered with a freshly initialized one.
    let mut on_disk = vec![0u8; PAGE_SIZE];
    self.disk_manager.read_page(page_id, &mut on_disk)?;
    if on_disk.iter().any(|&byte| byte != 0) {
        return Ok(());
    }
    if self.is_full() {
        self.evict_page()?;
    }
    // NOTE(review): a new frame slot is appended even right after an
    // eviction — confirm evict_page leaves no reusable hole in `frames`.
    let frame_id = self.frames.len();
    let frame = Frame::new(Arc::new(RwLock::new(page)));
    self.frames.push(Some(frame));
    self.page_table.insert(page_id, frame_id);
    // Freshly materialized pages must be written out on the next flush.
    if let Some(frame) = &mut self.frames[frame_id] {
        frame.mark_dirty();
    }
    Ok(())
}
/// Persists every buffered page before the process exits.
pub fn shutdown(&mut self) -> Result<()> {
    self.flush_all_pages()
}
fn flush_all_pages(&mut self) -> Result<()> {
pub fn flush_all_pages(&mut self) -> Result<()> {
let keys = self.page_table.keys().cloned().collect::<Vec<_>>();
for page_id in keys {
self.flush_page(page_id)?;
Expand Down
23 changes: 17 additions & 6 deletions src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
log::LogManager,
parser::{CreateTableStatementAST, StatementAST},
plan::Planner,
recovery::RecoveryManager,
value::Value,
};

Expand All @@ -29,7 +30,7 @@ pub struct Instance {
}

impl Instance {
pub fn new(dir: &str, init: bool) -> Result<Self> {
pub fn new(dir: &str, init: bool, recover: bool) -> Result<Self> {
if init {
if fs::metadata(dir).is_ok() && fs::metadata(dir)?.is_dir() {
fs::remove_dir_all(dir)?;
Expand Down Expand Up @@ -64,6 +65,16 @@ impl Instance {
catalog.bootstrap(init)?;
let catalog = Arc::new(Mutex::new(catalog));

if recover {
let log_records = log_manager
.lock()
.map_err(|e| anyhow::anyhow!("{}", e))?
.read()?;
let mut recovery_manager =
RecoveryManager::new(buffer_pool_manager.clone(), log_records);
recovery_manager.recover()?;
}

Ok(Self {
buffer_pool_manager,
catalog,
Expand Down Expand Up @@ -163,7 +174,7 @@ mod tests {
fn test_new_init() -> Result<()> {
let temp_dir = tempdir()?;
let dir = temp_dir.path().join("test");
Instance::new(dir.to_str().unwrap(), true)?;
Instance::new(dir.to_str().unwrap(), true, false)?;
assert!(dir.exists());
assert!(dir.join("data.db").exists());
assert!(dir.join("txn.log").exists());
Expand All @@ -174,14 +185,14 @@ mod tests {
fn test_new_init_exists() -> Result<()> {
let temp_dir = tempdir()?;
let dir = temp_dir.path().join("test");
let instance = Instance::new(dir.to_str().unwrap(), true)?;
let instance = Instance::new(dir.to_str().unwrap(), true, false)?;

let created_at = fs::metadata(&dir)?.created()?;
thread::sleep(std::time::Duration::from_secs(1));

// check dir was recreated
instance.shutdown()?;
Instance::new(dir.to_str().unwrap(), true)?;
Instance::new(dir.to_str().unwrap(), true, false)?;
assert!(created_at < fs::metadata(&dir)?.created()?);
assert!(dir.join("data.db").exists());
assert!(dir.join("txn.log").exists());
Expand All @@ -192,13 +203,13 @@ mod tests {
fn test_new_not_init() -> Result<()> {
let temp_dir = tempdir()?;
let dir = temp_dir.path().join("test");
let instance = Instance::new(dir.to_str().unwrap(), true)?;
let instance = Instance::new(dir.to_str().unwrap(), true, false)?;
let created_at = fs::metadata(&dir)?.created()?;
thread::sleep(std::time::Duration::from_secs(1));

// check dir was not recreated
instance.shutdown()?;
Instance::new(dir.to_str().unwrap(), false)?;
Instance::new(dir.to_str().unwrap(), false, false)?;
assert_eq!(created_at, fs::metadata(&dir)?.created()?);
assert!(dir.join("data.db").exists());
assert!(dir.join("txn.log").exists());
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod log;
pub mod page;
pub mod parser;
pub mod plan;
pub mod recovery;
pub mod server;
pub mod table;
pub mod tuple;
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ fn main() -> Result<()> {
match &*args[1] {
"client" => client_start()?,
"server" => {
let init = args.len() > 2 && &*args[2] == "--init";
server_start(init)?;
let init = args.iter().any(|arg| arg == "--init");
let recover = args.iter().any(|arg| arg == "--recover");
server_start(init, recover)?;
}
_ => {
println!("Usage: cargo run client|server");
Expand Down
139 changes: 139 additions & 0 deletions src/recovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use std::sync::{Arc, Mutex};

use anyhow::Result;

use crate::{
    buffer::BufferPoolManager,
    common::{PageID, TransactionID},
    log::{LogRecord, LogRecordBody},
    page::{table_page::TablePage, Page},
};

/// Replays the write-ahead log after a crash to restore pages to a
/// consistent state (ARIES-style redo, with undo still a TODO).
pub struct RecoveryManager {
    // Buffer pool the log records are replayed into; `recover` flushes
    // all its pages once replay finishes.
    buffer_pool_manager: Arc<Mutex<BufferPoolManager>>,
    // The entire log, in the order it was written.
    log_records: Vec<LogRecord>,
    // Transactions that began but were never committed/aborted in the log
    // ("loser" transactions) — collected during redo for the undo pass.
    active_txn_ids: Vec<TransactionID>,
}

impl RecoveryManager {
pub fn new(
buffer_pool_manager: Arc<Mutex<BufferPoolManager>>,
log_records: Vec<LogRecord>,
) -> Self {
Self {
buffer_pool_manager,
log_records,
active_txn_ids: vec![],
}
}

pub fn recover(&mut self) -> Result<()> {
self.redo()?;
self.undo()?;
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.flush_all_pages()?;
Ok(())
}

pub fn redo(&mut self) -> Result<()> {
for log_record in &self.log_records {
match log_record.body {
LogRecordBody::BeginTransaction => {
self.active_txn_ids.push(log_record.txn_id);
}
LogRecordBody::CommitTransaction | LogRecordBody::AbortTransaction => {
self.active_txn_ids.retain(|&x| x != log_record.txn_id);
}
LogRecordBody::InsertToTablePage(ref body) => {
let page = self
.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.fetch_page(body.page_id)?;
let page_lsn = page
.read()
.map_err(|_| anyhow::anyhow!("lock error"))?
.with_table_page(|table_page| table_page.lsn());
if page_lsn < log_record.lsn {
page.write()
.map_err(|_| anyhow::anyhow!("lock error"))?
.with_table_page_mut(|table_page| -> Result<()> {
table_page
.insert(&body.data)
.map_err(|e| anyhow::anyhow!("{}", e))?;
table_page.set_lsn(log_record.lsn);
Ok(())
})?;
}
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.unpin_page(body.page_id, true)?;
}
LogRecordBody::DeleteFromTablePage(ref body) => {
let page = self
.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.fetch_page(body.rid.0)?;
let page_lsn = page
.read()
.map_err(|_| anyhow::anyhow!("lock error"))?
.with_table_page(|table_page| table_page.lsn());
if page_lsn < log_record.lsn {
page.write()
.map_err(|_| anyhow::anyhow!("lock error"))?
.with_table_page_mut(|table_page| {
table_page.delete(body.rid.1, log_record.txn_id);
table_page.set_lsn(log_record.lsn);
});
}
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.unpin_page(body.rid.0, true)?;
}
LogRecordBody::NewTablePage(ref body) => {
let mut table_page = TablePage::new(body.page_id);
table_page.set_lsn(log_record.lsn);
let page = Page::Table(table_page);
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.init_page_for_recovery(body.page_id, page)?;
}
LogRecordBody::SetNextPageID(ref body) => {
let page = self
.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.fetch_page(body.page_id)?;
let page_lsn = page
.read()
.map_err(|_| anyhow::anyhow!("lock error"))?
.with_table_page(|table_page| table_page.lsn());
if page_lsn < log_record.lsn {
page.write()
.map_err(|_| anyhow::anyhow!("lock error"))?
.with_table_page_mut(|table_page| {
table_page.set_next_page_id(body.next_page_id);
table_page.set_lsn(log_record.lsn);
});
}
self.buffer_pool_manager
.lock()
.map_err(|_| anyhow::anyhow!("lock error"))?
.unpin_page(body.page_id, true)?;
}
}
}
Ok(())
}

pub fn undo(&self) -> Result<()> {
// TODO:
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use crate::{

const SERVER_DEFAULT_PORT: u16 = 7878;

pub fn server_start(init: bool) -> Result<()> {
pub fn server_start(init: bool, recover: bool) -> Result<()> {
println!("junkdb server started");

// init
let instance = Arc::new(RwLock::new(Instance::new("data", init)?));
let instance = Arc::new(RwLock::new(Instance::new("data", init, recover)?));

// trap signals
let instance_clone = instance.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
pub fn setup_test_database() -> Result<Instance> {
let dir = tempdir()?;
let data_dir = dir.path().join("data");
let instance = Instance::new(data_dir.to_str().unwrap(), true)?;
let instance = Instance::new(data_dir.to_str().unwrap(), true, false)?;
let txn_id = instance.begin(None)?;
let create_table_ast = CreateTableStatementAST {
table_name: "t1".to_string(),
Expand Down

0 comments on commit 8d32bf2

Please sign in to comment.