diff --git a/Cargo.lock b/Cargo.lock index c6c692b..2d73244 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,6 +51,7 @@ dependencies = [ "moka", "ouroboros", "serde", + "serde_json", "tempfile", ] @@ -170,6 +171,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + [[package]] name = "js-sys" version = "0.3.81" @@ -207,6 +214,12 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "memchr" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" + [[package]] name = "moka" version = "0.12.11" @@ -358,6 +371,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + [[package]] name = "scopeguard" version = "1.2.0" @@ -377,6 +396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ "serde_core", + "serde_derive", ] [[package]] @@ -399,6 +419,19 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.145" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", + "serde_core", +] + [[package]] name = "smallvec" version = "1.15.1" diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 5f57b96..ef87468 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -12,7 +12,8 @@ crossbeam = "0.8.4" crossbeam-skiplist = "0.1.3" moka = { version = "0.12.11", features = ["sync"] } ouroboros = "0.18.5" -serde = "1.0.228" +serde = { version = "1.0.228", features = ["derive"]} +serde_json = "1.0.145" [dev-dependencies] tempfile = "3.23.0" diff --git a/storage/src/compaction/compact.rs b/storage/src/compaction/compact.rs index 9c91feb..e886648 100644 --- a/storage/src/compaction/compact.rs +++ b/storage/src/compaction/compact.rs @@ -10,6 +10,7 @@ use anyhow::{Ok, Result}; use crate::{ SSTableBuilder, SSTableIterator, Storage, common::iterator::StorageIterator, iterators::merged_iterator::MergedIterator, lsm_storage::StorageState, + manifest::ManifestRecord::Compaction, }; const COMPACT_INTERVAL: Duration = Duration::from_secs(60); @@ -40,37 +41,44 @@ impl Storage { merged_iter.next()?; } - let id = self.get_sst_id(); - let table = builder.build(id, self.block_cache.clone(), self.sst_path(id))?; + let compact_sst_id = { + let id = self.get_sst_id(); + let table = builder.build(id, self.block_cache.clone(), self.sst_path(id))?; - let mut write_guard = self.state.write().unwrap(); - let mut l0_sstables = write_guard.l0_sstables.clone(); - let mut sstables = write_guard.sstables.clone(); - let mut levels = write_guard.levels.clone(); + let mut write_guard = self.state.write().unwrap(); + let mut l0_sstables = write_guard.l0_sstables.clone(); + let mut sstables = write_guard.sstables.clone(); + let mut levels = write_guard.levels.clone(); - for sst_id in l0.iter() { - sstables.remove(sst_id); - l0_sstables.retain(|&x| x != *sst_id); - remove_file(self.sst_path(*sst_id))?; - } + for sst_id in l0.iter() { + sstables.remove(sst_id); + l0_sstables.retain(|&x| x != *sst_id); + remove_file(self.sst_path(*sst_id))?; + } - for sst_id in l1.iter() { - sstables.remove(sst_id); - levels[0].1.retain(|&x| x != *sst_id); - remove_file(self.sst_path(*sst_id))?; - } + for sst_id in l1.iter() { + sstables.remove(sst_id); + levels[0].1.retain(|&x| x != *sst_id); + remove_file(self.sst_path(*sst_id))?; + } - sstables.insert(id, Arc::new(table)); - levels[0].1.insert(0, id); + sstables.insert(id, Arc::new(table)); + levels[0].1.insert(0, id); - *write_guard = Arc::new(StorageState { - memtable: write_guard.memtable.clone(), - frozen_memtables: write_guard.frozen_memtables.clone(), - l0_sstables, - sstables, - levels, - }); + *write_guard = Arc::new(StorageState { + memtable: write_guard.memtable.clone(), + frozen_memtables: write_guard.frozen_memtables.clone(), + l0_sstables, + sstables, + levels, + }); + id + }; + + let state_lock = self.state_lock.lock().unwrap(); + self.manifest + .add_record(&state_lock, Compaction(compact_sst_id)); Ok(()) } @@ -90,6 +98,7 @@ impl Storage { mod tests { use std::{ops::Bound, str::from_utf8}; + use bytes::Bytes; use tempfile::tempdir; use crate::{Config, common::iterator::StorageIterator, lsm_util::get_entries, new}; @@ -244,4 +253,42 @@ mod tests { vec!["3", "2", "4", "5", "6", "7", "8", "9", "10", "11", "12"] ); } + + #[test] + fn test_get_key() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + }; + let storage = new(config); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create two sstables at l0 with a, b, c & d + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // compact l0 sstables to an l1 sstable with keys a, b, c & d + storage.trigger_compaction().expect("compacted"); + + // create another l0 sstable, will have keys e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let l1_value = storage.get(b"a").expect("a exists"); + assert_eq!(l1_value, Bytes::copy_from_slice(b"1")); + + let l0_value = storage.get(b"e").expect("a exists"); + assert_eq!(l0_value, Bytes::copy_from_slice(b"5")); + } } diff --git a/storage/src/compaction/flush.rs b/storage/src/compaction/flush.rs index acd6a7b..e63f417 100644 --- a/storage/src/compaction/flush.rs +++ b/storage/src/compaction/flush.rs @@ -5,40 +5,46 @@ use std::{ time::Duration, }; -use crate::{SSTableBuilder, Storage, lsm_storage::StorageState}; +use crate::{SSTableBuilder, Storage, lsm_storage::StorageState, manifest::ManifestRecord::Flush}; const FLUSH_INTERVAL: Duration = Duration::from_millis(50); impl Storage { pub(crate) fn flush_frozen_memtable(&self) -> Result<()> { - let mut sst_builder = SSTableBuilder::new(self.config.block_size); - let mut guard = self.state.write().unwrap(); + let sst_id = { + let mut sst_builder = SSTableBuilder::new(self.config.block_size); + let mut guard = self.state.write().unwrap(); - let mut memtables = guard.frozen_memtables.clone(); - let mut l0_sstables = guard.l0_sstables.clone(); - let mut sstables = guard.sstables.clone(); + let mut memtables = guard.frozen_memtables.clone(); + let mut l0_sstables = guard.l0_sstables.clone(); + let mut sstables = guard.sstables.clone(); - let Some(memtable) = memtables.pop() else { - return Ok(()); - }; - memtable.flush(&mut sst_builder)?; + let Some(memtable) = memtables.pop() else { + return Ok(()); + }; + memtable.flush(&mut sst_builder)?; + + let sst = sst_builder.build( + memtable.id, + self.block_cache.clone(), + self.sst_path(memtable.id), + )?; + l0_sstables.insert(0, memtable.id); + sstables.insert(memtable.id, Arc::new(sst)); - let sst = sst_builder.build( - memtable.id, - self.block_cache.clone(), - self.sst_path(memtable.id), - )?; - l0_sstables.insert(0, memtable.id); - sstables.insert(memtable.id, Arc::new(sst)); + *guard = Arc::new(StorageState { + memtable: guard.memtable.clone(), + frozen_memtables: memtables, + levels: guard.levels.clone(), + l0_sstables, + sstables, + }); - *guard = Arc::new(StorageState { - memtable: guard.memtable.clone(), - frozen_memtables: memtables, - levels: guard.levels.clone(), - l0_sstables, - sstables, - }); + memtable.id + }; + let state_lock = self.state_lock.lock().unwrap(); + self.manifest.add_record(&state_lock, Flush(sst_id))?; Ok(()) } @@ -56,7 +62,7 @@ impl Storage { } pub fn sst_path(&self, id: usize) -> String { - format!("{}/sst/{}.sst", self.config.db_dir, id) + format!("{}/{}.sst", self.config.db_dir, id) } pub fn spawn_flusher(self: &Arc) -> JoinHandle<()> { diff --git a/storage/src/lib.rs b/storage/src/lib.rs index e4152d4..3eb61ce 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -3,6 +3,7 @@ pub mod compaction; pub mod iterators; pub mod lsm_storage; mod lsm_util; +mod manifest; pub mod memtable; mod block; diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 02a5b4b..d48963b 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -1,3 +1,5 @@ +use anyhow::Result; +use bytes::Bytes; use std::{ collections::HashMap, ops::Bound, @@ -16,11 +18,10 @@ use crate::{ merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator, }, lsm_util::{create_db_dir, load_sstables}, + manifest::{Manifest, ManifestRecord}, memtable::{memtable_iterator::map_bound, table::Memtable}, sst::BlockCache, }; -use anyhow::Result; -use bytes::Bytes; #[derive(Debug)] pub struct Storage { @@ -29,6 +30,7 @@ pub struct Storage { pub(crate) block_cache: Arc, pub(crate) state_lock: Mutex<()>, pub(crate) sst_id: AtomicUsize, + pub(crate) manifest: Manifest, } #[derive(Debug)] @@ -50,11 +52,28 @@ pub struct Config { pub fn new(config: Config) -> Arc { let db_dir = Path::new(&config.db_dir); + let manifest_file = db_dir.join("manifest"); create_db_dir(db_dir); let block_cache = Arc::new(BlockCache::new(4096)); - let (l0_sstables, sstables) = load_sstables(db_dir, block_cache).expect("loaded sstables"); - let sst_id = match l0_sstables.iter().max() { + let manifest; + let mut manifest_records: Vec = vec![]; + + match Manifest::recover(&manifest_file) { + Ok((man, manifest_recs)) => { + manifest = man; + manifest_records = manifest_recs; + } + Err(_) => manifest = Manifest::create(manifest_file).unwrap(), + } + + let (l0_sst_ids, l1_sst_ids, sstables) = + load_sstables(db_dir, block_cache, manifest_records).expect("loaded sstables"); + + let sst_id = match ([l0_sst_ids.clone(), l1_sst_ids.clone()].concat()) + .iter() + .max() + { Some(id) => id + 1, None => 0, }; @@ -63,11 +82,12 @@ pub fn new(config: Config) -> Arc { config, sst_id: AtomicUsize::new(sst_id), state_lock: Mutex::new(()), - block_cache: Arc::new(BlockCache::new(1 << 20)), // 4gb cache + manifest, + block_cache: Arc::new(BlockCache::new(1 << 20)), // 1mb cache state: RwLock::new(Arc::new(StorageState { - l0_sstables, + l0_sstables: l0_sst_ids, sstables, - levels: vec![(0, vec![])], + levels: vec![(0, l1_sst_ids)], frozen_memtables: Vec::new(), memtable: Arc::new(Memtable::new(sst_id)), })), @@ -106,7 +126,7 @@ impl Storage { } } - // search in l1 ssts + // search in l0 ssts if res.is_none() { let mut table_iters = Vec::with_capacity(state.l0_sstables.len()); for table_id in state.l0_sstables.iter() { @@ -510,4 +530,112 @@ mod tests { assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); } + + #[test] + fn get_key_within_range() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + }; + let storage = new(config); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage.trigger_compaction().expect("compacted"); + + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; + for (key, value) in new_entries { + let _ = storage.put(key, value); + } + + // this will create an sst with a & e + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let mut iter = storage + .scan(Bound::Included(b"d"), Bound::Included(b"f")) + .unwrap(); + let mut keys = vec![]; + let mut values = vec![]; + + while iter.is_valid() { + let k = from_utf8(iter.key()).unwrap(); + let v = from_utf8(iter.value()).unwrap(); + + keys.push(String::from(k)); + values.push(String::from(v)); + + let _ = iter.next(); + } + + assert_eq!(keys, vec!["d", "e", "f"]); + assert_eq!(values, vec!["22", "21", "6"]); + } + + #[test] + fn test_manifest_recovery() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + }; + let storage = new(config); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage.trigger_compaction().expect("compacted"); + + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage.trigger_compaction().expect("compacted"); + + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + }; + let storage = new(config); + + let state = { + let guard = storage.state.read().unwrap(); + guard.clone() + }; + + assert_eq!(state.l0_sstables, [3, 2]); + assert_eq!(state.levels[0].1, [8]) + } } diff --git a/storage/src/lsm_util.rs b/storage/src/lsm_util.rs index bf70e64..3d2a023 100644 --- a/storage/src/lsm_util.rs +++ b/storage/src/lsm_util.rs @@ -1,44 +1,56 @@ use anyhow::Result; -use std::{cmp::Reverse, collections::HashMap, fs, io, path::Path, sync::Arc, time::SystemTime}; - -use crate::{FileObject, SSTable, sst::BlockCache}; - -type LoadedSstables = (Vec, HashMap>); - -fn read_dir_sorted>(path: P) -> io::Result> { - let mut entries: Vec = fs::read_dir(path)?.filter_map(Result::ok).collect(); - entries.sort_by_key(|entry| { - Reverse( - entry - .metadata() - .and_then(|m| m.modified()) - .unwrap_or(SystemTime::UNIX_EPOCH), - ) - }); - Ok(entries) -} +use std::{collections::HashMap, fs, path::Path, sync::Arc}; + +use crate::{FileObject, SSTable, manifest::ManifestRecord, sst::BlockCache}; -pub(crate) fn load_sstables(path: &Path, block_cache: Arc) -> Result { - let mut l0_sstables = vec![]; +type LoadedSstables = (Vec, Vec, HashMap>); + +pub(crate) fn load_sstables( + path: &Path, + block_cache: Arc, + manifest_recs: Vec, +) -> Result { + let mut l0 = vec![]; + let mut l1 = vec![]; let mut sstables = HashMap::new(); - for entry in read_dir_sorted(path.join("sst")).unwrap() { - let sst_path = entry.path(); - let filename = sst_path.file_name().unwrap().to_str().unwrap(); - let sst_id = filename.rsplit_once(".").unwrap().0.parse().unwrap(); + for record in manifest_recs { + match record { + ManifestRecord::Flush(sst_id) => { + l0.insert(0, sst_id); + } + ManifestRecord::Compaction(sst_id) => { + // during compaction all l0 sstables are compacted to a single l1 sstable + l0.clear(); - let file = FileObject::open(sst_path.as_path()).expect("failed to open file"); - let sst = SSTable::open(sst_id, block_cache.clone(), file).expect("failed to open sstable"); + // we only support full compaction which means there's only one l1 sstable + l1 = vec![sst_id] + } + _ => {} + } + } + + for l0_sst_id in &l0 { + let sst_path = path.join(format!("{}.sst", l0_sst_id)); + let file = FileObject::open(&sst_path).expect("failed to open file"); + let sst = + SSTable::open(*l0_sst_id, block_cache.clone(), file).expect("failed to open sstable"); + sstables.insert(sst.id, Arc::new(sst)); + } - l0_sstables.push(sst.id); + for l1_sst_id in &l1 { + let sst_path = path.join(format!("{}.sst", l1_sst_id)); + let file = FileObject::open(&sst_path).expect("failed to open file"); + let sst = + SSTable::open(*l1_sst_id, block_cache.clone(), file).expect("failed to open sstable"); sstables.insert(sst.id, Arc::new(sst)); } - anyhow::Ok((l0_sstables, sstables)) + anyhow::Ok((l0, l1, sstables)) } pub(crate) fn create_db_dir(path: &Path) { - fs::create_dir_all(path.join("sst")).expect("failed to create db dir"); + fs::create_dir_all(path).expect("failed to create db dir"); } pub fn get_entries() -> Vec<(&'static [u8], &'static [u8])> { diff --git a/storage/src/manifest.rs b/storage/src/manifest.rs new file mode 100644 index 0000000..48a565b --- /dev/null +++ b/storage/src/manifest.rs @@ -0,0 +1,107 @@ +use std::fs::OpenOptions; +use std::io::{Read, Write}; +use std::path::Path; +use std::sync::{Arc, MutexGuard}; +use std::{fs::File, sync::Mutex}; + +use anyhow::Result; +use bytes::Buf; +use serde::{Deserialize, Serialize}; + +#[derive(Debug)] +pub struct Manifest { + file: Arc>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ManifestRecord { + Flush(usize), + NewMemtable(usize), + Compaction(usize), +} + +impl Manifest { + pub fn create(path: impl AsRef) -> Result { + let mut open_opts = OpenOptions::new(); + open_opts.read(true).write(true).create(true); + + let file = Mutex::new(open_opts.open(path)?); + Ok(Self { + file: Arc::new(file), + }) + } + + pub fn recover(path: impl AsRef) -> Result<(Self, Vec)> { + let mut file = OpenOptions::new().read(true).append(true).open(path)?; + + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + let mut buf_ptr = buf.as_slice(); + + let mut records = Vec::new(); + while buf_ptr.has_remaining() { + let len = buf_ptr.get_u64(); + let slice = &buf_ptr[..len as usize]; + let json = serde_json::from_slice::(slice)?; + buf_ptr.advance(len as usize); + records.push(json); + } + + Ok(( + Self { + file: Arc::new(Mutex::new(file)), + }, + records, + )) + } + + pub fn add_record( + &self, + _state_lock_observer: &MutexGuard<()>, + record: ManifestRecord, + ) -> Result<()> { + self.add_record_when_init(record) + } + + // TODO: compact the manifest when it gets too large + pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> { + let mut file = self.file.lock().expect("to have acquired lock"); + let json_record = serde_json::to_vec(&record)?; + + file.write_all(&(json_record.len() as u64).to_be_bytes())?; + file.write_all(&json_record)?; + file.sync_all()?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + + use super::ManifestRecord::{Compaction, Flush}; + use super::*; + use std::sync::Mutex; + use tempfile::tempdir; + + #[test] + fn test_manifest_recovery() { + let dir = tempdir().unwrap(); + let manifest_file = dir.path().join("manifest"); + + let manifest = Manifest::create(manifest_file.as_path()).unwrap(); + + let lock = Mutex::default(); + manifest + .add_record(&lock.lock().unwrap(), Flush(1)) + .unwrap(); + manifest + .add_record(&lock.lock().unwrap(), Flush(2)) + .unwrap(); + manifest + .add_record(&lock.lock().unwrap(), Compaction(3)) + .unwrap(); + + let manifest_records = Manifest::recover(manifest_file.as_path()).unwrap(); + assert_eq!(manifest_records.1.len(), 3); + } +}