diff --git a/Cargo.lock b/Cargo.lock index 04e5b58420..6d8e8c841c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8180,6 +8180,7 @@ version = "0.1.0" dependencies = [ "ahash 0.8.12", "async-channel", + "blake3", "bytes", "compio", "compio-quic", @@ -8191,6 +8192,7 @@ dependencies = [ "libc", "rand 0.10.1", "rcgen", + "ring", "rustls", "rustls-pemfile", "scopeguard", @@ -8199,6 +8201,7 @@ dependencies = [ "tempfile", "thiserror 2.0.18", "tracing", + "zeroize", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0f9ffd242a..29828a7696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -336,6 +336,7 @@ wiremock = "0.6" yew = { version = "0.23", features = ["csr"] } yew-router = "0.20" zbus-secret-service-keyring-store = { version = "1.0.0", features = ["rt-async-io-crypto-rust"] } +zeroize = "1" zip = { version = "8.6.0", default-features = false, features = ["deflate"] } [profile.release] diff --git a/core/binary_protocol/src/consensus/command.rs b/core/binary_protocol/src/consensus/command.rs index 384bf696ae..6adafef83e 100644 --- a/core/binary_protocol/src/consensus/command.rs +++ b/core/binary_protocol/src/consensus/command.rs @@ -40,6 +40,11 @@ pub enum Command2 { DoViewChange = 11, StartView = 12, Eviction = 13, + + // Replica-to-replica auth handshake (server-ng consensus plane). + ReplicaHello = 14, + ReplicaChallenge = 15, + ReplicaFinish = 16, } // SAFETY: Command2 is #[repr(u8)] with no padding bytes. @@ -50,7 +55,7 @@ unsafe impl CheckedBitPattern for Command2 { type Bits = u8; fn is_valid_bit_pattern(bits: &u8) -> bool { - *bits <= 13 + *bits <= Self::ReplicaFinish as u8 } } @@ -68,4 +73,19 @@ mod tests { let result = bytemuck::checked::try_from_bytes::(&buf); assert!(result.is_err()); } + + #[test] + fn replica_auth_commands_are_valid_bit_patterns() { + // Locks the is_valid_bit_pattern bump: 14/15/16 parse, 17 still rejects. + for command in 14u8..=16 { + let mut buf: AVec> = AVec::new(16); + buf.resize(256, 0); + buf[60] = command; + assert!(bytemuck::checked::try_from_bytes::(&buf).is_ok()); + } + let mut buf: AVec> = AVec::new(16); + buf.resize(256, 0); + buf[60] = 17; + assert!(bytemuck::checked::try_from_bytes::(&buf).is_err()); + } } diff --git a/core/binary_protocol/src/consensus/header.rs b/core/binary_protocol/src/consensus/header.rs index 650c30772a..a644efcd9d 100644 --- a/core/binary_protocol/src/consensus/header.rs +++ b/core/binary_protocol/src/consensus/header.rs @@ -25,6 +25,10 @@ use std::mem::offset_of; pub const HEADER_SIZE: usize = 256; +/// Length of [`GenericHeader::reserved_command`], the per-command scratch area +/// the replica-auth handshake writes its nonce / MAC / reject-reason into. +pub const RESERVED_COMMAND_LEN: usize = 128; + /// Byte offset of [`GenericHeader::size`] within the on-wire header. /// /// Single source of truth for transports that decode the size field @@ -89,7 +93,7 @@ pub struct GenericHeader { pub command: Command2, pub replica: u8, pub reserved_frame: [u8; 66], - pub reserved_command: [u8; 128], + pub reserved_command: [u8; RESERVED_COMMAND_LEN], } const _: () = { assert!(size_of::() == HEADER_SIZE); @@ -101,7 +105,10 @@ const _: () = { offset_of!(GenericHeader, reserved_command) == offset_of!(GenericHeader, reserved_frame) + size_of::<[u8; 66]>() ); - assert!(offset_of!(GenericHeader, reserved_command) + size_of::<[u8; 128]>() == HEADER_SIZE); + assert!( + offset_of!(GenericHeader, reserved_command) + size_of::<[u8; RESERVED_COMMAND_LEN]>() + == HEADER_SIZE + ); }; impl ConsensusHeader for GenericHeader { diff --git a/core/binary_protocol/src/consensus/mod.rs b/core/binary_protocol/src/consensus/mod.rs index 6132c2da78..851ac33202 100644 --- a/core/binary_protocol/src/consensus/mod.rs +++ b/core/binary_protocol/src/consensus/mod.rs @@ -45,7 +45,7 @@ pub use command::Command2; pub use error::ConsensusError; pub use header::{ CommitHeader, ConsensusHeader, DoViewChangeHeader, EvictionHeader, EvictionReason, - GenericHeader, HEADER_SIZE, PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader, - SIZE_FIELD_OFFSET, StartViewChangeHeader, StartViewHeader, read_size_field, + GenericHeader, HEADER_SIZE, PrepareHeader, PrepareOkHeader, RESERVED_COMMAND_LEN, ReplyHeader, + RequestHeader, SIZE_FIELD_OFFSET, StartViewChangeHeader, StartViewHeader, read_size_field, }; pub use operation::Operation; diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs index 49c813ff14..00e2e5653c 100644 --- a/core/binary_protocol/src/lib.rs +++ b/core/binary_protocol/src/lib.rs @@ -72,8 +72,8 @@ pub use codec::{WireDecode, WireEncode}; pub use consensus::{ Command2, CommitHeader, ConsensusError, ConsensusHeader, DoViewChangeHeader, EvictionHeader, EvictionReason, GenericHeader, HEADER_SIZE, Operation, PrepareHeader, PrepareOkHeader, - ReplyHeader, RequestHeader, SIZE_FIELD_OFFSET, StartViewChangeHeader, StartViewHeader, - read_size_field, + RESERVED_COMMAND_LEN, ReplyHeader, RequestHeader, SIZE_FIELD_OFFSET, StartViewChangeHeader, + StartViewHeader, read_size_field, }; pub use dispatch::{COMMAND_TABLE, CommandMeta, lookup_by_operation, lookup_command}; pub use error::WireError; diff --git a/core/configs/src/server_config/cluster.rs b/core/configs/src/server_config/cluster.rs index 5581532075..ef348e1039 100644 --- a/core/configs/src/server_config/cluster.rs +++ b/core/configs/src/server_config/cluster.rs @@ -27,8 +27,47 @@ pub struct ClusterConfig { /// every node so operators ship one config. The running node's identity /// is supplied out-of-band via the `--replica-id` CLI flag, which /// selects the entry in this list that describes the current node. + // + // TODO(hubcio): IGGY-155 `register-replica` CLI (a validated roster + // append) is deferred - it is convenience only over a manual TOML edit, + // and `ClusterConfig::validate` already rejects a malformed roster at + // boot. Add it only if scripted/automated roster edits become a need. #[serde(default)] pub nodes: Vec, + /// Replica-to-replica authentication settings (PSK + BLAKE3 handshake). + #[serde(default)] + pub auth: ClusterAuthConfig, +} + +/// Replica-to-replica authentication for the consensus (`tcp_replica`) port. +#[derive(Debug, Default, Deserialize, Serialize, Clone, ConfigEnv)] +#[serde(deny_unknown_fields)] +pub struct ClusterAuthConfig { + /// When true, every replica peer must complete the authenticated handshake + /// or be rejected, and [`Self::shared_secret`] is mandatory. When false + /// (default) the replica handshake stays in legacy unauthenticated mode and + /// `shared_secret` is not used for authentication. A configured non-empty + /// `shared_secret` must still meet the 32-byte minimum whenever the cluster + /// is enabled (a short value fails boot even with auth off). + /// + /// Enabling auth is a coordinated-restart change, and not the only one: the + /// consensus `cluster_id` is derived from `ClusterConfig::name` + /// unconditionally, so a mixed-version roster fails to connect regardless of + /// this flag. Flip every node in one restart. + #[serde(default)] + pub enabled: bool, + /// Cluster-wide pre-shared key for replica-to-replica authentication. + /// + /// At least 32 bytes of CSPRNG output, byte-identical across every node. + /// Provisioned out-of-band, normally via `IGGY_CLUSTER_AUTH_SHARED_SECRET` + /// rather than the on-disk config. + // skip_serializing keeps the PSK out of the runtime `current_config.toml` + // (and the `ServerConfig` diagnostic snapshot that cats it). The live + // secret is read from env / on-disk config at boot, never from the + // snapshot, so it must never be persisted there. Deserialize is retained. + #[serde(default, skip_serializing)] + #[config_env(secret)] + pub shared_secret: String, } #[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)] @@ -52,3 +91,35 @@ pub struct TransportPorts { /// Dedicated port for replica-to-replica consensus traffic. pub tcp_replica: Option, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn shared_secret_is_never_serialized() { + // Regression guard: the runtime current_config.toml (and the + // ServerConfig diagnostic snapshot that cats it) are produced by + // serializing this struct, so the PSK must not survive serialize. + // skip_serializing is format-agnostic, so a JSON dump proves the toml + // path too. + let config = ClusterConfig { + enabled: true, + name: "iggy-cluster".to_owned(), + nodes: Vec::new(), + auth: ClusterAuthConfig { + enabled: true, + shared_secret: "current-psk-MUST-NOT-be-persisted".to_owned(), + }, + }; + let serialized = serde_json::to_string(&config).expect("serialize cluster config"); + assert!( + !serialized.contains("MUST-NOT-be-persisted"), + "PSK leaked into serialized config: {serialized}" + ); + assert!( + !serialized.contains("shared_secret"), + "shared_secret field present in serialized config: {serialized}" + ); + } +} diff --git a/core/configs/src/server_config/defaults.rs b/core/configs/src/server_config/defaults.rs index f6bbba85c2..b138d7406a 100644 --- a/core/configs/src/server_config/defaults.rs +++ b/core/configs/src/server_config/defaults.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use super::cluster::{ClusterConfig, ClusterNodeConfig, TransportPorts}; +use super::cluster::{ClusterAuthConfig, ClusterConfig, ClusterNodeConfig, TransportPorts}; use super::http::{HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig}; use super::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig}; use super::server::{ @@ -605,6 +605,7 @@ impl Default for ClusterConfig { }, }) .collect(), + auth: ClusterAuthConfig::default(), } } } diff --git a/core/configs/src/server_config/http.rs b/core/configs/src/server_config/http.rs index 61df847e47..cb6271d328 100644 --- a/core/configs/src/server_config/http.rs +++ b/core/configs/src/server_config/http.rs @@ -75,8 +75,13 @@ pub struct HttpJwtConfig { #[config_env(leaf)] #[serde_as(as = "DisplayFromStr")] pub not_before: IggyDuration, + // skip_serializing keeps the secrets out of the runtime current_config.toml + // (and the diagnostic snapshot that cats it). The live secrets are read from + // env / on-disk config at boot, never from the snapshot. + #[serde(default, skip_serializing)] #[config_env(secret)] pub encoding_secret: String, + #[serde(default, skip_serializing)] #[config_env(secret)] pub decoding_secret: String, pub use_base64_secret: bool, @@ -158,3 +163,36 @@ impl HttpJwtConfig { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn jwt_secrets_are_never_serialized() { + // current_config.toml (and the diagnostic snapshot that cats it) is + // produced by serializing this struct, so the JWT secrets must not + // survive a serialize. Built via deserialize to skip enumerating the + // duration fields; skip_serializing is format-agnostic, so a JSON dump + // proves the toml path too. + let json = r#"{ + "algorithm": "HS256", + "issuer": "iggy.apache.org", + "audience": "iggy.apache.org", + "valid_issuers": ["iggy.apache.org"], + "valid_audiences": ["iggy.apache.org"], + "access_token_expiry": "1 h", + "clock_skew": "5 s", + "not_before": "0 s", + "encoding_secret": "encoding-MUST-NOT-be-persisted", + "decoding_secret": "decoding-MUST-NOT-be-persisted", + "use_base64_secret": false + }"#; + let config: HttpJwtConfig = serde_json::from_str(json).expect("deserialize jwt config"); + let serialized = serde_json::to_string(&config).expect("serialize jwt config"); + assert!( + !serialized.contains("MUST-NOT-be-persisted"), + "JWT secret leaked into serialized config: {serialized}" + ); + } +} diff --git a/core/configs/src/server_config/system.rs b/core/configs/src/server_config/system.rs index ed66c9449e..59c9b397f7 100644 --- a/core/configs/src/server_config/system.rs +++ b/core/configs/src/server_config/system.rs @@ -103,6 +103,10 @@ pub struct LoggingConfig { #[derive(Debug, Deserialize, Serialize, ConfigEnv)] pub struct EncryptionConfig { pub enabled: bool, + // skip_serializing keeps the key out of the runtime current_config.toml (and + // the diagnostic snapshot that cats it). The live key is read from env / + // on-disk config at boot, never from the snapshot. + #[serde(default, skip_serializing)] #[config_env(secret)] pub key: String, } @@ -354,3 +358,25 @@ impl SystemPaths for SystemConfig { SystemConfig::get_runtime_path(self) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn encryption_key_is_never_serialized() { + // current_config.toml (and the diagnostic snapshot that cats it) is + // produced by serializing this struct, so the key must not survive a + // serialize. skip_serializing is format-agnostic, so a JSON dump proves + // the toml path too. + let config = EncryptionConfig { + enabled: true, + key: "encryption-key-MUST-NOT-be-persisted".to_owned(), + }; + let serialized = serde_json::to_string(&config).expect("serialize encryption config"); + assert!( + !serialized.contains("MUST-NOT-be-persisted"), + "encryption key leaked into serialized config: {serialized}" + ); + } +} diff --git a/core/configs/src/server_config/validators.rs b/core/configs/src/server_config/validators.rs index 28bb9925ec..2889a571e3 100644 --- a/core/configs/src/server_config/validators.rs +++ b/core/configs/src/server_config/validators.rs @@ -484,6 +484,11 @@ impl Validatable for ShardingConfig { } } +/// Length floor for the replica-auth PSK, in raw bytes. The 32-byte MAC key +/// is KDF-derived from these bytes at use-site, so any encoding clearing this +/// length is accepted. +const MIN_SHARED_SECRET_LEN: usize = 32; + impl Validatable for ClusterConfig { fn validate(&self) -> Result<(), ConfigurationError> { if !self.enabled { @@ -584,6 +589,25 @@ impl Validatable for ClusterConfig { } } + // Replica-auth PSK (only reached when the cluster is enabled; the early + // return above skips these while it is disabled). When auth is enabled + // the key is mandatory; any configured key must clear the length floor - + // a typo guard that fires with auth off too, though only while the + // cluster itself is enabled. + let secret_len = self.auth.shared_secret.len(); + if self.auth.enabled && self.auth.shared_secret.is_empty() { + eprintln!( + "Invalid cluster configuration: cluster.auth.shared_secret must be set when cluster.auth.enabled is true" + ); + return Err(ConfigurationError::InvalidConfigurationValue); + } + if !self.auth.shared_secret.is_empty() && secret_len < MIN_SHARED_SECRET_LEN { + eprintln!( + "Invalid cluster configuration: cluster.auth.shared_secret must be >= {MIN_SHARED_SECRET_LEN} bytes" + ); + return Err(ConfigurationError::InvalidConfigurationValue); + } + Ok(()) } } @@ -591,7 +615,9 @@ impl Validatable for ClusterConfig { #[cfg(test)] mod cluster_validate_tests { use super::*; - use crate::server_config::cluster::{ClusterConfig, ClusterNodeConfig, TransportPorts}; + use crate::server_config::cluster::{ + ClusterAuthConfig, ClusterConfig, ClusterNodeConfig, TransportPorts, + }; fn node(name: &str, id: u8) -> ClusterNodeConfig { ClusterNodeConfig { @@ -607,6 +633,7 @@ mod cluster_validate_tests { enabled: true, name: "iggy-cluster".to_string(), nodes, + auth: ClusterAuthConfig::default(), } } @@ -720,6 +747,44 @@ mod cluster_validate_tests { let c = cfg(vec![n1]); assert!(c.validate().is_err()); } + + #[test] + fn validate_accepts_empty_secret_when_auth_disabled() { + // Default: no secret, auth off -> legacy mode, must pass. + let c = cfg(vec![node("n1", 0), node("n2", 1)]); + assert!(c.validate().is_ok()); + } + + #[test] + fn validate_rejects_missing_secret_when_auth_enabled() { + let mut c = cfg(vec![node("n1", 0), node("n2", 1)]); + c.auth.enabled = true; + assert!(c.validate().is_err()); + } + + #[test] + fn validate_rejects_short_secret_when_auth_enabled() { + let mut c = cfg(vec![node("n1", 0), node("n2", 1)]); + c.auth.enabled = true; + c.auth.shared_secret = "a".repeat(MIN_SHARED_SECRET_LEN - 1); + assert!(c.validate().is_err()); + } + + #[test] + fn validate_rejects_short_secret_even_when_auth_disabled() { + // Typo guard: a configured-but-short key fails even with auth off. + let mut c = cfg(vec![node("n1", 0), node("n2", 1)]); + c.auth.shared_secret = "a".repeat(MIN_SHARED_SECRET_LEN - 1); + assert!(c.validate().is_err()); + } + + #[test] + fn validate_accepts_valid_secret_when_auth_enabled() { + let mut c = cfg(vec![node("n1", 0), node("n2", 1)]); + c.auth.enabled = true; + c.auth.shared_secret = "a".repeat(MIN_SHARED_SECRET_LEN); + assert!(c.validate().is_ok()); + } } #[cfg(test)] diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml index 17265aa7e7..7573e82016 100644 --- a/core/message_bus/Cargo.toml +++ b/core/message_bus/Cargo.toml @@ -31,6 +31,7 @@ publish = false [dependencies] ahash = { workspace = true } async-channel = { workspace = true } +blake3 = { workspace = true } bytes = { workspace = true } compio = { workspace = true } compio-quic = { workspace = true } @@ -41,6 +42,7 @@ iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } libc = { workspace = true } rand = { workspace = true } +ring = { workspace = true } rustls = { workspace = true } rustls-pemfile = { workspace = true } scopeguard = { workspace = true } @@ -48,6 +50,7 @@ server_common = { workspace = true } socket2 = { workspace = true, features = ["all"] } thiserror = { workspace = true } tracing = { workspace = true } +zeroize = { workspace = true } [dev-dependencies] rcgen = { workspace = true } diff --git a/core/message_bus/src/connector.rs b/core/message_bus/src/connector.rs index d2bd797515..1282fde0cc 100644 --- a/core/message_bus/src/connector.rs +++ b/core/message_bus/src/connector.rs @@ -19,20 +19,24 @@ //! //! Runs only on shard 0. For each peer replica with `peer_id > self_id` the //! connector dials at startup and re-dials on a periodic sweep. After a -//! successful TCP connect the connector sends a plaintext `Ping` frame +//! successful TCP connect the connector sends a plaintext `ReplicaHello` frame //! announcing this replica's id and `cluster_id` and hands the stream //! to the `on_dialed` callback supplied by the shard bootstrap, which //! duplicates the fd and ships it to the owning shard via the //! inter-shard channel //! (see `shard::coordinator::ShardZeroCoordinator`). //! -//! No transport-level authentication: the future `LOGIN_REPLICA` -//! command in the caller (`server-ng`) carries the cluster shared -//! secret post-connect. +//! When `cluster.auth.enabled` is set the dialer runs the dialer +//! half of the 3-message mutual BLAKE3 keyed-MAC handshake (see +//! [`crate::replica::auth`]): it sends `nonce_d` in the `ReplicaHello`, verifies +//! the acceptor's `ReplicaChallenge` MAC, and sends a `ReplicaFinish` frame +//! carrying its own MAC before delegating the fd. With no secret configured it +//! sends a plain `ReplicaHello` exactly as before. use crate::IggyMessageBus; use crate::framing; use crate::lifecycle::ShutdownToken; +use crate::replica::auth::{self, ReplicaAuth, Transcript}; use crate::{AcceptedReplicaFn, GenericHeader, Message}; use compio::net::TcpStream; use iggy_binary_protocol::{Command2, HEADER_SIZE}; @@ -55,15 +59,27 @@ pub const DEFAULT_RECONNECT_PERIOD: Duration = Duration::from_secs(5); /// sweep in the background. The periodic task handle is tracked on the bus /// so graceful shutdown can await it. #[allow(clippy::future_not_send)] +#[allow(clippy::too_many_arguments)] pub async fn start( bus: &Rc, cluster_id: u128, self_id: u8, peers: Vec<(u8, SocketAddr)>, + auth: Option, + handshake_grace: Duration, on_dialed: AcceptedReplicaFn, reconnect_period: Duration, ) { - connect_all(bus, cluster_id, self_id, &peers, &on_dialed).await; + connect_all( + bus, + cluster_id, + self_id, + &peers, + auth.as_ref(), + handshake_grace, + &on_dialed, + ) + .await; let handler = on_dialed.clone(); let token = bus.token(); @@ -74,6 +90,8 @@ pub async fn start( cluster_id, self_id, peers, + auth, + handshake_grace, handler, reconnect_period, token, @@ -89,11 +107,14 @@ async fn connect_all( cluster_id: u128, self_id: u8, peers: &[(u8, SocketAddr)], + auth: Option<&ReplicaAuth>, + handshake_grace: Duration, on_dialed: &AcceptedReplicaFn, ) { - for &(peer_id, addr) in peers { + let max_message_size = bus.config().max_message_size; + let dials = peers.iter().filter_map(|&(peer_id, addr)| { if peer_id <= self_id { - continue; + return None; } // Skip peers that already have a live mapping on this cluster. // `replicas().contains` covers single-shard deployments where the @@ -111,38 +132,79 @@ async fn connect_all( replica = peer_id, "skip reconnect: peer already registered on cluster" ); - continue; + return None; } - connect_one(cluster_id, self_id, peer_id, addr, on_dialed).await; - } + Some(connect_one( + cluster_id, + self_id, + peer_id, + addr, + auth, + max_message_size, + handshake_grace, + on_dialed, + )) + }); + // Dial concurrently: an accept-but-silent peer must not stall the rest + // behind its handshake-grace read. The futures share one task, so the + // `on_dialed` installs never overlap. Inbound vs outbound never collide for + // the same peer (directionality: we dial only higher ids, accept only + // lower); the per-peer `owning_shard` CAS in `install_replica_conn` + // arbitrates the cross-shard inbound install race. + futures::future::join_all(dials).await; } -#[allow(clippy::future_not_send)] +#[allow(clippy::future_not_send, clippy::too_many_arguments)] async fn periodic_reconnect( bus: &Rc, cluster_id: u128, self_id: u8, peers: Vec<(u8, SocketAddr)>, + auth: Option, + handshake_grace: Duration, on_dialed: AcceptedReplicaFn, period: Duration, token: ShutdownToken, ) { while token.sleep_or_shutdown(period).await { - connect_all(bus, cluster_id, self_id, &peers, &on_dialed).await; + connect_all( + bus, + cluster_id, + self_id, + &peers, + auth.as_ref(), + handshake_grace, + &on_dialed, + ) + .await; } debug!("replica reconnect periodic task exiting"); } -/// Dial a single peer, send the plaintext `Ping` frame, and hand the -/// stream to `on_dialed` on success. Dial / write failures are logged -/// and swallowed; VSR tolerates missing peers and the periodic sweep -/// retries. -#[allow(clippy::future_not_send)] +/// Dial a single peer and hand the stream to `on_dialed` on success. +/// +/// With `auth = None` this sends a plaintext `ReplicaHello` exactly as before. +/// With `auth = Some` it runs the dialer half of the mutual MAC handshake +/// (`ReplicaHello`+`nonce_d` -> verify `ReplicaChallenge` -> `ReplicaFinish`+`mac_d`) +/// before delegating. The dialer holds the strictly lower id, the acceptor the +/// higher; the transcript binds `dialer_id = self_id`, `acceptor_id = peer_id`. +/// Dial, write, read, timeout, and MAC failures are logged and swallowed; VSR +/// tolerates missing peers and the periodic sweep retries. If the acceptor +/// answers with a nonzero `ReplicaChallenge` status (a reject), the reason is +/// logged and the dial is abandoned. +#[allow( + clippy::future_not_send, + clippy::similar_names, + clippy::too_many_arguments +)] async fn connect_one( cluster_id: u128, self_id: u8, peer_id: u8, addr: SocketAddr, + auth: Option<&ReplicaAuth>, + max_message_size: usize, + handshake_grace: Duration, on_dialed: &AcceptedReplicaFn, ) { let mut stream = match TcpStream::connect(addr).await { @@ -156,27 +218,167 @@ async fn connect_one( debug!(replica = peer_id, %addr, "set_nodelay failed: {e}"); } - let ping = build_ping_message(cluster_id, self_id); - if let Err(e) = framing::write_message(&mut stream, ping).await { - warn!(replica = peer_id, %addr, "handshake write failed: {e}"); - return; + // Bound the whole handshake leg (every write plus the ReplicaChallenge read) under one + // grace. A peer that accepts then stops reading would otherwise park a + // dialer write forever and wedge the inline shard-0 reconnect sweep. + // Mirrors the acceptor's handshake_read timeout; the OS connect stays + // outside, like the acceptor's accept. + match compio::time::timeout( + handshake_grace, + dial_handshake( + &mut stream, + cluster_id, + self_id, + peer_id, + addr, + auth, + max_message_size, + ), + ) + .await + { + Ok(Ok(())) => {} + Ok(Err(())) => return, + Err(_) => { + warn!(replica = peer_id, %addr, grace = ?handshake_grace, "handshake exceeded grace"); + return; + } } - info!(replica = peer_id, %addr, "dialed peer replica, delegating fd"); + info!( + replica = peer_id, + %addr, + authenticated = auth.is_some(), + "dialed peer replica, delegating fd" + ); on_dialed(stream, peer_id); } -/// Build a plaintext `Ping` frame announcing this replica's id and -/// `cluster_id`. No nonce, no timestamp, no MAC: cluster-secret -/// validation moves to the future `LOGIN_REPLICA` command in the caller. -fn build_ping_message(cluster_id: u128, replica_id: u8) -> Message { +/// Run the dialer handshake on an established stream. Logs and returns `Err(())` +/// on any failure; the caller abandons the dial and the periodic sweep retries. +/// With `auth = None` it sends a single plaintext `ReplicaHello`, exactly as before. +#[allow(clippy::future_not_send, clippy::similar_names)] +async fn dial_handshake( + stream: &mut TcpStream, + cluster_id: u128, + self_id: u8, + peer_id: u8, + addr: SocketAddr, + auth: Option<&ReplicaAuth>, + max_message_size: usize, +) -> Result<(), ()> { + let Some(auth) = auth else { + let hello = build_hello_message(cluster_id, self_id, None); + if let Err(e) = framing::write_message(stream, hello).await { + warn!(replica = peer_id, %addr, "handshake write failed: {e}"); + return Err(()); + } + return Ok(()); + }; + + let nonce_d = match auth::random_nonce() { + Ok(nonce) => nonce, + Err(e) => { + warn!(replica = peer_id, %addr, "nonce generation failed: {e}"); + return Err(()); + } + }; + let hello = build_hello_message(cluster_id, self_id, Some(&nonce_d)); + if let Err(e) = framing::write_message(stream, hello).await { + warn!(replica = peer_id, %addr, "handshake write failed: {e}"); + return Err(()); + } + + let challenge = match framing::read_message(stream, max_message_size).await { + Ok(msg) => msg, + Err(e) => { + warn!(replica = peer_id, %addr, "handshake read failed: {e}"); + return Err(()); + } + }; + if challenge.header().command != Command2::ReplicaChallenge { + warn!( + replica = peer_id, + %addr, + command = ?challenge.header().command, + "unexpected replica handshake response command" + ); + return Err(()); + } + // Read command + status before nonce/MAC: a reject (or garbage) status + // carries no valid nonce/MAC, so parsing them would be meaningless. + let status = auth::read_status(&challenge.header().reserved_command); + if status != auth::HandshakeStatus::Ok { + warn!( + replica = peer_id, + %addr, + reason = status.as_str(), + "peer rejected replica handshake" + ); + return Err(()); + } + let nonce_a = auth::read_nonce(&challenge.header().reserved_command); + let mac_a = auth::read_mac(&challenge.header().reserved_command); + let transcript = Transcript { + cluster_id, + dialer_id: self_id, + acceptor_id: peer_id, + nonce_d, + nonce_a, + }; + if !auth.verify_acceptor_mac(&transcript, &mac_a) { + warn!(replica = peer_id, %addr, "replica handshake MAC verification failed"); + return Err(()); + } + let mac_d = auth.dialer_mac(&transcript); + if let Err(e) = + framing::write_message(stream, build_finish_message(cluster_id, self_id, &mac_d)).await + { + warn!(replica = peer_id, %addr, "handshake finish write failed: {e}"); + return Err(()); + } + Ok(()) +} + +/// Build a `ReplicaHello` frame announcing this replica's id and `cluster_id`. +/// When `nonce_d` is set it is placed in `reserved_command[0..32]` to open +/// the authenticated handshake; otherwise the frame is the legacy plaintext +/// announce. +fn build_hello_message( + cluster_id: u128, + replica_id: u8, + nonce_d: Option<&[u8; auth::NONCE_LEN]>, +) -> Message { + #[allow(clippy::cast_possible_truncation)] + Message::::new(size_of::()).transmute_header( + |_, h: &mut GenericHeader| { + h.command = Command2::ReplicaHello; + h.cluster = cluster_id; + h.replica = replica_id; + h.size = HEADER_SIZE as u32; + if let Some(nonce) = nonce_d { + h.reserved_command[..auth::NONCE_LEN].copy_from_slice(nonce); + } + }, + ) +} + +/// Build the dialer's `ReplicaFinish` frame carrying `mac_d` in +/// `reserved_command[32..64]`. +fn build_finish_message( + cluster_id: u128, + replica_id: u8, + mac_d: &[u8; auth::MAC_LEN], +) -> Message { #[allow(clippy::cast_possible_truncation)] Message::::new(size_of::()).transmute_header( |_, h: &mut GenericHeader| { - h.command = Command2::Ping; + h.command = Command2::ReplicaFinish; h.cluster = cluster_id; h.replica = replica_id; h.size = HEADER_SIZE as u32; + h.reserved_command[auth::NONCE_LEN..auth::NONCE_LEN + auth::MAC_LEN] + .copy_from_slice(mac_d); }, ) } diff --git a/core/message_bus/src/replica/auth.rs b/core/message_bus/src/replica/auth.rs new file mode 100644 index 0000000000..ee05a3f9c7 --- /dev/null +++ b/core/message_bus/src/replica/auth.rs @@ -0,0 +1,363 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Replica-to-replica authentication primitives. +//! +//! All crypto for the PSK + BLAKE3 keyed-MAC handshake lives here so the +//! hot transport files (`listener`, `connector`) carry no cipher logic. +//! +//! The handshake is a 3-message mutual challenge-response riding the +//! already-zeroed `reserved_command` bytes of the 256-byte `GenericHeader`. +//! Each message has its own `Command2` discriminant (no Ping/Pong reuse): +//! +//! 1. `ReplicaHello` dialer -> acceptor: `nonce_d` (no MAC; the dialer has +//! no acceptor nonce yet); +//! 2. `ReplicaChallenge` acceptor -> dialer: a `status` byte, plus `nonce_a` + +//! `mac_a` when the status is `Ok` (a nonzero status is a reject, no nonce/MAC); +//! 3. `ReplicaFinish` dialer -> acceptor: `mac_d` (dialer direction). +//! +//! Each MAC is `keyed_hash(key, DOMAIN_TAG || cluster_id || dialer_id || +//! acceptor_id || nonce_d || nonce_a || dir)`. Binding both nonces, the +//! ordered peer pair, and a direction byte defeats replay and reflection. +//! The MAC key is a subkey derived from the configured shared secret, so +//! the raw secret never reaches the wire transcript. + +use blake3::Hash; +use iggy_binary_protocol::RESERVED_COMMAND_LEN; +use iggy_common::IggyError; +use ring::rand::{SecureRandom, SystemRandom}; +use zeroize::Zeroizing; + +/// Length of each handshake nonce, in bytes. +pub const NONCE_LEN: usize = 32; +/// Length of each handshake MAC, in bytes. +pub const MAC_LEN: usize = 32; +/// Offset of the `status` byte in a `ReplicaChallenge` frame's `reserved_command`. +/// +/// Past the nonce (`[0..32]`) and MAC (`[32..64]`) regions a successful +/// challenge uses. Byte `0` is [`HandshakeStatus::Ok`]; any nonzero value is a +/// reject reason, so a zeroed (`Ok`) challenge carries `nonce_a` + `mac_a` while +/// a reject carries the reason here and leaves the nonce/MAC regions zero. +pub const STATUS_OFFSET: usize = NONCE_LEN + MAC_LEN; + +/// Domain separation tag mixed into every MAC transcript. +const DOMAIN_TAG: &[u8] = b"apache-iggy replica-auth v1"; +/// Context for deriving the MAC subkey from the configured secret. +const KEY_CONTEXT: &str = "apache-iggy replica-auth v1 psk->mac-key"; +/// Direction byte for a MAC produced by the dialer. +const DIR_DIALER: u8 = 1; +/// Direction byte for a MAC produced by the acceptor. +const DIR_ACCEPTOR: u8 = 2; + +/// Stable domain-separation cluster id derived from the cluster name. +/// +/// Byte-identical across the roster because every node shares the same +/// `cluster.name`. It is a domain-separation tag, not the security gate +/// (the PSK is). +#[must_use] +pub fn cluster_domain_id(name: &str) -> u128 { + let digest = blake3::hash(name.as_bytes()); + let mut bytes = [0u8; 16]; + bytes.copy_from_slice(&digest.as_bytes()[..16]); + u128::from_le_bytes(bytes) +} + +/// Generate a fresh CSPRNG nonce. +/// +/// # Errors +/// +/// Returns [`IggyError::IoError`] if the system CSPRNG fails (a catastrophic +/// condition). The caller drops the handshake: the dialer's periodic sweep +/// retries it, the acceptor fails the inbound and the peer sees EOF. +pub fn random_nonce() -> Result<[u8; NONCE_LEN], IggyError> { + let mut nonce = [0u8; NONCE_LEN]; + SystemRandom::new() + .fill(&mut nonce) + .map_err(|_| IggyError::IoError("replica-auth CSPRNG failure".to_owned()))?; + Ok(nonce) +} + +/// Extract the nonce from `reserved_command[0..32]` of a handshake frame. +#[must_use] +pub fn read_nonce(reserved_command: &[u8; RESERVED_COMMAND_LEN]) -> [u8; NONCE_LEN] { + let mut nonce = [0u8; NONCE_LEN]; + nonce.copy_from_slice(&reserved_command[..NONCE_LEN]); + nonce +} + +/// Extract the MAC from `reserved_command[32..64]` of a handshake frame. +#[must_use] +pub fn read_mac(reserved_command: &[u8; RESERVED_COMMAND_LEN]) -> [u8; MAC_LEN] { + let mut mac = [0u8; MAC_LEN]; + mac.copy_from_slice(&reserved_command[NONCE_LEN..NONCE_LEN + MAC_LEN]); + mac +} + +/// Whether a frame's `reserved_command` carries a handshake nonce (i.e. the +/// peer speaks the authenticated protocol) versus an all-zero legacy frame. +#[must_use] +pub fn has_nonce(reserved_command: &[u8; RESERVED_COMMAND_LEN]) -> bool { + reserved_command[..NONCE_LEN] != [0u8; NONCE_LEN] +} + +/// Outcome of a replica handshake, carried as the `status` byte of a +/// `ReplicaChallenge` frame at [`STATUS_OFFSET`]. +/// +/// [`Self::Ok`] (byte `0`) means the challenge carries `nonce_a` + `mac_a`. A +/// nonzero reject reason is sent back only to an authenticated, still-waiting +/// dialer so the joining node learns the cause from its own logs instead of +/// seeing a bare connection close. [`Self::AuthRequired`] and +/// [`Self::MacMismatch`] are log-only labels: their peer is not reading a +/// response, so no frame is emitted for them and those bytes never reach the wire. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[repr(u8)] +pub enum HandshakeStatus { + /// Challenge accepted; `nonce_a` + `mac_a` follow. + Ok = 0, + /// A handshake frame carried the wrong command for its position: a non- + /// `ReplicaHello` first frame, or a non-`ReplicaFinish` finish frame. As a + /// `read_status` result it also covers byte 1 and any unrecognised byte. + UnknownCommand = 1, + /// Frame's cluster id did not match ours. + ClusterMismatch = 2, + /// Peer id was out of range or not strictly lower than the acceptor's id. + DirectionalRule = 3, + /// Enforcement is on and the peer sent no handshake nonce (log-only). + AuthRequired = 4, + /// The dialer's finish MAC did not verify (log-only). + MacMismatch = 5, +} + +impl HandshakeStatus { + /// Stable lowercase label for structured logs. + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Ok => "ok", + Self::UnknownCommand => "unknown_command", + Self::ClusterMismatch => "cluster_mismatch", + Self::DirectionalRule => "directional_rule", + Self::AuthRequired => "auth_required", + Self::MacMismatch => "mac_mismatch", + } + } +} + +/// Decode the `status` byte of a `ReplicaChallenge` response frame. +/// +/// Total: byte `0` is [`HandshakeStatus::Ok`]; `1..=5` map to their reason; any +/// other (garbage) byte is treated as [`HandshakeStatus::UnknownCommand`] so the +/// dialer rejects rather than mistaking it for success. Discriminants must stay +/// in sync with [`HandshakeStatus`]; the `status_round_trips` test enforces it. +#[must_use] +pub const fn read_status(reserved_command: &[u8; RESERVED_COMMAND_LEN]) -> HandshakeStatus { + match reserved_command[STATUS_OFFSET] { + 0 => HandshakeStatus::Ok, + 2 => HandshakeStatus::ClusterMismatch, + 3 => HandshakeStatus::DirectionalRule, + 4 => HandshakeStatus::AuthRequired, + 5 => HandshakeStatus::MacMismatch, + // 1 and every unrecognised byte: a reject the dialer must not read as Ok. + _ => HandshakeStatus::UnknownCommand, + } +} + +/// The fields bound into a handshake MAC. Identical on both peers. +pub struct Transcript { + pub cluster_id: u128, + pub dialer_id: u8, + pub acceptor_id: u8, + pub nonce_d: [u8; NONCE_LEN], + pub nonce_a: [u8; NONCE_LEN], +} + +/// Cluster-wide replica authentication context. +/// +/// Holds the derived MAC subkey (never the raw secret). Threaded as +/// `Option`; `None` means auth is disabled and the handshake stays +/// in legacy unauthenticated mode. `Some` means auth is enabled and enforced - +/// a peer that does not complete the handshake is rejected. +#[derive(Clone)] +pub struct ReplicaAuth { + key: Zeroizing<[u8; 32]>, +} + +impl ReplicaAuth { + /// Derive the MAC subkey from the configured secret material. + #[must_use] + pub fn new(secret_material: &[u8]) -> Self { + Self { + key: Zeroizing::new(blake3::derive_key(KEY_CONTEXT, secret_material)), + } + } + + /// MAC the acceptor sends in the `ReplicaChallenge` frame. + #[must_use] + pub fn acceptor_mac(&self, transcript: &Transcript) -> [u8; MAC_LEN] { + *self.mac(DIR_ACCEPTOR, transcript).as_bytes() + } + + /// MAC the dialer sends in the finish frame. + #[must_use] + pub fn dialer_mac(&self, transcript: &Transcript) -> [u8; MAC_LEN] { + *self.mac(DIR_DIALER, transcript).as_bytes() + } + + /// Verify the acceptor's MAC in constant time. + #[must_use] + pub fn verify_acceptor_mac(&self, transcript: &Transcript, received: &[u8; MAC_LEN]) -> bool { + self.mac(DIR_ACCEPTOR, transcript) == Hash::from_bytes(*received) + } + + /// Verify the dialer's MAC in constant time. + #[must_use] + pub fn verify_dialer_mac(&self, transcript: &Transcript, received: &[u8; MAC_LEN]) -> bool { + self.mac(DIR_DIALER, transcript) == Hash::from_bytes(*received) + } + + fn mac(&self, dir: u8, transcript: &Transcript) -> Hash { + let mut hasher = blake3::Hasher::new_keyed(&self.key); + hasher.update(DOMAIN_TAG); + hasher.update(&transcript.cluster_id.to_le_bytes()); + hasher.update(&[transcript.dialer_id, transcript.acceptor_id]); + hasher.update(&transcript.nonce_d); + hasher.update(&transcript.nonce_a); + hasher.update(&[dir]); + hasher.finalize() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const SECRET: &[u8] = b"0123456789abcdef0123456789abcdef"; + + fn transcript() -> Transcript { + Transcript { + cluster_id: 0xDEAD_BEEF, + dialer_id: 3, + acceptor_id: 1, + nonce_d: [0x11; NONCE_LEN], + nonce_a: [0x22; NONCE_LEN], + } + } + + #[test] + fn macs_round_trip() { + let auth = ReplicaAuth::new(SECRET); + let t = transcript(); + assert!(auth.verify_acceptor_mac(&t, &auth.acceptor_mac(&t))); + assert!(auth.verify_dialer_mac(&t, &auth.dialer_mac(&t))); + } + + #[test] + fn wrong_key_is_rejected() { + let signer = ReplicaAuth::new(SECRET); + let attacker = ReplicaAuth::new(b"ffffffffffffffffffffffffffffffff"); + let t = transcript(); + assert!(!attacker.verify_dialer_mac(&t, &signer.dialer_mac(&t))); + } + + #[test] + fn direction_is_not_reflectable() { + // The acceptor's MAC must not validate as a dialer MAC (reflection). + let auth = ReplicaAuth::new(SECRET); + let t = transcript(); + assert!(!auth.verify_dialer_mac(&t, &auth.acceptor_mac(&t))); + assert!(!auth.verify_acceptor_mac(&t, &auth.dialer_mac(&t))); + } + + #[test] + fn changed_nonce_is_rejected() { + // A captured MAC cannot be replayed against a fresh nonce. + let auth = ReplicaAuth::new(SECRET); + let t = transcript(); + let mac = auth.dialer_mac(&t); + let replayed = Transcript { + nonce_a: [0x33; NONCE_LEN], + ..transcript() + }; + assert!(!auth.verify_dialer_mac(&replayed, &mac)); + } + + #[test] + fn swapped_peer_pair_is_rejected() { + let auth = ReplicaAuth::new(SECRET); + let t = transcript(); + let mac = auth.dialer_mac(&t); + let swapped = Transcript { + dialer_id: t.acceptor_id, + acceptor_id: t.dialer_id, + ..transcript() + }; + assert!(!auth.verify_dialer_mac(&swapped, &mac)); + } + + #[test] + fn cluster_domain_id_is_stable_and_distinct() { + assert_eq!( + cluster_domain_id("iggy-cluster"), + cluster_domain_id("iggy-cluster") + ); + assert_ne!( + cluster_domain_id("iggy-cluster"), + cluster_domain_id("other") + ); + } + + #[test] + fn random_nonces_differ() { + assert_ne!(random_nonce().unwrap(), random_nonce().unwrap()); + } + + #[test] + fn status_round_trips() { + const ALL: [HandshakeStatus; 6] = [ + HandshakeStatus::Ok, + HandshakeStatus::UnknownCommand, + HandshakeStatus::ClusterMismatch, + HandshakeStatus::DirectionalRule, + HandshakeStatus::AuthRequired, + HandshakeStatus::MacMismatch, + ]; + for status in ALL { + let mut reserved = [0u8; RESERVED_COMMAND_LEN]; + // Mirror the on-wire write path (build_challenge_message reject). + reserved[STATUS_OFFSET] = status as u8; + assert_eq!(read_status(&reserved), status); + } + } + + #[test] + fn ok_status_reads_ok() { + // nonce[0..32] + mac[32..64] filled, the status byte stays zero. + let mut reserved = [0u8; RESERVED_COMMAND_LEN]; + reserved[..NONCE_LEN].fill(0xAB); + reserved[NONCE_LEN..NONCE_LEN + MAC_LEN].fill(0xCD); + assert_eq!(read_status(&reserved), HandshakeStatus::Ok); + } + + #[test] + fn unknown_status_byte_reads_reject() { + // A garbage status byte must NOT read as Ok (else the dialer would parse + // a bogus MAC). It collapses to a reject. + let mut reserved = [0u8; RESERVED_COMMAND_LEN]; + reserved[STATUS_OFFSET] = 200; + assert_ne!(read_status(&reserved), HandshakeStatus::Ok); + } +} diff --git a/core/message_bus/src/replica/io.rs b/core/message_bus/src/replica/io.rs index 0a320b7848..b3663bed20 100644 --- a/core/message_bus/src/replica/io.rs +++ b/core/message_bus/src/replica/io.rs @@ -42,6 +42,7 @@ use iggy_common::IggyError; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; use crate::connector::start as start_connector; +use crate::replica::auth::ReplicaAuth; use crate::replica::listener::{bind as bind_replica_listener, run as run_replica_listener}; use crate::transports::quic::server_config_with_cert; use crate::transports::tls::{TlsServerCredentials, install_default_crypto_provider}; @@ -180,6 +181,7 @@ pub async fn start_on_shard_zero( cluster_id: u128, self_id: u8, replica_count: u8, + auth: Option, peers: Vec<(u8, SocketAddr)>, on_accepted_replica: AcceptedReplicaFn, on_accepted_client: AcceptedClientFn, @@ -213,6 +215,7 @@ pub async fn start_on_shard_zero( let on_accepted_replica_for_listener = on_accepted_replica.clone(); let listener_max_message_size = bus.config().max_message_size; let listener_handshake_grace = bus.config().handshake_grace; + let auth_for_listener = auth.clone(); let replica_handle = compio::runtime::spawn(async move { run_replica_listener( replica_listener, @@ -220,6 +223,7 @@ pub async fn start_on_shard_zero( cluster_id, self_id, replica_count, + auth_for_listener, on_accepted_replica_for_listener, listener_max_message_size, listener_handshake_grace, @@ -326,6 +330,8 @@ pub async fn start_on_shard_zero( cluster_id, self_id, peers, + auth, + bus.config().handshake_grace, on_accepted_replica, reconnect_period, ) @@ -380,6 +386,7 @@ pub async fn start_on_shard_zero_default( cluster_id, self_id, replica_count, + None, peers, on_accepted_replica, on_accepted_client, diff --git a/core/message_bus/src/replica/listener.rs b/core/message_bus/src/replica/listener.rs index 4f8b5bdd74..c87e06122a 100644 --- a/core/message_bus/src/replica/listener.rs +++ b/core/message_bus/src/replica/listener.rs @@ -29,7 +29,7 @@ //! datagram-oriented or gateway-terminated transport violates one or //! more of those assumptions. //! -//! Runs only on shard 0. On every accepted `Ping` frame the listener +//! Runs only on shard 0. On every accepted `ReplicaHello` frame the listener //! hands the accepted `TcpStream` to an `on_accepted` callback provided //! by the shard bootstrap, which dup-and-ships the fd to the owning //! shard via the inter-shard channel @@ -40,55 +40,53 @@ //! only dials peers with strictly greater ids and only accepts inbound //! from peers with strictly lower ids. No race, no tiebreaker. //! -//! # Trust boundary (regression vs prior MAC) +//! # Trust boundary (PSK handshake) //! -//! The current listener has NO transport-layer authentication. The -//! `Ping` frame announces the dialing replica's id in plaintext; the -//! bus uses it to key the registry and to enforce the directional rule -//! (only inbound from peers with strictly lower ids), but cannot -//! cryptographically verify the announced id. This is a hard regression -//! versus the previous BLAKE3-keyed MAC over `Ping::reserved_command` -//! bytes paired with a per-peer nonce ring (see TODO at the end of this -//! module preamble). An attacker on the wire who learns the cluster id -//! can register as any peer with a smaller replica id and feed forged -//! consensus traffic until `server-ng` rejects it at the application -//! layer. +//! When `cluster.auth.enabled` is set the listener runs the acceptor half of a +//! 3-message mutual BLAKE3 keyed-MAC handshake over the `reserved_command` +//! bytes (see [`crate::replica::auth`]) and REJECTS any peer that does not +//! complete it. The MAC proves the peer possesses the cluster PSK: it +//! authenticates cluster MEMBERSHIP, not per-replica identity. The peer id that +//! keys the registry is the announced `ReplicaHello` header byte. It is folded +//! into the keyed MAC (a dialer cannot MAC one id and announce another), but the +//! MAC key is a single cluster-wide PSK-derived subkey, not a per-replica key, +//! so any PSK holder can mint a valid MAC for any (smaller) replica id - there +//! is no anti-Sybil guarantee. With auth disabled the listener stays +//! in legacy mode: the `ReplicaHello` id is trusted unverified. //! -//! Until `LOGIN_REPLICA` lands and re-establishes per-peer mutual -//! authentication, operators MUST deploy the replica port on a trusted -//! L2 boundary (cluster-local VPC, dedicated private subnet, encrypted -//! overlay such as `WireGuard`, or an air-gapped management network). -//! Treating "no public exposure of the replica port" as the only gate -//! is the supported configuration; do NOT assume any authentication -//! beyond that boundary. +//! Enabling auth is a coordinated-restart change, and not the only one: the +//! consensus `cluster_id` is derived from the cluster name unconditionally +//! (`auth::cluster_domain_id`, was a constant), so a mixed-version roster fails +//! to connect regardless of the auth setting. Flip every node in one restart. //! -//! Identity is established post-handshake by the caller (`server-ng`) -//! via the future `LOGIN_REPLICA` command that carries the cluster's -//! shared secret. Until that command succeeds the caller MUST treat the -//! replica as unauthenticated and refuse to honor consensus messages -//! from it. +//! Because the PSK gates membership and not identity, an on-wire attacker who +//! learns the cluster id (auth off) OR an insider key-holder (auth on) can +//! register as any peer with a smaller replica id, so operators MUST deploy the +//! replica port on a trusted L2 boundary (cluster-local VPC, dedicated private +//! subnet, encrypted overlay such as `WireGuard`, or an air-gapped management +//! network). //! -//! TLS / encryption is NOT provided here: the trusted-boundary -//! deployment requirement above implicitly assumes the operator -//! supplies link-level confidentiality (VPC-level encryption, overlay -//! tunnel, or physical isolation). +//! TLS / encryption is NOT provided here: the handshake authenticates the +//! peer but the data stream stays plaintext (so the fd remains dupable for +//! cross-shard delegation). On-wire confidentiality, if required, comes from +//! out-of-band link encryption, and on-wire frame integrity from the +//! follow-up per-frame session-key MAC. // -// TODO(hubcio): the prior BLAKE3-keyed MAC over the `Ping` frame's -// `reserved_command` bytes plus the per-peer nonce ring used to gate -// the registry insert at the transport layer. That gate is gone and -// `LOGIN_REPLICA` is not yet implemented, so the listener currently -// accepts any peer that knows the cluster id and directional rule. -// Restore the transport-layer MAC, or land `LOGIN_REPLICA`, before -// relying on the network boundary alone. +// TODO(hubcio): follow-ups - dual-key rotation acceptance window, and the +// per-frame session-key MAC over installed frame headers that closes the +// MITM-inject / panic-DoS vector. use crate::framing; use crate::lifecycle::ShutdownToken; +use crate::replica::auth::{self, HandshakeStatus, ReplicaAuth, Transcript}; use crate::socket_opts::bind_reusable_tcp_listener; use crate::{AcceptedReplicaFn, GenericHeader, Message}; use compio::net::{TcpListener, TcpStream}; use futures::FutureExt; -use iggy_binary_protocol::Command2; +use iggy_binary_protocol::{Command2, HEADER_SIZE}; use iggy_common::IggyError; +use std::cell::Cell; +use std::mem::size_of; use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; @@ -116,11 +114,20 @@ pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyErr Ok((listener, actual)) } +/// Hard cap on concurrently in-flight inbound handshakes. A legitimate dialer +/// set is bounded by `self_id` (< `replica_count` <= 255), so the cap only +/// sheds load under a hostile flood; it bounds fd / memory without ever +/// stalling the accept loop. +const MAX_INFLIGHT_HANDSHAKES: usize = 256; + /// Run the inbound replica listener accept loop until the shutdown token fires. /// -/// Every accepted connection that passes the plaintext `Ping` frame -/// check fires the `on_accepted` callback; the callback owns the -/// accepted stream from that point on. +/// With `auth = None`, every accepted connection that passes the plaintext +/// `ReplicaHello` frame check (command, cluster, directional rule) fires the +/// `on_accepted` callback. With `auth = Some` the connection must additionally +/// complete the mutual MAC handshake; a missing nonce, a non-`ReplicaFinish` +/// finish frame, a failed dialer-MAC, or a grace timeout drops it without +/// firing the callback. Once fired, the callback owns the accepted stream. /// /// Each [`TcpStream`] returned by `accept()` is handed to its own /// spawned task that runs the `handshake_read` step under @@ -135,11 +142,14 @@ pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyErr /// threaded from [`crate::MessageBusConfig::handshake_grace`] by the /// bootstrap caller. /// -/// Completed handshake handles are reaped opportunistically at the top -/// of the loop so the in-flight handle vector stays bounded for the -/// listener's lifetime; remaining handles are awaited on shutdown so -/// in-progress handshakes get a chance to finish (or hit the grace -/// timeout) before the listener fully exits. +/// Each spawned handshake task is fire-and-forget (`detach`): compio 0.19's +/// `JoinHandle` exposes no `is_finished`, so there is no handle vector to +/// reap. Two bounds keep it safe: each task is capped by `handshake_grace`, +/// and the number of concurrently in-flight tasks is capped by +/// [`MAX_INFLIGHT_HANDSHAKES`] (excess inbound is dropped). On shutdown the +/// accept loop exits; each detached task owns its accepted stream (not the +/// listener), so it self-terminates within `handshake_grace`, or is torn down +/// with the runtime if still in flight. #[allow(clippy::future_not_send, clippy::too_many_arguments)] pub async fn run( listener: TcpListener, @@ -147,6 +157,7 @@ pub async fn run( cluster_id: u128, self_id: u8, replica_count: u8, + auth: Option, on_accepted: AcceptedReplicaFn, max_message_size: usize, handshake_grace: Duration, @@ -155,6 +166,7 @@ pub async fn run( "Replica listener accepting on {:?}", listener.local_addr().ok() ); + let inflight = Rc::new(Cell::new(0usize)); loop { futures::select! { () = token.wait().fuse() => { @@ -164,13 +176,28 @@ pub async fn run( result = listener.accept().fuse() => { match result { Ok((stream, peer_addr)) => { + if inflight.get() >= MAX_INFLIGHT_HANDSHAKES { + warn!( + %peer_addr, + cap = MAX_INFLIGHT_HANDSHAKES, + "replica handshake in-flight cap reached; dropping inbound" + ); + continue; + } let on_accepted = on_accepted.clone(); + let auth = auth.clone(); // Fire-and-forget: compio 0.19's `JoinHandle` has no - // `is_finished`, so the prior bounded-handle sweep is - // gone. `detach()` lets each handshake task self-manage; - // on shutdown the listener drops, failing in-flight - // handshakes so detached tasks exit. + // `is_finished`, so there is no handle vector to reap. + // `detach()` lets each handshake task self-manage; the + // `InflightGuard` decrements the shared counter on every + // exit path, keeping the `MAX_INFLIGHT_HANDSHAKES` cap + // accurate. Detached tasks own their accepted stream, not + // the listener: each is bounded by `handshake_grace` and + // torn down with the runtime if still in flight. + inflight.set(inflight.get() + 1); + let guard = InflightGuard(Rc::clone(&inflight)); compio::runtime::spawn(async move { + let _guard = guard; let mut stream = stream; let res = compio::time::timeout( handshake_grace, @@ -180,6 +207,7 @@ pub async fn run( self_id, replica_count, max_message_size, + auth.as_ref(), ), ) .await; @@ -210,30 +238,215 @@ pub async fn run( } } -/// Read the 256 B `Ping` frame, enforce command + cluster match and the -/// directional rule, return the announced replica id. No transport-level -/// authentication. -#[allow(clippy::future_not_send)] +/// Decrements the shared in-flight handshake counter when the spawned task +/// ends, on any exit path (success, error, or grace timeout). Pairs with the +/// pre-spawn increment in [`run`] to keep [`MAX_INFLIGHT_HANDSHAKES`] accurate. +struct InflightGuard(Rc>); + +impl Drop for InflightGuard { + fn drop(&mut self) { + self.0.set(self.0.get() - 1); + } +} + +/// Read the 256 B `ReplicaHello` frame, enforce command + cluster match and the +/// directional rule, then (when `auth` is configured) run the acceptor half +/// of the mutual MAC handshake and return the cryptographically verified +/// peer id. With `auth = None` the listener stays in legacy mode and returns +/// the announced id unverified. +/// +/// The dialer (the peer) holds the strictly lower id; this acceptor holds the +/// higher id (see the directional rule). The transcript therefore binds +/// `dialer_id = peer_id`, `acceptor_id = self_id`. +/// +/// On a rejection an authenticated, still-waiting dialer is answered with a +/// nonzero-status [`build_challenge_message`] (see [`reject`]) so it learns the +/// cause from its own logs rather than seeing a bare connection close. +#[allow(clippy::future_not_send, clippy::similar_names)] async fn handshake_read( stream: &mut TcpStream, our_cluster: u128, self_id: u8, replica_count: u8, max_message_size: usize, + auth: Option<&ReplicaAuth>, ) -> Result { let msg = framing::read_message(stream, max_message_size).await?; let header = msg.header(); - if header.command != Command2::Ping { - return Err(IggyError::InvalidCommand); + let peer_id = header.replica; + let has_nonce = auth::has_nonce(&header.reserved_command); + // Only a peer that sent a nonce speaks the authenticated protocol and is + // waiting to read our response; a legacy (no-nonce) dialer delegates its + // fd without reading, so a reject frame would land in its VSR reader instead. + let nackable = auth.is_some() && has_nonce; + + if header.command != Command2::ReplicaHello { + return reject( + stream, + our_cluster, + self_id, + peer_id, + HandshakeStatus::UnknownCommand, + nackable, + ) + .await; } if header.cluster != our_cluster { - return Err(IggyError::InvalidCommand); + return reject( + stream, + our_cluster, + self_id, + peer_id, + HandshakeStatus::ClusterMismatch, + nackable, + ) + .await; } // Directional rule: a replica only accepts inbound from peers with // strictly lower ids. The peer is responsible for not dialing us if // it has the higher id; this is just defensive. - if header.replica >= replica_count || header.replica >= self_id { - return Err(IggyError::InvalidCommand); + if peer_id >= replica_count || peer_id >= self_id { + return reject( + stream, + our_cluster, + self_id, + peer_id, + HandshakeStatus::DirectionalRule, + nackable, + ) + .await; + } + + let Some(auth) = auth else { + return Ok(peer_id); + }; + + if !has_nonce { + // Auth is enabled and enforced: a legacy (no-nonce) peer is rejected. + // No reject frame: a legacy dialer delegates its fd without reading a + // response. + return reject( + stream, + our_cluster, + self_id, + peer_id, + HandshakeStatus::AuthRequired, + false, + ) + .await; + } + + let nonce_d = auth::read_nonce(&header.reserved_command); + let nonce_a = auth::random_nonce()?; + let transcript = Transcript { + cluster_id: our_cluster, + dialer_id: peer_id, + acceptor_id: self_id, + nonce_d, + nonce_a, + }; + let mac_a = auth.acceptor_mac(&transcript); + framing::write_message( + stream, + build_challenge_message( + our_cluster, + self_id, + HandshakeStatus::Ok, + Some((&nonce_a, &mac_a)), + ), + ) + .await?; + + let finish = framing::read_message(stream, max_message_size).await?; + // Past this point the dialer has delegated its fd (or bailed on `mac_a`) and + // is no longer reading, so every reject here is log-only (no frame). + // Check the command before the MAC: the finish frame is identified by its + // own discriminant, not by handshake position. + if finish.header().command != Command2::ReplicaFinish { + return reject( + stream, + our_cluster, + self_id, + peer_id, + HandshakeStatus::UnknownCommand, + false, + ) + .await; + } + let mac_d = auth::read_mac(&finish.header().reserved_command); + if !auth.verify_dialer_mac(&transcript, &mac_d) { + return reject( + stream, + our_cluster, + self_id, + peer_id, + HandshakeStatus::MacMismatch, + false, + ) + .await; + } + Ok(peer_id) +} + +/// Log a rejected handshake with its `reason` and, when `nack` is set, send a +/// best-effort reject [`build_challenge_message`] (a `ReplicaChallenge` with a +/// nonzero status and no nonce/MAC) so an authenticated, still-waiting dialer +/// learns the cause. `nack` is false for legacy (no-nonce) and post-challenge +/// rejects, whose peer is not reading a response (a frame would reach its VSR +/// reader instead). Always returns `Err` so callers `return`. +#[allow(clippy::future_not_send, clippy::similar_names)] +async fn reject( + stream: &mut TcpStream, + cluster_id: u128, + self_id: u8, + peer_id: u8, + reason: HandshakeStatus, + nack: bool, +) -> Result { + warn!( + replica = peer_id, + reason = reason.as_str(), + "rejecting replica handshake" + ); + if nack + && let Err(e) = framing::write_message( + stream, + build_challenge_message(cluster_id, self_id, reason, None), + ) + .await + { + debug!( + replica = peer_id, + "failed to send replica handshake reject: {e}" + ); } - Ok(header.replica) + Err(IggyError::InvalidCommand) +} + +/// Build the acceptor's `ReplicaChallenge` frame. `status` goes at +/// [`auth::STATUS_OFFSET`] ([`HandshakeStatus::Ok`] writes the implicit zero). +/// On success pass `Some((nonce_a, mac_a))`, placed at `reserved_command[0..32]` +/// and `[32..64]`; on a reject pass `None` (no nonce/MAC). The three disjoint +/// regions (nonce / MAC / status) never overlap. +fn build_challenge_message( + cluster_id: u128, + replica_id: u8, + status: HandshakeStatus, + nonce_mac: Option<(&[u8; auth::NONCE_LEN], &[u8; auth::MAC_LEN])>, +) -> Message { + #[allow(clippy::cast_possible_truncation)] + Message::::new(size_of::()).transmute_header( + |_, h: &mut GenericHeader| { + h.command = Command2::ReplicaChallenge; + h.cluster = cluster_id; + h.replica = replica_id; + h.size = HEADER_SIZE as u32; + h.reserved_command[auth::STATUS_OFFSET] = status as u8; + if let Some((nonce_a, mac_a)) = nonce_mac { + h.reserved_command[..auth::NONCE_LEN].copy_from_slice(nonce_a); + h.reserved_command[auth::NONCE_LEN..auth::NONCE_LEN + auth::MAC_LEN] + .copy_from_slice(mac_a); + } + }, + ) } diff --git a/core/message_bus/src/replica/mod.rs b/core/message_bus/src/replica/mod.rs index a9dd182aa7..665eca0ba2 100644 --- a/core/message_bus/src/replica/mod.rs +++ b/core/message_bus/src/replica/mod.rs @@ -23,5 +23,6 @@ //! shard-0 bootstrap (which also binds the SDK client listeners) live //! in [`io`]. +pub mod auth; pub mod io; pub mod listener; diff --git a/core/message_bus/tests/backpressure.rs b/core/message_bus/tests/backpressure.rs index 8186a1ae1f..1fc68977ce 100644 --- a/core/message_bus/tests/backpressure.rs +++ b/core/message_bus/tests/backpressure.rs @@ -60,6 +60,7 @@ async fn try_send_returns_backpressure_when_queue_full() { CLUSTER, 1, 2, + None, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -76,6 +77,8 @@ async fn try_send_returns_backpressure_when_queue_full() { CLUSTER, 0, vec![(1, addr1)], + None, + Duration::from_secs(5), dial_0, DEFAULT_RECONNECT_PERIOD, ) diff --git a/core/message_bus/tests/connection_lost_notify.rs b/core/message_bus/tests/connection_lost_notify.rs index 033330d5bf..49a63af5c9 100644 --- a/core/message_bus/tests/connection_lost_notify.rs +++ b/core/message_bus/tests/connection_lost_notify.rs @@ -62,6 +62,7 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() { CLUSTER, 1, 2, + None, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -76,6 +77,8 @@ async fn connection_lost_fires_exactly_once_per_peer_disconnect() { CLUSTER, 0, vec![(1u8, addr1)], + None, + Duration::from_secs(5), dial_0, DEFAULT_RECONNECT_PERIOD, ) diff --git a/core/message_bus/tests/directional_connection.rs b/core/message_bus/tests/directional_connection.rs index dbd96cdce8..8214e70515 100644 --- a/core/message_bus/tests/directional_connection.rs +++ b/core/message_bus/tests/directional_connection.rs @@ -51,6 +51,7 @@ async fn lower_id_dials_higher_id_accepts() { CLUSTER, 0, 2, + None, accept_0, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -68,6 +69,7 @@ async fn lower_id_dials_higher_id_accepts() { CLUSTER, 1, 2, + None, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -85,12 +87,24 @@ async fn lower_id_dials_higher_id_accepts() { CLUSTER, 0, peers.clone(), + None, + Duration::from_secs(5), dial_0, DEFAULT_RECONNECT_PERIOD, ) .await; let dial_1 = install_replicas_locally(bus1.clone(), on_message.clone()); - start_connector(&bus1, CLUSTER, 1, peers, dial_1, DEFAULT_RECONNECT_PERIOD).await; + start_connector( + &bus1, + CLUSTER, + 1, + peers, + None, + Duration::from_secs(5), + dial_1, + DEFAULT_RECONNECT_PERIOD, + ) + .await; // Wait for the directional connection to settle. let deadline = std::time::Instant::now() + Duration::from_secs(2); diff --git a/core/message_bus/tests/head_of_line.rs b/core/message_bus/tests/head_of_line.rs index a6ad9507b5..7ef1caeb1c 100644 --- a/core/message_bus/tests/head_of_line.rs +++ b/core/message_bus/tests/head_of_line.rs @@ -56,6 +56,7 @@ async fn slow_peer_does_not_block_other_peers() { CLUSTER, 1, 3, + None, accept_a, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -90,6 +91,8 @@ async fn slow_peer_does_not_block_other_peers() { CLUSTER, 0, vec![(1, addr_a), (2, addr_b)], + None, + Duration::from_secs(5), dial_delegate, DEFAULT_RECONNECT_PERIOD, ) diff --git a/core/message_bus/tests/reconnect.rs b/core/message_bus/tests/reconnect.rs index 7e250f301e..8508044012 100644 --- a/core/message_bus/tests/reconnect.rs +++ b/core/message_bus/tests/reconnect.rs @@ -43,7 +43,17 @@ async fn periodic_retry_picks_up_late_listener() { let on_message: MessageHandler = Rc::new(|_, _| {}); let period = Duration::from_millis(100); let dial_delegate = install_replicas_locally(bus0.clone(), on_message.clone()); - start_connector(&bus0, CLUSTER, 0, vec![(1, addr)], dial_delegate, period).await; + start_connector( + &bus0, + CLUSTER, + 0, + vec![(1, addr)], + None, + Duration::from_secs(5), + dial_delegate, + period, + ) + .await; assert!(!bus0.replicas().contains(1), "first connect should fail"); // Bring bus 1 online on the same address. @@ -58,6 +68,7 @@ async fn periodic_retry_picks_up_late_listener() { CLUSTER, 1, 2, + None, accept_delegate, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), diff --git a/core/message_bus/tests/reconnect_skip_connected.rs b/core/message_bus/tests/reconnect_skip_connected.rs index 4b81d680b6..c5aeae9caf 100644 --- a/core/message_bus/tests/reconnect_skip_connected.rs +++ b/core/message_bus/tests/reconnect_skip_connected.rs @@ -57,6 +57,7 @@ async fn periodic_reconnect_skips_already_connected_peer() { CLUSTER, 1, 2, + None, accept_delegate, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -70,7 +71,17 @@ async fn periodic_reconnect_skips_already_connected_peer() { // missing, bus1's listener will see N extra accepts. let period = Duration::from_millis(50); let dial_delegate = install_replicas_locally(bus0.clone(), on_message.clone()); - start_connector(&bus0, CLUSTER, 0, vec![(1u8, addr1)], dial_delegate, period).await; + start_connector( + &bus0, + CLUSTER, + 0, + vec![(1u8, addr1)], + None, + Duration::from_secs(5), + dial_delegate, + period, + ) + .await; // Wait for the initial connection to settle on both sides. let deadline = std::time::Instant::now() + Duration::from_secs(2); diff --git a/core/message_bus/tests/replica_auth_handshake.rs b/core/message_bus/tests/replica_auth_handshake.rs new file mode 100644 index 0000000000..cbd419136d --- /dev/null +++ b/core/message_bus/tests/replica_auth_handshake.rs @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end replica auth handshake over real loopback TCP. The dialer +//! holds the lower id and dials the higher-id acceptor. The MAC algebra +//! (reflection, replay, swapped pair) is unit-tested in +//! `message_bus::replica::auth`; here we assert the full 3-message exchange +//! gates the registry insert: only a peer that proves possession of the +//! cluster PSK is installed (membership, not per-replica identity). + +mod common; + +use common::{install_replicas_locally, loopback}; +use compio::net::TcpStream; +use iggy_binary_protocol::{Command2, GenericHeader, HEADER_SIZE}; +use iggy_common::IggyError; +use message_bus::IggyMessageBus; +use message_bus::connector::start as start_connector; +use message_bus::framing::{self, MAX_MESSAGE_SIZE}; +use message_bus::replica::auth::{self, HandshakeStatus, ReplicaAuth}; +use message_bus::replica::listener::{MessageHandler, bind, run}; +use server_common::Message; +use std::rc::Rc; +use std::time::{Duration, Instant}; + +const CLUSTER: u128 = 0xCAFE; +const SECRET_A: &[u8] = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; +const SECRET_B: &[u8] = b"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"; +const LONG_PERIOD: Duration = Duration::from_secs(30); +const GRACE: Duration = Duration::from_secs(10); + +fn noop_handler() -> MessageHandler { + Rc::new(|_peer, _msg| {}) +} + +/// Spawn replica 1's inbound listener with the given optional auth, returning +/// its bound address. Replica 1 is the acceptor (higher id) of the pair. +#[allow(clippy::future_not_send)] +async fn spawn_acceptor( + bus: &Rc, + auth: Option, +) -> std::net::SocketAddr { + let (listener, addr) = bind(loopback()).await.expect("bind acceptor"); + let token = bus.token(); + let delegate = install_replicas_locally(bus.clone(), noop_handler()); + let handle = compio::runtime::spawn(async move { + run( + listener, + token, + CLUSTER, + 1, + 2, + auth, + delegate, + message_bus::framing::MAX_MESSAGE_SIZE, + GRACE, + ) + .await; + }); + bus.track_background(handle); + addr +} + +#[compio::test] +async fn authenticated_handshake_registers_verified_peer() { + let acceptor = Rc::new(IggyMessageBus::new(0)); + let addr = spawn_acceptor(&acceptor, Some(ReplicaAuth::new(SECRET_A))).await; + + let dialer = Rc::new(IggyMessageBus::new(0)); + start_connector( + &dialer, + CLUSTER, + 0, + vec![(1, addr)], + Some(ReplicaAuth::new(SECRET_A)), + GRACE, + install_replicas_locally(dialer.clone(), noop_handler()), + LONG_PERIOD, + ) + .await; + + // Both ends register only after the mutual MAC verifies. + wait_until(|| dialer.replicas().contains(1), Duration::from_secs(2)).await; + wait_until(|| acceptor.replicas().contains(0), Duration::from_secs(2)).await; + + dialer.shutdown(Duration::from_secs(2)).await; + acceptor.shutdown(Duration::from_secs(2)).await; +} + +#[compio::test] +async fn wrong_key_rejects_peer() { + let acceptor = Rc::new(IggyMessageBus::new(0)); + let addr = spawn_acceptor(&acceptor, Some(ReplicaAuth::new(SECRET_B))).await; + + let dialer = Rc::new(IggyMessageBus::new(0)); + start_connector( + &dialer, + CLUSTER, + 0, + vec![(1, addr)], + Some(ReplicaAuth::new(SECRET_A)), + GRACE, + install_replicas_locally(dialer.clone(), noop_handler()), + LONG_PERIOD, + ) + .await; + + // Dialer rejects the acceptor's MAC; neither side completes the exchange. + settle().await; + assert!( + !dialer.replicas().contains(1), + "dialer must reject wrong key" + ); + assert!( + !acceptor.replicas().contains(0), + "acceptor must not install an unfinished peer" + ); + + dialer.shutdown(Duration::from_secs(2)).await; + acceptor.shutdown(Duration::from_secs(2)).await; +} + +#[compio::test] +async fn enforcement_rejects_legacy_peer() { + // Acceptor requires auth; dialer speaks the legacy (no-nonce) protocol. + let acceptor = Rc::new(IggyMessageBus::new(0)); + let addr = spawn_acceptor(&acceptor, Some(ReplicaAuth::new(SECRET_A))).await; + + let dialer = Rc::new(IggyMessageBus::new(0)); + start_connector( + &dialer, + CLUSTER, + 0, + vec![(1, addr)], + None, + GRACE, + install_replicas_locally(dialer.clone(), noop_handler()), + LONG_PERIOD, + ) + .await; + + settle().await; + assert!( + !acceptor.replicas().contains(0), + "enforcement must reject an unauthenticated peer" + ); + + dialer.shutdown(Duration::from_secs(2)).await; + acceptor.shutdown(Duration::from_secs(2)).await; +} + +#[compio::test] +async fn dialer_times_out_when_acceptor_sends_no_challenge() { + // Authenticated dialer vs a legacy acceptor that never answers with a + // ReplicaChallenge: the dialer's handshake_grace fires and it drops without + // registering. Exercises the mid-handshake timeout path. + let acceptor = Rc::new(IggyMessageBus::new(0)); + let addr = spawn_acceptor(&acceptor, None).await; + + let dialer = Rc::new(IggyMessageBus::new(0)); + let short_grace = Duration::from_millis(200); + start_connector( + &dialer, + CLUSTER, + 0, + vec![(1, addr)], + Some(ReplicaAuth::new(SECRET_A)), + short_grace, + install_replicas_locally(dialer.clone(), noop_handler()), + LONG_PERIOD, + ) + .await; + + compio::time::sleep(Duration::from_millis(700)).await; + assert!( + !dialer.replicas().contains(1), + "dialer must time out waiting for the ReplicaChallenge and not register" + ); + + dialer.shutdown(Duration::from_secs(2)).await; + acceptor.shutdown(Duration::from_secs(2)).await; +} + +#[compio::test] +async fn acceptor_nacks_authenticated_dialer_on_cluster_mismatch() { + // An authenticated dialer (sends a nonce) with the WRONG cluster id is + // rejected before the acceptor's success ReplicaChallenge. Because it is a + // waiting authenticated peer, the acceptor answers with a typed Nack (a + // ReplicaChallenge carrying a nonzero status) instead of a bare close. + let acceptor = Rc::new(IggyMessageBus::new(0)); + let addr = spawn_acceptor(&acceptor, Some(ReplicaAuth::new(SECRET_A))).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect"); + let nonce = auth::random_nonce().expect("nonce"); + framing::write_message(&mut stream, build_hello(0xDEAD, 0, Some(&nonce))) + .await + .expect("write hello"); + + let resp = framing::read_message(&mut stream, MAX_MESSAGE_SIZE) + .await + .expect("read reject"); + assert_eq!(resp.header().command, Command2::ReplicaChallenge); + assert_eq!( + auth::read_status(&resp.header().reserved_command), + HandshakeStatus::ClusterMismatch, + ); + + acceptor.shutdown(Duration::from_secs(2)).await; +} + +#[compio::test] +async fn no_nack_to_unauthenticated_peer() { + // Enforcement on; a legacy (no-nonce) ReplicaHello is rejected. Such a dialer + // delegates its fd without reading, so the acceptor must NOT emit a Nack + // (it would land in the VSR reader). The peer just sees EOF. + let acceptor = Rc::new(IggyMessageBus::new(0)); + let addr = spawn_acceptor(&acceptor, Some(ReplicaAuth::new(SECRET_A))).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect"); + framing::write_message(&mut stream, build_hello(CLUSTER, 0, None)) + .await + .expect("write hello"); + + let res = framing::read_message(&mut stream, MAX_MESSAGE_SIZE).await; + assert!( + matches!(res, Err(IggyError::ConnectionClosed)), + "expected bare close on unauthenticated reject" + ); + + acceptor.shutdown(Duration::from_secs(2)).await; +} + +#[compio::test] +async fn acceptor_rejects_wrong_command_on_frame3() { + // After the acceptor sends ReplicaChallenge(Ok), the dialer must answer with + // a ReplicaFinish. A third frame with any other command is rejected on the + // command byte (before the MAC), the peer is not installed, and - because a + // real dialer has stopped reading by now - no frame is sent back. + let acceptor = Rc::new(IggyMessageBus::new(0)); + let addr = spawn_acceptor(&acceptor, Some(ReplicaAuth::new(SECRET_A))).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect"); + let nonce = auth::random_nonce().expect("nonce"); + framing::write_message(&mut stream, build_hello(CLUSTER, 0, Some(&nonce))) + .await + .expect("write hello"); + + let challenge = framing::read_message(&mut stream, MAX_MESSAGE_SIZE) + .await + .expect("read challenge"); + assert_eq!(challenge.header().command, Command2::ReplicaChallenge); + assert_eq!( + auth::read_status(&challenge.header().reserved_command), + HandshakeStatus::Ok, + ); + + // Wrong command in the finish slot (Prepare instead of ReplicaFinish). + framing::write_message(&mut stream, build_raw(CLUSTER, 0, Command2::Prepare)) + .await + .expect("write wrong finish"); + + let res = framing::read_message(&mut stream, MAX_MESSAGE_SIZE).await; + assert!( + matches!(res, Err(IggyError::ConnectionClosed)), + "expected bare close on wrong-command finish (no reject frame)" + ); + assert!( + !acceptor.replicas().contains(0), + "acceptor must not install a peer that sent a wrong finish command" + ); + + acceptor.shutdown(Duration::from_secs(2)).await; +} + +/// Build a raw `ReplicaHello` frame for the wire-level reject tests. With +/// `nonce` set it opens the authenticated handshake (`reserved_command[0..32]`); +/// without, it is a legacy plaintext announce. +#[allow(clippy::cast_possible_truncation)] +fn build_hello( + cluster_id: u128, + replica_id: u8, + nonce: Option<&[u8; auth::NONCE_LEN]>, +) -> Message { + Message::::new(HEADER_SIZE).transmute_header(|_, h: &mut GenericHeader| { + h.command = Command2::ReplicaHello; + h.cluster = cluster_id; + h.replica = replica_id; + h.size = HEADER_SIZE as u32; + if let Some(nonce) = nonce { + h.reserved_command[..auth::NONCE_LEN].copy_from_slice(nonce); + } + }) +} + +/// Build a raw frame with an arbitrary command for the wire-level tests (used to +/// send a wrong-command third frame in place of a `ReplicaFinish`). +#[allow(clippy::cast_possible_truncation)] +fn build_raw(cluster_id: u128, replica_id: u8, command: Command2) -> Message { + Message::::new(HEADER_SIZE).transmute_header(|_, h: &mut GenericHeader| { + h.command = command; + h.cluster = cluster_id; + h.replica = replica_id; + h.size = HEADER_SIZE as u32; + }) +} + +/// Let an in-flight (failing) handshake run to completion on loopback. +#[allow(clippy::future_not_send)] +async fn settle() { + compio::time::sleep(Duration::from_millis(400)).await; +} + +#[allow(clippy::future_not_send)] +async fn wait_until bool>(cond: F, timeout: Duration) { + let deadline = Instant::now() + timeout; + while Instant::now() < deadline { + if cond() { + return; + } + compio::time::sleep(Duration::from_millis(10)).await; + } + panic!("wait_until: condition not met within {timeout:?}"); +} diff --git a/core/message_bus/tests/replica_roundtrip.rs b/core/message_bus/tests/replica_roundtrip.rs index d9ce8ae660..5100b06eff 100644 --- a/core/message_bus/tests/replica_roundtrip.rs +++ b/core/message_bus/tests/replica_roundtrip.rs @@ -53,6 +53,7 @@ async fn two_replicas_exchange_prepare_and_ack() { CLUSTER, 1, 2, + None, accept_delegate_1, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -75,6 +76,8 @@ async fn two_replicas_exchange_prepare_and_ack() { CLUSTER, 0, vec![(1, addr1)], + None, + Duration::from_secs(5), dial_delegate_0, DEFAULT_RECONNECT_PERIOD, ) diff --git a/core/message_bus/tests/shard_zero_gating.rs b/core/message_bus/tests/shard_zero_gating.rs index bd93ab104c..be90629673 100644 --- a/core/message_bus/tests/shard_zero_gating.rs +++ b/core/message_bus/tests/shard_zero_gating.rs @@ -55,6 +55,7 @@ async fn shard_zero_binds_listener_and_starts_connector() { CLUSTER, 1, 2, + None, peer_accept, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -86,6 +87,7 @@ async fn shard_zero_binds_listener_and_starts_connector() { CLUSTER, 0, 2, + None, vec![(1u8, peer_addr)], accepted_replica, accepted_client, @@ -148,6 +150,7 @@ async fn non_zero_shard_skips_io() { CLUSTER, 1, 2, + None, vec![(0u8, dead_peer)], accepted_replica, accepted_client, @@ -225,6 +228,7 @@ async fn shard_zero_binds_all_six_planes_when_configured() { CLUSTER, 0, 1, + None, vec![], accepted_replica, accepted_client, @@ -294,6 +298,7 @@ async fn tcp_tls_listen_addr_without_credentials_rejected() { CLUSTER, 0, 1, + None, vec![], accepted_replica, accepted_client, @@ -335,6 +340,7 @@ async fn wss_listen_addr_without_credentials_rejected() { CLUSTER, 0, 1, + None, vec![], accepted_replica, accepted_client, diff --git a/core/message_bus/tests/tcp_tls_client_roundtrip.rs b/core/message_bus/tests/tcp_tls_client_roundtrip.rs index 9b584b5094..b90b40d414 100644 --- a/core/message_bus/tests/tcp_tls_client_roundtrip.rs +++ b/core/message_bus/tests/tcp_tls_client_roundtrip.rs @@ -83,6 +83,7 @@ async fn start_on_shard_zero_tcp_tls_round_trip() { CLUSTER, 0, 1, + None, vec![], accepted_replica, accepted_client, diff --git a/core/message_bus/tests/vectored_batch.rs b/core/message_bus/tests/vectored_batch.rs index 8adc3c6331..3d6ad84cc5 100644 --- a/core/message_bus/tests/vectored_batch.rs +++ b/core/message_bus/tests/vectored_batch.rs @@ -55,6 +55,7 @@ async fn writer_batches_pipelined_sends_in_order() { CLUSTER, 1, 2, + None, accept_1, message_bus::framing::MAX_MESSAGE_SIZE, Duration::from_secs(10), @@ -71,6 +72,8 @@ async fn writer_batches_pipelined_sends_in_order() { CLUSTER, 0, vec![(1, addr1)], + None, + Duration::from_secs(5), dial_0, DEFAULT_RECONNECT_PERIOD, ) diff --git a/core/message_bus/tests/wss_client_roundtrip.rs b/core/message_bus/tests/wss_client_roundtrip.rs index fc73c03ac1..24048325e5 100644 --- a/core/message_bus/tests/wss_client_roundtrip.rs +++ b/core/message_bus/tests/wss_client_roundtrip.rs @@ -83,6 +83,7 @@ async fn start_on_shard_zero_wss_round_trip() { CLUSTER, 0, 1, + None, vec![], accepted_replica, accepted_client, diff --git a/core/server-ng/config.toml b/core/server-ng/config.toml index 4aa14cc72a..19407bd0e7 100644 --- a/core/server-ng/config.toml +++ b/core/server-ng/config.toml @@ -556,6 +556,18 @@ enabled = false # This prevents accidental cross-cluster communication. name = "iggy-cluster" +# Replica-to-replica authentication (PSK + BLAKE3 keyed-MAC handshake). +[cluster.auth] +# When true, every replica peer must complete the authenticated handshake or be +# rejected, and shared_secret becomes mandatory. Off by default = legacy +# unauthenticated replica traffic. Enabling it is a coordinated-restart change. +enabled = false + +# Cluster-wide pre-shared key, >= 32 bytes of CSPRNG output, byte-identical on +# every node. Prefer the IGGY_CLUSTER_AUTH_SHARED_SECRET env var (masked in +# logs, never persisted) over storing it on disk. Ignored when enabled = false. +shared_secret = "" + # Full roster of cluster members. Byte-identical on every node. The running # node's identity is resolved at launch from the '--replica-id ' CLI # flag, which selects the entry in this list that describes the current diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 3d0e9d86e5..ab3a4580a3 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -40,6 +40,7 @@ use journal::prepare_journal::PrepareJournal; use message_bus::client_listener::{self, RequestHandler}; use message_bus::installer; use message_bus::installer::conn_info::{ClientConnMeta, ClientTransportKind}; +use message_bus::replica::auth::{self, ReplicaAuth}; use message_bus::replica::io as replica_io; use message_bus::replica::listener::{self as replica_listener}; use message_bus::transports::quic::server_config_with_cert; @@ -90,7 +91,6 @@ use std::thread; use std::time::Duration; use tracing::{error, info, warn}; -const CLUSTER_ID: u128 = 1; const SHARD_REPLICA_ID: u8 = 0; type ServerNgMuxStateMachine = MuxStateMachine; @@ -340,6 +340,9 @@ enum MetadataHandoff { } struct TcpTopology { + /// Domain-separation cluster id derived from `cluster.name`; threaded to + /// every consensus instance and the replica handshake so frames agree. + cluster_id: u128, self_replica_id: u8, replica_count: u8, client_listen_addr: SocketAddr, @@ -749,6 +752,7 @@ async fn shard_main( let consensus = restore_metadata_consensus( &journal, restored_op, + topology.cluster_id, topology.self_replica_id, topology.replica_count, Rc::clone(&bus), @@ -1062,6 +1066,7 @@ async fn build_shard_for_thread( namespace, topic_stats, &partition_metadata, + topology.cluster_id, topology.self_replica_id, topology.replica_count, Rc::clone(&bus), @@ -1096,7 +1101,7 @@ async fn build_shard_for_thread( senders, inbox, shards_table, - PartitionConsensusConfig::new(CLUSTER_ID, topology.replica_count, Rc::clone(&bus)), + PartitionConsensusConfig::new(topology.cluster_id, topology.replica_count, Rc::clone(&bus)), CoordinatorConfig::default(), metrics, ) @@ -1111,12 +1116,13 @@ async fn build_shard_for_thread( fn restore_metadata_consensus( journal: &PrepareJournal, restored_op: u64, + cluster_id: u128, self_replica_id: u8, replica_count: u8, bus: Rc, ) -> VsrConsensus> { let mut consensus = VsrConsensus::new( - CLUSTER_ID, + cluster_id, self_replica_id, replica_count, server_common::sharding::METADATA_CONSENSUS_NAMESPACE, @@ -1191,11 +1197,13 @@ const fn validate_recovered_namespace( }) } +#[allow(clippy::too_many_arguments)] async fn load_partition( config: &ServerNgConfig, namespace: IggyNamespace, topic_stats: Arc, partition_metadata: &Partition, + cluster_id: u128, self_replica_id: u8, replica_count: u8, bus: Rc, @@ -1205,7 +1213,7 @@ async fn load_partition( let partition_id = namespace.partition_id(); let stats = Arc::new(PartitionStats::new(topic_stats)); let consensus = VsrConsensus::new( - CLUSTER_ID, + cluster_id, self_replica_id, replica_count, namespace.inner(), @@ -1691,6 +1699,7 @@ fn resolve_tcp_topology( }); } return Ok(TcpTopology { + cluster_id: auth::cluster_domain_id(&config.cluster.name), // Keep parity with the current server binary and the integration // harness: `--replica-id 0` may be passed unconditionally in // single-node mode; any other id is rejected above so the WAL @@ -1743,6 +1752,7 @@ fn resolve_tcp_topology( let peers = resolve_cluster_replica_peers(&config.cluster.nodes, self_replica_id)?; Ok(TcpTopology { + cluster_id: auth::cluster_domain_id(&config.cluster.name), self_replica_id, replica_count, client_listen_addr, @@ -1871,6 +1881,7 @@ async fn start_via_replica_io( tcp_tls, } = accepted_clients; + let replica_auth = load_replica_auth(config); let bound = replica_io::start_on_shard_zero( &shard.bus, replica_addr, @@ -1882,9 +1893,10 @@ async fn start_via_replica_io( tcp_tls_credentials, None, None, - CLUSTER_ID, + topology.cluster_id, topology.self_replica_id, topology.replica_count, + replica_auth, topology.peers.clone(), accepted_replica, tcp, @@ -1967,16 +1979,20 @@ async fn start_manual_runtime( let token = shard.bus.token(); let max_message_size = shard.bus.config().max_message_size; let handshake_grace = shard.bus.config().handshake_grace; + let cluster_id = topology.cluster_id; let self_replica_id = topology.self_replica_id; let replica_count = topology.replica_count; + let replica_auth = load_replica_auth(config); + let auth_for_listener = replica_auth.clone(); let accepted_replica_for_listener = accepted_replica.clone(); let replica_handle = compio::runtime::spawn(async move { replica_listener::run( replica_listener, token, - CLUSTER_ID, + cluster_id, self_replica_id, replica_count, + auth_for_listener, accepted_replica_for_listener, max_message_size, handshake_grace, @@ -1986,9 +2002,11 @@ async fn start_manual_runtime( shard.bus.track_background(replica_handle); connector::start( &shard.bus, - CLUSTER_ID, + cluster_id, topology.self_replica_id, topology.peers.clone(), + replica_auth, + handshake_grace, accepted_replica, shard.bus.config().reconnect_period, ) @@ -2269,6 +2287,22 @@ async fn start_client_listeners( Ok(bound) } +/// Build the replica auth context from cluster config. Returns `None` when the +/// cluster or replica auth is disabled, keeping the handshake in legacy mode. +/// Only the derived MAC key is carried onward in [`ReplicaAuth`]; the raw secret +/// (masked in config logs via `config_env(secret)`) is read here only to derive +/// that key. `ClusterConfig::validate` guarantees a non-empty secret whenever +/// both `cluster.enabled` and `cluster.auth.enabled` are set (validate +/// early-returns `Ok` while `cluster.enabled` is false). +fn load_replica_auth(config: &ServerNgConfig) -> Option { + if !config.cluster.enabled || !config.cluster.auth.enabled { + return None; + } + Some(ReplicaAuth::new( + config.cluster.auth.shared_secret.as_bytes(), + )) +} + fn load_tcp_tls_server_credentials( config: &ServerNgConfig, ) -> Result {