Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
pub mod database;
mod mem_table;
mod sstable;
mod utils;
mod wal;
mod wal_iterator;

// mod table;
// mod table_manager;
// mod utils;
348 changes: 348 additions & 0 deletions src/sstable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
use std::fs::{create_dir_all, File, OpenOptions};
use std::io::{self, BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{u8, usize};

use crate::mem_table::MemTable;

pub struct SSTableEntry {
key: Vec<u8>,
value: Option<Vec<u8>>,
timestamp: u128,
}

pub struct SSTable {
file: BufReader<File>,
path: PathBuf,
offsets: Vec<u64>,
low_key: Vec<u8>,
high_key: Vec<u8>,
}

impl SSTable {
pub fn new(memtable: &MemTable, level: usize, dir: &Path) -> io::Result<SSTable> {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros();

let path = Path::new(dir).join(format!("{}/{}.sstable", level, timestamp.to_string()));

create_dir_all(path.parent().unwrap())?;

let file = OpenOptions::new().append(true).create(true).open(&path)?;
let mut file = BufWriter::new(file);

let mut offsets = Vec::new();
let mut offset = 0;
for entry in memtable.entries() {
offsets.push(offset as u64);

file.write_all(&entry.key.len().to_le_bytes())?;
file.write_all(&entry.key)?;

file.write_all(&(entry.deleted as u8).to_le_bytes())?;

if !entry.deleted {
let value = entry.value.as_ref().unwrap();
file.write_all(&value.len().to_le_bytes())?;
file.write_all(value)?;
}

file.write_all(&entry.timestamp.to_le_bytes())?;

offset += size_of::<usize>()
+ size_of::<u8>()
+ size_of::<u128>()
+ entry.key.len()
+ if !entry.deleted {
size_of::<usize>() + entry.value.as_ref().unwrap().len()
} else {
0
}
}

file.flush()?;

let file = OpenOptions::new().read(true).open(&path)?;
let file = BufReader::new(file);

Ok(SSTable {
file,
path,
offsets,
low_key: memtable.entries().first().unwrap().key.clone(),
high_key: memtable.entries().last().unwrap().key.clone(),
})
}

pub fn load_from_path(path: &Path) -> io::Result<SSTable> {
let file = OpenOptions::new().read(true).open(&path)?;
let mut file = BufReader::new(file);

let mut offsets = Vec::new();
let mut offset = 0;
while file.fill_buf()?.len() > 0 {
offsets.push(offset as u64);

let mut buf = [0u8; size_of::<usize>()];
file.read_exact(&mut buf)?;

let key_len = usize::from_le_bytes(buf);
file.seek_relative(key_len as i64)?;

let mut buf = [0u8; size_of::<u8>()];
file.read_exact(&mut buf)?;
let deleted = buf[0] == 1;

let mut val_len = 0;
if !deleted {
let mut buf = [0u8; size_of::<usize>()];
file.read_exact(&mut buf)?;

val_len = usize::from_le_bytes(buf);
file.seek_relative(val_len as i64)?;
}

file.seek_relative(size_of::<u128>() as i64)?;

offset += size_of::<usize>()
+ size_of::<u8>()
+ size_of::<u128>()
+ key_len
+ if !deleted {
size_of::<usize>() + val_len
} else {
0
}
}

file.seek(SeekFrom::Start(0))?;

let mut buf = [0u8; size_of::<usize>()];
file.read_exact(&mut buf)?;

let key_len = usize::from_le_bytes(buf);
let mut low_key = vec![0u8; key_len];
file.read_exact(&mut low_key)?;

file.seek(SeekFrom::Start(*offsets.last().unwrap() as u64))?;

let mut buf = [0u8; size_of::<usize>()];
file.read_exact(&mut buf)?;

let key_len = usize::from_le_bytes(buf);
let mut high_key = vec![0u8; key_len];
file.read_exact(&mut high_key)?;

Ok(SSTable {
file,
path: path.to_owned(),
offsets,
low_key,
high_key,
})
}

pub fn key_in_range(&self, key: &[u8]) -> bool {
key >= &self.low_key && key <= &self.high_key
}

pub fn get(&mut self, key: &[u8]) -> io::Result<Option<SSTableEntry>> {
let mut a = 0;
let mut b = 0;
while a <= b {
let m = (a + b) / 2;
let offset = self.offsets[m];

self.file.seek(SeekFrom::Start(offset))?;

let mut buf = [0u8; size_of::<usize>()];
self.file.read_exact(&mut buf)?;

let key_len = usize::from_le_bytes(buf);
let mut table_key = vec![0u8; key_len];
self.file.read_exact(&mut table_key)?;

if key == &table_key {
let mut buf = [0u8; size_of::<u8>()];
self.file.read_exact(&mut buf)?;

let deleted = buf[0] == 1;

let mut val = None;
if !deleted {
let mut buf = [0u8; size_of::<usize>()];
self.file.read_exact(&mut buf)?;

let val_len = usize::from_le_bytes(buf);
let mut table_value = vec![0u8; val_len];
self.file.read_exact(&mut table_value)?;

val = Some(table_value)
}

let mut buf = [0u8; size_of::<u128>()];
self.file.read_exact(&mut buf)?;

let timestamp = u128::from_le_bytes(buf);

return Ok(Some(SSTableEntry {
key: table_key,
value: val,
timestamp,
}));
} else if key > &table_key {
a = m + 1;
} else if key < &table_key {
b = m - 1;
}
}

Ok(None)
}
}

#[cfg(test)]
mod tests {
use crate::{mem_table::MemTable, sstable::SSTable};
use std::{
fs::{File, OpenOptions},
io::{BufReader, Read, Seek},
};
use tempfile::tempdir;

fn assert_next_entry(
reader: &mut BufReader<File>,
key: &[u8],
value: Option<&[u8]>,
timestamp: u128,
) {
// Assert key lengths on disk are the same
let mut len_buf = [0u8; size_of::<usize>()];
reader.read_exact(&mut len_buf).unwrap();
let key_len = usize::from_le_bytes(len_buf);
assert_eq!(key_len, key.len());

// Assert key values on disk are the same
let mut file_key = vec![0u8; key_len];
reader.read_exact(&mut file_key).unwrap();
assert_eq!(file_key, key);

// Assert deleted booleans on disk are the same
let mut deleted_buf = [0u8; size_of::<u8>()];
reader.read_exact(&mut deleted_buf).unwrap();
let deleted = deleted_buf[0] == 1;
assert_eq!(deleted, value.is_none());

if !deleted {
let mut len_buf = [0u8; size_of::<usize>()];
reader.read_exact(&mut len_buf).unwrap();
let val_len = usize::from_le_bytes(len_buf);
assert_eq!(val_len, value.unwrap().len());

// Assert key values on disk are the same
let mut file_value = vec![0u8; val_len];
reader.read_exact(&mut file_value).unwrap();
assert_eq!(file_value, value.unwrap());
}

// Assert timestamps on disk are the same
let mut timestamp_buf = [0u8; size_of::<u128>()];
reader.read_exact(&mut timestamp_buf).unwrap();
let file_timestamp = u128::from_le_bytes(timestamp_buf);
assert_eq!(file_timestamp, timestamp);
}

#[test]
fn test_new_sstable() {
let dir = tempdir().unwrap();

let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![
(b"a", Some(b"1"), 0),
(b"b", Some(b"2"), 1),
(b"c", Some(b"3"), 2),
(b"d", Some(b"4"), 3),
];

let mut memtable = MemTable::new();
for entry in &entries {
if let Some(value) = entry.1 {
memtable.set(entry.0, value, entry.2);
} else {
memtable.delete(entry.0, entry.2);
}
}

let table = SSTable::new(&memtable, 0, dir.path()).unwrap();

let file = OpenOptions::new().read(true).open(&table.path).unwrap();
let mut reader = BufReader::new(file);

for (i, entry) in entries.iter().enumerate() {
assert_eq!(reader.stream_position().unwrap(), table.offsets[i]);
assert_next_entry(&mut reader, entry.0, entry.1, entry.2);
}
}

#[test]
fn test_load_sstable() {
let dir = tempdir().unwrap();

let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![
(b"a", Some(b"1"), 0),
(b"b", Some(b"2"), 1),
(b"c", Some(b"3"), 2),
(b"d", Some(b"4"), 3),
];

let mut memtable = MemTable::new();
for entry in &entries {
if let Some(value) = entry.1 {
memtable.set(entry.0, value, entry.2);
} else {
memtable.delete(entry.0, entry.2);
}
}

let table = SSTable::new(&memtable, 0, dir.path()).unwrap();

let table = SSTable::load_from_path(&table.path).unwrap();

let file = OpenOptions::new().read(true).open(&table.path).unwrap();
let mut reader = BufReader::new(file);

for (i, entry) in entries.iter().enumerate() {
assert_eq!(reader.stream_position().unwrap(), table.offsets[i]);
assert_next_entry(&mut reader, entry.0, entry.1, entry.2);
}
}

#[test]
fn test_key_in_range() {
let dir = tempdir().unwrap();

let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![
(b"a", Some(b"1"), 0),
(b"b", Some(b"2"), 1),
(b"c", Some(b"3"), 2),
(b"d", Some(b"4"), 3),
];

let mut memtable = MemTable::new();
for entry in &entries {
if let Some(value) = entry.1 {
memtable.set(entry.0, value, entry.2);
} else {
memtable.delete(entry.0, entry.2);
}
}

let table = SSTable::new(&memtable, 0, dir.path()).unwrap();

assert!(!table.key_in_range(b"A"));
assert!(table.key_in_range(b"c"));
assert!(!table.key_in_range(b"AA"));
}
}