diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index e335932fc1..7af113cec6 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -192,6 +192,16 @@ pub struct Options { env = "ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT" )] pub mempool_max_pending_txs_per_account: usize, + #[arg( + help = "When a sender exceeds the per-account pending-tx cap, also drop the highest-nonce half of their existing pool entries. The breaching tx is still rejected; this only frees pool budget for legitimate senders.", + long = "mempool.punish-spammer", + default_value_t = true, + value_name = "BOOL", + help_heading = "Node options", + env = "ETHREX_MEMPOOL_PUNISH_SPAMMER", + action = clap::ArgAction::Set + )] + pub mempool_punish_spammer: bool, #[arg( long = "http.addr", default_value = "0.0.0.0", @@ -460,6 +470,7 @@ impl Default for Options { force: false, mempool_max_size: Default::default(), mempool_max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT, + mempool_punish_spammer: true, tx_broadcasting_time_interval: Default::default(), target_peers: Default::default(), lookup_interval: Default::default(), diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index cc682e4e98..eeaddef3af 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -530,6 +530,7 @@ pub async fn init_l1( precompute_witnesses: opts.precompute_witnesses, precompile_cache_enabled: !opts.no_precompile_cache, max_pending_txs_per_account: opts.mempool_max_pending_txs_per_account, + punish_spammer: opts.mempool_punish_spammer, }, ); diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index d611b30206..8607bbd4b1 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -225,6 +225,7 @@ pub async fn init_l2( precompute_witnesses: opts.node_opts.precompute_witnesses, precompile_cache_enabled: true, max_pending_txs_per_account: opts.node_opts.mempool_max_pending_txs_per_account, + punish_spammer: opts.node_opts.mempool_punish_spammer, }; let blockchain = init_blockchain(store.clone(), blockchain_opts.clone()); diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 8e6728f523..8dca6d770c 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -235,6 +235,12 @@ pub struct BlockchainOptions { /// the mempool. A replacement at an existing `(sender, nonce)` bypasses /// this check. pub max_pending_txs_per_account: usize, + /// Erigon-style punishment policy: when the per-sender cap is breached, + /// drop the highest-nonce half of the sender's existing pool entries + /// in addition to rejecting the new transaction. The breaching tx is + /// still rejected; this only frees pool budget for legitimate senders. + /// Defaults to `true`. + pub punish_spammer: bool, } impl Default for BlockchainOptions { @@ -247,6 +253,7 @@ impl Default for BlockchainOptions { precompute_witnesses: false, precompile_cache_enabled: true, max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT, + punish_spammer: true, } } } @@ -2351,6 +2358,7 @@ impl Blockchain { sender, MempoolTransaction::new(transaction, sender), self.options.max_pending_txs_per_account, + self.options.punish_spammer, )?; Ok(hash) } @@ -2380,6 +2388,7 @@ impl Blockchain { sender, MempoolTransaction::new(transaction, sender), self.options.max_pending_txs_per_account, + self.options.punish_spammer, )?; Ok(hash) diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index f2c3f1edc2..54fc42a692 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -150,12 +150,20 @@ impl Mempool { /// post-replacement state. Returns /// [`MempoolError::MaxPendingTxsPerAccountExceeded`] if the cap would /// be exceeded. + /// + /// When `punish_spammer` is true, breaching the cap additionally + /// drops the highest-nonce half of the sender's existing pool entries + /// (rounded up for odd counts) before returning the error. Erigon-style + /// punishment: a sender hitting the cap is likely spamming future + /// nonces, so freeing those slots reclaims pool budget for transactions + /// more likely to execute next. The new transaction is still rejected. pub fn add_transaction( &self, hash: H256, sender: Address, transaction: MempoolTransaction, max_pending_txs_per_account: usize, + punish_spammer: bool, ) -> Result<(), MempoolError> { let mut inner = self.write()?; let count = inner @@ -163,8 +171,48 @@ impl Mempool { .range((sender, 0)..=(sender, u64::MAX)) .count(); if count >= max_pending_txs_per_account { + // Drop the upper half of the sender's pool entries, rounded up so + // odd counts still lose the median entry (`ceil(count / 2)`). + // Skip the prune when `count <= 1`: dropping the only entry adds + // no useful punishment (the sender already gets the new tx + // rejected) and turns interactions with delegated cap=1 into + // "every cap-breach wipes the sender's lone tx". + let mut dropped_count = 0usize; + if punish_spammer && count > 1 { + // Collect the sender's nonce range once, in reverse, so we + // can read victims and new_top_nonce in a single pass and + // release the immutable borrow before mutating the map. + let entries: Vec<(u64, H256)> = inner + .txs_by_sender_nonce + .range((sender, 0)..=(sender, u64::MAX)) + .rev() + .map(|((_, nonce), hash)| (*nonce, *hash)) + .collect(); + dropped_count = count.div_ceil(2); + let new_top_nonce = entries.get(dropped_count).map(|(nonce, _)| *nonce); + for (_, victim_hash) in entries.iter().take(dropped_count) { + inner.remove_transaction_with_lock(victim_hash)?; + } + // `remove_transaction_with_lock` doesn't touch `txs_order`, + // so each victim leaves a tombstone behind. The natural + // lazy-sweep eventually cleans them up via the + // `mempool_prune_threshold` check below, but a sustained + // spam loop can race ahead of the threshold. Sweep + // `txs_order` once here, scoped to this punishment fire, so + // the queue stays roughly aligned with `transaction_pool`. + let txpool = core::mem::take(&mut inner.transaction_pool); + inner.txs_order.retain(|h| txpool.contains_key(h)); + inner.transaction_pool = txpool; + warn!( + target: "mempool", + sender = ?sender, + dropped_count, + new_top_nonce = ?new_top_nonce, + "punishSpammer: per-sender cap breached; dropped highest-nonce half of sender's pool entries" + ); + } return Err(MempoolError::MaxPendingTxsPerAccountExceeded { - count, + count: count.saturating_sub(dropped_count), limit: max_pending_txs_per_account, }); } @@ -621,7 +669,8 @@ mod tests { let tx = build_tx(nonce); let mtx = MempoolTransaction::new(tx, sender); let hash = mtx.hash(); - pool.add_transaction(hash, sender, mtx, usize::MAX).unwrap(); + pool.add_transaction(hash, sender, mtx, usize::MAX, true) + .unwrap(); hash } @@ -670,4 +719,198 @@ mod tests { add_tx(&pool, a, 0); assert_eq!(pool.count_for_sender(b).unwrap(), 0); } + + /// Helper that submits a new tx for `sender` at `nonce` with the given + /// cap and punish flag, expecting it to be rejected with + /// `MaxPendingTxsPerAccountExceeded`. + fn submit_at_cap( + pool: &Mempool, + sender: Address, + nonce: u64, + cap: usize, + punish_spammer: bool, + ) { + let tx = build_tx(nonce); + let mtx = MempoolTransaction::new(tx, sender); + let hash = mtx.hash(); + let err = pool + .add_transaction(hash, sender, mtx, cap, punish_spammer) + .expect_err("expected per-account cap rejection"); + assert!( + matches!(err, MempoolError::MaxPendingTxsPerAccountExceeded { .. }), + "expected MaxPendingTxsPerAccountExceeded, got {err:?}" + ); + } + + #[test] + fn punish_spammer_drops_highest_nonce_half_at_cap() { + let pool = Mempool::new(128); + let sender = Address::from_low_u64_be(1); + // Fill the sender up to a cap of 16. + for nonce in 0..16u64 { + add_tx(&pool, sender, nonce); + } + assert_eq!(pool.count_for_sender(sender).unwrap(), 16); + + // 17th tx is rejected and triggers punishment. + submit_at_cap(&pool, sender, 16, 16, true); + + // 8 highest-nonce entries (nonces 8..16) should be dropped. + assert_eq!(pool.count_for_sender(sender).unwrap(), 8); + let inner = pool.read().unwrap(); + let remaining_nonces: Vec = inner + .txs_by_sender_nonce + .range((sender, 0)..=(sender, u64::MAX)) + .map(|((_, n), _)| *n) + .collect(); + assert_eq!(remaining_nonces, (0..8u64).collect::>()); + // `txs_order` must NOT carry tombstones for the dropped hashes — + // they were cleaned up by the per-punishment sweep so the + // `mempool_prune_threshold` check below counts accurately. + assert_eq!( + inner.txs_order.len(), + inner.transaction_pool.len(), + "txs_order must align with transaction_pool after punishment", + ); + } + + #[test] + fn punish_spammer_disabled_leaves_existing_txs() { + let pool = Mempool::new(128); + let sender = Address::from_low_u64_be(1); + for nonce in 0..16u64 { + add_tx(&pool, sender, nonce); + } + assert_eq!(pool.count_for_sender(sender).unwrap(), 16); + + // With punish_spammer = false, the new tx is rejected but the + // existing 16 entries are untouched. + submit_at_cap(&pool, sender, 16, 16, false); + + assert_eq!(pool.count_for_sender(sender).unwrap(), 16); + } + + #[test] + fn below_cap_admits_normally_without_punishment() { + let pool = Mempool::new(64); + let sender = Address::from_low_u64_be(1); + // Cap is 2; sender currently has 1 pending. + add_tx(&pool, sender, 0); + assert_eq!(pool.count_for_sender(sender).unwrap(), 1); + + let tx = build_tx(1); + let mtx = MempoolTransaction::new(tx, sender); + let hash = mtx.hash(); + pool.add_transaction(hash, sender, mtx, 2, true) + .expect("below-cap submission should be admitted"); + + assert_eq!(pool.count_for_sender(sender).unwrap(), 2); + } + + #[test] + fn punish_spammer_removes_blob_bundles_for_dropped_blob_txs() { + use ethrex_common::types::{BlobsBundle, EIP4844Transaction}; + + let pool = Mempool::new(128); + let sender = Address::from_low_u64_be(1); + + // Build 16 entries alternating blob and non-blob txs. Blob txs sit + // at odd nonces so half of them (the highest-nonce ones: 9, 11, 13, 15) + // fall into the dropped upper half. + let mut blob_hashes_in_upper_half: Vec = Vec::new(); + let mut blob_hashes_in_lower_half: Vec = Vec::new(); + for nonce in 0..16u64 { + let (tx, is_blob) = if nonce % 2 == 1 { + ( + Transaction::EIP4844Transaction(EIP4844Transaction { + nonce, + ..Default::default() + }), + true, + ) + } else { + (build_tx(nonce), false) + }; + let mtx = MempoolTransaction::new(tx, sender); + let hash = mtx.hash(); + if is_blob { + pool.add_blobs_bundle(hash, BlobsBundle::default()).unwrap(); + if nonce >= 8 { + blob_hashes_in_upper_half.push(hash); + } else { + blob_hashes_in_lower_half.push(hash); + } + } + pool.add_transaction(hash, sender, mtx, usize::MAX, true) + .unwrap(); + } + assert_eq!(pool.count_for_sender(sender).unwrap(), 16); + for h in &blob_hashes_in_upper_half { + assert!(pool.get_blobs_bundle(*h).unwrap().is_some()); + } + + // Trigger punishment with cap = 16. + submit_at_cap(&pool, sender, 16, 16, true); + + // Upper-half blob bundles must be gone; lower-half blob bundles + // must remain. + assert_eq!(pool.count_for_sender(sender).unwrap(), 8); + for h in &blob_hashes_in_upper_half { + assert!( + pool.get_blobs_bundle(*h).unwrap().is_none(), + "blob bundle for dropped tx {h:?} should have been removed" + ); + } + for h in &blob_hashes_in_lower_half { + assert!( + pool.get_blobs_bundle(*h).unwrap().is_some(), + "blob bundle for surviving tx {h:?} should still be present" + ); + } + } + + #[test] + fn punish_spammer_skips_prune_when_count_is_one() { + // With cap = 1 and a single pending tx, the prune-on-breach policy + // would wipe the sender's only tx on every cap-breach attempt, which + // is especially harmful when combined with delegated cap=1 (every + // collision wipes the prior tx). The implementation skips the prune + // when `count <= 1`. + let pool = Mempool::new(64); + let sender = Address::from_low_u64_be(7); + add_tx(&pool, sender, 0); + assert_eq!(pool.count_for_sender(sender).unwrap(), 1); + + submit_at_cap(&pool, sender, 1, 1, true); + + // The pre-existing single tx must survive. + assert_eq!(pool.count_for_sender(sender).unwrap(), 1); + } + + #[test] + fn punish_spammer_reports_post_prune_count() { + // The rejection error must reflect the sender's count AFTER the prune + // so RPC clients see the actual post-state, not the pre-prune count. + let pool = Mempool::new(64); + let sender = Address::from_low_u64_be(8); + for nonce in 0..16u64 { + add_tx(&pool, sender, nonce); + } + + let tx = build_tx(16); + let mtx = MempoolTransaction::new(tx, sender); + let hash = mtx.hash(); + let err = pool + .add_transaction(hash, sender, mtx, 16, true) + .expect_err("expected per-account cap rejection"); + + match err { + MempoolError::MaxPendingTxsPerAccountExceeded { count, limit } => { + // 16 - ceil(16/2) = 16 - 8 = 8 remaining after the prune. + assert_eq!(count, 8); + assert_eq!(limit, 16); + } + other => panic!("expected MaxPendingTxsPerAccountExceeded, got {other:?}"), + } + } } diff --git a/docs/CLI.md b/docs/CLI.md index aecec1c246..3fbaf02f70 100644 --- a/docs/CLI.md +++ b/docs/CLI.md @@ -97,6 +97,13 @@ Node options: [env: ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT=] [default: 16] + --mempool.punish-spammer + When a sender exceeds the per-account pending-tx cap, also drop the highest-nonce half of their existing pool entries. The breaching tx is still rejected; this only frees pool budget for legitimate senders. + + [env: ETHREX_MEMPOOL_PUNISH_SPAMMER=] + [default: true] + [possible values: true, false] + --precompute-witnesses Once synced, computes execution witnesses upon receiving newPayload messages and stores them in local storage diff --git a/test/tests/blockchain/mempool_tests.rs b/test/tests/blockchain/mempool_tests.rs index 9098b2599b..d2ccad278d 100644 --- a/test/tests/blockchain/mempool_tests.rs +++ b/test/tests/blockchain/mempool_tests.rs @@ -370,10 +370,10 @@ fn test_filter_mempool_transactions() { let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); let filter = |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) }; mempool - .add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone()) + .add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone(), usize::MAX, false) .unwrap(); mempool - .add_transaction(plain_tx_hash, plain_tx_sender, plain_tx) + .add_transaction(plain_tx_hash, plain_tx_sender, plain_tx, usize::MAX, false) .unwrap(); let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap(); assert_eq!( @@ -440,7 +440,13 @@ fn blobs_bundle_insert_and_remove() { .unwrap(); mempool - .add_transaction(hash, sender, MempoolTransaction::new(tx, sender)) + .add_transaction( + hash, + sender, + MempoolTransaction::new(tx, sender), + usize::MAX, + false, + ) .expect("Failed to add blob transaction"); }