diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx index 00a2609e65..ecca316c50 100644 --- a/app/src/pages/Conversations.tsx +++ b/app/src/pages/Conversations.tsx @@ -1120,6 +1120,19 @@ const Conversations = ({ } }; + const handleDecidePlan = async (card: TaskBoardCard, approve: boolean): Promise => { + if (!selectedThreadId) return; + try { + const saved = await threadApi.decidePlan(selectedThreadId, card.id, approve); + if (saved) { + dispatch(setTaskBoardForThread({ threadId: selectedThreadId, board: saved })); + } + } catch (error) { + debug('decidePlan failed: %o', error); + setSendAdvisory(t('conversations.taskKanban.updateFailed')); + } + }; + const handleUpdateTaskCard = async ( card: TaskBoardCard, nextCard: TaskBoardCard @@ -1584,6 +1597,9 @@ const Conversations = ({ onUpdateCard={(card, nextCard) => { void handleUpdateTaskCard(card, nextCard); }} + onDecidePlan={(card, approve) => { + void handleDecidePlan(card, approve); + }} /> )} {visibleMessages.map(msg => ( diff --git a/app/src/pages/conversations/components/TaskKanbanBoard.tsx b/app/src/pages/conversations/components/TaskKanbanBoard.tsx index afb6c7af63..3311c2f492 100644 --- a/app/src/pages/conversations/components/TaskKanbanBoard.tsx +++ b/app/src/pages/conversations/components/TaskKanbanBoard.tsx @@ -24,11 +24,34 @@ const COLUMN_DEFS: ColumnDef[] = [ const STATUS_INDEX = new Map(COLUMN_DEFS.map((column, index) => [column.status, index])); +/** Whether a status owns a kanban column (vs the approval-flow statuses that + * are bucketed into an existing column). */ +function isColumnStatus(status: TaskBoardCardStatus): boolean { + return STATUS_INDEX.has(status); +} + +/** Map a card status to the column it renders under. The approval-flow + * statuses don't get their own columns: pre-execution ones sit in `todo`, + * `rejected` sits with `blocked`. */ +function columnFor(status: TaskBoardCardStatus): TaskBoardCardStatus { + switch (status) { + case 'awaiting_approval': + case 'ready': + return 'todo'; + case 'rejected': + return 'blocked'; + default: + return status; + } +} + interface TaskKanbanBoardProps { board: TaskBoard; disabled?: boolean; onMove?: (card: TaskBoardCard, status: TaskBoardCardStatus) => void; onUpdateCard?: (card: TaskBoardCard, nextCard: TaskBoardCard) => void; + /** Approve/reject a card awaiting plan approval. */ + onDecidePlan?: (card: TaskBoardCard, approve: boolean) => void; } export function TaskKanbanBoard({ @@ -36,6 +59,7 @@ export function TaskKanbanBoard({ disabled = false, onMove, onUpdateCard, + onDecidePlan, }: TaskKanbanBoardProps) { const { t } = useT(); const [selectedCardId, setSelectedCardId] = useState(null); @@ -55,7 +79,7 @@ export function TaskKanbanBoard({ ); for (const card of [...board.cards].sort((a, b) => a.order - b.order)) { - cardsByStatus[card.status]?.push(card); + cardsByStatus[columnFor(card.status)]?.push(card); } const moveCard = (card: TaskBoardCard, direction: -1 | 1) => { @@ -97,7 +121,26 @@ export function TaskKanbanBoard({

{card.title}

- {onMove && ( + {card.status === 'awaiting_approval' && onDecidePlan ? ( +
+ + +
+ ) : onMove && isColumnStatus(card.status) ? (
- )} + ) : null}
{card.assignedAgent && ( diff --git a/app/src/services/api/threadApi.ts b/app/src/services/api/threadApi.ts index 9a9062902d..ec0e2e469d 100644 --- a/app/src/services/api/threadApi.ts +++ b/app/src/services/api/threadApi.ts @@ -161,6 +161,32 @@ export const threadApi = { return data?.taskBoard ?? null; }, + /** + * Approve or reject a task-board card that is awaiting plan approval + * (`openhuman.todos_decide_plan`). Approve → the card becomes runnable + * (`ready`); reject → `rejected`. Returns the updated board (rebuilt from + * the returned todos snapshot) or null. + */ + decidePlan: async ( + threadId: string, + cardId: string, + approve: boolean + ): Promise => { + const response = await callCoreRpc<{ + data?: { threadId?: string | null; cards?: TaskBoardCard[] }; + }>({ + method: 'openhuman.todos_decide_plan', + params: { thread_id: threadId, id: cardId, approve }, + }); + const data = unwrapEnvelope(response); + if (!data?.cards) return null; + return { + threadId: data.threadId ?? threadId, + cards: data.cards, + updatedAt: new Date().toISOString(), + }; + }, + updateLabels: async (threadId: string, labels: string[]): Promise => { const response = await callCoreRpc>({ method: 'openhuman.threads_update_labels', diff --git a/app/src/types/turnState.ts b/app/src/types/turnState.ts index 7d450bbb28..fbe005dcb6 100644 --- a/app/src/types/turnState.ts +++ b/app/src/types/turnState.ts @@ -11,7 +11,14 @@ export type PersistedTurnPhase = 'thinking' | 'tool_use' | 'subagent'; export type PersistedToolStatus = 'running' | 'success' | 'error'; -export type TaskBoardCardStatus = 'todo' | 'in_progress' | 'blocked' | 'done'; +export type TaskBoardCardStatus = + | 'todo' + | 'awaiting_approval' + | 'ready' + | 'in_progress' + | 'blocked' + | 'done' + | 'rejected'; export type TaskApprovalMode = 'required' | 'not_required'; export interface TaskBoardCard { @@ -27,6 +34,10 @@ export interface TaskBoardCard { evidence?: string[]; notes?: string | null; blocker?: string | null; + /** Provider/source identifiers for a card ingested from a task source + * (`{provider, source_id, external_id, url, repo?, urgency}`); absent on + * agent/UI-authored cards. */ + sourceMetadata?: Record | null; order: number; updatedAt: string; } diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index fd7905917e..22c169adf3 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -656,6 +656,10 @@ pub enum DomainEvent { provider: String, error: String, }, + /// A task-board card needs human plan approval before the dispatcher will + /// execute it (emitted when `autonomy.require_task_plan_approval` is on and + /// the dispatcher parks a `todo` card at `awaiting_approval`). + TaskPlanAwaitingApproval { card_id: String, thread_id: String }, } impl DomainEvent { @@ -747,6 +751,8 @@ impl DomainEvent { | Self::TaskSourceTaskIngested { .. } | Self::TaskSourceFetchFailed { .. } => "task_sources", + Self::TaskPlanAwaitingApproval { .. } => "agent", + Self::ApprovalRequested { .. } | Self::ApprovalDecided { .. } => "approval", Self::McpServerInstalled { .. } @@ -837,6 +843,7 @@ impl DomainEvent { Self::TaskSourceFetched { .. } => "TaskSourceFetched", Self::TaskSourceTaskIngested { .. } => "TaskSourceTaskIngested", Self::TaskSourceFetchFailed { .. } => "TaskSourceFetchFailed", + Self::TaskPlanAwaitingApproval { .. } => "TaskPlanAwaitingApproval", } } diff --git a/src/core/jsonrpc.rs b/src/core/jsonrpc.rs index a1a04aedbc..67cde6ae57 100644 --- a/src/core/jsonrpc.rs +++ b/src/core/jsonrpc.rs @@ -1846,6 +1846,9 @@ fn register_domain_subscribers( // Task-sources proactive ingestion: connection-created hook + poll. crate::openhuman::task_sources::bus::register_task_sources_subscriber(); crate::openhuman::task_sources::start_periodic_poll(); + // Board poller: dispatch the highest-urgency `todo` card on the + // task-sources board (catch-all for cards without a proactive trigger). + crate::openhuman::agent::task_dispatcher::start_board_poller(); // Seed memory_sources with active Composio connections so the // user sees their connected integrations as memory sources by // default. Best-effort: failure is logged but does not block startup. diff --git a/src/openhuman/agent/mod.rs b/src/openhuman/agent/mod.rs index 68ffe8c1a3..997a85742c 100644 --- a/src/openhuman/agent/mod.rs +++ b/src/openhuman/agent/mod.rs @@ -42,6 +42,7 @@ pub mod prompts; mod schemas; pub mod stop_hooks; pub mod task_board; +pub mod task_dispatcher; pub mod tool_policy; pub mod tree_loader; pub mod triage; diff --git a/src/openhuman/agent/task_board.rs b/src/openhuman/agent/task_board.rs index 095c77d9d6..bad2a721bd 100644 --- a/src/openhuman/agent/task_board.rs +++ b/src/openhuman/agent/task_board.rs @@ -18,18 +18,31 @@ const TASK_BOARD_EXTENSION: &str = "json"; #[serde(rename_all = "snake_case")] pub enum TaskCardStatus { Todo, + /// Plan approval required and pending — the dispatcher parked the card here + /// and emitted `TaskPlanAwaitingApproval`; it will not run until a human + /// approves (→ `Ready`) or rejects (→ `Rejected`). + AwaitingApproval, + /// Approved for execution — the dispatcher runs `Ready` cards without a + /// further approval check (distinguishes "approved" from the initial + /// `Todo`, which the approval gate would otherwise re-park). + Ready, InProgress, Blocked, Done, + /// Plan approval was denied; the card is not executed. + Rejected, } impl TaskCardStatus { pub fn as_str(&self) -> &'static str { match self { Self::Todo => "todo", + Self::AwaitingApproval => "awaiting_approval", + Self::Ready => "ready", Self::InProgress => "in_progress", Self::Blocked => "blocked", Self::Done => "done", + Self::Rejected => "rejected", } } } @@ -74,6 +87,12 @@ pub struct TaskBoardCard { pub notes: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub blocker: Option, + /// Provider/source identifiers for a card ingested from a task source + /// (`{provider, source_id, external_id, url, repo?, urgency}`). Set by + /// the `task_sources` route; consumed downstream for prioritisation and + /// external write-back. `None` for agent/UI-authored cards. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_metadata: Option, #[serde(default)] pub order: u32, #[serde(default)] @@ -420,6 +439,7 @@ mod tests { evidence: vec![" cargo test ".into()], notes: Some(" note ".into()), blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, @@ -436,6 +456,7 @@ mod tests { evidence: Vec::new(), notes: Some("waiting on user".into()), blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, @@ -506,6 +527,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, @@ -522,6 +544,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, diff --git a/src/openhuman/agent/task_dispatcher.rs b/src/openhuman/agent/task_dispatcher.rs new file mode 100644 index 0000000000..45094a02f8 --- /dev/null +++ b/src/openhuman/agent/task_dispatcher.rs @@ -0,0 +1,515 @@ +//! Deterministic task-card dispatcher. +//! +//! Turns a [`TaskBoardCard`] into work: it **claims** the card (flips it to +//! `in_progress`, which `todos::ops::enforce_single_in_progress` makes a +//! per-board mutual-exclusion lock), runs a single **autonomous agent turn** +//! toward the card's objective, and **writes the outcome back** to the board +//! (`done` + evidence on success, `blocked` + reason on failure). +//! +//! This is the one executor both dispatch paths converge on: +//! - the **board poller** (cards that arrived without a proactive trigger), and +//! - the **proactive triage** arm (`agent::triage::apply_decision`), once it has +//! decided to act on a task-board card. +//! +//! The runner mirrors `skills::spawn_skill_run_background`: build the +//! `orchestrator` agent fresh inside a detached task, cap tool iterations, and +//! run `agent.run_single` under `with_autonomous_iter_cap`. PR-4 generalises the +//! executor from the default agent to a resolved personality/skill; this module +//! keeps the default-agent path so the pipeline runs end-to-end first. + +use std::sync::OnceLock; +use std::time::Duration; + +use crate::openhuman::agent::harness::session::Agent; +use crate::openhuman::agent::harness::subagent_runner::with_autonomous_iter_cap; +use crate::openhuman::agent::task_board::{TaskBoardCard, TaskCardStatus}; +use crate::openhuman::config::Config; +use crate::openhuman::todos::ops::{self, BoardLocation, CardPatch}; + +/// Tool-iteration ceiling for an autonomous task run. Matches the skill-run +/// cap — a task brief is the same shape of bounded autonomous work. +const TASK_RUN_MAX_ITERATIONS: usize = 200; + +/// Max chars of the agent's final output retained as board `evidence`. +const EVIDENCE_MAX_CHARS: usize = 2_000; + +/// Render a card into the goal prompt handed to the autonomous run. +/// +/// The card's `content`/title is the display form; the prompt leads with the +/// clean `objective`, then any `plan` steps and `acceptance_criteria`, and a +/// pointer to the originating source so the agent can pull related context from +/// memory via its `memory_recall` tool (the GitHub/Notion/… activity for this +/// item is ingested into the summary tree by the memory-sources domain). +pub fn build_task_prompt(card: &TaskBoardCard) -> String { + let mut lines: Vec = Vec::new(); + + let objective = card + .objective + .as_deref() + .map(str::trim) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| card.title.trim()); + lines.push(format!( + "You are autonomously executing one task to completion. Objective:\n{objective}" + )); + + if !card.plan.is_empty() { + lines.push("\nPlan:".to_string()); + for (i, step) in card.plan.iter().enumerate() { + lines.push(format!("{}. {}", i + 1, step.trim())); + } + } + + if !card.acceptance_criteria.is_empty() { + lines.push("\nAcceptance criteria (the task is done only when all hold):".to_string()); + for c in &card.acceptance_criteria { + lines.push(format!("- {}", c.trim())); + } + } + + if let Some(meta) = &card.source_metadata { + let provider = meta.get("provider").and_then(|v| v.as_str()); + let repo = meta.get("repo").and_then(|v| v.as_str()); + let external_id = meta.get("external_id").and_then(|v| v.as_str()); + let url = meta.get("url").and_then(|v| v.as_str()); + let mut origin = String::new(); + if let Some(p) = provider { + origin.push_str(p); + } + if let Some(r) = repo { + origin.push_str(&format!(" {r}")); + } + if let Some(id) = external_id { + origin.push_str(&format!("#{id}")); + } + if !origin.trim().is_empty() { + lines.push(format!( + "\nThis task originates from {}. Its activity has been ingested into memory — use \ + your memory_recall tool to pull related context (prior discussion, linked items) \ + before and while you work.", + origin.trim() + )); + } + if let Some(u) = url { + lines.push(format!("Source link: {u}")); + } + } + + lines.push( + "\nWork the task to completion. Do not pick up unrelated work. When finished, your final \ + message should summarise what you did and the evidence (commits, PRs, results)." + .to_string(), + ); + + lines.join("\n") +} + +/// Outcome of a dispatch attempt. +#[derive(Debug)] +pub enum DispatchOutcome { + /// The card was claimed and a detached autonomous run was spawned. + Running { run_id: String }, + /// Plan approval is required; the card was parked at `awaiting_approval` + /// and a `TaskPlanAwaitingApproval` event was emitted. No run was spawned. + AwaitingApproval, +} + +/// Dispatch one card: gate on plan approval, claim it, run an autonomous turn, +/// write the result back. +/// +/// Returns `Ok(Running)` once the card is claimed and the detached run is +/// spawned, `Ok(AwaitingApproval)` if the card was parked for human approval, +/// or `Err` *without* spawning if the claim fails — most commonly because +/// another card on the board is already `in_progress` +/// (`enforce_single_in_progress`), the intended per-board throttle; the poller +/// retries next tick. +pub async fn dispatch_card( + location: BoardLocation, + card: TaskBoardCard, +) -> Result { + let card_id = card.id.clone(); + + let config = Config::load_or_init() + .await + .map_err(|e| format!("load config: {e:#}"))?; + + // Plan-approval gate: when required, a `todo` card is parked for human + // approval before it can run. `Ready` cards have already been approved, so + // they bypass the gate; `todo` cards with approval disabled run directly. + if config.autonomy.require_task_plan_approval && card.status == TaskCardStatus::Todo { + ops::update_status(&location, &card_id, TaskCardStatus::AwaitingApproval).map_err(|e| { + format!("[task_dispatcher] park-for-approval failed for {card_id}: {e}") + })?; + if let Some(thread_id) = location.thread_id() { + crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::TaskPlanAwaitingApproval { + card_id: card_id.clone(), + thread_id: thread_id.to_string(), + }, + ); + } + tracing::info!(card_id = %card_id, "[task_dispatcher] parked card awaiting plan approval"); + return Ok(DispatchOutcome::AwaitingApproval); + } + + let prompt = build_task_prompt(&card); + + // Claim the card. Todo|Ready→InProgress; `enforce_single_in_progress` + // rejects a second concurrent in-progress card, so a failed claim means + // "something else is running" → skip. + ops::update_status(&location, &card_id, TaskCardStatus::InProgress) + .map_err(|e| format!("[task_dispatcher] claim failed for card {card_id}: {e}"))?; + + let run_id = uuid::Uuid::new_v4().to_string(); + tracing::info!( + card_id = %card_id, + run_id = %run_id, + prompt_chars = prompt.chars().count(), + "[task_dispatcher] card claimed (→in_progress), spawning autonomous run" + ); + + let run_id_for_return = run_id.clone(); + let location_for_run = location.clone(); + tokio::spawn(async move { + let outcome = run_autonomous(config, &prompt, &run_id).await; + write_back(&location_for_run, &card_id, &run_id, outcome); + }); + + Ok(DispatchOutcome::Running { + run_id: run_id_for_return, + }) +} + +/// Run the orchestrator agent for a single autonomous turn using the +/// already-loaded config. +async fn run_autonomous(mut config: Config, prompt: &str, run_id: &str) -> Result { + config.agent.max_tool_iterations = TASK_RUN_MAX_ITERATIONS; + // Match skill-run egress handling: only widen to the permissive default + // when the operator hasn't configured an explicit allow-list. + if config.http_request.allowed_domains.is_empty() { + config.http_request.allowed_domains = vec!["*".to_string()]; + } + + let mut agent = Agent::from_config_for_agent(&config, "orchestrator") + .map_err(|e| format!("build agent: {e:#}"))?; + agent.set_event_context(run_id.to_string(), "task"); + agent.set_agent_definition_name(format!( + "orchestrator-task-{}", + run_id.get(..8).unwrap_or(run_id) + )); + + with_autonomous_iter_cap(TASK_RUN_MAX_ITERATIONS, agent.run_single(prompt)) + .await + .map_err(|e| format!("{e:#}")) +} + +/// Deterministic board write-back: the dispatcher owns the card lifecycle. +/// Success → `done` + evidence; failure → `blocked` + blocker reason. An +/// external write failure here is logged, never propagated — the run already +/// happened. +fn write_back( + location: &BoardLocation, + card_id: &str, + run_id: &str, + outcome: Result, +) { + let patch = match &outcome { + Ok(output) => { + tracing::info!( + card_id = %card_id, + run_id = %run_id, + output_chars = output.chars().count(), + "[task_dispatcher] run complete → done" + ); + CardPatch { + status: Some(TaskCardStatus::Done), + evidence: Some(vec![truncate_chars(output.trim(), EVIDENCE_MAX_CHARS)]), + ..Default::default() + } + } + Err(err) => { + tracing::warn!( + card_id = %card_id, + run_id = %run_id, + error = %err, + "[task_dispatcher] run failed → blocked" + ); + CardPatch { + status: Some(TaskCardStatus::Blocked), + blocker: Some(truncate_chars(err, EVIDENCE_MAX_CHARS)), + ..Default::default() + } + } + }; + + if let Err(e) = ops::edit(location, card_id, patch) { + tracing::error!( + card_id = %card_id, + run_id = %run_id, + error = %e, + "[task_dispatcher] board write-back failed (run outcome lost from board)" + ); + } +} + +fn truncate_chars(s: &str, max: usize) -> String { + if s.chars().count() <= max { + return s.to_string(); + } + let mut out: String = s.chars().take(max.saturating_sub(1)).collect(); + out.push('…'); + out +} + +// ── Board poller ────────────────────────────────────────────────────────── + +/// How often the poller wakes to look for a dispatchable card. +const POLLER_TICK_SECONDS: u64 = 60; + +static POLLER_STARTED: OnceLock<()> = OnceLock::new(); + +/// Spawn the board poller. Idempotent — only the first call installs the loop. +/// +/// Each tick it scans the `task-sources` board and dispatches the +/// highest-urgency `todo` card via [`dispatch_card`], gated by background-AI +/// capacity (`scheduler_gate`). This is the catch-all for cards that arrive +/// without a proactive trigger (`TodoOnly` sources, manual cards, or proactive +/// turns the gate skipped). Cards that *did* get a proactive trigger are +/// dispatched by the triage arm; the claim-based lock makes firing both safe. +pub fn start_board_poller() { + if POLLER_STARTED.set(()).is_err() { + tracing::debug!("[task_dispatcher:poller] already running, skipping start"); + return; + } + tokio::spawn(async move { + tracing::info!( + tick_seconds = POLLER_TICK_SECONDS, + "[task_dispatcher:poller] starting" + ); + let mut ticker = tokio::time::interval(Duration::from_secs(POLLER_TICK_SECONDS)); + ticker.tick().await; // skip the immediate fire so startup isn't slammed + loop { + ticker.tick().await; + if let Err(e) = poll_once().await { + tracing::warn!(error = %e, "[task_dispatcher:poller] tick failed (continuing)"); + } + } + }); +} + +/// One poller tick: dispatch the highest-urgency `todo` card on the +/// task-sources board, if any and if capacity allows. `pub(crate)` so tests can +/// drive a tick without the real interval. +pub(crate) async fn poll_once() -> Result<(), String> { + // Gate on background-AI capacity (autonomy / power / pause). Dropping the + // permit immediately is fine: this is a "may background work start now" + // check; the run itself is detached. + let Some(_permit) = crate::openhuman::scheduler_gate::wait_for_capacity().await else { + tracing::debug!("[task_dispatcher:poller] scheduler gate denied capacity; idle tick"); + return Ok(()); + }; + + let config = Config::load_or_init() + .await + .map_err(|e| format!("load config: {e:#}"))?; + if !config.task_sources.enabled { + return Ok(()); + } + + let location = BoardLocation::Thread { + workspace_dir: config.workspace_dir.clone(), + thread_id: crate::openhuman::task_sources::TASK_SOURCES_THREAD_ID.to_string(), + }; + let snapshot = ops::list(&location)?; + + // `enforce_single_in_progress` caps the board at one running card, so if + // one is already in progress there's nothing for this tick to claim. + if snapshot + .cards + .iter() + .any(|c| c.status == TaskCardStatus::InProgress) + { + return Ok(()); + } + + let Some(card) = pick_next_todo(&snapshot.cards) else { + return Ok(()); + }; + + tracing::info!( + card_id = %card.id, + urgency = card_urgency(&card), + "[task_dispatcher:poller] dispatching highest-urgency todo card" + ); + dispatch_card(location, card).await.map(|_| ()) +} + +/// Highest-urgency dispatchable card (`todo` or approved `ready`; urgency from +/// `source_metadata.urgency`, default 0.0; ties broken toward the lower board +/// `order`). Returns a clone. `dispatch_card` then either runs a `ready` card +/// or parks a `todo` one for approval, per the autonomy setting. +fn pick_next_todo(cards: &[TaskBoardCard]) -> Option { + cards + .iter() + .filter(|c| matches!(c.status, TaskCardStatus::Todo | TaskCardStatus::Ready)) + .max_by(|a, b| { + card_urgency(a) + .partial_cmp(&card_urgency(b)) + .unwrap_or(std::cmp::Ordering::Equal) + // On equal urgency, prefer the lower `order` (earlier card): + // reversing the order comparison makes it the "greater" pick. + .then(b.order.cmp(&a.order)) + }) + .cloned() +} + +fn card_urgency(card: &TaskBoardCard) -> f64 { + card.source_metadata + .as_ref() + .and_then(|m| m.get("urgency")) + .and_then(serde_json::Value::as_f64) + .unwrap_or(0.0) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn card(objective: Option<&str>) -> TaskBoardCard { + TaskBoardCard { + id: "task-1".into(), + title: "[GitHub] Fix login bug".into(), + status: TaskCardStatus::Todo, + objective: objective.map(str::to_string), + plan: vec![], + assigned_agent: None, + allowed_tools: vec![], + approval_mode: None, + acceptance_criteria: vec![], + evidence: vec![], + notes: None, + blocker: None, + source_metadata: None, + order: 0, + updated_at: String::new(), + } + } + + #[test] + fn prompt_uses_objective_then_falls_back_to_title() { + let p = build_task_prompt(&card(Some("Fix the login bug"))); + assert!(p.contains("Fix the login bug")); + assert!(!p.contains("[GitHub]")); + + let p2 = build_task_prompt(&card(None)); + assert!(p2.contains("[GitHub] Fix login bug")); + } + + #[test] + fn prompt_includes_plan_and_acceptance_criteria() { + let mut c = card(Some("Do it")); + c.plan = vec!["step one".into(), "step two".into()]; + c.acceptance_criteria = vec!["tests pass".into()]; + let p = build_task_prompt(&c); + assert!(p.contains("Plan:")); + assert!(p.contains("1. step one")); + assert!(p.contains("2. step two")); + assert!(p.contains("Acceptance criteria")); + assert!(p.contains("- tests pass")); + } + + #[test] + fn prompt_points_at_source_and_memory_when_metadata_present() { + let mut c = card(Some("Resolve issue")); + c.source_metadata = Some(json!({ + "provider": "github", + "repo": "octo/repo", + "external_id": "123", + "url": "https://github.com/octo/repo/issues/123", + })); + let p = build_task_prompt(&c); + assert!(p.contains("github octo/repo#123")); + assert!(p.contains("memory_recall")); + assert!(p.contains("https://github.com/octo/repo/issues/123")); + } + + #[test] + fn prompt_omits_source_block_without_metadata() { + let p = build_task_prompt(&card(Some("Do it"))); + assert!(!p.contains("memory_recall")); + } + + #[test] + fn truncate_caps_long_strings() { + let s = "x".repeat(5_000); + let out = truncate_chars(&s, EVIDENCE_MAX_CHARS); + assert!(out.chars().count() <= EVIDENCE_MAX_CHARS); + assert!(out.ends_with('…')); + } + + fn card_with( + id: &str, + status: TaskCardStatus, + urgency: Option, + order: u32, + ) -> TaskBoardCard { + let mut c = card(Some("obj")); + c.id = id.into(); + c.status = status; + c.order = order; + c.source_metadata = urgency.map(|u| json!({ "urgency": u })); + c + } + + #[test] + fn poller_picks_highest_urgency_todo_skipping_other_statuses() { + let cards = vec![ + card_with("a", TaskCardStatus::Todo, Some(0.3), 0), + card_with("b", TaskCardStatus::Done, Some(0.99), 1), + card_with("c", TaskCardStatus::Todo, Some(0.8), 2), + card_with("d", TaskCardStatus::Todo, None, 3), + ]; + let picked = pick_next_todo(&cards).expect("a todo card is available"); + assert_eq!( + picked.id, "c", + "highest-urgency todo wins, done card ignored" + ); + } + + #[test] + fn poller_breaks_urgency_ties_toward_lower_order() { + let cards = vec![ + card_with("late", TaskCardStatus::Todo, Some(0.5), 5), + card_with("early", TaskCardStatus::Todo, Some(0.5), 2), + ]; + assert_eq!(pick_next_todo(&cards).unwrap().id, "early"); + } + + #[test] + fn poller_returns_none_when_no_todo_cards() { + let cards = vec![card_with("a", TaskCardStatus::Done, Some(0.9), 0)]; + assert!(pick_next_todo(&cards).is_none()); + } + + #[test] + fn poller_dispatches_ready_cards_and_skips_approval_states() { + // Approved `ready` cards are dispatchable; `awaiting_approval` and + // `rejected` are not. + let cards = vec![ + card_with("await", TaskCardStatus::AwaitingApproval, Some(0.99), 0), + card_with("rej", TaskCardStatus::Rejected, Some(0.95), 1), + card_with("ready", TaskCardStatus::Ready, Some(0.5), 2), + ]; + assert_eq!(pick_next_todo(&cards).unwrap().id, "ready"); + } + + #[test] + fn poller_prefers_higher_urgency_across_todo_and_ready() { + let cards = vec![ + card_with("ready-low", TaskCardStatus::Ready, Some(0.3), 0), + card_with("todo-high", TaskCardStatus::Todo, Some(0.9), 1), + ]; + assert_eq!(pick_next_todo(&cards).unwrap().id, "todo-high"); + } +} diff --git a/src/openhuman/agent/triage/envelope.rs b/src/openhuman/agent/triage/envelope.rs index 53267a82c6..b67140f2bc 100644 --- a/src/openhuman/agent/triage/envelope.rs +++ b/src/openhuman/agent/triage/envelope.rs @@ -10,6 +10,18 @@ use chrono::{DateTime, Utc}; use serde_json::Value; +use crate::openhuman::todos::ops::BoardLocation; + +/// Links a trigger to the task-board card it concerns, so the triage +/// `apply_decision` arm can hand the card to the deterministic dispatcher +/// (claim + autonomous run + write-back) instead of the one-shot triage +/// sub-agent. `None` for triggers with no board card (composio/webhook/cron). +#[derive(Debug, Clone)] +pub struct TaskCardLink { + pub card_id: String, + pub location: BoardLocation, +} + /// Where the trigger came from, plus source-specific identifiers the /// triage prompt wants to surface (toolkit/trigger slug, cron job id, /// webhook tunnel id, etc.). @@ -84,6 +96,10 @@ pub struct TriggerEnvelope { /// pipeline can report a meaningful `latency_ms` when it /// publishes [`crate::core::event_bus::DomainEvent::TriggerEvaluated`]. pub received_at: DateTime, + + /// Set when this trigger corresponds to a task-board card, so the + /// triage escalation arm routes it through the deterministic dispatcher. + pub card_link: Option, } impl TriggerEnvelope { @@ -118,6 +134,7 @@ impl TriggerEnvelope { display_label: format!("composio/{toolkit}/{trigger}"), payload, received_at: Utc::now(), + card_link: None, } } @@ -136,6 +153,7 @@ impl TriggerEnvelope { display_label: format!("webhook/{method}/{path}"), payload, received_at: Utc::now(), + card_link: None, } } @@ -153,6 +171,7 @@ impl TriggerEnvelope { display_label: format!("cron/{job_name}"), payload: serde_json::json!({ "output": output }), received_at: Utc::now(), + card_link: None, } } @@ -171,8 +190,17 @@ impl TriggerEnvelope { display_label: format!("external/{caller_id}"), payload, received_at: Utc::now(), + card_link: None, } } + + /// Attach a task-board card link so the triage escalation arm dispatches + /// the card deterministically (claim + autonomous run + write-back). + #[must_use] + pub fn with_task_card(mut self, card_id: String, location: BoardLocation) -> Self { + self.card_link = Some(TaskCardLink { card_id, location }); + self + } } #[cfg(test)] diff --git a/src/openhuman/agent/triage/escalation.rs b/src/openhuman/agent/triage/escalation.rs index 2c38d61f47..d8fece1313 100644 --- a/src/openhuman/agent/triage/escalation.rs +++ b/src/openhuman/agent/triage/escalation.rs @@ -27,7 +27,7 @@ use crate::openhuman::agent::Agent; use crate::openhuman::config::Config; use super::decision::TriageAction; -use super::envelope::TriggerEnvelope; +use super::envelope::{TaskCardLink, TriggerEnvelope}; use super::evaluator::TriageRun; use super::events; @@ -85,6 +85,44 @@ pub async fn apply_decision(run: TriageRun, envelope: &TriggerEnvelope) -> anyho "[triage::escalation] dispatching sub-agent" ); + // ── Unified task-board path ─────────────────────── + // A trigger linked to a board card is handed to the deterministic + // dispatcher (claim → autonomous run → write-back). The claim + // (todo→in_progress) deduplicates against the board poller, so + // firing both is safe. Non-card triggers (composio/webhook/cron) + // fall through to the one-shot triage sub-agent below. + if let Some(link) = &envelope.card_link { + use crate::openhuman::agent::task_dispatcher::DispatchOutcome; + match dispatch_linked_card(link).await { + Ok(DispatchOutcome::Running { run_id }) => { + tracing::info!( + card_id = %link.card_id, + run_id = %run_id, + "[triage::escalation] task-card dispatched to deterministic runner" + ); + events::publish_escalated(envelope, "task_dispatcher"); + } + Ok(DispatchOutcome::AwaitingApproval) => { + // Parked for plan approval (autonomy gate). Not an + // escalation yet — the approval flow resumes it. + tracing::info!( + card_id = %link.card_id, + "[triage::escalation] task-card parked awaiting plan approval" + ); + } + Err(reason) => { + // A failed claim (another card already in progress, or + // the card vanished) is benign — the poller retries. + tracing::info!( + card_id = %link.card_id, + reason = %reason, + "[triage::escalation] task-card dispatch skipped (claim failed?)" + ); + } + } + return Ok(()); + } + // ── External-effect approval gate (#1339) ───────── // React / Escalate fire a sub-agent that may call // external-effect tools on the user's behalf. Catching @@ -268,6 +306,22 @@ async fn dispatch_target_agent(agent_id: &str, prompt: &str) -> anyhow::Result Result { + let snapshot = crate::openhuman::todos::ops::list(&link.location)?; + let card = snapshot + .cards + .into_iter() + .find(|c| c.id == link.card_id) + .ok_or_else(|| format!("card `{}` not found on board", link.card_id))?; + crate::openhuman::agent::task_dispatcher::dispatch_card(link.location.clone(), card).await +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/openhuman/channels/runtime/startup.rs b/src/openhuman/channels/runtime/startup.rs index 5970b921e2..5570bcce06 100644 --- a/src/openhuman/channels/runtime/startup.rs +++ b/src/openhuman/channels/runtime/startup.rs @@ -69,6 +69,9 @@ pub async fn start_channels(mut config: Config) -> Result<()> { // configured external sources onto the agent's todo board. crate::openhuman::task_sources::bus::register_task_sources_subscriber(); crate::openhuman::task_sources::start_periodic_poll(); + // Board poller: dispatch the highest-urgency `todo` card on the + // task-sources board (catch-all for cards without a proactive trigger). + crate::openhuman::agent::task_dispatcher::start_board_poller(); // Native request handlers. Re-registering is safe (latest wins) so // this is idempotent even if `bootstrap_core_runtime` also runs. // Must happen before `run_message_dispatch_loop` begins, because diff --git a/src/openhuman/notifications/rpc.rs b/src/openhuman/notifications/rpc.rs index 6d124d90c8..61ee4b5de3 100644 --- a/src/openhuman/notifications/rpc.rs +++ b/src/openhuman/notifications/rpc.rs @@ -104,6 +104,7 @@ pub async fn handle_ingest(params: Map) -> Result "raw": req.raw_payload, }), received_at: ingest_started_at, + card_link: None, }; match run_triage(&envelope).await { diff --git a/src/openhuman/task_sources/route.rs b/src/openhuman/task_sources/route.rs index 115def3665..005a7df2a7 100644 --- a/src/openhuman/task_sources/route.rs +++ b/src/openhuman/task_sources/route.rs @@ -19,7 +19,7 @@ use crate::openhuman::todos::ops::{ }; use crate::openhuman::{scheduler_gate, todos}; -use super::types::{EnrichedTask, SourceTarget, TaskSource}; +use super::types::{EnrichedTask, FilterSpec, SourceTarget, TaskSource}; /// Stable thread id whose board collects every ingested task. pub const TASK_SOURCES_THREAD_ID: &str = "task-sources"; @@ -44,7 +44,7 @@ pub async fn route_enriched( Ok(card_id) } SourceTarget::AgentTodoProactive => { - dispatch_triage(source, enriched).await?; + dispatch_triage(config, source, enriched, &card_id).await?; Ok(card_id) } } @@ -112,11 +112,26 @@ fn add_card( Some(notes_parts.join("\n")) }; + // Objective: the bare upstream title (the card `content`/title is the + // `[provider] title` display form; the objective is the clean goal the + // executing agent works toward). + let objective = { + let t = task.title.trim(); + (!t.is_empty()).then(|| t.to_string()) + }; + + // Stamp the source identifiers the downstream dispatcher / write-back + // needs (provider + repo + issue id + url) plus the enrichment urgency + // used for prioritisation. This is the only writer of `source_metadata`. + let source_metadata = build_source_metadata(source, enriched); + let snapshot = todo_add( &location, &content, CardPatch { notes, + objective, + source_metadata: Some(source_metadata), ..Default::default() }, ) @@ -139,10 +154,43 @@ fn add_card( Ok(new_card_id) } +/// Build the card's `source_metadata` from the originating source + task: +/// the provider/repo/issue identifiers a later dispatcher or external +/// write-back needs to address the upstream item, plus the enrichment +/// urgency used to prioritise pickup. Repo is only present for GitHub +/// sources (the other providers don't carry a repo concept). +fn build_source_metadata(source: &TaskSource, enriched: &EnrichedTask) -> serde_json::Value { + let task = &enriched.task; + let mut meta = json!({ + "provider": task.provider, + "source_id": source.id, + "external_id": task.external_id, + "urgency": enriched.urgency, + }); + if let Some(url) = task.url.as_deref().map(str::trim).filter(|s| !s.is_empty()) { + meta["url"] = json!(url); + } + if let FilterSpec::Github { + repo: Some(repo), .. + } = &source.filter + { + let repo = repo.trim(); + if !repo.is_empty() { + meta["repo"] = json!(repo); + } + } + meta +} + /// Dispatch a triage turn for a proactive task, gated by scheduler /// capacity. Card creation already happened; a gated-off or deferred /// turn is non-fatal — the task still sits on the board. -async fn dispatch_triage(source: &TaskSource, enriched: &EnrichedTask) -> Result<(), String> { +async fn dispatch_triage( + config: &Config, + source: &TaskSource, + enriched: &EnrichedTask, + card_id: &str, +) -> Result<(), String> { // Respect background-AI throttling. When the gate denies capacity // (Off / paused), we keep the card but skip the proactive turn. let Some(_permit) = scheduler_gate::wait_for_capacity().await else { @@ -164,11 +212,19 @@ async fn dispatch_triage(source: &TaskSource, enriched: &EnrichedTask) -> Result "sourceId": source.id, }); + // Link the envelope to the board card so triage's escalation arm routes + // it through the deterministic dispatcher (claim → autonomous run → + // write-back) instead of the one-shot triage sub-agent. + let location = BoardLocation::Thread { + workspace_dir: config.workspace_dir.clone(), + thread_id: TASK_SOURCES_THREAD_ID.to_string(), + }; let envelope = TriggerEnvelope::from_external( &format!("task_sources:{}", source.id), "external task ingested", payload, - ); + ) + .with_task_card(card_id.to_string(), location); let outcome = run_triage(&envelope) .await @@ -228,6 +284,9 @@ pub fn board_cards( #[cfg(test)] mod tests { use super::*; + use crate::openhuman::task_sources::types::ProviderSlug; + use crate::openhuman::task_sources::NormalizedTask; + use chrono::Utc; #[test] fn provider_label_titlecases_known_and_unknown() { @@ -236,4 +295,92 @@ mod tests { assert_eq!(provider_label("asana"), "Asana"); assert_eq!(provider_label(""), ""); } + + fn github_source(repo: Option<&str>) -> TaskSource { + TaskSource { + id: "ts-1".into(), + provider: ProviderSlug::Github, + connection_id: None, + name: None, + enabled: true, + filter: FilterSpec::Github { + repo: repo.map(str::to_string), + labels: vec![], + assignee_is_me: true, + state: None, + extra: json!({}), + }, + interval_secs: 1800, + target: SourceTarget::AgentTodoProactive, + max_tasks_per_fetch: 25, + created_at: Utc::now(), + last_fetch_at: None, + last_status: None, + } + } + + fn enriched(external_id: &str, url: Option<&str>, urgency: f32) -> EnrichedTask { + let task = NormalizedTask { + external_id: external_id.into(), + provider: "github".into(), + title: "Fix the bug".into(), + url: url.map(str::to_string), + ..Default::default() + }; + EnrichedTask { + task, + summary: "Fix the bug".into(), + urgency, + linked_people: vec![], + linked_memory_ids: vec![], + agent_prompt: "do it".into(), + enriched_at: Utc::now(), + } + } + + #[test] + fn source_metadata_carries_github_repo_and_identifiers() { + let src = github_source(Some("octo/repo")); + let e = enriched("123", Some("https://github.com/octo/repo/issues/123"), 0.7); + let meta = build_source_metadata(&src, &e); + assert_eq!(meta["provider"], json!("github")); + assert_eq!(meta["source_id"], json!("ts-1")); + assert_eq!(meta["external_id"], json!("123")); + assert_eq!(meta["repo"], json!("octo/repo")); + assert_eq!( + meta["url"], + json!("https://github.com/octo/repo/issues/123") + ); + let urgency = meta["urgency"].as_f64().expect("urgency is a number"); + assert!((urgency - 0.7).abs() < 1e-6, "urgency was {urgency}"); + } + + #[test] + fn source_metadata_omits_absent_repo_and_url() { + let src = github_source(None); + let e = enriched("9", None, 0.4); + let meta = build_source_metadata(&src, &e); + assert!(meta.get("repo").is_none()); + assert!(meta.get("url").is_none()); + assert_eq!(meta["external_id"], json!("9")); + let urgency = meta["urgency"].as_f64().expect("urgency is a number"); + assert!((urgency - 0.4).abs() < 1e-6, "urgency was {urgency}"); + } + + #[test] + fn source_metadata_has_no_repo_for_non_github_provider() { + let mut src = github_source(Some("octo/repo")); + // A non-GitHub filter carries no repo concept. + src.provider = ProviderSlug::Linear; + src.filter = FilterSpec::Linear { + team_id: None, + assignee_is_me: true, + state: None, + extra: json!({}), + }; + let e = enriched("LIN-5", None, 0.5); + let meta = build_source_metadata(&src, &e); + assert!(meta.get("repo").is_none()); + assert_eq!(meta["source_id"], json!("ts-1")); + } } diff --git a/src/openhuman/threads/turn_state/mirror_tests.rs b/src/openhuman/threads/turn_state/mirror_tests.rs index de869b8b34..5dd16cd158 100644 --- a/src/openhuman/threads/turn_state/mirror_tests.rs +++ b/src/openhuman/threads/turn_state/mirror_tests.rs @@ -164,6 +164,7 @@ fn task_board_update_is_stored_and_flushed() { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 0, updated_at: "2026-05-15T00:00:00Z".into(), }], diff --git a/src/openhuman/todos/ops.rs b/src/openhuman/todos/ops.rs index b984a5a438..f4c85c9854 100644 --- a/src/openhuman/todos/ops.rs +++ b/src/openhuman/todos/ops.rs @@ -36,11 +36,14 @@ fn maybe_scratch_lock(location: &BoardLocation) -> Option Result { match raw.trim().to_ascii_lowercase().as_str() { "todo" | "pending" => Ok(TaskCardStatus::Todo), + "awaiting_approval" | "awaiting-approval" => Ok(TaskCardStatus::AwaitingApproval), + "ready" | "approved" => Ok(TaskCardStatus::Ready), "in_progress" | "in-progress" | "inprogress" | "started" => Ok(TaskCardStatus::InProgress), "blocked" => Ok(TaskCardStatus::Blocked), "done" | "completed" | "complete" => Ok(TaskCardStatus::Done), + "rejected" | "denied" => Ok(TaskCardStatus::Rejected), other => Err(format!( - "invalid status '{other}' (expected todo|in_progress|blocked|done)" + "invalid status '{other}' (expected todo|awaiting_approval|ready|in_progress|blocked|done|rejected)" )), } } @@ -68,6 +71,9 @@ pub struct CardPatch { pub evidence: Option>, pub notes: Option, pub blocker: Option, + /// Provider/source identifiers for a task-source-ingested card. `Some` + /// sets the card's `source_metadata`; `None` leaves it untouched. + pub source_metadata: Option, } /// Where to load/save the working set of cards. @@ -158,10 +164,12 @@ pub fn render_markdown(cards: &[TaskBoardCard]) -> String { let mut out = String::new(); for card in cards { let marker = match card.status { - TaskCardStatus::Todo => "[ ]", + TaskCardStatus::Todo | TaskCardStatus::Ready => "[ ]", + TaskCardStatus::AwaitingApproval => "[?]", TaskCardStatus::InProgress => "[~]", TaskCardStatus::Blocked => "[!]", TaskCardStatus::Done => "[x]", + TaskCardStatus::Rejected => "[-]", }; out.push_str("- "); out.push_str(marker); @@ -261,6 +269,7 @@ pub fn add( evidence: patch.evidence.unwrap_or_default(), notes: patch.notes.and_then(non_empty), blocker: patch.blocker.and_then(non_empty), + source_metadata: patch.source_metadata, order: cards.len() as u32, updated_at: Utc::now().to_rfc3339(), }; @@ -322,6 +331,9 @@ pub fn edit(location: &BoardLocation, id: &str, patch: CardPatch) -> Result Result { + let cards = load_cards(location)?; + let current = cards + .iter() + .find(|c| c.id == id) + .ok_or_else(|| format!("todo id '{id}' not found"))?; + if current.status != TaskCardStatus::AwaitingApproval { + return Err(format!( + "card '{id}' is not awaiting approval (status: {})", + current.status.as_str() + )); + } + let new_status = if approve { + TaskCardStatus::Ready + } else { + TaskCardStatus::Rejected + }; + update_status(location, id, new_status) +} + /// Remove a card by id. Errors if `id` is unknown. pub fn remove(location: &BoardLocation, id: &str) -> Result { tracing::debug!( @@ -478,6 +518,13 @@ mod tests { ); assert_eq!(parse_status("blocked").unwrap(), TaskCardStatus::Blocked); assert_eq!(parse_status("done").unwrap(), TaskCardStatus::Done); + assert_eq!( + parse_status("awaiting_approval").unwrap(), + TaskCardStatus::AwaitingApproval + ); + assert_eq!(parse_status("ready").unwrap(), TaskCardStatus::Ready); + assert_eq!(parse_status("approved").unwrap(), TaskCardStatus::Ready); + assert_eq!(parse_status("rejected").unwrap(), TaskCardStatus::Rejected); assert!(parse_status("nope").is_err()); } @@ -533,6 +580,83 @@ mod tests { assert_eq!(snap.cards[0].title, "Refined plan"); } + #[test] + fn source_metadata_round_trips_through_add_and_edit() { + let dir = tempdir().unwrap(); + let loc = thread_loc(dir.path(), "t1"); + let added = add( + &loc, + "ingested task", + CardPatch { + source_metadata: Some(serde_json::json!({ + "provider": "github", + "external_id": "7", + })), + ..Default::default() + }, + ) + .unwrap(); + let id = added.cards[0].id.clone(); + assert_eq!( + added.cards[0].source_metadata.as_ref().unwrap()["external_id"], + serde_json::json!("7") + ); + + // A subsequent edit with `Some(..)` replaces the stamped metadata. + let snap = edit( + &loc, + &id, + CardPatch { + source_metadata: Some(serde_json::json!({ + "provider": "github", + "external_id": "8", + })), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + snap.cards[0].source_metadata.as_ref().unwrap()["external_id"], + serde_json::json!("8") + ); + + // An edit that leaves `source_metadata: None` preserves the value. + let snap2 = edit( + &loc, + &id, + CardPatch { + notes: Some("touch".into()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + snap2.cards[0].source_metadata.as_ref().unwrap()["external_id"], + serde_json::json!("8") + ); + } + + #[test] + fn decide_plan_approves_and_rejects_only_when_awaiting() { + let dir = tempdir().unwrap(); + let loc = thread_loc(dir.path(), "t1"); + let added = add(&loc, "task", CardPatch::default()).unwrap(); + let id = added.cards[0].id.clone(); + + // A todo card isn't awaiting approval yet → decision rejected. + assert!(decide_plan(&loc, &id, true).is_err()); + + // Park it, then approve → Ready. + update_status(&loc, &id, TaskCardStatus::AwaitingApproval).unwrap(); + let approved = decide_plan(&loc, &id, true).unwrap(); + assert_eq!(approved.cards[0].status, TaskCardStatus::Ready); + + // Re-park, then reject → Rejected. + update_status(&loc, &id, TaskCardStatus::AwaitingApproval).unwrap(); + let rejected = decide_plan(&loc, &id, false).unwrap(); + assert_eq!(rejected.cards[0].status, TaskCardStatus::Rejected); + } + #[test] fn edit_can_clear_approval_mode() { let dir = tempdir().unwrap(); @@ -609,6 +733,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 0, updated_at: String::new(), }, @@ -625,6 +750,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 1, updated_at: String::new(), }, diff --git a/src/openhuman/todos/schemas.rs b/src/openhuman/todos/schemas.rs index 466fd779bd..582d589390 100644 --- a/src/openhuman/todos/schemas.rs +++ b/src/openhuman/todos/schemas.rs @@ -19,6 +19,7 @@ pub fn all_controller_schemas() -> Vec { schemas("add"), schemas("edit"), schemas("update_status"), + schemas("decide_plan"), schemas("remove"), schemas("replace"), schemas("clear"), @@ -43,6 +44,10 @@ pub fn all_registered_controllers() -> Vec { schema: schemas("update_status"), handler: handle_update_status, }, + RegisteredController { + schema: schemas("decide_plan"), + handler: handle_decide_plan, + }, RegisteredController { schema: schemas("remove"), handler: handle_remove, @@ -137,7 +142,27 @@ pub fn schemas(function: &str) -> ControllerSchema { inputs: vec![ thread_id_input(), required_string("id", "Card identifier."), - required_string("status", "New status (todo|in_progress|blocked|done)."), + required_string( + "status", + "New status (todo|awaiting_approval|ready|in_progress|blocked|done|rejected).", + ), + ], + outputs: vec![snapshot_output()], + }, + "decide_plan" => ControllerSchema { + namespace: "todos", + function: "decide_plan", + description: "Approve or reject a card awaiting plan approval \ + (approve → ready/runnable; reject → rejected).", + inputs: vec![ + thread_id_input(), + required_string("id", "Card identifier."), + FieldSchema { + name: "approve", + ty: TypeSchema::Bool, + comment: "true to approve (card becomes runnable), false to reject.", + required: true, + }, ], outputs: vec![snapshot_output()], }, @@ -260,6 +285,13 @@ struct RemoveParams { id: String, } +#[derive(Debug, Deserialize)] +struct DecidePlanParams { + thread_id: String, + id: String, + approve: bool, +} + #[derive(Debug, Deserialize)] struct ReplaceParams { thread_id: String, @@ -291,6 +323,7 @@ fn handle_add(params: Map) -> ControllerFuture { evidence: p.evidence, notes: p.notes, blocker: p.blocker, + source_metadata: None, }; tracing::debug!(thread_id = %p.thread_id, "[rpc][todos] add entry"); snapshot_to_json(ops::add(&loc, &p.content, patch)?) @@ -314,6 +347,7 @@ fn handle_edit(params: Map) -> ControllerFuture { evidence: p.evidence, notes: p.notes, blocker: p.blocker, + source_metadata: None, }; tracing::debug!(thread_id = %p.thread_id, id = %p.id, "[rpc][todos] edit entry"); snapshot_to_json(ops::edit(&loc, &p.id, patch)?) @@ -335,6 +369,20 @@ fn handle_update_status(params: Map) -> ControllerFuture { }) } +fn handle_decide_plan(params: Map) -> ControllerFuture { + Box::pin(async move { + let p = parse::(params)?; + let loc = thread_location(&p.thread_id).await?; + tracing::debug!( + thread_id = %p.thread_id, + id = %p.id, + approve = p.approve, + "[rpc][todos] decide_plan entry" + ); + snapshot_to_json(ops::decide_plan(&loc, &p.id, p.approve)?) + }) +} + fn parse_approval_mode(raw: Option) -> Result, String> { let Some(raw) = raw else { return Ok(None); diff --git a/src/openhuman/tools/impl/agent/todo.rs b/src/openhuman/tools/impl/agent/todo.rs index 8dd17e4903..85fd7332a2 100644 --- a/src/openhuman/tools/impl/agent/todo.rs +++ b/src/openhuman/tools/impl/agent/todo.rs @@ -255,6 +255,7 @@ fn patch_from_args(args: &serde_json::Value) -> anyhow::Result { evidence: optional_string_array(args, "evidence")?, notes: optional_string(args, "notes"), blocker: optional_string(args, "blocker"), + source_metadata: None, }) }