Skip to content
Open
14 changes: 13 additions & 1 deletion cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand};
use ethrex_blockchain::{
BlockchainOptions, BlockchainType, L2Config,
BlockchainOptions, BlockchainType, DEFAULT_MEMPOOL_REORG_DEPTH, L2Config,
error::{ChainError, InvalidBlockError},
};
use ethrex_common::types::{Block, DEFAULT_BUILDER_GAS_CEIL, Genesis, validate_block_body};
Expand Down Expand Up @@ -183,6 +183,15 @@ pub struct Options {
env = "ETHREX_MEMPOOL_MAX_SIZE"
)]
pub mempool_max_size: usize,
#[arg(
help = "Maximum reorg depth (in blocks) for which transactions in orphaned blocks are re-injected into the mempool. Deeper reorgs skip re-injection.",
long = "mempool.reorg-depth",
default_value_t = DEFAULT_MEMPOOL_REORG_DEPTH,
value_name = "MEMPOOL_REORG_DEPTH",
help_heading = "Node options",
env = "ETHREX_MEMPOOL_REORG_DEPTH"
)]
pub mempool_reorg_depth: u64,
#[arg(
long = "http.addr",
default_value = "0.0.0.0",
Expand Down Expand Up @@ -392,6 +401,7 @@ impl Options {
discv4_enabled: true,
discv5_enabled: true,
mempool_max_size: 10_000,
mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
..Default::default()
}
}
Expand All @@ -414,6 +424,7 @@ impl Options {
discv4_enabled: true,
discv5_enabled: true,
mempool_max_size: 10_000,
mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
..Default::default()
}
}
Expand Down Expand Up @@ -450,6 +461,7 @@ impl Default for Options {
dev: Default::default(),
force: false,
mempool_max_size: Default::default(),
mempool_reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
tx_broadcasting_time_interval: Default::default(),
target_peers: Default::default(),
lookup_interval: Default::default(),
Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ pub async fn init_l1(
max_blobs_per_block: opts.max_blobs_per_block,
precompute_witnesses: opts.precompute_witnesses,
precompile_cache_enabled: !opts.no_precompile_cache,
reorg_depth: opts.mempool_reorg_depth,
},
);

Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ pub async fn init_l2(
max_blobs_per_block: None, // L2 doesn't support blob transactions
precompute_witnesses: opts.node_opts.precompute_witnesses,
precompile_cache_enabled: true,
reorg_depth: opts.node_opts.mempool_reorg_depth,
};

let blockchain = init_blockchain(store.clone(), blockchain_opts.clone());
Expand Down
180 changes: 178 additions & 2 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ use ethrex_common::types::BlobsBundle;
const MAX_PAYLOADS: usize = 10;
const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000;

/// Maximum reorg depth (in blocks) for which orphaned-block transactions are
/// re-injected into the mempool. Reorgs deeper than this are treated as
/// catastrophic — re-injection is skipped to bound the amount of work done
/// during fork-choice processing.
pub const DEFAULT_MEMPOOL_REORG_DEPTH: u64 = 64;

/// Background thread for dropping large tree structures off the critical path.
/// Accepts any `Send` value and drops it on a dedicated thread, avoiding
/// recursive deallocation costs (~500us for state trie roots) on hot paths.
Expand Down Expand Up @@ -231,6 +237,10 @@ pub struct BlockchainOptions {
/// warmer thread and the executor. Set to false (via `--no-precompile-cache`) to
/// disable the cache for benchmarking purposes.
pub precompile_cache_enabled: bool,
/// Maximum reorg depth (in blocks) for which transactions in orphaned
/// blocks are re-injected into the mempool. Reorgs deeper than this skip
/// re-injection and log a warning.
pub reorg_depth: u64,
}

impl Default for BlockchainOptions {
Expand All @@ -242,6 +252,7 @@ impl Default for BlockchainOptions {
max_blobs_per_block: None,
precompute_witnesses: false,
precompile_cache_enabled: true,
reorg_depth: DEFAULT_MEMPOOL_REORG_DEPTH,
}
}
}
Expand Down Expand Up @@ -2374,10 +2385,175 @@ impl Blockchain {
self.mempool.remove_transaction(hash)
}

/// Remove all transactions in the executed block from the pool (if we have them)
/// Remove all transactions in the executed block from the pool (if we have them).
///
/// For blob (EIP-4844) transactions, the sidecars are moved into the mempool
/// limbo rather than discarded, so they remain available for re-injection if
/// the block gets orphaned by a reorg. Limbo entries are purged once the
/// block is finalized.
pub fn remove_block_transactions_from_pool(&self, block: &Block) -> Result<(), StoreError> {
for tx in &block.body.transactions {
self.mempool.remove_transaction(&tx.hash())?;
self.mempool.remove_included_transaction(&tx.hash())?;
}
Ok(())
}

/// Re-inject transactions that were in orphaned blocks back into the mempool
/// after a reorg, with full admission re-validation.
///
/// Behaviour:
/// - If the reorg depth exceeds `options.reorg_depth`, the entire re-injection
/// is skipped and a warning is logged.
/// - Each transaction is admitted via the same path as a freshly-submitted tx
/// (`add_transaction_to_pool` / `add_blob_transaction_to_pool`), so all
/// stateful checks (nonce/balance under the new canonical head) are applied.
/// - For blob transactions, the sidecar is pulled from the mempool limbo. If
/// the sidecar is missing (e.g. the node was restarted between block
/// inclusion and the reorg), the blob tx is dropped silently — there is no
/// way to reconstruct sidecars.
/// - Failures are logged at debug level and the loop continues; re-injection
/// is strictly best-effort.
///
/// Returns the number of transactions actually re-injected.
pub async fn reinject_orphaned_transactions(
&self,
previous_head_hash: H256,
new_head_hash: H256,
) -> Result<usize, StoreError> {
let Some(branches) =
fork_choice::find_common_ancestor(&self.storage, previous_head_hash, new_head_hash)
.await?
else {
debug!(
previous_head = %previous_head_hash,
new_head = %new_head_hash,
"Could not compute common ancestor for reorg re-injection; skipping",
);
return Ok(0);
};

let depth = branches.depth();
if depth == 0 {
// Not a reorg (e.g. the new head was a descendant of the previous head).
return Ok(0);
}

if depth > self.options.reorg_depth {
warn!(
depth,
cap = self.options.reorg_depth,
previous_head = %previous_head_hash,
new_head = %new_head_hash,
"Reorg deeper than configured cap; skipping mempool re-injection",
);
return Ok(0);
}

let txs = fork_choice::collect_orphaned_transactions(&self.storage, &branches).await?;
info!(
depth,
orphaned_tx_count = txs.len(),
previous_head = %previous_head_hash,
new_head = %new_head_hash,
"Reorg detected; re-injecting orphaned transactions into mempool",
);

let mut reinjected = 0usize;
for tx in txs {
let tx_hash = tx.hash();
match tx {
#[cfg(feature = "c-kzg")]
Transaction::EIP4844Transaction(inner) => {
let Some(blobs_bundle) = self.mempool.take_blob_limbo_entry(&tx_hash)? else {
debug!(
tx_hash = %tx_hash,
"Blob sidecar missing from limbo; cannot re-inject orphaned EIP-4844 tx",
);
continue;
};
match self.add_blob_transaction_to_pool(inner, blobs_bundle).await {
Ok(_) => reinjected += 1,
Err(error) => {
debug!(
tx_hash = %tx_hash,
%error,
"Failed to re-inject orphaned EIP-4844 transaction",
);
}
}
Comment on lines +2474 to +2499
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
#[cfg(not(feature = "c-kzg"))]
Transaction::EIP4844Transaction(_) => {
debug!(
tx_hash = %tx_hash,
"Skipping orphaned EIP-4844 transaction (c-kzg feature disabled)",
);
}
other => match self.add_transaction_to_pool(other).await {
Ok(_) => reinjected += 1,
Err(error) => {
debug!(
tx_hash = %tx_hash,
%error,
"Failed to re-inject orphaned transaction",
);
}
},
}
}
Ok(reinjected)
}

/// Purge blob sidecars from the mempool limbo for all transactions in
/// blocks between (exclusive) the previously-finalized block and (inclusive)
/// the newly-finalized block.
///
/// Once a block is finalized it cannot be reorged, so we no longer need to
/// hold on to its blob sidecars. `previous_finalized_hash` may be zero on
/// the very first finalization advance, in which case purging walks back
/// until genesis (or as far as block bodies are available).
pub async fn purge_finalized_blob_limbo(
&self,
previous_finalized_hash: H256,
new_finalized_hash: H256,
) -> Result<(), StoreError> {
if new_finalized_hash.is_zero() || previous_finalized_hash == new_finalized_hash {
return Ok(());
}

// Walk from new_finalized back to previous_finalized, collecting tx
// hashes for every block we pass through. We bound the walk by the
// configured reorg depth so a misconfigured / malicious previous_hash
// cannot cause unbounded work.
let max_walk = self.options.reorg_depth;
let mut current_hash = new_finalized_hash;
let mut steps = 0u64;
let mut hashes_to_purge: Vec<H256> = Vec::new();
while current_hash != previous_finalized_hash && !current_hash.is_zero() {
if steps > max_walk {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Off-by-one in limbo purge walk: steps starts at 0 and is incremented after each block is processed, so the guard steps > max_walk first triggers when steps == max_walk + 1, meaning max_walk + 1 blocks are processed before the break. This is inconsistent with the configured reorg_depth cap. Changing to >= means at most max_walk iterations are processed.

Suggested change
let max_walk = self.options.reorg_depth;
let mut current_hash = new_finalized_hash;
let mut steps = 0u64;
let mut hashes_to_purge: Vec<H256> = Vec::new();
while current_hash != previous_finalized_hash && !current_hash.is_zero() {
if steps > max_walk {
let max_walk = self.options.reorg_depth;
let mut current_hash = new_finalized_hash;
let mut steps = 0u64;
let mut hashes_to_purge: Vec<H256> = Vec::new();
while current_hash != previous_finalized_hash && !current_hash.is_zero() {
if steps >= max_walk {
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/blockchain/blockchain.rs
Line: 2528-2533

Comment:
**Off-by-one in limbo purge walk**: `steps` starts at 0 and is incremented _after_ each block is processed, so the guard `steps > max_walk` first triggers when `steps == max_walk + 1`, meaning `max_walk + 1` blocks are processed before the break. This is inconsistent with the configured `reorg_depth` cap. Changing to `>=` means at most `max_walk` iterations are processed.

```suggestion
        let max_walk = self.options.reorg_depth;
        let mut current_hash = new_finalized_hash;
        let mut steps = 0u64;
        let mut hashes_to_purge: Vec<H256> = Vec::new();
        while current_hash != previous_finalized_hash && !current_hash.is_zero() {
            if steps >= max_walk {
```

How can I resolve this? If you propose a fix, please make it concise.

debug!(
steps,
cap = max_walk,
"Blob limbo purge walked past the reorg-depth cap; stopping",
);
break;
}
let Some(header) = self.storage.get_block_header_by_hash(current_hash)? else {
break;
};
if let Some(body) = self.storage.get_block_body_by_hash(current_hash).await? {
for tx in body.transactions {
hashes_to_purge.push(tx.hash());
}
}
if header.number == 0 {
break;
}
current_hash = header.parent_hash;
steps += 1;
}
if !hashes_to_purge.is_empty() {
self.mempool.purge_blob_limbo_entries(&hashes_to_purge)?;
}
Ok(())
}
Expand Down
Loading
Loading