Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions crates/cold/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,20 @@ impl BlockData {

impl From<ExecutedBlock> for BlockData {
fn from(block: ExecutedBlock) -> Self {
Self::new(
block.header,
block.transactions,
block.receipts,
block.signet_events,
block.zenith_header,
)
// Destructure so adding a new `ExecutedBlock` field is a compile
// error here until the author decides whether it belongs in cold.
// `bundle` and `journal_hash` are hot-only: they live in
// `PlainAccountState`/`PlainStorageState` and `JournalHashes`.
let ExecutedBlock {
header,
transactions,
receipts,
signet_events,
zenith_header,
bundle: _,
journal_hash: _,
} = block;
Self::new(header, transactions, receipts, signet_events, zenith_header)
}
}

Expand Down
22 changes: 22 additions & 0 deletions crates/hot-mdbx/src/db_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ impl FixedSizeInfo {
_ => None,
}
}

/// Canonical mapping from the `(dual_key, fixed_val)` size hints accepted
/// by [`queue_raw_create`] to the [`FixedSizeInfo`] implied by them.
/// Used both when persisting FSI for a newly-created table and when
/// pre-populating the open-time cache with a compile-time fallback for
/// tables that pre-date their on-disk metadata entry.
///
/// [`queue_raw_create`]: signet_hot::model::HotKvWrite::queue_raw_create
pub(crate) const fn from_create_args(
dual_key: Option<usize>,
fixed_val: Option<usize>,
) -> Self {
match (dual_key, fixed_val) {
(Some(key2_size), Some(value_size)) => {
Self::DupFixed { key2_size, total_size: key2_size + value_size }
}
(Some(key2_size), None) => Self::DupSort { key2_size },
(None, _) => Self::None,
}
}
}

impl ValSer for FixedSizeInfo {
Expand Down Expand Up @@ -208,6 +228,7 @@ mod tests {
("TableG", FixedSizeInfo::None),
("TableH", FixedSizeInfo::None),
("TableI", FixedSizeInfo::None),
("TableJ", FixedSizeInfo::None),
];
let cache = FsiCache::new(known);

Expand All @@ -233,6 +254,7 @@ mod tests {
("T7", FixedSizeInfo::None),
("T8", FixedSizeInfo::None),
("T9", FixedSizeInfo::None),
("T10", FixedSizeInfo::None),
];
let cache = FsiCache::new(known);

Expand Down
52 changes: 32 additions & 20 deletions crates/hot-mdbx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,25 +80,9 @@ mod utils;

use signet_hot::{
model::{HotKv, HotKvError, HotKvWrite},
tables::{
AccountChangeSets, AccountsHistory, Bytecodes, HeaderNumbers, Headers, NUM_TABLES,
PlainAccountState, PlainStorageState, StorageChangeSets, StorageHistory, Table,
},
tables::{NUM_TABLES, STANDARD_TABLES},
};

/// The known table names, used to pre-populate the FSI cache at open time.
const KNOWN_TABLE_NAMES: [&str; NUM_TABLES] = [
Headers::NAME,
HeaderNumbers::NAME,
Bytecodes::NAME,
PlainAccountState::NAME,
PlainStorageState::NAME,
AccountsHistory::NAME,
AccountChangeSets::NAME,
StorageHistory::NAME,
StorageChangeSets::NAME,
];

/// 1 KB in bytes
pub const KILOBYTE: usize = 1024;
/// 1 MB in bytes
Expand Down Expand Up @@ -458,13 +442,41 @@ fn create_tables_and_populate_cache(env: &Environment) -> Result<FsiCache, MdbxE
Ok(FsiCache::new(known))
}

/// Read FSI entries for all known tables from the metadata table.
/// Read FSI entries for all standard tables from the metadata table.
///
/// Iterates [`STANDARD_TABLES`] and reads each table's on-disk FSI. Missing
/// entries fall back to the compile-time-expected FSI inferred from the
/// table's [`StandardTable`] metadata via [`FixedSizeInfo::from_create_args`]
/// (the same mapping used by [`Tx::queue_raw_create`] when persisting FSI
/// for a newly-created table), so opening a database written by an older
/// binary that pre-dates one or more tables succeeds. The next RW open
/// creates any missing table and persists its FSI normally.
///
/// [`StandardTable`]: signet_hot::tables::StandardTable
fn read_known_fsi<K: signet_libmdbx::TransactionKind>(
tx: &Tx<K>,
) -> Result<[(&'static str, FixedSizeInfo); NUM_TABLES], MdbxError> {
let mut known = [("", FixedSizeInfo::None); NUM_TABLES];
for (i, &name) in KNOWN_TABLE_NAMES.iter().enumerate() {
known[i] = (name, tx.read_fsi_from_table(name)?);
for (index, table) in STANDARD_TABLES.iter().enumerate() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could rewrite this loop as a try_for_each() ?

let fsi = match tx.read_fsi_from_table(table.name) {
Ok(fsi) => fsi,
Err(MdbxError::UnknownTable(_)) => {
let expected =
FixedSizeInfo::from_create_args(table.dual_key_size, table.fixed_val_size);
tracing::warn!(
target: "storage::db::mdbx",
table = table.name,
?expected,
"FSI metadata entry missing for known table; falling back to compile-time \
expected value. Fires once per open per missing table until a RW open \
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this info belongs in a comment? 🤔

                     creates the table and persists its FSI; RO-only consumers of a \
                     pre-upgrade database will see this on every open."```

creates the table and persists its FSI; RO-only consumers of a \
pre-upgrade database will see this on every open."
);
expected
}
Err(error) => return Err(error),
};
known[index] = (table.name, fsi);
}
Ok(known)
}
Expand Down
47 changes: 47 additions & 0 deletions crates/hot-mdbx/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod tests {
use signet_hot::{
KeySer, MAX_KEY_SIZE, ValSer,
conformance::{conformance, test_unwind_conformance},
db::{HotDbRead, UnsafeDbWrite},
model::{
DualKeyTraverse, DualTableTraverse, HotKv, HotKvRead, HotKvWrite, TableTraverse,
TableTraverseMut,
Expand Down Expand Up @@ -1968,6 +1969,52 @@ mod tests {
}
}

/// Regression test: opening a database written by an older binary that
/// pre-dates a known table must succeed in both RO and RW modes, and a
/// subsequent RW open must re-create the missing table and persist its
/// FSI normally.
#[test]
#[serial]
fn open_tolerates_pre_upgrade_db() {
let dir = tempdir().unwrap();

// Phase 1: open RW (auto-creates all tables and FSIs), then forget
// JournalHashes entirely to mimic a pre-upgrade DB.
{
let args = DatabaseArguments::new();
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap();
let writer: Tx<Rw> = db.tx_rw().unwrap();
// SAFETY: no Cursor or Database references to JournalHashes
// exist in this scope.
unsafe {
writer.forget_table(tables::JournalHashes::NAME).unwrap();
}
writer.raw_commit().unwrap();
}

// Phase 2: reopening RO must succeed despite the missing table.
{
let args = DatabaseArguments::new();
DatabaseEnv::open(dir.path(), DatabaseEnvKind::RO, args)
.expect("RO open must tolerate a pre-upgrade DB");
}

// Phase 3: the next RW open must re-create the table and FSI, and
// subsequent reads and writes against it must work normally.
{
let args = DatabaseArguments::new();
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args)
.expect("RW open must tolerate a pre-upgrade DB");
let hash = B256::repeat_byte(0x42);
let writer: Tx<Rw> = db.tx_rw().unwrap();
writer.put_journal_hash(7, &hash).unwrap();
writer.raw_commit().unwrap();

let reader: Tx<Ro> = db.reader().unwrap();
assert_eq!(reader.get_journal_hash(7).unwrap(), Some(hash));
}
}

/// Regression test: FSI must survive a database close/reopen cycle.
///
/// This exercises the `store_fsi` / `read_fsi_from_table` path with an empty cache.
Expand Down
42 changes: 28 additions & 14 deletions crates/hot-mdbx/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,28 @@ impl<K: TransactionKind + WriteMarker> Tx<K> {

Ok(())
}

/// Removes a table from the environment as if it had never been created:
/// drops the named sub-database and erases its FSI metadata entry. Used
/// by tests to simulate a database written by an older binary that
/// pre-dates the table. Does not invalidate the in-memory `FsiCache`;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still control all significant running copies of the binaries, so we could choose not to implement migration logic/tests here. worth discussing whether we need this complexity yet

/// callers must reopen the parent `DatabaseEnv` after commit.
///
/// # Safety
///
/// Caller must ensure no [`Cursor`] or other references to `table`
/// exist for the lifetime of this transaction. See
/// [`signet_libmdbx::tx::Tx::drop_db`].
#[cfg(test)]
pub(crate) unsafe fn forget_table(&self, table: &'static str) -> Result<(), MdbxError> {
let metadata = self.inner.open_db(None)?;
self.inner
.del(metadata, fsi_name_to_key(table).as_slice(), None)
.map_err(MdbxError::Mdbx)?;
let db = self.inner.open_db(Some(table))?;
// SAFETY: forwarded from this function's safety contract.
unsafe { self.inner.drop_db(db) }.map_err(MdbxError::Mdbx)
}
}

fn fsi_name_to_key(name: &'static str) -> B256 {
Expand Down Expand Up @@ -288,22 +310,14 @@ macro_rules! impl_hot_kv_write {
dual_key: Option<usize>,
fixed_val: Option<usize>,
) -> Result<(), Self::Error> {
let mut flags = signet_libmdbx::DatabaseFlags::default();
let fsi = FixedSizeInfo::from_create_args(dual_key, fixed_val);

let mut fsi = FixedSizeInfo::None;

if let Some(key2_size) = dual_key {
let mut flags = signet_libmdbx::DatabaseFlags::default();
if fsi.is_dupsort() {
flags.set(signet_libmdbx::DatabaseFlags::DUP_SORT, true);
if let Some(value_size) = fixed_val {
flags.set(signet_libmdbx::DatabaseFlags::DUP_FIXED, true);
fsi = FixedSizeInfo::DupFixed {
key2_size,
total_size: key2_size + value_size,
};
} else {
// DUPSORT without DUP_FIXED - variable value size
fsi = FixedSizeInfo::DupSort { key2_size };
}
}
if fsi.is_dup_fixed() {
flags.set(signet_libmdbx::DatabaseFlags::DUP_FIXED, true);
}

self.inner.create_db(Some(table), flags)?;
Expand Down
1 change: 1 addition & 0 deletions crates/hot/src/conformance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub fn conformance<T: HotKv>(hot_kv: &T) {
test_storage_history(hot_kv);
test_account_changes(hot_kv);
test_storage_changes(hot_kv);
test_journal_hash_roundtrip(hot_kv);
test_missing_reads(hot_kv);
test_cursor_iter_k2(hot_kv);
test_cursor_iter_k2_single(hot_kv);
Expand Down
39 changes: 39 additions & 0 deletions crates/hot/src/conformance/roundtrip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,42 @@ pub fn test_storage_changes<T: HotKv>(hot_kv: &T) {
}
}

/// Test writing, reading, and overwriting journal hashes.
pub fn test_journal_hash_roundtrip<T: HotKv>(hot_kv: &T) {
let hash_a = b256!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
let hash_b = b256!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
let hash_c = b256!("0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc");

// Write hashes at two block numbers.
{
let writer = hot_kv.writer().unwrap();
writer.put_journal_hash(7, &hash_a).unwrap();
writer.put_journal_hash(8, &hash_b).unwrap();
writer.commit().unwrap();
}

// Read back.
{
let reader = hot_kv.reader().unwrap();
assert_eq!(reader.get_journal_hash(7).unwrap(), Some(hash_a));
assert_eq!(reader.get_journal_hash(8).unwrap(), Some(hash_b));
assert_eq!(reader.get_journal_hash(9).unwrap(), None);
}

// Overwrite block 7 - producers retrying a block must be able to replace
// the previous entry.
{
let writer = hot_kv.writer().unwrap();
writer.put_journal_hash(7, &hash_c).unwrap();
writer.commit().unwrap();
}
{
let reader = hot_kv.reader().unwrap();
assert_eq!(reader.get_journal_hash(7).unwrap(), Some(hash_c));
assert_eq!(reader.get_journal_hash(8).unwrap(), Some(hash_b));
}
}

/// Test that missing reads return None
pub fn test_missing_reads<T: HotKv>(hot_kv: &T) {
let missing_addr = address!("0x9999999999999999999999999999999999999999");
Expand Down Expand Up @@ -288,4 +324,7 @@ pub fn test_missing_reads<T: HotKv>(hot_kv: &T) {

// Missing storage change
assert!(reader.get_storage_change(999999, &missing_addr, &missing_slot).unwrap().is_none());

// Missing journal hash
assert!(reader.get_journal_hash(999999).unwrap().is_none());
}
25 changes: 23 additions & 2 deletions crates/hot/src/conformance/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub fn make_account_info(nonce: u64, balance: U256, code_hash: Option<B256>) ->
/// - Headers and header number mappings
/// - Account and storage change sets
/// - Account and storage history indices
/// - Journal hashes
pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
// Test addresses
let addr1 = address!("0x1111111111111111111111111111111111111111");
Expand Down Expand Up @@ -412,10 +413,21 @@ pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
blocks.push((sealed, bundle));
}

// Store A: Append all 5 blocks, then unwind to block 1
// Journal hashes, one per block; verifies unwind_above also drops these.
let journal_hashes: Vec<B256> = (0..blocks.len())
.map(|index| {
let byte = u8::try_from(index + 1).expect("test block count fits in u8");
B256::from([byte; 32])
})
.collect();

// Store A: Append all 5 blocks plus journal hashes, then unwind to block 1
{
let writer = store_a.writer().unwrap();
writer.append_blocks(blocks.iter().map(|(h, b)| (h, b))).unwrap();
for ((header, _), hash) in blocks.iter().zip(&journal_hashes) {
writer.put_journal_hash(header.number, hash).unwrap();
}
writer.commit().unwrap();
}
{
Expand All @@ -424,10 +436,13 @@ pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
writer.commit().unwrap();
}

// Store B: Append only blocks 0, 1
// Store B: Append only blocks 0, 1 plus their journal hashes
{
let writer = store_b.writer().unwrap();
writer.append_blocks(blocks[0..2].iter().map(|(h, b)| (h, b))).unwrap();
for ((header, _), hash) in blocks[0..2].iter().zip(&journal_hashes[0..2]) {
writer.put_journal_hash(header.number, hash).unwrap();
}
writer.commit().unwrap();
}

Expand All @@ -454,6 +469,12 @@ pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
collect_single_table::<tables::PlainAccountState, _>(&reader_b),
);

assert_single_tables_equal::<tables::JournalHashes>(
"JournalHashes",
collect_single_table::<tables::JournalHashes, _>(&reader_a),
collect_single_table::<tables::JournalHashes, _>(&reader_b),
);

// Note: Bytecodes are not removed on unwind (they're content-addressed),
// so store_a may have more bytecodes than store_b. We skip this comparison.
// assert_single_tables_equal::<tables::Bytecodes>(...)
Expand Down
Loading