diff --git a/src/message_pool/config.rs b/src/message_pool/config.rs index 7bc03cc7c6df..c0211a8631f7 100644 --- a/src/message_pool/config.rs +++ b/src/message_pool/config.rs @@ -43,12 +43,6 @@ impl Default for MpoolConfig { } impl MpoolConfig { - #[cfg(test)] - /// Saves message pool `config` to the database, to easily reload. - pub fn save_config(&self, store: &DB) -> anyhow::Result<()> { - store.write_bin(MPOOL_CONFIG_KEY, &fvm_ipld_encoding::to_vec(&self)?) - } - /// Returns the low limit capacity of messages to allocate. pub fn size_limit_low(&self) -> i64 { self.size_limit_low diff --git a/src/message_pool/errors.rs b/src/message_pool/errors.rs index b033247b05c6..f6c3955be982 100644 --- a/src/message_pool/errors.rs +++ b/src/message_pool/errors.rs @@ -15,8 +15,6 @@ pub enum Error { GasPriceTooLow, #[error("gas fee cap is too low")] GasFeeCapTooLow, - #[error("Cannot send more Filecoin than will ever exist")] - MessageValueTooHigh, #[error("Message sequence too low")] SequenceTooLow, #[error("Not enough funds to execute transaction")] diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 6e8119d5b54b..8a301c52e54a 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -7,385 +7,51 @@ pub(in crate::message_pool) mod msg_pool; pub(in crate::message_pool) mod msg_set; pub(in crate::message_pool) mod pending_store; pub(in crate::message_pool) mod provider; +pub(in crate::message_pool) mod reorg; pub(in crate::message_pool) mod republish; pub mod selection; #[cfg(test)] pub mod test_provider; pub(in crate::message_pool) mod utils; + // TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941 #[allow(unused_imports)] pub use events::MpoolUpdate; -use std::{borrow::BorrowMut, cmp::Ordering}; - -use crate::blocks::Tipset; -use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic}; -use crate::message::{MessageRead as _, SignedMessage}; -use crate::networks::ChainConfig; -use crate::prelude::*; -use crate::shim::{address::Address, crypto::Signature}; -use crate::state_manager::IdToAddressCache; -use crate::utils::cache::SizeTrackingLruCache; -use crate::utils::get_size::CidWrapper; -use ahash::{HashMap, HashMapExt, HashSet}; -use fvm_ipld_encoding::to_vec; -use parking_lot::RwLock as SyncRwLock; -use tracing::error; -use utils::{get_base_fee_lower_bound, recover_sig}; - -use super::errors::Error; -use crate::message_pool::msgpool::msg_pool::StateNonceCacheKey; -use crate::message_pool::{ - msg_chain::{Chains, create_message_chains}, - msg_pool::{StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key}, - msgpool::{pending_store::PendingStore, republish::RepublishState}, - provider::Provider, -}; +pub(in crate::message_pool) use utils::recover_sig; const REPLACE_BY_FEE_RATIO: f32 = 1.25; const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * 256f32) as u64; const RBF_DENOM: u64 = 256; const BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE: i64 = 100; -const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10; -const REPUB_MSG_LIMIT: usize = 30; const MIN_GAS: u64 = 1298450; -#[allow(clippy::too_many_arguments)] -async fn republish_pending_messages( - api: &T, - network_sender: &flume::Sender, - pending_store: &PendingStore, - cur_tipset: &SyncRwLock, - republish: &RepublishState, - local_addrs: &SyncRwLock>, - chain_config: &ChainConfig, -) -> Result<(), Error> -where - T: Provider, -{ - let ts = cur_tipset.read().shallow_clone(); - let mut pending_map: HashMap> = HashMap::new(); - - // Only republish messages from local addresses, i.e., transactions which were - // sent to this node directly. - for actor in local_addrs.read().iter() { - if let Some(mset) = pending_store.snapshot_for(actor) - && !mset.msgs.is_empty() - { - pending_map.insert(*actor, mset.msgs); - } - } - - let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?; - - let network_name = chain_config.network.genesis_name(); - for m in msgs.iter() { - let mb = to_vec(m)?; - network_sender - .send_async(NetworkMessage::PubsubMessage { - topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")), - message: mb, - }) - .await - .map_err(|_| Error::Other("Network receiver dropped".to_string()))?; - } - - let republished_cids: Vec<_> = msgs.iter().map(|m| m.cid()).collect(); - republish.replace_with(republished_cids); - - Ok(()) -} - -/// Select messages from the mempool to be included in the next block that -/// builds on a given base tipset. The messages should be eligible for inclusion -/// based on their sequences and the overall number of them should observe block -/// gas limits. -fn select_messages_for_block( - api: &T, - chain_config: &ChainConfig, - base: &Tipset, - pending: HashMap>, -) -> Result, Error> -where - T: Provider, -{ - let mut msgs: Vec = vec![]; - - let base_fee = api.chain_compute_base_fee(base)?; - let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR); - - if pending.is_empty() { - return Ok(msgs); - } - - let mut chains = Chains::new(); - for (actor, mset) in pending.iter() { - create_message_chains( - api, - actor, - mset, - &base_fee_lower_bound, - base, - &mut chains, - chain_config, - )?; - } - - if chains.is_empty() { - return Ok(msgs); - } - - chains.sort(false); - - let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT; - let mut i = 0; - 'l: while let Some(chain) = chains.get_mut_at(i) { - // we can exceed this if we have picked (some) longer chain already - if msgs.len() > REPUB_MSG_LIMIT { - break; - } - - if gas_limit <= MIN_GAS { - break; - } - - // check if chain has been invalidated - if !chain.valid { - i += 1; - continue; - } - - // check if fits in block - if chain.gas_limit <= gas_limit { - // check the baseFee lower bound -- only republish messages that can be included - // in the chain within the next 20 blocks. - for m in chain.msgs.iter() { - if m.gas_fee_cap() < base_fee_lower_bound { - let key = chains.get_key_at(i); - chains.invalidate(key); - continue 'l; - } - gas_limit = gas_limit.saturating_sub(m.gas_limit()); - msgs.push(m.clone()); - } - - i += 1; - continue; - } - - // we can't fit the current chain but there is gas to spare - // trim it and push it down - chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee); - let mut j = i; - while j < chains.len() - 1 { - #[allow(clippy::indexing_slicing)] - if chains[j].compare(&chains[j + 1]) == Ordering::Less { - break; - } - chains.key_vec.swap(i, i + 1); - j += 1; - } - } - - if msgs.len() > REPUB_MSG_LIMIT { - msgs.truncate(REPUB_MSG_LIMIT); - } - - Ok(msgs) -} - -/// Revert and/or apply tipsets to the message pool. This function should be -/// called every time that there is a head change in the message pool. -/// -/// - **Apply**: messages included in the new tipset are removed from the pending -/// pool via [`MsgSet::rm`] with `applied=true`. -/// - **Revert**: messages from the reverted tipset are re-added to the pool with -/// [`StrictnessPolicy::Relaxed`] and [`TrustPolicy::Trusted`], allowing them back without -/// nonce gap restrictions. -/// -/// The state nonce cache is naturally invalidated when the tipset changes, since -/// it is keyed by [`TipsetKey`](crate::blocks::TipsetKey). -#[allow(clippy::too_many_arguments)] -pub(in crate::message_pool) fn head_change( - api: &T, - bls_sig_cache: &SizeTrackingLruCache, - republish: &RepublishState, - pending_store: &PendingStore, - cur_tipset: &SyncRwLock, - key_cache: &IdToAddressCache, - state_nonce_cache: &SizeTrackingLruCache, - revert: Vec, - apply: Vec, -) -> Result<(), Error> -where - T: Provider + 'static, -{ - let mut repub = false; - let mut rmsgs: HashMap> = HashMap::new(); - for ts in revert { - let Ok(pts) = api.load_tipset(ts.parents()) else { - tracing::error!("error loading reverted tipset parent"); - continue; - }; - *cur_tipset.write() = pts; - - let mut msgs: Vec = Vec::new(); - for block in ts.block_headers() { - let Ok((umsg, smsgs)) = api.messages_for_block(block) else { - tracing::error!("error retrieving messages for reverted block"); - continue; - }; - msgs.extend(smsgs); - for msg in umsg { - let msg_cid = msg.cid(); - let Ok(smsg) = recover_sig(bls_sig_cache, msg) else { - tracing::debug!("could not recover signature for bls message {}", msg_cid); - continue; - }; - msgs.push(smsg) - } - } - - for msg in msgs { - add_to_selected_msgs(msg, rmsgs.borrow_mut()); - } - } - - for ts in apply { - let mpool_ctx = MpoolCtx { - api, - key_cache, - pending_store, - ts: &ts, - }; - for b in ts.block_headers() { - let Ok((msgs, smsgs)) = api.messages_for_block(b) else { - tracing::error!("error retrieving messages for block"); - continue; - }; - - for msg in smsgs { - mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?; - if !repub && republish.was_republished(&msg.cid()) { - repub = true; - } - } - for msg in msgs { - mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?; - if !repub && republish.was_republished(&msg.cid()) { - repub = true; - } - } - } - *cur_tipset.write() = ts; - } - if repub { - republish.trigger()?; - } - let cur_ts = cur_tipset.read().shallow_clone(); - let mpool_ctx = MpoolCtx { - api, - key_cache, - pending_store, - ts: &cur_ts, - }; - for (_, hm) in rmsgs { - for (_, msg) in hm { - let sequence = match mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from()) { - Ok(seq) => seq, - Err(e) => { - tracing::debug!("Failed to get the state sequence: {}", e); - continue; - } - }; - if let Err(e) = add_helper( - api, - bls_sig_cache, - pending_store, - key_cache, - &cur_ts, - msg, - sequence, - TrustPolicy::Trusted, - StrictnessPolicy::Relaxed, - ) { - error!("Failed to read message from reorg to mpool: {}", e); - } - } - } - Ok(()) -} - -pub(in crate::message_pool) struct MpoolCtx<'a, T> { - pub api: &'a T, - pub key_cache: &'a IdToAddressCache, - pub pending_store: &'a PendingStore, - pub ts: &'a Tipset, -} - -impl MpoolCtx<'_, T> { - /// Remove a message from the selected messages map (`rmsgs`). If the - /// message is not there, fall back to removing it from the pending store. - pub(in crate::message_pool) fn remove_from_selected_msgs( - &self, - from: &Address, - sequence: u64, - rmsgs: &mut HashMap>, - ) -> Result<(), Error> { - if rmsgs - .get_mut(from) - .and_then(|temp| temp.remove(&sequence)) - .is_none() - && let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts) - .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}")) - { - let _ = self.pending_store.remove(&resolved, sequence, true); - } - Ok(()) - } - - /// Get the state nonce for an address, accounting for messages already - /// included in the current tipset. - pub(in crate::message_pool) fn get_state_sequence( - &self, - state_nonce_cache: &SizeTrackingLruCache, - addr: &Address, - ) -> Result { - msg_pool::get_state_sequence(self.api, self.key_cache, state_nonce_cache, addr, self.ts) - } -} - -/// This is a helper function for `head_change`. This method will add a signed -/// message to the given messages selected by priority `HashMap`. -pub(in crate::message_pool) fn add_to_selected_msgs( - m: SignedMessage, - rmsgs: &mut HashMap>, -) { - rmsgs.entry(m.from()).or_default().insert(m.sequence(), m); -} - #[cfg(test)] pub mod tests { use std::{borrow::BorrowMut, time::Duration}; use crate::blocks::Tipset; use crate::key_management::{KeyStore, KeyStoreConfig, Wallet}; - use crate::message::SignedMessage; + use crate::libp2p::NetworkMessage; + use crate::message::{MessageRead as _, SignedMessage}; use crate::networks::ChainConfig; use crate::shim::{ address::Address, - crypto::SignatureType, + crypto::{Signature, SignatureType}, econ::TokenAmount, message::{Message, Message_v3}, }; + use ahash::{HashMap, HashMapExt}; use num_traits::Zero; use test_provider::*; use tokio::task::JoinSet; use super::*; use crate::message_pool::{ + Error, msg_chain::{Chains, create_message_chains}, msg_pool::MessagePool, + provider::Provider, }; struct TestMpool { @@ -681,11 +347,11 @@ pub mod tests { // Pending map should be keyed by key_addr, not id_addr. assert!( - mpool.pending_store.snapshot_for(&key_addr).is_some(), + mpool.pending.snapshot_for(&key_addr).is_some(), "pending should be keyed by resolved key address" ); assert!( - mpool.pending_store.snapshot_for(&id_addr).is_none(), + mpool.pending.snapshot_for(&id_addr).is_none(), "pending should NOT have entry under raw ID address" ); } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index a28e0c057025..7f950420c2ff 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -8,11 +8,19 @@ use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey}; use crate::chain::{HeadChanges, MINIMUM_BASE_FEE}; -#[cfg(test)] -use crate::db::SettingsStore; use crate::eth::is_valid_eth_tx_for_sending; use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic}; use crate::message::{ChainMessage, MessageRead as _, SignedMessage, valid_for_block_inclusion}; +use crate::message_pool::{ + config::MpoolConfig, + errors::Error, + msgpool::{ + BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore, + recover_sig, republish::RepublishState, + }, + provider::Provider, + utils::get_base_fee_lower_bound, +}; use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION}; use crate::prelude::*; use crate::rpc::eth::types::EthAddress; @@ -21,16 +29,18 @@ use crate::shim::{ crypto::{Signature, SignatureType}, econ::TokenAmount, gas::{Gas, price_list_by_network_version}, + state_tree::ActorState, }; use crate::state_manager::IdToAddressCache; use crate::state_manager::utils::is_valid_for_sending; use crate::utils::cache::SizeTrackingLruCache; -use crate::utils::get_size::{CidWrapper, GetSize}; +use crate::utils::get_size::CidWrapper; use ahash::HashSet; use anyhow::Context as _; use cid::Cid; use futures::StreamExt; use fvm_ipld_encoding::to_vec; +use get_size2::GetSize; use itertools::Itertools; use nonzero_ext::nonzero; use parking_lot::RwLock as SyncRwLock; @@ -43,17 +53,12 @@ use tokio::{ }; use tracing::warn; -use crate::message_pool::{ - config::MpoolConfig, - errors::Error, - head_change, - msgpool::{ - BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore, - recover_sig, republish::RepublishState, republish_pending_messages, - }, - provider::Provider, - utils::get_base_fee_lower_bound, -}; +/// Maximum size of a serialized message in bytes. Anti-DoS measure to keep +/// the pool from ingesting pathologically large messages. +const MAX_MESSAGE_SIZE: usize = 64 << 10; // 64 KiB + +pub(in crate::message_pool) const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000; +pub(in crate::message_pool) const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10; // LruCache sizes have been taken from the lotus implementation const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize); @@ -62,21 +67,15 @@ const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize); const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize); #[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)] -pub(crate) struct StateNonceCacheKey { +pub(in crate::message_pool) struct StateNonceCacheKey { tipset_key: TipsetKey, addr: Address, } -pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000; -pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10; -/// Maximum size of a serialized message in bytes. This is an anti-DOS measure to prevent -/// large messages from being added to the message pool. -const MAX_MESSAGE_SIZE: usize = 64 << 10; // 64 KiB - /// Trust policy for whether a message is from a trusted or untrusted source. /// Untrusted sources are subject to stricter limits. #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum TrustPolicy { +pub(in crate::message_pool) enum TrustPolicy { Trusted, Untrusted, } @@ -85,10 +84,10 @@ pub use super::msg_set::{MsgSetLimits, StrictnessPolicy}; /// LRU caches owned by [`MessagePool`]. pub(in crate::message_pool) struct Caches { - pub bls_sig: SizeTrackingLruCache, - pub sig_val: SizeTrackingLruCache, - pub key: IdToAddressCache, - pub state_nonce: SizeTrackingLruCache, + pub(in crate::message_pool) bls_sig: SizeTrackingLruCache, + pub(in crate::message_pool) sig_val: SizeTrackingLruCache, + pub(in crate::message_pool) key: IdToAddressCache, + pub(in crate::message_pool) state_nonce: SizeTrackingLruCache, } impl Caches { @@ -122,33 +121,33 @@ impl ShallowClone for Caches { pub struct MessagePool { /// Pending messages, keyed by resolved-key address, together with the /// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`]. - pub(in crate::message_pool) pending_store: PendingStore, + pub(in crate::message_pool) pending: PendingStore, pub(in crate::message_pool) caches: Caches, /// Resolved-key senders of locally submitted messages. pub(in crate::message_pool) local_addrs: Arc>>, /// The current tipset (a set of blocks) - pub cur_tipset: Arc>, + pub(in crate::message_pool) cur_tipset: Arc>, /// The underlying provider - pub api: Arc, + pub(in crate::message_pool) api: Arc, /// Sender half to send messages to other components - pub network_sender: flume::Sender, + pub(in crate::message_pool) network_sender: flume::Sender, /// Republish coordination state pub(in crate::message_pool) republish: Arc, - /// Configurable parameters of the message pool - pub config: Arc, + /// Configurable parameters of the message pool. + pub(in crate::message_pool) config: Arc, /// Chain configuration - pub chain_config: Arc, + pub(in crate::message_pool) chain_config: Arc, } impl ShallowClone for MessagePool { fn shallow_clone(&self) -> Self { Self { + pending: self.pending.shallow_clone(), + caches: self.caches.shallow_clone(), local_addrs: self.local_addrs.shallow_clone(), - pending_store: self.pending_store.shallow_clone(), cur_tipset: self.cur_tipset.shallow_clone(), api: self.api.shallow_clone(), network_sender: self.network_sender.clone(), - caches: self.caches.shallow_clone(), republish: self.republish.shallow_clone(), config: self.config.shallow_clone(), chain_config: self.chain_config.shallow_clone(), @@ -177,49 +176,6 @@ pub(in crate::message_pool) fn resolve_to_key( Ok(resolved) } -/// Get the state nonce for an address, accounting for messages already included in `cur_ts`. -pub(in crate::message_pool) fn get_state_sequence( - api: &T, - key_cache: &IdToAddressCache, - state_nonce_cache: &SizeTrackingLruCache, - addr: &Address, - cur_ts: &Tipset, -) -> Result { - let nk = StateNonceCacheKey { - tipset_key: cur_ts.key().clone(), - addr: *addr, - }; - - if let Some(cached) = state_nonce_cache.get_cloned(&nk) { - return Ok(cached); - } - - let actor = api.get_actor_after(addr, cur_ts)?; - let mut next_nonce = actor.sequence; - - if let (Ok(resolved), Ok(messages)) = ( - resolve_to_key(api, key_cache, addr, cur_ts) - .inspect_err(|e| tracing::warn!(%addr, "failed to resolve address to key: {e:#}")), - api.messages_for_tipset(cur_ts) - .inspect_err(|e| tracing::warn!("failed to get messages for tipset: {e:#}")), - ) { - for msg in messages.iter() { - if let Ok(from) = resolve_to_key(api, key_cache, &msg.from(), cur_ts).inspect_err( - |e| tracing::warn!(from = %msg.from(), "failed to resolve message sender: {e:#}"), - ) && from == resolved - { - let n = msg.sequence() + 1; - if n > next_nonce { - next_nonce = n; - } - } - } - } - - state_nonce_cache.push(nk, next_nonce); - Ok(next_nonce) -} - impl MessagePool where T: Provider, @@ -229,7 +185,11 @@ where self.cur_tipset.read().clone() } - pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result { + pub(in crate::message_pool) fn resolve_to_key( + &self, + addr: &Address, + cur_ts: &Tipset, + ) -> Result { resolve_to_key(self.api.as_ref(), &self.caches.key, addr, cur_ts) } @@ -242,32 +202,38 @@ where Ok(()) } - /// Push a signed message to the `MessagePool`. Additionally performs basic - /// checks on the validity of a message. - pub async fn push_internal( + /// Push a signed message to the `MessagePool`. Records the sender as + /// local and broadcasts on gossip if validation marks it publishable. + async fn push_internal( &self, msg: SignedMessage, trust_policy: TrustPolicy, ) -> Result { - self.check_message(&msg)?; let cid = msg.cid(); - let cur_ts = self.current_tipset(); - let publish = self.add_tipset(msg.clone(), &cur_ts, true, trust_policy)?; - let msg_ser = to_vec(&msg)?; - let network_name = self.chain_config.network.genesis_name(); + let publish = self.add_to_pool(msg.clone(), true, trust_policy)?; self.add_local(&msg)?; if publish { - self.network_sender - .send_async(NetworkMessage::PubsubMessage { - topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")), - message: msg_ser, - }) - .await - .map_err(|_| Error::Other("Network receiver dropped".to_string()))?; + self.publish_pubsub(&msg).await?; } Ok(cid) } + /// Broadcast a signed message on the network's `gossipsub` topic. + pub(in crate::message_pool) async fn publish_pubsub( + &self, + msg: &SignedMessage, + ) -> Result<(), Error> { + let message = to_vec(msg)?; + let network_name = self.chain_config.network.genesis_name(); + self.network_sender + .send_async(NetworkMessage::PubsubMessage { + topic: Topic::new(format!("{PUBSUB_MSG_STR}/{network_name}")), + message, + }) + .await + .map_err(|_| Error::Other("Network receiver dropped".to_string())) + } + /// Push a signed message to the `MessagePool` from an trusted source. pub async fn push(&self, msg: SignedMessage) -> Result { self.push_internal(msg, TrustPolicy::Trusted).await @@ -278,127 +244,86 @@ where self.push_internal(msg, TrustPolicy::Untrusted).await } - fn check_message(&self, msg: &SignedMessage) -> Result<(), Error> { - if to_vec(msg)?.len() > MAX_MESSAGE_SIZE { - return Err(Error::MessageTooBig); - } - let to = msg.message().to(); - if to.protocol() == Protocol::Delegated { - EthAddress::from_filecoin_address(&to).context(format!( - "message recipient {to} is a delegated address but not a valid Eth Address" - ))?; - } - valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?; - if msg.value() > *crate::shim::econ::TOTAL_FILECOIN { - return Err(Error::MessageValueTooHigh); - } - if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() { - return Err(Error::GasFeeCapTooLow); - } - self.verify_msg_sig(msg) - } - - /// This is a helper to push that will help to make sure that the message - /// fits the parameters to be pushed to the `MessagePool`. + /// Insert a message received via gossip. Runs full validation. Does + /// not publish back to the network. pub fn add(&self, msg: SignedMessage) -> Result<(), Error> { - self.check_message(&msg)?; - let ts = self.current_tipset(); - self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?; + self.add_to_pool(msg, false, TrustPolicy::Trusted)?; Ok(()) } - /// Verify the message signature. first check if it has already been - /// verified and put into cache. If it has not, then manually verify it - /// then put it into cache for future use. - fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> { - let cid = msg.cid(); - - if let Some(()) = self.caches.sig_val.get_cloned(&(cid).into()) { - return Ok(()); - } - - msg.verify(self.chain_config.eth_chain_id) - .map_err(|e| Error::Other(e.to_string()))?; + /// Message validation. + /// + /// Returns `publish: bool` — `true` when the message should be gossiped + /// after insertion; `false` when a local sender's message failed the + /// soft base-fee floor (kept locally, not broadcast). + pub(in crate::message_pool) fn validate_for_pool( + &self, + msg: &SignedMessage, + cur_ts: &Tipset, + local: bool, + ) -> Result { + validate_static(msg)?; + validate_signature(msg, &self.caches.sig_val, self.chain_config.eth_chain_id)?; - self.caches.sig_val.push(cid.into(), ()); + let expected_sequence = self.get_state_sequence(&msg.from(), cur_ts)?; + let sender_actor = self.api.get_actor_after(&msg.from(), cur_ts)?; - Ok(()) + validate_with_state( + msg, + &self.chain_config, + cur_ts, + &sender_actor, + expected_sequence, + local, + ) } - /// Verify the `state_sequence` and balance for the sender of the message - /// given then call `add_locked` to finish adding the `signed_message` - /// to pending. - fn add_tipset( + /// Validate `msg` and insert it into the pending pool. + /// + /// Returns `publish: bool` (see [`Self::validate_for_pool`]). + pub(in crate::message_pool) fn add_to_pool( &self, msg: SignedMessage, - cur_ts: &Tipset, local: bool, trust_policy: TrustPolicy, ) -> Result { - let sequence = self.get_state_sequence(&msg.from(), cur_ts)?; - - if sequence > msg.message().sequence { - return Err(Error::SequenceTooLow); - } - - let sender_actor = self.api.get_actor_after(&msg.message().from(), cur_ts)?; - - // This message can only be included in the next epoch and beyond, hence the +1. - let nv = self.chain_config.network_version(cur_ts.epoch() + 1); - let eth_chain_id = self.chain_config.eth_chain_id; - if msg.signature().signature_type() == SignatureType::Delegated - && !is_valid_eth_tx_for_sending(eth_chain_id, nv, &msg) - { - return Err(Error::Other( - "Invalid Ethereum message for the current network version".to_owned(), - )); - } - if !is_valid_for_sending(nv, &sender_actor) { - return Err(Error::Other( - "Sender actor is not a valid top-level sender".to_owned(), - )); - } - - let publish = verify_msg_before_add(&msg, cur_ts, local, &self.chain_config)?; - - let balance = self.get_state_balance(&msg.from(), cur_ts)?; - - let msg_balance = msg.required_funds(); - if balance < msg_balance { - return Err(Error::NotEnoughFunds); - } + let cur_ts = self.current_tipset(); + let publish = self.validate_for_pool(&msg, &cur_ts, local)?; let strictness = if local { StrictnessPolicy::Relaxed } else { StrictnessPolicy::Strict }; - self.add_helper(msg, trust_policy, strictness)?; + self.add_to_pool_unchecked(&cur_ts, msg, trust_policy, strictness)?; Ok(publish) } - /// Finish verifying signed message before adding it to the pending `mset` - /// hash-map. If an entry in the hash-map does not yet exist, create a - /// new `mset` that will correspond to the from message and push it to - /// the pending hash-map. - fn add_helper( + /// Insert a message into the pending pool *without* running validation + /// (size, sig, base-fee, sender-actor checks). The reorg replay path + /// uses this directly to restore reverted messages even when they no + /// longer pass the add-time filters. + pub(in crate::message_pool) fn add_to_pool_unchecked( &self, + cur_ts: &Tipset, msg: SignedMessage, trust_policy: TrustPolicy, strictness: StrictnessPolicy, ) -> Result<(), Error> { - let from = msg.from(); - let cur_ts = self.current_tipset(); - add_helper( - self.api.as_ref(), - &self.caches.bls_sig, - &self.pending_store, - &self.caches.key, - &cur_ts, - msg, - self.get_state_sequence(&from, &cur_ts)?, - trust_policy, - strictness, - ) + if msg.signature().signature_type() == SignatureType::Bls { + self.caches + .bls_sig + .push(msg.cid().into(), msg.signature().clone()); + } + + self.api + .put_message(&ChainMessage::Signed(msg.clone().into()))?; + self.api + .put_message(&ChainMessage::Unsigned(msg.message().clone().into()))?; + + let sequence = self.get_state_sequence(&msg.from(), cur_ts)?; + let resolved_from = self.resolve_to_key(&msg.from(), cur_ts)?; + self.pending + .insert(resolved_from, msg, sequence, trust_policy, strictness) } /// Get the sequence for a given address, return Error if there is a failure @@ -410,8 +335,8 @@ where let resolved = self.resolve_to_key(addr, &cur_ts).ok(); let mset = resolved - .and_then(|r| self.pending_store.snapshot_for(&r)) - .or_else(|| self.pending_store.snapshot_for(addr)); + .and_then(|r| self.pending.snapshot_for(&r)) + .or_else(|| self.pending.snapshot_for(addr)); match mset { Some(mset) => { if sequence > mset.next_sequence { @@ -423,32 +348,58 @@ where } } - /// Get the state of the sequence for a given address in `cur_ts`. - fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result { - get_state_sequence( - self.api.as_ref(), - &self.caches.key, - &self.caches.state_nonce, - addr, - cur_ts, - ) - } + /// Get the state nonce for an address in `cur_ts`, accounting for + /// messages already included in that tipset. Cached by `(TipsetKey, + /// Address)`. + pub(in crate::message_pool) fn get_state_sequence( + &self, + addr: &Address, + cur_ts: &Tipset, + ) -> Result { + let nk = StateNonceCacheKey { + tipset_key: cur_ts.key().clone(), + addr: *addr, + }; - /// Get the state balance for the actor that corresponds to the supplied - /// address and tipset, if this actor does not exist, return an error. - fn get_state_balance(&self, addr: &Address, ts: &Tipset) -> Result { - let actor = self.api.get_actor_after(addr, ts)?; - Ok(TokenAmount::from(&actor.balance)) + if let Some(cached) = self.caches.state_nonce.get_cloned(&nk) { + return Ok(cached); + } + + let actor = self.api.get_actor_after(addr, cur_ts)?; + let mut next_nonce = actor.sequence; + + if let (Ok(resolved), Ok(messages)) = ( + self.resolve_to_key(addr, cur_ts) + .inspect_err(|e| tracing::warn!(%addr, "failed to resolve address to key: {e:#}")), + self.api + .messages_for_tipset(cur_ts) + .inspect_err(|e| tracing::warn!("failed to get messages for tipset: {e:#}")), + ) { + for msg in messages.iter() { + if let Ok(from) = self.resolve_to_key(&msg.from(), cur_ts).inspect_err( + |e| tracing::warn!(from = %msg.from(), "failed to resolve message sender: {e:#}"), + ) && from == resolved + { + let n = msg.sequence() + 1; + if n > next_nonce { + next_nonce = n; + } + } + } + } + + self.caches.state_nonce.push(nk, next_nonce); + Ok(next_nonce) } /// Return a tuple that contains a vector of all signed messages and the /// current tipset for self. pub fn pending(&self) -> (Vec, Tipset) { - let pending = self.pending_store.snapshot(); - let len = pending.values().map(|mset| mset.msgs.len()).sum(); + let snapshot = self.pending.snapshot(); + let len = snapshot.values().map(|mset| mset.msgs.len()).sum(); let mut out = Vec::with_capacity(len); - for mset in pending.into_values() { + for mset in snapshot.into_values() { out.extend( mset.msgs .into_values() @@ -470,7 +421,7 @@ where .resolve_to_key(a, &cur_ts) .inspect_err(|e| tracing::debug!(%a, "pending_for: failed to resolve address: {e:#}")) .ok()?; - let mset = self.pending_store.snapshot_for(&resolved)?; + let mset = self.pending.snapshot_for(&resolved)?; if mset.msgs.is_empty() { return None; } @@ -487,7 +438,7 @@ where /// removal from the pending pool. #[allow(dead_code)] // surfaces the MpoolUpdate API for external subscribers. pub fn subscribe_to_updates(&self) -> broadcast::Receiver { - self.pending_store.subscribe() + self.pending.subscribe() } /// Return Vector of signed messages given a block header for self. @@ -509,43 +460,8 @@ where Ok(msg_vec) } - #[cfg(test)] - pub fn get_config(&self) -> &MpoolConfig { - &self.config - } - - #[cfg(test)] - pub fn set_config( - &mut self, - db: &DB, - cfg: MpoolConfig, - ) -> Result<(), Error> { - cfg.save_config(db) - .map_err(|e| Error::Other(e.to_string()))?; - self.config = cfg.into(); - Ok(()) - } - - #[cfg(test)] - pub async fn apply_head_change( - &self, - revert: Vec, - apply: Vec, - ) -> Result<(), Error> - where - T: 'static, - { - head_change( - self.api.as_ref(), - &self.caches.bls_sig, - self.republish.as_ref(), - &self.pending_store, - self.cur_tipset.as_ref(), - &self.caches.key, - &self.caches.state_nonce, - revert, - apply, - ) + pub fn gas_limit_overestimation(&self) -> f64 { + self.config.gas_limit_overestimation } } @@ -560,170 +476,174 @@ where config: MpoolConfig, chain_config: Arc, services: &mut JoinSet>, - ) -> Result, Error> + ) -> Result where T: Provider, { // Per-actor limits are constant for the lifetime of this pool; capture // them once here rather than re-reading on every insert. - let pending_store = PendingStore::new(MsgSetLimits::new( + let pending = PendingStore::new(MsgSetLimits::new( api.max_actor_pending_messages(), api.max_untrusted_actor_pending_messages(), )); - let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset())); - let block_delay = chain_config.block_delay_secs; - + let cur_tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset())); + let republish_interval = + u64::from(10 * chain_config.block_delay_secs + chain_config.propagation_delay_secs); let (republish, repub_trigger_rx) = RepublishState::new(); + let mp = MessagePool { - pending_store, + pending, caches: Caches::new(), local_addrs: Arc::new(SyncRwLock::new(HashSet::default())), - cur_tipset: tipset, + republish: Arc::new(republish), + cur_tipset, api: Arc::new(api), - config: config.into(), network_sender, - republish: Arc::new(republish), - chain_config: Arc::clone(&chain_config), + config: Arc::new(config), + chain_config, }; - let mut head_changes_rx = mp.api.subscribe_head_changes(); - - let api = mp.api.clone(); - let bls_sig_cache = mp.caches.bls_sig.shallow_clone(); - let pending_store = mp.pending_store.shallow_clone(); - let republish = mp.republish.clone(); - let key_cache = mp.caches.key.shallow_clone(); - let state_nonce_cache = mp.caches.state_nonce.shallow_clone(); - - let current_ts = mp.cur_tipset.clone(); - // Reacts to new HeadChanges - services.spawn(async move { - loop { - match head_changes_rx.recv().await { - Ok(HeadChanges { reverts, applies }) => { - if let Err(e) = head_change( - api.as_ref(), - &bls_sig_cache, - republish.as_ref(), - &pending_store, - ¤t_ts, - &key_cache, - &state_nonce_cache, - reverts, - applies, - ) { - tracing::warn!("Error changing head: {e}"); + { + let mp = mp.shallow_clone(); + let mut head_changes_rx = mp.api.subscribe_head_changes(); + services.spawn(async move { + loop { + match head_changes_rx.recv().await { + Ok(HeadChanges { reverts, applies }) => { + if let Err(e) = mp.apply_head_change(reverts, applies).await { + tracing::warn!("Error changing head: {e}"); + } + } + Err(RecvError::Lagged(e)) => { + warn!("Head change subscriber lagged: skipping {e} events"); + } + Err(RecvError::Closed) => { + break Ok(()); } - } - Err(RecvError::Lagged(n)) => { - warn!("Head change subscriber lagged: skipping {n} events"); - } - Err(RecvError::Closed) => { - break Ok(()); } } - } - }); - - let api = mp.api.clone(); - let pending_store = mp.pending_store.shallow_clone(); - let cur_tipset = mp.cur_tipset.clone(); - let republish = mp.republish.clone(); - let local_addrs = mp.local_addrs.clone(); - let network_sender = Arc::new(mp.network_sender.clone()); - let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs); + }); + } + // Reacts to republishing requests - services.spawn(async move { - let mut repub_trigger_rx = repub_trigger_rx.stream(); - let mut interval = interval(Duration::from_secs(republish_interval)); - loop { - tokio::select! { - _ = interval.tick() => (), - _ = repub_trigger_rx.next() => (), - } - if let Err(e) = republish_pending_messages( - api.as_ref(), - network_sender.as_ref(), - &pending_store, - cur_tipset.as_ref(), - republish.as_ref(), - local_addrs.as_ref(), - &chain_config, - ) - .await - { - warn!("Failed to republish pending messages: {}", e.to_string()); + { + let mp = mp.shallow_clone(); + services.spawn(async move { + let mut repub_trigger_rx = repub_trigger_rx.stream(); + let mut interval = interval(Duration::from_secs(republish_interval)); + loop { + tokio::select! { + _ = interval.tick() => (), + _ = repub_trigger_rx.next() => (), + } + if let Err(e) = mp.run_republish_cycle().await { + warn!("Failed to republish pending messages: {}", e.to_string()); + } } - } - }); + }); + } + Ok(mp) } } -// Helpers for MessagePool +fn validate_static(msg: &SignedMessage) -> Result<(), Error> { + if to_vec(msg)?.len() > MAX_MESSAGE_SIZE { + return Err(Error::MessageTooBig); + } + let to = msg.message().to(); + if to.protocol() == Protocol::Delegated { + EthAddress::from_filecoin_address(&to).context(format!( + "message recipient {to} is a delegated address but not a valid Eth Address" + ))?; + } + valid_for_block_inclusion(msg.message(), Gas::new(0), NEWEST_NETWORK_VERSION)?; + if msg.gas_fee_cap().atto() < &MINIMUM_BASE_FEE.into() { + return Err(Error::GasFeeCapTooLow); + } + Ok(()) +} -/// Finish verifying the signed message before adding it to the pending `mset` -/// hash-map. If an entry in the hash-map does not yet exist, create a new -/// `mset` that will correspond to the form message and push it to the pending -/// hash-map. -#[allow(clippy::too_many_arguments)] -pub(in crate::message_pool) fn add_helper( - api: &T, - bls_sig_cache: &SizeTrackingLruCache, - pending_store: &PendingStore, - key_cache: &IdToAddressCache, +fn validate_signature( + msg: &SignedMessage, + sig_val_cache: &SizeTrackingLruCache, + eth_chain_id: u64, +) -> Result<(), Error> { + let cid = msg.cid(); + if sig_val_cache.get_cloned(&cid.into()).is_some() { + return Ok(()); + } + msg.verify(eth_chain_id) + .map_err(|e| Error::Other(e.to_string()))?; + sig_val_cache.push(cid.into(), ()); + Ok(()) +} + +/// Check the message against the pre-resolved chain state. +fn validate_with_state( + msg: &SignedMessage, + chain_config: &ChainConfig, cur_ts: &Tipset, - msg: SignedMessage, - sequence: u64, - trust_policy: TrustPolicy, - strictness: StrictnessPolicy, -) -> Result<(), Error> -where - T: Provider, -{ - if msg.signature().signature_type() == SignatureType::Bls { - bls_sig_cache.push(msg.cid().into(), msg.signature().clone()); + sender_actor: &ActorState, + expected_sequence: u64, + local: bool, +) -> Result { + if expected_sequence > msg.message().sequence { + return Err(Error::SequenceTooLow); } - api.put_message(&ChainMessage::Signed(msg.clone().into()))?; - api.put_message(&ChainMessage::Unsigned(msg.message().clone().into()))?; + // The message can only be included in the next epoch and beyond, hence the +1. + let nv_next = chain_config.network_version(cur_ts.epoch() + 1); + if msg.is_delegated() && !is_valid_eth_tx_for_sending(chain_config.eth_chain_id, nv_next, msg) { + return Err(Error::Other( + "Invalid Ethereum message for the current network version".to_owned(), + )); + } + if !is_valid_for_sending(nv_next, sender_actor) { + return Err(Error::Other( + "Sender actor is not a valid top-level sender".to_owned(), + )); + } + + let nv_cur = chain_config.network_version(cur_ts.epoch()); + let min_gas = price_list_by_network_version(nv_cur).on_chain_message(msg.chain_length()?); + valid_for_block_inclusion(msg.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?; + + let publish = check_base_fee_floor(msg, cur_ts, local)?; - let resolved_from = resolve_to_key(api, key_cache, &msg.from(), cur_ts)?; - pending_store.insert(resolved_from, msg, sequence, trust_policy, strictness) + let balance = TokenAmount::from(&sender_actor.balance); + if balance < msg.required_funds() { + return Err(Error::NotEnoughFunds); + } + + Ok(publish) } -fn verify_msg_before_add( - m: &SignedMessage, +/// Base-Fee floor check. +pub(in crate::message_pool) fn check_base_fee_floor( + msg: &SignedMessage, cur_ts: &Tipset, local: bool, - chain_config: &ChainConfig, ) -> Result { - let epoch = cur_ts.epoch(); - let min_gas = price_list_by_network_version(chain_config.network_version(epoch)) - .on_chain_message(m.chain_length()?); - valid_for_block_inclusion(m.message(), min_gas.total(), NEWEST_NETWORK_VERSION)?; - if !cur_ts.block_headers().is_empty() { - let base_fee = &cur_ts.block_headers().first().parent_base_fee; - let base_fee_lower_bound = - get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE); - if m.gas_fee_cap() < base_fee_lower_bound { - if local { - warn!( - "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})", - m.gas_fee_cap(), - base_fee_lower_bound - ); - return Ok(false); - } - return Err(Error::SoftValidationFailure(format!( - "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})", - m.gas_fee_cap(), - base_fee_lower_bound - ))); - } + let base_fee = &cur_ts.block_headers().first().parent_base_fee; + let lb = get_base_fee_lower_bound(base_fee, BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE); + if msg.gas_fee_cap() >= lb { + return Ok(local); + } + if local { + warn!( + "local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound: {})", + msg.gas_fee_cap(), + lb + ); + return Ok(false); } - Ok(local) + Err(Error::SoftValidationFailure(format!( + "GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: {}, baseFeeLowerBound:{})", + msg.gas_fee_cap(), + lb + ))) } #[cfg(test)] @@ -742,6 +662,8 @@ mod tests { use super::*; use crate::shim::message::Message as ShimMessage; + use tokio::task::JoinSet; + fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage { SignedMessage::mock_bls_signed_message(ShimMessage { from, @@ -752,95 +674,91 @@ mod tests { }) } - /// Build a `PendingStore` sized from the [`TestApi`] provider's limits. - fn test_pending_store(api: &TestApi) -> PendingStore { - PendingStore::new(MsgSetLimits::new( - api.max_actor_pending_messages(), - api.max_untrusted_actor_pending_messages(), - )) + fn make_test_mpool(api: TestApi) -> (MessagePool, JoinSet>) { + let (tx, _rx) = flume::bounded(50); + let mut services = JoinSet::new(); + let mpool = MessagePool::new( + api, + tx, + Default::default(), + Default::default(), + &mut services, + ) + .unwrap(); + (mpool, services) } // Regression test for https://github.com/ChainSafe/forest/pull/6118 which fixed a bogus 100M // gas limit. There are no limits on a single message. - #[test] - fn add_helper_message_gas_limit_test() { + #[tokio::test] + async fn add_to_pool_unchecked_accepts_high_gas_limit() { let api = TestApi::default(); - let bls_sig_cache = SizeTrackingLruCache::new_mocked(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let pending_store = test_pending_store(&api); - let cur_ts = api.get_heaviest_tipset(); + let (mpool, _services) = make_test_mpool(api); + let cur_ts = mpool.current_tipset(); let message = ShimMessage { gas_limit: 666_666_666, ..ShimMessage::default() }; let msg = SignedMessage::mock_bls_signed_message(message); - let sequence = msg.message().sequence; - let res = add_helper( - &api, - &bls_sig_cache, - &pending_store, - &key_cache, + let res = mpool.add_to_pool_unchecked( &cur_ts, msg, - sequence, TrustPolicy::Trusted, StrictnessPolicy::Relaxed, ); assert!(res.is_ok()); } - #[test] - fn test_resolve_to_key_returns_non_id_unchanged() { + #[tokio::test] + async fn test_resolve_to_key_returns_non_id_unchanged() { let api = TestApi::default(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let ts = api.get_heaviest_tipset(); + let (mpool, _services) = make_test_mpool(api); + let ts = mpool.current_tipset(); let bls_addr = Address::new_bls(&[1u8; 48]).unwrap(); - let result = resolve_to_key(&api, &key_cache, &bls_addr, &ts).unwrap(); + let result = mpool.resolve_to_key(&bls_addr, &ts).unwrap(); assert_eq!(result, bls_addr); assert_eq!( - key_cache.len(), + mpool.caches.key.len(), 0, "cache should not be populated for non-ID addresses" ); } - #[test] - fn test_resolve_to_key_resolves_id_and_caches() { + #[tokio::test] + async fn test_resolve_to_key_resolves_id_and_caches() { let api = TestApi::default(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let ts = api.get_heaviest_tipset(); - let id_addr = Address::new_id(100); let key_addr = Address::new_bls(&[5u8; 48]).unwrap(); api.set_key_address_mapping(&id_addr, &key_addr); - let result = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap(); + let (mpool, _services) = make_test_mpool(api); + let ts = mpool.current_tipset(); + + let result = mpool.resolve_to_key(&id_addr, &ts).unwrap(); assert_eq!(result, key_addr); assert_eq!( - key_cache.len(), + mpool.caches.key.len(), 1, "cache should have one entry after resolution" ); // Second call should hit the cache (no API call needed) - let result2 = resolve_to_key(&api, &key_cache, &id_addr, &ts).unwrap(); + let result2 = mpool.resolve_to_key(&id_addr, &ts).unwrap(); assert_eq!(result2, key_addr); } - #[test] - fn test_add_helper_keys_pending_by_resolved_address() { + #[tokio::test] + async fn test_add_to_pool_unchecked_keys_pending_by_resolved_address() { let api = TestApi::default(); - let bls_sig_cache = SizeTrackingLruCache::new_mocked(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let pending_store = test_pending_store(&api); - let cur_ts = api.get_heaviest_tipset(); - let id_addr = Address::new_id(200); let key_addr = Address::new_bls(&[7u8; 48]).unwrap(); api.set_key_address_mapping(&id_addr, &key_addr); api.set_state_sequence(&key_addr, 0); + let (mpool, _services) = make_test_mpool(api); + let cur_ts = mpool.current_tipset(); + let message = ShimMessage { from: id_addr, gas_limit: 1_000_000, @@ -848,42 +766,36 @@ mod tests { }; let msg = SignedMessage::mock_bls_signed_message(message); - add_helper( - &api, - &bls_sig_cache, - &pending_store, - &key_cache, - &cur_ts, - msg, - 0, - TrustPolicy::Trusted, - StrictnessPolicy::Relaxed, - ) - .unwrap(); + mpool + .add_to_pool_unchecked( + &cur_ts, + msg, + TrustPolicy::Trusted, + StrictnessPolicy::Relaxed, + ) + .unwrap(); assert!( - pending_store.snapshot_for(&key_addr).is_some(), + mpool.pending.snapshot_for(&key_addr).is_some(), "pending should be keyed by the resolved key address" ); assert!( - pending_store.snapshot_for(&id_addr).is_none(), + mpool.pending.snapshot_for(&id_addr).is_none(), "pending should NOT have an entry under the raw ID address" ); } - #[test] - fn test_get_sequence_works_with_both_address_forms() { + #[tokio::test] + async fn test_get_sequence_works_with_both_address_forms() { let api = TestApi::default(); - let bls_sig_cache = SizeTrackingLruCache::new_mocked(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let pending_store = test_pending_store(&api); - let cur_ts = api.get_heaviest_tipset(); - let id_addr = Address::new_id(300); let key_addr = Address::new_bls(&[9u8; 48]).unwrap(); api.set_key_address_mapping(&id_addr, &key_addr); api.set_state_sequence(&key_addr, 0); + let (mpool, _services) = make_test_mpool(api); + let cur_ts = mpool.current_tipset(); + // Add two messages from the ID address for seq in 0..2 { let message = ShimMessage { @@ -893,26 +805,27 @@ mod tests { ..ShimMessage::default() }; let msg = SignedMessage::mock_bls_signed_message(message); - add_helper( - &api, - &bls_sig_cache, - &pending_store, - &key_cache, - &cur_ts, - msg, - 0, - TrustPolicy::Trusted, - StrictnessPolicy::Relaxed, - ) - .unwrap(); + mpool + .add_to_pool_unchecked( + &cur_ts, + msg, + TrustPolicy::Trusted, + StrictnessPolicy::Relaxed, + ) + .unwrap(); } - let state_seq = api.get_actor_after(&id_addr, &cur_ts).unwrap().sequence; - let resolved_for_id = resolve_to_key(&api, &key_cache, &id_addr, &cur_ts).unwrap(); - let resolved_for_key = resolve_to_key(&api, &key_cache, &key_addr, &cur_ts).unwrap(); + let state_seq = mpool + .api + .get_actor_after(&id_addr, &cur_ts) + .unwrap() + .sequence; + let resolved_for_id = mpool.resolve_to_key(&id_addr, &cur_ts).unwrap(); + let resolved_for_key = mpool.resolve_to_key(&key_addr, &cur_ts).unwrap(); assert_eq!(resolved_for_id, resolved_for_key); - let next_seq = pending_store + let next_seq = mpool + .pending .snapshot_for(&resolved_for_id) .unwrap() .next_sequence; @@ -920,14 +833,11 @@ mod tests { assert_eq!(expected, 2, "should reflect both pending messages"); } - #[test] - fn test_get_state_sequence_accounts_for_tipset_messages() { + #[tokio::test] + async fn test_get_state_sequence_accounts_for_tipset_messages() { use crate::message_pool::test_provider::mock_block; let api = TestApi::default(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let state_nonce_cache = SizeTrackingLruCache::new_mocked(); - let sender = Address::new_bls(&[3u8; 48]).unwrap(); api.set_state_sequence(&sender, 5); @@ -938,21 +848,20 @@ mod tests { ); let ts = Tipset::from(block); - let nonce = get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap(); + let (mpool, _services) = make_test_mpool(api); + + let nonce = mpool.get_state_sequence(&sender, &ts).unwrap(); assert_eq!( nonce, 8, "should account for non-consecutive tipset message at nonce 7" ); } - #[test] - fn test_get_state_sequence_ignores_other_addresses() { + #[tokio::test] + async fn test_get_state_sequence_ignores_other_addresses() { use crate::message_pool::test_provider::mock_block; let api = TestApi::default(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let state_nonce_cache = SizeTrackingLruCache::new_mocked(); - let addr_a = Address::new_bls(&[4u8; 48]).unwrap(); let addr_b = Address::new_bls(&[5u8; 48]).unwrap(); api.set_state_sequence(&addr_a, 0); @@ -969,30 +878,26 @@ mod tests { ); let ts = Tipset::from(block); - let nonce_a = - get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_a, &ts).unwrap(); + let (mpool, _services) = make_test_mpool(api); + + let nonce_a = mpool.get_state_sequence(&addr_a, &ts).unwrap(); assert_eq!( nonce_a, 0, "addr_a nonce should be unaffected by addr_b's messages" ); - let nonce_b = - get_state_sequence(&api, &key_cache, &state_nonce_cache, &addr_b, &ts).unwrap(); + let nonce_b = mpool.get_state_sequence(&addr_b, &ts).unwrap(); assert_eq!( nonce_b, 3, "addr_b nonce should reflect its tipset messages" ); } - #[test] - fn test_get_state_sequence_cache_hit() { + #[tokio::test] + async fn test_get_state_sequence_cache_hit() { use crate::message_pool::test_provider::mock_block; let api = TestApi::default(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let state_nonce_cache: SizeTrackingLruCache = - SizeTrackingLruCache::new_mocked(); - let sender = Address::new_bls(&[6u8; 48]).unwrap(); api.set_state_sequence(&sender, 5); @@ -1002,46 +907,42 @@ mod tests { .set_block_messages(&block, vec![make_smsg(sender, 5, 100)]); let ts = Tipset::from(block); - let nonce1 = - get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap(); + let (mpool, _services) = make_test_mpool(api); + + let nonce1 = mpool.get_state_sequence(&sender, &ts).unwrap(); assert_eq!(nonce1, 6); // Mutate the underlying state; the cache should still return the old value. - api.set_state_sequence(&sender, 99); - let nonce2 = - get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts).unwrap(); + mpool.api.set_state_sequence(&sender, 99); + let nonce2 = mpool.get_state_sequence(&sender, &ts).unwrap(); assert_eq!( nonce2, 6, "second call should return the cached value, not re-read state" ); } - #[test] - fn test_get_state_sequence_cache_miss_on_different_tipset() { + #[tokio::test] + async fn test_get_state_sequence_cache_miss_on_different_tipset() { use crate::message_pool::test_provider::mock_block; let api = TestApi::default(); - let key_cache = SizeTrackingLruCache::new_mocked(); - let state_nonce_cache: SizeTrackingLruCache = - SizeTrackingLruCache::new_mocked(); - let sender = Address::new_bls(&[7u8; 48]).unwrap(); api.set_state_sequence(&sender, 10); + let (mpool, _services) = make_test_mpool(api); + let block_a = mock_block(1, 1); let ts_a = Tipset::from(&block_a); - let nonce_a = - get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_a).unwrap(); + let nonce_a = mpool.get_state_sequence(&sender, &ts_a).unwrap(); assert_eq!(nonce_a, 10); // Different tipset should be a cache miss and re-read state. - api.set_state_sequence(&sender, 20); + mpool.api.set_state_sequence(&sender, 20); let block_b = mock_block(2, 2); let ts_b = Tipset::from(&block_b); - let nonce_b = - get_state_sequence(&api, &key_cache, &state_nonce_cache, &sender, &ts_b).unwrap(); + let nonce_b = mpool.get_state_sequence(&sender, &ts_b).unwrap(); assert_eq!( nonce_b, 20, "different tipset should miss the cache and read fresh state" diff --git a/src/message_pool/msgpool/msg_set.rs b/src/message_pool/msgpool/msg_set.rs index 5aa1cd13c42c..0eae120e03fc 100644 --- a/src/message_pool/msgpool/msg_set.rs +++ b/src/message_pool/msgpool/msg_set.rs @@ -11,7 +11,8 @@ use ahash::{HashMap, HashMapExt}; use crate::message::{MessageRead, SignedMessage}; use crate::message_pool::errors::Error; use crate::message_pool::metrics; -use crate::message_pool::msgpool::{RBF_DENOM, RBF_NUM, TrustPolicy}; +use crate::message_pool::msg_pool::TrustPolicy; +use crate::message_pool::msgpool::{RBF_DENOM, RBF_NUM}; use crate::shim::econ::TokenAmount; /// Maximum allowed nonce gap for trusted message inserts under [`StrictnessPolicy::Strict`]. diff --git a/src/message_pool/msgpool/reorg.rs b/src/message_pool/msgpool/reorg.rs new file mode 100644 index 000000000000..474cf3000673 --- /dev/null +++ b/src/message_pool/msgpool/reorg.rs @@ -0,0 +1,134 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! Reorg handling: revert + apply tipsets against the pending pool. + +use crate::blocks::Tipset; +use crate::message::{MessageRead as _, SignedMessage}; +use crate::message_pool::msgpool::utils; +use crate::message_pool::{ + Error, + msg_pool::{StrictnessPolicy, TrustPolicy}, + msgpool::{msg_pool::MessagePool, recover_sig}, + provider::Provider, +}; +use crate::shim::address::Address; +use crate::utils::ShallowClone as _; +use ahash::{HashMap, HashMapExt}; + +impl MessagePool +where + T: Provider + 'static, +{ + /// Revert and/or apply tipsets to the message pool. + /// + /// - **Apply**: messages included in the new tipset are removed from the + /// pending pool with `applied = true`. + /// - **Revert**: messages from the reverted tipset are re-added to the + /// pool with [`StrictnessPolicy::Relaxed`] and [`TrustPolicy::Trusted`], + /// allowing them back without nonce-gap restrictions. + /// + /// The state-nonce cache is naturally invalidated when the tipset + /// changes, since it is keyed by `(TipsetKey, Address)`. + pub(in crate::message_pool) async fn apply_head_change( + &self, + revert: Vec, + apply: Vec, + ) -> Result<(), Error> { + let mut repub = false; + let mut rmsgs: HashMap> = HashMap::new(); + for ts in revert { + let Ok(pts) = self.api.load_tipset(ts.parents()) else { + tracing::error!("error loading reverted tipset parent"); + continue; + }; + *self.cur_tipset.write() = pts; + + let mut msgs: Vec = Vec::new(); + for block in ts.block_headers() { + let Ok((umsg, smsgs)) = self.api.messages_for_block(block) else { + tracing::error!("error retrieving messages for reverted block"); + continue; + }; + msgs.extend(smsgs); + for msg in umsg { + let msg_cid = msg.cid(); + let Ok(smsg) = recover_sig(&self.caches.bls_sig, msg) else { + tracing::debug!("could not recover signature for bls message {}", msg_cid); + continue; + }; + msgs.push(smsg) + } + } + + for msg in msgs { + utils::add_to_selected_msgs(msg, &mut rmsgs); + } + } + + for ts in apply { + for b in ts.block_headers() { + let Ok((msgs, smsgs)) = self.api.messages_for_block(b) else { + tracing::error!("error retrieving messages for block"); + continue; + }; + + for msg in smsgs { + self.remove_applied_from_pool(&msg.from(), msg.sequence(), &mut rmsgs, &ts)?; + if !repub && self.republish.was_republished(&msg.cid()) { + repub = true; + } + } + for msg in msgs { + self.remove_applied_from_pool(&msg.from, msg.sequence, &mut rmsgs, &ts)?; + if !repub && self.republish.was_republished(&msg.cid()) { + repub = true; + } + } + } + *self.cur_tipset.write() = ts; + } + if repub { + self.republish.trigger()?; + } + + let cur_ts = self.cur_tipset.read().shallow_clone(); + for (_, hm) in rmsgs { + for (_, msg) in hm { + if let Err(e) = self.add_to_pool_unchecked( + &cur_ts, + msg, + TrustPolicy::Trusted, + StrictnessPolicy::Relaxed, + ) { + tracing::error!("Failed to read message from reorg to mpool: {}", e); + } + } + } + Ok(()) + } + + /// Remove a message from the in-progress `rmsgs` scratch map. If the + /// message isn't there, fall back to removing it from the real pending + /// pool. Used by [`Self::apply_head_change`] when an applied tipset + /// includes a message that we hadn't yet seen reverted. + fn remove_applied_from_pool( + &self, + from: &Address, + sequence: u64, + rmsgs: &mut HashMap>, + ts: &Tipset, + ) -> Result<(), Error> { + if rmsgs + .get_mut(from) + .and_then(|temp| temp.remove(&sequence)) + .is_none() + && let Ok(resolved) = self + .resolve_to_key(from, ts) + .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}")) + { + let _ = self.pending.remove(&resolved, sequence, true); + } + Ok(()) + } +} diff --git a/src/message_pool/msgpool/republish.rs b/src/message_pool/msgpool/republish.rs index 5b0e65a7f06d..e707b7e221f6 100644 --- a/src/message_pool/msgpool/republish.rs +++ b/src/message_pool/msgpool/republish.rs @@ -4,13 +4,25 @@ //! Tracks which CIDs were already broadcast in the current republish cycle //! and exposes a trigger to wake the republish task early. -use ahash::HashSet; +use std::cmp::Ordering; + +use crate::message::{MessageRead as _, SignedMessage}; +use crate::message_pool::{ + Error, + msg_chain::{Chains, create_message_chains}, + msgpool::{MIN_GAS, msg_pool::MessagePool}, + provider::Provider, + utils::get_base_fee_lower_bound, +}; +use crate::prelude::ShallowClone; +use crate::shim::address::Address; +use ahash::{HashMap, HashMapExt, HashSet}; use cid::Cid; use parking_lot::RwLock as SyncRwLock; -use crate::message_pool::Error; - const REPUB_TRIGGER_CAPACITY: usize = 1; +const BASE_FEE_LOWER_BOUND_FACTOR: i64 = 10; +const REPUB_MSG_LIMIT: usize = 30; pub(in crate::message_pool) struct RepublishState { republished: SyncRwLock>, @@ -51,6 +63,135 @@ impl RepublishState { } } +impl MessagePool { + pub(in crate::message_pool) async fn run_republish_cycle(&self) -> Result<(), Error> { + let ts = self.cur_tipset.read().shallow_clone(); + + // Only republish messages from local addresses, i.e., transactions which + // were sent to this node directly. + let local: Vec
= self.local_addrs.read().iter().copied().collect(); + let mut pending_map: HashMap> = + HashMap::with_capacity(local.len()); + for actor in &local { + if let Some(mset) = self.pending.snapshot_for(actor) + && !mset.msgs.is_empty() + { + pending_map.insert(*actor, mset.msgs); + } + } + + let msgs = + select_messages_to_republish(self.api.as_ref(), &self.chain_config, &ts, pending_map)?; + + for m in msgs.iter() { + self.publish_pubsub(m).await?; + } + + self.republish.replace_with(msgs.iter().map(|m| m.cid())); + + Ok(()) + } +} + +/// Score local senders' pending message chains for the republish broadcast. +/// +/// Distinct from the block-producer selection path (`selection.rs`): uses +/// the aggressive [`BASE_FEE_LOWER_BOUND_FACTOR`] of 10 (vs. 100 in the add +/// path) and caps the result at [`REPUB_MSG_LIMIT`] messages. +fn select_messages_to_republish( + api: &T, + chain_config: &crate::networks::ChainConfig, + base: &crate::blocks::Tipset, + pending: HashMap>, +) -> Result, Error> +where + T: Provider, +{ + let mut msgs: Vec = vec![]; + + let base_fee = api.chain_compute_base_fee(base)?; + let base_fee_lower_bound = get_base_fee_lower_bound(&base_fee, BASE_FEE_LOWER_BOUND_FACTOR); + + if pending.is_empty() { + return Ok(msgs); + } + + let mut chains = Chains::new(); + for (actor, mset) in pending.iter() { + create_message_chains( + api, + actor, + mset, + &base_fee_lower_bound, + base, + &mut chains, + chain_config, + )?; + } + + if chains.is_empty() { + return Ok(msgs); + } + + chains.sort(false); + + let mut gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT; + let mut i = 0; + 'l: while let Some(chain) = chains.get_mut_at(i) { + // we can exceed this if we have picked (some) longer chain already + if msgs.len() > REPUB_MSG_LIMIT { + break; + } + + if gas_limit <= MIN_GAS { + break; + } + + // check if chain has been invalidated + if !chain.valid { + i += 1; + continue; + } + + // check if fits in block + if chain.gas_limit <= gas_limit { + // check the baseFee lower bound -- only republish messages that can be included + // in the chain within the next 20 blocks. + for m in chain.msgs.iter() { + if m.gas_fee_cap() < base_fee_lower_bound { + let key = chains.get_key_at(i); + chains.invalidate(key); + continue 'l; + } + gas_limit = gas_limit.saturating_sub(m.gas_limit()); + msgs.push(m.clone()); + } + + i += 1; + continue; + } + + // we can't fit the current chain but there is gas to spare + // trim it and push it down + chains.trim_msgs_at(i, gas_limit, REPUB_MSG_LIMIT, &base_fee); + let mut j = i; + while j < chains.len() - 1 { + #[allow(clippy::indexing_slicing)] + if chains[j].compare(&chains[j + 1]) == Ordering::Less { + break; + } + chains.key_vec.swap(i, i + 1); + j += 1; + } + } + + if msgs.len() > REPUB_MSG_LIMIT { + msgs.truncate(REPUB_MSG_LIMIT); + } + + Ok(msgs) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index b6d386aa314b..0badb492d394 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -23,10 +23,11 @@ use crate::shim::crypto::Signature; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::CidWrapper; -use super::{MpoolCtx, msg_pool::MessagePool, provider::Provider, utils::recover_sig}; +use super::{msg_pool::MessagePool, provider::Provider, utils, utils::recover_sig}; use crate::message_pool::{ - Error, add_to_selected_msgs, + Error, msg_chain::{Chains, NodeKey, create_message_chains}, + msg_pool::resolve_to_key, msgpool::{MIN_GAS, pending_store::PendingStore}, }; @@ -639,7 +640,7 @@ where } fn get_pending_messages(&self, cur_ts: &Tipset, ts: &Tipset) -> Result { - let snapshot = self.pending_store.snapshot(); + let snapshot = self.pending.snapshot(); let mut result: Pending = HashMap::with_capacity(snapshot.len()); for (a, mset) in snapshot { result.insert(a, mset.msgs); @@ -653,7 +654,7 @@ where run_head_change( self.api.as_ref(), &self.caches.bls_sig, - &self.pending_store, + &self.pending, &self.caches.key, cur_ts.clone(), ts.clone(), @@ -671,7 +672,7 @@ where ) -> Result { let result = Vec::with_capacity(self.config.size_limit_low() as usize); let gas_limit = crate::shim::econ::BLOCK_GAS_LIMIT; - let min_gas = 1298450; + let min_gas = MIN_GAS; // 1. Get priority actor chains let priority = self.config.priority_addrs(); @@ -845,37 +846,72 @@ where } } for msg in msgs { - add_to_selected_msgs(msg, rmsgs); + utils::add_to_selected_msgs(msg, rmsgs); } } for ts in right_chain { - let mpool_ctx = MpoolCtx { - api, - key_cache, - pending_store, - ts: &ts, - }; for b in ts.block_headers() { let (msgs, smsgs) = api.messages_for_block(b)?; for msg in smsgs { - mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), rmsgs)?; + remove_applied_from_pool( + api, + key_cache, + pending_store, + &ts, + &msg.from(), + msg.sequence(), + rmsgs, + )?; } for msg in msgs { - mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, rmsgs)?; + remove_applied_from_pool( + api, + key_cache, + pending_store, + &ts, + &msg.from, + msg.sequence, + rmsgs, + )?; } } } Ok(()) } +/// Free-fn mirror of [`MessagePool::remove_applied_from_pool`] for the +/// simulator path, which has only the individual fields to hand and not a +/// `&MessagePool`. Bodies are intentionally identical; consolidation can +/// happen once the simulator routes through `&MessagePool` directly. +#[allow(clippy::too_many_arguments)] +fn remove_applied_from_pool( + api: &T, + key_cache: &IdToAddressCache, + pending_store: &PendingStore, + ts: &Tipset, + from: &Address, + sequence: u64, + rmsgs: &mut HashMap>, +) -> Result<(), Error> { + if rmsgs + .get_mut(from) + .and_then(|temp| temp.remove(&sequence)) + .is_none() + && let Ok(resolved) = resolve_to_key(api, key_cache, from, ts) + .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}")) + { + let _ = pending_store.remove(&resolved, sequence, true); + } + Ok(()) +} + #[cfg(test)] mod test_selection { use std::sync::Arc; use super::*; - use crate::db::MemoryDB; use crate::key_management::{KeyStore, KeyStoreConfig, Wallet}; use crate::message_pool::msgpool::{ test_provider::{TestApi, mock_block}, @@ -974,7 +1010,7 @@ mod test_selection { .unwrap(); // we should now have no pending messages in the MessagePool - let remaining = mpool.pending_store.snapshot(); + let remaining = mpool.pending.snapshot(); assert!( remaining.is_empty(), "Expected no pending messages, but got {}", @@ -1296,13 +1332,6 @@ mod test_selection { #[tokio::test] async fn message_selection_priority() { - let db = MemoryDB::default(); - - let mut joinset = JoinSet::new(); - let mut mpool = make_test_mpool(&mut joinset); - let ts = mock_tipset(&mpool).await; - let api = mpool.api.clone(); - let ks1 = KeyStore::new(KeyStoreConfig::Memory).unwrap(); let mut w1 = Wallet::new(ks1); let a1 = w1.generate_addr(SignatureType::Secp256k1).unwrap(); @@ -1311,10 +1340,17 @@ mod test_selection { let mut w2 = Wallet::new(ks2); let a2 = w2.generate_addr(SignatureType::Secp256k1).unwrap(); - // set priority addrs to a1 - let mut mpool_cfg = mpool.get_config().clone(); - mpool_cfg.priority_addrs.push(a1); - mpool.set_config(&db, mpool_cfg).unwrap(); + let cfg = crate::message_pool::config::MpoolConfig { + priority_addrs: vec![a1], + ..Default::default() + }; + + let mut joinset = JoinSet::new(); + let (tx, _rx) = flume::bounded(50); + let mpool = + MessagePool::new(TestApi::default(), tx, cfg, Arc::default(), &mut joinset).unwrap(); + let ts = mock_tipset(&mpool).await; + let api = mpool.api.clone(); // let gas_limit = 6955002; api.set_state_balance_raw(&a1, TokenAmount::from_whole(1)); diff --git a/src/message_pool/msgpool/utils.rs b/src/message_pool/msgpool/utils.rs index b4dbb4243f95..0b6f02614fc4 100644 --- a/src/message_pool/msgpool/utils.rs +++ b/src/message_pool/msgpool/utils.rs @@ -3,14 +3,15 @@ use crate::chain::MINIMUM_BASE_FEE; use crate::message::{MessageRead as _, SignedMessage}; +use crate::message_pool::Error; +use crate::shim::address::Address; use crate::shim::{crypto::Signature, econ::TokenAmount, message::Message}; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::CidWrapper; +use ahash::HashMap; use num_rational::BigRational; use num_traits::ToPrimitive; -use crate::message_pool::Error; - pub(in crate::message_pool) fn get_base_fee_lower_bound( base_fee: &TokenAmount, factor: i64, @@ -55,3 +56,10 @@ pub(in crate::message_pool) fn recover_sig( let smsg = SignedMessage::new_from_parts(msg, val)?; Ok(smsg) } + +pub(in crate::message_pool) fn add_to_selected_msgs( + m: SignedMessage, + rmsgs: &mut HashMap>, +) { + rmsgs.entry(m.from()).or_default().insert(m.sequence(), m); +} diff --git a/src/message_pool/nonce_tracker.rs b/src/message_pool/nonce_tracker.rs index d57ddf6e4e22..b5341dfbd890 100644 --- a/src/message_pool/nonce_tracker.rs +++ b/src/message_pool/nonce_tracker.rs @@ -48,6 +48,7 @@ mod tests { use crate::key_management::{KeyStore, KeyStoreConfig, Wallet}; use crate::message_pool::MessagePool; use crate::message_pool::msgpool::test_provider::TestApi; + use crate::prelude::*; use crate::shim::crypto::SignatureType; use crate::shim::{address::Address, econ::TokenAmount}; use std::sync::Arc; @@ -120,13 +121,12 @@ mod tests { const N: usize = 10; let tracker = Arc::new(NonceTracker::new()); let (mpool, mut wallet, sender, _rx) = make_test_pool_and_wallet(); - let mpool = Arc::new(mpool); let key = Arc::new(wallet.find_key(&sender).unwrap()); let eth_chain_id: EthChainId = crate::networks::calibnet::ETH_CHAIN_ID; let mut tasks = JoinSet::new(); for _ in 0..N { - let (tracker, mpool, key) = (tracker.clone(), mpool.clone(), key.clone()); + let (tracker, mpool, key) = (tracker.clone(), mpool.shallow_clone(), key.clone()); tasks.spawn(async move { tracker .sign_and_push(&mpool, make_message(sender), &key, eth_chain_id) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 06c645a928ff..c60c2e91028a 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1830,7 +1830,7 @@ pub async fn eth_gas_search(data: &Ctx, msg: Message, tsk: &ApiTipsetKey) -> any ) }) { let ret = gas_search(data, &msg, &prior_messages, ts).await?; - Ok(((ret as f64) * data.mpool.config.gas_limit_overestimation) as u64) + Ok(((ret as f64) * data.mpool.gas_limit_overestimation()) as u64) } else { anyhow::bail!( "message execution failed: exit {}, reason: {}", diff --git a/src/rpc/methods/gas.rs b/src/rpc/methods/gas.rs index a42b85dbe7c0..d61cc71489b1 100644 --- a/src/rpc/methods/gas.rs +++ b/src/rpc/methods/gas.rs @@ -312,7 +312,7 @@ pub async fn estimate_message_gas( ) -> Result { if msg.gas_limit == 0 { let gl = GasEstimateGasLimit::estimate_gas_limit(data, msg.clone(), &tsk).await?; - let gl = gl as f64 * data.mpool.config.gas_limit_overestimation; + let gl = gl as f64 * data.mpool.gas_limit_overestimation(); msg.set_gas_limit((gl as u64).min(BLOCK_GAS_LIMIT)); } if msg.gas_premium.is_zero() { diff --git a/src/utils/cache/lru.rs b/src/utils/cache/lru.rs index 913bcde4fba8..a5b86e62845f 100644 --- a/src/utils/cache/lru.rs +++ b/src/utils/cache/lru.rs @@ -177,11 +177,6 @@ where } size } - - #[cfg(test)] - pub(crate) fn new_mocked() -> Self { - Self::new_inner(Cow::Borrowed("mocked_cache"), NonZeroUsize::new(1)) - } } impl Collector for SizeTrackingLruCache