Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ RUST_BACKTRACE=1
# ---------------------------------------------------------------------------
# Testing (do not set in production)
# ---------------------------------------------------------------------------
# [optional] Disable the cross-agent file-state guard (staleness detection
# for parallel subagent writes). Set to 0/false/off/no to disable.
# OPENHUMAN_FILE_STATE_GUARD=1

# [optional] Enable mock service mode
# OPENHUMAN_SERVICE_MOCK=0
# [optional] Path to mock state file
Expand Down
1 change: 1 addition & 0 deletions src/core/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,7 @@ pub async fn bootstrap_core_runtime(embedded_core: bool) {
// --- Event bus bootstrap ---
// Ensure the global event bus is initialized (no-op if already done by start_channels).
crate::core::event_bus::init_global(crate::core::event_bus::DEFAULT_CAPACITY);
crate::openhuman::file_state::init_global();
// Register domain subscribers for cross-module event handling.
// Uses a Once guard so repeated calls to bootstrap_core_runtime()
// cannot double-subscribe.
Expand Down
67 changes: 38 additions & 29 deletions src/openhuman/agent/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::openhuman::tools::Tool;

use super::harness::definition::{AgentDefinitionRegistry, SandboxMode};
use super::harness::{run_tool_call_loop, with_current_sandbox_mode};
use crate::openhuman::file_state::with_file_state_agent_id;

/// Method name used to dispatch an agentic turn through the native bus.
pub const AGENT_RUN_TURN_METHOD: &str = "agent.run_turn";
Expand Down Expand Up @@ -256,37 +257,45 @@ pub fn register_agent_handlers() {
// registry, so re-scoping here is mandatory — the
// task-local does NOT propagate across that boundary
// implicitly.
let file_state_id = format!(
"bus:{}:{}",
channel_name,
target_agent_id.as_deref().unwrap_or("root")
);
Comment on lines +260 to +264
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make the bus file-state ID unique per turn.

"bus:{channel}:{target_agent}" collapses every concurrent turn on the same channel/agent into one logical writer. Since stale-read detection ignores same-agent writes, overlapping root turns can overwrite each other without tripping the new guard. Include a per-turn/session discriminator here (or generate one once per request) so each run gets its own file-state identity.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/agent/bus.rs` around lines 260 - 264, The current file_state_id
built in the file_state_id variable ("bus:{channel_name}:{target_agent_id}") is
not unique per turn; change it to include a per-turn/session discriminator (for
example a request/turn id passed into this context, or generate a short
UUID/timestamp once per request) so each run gets its own identity. Update the
construction of file_state_id to append that discriminator (e.g.
"bus:{channel_name}:{target_agent_id}:{turn_id}") and ensure the discriminator
is created once per request/run (not per write) so other logic that checks
same-agent writes will no longer collapse concurrent turns; make this change
where file_state_id is formed and propagate the new turn/request id into this
scope if necessary.

let text = turn_origin::with_origin(
origin,
with_current_sandbox_mode(sandbox_mode, async {
run_tool_call_loop(
provider.as_ref(),
&mut history,
tools_registry.as_ref(),
&provider_name,
&model,
temperature,
silent,
&channel_name,
&multimodal,
&multimodal_files,
max_tool_iterations,
on_delta,
visible_tool_names.as_ref(),
&extra_tools,
on_progress,
// Bus path runs ad-hoc agent turns without an Agent
// handle, so we pass None — payload summarization is
// wired into the orchestrator session via Agent::turn,
// not the bus dispatcher.
None,
// Use the default (allow-all) tool policy. Custom
// policies can be wired in via AgentTurnRequest when
// per-channel policy configuration is added (#2134).
&crate::openhuman::tools::policy::DefaultToolPolicy,
)
.await
}),
with_file_state_agent_id(
file_state_id,
with_current_sandbox_mode(sandbox_mode, async {
run_tool_call_loop(
provider.as_ref(),
&mut history,
tools_registry.as_ref(),
&provider_name,
&model,
temperature,
silent,
&channel_name,
&multimodal,
&multimodal_files,
max_tool_iterations,
on_delta,
visible_tool_names.as_ref(),
&extra_tools,
on_progress,
// Bus path runs ad-hoc agent turns without an Agent
// handle, so we pass None — payload summarization is
// wired into the orchestrator session via Agent::turn,
// not the bus dispatcher.
None,
// Use the default (allow-all) tool policy. Custom
// policies can be wired in via AgentTurnRequest when
// per-channel policy configuration is added (#2134).
&crate::openhuman::tools::policy::DefaultToolPolicy,
)
.await
}),
),
)
.await
.map_err(|e| e.to_string())?;
Expand Down
20 changes: 12 additions & 8 deletions src/openhuman/agent/harness/subagent_runner/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::openhuman::agent::harness::{
use crate::openhuman::context::prompt::{
render_subagent_system_prompt, PromptContext, PromptTool, SubagentRenderOptions,
};
use crate::openhuman::file_state::with_file_state_agent_id;
use crate::openhuman::inference::provider::{ChatMessage, ChatRequest, Provider};
use crate::openhuman::memory_conversations::ConversationMessage;
use crate::openhuman::tools::{Tool, ToolCategory, ToolSpec};
Expand Down Expand Up @@ -363,14 +364,17 @@ pub async fn run_subagent(
// state machine lives on the heap (#2234 CI failure under
// `cargo-llvm-cov`).
let mut outcome = with_spawn_depth(attempted_depth, async {
with_current_sandbox_mode(definition.sandbox_mode, async {
Box::pin(run_typed_mode(
definition,
task_prompt,
&options,
&parent,
&task_id,
))
with_file_state_agent_id(task_id.clone(), async {
with_current_sandbox_mode(definition.sandbox_mode, async {
Box::pin(run_typed_mode(
definition,
task_prompt,
&options,
&parent,
&task_id,
))
.await
})
.await
})
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::openhuman::agent::harness::definition::{AgentDefinition, AgentDefinit
use crate::openhuman::agent::harness::fork_context::current_parent;
use crate::openhuman::agent::harness::subagent_runner::{run_subagent, SubagentRunOptions};
use crate::openhuman::agent::progress::AgentProgress;
use crate::openhuman::file_state;
use crate::openhuman::tools::traits::{PermissionLevel, Tool, ToolResult};
use async_trait::async_trait;
use futures::future::join_all;
Expand Down Expand Up @@ -51,6 +52,8 @@ struct ParallelAgentResult {
ownership: Option<String>,
elapsed_ms: u64,
iterations: u32,
#[serde(skip_serializing_if = "Vec::is_empty")]
stale_parent_reads: Vec<String>,
}

#[async_trait]
Expand Down Expand Up @@ -191,6 +194,7 @@ impl Tool for SpawnParallelAgentsTool {
ownership: task.ownership,
elapsed_ms: 0,
iterations: 0,
stale_parent_reads: Vec::new(),
});
continue;
}
Expand All @@ -211,6 +215,7 @@ impl Tool for SpawnParallelAgentsTool {
ownership: task.ownership,
elapsed_ms: 0,
iterations: 0,
stale_parent_reads: Vec::new(),
});
continue;
};
Expand All @@ -237,6 +242,7 @@ impl Tool for SpawnParallelAgentsTool {
ownership: task.ownership,
elapsed_ms: 0,
iterations: 0,
stale_parent_reads: Vec::new(),
});
continue;
}
Expand Down Expand Up @@ -391,6 +397,25 @@ impl Tool for SpawnParallelAgentsTool {
results.push(result);
}

// Parent reminder: check if any child wrote to files the parent
// had previously read, and annotate the result.
if let Some(parent_agent_id) = file_state::current_file_state_agent_id() {
let child_ids: Vec<String> = results.iter().map(|r| r.task_id.clone()).collect();
let stale = file_state::parent_stale_files(&parent_agent_id, &child_ids);
if !stale.is_empty() {
let stale_strings: Vec<String> =
stale.iter().map(|p| p.display().to_string()).collect();
tracing::debug!(
parent = %parent_agent_id,
stale_count = stale.len(),
"[file_state] parent reads stale after child writes"
);
for result in &mut results {
result.stale_parent_reads = stale_strings.clone();
}
}
}

let failures = results.iter().filter(|r| !r.success).count();
tracing::debug!(
parent_session = %parent_session,
Expand Down Expand Up @@ -457,6 +482,7 @@ async fn run_one_parallel_task(
ownership: task.ownership,
elapsed_ms: outcome.elapsed.as_millis() as u64,
iterations: outcome.iterations as u32,
stale_parent_reads: Vec::new(),
}
}
Err(err) => {
Expand All @@ -476,6 +502,7 @@ async fn run_one_parallel_task(
ownership: task.ownership,
elapsed_ms: started.elapsed().as_millis() as u64,
iterations: 0,
stale_parent_reads: Vec::new(),
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions src/openhuman/file_state/agent_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Task-local carrier for the currently-executing agent's identity so
//! file tools can attribute reads/writes without widening the `Tool` trait.
//!
//! Follows the same pattern as `sandbox_context.rs`. Set by the agent
//! harness around tool execution; tools read via [`current_file_state_agent_id`].

tokio::task_local! {
static FILE_STATE_AGENT_ID: String;
}

/// Returns the current agent's identity for file-state tracking, if set.
///
/// Returns `None` outside an agent turn (CLI, JSON-RPC direct, unit tests).
pub fn current_file_state_agent_id() -> Option<String> {
FILE_STATE_AGENT_ID.try_with(|id| id.clone()).ok()
}

/// Run `future` with `agent_id` installed as the file-state identity.
pub async fn with_file_state_agent_id<F, R>(agent_id: String, future: F) -> R
where
F: std::future::Future<Output = R>,
{
FILE_STATE_AGENT_ID.scope(agent_id, future).await
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn returns_none_outside_scope() {
assert_eq!(current_file_state_agent_id(), None);
}

#[tokio::test]
async fn installs_and_reads_agent_id() {
let observed =
with_file_state_agent_id("agent-1".into(), async { current_file_state_agent_id() })
.await;
assert_eq!(observed, Some("agent-1".to_string()));
}

#[tokio::test]
async fn does_not_leak_across_scopes() {
with_file_state_agent_id("agent-1".into(), async {
assert_eq!(current_file_state_agent_id(), Some("agent-1".to_string()));
})
.await;
assert_eq!(current_file_state_agent_id(), None);
}

#[tokio::test]
async fn nested_scope_overrides_outer() {
with_file_state_agent_id("parent".into(), async {
assert_eq!(current_file_state_agent_id(), Some("parent".to_string()));
with_file_state_agent_id("child".into(), async {
assert_eq!(current_file_state_agent_id(), Some("child".to_string()));
})
.await;
assert_eq!(current_file_state_agent_id(), Some("parent".to_string()));
})
.await;
}
}
21 changes: 21 additions & 0 deletions src/openhuman/file_state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! Process-wide file state coordinator for cross-agent staleness detection.
//!
//! Parallel subagents and worker threads share a workspace. Without
//! coordination one worker can read a file, a sibling can edit it, and
//! the first worker can later write based on stale content. This module
//! tracks per-agent read stamps and per-path write stamps so that write
//! tools can detect the conflict and return a model-facing error
//! requiring the agent to re-read.
//!
//! Disable with `OPENHUMAN_FILE_STATE_GUARD=0` (or `false`).

mod agent_context;
mod ops;
mod types;

pub use agent_context::{current_file_state_agent_id, with_file_state_agent_id};
pub use ops::{
acquire_path_lock, check_partial_read, check_stale_read, init_global, parent_stale_files,
record_read, record_write, try_global,
};
pub use types::{FileStateCoordinator, ReadStamp};
Loading
Loading