Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"
moka = { version = "0.12.11", features = ["sync"] }
ouroboros = "0.18.5"
serde = "1.0.228"
serde = { version = "1.0.228", features = ["derive"]}
serde_json = "1.0.145"

[dev-dependencies]
tempfile = "3.23.0"
97 changes: 72 additions & 25 deletions storage/src/compaction/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use anyhow::{Ok, Result};
use crate::{
SSTableBuilder, SSTableIterator, Storage, common::iterator::StorageIterator,
iterators::merged_iterator::MergedIterator, lsm_storage::StorageState,
manifest::ManifestRecord::Compaction,
};
const COMPACT_INTERVAL: Duration = Duration::from_secs(60);

Expand Down Expand Up @@ -40,37 +41,44 @@ impl Storage {
merged_iter.next()?;
}

let id = self.get_sst_id();
let table = builder.build(id, self.block_cache.clone(), self.sst_path(id))?;
let compact_sst_id = {
let id = self.get_sst_id();
let table = builder.build(id, self.block_cache.clone(), self.sst_path(id))?;

let mut write_guard = self.state.write().unwrap();
let mut l0_sstables = write_guard.l0_sstables.clone();
let mut sstables = write_guard.sstables.clone();
let mut levels = write_guard.levels.clone();
let mut write_guard = self.state.write().unwrap();
let mut l0_sstables = write_guard.l0_sstables.clone();
let mut sstables = write_guard.sstables.clone();
let mut levels = write_guard.levels.clone();

for sst_id in l0.iter() {
sstables.remove(sst_id);
l0_sstables.retain(|&x| x != *sst_id);
remove_file(self.sst_path(*sst_id))?;
}
for sst_id in l0.iter() {
sstables.remove(sst_id);
l0_sstables.retain(|&x| x != *sst_id);
remove_file(self.sst_path(*sst_id))?;
}

for sst_id in l1.iter() {
sstables.remove(sst_id);
levels[0].1.retain(|&x| x != *sst_id);
remove_file(self.sst_path(*sst_id))?;
}
for sst_id in l1.iter() {
sstables.remove(sst_id);
levels[0].1.retain(|&x| x != *sst_id);
remove_file(self.sst_path(*sst_id))?;
}

sstables.insert(id, Arc::new(table));
levels[0].1.insert(0, id);
sstables.insert(id, Arc::new(table));
levels[0].1.insert(0, id);

*write_guard = Arc::new(StorageState {
memtable: write_guard.memtable.clone(),
frozen_memtables: write_guard.frozen_memtables.clone(),
l0_sstables,
sstables,
levels,
});
*write_guard = Arc::new(StorageState {
memtable: write_guard.memtable.clone(),
frozen_memtables: write_guard.frozen_memtables.clone(),
l0_sstables,
sstables,
levels,
});

id
};

let state_lock = self.state_lock.lock().unwrap();
self.manifest
.add_record(&state_lock, Compaction(compact_sst_id));
Ok(())
}

Expand All @@ -90,6 +98,7 @@ impl Storage {
mod tests {
use std::{ops::Bound, str::from_utf8};

use bytes::Bytes;
use tempfile::tempdir;

use crate::{Config, common::iterator::StorageIterator, lsm_util::get_entries, new};
Expand Down Expand Up @@ -244,4 +253,42 @@ mod tests {
vec!["3", "2", "4", "5", "6", "7", "8", "9", "10", "11", "12"]
);
}

#[test]
fn test_get_key() {
let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap());
let config = Config {
sst_size: 4,
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
};
let storage = new(config);

for (key, value) in get_entries() {
storage.put(key, value).unwrap();
}

// will create two sstables at l0 with a, b, c & d
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");

// compact l0 sstables to an l1 sstable with keys a, b, c & d
storage.trigger_compaction().expect("compacted");

// create another l0 sstable, will have keys e & f
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");

let l1_value = storage.get(b"a").expect("a exists");
assert_eq!(l1_value, Bytes::copy_from_slice(b"1"));

let l0_value = storage.get(b"e").expect("a exists");
assert_eq!(l0_value, Bytes::copy_from_slice(b"5"));
}
}
56 changes: 31 additions & 25 deletions storage/src/compaction/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,46 @@ use std::{
time::Duration,
};

use crate::{SSTableBuilder, Storage, lsm_storage::StorageState};
use crate::{SSTableBuilder, Storage, lsm_storage::StorageState, manifest::ManifestRecord::Flush};

const FLUSH_INTERVAL: Duration = Duration::from_millis(50);

impl Storage {
pub(crate) fn flush_frozen_memtable(&self) -> Result<()> {
let mut sst_builder = SSTableBuilder::new(self.config.block_size);
let mut guard = self.state.write().unwrap();
let sst_id = {
let mut sst_builder = SSTableBuilder::new(self.config.block_size);
let mut guard = self.state.write().unwrap();

let mut memtables = guard.frozen_memtables.clone();
let mut l0_sstables = guard.l0_sstables.clone();
let mut sstables = guard.sstables.clone();
let mut memtables = guard.frozen_memtables.clone();
let mut l0_sstables = guard.l0_sstables.clone();
let mut sstables = guard.sstables.clone();

let Some(memtable) = memtables.pop() else {
return Ok(());
};
memtable.flush(&mut sst_builder)?;
let Some(memtable) = memtables.pop() else {
return Ok(());
};
memtable.flush(&mut sst_builder)?;

let sst = sst_builder.build(
memtable.id,
self.block_cache.clone(),
self.sst_path(memtable.id),
)?;
l0_sstables.insert(0, memtable.id);
sstables.insert(memtable.id, Arc::new(sst));

let sst = sst_builder.build(
memtable.id,
self.block_cache.clone(),
self.sst_path(memtable.id),
)?;
l0_sstables.insert(0, memtable.id);
sstables.insert(memtable.id, Arc::new(sst));
*guard = Arc::new(StorageState {
memtable: guard.memtable.clone(),
frozen_memtables: memtables,
levels: guard.levels.clone(),
l0_sstables,
sstables,
});

*guard = Arc::new(StorageState {
memtable: guard.memtable.clone(),
frozen_memtables: memtables,
levels: guard.levels.clone(),
l0_sstables,
sstables,
});
memtable.id
};

let state_lock = self.state_lock.lock().unwrap();
self.manifest.add_record(&state_lock, Flush(sst_id))?;
Ok(())
}

Expand All @@ -56,7 +62,7 @@ impl Storage {
}

pub fn sst_path(&self, id: usize) -> String {
format!("{}/sst/{}.sst", self.config.db_dir, id)
format!("{}/{}.sst", self.config.db_dir, id)
}

pub fn spawn_flusher(self: &Arc<Self>) -> JoinHandle<()> {
Expand Down
1 change: 1 addition & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod compaction;
pub mod iterators;
pub mod lsm_storage;
mod lsm_util;
mod manifest;
pub mod memtable;

mod block;
Expand Down
Loading
Loading