Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ pub struct Options {
env = "ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT"
)]
pub mempool_max_pending_txs_per_account: usize,
#[arg(
help = "When a sender exceeds the per-account pending-tx cap, also drop the highest-nonce half of their existing pool entries. The breaching tx is still rejected; this only frees pool budget for legitimate senders.",
long = "mempool.punish-spammer",
default_value_t = true,
value_name = "BOOL",
help_heading = "Node options",
env = "ETHREX_MEMPOOL_PUNISH_SPAMMER",
action = clap::ArgAction::Set
)]
pub mempool_punish_spammer: bool,
#[arg(
long = "http.addr",
default_value = "0.0.0.0",
Expand Down Expand Up @@ -460,6 +470,7 @@ impl Default for Options {
force: false,
mempool_max_size: Default::default(),
mempool_max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT,
mempool_punish_spammer: true,
tx_broadcasting_time_interval: Default::default(),
target_peers: Default::default(),
lookup_interval: Default::default(),
Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ pub async fn init_l1(
precompute_witnesses: opts.precompute_witnesses,
precompile_cache_enabled: !opts.no_precompile_cache,
max_pending_txs_per_account: opts.mempool_max_pending_txs_per_account,
punish_spammer: opts.mempool_punish_spammer,
},
);

Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub async fn init_l2(
precompute_witnesses: opts.node_opts.precompute_witnesses,
precompile_cache_enabled: true,
max_pending_txs_per_account: opts.node_opts.mempool_max_pending_txs_per_account,
punish_spammer: opts.node_opts.mempool_punish_spammer,
};

let blockchain = init_blockchain(store.clone(), blockchain_opts.clone());
Expand Down
9 changes: 9 additions & 0 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ pub struct BlockchainOptions {
/// the mempool. A replacement at an existing `(sender, nonce)` bypasses
/// this check.
pub max_pending_txs_per_account: usize,
/// Erigon-style punishment policy: when the per-sender cap is breached,
/// drop the highest-nonce half of the sender's existing pool entries
/// in addition to rejecting the new transaction. The breaching tx is
/// still rejected; this only frees pool budget for legitimate senders.
/// Defaults to `true`.
pub punish_spammer: bool,
}

impl Default for BlockchainOptions {
Expand All @@ -247,6 +253,7 @@ impl Default for BlockchainOptions {
precompute_witnesses: false,
precompile_cache_enabled: true,
max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT,
punish_spammer: true,
}
}
}
Expand Down Expand Up @@ -2351,6 +2358,7 @@ impl Blockchain {
sender,
MempoolTransaction::new(transaction, sender),
self.options.max_pending_txs_per_account,
self.options.punish_spammer,
)?;
Ok(hash)
}
Expand Down Expand Up @@ -2380,6 +2388,7 @@ impl Blockchain {
sender,
MempoolTransaction::new(transaction, sender),
self.options.max_pending_txs_per_account,
self.options.punish_spammer,
)?;

Ok(hash)
Expand Down
247 changes: 245 additions & 2 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,69 @@ impl Mempool {
/// post-replacement state. Returns
/// [`MempoolError::MaxPendingTxsPerAccountExceeded`] if the cap would
/// be exceeded.
///
/// When `punish_spammer` is true, breaching the cap additionally
/// drops the highest-nonce half of the sender's existing pool entries
/// (rounded up for odd counts) before returning the error. Erigon-style
/// punishment: a sender hitting the cap is likely spamming future
/// nonces, so freeing those slots reclaims pool budget for transactions
/// more likely to execute next. The new transaction is still rejected.
pub fn add_transaction(
&self,
hash: H256,
sender: Address,
transaction: MempoolTransaction,
max_pending_txs_per_account: usize,
punish_spammer: bool,
) -> Result<(), MempoolError> {
let mut inner = self.write()?;
let count = inner
.txs_by_sender_nonce
.range((sender, 0)..=(sender, u64::MAX))
.count();
if count >= max_pending_txs_per_account {
// Drop the upper half of the sender's pool entries, rounded up so
// odd counts still lose the median entry (`ceil(count / 2)`).
// Skip the prune when `count <= 1`: dropping the only entry adds
// no useful punishment (the sender already gets the new tx
// rejected) and turns interactions with delegated cap=1 into
// "every cap-breach wipes the sender's lone tx".
let mut dropped_count = 0usize;
if punish_spammer && count > 1 {
// Collect the sender's nonce range once, in reverse, so we
// can read victims and new_top_nonce in a single pass and
// release the immutable borrow before mutating the map.
let entries: Vec<(u64, H256)> = inner
.txs_by_sender_nonce
.range((sender, 0)..=(sender, u64::MAX))
.rev()
.map(|((_, nonce), hash)| (*nonce, *hash))
.collect();
dropped_count = count.div_ceil(2);
let new_top_nonce = entries.get(dropped_count).map(|(nonce, _)| *nonce);
for (_, victim_hash) in entries.iter().take(dropped_count) {
inner.remove_transaction_with_lock(victim_hash)?;
}
// `remove_transaction_with_lock` doesn't touch `txs_order`,
// so each victim leaves a tombstone behind. The natural
// lazy-sweep eventually cleans them up via the
// `mempool_prune_threshold` check below, but a sustained
// spam loop can race ahead of the threshold. Sweep
// `txs_order` once here, scoped to this punishment fire, so
// the queue stays roughly aligned with `transaction_pool`.
let txpool = core::mem::take(&mut inner.transaction_pool);
inner.txs_order.retain(|h| txpool.contains_key(h));
inner.transaction_pool = txpool;
warn!(
target: "mempool",
sender = ?sender,
dropped_count,
new_top_nonce = ?new_top_nonce,
"punishSpammer: per-sender cap breached; dropped highest-nonce half of sender's pool entries"
);
}
return Err(MempoolError::MaxPendingTxsPerAccountExceeded {
count,
count: count.saturating_sub(dropped_count),
limit: max_pending_txs_per_account,
});
}
Expand Down Expand Up @@ -621,7 +669,8 @@ mod tests {
let tx = build_tx(nonce);
let mtx = MempoolTransaction::new(tx, sender);
let hash = mtx.hash();
pool.add_transaction(hash, sender, mtx, usize::MAX).unwrap();
pool.add_transaction(hash, sender, mtx, usize::MAX, true)
.unwrap();
hash
}

Expand Down Expand Up @@ -670,4 +719,198 @@ mod tests {
add_tx(&pool, a, 0);
assert_eq!(pool.count_for_sender(b).unwrap(), 0);
}

/// Helper that submits a new tx for `sender` at `nonce` with the given
/// cap and punish flag, expecting it to be rejected with
/// `MaxPendingTxsPerAccountExceeded`.
fn submit_at_cap(
pool: &Mempool,
sender: Address,
nonce: u64,
cap: usize,
punish_spammer: bool,
) {
let tx = build_tx(nonce);
let mtx = MempoolTransaction::new(tx, sender);
let hash = mtx.hash();
let err = pool
.add_transaction(hash, sender, mtx, cap, punish_spammer)
.expect_err("expected per-account cap rejection");
assert!(
matches!(err, MempoolError::MaxPendingTxsPerAccountExceeded { .. }),
"expected MaxPendingTxsPerAccountExceeded, got {err:?}"
);
}

#[test]
fn punish_spammer_drops_highest_nonce_half_at_cap() {
let pool = Mempool::new(128);
let sender = Address::from_low_u64_be(1);
// Fill the sender up to a cap of 16.
for nonce in 0..16u64 {
add_tx(&pool, sender, nonce);
}
assert_eq!(pool.count_for_sender(sender).unwrap(), 16);

// 17th tx is rejected and triggers punishment.
submit_at_cap(&pool, sender, 16, 16, true);

// 8 highest-nonce entries (nonces 8..16) should be dropped.
assert_eq!(pool.count_for_sender(sender).unwrap(), 8);
let inner = pool.read().unwrap();
let remaining_nonces: Vec<u64> = inner
.txs_by_sender_nonce
.range((sender, 0)..=(sender, u64::MAX))
.map(|((_, n), _)| *n)
.collect();
assert_eq!(remaining_nonces, (0..8u64).collect::<Vec<_>>());
// `txs_order` must NOT carry tombstones for the dropped hashes —
// they were cleaned up by the per-punishment sweep so the
// `mempool_prune_threshold` check below counts accurately.
assert_eq!(
inner.txs_order.len(),
inner.transaction_pool.len(),
"txs_order must align with transaction_pool after punishment",
);
}

#[test]
fn punish_spammer_disabled_leaves_existing_txs() {
let pool = Mempool::new(128);
let sender = Address::from_low_u64_be(1);
for nonce in 0..16u64 {
add_tx(&pool, sender, nonce);
}
assert_eq!(pool.count_for_sender(sender).unwrap(), 16);

// With punish_spammer = false, the new tx is rejected but the
// existing 16 entries are untouched.
submit_at_cap(&pool, sender, 16, 16, false);

assert_eq!(pool.count_for_sender(sender).unwrap(), 16);
}

#[test]
fn below_cap_admits_normally_without_punishment() {
let pool = Mempool::new(64);
let sender = Address::from_low_u64_be(1);
// Cap is 2; sender currently has 1 pending.
add_tx(&pool, sender, 0);
assert_eq!(pool.count_for_sender(sender).unwrap(), 1);

let tx = build_tx(1);
let mtx = MempoolTransaction::new(tx, sender);
let hash = mtx.hash();
pool.add_transaction(hash, sender, mtx, 2, true)
.expect("below-cap submission should be admitted");

assert_eq!(pool.count_for_sender(sender).unwrap(), 2);
}

#[test]
fn punish_spammer_removes_blob_bundles_for_dropped_blob_txs() {
use ethrex_common::types::{BlobsBundle, EIP4844Transaction};

let pool = Mempool::new(128);
let sender = Address::from_low_u64_be(1);

// Build 16 entries alternating blob and non-blob txs. Blob txs sit
// at odd nonces so half of them (the highest-nonce ones: 9, 11, 13, 15)
// fall into the dropped upper half.
let mut blob_hashes_in_upper_half: Vec<H256> = Vec::new();
let mut blob_hashes_in_lower_half: Vec<H256> = Vec::new();
for nonce in 0..16u64 {
let (tx, is_blob) = if nonce % 2 == 1 {
(
Transaction::EIP4844Transaction(EIP4844Transaction {
nonce,
..Default::default()
}),
true,
)
} else {
(build_tx(nonce), false)
};
let mtx = MempoolTransaction::new(tx, sender);
let hash = mtx.hash();
if is_blob {
pool.add_blobs_bundle(hash, BlobsBundle::default()).unwrap();
if nonce >= 8 {
blob_hashes_in_upper_half.push(hash);
} else {
blob_hashes_in_lower_half.push(hash);
}
}
pool.add_transaction(hash, sender, mtx, usize::MAX, true)
.unwrap();
}
assert_eq!(pool.count_for_sender(sender).unwrap(), 16);
for h in &blob_hashes_in_upper_half {
assert!(pool.get_blobs_bundle(*h).unwrap().is_some());
}

// Trigger punishment with cap = 16.
submit_at_cap(&pool, sender, 16, 16, true);

// Upper-half blob bundles must be gone; lower-half blob bundles
// must remain.
assert_eq!(pool.count_for_sender(sender).unwrap(), 8);
for h in &blob_hashes_in_upper_half {
assert!(
pool.get_blobs_bundle(*h).unwrap().is_none(),
"blob bundle for dropped tx {h:?} should have been removed"
);
}
for h in &blob_hashes_in_lower_half {
assert!(
pool.get_blobs_bundle(*h).unwrap().is_some(),
"blob bundle for surviving tx {h:?} should still be present"
);
}
}

#[test]
fn punish_spammer_skips_prune_when_count_is_one() {
// With cap = 1 and a single pending tx, the prune-on-breach policy
// would wipe the sender's only tx on every cap-breach attempt, which
// is especially harmful when combined with delegated cap=1 (every
// collision wipes the prior tx). The implementation skips the prune
// when `count <= 1`.
let pool = Mempool::new(64);
let sender = Address::from_low_u64_be(7);
add_tx(&pool, sender, 0);
assert_eq!(pool.count_for_sender(sender).unwrap(), 1);

submit_at_cap(&pool, sender, 1, 1, true);

// The pre-existing single tx must survive.
assert_eq!(pool.count_for_sender(sender).unwrap(), 1);
}

#[test]
fn punish_spammer_reports_post_prune_count() {
// The rejection error must reflect the sender's count AFTER the prune
// so RPC clients see the actual post-state, not the pre-prune count.
let pool = Mempool::new(64);
let sender = Address::from_low_u64_be(8);
for nonce in 0..16u64 {
add_tx(&pool, sender, nonce);
}

let tx = build_tx(16);
let mtx = MempoolTransaction::new(tx, sender);
let hash = mtx.hash();
let err = pool
.add_transaction(hash, sender, mtx, 16, true)
.expect_err("expected per-account cap rejection");

match err {
MempoolError::MaxPendingTxsPerAccountExceeded { count, limit } => {
// 16 - ceil(16/2) = 16 - 8 = 8 remaining after the prune.
assert_eq!(count, 8);
assert_eq!(limit, 16);
}
other => panic!("expected MaxPendingTxsPerAccountExceeded, got {other:?}"),
}
}
}
7 changes: 7 additions & 0 deletions docs/CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ Node options:
[env: ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT=]
[default: 16]

--mempool.punish-spammer <BOOL>
When a sender exceeds the per-account pending-tx cap, also drop the highest-nonce half of their existing pool entries. The breaching tx is still rejected; this only frees pool budget for legitimate senders.

[env: ETHREX_MEMPOOL_PUNISH_SPAMMER=]
[default: true]
[possible values: true, false]

--precompute-witnesses
Once synced, computes execution witnesses upon receiving newPayload messages and stores them in local storage

Expand Down
12 changes: 9 additions & 3 deletions test/tests/blockchain/mempool_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,10 @@ fn test_filter_mempool_transactions() {
let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST);
let filter = |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) };
mempool
.add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone())
.add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone(), usize::MAX, false)
.unwrap();
mempool
.add_transaction(plain_tx_hash, plain_tx_sender, plain_tx)
.add_transaction(plain_tx_hash, plain_tx_sender, plain_tx, usize::MAX, false)
.unwrap();
let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap();
assert_eq!(
Expand Down Expand Up @@ -440,7 +440,13 @@ fn blobs_bundle_insert_and_remove() {
.unwrap();

mempool
.add_transaction(hash, sender, MempoolTransaction::new(tx, sender))
.add_transaction(
hash,
sender,
MempoolTransaction::new(tx, sender),
usize::MAX,
false,
)
.expect("Failed to add blob transaction");
}

Expand Down
Loading