From 3018cfabe4e5088cb70c229dd055af1cc3250291 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Wed, 3 Jun 2026 12:05:10 -0400 Subject: [PATCH] feat(file_state): guard cross-agent file edits from stale sibling writes (#3253) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a process-wide FileStateCoordinator that tracks per-agent read stamps and per-path write stamps across parallel subagents. Write tools (file_write, edit, apply_patch) now detect when another agent modified a file after this agent's last read and return a model-facing error requiring a re-read. Partial-read protection prevents overwrites after paginated reads. - file_state/types.rs — ReadStamp, WriteStamp, FileStateCoordinator - file_state/ops.rs — record_read/write, check_stale/partial, path locks - file_state/agent_context.rs — task-local agent ID for tool attribution - file_read records read stamps after successful reads - file_write/edit/apply_patch check staleness + partial before writing - edit/apply_patch acquire per-path async locks for read-modify-write - spawn_parallel_agents annotates results with stale_parent_reads - OPENHUMAN_FILE_STATE_GUARD=0 disables the guard entirely --- .env.example | 4 + src/core/jsonrpc.rs | 1 + src/openhuman/agent/bus.rs | 67 ++-- .../agent/harness/subagent_runner/ops.rs | 20 +- .../tools/spawn_parallel_agents.rs | 27 ++ src/openhuman/file_state/agent_context.rs | 64 ++++ src/openhuman/file_state/mod.rs | 21 ++ src/openhuman/file_state/ops.rs | 340 ++++++++++++++++++ src/openhuman/file_state/types.rs | 89 +++++ src/openhuman/mod.rs | 1 + .../tools/impl/filesystem/apply_patch.rs | 57 +++ .../tools/impl/filesystem/edit_file.rs | 35 +- .../tools/impl/filesystem/file_read.rs | 13 +- .../tools/impl/filesystem/file_write.rs | 34 +- 14 files changed, 728 insertions(+), 45 deletions(-) create mode 100644 src/openhuman/file_state/agent_context.rs create mode 100644 src/openhuman/file_state/mod.rs create mode 100644 src/openhuman/file_state/ops.rs create mode 100644 src/openhuman/file_state/types.rs diff --git a/.env.example b/.env.example index 58675dc3ee..4cc90a6c90 100644 --- a/.env.example +++ b/.env.example @@ -310,6 +310,10 @@ RUST_BACKTRACE=1 # --------------------------------------------------------------------------- # Testing (do not set in production) # --------------------------------------------------------------------------- +# [optional] Disable the cross-agent file-state guard (staleness detection +# for parallel subagent writes). Set to 0/false/off/no to disable. +# OPENHUMAN_FILE_STATE_GUARD=1 + # [optional] Enable mock service mode # OPENHUMAN_SERVICE_MOCK=0 # [optional] Path to mock state file diff --git a/src/core/jsonrpc.rs b/src/core/jsonrpc.rs index cabdc05669..bab1585e3e 100644 --- a/src/core/jsonrpc.rs +++ b/src/core/jsonrpc.rs @@ -1992,6 +1992,7 @@ pub async fn bootstrap_core_runtime(embedded_core: bool) { // --- Event bus bootstrap --- // Ensure the global event bus is initialized (no-op if already done by start_channels). crate::core::event_bus::init_global(crate::core::event_bus::DEFAULT_CAPACITY); + crate::openhuman::file_state::init_global(); // Register domain subscribers for cross-module event handling. // Uses a Once guard so repeated calls to bootstrap_core_runtime() // cannot double-subscribe. diff --git a/src/openhuman/agent/bus.rs b/src/openhuman/agent/bus.rs index 0ee18b1fc9..ab64aea004 100644 --- a/src/openhuman/agent/bus.rs +++ b/src/openhuman/agent/bus.rs @@ -30,6 +30,7 @@ use crate::openhuman::tools::Tool; use super::harness::definition::{AgentDefinitionRegistry, SandboxMode}; use super::harness::{run_tool_call_loop, with_current_sandbox_mode}; +use crate::openhuman::file_state::with_file_state_agent_id; /// Method name used to dispatch an agentic turn through the native bus. pub const AGENT_RUN_TURN_METHOD: &str = "agent.run_turn"; @@ -256,37 +257,45 @@ pub fn register_agent_handlers() { // registry, so re-scoping here is mandatory — the // task-local does NOT propagate across that boundary // implicitly. + let file_state_id = format!( + "bus:{}:{}", + channel_name, + target_agent_id.as_deref().unwrap_or("root") + ); let text = turn_origin::with_origin( origin, - with_current_sandbox_mode(sandbox_mode, async { - run_tool_call_loop( - provider.as_ref(), - &mut history, - tools_registry.as_ref(), - &provider_name, - &model, - temperature, - silent, - &channel_name, - &multimodal, - &multimodal_files, - max_tool_iterations, - on_delta, - visible_tool_names.as_ref(), - &extra_tools, - on_progress, - // Bus path runs ad-hoc agent turns without an Agent - // handle, so we pass None — payload summarization is - // wired into the orchestrator session via Agent::turn, - // not the bus dispatcher. - None, - // Use the default (allow-all) tool policy. Custom - // policies can be wired in via AgentTurnRequest when - // per-channel policy configuration is added (#2134). - &crate::openhuman::tools::policy::DefaultToolPolicy, - ) - .await - }), + with_file_state_agent_id( + file_state_id, + with_current_sandbox_mode(sandbox_mode, async { + run_tool_call_loop( + provider.as_ref(), + &mut history, + tools_registry.as_ref(), + &provider_name, + &model, + temperature, + silent, + &channel_name, + &multimodal, + &multimodal_files, + max_tool_iterations, + on_delta, + visible_tool_names.as_ref(), + &extra_tools, + on_progress, + // Bus path runs ad-hoc agent turns without an Agent + // handle, so we pass None — payload summarization is + // wired into the orchestrator session via Agent::turn, + // not the bus dispatcher. + None, + // Use the default (allow-all) tool policy. Custom + // policies can be wired in via AgentTurnRequest when + // per-channel policy configuration is added (#2134). + &crate::openhuman::tools::policy::DefaultToolPolicy, + ) + .await + }), + ), ) .await .map_err(|e| e.to_string())?; diff --git a/src/openhuman/agent/harness/subagent_runner/ops.rs b/src/openhuman/agent/harness/subagent_runner/ops.rs index 2a3aa0ff17..5c2ac02971 100644 --- a/src/openhuman/agent/harness/subagent_runner/ops.rs +++ b/src/openhuman/agent/harness/subagent_runner/ops.rs @@ -32,6 +32,7 @@ use crate::openhuman::agent::harness::{ use crate::openhuman::context::prompt::{ render_subagent_system_prompt, PromptContext, PromptTool, SubagentRenderOptions, }; +use crate::openhuman::file_state::with_file_state_agent_id; use crate::openhuman::inference::provider::{ChatMessage, ChatRequest, Provider}; use crate::openhuman::memory_conversations::ConversationMessage; use crate::openhuman::tools::{Tool, ToolCategory, ToolSpec}; @@ -363,14 +364,17 @@ pub async fn run_subagent( // state machine lives on the heap (#2234 CI failure under // `cargo-llvm-cov`). let mut outcome = with_spawn_depth(attempted_depth, async { - with_current_sandbox_mode(definition.sandbox_mode, async { - Box::pin(run_typed_mode( - definition, - task_prompt, - &options, - &parent, - &task_id, - )) + with_file_state_agent_id(task_id.clone(), async { + with_current_sandbox_mode(definition.sandbox_mode, async { + Box::pin(run_typed_mode( + definition, + task_prompt, + &options, + &parent, + &task_id, + )) + .await + }) .await }) .await diff --git a/src/openhuman/agent_orchestration/tools/spawn_parallel_agents.rs b/src/openhuman/agent_orchestration/tools/spawn_parallel_agents.rs index 3c5ba895b4..6dfd5cf0d9 100644 --- a/src/openhuman/agent_orchestration/tools/spawn_parallel_agents.rs +++ b/src/openhuman/agent_orchestration/tools/spawn_parallel_agents.rs @@ -5,6 +5,7 @@ use crate::openhuman::agent::harness::definition::{AgentDefinition, AgentDefinit use crate::openhuman::agent::harness::fork_context::current_parent; use crate::openhuman::agent::harness::subagent_runner::{run_subagent, SubagentRunOptions}; use crate::openhuman::agent::progress::AgentProgress; +use crate::openhuman::file_state; use crate::openhuman::tools::traits::{PermissionLevel, Tool, ToolResult}; use async_trait::async_trait; use futures::future::join_all; @@ -51,6 +52,8 @@ struct ParallelAgentResult { ownership: Option, elapsed_ms: u64, iterations: u32, + #[serde(skip_serializing_if = "Vec::is_empty")] + stale_parent_reads: Vec, } #[async_trait] @@ -191,6 +194,7 @@ impl Tool for SpawnParallelAgentsTool { ownership: task.ownership, elapsed_ms: 0, iterations: 0, + stale_parent_reads: Vec::new(), }); continue; } @@ -211,6 +215,7 @@ impl Tool for SpawnParallelAgentsTool { ownership: task.ownership, elapsed_ms: 0, iterations: 0, + stale_parent_reads: Vec::new(), }); continue; }; @@ -237,6 +242,7 @@ impl Tool for SpawnParallelAgentsTool { ownership: task.ownership, elapsed_ms: 0, iterations: 0, + stale_parent_reads: Vec::new(), }); continue; } @@ -391,6 +397,25 @@ impl Tool for SpawnParallelAgentsTool { results.push(result); } + // Parent reminder: check if any child wrote to files the parent + // had previously read, and annotate the result. + if let Some(parent_agent_id) = file_state::current_file_state_agent_id() { + let child_ids: Vec = results.iter().map(|r| r.task_id.clone()).collect(); + let stale = file_state::parent_stale_files(&parent_agent_id, &child_ids); + if !stale.is_empty() { + let stale_strings: Vec = + stale.iter().map(|p| p.display().to_string()).collect(); + tracing::debug!( + parent = %parent_agent_id, + stale_count = stale.len(), + "[file_state] parent reads stale after child writes" + ); + for result in &mut results { + result.stale_parent_reads = stale_strings.clone(); + } + } + } + let failures = results.iter().filter(|r| !r.success).count(); tracing::debug!( parent_session = %parent_session, @@ -457,6 +482,7 @@ async fn run_one_parallel_task( ownership: task.ownership, elapsed_ms: outcome.elapsed.as_millis() as u64, iterations: outcome.iterations as u32, + stale_parent_reads: Vec::new(), } } Err(err) => { @@ -476,6 +502,7 @@ async fn run_one_parallel_task( ownership: task.ownership, elapsed_ms: started.elapsed().as_millis() as u64, iterations: 0, + stale_parent_reads: Vec::new(), } } } diff --git a/src/openhuman/file_state/agent_context.rs b/src/openhuman/file_state/agent_context.rs new file mode 100644 index 0000000000..3278b94a35 --- /dev/null +++ b/src/openhuman/file_state/agent_context.rs @@ -0,0 +1,64 @@ +//! Task-local carrier for the currently-executing agent's identity so +//! file tools can attribute reads/writes without widening the `Tool` trait. +//! +//! Follows the same pattern as `sandbox_context.rs`. Set by the agent +//! harness around tool execution; tools read via [`current_file_state_agent_id`]. + +tokio::task_local! { + static FILE_STATE_AGENT_ID: String; +} + +/// Returns the current agent's identity for file-state tracking, if set. +/// +/// Returns `None` outside an agent turn (CLI, JSON-RPC direct, unit tests). +pub fn current_file_state_agent_id() -> Option { + FILE_STATE_AGENT_ID.try_with(|id| id.clone()).ok() +} + +/// Run `future` with `agent_id` installed as the file-state identity. +pub async fn with_file_state_agent_id(agent_id: String, future: F) -> R +where + F: std::future::Future, +{ + FILE_STATE_AGENT_ID.scope(agent_id, future).await +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn returns_none_outside_scope() { + assert_eq!(current_file_state_agent_id(), None); + } + + #[tokio::test] + async fn installs_and_reads_agent_id() { + let observed = + with_file_state_agent_id("agent-1".into(), async { current_file_state_agent_id() }) + .await; + assert_eq!(observed, Some("agent-1".to_string())); + } + + #[tokio::test] + async fn does_not_leak_across_scopes() { + with_file_state_agent_id("agent-1".into(), async { + assert_eq!(current_file_state_agent_id(), Some("agent-1".to_string())); + }) + .await; + assert_eq!(current_file_state_agent_id(), None); + } + + #[tokio::test] + async fn nested_scope_overrides_outer() { + with_file_state_agent_id("parent".into(), async { + assert_eq!(current_file_state_agent_id(), Some("parent".to_string())); + with_file_state_agent_id("child".into(), async { + assert_eq!(current_file_state_agent_id(), Some("child".to_string())); + }) + .await; + assert_eq!(current_file_state_agent_id(), Some("parent".to_string())); + }) + .await; + } +} diff --git a/src/openhuman/file_state/mod.rs b/src/openhuman/file_state/mod.rs new file mode 100644 index 0000000000..b069a4eae4 --- /dev/null +++ b/src/openhuman/file_state/mod.rs @@ -0,0 +1,21 @@ +//! Process-wide file state coordinator for cross-agent staleness detection. +//! +//! Parallel subagents and worker threads share a workspace. Without +//! coordination one worker can read a file, a sibling can edit it, and +//! the first worker can later write based on stale content. This module +//! tracks per-agent read stamps and per-path write stamps so that write +//! tools can detect the conflict and return a model-facing error +//! requiring the agent to re-read. +//! +//! Disable with `OPENHUMAN_FILE_STATE_GUARD=0` (or `false`). + +mod agent_context; +mod ops; +mod types; + +pub use agent_context::{current_file_state_agent_id, with_file_state_agent_id}; +pub use ops::{ + acquire_path_lock, check_partial_read, check_stale_read, init_global, parent_stale_files, + record_read, record_write, try_global, +}; +pub use types::{FileStateCoordinator, ReadStamp}; diff --git a/src/openhuman/file_state/ops.rs b/src/openhuman/file_state/ops.rs new file mode 100644 index 0000000000..bf8be1133f --- /dev/null +++ b/src/openhuman/file_state/ops.rs @@ -0,0 +1,340 @@ +//! Operational API for the file state coordinator. + +use std::path::PathBuf; +use std::sync::{Arc, OnceLock}; +use std::time::{Instant, SystemTime}; +use tokio::sync::{Mutex, OwnedMutexGuard}; + +use super::types::{FileStateCoordinator, ReadStamp, WriteStamp}; + +// ── Singleton ──────────────────────────────────────────────────────────── + +static GLOBAL: OnceLock> = OnceLock::new(); + +/// Returns `true` when the guard is disabled via env var. +fn is_disabled() -> bool { + static DISABLED: OnceLock = OnceLock::new(); + *DISABLED.get_or_init(|| { + std::env::var("OPENHUMAN_FILE_STATE_GUARD") + .map(|v| matches!(v.as_str(), "0" | "false" | "off" | "no")) + .unwrap_or(false) + }) +} + +/// Initialise the process-global coordinator. Safe to call multiple times; +/// only the first call wins. +pub fn init_global() { + if is_disabled() { + tracing::debug!("[file_state] guard disabled via OPENHUMAN_FILE_STATE_GUARD"); + return; + } + let _ = GLOBAL.set(Arc::new(FileStateCoordinator::new())); + tracing::debug!("[file_state] coordinator initialised"); +} + +/// Returns the global coordinator, or `None` when disabled / not yet initialised. +pub fn try_global() -> Option> { + if is_disabled() { + return None; + } + GLOBAL.get().cloned() +} + +// ── Read tracking ──────────────────────────────────────────────────────── + +/// Record that `agent_id` read `resolved_path` at the given mtime. +pub fn record_read(agent_id: &str, resolved_path: PathBuf, mtime: SystemTime, partial: bool) { + let Some(coord) = try_global() else { return }; + tracing::trace!( + agent = agent_id, + path = %resolved_path.display(), + partial, + "[file_state] record_read" + ); + coord.reads.write().insert( + (agent_id.to_string(), resolved_path), + ReadStamp { + mtime, + timestamp: Instant::now(), + partial, + }, + ); +} + +// ── Write tracking ─────────────────────────────────────────────────────── + +/// Record that `agent_id` wrote `resolved_path`. +pub fn record_write(agent_id: &str, resolved_path: PathBuf) { + let Some(coord) = try_global() else { return }; + tracing::trace!( + agent = agent_id, + path = %resolved_path.display(), + "[file_state] record_write" + ); + let now = Instant::now(); + coord.writes.write().insert( + resolved_path.clone(), + WriteStamp { + writer: agent_id.to_string(), + timestamp: now, + }, + ); + // Also update this agent's own read stamp so its own subsequent + // writes don't trigger self-staleness. + coord.reads.write().insert( + (agent_id.to_string(), resolved_path), + ReadStamp { + mtime: SystemTime::now(), + timestamp: now, + partial: false, + }, + ); +} + +// ── Staleness checks ───────────────────────────────────────────────────── + +/// Check whether `agent_id`'s view of `resolved_path` is stale because +/// another agent wrote to it after this agent's last read. Returns an +/// error message when stale, `None` when safe. +pub fn check_stale_read(agent_id: &str, resolved_path: &PathBuf) -> Option { + let coord = try_global()?; + let reads = coord.reads.read(); + let writes = coord.writes.read(); + let read_key = (agent_id.to_string(), resolved_path.clone()); + let read_stamp = reads.get(&read_key)?; + let ws = writes.get(resolved_path)?; + if ws.writer != agent_id && ws.timestamp > read_stamp.timestamp { + let display_path = resolved_path.display(); + Some(format!( + "Stale read: file '{display_path}' was modified by agent '{}' after your last read. \ + Re-read the file before editing.", + ws.writer + )) + } else { + None + } +} + +/// Check whether `agent_id`'s last read of `resolved_path` was partial. +/// Returns an error message when partial, `None` when safe. +pub fn check_partial_read(agent_id: &str, resolved_path: &PathBuf) -> Option { + let coord = try_global()?; + let reads = coord.reads.read(); + let read_key = (agent_id.to_string(), resolved_path.clone()); + let read_stamp = reads.get(&read_key)?; + if read_stamp.partial { + let display_path = resolved_path.display(); + Some(format!( + "Partial read: your last read of '{display_path}' was partial (paginated). \ + Perform a full read before overwriting." + )) + } else { + None + } +} + +// ── Path locking ───────────────────────────────────────────────────────── + +/// Acquire an async lock on `resolved_path` for a read-modify-write +/// section. Returns an `OwnedMutexGuard` that releases when dropped. +/// Returns `None` when the coordinator is disabled. +pub async fn acquire_path_lock(resolved_path: &PathBuf) -> Option> { + let coord = try_global()?; + let mutex = { + let mut locks = coord.path_locks.write(); + locks + .entry(resolved_path.clone()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + Some(mutex.lock_owned().await) +} + +// ── Parent reminder ────────────────────────────────────────────────────── + +/// Return resolved paths that `parent_agent_id` had previously read but +/// were subsequently written by any agent in `child_agent_ids`. +pub fn parent_stale_files(parent_agent_id: &str, child_agent_ids: &[String]) -> Vec { + let Some(coord) = try_global() else { + return Vec::new(); + }; + let reads = coord.reads.read(); + let writes = coord.writes.read(); + let mut stale = Vec::new(); + for ((agent_id, path), read_stamp) in reads.iter() { + if agent_id != parent_agent_id { + continue; + } + if let Some(ws) = writes.get(path) { + if child_agent_ids.contains(&ws.writer) && ws.timestamp > read_stamp.timestamp { + stale.push(path.clone()); + } + } + } + stale.sort(); + stale.dedup(); + stale +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + fn fresh_coordinator() -> Arc { + Arc::new(FileStateCoordinator::new()) + } + + #[test] + fn record_and_check_no_staleness() { + let coord = fresh_coordinator(); + let path = PathBuf::from("/tmp/test/a.txt"); + coord.reads.write().insert( + ("agent-a".to_string(), path.clone()), + ReadStamp { + mtime: SystemTime::now(), + timestamp: Instant::now(), + partial: false, + }, + ); + let reads = coord.reads.read(); + let rs = reads.get(&("agent-a".to_string(), path.clone())).unwrap(); + assert!(!rs.partial); + assert!(coord.writes.read().get(&path).is_none()); + } + + #[test] + fn detect_sibling_write_staleness() { + let coord = fresh_coordinator(); + let path = PathBuf::from("/tmp/test/b.txt"); + let read_time = Instant::now(); + coord.reads.write().insert( + ("agent-a".to_string(), path.clone()), + ReadStamp { + mtime: SystemTime::now(), + timestamp: read_time, + partial: false, + }, + ); + std::thread::sleep(Duration::from_millis(5)); + coord.writes.write().insert( + path.clone(), + WriteStamp { + writer: "agent-b".to_string(), + timestamp: Instant::now(), + }, + ); + let stale = coord.stale_reads_for_parent("agent-a"); + assert_eq!(stale, vec![path]); + } + + #[test] + fn own_write_does_not_trigger_staleness() { + let coord = fresh_coordinator(); + let path = PathBuf::from("/tmp/test/c.txt"); + let now = Instant::now(); + coord.reads.write().insert( + ("agent-a".to_string(), path.clone()), + ReadStamp { + mtime: SystemTime::now(), + timestamp: now, + partial: false, + }, + ); + std::thread::sleep(Duration::from_millis(5)); + coord.writes.write().insert( + path.clone(), + WriteStamp { + writer: "agent-a".to_string(), + timestamp: Instant::now(), + }, + ); + let stale = coord.stale_reads_for_parent("agent-a"); + assert!(stale.is_empty()); + } + + #[test] + fn partial_read_detected() { + let coord = fresh_coordinator(); + let path = PathBuf::from("/tmp/test/d.txt"); + coord.reads.write().insert( + ("agent-a".to_string(), path.clone()), + ReadStamp { + mtime: SystemTime::now(), + timestamp: Instant::now(), + partial: true, + }, + ); + let reads = coord.reads.read(); + let rs = reads.get(&("agent-a".to_string(), path.clone())).unwrap(); + assert!(rs.partial); + } + + #[test] + fn parent_stale_files_detects_child_writes() { + let coord = fresh_coordinator(); + let path = PathBuf::from("/tmp/test/e.txt"); + let parent_read_time = Instant::now(); + coord.reads.write().insert( + ("parent".to_string(), path.clone()), + ReadStamp { + mtime: SystemTime::now(), + timestamp: parent_read_time, + partial: false, + }, + ); + std::thread::sleep(Duration::from_millis(5)); + coord.writes.write().insert( + path.clone(), + WriteStamp { + writer: "child-1".to_string(), + timestamp: Instant::now(), + }, + ); + let stale = coord.stale_reads_for_parent("parent"); + assert_eq!(stale, vec![path]); + } + + #[test] + fn paths_written_by_collects_correctly() { + let coord = fresh_coordinator(); + let p1 = PathBuf::from("/tmp/test/f1.txt"); + let p2 = PathBuf::from("/tmp/test/f2.txt"); + coord.writes.write().insert( + p1.clone(), + WriteStamp { + writer: "child-1".to_string(), + timestamp: Instant::now(), + }, + ); + coord.writes.write().insert( + p2.clone(), + WriteStamp { + writer: "child-2".to_string(), + timestamp: Instant::now(), + }, + ); + let result = coord.paths_written_by(&["child-1".to_string()]); + assert_eq!(result.len(), 1); + assert!(result.contains_key("child-1")); + assert_eq!(result["child-1"], vec![p1]); + } + + #[tokio::test] + async fn path_lock_serialises_access() { + let coord = fresh_coordinator(); + let path = PathBuf::from("/tmp/test/lock.txt"); + let mutex = { + let mut locks = coord.path_locks.write(); + locks + .entry(path.clone()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + + let guard = mutex.lock().await; + assert!(mutex.try_lock().is_err()); + drop(guard); + assert!(mutex.try_lock().is_ok()); + } +} diff --git a/src/openhuman/file_state/types.rs b/src/openhuman/file_state/types.rs new file mode 100644 index 0000000000..1e93ea65e4 --- /dev/null +++ b/src/openhuman/file_state/types.rs @@ -0,0 +1,89 @@ +//! Core types for the file state coordinator. + +use parking_lot::RwLock; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Instant, SystemTime}; +use tokio::sync::Mutex; + +/// Snapshot of a single file read by an agent. +#[derive(Debug, Clone)] +pub struct ReadStamp { + /// Filesystem mtime at the moment of the read. + pub mtime: SystemTime, + /// Monotonic clock timestamp of the read. + pub timestamp: Instant, + /// Whether the read was partial (paginated / offset+limit). + pub partial: bool, +} + +/// Per-path write metadata. +#[derive(Debug, Clone)] +pub(crate) struct WriteStamp { + /// Agent identity that performed the write. + pub writer: String, + /// Monotonic clock timestamp of the write. + pub timestamp: Instant, +} + +/// Process-global coordinator that tracks file reads and writes across +/// all agents in the process. Thread-safe via `RwLock`. +pub struct FileStateCoordinator { + /// Per-agent, per-resolved-path read stamps. + /// Key: `(agent_id, canonical_path)`. + pub(crate) reads: RwLock>, + + /// Per-resolved-path write stamp (last writer wins). + pub(crate) writes: RwLock>, + + /// Per-resolved-path async mutex for serialising read-modify-write + /// sections (used by `edit` and `apply_patch`). + pub(crate) path_locks: RwLock>>>, +} + +impl FileStateCoordinator { + pub fn new() -> Self { + Self { + reads: RwLock::new(HashMap::new()), + writes: RwLock::new(HashMap::new()), + path_locks: RwLock::new(HashMap::new()), + } + } + + /// Return the set of resolved paths that `parent_agent_id` has read + /// but were subsequently written by a different agent. + pub fn stale_reads_for_parent(&self, parent_agent_id: &str) -> Vec { + let reads = self.reads.read(); + let writes = self.writes.read(); + let mut stale = Vec::new(); + for ((agent_id, path), read_stamp) in reads.iter() { + if agent_id != parent_agent_id { + continue; + } + if let Some(ws) = writes.get(path) { + if ws.writer != parent_agent_id && ws.timestamp > read_stamp.timestamp { + stale.push(path.clone()); + } + } + } + stale.sort(); + stale.dedup(); + stale + } + + /// Collect all paths written by agents in the given set. + pub fn paths_written_by(&self, agent_ids: &[String]) -> HashMap> { + let writes = self.writes.read(); + let mut result: HashMap> = HashMap::new(); + for (path, ws) in writes.iter() { + if agent_ids.contains(&ws.writer) { + result + .entry(ws.writer.clone()) + .or_default() + .push(path.clone()); + } + } + result + } +} diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs index 542bb0ad0f..d3a28b314e 100644 --- a/src/openhuman/mod.rs +++ b/src/openhuman/mod.rs @@ -46,6 +46,7 @@ pub mod devices; pub mod doctor; pub mod embeddings; pub mod encryption; +pub mod file_state; pub mod health; pub mod heartbeat; pub mod http_host; diff --git a/src/openhuman/tools/impl/filesystem/apply_patch.rs b/src/openhuman/tools/impl/filesystem/apply_patch.rs index 76dd2004bb..6f57f2b82d 100644 --- a/src/openhuman/tools/impl/filesystem/apply_patch.rs +++ b/src/openhuman/tools/impl/filesystem/apply_patch.rs @@ -6,6 +6,7 @@ //! before any file is written. If any edit fails validation, no files //! are touched. +use crate::openhuman::file_state; use crate::openhuman::security::{CommandClass, GateDecision, SecurityPolicy}; use crate::openhuman::tools::traits::{PermissionLevel, Tool, ToolResult}; use async_trait::async_trait; @@ -143,6 +144,55 @@ impl Tool for ApplyPatchTool { }); } + // Acquire per-path locks for all unique paths before any reads. + let unique_paths: Vec = { + let mut seen = std::collections::HashSet::new(); + parsed + .iter() + .filter_map(|e| { + if seen.insert(e.path.clone()) { + Some(e.path.clone()) + } else { + None + } + }) + .collect() + }; + let mut _path_guards = Vec::new(); + for p in &unique_paths { + let full = self.security.action_dir.join(p); + if let Ok(resolved) = tokio::fs::canonicalize(&full).await { + if let Some(guard) = file_state::acquire_path_lock(&resolved).await { + _path_guards.push(guard); + } + } + } + + // File-state guard: reject edits based on stale or partial reads. + if let Some(agent_id) = file_state::current_file_state_agent_id() { + for p in &unique_paths { + let full = self.security.action_dir.join(p); + if let Ok(resolved) = tokio::fs::canonicalize(&full).await { + if let Some(msg) = file_state::check_stale_read(&agent_id, &resolved) { + tracing::debug!( + agent = %agent_id, + path = %resolved.display(), + "[file_state] apply_patch blocked: stale read" + ); + return Ok(ToolResult::error(msg)); + } + if let Some(msg) = file_state::check_partial_read(&agent_id, &resolved) { + tracing::debug!( + agent = %agent_id, + path = %resolved.display(), + "[file_state] apply_patch blocked: partial read" + ); + return Ok(ToolResult::error(msg)); + } + } + } + } + // Resolve paths + load file contents (once per file). Apply edits in // memory; if any edit fails, return without writing. let mut buffers: HashMap = HashMap::new(); @@ -241,6 +291,13 @@ impl Tool for ApplyPatchTool { written.push(buf); summary.push(format!("{path}: {} replacement(s)", buf.edit_count)); } + // Record writes in the file-state coordinator. + if let Some(agent_id) = file_state::current_file_state_agent_id() { + for buf in buffers.values() { + file_state::record_write(&agent_id, buf.resolved.clone()); + } + } + summary.sort(); Ok(ToolResult::success(format!( "Applied {} edit(s) across {} file(s)\n{}", diff --git a/src/openhuman/tools/impl/filesystem/edit_file.rs b/src/openhuman/tools/impl/filesystem/edit_file.rs index 15e79f1853..41b1518498 100644 --- a/src/openhuman/tools/impl/filesystem/edit_file.rs +++ b/src/openhuman/tools/impl/filesystem/edit_file.rs @@ -6,6 +6,7 @@ //! exactly once in the file (so the model can't accidentally edit //! every match). Set `replace_all` to override. +use crate::openhuman::file_state; use crate::openhuman::security::{CommandClass, GateDecision, SecurityPolicy}; use crate::openhuman::tools::traits::{PermissionLevel, Tool, ToolResult}; use async_trait::async_trait; @@ -135,6 +136,29 @@ impl Tool for EditFileTool { } } + // Acquire per-path lock for the read-modify-write section. + let _path_guard = file_state::acquire_path_lock(&resolved).await; + + // File-state guard: reject edits based on stale or partial reads. + if let Some(agent_id) = file_state::current_file_state_agent_id() { + if let Some(msg) = file_state::check_stale_read(&agent_id, &resolved) { + tracing::debug!( + agent = %agent_id, + path = %resolved.display(), + "[file_state] edit blocked: stale read" + ); + return Ok(ToolResult::error(msg)); + } + if let Some(msg) = file_state::check_partial_read(&agent_id, &resolved) { + tracing::debug!( + agent = %agent_id, + path = %resolved.display(), + "[file_state] edit blocked: partial read" + ); + return Ok(ToolResult::error(msg)); + } + } + let contents = match tokio::fs::read_to_string(&resolved).await { Ok(c) => c, Err(e) => return Ok(ToolResult::error(format!("Failed to read file: {e}"))), @@ -160,9 +184,14 @@ impl Tool for EditFileTool { }; match tokio::fs::write(&resolved, &updated).await { - Ok(()) => Ok(ToolResult::success(format!( - "Edited {path}: {count} replacement(s)" - ))), + Ok(()) => { + if let Some(agent_id) = file_state::current_file_state_agent_id() { + file_state::record_write(&agent_id, resolved); + } + Ok(ToolResult::success(format!( + "Edited {path}: {count} replacement(s)" + ))) + } Err(e) => Ok(ToolResult::error(format!("Failed to write file: {e}"))), } } diff --git a/src/openhuman/tools/impl/filesystem/file_read.rs b/src/openhuman/tools/impl/filesystem/file_read.rs index 5438c34142..3e07a9fa06 100644 --- a/src/openhuman/tools/impl/filesystem/file_read.rs +++ b/src/openhuman/tools/impl/filesystem/file_read.rs @@ -1,3 +1,4 @@ +use crate::openhuman::file_state; use crate::openhuman::security::SecurityPolicy; use crate::openhuman::tools::traits::{Tool, ToolResult}; use async_trait::async_trait; @@ -90,7 +91,17 @@ impl Tool for FileReadTool { } match tokio::fs::read_to_string(&resolved_path).await { - Ok(contents) => Ok(ToolResult::success(contents)), + Ok(contents) => { + if let Some(agent_id) = file_state::current_file_state_agent_id() { + let mtime = tokio::fs::metadata(&resolved_path) + .await + .ok() + .and_then(|m| m.modified().ok()) + .unwrap_or(std::time::SystemTime::UNIX_EPOCH); + file_state::record_read(&agent_id, resolved_path, mtime, false); + } + Ok(ToolResult::success(contents)) + } Err(e) => Ok(ToolResult::error(format!("Failed to read file: {e}"))), } } diff --git a/src/openhuman/tools/impl/filesystem/file_write.rs b/src/openhuman/tools/impl/filesystem/file_write.rs index 73fc9245b2..169f4e1cd3 100644 --- a/src/openhuman/tools/impl/filesystem/file_write.rs +++ b/src/openhuman/tools/impl/filesystem/file_write.rs @@ -1,3 +1,4 @@ +use crate::openhuman::file_state; use crate::openhuman::security::{CommandClass, GateDecision, SecurityPolicy}; use crate::openhuman::tools::traits::{PermissionLevel, Tool, ToolResult}; use async_trait::async_trait; @@ -129,11 +130,36 @@ impl Tool for FileWriteTool { )); } + // File-state guard: reject writes based on stale or partial reads. + if let Some(agent_id) = file_state::current_file_state_agent_id() { + if let Some(msg) = file_state::check_stale_read(&agent_id, &resolved_target) { + tracing::debug!( + agent = %agent_id, + path = %resolved_target.display(), + "[file_state] file_write blocked: stale read" + ); + return Ok(ToolResult::error(msg)); + } + if let Some(msg) = file_state::check_partial_read(&agent_id, &resolved_target) { + tracing::debug!( + agent = %agent_id, + path = %resolved_target.display(), + "[file_state] file_write blocked: partial read" + ); + return Ok(ToolResult::error(msg)); + } + } + match tokio::fs::write(&resolved_target, content).await { - Ok(()) => Ok(ToolResult::success(format!( - "Written {} bytes to {path}", - content.len() - ))), + Ok(()) => { + if let Some(agent_id) = file_state::current_file_state_agent_id() { + file_state::record_write(&agent_id, resolved_target); + } + Ok(ToolResult::success(format!( + "Written {} bytes to {path}", + content.len() + ))) + } Err(e) => Ok(ToolResult::error(format!("Failed to write file: {e}"))), } }