diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index bc03040218a8..a420647a46fb 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -178,6 +178,14 @@ impl ChainFollower { ) .await } + + /// Subscribe to validated tipsets. + pub fn subscribe_validated_tipset(&self) -> tokio::sync::broadcast::Receiver { + self.state_machine + .lock() + .validated_tipset_broadcast_tx + .subscribe() + } } #[allow(clippy::too_many_arguments)] @@ -610,6 +618,8 @@ struct SyncStateMachine { // Map from TipsetKey to FullTipset tipsets: HashMap, stateless_mode: bool, + /// Broadcast channel for validated tipsets, used to notify other components of new validated tipsets. + validated_tipset_broadcast_tx: tokio::sync::broadcast::Sender, } impl SyncStateMachine { @@ -623,6 +633,7 @@ impl SyncStateMachine { bad_block_cache, tipsets: HashMap::default(), stateless_mode, + validated_tipset_broadcast_tx: tokio::sync::broadcast::Sender::new(1024), } } @@ -783,10 +794,10 @@ impl SyncStateMachine { } } - fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) { + fn try_mark_tipset_as_validated(&mut self, tipset: FullTipset, is_proposed_head: bool) -> bool { if !self.is_parent_validated(&tipset) { tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated"); - return; + return false; } self.tipsets.remove(tipset.key()); @@ -798,6 +809,7 @@ impl SyncStateMachine { if self.cs.heaviest_tipset().weight() < tipset.weight() { if let Err(e) = self.cs.set_heaviest_tipset(tipset) { error!("Error setting heaviest tipset: {}", e); + return false; } else { info!("Heaviest tipset: {} ({})", epoch, terse_key); } @@ -805,10 +817,13 @@ impl SyncStateMachine { } else if is_proposed_head { if let Err(e) = self.cs.put_tipset(&tipset) { error!("Error putting tipset: {e}"); + return false; } } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) { error!("Error setting heaviest tipset: {e}"); + return false; } + true } pub fn update(&mut self, event: SyncEvent) { @@ -823,7 +838,15 @@ impl SyncStateMachine { SyncEvent::ValidatedTipset { tipset, is_proposed_head, - } => self.mark_validated_tipset(tipset, is_proposed_head), + } => { + let tsk = tipset.key().clone(); + if self.try_mark_tipset_as_validated(tipset, is_proposed_head) + && crate::utils::broadcast::has_subscribers(&self.validated_tipset_broadcast_tx) + && let Err(e) = self.validated_tipset_broadcast_tx.send(tsk) + { + warn!("Failed to broadcast validated tipset: {e}"); + } + } } } @@ -1076,7 +1099,7 @@ mod tests { for (ts, is_proposed_head) in validation_tipsets { validation_tasks.push(ts.epoch()); db.put_cbor_default(&ts.epoch()).unwrap(); - state_machine.mark_validated_tipset(ts, is_proposed_head); + state_machine.try_mark_tipset_as_validated(ts, is_proposed_head); } } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 670dd30a6a25..b5a4ec866039 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -341,9 +341,57 @@ fn create_chain_follower( fn start_chain_follower_service( services: &mut JoinSet>, + opts: &CliOpts, + config: &Config, chain_follower: ChainFollower, ) { - services.spawn(async move { chain_follower.run().await }); + services.spawn({ + let chain_follower = chain_follower.shallow_clone(); + async move { chain_follower.run().await } + }); + // Prefill RPC method caches for newly validated tipsets to speed up subsequent RPC calls. + if config.client.enable_rpc && !opts.stateless { + let sync_status = chain_follower.sync_status.shallow_clone(); + let state_manager = chain_follower.state_manager.shallow_clone(); + let mut validated_tipset_rx = chain_follower.subscribe_validated_tipset(); + services.spawn(async move { + loop { + match validated_tipset_rx.recv().await { + Ok(_) if !sync_status.read().is_synced() => { + // Skip if the node is catching up to avoid unnecessary work, as the head may be changing rapidly. + continue; + } + Ok(tsk) => { + let state_manager = state_manager.shallow_clone(); + tokio::spawn(async move { + match state_manager.chain_index().load_required_tipset(&tsk) { + Ok(ts) => { + for tx_info in [crate::rpc::eth::TxInfo::Full, crate::rpc::eth::TxInfo::Hash] + { + if let Err(e) = crate::rpc::eth::Block::from_filecoin_tipset( + &state_manager, + ts.shallow_clone(), + tx_info, + ) + .await { + warn!("failed to call `Block::from_filecoin_tipset` for cache warmup: {e:#}"); + } + } + }, + Err(e) => { + warn!("failed to load tipset for cache warmup: {e:#}"); + } + } + }); + } + Err(RecvError::Lagged(n)) => { + warn!("validated tipset broadcast lagged: skipped {n} tipsets") + } + Err(RecvError::Closed) => break Ok(()), + } + } + }); + } } async fn maybe_start_health_check_service( @@ -709,7 +757,7 @@ pub(super) async fn start_services( ensure_proof_params_downloaded().await?; } services.spawn(p2p_service.run()); - start_chain_follower_service(&mut services, chain_follower); + start_chain_follower_service(&mut services, opts, &config, chain_follower); // blocking until any of the services returns an error, propagate_error(&mut services) .await diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 3134395587c9..f9f8f6d23bbd 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -88,7 +88,7 @@ pub(crate) fn new_heads(data: Ctx) -> (Subscriber, JoinHandle<()>) { for ts in changes.applies { // Convert the tipset to an Ethereum block with full transaction info // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block - match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await { + match EthBlock::from_filecoin_tipset(&data.state_manager, ts, TxInfo::Full).await { Ok(block) => { if let Err(e) = sender.send(ApiHeaders(block)) { tracing::error!("Failed to send headers: {}", e); diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 9b5a19dc50b9..7180cd1bacee 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -55,7 +55,7 @@ use crate::shim::message::Message; use crate::shim::trace::{CallReturn, ExecutionEvent}; use crate::shim::{clock::ChainEpoch, state_tree::StateTree}; use crate::state_manager::cache::ForestLruCache; -use crate::state_manager::{ExecutedMessage, ExecutedTipset, TipsetState, VMFlush}; +use crate::state_manager::{ExecutedMessage, ExecutedTipset, StateManager, TipsetState, VMFlush}; use crate::utils::db::BlockstoreExt as _; use crate::utils::encoding::from_slice_with_fallback; use crate::utils::get_size::{CidWrapper, big_int_heap_size_helper}; @@ -474,7 +474,7 @@ impl Block { /// /// Reference: pub async fn from_filecoin_tipset( - ctx: Ctx, + state_manager: &StateManager, tipset: crate::blocks::Tipset, tx_info: TxInfo, ) -> Result> { @@ -484,13 +484,13 @@ impl Block { }); match tx_info { - TxInfo::Full => Self::from_filecoin_tipset_with_full_tx(ctx, tipset).await, + TxInfo::Full => Self::from_filecoin_tipset_with_full_tx(state_manager, tipset).await, TxInfo::Hash => { let block_cid = tipset.key().cid()?; ETH_BLOCK_HASH_TX_CACHE .get_or_else(&block_cid.into(), async move || { let block_with_full_tx = - Self::from_filecoin_tipset_with_full_tx(ctx, tipset).await?; + Self::from_filecoin_tipset_with_full_tx(state_manager, tipset).await?; Ok(Arc::new( Arc::unwrap_or_clone(block_with_full_tx) .downcast_full_transaction_to_hash(), @@ -502,7 +502,7 @@ impl Block { } async fn from_filecoin_tipset_with_full_tx( - ctx: Ctx, + state_manager: &StateManager, tipset: crate::blocks::Tipset, ) -> Result> { static ETH_BLOCK_FULL_TX_CACHE: LazyLock>> = @@ -521,9 +521,9 @@ impl Block { state_root, executed_messages, .. - } = ctx.state_manager.load_executed_tipset(&tipset).await?; + } = state_manager.load_executed_tipset(&tipset).await?; let has_transactions = !executed_messages.is_empty(); - let state_tree = ctx.state_manager.get_state_tree(&state_root)?; + let state_tree = state_manager.get_state_tree(&state_root)?; let mut full_transactions = vec![]; let mut gas_used = 0; @@ -540,13 +540,13 @@ impl Block { ChainMessage::Signed(smsg) => new_eth_tx_from_signed_message( smsg, &state_tree, - ctx.chain_config().eth_chain_id, + state_manager.chain_config().eth_chain_id, )?, ChainMessage::Unsigned(msg) => { let tx = eth_tx_from_native_message( msg, &state_tree, - ctx.chain_config().eth_chain_id, + state_manager.chain_config().eth_chain_id, )?; ApiEthTx { hash: msg.cid().into(), @@ -1437,7 +1437,7 @@ impl RpcMethod<2> for EthGetBlockByHash { let ts = resolver .tipset_by_block_number_or_hash(block_hash, ResolveNullTipset::TakeOlder) .await?; - Block::from_filecoin_tipset(ctx, ts, full_tx_info.into()) + Block::from_filecoin_tipset(&ctx.state_manager, ts, full_tx_info.into()) .await .map_err(ServerError::from) } @@ -1465,7 +1465,7 @@ impl RpcMethod<2> for EthGetBlockByNumber { let ts = resolver .tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder) .await?; - Block::from_filecoin_tipset(ctx, ts, full_tx_info.into()) + Block::from_filecoin_tipset(&ctx.state_manager, ts, full_tx_info.into()) .await .map_err(ServerError::from) }