feat: prefill RPC caches for new validated tipsets#7068
Conversation
WalkthroughThis PR introduces a validated tipset broadcast channel in the sync state machine, refactors Ethereum block conversion to accept StateManager references instead of full context, and integrates a reactive cache-warming loop in the daemon that proactively loads and warms RPC caches for newly validated tipsets when the node is not fully synced. ChangesValidated Tipset Broadcast and RPC Cache Warming
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Comment |
| } | ||
|
|
||
| fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) { | ||
| fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) -> bool { |
There was a problem hiding this comment.
It's unclear to me what the return bool here denotes. Seems like it should be a Result.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/chain_sync/chain_follower.rs (1)
631-637:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winUse
broadcast::channelinstead ofSender::newfor Tokio broadcast initialization.Line 636 uses
tokio::sync::broadcast::Sender::new(1024), which is not part of Tokio 1.x's public API. The correct approach istokio::sync::broadcast::channel(capacity), which returns a tuple of(Sender, Receiver).Proposed fix
impl SyncStateMachine { pub fn new( cs: ChainStore, bad_block_cache: Option<BadBlockCache>, stateless_mode: bool, ) -> Self { + let (validated_tipset_broadcast_tx, _) = tokio::sync::broadcast::channel(1024); Self { cs, bad_block_cache, tipsets: HashMap::default(), stateless_mode, - validated_tipset_broadcast_tx: tokio::sync::broadcast::Sender::new(1024), + validated_tipset_broadcast_tx, } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/chain_sync/chain_follower.rs` around lines 631 - 637, The initialization uses tokio::sync::broadcast::Sender::new which doesn't exist in Tokio 1.x; replace it by calling tokio::sync::broadcast::channel(capacity) and assign the returned Sender to validated_tipset_broadcast_tx (e.g., let (validated_tipset_broadcast_tx, _rx) = tokio::sync::broadcast::channel(1024) before constructing Self), keeping the symbol validated_tipset_broadcast_tx as the Sender used in the struct and ensuring any needed Receiver is retained or discarded appropriately.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/daemon/mod.rs`:
- Around line 350-383: The loop prefetches tipsets and calls
crate::rpc::eth::Block::from_filecoin_tipset even when RPC is disabled; guard
this work with the RPC flag by checking enable_rpc (or the equivalent config
passed in) before entering the spawn or before handling Ok(tsk) so that when
enable_rpc is false you skip loading from validated_tipset_rx and the
Block::from_filecoin_tipset calls. Locate the validated_tipset_rx handling in
the service spawn and short-circuit using the enable_rpc boolean (or avoid
spawning the inner tokio::spawn that uses state_manager.chain_index() and
Block::from_filecoin_tipset) to prevent unnecessary CPU/IO when RPC is disabled.
- Around line 360-375: The spawned task currently uses the `?` operator on
`state_manager.chain_index().load_required_tipset(&tsk)?`, which will silently
propagate errors; change this to explicitly handle the Result: call
`state_manager.chain_index().load_required_tipset(&tsk)`, match on Err/Ok, and
on Err log a warning (including `tsk` and the error) and return early from the
task (e.g., via `return`/`anyhow::Ok(())`) so the subsequent
`Block::from_filecoin_tipset` calls only run when the tipset loaded
successfully; reference `state_manager.chain_index().load_required_tipset(&tsk)`
and the cache warmup loop invoking
`crate::rpc::eth::Block::from_filecoin_tipset`.
---
Outside diff comments:
In `@src/chain_sync/chain_follower.rs`:
- Around line 631-637: The initialization uses
tokio::sync::broadcast::Sender::new which doesn't exist in Tokio 1.x; replace it
by calling tokio::sync::broadcast::channel(capacity) and assign the returned
Sender to validated_tipset_broadcast_tx (e.g., let
(validated_tipset_broadcast_tx, _rx) = tokio::sync::broadcast::channel(1024)
before constructing Self), keeping the symbol validated_tipset_broadcast_tx as
the Sender used in the struct and ensuring any needed Receiver is retained or
discarded appropriately.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: a8bf0584-72dc-4011-844e-4225fbdc22ff
📒 Files selected for processing (4)
src/chain_sync/chain_follower.rssrc/daemon/mod.rssrc/rpc/methods/chain.rssrc/rpc/methods/eth.rs
| // Prefill RPC method caches for newly validated tipsets to speed up subsequent RPC calls. | ||
| services.spawn(async move { | ||
| loop { | ||
| match validated_tipset_rx.recv().await { | ||
| Ok(_) if !sync_status.read().is_synced() => { | ||
| // Skip if the node is catching up to avoid unnecessary work, as the head may be changing rapidly. | ||
| continue; | ||
| } | ||
| Ok(tsk) => { | ||
| let state_manager = state_manager.shallow_clone(); | ||
| tokio::spawn(async move { | ||
| let ts = state_manager.chain_index().load_required_tipset(&tsk)?; | ||
| for tx_info in | ||
| [crate::rpc::eth::TxInfo::Full, crate::rpc::eth::TxInfo::Hash] | ||
| { | ||
| if let Err(e) = crate::rpc::eth::Block::from_filecoin_tipset( | ||
| &state_manager, | ||
| ts.shallow_clone(), | ||
| tx_info, | ||
| ) | ||
| .await { | ||
| warn!("failed to call `Block::from_filecoin_tipset` for cache warmup: {e:#}"); | ||
| } | ||
| } | ||
| anyhow::Ok(()) | ||
| }); | ||
| } | ||
| Err(RecvError::Lagged(n)) => { | ||
| warn!("validated tipset broadcast lagged: skipped {n} tipsets") | ||
| } | ||
| Err(RecvError::Closed) => break Ok(()), | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
Skip tipset cache warmup when RPC is disabled.
This loop still does tipset loads and ETH block conversion even when enable_rpc is false, which adds unnecessary CPU/IO load.
💡 Suggested change
-fn start_chain_follower_service(
+fn start_chain_follower_service(
services: &mut JoinSet<anyhow::Result<()>>,
chain_follower: ChainFollower,
+ enable_rpc: bool,
) {
let sync_status = chain_follower.sync_status.shallow_clone();
let state_manager = chain_follower.state_manager.shallow_clone();
let mut validated_tipset_rx = chain_follower.subscribe_validated_tipset();
services.spawn(async move { chain_follower.run().await });
+ if !enable_rpc {
+ return;
+ }
// Prefill RPC method caches for newly validated tipsets to speed up subsequent RPC calls.
services.spawn(async move {
loop {// outside this hunk (call site):
start_chain_follower_service(&mut services, chain_follower, config.client.enable_rpc);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/daemon/mod.rs` around lines 350 - 383, The loop prefetches tipsets and
calls crate::rpc::eth::Block::from_filecoin_tipset even when RPC is disabled;
guard this work with the RPC flag by checking enable_rpc (or the equivalent
config passed in) before entering the spawn or before handling Ok(tsk) so that
when enable_rpc is false you skip loading from validated_tipset_rx and the
Block::from_filecoin_tipset calls. Locate the validated_tipset_rx handling in
the service spawn and short-circuit using the enable_rpc boolean (or avoid
spawning the inner tokio::spawn that uses state_manager.chain_index() and
Block::from_filecoin_tipset) to prevent unnecessary CPU/IO when RPC is disabled.
| tokio::spawn(async move { | ||
| let ts = state_manager.chain_index().load_required_tipset(&tsk)?; | ||
| for tx_info in | ||
| [crate::rpc::eth::TxInfo::Full, crate::rpc::eth::TxInfo::Hash] | ||
| { | ||
| if let Err(e) = crate::rpc::eth::Block::from_filecoin_tipset( | ||
| &state_manager, | ||
| ts.shallow_clone(), | ||
| tx_info, | ||
| ) | ||
| .await { | ||
| warn!("failed to call `Block::from_filecoin_tipset` for cache warmup: {e:#}"); | ||
| } | ||
| } | ||
| anyhow::Ok(()) | ||
| }); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify this path currently has a fallible load with `?` and no dedicated load-failure log.
rg -nC3 'load_required_tipset\(&tsk\)\?' src/daemon/mod.rs
rg -nC3 'failed to call `Block::from_filecoin_tipset` for cache warmup' src/daemon/mod.rs
rg -nC3 'tokio::spawn\(async move \{' src/daemon/mod.rsRepository: ChainSafe/forest
Length of output: 1236
Add error logging for tipset load failures in cache warmup task.
The load_required_tipset(&tsk)? call on line 361 uses the ? operator in a spawned task. If this fails, the error propagates silently without logging, making debugging difficult. Explicit error handling with a warning message is needed to maintain visibility into cache warmup failures.
Suggested change
tokio::spawn(async move {
- let ts = state_manager.chain_index().load_required_tipset(&tsk)?;
+ let ts = match state_manager.chain_index().load_required_tipset(&tsk) {
+ Ok(ts) => ts,
+ Err(e) => {
+ warn!(
+ "failed to load tipset {tsk} for RPC cache warmup: {e:#}"
+ );
+ return;
+ }
+ };
for tx_info in
[crate::rpc::eth::TxInfo::Full, crate::rpc::eth::TxInfo::Hash]
{📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| tokio::spawn(async move { | |
| let ts = state_manager.chain_index().load_required_tipset(&tsk)?; | |
| for tx_info in | |
| [crate::rpc::eth::TxInfo::Full, crate::rpc::eth::TxInfo::Hash] | |
| { | |
| if let Err(e) = crate::rpc::eth::Block::from_filecoin_tipset( | |
| &state_manager, | |
| ts.shallow_clone(), | |
| tx_info, | |
| ) | |
| .await { | |
| warn!("failed to call `Block::from_filecoin_tipset` for cache warmup: {e:#}"); | |
| } | |
| } | |
| anyhow::Ok(()) | |
| }); | |
| tokio::spawn(async move { | |
| let ts = match state_manager.chain_index().load_required_tipset(&tsk) { | |
| Ok(ts) => ts, | |
| Err(e) => { | |
| warn!( | |
| "failed to load tipset {tsk} for RPC cache warmup: {e:#}" | |
| ); | |
| return; | |
| } | |
| }; | |
| for tx_info in | |
| [crate::rpc::eth::TxInfo::Full, crate::rpc::eth::TxInfo::Hash] | |
| { | |
| if let Err(e) = crate::rpc::eth::Block::from_filecoin_tipset( | |
| &state_manager, | |
| ts.shallow_clone(), | |
| tx_info, | |
| ) | |
| .await { | |
| warn!("failed to call `Block::from_filecoin_tipset` for cache warmup: {e:#}"); | |
| } | |
| } | |
| anyhow::Ok(()) | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/daemon/mod.rs` around lines 360 - 375, The spawned task currently uses
the `?` operator on `state_manager.chain_index().load_required_tipset(&tsk)?`,
which will silently propagate errors; change this to explicitly handle the
Result: call `state_manager.chain_index().load_required_tipset(&tsk)`, match on
Err/Ok, and on Err log a warning (including `tsk` and the error) and return
early from the task (e.g., via `return`/`anyhow::Ok(())`) so the subsequent
`Block::from_filecoin_tipset` calls only run when the tipset loaded
successfully; reference `state_manager.chain_index().load_required_tipset(&tsk)`
and the cache warmup loop invoking
`crate::rpc::eth::Block::from_filecoin_tipset`.
Codecov Report❌ Patch coverage is
Additional details and impacted files
... and 9 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
|
no green checkmark, no review |
Summary of changes
Changes introduced in this pull request:
Reference issue to close (if applicable)
Closes
Other information and links
Change checklist
Outside contributions
Summary by CodeRabbit
New Features
Performance
Refactor