Skip to content
83 changes: 77 additions & 6 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ use ethrex_vm::backends::CachingDatabase;
use ethrex_vm::backends::levm::LEVM;
use ethrex_vm::backends::levm::db::DatabaseLogger;
use ethrex_vm::{BlockExecutionResult, DynVmDatabase, Evm, EvmError};
use mempool::Mempool;
use mempool::{Mempool, TxOrigin};
use payload::PayloadOrTask;
use rustc_hash::{FxHashMap, FxHashSet};
use std::collections::hash_map::Entry;
Expand Down Expand Up @@ -2306,12 +2306,39 @@ impl Blockchain {
Ok(())
}

/// Add a blob transaction and its blobs bundle to the mempool checking that the transaction is valid
/// Add a P2P-received blob transaction and its blobs bundle to the mempool.
///
/// The transaction is validated as `TxOrigin::External`, so it is subject to
/// all admission policies (including operator-configurable floors).
#[cfg(feature = "c-kzg")]
pub async fn add_blob_transaction_to_pool(
&self,
transaction: EIP4844Transaction,
blobs_bundle: BlobsBundle,
) -> Result<H256, MempoolError> {
self.add_blob_transaction_to_pool_with_origin(transaction, blobs_bundle, TxOrigin::External)
.await
}

/// Add a locally-submitted blob transaction (e.g. via `eth_sendRawTransaction`)
/// to the mempool. The transaction is validated as `TxOrigin::Local`, so it may
/// bypass operator-friendly admission gates.
#[cfg(feature = "c-kzg")]
pub async fn add_local_blob_transaction_to_pool(
&self,
transaction: EIP4844Transaction,
blobs_bundle: BlobsBundle,
) -> Result<H256, MempoolError> {
self.add_blob_transaction_to_pool_with_origin(transaction, blobs_bundle, TxOrigin::Local)
.await
}

#[cfg(feature = "c-kzg")]
async fn add_blob_transaction_to_pool_with_origin(
&self,
transaction: EIP4844Transaction,
blobs_bundle: BlobsBundle,
origin: TxOrigin,
) -> Result<H256, MempoolError> {
let fork = self.current_fork().await?;

Expand Down Expand Up @@ -2343,7 +2370,10 @@ impl Blockchain {
let sender = transaction.sender(&NativeCrypto)?;

// Validate transaction
if let Some(tx_to_replace) = self.validate_transaction(&transaction, sender).await? {
if let Some(tx_to_replace) = self
.validate_transaction(&transaction, sender, origin)
.await?
{
self.remove_transaction_from_pool(&tx_to_replace)?;
}

Expand All @@ -2355,10 +2385,33 @@ impl Blockchain {
Ok(hash)
}

/// Add a transaction to the mempool checking that the transaction is valid
/// Add a P2P-received transaction to the mempool.
///
/// The transaction is validated as `TxOrigin::External`, so it is subject to
/// all admission policies (including operator-configurable floors).
pub async fn add_transaction_to_pool(
&self,
transaction: Transaction,
) -> Result<H256, MempoolError> {
self.add_transaction_to_pool_with_origin(transaction, TxOrigin::External)
.await
}

/// Add a locally-submitted transaction (e.g. via `eth_sendRawTransaction`) to
/// the mempool. The transaction is validated as `TxOrigin::Local`, so it may
/// bypass operator-friendly admission gates.
pub async fn add_local_transaction_to_pool(
&self,
transaction: Transaction,
) -> Result<H256, MempoolError> {
self.add_transaction_to_pool_with_origin(transaction, TxOrigin::Local)
.await
}

async fn add_transaction_to_pool_with_origin(
&self,
transaction: Transaction,
origin: TxOrigin,
) -> Result<H256, MempoolError> {
// Blob transactions should be submitted via add_blob_transaction along with the corresponding blobs bundle
if matches!(transaction, Transaction::EIP4844Transaction(_)) {
Expand All @@ -2382,7 +2435,10 @@ impl Blockchain {
}
let sender = transaction.sender(&NativeCrypto)?;
// Validate transaction
if let Some(tx_to_replace) = self.validate_transaction(&transaction, sender).await? {
if let Some(tx_to_replace) = self
.validate_transaction(&transaction, sender, origin)
.await?
{
self.remove_transaction_from_pool(&tx_to_replace)?;
}

Expand Down Expand Up @@ -2437,12 +2493,27 @@ impl Blockchain {
5. Ensure the transactor is able to add a new transaction. The number of transactions sent by an account may be limited by a certain configured value

*/
/// Returns the hash of the transaction to replace in case the nonce already exists
/// Returns the hash of the transaction to replace in case the nonce already exists.
///
/// `origin` records whether the transaction came in via RPC (`TxOrigin::Local`)
/// or P2P (`TxOrigin::External`). The plumbing exists so future admission
/// gates can apply origin-aware exemptions (e.g. PR #6604's min-tip floor
/// will skip local txs by default).
pub async fn validate_transaction(
&self,
tx: &Transaction,
sender: Address,
_origin: TxOrigin,
) -> Result<Option<H256>, MempoolError> {
// TODO(#6604): when the min-tip floor lands (PR #6604 adds
// `BlockchainOptions::min_tip_wei` and a `gas_tip_cap < min_tip_wei`
// rejection), drop the underscore from `_origin` and wrap the floor
// check with `if origin != TxOrigin::Local { ... }` so
// `TxOrigin::Local` transactions bypass the floor. The opt-out
// operator flag (e.g. `--mempool.nolocals`) should land in the same
// PR that wires the exemption — adding it now would expose a no-op
// operator knob.

let nonce = tx.nonce();

if matches!(tx, &Transaction::PrivilegedL2Transaction(_)) {
Expand Down
14 changes: 14 additions & 0 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ use ethrex_storage::error::StoreError;
use ethrex_vm::{intrinsic_gas_dimensions, intrinsic_gas_floor};
use tracing::warn;

/// Provenance of a transaction entering the mempool.
///
/// Used by admission validation to apply origin-specific policies (e.g. the
/// min-tip floor only applies to `External` txs unless `--mempool.nolocals` is
/// set). Orthogonal to propagation policy: a `Local` tx is still gossiped to
/// peers unless a separate non-propagation flag is set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxOrigin {
/// Submitted by an operator-controlled RPC client (e.g. `eth_sendRawTransaction`).
Local,
/// Received from a P2P peer.
External,
}

#[derive(Debug, Default)]
struct MempoolInner {
broadcast_pool: FxHashSet<H256>,
Expand Down
2 changes: 2 additions & 0 deletions crates/networking/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,5 @@ redundant_clone = "warn"
[features]
jemalloc_profiling = ["dep:jemalloc_pprof"]
eip-8025 = ["ethrex-blockchain/eip-8025", "ethrex-common/eip-8025"]
# Forward to ethrex-blockchain so RPC code can `cfg`-gate the EIP-4844 path.
c-kzg = ["ethrex-blockchain/c-kzg", "ethrex-common/c-kzg"]
42 changes: 28 additions & 14 deletions crates/networking/rpc/eth/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,20 +606,34 @@ impl RpcHandler for SendRawTransactionRequest {
}

async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
let hash = if let SendRawTransactionRequest::EIP4844(wrapped_blob_tx) = self {
context
.blockchain
.add_blob_transaction_to_pool(
wrapped_blob_tx.tx.clone(),
wrapped_blob_tx.blobs_bundle.clone(),
)
.await
} else {
context
.blockchain
.add_transaction_to_pool(self.to_transaction())
.await
}?;
// RPC-submitted transactions are tagged as `TxOrigin::Local` so they may
// bypass admission gates (such as the min-tip floor) intended to protect
// against P2P spam. See `Blockchain::add_local_transaction_to_pool`.
let hash = match self {
#[cfg(feature = "c-kzg")]
SendRawTransactionRequest::EIP4844(wrapped_blob_tx) => {
context
.blockchain
.add_local_blob_transaction_to_pool(
wrapped_blob_tx.tx.clone(),
wrapped_blob_tx.blobs_bundle.clone(),
)
.await?
}
#[cfg(not(feature = "c-kzg"))]
SendRawTransactionRequest::EIP4844(_) => {
return Err(RpcErr::Internal(
"EIP-4844 transactions require the c-kzg feature to be enabled at build time"
.to_string(),
));
}
_ => {
context
.blockchain
.add_local_transaction_to_pool(self.to_transaction())
.await?
}
};
serde_json::to_value(format!("{hash:#x}"))
.map_err(|error| RpcErr::Internal(error.to_string()))
}
Expand Down
86 changes: 79 additions & 7 deletions test/tests/blockchain/mempool_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ethrex_blockchain::constants::{
TX_INIT_CODE_WORD_GAS_COST,
};
use ethrex_blockchain::error::MempoolError;
use ethrex_blockchain::mempool::{Mempool, transaction_intrinsic_gas};
use ethrex_blockchain::mempool::{Mempool, TxOrigin, transaction_intrinsic_gas};
use ethrex_crypto::NativeCrypto;
use rustc_hash::FxHashMap;

Expand Down Expand Up @@ -293,7 +293,7 @@ async fn transaction_with_big_init_code_in_shanghai_fails() {
};

let tx = Transaction::EIP1559Transaction(tx);
let validation = blockchain.validate_transaction(&tx, Address::random());
let validation = blockchain.validate_transaction(&tx, Address::random(), TxOrigin::External);
assert!(matches!(
validation.await,
Err(MempoolError::TxMaxInitCodeSizeError)
Expand All @@ -320,7 +320,7 @@ async fn transaction_with_gas_limit_higher_than_of_the_block_should_fail() {
};

let tx = Transaction::EIP1559Transaction(tx);
let validation = blockchain.validate_transaction(&tx, Address::random());
let validation = blockchain.validate_transaction(&tx, Address::random(), TxOrigin::External);
assert!(matches!(
validation.await,
Err(MempoolError::TxGasLimitExceededError)
Expand All @@ -347,7 +347,7 @@ async fn transaction_with_priority_fee_higher_than_gas_fee_should_fail() {
};

let tx = Transaction::EIP1559Transaction(tx);
let validation = blockchain.validate_transaction(&tx, Address::random());
let validation = blockchain.validate_transaction(&tx, Address::random(), TxOrigin::External);
assert!(matches!(
validation.await,
Err(MempoolError::TxTipAboveFeeCapError)
Expand All @@ -374,7 +374,7 @@ async fn transaction_with_gas_limit_lower_than_intrinsic_gas_should_fail() {
};

let tx = Transaction::EIP1559Transaction(tx);
let validation = blockchain.validate_transaction(&tx, Address::random());
let validation = blockchain.validate_transaction(&tx, Address::random(), TxOrigin::External);
assert!(matches!(
validation.await,
Err(MempoolError::TxIntrinsicGasCostAboveLimitError)
Expand All @@ -401,7 +401,7 @@ async fn transaction_with_blob_base_fee_below_min_should_fail() {
};

let tx = Transaction::EIP4844Transaction(tx);
let validation = blockchain.validate_transaction(&tx, Address::random());
let validation = blockchain.validate_transaction(&tx, Address::random(), TxOrigin::External);
assert!(matches!(
validation.await,
Err(MempoolError::TxBlobBaseFeeTooLowError)
Expand All @@ -427,7 +427,7 @@ async fn validate_transaction_rejects_oversize_non_blob() {
});

let res = blockchain
.validate_transaction(&tx, Address::random())
.validate_transaction(&tx, Address::random(), TxOrigin::External)
.await;
match res {
Err(MempoolError::TxSizeExceeded { actual, limit }) => {
Expand Down Expand Up @@ -548,3 +548,75 @@ fn blobs_bundle_insert_and_remove() {
vec![None]
);
}

#[tokio::test]
async fn validate_transaction_accepts_both_origins() {
// Threading check: `validate_transaction` must accept both origins. With no
// origin-gated rules yet wired on `main`, Local and External should still
// produce the same downstream error for an identical fixture (proving that
// adding the parameter did not accidentally diverge the validation paths).
let (config, header) = build_basic_config_and_header(false, false);
let store = setup_storage(config, header).await.expect("Storage setup");
let blockchain = Blockchain::default_with_store(store);

let tx = EIP1559Transaction {
nonce: 3,
max_priority_fee_per_gas: 0,
max_fee_per_gas: 0,
gas_limit: 100_000_001, // forces TxGasLimitExceededError before any origin-gated rule could fire
to: TxKind::Call(Address::from_low_u64_be(1)),
value: U256::zero(),
data: Bytes::default(),
access_list: Default::default(),
..Default::default()
};
let tx = Transaction::EIP1559Transaction(tx);
let sender = Address::random();

let local = blockchain
.validate_transaction(&tx, sender, TxOrigin::Local)
.await;
let external = blockchain
.validate_transaction(&tx, sender, TxOrigin::External)
.await;

assert!(matches!(local, Err(MempoolError::TxGasLimitExceededError)));
assert!(matches!(
external,
Err(MempoolError::TxGasLimitExceededError)
));
}

#[tokio::test]
async fn add_local_transaction_to_pool_routes_through_validation() {
// Threading check: the RPC entry point must route through
// `validate_transaction`, not silently bypass it. Use a tx whose
// sender is recoverable but whose account doesn't exist in storage;
// `validate_transaction` rejects this specifically with
// `NotEnoughBalance`. Asserting that exact variant proves we hit
// the validation path rather than some earlier check.
let (config, header) = build_basic_config_and_header(false, false);
let store = setup_storage(config, header).await.expect("Storage setup");
let blockchain = Blockchain::default_with_store(store);

// Canonical legacy tx (sender derivable from signature). Gas limit
// 63_000 is well below the test header's 100_000_000 cap, so the
// gas-limit check passes; the sender isn't seeded into the store, so
// `validate_transaction` reaches the balance check and returns
// `NotEnoughBalance` — the proof point that we routed through
// validation rather than silently inserting.
let tx = Transaction::decode_canonical(&hex::decode("f86d80843baa0c4082f618946177843db3138ae69679a54b95cf345ed759450d870aa87bee538000808360306ba0151ccc02146b9b11adf516e6787b59acae3e76544fdcd75e77e67c6b598ce65da064c5dd5aae2fbb535830ebbdad0234975cd7ece3562013b63ea18cc0df6c97d4").unwrap()).unwrap();

let result = blockchain.add_local_transaction_to_pool(tx).await;
// The minimal test store doesn't seed account state, so the balance
// lookup may surface as `StoreError` (state-root missing) rather than
// `NotEnoughBalance`. Either outcome proves the call reached the
// validation path — what the test must NOT see is `Ok(_)`.
assert!(
matches!(
result,
Err(MempoolError::NotEnoughBalance) | Err(MempoolError::StoreError(_))
),
"expected an account-lookup error from validate_transaction, got {result:?}",
);
}
Loading