Skip to content

Commit d5c2228

Browse files
committed
feat: add sstable
1 parent 21c4e02 commit d5c2228

File tree

2 files changed

+322
-4
lines changed

2 files changed

+322
-4
lines changed

src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
pub mod database;
22
mod mem_table;
3+
mod sstable;
34
mod utils;
45
mod wal;
56
mod wal_iterator;
6-
7-
// mod table;
8-
// mod table_manager;
9-
// mod utils;

src/sstable.rs

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
use std::fs::{create_dir_all, File, OpenOptions};
2+
use std::io::{self, BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
3+
use std::path::{Path, PathBuf};
4+
use std::time::{SystemTime, UNIX_EPOCH};
5+
use std::{u8, usize};
6+
7+
use crate::mem_table::MemTable;
8+
9+
pub struct SSTableEntry {
10+
key: Vec<u8>,
11+
value: Option<Vec<u8>>,
12+
timestamp: u128,
13+
}
14+
15+
pub struct SSTable {
16+
file: BufReader<File>,
17+
path: PathBuf,
18+
offsets: Vec<u64>,
19+
low_key: Vec<u8>,
20+
high_key: Vec<u8>,
21+
}
22+
23+
impl SSTable {
24+
pub fn new(memtable: &MemTable, level: usize, dir: &Path) -> io::Result<SSTable> {
25+
let timestamp = SystemTime::now()
26+
.duration_since(UNIX_EPOCH)
27+
.unwrap()
28+
.as_micros();
29+
30+
let path = Path::new(dir).join(format!("{}/{}.sstable", level, timestamp.to_string()));
31+
32+
create_dir_all(path.parent().unwrap())?;
33+
34+
let file = OpenOptions::new().append(true).create(true).open(&path)?;
35+
let mut file = BufWriter::new(file);
36+
37+
let mut offsets = Vec::new();
38+
let mut offset = 0;
39+
for entry in memtable.entries() {
40+
offsets.push(offset as u64);
41+
42+
file.write_all(&entry.key.len().to_le_bytes())?;
43+
file.write_all(&entry.key)?;
44+
45+
file.write_all(&(entry.deleted as u8).to_le_bytes())?;
46+
47+
if !entry.deleted {
48+
let value = entry.value.as_ref().unwrap();
49+
file.write_all(&value.len().to_le_bytes())?;
50+
file.write_all(value)?;
51+
}
52+
53+
file.write_all(&entry.timestamp.to_le_bytes())?;
54+
55+
offset += size_of::<usize>()
56+
+ size_of::<u8>()
57+
+ size_of::<u128>()
58+
+ entry.key.len()
59+
+ if !entry.deleted {
60+
size_of::<usize>() + entry.value.as_ref().unwrap().len()
61+
} else {
62+
0
63+
}
64+
}
65+
66+
file.flush()?;
67+
68+
let file = OpenOptions::new().read(true).open(&path)?;
69+
let file = BufReader::new(file);
70+
71+
Ok(SSTable {
72+
file,
73+
path,
74+
offsets,
75+
low_key: memtable.entries().first().unwrap().key.clone(),
76+
high_key: memtable.entries().last().unwrap().key.clone(),
77+
})
78+
}
79+
80+
pub fn load_from_path(path: &Path) -> io::Result<SSTable> {
81+
let file = OpenOptions::new().read(true).open(&path)?;
82+
let mut file = BufReader::new(file);
83+
84+
let mut offsets = Vec::new();
85+
let mut offset = 0;
86+
while file.fill_buf()?.len() > 0 {
87+
offsets.push(offset as u64);
88+
89+
let mut buf = [0u8; size_of::<usize>()];
90+
file.read_exact(&mut buf)?;
91+
92+
let key_len = usize::from_le_bytes(buf);
93+
file.seek_relative(key_len as i64)?;
94+
95+
let mut buf = [0u8; size_of::<u8>()];
96+
file.read_exact(&mut buf)?;
97+
let deleted = buf[0] == 1;
98+
99+
let mut val_len = 0;
100+
if !deleted {
101+
let mut buf = [0u8; size_of::<usize>()];
102+
file.read_exact(&mut buf)?;
103+
104+
val_len = usize::from_le_bytes(buf);
105+
file.seek_relative(val_len as i64)?;
106+
}
107+
108+
file.seek_relative(size_of::<u128>() as i64)?;
109+
110+
offset += size_of::<usize>()
111+
+ size_of::<u8>()
112+
+ size_of::<u128>()
113+
+ key_len
114+
+ if !deleted {
115+
size_of::<usize>() + val_len
116+
} else {
117+
0
118+
}
119+
}
120+
121+
file.seek(SeekFrom::Start(0))?;
122+
123+
let mut buf = [0u8; size_of::<usize>()];
124+
file.read_exact(&mut buf)?;
125+
126+
let key_len = usize::from_le_bytes(buf);
127+
let mut low_key = vec![0u8; key_len];
128+
file.read_exact(&mut low_key)?;
129+
130+
file.seek(SeekFrom::Start(*offsets.last().unwrap() as u64))?;
131+
132+
let mut buf = [0u8; size_of::<usize>()];
133+
file.read_exact(&mut buf)?;
134+
135+
let key_len = usize::from_le_bytes(buf);
136+
let mut high_key = vec![0u8; key_len];
137+
file.read_exact(&mut high_key)?;
138+
139+
Ok(SSTable {
140+
file,
141+
path: path.to_owned(),
142+
offsets,
143+
low_key,
144+
high_key,
145+
})
146+
}
147+
148+
pub fn key_in_range(&self, key: &[u8]) -> bool {
149+
key >= &self.low_key && key <= &self.high_key
150+
}
151+
152+
pub fn get(&mut self, key: &[u8]) -> io::Result<Option<SSTableEntry>> {
153+
let mut a = 0;
154+
let mut b = 0;
155+
while a <= b {
156+
let m = (a + b) / 2;
157+
let offset = self.offsets[m];
158+
159+
self.file.seek(SeekFrom::Start(offset))?;
160+
161+
let mut buf = [0u8; size_of::<usize>()];
162+
self.file.read_exact(&mut buf)?;
163+
164+
let key_len = usize::from_le_bytes(buf);
165+
let mut table_key = vec![0u8; key_len];
166+
self.file.read_exact(&mut table_key)?;
167+
168+
if key == &table_key {
169+
let mut buf = [0u8; size_of::<u8>()];
170+
self.file.read_exact(&mut buf)?;
171+
172+
let deleted = buf[0] == 1;
173+
174+
let mut val = None;
175+
if !deleted {
176+
let mut buf = [0u8; size_of::<usize>()];
177+
self.file.read_exact(&mut buf)?;
178+
179+
let val_len = usize::from_le_bytes(buf);
180+
let mut table_value = vec![0u8; val_len];
181+
self.file.read_exact(&mut table_value)?;
182+
183+
val = Some(table_value)
184+
}
185+
186+
let mut buf = [0u8; size_of::<u128>()];
187+
self.file.read_exact(&mut buf)?;
188+
189+
let timestamp = u128::from_le_bytes(buf);
190+
191+
return Ok(Some(SSTableEntry {
192+
key: table_key,
193+
value: val,
194+
timestamp,
195+
}));
196+
} else if key > &table_key {
197+
a = m + 1;
198+
} else if key < &table_key {
199+
b = m - 1;
200+
}
201+
}
202+
203+
Ok(None)
204+
}
205+
}
206+
207+
#[cfg(test)]
208+
mod tests {
209+
use crate::{mem_table::MemTable, sstable::SSTable};
210+
use std::{
211+
fs::{File, OpenOptions},
212+
io::{BufReader, Read, Seek},
213+
};
214+
use tempfile::tempdir;
215+
216+
fn assert_next_entry(
217+
reader: &mut BufReader<File>,
218+
key: &[u8],
219+
value: Option<&[u8]>,
220+
timestamp: u128,
221+
) {
222+
// Assert key lengths on disk are the same
223+
let mut len_buf = [0u8; size_of::<usize>()];
224+
reader.read_exact(&mut len_buf).unwrap();
225+
let key_len = usize::from_le_bytes(len_buf);
226+
assert_eq!(key_len, key.len());
227+
228+
// Assert key values on disk are the same
229+
let mut file_key = vec![0u8; key_len];
230+
reader.read_exact(&mut file_key).unwrap();
231+
assert_eq!(file_key, key);
232+
233+
// Assert deleted booleans on disk are the same
234+
let mut deleted_buf = [0u8; size_of::<u8>()];
235+
reader.read_exact(&mut deleted_buf).unwrap();
236+
let deleted = deleted_buf[0] == 1;
237+
assert_eq!(deleted, value.is_none());
238+
239+
if !deleted {
240+
let mut len_buf = [0u8; size_of::<usize>()];
241+
reader.read_exact(&mut len_buf).unwrap();
242+
let val_len = usize::from_le_bytes(len_buf);
243+
assert_eq!(val_len, value.unwrap().len());
244+
245+
// Assert key values on disk are the same
246+
let mut file_value = vec![0u8; val_len];
247+
reader.read_exact(&mut file_value).unwrap();
248+
assert_eq!(file_value, value.unwrap());
249+
}
250+
251+
// Assert timestamps on disk are the same
252+
let mut timestamp_buf = [0u8; size_of::<u128>()];
253+
reader.read_exact(&mut timestamp_buf).unwrap();
254+
let file_timestamp = u128::from_le_bytes(timestamp_buf);
255+
assert_eq!(file_timestamp, timestamp);
256+
}
257+
258+
#[test]
259+
fn test_new_sstable() {
260+
let dir = tempdir().unwrap();
261+
262+
let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![
263+
(b"a", Some(b"1"), 0),
264+
(b"b", Some(b"2"), 1),
265+
(b"c", Some(b"3"), 2),
266+
(b"d", Some(b"4"), 3),
267+
];
268+
269+
let mut memtable = MemTable::new();
270+
for entry in &entries {
271+
if let Some(value) = entry.1 {
272+
memtable.set(entry.0, value, entry.2);
273+
} else {
274+
memtable.delete(entry.0, entry.2);
275+
}
276+
}
277+
278+
let table = SSTable::new(&memtable, 0, dir.path()).unwrap();
279+
280+
let file = OpenOptions::new().read(true).open(&table.path).unwrap();
281+
let mut reader = BufReader::new(file);
282+
283+
for (i, entry) in entries.iter().enumerate() {
284+
assert_eq!(reader.stream_position().unwrap(), table.offsets[i]);
285+
assert_next_entry(&mut reader, entry.0, entry.1, entry.2);
286+
}
287+
}
288+
289+
#[test]
290+
fn test_load_sstable() {
291+
let dir = tempdir().unwrap();
292+
293+
let entries: Vec<(&[u8], Option<&[u8]>, u128)> = vec![
294+
(b"a", Some(b"1"), 0),
295+
(b"b", Some(b"2"), 1),
296+
(b"c", Some(b"3"), 2),
297+
(b"d", Some(b"4"), 3),
298+
];
299+
300+
let mut memtable = MemTable::new();
301+
for entry in &entries {
302+
if let Some(value) = entry.1 {
303+
memtable.set(entry.0, value, entry.2);
304+
} else {
305+
memtable.delete(entry.0, entry.2);
306+
}
307+
}
308+
309+
let table = SSTable::new(&memtable, 0, dir.path()).unwrap();
310+
311+
let table = SSTable::load_from_path(&table.path).unwrap();
312+
313+
let file = OpenOptions::new().read(true).open(&table.path).unwrap();
314+
let mut reader = BufReader::new(file);
315+
316+
for (i, entry) in entries.iter().enumerate() {
317+
assert_eq!(reader.stream_position().unwrap(), table.offsets[i]);
318+
assert_next_entry(&mut reader, entry.0, entry.1, entry.2);
319+
}
320+
}
321+
}

0 commit comments

Comments
 (0)