Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand};
use ethrex_blockchain::{
BlockchainOptions, BlockchainType, L2Config,
BlockchainOptions, BlockchainType, DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT, L2Config,
error::{ChainError, InvalidBlockError},
};
use ethrex_common::types::{Block, DEFAULT_BUILDER_GAS_CEIL, Genesis, validate_block_body};
Expand Down Expand Up @@ -183,6 +183,15 @@ pub struct Options {
env = "ETHREX_MEMPOOL_MAX_SIZE"
)]
pub mempool_max_size: usize,
#[arg(
help = "Maximum number of pending transactions a single sender may hold in the mempool. Replacements at an existing (sender, nonce) bypass this cap.",
long = "mempool.max-pending-txs-per-account",
default_value_t = DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT,
value_name = "MAX_PENDING_TXS_PER_ACCOUNT",
help_heading = "Node options",
env = "ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT"
)]
pub mempool_max_pending_txs_per_account: usize,
Comment on lines +186 to +194
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[arg(
long = "http.addr",
default_value = "127.0.0.1",
Expand Down Expand Up @@ -464,6 +473,7 @@ impl Default for Options {
dev: Default::default(),
force: false,
mempool_max_size: Default::default(),
mempool_max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT,
tx_broadcasting_time_interval: Default::default(),
Comment on lines 473 to 477
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

target_peers: Default::default(),
lookup_interval: Default::default(),
Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ pub async fn init_l1(
max_blobs_per_block: opts.max_blobs_per_block,
precompute_witnesses: opts.precompute_witnesses,
precompile_cache_enabled: !opts.no_precompile_cache,
max_pending_txs_per_account: opts.mempool_max_pending_txs_per_account,
},
);

Expand Down
1 change: 1 addition & 0 deletions cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub async fn init_l2(
max_blobs_per_block: None, // L2 doesn't support blob transactions
precompute_witnesses: opts.node_opts.precompute_witnesses,
precompile_cache_enabled: true,
max_pending_txs_per_account: opts.node_opts.mempool_max_pending_txs_per_account,
};

let blockchain = init_blockchain(store.clone(), blockchain_opts.clone());
Expand Down
32 changes: 28 additions & 4 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ pub struct BlockchainOptions {
/// warmer thread and the executor. Set to false (via `--no-precompile-cache`) to
/// disable the cache for benchmarking purposes.
pub precompile_cache_enabled: bool,
/// Maximum number of pending transactions a single sender may hold in
/// the mempool. A replacement at an existing `(sender, nonce)` bypasses
/// this check.
pub max_pending_txs_per_account: usize,
}

impl Default for BlockchainOptions {
Expand All @@ -240,10 +244,14 @@ impl Default for BlockchainOptions {
max_blobs_per_block: None,
precompute_witnesses: false,
precompile_cache_enabled: true,
max_pending_txs_per_account: DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT,
}
}
}

/// Default per-account pending-tx cap.
pub const DEFAULT_MAX_PENDING_TXS_PER_ACCOUNT: usize = 16;

#[derive(Debug, Clone)]
pub struct BatchBlockProcessingFailure {
pub last_valid_hash: H256,
Expand Down Expand Up @@ -2350,8 +2358,12 @@ impl Blockchain {
// Add blobs bundle before the transaction so that when add_transaction
// notifies payload builders the blob data is already available.
self.mempool.add_blobs_bundle(hash, blobs_bundle)?;
self.mempool
.add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?;
self.mempool.add_transaction(
hash,
sender,
MempoolTransaction::new(transaction, sender),
self.options.max_pending_txs_per_account,
)?;
Ok(hash)
}

Expand Down Expand Up @@ -2387,8 +2399,12 @@ impl Blockchain {
}

// Add transaction to storage
self.mempool
.add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?;
self.mempool.add_transaction(
hash,
sender,
MempoolTransaction::new(transaction, sender),
self.options.max_pending_txs_per_account,
)?;

Ok(hash)
}
Expand Down Expand Up @@ -2548,6 +2564,14 @@ impl Blockchain {
return Err(MempoolError::InvalidChainId(config.chain_id));
}

// The per-account pending-tx cap is enforced atomically inside
// `Mempool::add_transaction` (under the same write lock as the
// insertion) so concurrent submissions can't both pass a stale
// count check and race past the limit. Replacement candidates
// bypass it implicitly: the caller removes the old tx before
// `add_transaction` runs, so the post-removal count is one
// below the cap and the new insertion stays within it.

Ok(tx_to_replace_hash)
}

Expand Down
4 changes: 4 additions & 0 deletions crates/blockchain/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ pub enum MempoolError {
TxMaxInitCodeSizeError,
#[error("Transaction encoded size ({actual} bytes) exceeds the {limit}-byte limit")]
TxSizeExceeded { actual: usize, limit: usize },
#[error(
"Sender has {count} pending transactions; adding a new one would exceed the per-account cap of {limit}"
)]
MaxPendingTxsPerAccountExceeded { count: usize, limit: usize },
#[error("Transaction gas limit exceeded")]
TxGasLimitExceededError,
#[error(
Expand Down
101 changes: 99 additions & 2 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,33 @@ impl Mempool {
.map_err(|error| StoreError::MempoolReadLock(error.to_string()))
}

/// Add transaction to the pool without doing validity checks
/// Add transaction to the pool without doing validity checks.
///
/// Enforces the per-sender pending-tx cap atomically: the count is
/// re-checked under the same write lock that performs the insertion.
/// Replacement candidates (same `(sender, nonce)`) must have already
/// been removed via `remove_transaction` so this counter reflects the
/// post-replacement state. Returns
/// [`MempoolError::MaxPendingTxsPerAccountExceeded`] if the cap would
/// be exceeded.
pub fn add_transaction(
&self,
hash: H256,
sender: Address,
transaction: MempoolTransaction,
) -> Result<(), StoreError> {
max_pending_txs_per_account: usize,
) -> Result<(), MempoolError> {
let mut inner = self.write()?;
let count = inner
.txs_by_sender_nonce
.range((sender, 0)..=(sender, u64::MAX))
.count();
if count >= max_pending_txs_per_account {
return Err(MempoolError::MaxPendingTxsPerAccountExceeded {
count,
limit: max_pending_txs_per_account,
});
}
// Prune the order queue if it has grown too much
if inner.txs_order.len() > inner.mempool_prune_threshold {
// NOTE: we do this to avoid borrow checker errors
Expand Down Expand Up @@ -452,6 +471,17 @@ impl Mempool {
Ok(contains)
}

/// Returns the number of pending transactions currently held in the
/// mempool for `sender`. Used by the per-sender slot cap at admission.
pub fn count_for_sender(&self, sender: Address) -> Result<usize, MempoolError> {
let inner = self.read()?;
let count = inner
.txs_by_sender_nonce
.range((sender, 0)..=(sender, u64::MAX))
.count();
Comment on lines +479 to +481
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bounded, so I think it's OK for now.

Ok(count)
}

pub fn find_tx_to_replace(
&self,
sender: Address,
Expand Down Expand Up @@ -599,3 +629,70 @@ pub fn transaction_intrinsic_gas(

Ok(gas)
}

#[cfg(test)]
mod tests {
use super::*;
use ethrex_common::types::EIP1559Transaction;

fn build_tx(nonce: u64) -> Transaction {
Transaction::EIP1559Transaction(EIP1559Transaction {
nonce,
..Default::default()
})
}

fn add_tx(pool: &Mempool, sender: Address, nonce: u64) -> H256 {
let tx = build_tx(nonce);
let mtx = MempoolTransaction::new(tx, sender);
let hash = mtx.hash();
pool.add_transaction(hash, sender, mtx, usize::MAX).unwrap();
hash
}

#[test]
fn count_for_sender_empty_pool() {
let pool = Mempool::new(64);
let sender = Address::from_low_u64_be(1);
assert_eq!(pool.count_for_sender(sender).unwrap(), 0);
}

#[test]
fn count_for_sender_one_tx() {
let pool = Mempool::new(64);
let sender = Address::from_low_u64_be(1);
add_tx(&pool, sender, 0);
assert_eq!(pool.count_for_sender(sender).unwrap(), 1);
}

#[test]
fn count_for_sender_many_nonces() {
let pool = Mempool::new(64);
let sender = Address::from_low_u64_be(1);
for nonce in 0..5 {
add_tx(&pool, sender, nonce);
}
assert_eq!(pool.count_for_sender(sender).unwrap(), 5);
}

#[test]
fn count_for_sender_isolates_senders() {
let pool = Mempool::new(64);
let a = Address::from_low_u64_be(1);
let b = Address::from_low_u64_be(2);
add_tx(&pool, a, 0);
add_tx(&pool, a, 1);
add_tx(&pool, b, 0);
assert_eq!(pool.count_for_sender(a).unwrap(), 2);
assert_eq!(pool.count_for_sender(b).unwrap(), 1);
}

#[test]
fn count_for_sender_unknown_returns_zero() {
let pool = Mempool::new(64);
let a = Address::from_low_u64_be(1);
let b = Address::from_low_u64_be(2);
add_tx(&pool, a, 0);
assert_eq!(pool.count_for_sender(b).unwrap(), 0);
}
}
6 changes: 6 additions & 0 deletions docs/CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ Node options:
[env: ETHREX_MEMPOOL_MAX_SIZE=]
[default: 10000]

--mempool.max-pending-txs-per-account <MAX_PENDING_TXS_PER_ACCOUNT>
Maximum number of pending transactions a single sender may hold in the mempool. Replacements at an existing (sender, nonce) bypass this cap.

[env: ETHREX_MEMPOOL_MAX_PENDING_TXS_PER_ACCOUNT=]
[default: 16]

--precompute-witnesses
Once synced, computes execution witnesses upon receiving newPayload messages and stores them in local storage

Expand Down
11 changes: 8 additions & 3 deletions test/tests/blockchain/mempool_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,10 @@ fn test_filter_mempool_transactions() {
let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST);
let filter = |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) };
mempool
.add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone())
.add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone(), usize::MAX)
.unwrap();
mempool
.add_transaction(plain_tx_hash, plain_tx_sender, plain_tx)
.add_transaction(plain_tx_hash, plain_tx_sender, plain_tx, usize::MAX)
.unwrap();
let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap();
assert_eq!(
Expand Down Expand Up @@ -521,7 +521,12 @@ fn blobs_bundle_insert_and_remove() {
.unwrap();

mempool
.add_transaction(hash, sender, MempoolTransaction::new(tx, sender))
.add_transaction(
hash,
sender,
MempoolTransaction::new(tx, sender),
usize::MAX,
)
.expect("Failed to add blob transaction");
}

Expand Down
Loading