Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ signet-types = "0.19"
signet-zenith = "0.19"
signet-journal = "0.19"
signet-journal-chain = "0.1"
signet-journal-client = "0.1"
signet-storage = "0.10"
signet-cold = "0.10"
signet-hot = "0.10"
Expand Down Expand Up @@ -129,6 +130,7 @@ tempfile = "3.17.0"

[patch.crates-io]
signet-journal-chain = { git = "https://github.com/init4tech/journal-service", branch = "main" }
signet-journal-client = { git = "https://github.com/init4tech/journal-service", branch = "main" }
signet-storage = { git = "https://github.com/init4tech/storage.git", branch = "fraser/eng-2017/journal-hashes" }
signet-cold = { git = "https://github.com/init4tech/storage.git", branch = "fraser/eng-2017/journal-hashes" }
signet-cold-sql = { git = "https://github.com/init4tech/storage.git", branch = "fraser/eng-2017/journal-hashes" }
Expand Down
12 changes: 11 additions & 1 deletion crates/host-reth/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures_util::StreamExt;
use reth::{
chainspec::EthChainSpec,
primitives::{EthPrimitives, Receipt},
providers::{BlockIdReader, BlockReader, HeaderProvider, ReceiptProvider},
providers::{BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ReceiptProvider},
};
use reth_exex::{ExExContext, ExExEvent, ExExNotifications, ExExNotificationsStream};
use reth_node_api::{FullNodeComponents, NodeTypes};
Expand Down Expand Up @@ -261,4 +261,14 @@ where
self.events.send(ExExEvent::FinishedHeight(BlockNumHash { number: block_number, hash }))?;
Ok(())
}

async fn host_tip(&self) -> Result<u64, Self::Error> {
Ok(self.provider.best_block_number()?)
}

// A reth ExEx shares the host's notification pipeline; unconsumed notifications fill reth's
// buffer and stall its pipeline, so a journal-syncing node must drain them.
fn backpressures_host(&self) -> bool {
true
}
}
4 changes: 4 additions & 0 deletions crates/host-rpc/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,8 @@ where
// No-op: no ExEx to notify for an RPC follower.
Ok(())
}

async fn host_tip(&self) -> Result<u64, Self::Error> {
Ok(self.provider.get_block_number().await?)
}
}
1 change: 1 addition & 0 deletions crates/node-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ alloy.workspace = true
eyre.workspace = true
reqwest.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
tracing.workspace = true
signet-genesis.workspace = true
Expand Down
212 changes: 208 additions & 4 deletions crates/node-config/src/journal.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,54 @@
use core::num::NonZeroU64;
use init4_bin_base::utils::from_env::FromEnv;
use core::{num::NonZeroU64, str::FromStr, time::Duration};
use init4_bin_base::utils::from_env::{FromEnv, FromEnvErr, FromEnvVar};
use signet_journal_chain::SAFETY_MARGIN;
use tracing::warn;

/// How a node sources rollup state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SyncStrategy {
/// Execute host blocks to derive state (the current, default behaviour).
#[default]
Blocks,
/// Apply pre-computed journals from upstream sources without executing blocks.
Journals,
}

impl FromStr for SyncStrategy {
type Err = ParseSyncStrategyError;

fn from_str(input: &str) -> Result<Self, Self::Err> {
match input.trim().to_ascii_lowercase().as_str() {
"blocks" => Ok(Self::Blocks),
"journals" => Ok(Self::Journals),
other => Err(ParseSyncStrategyError(other.to_owned())),
}
}
}

impl FromEnvVar for SyncStrategy {
fn from_env_var(env_var: &str) -> Result<Self, FromEnvErr> {
let raw = String::from_env_var(env_var)?;
raw.parse().map_err(|error| FromEnvErr::parse_error(env_var, error))
}
}

/// Error parsing a [`SyncStrategy`] from a string.
#[derive(Debug, Clone, thiserror::Error)]
#[error("invalid journal sync strategy '{0}', expected 'blocks' or 'journals'")]
pub struct ParseSyncStrategyError(String);

/// Error returned by [`JournalConfig::validate`].
#[derive(Debug, Clone, Copy, thiserror::Error)]
pub enum JournalConfigError {
/// `sync_strategy` is [`SyncStrategy::Journals`] but no upstream sources were configured.
#[error(
"journal sync strategy is 'journals' but no upstream sources were configured \
(set SIGNET_JOURNAL_SOURCES)"
)]
MissingSources,
}

/// Default maximum total byte size of the journal ring buffer (64 MiB).
pub const DEFAULT_RING_BUFFER_MAX_BYTES: u64 = 64 * 1024 * 1024;

Expand All @@ -24,9 +70,45 @@ pub const DEFAULT_MAX_SUBSCRIBER_LAG: u64 = 100;
/// All fields are optional. When unset, [`JournalConfig`] returns the
/// constants above via its accessors. Configurable via environment variables
/// (`SIGNET_JOURNAL_*`) or via serde for file-based config.
#[derive(Debug, Clone, Copy, Default, serde::Deserialize, FromEnv)]
#[derive(Debug, Clone, Default, serde::Deserialize, FromEnv)]
#[serde(rename_all = "camelCase", default)]
pub struct JournalConfig {
/// Sync strategy: execute host blocks (`blocks`, default) or apply journals
/// from upstream sources (`journals`).
#[from_env(
var = "SIGNET_JOURNAL_SYNC_STRATEGY",
desc = "Journal sync strategy: 'blocks' or 'journals' [default: blocks]",
optional
)]
sync_strategy: Option<SyncStrategy>,

/// Prioritised upstream journal WebSocket source URLs (comma-separated).
/// Required when `sync_strategy` is `journals`.
#[from_env(
var = "SIGNET_JOURNAL_SOURCES",
desc = "Comma-separated upstream journal WebSocket URLs (required for journals strategy)",
optional
)]
sources: Option<Vec<String>>,

/// Per-source stall timeout in milliseconds for the journal client. Falls
/// back to the client default (60s) when unset.
#[from_env(
var = "SIGNET_JOURNAL_CLIENT_SOURCE_STALL_TIMEOUT_MS",
desc = "Journal client per-source stall timeout in ms [default: 60000]",
optional
)]
client_source_stall_timeout_ms: Option<u64>,

/// Faulty-source backoff in milliseconds for the journal client. Falls back
/// to the client default (30s) when unset.
#[from_env(
var = "SIGNET_JOURNAL_CLIENT_SOURCE_BACKOFF_MS",
desc = "Journal client faulty-source backoff in ms [default: 30000]",
optional
)]
client_source_backoff_ms: Option<u64>,

/// Maximum total byte size of the journal ring buffer.
#[from_env(
var = "SIGNET_JOURNAL_RING_BUFFER_MAX_BYTES",
Expand Down Expand Up @@ -86,11 +168,37 @@ impl JournalConfig {
NonZeroU64::new(value).expect("DEFAULT_MAX_SUBSCRIBER_LAG is non-zero")
}

/// The configured sync strategy, defaulting to [`SyncStrategy::Blocks`].
pub fn sync_strategy(&self) -> SyncStrategy {
self.sync_strategy.unwrap_or_default()
}

/// Upstream journal WebSocket source URLs (as raw strings). Empty when none
/// are configured. Required when [`Self::sync_strategy`] is
/// [`SyncStrategy::Journals`].
pub fn sources(&self) -> &[String] {
self.sources.as_deref().unwrap_or(&[])
}

/// Per-source stall timeout for the journal client, when overridden. `None`
/// lets the client's own default (60s) stand.
pub fn client_source_stall_timeout(&self) -> Option<Duration> {
self.client_source_stall_timeout_ms.map(Duration::from_millis)
}

/// Faulty-source backoff for the journal client, when overridden. `None`
/// lets the client's own default (30s) stand.
pub fn client_source_backoff(&self) -> Option<Duration> {
self.client_source_backoff_ms.map(Duration::from_millis)
}

/// Emit a warning for any field that is explicitly set to a value the
/// journal chain will silently normalize. Covers a zero
/// `max_subscriber_lag` (which the chain rejects, so the default is
/// substituted) and a `ring_buffer_max_count` below [`SAFETY_MARGIN`]
/// (which the chain clamps up). Intended to be called once at startup.
/// (which the chain clamps up). Also warns when journal-client-only or
/// `journals`-strategy-only options are set but the strategy will ignore
/// them. Intended to be called once at startup.
pub fn warn_on_misconfiguration(&self) {
if self.max_subscriber_lag == Some(0) {
warn!(
Expand All @@ -109,5 +217,101 @@ impl JournalConfig {
margin and will be clamped up"
);
}
// The journal-sync inputs (sources and client tuning knobs) are only consulted under
// the `journals` strategy. If they are set while the node will execute blocks, they are
// dead config - surface that rather than silently ignoring them.
if self.sync_strategy() != SyncStrategy::Journals {
if !self.sources().is_empty() {
warn!(
"SIGNET_JOURNAL_SOURCES is set but the sync strategy is not 'journals'; \
the configured sources will be ignored"
);
}
if self.client_source_stall_timeout_ms.is_some()
|| self.client_source_backoff_ms.is_some()
{
warn!(
"journal client tuning knobs are set but the sync strategy is not \
'journals'; they will be ignored"
);
}
}
}

/// Validate cross-field invariants. Intended to be called once at startup,
/// after [`Self::warn_on_misconfiguration`].
///
/// # Errors
///
/// Returns [`JournalConfigError::MissingSources`] when the strategy is
/// [`SyncStrategy::Journals`] but no upstream sources are configured.
pub fn validate(&self) -> Result<(), JournalConfigError> {
if self.sync_strategy() == SyncStrategy::Journals && self.sources().is_empty() {
return Err(JournalConfigError::MissingSources);
}
Ok(())
}

/// Construct a journal-sync configuration ([`SyncStrategy::Journals`]) pointing at the given
/// upstream sources, with short client timeouts so tests fail over and retry quickly. All
/// other fields take their defaults.
#[cfg(any(test, feature = "test_utils"))]
pub fn journal_sync_for_test(sources: Vec<String>) -> Self {
Self {
sync_strategy: Some(SyncStrategy::Journals),
sources: Some(sources),
client_source_stall_timeout_ms: Some(200),
client_source_backoff_ms: Some(50),
..Default::default()
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn sync_strategy_parses_case_insensitively() {
assert_eq!("blocks".parse::<SyncStrategy>().unwrap(), SyncStrategy::Blocks);
assert_eq!("Journals".parse::<SyncStrategy>().unwrap(), SyncStrategy::Journals);
assert_eq!(" JOURNALS ".parse::<SyncStrategy>().unwrap(), SyncStrategy::Journals);
"neither".parse::<SyncStrategy>().unwrap_err();
}

#[test]
fn default_strategy_is_blocks() {
assert_eq!(JournalConfig::default().sync_strategy(), SyncStrategy::Blocks);
}

#[test]
fn validate_requires_sources_for_journals() {
let config =
JournalConfig { sync_strategy: Some(SyncStrategy::Journals), ..Default::default() };
config.validate().unwrap_err();

let config = JournalConfig {
sync_strategy: Some(SyncStrategy::Journals),
sources: Some(vec!["ws://host:9545".to_owned()]),
..Default::default()
};
config.validate().unwrap();
}

#[test]
fn validate_allows_blocks_without_sources() {
JournalConfig::default().validate().unwrap();
}

#[test]
fn client_timeouts_convert_from_millis() {
let config = JournalConfig {
client_source_stall_timeout_ms: Some(1500),
client_source_backoff_ms: Some(250),
..Default::default()
};
assert_eq!(config.client_source_stall_timeout(), Some(Duration::from_millis(1500)));
assert_eq!(config.client_source_backoff(), Some(Duration::from_millis(250)));
assert_eq!(JournalConfig::default().client_source_stall_timeout(), None);
}
}
2 changes: 1 addition & 1 deletion crates/node-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use core::SignetNodeConfig;
mod journal;
pub use journal::{
DEFAULT_MAX_SUBSCRIBER_LAG, DEFAULT_RING_BUFFER_MAX_BYTES, DEFAULT_RING_BUFFER_MAX_COUNT,
JournalConfig,
JournalConfig, JournalConfigError, ParseSyncStrategyError, SyncStrategy,
};

mod storage;
Expand Down
8 changes: 7 additions & 1 deletion crates/node-config/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ use std::borrow::Cow;

/// Make a test config.
pub fn test_config() -> SignetNodeConfig {
test_config_with_journal(JournalConfig::default())
}

/// Make a test config with a caller-supplied [`JournalConfig`]. Used by journal-sync tests to
/// point the node at an upstream WebSocket source.
pub const fn test_config_with_journal(journal: JournalConfig) -> SignetNodeConfig {
SignetNodeConfig::new(
BlobFetcherConfig::new(Cow::Borrowed("")),
StorageConfig::new(Cow::Borrowed("NOP"), Cow::Borrowed("NOP")),
None,
JournalConfig::default(),
journal,
GenesisSpec::Known(KnownChains::Test),
SlotCalculator::new(0, 0, 12),
)
Expand Down
4 changes: 4 additions & 0 deletions crates/node-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ tracing.workspace = true
tracing-subscriber.workspace = true

[dev-dependencies]
bytes.workspace = true
serde_json.workspace = true
serial_test = "3.2.0"
signet-journal-chain = { workspace = true, features = ["test-utils", "signet-extract"] }
signet-journal.workspace = true
tokio-util.workspace = true
trevm.workspace = true
Loading
Loading