Skip to content

Commit 827673c

Browse files
committed
feat: add sstable
1 parent 21c4e02 commit 827673c

File tree

2 files changed

+282
-4
lines changed

2 files changed

+282
-4
lines changed

src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
pub mod database;
22
mod mem_table;
3+
mod sstable;
34
mod utils;
45
mod wal;
56
mod wal_iterator;
6-
7-
// mod table;
8-
// mod table_manager;
9-
// mod utils;

src/sstable.rs

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
use std::fs::{create_dir_all, File, OpenOptions};
2+
use std::io::{self, BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
3+
use std::path::{Path, PathBuf};
4+
use std::time::{SystemTime, UNIX_EPOCH};
5+
use std::{u8, usize};
6+
7+
use crate::mem_table::MemTable;
8+
9+
/// A single record read back from an [`SSTable`] file.
///
/// Returned by [`SSTable::get`] when the requested key is present in the table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SSTableEntry {
    /// Raw key bytes exactly as stored in the table file.
    key: Vec<u8>,
    /// Stored value bytes, or `None` when the record's deleted flag is set
    /// (i.e. the record is a tombstone).
    value: Option<Vec<u8>>,
    /// Timestamp stored with the record, decoded from a little-endian `u128`.
    timestamp: u128,
}
14+
15+
/// Handle to one table file on disk plus the in-memory index needed to search it.
#[derive(Debug)]
pub struct SSTable {
    /// Buffered read handle on the table file; lookups seek within it.
    file: BufReader<File>,
    /// Location of the table file on disk.
    path: PathBuf,
    /// Byte offset at which each record starts, in file order.
    offsets: Vec<u64>,
    /// Key of the first record in the file.
    low_key: Vec<u8>,
    /// Key of the last record in the file.
    high_key: Vec<u8>,
}
22+
23+
impl SSTable {
24+
pub fn new(memtable: &MemTable, level: usize, dir: &Path) -> io::Result<SSTable> {
25+
let timestamp = SystemTime::now()
26+
.duration_since(UNIX_EPOCH)
27+
.unwrap()
28+
.as_micros();
29+
30+
let path = Path::new(dir).join(format!("{}/{}.sstable", level, timestamp.to_string()));
31+
32+
create_dir_all(path.parent().unwrap())?;
33+
34+
let file = OpenOptions::new().append(true).create(true).open(&path)?;
35+
let mut file = BufWriter::new(file);
36+
37+
let mut offsets = Vec::new();
38+
let mut offset = 0;
39+
for entry in memtable.entries() {
40+
offsets.push(offset as u64);
41+
42+
file.write_all(&entry.key.len().to_le_bytes())?;
43+
file.write_all(&entry.key)?;
44+
45+
file.write_all(&(entry.deleted as u8).to_le_bytes())?;
46+
47+
if !entry.deleted {
48+
let value = entry.value.as_ref().unwrap();
49+
file.write_all(&value.len().to_le_bytes())?;
50+
file.write_all(value)?;
51+
}
52+
53+
file.write_all(&entry.timestamp.to_le_bytes())?;
54+
55+
offset += size_of::<usize>()
56+
+ size_of::<usize>()
57+
+ size_of::<u8>()
58+
+ size_of::<u128>()
59+
+ entry.key.len()
60+
+ entry.value.as_ref().unwrap_or(&vec![]).len();
61+
}
62+
63+
file.flush()?;
64+
65+
let file = OpenOptions::new().read(true).open(&path)?;
66+
let file = BufReader::new(file);
67+
68+
Ok(SSTable {
69+
file,
70+
path,
71+
offsets,
72+
low_key: memtable.entries().first().unwrap().key.clone(),
73+
high_key: memtable.entries().last().unwrap().key.clone(),
74+
})
75+
}
76+
77+
pub fn load_from_path(path: &Path) -> io::Result<SSTable> {
78+
let file = OpenOptions::new().read(true).open(&path)?;
79+
let mut file = BufReader::new(file);
80+
81+
let mut offsets = Vec::new();
82+
let mut offset = 0;
83+
while file.fill_buf()?.len() > 0 {
84+
offsets.push(offset as u64);
85+
86+
let mut buf = [0u8; size_of::<usize>()];
87+
file.read_exact(&mut buf)?;
88+
89+
let key_len = usize::from_le_bytes(buf);
90+
file.consume(key_len);
91+
92+
let mut buf = [0u8; size_of::<u8>()];
93+
file.read_exact(&mut buf)?;
94+
95+
let mut val_len = 0;
96+
if buf[0] != 0 {
97+
let mut buf = [0u8; size_of::<usize>()];
98+
file.read_exact(&mut buf)?;
99+
100+
val_len = usize::from_le_bytes(buf);
101+
file.consume(key_len);
102+
}
103+
104+
file.consume(size_of::<u128>());
105+
106+
offset += size_of::<usize>()
107+
+ size_of::<usize>()
108+
+ size_of::<u8>()
109+
+ size_of::<u128>()
110+
+ key_len
111+
+ val_len;
112+
}
113+
114+
file.seek(SeekFrom::Start(0))?;
115+
116+
let mut buf = [0u8; size_of::<usize>()];
117+
file.read_exact(&mut buf)?;
118+
119+
let key_len = usize::from_le_bytes(buf);
120+
let mut low_key = vec![0u8; key_len];
121+
file.read_exact(&mut low_key)?;
122+
123+
file.seek(SeekFrom::Start(*offsets.last().unwrap() as u64))?;
124+
125+
let mut buf = [0u8; size_of::<usize>()];
126+
file.read_exact(&mut buf)?;
127+
128+
let key_len = usize::from_le_bytes(buf);
129+
let mut high_key = vec![0u8; key_len];
130+
file.read_exact(&mut high_key)?;
131+
132+
Ok(SSTable {
133+
file,
134+
path: path.to_owned(),
135+
offsets,
136+
low_key,
137+
high_key,
138+
})
139+
}
140+
141+
pub fn key_in_range(&self, key: &[u8]) -> bool {
142+
key >= &self.low_key && key <= &self.high_key
143+
}
144+
145+
pub fn get(&mut self, key: &[u8]) -> io::Result<Option<SSTableEntry>> {
146+
let mut a = 0;
147+
let mut b = 0;
148+
while a <= b {
149+
let m = (a + b) / 2;
150+
let offset = self.offsets[m];
151+
152+
self.file.seek(SeekFrom::Start(offset))?;
153+
154+
let mut buf = [0u8; size_of::<usize>()];
155+
self.file.read_exact(&mut buf)?;
156+
157+
let key_len = usize::from_le_bytes(buf);
158+
let mut table_key = vec![0u8; key_len];
159+
self.file.read_exact(&mut table_key)?;
160+
161+
if key == &table_key {
162+
let mut buf = [0u8; size_of::<u8>()];
163+
self.file.read_exact(&mut buf)?;
164+
165+
let deleted = buf[0] == 1;
166+
167+
let mut val = None;
168+
if !deleted {
169+
let mut buf = [0u8; size_of::<usize>()];
170+
self.file.read_exact(&mut buf)?;
171+
172+
let val_len = usize::from_le_bytes(buf);
173+
let mut table_value = vec![0u8; val_len];
174+
self.file.read_exact(&mut table_value)?;
175+
176+
val = Some(table_value)
177+
}
178+
179+
let mut buf = [0u8; size_of::<u128>()];
180+
self.file.read_exact(&mut buf)?;
181+
182+
let timestamp = u128::from_le_bytes(buf);
183+
184+
return Ok(Some(SSTableEntry {
185+
key: table_key,
186+
value: val,
187+
timestamp,
188+
}));
189+
} else if key > &table_key {
190+
a = m + 1;
191+
} else if key < &table_key {
192+
b = m - 1;
193+
}
194+
}
195+
196+
Ok(None)
197+
}
198+
}
199+
200+
#[cfg(test)]
mod tests {
    use crate::{mem_table::MemTable, sstable::SSTable};
    use std::{
        fs::{File, OpenOptions},
        io::{BufReader, Read, Seek},
    };
    use tempfile::tempdir;

    /// Reads one little-endian `usize` length prefix, panicking on I/O failure.
    fn read_len(reader: &mut BufReader<File>) -> usize {
        let mut raw = [0u8; size_of::<usize>()];
        reader.read_exact(&mut raw).unwrap();
        usize::from_le_bytes(raw)
    }

    /// Reads exactly `len` bytes, panicking on I/O failure.
    fn read_bytes(reader: &mut BufReader<File>, len: usize) -> Vec<u8> {
        let mut bytes = vec![0u8; len];
        reader.read_exact(&mut bytes).unwrap();
        bytes
    }

    /// Consumes the next record from `reader` and checks every field against
    /// the expected key, value (`None` meaning a deleted record) and timestamp.
    fn assert_next_entry(
        reader: &mut BufReader<File>,
        key: &[u8],
        value: Option<&[u8]>,
        timestamp: u128,
    ) {
        // Key: length prefix first, then the bytes themselves.
        let key_len = read_len(reader);
        assert_eq!(key_len, key.len());
        let file_key = read_bytes(reader, key_len);
        assert_eq!(file_key, key);

        // Deleted flag must mirror the absence of an expected value.
        let mut flag = [0u8; size_of::<u8>()];
        reader.read_exact(&mut flag).unwrap();
        let deleted = flag[0] == 1;
        assert_eq!(deleted, value.is_none());

        // Value section exists only for records that are not deleted; the
        // assertion above guarantees `value` is `Some` exactly in that case.
        if let Some(expected) = value {
            let val_len = read_len(reader);
            assert_eq!(val_len, expected.len());
            let file_value = read_bytes(reader, val_len);
            assert_eq!(file_value, expected);
        }

        // Timestamp closes the record.
        let mut raw_timestamp = [0u8; size_of::<u128>()];
        reader.read_exact(&mut raw_timestamp).unwrap();
        assert_eq!(u128::from_le_bytes(raw_timestamp), timestamp);
    }

    #[test]
    fn test_new_sstable() {
        let dir = tempdir().unwrap();

        let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![
            (b"a", Some(b"1"), 0),
            (b"b", Some(b"2"), 1),
            (b"c", Some(b"3"), 2),
            (b"d", Some(b"4"), 3),
        ];

        // Populate the memtable: a present value is a set, a missing one a delete.
        let mut memtable = MemTable::new();
        for &(key, value, timestamp) in &entries {
            match value {
                Some(bytes) => {
                    memtable.set(key, bytes, timestamp);
                }
                None => {
                    memtable.delete(key, timestamp);
                }
            }
        }

        let table = SSTable::new(&memtable, 0, dir.path()).unwrap();

        // Re-read the file independently of the table's own reader.
        let mut reader =
            BufReader::new(OpenOptions::new().read(true).open(&table.path).unwrap());

        for (index, &(key, value, timestamp)) in entries.iter().enumerate() {
            // Each record must start exactly where the offset index says it does.
            assert_eq!(reader.stream_position().unwrap(), table.offsets[index]);
            assert_next_entry(&mut reader, key, value, timestamp);
        }
    }
}

0 commit comments

Comments
 (0)