From 8da4d355a2a499e49c0a4006b16783b3a94d7c62 Mon Sep 17 00:00:00 2001 From: Adam Comer Date: Sun, 23 Nov 2025 03:22:58 -0700 Subject: [PATCH] feat: add sstable --- src/lib.rs | 5 +- src/sstable.rs | 348 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 349 insertions(+), 4 deletions(-) create mode 100644 src/sstable.rs diff --git a/src/lib.rs b/src/lib.rs index 29fa412..d39c1f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,6 @@ pub mod database; mod mem_table; +mod sstable; mod utils; mod wal; mod wal_iterator; - -// mod table; -// mod table_manager; -// mod utils; diff --git a/src/sstable.rs b/src/sstable.rs new file mode 100644 index 0000000..1d2546f --- /dev/null +++ b/src/sstable.rs @@ -0,0 +1,348 @@ +use std::fs::{create_dir_all, File, OpenOptions}; +use std::io::{self, BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{u8, usize}; + +use crate::mem_table::MemTable; + +pub struct SSTableEntry { + key: Vec, + value: Option>, + timestamp: u128, +} + +pub struct SSTable { + file: BufReader, + path: PathBuf, + offsets: Vec, + low_key: Vec, + high_key: Vec, +} + +impl SSTable { + pub fn new(memtable: &MemTable, level: usize, dir: &Path) -> io::Result { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_micros(); + + let path = Path::new(dir).join(format!("{}/{}.sstable", level, timestamp.to_string())); + + create_dir_all(path.parent().unwrap())?; + + let file = OpenOptions::new().append(true).create(true).open(&path)?; + let mut file = BufWriter::new(file); + + let mut offsets = Vec::new(); + let mut offset = 0; + for entry in memtable.entries() { + offsets.push(offset as u64); + + file.write_all(&entry.key.len().to_le_bytes())?; + file.write_all(&entry.key)?; + + file.write_all(&(entry.deleted as u8).to_le_bytes())?; + + if !entry.deleted { + let value = entry.value.as_ref().unwrap(); + file.write_all(&value.len().to_le_bytes())?; + file.write_all(value)?; + } + + file.write_all(&entry.timestamp.to_le_bytes())?; + + offset += size_of::() + + size_of::() + + size_of::() + + entry.key.len() + + if !entry.deleted { + size_of::() + entry.value.as_ref().unwrap().len() + } else { + 0 + } + } + + file.flush()?; + + let file = OpenOptions::new().read(true).open(&path)?; + let file = BufReader::new(file); + + Ok(SSTable { + file, + path, + offsets, + low_key: memtable.entries().first().unwrap().key.clone(), + high_key: memtable.entries().last().unwrap().key.clone(), + }) + } + + pub fn load_from_path(path: &Path) -> io::Result { + let file = OpenOptions::new().read(true).open(&path)?; + let mut file = BufReader::new(file); + + let mut offsets = Vec::new(); + let mut offset = 0; + while file.fill_buf()?.len() > 0 { + offsets.push(offset as u64); + + let mut buf = [0u8; size_of::()]; + file.read_exact(&mut buf)?; + + let key_len = usize::from_le_bytes(buf); + file.seek_relative(key_len as i64)?; + + let mut buf = [0u8; size_of::()]; + file.read_exact(&mut buf)?; + let deleted = buf[0] == 1; + + let mut val_len = 0; + if !deleted { + let mut buf = [0u8; size_of::()]; + file.read_exact(&mut buf)?; + + val_len = usize::from_le_bytes(buf); + file.seek_relative(val_len as i64)?; + } + + file.seek_relative(size_of::() as i64)?; + + offset += size_of::() + + size_of::() + + size_of::() + + key_len + + if !deleted { + size_of::() + val_len + } else { + 0 + } + } + + file.seek(SeekFrom::Start(0))?; + + let mut buf = [0u8; size_of::()]; + file.read_exact(&mut buf)?; + + let key_len = usize::from_le_bytes(buf); + let mut low_key = vec![0u8; key_len]; + file.read_exact(&mut low_key)?; + + file.seek(SeekFrom::Start(*offsets.last().unwrap() as u64))?; + + let mut buf = [0u8; size_of::()]; + file.read_exact(&mut buf)?; + + let key_len = usize::from_le_bytes(buf); + let mut high_key = vec![0u8; key_len]; + file.read_exact(&mut high_key)?; + + Ok(SSTable { + file, + path: path.to_owned(), + offsets, + low_key, + high_key, + }) + } + + pub fn key_in_range(&self, key: &[u8]) -> bool { + key >= &self.low_key && key <= &self.high_key + } + + pub fn get(&mut self, key: &[u8]) -> io::Result> { + let mut a = 0; + let mut b = 0; + while a <= b { + let m = (a + b) / 2; + let offset = self.offsets[m]; + + self.file.seek(SeekFrom::Start(offset))?; + + let mut buf = [0u8; size_of::()]; + self.file.read_exact(&mut buf)?; + + let key_len = usize::from_le_bytes(buf); + let mut table_key = vec![0u8; key_len]; + self.file.read_exact(&mut table_key)?; + + if key == &table_key { + let mut buf = [0u8; size_of::()]; + self.file.read_exact(&mut buf)?; + + let deleted = buf[0] == 1; + + let mut val = None; + if !deleted { + let mut buf = [0u8; size_of::()]; + self.file.read_exact(&mut buf)?; + + let val_len = usize::from_le_bytes(buf); + let mut table_value = vec![0u8; val_len]; + self.file.read_exact(&mut table_value)?; + + val = Some(table_value) + } + + let mut buf = [0u8; size_of::()]; + self.file.read_exact(&mut buf)?; + + let timestamp = u128::from_le_bytes(buf); + + return Ok(Some(SSTableEntry { + key: table_key, + value: val, + timestamp, + })); + } else if key > &table_key { + a = m + 1; + } else if key < &table_key { + b = m - 1; + } + } + + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use crate::{mem_table::MemTable, sstable::SSTable}; + use std::{ + fs::{File, OpenOptions}, + io::{BufReader, Read, Seek}, + }; + use tempfile::tempdir; + + fn assert_next_entry( + reader: &mut BufReader, + key: &[u8], + value: Option<&[u8]>, + timestamp: u128, + ) { + // Assert key lengths on disk are the same + let mut len_buf = [0u8; size_of::()]; + reader.read_exact(&mut len_buf).unwrap(); + let key_len = usize::from_le_bytes(len_buf); + assert_eq!(key_len, key.len()); + + // Assert key values on disk are the same + let mut file_key = vec![0u8; key_len]; + reader.read_exact(&mut file_key).unwrap(); + assert_eq!(file_key, key); + + // Assert deleted booleans on disk are the same + let mut deleted_buf = [0u8; size_of::()]; + reader.read_exact(&mut deleted_buf).unwrap(); + let deleted = deleted_buf[0] == 1; + assert_eq!(deleted, value.is_none()); + + if !deleted { + let mut len_buf = [0u8; size_of::()]; + reader.read_exact(&mut len_buf).unwrap(); + let val_len = usize::from_le_bytes(len_buf); + assert_eq!(val_len, value.unwrap().len()); + + // Assert key values on disk are the same + let mut file_value = vec![0u8; val_len]; + reader.read_exact(&mut file_value).unwrap(); + assert_eq!(file_value, value.unwrap()); + } + + // Assert timestamps on disk are the same + let mut timestamp_buf = [0u8; size_of::()]; + reader.read_exact(&mut timestamp_buf).unwrap(); + let file_timestamp = u128::from_le_bytes(timestamp_buf); + assert_eq!(file_timestamp, timestamp); + } + + #[test] + fn test_new_sstable() { + let dir = tempdir().unwrap(); + + let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![ + (b"a", Some(b"1"), 0), + (b"b", Some(b"2"), 1), + (b"c", Some(b"3"), 2), + (b"d", Some(b"4"), 3), + ]; + + let mut memtable = MemTable::new(); + for entry in &entries { + if let Some(value) = entry.1 { + memtable.set(entry.0, value, entry.2); + } else { + memtable.delete(entry.0, entry.2); + } + } + + let table = SSTable::new(&memtable, 0, dir.path()).unwrap(); + + let file = OpenOptions::new().read(true).open(&table.path).unwrap(); + let mut reader = BufReader::new(file); + + for (i, entry) in entries.iter().enumerate() { + assert_eq!(reader.stream_position().unwrap(), table.offsets[i]); + assert_next_entry(&mut reader, entry.0, entry.1, entry.2); + } + } + + #[test] + fn test_load_sstable() { + let dir = tempdir().unwrap(); + + let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![ + (b"a", Some(b"1"), 0), + (b"b", Some(b"2"), 1), + (b"c", Some(b"3"), 2), + (b"d", Some(b"4"), 3), + ]; + + let mut memtable = MemTable::new(); + for entry in &entries { + if let Some(value) = entry.1 { + memtable.set(entry.0, value, entry.2); + } else { + memtable.delete(entry.0, entry.2); + } + } + + let table = SSTable::new(&memtable, 0, dir.path()).unwrap(); + + let table = SSTable::load_from_path(&table.path).unwrap(); + + let file = OpenOptions::new().read(true).open(&table.path).unwrap(); + let mut reader = BufReader::new(file); + + for (i, entry) in entries.iter().enumerate() { + assert_eq!(reader.stream_position().unwrap(), table.offsets[i]); + assert_next_entry(&mut reader, entry.0, entry.1, entry.2); + } + } + + #[test] + fn test_key_in_range() { + let dir = tempdir().unwrap(); + + let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![ + (b"a", Some(b"1"), 0), + (b"b", Some(b"2"), 1), + (b"c", Some(b"3"), 2), + (b"d", Some(b"4"), 3), + ]; + + let mut memtable = MemTable::new(); + for entry in &entries { + if let Some(value) = entry.1 { + memtable.set(entry.0, value, entry.2); + } else { + memtable.delete(entry.0, entry.2); + } + } + + let table = SSTable::new(&memtable, 0, dir.path()).unwrap(); + + assert!(!table.key_in_range(b"A")); + assert!(table.key_in_range(b"c")); + assert!(!table.key_in_range(b"AA")); + } +}