Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ impl ChainFollower {
)
.await
}

/// Subscribe to validated tipsets.
pub fn subscribe_validated_tipset(&self) -> tokio::sync::broadcast::Receiver<TipsetKey> {
self.state_machine
.lock()
.validated_tipset_broadcast_tx
.subscribe()
}
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -610,6 +618,8 @@ struct SyncStateMachine {
// Map from TipsetKey to FullTipset
tipsets: HashMap<TipsetKey, FullTipset>,
stateless_mode: bool,
/// Broadcast channel for validated tipsets, used to notify other components of new validated tipsets.
validated_tipset_broadcast_tx: tokio::sync::broadcast::Sender<TipsetKey>,
}

impl SyncStateMachine {
Expand All @@ -623,6 +633,7 @@ impl SyncStateMachine {
bad_block_cache,
tipsets: HashMap::default(),
stateless_mode,
validated_tipset_broadcast_tx: tokio::sync::broadcast::Sender::new(1024),
}
}

Expand Down Expand Up @@ -783,10 +794,10 @@ impl SyncStateMachine {
}
}

fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) {
fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) -> bool {
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
if !self.is_parent_validated(&tipset) {
tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), parent_state = %tipset.parent_state(), "Parent tipset must be validated");
return;
return false;
}

self.tipsets.remove(tipset.key());
Expand All @@ -798,17 +809,21 @@ impl SyncStateMachine {
if self.cs.heaviest_tipset().weight() < tipset.weight() {
if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
error!("Error setting heaviest tipset: {}", e);
return false;
} else {
info!("Heaviest tipset: {} ({})", epoch, terse_key);
}
}
} else if is_proposed_head {
if let Err(e) = self.cs.put_tipset(&tipset) {
error!("Error putting tipset: {e}");
return false;
}
} else if let Err(e) = self.cs.set_heaviest_tipset(tipset) {
error!("Error setting heaviest tipset: {e}");
return false;
}
true
}

pub fn update(&mut self, event: SyncEvent) {
Expand All @@ -823,7 +838,15 @@ impl SyncStateMachine {
SyncEvent::ValidatedTipset {
tipset,
is_proposed_head,
} => self.mark_validated_tipset(tipset, is_proposed_head),
} => {
let tsk = tipset.key().clone();
if self.mark_validated_tipset(tipset, is_proposed_head)
&& crate::utils::broadcast::has_subscribers(&self.validated_tipset_broadcast_tx)
&& let Err(e) = self.validated_tipset_broadcast_tx.send(tsk)
{
warn!("Failed to broadcast validated tipset: {e}");
}
}
}
}

Expand Down
37 changes: 37 additions & 0 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,44 @@ fn start_chain_follower_service(
services: &mut JoinSet<anyhow::Result<()>>,
chain_follower: ChainFollower,
) {
let sync_status = chain_follower.sync_status.shallow_clone();
let state_manager = chain_follower.state_manager.shallow_clone();
let mut validated_tipset_rx = chain_follower.subscribe_validated_tipset();
services.spawn(async move { chain_follower.run().await });
// Prefill RPC method caches for newly validated tipsets to speed up subsequent RPC calls.
services.spawn(async move {
loop {
match validated_tipset_rx.recv().await {
Ok(_) if !sync_status.read().is_synced() => {
// Skip if the node is catching up to avoid unnecessary work, as the head may be changing rapidly.
continue;
}
Ok(tsk) => {
let state_manager = state_manager.shallow_clone();
tokio::spawn(async move {
let ts = state_manager.chain_index().load_required_tipset(&tsk)?;
for tx_info in
[crate::rpc::eth::TxInfo::Full, crate::rpc::eth::TxInfo::Hash]
{
if let Err(e) = crate::rpc::eth::Block::from_filecoin_tipset(
&state_manager,
ts.shallow_clone(),
tx_info,
)
.await {
warn!("failed to call `Block::from_filecoin_tipset` for cache warmup: {e:#}");
}
}
anyhow::Ok(())
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
Err(RecvError::Lagged(n)) => {
warn!("validated tipset broadcast lagged: skipped {n} tipsets")
}
Err(RecvError::Closed) => break Ok(()),
}
}
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

async fn maybe_start_health_check_service(
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub(crate) fn new_heads(data: Ctx) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
for ts in changes.applies {
// Convert the tipset to an Ethereum block with full transaction info
// Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block
match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
match EthBlock::from_filecoin_tipset(&data.state_manager, ts, TxInfo::Full).await {
Ok(block) => {
if let Err(e) = sender.send(ApiHeaders(block)) {
tracing::error!("Failed to send headers: {}", e);
Expand Down
18 changes: 9 additions & 9 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::shim::gas::GasOutputs;
use crate::shim::message::Message;
use crate::shim::trace::{CallReturn, ExecutionEvent};
use crate::shim::{clock::ChainEpoch, state_tree::StateTree};
use crate::state_manager::{ExecutedMessage, ExecutedTipset, TipsetState, VMFlush};
use crate::state_manager::{ExecutedMessage, ExecutedTipset, StateManager, TipsetState, VMFlush};
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::db::BlockstoreExt as _;
use crate::utils::encoding::from_slice_with_fallback;
Expand Down Expand Up @@ -474,13 +474,13 @@ impl Block {
///
/// Reference: <https://github.com/filecoin-project/lotus/blob/941455f1d23e73b9ee92a1a4ce745d8848969858/node/impl/eth/utils.go#L44>
pub async fn from_filecoin_tipset(
ctx: Ctx,
state_manager: &StateManager,
tipset: crate::blocks::Tipset,
tx_info: TxInfo,
) -> Result<Self> {
static ETH_BLOCK_CACHE: LazyLock<SizeTrackingLruCache<CidWrapper, Block>> =
LazyLock::new(|| {
const DEFAULT_CACHE_SIZE: NonZeroUsize = nonzero!(500usize);
const DEFAULT_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
let cache_size = std::env::var("FOREST_ETH_BLOCK_CACHE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
Expand All @@ -500,9 +500,9 @@ impl Block {
state_root,
executed_messages,
..
} = ctx.state_manager.load_executed_tipset(&tipset).await?;
} = state_manager.load_executed_tipset(&tipset).await?;
let has_transactions = !executed_messages.is_empty();
let state_tree = ctx.state_manager.get_state_tree(&state_root)?;
let state_tree = state_manager.get_state_tree(&state_root)?;

let mut full_transactions = vec![];
let mut gas_used = 0;
Expand All @@ -519,13 +519,13 @@ impl Block {
ChainMessage::Signed(smsg) => new_eth_tx_from_signed_message(
smsg,
&state_tree,
ctx.chain_config().eth_chain_id,
state_manager.chain_config().eth_chain_id,
)?,
ChainMessage::Unsigned(msg) => {
let tx = eth_tx_from_native_message(
msg,
&state_tree,
ctx.chain_config().eth_chain_id,
state_manager.chain_config().eth_chain_id,
)?;
ApiEthTx {
hash: msg.cid().into(),
Expand Down Expand Up @@ -1407,7 +1407,7 @@ impl RpcMethod<2> for EthGetBlockByHash {
let ts = resolver
.tipset_by_block_number_or_hash(block_hash, ResolveNullTipset::TakeOlder)
.await?;
Block::from_filecoin_tipset(ctx, ts, full_tx_info.into())
Block::from_filecoin_tipset(&ctx.state_manager, ts, full_tx_info.into())
.await
.map_err(ServerError::from)
}
Expand Down Expand Up @@ -1435,7 +1435,7 @@ impl RpcMethod<2> for EthGetBlockByNumber {
let ts = resolver
.tipset_by_block_number_or_hash(block_param, ResolveNullTipset::TakeOlder)
.await?;
Block::from_filecoin_tipset(ctx, ts, full_tx_info.into())
Block::from_filecoin_tipset(&ctx.state_manager, ts, full_tx_info.into())
.await
.map_err(ServerError::from)
}
Expand Down
Loading