Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/core/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,27 @@ fn run_server_command(args: &[String]) -> Result<()> {
crate::core::logging::init_for_cli_run(verbose, log_scope);

// Initialize the Tokio multi-threaded runtime.
//
// Worker stack size is bumped from tokio's 2 MiB default to 8 MiB
// because the agent harness's polling future tree
// (`run_turn_engine` -> tool exec -> `dispatch_subagent` ->
// `run_subagent` -> `run_typed_mode` -> `run_inner_loop` -> child
// `run_turn_engine`) generates very large generator state machines
// even with every recursion boundary `Box::pin`'d, and in debug
// builds the compiler constructs each generator on the stack before
// the heap move can elide it. The result was a hard
// `thread 'tokio-rt-worker' has overflowed its stack, fatal runtime
// error: stack overflow, aborting` whenever the orchestrator
// delegated to a sub-agent (see the `chat-harness-subagent`
// Playwright lane crash + identical signature reproduced inside
// `[subagent_runner] dispatching agent_id=researcher ...`). 8 MiB
// is the Linux process default and gives plenty of headroom; the
// memory cost on a desktop with ~12 worker threads is ~72 MiB
// additional address space (commit-on-touch, so the resident set
// grows only as the stack is actually used).
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_stack_size(8 * 1024 * 1024)
.build()?;
rt.block_on(async {
crate::core::jsonrpc::run_server(host.as_deref(), port, socketio_enabled).await
Expand Down
15 changes: 13 additions & 2 deletions src/openhuman/agent/harness/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,18 @@ impl Agent {
};
let mut buf: Vec<ChatMessage> = Vec::new();

let outcome = super::super::engine::run_turn_engine(
// Box-pin the parent agent's engine call so its ~600-line
// generator state lives on the heap. Tools that delegate to
// sub-agents (orchestrator → researcher / personality /
// archetype / skill) recurse back into another
// `run_turn_engine` via `run_subagent`; without the box,
// both engines' state machines pile up on the same tokio
// worker stack and overflow the 2 MiB default. The inner
// boxes inside `run_typed_mode` aren't reached if the
// overflow happens during the parent's poll on the way in
// — verified against the `chat-harness-subagent` Playwright
// lane crash on PR #3151.
let outcome = Box::pin(super::super::engine::run_turn_engine(
provider.as_ref(),
&mut buf,
&mut tool_source,
Expand All @@ -598,7 +609,7 @@ impl Agent {
max_iterations,
None, // the web bridge streams via on_progress deltas, not on_delta
&[],
)
))
.await?;

// Pull the observer's accounting out, then drop it to release the
Expand Down
218 changes: 127 additions & 91 deletions src/openhuman/agent/harness/subagent_runner/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,103 +301,121 @@ pub async fn run_subagent(
task_prompt: &str,
options: SubagentRunOptions,
) -> Result<SubagentRunOutcome, SubagentRunError> {
let parent = current_parent().ok_or(SubagentRunError::NoParentContext)?;
let task_id = options
.task_id
.clone()
.unwrap_or_else(|| format!("sub-{}", uuid::Uuid::new_v4()));
let started = Instant::now();
let current_depth = current_spawn_depth();
let attempted_depth = current_depth.saturating_add(1);
// Unconditionally heap-allocate the entire run_subagent body so
// every caller — `dispatch_subagent`, `delegate_to_personality`,
// `spawn_subagent`, `spawn_parallel_agents`, `spawn_worker_thread`,
// `continue_subagent`, `escalation`, `payload_summarizer`,
// `session/turn.rs` extraction path, `agent_orchestration::ops`, and
// the recursive case from a sub-agent's own tool — doesn't have to
// carry this future's state inline. Tools that delegate run inside
// the parent agent's already-deep `run_turn_engine` poll, so the
// parent's stack would otherwise pile (parent engine state +
// dispatch_subagent state + run_subagent's wrapper state +
// run_typed_mode state + child engine state) onto tokio's 2 MiB
// worker stack and abort with "thread 'tokio-rt-worker' has
// overflowed its stack, fatal runtime error: stack overflow"
// — observed at `[subagent_runner] dispatching agent_id=researcher
// ...` in the `chat-harness-subagent` Playwright lane crash. The
// inner `Box::pin`s around `run_typed_mode` / `run_inner_loop` /
// child `run_turn_engine` further chunk the child's state so a
// single sub-agent run can't blow the stack either.
Box::pin(async move {
let parent = current_parent().ok_or(SubagentRunError::NoParentContext)?;
let task_id = options
.task_id
.clone()
.unwrap_or_else(|| format!("sub-{}", uuid::Uuid::new_v4()));
let started = Instant::now();
let current_depth = current_spawn_depth();
let attempted_depth = current_depth.saturating_add(1);

if attempted_depth > MAX_SPAWN_DEPTH {
tracing::warn!(
if attempted_depth > MAX_SPAWN_DEPTH {
tracing::warn!(
agent_id = %definition.id,
task_id = %task_id,
current_depth,
attempted_depth,
max_depth = MAX_SPAWN_DEPTH,
"[subagent_runner] spawn depth exceeded"
);
return Err(SubagentRunError::SpawnDepthExceeded {
attempted_depth,
max_depth: MAX_SPAWN_DEPTH,
});
}

tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
current_depth,
attempted_depth,
max_depth = MAX_SPAWN_DEPTH,
"[subagent_runner] spawn depth exceeded"
spawn_depth = attempted_depth,
max_spawn_depth = MAX_SPAWN_DEPTH,
prompt_chars = task_prompt.chars().count(),
skill_filter = ?options.skill_filter_override.as_deref().or(definition.skill_filter.as_deref()),
"[subagent_runner] dispatching"
);
return Err(SubagentRunError::SpawnDepthExceeded {
attempted_depth,
max_depth: MAX_SPAWN_DEPTH,
});
}

tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
spawn_depth = attempted_depth,
max_spawn_depth = MAX_SPAWN_DEPTH,
prompt_chars = task_prompt.chars().count(),
skill_filter = ?options.skill_filter_override.as_deref().or(definition.skill_filter.as_deref()),
"[subagent_runner] dispatching"
);

// Install the sub-agent's declared `sandbox_mode` as the active
// task-local for every tool invocation inside this run. Tools that
// want to gate on it (e.g. `composio_execute` rejecting
// Write/Admin slugs under `ReadOnly`) read it via
// `current_sandbox_mode()`; tools that don't care just ignore it.
// Box-pin the inner future so the large `run_typed_mode` state machine
// lives on the heap. Two stacked `task_local::scope` wrappers
// (`with_spawn_depth` + `with_current_sandbox_mode`) plus the deeply
// nested provider/tool loop inside `run_typed_mode` are otherwise large
// enough — under `cargo-llvm-cov` instrumentation in particular — to
// overflow tokio's 2 MiB per-thread test stack. See #2234 CI failure.
let mut outcome = with_spawn_depth(attempted_depth, async {
with_current_sandbox_mode(definition.sandbox_mode, async {
Box::pin(run_typed_mode(
definition,
task_prompt,
&options,
&parent,
&task_id,
))
// Install the sub-agent's declared `sandbox_mode` as the active
// task-local for every tool invocation inside this run. Tools
// that want to gate on it (e.g. `composio_execute` rejecting
// Write/Admin slugs under `ReadOnly`) read it via
// `current_sandbox_mode()`; tools that don't care just ignore
// it. Box-pin the inner future so the large `run_typed_mode`
// state machine lives on the heap (#2234 CI failure under
// `cargo-llvm-cov`).
let mut outcome = with_spawn_depth(attempted_depth, async {
with_current_sandbox_mode(definition.sandbox_mode, async {
Box::pin(run_typed_mode(
definition,
task_prompt,
&options,
&parent,
&task_id,
))
.await
})
.await
})
.await
})
.await?;

// Truncate result to the definition's cap if set.
// Use char-count (not byte-length) to avoid panicking on multi-byte
// UTF-8 sequences at the truncation boundary.
if let Some(cap) = definition.max_result_chars {
let original_chars = outcome.output.chars().count();
if original_chars > cap {
tracing::debug!(
agent_id = %definition.id,
original_chars,
cap,
"[subagent_runner] truncating oversized result to max_result_chars cap"
);
// Find the byte offset of the cap-th character boundary so
// `truncate` never lands mid-codepoint.
let byte_offset = outcome
.output
.char_indices()
.nth(cap)
.map(|(i, _)| i)
.unwrap_or(outcome.output.len());
outcome.output.truncate(byte_offset);
outcome.output.push_str("\n[...truncated]");
.await?;

// Truncate result to the definition's cap if set.
// Use char-count (not byte-length) to avoid panicking on
// multi-byte UTF-8 sequences at the truncation boundary.
if let Some(cap) = definition.max_result_chars {
let original_chars = outcome.output.chars().count();
if original_chars > cap {
tracing::debug!(
agent_id = %definition.id,
original_chars,
cap,
"[subagent_runner] truncating oversized result to max_result_chars cap"
);
// Find the byte offset of the cap-th character boundary
// so `truncate` never lands mid-codepoint.
let byte_offset = outcome
.output
.char_indices()
.nth(cap)
.map(|(i, _)| i)
.unwrap_or(outcome.output.len());
outcome.output.truncate(byte_offset);
outcome.output.push_str("\n[...truncated]");
}
}
}

tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
spawn_depth = attempted_depth,
elapsed_ms = outcome.elapsed.as_millis() as u64,
iterations = outcome.iterations,
output_chars = outcome.output.chars().count(),
"[subagent_runner] completed"
);
tracing::info!(
agent_id = %definition.id,
task_id = %task_id,
spawn_depth = attempted_depth,
elapsed_ms = outcome.elapsed.as_millis() as u64,
iterations = outcome.iterations,
output_chars = outcome.output.chars().count(),
"[subagent_runner] completed"
);

let _ = started; // silence unused-warning if logging is compiled out
Ok(outcome)
let _ = started; // silence unused-warning if logging is compiled out
Ok(outcome)
})
.await
}

// ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -1174,7 +1192,11 @@ async fn run_typed_mode(
// Transcript persistence lives INSIDE the loop (one write per
// provider response), mirroring the main-agent turn loop in
// `session/turn.rs`. No post-loop write needed here.
let (output, iterations, _agg_usage, early_exit_tool) = run_inner_loop(
// Box-pin so `run_inner_loop`'s state machine (which itself wraps
// the engine call below) is heap-allocated independently of
// `run_typed_mode`. Belt-and-braces with the inner engine box at
// the recursion boundary inside `run_inner_loop`.
let (output, iterations, _agg_usage, early_exit_tool) = Box::pin(run_inner_loop(
subagent_provider.as_ref(),
&mut history,
&parent.all_tools,
Expand All @@ -1191,7 +1213,7 @@ async fn run_typed_mode(
handoff_cache.as_deref(),
parent,
definition.iteration_policy == IterationPolicy::Extended,
)
))
.await?;

// Determine status: if the turn engine exited early because of
Expand Down Expand Up @@ -1420,7 +1442,21 @@ async fn run_inner_loop(
};

let parser = super::super::engine::DefaultParser;
let outcome = super::super::engine::run_turn_engine(
// Heap-allocate the child `run_turn_engine` state machine. Sub-agents
// run as nested polls inside the *parent* agent's `run_turn_engine`
// (the orchestrator → tool exec → `dispatch_subagent` → `run_subagent`
// chain), so without the box the parent's tokio worker poll stack
// also has to carry the child engine's ~600-line generator. That
// crosses the 2 MiB tokio worker default and aborts with
// "thread 'tokio-rt-worker' has overflowed its stack" — see the
// `chat-harness-subagent` Playwright lane crash logged here:
// `[subagent_runner] dispatching agent_id=researcher ... → fatal
// runtime error: stack overflow`. Boxing here breaks the stack
// accumulation at the recursion boundary. Smoke-tested in
// `nested_subagent_dispatch_runs_on_a_constrained_worker_stack`;
// the deep end-to-end catcher is the `chat-harness-subagent`
// Playwright spec.
let outcome = Box::pin(super::super::engine::run_turn_engine(
provider,
history,
&mut tool_source,
Expand All @@ -1437,7 +1473,7 @@ async fn run_inner_loop(
max_iterations,
None, // sub-agents don't stream a draft
&["ask_user_clarification"],
)
))
.await?;

Ok((
Expand Down
Loading
Loading