diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 74a66aab1e..7f7dd0b51c 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -10,7 +10,7 @@ use std::{ use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand}; use ethrex_blockchain::{ - BlockchainOptions, BlockchainType, L2Config, + BlockchainOptions, BlockchainType, DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT, L2Config, error::{ChainError, InvalidBlockError}, }; use ethrex_common::types::{Block, DEFAULT_BUILDER_GAS_CEIL, Genesis, validate_block_body}; @@ -183,6 +183,15 @@ pub struct Options { env = "ETHREX_MEMPOOL_MAX_SIZE" )] pub mempool_max_size: usize, + #[arg( + help = "Maximum number of pending transactions a single sender may hold in the mempool. Replacements at an existing (sender, nonce) bypass this cap.", + long = "mempool.max-pending-txs-per-account", + default_value_t = DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT, + value_name = "MAX_PENDING_TXS_PER_ACCOUNT", + help_heading = "Node options", + env = "ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT" + )] + pub mempool_max_pending_txs_per_account: usize, #[arg( long = "http.addr", default_value = "127.0.0.1", @@ -464,6 +473,7 @@ impl Default for Options { dev: Default::default(), force: false, mempool_max_size: Default::default(), + mempool_max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT, tx_broadcasting_time_interval: Default::default(), target_peers: Default::default(), lookup_interval: Default::default(), diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 4f76b6eb5c..777d52ec85 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -531,6 +531,7 @@ pub async fn init_l1( max_blobs_per_block: opts.max_blobs_per_block, precompute_witnesses: opts.precompute_witnesses, precompile_cache_enabled: !opts.no_precompile_cache, + max_pending_txs_per_account: opts.mempool_max_pending_txs_per_account, }, ); diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index 668c1d9d55..0eff4a0ece 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -228,6 +228,7 @@ pub async fn init_l2( max_blobs_per_block: None, // L2 doesn't support blob transactions precompute_witnesses: opts.node_opts.precompute_witnesses, precompile_cache_enabled: true, + max_pending_txs_per_account: opts.node_opts.mempool_max_pending_txs_per_account, }; let blockchain = init_blockchain(store.clone(), blockchain_opts.clone()); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 8f3f301d8f..77a05ba53f 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -229,6 +229,10 @@ pub struct BlockchainOptions { /// warmer thread and the executor. Set to false (via `--no-precompile-cache`) to /// disable the cache for benchmarking purposes. pub precompile_cache_enabled: bool, + /// Maximum number of pending transactions a single sender may hold in + /// the mempool. A replacement at an existing `(sender, nonce)` bypasses + /// this check. + pub max_pending_txs_per_account: usize, } impl Default for BlockchainOptions { @@ -240,10 +244,14 @@ impl Default for BlockchainOptions { max_blobs_per_block: None, precompute_witnesses: false, precompile_cache_enabled: true, + max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT, } } } +/// Default per-account pending-tx cap. +pub const DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT: usize = 16; + #[derive(Debug, Clone)] pub struct BatchBlockProcessingFailure { pub last_valid_hash: H256, @@ -2350,8 +2358,12 @@ impl Blockchain { // Add blobs bundle before the transaction so that when add_transaction // notifies payload builders the blob data is already available. self.mempool.add_blobs_bundle(hash, blobs_bundle)?; - self.mempool - .add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?; + self.mempool.add_transaction( + hash, + sender, + MempoolTransaction::new(transaction, sender), + self.options.max_pending_txs_per_account, + )?; Ok(hash) } @@ -2387,8 +2399,12 @@ impl Blockchain { } // Add transaction to storage - self.mempool - .add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?; + self.mempool.add_transaction( + hash, + sender, + MempoolTransaction::new(transaction, sender), + self.options.max_pending_txs_per_account, + )?; Ok(hash) } @@ -2548,6 +2564,14 @@ impl Blockchain { return Err(MempoolError::InvalidChainId(config.chain_id)); } + // The per-account pending-tx cap is enforced atomically inside + // `Mempool::add_transaction` (under the same write lock as the + // insertion) so concurrent submissions can't both pass a stale + // count check and race past the limit. Replacement candidates + // bypass it implicitly: the caller removes the old tx before + // `add_transaction` runs, so the post-removal count is one + // below the cap and the new insertion stays within it. + Ok(tx_to_replace_hash) } diff --git a/crates/blockchain/error.rs b/crates/blockchain/error.rs index 3bb0ea553f..04eb77af63 100644 --- a/crates/blockchain/error.rs +++ b/crates/blockchain/error.rs @@ -83,6 +83,10 @@ pub enum MempoolError { TxMaxInitCodeSizeError, #[error("Transaction encoded size ({actual} bytes) exceeds the {limit}-byte limit")] TxSizeExceeded { actual: usize, limit: usize }, + #[error( + "Sender has {count} pending transactions; adding a new one would exceed the per-account cap of {limit}" + )] + MaxPendingTxsPerAccountExceeded { count: usize, limit: usize }, #[error("Transaction gas limit exceeded")] TxGasLimitExceededError, #[error( diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index 3db0f80a6f..a4137fe620 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -142,14 +142,33 @@ impl Mempool { .map_err(|error| StoreError::MempoolReadLock(error.to_string())) } - /// Add transaction to the pool without doing validity checks + /// Add transaction to the pool without doing validity checks. + /// + /// Enforces the per-sender pending-tx cap atomically: the count is + /// re-checked under the same write lock that performs the insertion. + /// Replacement candidates (same `(sender, nonce)`) must have already + /// been removed via `remove_transaction` so this counter reflects the + /// post-replacement state. Returns + /// [`MempoolError::MaxPendingTxsPerAccountExceeded`] if the cap would + /// be exceeded. pub fn add_transaction( &self, hash: H256, sender: Address, transaction: MempoolTransaction, - ) -> Result<(), StoreError> { + max_pending_txs_per_account: usize, + ) -> Result<(), MempoolError> { let mut inner = self.write()?; + let count = inner + .txs_by_sender_nonce + .range((sender, 0)..=(sender, u64::MAX)) + .count(); + if count >= max_pending_txs_per_account { + return Err(MempoolError::MaxPendingTxsPerAccountExceeded { + count, + limit: max_pending_txs_per_account, + }); + } // Prune the order queue if it has grown too much if inner.txs_order.len() > inner.mempool_prune_threshold { // NOTE: we do this to avoid borrow checker errors @@ -452,6 +471,17 @@ impl Mempool { Ok(contains) } + /// Returns the number of pending transactions currently held in the + /// mempool for `sender`. Used by the per-sender slot cap at admission. + pub fn count_for_sender(&self, sender: Address) -> Result { + let inner = self.read()?; + let count = inner + .txs_by_sender_nonce + .range((sender, 0)..=(sender, u64::MAX)) + .count(); + Ok(count) + } + pub fn find_tx_to_replace( &self, sender: Address, @@ -599,3 +629,70 @@ pub fn transaction_intrinsic_gas( Ok(gas) } + +#[cfg(test)] +mod tests { + use super::*; + use ethrex_common::types::EIP1559Transaction; + + fn build_tx(nonce: u64) -> Transaction { + Transaction::EIP1559Transaction(EIP1559Transaction { + nonce, + ..Default::default() + }) + } + + fn add_tx(pool: &Mempool, sender: Address, nonce: u64) -> H256 { + let tx = build_tx(nonce); + let mtx = MempoolTransaction::new(tx, sender); + let hash = mtx.hash(); + pool.add_transaction(hash, sender, mtx, usize::MAX).unwrap(); + hash + } + + #[test] + fn count_for_sender_empty_pool() { + let pool = Mempool::new(64); + let sender = Address::from_low_u64_be(1); + assert_eq!(pool.count_for_sender(sender).unwrap(), 0); + } + + #[test] + fn count_for_sender_one_tx() { + let pool = Mempool::new(64); + let sender = Address::from_low_u64_be(1); + add_tx(&pool, sender, 0); + assert_eq!(pool.count_for_sender(sender).unwrap(), 1); + } + + #[test] + fn count_for_sender_many_nonces() { + let pool = Mempool::new(64); + let sender = Address::from_low_u64_be(1); + for nonce in 0..5 { + add_tx(&pool, sender, nonce); + } + assert_eq!(pool.count_for_sender(sender).unwrap(), 5); + } + + #[test] + fn count_for_sender_isolates_senders() { + let pool = Mempool::new(64); + let a = Address::from_low_u64_be(1); + let b = Address::from_low_u64_be(2); + add_tx(&pool, a, 0); + add_tx(&pool, a, 1); + add_tx(&pool, b, 0); + assert_eq!(pool.count_for_sender(a).unwrap(), 2); + assert_eq!(pool.count_for_sender(b).unwrap(), 1); + } + + #[test] + fn count_for_sender_unknown_returns_zero() { + let pool = Mempool::new(64); + let a = Address::from_low_u64_be(1); + let b = Address::from_low_u64_be(2); + add_tx(&pool, a, 0); + assert_eq!(pool.count_for_sender(b).unwrap(), 0); + } +} diff --git a/docs/CLI.md b/docs/CLI.md index 3456578a0f..51bea6d912 100644 --- a/docs/CLI.md +++ b/docs/CLI.md @@ -91,6 +91,12 @@ Node options: [env: ETHREX_MEMPOOL_MAX_SIZE=] [default: 10000] + --mempool.max-pending-txs-per-account + Maximum number of pending transactions a single sender may hold in the mempool. Replacements at an existing (sender, nonce) bypass this cap. + + [env: ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT=] + [default: 16] + --precompute-witnesses Once synced, computes execution witnesses upon receiving newPayload messages and stores them in local storage diff --git a/test/tests/blockchain/mempool_tests.rs b/test/tests/blockchain/mempool_tests.rs index 93f2b5ff38..dc66211d9f 100644 --- a/test/tests/blockchain/mempool_tests.rs +++ b/test/tests/blockchain/mempool_tests.rs @@ -451,10 +451,10 @@ fn test_filter_mempool_transactions() { let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); let filter = |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) }; mempool - .add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone()) + .add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone(), usize::MAX) .unwrap(); mempool - .add_transaction(plain_tx_hash, plain_tx_sender, plain_tx) + .add_transaction(plain_tx_hash, plain_tx_sender, plain_tx, usize::MAX) .unwrap(); let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap(); assert_eq!( @@ -521,7 +521,12 @@ fn blobs_bundle_insert_and_remove() { .unwrap(); mempool - .add_transaction(hash, sender, MempoolTransaction::new(tx, sender)) + .add_transaction( + hash, + sender, + MempoolTransaction::new(tx, sender), + usize::MAX, + ) .expect("Failed to add blob transaction"); }