diff --git a/Cargo.lock b/Cargo.lock index 334280fb331..9da05ac8543 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4145,6 +4145,7 @@ name = "ethrex-levm" version = "9.0.0" dependencies = [ "bytes", + "dashmap", "derive_more 1.0.0", "ethrex-common", "ethrex-crypto", diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 7f7647f786b..964f37c3e85 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -941,13 +941,12 @@ pub async fn import_blocks_bench( _ => warn!("Failed to add block {number} with hash {hash:#x}"), })?; - // TODO: replace this - // This sleep is because we have a background process writing to disk the last layer - // And until it's done we can't execute the new block - // Because this wants to compare against running a real node in terms of reported performance - // It takes less than 500ms, so this is good enough, but we should report the performance - // without taking into account that wait. - tokio::time::sleep(Duration::from_millis(500)).await; + // TODO: replace this with an explicit "wait for persistence idle" call. + // The previous block's Phase 2 (disk write of bottom-most diff layer) runs in + // a background thread; we sleep so its completion doesn't bleed into the next + // block's per-block timer. 100ms is comfortably above Phase 2 cost on SSD for + // small blocks; raise if metrics get noisy. + tokio::time::sleep(Duration::from_millis(100)).await; } // Make head canonical and label all special blocks correctly. diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 078e12a87a9..711af26779c 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -837,36 +837,40 @@ impl Blockchain { const NUM_WORKERS: usize = 16; let parent_state_root = parent_header.state_root; - // === Stage A: Drain + accumulate all AccountUpdates === - // BAL guarantees completeness, so we block until execution finishes. - let mut all_updates: FxHashMap
= FxHashMap::default(); - for updates in rx { - let current_length = queue_length.fetch_sub(1, Ordering::Acquire); - *max_queue_length = current_length.max(*max_queue_length); - for update in updates { - match all_updates.entry(update.address) { - Entry::Vacant(e) => { - e.insert(update); - } - Entry::Occupied(mut e) => { - e.get_mut().merge(update); - } - } + // === Stage A: receive the single BAL-derived batch === + // execute_block_parallel calls bal_to_account_updates BEFORE the rayon tx + // loop and sends exactly one Vec)>]>,
) -> Result<(), EvmError> {
// Only visit accounts whose minimum change index <= max_idx.
let end = accounts_by_min_index.partition_point(|(min_idx, _)| *min_idx <= max_idx);
@@ -885,11 +937,18 @@ impl LEVM {
}
// Compute code update before borrowing acc (borrow checker: can't access
- // db.codes while acc holds a mutable borrow of db)
+ // db.codes while acc holds a mutable borrow of db).
+ // Cache hit avoids redundant keccak + jump-target scan when the same code
+ // change is replayed across multiple txs in the parallel loop.
let code_update = if code_pos > 0 {
- Some(Self::code_from_bal(
- &acct_changes.code_changes[code_pos - 1].new_code,
- ))
+ let code_idx = code_pos - 1;
+ if let Some(cache) = code_cache {
+ Some(cache[acct_idx][code_idx].clone())
+ } else {
+ Some(Self::code_from_bal(
+ &acct_changes.code_changes[code_idx].new_code,
+ ))
+ }
} else {
None
};
@@ -1059,31 +1118,57 @@ impl LEVM {
.is_some_and(|a| a.storage.contains_key(key))
});
- // Pre-compute capacity hint for per-tx DBs from BAL account count.
- let bal_account_count = bal.accounts().len();
+ // Pre-compute Code objects (hash + jump_targets) once per BAL code change.
+ // Without this, every tx that re-applies the same code change in seed_db_from_bal
+ // re-keccaks and re-scans the same bytecode. Stress blocks have 10-30 code changes
+ // and 200-500 txs, so the savings compound. Outer Vec indexed by BAL account index;
+ // inner indexed by code_change position. `Code` clone is cheap relative to recompute
+ // (Bytes is refcounted; only jump_targets Vec heap-clones).
+ let code_cache: Vec)>> = bal
+ .accounts()
+ .iter()
+ .map(|ac| {
+ ac.code_changes
+ .iter()
+ .map(|c| Self::code_from_bal(&c.new_code))
+ .collect()
+ })
+ .collect();
// 2. Execute all txs in parallel (embarrassingly parallel, BAL-seeded).
- // BAL validation is deferred to after the gas limit check (step 3) so that
- // blocks exceeding gas limit produce GAS_USED_OVERFLOW before BAL mismatch.
+ // Per-tx BAL validation runs INSIDE the closure (alongside exec) so the
+ // serial post-loop only handles gas accounting and shared-set bookkeeping.
+ // The `validation_error` field carries any per-tx BAL validation failure;
+ // it is surfaced AFTER the gas-limit check so that GAS_USED_OVERFLOW takes
+ // priority over BAL mismatch errors (the BAL is built assuming rejected
+ // txs; the miner balance in the BAL won't match execution that ran all txs).
+ //
+ // `current_state` and `codes` are dropped inside the closure after
+ // validation runs, so they don't need to traverse the rayon boundary.
+ // Per-tx reduction summaries (`destroyed_addresses`, `read_keys`) are
+ // pre-computed for the post-loop's `unread_storage_reads` cleanup.
type TxExecResult = (
usize,
TxType,
ExecutionReport,
- FxHashMap,
- FxHashMap,
FxHashSet<Address>, // accessed_accounts tracker (coarse)
- Vec<Address>, // shadow recorder touched_addresses (EIP-7928 exact)
- Vec<(Address, U256)>, // shadow recorder storage_reads (EIP-7928 exact)
+ Vec<Address>, // destroyed addresses (clear all reads for these)
+ Vec<(Address, H256)>, // read keys (specific entries to clear)
+ Option<EvmError>, // deferred BAL validation error (gas check first)
);
let exec_results: Result<Vec<TxExecResult>, EvmError> = (0..n_txs)
.into_par_iter()
.map(|tx_idx| -> Result<_, EvmError> {
let (tx, sender) = &txs_with_sender[tx_idx];
+ // Per-tx capacity hint: most txs touch <10 accounts. Sizing to the full
+ // BAL account count (often 100s on stress blocks) inflates per-task allocator
+ // pressure × n_rayon_workers. Use a small constant; HashMap grows on demand.
+ const PER_TX_DB_CAPACITY: usize = 32;
let mut tx_db = GeneralizedDatabase::new_with_shared_base_and_capacity(
store.clone(),
system_seed.clone(),
- bal_account_count,
+ PER_TX_DB_CAPACITY,
);
// Small capacity: parallel txs rarely nest >8 call frames, and
// over-allocating per-tx wastes memory across many rayon tasks.
@@ -1098,6 +1183,7 @@ impl LEVM {
bal,
u32::try_from(tx_idx).unwrap_or(u32::MAX),
&validation_index.accounts_by_min_index,
+ Some(&code_cache),
)?;
// Enable accessed_accounts tracker (coarse) for `unaccessed_pure_accounts`
@@ -1139,28 +1225,103 @@ impl LEVM {
.take()
.map(|mut r| (r.take_touched_addresses(), r.take_storage_reads()))
.unwrap_or_default();
+
+ // Pre-compute per-tx summaries for the post-loop's unread_storage_reads
+ // cleanup, then drop current_state/codes (they don't cross the rayon
+ // boundary). Destroyed accounts clear ALL their reads; non-destroyed
+ // accounts clear only the slots they actually loaded.
+ let mut destroyed_addresses: Vec<Address> = Vec::new();
+ let mut read_keys: Vec<(Address, H256)> = Vec::new();
+ for (addr, acct) in &current_state {
+ if matches!(
+ acct.status,
+ AccountStatus::Destroyed | AccountStatus::DestroyedModified
+ ) {
+ destroyed_addresses.push(*addr);
+ } else {
+ for key in acct.storage.keys() {
+ read_keys.push((*addr, *key));
+ }
+ }
+ }
+
+ // Run BAL validation inside the closure. Errors are deferred to the
+ // serial post-loop so the gas-limit check can take priority.
+ let bal_idx = u32::try_from(tx_idx + 1).unwrap_or(u32::MAX);
+ let seed_idx = u32::try_from(tx_idx).unwrap_or(u32::MAX);
+ let validation_error: Option<EvmError> = (|| {
+ Self::validate_tx_execution(
+ bal_idx,
+ seed_idx,
+ &current_state,
+ &codes,
+ bal,
+ validation_index,
+ &system_seed,
+ &store,
+ )
+ .map_err(|e| {
+ EvmError::Custom(format!("BAL validation failed for tx {tx_idx}: {e}"))
+ })?;
+
+ // EIP-7928 (Group B): missing-account check.
+ for addr in &shadow_touched {
+ if !validation_index.addr_to_idx.contains_key(addr) {
+ return Err(EvmError::Custom(format!(
+ "BAL validation failed for tx {tx_idx}: account {addr:?} was \
+ accessed during execution but is missing from BAL"
+ )));
+ }
+ }
+ // EIP-7928 (Group B): missing storage_read check.
+ for (addr, slot) in &shadow_reads {
+ let Some(&bal_acct_idx) = validation_index.addr_to_idx.get(addr) else {
+ // Already caught by the touched-address check above.
+ continue;
+ };
+ let acct = &bal.accounts()[bal_acct_idx];
+ let in_changes = acct
+ .storage_changes
+ .binary_search_by(|sc| sc.slot.cmp(slot))
+ .is_ok();
+ let in_reads = acct.storage_reads.contains(slot);
+ if !in_changes && !in_reads {
+ return Err(EvmError::Custom(format!(
+ "BAL validation failed for tx {tx_idx}: storage slot {slot} of \
+ account {addr:?} was read during execution but is missing from \
+ BAL (no storage_changes or storage_reads entry)"
+ )));
+ }
+ }
+ Ok(())
+ })()
+ .err();
+
+ drop(current_state);
+ drop(codes);
+
Ok((
tx_idx,
tx.tx_type(),
report,
- current_state,
- codes,
tracked,
- shadow_touched,
- shadow_reads,
+ destroyed_addresses,
+ read_keys,
+ validation_error,
))
})
.collect();
let mut exec_results = exec_results?;
- // Sort so gas accounting and validation happen in tx order.
- exec_results.sort_unstable_by_key(|(idx, _, _, _, _, _, _, _)| *idx);
+ // Sort so gas accounting and validation surfacing happen in tx order.
+ exec_results.sort_unstable_by_key(|(idx, _, _, _, _, _, _)| *idx);
- // 3. Gas limit check — must happen BEFORE BAL validation so that blocks
- // exceeding the gas limit produce GAS_USED_OVERFLOW instead of a BAL
- // mismatch error (the BAL is built assuming rejected txs, so the miner
- // balance in the BAL won't match execution that ran all txs).
+ // 3. Gas limit check — must happen BEFORE surfacing any BAL validation
+ // error so that blocks exceeding the gas limit produce
+ // GAS_USED_OVERFLOW instead of a BAL mismatch error (the BAL is
+ // built assuming rejected txs, so the miner balance in the BAL won't
+ // match execution that ran all txs).
//
// EIP-8037 PR #2703: also enforce the per-tx 2D inclusion check
// against running block totals. A tx whose worst-case regular or
@@ -1168,7 +1329,7 @@ impl LEVM {
// position invalidates the block with GAS_ALLOWANCE_EXCEEDED.
let mut block_regular_gas_used = 0_u64;
let mut block_state_gas_used = 0_u64;
- for (tx_idx, _, report, _, _, _, _, _) in &exec_results {
+ for (tx_idx, _, report, _, _, _, _) in &exec_results {
let (tx, _) = txs_with_sender
.get(*tx_idx)
.ok_or_else(|| EvmError::Custom(format!("tx index {tx_idx} out of bounds")))?;
@@ -1197,95 +1358,42 @@ impl LEVM {
)));
}
- // 4. Per-tx BAL validation — now safe to run after gas limit is confirmed OK.
- // Also mark off storage_reads that appear in per-tx execution state.
- for (tx_idx, _, _, current_state, codes, tracked_accounts, shadow_touched, shadow_reads) in
- &exec_results
- {
- let bal_idx = u32::try_from(*tx_idx + 1).unwrap_or(u32::MAX);
- let seed_idx = u32::try_from(*tx_idx).unwrap_or(u32::MAX);
- Self::validate_tx_execution(
- bal_idx,
- seed_idx,
- current_state,
- codes,
- bal,
- validation_index,
- &system_seed,
- &store,
- )
- .map_err(|e| EvmError::Custom(format!("BAL validation failed for tx {tx_idx}: {e}")))?;
-
- // Mark storage_reads that were actually loaded during this tx.
- // storage_reads slots are NOT in storage_changes (conflict check ensures this),
- // so they're not seeded. If a slot appears in the per-tx state's storage,
- // the tx genuinely read it via SLOAD.
- // Special case: selfdestruct clears storage from the final state, so reads
- // that happened before destruction are no longer visible. For destroyed
- // accounts, mark ALL their BAL storage_reads as satisfied.
- if !unread_storage_reads.is_empty() {
- for (addr, acct) in current_state {
- if matches!(
- acct.status,
- AccountStatus::Destroyed | AccountStatus::DestroyedModified
- ) {
- unread_storage_reads.retain(|&(a, _)| a != *addr);
- } else {
- for key in acct.storage.keys() {
- unread_storage_reads.remove(&(*addr, *key));
- }
- }
- }
+ // 4. Surface earliest deferred BAL validation error (post gas check).
+ for (_, _, _, _, _, _, val_err) in &mut exec_results {
+ if let Some(err) = val_err.take() {
+ return Err(err);
}
+ }
- // Mark pure-access accounts that were accessed during this tx.
+ // 5. Apply per-tx summaries to the shared `unread_storage_reads` and
+ // `unaccessed_pure_accounts` checklists. The per-tx closures
+ // pre-computed these from `current_state` before dropping it.
+ if !unread_storage_reads.is_empty() {
+ for (_, _, _, _, destroyed, read_keys, _) in &exec_results {
+ for addr in destroyed {
+ unread_storage_reads.retain(|&(a, _)| a != *addr);
+ }
+ for entry in read_keys {
+ unread_storage_reads.remove(entry);
+ }
+ }
+ }
+ if !unaccessed_pure_accounts.is_empty() {
// The coinbase is always accessed during fee finalization (geth's
// readerTracker records it), even when the miner fee is zero and
// ethrex skips the load_account call.
- if !unaccessed_pure_accounts.is_empty() {
- unaccessed_pure_accounts.remove(&header.coinbase);
+ unaccessed_pure_accounts.remove(&header.coinbase);
+ for (_, _, _, tracked_accounts, _, _, _) in &exec_results {
for addr in tracked_accounts {
unaccessed_pure_accounts.remove(addr);
}
}
-
- // EIP-7928 (Group B): missing-access detection using the shadow recorder.
- // For each address the per-tx shadow recorder marked as touched, the header
- // BAL must contain an entry for it. For each storage read, the header BAL
- // must carry the slot either in storage_changes or storage_reads.
- for addr in shadow_touched {
- if !validation_index.addr_to_idx.contains_key(addr) {
- return Err(EvmError::Custom(format!(
- "BAL validation failed for tx {tx_idx}: account {addr:?} was \
- accessed during execution but is missing from BAL"
- )));
- }
- }
- for (addr, slot) in shadow_reads {
- let Some(&bal_acct_idx) = validation_index.addr_to_idx.get(addr) else {
- // Already caught by the touched-address check above.
- continue;
- };
- let acct = &bal.accounts()[bal_acct_idx];
- let in_changes = acct
- .storage_changes
- .binary_search_by(|sc| sc.slot.cmp(slot))
- .is_ok();
- let in_reads = acct.storage_reads.contains(slot);
- if !in_changes && !in_reads {
- return Err(EvmError::Custom(format!(
- "BAL validation failed for tx {tx_idx}: storage slot {slot} of \
- account {addr:?} was read during execution but is missing from \
- BAL (no storage_changes or storage_reads entry)"
- )));
- }
- }
}
- // 5. Build receipts in tx order.
+ // 6. Build receipts in tx order.
let mut receipts = Vec::with_capacity(n_txs);
let mut cumulative_gas_used = 0_u64;
- for (_, tx_type, report, _, _, _, _, _) in exec_results {
+ for (_, tx_type, report, _, _, _, _) in exec_results {
cumulative_gas_used += report.gas_spent;
let receipt = Receipt::new(
tx_type,
diff --git a/crates/vm/levm/Cargo.toml b/crates/vm/levm/Cargo.toml
index 834650b5c4b..d45f56acf8d 100644
--- a/crates/vm/levm/Cargo.toml
+++ b/crates/vm/levm/Cargo.toml
@@ -20,6 +20,7 @@ malachite = "0.6.1"
strum = { version = "0.27.1", features = ["derive"] }
rustc-hash.workspace = true
rayon.workspace = true
+dashmap = "6.1"
[features]
diff --git a/crates/vm/levm/src/db/mod.rs b/crates/vm/levm/src/db/mod.rs
index cafdd1cbef0..4463f789b26 100644
--- a/crates/vm/levm/src/db/mod.rs
+++ b/crates/vm/levm/src/db/mod.rs
@@ -1,18 +1,23 @@
use crate::{errors::DatabaseError, precompiles::PrecompileCache};
+use dashmap::DashMap;
use ethrex_common::{
Address, H256, U256,
types::{AccountState, ChainConfig, Code, CodeMetadata},
};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
-use rustc_hash::FxHashMap;
-use std::sync::{Arc, OnceLock, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};
+use rustc_hash::FxBuildHasher;
+use std::sync::{Arc, OnceLock};
pub mod gen_db;
-// Type aliases for cache storage maps
-type AccountCache = FxHashMap<Address, AccountState>;
-type StorageCache = FxHashMap<(Address, H256), U256>;
-type CodeCache = FxHashMap<H256, Code>;
+// Type aliases for cache storage maps. Sharded concurrent maps replace the old
+// `RwLock<FxHashMap>` design — under the parallel BAL path, 16 rayon workers all
+// hammered the single account RwLock, contributing ~11% of CPU to lock
+// contention (perf measured 2026-04-28). DashMap shards by key hash so workers
+// touching different addresses don't contend.
+type AccountCache = DashMap<Address, AccountState, FxBuildHasher>;
+type StorageCache = DashMap<(Address, H256), U256, FxBuildHasher>;
+type CodeCache = DashMap<H256, Code, FxBuildHasher>;
pub trait Database: Send + Sync {
fn get_account_state(&self, address: Address) -> Result<AccountState, DatabaseError>;
@@ -53,11 +58,11 @@ pub trait Database: Send + Sync {
pub struct CachingDatabase {
inner: Arc,
/// Cached account states (balance, nonce, code_hash, storage_root)
- accounts: RwLock<AccountCache>,
+ accounts: AccountCache,
/// Cached storage values
- storage: RwLock<StorageCache>,
+ storage: StorageCache,
/// Cached contract code
- code: RwLock<CodeCache>,
+ code: CodeCache,
/// Shared precompile result cache (warmer populates, executor reuses)
precompile_cache: PrecompileCache,
/// Cached chain config (constant for the lifetime of this database)
@@ -68,9 +73,9 @@ impl CachingDatabase {
pub fn new(inner: Arc) -> Self {
Self {
inner,
- accounts: RwLock::new(FxHashMap::default()),
- storage: RwLock::new(FxHashMap::default()),
- code: RwLock::new(FxHashMap::default()),
+ accounts: AccountCache::with_hasher(FxBuildHasher),
+ storage: StorageCache::with_hasher(FxBuildHasher),
+ code: CodeCache::with_hasher(FxBuildHasher),
precompile_cache: PrecompileCache::new(),
chain_config: OnceLock::new(),
}
@@ -80,40 +85,12 @@ impl CachingDatabase {
pub fn precompile_cache(&self) -> &PrecompileCache {
&self.precompile_cache
}
-
- fn read_accounts(&self) -> Result, DatabaseError> {
- self.accounts.read().map_err(poison_error_to_db_error)
- }
-
- fn write_accounts(&self) -> Result, DatabaseError> {
- self.accounts.write().map_err(poison_error_to_db_error)
- }
-
- fn read_storage(&self) -> Result, DatabaseError> {
- self.storage.read().map_err(poison_error_to_db_error)
- }
-
- fn write_storage(&self) -> Result, DatabaseError> {
- self.storage.write().map_err(poison_error_to_db_error)
- }
-
- fn read_code(&self) -> Result, DatabaseError> {
- self.code.read().map_err(poison_error_to_db_error)
- }
-
- fn write_code(&self) -> Result, DatabaseError> {
- self.code.write().map_err(poison_error_to_db_error)
- }
-}
-
-fn poison_error_to_db_error(err: PoisonError) -> DatabaseError {
- DatabaseError::Custom(format!("Cache lock poisoned: {err}"))
}
impl Database for CachingDatabase {
fn get_account_state(&self, address: Address) -> Result<AccountState, DatabaseError> {
- // Check cache first
- if let Some(state) = self.read_accounts()?.get(&address).copied() {
+ // Check cache first (per-shard lock; non-contended for distinct addresses)
+ if let Some(state) = self.accounts.get(&address).map(|r| *r) {
return Ok(state);
}
@@ -121,14 +98,14 @@ impl Database for CachingDatabase {
let state = self.inner.get_account_state(address)?;
// Populate cache (AccountState is Copy, no clone needed)
- self.write_accounts()?.insert(address, state);
+ self.accounts.insert(address, state);
Ok(state)
}
fn get_storage_value(&self, address: Address, key: H256) -> Result<U256, DatabaseError> {
// Check cache first
- if let Some(value) = self.read_storage()?.get(&(address, key)).copied() {
+ if let Some(value) = self.storage.get(&(address, key)).map(|r| *r) {
return Ok(value);
}
@@ -136,7 +113,7 @@ impl Database for CachingDatabase {
let value = self.inner.get_storage_value(address, key)?;
// Populate cache (U256 is Copy, no clone needed)
- self.write_storage()?.insert((address, key), value);
+ self.storage.insert((address, key), value);
Ok(value)
}
@@ -159,7 +136,7 @@ impl Database for CachingDatabase {
fn get_account_code(&self, code_hash: H256) -> Result<Code, DatabaseError> {
// Check cache first
- if let Some(code) = self.read_code()?.get(&code_hash).cloned() {
+ if let Some(code) = self.code.get(&code_hash).map(|r| r.clone()) {
return Ok(code);
}
@@ -167,7 +144,7 @@ impl Database for CachingDatabase {
let code = self.inner.get_account_code(code_hash)?;
// Populate cache (Code contains Bytes which is ref-counted, clone is cheap)
- self.write_code()?.insert(code_hash, code.clone());
+ self.code.insert(code_hash, code.clone());
Ok(code)
}
@@ -184,32 +161,23 @@ impl Database for CachingDatabase {
}
fn prefetch_accounts(&self, addresses: &[Address]) -> Result<(), DatabaseError> {
- // Fetch from inner in parallel (no lock contention), then single write-lock to populate cache.
- let fetched: Vec<(Address, AccountState)> = addresses
+ // Fetch from inner in parallel; insert into sharded cache without serializing
+ // through a single write lock — DashMap inserts touch only the matching shard.
+ addresses
.par_iter()
- .map(|&addr| self.inner.get_account_state(addr).map(|s| (addr, s)))
- .collect::>()?;
- let mut cache = self.write_accounts()?;
- for (addr, state) in fetched {
- cache.entry(addr).or_insert(state);
- }
- Ok(())
+ .try_for_each(|&addr| -> Result<(), DatabaseError> {
+ let state = self.inner.get_account_state(addr)?;
+ self.accounts.entry(addr).or_insert(state);
+ Ok(())
+ })
}
fn prefetch_storage(&self, keys: &[(Address, H256)]) -> Result<(), DatabaseError> {
- // Fetch from inner in parallel (no lock contention), then single write-lock to populate cache.
- let fetched: Vec<((Address, H256), U256)> = keys
- .par_iter()
- .map(|&(addr, key)| {
- self.inner
- .get_storage_value(addr, key)
- .map(|v| ((addr, key), v))
+ keys.par_iter()
+ .try_for_each(|&(addr, key)| -> Result<(), DatabaseError> {
+ let value = self.inner.get_storage_value(addr, key)?;
+ self.storage.entry((addr, key)).or_insert(value);
+ Ok(())
})
- .collect::>()?;
- let mut cache = self.write_storage()?;
- for (key, value) in fetched {
- cache.entry(key).or_insert(value);
- }
- Ok(())
}
}