Skip to content
Open
14 changes: 13 additions & 1 deletion cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand};
use ethrex_blockchain::{
BlockchainOptions, BlockchainType, L2Config,
BlockchainOptions, BlockchainType, DEFAULT_MEMPOOL_REORG_DEPTH, L2Config,
error::{ChainError, InvalidBlockError},
};
use ethrex_common::types::{Block, DEFAULT_BUILDER_GAS_CEIL, Genesis, validate_block_body};
Expand Down Expand Up @@ -183,6 +183,15 @@ pub struct Options {
env = "ETHREX_MEMPOOL_MAX_SIZE"
)]
pub mempool_max_size: usize,
#[arg(
help = "Maximum reorg depth (in blocks) for which transactions in orphaned blocks are re-injected into the mempool. Deeper reorgs skip re-injection.",
long = "mempool.reorg-depth",
default_value_t = DEFAULT_MEMPOOL_REORG_DEPTH,
value_name = "MEMPOOL_REORG_DEPTH",
help_heading = "Node options",
env = "ETHREX_MEMPOOL_REORG_DEPTH"
)]
pub mempool_reorg_depth: u64,
#[arg(
long = "http.addr",
default_value = "0.0.0.0",
Expand Down Expand Up @@ -392,6 +401,7 @@ impl Options {
discv4_enabled: true,
discv5_enabled: true,
mempool_max_size: 10_000,
mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
..Default::default()
}
}
Expand All @@ -414,6 +424,7 @@ impl Options {
discv4_enabled: true,
discv5_enabled: true,
mempool_max_size: 10_000,
mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
..Default::default()
}
}
Expand Down Expand Up @@ -450,6 +461,7 @@ impl Default for Options {
dev: Default::default(),
force: false,
mempool_max_size: Default::default(),
mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
tx_broadcasting_time_interval: Default::default(),
target_peers: Default::default(),
lookup_interval: Default::default(),
Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ pub async fn init_l1(
max_blobs_per_block: opts.max_blobs_per_block,
precompute_witnesses: opts.precompute_witnesses,
precompile_cache_enabled: !opts.no_precompile_cache,
reorg_depth: opts.mempool_reorg_depth,
},
);

Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ pub async fn init_l2(
max_blobs_per_block: None, // L2 doesn't support blob transactions
precompute_witnesses: opts.node_opts.precompute_witnesses,
precompile_cache_enabled: true,
reorg_depth: opts.node_opts.mempool_reorg_depth,
};

let blockchain = init_blockchain(store.clone(), blockchain_opts.clone());
Expand Down
207 changes: 205 additions & 2 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ use ethrex_common::types::BlobsBundle;
// Cap on retained payloads (presumably payload-building entries held in memory;
// the usage site is outside this view — confirm there).
const MAX_PAYLOADS: usize = 10;
// Default mempool capacity; unit appears to be a transaction count, matching
// the CLI's `mempool_max_size` default — confirm against the mempool impl.
const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000;

/// Maximum reorg depth (in blocks) for which orphaned-block transactions are
/// re-injected into the mempool. Reorgs deeper than this are treated as
/// catastrophic — re-injection is skipped to bound the amount of work done
/// during fork-choice processing.
pub const DEFAULT_MEMPOOL_REORG_DEPTH: u64 = 64;

/// Background thread for dropping large tree structures off the critical path.
/// Accepts any `Send` value and drops it on a dedicated thread, avoiding
/// recursive deallocation costs (~500us for state trie roots) on hot paths.
Expand Down Expand Up @@ -231,6 +237,10 @@ pub struct BlockchainOptions {
/// warmer thread and the executor. Set to false (via `--no-precompile-cache`) to
/// disable the cache for benchmarking purposes.
pub precompile_cache_enabled: bool,
/// Maximum reorg depth (in blocks) for which transactions in orphaned
/// blocks are re-injected into the mempool. Reorgs deeper than this skip
/// re-injection and log a warning.
pub reorg_depth: u64,
}

impl Default for BlockchainOptions {
Expand All @@ -242,6 +252,7 @@ impl Default for BlockchainOptions {
max_blobs_per_block: None,
precompute_witnesses: false,
precompile_cache_enabled: true,
reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
}
}
}
Expand Down Expand Up @@ -2374,10 +2385,202 @@ impl Blockchain {
self.mempool.remove_transaction(hash)
}

/// Drop every transaction of an executed block from the transaction pool
/// (those we don't know about are ignored).
///
/// Blob (EIP-4844) sidecars are not discarded here: they move into the
/// mempool limbo so they can be re-injected should a reorg orphan this
/// block. Limbo entries are purged once the block is finalized.
pub fn remove_block_transactions_from_pool(&self, block: &Block) -> Result<(), StoreError> {
    block.body.transactions.iter().try_for_each(|tx| {
        let hash = tx.hash();
        self.mempool.remove_transaction(&hash)?;
        self.mempool.remove_included_transaction(&hash)?;
        Ok(())
    })
}

/// After a reorg, put transactions from the now-orphaned blocks back into
/// the mempool, running each one through full admission re-validation.
///
/// Rules:
/// - Reorgs deeper than `options.reorg_depth` skip re-injection entirely
///   (a warning is logged).
/// - Admission goes through the same entry points as fresh submissions
///   (`add_transaction_to_pool` / `add_blob_transaction_to_pool`), so nonce
///   and balance checks run against the new canonical head.
/// - Blob transactions need their sidecar from the mempool limbo; if it is
///   absent (e.g. node restart between inclusion and reorg) the blob tx is
///   silently dropped — sidecars cannot be reconstructed.
/// - Individual failures are debug-logged and skipped; the whole operation
///   is best-effort.
///
/// Returns how many transactions were actually re-injected.
pub async fn reinject_orphaned_transactions(
    &self,
    previous_head_hash: H256,
    new_head_hash: H256,
) -> Result<usize, StoreError> {
    let branches = match fork_choice::find_common_ancestor(
        &self.storage,
        previous_head_hash,
        new_head_hash,
    )
    .await?
    {
        Some(found) => found,
        None => {
            debug!(
                previous_head = %previous_head_hash,
                new_head = %new_head_hash,
                "Could not compute common ancestor for reorg re-injection; skipping",
            );
            return Ok(0);
        }
    };

    let depth = branches.depth();
    // Depth zero means the new head extends the old one — no reorg happened.
    if depth == 0 {
        return Ok(0);
    }
    if depth > self.options.reorg_depth {
        warn!(
            depth,
            cap = self.options.reorg_depth,
            previous_head = %previous_head_hash,
            new_head = %new_head_hash,
            "Reorg deeper than configured cap; skipping mempool re-injection",
        );
        return Ok(0);
    }

    let orphaned =
        fork_choice::collect_orphaned_transactions(&self.storage, &branches).await?;
    info!(
        depth,
        orphaned_tx_count = orphaned.len(),
        previous_head = %previous_head_hash,
        new_head = %new_head_hash,
        "Reorg detected; re-injecting orphaned transactions into mempool",
    );

    let mut accepted = 0usize;
    let mut capacity_skips = 0usize;
    for tx in orphaned {
        // Freshly-arrived (post-reorg) txs are never evicted in favour of
        // orphaned ones: at capacity we just count the skip and move on
        // (capacity is re-checked each iteration in case space frees up).
        if self.mempool.is_full()? {
            capacity_skips += 1;
            continue;
        }
        let tx_hash = tx.hash();
        match tx {
            #[cfg(feature = "c-kzg")]
            Transaction::EIP4844Transaction(inner) => {
                let blobs_bundle = match self.mempool.take_blob_limbo_entry(&tx_hash)? {
                    Some(bundle) => bundle,
                    None => {
                        debug!(
                            tx_hash = %tx_hash,
                            "Blob sidecar missing from limbo; cannot re-inject orphaned EIP-4844 tx",
                        );
                        continue;
                    }
                };
                // Keep a copy of the bundle: if admission fails for a
                // transient reason (pool full, replacement rules, …) the
                // sidecar goes back into limbo so a later attempt can still
                // re-inject this blob tx instead of losing it for good.
                let bundle_backup = blobs_bundle.clone();
                if let Err(error) =
                    self.add_blob_transaction_to_pool(inner, blobs_bundle).await
                {
                    debug!(
                        tx_hash = %tx_hash,
                        %error,
                        "Failed to re-inject orphaned EIP-4844 transaction; restoring sidecar to limbo",
                    );
                    self.mempool
                        .insert_blob_limbo_entry(tx_hash, bundle_backup)?;
                } else {
                    accepted += 1;
                }
            }
            #[cfg(not(feature = "c-kzg"))]
            Transaction::EIP4844Transaction(_) => {
                debug!(
                    tx_hash = %tx_hash,
                    "Skipping orphaned EIP-4844 transaction (c-kzg feature disabled)",
                );
            }
            other => {
                if let Err(error) = self.add_transaction_to_pool(other).await {
                    debug!(
                        tx_hash = %tx_hash,
                        %error,
                        "Failed to re-inject orphaned transaction",
                    );
                } else {
                    accepted += 1;
                }
            }
        }
    }
    if capacity_skips > 0 {
        warn!(
            skipped = capacity_skips,
            reinjected = accepted,
            "Mempool reached capacity during reorg re-injection; remaining orphaned txs dropped to preserve freshly-arrived ones",
        );
    }
    Ok(accepted)
}

/// Drop blob sidecars from the mempool limbo for every transaction in the
/// canonical blocks after `previous_finalized_hash` (exclusive) up to
/// `new_finalized_hash` (inclusive).
///
/// Finalized blocks can never be reorged, so their sidecars are no longer
/// needed. A zero `previous_finalized_hash` (first-ever finalization
/// advance) makes the purge walk every canonical block from genesis.
///
/// The walk iterates canonical block numbers rather than chasing
/// parent_hash links: the input is a CL-supplied finalized hash that is
/// already canonical. This deliberately avoids the old `reorg_depth` cap,
/// which leaked sidecars whenever a finalization gap exceeded the cap
/// (fresh-node first finalization, CL downtime catch-up, etc.).
pub async fn purge_finalized_blob_limbo(
    &self,
    previous_finalized_hash: H256,
    new_finalized_hash: H256,
) -> Result<(), StoreError> {
    if new_finalized_hash.is_zero() || new_finalized_hash == previous_finalized_hash {
        return Ok(());
    }

    let end_number = match self.storage.get_block_header_by_hash(new_finalized_hash)? {
        Some(header) => header.number,
        None => return Ok(()),
    };

    let first_number = if previous_finalized_hash.is_zero() {
        0
    } else if let Some(prev) = self
        .storage
        .get_block_header_by_hash(previous_finalized_hash)?
    {
        prev.number.saturating_add(1)
    } else {
        // The previous finalized hash is unknown to this store (sync gap?):
        // fall back to a full walk from genesis so stale limbo entries are
        // still drained.
        0
    };

    let mut stale_hashes: Vec<H256> = Vec::new();
    for number in first_number..=end_number {
        let Some(block_hash) = self.storage.get_canonical_block_hash(number).await? else {
            continue;
        };
        if let Some(body) = self.storage.get_block_body_by_hash(block_hash).await? {
            stale_hashes.extend(body.transactions.into_iter().map(|tx| tx.hash()));
        }
    }
    if !stale_hashes.is_empty() {
        self.mempool.purge_blob_limbo_entries(&stale_hashes)?;
    }
    Ok(())
}
Expand Down
Loading
Loading