diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 683a5cc4ccb..9c4c948d3c0 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -10,7 +10,7 @@ use std::{ use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand}; use ethrex_blockchain::{ - BlockchainOptions, BlockchainType, L2Config, + BlockchainOptions, BlockchainType, DEFAULT_MEMPOOL_REORG_DEPTH, L2Config, error::{ChainError, InvalidBlockError}, }; use ethrex_common::types::{Block, DEFAULT_BUILDER_GAS_CEIL, Genesis, validate_block_body}; @@ -183,6 +183,15 @@ pub struct Options { env = "ETHREX_MEMPOOL_MAX_SIZE" )] pub mempool_max_size: usize, + #[arg( + help = "Maximum reorg depth (in blocks) for which transactions in orphaned blocks are re-injected into the mempool. Deeper reorgs skip re-injection.", + long = "mempool.reorg-depth", + default_value_t = DEFAULT_MEMPOOL_REORG_DEPTH, + value_name = "MEMPOOL_REORG_DEPTH", + help_heading = "Node options", + env = "ETHREX_MEMPOOL_REORG_DEPTH" + )] + pub mempool_reorg_depth: u64, #[arg( long = "http.addr", default_value = "0.0.0.0", @@ -392,6 +401,7 @@ impl Options { discv4_enabled: true, discv5_enabled: true, mempool_max_size: 10_000, + mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH, ..Default::default() } } @@ -414,6 +424,7 @@ impl Options { discv4_enabled: true, discv5_enabled: true, mempool_max_size: 10_000, + mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH, ..Default::default() } } @@ -450,6 +461,7 @@ impl Default for Options { dev: Default::default(), force: false, mempool_max_size: Default::default(), + mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH, tx_broadcasting_time_interval: Default::default(), target_peers: Default::default(), lookup_interval: Default::default(), diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index a7a093ea72d..88ea5fd63bd 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -529,6 +529,7 @@ pub async fn init_l1( max_blobs_per_block: opts.max_blobs_per_block, 
precompute_witnesses: opts.precompute_witnesses, precompile_cache_enabled: !opts.no_precompile_cache, + reorg_depth: opts.mempool_reorg_depth, }, ); diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index f051c5b65ce..dabf4adee99 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -224,6 +224,7 @@ pub async fn init_l2( max_blobs_per_block: None, // L2 doesn't support blob transactions precompute_witnesses: opts.node_opts.precompute_witnesses, precompile_cache_enabled: true, + reorg_depth: opts.node_opts.mempool_reorg_depth, }; let blockchain = init_blockchain(store.clone(), blockchain_opts.clone()); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 98ebe601fd1..7dbbebf70f5 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -119,6 +119,12 @@ use ethrex_common::types::BlobsBundle; const MAX_PAYLOADS: usize = 10; const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000; +/// Maximum reorg depth (in blocks) for which orphaned-block transactions are +/// re-injected into the mempool. Reorgs deeper than this are treated as +/// catastrophic — re-injection is skipped to bound the amount of work done +/// during fork-choice processing. +pub const DEFAULT_MEMPOOL_REORG_DEPTH: u64 = 64; + /// Background thread for dropping large tree structures off the critical path. /// Accepts any `Send` value and drops it on a dedicated thread, avoiding /// recursive deallocation costs (~500us for state trie roots) on hot paths. @@ -231,6 +237,10 @@ pub struct BlockchainOptions { /// warmer thread and the executor. Set to false (via `--no-precompile-cache`) to /// disable the cache for benchmarking purposes. pub precompile_cache_enabled: bool, + /// Maximum reorg depth (in blocks) for which transactions in orphaned + /// blocks are re-injected into the mempool. Reorgs deeper than this skip + /// re-injection and log a warning. 
+ pub reorg_depth: u64, } impl Default for BlockchainOptions { @@ -242,6 +252,7 @@ impl Default for BlockchainOptions { max_blobs_per_block: None, precompute_witnesses: false, precompile_cache_enabled: true, + reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH, } } } @@ -2374,10 +2385,202 @@ impl Blockchain { self.mempool.remove_transaction(hash) } - /// Remove all transactions in the executed block from the pool (if we have them) + /// Remove all transactions in the executed block from the pool (if we have them). + /// + /// For blob (EIP-4844) transactions, the sidecars are moved into the mempool + /// limbo rather than discarded, so they remain available for re-injection if + /// the block gets orphaned by a reorg. Limbo entries are purged once the + /// block is finalized. pub fn remove_block_transactions_from_pool(&self, block: &Block) -> Result<(), StoreError> { for tx in &block.body.transactions { - self.mempool.remove_transaction(&tx.hash())?; + self.mempool.remove_included_transaction(&tx.hash())?; + } + Ok(()) + } + + /// Re-inject transactions that were in orphaned blocks back into the mempool + /// after a reorg, with full admission re-validation. + /// + /// Behaviour: + /// - If the reorg depth exceeds `options.reorg_depth`, the entire re-injection + /// is skipped and a warning is logged. + /// - Each transaction is admitted via the same path as a freshly-submitted tx + /// (`add_transaction_to_pool` / `add_blob_transaction_to_pool`), so all + /// stateful checks (nonce/balance under the new canonical head) are applied. + /// - For blob transactions, the sidecar is pulled from the mempool limbo. If + /// the sidecar is missing (e.g. the node was restarted between block + /// inclusion and the reorg), the blob tx is dropped silently — there is no + /// way to reconstruct sidecars. + /// - Failures are logged at debug level and the loop continues; re-injection + /// is strictly best-effort. + /// + /// Returns the number of transactions actually re-injected. 
+ pub async fn reinject_orphaned_transactions( + &self, + previous_head_hash: H256, + new_head_hash: H256, + ) -> Result { + let Some(branches) = + fork_choice::find_common_ancestor(&self.storage, previous_head_hash, new_head_hash) + .await? + else { + debug!( + previous_head = %previous_head_hash, + new_head = %new_head_hash, + "Could not compute common ancestor for reorg re-injection; skipping", + ); + return Ok(0); + }; + + let depth = branches.depth(); + if depth == 0 { + // Not a reorg (e.g. the new head was a descendant of the previous head). + return Ok(0); + } + + if depth > self.options.reorg_depth { + warn!( + depth, + cap = self.options.reorg_depth, + previous_head = %previous_head_hash, + new_head = %new_head_hash, + "Reorg deeper than configured cap; skipping mempool re-injection", + ); + return Ok(0); + } + + let txs = fork_choice::collect_orphaned_transactions(&self.storage, &branches).await?; + info!( + depth, + orphaned_tx_count = txs.len(), + previous_head = %previous_head_hash, + new_head = %new_head_hash, + "Reorg detected; re-injecting orphaned transactions into mempool", + ); + + let mut reinjected = 0usize; + let mut skipped_full = 0usize; + for tx in txs { + // Don't evict freshly-arrived (post-reorg) txs to make room for + // orphaned ones. If the pool is already at capacity, stop + // re-injecting and log how many were skipped. + if self.mempool.is_full()? { + skipped_full += 1; + continue; + } + let tx_hash = tx.hash(); + match tx { + #[cfg(feature = "c-kzg")] + Transaction::EIP4844Transaction(inner) => { + let Some(blobs_bundle) = self.mempool.take_blob_limbo_entry(&tx_hash)? else { + debug!( + tx_hash = %tx_hash, + "Blob sidecar missing from limbo; cannot re-inject orphaned EIP-4844 tx", + ); + continue; + }; + // Clone the bundle so we can re-insert it into limbo if + // admission fails for a transient reason (pool full, + // replacement rules, etc.). 
Otherwise the sidecar would + // be lost permanently and the blob tx could never be + // re-injected on a later attempt. + let bundle_backup = blobs_bundle.clone(); + match self.add_blob_transaction_to_pool(inner, blobs_bundle).await { + Ok(_) => reinjected += 1, + Err(error) => { + debug!( + tx_hash = %tx_hash, + %error, + "Failed to re-inject orphaned EIP-4844 transaction; restoring sidecar to limbo", + ); + self.mempool + .insert_blob_limbo_entry(tx_hash, bundle_backup)?; + } + } + } + #[cfg(not(feature = "c-kzg"))] + Transaction::EIP4844Transaction(_) => { + debug!( + tx_hash = %tx_hash, + "Skipping orphaned EIP-4844 transaction (c-kzg feature disabled)", + ); + } + other => match self.add_transaction_to_pool(other).await { + Ok(_) => reinjected += 1, + Err(error) => { + debug!( + tx_hash = %tx_hash, + %error, + "Failed to re-inject orphaned transaction", + ); + } + }, + } + } + if skipped_full > 0 { + warn!( + skipped = skipped_full, + reinjected, + "Mempool reached capacity during reorg re-injection; remaining orphaned txs dropped to preserve freshly-arrived ones", + ); + } + Ok(reinjected) + } + + /// Purge blob sidecars from the mempool limbo for all transactions in + /// blocks between (exclusive) the previously-finalized block and (inclusive) + /// the newly-finalized block. + /// + /// Once a block is finalized it cannot be reorged, so we no longer need to + /// hold on to its blob sidecars. `previous_finalized_hash` may be zero on + /// the very first finalization advance, in which case purging walks every + /// canonical block from genesis up to the new finalized height. + /// + /// The walk is by canonical block number, not by parent_hash chasing, + /// because the input is a CL-supplied finalized hash (already part of the + /// canonical chain). This removes the previous `reorg_depth` cap, which + /// would leak sidecars whenever a finalization gap exceeded the cap + /// (fresh-node first finalization, CL downtime catch-up, etc.). 
+    pub async fn purge_finalized_blob_limbo(
+        &self,
+        previous_finalized_hash: H256,
+        new_finalized_hash: H256,
+    ) -> Result<(), StoreError> {
+        if new_finalized_hash.is_zero() || previous_finalized_hash == new_finalized_hash {
+            return Ok(());
+        }
+        // Empty limbo: nothing can be purged, so skip the block walk and its body reads.
+        if self.mempool.blob_limbo_size()? == 0 {
+            return Ok(());
+        }
+
+        let Some(new_header) = self.storage.get_block_header_by_hash(new_finalized_hash)? else {
+            return Ok(());
+        };
+        let previous_header = if previous_finalized_hash.is_zero() {
+            None
+        } else {
+            self.storage.get_block_header_by_hash(previous_finalized_hash)?
+        };
+        // No usable previous finalized block (first finalization, sync gap?): walk from genesis.
+        let start_number = previous_header.map_or(0, |header| header.number.saturating_add(1));
+
+        // Limbo only ever holds blob (EIP-4844) sidecars, so other tx types are skipped.
+        let mut hashes_to_purge: Vec<H256> = Vec::new();
+        for number in start_number..=new_header.number {
+            let Some(block_hash) = self.storage.get_canonical_block_hash(number).await? else {
+                continue;
+            };
+            if let Some(body) = self.storage.get_block_body_by_hash(block_hash).await? {
+                for tx in &body.transactions {
+                    if matches!(tx, Transaction::EIP4844Transaction(_)) {
+                        hashes_to_purge.push(tx.hash());
+                    }
+                }
+            }
+        }
+        if !hashes_to_purge.is_empty() {
+            self.mempool.purge_blob_limbo_entries(&hashes_to_purge)?;
         }
         Ok(())
     }
diff --git a/crates/blockchain/fork_choice.rs b/crates/blockchain/fork_choice.rs
index fa68bee8bf6..5196fcdc19b 100644
--- a/crates/blockchain/fork_choice.rs
+++ b/crates/blockchain/fork_choice.rs
@@ -1,10 +1,11 @@
 use ethrex_common::{
     H256,
-    types::{BlockHash, BlockHeader, BlockNumber},
+    types::{BlockHash, BlockHeader, BlockNumber, Transaction},
 };
 use ethrex_metrics::metrics;
 use ethrex_storage::{Store, error::StoreError};
-use tracing::{error, warn};
+use rustc_hash::FxHashSet;
+use tracing::{debug, error, warn};
 
 use crate::{
     error::{self, InvalidForkChoice},
@@ -205,3 +206,158 @@ async fn find_link_with_canonical_chain(
 
     Ok(None)
 }
+
+/// Result of a reorg analysis between two heads: the common ancestor plus the
+/// branches of orphaned blocks (between the common ancestor and the previous
+/// head, exclusive of the ancestor) and new canonical blocks (between the
+/// common ancestor and the new head, exclusive of the ancestor).
+///
+/// Branches are ordered from oldest to newest (i.e. block at index 0 is the
+/// child of the common ancestor).
+#[derive(Debug)]
+pub struct ReorgBranches {
+    pub common_ancestor_number: BlockNumber,
+    pub common_ancestor_hash: BlockHash,
+    pub orphaned: Vec<(BlockNumber, BlockHash)>,
+    pub new_canonical: Vec<(BlockNumber, BlockHash)>,
+}
+
+impl ReorgBranches {
+    /// Reorg depth, defined as the number of orphaned blocks between the
+    /// previous head and the common ancestor.
+    pub fn depth(&self) -> u64 {
+        self.orphaned.len() as u64
+    }
+}
+
+/// Compute the common ancestor between two block hashes and return the orphaned
+/// and new-canonical branches.
+///
+/// The algorithm walks both chains backward (by parent_hash), keeping the two
+/// pointers at equal block numbers, until they converge on a shared ancestor.
+/// +/// Returns `Ok(None)` if either hash cannot be found in the store, or if no +/// common ancestor exists (e.g. one of the chains is not connected to the +/// other in the DB). +pub async fn find_common_ancestor( + store: &Store, + previous_head_hash: BlockHash, + new_head_hash: BlockHash, +) -> Result, StoreError> { + // Fast path: same hash. + if previous_head_hash == new_head_hash { + let header = match store.get_block_header_by_hash(previous_head_hash)? { + Some(h) => h, + None => return Ok(None), + }; + return Ok(Some(ReorgBranches { + common_ancestor_number: header.number, + common_ancestor_hash: previous_head_hash, + orphaned: Vec::new(), + new_canonical: Vec::new(), + })); + } + + let Some(mut prev_header) = store.get_block_header_by_hash(previous_head_hash)? else { + return Ok(None); + }; + let Some(mut new_header) = store.get_block_header_by_hash(new_head_hash)? else { + return Ok(None); + }; + + let mut prev_hash = previous_head_hash; + let mut new_hash = new_head_hash; + + let mut orphaned: Vec<(BlockNumber, BlockHash)> = Vec::new(); + let mut new_canonical: Vec<(BlockNumber, BlockHash)> = Vec::new(); + + // Bring both pointers down to the same block number. + while new_header.number > prev_header.number { + new_canonical.push((new_header.number, new_hash)); + new_hash = new_header.parent_hash; + new_header = match store.get_block_header_by_hash(new_hash)? { + Some(h) => h, + None => return Ok(None), + }; + } + while prev_header.number > new_header.number { + orphaned.push((prev_header.number, prev_hash)); + prev_hash = prev_header.parent_hash; + prev_header = match store.get_block_header_by_hash(prev_hash)? { + Some(h) => h, + None => return Ok(None), + }; + } + + // Walk both pointers in lockstep until they meet. + while prev_hash != new_hash { + if prev_header.number == 0 || new_header.number == 0 { + // Reached genesis without convergence. 
+ return Ok(None); + } + orphaned.push((prev_header.number, prev_hash)); + new_canonical.push((new_header.number, new_hash)); + prev_hash = prev_header.parent_hash; + new_hash = new_header.parent_hash; + prev_header = match store.get_block_header_by_hash(prev_hash)? { + Some(h) => h, + None => return Ok(None), + }; + new_header = match store.get_block_header_by_hash(new_hash)? { + Some(h) => h, + None => return Ok(None), + }; + } + + // Branches were built newest -> oldest while walking; flip them to + // oldest -> newest for callers. + orphaned.reverse(); + new_canonical.reverse(); + + Ok(Some(ReorgBranches { + common_ancestor_number: prev_header.number, + common_ancestor_hash: prev_hash, + orphaned, + new_canonical, + })) +} + +/// Collect the set of transactions that should be re-injected into the mempool +/// after a reorg: all transactions that appear in the orphaned branch but NOT +/// in the new canonical branch. +/// +/// Returns the list of transactions to re-inject, in the order they appeared +/// in the orphaned chain (oldest block first, intra-block order preserved). +pub async fn collect_orphaned_transactions( + store: &Store, + branches: &ReorgBranches, +) -> Result, StoreError> { + // First, build the set of tx hashes that landed in the new canonical + // branch. We subtract these from the orphaned set so a tx that appears + // on both sides (e.g. a user broadcast that got picked up by both + // proposers) is treated as "still included" and not re-injected. + let mut new_canonical_hashes: FxHashSet = FxHashSet::default(); + for (_number, hash) in &branches.new_canonical { + let Some(body) = store.get_block_body_by_hash(*hash).await? else { + continue; + }; + for tx in &body.transactions { + new_canonical_hashes.insert(tx.hash()); + } + } + + // Now walk the orphaned branch and collect transactions not seen in new canonical. 
+ let mut to_reinject = Vec::new(); + for (_number, hash) in &branches.orphaned { + let Some(body) = store.get_block_body_by_hash(*hash).await? else { + debug!(block_hash = %hash, "Orphaned block body not found in store; skipping for re-injection"); + continue; + }; + for tx in body.transactions { + if !new_canonical_hashes.contains(&tx.hash()) { + to_reinject.push(tx); + } + } + } + Ok(to_reinject) +} diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index bdc31f8aaac..a2cdc26ae33 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -28,6 +28,16 @@ struct MempoolInner { broadcast_pool: FxHashSet, transaction_pool: FxHashMap, blobs_bundle_pool: FxHashMap, + /// Blob sidecars for transactions that have been included in a recently + /// applied block but whose containing block has not yet been finalized. + /// Kept around so that on a reorg the sidecars can be pulled back along + /// with the orphaned transactions and re-injected into the mempool. + /// + /// Entries are moved here from `blobs_bundle_pool` when a block is + /// applied and removed when the containing block is finalized (purged + /// by `purge_blob_limbo_entries`) or when pulled back for re-injection + /// on a reorg (`take_blob_limbo_entry`). + blobs_bundle_limbo: FxHashMap, /// Transaction hashes that have been requested via GetPooledTransactions /// but whose responses haven't arrived yet. Used to avoid sending duplicate /// requests when multiple peers announce the same transaction. @@ -71,13 +81,13 @@ impl MempoolInner { Ok(()) } - /// Remove a blobs bundle from the pool - pub fn remove_blob_bundle(&mut self, hash: &H256) { - let Some(h) = self.blobs_bundle_pool.remove(hash) else { - return; - }; - - for commitment in &h.commitments { + /// Clear the versioned-hash index entries for `bundle`'s tx hash. 
Used + /// by both `remove_blob_bundle` (which also drops the bundle) and + /// `remove_included_transaction` (which MOVES the bundle into limbo and + /// therefore must clear the index separately to avoid an extra clone of + /// the full sidecar). + fn clear_blob_versioned_hash_index(&mut self, hash: &H256, bundle: &BlobsBundle) { + for commitment in &bundle.commitments { let versioned_hash = kzg_commitment_to_versioned_hash(commitment); if let Entry::Occupied(mut entry) = self.blobs_bundle_by_versioned_hash.entry(versioned_hash) @@ -91,6 +101,14 @@ impl MempoolInner { } } + /// Remove a blobs bundle from the pool + pub fn remove_blob_bundle(&mut self, hash: &H256) { + let Some(bundle) = self.blobs_bundle_pool.remove(hash) else { + return; + }; + self.clear_blob_versioned_hash_index(hash, &bundle); + } + /// Remove the oldest transaction in the pool fn remove_oldest_transaction(&mut self) -> Result<(), StoreError> { // Remove elements from the order queue until one is present in the pool @@ -141,6 +159,16 @@ impl Mempool { .map_err(|error| StoreError::MempoolReadLock(error.to_string())) } + /// Returns `true` if the pool is at capacity. Callers can use this to + /// decide whether to admit a new tx (and trigger oldest-first eviction) + /// or to skip admission entirely. Used by reorg re-injection so a deep + /// reorg doesn't evict freshly-arrived txs to make room for orphaned + /// ones. + pub fn is_full(&self) -> Result { + let inner = self.read()?; + Ok(inner.transaction_pool.len() >= inner.max_mempool_size) + } + /// Add transaction to the pool without doing validity checks pub fn add_transaction( &self, @@ -227,6 +255,68 @@ impl Mempool { Ok(()) } + /// Remove a transaction from the pool that was just included in a block. + /// For blob (EIP-4844) transactions, the sidecar is MOVED from the active + /// `blobs_bundle_pool` into `blobs_bundle_limbo` rather than cloned, so + /// it remains available for re-injection if a reorg orphans the block. 
+ /// Moving avoids cloning the full sidecar (~800 KB worst case) on every + /// blob-tx inclusion. + /// + /// Returns whether the transaction was present in the mempool. + pub fn remove_included_transaction(&self, hash: &H256) -> Result { + let mut inner = self.write()?; + let was_present = inner.transaction_pool.contains_key(hash); + // Detach the bundle (if any) from `blobs_bundle_pool` without + // cloning, clear its versioned-hash index entries, then park the + // bundle in limbo. The subsequent `remove_transaction_with_lock` + // call observes an empty `blobs_bundle_pool` for this hash and + // doesn't try to remove or re-iterate the bundle. + if let Some(bundle) = inner.blobs_bundle_pool.remove(hash) { + inner.clear_blob_versioned_hash_index(hash, &bundle); + inner.blobs_bundle_limbo.insert(*hash, bundle); + } + inner.remove_transaction_with_lock(hash)?; + Ok(was_present) + } + + /// Purge a set of transaction-hash blob sidecars from the limbo. + /// Called when a block is finalized; sidecars for txs in finalized + /// blocks are no longer needed because finalized blocks cannot be reorged. + pub fn purge_blob_limbo_entries(&self, tx_hashes: &[H256]) -> Result<(), StoreError> { + let mut inner = self.write()?; + for hash in tx_hashes { + inner.blobs_bundle_limbo.remove(hash); + } + Ok(()) + } + + /// Pop a blob sidecar from limbo by tx hash. Returns the bundle if it was + /// present. Used by reorg re-injection to recover sidecars for orphaned + /// blob transactions. + pub fn take_blob_limbo_entry(&self, tx_hash: &H256) -> Result, StoreError> { + let mut inner = self.write()?; + Ok(inner.blobs_bundle_limbo.remove(tx_hash)) + } + + /// Reinsert a previously-taken blob sidecar back into limbo. Used by + /// reorg re-injection to put the sidecar back when admission of the + /// orphaned blob tx fails (so it can be retried on a subsequent + /// re-injection attempt rather than being permanently lost). 
+ pub fn insert_blob_limbo_entry( + &self, + tx_hash: H256, + bundle: BlobsBundle, + ) -> Result<(), StoreError> { + let mut inner = self.write()?; + inner.blobs_bundle_limbo.insert(tx_hash, bundle); + Ok(()) + } + + /// Returns the current number of blob sidecars held in limbo. + pub fn blob_limbo_size(&self) -> Result { + Ok(self.read()?.blobs_bundle_limbo.len()) + } + /// Applies the filter and returns a set of suitable transactions from the mempool. /// These transactions will be grouped by sender and sorted by nonce pub fn filter_transactions( diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index c59e9b72f75..2636c2b6e96 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -271,6 +271,25 @@ async fn handle_forkchoice( return Ok((None, PayloadStatus::syncing().into())); } + // Snapshot the previous head and previously-finalized number before applying + // the new fork choice. We use the previous head later to detect reorgs and + // re-inject orphaned-block transactions into the mempool; we use the previous + // finalized number to decide which blob sidecars are safe to purge from the + // limbo. + let previous_head_hash = context + .storage + .get_latest_canonical_block_hash() + .await? + .unwrap_or_default(); + let previous_finalized_hash = match context.storage.get_finalized_block_number().await? { + Some(number) => context + .storage + .get_canonical_block_hash(number) + .await? 
+ .unwrap_or_default(), + None => Default::default(), + }; + match apply_fork_choice( &context.storage, fork_choice_state.head_block_hash, @@ -282,8 +301,9 @@ async fn handle_forkchoice( Ok(head) => { // Fork Choice was succesful, the node is up to date with the current chain context.blockchain.set_synced(); - // Remove included transactions from the mempool after we accept the fork choice - // TODO(#797): The remove of transactions from the mempool could be incomplete (i.e. REORGS) + // Remove included transactions from the mempool after we accept the fork choice. + // Blob sidecars for included blob txs are moved to the mempool limbo so + // they survive a potential reorg until the block is finalized. match context.storage.get_block_by_hash(head.hash()).await { Ok(Some(block)) => { // Remove executed transactions from mempool @@ -304,6 +324,59 @@ async fn handle_forkchoice( } }; + // Re-inject any transactions orphaned by a reorg back into the mempool. + // This is best-effort and is spawned off the FCU response path so the + // CL gets its acknowledgment immediately. A near-cap reorg of ~10K + // orphaned txs would otherwise add ~1+ second of ECDSA recovery + KZG + // re-verification to the engine API response (well under the 8s spec + // timeout but tight enough to destabilize CL timing budgets). + // + // Fast-path: in the normal slot-advance case the new head's parent + // IS the previous head, i.e. no reorg. Skip the spawn (and its 3 + // header-by-hash DB reads) to keep typical FCUs lean. Genuine + // reorgs — where the new head's parent differs from the previous + // head — still fall through to the spawned re-injection. 
+ if !previous_head_hash.is_zero() + && previous_head_hash != head.hash() + && head.parent_hash != previous_head_hash + { + let blockchain = context.blockchain.clone(); + let new_head_hash = head.hash(); + tokio::spawn(async move { + match blockchain + .reinject_orphaned_transactions(previous_head_hash, new_head_hash) + .await + { + Ok(0) => {} + Ok(count) => { + debug!( + reinjected = count, + "Re-injected orphaned transactions into mempool" + ); + } + Err(error) => { + debug!(%error, "Failed to re-inject orphaned transactions"); + } + } + }); + } + + // Purge blob sidecars from the limbo for transactions in newly-finalized + // blocks. Finalized blocks cannot be reorged, so we no longer need to + // retain their sidecars. + if !fork_choice_state.finalized_block_hash.is_zero() + && fork_choice_state.finalized_block_hash != previous_finalized_hash + && let Err(error) = context + .blockchain + .purge_finalized_blob_limbo( + previous_finalized_hash, + fork_choice_state.finalized_block_hash, + ) + .await + { + debug!(%error, "Failed to purge finalized blob limbo entries"); + } + // Notify all eth_subscribe("newHeads") subscribers. if let Some(ws) = &context.ws { let _ = ws.subscription_manager.new_head(head.clone()); diff --git a/docs/CLI.md b/docs/CLI.md index 782b9d3bd1b..356e525268f 100644 --- a/docs/CLI.md +++ b/docs/CLI.md @@ -91,6 +91,12 @@ Node options: [env: ETHREX_MEMPOOL_MAX_SIZE=] [default: 10000] + --mempool.reorg-depth + Maximum reorg depth (in blocks) for which transactions in orphaned blocks are re-injected into the mempool. Deeper reorgs skip re-injection. 
+ + [env: ETHREX_MEMPOOL_REORG_DEPTH=] + [default: 64] + --precompute-witnesses Once synced, computes execution witnesses upon receiving newPayload messages and stores them in local storage @@ -317,6 +323,12 @@ Node options: [env: ETHREX_MEMPOOL_MAX_SIZE=] [default: 10000] + --mempool.reorg-depth + Maximum reorg depth (in blocks) for which transactions in orphaned blocks are re-injected into the mempool. Deeper reorgs skip re-injection. + + [env: ETHREX_MEMPOOL_REORG_DEPTH=] + [default: 64] + P2P options: --bootnodes ... Comma separated enode URLs for P2P discovery bootstrap. diff --git a/test/tests/blockchain/mod.rs b/test/tests/blockchain/mod.rs index 519561062ae..ca2ced36bea 100644 --- a/test/tests/blockchain/mod.rs +++ b/test/tests/blockchain/mod.rs @@ -2,4 +2,5 @@ mod batch_tests; mod eip7702_revert_authority_tests; mod eip7702_zero_transfer_tests; mod mempool_tests; +mod reorg_reinjection_tests; mod smoke_tests; diff --git a/test/tests/blockchain/reorg_reinjection_tests.rs b/test/tests/blockchain/reorg_reinjection_tests.rs new file mode 100644 index 00000000000..12f2e93a567 --- /dev/null +++ b/test/tests/blockchain/reorg_reinjection_tests.rs @@ -0,0 +1,476 @@ +//! Tests for mempool re-injection of transactions orphaned by chain reorgs. +//! +//! These tests exercise the helpers directly (common-ancestor walk, +//! orphaned-transaction collection, blob limbo) without spinning up a full +//! FCU pipeline. An end-to-end test driven by `forkchoiceUpdated` would need +//! complete block execution against multiple competing chains and is left +//! for follow-up fixture work — these tests cover the load-bearing logic. 
+ +use ethrex_blockchain::{ + Blockchain, BlockchainOptions, + fork_choice::{collect_orphaned_transactions, find_common_ancestor}, + mempool::Mempool, +}; +use ethrex_common::{ + Address, Bytes, H160, H256, U256, + types::{ + BYTES_PER_BLOB, BlobsBundle, Block, BlockBody, BlockHeader, EIP1559Transaction, + EIP4844Transaction, MempoolTransaction, Transaction, TxKind, + }, +}; +use ethrex_storage::{EngineType, Store}; + +const MEMPOOL_MAX_SIZE_TEST: usize = 10_000; + +/// Build an empty BlockBody with a given list of transactions. +fn body_with_txs(transactions: Vec) -> BlockBody { + BlockBody { + transactions, + ommers: Vec::new(), + withdrawals: Some(Vec::new()), + } +} + +/// Build a minimal EIP-1559 transaction with a deterministic nonce. +fn make_eip1559_tx(nonce: u64) -> Transaction { + Transaction::EIP1559Transaction(EIP1559Transaction { + nonce, + max_priority_fee_per_gas: 1, + max_fee_per_gas: 1, + gas_limit: 21_000, + to: TxKind::Call(Address::from_low_u64_be(1)), + value: U256::zero(), + data: Bytes::default(), + access_list: Default::default(), + ..Default::default() + }) +} + +/// Insert a chain of empty headers descending from `parent` into the store +/// without any state-root checks. The headers form a linear chain that can +/// be walked by `find_common_ancestor`. +async fn insert_chain( + store: &Store, + parent: &BlockHeader, + count: u64, + branch_marker: u8, +) -> Vec { + let mut chain = Vec::with_capacity(count as usize); + let mut prev = parent.clone(); + for offset in 1..=count { + let header = BlockHeader { + parent_hash: prev.hash(), + number: prev.number + 1, + timestamp: prev.timestamp + 12, + // Mix in branch + offset so blocks on competing branches have + // distinct hashes even when they share the same height. 
+ extra_data: Bytes::from(vec![branch_marker, offset as u8]), + ..Default::default() + }; + let hash = header.hash(); + store + .add_block_header(hash, header.clone()) + .await + .expect("add header"); + // We add an empty body for every header so `get_block_by_hash` works. + let body = body_with_txs(Vec::new()); + store + .add_block_body(hash, body) + .await + .expect("add empty body"); + chain.push(header.clone()); + prev = header; + } + chain +} + +/// Insert a Block (header + body) into the store. +async fn insert_block(store: &Store, block: Block) { + let hash = block.hash(); + store + .add_block_header(hash, block.header.clone()) + .await + .expect("add header"); + store + .add_block_body(hash, block.body) + .await + .expect("add body"); +} + +async fn fresh_store() -> (Store, BlockHeader) { + let store = Store::new("test_store", EngineType::InMemory).expect("create store"); + let genesis_header = BlockHeader { + number: 0, + timestamp: 0, + ..Default::default() + }; + let genesis_hash = genesis_header.hash(); + store + .add_block_header(genesis_hash, genesis_header.clone()) + .await + .expect("add genesis header"); + store + .add_block_body(genesis_hash, body_with_txs(Vec::new())) + .await + .expect("add genesis body"); + store + .forkchoice_update(vec![], 0, genesis_hash, None, None) + .await + .expect("genesis forkchoice"); + (store, genesis_header) +} + +#[tokio::test] +async fn common_ancestor_same_block_returns_zero_depth() { + let (store, genesis) = fresh_store().await; + let branches = find_common_ancestor(&store, genesis.hash(), genesis.hash()) + .await + .expect("find ancestor") + .expect("ancestor present"); + assert_eq!(branches.common_ancestor_hash, genesis.hash()); + assert!(branches.orphaned.is_empty()); + assert!(branches.new_canonical.is_empty()); + assert_eq!(branches.depth(), 0); +} + +#[tokio::test] +async fn common_ancestor_finds_shared_block_with_equal_height_branches() { + let (store, genesis) = fresh_store().await; + let chain_a = 
insert_chain(&store, &genesis, 3, 0xa).await; + let chain_b = insert_chain(&store, &genesis, 3, 0xb).await; + let prev_head = chain_a.last().unwrap().hash(); + let new_head = chain_b.last().unwrap().hash(); + + let branches = find_common_ancestor(&store, prev_head, new_head) + .await + .expect("find ancestor") + .expect("ancestor present"); + + assert_eq!(branches.common_ancestor_hash, genesis.hash()); + assert_eq!(branches.common_ancestor_number, 0); + assert_eq!(branches.orphaned.len(), 3); + assert_eq!(branches.new_canonical.len(), 3); + assert_eq!(branches.depth(), 3); + + // Branches must be ordered oldest -> newest. + assert_eq!(branches.orphaned[0].1, chain_a[0].hash()); + assert_eq!(branches.orphaned[2].1, chain_a[2].hash()); + assert_eq!(branches.new_canonical[0].1, chain_b[0].hash()); + assert_eq!(branches.new_canonical[2].1, chain_b[2].hash()); +} + +#[tokio::test] +async fn common_ancestor_handles_unequal_height_branches() { + // Old chain: genesis -> A1 -> A2. + // New chain: genesis -> A1 -> B2 -> B3. + // Shared prefix is A1, depth is 1 (only A2 is orphaned). + let (store, genesis) = fresh_store().await; + let chain_a = insert_chain(&store, &genesis, 2, 0xa).await; + let a1 = chain_a[0].clone(); + let chain_b = insert_chain(&store, &a1, 2, 0xb).await; + + let prev_head = chain_a[1].hash(); + let new_head = chain_b[1].hash(); + + let branches = find_common_ancestor(&store, prev_head, new_head) + .await + .expect("find ancestor") + .expect("ancestor present"); + + assert_eq!(branches.common_ancestor_hash, a1.hash()); + assert_eq!(branches.common_ancestor_number, 1); + assert_eq!(branches.orphaned.len(), 1); + assert_eq!(branches.new_canonical.len(), 2); +} + +#[tokio::test] +async fn collect_orphaned_subtracts_transactions_present_in_new_canonical() { + // Old chain genesis -> A1 (containing tx_x, tx_y). + // New chain genesis -> B1 (containing tx_y, tx_z). 
+ // We expect only tx_x to be flagged for re-injection because tx_y is now + // canonical and tx_z was already canonical from the start. + let (store, genesis) = fresh_store().await; + + let tx_x = make_eip1559_tx(0); + let tx_y = make_eip1559_tx(1); + let tx_z = make_eip1559_tx(2); + + let header_a = BlockHeader { + parent_hash: genesis.hash(), + number: 1, + timestamp: genesis.timestamp + 12, + extra_data: Bytes::from(vec![0xa, 0x1]), + ..Default::default() + }; + let block_a = Block::new(header_a, body_with_txs(vec![tx_x.clone(), tx_y.clone()])); + let header_b = BlockHeader { + parent_hash: genesis.hash(), + number: 1, + timestamp: genesis.timestamp + 12, + extra_data: Bytes::from(vec![0xb, 0x1]), + ..Default::default() + }; + let block_b = Block::new(header_b, body_with_txs(vec![tx_y.clone(), tx_z.clone()])); + + let block_a_hash = block_a.hash(); + let block_b_hash = block_b.hash(); + insert_block(&store, block_a).await; + insert_block(&store, block_b).await; + + let branches = find_common_ancestor(&store, block_a_hash, block_b_hash) + .await + .expect("find ancestor") + .expect("ancestor present"); + + let orphaned = collect_orphaned_transactions(&store, &branches) + .await + .expect("collect"); + assert_eq!(orphaned.len(), 1); + assert_eq!(orphaned[0].hash(), tx_x.hash()); +} + +#[tokio::test] +async fn reinject_skips_when_reorg_depth_exceeds_cap() { + // Build two chains of depth 4 off of the genesis and set a depth cap of 2. + // We expect re-injection to be skipped entirely (returns 0). 
+ let (store, genesis) = fresh_store().await; + let chain_a = insert_chain(&store, &genesis, 4, 0xa).await; + let chain_b = insert_chain(&store, &genesis, 4, 0xb).await; + + let opts = BlockchainOptions { + reorg_depth: 2, + ..Default::default() + }; + let blockchain = Blockchain::new(store.clone(), opts); + + let reinjected = blockchain + .reinject_orphaned_transactions( + chain_a.last().unwrap().hash(), + chain_b.last().unwrap().hash(), + ) + .await + .expect("reinject"); + assert_eq!(reinjected, 0); +} + +#[tokio::test] +async fn reinject_is_best_effort_when_admission_fails() { + // Build a tiny reorg: genesis -> A1 (contains tx with nonce=0), no new chain. + // Re-injecting an EIP-4844 transaction without a sidecar should silently + // skip it. We use this to assert no error is propagated and the other tx + // still re-injects. + let (store, genesis) = fresh_store().await; + let plain_tx = make_eip1559_tx(0); + + // Build a fake blob tx (no actual sidecar in the limbo). + let blob_tx_inner = EIP4844Transaction { + nonce: 0, + max_priority_fee_per_gas: 1, + max_fee_per_gas: 1, + max_fee_per_blob_gas: 1.into(), + gas: 21_000, + to: Address::from_low_u64_be(1), + ..Default::default() + }; + let blob_tx = Transaction::EIP4844Transaction(blob_tx_inner); + + let header_a = BlockHeader { + parent_hash: genesis.hash(), + number: 1, + timestamp: genesis.timestamp + 12, + gas_limit: 100_000_000, + extra_data: Bytes::from(vec![0xa, 0x1]), + ..Default::default() + }; + let block_a = Block::new( + header_a, + body_with_txs(vec![plain_tx.clone(), blob_tx.clone()]), + ); + let block_a_hash = block_a.hash(); + insert_block(&store, block_a).await; + + let header_b = BlockHeader { + parent_hash: genesis.hash(), + number: 1, + timestamp: genesis.timestamp + 12, + gas_limit: 100_000_000, + extra_data: Bytes::from(vec![0xb, 0x1]), + ..Default::default() + }; + let block_b_number = header_b.number; + let block_b = Block::new(header_b, body_with_txs(Vec::new())); + let block_b_hash = 
block_b.hash(); + insert_block(&store, block_b).await; + + // Make block B the canonical head so admission validation can read a + // header. We keep the default chain config from `Store::new`. + store + .forkchoice_update( + vec![(block_b_number, block_b_hash)], + block_b_number, + block_b_hash, + None, + None, + ) + .await + .expect("set canonical to B"); + + let blockchain = Blockchain::default_with_store(store.clone()); + let reinjected = blockchain + .reinject_orphaned_transactions(block_a_hash, block_b_hash) + .await + .expect("reinject"); + + // The blob tx must be skipped (no sidecar) and the plain tx must also + // fail admission (the sender has no balance on this minimal store). Both + // failures are handled best-effort: zero txs land in the pool, no error + // propagates, and no panic. + assert_eq!( + reinjected, 0, + "best-effort path must report 0 re-injections when both txs fail admission", + ); + // The blob limbo was empty going in and the no-sidecar code path consumes + // nothing, so a follow-up re-injection attempt would still see an empty + // limbo. Calling it a second time with the same arguments must also yield + // 0, confirming the path is idempotent and side-effect-free. + let reinjected_again = blockchain + .reinject_orphaned_transactions(block_a_hash, block_b_hash) + .await + .expect("reinject (second call)"); + assert_eq!(reinjected_again, 0); +} + +#[test] +fn included_transaction_moves_blob_sidecar_to_limbo() { + let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); + + // Construct a minimal blob tx + bundle. 
+ let tx = EIP4844Transaction { + nonce: 1, + max_priority_fee_per_gas: 1, + max_fee_per_gas: 1, + max_fee_per_blob_gas: 1.into(), + gas: 21_000, + to: Address::from_low_u64_be(1), + ..Default::default() + }; + let tx = Transaction::EIP4844Transaction(tx); + let sender = H160::random(); + let hash = tx.hash(); + let bundle = BlobsBundle { + blobs: vec![[0u8; BYTES_PER_BLOB]], + commitments: vec![[0u8; 48]], + proofs: vec![[0u8; 48]], + version: 0, + }; + + mempool + .add_blobs_bundle(hash, bundle.clone()) + .expect("add bundle"); + mempool + .add_transaction(hash, sender, MempoolTransaction::new(tx, sender)) + .expect("add tx"); + + // Before inclusion: sidecar is in the active pool, limbo is empty. + assert!(mempool.get_blobs_bundle(hash).expect("get").is_some()); + assert_eq!(mempool.blob_limbo_size().expect("size"), 0); + + // Simulate block inclusion: remove the tx via the inclusion-aware path. + let was_present = mempool + .remove_included_transaction(&hash) + .expect("remove included"); + assert!(was_present); + + // Sidecar should have moved to limbo. + assert!(mempool.get_blobs_bundle(hash).expect("get").is_none()); + assert_eq!(mempool.blob_limbo_size().expect("size"), 1); + + // take_blob_limbo_entry returns it (and removes from limbo). 
+ let recovered = mempool + .take_blob_limbo_entry(&hash) + .expect("take") + .expect("present"); + assert_eq!(recovered.commitments, bundle.commitments); + assert_eq!(mempool.blob_limbo_size().expect("size"), 0); +} + +#[test] +fn purge_blob_limbo_entries_drops_sidecars() { + let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); + let tx = EIP4844Transaction { + nonce: 2, + max_priority_fee_per_gas: 1, + max_fee_per_gas: 1, + max_fee_per_blob_gas: 1.into(), + gas: 21_000, + to: Address::from_low_u64_be(1), + ..Default::default() + }; + let tx = Transaction::EIP4844Transaction(tx); + let sender = H160::random(); + let hash = tx.hash(); + let bundle = BlobsBundle { + blobs: vec![[1u8; BYTES_PER_BLOB]], + commitments: vec![[1u8; 48]], + proofs: vec![[1u8; 48]], + version: 0, + }; + mempool.add_blobs_bundle(hash, bundle).expect("add bundle"); + mempool + .add_transaction(hash, sender, MempoolTransaction::new(tx, sender)) + .expect("add tx"); + mempool + .remove_included_transaction(&hash) + .expect("remove included"); + assert_eq!(mempool.blob_limbo_size().expect("size"), 1); + + mempool.purge_blob_limbo_entries(&[hash]).expect("purge"); + assert_eq!(mempool.blob_limbo_size().expect("size"), 0); + // Subsequent take should return None. + assert!( + mempool + .take_blob_limbo_entry(&hash) + .expect("take") + .is_none() + ); +} + +#[test] +fn purge_unrelated_hashes_is_noop() { + let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); + // Limbo starts empty; purging arbitrary hashes is fine. + mempool + .purge_blob_limbo_entries(&[H256::random(), H256::random()]) + .expect("purge"); + assert_eq!(mempool.blob_limbo_size().expect("size"), 0); +} + +#[test] +fn mempool_is_full_gates_capacity() { + // `Mempool::is_full` is the public accessor used by reorg re-injection + // to decide whether to skip rather than evict freshly-arrived txs to + // make room for orphaned ones. Verify the accessor flips at capacity. 
+ let mempool = Mempool::new(2); + assert!(!mempool.is_full().expect("is_full")); + + let sender = Address::from_low_u64_be(1); + let tx_a = make_eip1559_tx(0); + let tx_b = make_eip1559_tx(1); + + let hash_a = tx_a.hash(); + let mempool_tx_a = MempoolTransaction::new(tx_a, sender); + mempool + .add_transaction(hash_a, sender, mempool_tx_a) + .expect("add A"); + assert!(!mempool.is_full().expect("is_full after 1")); + + let hash_b = tx_b.hash(); + let mempool_tx_b = MempoolTransaction::new(tx_b, sender); + mempool + .add_transaction(hash_b, sender, mempool_tx_b) + .expect("add B"); + assert!( + mempool.is_full().expect("is_full at capacity"), + "is_full must report true once transaction_pool reaches max_mempool_size", + ); +}