Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/openhuman/agent/harness/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,18 @@ impl Agent {
};
let mut buf: Vec<ChatMessage> = Vec::new();

let outcome = super::super::engine::run_turn_engine(
// Box-pin the parent agent's engine call so the state machine
// generated for its ~600-line body lives on the heap. Tools that delegate to
// sub-agents (orchestrator → researcher / personality /
// archetype / skill) recurse back into another
// `run_turn_engine` via `run_subagent`; without the box,
// both engines' state machines pile up on the same tokio
// worker stack and overflow the 2 MiB default. The inner
// boxes inside `run_typed_mode` aren't reached if the
// overflow happens during the parent's poll on the way in
// — verified against the `chat-harness-subagent` Playwright
// lane crash on PR #3151.
let outcome = Box::pin(super::super::engine::run_turn_engine(
provider.as_ref(),
&mut buf,
&mut tool_source,
Expand All @@ -545,7 +556,7 @@ impl Agent {
max_iterations,
None, // the web bridge streams via on_progress deltas, not on_delta
&[],
)
))
.await?;

// Pull the observer's accounting out, then drop it to release the
Expand Down
218 changes: 127 additions & 91 deletions src/openhuman/agent/harness/subagent_runner/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,103 +301,121 @@ pub async fn run_subagent(
task_prompt: &str,
options: SubagentRunOptions,
) -> Result<SubagentRunOutcome, SubagentRunError> {
let parent = current_parent().ok_or(SubagentRunError::NoParentContext)?;
let task_id = options
.task_id
.clone()
.unwrap_or_else(|| format!("sub-{}", uuid::Uuid::new_v4()));
let started = Instant::now();
let current_depth = current_spawn_depth();
let attempted_depth = current_depth.saturating_add(1);
// Unconditionally heap-allocate the entire run_subagent body so
// every caller — `dispatch_subagent`, `delegate_to_personality`,
// `spawn_subagent`, `spawn_parallel_agents`, `spawn_worker_thread`,
// `continue_subagent`, `escalation`, `payload_summarizer`,
// `session/turn.rs` extraction path, `agent_orchestration::ops`, and
// the recursive case from a sub-agent's own tool — doesn't have to
// carry this future's state inline. Tools that delegate run inside
// the parent agent's already-deep `run_turn_engine` poll, so the
// parent's stack would otherwise pile (parent engine state +
// dispatch_subagent state + run_subagent's wrapper state +
// run_typed_mode state + child engine state) onto tokio's 2 MiB
// worker stack and abort with "thread 'tokio-rt-worker' has
// overflowed its stack, fatal runtime error: stack overflow"
// — observed at `[subagent_runner] dispatching agent_id=researcher
// ...` in the `chat-harness-subagent` Playwright lane crash. The
// inner `Box::pin`s around `run_typed_mode` / `run_inner_loop` /
// child `run_turn_engine` further chunk the child's state so a
// single sub-agent run can't blow the stack either.
Box::pin(async move {
let parent = current_parent().ok_or(SubagentRunError::NoParentContext)?;
let task_id = options
.task_id
.clone()
.unwrap_or_else(|| format!("sub-{}", uuid::Uuid::new_v4()));
let started = Instant::now();
let current_depth = current_spawn_depth();
let attempted_depth = current_depth.saturating_add(1);

if attempted_depth > MAX_SPAWN_DEPTH {
tracing::warn!(
if attempted_depth > MAX_SPAWN_DEPTH {
tracing::warn!(
agent_id = %definition.id,
task_id = %task_id,
current_depth,
attempted_depth,
max_depth = MAX_SPAWN_DEPTH,
"[subagent_runner] spawn depth exceeded"
);
return Err(SubagentRunError::SpawnDepthExceeded {
attempted_depth,
max_depth: MAX_SPAWN_DEPTH,
});
}

tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
current_depth,
attempted_depth,
max_depth = MAX_SPAWN_DEPTH,
"[subagent_runner] spawn depth exceeded"
spawn_depth = attempted_depth,
max_spawn_depth = MAX_SPAWN_DEPTH,
prompt_chars = task_prompt.chars().count(),
skill_filter = ?options.skill_filter_override.as_deref().or(definition.skill_filter.as_deref()),
"[subagent_runner] dispatching"
);
return Err(SubagentRunError::SpawnDepthExceeded {
attempted_depth,
max_depth: MAX_SPAWN_DEPTH,
});
}

tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
spawn_depth = attempted_depth,
max_spawn_depth = MAX_SPAWN_DEPTH,
prompt_chars = task_prompt.chars().count(),
skill_filter = ?options.skill_filter_override.as_deref().or(definition.skill_filter.as_deref()),
"[subagent_runner] dispatching"
);

// Install the sub-agent's declared `sandbox_mode` as the active
// task-local for every tool invocation inside this run. Tools that
// want to gate on it (e.g. `composio_execute` rejecting
// Write/Admin slugs under `ReadOnly`) read it via
// `current_sandbox_mode()`; tools that don't care just ignore it.
// Box-pin the inner future so the large `run_typed_mode` state machine
// lives on the heap. Two stacked `task_local::scope` wrappers
// (`with_spawn_depth` + `with_current_sandbox_mode`) plus the deeply
// nested provider/tool loop inside `run_typed_mode` are otherwise large
// enough — under `cargo-llvm-cov` instrumentation in particular — to
// overflow tokio's 2 MiB per-thread test stack. See #2234 CI failure.
let mut outcome = with_spawn_depth(attempted_depth, async {
with_current_sandbox_mode(definition.sandbox_mode, async {
Box::pin(run_typed_mode(
definition,
task_prompt,
&options,
&parent,
&task_id,
))
// Install the sub-agent's declared `sandbox_mode` as the active
// task-local for every tool invocation inside this run. Tools
// that want to gate on it (e.g. `composio_execute` rejecting
// Write/Admin slugs under `ReadOnly`) read it via
// `current_sandbox_mode()`; tools that don't care just ignore
// it. Box-pin the inner future so the large `run_typed_mode`
// state machine lives on the heap (#2234 CI failure under
// `cargo-llvm-cov`).
let mut outcome = with_spawn_depth(attempted_depth, async {
with_current_sandbox_mode(definition.sandbox_mode, async {
Box::pin(run_typed_mode(
definition,
task_prompt,
&options,
&parent,
&task_id,
))
.await
})
.await
})
.await
})
.await?;

// Truncate result to the definition's cap if set.
// Use char-count (not byte-length) to avoid panicking on multi-byte
// UTF-8 sequences at the truncation boundary.
if let Some(cap) = definition.max_result_chars {
let original_chars = outcome.output.chars().count();
if original_chars > cap {
tracing::debug!(
agent_id = %definition.id,
original_chars,
cap,
"[subagent_runner] truncating oversized result to max_result_chars cap"
);
// Find the byte offset of the cap-th character boundary so
// `truncate` never lands mid-codepoint.
let byte_offset = outcome
.output
.char_indices()
.nth(cap)
.map(|(i, _)| i)
.unwrap_or(outcome.output.len());
outcome.output.truncate(byte_offset);
outcome.output.push_str("\n[...truncated]");
.await?;

// Truncate result to the definition's cap if set.
// Use char-count (not byte-length) to avoid panicking on
// multi-byte UTF-8 sequences at the truncation boundary.
if let Some(cap) = definition.max_result_chars {
let original_chars = outcome.output.chars().count();
if original_chars > cap {
tracing::debug!(
agent_id = %definition.id,
original_chars,
cap,
"[subagent_runner] truncating oversized result to max_result_chars cap"
);
// Find the byte offset of the cap-th character boundary
// so `truncate` never lands mid-codepoint.
let byte_offset = outcome
.output
.char_indices()
.nth(cap)
.map(|(i, _)| i)
.unwrap_or(outcome.output.len());
outcome.output.truncate(byte_offset);
outcome.output.push_str("\n[...truncated]");
}
}
}

tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
spawn_depth = attempted_depth,
elapsed_ms = outcome.elapsed.as_millis() as u64,
iterations = outcome.iterations,
output_chars = outcome.output.chars().count(),
"[subagent_runner] completed"
);
tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
spawn_depth = attempted_depth,
elapsed_ms = outcome.elapsed.as_millis() as u64,
iterations = outcome.iterations,
output_chars = outcome.output.chars().count(),
"[subagent_runner] completed"
);

let _ = started; // silence unused-warning if logging is compiled out
Ok(outcome)
let _ = started; // silence unused-warning if logging is compiled out
Ok(outcome)
})
.await
}

// ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -1174,7 +1192,11 @@ async fn run_typed_mode(
// Transcript persistence lives INSIDE the loop (one write per
// provider response), mirroring the main-agent turn loop in
// `session/turn.rs`. No post-loop write needed here.
let (output, iterations, _agg_usage, early_exit_tool) = run_inner_loop(
// Box-pin so `run_inner_loop`'s state machine (which itself wraps
// the engine call below) is heap-allocated independently of
// `run_typed_mode`. Belt-and-braces with the inner engine box at
// the recursion boundary inside `run_inner_loop`.
let (output, iterations, _agg_usage, early_exit_tool) = Box::pin(run_inner_loop(
subagent_provider.as_ref(),
&mut history,
&parent.all_tools,
Expand All @@ -1191,7 +1213,7 @@ async fn run_typed_mode(
handoff_cache.as_deref(),
parent,
definition.iteration_policy == IterationPolicy::Extended,
)
))
.await?;

// Determine status: if the turn engine exited early because of
Expand Down Expand Up @@ -1420,7 +1442,21 @@ async fn run_inner_loop(
};

let parser = super::super::engine::DefaultParser;
let outcome = super::super::engine::run_turn_engine(
// Heap-allocate the child `run_turn_engine` state machine. Sub-agents
// run as nested polls inside the *parent* agent's `run_turn_engine`
// (the orchestrator → tool exec → `dispatch_subagent` → `run_subagent`
// chain), so without the box the parent's tokio worker poll stack
// also has to carry the state machine for the child engine's ~600-line body. That
// crosses the 2 MiB tokio worker default and aborts with
// "thread 'tokio-rt-worker' has overflowed its stack" — see the
// `chat-harness-subagent` Playwright lane crash logged here:
// `[subagent_runner] dispatching agent_id=researcher ... → fatal
// runtime error: stack overflow`. Boxing here breaks the stack
// accumulation at the recursion boundary. Smoke-tested in
// `nested_subagent_dispatch_runs_on_a_constrained_worker_stack`;
// the deep end-to-end catcher is the `chat-harness-subagent`
// Playwright spec.
let outcome = Box::pin(super::super::engine::run_turn_engine(
provider,
history,
&mut tool_source,
Expand All @@ -1437,7 +1473,7 @@ async fn run_inner_loop(
max_iterations,
None, // sub-agents don't stream a draft
&["ask_user_clarification"],
)
))
.await?;

Ok((
Expand Down
94 changes: 94 additions & 0 deletions src/openhuman/agent/harness/subagent_runner/ops_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,3 +1287,97 @@ fn unsigned_in_user_fails_probe() {
"user with neither backend session nor direct key must NOT be reported as signed in"
);
}

/// Smoke test: a parent agent delegating to a nested sub-agent must run to
/// completion on a tokio worker thread whose stack is capped at 1 MiB.
///
/// This is the same recursion shape (`run_subagent` → tool `execute` →
/// `run_subagent`) that crashed the `chat-harness-subagent` Playwright
/// lane in production with `thread 'tokio-rt-worker' has overflowed its
/// stack, fatal runtime error: stack overflow`.
///
/// The deep ground-truth regression catcher for this is the
/// `chat-harness-subagent.spec.ts` Playwright spec, which exercises the
/// real orchestrator → researcher dispatch end-to-end (real provider
/// stream, real config load, real tool registry). The scripted unit path
/// here has much smaller per-frame state than production, so a single
/// stack size doesn't cleanly bracket boxed-vs-unboxed — the 1 MiB worker
/// stack is deliberately a loose smoke check that the dispatch path
/// remains poll-bounded after refactors, not a tight bound. See the
/// `Box::pin` callsites in `subagent_runner/ops.rs` for the structural fix.
///
/// Reading a failure: per the crash message quoted above, an overflow is a
/// fatal runtime error that takes the test process down; it does not come
/// back as an `Err`. If the `expect` below fires instead, `run_subagent`
/// itself returned an error and the attached `Debug` output is the place
/// to start.
#[test]
fn nested_subagent_dispatch_runs_on_a_constrained_worker_stack() {
    use async_trait::async_trait;
    use std::sync::Arc;

    /// Tool whose `execute` dispatches `inner_def` through `run_subagent`,
    /// so a sub-agent run is nested inside the outer run's tool call.
    struct RecursiveDelegateTool {
        inner_def: AgentDefinition,
    }

    #[async_trait]
    impl Tool for RecursiveDelegateTool {
        fn name(&self) -> &str {
            "delegate_inner"
        }
        fn description(&self) -> &str {
            "Dispatches a nested sub-agent — reproduces the recursive engine poll."
        }
        fn parameters_schema(&self) -> serde_json::Value {
            serde_json::json!({"type":"object","properties":{}})
        }
        fn permission_level(&self) -> PermissionLevel {
            PermissionLevel::Execute
        }
        async fn execute(&self, _args: serde_json::Value) -> anyhow::Result<ToolResult> {
            // The nested dispatch: this call happens while the outer
            // `run_subagent` future is still being polled.
            let outcome = run_subagent(&self.inner_def, "inner go", SubagentRunOptions::default())
                .await
                .map_err(|e| anyhow::anyhow!("nested run_subagent failed: {e}"))?;
            Ok(ToolResult::success(outcome.output))
        }
    }

    // A single worker with a 1 MiB stack: every poll of the outer and the
    // nested run lands on the same constrained thread.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .thread_stack_size(1024 * 1024)
        .enable_all()
        .build()
        .expect("build constrained-stack tokio runtime");

    let outcome = runtime.block_on(async {
        // Three scripted responses, shared by outer + inner runs
        // (providers are Arc-cloned, so both pull from the same queue):
        //   [0] outer round 1: call `delegate_inner`
        //   [1] inner round 1: return final text
        //   [2] outer round 2: return final text using the tool result
        //
        // The handle is not needed again after `make_parent`, so it is
        // coerced to the trait object once and moved in, rather than being
        // cloned twice and dropping a temporary.
        let provider: Arc<dyn Provider> = ScriptedProvider::new(vec![
            tool_response("delegate_inner", "{}"),
            text_response("inner-final"),
            text_response("outer-final: inner-final"),
        ]);

        let inner_def = make_def_named_tools(&[]);
        let delegate_tool: Box<dyn Tool> = Box::new(RecursiveDelegateTool { inner_def });
        let parent = make_parent(provider, vec![delegate_tool]);
        let outer_def = make_def_named_tools(&["delegate_inner"]);

        with_parent_context(parent, async {
            run_subagent(&outer_def, "outer go", SubagentRunOptions::default()).await
        })
        .await
    });

    let outcome = outcome.expect(
        "nested run_subagent must complete on a 1 MiB worker stack — \
         a stack overflow here means the recursion boundary in \
         `run_typed_mode` regressed (see `Box::pin` callsites around \
         `run_inner_loop` and `run_turn_engine`).",
    );
    assert!(
        outcome.output.contains("inner-final"),
        "outer should fold the inner sub-agent's result into its final \
         answer, got: {}",
        outcome.output
    );
}
Loading