diff --git a/CHANGELOG.md b/CHANGELOG.md index 69ee0e6d0ac..86961f18c16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Perf +### 2026-04-29 + +- Use fixed-width RECEIPTS keys and cursor iteration with early stop for eth_getTransactionReceipt; fix prefix iterator scan in get_transaction_location [#6548](https://github.com/lambdaclass/ethrex/pull/6548) + ### 2026-04-27 - Reduce peak disk usage during snap sync by moving SST files into the temp DB instead of copying [#6532](https://github.com/lambdaclass/ethrex/pull/6532) diff --git a/Cargo.lock b/Cargo.lock index d21e761724f..7e724116588 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4397,6 +4397,7 @@ dependencies = [ "ethrex-rlp", "ethrex-trie", "fastbloom", + "libc", "lru 0.16.4", "rayon", "rocksdb", @@ -4407,6 +4408,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "tracing-subscriber 0.3.23", ] [[package]] diff --git a/crates/l2/tee/quote-gen/Cargo.lock b/crates/l2/tee/quote-gen/Cargo.lock index 8434d3a8d27..46057bd5095 100644 --- a/crates/l2/tee/quote-gen/Cargo.lock +++ b/crates/l2/tee/quote-gen/Cargo.lock @@ -1272,6 +1272,7 @@ dependencies = [ "ethrex-rlp", "ethrex-trie", "fastbloom", + "libc", "lru", "rayon", "rustc-hash", @@ -1280,6 +1281,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 2a39c74023f..c25e32e1221 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -1242,7 +1242,7 @@ async fn handle_incoming_message( let start_index = if i == 0 { first_block_receipt_index } else { 0 }; let block_receipts = state .storage - .get_receipts_for_block_from_index(hash, start_index) + .get_receipts_for_block_from_index(hash, start_index, None) .await?; let mut block_receipt_list = Vec::new(); diff --git a/crates/networking/rpc/eth/block.rs b/crates/networking/rpc/eth/block.rs index ec3bba13d46..af357f1a568 100644 --- a/crates/networking/rpc/eth/block.rs +++ b/crates/networking/rpc/eth/block.rs @@ -12,7 +12,7 @@ use crate::{ utils::RpcErr, }; use ethrex_common::types::{ - Block, BlockBody, BlockHash, BlockHeader, BlockNumber, Receipt, calculate_base_fee_per_blob_gas, + Block, BlockBody, BlockHash, BlockHeader, Receipt, calculate_base_fee_per_blob_gas, }; use ethrex_storage::Store; @@ -177,7 +177,7 @@ impl RpcHandler for GetBlockReceiptsRequest { // Block not found _ => return Ok(Value::Null), }; - let receipts = get_all_block_rpc_receipts(block_number, header, body, storage).await?; + let receipts = get_all_block_rpc_receipts(header, body, storage, None).await?; serde_json::to_value(&receipts).map_err(|error| RpcErr::Internal(error.to_string())) } @@ -270,11 +270,11 @@ impl RpcHandler for GetRawReceipts { }; let header = storage.get_block_header(block_number)?; let body = storage.get_block_body(block_number).await?; - let (header, body) = match (header, body) { + let (header, _body) = match (header, body) { (Some(header), Some(body)) => (header, body), _ => return Ok(Value::Null), }; - let receipts: Vec = get_all_block_receipts(block_number, header, body, storage) + let receipts: Vec = get_all_block_receipts(header, storage) .await? .iter() .map(|receipt| { @@ -329,11 +329,18 @@ impl RpcHandler for GetBlobBaseFee { } } +/// Fetches RPC receipts for a block, optionally stopping after `target_index`. +/// +/// When `target_index` is `Some(n)`, only receipts 0..=n are fetched using a +/// cursor pass — this is the fast path for `eth_getTransactionReceipt` which +/// only needs one receipt but requires preceding cumulative gas values. +/// +/// When `target_index` is `None`, all receipts are fetched (for `eth_getBlockReceipts`). pub async fn get_all_block_rpc_receipts( - block_number: BlockNumber, header: BlockHeader, body: BlockBody, storage: &Store, + target_index: Option, ) -> Result, RpcErr> { let mut receipts = Vec::new(); // Check if this is the genesis block @@ -353,16 +360,32 @@ pub async fn get_all_block_rpc_receipts( .try_into() .map_err(|_| RpcErr::Internal("blob_base_fee does not fit in u64".to_owned()))?; // Fetch receipt info from block + let block_hash = header.hash(); let block_info = RpcReceiptBlockInfo::from_block_header(header); - // Fetch receipt for each tx in the block and add block and tx info + // Fetch receipts: only up to target_index+1 when set, otherwise all + let fetch_count = target_index + .map(|ti| (ti + 1) as usize) + .unwrap_or(body.transactions.len()); + let all_receipts = storage + .get_receipts_for_block_from_index(&block_hash, 0, Some(fetch_count)) + .await?; + if all_receipts.len() != fetch_count { + return Err(RpcErr::Internal(format!( + "Expected {} receipts, got {}", + fetch_count, + all_receipts.len() + ))); + } let mut last_cumulative_gas_used = 0; let mut current_log_index = 0; - for (index, tx) in body.transactions.iter().enumerate() { + for (index, (tx, receipt)) in body + .transactions + .iter() + .zip(all_receipts.iter()) + .enumerate() + .take(fetch_count) + { let index = index as u64; - let receipt = match storage.get_receipt(block_number, index).await? { - Some(receipt) => receipt, - _ => return Err(RpcErr::Internal("Could not get receipt".to_owned())), - }; let gas_used = receipt.cumulative_gas_used - last_cumulative_gas_used; let tx_info = RpcReceiptTxInfo::from_transaction( tx.clone(), @@ -385,23 +408,13 @@ pub async fn get_all_block_rpc_receipts( } pub async fn get_all_block_receipts( - block_number: BlockNumber, header: BlockHeader, - body: BlockBody, storage: &Store, ) -> Result, RpcErr> { - let mut receipts = Vec::new(); // Check if this is the genesis block if header.parent_hash.is_zero() { - return Ok(receipts); + return Ok(Vec::new()); } - for (index, _) in body.transactions.iter().enumerate() { - let index = index as u64; - let receipt = match storage.get_receipt(block_number, index).await? { - Some(receipt) => receipt, - _ => return Err(RpcErr::Internal("Could not get receipt".to_owned())), - }; - receipts.push(receipt); - } - Ok(receipts) + let block_hash = header.hash(); + Ok(storage.get_receipts_for_block(&block_hash).await?) } diff --git a/crates/networking/rpc/eth/transaction.rs b/crates/networking/rpc/eth/transaction.rs index b658ecf43d5..5a7085a9ade 100644 --- a/crates/networking/rpc/eth/transaction.rs +++ b/crates/networking/rpc/eth/transaction.rs @@ -292,7 +292,7 @@ impl RpcHandler for GetTransactionReceiptRequest { "Requested receipt for transaction {:#x}", self.transaction_hash, ); - let (block_number, block_hash, index) = match storage + let (_block_number, block_hash, index) = match storage .get_transaction_location(self.transaction_hash) .await? { @@ -304,11 +304,10 @@ impl RpcHandler for GetTransactionReceiptRequest { None => return Ok(Value::Null), }; let receipts = - block::get_all_block_rpc_receipts(block_number, block.header, block.body, storage) + block::get_all_block_rpc_receipts(block.header, block.body, storage, Some(index)) .await?; - serde_json::to_value(receipts.get(index as usize)) - .map_err(|error| RpcErr::Internal(error.to_string())) + serde_json::to_value(receipts.last()).map_err(|error| RpcErr::Internal(error.to_string())) } } diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 1fb56f7de63..1ca447d650a 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -26,6 +26,8 @@ tokio = { workspace = true, features = ["rt", "sync"] } fastbloom = "0.14" rayon.workspace = true lru.workspace = true +libc = "0.2" +tracing-subscriber = { workspace = true } [features] default = [] @@ -35,9 +37,20 @@ rocksdb = ["dep:rocksdb"] tempfile.workspace = true tokio = { workspace = true, features = ["full"] } + [lib] path = "./lib.rs" +[[bin]] +name = "seed_migration_test" +path = "seed_migration_test.rs" +required-features = ["rocksdb"] + +[[bin]] +name = "bench_migration" +path = "bench_migration.rs" +required-features = ["rocksdb"] + [lints.clippy] unwrap_used = "deny" redundant_clone = "warn" diff --git a/crates/storage/api/tables.rs b/crates/storage/api/tables.rs index fa59620b186..49578b7127c 100644 --- a/crates/storage/api/tables.rs +++ b/crates/storage/api/tables.rs @@ -30,11 +30,16 @@ pub const ACCOUNT_CODES: &str = "account_codes"; /// - [`u8; 8`] = `code_length.to_be_bytes()` pub const ACCOUNT_CODE_METADATA: &str = "account_code_metadata"; -/// Receipts column family: [`Vec`] => [`Vec`] -/// - [`Vec`] = `(block_hash, index).encode_to_vec()` -/// - [`Vec`] = `receipt.encode_to_vec()` +/// Receipts column family (legacy, pre-v2): [`Vec`] => [`Vec`] +/// Kept only for migration reads; dropped automatically on next startup. pub const RECEIPTS: &str = "receipts"; +/// Receipts v2 column family: [`Vec`] => [`Vec`] +/// - Key: `block_hash (32B) || index (8B big-endian u64)` — fixed-width raw key +/// enabling cursor-based prefix iteration by block hash. +/// - Value: `receipt.encode_to_vec()` +pub const RECEIPTS_V2: &str = "receipts_v2"; + /// Transaction locations column family: [`Vec`] => [`Vec`] /// - [`Vec`] = Composite key /// ```rust,no_run @@ -102,7 +107,7 @@ pub const MISC_VALUES: &str = "misc_values"; /// - [`Vec`] = `serde_json::to_vec(&witness)` pub const EXECUTION_WITNESSES: &str = "execution_witnesses"; -pub const TABLES: [&str; 19] = [ +pub const TABLES: [&str; 20] = [ CHAIN_DATA, ACCOUNT_CODES, ACCOUNT_CODE_METADATA, @@ -113,6 +118,7 @@ pub const TABLES: [&str; 19] = [ PENDING_BLOCKS, TRANSACTION_LOCATIONS, RECEIPTS, + RECEIPTS_V2, SNAP_STATE, INVALID_CHAINS, ACCOUNT_TRIE_NODES, diff --git a/crates/storage/backend/rocksdb.rs b/crates/storage/backend/rocksdb.rs index 1672fffb07d..bac59d5154a 100644 --- a/crates/storage/backend/rocksdb.rs +++ b/crates/storage/backend/rocksdb.rs @@ -1,6 +1,6 @@ use crate::api::tables::{ ACCOUNT_CODES, ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES, BLOCK_NUMBERS, BODIES, - CANONICAL_BLOCK_HASHES, FULLSYNC_HEADERS, HEADERS, RECEIPTS, STORAGE_FLATKEYVALUE, + CANONICAL_BLOCK_HASHES, FULLSYNC_HEADERS, HEADERS, RECEIPTS_V2, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES, TRANSACTION_LOCATIONS, }; use crate::api::{ @@ -68,7 +68,7 @@ impl RocksDBBackend { BLOCK_NUMBERS, HEADERS, BODIES, - RECEIPTS, + RECEIPTS_V2, TRANSACTION_LOCATIONS, FULLSYNC_HEADERS, ]; @@ -165,7 +165,7 @@ impl RocksDBBackend { block_opts.set_block_cache(&block_cache); cf_opts.set_block_based_table_factory(&block_opts); } - RECEIPTS => { + RECEIPTS_V2 => { cf_opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB cf_opts.set_max_write_buffer_number(3); cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB diff --git a/crates/storage/bench_migration.rs b/crates/storage/bench_migration.rs new file mode 100644 index 00000000000..7c3d2d592a0 --- /dev/null +++ b/crates/storage/bench_migration.rs @@ -0,0 +1,100 @@ +/// Standalone binary to benchmark the v1→v2 RECEIPTS migration. +/// +/// Usage: bench_migration +/// +/// Opens the RocksDB database, runs the two-CF migration (receipts → receipts_v2), +/// and reports wall-clock time and peak RSS. +/// +/// Prerequisites: +/// 1. Run `seed_migration_test ` to seed 150M old-format entries +/// 2. Ensure metadata.json has {"schema_version": 1} (or just don't create one) +/// 3. Run this binary +use std::time::Instant; + +fn get_rss_mb() -> Option { + #[cfg(target_os = "macos")] + { + use std::mem; + let mut info: libc::rusage = unsafe { mem::zeroed() }; + let ret = unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut info) }; + if ret == 0 { + // macOS reports maxrss in bytes + Some(info.ru_maxrss as f64 / (1024.0 * 1024.0)) + } else { + None + } + } + #[cfg(target_os = "linux")] + { + use std::mem; + let mut info: libc::rusage = unsafe { mem::zeroed() }; + let ret = unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut info) }; + if ret == 0 { + // Linux reports maxrss in kilobytes + Some(info.ru_maxrss as f64 / 1024.0) + } else { + None + } + } + #[cfg(not(any(target_os = "macos", target_os = "linux")))] + { + None + } +} + +fn main() { + // Initialize tracing so migration progress logs are visible + tracing_subscriber::fmt() + .with_target(false) + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + let args: Vec = std::env::args().collect(); + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + std::process::exit(1); + } + let db_path = &args[1]; + + println!("Opening database at: {db_path}"); + let rss_before = get_rss_mb(); + + let backend = ethrex_storage::backend::rocksdb::RocksDBBackend::open(db_path) + .expect("Failed to open RocksDB"); + + let rss_after_open = get_rss_mb(); + println!( + "Database opened. RSS after open: {:.1} MB", + rss_after_open.unwrap_or(0.0) + ); + + // Run the migration + println!("Starting migration v1→v2 (two-CF: receipts → receipts_v2)..."); + let start = Instant::now(); + + ethrex_storage::migrations::run_pending_migrations( + &backend, + std::path::Path::new(db_path), + 1, // pretend we're at v1 + ) + .expect("Migration failed"); + + let elapsed = start.elapsed(); + let rss_after = get_rss_mb(); + + println!("\n=== Migration Benchmark Results ==="); + println!("Wall-clock time: {:.1}s", elapsed.as_secs_f64()); + if let Some(before) = rss_before { + println!("RSS before open: {:.1} MB", before); + } + if let Some(after_open) = rss_after_open { + println!("RSS after open: {:.1} MB", after_open); + } + if let Some(after) = rss_after { + println!("Peak RSS (maxrss): {:.1} MB", after); + } + println!("==================================="); +} diff --git a/crates/storage/lib.rs b/crates/storage/lib.rs index 876911539e5..18db85a4133 100644 --- a/crates/storage/lib.rs +++ b/crates/storage/lib.rs @@ -85,7 +85,7 @@ pub use store::{ /// When bumping this version, add a corresponding migration function to /// `migrations::MIGRATIONS`. The migration framework will automatically /// upgrade existing databases instead of requiring a full resync. -pub const STORE_SCHEMA_VERSION: u64 = 1; +pub const STORE_SCHEMA_VERSION: u64 = 2; /// Name of the file storing the metadata about the database. /// diff --git a/crates/storage/migrations.rs b/crates/storage/migrations.rs index 2f224601472..091ee4da9ed 100644 --- a/crates/storage/migrations.rs +++ b/crates/storage/migrations.rs @@ -2,9 +2,14 @@ use std::io::Write; use std::path::Path; use crate::api::StorageBackend; +use crate::api::tables::{RECEIPTS, RECEIPTS_V2}; use crate::error::StoreError; +use crate::store::receipt_key; use crate::{STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION}; +use ethrex_common::H256; +use ethrex_rlp::decode::RLPDecode; + use super::store::StoreMetadata; /// A migration function that upgrades the database schema by one version. @@ -22,10 +27,7 @@ pub type MigrationFn = fn(backend: &dyn StorageBackend) -> Result<(), StoreError /// /// **Invariant**: `MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize` /// (empty when `STORE_SCHEMA_VERSION == 1`, one entry when it's 2, etc.) -pub const MIGRATIONS: &[MigrationFn] = &[ - // Currently empty — no migrations exist yet. - // When STORE_SCHEMA_VERSION is bumped to 2, add migrate_1_to_2 here. -]; +pub const MIGRATIONS: &[MigrationFn] = &[migrate_1_to_2]; // Compile-time check: the number of migration functions must match the number // of version gaps (i.e. STORE_SCHEMA_VERSION - 1). @@ -93,6 +95,69 @@ fn write_metadata_version(db_path: &Path, version: u64) -> Result<(), StoreError Ok(()) } +/// Migrates the RECEIPTS table from RLP-encoded `(BlockHash, u64)` keys +/// to raw `block_hash (32B) || index (8B big-endian u64)` keys in a new +/// `receipts_v2` column family. +/// +/// This two-CF approach copies entries from the old `receipts` CF to +/// `receipts_v2` with the new key format. The old `receipts` CF is **not** +/// deleted here — it will be dropped automatically by the auto-cleanup in +/// `RocksDBBackend::open()` on the next startup (since `RECEIPTS` is no +/// longer listed in `TABLES`). +/// +/// Crash safety: if interrupted, metadata still says v1, so the migration +/// restarts from scratch on next boot. Duplicate puts to `receipts_v2` are +/// idempotent. +fn migrate_1_to_2(backend: &dyn StorageBackend) -> Result<(), StoreError> { + const BATCH_SIZE: usize = 10_000; + + let txn = backend.begin_read()?; + let iter = txn.prefix_iterator(RECEIPTS, &[])?; + + let mut batch: Vec<(Vec, Vec)> = Vec::with_capacity(BATCH_SIZE); + let mut migrated: u64 = 0; + + for result in iter { + let (key, value) = result?; + + let (block_hash, index) = match <(H256, u64)>::decode(&key) { + Ok(decoded) => decoded, + Err(_) => { + tracing::warn!( + "Skipping RECEIPTS key that failed RLP decode (len={})", + key.len() + ); + continue; + } + }; + + let new_key = receipt_key(&block_hash, index); + batch.push((new_key, value.to_vec())); + + if batch.len() >= BATCH_SIZE { + let count = batch.len() as u64; + let mut tx = backend.begin_write()?; + tx.put_batch(RECEIPTS_V2, std::mem::take(&mut batch))?; + tx.commit()?; + migrated += count; + tracing::info!("Migration v1→v2: migrated {migrated} RECEIPTS entries so far"); + } + } + + // Flush remaining entries. + if !batch.is_empty() { + let count = batch.len() as u64; + let mut tx = backend.begin_write()?; + tx.put_batch(RECEIPTS_V2, batch)?; + tx.commit()?; + migrated += count; + tracing::info!("Migration v1→v2: migrated {migrated} RECEIPTS entries so far"); + } + + tracing::info!("Migration v1→v2 complete: migrated {migrated} RECEIPTS entries total"); + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -131,4 +196,63 @@ mod tests { let metadata: StoreMetadata = serde_json::from_str(&contents).unwrap(); assert_eq!(metadata.schema_version, STORE_SCHEMA_VERSION); } + + #[test] + fn migrate_1_to_2_converts_rlp_keys_to_fixed_width() { + use crate::api::StorageBackend; + use ethrex_common::types::{Receipt, TxType}; + use ethrex_rlp::encode::RLPEncode; + + let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap(); + + let block_hash = H256::random(); + let receipts: Vec = (0..5) + .map(|i| Receipt::new(TxType::Legacy, true, (i + 1) * 21000, vec![])) + .collect(); + + // Seed old-format RLP keys: (BlockHash, u64).encode_to_vec() + { + let mut tx = backend.begin_write().unwrap(); + let batch: Vec<(Vec, Vec)> = receipts + .iter() + .enumerate() + .map(|(i, r)| { + let old_key = (block_hash, i as u64).encode_to_vec(); + let value = r.encode_to_vec(); + (old_key, value) + }) + .collect(); + tx.put_batch(RECEIPTS, batch).unwrap(); + tx.commit().unwrap(); + } + + // Verify old keys exist + { + let txn = backend.begin_read().unwrap(); + let old_key = (block_hash, 0u64).encode_to_vec(); + assert!(txn.get(RECEIPTS, &old_key).unwrap().is_some()); + } + + // Run migration + migrate_1_to_2(&backend).unwrap(); + + // Verify new fixed-width keys exist in RECEIPTS_V2 + let txn = backend.begin_read().unwrap(); + for i in 0..5u64 { + let new_key = receipt_key(&block_hash, i); + let value = txn + .get(RECEIPTS_V2, &new_key) + .unwrap() + .expect("new key should exist in RECEIPTS_V2 after migration"); + let decoded = Receipt::decode(value.as_ref()).unwrap(); + assert_eq!(decoded, receipts[i as usize]); + + // Old keys should still be in RECEIPTS (CF drop happens at startup, not during migration) + let old_key = (block_hash, i).encode_to_vec(); + assert!( + txn.get(RECEIPTS, &old_key).unwrap().is_some(), + "old key should still exist in RECEIPTS (dropped at startup)" + ); + } + } } diff --git a/crates/storage/seed_migration_test.rs b/crates/storage/seed_migration_test.rs new file mode 100644 index 00000000000..2a867b4fb63 --- /dev/null +++ b/crates/storage/seed_migration_test.rs @@ -0,0 +1,286 @@ +/// Standalone binary to seed ~150M old-format (RLP-encoded) RECEIPTS keys +/// into an existing RocksDB database for migration benchmarking. +/// +/// Usage: seed_migration_test +/// +/// This opens the database, writes 150M entries with RLP-encoded (H256, u64) +/// keys and small synthetic receipt values into the "receipts" column family, +/// then exits. After running, reset metadata.json to {"schema_version": 1} +/// and start ethrex to trigger the migration. +use rocksdb::{ + BlockBasedOptions, Cache, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, + WriteBatch, +}; +use std::time::Instant; + +/// All column families that ethrex expects. We must open them all even though +/// we only write to "receipts", otherwise RocksDB will refuse to open. +const ALL_CFS: &[&str] = &[ + "default", + "chain_data", + "account_codes", + "account_code_metadata", + "bodies", + "block_numbers", + "canonical_block_hashes", + "headers", + "pending_blocks", + "transaction_locations", + "receipts", + "receipts_v2", + "snap_state", + "invalid_ancestors", + "account_trie_nodes", + "storage_trie_nodes", + "fullsync_headers", + "account_flatkeyvalue", + "storage_flatkeyvalue", + "misc_values", + "execution_witnesses", +]; + +const COMPRESSIBLE: &[&str] = &[ + "block_numbers", + "headers", + "bodies", + "receipts", + "receipts_v2", + "transaction_locations", + "fullsync_headers", +]; + +/// RLP-encode a (H256, u64) tuple the same way ethrex_rlp does. +/// Layout: RLP list header + 32-byte hash (with RLP string header) + u64 (with RLP string header) +fn rlp_encode_receipt_key(block_hash: &[u8; 32], index: u64) -> Vec { + // RLP-encode the H256 (32 bytes): 0xa0 prefix + 32 bytes = 33 bytes + // RLP-encode the u64: variable length + // Then wrap in a list + + let mut hash_encoded = Vec::with_capacity(33); + hash_encoded.push(0x80 + 32); // string header for 32 bytes + hash_encoded.extend_from_slice(block_hash); + + let idx_encoded = rlp_encode_u64(index); + + let payload_len = hash_encoded.len() + idx_encoded.len(); + let mut out = Vec::with_capacity(payload_len + 3); + + // List header + if payload_len < 56 { + out.push(0xc0 + payload_len as u8); + } else { + let len_bytes = minimal_be_bytes(payload_len as u64); + out.push(0xf7 + len_bytes.len() as u8); + out.extend_from_slice(&len_bytes); + } + out.extend_from_slice(&hash_encoded); + out.extend_from_slice(&idx_encoded); + out +} + +fn rlp_encode_u64(val: u64) -> Vec { + if val == 0 { + return vec![0x80]; // empty string = 0 + } + if val < 128 { + return vec![val as u8]; // single byte + } + let bytes = minimal_be_bytes(val); + let mut out = Vec::with_capacity(1 + bytes.len()); + out.push(0x80 + bytes.len() as u8); + out.extend_from_slice(&bytes); + out +} + +fn minimal_be_bytes(val: u64) -> Vec { + let bytes = val.to_be_bytes(); + let start = bytes.iter().position(|&b| b != 0).unwrap_or(7); + bytes[start..].to_vec() +} + +/// Create a minimal synthetic receipt value (RLP-encoded). +/// Receipt: [tx_type(0), succeeded(true), cumulative_gas(21000), bloom(256 zeros), logs(empty)] +fn synthetic_receipt_value() -> Vec { + // A minimal Legacy receipt: RLP([1, cumgas, bloom, []]) + // succeeded = 0x01 (single byte) + // cumulative_gas_used = 21000 = 0x5208 + // bloom = 256 zero bytes + // logs = empty list + + let succeeded = vec![0x01]; // RLP single byte + let cumgas = vec![0x82, 0x52, 0x08]; // RLP string: 2 bytes, 0x5208 + // bloom: 256 zero bytes -> string header 0xb9 0x01 0x00 + 256 zeros + let mut bloom = Vec::with_capacity(259); + bloom.push(0xb9); + bloom.push(0x01); + bloom.push(0x00); + bloom.extend_from_slice(&[0u8; 256]); + let logs = vec![0xc0]; // empty list + + let payload_len = succeeded.len() + cumgas.len() + bloom.len() + logs.len(); + let mut out = Vec::with_capacity(payload_len + 4); + + // List header for the receipt + if payload_len < 56 { + out.push(0xc0 + payload_len as u8); + } else { + let len_bytes = minimal_be_bytes(payload_len as u64); + out.push(0xf7 + len_bytes.len() as u8); + out.extend_from_slice(&len_bytes); + } + out.extend_from_slice(&succeeded); + out.extend_from_slice(&cumgas); + out.extend_from_slice(&bloom); + out.extend_from_slice(&logs); + out +} + +fn main() { + let args: Vec = std::env::args().collect(); + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + std::process::exit(1); + } + let db_path = &args[1]; + + println!("Opening database at: {db_path}"); + + // DB options matching ethrex's RocksDBBackend::open() + let mut opts = Options::default(); + opts.create_if_missing(true); // Create DB if it doesn't exist + opts.create_missing_column_families(true); + opts.set_max_open_files(512); + opts.set_max_file_opening_threads(16); + opts.set_max_background_jobs(8); + opts.set_level_zero_file_num_compaction_trigger(2); + opts.set_level_zero_slowdown_writes_trigger(10); + opts.set_level_zero_stop_writes_trigger(16); + opts.set_target_file_size_base(512 * 1024 * 1024); + opts.set_max_bytes_for_level_base(2 * 1024 * 1024 * 1024); + opts.set_max_bytes_for_level_multiplier(10.0); + opts.set_level_compaction_dynamic_level_bytes(true); + opts.set_db_write_buffer_size(1024 * 1024 * 1024); + opts.set_write_buffer_size(128 * 1024 * 1024); + opts.set_max_write_buffer_number(4); + opts.set_min_write_buffer_number_to_merge(2); + opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::PointInTime); + opts.set_max_total_wal_size(2 * 1024 * 1024 * 1024); + opts.set_wal_bytes_per_sync(32 * 1024 * 1024); + opts.set_bytes_per_sync(32 * 1024 * 1024); + opts.set_use_fsync(false); + opts.set_enable_pipelined_write(true); + opts.set_allow_concurrent_memtable_write(true); + opts.set_enable_write_thread_adaptive_yield(true); + opts.set_compaction_readahead_size(4 * 1024 * 1024); + opts.set_advise_random_on_open(false); + opts.set_compression_type(rocksdb::DBCompressionType::None); + + let block_cache = Cache::new_lru_cache(2 * 1024 * 1024 * 1024); // 2GB for seeding + + // Discover existing CFs so we don't miss any + let existing_cfs = DBWithThreadMode::::list_cf(&opts, db_path) + .unwrap_or_else(|_| vec!["default".to_string()]); + + let mut all_cfs: Vec = existing_cfs; + for cf in ALL_CFS { + let s = cf.to_string(); + if !all_cfs.contains(&s) { + all_cfs.push(s); + } + } + + let cf_descriptors: Vec = all_cfs + .iter() + .map(|cf_name| { + let mut cf_opts = Options::default(); + cf_opts.set_level_zero_file_num_compaction_trigger(4); + cf_opts.set_level_zero_slowdown_writes_trigger(20); + cf_opts.set_level_zero_stop_writes_trigger(36); + + if COMPRESSIBLE.contains(&cf_name.as_str()) { + cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + } else { + cf_opts.set_compression_type(rocksdb::DBCompressionType::None); + } + + cf_opts.set_write_buffer_size(128 * 1024 * 1024); + cf_opts.set_max_write_buffer_number(3); + cf_opts.set_target_file_size_base(256 * 1024 * 1024); + + let mut block_opts = BlockBasedOptions::default(); + block_opts.set_block_size(32 * 1024); + block_opts.set_block_cache(&block_cache); + cf_opts.set_block_based_table_factory(&block_opts); + + ColumnFamilyDescriptor::new(cf_name.clone(), cf_opts) + }) + .collect(); + + let db = DBWithThreadMode::::open_cf_descriptors(&opts, db_path, cf_descriptors) + .expect("Failed to open database"); + + let cf = db + .cf_handle("receipts") + .expect("receipts column family not found"); + + let receipt_value = synthetic_receipt_value(); + println!("Synthetic receipt value: {} bytes", receipt_value.len()); + + const TOTAL: u64 = 150_000_000; + const BATCH_SIZE: u64 = 50_000; + const RECEIPTS_PER_BLOCK: u64 = 256; + + let start = Instant::now(); + let mut batch = WriteBatch::default(); + let mut count: u64 = 0; + + println!("Seeding {TOTAL} old-format RLP RECEIPTS entries..."); + + for i in 0..TOTAL { + // Generate a deterministic "block hash" from the block index + let block_idx = i / RECEIPTS_PER_BLOCK; + let receipt_idx = i % RECEIPTS_PER_BLOCK; + let mut block_hash = [0u8; 32]; + // Use a prefix that won't collide with real block hashes (starts with 0xFF) + block_hash[0] = 0xFF; + block_hash[1] = 0xFE; + // Encode block_idx into bytes 24..31 + block_hash[24..32].copy_from_slice(&block_idx.to_be_bytes()); + + let key = rlp_encode_receipt_key(&block_hash, receipt_idx); + batch.put_cf(&cf, &key, &receipt_value); + count += 1; + + if count.is_multiple_of(BATCH_SIZE) { + db.write(batch).expect("Failed to write batch"); + batch = WriteBatch::default(); + + if count.is_multiple_of(5_000_000) { + let elapsed = start.elapsed().as_secs_f64(); + let rate = count as f64 / elapsed; + println!( + " {count}/{TOTAL} ({:.1}%) — {:.0} entries/sec — {:.1}s elapsed", + count as f64 / TOTAL as f64 * 100.0, + rate, + elapsed + ); + } + } + } + + // Final batch + if !count.is_multiple_of(BATCH_SIZE) { + db.write(batch).expect("Failed to write final batch"); + } + + let elapsed = start.elapsed().as_secs_f64(); + println!( + "Done! Seeded {count} entries in {elapsed:.1}s ({:.0} entries/sec)", + count as f64 / elapsed + ); + println!("Now reset metadata.json to {{\"schema_version\": 1}} and start ethrex."); + println!("Migration will copy entries from 'receipts' to 'receipts_v2' (two-CF approach)."); + println!( + "The old 'receipts' CF will be dropped automatically on the next startup after migration." + ); +} diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 92086bdec0b..92c3904b79d 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -7,7 +7,7 @@ use crate::{ tables::{ ACCOUNT_CODE_METADATA, ACCOUNT_CODES, ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES, BLOCK_NUMBERS, BODIES, CANONICAL_BLOCK_HASHES, CHAIN_DATA, EXECUTION_WITNESSES, - FULLSYNC_HEADERS, HEADERS, INVALID_CHAINS, MISC_VALUES, PENDING_BLOCKS, RECEIPTS, + FULLSYNC_HEADERS, HEADERS, INVALID_CHAINS, MISC_VALUES, PENDING_BLOCKS, RECEIPTS_V2, SNAP_STATE, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES, TRANSACTION_LOCATIONS, }, }, @@ -592,9 +592,12 @@ impl Store { let mut transaction_locations = Vec::new(); while let Some(Ok((key, value))) = iter.next() { - // Ensure key is exactly tx_hash + block_hash (32 + 32 = 64 bytes) - // and starts with our exact tx_hash - if key.len() == 64 && &key[0..32] == tx_hash_bytes { + // Without a RocksDB prefix extractor, the iterator continues past + // the prefix boundary — break as soon as we leave our tx hash range. + if !key.starts_with(tx_hash_bytes) { + break; + } + if key.len() == 64 { transaction_locations.push(<(BlockNumber, BlockHash, Index)>::decode(&value)?); } } @@ -632,10 +635,9 @@ impl Store { index: Index, receipt: Receipt, ) -> Result<(), StoreError> { - // FIXME: Use dupsort table - let key = (block_hash, index).encode_to_vec(); + let key = receipt_key(&block_hash, index); let value = receipt.encode_to_vec(); - self.write_async(RECEIPTS, key, value).await + self.write_async(RECEIPTS_V2, key, value).await } /// Add receipts @@ -648,12 +650,12 @@ impl Store { .into_iter() .enumerate() .map(|(index, receipt)| { - let key = (block_hash, index as u64).encode_to_vec(); + let key = receipt_key(&block_hash, index as u64); let value = receipt.encode_to_vec(); (key, value) }) .collect(); - self.write_batch_async(RECEIPTS, batch_items).await + self.write_batch_async(RECEIPTS_V2, batch_items).await } /// Obtain receipt for a canonical block represented by the block number. @@ -675,8 +677,8 @@ impl Store { block_hash: BlockHash, index: Index, ) -> Result, StoreError> { - let key = (block_hash, index).encode_to_vec(); - self.read_async(RECEIPTS, key) + let key = receipt_key(&block_hash, index); + self.read_async(RECEIPTS_V2, key) .await? .map(|bytes| Receipt::decode(bytes.as_slice())) .transpose() @@ -1074,33 +1076,61 @@ impl Store { &self, block_hash: &BlockHash, ) -> Result, StoreError> { - self.get_receipts_for_block_from_index(block_hash, 0).await + self.get_receipts_for_block_from_index(block_hash, 0, None) + .await } - /// Retrieves receipts for a block starting from the given index. - /// Used by eth/70 partial receipt requests (EIP-7975). + /// Retrieves receipts for a block starting from the given index, + /// optionally limited to `max_count` receipts. + /// + /// Uses cursor-based prefix iteration over the 32-byte block hash prefix + /// for efficient batch retrieval. Used by: + /// - eth/70 partial receipt requests (EIP-7975) via p2p + /// - `eth_getTransactionReceipt` RPC with a count limit to avoid + /// fetching the entire block's receipts pub async fn get_receipts_for_block_from_index( &self, block_hash: &BlockHash, start_index: u64, + max_count: Option, ) -> Result, StoreError> { - let mut receipts = Vec::new(); - let mut index = start_index; + let backend = self.backend.clone(); + let block_hash = *block_hash; - let txn = self.backend.begin_read()?; - loop { - let key = (*block_hash, index).encode_to_vec(); - match txn.get(RECEIPTS, key.as_slice())? { - Some(receipt_bytes) => { - let receipt = Receipt::decode(receipt_bytes.as_slice())?; - receipts.push(receipt); - index += 1; + tokio::task::spawn_blocking(move || { + let txn = backend.begin_read()?; + let prefix = block_hash.as_bytes().to_vec(); + let iter = txn.prefix_iterator(RECEIPTS_V2, &prefix)?; + let mut receipts = Vec::new(); + for result in iter { + let (k, v) = result?; + if !k.starts_with(&prefix) { + break; + } + if k.len() != 40 { + continue; + } + // Skip entries before start_index (for eth/70 partial requests) + if start_index > 0 { + let idx_bytes: [u8; 8] = k[32..40] + .try_into() + .expect("slice is exactly 8 bytes (checked k.len() == 40)"); + let idx = u64::from_be_bytes(idx_bytes); + if idx < start_index { + continue; + } + } + receipts.push(Receipt::decode(v.as_ref())?); + if let Some(max) = max_count + && receipts.len() >= max + { + break; } - None => break, } - } - - Ok(receipts) + Ok(receipts) + }) + .await + .map_err(|e| StoreError::Custom(format!("Task panicked: {e}")))? } // Snap State methods @@ -1443,9 +1473,9 @@ impl Store { for (block_hash, receipts) in update_batch.receipts { for (index, receipt) in receipts.into_iter().enumerate() { - let key = (block_hash, index as u64).encode_to_vec(); + let key = receipt_key(&block_hash, index as u64); let value = receipt.encode_to_vec(); - tx.put(RECEIPTS, &key, &value)?; + tx.put(RECEIPTS_V2, &key, &value)?; } } @@ -3233,6 +3263,14 @@ fn snap_state_key(index: SnapStateIndex) -> Vec { (index as u8).encode_to_vec() } +/// Builds a fixed-width RECEIPTS key: block_hash (32B) || index (8B BE). +pub fn receipt_key(block_hash: &BlockHash, index: u64) -> Vec { + let mut key = Vec::with_capacity(40); + key.extend_from_slice(block_hash.as_bytes()); + key.extend_from_slice(&index.to_be_bytes()); + key +} + fn encode_code(code: &Code) -> Vec { let mut buf = Vec::with_capacity( 6 + code.bytecode.len() + std::mem::size_of_val(code.jump_targets.as_slice()), diff --git a/crates/vm/levm/bench/revm_comparison/Cargo.lock b/crates/vm/levm/bench/revm_comparison/Cargo.lock index e9cdff6f8a4..6908c649ac2 100644 --- a/crates/vm/levm/bench/revm_comparison/Cargo.lock +++ b/crates/vm/levm/bench/revm_comparison/Cargo.lock @@ -1108,6 +1108,7 @@ dependencies = [ "ethrex-rlp", "ethrex-trie", "fastbloom", + "libc", "lru", "rayon", "rustc-hash", @@ -1116,6 +1117,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/tooling/Cargo.lock b/tooling/Cargo.lock index db2503f429d..e0c3e6465d4 100644 --- a/tooling/Cargo.lock +++ b/tooling/Cargo.lock @@ -3800,6 +3800,7 @@ dependencies = [ "ethrex-rlp 12.0.0", "ethrex-trie 12.0.0", "fastbloom", + "libc", "lru 0.16.3", "rayon", "rocksdb", @@ -3809,6 +3810,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "tracing-subscriber 0.3.23", ] [[package]]