diff --git a/app/src/lib/i18n/chunks/ar-5.ts b/app/src/lib/i18n/chunks/ar-5.ts index 2d19596c62..2b8139c06d 100644 --- a/app/src/lib/i18n/chunks/ar-5.ts +++ b/app/src/lib/i18n/chunks/ar-5.ts @@ -552,6 +552,9 @@ const ar5: TranslationMap = { 'skills.uninstall.confirmTitle': 'إلغاء تثبيت {name}؟', 'conversations.taskKanban.blocked': 'محظور', 'conversations.taskKanban.done': 'مكتمل', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'قيد التنفيذ', 'intelligence.memoryChunk.detail.copiedHint': 'تم النسخ', 'settings.composio.notYetRouted': 'لم يتم توجيهه بعد', diff --git a/app/src/lib/i18n/chunks/bn-5.ts b/app/src/lib/i18n/chunks/bn-5.ts index e8274af96d..7fc7113f7f 100644 --- a/app/src/lib/i18n/chunks/bn-5.ts +++ b/app/src/lib/i18n/chunks/bn-5.ts @@ -559,6 +559,9 @@ const bn5: TranslationMap = { 'skills.uninstall.confirmTitle': '{name} আনইনস্টল করবেন?', 'conversations.taskKanban.blocked': 'ব্লকড', 'conversations.taskKanban.done': 'সম্পন্ন', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'চলমান', 'intelligence.memoryChunk.detail.copiedHint': 'কপি হয়েছে', 'settings.composio.notYetRouted': 'এখনও রুট করা হয়নি', diff --git a/app/src/lib/i18n/chunks/de-5.ts b/app/src/lib/i18n/chunks/de-5.ts index abdc21d622..6e3a48d745 100644 --- a/app/src/lib/i18n/chunks/de-5.ts +++ b/app/src/lib/i18n/chunks/de-5.ts @@ -582,6 +582,9 @@ const de5: TranslationMap = { 'skills.uninstall.confirmTitle': '{name} deinstallieren?', 'conversations.taskKanban.blocked': 'Blockiert', 'conversations.taskKanban.done': 'Fertig', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'In Bearbeitung', 'intelligence.memoryChunk.detail.copiedHint': 'kopiert', 'settings.composio.notYetRouted': 'noch nicht geroutet', diff --git a/app/src/lib/i18n/chunks/en-5.ts b/app/src/lib/i18n/chunks/en-5.ts index 15268b3708..a5d8d37cd4 100644 --- a/app/src/lib/i18n/chunks/en-5.ts +++ b/app/src/lib/i18n/chunks/en-5.ts @@ -604,6 +604,9 @@ const en5: TranslationMap = { 'skills.uninstall.confirmTitle': 'Uninstall {name}?', 'conversations.taskKanban.blocked': 'Blocked', 'conversations.taskKanban.done': 'Done', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'In progress', 'intelligence.memoryChunk.detail.copiedHint': 'copied', 'settings.composio.notYetRouted': 'not yet routed', diff --git a/app/src/lib/i18n/chunks/es-5.ts b/app/src/lib/i18n/chunks/es-5.ts index d02753f15c..cb90055516 100644 --- a/app/src/lib/i18n/chunks/es-5.ts +++ b/app/src/lib/i18n/chunks/es-5.ts @@ -564,6 +564,9 @@ const es5: TranslationMap = { 'skills.uninstall.confirmTitle': '¿Desinstalar {name}?', 'conversations.taskKanban.blocked': 'Bloqueado', 'conversations.taskKanban.done': 'Completado', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'En progreso', 'intelligence.memoryChunk.detail.copiedHint': 'copiado', 'settings.composio.notYetRouted': 'aún sin enrutar', diff --git a/app/src/lib/i18n/chunks/fr-5.ts b/app/src/lib/i18n/chunks/fr-5.ts index b67b21a524..2e4ceea4b3 100644 --- a/app/src/lib/i18n/chunks/fr-5.ts +++ b/app/src/lib/i18n/chunks/fr-5.ts @@ -568,6 +568,9 @@ const fr5: TranslationMap = { 'skills.uninstall.confirmTitle': 'Désinstaller {name} ?', 'conversations.taskKanban.blocked': 'Bloqué', 'conversations.taskKanban.done': 'Terminé', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'En cours', 'intelligence.memoryChunk.detail.copiedHint': 'copié', 'settings.composio.notYetRouted': 'pas encore routé', diff --git a/app/src/lib/i18n/chunks/hi-5.ts b/app/src/lib/i18n/chunks/hi-5.ts index bebb43d2d2..e8dad3dd3e 100644 --- a/app/src/lib/i18n/chunks/hi-5.ts +++ b/app/src/lib/i18n/chunks/hi-5.ts @@ -561,6 +561,9 @@ const hi5: TranslationMap = { 'skills.uninstall.confirmTitle': '{name} अनइंस्टॉल करें?', 'conversations.taskKanban.blocked': 'अवरुद्ध', 'conversations.taskKanban.done': 'पूर्ण', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'प्रगति पर', 'intelligence.memoryChunk.detail.copiedHint': 'कॉपी हो गया', 'settings.composio.notYetRouted': 'अभी तक रूट नहीं हुआ', diff --git a/app/src/lib/i18n/chunks/id-5.ts b/app/src/lib/i18n/chunks/id-5.ts index 645f38e34d..89d9a62879 100644 --- a/app/src/lib/i18n/chunks/id-5.ts +++ b/app/src/lib/i18n/chunks/id-5.ts @@ -561,6 +561,9 @@ const id5: TranslationMap = { 'skills.uninstall.confirmTitle': 'Copot {name}?', 'conversations.taskKanban.blocked': 'Terhambat', 'conversations.taskKanban.done': 'Selesai', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'Sedang berjalan', 'intelligence.memoryChunk.detail.copiedHint': 'disalin', 'settings.composio.notYetRouted': 'belum dirutekan', diff --git a/app/src/lib/i18n/chunks/it-5.ts b/app/src/lib/i18n/chunks/it-5.ts index 771b41688f..308866e62d 100644 --- a/app/src/lib/i18n/chunks/it-5.ts +++ b/app/src/lib/i18n/chunks/it-5.ts @@ -565,6 +565,9 @@ const it5: TranslationMap = { 'skills.uninstall.confirmTitle': 'Disinstallare {name}?', 'conversations.taskKanban.blocked': 'Bloccato', 'conversations.taskKanban.done': 'Fatto', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'In corso', 'intelligence.memoryChunk.detail.copiedHint': 'copiato', 'settings.composio.notYetRouted': 'non ancora instradato', diff --git a/app/src/lib/i18n/chunks/ko-5.ts b/app/src/lib/i18n/chunks/ko-5.ts index 50ed453fb1..3f76c4b135 100644 --- a/app/src/lib/i18n/chunks/ko-5.ts +++ b/app/src/lib/i18n/chunks/ko-5.ts @@ -436,6 +436,9 @@ const ko5: TranslationMap = { 'skills.uninstall.confirmTitle': '{name}을(를) 제거하시겠습니까?', 'conversations.taskKanban.blocked': '차단됨', 'conversations.taskKanban.done': '완료', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': '진행 중', 'intelligence.memoryChunk.detail.copiedHint': '복사됨', 'settings.composio.notYetRouted': '아직 라우팅되지 않음', diff --git a/app/src/lib/i18n/chunks/pl-5.ts b/app/src/lib/i18n/chunks/pl-5.ts index 7c4bbcf006..5485079cde 100644 --- a/app/src/lib/i18n/chunks/pl-5.ts +++ b/app/src/lib/i18n/chunks/pl-5.ts @@ -613,6 +613,9 @@ const pl5: TranslationMap = { 'skills.uninstall.confirmTitle': 'Odinstalować {name}?', 'conversations.taskKanban.blocked': 'Zablokowane', 'conversations.taskKanban.done': 'Zrobione', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'W toku', 'intelligence.memoryChunk.detail.copiedHint': 'skopiowano', 'settings.composio.notYetRouted': 'jeszcze nie trasowane', diff --git a/app/src/lib/i18n/chunks/pt-5.ts b/app/src/lib/i18n/chunks/pt-5.ts index 8c99a157bd..605d63f805 100644 --- a/app/src/lib/i18n/chunks/pt-5.ts +++ b/app/src/lib/i18n/chunks/pt-5.ts @@ -565,6 +565,9 @@ const pt5: TranslationMap = { 'skills.uninstall.confirmTitle': 'Desinstalar {name}?', 'conversations.taskKanban.blocked': 'Bloqueado', 'conversations.taskKanban.done': 'Concluído', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'Em andamento', 'intelligence.memoryChunk.detail.copiedHint': 'copiado', 'settings.composio.notYetRouted': 'ainda não roteado', diff --git a/app/src/lib/i18n/chunks/ru-5.ts b/app/src/lib/i18n/chunks/ru-5.ts index 77a6282285..8ffb4a2c82 100644 --- a/app/src/lib/i18n/chunks/ru-5.ts +++ b/app/src/lib/i18n/chunks/ru-5.ts @@ -561,6 +561,9 @@ const ru5: TranslationMap = { 'skills.uninstall.confirmTitle': 'Удалить {name}?', 'conversations.taskKanban.blocked': 'Заблокировано', 'conversations.taskKanban.done': 'Готово', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'В работе', 'intelligence.memoryChunk.detail.copiedHint': 'скопировано', 'settings.composio.notYetRouted': 'пока не маршрутизируется', diff --git a/app/src/lib/i18n/chunks/zh-CN-5.ts b/app/src/lib/i18n/chunks/zh-CN-5.ts index d91ea3bf15..5786db46c8 100644 --- a/app/src/lib/i18n/chunks/zh-CN-5.ts +++ b/app/src/lib/i18n/chunks/zh-CN-5.ts @@ -533,6 +533,9 @@ const zhCN5: TranslationMap = { 'skills.uninstall.confirmTitle': '卸载 {name}?', 'conversations.taskKanban.blocked': '已阻塞', 'conversations.taskKanban.done': '已完成', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': '进行中', 'intelligence.memoryChunk.detail.copiedHint': '已复制', 'settings.composio.notYetRouted': '尚未路由', diff --git a/app/src/lib/i18n/en.ts b/app/src/lib/i18n/en.ts index b47bf4e483..17e37afe26 100644 --- a/app/src/lib/i18n/en.ts +++ b/app/src/lib/i18n/en.ts @@ -4025,6 +4025,9 @@ const en: TranslationMap = { 'skills.uninstall.confirmTitle': 'Uninstall {name}?', 'conversations.taskKanban.blocked': 'Blocked', 'conversations.taskKanban.done': 'Done', + 'conversations.taskKanban.awaitingApproval': 'Awaiting approval', + 'conversations.taskKanban.ready': 'Ready', + 'conversations.taskKanban.rejected': 'Rejected', 'conversations.taskKanban.inProgress': 'In progress', 'intelligence.memoryChunk.detail.copiedHint': 'copied', 'settings.composio.notYetRouted': 'not yet routed', diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx index 3b93f3f8e5..9a687559a5 100644 --- a/app/src/pages/Conversations.tsx +++ b/app/src/pages/Conversations.tsx @@ -85,6 +85,7 @@ import { getComposerBlockedSendFeedback, handleComposerSlashCommand, } from './conversations/composerSendDecision'; +import { runDecidePlan } from './conversations/taskPlanActions'; import { type AgentBubblePosition, buildAcceptedInlineCompletion, @@ -1627,6 +1628,16 @@ const Conversations = ({ onUpdateCard={(card, nextCard) => { void handleUpdateTaskCard(card, nextCard); }} + onDecidePlan={(card, approve) => { + void runDecidePlan({ + threadId: selectedThreadId, + card, + approve, + dispatch, + notify: setSendAdvisory, + t, + }); + }} /> )} {visibleMessages.map(msg => ( diff --git a/app/src/pages/conversations/components/TaskKanbanBoard.test.tsx b/app/src/pages/conversations/components/TaskKanbanBoard.test.tsx new file mode 100644 index 0000000000..d1ba9bd87e --- /dev/null +++ b/app/src/pages/conversations/components/TaskKanbanBoard.test.tsx @@ -0,0 +1,78 @@ +import { fireEvent, render, screen } from '@testing-library/react'; +import { describe, expect, it, vi } from 'vitest'; + +import type { TaskBoard, TaskBoardCard } from '../../../types/turnState'; +import { TaskKanbanBoard } from './TaskKanbanBoard'; + +// Echo i18n keys so we can query by the stable key strings. +vi.mock('../../../lib/i18n/I18nContext', () => ({ useT: () => ({ t: (key: string) => key }) })); + +function card(partial: Partial): TaskBoardCard { + return { + id: 'c1', + title: 'Do thing', + status: 'todo', + order: 0, + updatedAt: '', + ...partial, + } as TaskBoardCard; +} + +function board(cards: TaskBoardCard[]): TaskBoard { + return { threadId: 't1', cards, updatedAt: '' }; +} + +describe('TaskKanbanBoard approval surface', () => { + it('renders Approve/Reject on an awaiting_approval card and calls onDecidePlan', () => { + const onDecidePlan = vi.fn(); + render( + + ); + + fireEvent.click(screen.getByTitle('chat.approval.approve')); + expect(onDecidePlan).toHaveBeenCalledWith(expect.objectContaining({ id: 'a' }), true); + + fireEvent.click(screen.getByTitle('chat.approval.deny')); + expect(onDecidePlan).toHaveBeenCalledWith(expect.objectContaining({ id: 'a' }), false); + }); + + it('buckets ready→todo and rejected→blocked columns so the cards still render', () => { + render( + + ); + + expect(screen.getByText('Ready card')).toBeInTheDocument(); + expect(screen.getByText('Rejected card')).toBeInTheDocument(); + // An approval-flow card without onDecidePlan shows no approve/reject controls. + expect(screen.queryByTitle('chat.approval.approve')).toBeNull(); + }); + + it('edit dialog status select has a matching option for approval-flow statuses', () => { + // Regression: the dialog ` so a + * card whose status is `awaiting_approval`/`ready`/`rejected` renders a + * matching option instead of a controlled-select value with no option (which + * React warns about and which renders as the first option, hiding the real + * status from the user). */ +const STATUS_LABEL_KEYS: Record = { + todo: 'conversations.taskKanban.todo', + awaiting_approval: 'conversations.taskKanban.awaitingApproval', + ready: 'conversations.taskKanban.ready', + in_progress: 'conversations.taskKanban.inProgress', + blocked: 'conversations.taskKanban.blocked', + done: 'conversations.taskKanban.done', + rejected: 'conversations.taskKanban.rejected', +}; + +const ALL_STATUSES = Object.keys(STATUS_LABEL_KEYS) as TaskBoardCardStatus[]; + +/** Whether a status owns a kanban column (vs the approval-flow statuses that + * are bucketed into an existing column). */ +function isColumnStatus(status: TaskBoardCardStatus): boolean { + return STATUS_INDEX.has(status); +} + +/** Map a card status to the column it renders under. The approval-flow + * statuses don't get their own columns: pre-execution ones sit in `todo`, + * `rejected` sits with `blocked`. */ +function columnFor(status: TaskBoardCardStatus): TaskBoardCardStatus { + switch (status) { + case 'awaiting_approval': + case 'ready': + return 'todo'; + case 'rejected': + return 'blocked'; + default: + return status; + } +} + interface TaskKanbanBoardProps { board: TaskBoard; disabled?: boolean; onMove?: (card: TaskBoardCard, status: TaskBoardCardStatus) => void; onUpdateCard?: (card: TaskBoardCard, nextCard: TaskBoardCard) => void; + /** Approve/reject a card awaiting plan approval. */ + onDecidePlan?: (card: TaskBoardCard, approve: boolean) => void; } export function TaskKanbanBoard({ @@ -36,6 +77,7 @@ export function TaskKanbanBoard({ disabled = false, onMove, onUpdateCard, + onDecidePlan, }: TaskKanbanBoardProps) { const { t } = useT(); const [selectedCardId, setSelectedCardId] = useState(null); @@ -55,7 +97,7 @@ export function TaskKanbanBoard({ ); for (const card of [...board.cards].sort((a, b) => a.order - b.order)) { - cardsByStatus[card.status]?.push(card); + cardsByStatus[columnFor(card.status)]?.push(card); } const moveCard = (card: TaskBoardCard, direction: -1 | 1) => { @@ -97,7 +139,26 @@ export function TaskKanbanBoard({

{card.title}

- {onMove && ( + {card.status === 'awaiting_approval' && onDecidePlan ? ( +
+ + +
+ ) : onMove && isColumnStatus(card.status) ? (
- )} + ) : null}
{card.assignedAgent && ( @@ -286,9 +347,9 @@ function TaskBriefDialog({ value={status} onChange={e => setStatus(e.target.value as TaskBoardCardStatus)} className="w-full rounded-md border border-stone-200 bg-white px-2 py-1.5 text-sm text-stone-900 dark:border-neutral-700 dark:bg-neutral-950 dark:text-neutral-50"> - {COLUMN_DEFS.map(column => ( - ))} diff --git a/app/src/pages/conversations/taskPlanActions.test.ts b/app/src/pages/conversations/taskPlanActions.test.ts new file mode 100644 index 0000000000..00021ff19e --- /dev/null +++ b/app/src/pages/conversations/taskPlanActions.test.ts @@ -0,0 +1,65 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +import type { TaskBoardCard } from '../../types/turnState'; +import { runDecidePlan } from './taskPlanActions'; + +const mockDecidePlan = vi.fn(); +vi.mock('../../services/api/threadApi', () => ({ + threadApi: { decidePlan: (...args: unknown[]) => mockDecidePlan(...args) }, +})); + +function card(): TaskBoardCard { + return { id: 'c1', title: 'T', status: 'awaiting_approval', order: 0, updatedAt: '' }; +} + +const t = (key: string) => key; + +describe('runDecidePlan', () => { + beforeEach(() => mockDecidePlan.mockReset()); + + it('is a no-op when there is no active thread', async () => { + const dispatch = vi.fn(); + const notify = vi.fn(); + + await runDecidePlan({ threadId: null, card: card(), approve: true, dispatch, notify, t }); + + expect(mockDecidePlan).not.toHaveBeenCalled(); + expect(dispatch).not.toHaveBeenCalled(); + expect(notify).not.toHaveBeenCalled(); + }); + + it('dispatches the refreshed board on success', async () => { + mockDecidePlan.mockResolvedValueOnce({ threadId: 't1', cards: [], updatedAt: '' }); + const dispatch = vi.fn(); + const notify = vi.fn(); + + await runDecidePlan({ threadId: 't1', card: card(), approve: true, dispatch, notify, t }); + + expect(mockDecidePlan).toHaveBeenCalledWith('t1', 'c1', true); + expect(dispatch).toHaveBeenCalledTimes(1); + expect(notify).not.toHaveBeenCalled(); + }); + + it('does not dispatch when the RPC returns null', async () => { + mockDecidePlan.mockResolvedValueOnce(null); + const dispatch = vi.fn(); + const notify = vi.fn(); + + await runDecidePlan({ threadId: 't1', card: card(), approve: false, dispatch, notify, t }); + + expect(mockDecidePlan).toHaveBeenCalledWith('t1', 'c1', false); + expect(dispatch).not.toHaveBeenCalled(); + expect(notify).not.toHaveBeenCalled(); + }); + + it('notifies (without throwing or dispatching) on RPC failure', async () => { + mockDecidePlan.mockRejectedValueOnce(new Error('boom')); + const dispatch = vi.fn(); + const notify = vi.fn(); + + await runDecidePlan({ threadId: 't1', card: card(), approve: true, dispatch, notify, t }); + + expect(notify).toHaveBeenCalledWith('conversations.taskKanban.updateFailed'); + expect(dispatch).not.toHaveBeenCalled(); + }); +}); diff --git a/app/src/pages/conversations/taskPlanActions.ts b/app/src/pages/conversations/taskPlanActions.ts new file mode 100644 index 0000000000..ac0bfe8609 --- /dev/null +++ b/app/src/pages/conversations/taskPlanActions.ts @@ -0,0 +1,51 @@ +/** + * Task-plan approval action, extracted from `Conversations` so the whole + * guard → decide → persist → refresh → error-advisory flow is unit-testable + * without mounting the conversations page. + */ +import debugFactory from 'debug'; + +import { threadApi } from '../../services/api/threadApi'; +import { setTaskBoardForThread } from '../../store/chatRuntimeSlice'; +import type { TaskBoardCard } from '../../types/turnState'; + +const debug = debugFactory('conversations:taskPlan'); + +export interface RunDecidePlanArgs { + /** Active thread; a no-op when null (nothing to decide against). */ + threadId: string | null; + card: TaskBoardCard; + approve: boolean; + /** Redux dispatch (kept loosely typed so callers don't need the store type). */ + dispatch: (action: unknown) => void; + /** Surface a user-facing advisory on failure. */ + notify: (message: string) => void; + /** Translator for the advisory message. */ + t: (key: string) => string; +} + +/** + * Approve or reject a card's plan via the core RPC, then optimistically refresh + * the thread's board from the returned snapshot. A failure is logged and + * surfaced via `notify` (never thrown) so a failed decision degrades + * gracefully. No-op when `threadId` is null. + */ +export async function runDecidePlan({ + threadId, + card, + approve, + dispatch, + notify, + t, +}: RunDecidePlanArgs): Promise { + if (!threadId) return; + try { + const saved = await threadApi.decidePlan(threadId, card.id, approve); + if (saved) { + dispatch(setTaskBoardForThread({ threadId, board: saved })); + } + } catch (error) { + debug('decidePlan failed: %o', error); + notify(t('conversations.taskKanban.updateFailed')); + } +} diff --git a/app/src/services/api/threadApi.test.ts b/app/src/services/api/threadApi.test.ts index 87d2c1c32e..b7e7219c9f 100644 --- a/app/src/services/api/threadApi.test.ts +++ b/app/src/services/api/threadApi.test.ts @@ -140,4 +140,33 @@ describe('threadApi', () => { }); expect(result).toEqual(thread); }); + + it('approves a plan via the todos_decide_plan RPC and rebuilds the board', async () => { + mockCallCoreRpc.mockResolvedValueOnce({ + data: { threadId: 'thread-1', cards: [{ id: 'card-1', title: 'T', status: 'ready' }] }, + }); + + const { threadApi } = await import('./threadApi'); + const board = await threadApi.decidePlan('thread-1', 'card-1', true); + + expect(mockCallCoreRpc).toHaveBeenCalledWith({ + method: 'openhuman.todos_decide_plan', + params: { thread_id: 'thread-1', id: 'card-1', approve: true }, + }); + expect(board?.threadId).toBe('thread-1'); + expect(board?.cards[0].status).toBe('ready'); + }); + + it('returns null from decidePlan when the snapshot has no cards', async () => { + mockCallCoreRpc.mockResolvedValueOnce({ data: {} }); + + const { threadApi } = await import('./threadApi'); + const board = await threadApi.decidePlan('thread-1', 'card-1', false); + + expect(mockCallCoreRpc).toHaveBeenCalledWith({ + method: 'openhuman.todos_decide_plan', + params: { thread_id: 'thread-1', id: 'card-1', approve: false }, + }); + expect(board).toBeNull(); + }); }); diff --git a/app/src/services/api/threadApi.ts b/app/src/services/api/threadApi.ts index 9a9062902d..ec0e2e469d 100644 --- a/app/src/services/api/threadApi.ts +++ b/app/src/services/api/threadApi.ts @@ -161,6 +161,32 @@ export const threadApi = { return data?.taskBoard ?? null; }, + /** + * Approve or reject a task-board card that is awaiting plan approval + * (`openhuman.todos_decide_plan`). Approve → the card becomes runnable + * (`ready`); reject → `rejected`. Returns the updated board (rebuilt from + * the returned todos snapshot) or null. + */ + decidePlan: async ( + threadId: string, + cardId: string, + approve: boolean + ): Promise => { + const response = await callCoreRpc<{ + data?: { threadId?: string | null; cards?: TaskBoardCard[] }; + }>({ + method: 'openhuman.todos_decide_plan', + params: { thread_id: threadId, id: cardId, approve }, + }); + const data = unwrapEnvelope(response); + if (!data?.cards) return null; + return { + threadId: data.threadId ?? threadId, + cards: data.cards, + updatedAt: new Date().toISOString(), + }; + }, + updateLabels: async (threadId: string, labels: string[]): Promise => { const response = await callCoreRpc>({ method: 'openhuman.threads_update_labels', diff --git a/app/src/types/turnState.ts b/app/src/types/turnState.ts index 7d450bbb28..fbe005dcb6 100644 --- a/app/src/types/turnState.ts +++ b/app/src/types/turnState.ts @@ -11,7 +11,14 @@ export type PersistedTurnPhase = 'thinking' | 'tool_use' | 'subagent'; export type PersistedToolStatus = 'running' | 'success' | 'error'; -export type TaskBoardCardStatus = 'todo' | 'in_progress' | 'blocked' | 'done'; +export type TaskBoardCardStatus = + | 'todo' + | 'awaiting_approval' + | 'ready' + | 'in_progress' + | 'blocked' + | 'done' + | 'rejected'; export type TaskApprovalMode = 'required' | 'not_required'; export interface TaskBoardCard { @@ -27,6 +34,10 @@ export interface TaskBoardCard { evidence?: string[]; notes?: string | null; blocker?: string | null; + /** Provider/source identifiers for a card ingested from a task source + * (`{provider, source_id, external_id, url, repo?, urgency}`); absent on + * agent/UI-authored cards. */ + sourceMetadata?: Record | null; order: number; updatedAt: string; } diff --git a/app/src/utils/tauriCommands/taskSources.ts b/app/src/utils/tauriCommands/taskSources.ts index 8e7d13515d..9364fc7bf6 100644 --- a/app/src/utils/tauriCommands/taskSources.ts +++ b/app/src/utils/tauriCommands/taskSources.ts @@ -56,6 +56,9 @@ export interface TaskSource { intervalSecs: number; target: TaskSourceTarget; maxTasksPerFetch: number; + /** Static executor routing (G7): personality/skill/agent handle every card + * from this source is pre-assigned to. */ + assignedExecutor?: string; createdAt: string; lastFetchAt?: string; lastStatus?: string; @@ -100,6 +103,8 @@ export interface TaskSourcePatch { target?: TaskSourceTarget; maxTasksPerFetch?: number; connectionId?: string; + /** Executor routing (G7): personality/skill/agent handle to pre-assign. */ + assignedExecutor?: string; } export interface TaskSourceAddParams { @@ -110,6 +115,7 @@ export interface TaskSourceAddParams { interval_secs?: number; target?: TaskSourceTarget; max_tasks_per_fetch?: number; + assigned_executor?: string; } function ensureTauri(): void { diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index fd7905917e..8fab7df50a 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -656,6 +656,18 @@ pub enum DomainEvent { provider: String, error: String, }, + /// A task-board card needs human plan approval before the dispatcher will + /// execute it (emitted when `autonomy.require_task_plan_approval` is on and + /// the dispatcher parks a `todo` card at `awaiting_approval`). + /// + /// Surfacing: the parked card is persisted with status `awaiting_approval`, + /// so the kanban board renders it with inline Approve/Reject on the next + /// board fetch/refresh — that is the current (poll-based) surface and the + /// reason this telemetry event has no dedicated subscriber yet. A realtime + /// socket bridge (à la `ApprovalRequested` → `approval_request`) is a + /// deliberate follow-up; emitting the event now lets that bridge attach + /// without a schema change. + TaskPlanAwaitingApproval { card_id: String, thread_id: String }, } impl DomainEvent { @@ -747,6 +759,8 @@ impl DomainEvent { | Self::TaskSourceTaskIngested { .. } | Self::TaskSourceFetchFailed { .. } => "task_sources", + Self::TaskPlanAwaitingApproval { .. } => "agent", + Self::ApprovalRequested { .. } | Self::ApprovalDecided { .. } => "approval", Self::McpServerInstalled { .. } @@ -837,6 +851,7 @@ impl DomainEvent { Self::TaskSourceFetched { .. } => "TaskSourceFetched", Self::TaskSourceTaskIngested { .. } => "TaskSourceTaskIngested", Self::TaskSourceFetchFailed { .. } => "TaskSourceFetchFailed", + Self::TaskPlanAwaitingApproval { .. } => "TaskPlanAwaitingApproval", } } diff --git a/src/core/jsonrpc.rs b/src/core/jsonrpc.rs index a1a04aedbc..67cde6ae57 100644 --- a/src/core/jsonrpc.rs +++ b/src/core/jsonrpc.rs @@ -1846,6 +1846,9 @@ fn register_domain_subscribers( // Task-sources proactive ingestion: connection-created hook + poll. crate::openhuman::task_sources::bus::register_task_sources_subscriber(); crate::openhuman::task_sources::start_periodic_poll(); + // Board poller: dispatch the highest-urgency `todo` card on the + // task-sources board (catch-all for cards without a proactive trigger). + crate::openhuman::agent::task_dispatcher::start_board_poller(); // Seed memory_sources with active Composio connections so the // user sees their connected integrations as memory sources by // default. Best-effort: failure is logged but does not block startup. diff --git a/src/openhuman/agent/mod.rs b/src/openhuman/agent/mod.rs index bbdf6733a8..9575efcef9 100644 --- a/src/openhuman/agent/mod.rs +++ b/src/openhuman/agent/mod.rs @@ -42,6 +42,7 @@ pub mod prompts; mod schemas; pub mod stop_hooks; pub mod task_board; +pub mod task_dispatcher; pub mod tool_policy; pub mod tools; pub mod tree_loader; diff --git a/src/openhuman/agent/task_board.rs b/src/openhuman/agent/task_board.rs index 095c77d9d6..bad2a721bd 100644 --- a/src/openhuman/agent/task_board.rs +++ b/src/openhuman/agent/task_board.rs @@ -18,18 +18,31 @@ const TASK_BOARD_EXTENSION: &str = "json"; #[serde(rename_all = "snake_case")] pub enum TaskCardStatus { Todo, + /// Plan approval required and pending — the dispatcher parked the card here + /// and emitted `TaskPlanAwaitingApproval`; it will not run until a human + /// approves (→ `Ready`) or rejects (→ `Rejected`). + AwaitingApproval, + /// Approved for execution — the dispatcher runs `Ready` cards without a + /// further approval check (distinguishes "approved" from the initial + /// `Todo`, which the approval gate would otherwise re-park). + Ready, InProgress, Blocked, Done, + /// Plan approval was denied; the card is not executed. + Rejected, } impl TaskCardStatus { pub fn as_str(&self) -> &'static str { match self { Self::Todo => "todo", + Self::AwaitingApproval => "awaiting_approval", + Self::Ready => "ready", Self::InProgress => "in_progress", Self::Blocked => "blocked", Self::Done => "done", + Self::Rejected => "rejected", } } } @@ -74,6 +87,12 @@ pub struct TaskBoardCard { pub notes: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub blocker: Option, + /// Provider/source identifiers for a card ingested from a task source + /// (`{provider, source_id, external_id, url, repo?, urgency}`). Set by + /// the `task_sources` route; consumed downstream for prioritisation and + /// external write-back. `None` for agent/UI-authored cards. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_metadata: Option, #[serde(default)] pub order: u32, #[serde(default)] @@ -420,6 +439,7 @@ mod tests { evidence: vec![" cargo test ".into()], notes: Some(" note ".into()), blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, @@ -436,6 +456,7 @@ mod tests { evidence: Vec::new(), notes: Some("waiting on user".into()), blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, @@ -506,6 +527,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, @@ -522,6 +544,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 99, updated_at: String::new(), }, diff --git a/src/openhuman/agent/task_dispatcher.rs b/src/openhuman/agent/task_dispatcher.rs new file mode 100644 index 0000000000..fd8e6a27df --- /dev/null +++ b/src/openhuman/agent/task_dispatcher.rs @@ -0,0 +1,814 @@ +//! Deterministic task-card dispatcher. +//! +//! Turns a [`TaskBoardCard`] into work: it **claims** the card via a +//! compare-and-set (re-load the board and transition only a `Todo`/`Ready` +//! card to `in_progress`, so a stale/concurrent re-dispatch of the same card +//! is rejected), runs a single **autonomous agent turn** toward the card's +//! objective, and **writes the outcome back** to the board (`done` + evidence +//! on success, `blocked` + reason on failure). +//! +//! This is the one executor both dispatch paths converge on: +//! - the **board poller** (cards that arrived without a proactive trigger), and +//! - the **proactive triage** arm (`agent::triage::apply_decision`), once it has +//! decided to act on a task-board card. +//! +//! The runner mirrors `skills::spawn_skill_run_background`: build the +//! `orchestrator` agent fresh inside a detached task, cap tool iterations, and +//! run `agent.run_single` under `with_autonomous_iter_cap`. PR-4 generalises the +//! executor from the default agent to a resolved personality/skill; this module +//! keeps the default-agent path so the pipeline runs end-to-end first. + +use std::path::Path; +use std::sync::OnceLock; +use std::time::Duration; + +use crate::openhuman::agent::harness::definition::{AgentDefinitionRegistry, PromptSource}; +use crate::openhuman::agent::harness::session::Agent; +use crate::openhuman::agent::harness::subagent_runner::with_autonomous_iter_cap; +use crate::openhuman::agent::personality_paths::PersonalityContext; +use crate::openhuman::agent::task_board::{TaskBoardCard, TaskCardStatus}; +use crate::openhuman::config::Config; +use crate::openhuman::todos::ops::{self, BoardLocation, CardPatch}; + +/// Max chars of a personality SOUL.md / MEMORY.md or skill guideline block +/// folded into the agent's system-prompt suffix. +const EXECUTOR_PREAMBLE_MAX_CHARS: usize = 800; + +/// Tool-iteration ceiling for an autonomous task run. Matches the skill-run +/// cap — a task brief is the same shape of bounded autonomous work. +const TASK_RUN_MAX_ITERATIONS: usize = 200; + +/// Max chars of the agent's final output retained as board `evidence`. +const EVIDENCE_MAX_CHARS: usize = 2_000; + +/// Render a card into the goal prompt handed to the autonomous run. +/// +/// The card's `content`/title is the display form; the prompt leads with the +/// clean `objective`, then any `plan` steps and `acceptance_criteria`, and a +/// pointer to the originating source so the agent can pull related context from +/// memory via its `memory_recall` tool (the GitHub/Notion/… activity for this +/// item is ingested into the summary tree by the memory-sources domain). +pub fn build_task_prompt(card: &TaskBoardCard) -> String { + let mut lines: Vec = Vec::new(); + + let objective = card + .objective + .as_deref() + .map(str::trim) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| card.title.trim()); + lines.push(format!( + "You are autonomously executing one task to completion. Objective:\n{objective}" + )); + + if !card.plan.is_empty() { + lines.push("\nPlan:".to_string()); + for (i, step) in card.plan.iter().enumerate() { + lines.push(format!("{}. {}", i + 1, step.trim())); + } + } + + if !card.acceptance_criteria.is_empty() { + lines.push("\nAcceptance criteria (the task is done only when all hold):".to_string()); + for c in &card.acceptance_criteria { + lines.push(format!("- {}", c.trim())); + } + } + + if let Some(meta) = &card.source_metadata { + let provider = meta.get("provider").and_then(|v| v.as_str()); + let repo = meta.get("repo").and_then(|v| v.as_str()); + let external_id = meta.get("external_id").and_then(|v| v.as_str()); + let url = meta.get("url").and_then(|v| v.as_str()); + let mut origin = String::new(); + if let Some(p) = provider { + origin.push_str(p); + } + if let Some(r) = repo { + origin.push_str(&format!(" {r}")); + } + if let Some(id) = external_id { + origin.push_str(&format!("#{id}")); + } + // Gate on a known provider so the origin string is always meaningful + // (an id-only card would render "#123" with a leading space). + if provider.is_some() { + lines.push(format!( + "\nThis task originates from {}. Its activity has been ingested into memory — use \ + your memory_recall tool to pull related context (prior discussion, linked items) \ + before and while you work.", + origin.trim() + )); + } + if let Some(u) = url { + lines.push(format!("Source link: {u}")); + } + // G9b — agent-driven external write-back. When the upstream item is + // addressable (provider + id), instruct the agent to close the loop on + // the source itself via its integration tools. Runs under the + // connection's existing write scope (no extra approval gate); if it + // can't, it reports that instead of failing. + if provider.is_some() && external_id.is_some() { + lines.push(format!( + "\nWhen the task is complete, record the outcome on the upstream source ({}): use \ + your integration tools to add a comment summarising the resolution and, if the \ + work fully addresses it, close/resolve the item. If you lack the permission or \ + connection to do so, say so in your final summary instead of guessing.", + origin.trim() + )); + } + } + + lines.push( + "\nWork the task to completion. Do not pick up unrelated work. When finished, your final \ + message should summarise what you did and the evidence (commits, PRs, results)." + .to_string(), + ); + + lines.join("\n") +} + +/// Outcome of a dispatch attempt. +#[derive(Debug)] +pub enum DispatchOutcome { + /// The card was claimed and a detached autonomous run was spawned. + Running { run_id: String }, + /// Plan approval is required; the card was parked at `awaiting_approval` + /// and a `TaskPlanAwaitingApproval` event was emitted. No run was spawned. + AwaitingApproval, +} + +/// Dispatch one card: gate on plan approval, claim it, run an autonomous turn, +/// write the result back. +/// +/// Returns `Ok(Running)` once the card is claimed and the detached run is +/// spawned, `Ok(AwaitingApproval)` if the card was parked for human approval, +/// or `Err` *without* spawning when the card is no longer claimable — its +/// freshly-loaded status isn't `Todo`/`Ready` (already running/done, or another +/// dispatcher won the claim). Benign: the poller retries next tick. +pub async fn dispatch_card( + location: BoardLocation, + card: TaskBoardCard, +) -> Result { + let card_id = card.id.clone(); + + let config = Config::load_or_init() + .await + .map_err(|e| format!("load config: {e:#}"))?; + + // Claim CAS: re-load the card's CURRENT status before acting. The passed-in + // `card` may be stale — the poller and the proactive triage arm can target + // the same card, and a re-triggered card may already be running or done. + // Only `Todo`/`Ready` are claimable. This is what actually dedupes a + // double-dispatch: `enforce_single_in_progress` alone does NOT, because + // re-flipping an already-`InProgress` card to `InProgress` keeps the count + // at one (→ `Ok`). Thread boards aren't lock-serialised, so a narrow TOCTOU + // window between this read and the write remains; a fully race-free claim + // needs a per-thread-locked compare-and-set `ops` primitive (follow-up). + // Re-load the full current card (not just its status): the passed-in `card` + // can be stale (edited between selection and claim), so the run must use the + // fresh objective / plan / assigned_agent, not the snapshot the caller held. + let fresh_card = ops::list(&location) + .map_err(|e| format!("[task_dispatcher] reload before claim failed for {card_id}: {e}"))? + .cards + .into_iter() + .find(|c| c.id == card_id) + .ok_or_else(|| format!("[task_dispatcher] card {card_id} not found on board; skipping"))?; + if !matches!( + fresh_card.status, + TaskCardStatus::Todo | TaskCardStatus::Ready + ) { + return Err(format!( + "[task_dispatcher] card {card_id} not claimable (status: {}); skipping", + fresh_card.status.as_str() + )); + } + + // Plan-approval gate: when required, a `todo` card is parked for human + // approval before it can run. `Ready` (already approved) bypasses. + if config.autonomy.require_task_plan_approval && fresh_card.status == TaskCardStatus::Todo { + ops::update_status(&location, &card_id, TaskCardStatus::AwaitingApproval).map_err(|e| { + format!("[task_dispatcher] park-for-approval failed for {card_id}: {e}") + })?; + if let Some(thread_id) = location.thread_id() { + crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::TaskPlanAwaitingApproval { + card_id: card_id.clone(), + thread_id: thread_id.to_string(), + }, + ); + } + tracing::info!(card_id = %card_id, "[task_dispatcher] parked card awaiting plan approval"); + return Ok(DispatchOutcome::AwaitingApproval); + } + + let prompt = build_task_prompt(&fresh_card); + + // Claim: Todo|Ready→InProgress. + ops::update_status(&location, &card_id, TaskCardStatus::InProgress) + .map_err(|e| format!("[task_dispatcher] claim failed for card {card_id}: {e}"))?; + + let run_id = uuid::Uuid::new_v4().to_string(); + + // Resolve which executor runs this card: default agent, a personality, or + // a skill — one autonomous-run interface, three presets (G4 + G3). + let executor = resolve_executor(&config.workspace_dir, fresh_card.assigned_agent.as_deref()); + tracing::info!( + card_id = %card_id, + run_id = %run_id, + executor = %executor.label, + agent_id = %executor.agent_id, + prompt_chars = prompt.chars().count(), + "[task_dispatcher] card claimed (→in_progress), spawning autonomous run" + ); + + let run_id_for_return = run_id.clone(); + let location_for_run = location.clone(); + tokio::spawn(async move { + let outcome = run_autonomous(config, &executor, &prompt, &run_id).await; + write_back(&location_for_run, &card_id, &run_id, outcome); + }); + + Ok(DispatchOutcome::Running { + run_id: run_id_for_return, + }) +} + +/// A resolved executor: which built-in agent definition to build, an optional +/// system-prompt suffix carrying a personality identity or skill guidelines, +/// and a label for logs/telemetry. +#[derive(Debug, Clone, PartialEq)] +struct ResolvedExecutor { + agent_id: String, + prompt_suffix: Option, + label: String, +} + +impl ResolvedExecutor { + fn default_agent() -> Self { + Self { + agent_id: "orchestrator".to_string(), + prompt_suffix: None, + label: "default".to_string(), + } + } +} + +/// Map a card's `assigned_agent` handle to one of three executor presets: +/// **personality** (scoped SOUL/MEMORY folded into the prompt suffix, run as +/// that profile's agent), **skill** (orchestrator seeded with the skill's +/// `SKILL.md` guidelines), or **built-in agent**. An unset or unresolved handle +/// degrades to the default `orchestrator` — "use the personality if valid, +/// otherwise the default agent." +fn resolve_executor(workspace_dir: &Path, assigned: Option<&str>) -> ResolvedExecutor { + let Some(handle) = assigned.map(str::trim).filter(|s| !s.is_empty()) else { + return ResolvedExecutor::default_agent(); + }; + if handle == "orchestrator" { + return ResolvedExecutor::default_agent(); + } + + // 1) Personality (#2895): a user-defined profile with scoped identity. + if let Ok(state) = crate::openhuman::agent::profiles::load_profiles(workspace_dir) { + if let Some(profile) = state.profiles.iter().find(|p| p.id == handle) { + let ctx = PersonalityContext::from_profile(workspace_dir, profile.clone()); + let mut preamble = format!( + "You are acting as the personality `{}` (\"{}\"). {}", + profile.id, profile.name, profile.description + ); + if let Some(soul) = &ctx.soul_md_override { + preamble.push_str("\n\n[Personality SOUL.md]\n"); + preamble.push_str(&truncate_chars(soul, EXECUTOR_PREAMBLE_MAX_CHARS)); + } + if let Some(mem) = &ctx.memory_md_override { + preamble.push_str("\n\n[Personality MEMORY.md]\n"); + preamble.push_str(&truncate_chars(mem, EXECUTOR_PREAMBLE_MAX_CHARS)); + } + return ResolvedExecutor { + agent_id: profile.agent_id.clone(), + prompt_suffix: Some(preamble), + label: format!("personality:{handle}"), + }; + } + } + + // 2) Skill (#2824): the same autonomous run, seeded with SKILL.md. + if let Some(skill) = crate::openhuman::skills::registry::get_skill(workspace_dir, handle) { + let guidelines = match &skill.definition.system_prompt { + PromptSource::Inline(s) => truncate_chars(s, EXECUTOR_PREAMBLE_MAX_CHARS), + _ => String::new(), + }; + let suffix = format!( + "You are executing this task as the skill `{handle}`. Follow these skill \ + guidelines exactly:\n\n{guidelines}" + ); + return ResolvedExecutor { + agent_id: "orchestrator".to_string(), + prompt_suffix: Some(suffix), + label: format!("skill:{handle}"), + }; + } + + // 3) Built-in agent definition. + if AgentDefinitionRegistry::global() + .and_then(|r| r.get(handle)) + .is_some() + { + return ResolvedExecutor { + agent_id: handle.to_string(), + prompt_suffix: None, + label: format!("agent:{handle}"), + }; + } + + // 4) Unresolved → degrade to the default agent (don't fail the card). + tracing::warn!( + handle = %handle, + "[task_dispatcher] assigned executor did not resolve to a personality/skill/agent; \ + using default orchestrator" + ); + ResolvedExecutor { + label: "default-fallback".to_string(), + ..ResolvedExecutor::default_agent() + } +} + +/// Run the resolved executor as a single autonomous turn using the +/// already-loaded config. The executor's prompt suffix (personality identity or +/// skill guidelines) rides in the system prompt; the card goal is the turn input. +/// +/// SECURITY / threat model (prompt injection): the card objective/content and +/// `source_metadata` derive from external, attacker-influenceable text (e.g. a +/// GitHub issue body anyone in a watched repo can file), and this background +/// run is gate-free at the per-tool level (background turns auto-allow, like +/// skill runs) while `build_task_prompt` may instruct it to write back to the +/// upstream item. The interactive checkpoint is therefore the up-front +/// **plan-approval gate** (`require_task_plan_approval`), which a human reviews +/// before the run starts — not per-action egress/write approval. Egress is +/// widened to `*` only when the operator set no explicit allow-list (matching +/// skill runs, since real task work needs broad reach: git, package registries, +/// provider APIs). Tightening egress to the source provider's domains for +/// source-ingested runs is a considered follow-up (it would break general task +/// work, so it needs to key off provenance) — tracked for a later PR. +async fn run_autonomous( + mut config: Config, + executor: &ResolvedExecutor, + prompt: &str, + run_id: &str, +) -> Result { + config.agent.max_tool_iterations = TASK_RUN_MAX_ITERATIONS; + // Match skill-run egress handling: only widen to the permissive default + // when the operator hasn't configured an explicit allow-list. See the + // threat-model note above on why `*` is the default here. + if config.http_request.allowed_domains.is_empty() { + config.http_request.allowed_domains = vec!["*".to_string()]; + } + + let mut agent = Agent::from_config_for_agent_with_profile( + &config, + &executor.agent_id, + None, + executor.prompt_suffix.clone(), + ) + .map_err(|e| format!("build agent: {e:#}"))?; + agent.set_event_context(run_id.to_string(), "task"); + agent.set_agent_definition_name(format!( + "task-{}-{}", + executor.label, + run_id.get(..8).unwrap_or(run_id) + )); + + with_autonomous_iter_cap(TASK_RUN_MAX_ITERATIONS, agent.run_single(prompt)) + .await + .map_err(|e| format!("{e:#}")) +} + +/// Deterministic board write-back: the dispatcher owns the card lifecycle. +/// Success → `done` + evidence; failure → `blocked` + blocker reason. An +/// external write failure here is logged, never propagated — the run already +/// happened. +fn write_back( + location: &BoardLocation, + card_id: &str, + run_id: &str, + outcome: Result, +) { + let patch = match &outcome { + Ok(output) => { + tracing::info!( + card_id = %card_id, + run_id = %run_id, + output_chars = output.chars().count(), + "[task_dispatcher] run complete → done" + ); + CardPatch { + status: Some(TaskCardStatus::Done), + evidence: Some(vec![truncate_chars(output.trim(), EVIDENCE_MAX_CHARS)]), + ..Default::default() + } + } + Err(err) => { + tracing::warn!( + card_id = %card_id, + run_id = %run_id, + error = %err, + "[task_dispatcher] run failed → blocked" + ); + CardPatch { + status: Some(TaskCardStatus::Blocked), + blocker: Some(truncate_chars(err, EVIDENCE_MAX_CHARS)), + ..Default::default() + } + } + }; + + if let Err(e) = ops::edit(location, card_id, patch) { + tracing::error!( + card_id = %card_id, + run_id = %run_id, + error = %e, + "[task_dispatcher] board write-back failed (run outcome lost from board)" + ); + } +} + +fn truncate_chars(s: &str, max: usize) -> String { + if s.chars().count() <= max { + return s.to_string(); + } + let mut out: String = s.chars().take(max.saturating_sub(1)).collect(); + out.push('…'); + out +} + +// ── Board poller ────────────────────────────────────────────────────────── + +/// How often the poller wakes to look for a dispatchable card. +const POLLER_TICK_SECONDS: u64 = 60; + +static POLLER_STARTED: OnceLock<()> = OnceLock::new(); + +/// Spawn the board poller. Idempotent — only the first call installs the loop. +/// +/// Each tick it scans the `task-sources` board and dispatches the +/// highest-urgency `todo` card via [`dispatch_card`], gated by background-AI +/// capacity (`scheduler_gate`). This is the catch-all for cards that arrive +/// without a proactive trigger (`TodoOnly` sources, manual cards, or proactive +/// turns the gate skipped). Cards that *did* get a proactive trigger are +/// dispatched by the triage arm; the claim-based lock makes firing both safe. +pub fn start_board_poller() { + if POLLER_STARTED.set(()).is_err() { + tracing::debug!("[task_dispatcher:poller] already running, skipping start"); + return; + } + tokio::spawn(async move { + tracing::info!( + tick_seconds = POLLER_TICK_SECONDS, + "[task_dispatcher:poller] starting" + ); + let mut ticker = tokio::time::interval(Duration::from_secs(POLLER_TICK_SECONDS)); + ticker.tick().await; // skip the immediate fire so startup isn't slammed + loop { + ticker.tick().await; + if let Err(e) = poll_once().await { + tracing::warn!(error = %e, "[task_dispatcher:poller] tick failed (continuing)"); + } + } + }); +} + +/// One poller tick: dispatch the highest-urgency `todo` card on the +/// task-sources board, if any and if capacity allows. `pub(crate)` so tests can +/// drive a tick without the real interval. +pub(crate) async fn poll_once() -> Result<(), String> { + // Gate on background-AI capacity (autonomy / power / pause). Dropping the + // permit immediately is fine: this is a "may background work start now" + // check; the run itself is detached. + let Some(_permit) = crate::openhuman::scheduler_gate::wait_for_capacity().await else { + tracing::debug!("[task_dispatcher:poller] scheduler gate denied capacity; idle tick"); + return Ok(()); + }; + + let config = Config::load_or_init() + .await + .map_err(|e| format!("load config: {e:#}"))?; + if !config.task_sources.enabled { + return Ok(()); + } + + let location = BoardLocation::Thread { + workspace_dir: config.workspace_dir.clone(), + thread_id: crate::openhuman::task_sources::TASK_SOURCES_THREAD_ID.to_string(), + }; + let snapshot = ops::list(&location)?; + + // `enforce_single_in_progress` caps the board at one running card, so if + // one is already in progress there's nothing for this tick to claim. + if snapshot + .cards + .iter() + .any(|c| c.status == TaskCardStatus::InProgress) + { + return Ok(()); + } + + let Some(card) = pick_next_todo(&snapshot.cards) else { + return Ok(()); + }; + + tracing::info!( + card_id = %card.id, + urgency = card_urgency(&card), + "[task_dispatcher:poller] dispatching highest-urgency todo card" + ); + dispatch_card(location, card).await.map(|_| ()) +} + +/// Highest-urgency dispatchable card (`todo` or approved `ready`; urgency from +/// `source_metadata.urgency`, default 0.0; ties broken toward the lower board +/// `order`). Returns a clone. `dispatch_card` then either runs a `ready` card +/// or parks a `todo` one for approval, per the autonomy setting. +fn pick_next_todo(cards: &[TaskBoardCard]) -> Option { + cards + .iter() + .filter(|c| matches!(c.status, TaskCardStatus::Todo | TaskCardStatus::Ready)) + .max_by(|a, b| { + card_urgency(a) + .partial_cmp(&card_urgency(b)) + .unwrap_or(std::cmp::Ordering::Equal) + // On equal urgency, prefer the lower `order` (earlier card): + // reversing the order comparison makes it the "greater" pick. + .then(b.order.cmp(&a.order)) + }) + .cloned() +} + +fn card_urgency(card: &TaskBoardCard) -> f64 { + card.source_metadata + .as_ref() + .and_then(|m| m.get("urgency")) + .and_then(serde_json::Value::as_f64) + .unwrap_or(0.0) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn card(objective: Option<&str>) -> TaskBoardCard { + TaskBoardCard { + id: "task-1".into(), + title: "[GitHub] Fix login bug".into(), + status: TaskCardStatus::Todo, + objective: objective.map(str::to_string), + plan: vec![], + assigned_agent: None, + allowed_tools: vec![], + approval_mode: None, + acceptance_criteria: vec![], + evidence: vec![], + notes: None, + blocker: None, + source_metadata: None, + order: 0, + updated_at: String::new(), + } + } + + #[test] + fn prompt_uses_objective_then_falls_back_to_title() { + let p = build_task_prompt(&card(Some("Fix the login bug"))); + assert!(p.contains("Fix the login bug")); + assert!(!p.contains("[GitHub]")); + + let p2 = build_task_prompt(&card(None)); + assert!(p2.contains("[GitHub] Fix login bug")); + } + + #[test] + fn prompt_includes_plan_and_acceptance_criteria() { + let mut c = card(Some("Do it")); + c.plan = vec!["step one".into(), "step two".into()]; + c.acceptance_criteria = vec!["tests pass".into()]; + let p = build_task_prompt(&c); + assert!(p.contains("Plan:")); + assert!(p.contains("1. step one")); + assert!(p.contains("2. step two")); + assert!(p.contains("Acceptance criteria")); + assert!(p.contains("- tests pass")); + } + + #[test] + fn prompt_points_at_source_and_memory_when_metadata_present() { + let mut c = card(Some("Resolve issue")); + c.source_metadata = Some(json!({ + "provider": "github", + "repo": "octo/repo", + "external_id": "123", + "url": "https://github.com/octo/repo/issues/123", + })); + let p = build_task_prompt(&c); + assert!(p.contains("github octo/repo#123")); + assert!(p.contains("memory_recall")); + assert!(p.contains("https://github.com/octo/repo/issues/123")); + } + + #[test] + fn prompt_omits_source_block_without_metadata() { + let p = build_task_prompt(&card(Some("Do it"))); + assert!(!p.contains("memory_recall")); + assert!(!p.contains("record the outcome on the upstream source")); + } + + #[test] + fn prompt_includes_external_writeback_when_addressable() { + let mut c = card(Some("Resolve issue")); + c.source_metadata = Some(json!({ + "provider": "github", + "repo": "octo/repo", + "external_id": "123", + })); + let p = build_task_prompt(&c); + assert!(p.contains("record the outcome on the upstream source")); + assert!(p.contains("close/resolve the item")); + } + + #[test] + fn prompt_omits_writeback_when_not_addressable() { + // Urgency-only metadata (no provider/external_id) can't address an + // upstream item, so no write-back instruction. + let mut c = card(Some("Do it")); + c.source_metadata = Some(json!({ "urgency": 0.5 })); + let p = build_task_prompt(&c); + assert!(!p.contains("record the outcome on the upstream source")); + } + + #[test] + fn truncate_caps_long_strings() { + let s = "x".repeat(5_000); + let out = truncate_chars(&s, EVIDENCE_MAX_CHARS); + assert!(out.chars().count() <= EVIDENCE_MAX_CHARS); + assert!(out.ends_with('…')); + } + + fn card_with( + id: &str, + status: TaskCardStatus, + urgency: Option, + order: u32, + ) -> TaskBoardCard { + let mut c = card(Some("obj")); + c.id = id.into(); + c.status = status; + c.order = order; + c.source_metadata = urgency.map(|u| json!({ "urgency": u })); + c + } + + #[test] + fn poller_picks_highest_urgency_todo_skipping_other_statuses() { + let cards = vec![ + card_with("a", TaskCardStatus::Todo, Some(0.3), 0), + card_with("b", TaskCardStatus::Done, Some(0.99), 1), + card_with("c", TaskCardStatus::Todo, Some(0.8), 2), + card_with("d", TaskCardStatus::Todo, None, 3), + ]; + let picked = pick_next_todo(&cards).expect("a todo card is available"); + assert_eq!( + picked.id, "c", + "highest-urgency todo wins, done card ignored" + ); + } + + #[test] + fn poller_breaks_urgency_ties_toward_lower_order() { + let cards = vec![ + card_with("late", TaskCardStatus::Todo, Some(0.5), 5), + card_with("early", TaskCardStatus::Todo, Some(0.5), 2), + ]; + assert_eq!(pick_next_todo(&cards).unwrap().id, "early"); + } + + #[test] + fn poller_returns_none_when_no_todo_cards() { + let cards = vec![card_with("a", TaskCardStatus::Done, Some(0.9), 0)]; + assert!(pick_next_todo(&cards).is_none()); + } + + #[test] + fn poller_dispatches_ready_cards_and_skips_approval_states() { + // Approved `ready` cards are dispatchable; `awaiting_approval` and + // `rejected` are not. + let cards = vec![ + card_with("await", TaskCardStatus::AwaitingApproval, Some(0.99), 0), + card_with("rej", TaskCardStatus::Rejected, Some(0.95), 1), + card_with("ready", TaskCardStatus::Ready, Some(0.5), 2), + ]; + assert_eq!(pick_next_todo(&cards).unwrap().id, "ready"); + } + + #[test] + fn poller_prefers_higher_urgency_across_todo_and_ready() { + let cards = vec![ + card_with("ready-low", TaskCardStatus::Ready, Some(0.3), 0), + card_with("todo-high", TaskCardStatus::Todo, Some(0.9), 1), + ]; + assert_eq!(pick_next_todo(&cards).unwrap().id, "todo-high"); + } + + #[test] + fn resolver_defaults_to_orchestrator_for_unset_or_orchestrator_handle() { + let dir = tempfile::tempdir().unwrap(); + for handle in [None, Some(""), Some(" "), Some("orchestrator")] { + let r = resolve_executor(dir.path(), handle); + assert_eq!(r.agent_id, "orchestrator"); + assert_eq!(r.label, "default"); + assert!(r.prompt_suffix.is_none()); + } + } + + #[test] + fn resolver_uses_personality_branch_for_builtin_profile() { + // `load_profiles` returns built-in profiles for any empty workspace, so + // the personality branch is reachable with no fixture file. "research" + // is a built-in profile backed by the "researcher" agent. + let dir = tempfile::tempdir().unwrap(); + let r = resolve_executor(dir.path(), Some("research")); + assert_eq!(r.label, "personality:research"); + assert_eq!(r.agent_id, "researcher"); + let suffix = r.prompt_suffix.expect("personality preamble present"); + assert!(suffix.contains("acting as the personality `research`")); + } + + #[test] + fn resolver_degrades_to_default_for_unresolved_handle() { + let dir = tempfile::tempdir().unwrap(); + let r = resolve_executor(dir.path(), Some("no-such-executor-xyz")); + assert_eq!(r.agent_id, "orchestrator"); + assert_eq!(r.label, "default-fallback"); + assert!(r.prompt_suffix.is_none()); + } + + fn board_loc(dir: &std::path::Path) -> BoardLocation { + BoardLocation::Thread { + workspace_dir: dir.to_path_buf(), + thread_id: "t1".to_string(), + } + } + + #[test] + fn write_back_marks_done_with_evidence_on_success() { + let dir = tempfile::tempdir().unwrap(); + let loc = board_loc(dir.path()); + let id = ops::add(&loc, "do the thing", CardPatch::default()) + .unwrap() + .cards[0] + .id + .clone(); + ops::update_status(&loc, &id, TaskCardStatus::InProgress).unwrap(); + + write_back( + &loc, + &id, + "run-1", + Ok("completed: opened PR #5".to_string()), + ); + + let card = ops::list(&loc) + .unwrap() + .cards + .into_iter() + .find(|c| c.id == id) + .unwrap(); + assert_eq!(card.status, TaskCardStatus::Done); + assert!(card.evidence.iter().any(|e| e.contains("opened PR #5"))); + } + + #[test] + fn write_back_marks_blocked_with_reason_on_failure() { + let dir = tempfile::tempdir().unwrap(); + let loc = board_loc(dir.path()); + let id = ops::add(&loc, "do the thing", CardPatch::default()) + .unwrap() + .cards[0] + .id + .clone(); + ops::update_status(&loc, &id, TaskCardStatus::InProgress).unwrap(); + + write_back(&loc, &id, "run-1", Err("agent build failed".to_string())); + + let card = ops::list(&loc) + .unwrap() + .cards + .into_iter() + .find(|c| c.id == id) + .unwrap(); + assert_eq!(card.status, TaskCardStatus::Blocked); + assert!(card + .blocker + .as_deref() + .unwrap_or_default() + .contains("agent build failed")); + } +} diff --git a/src/openhuman/agent/tools/todo.rs b/src/openhuman/agent/tools/todo.rs index 8dd17e4903..85fd7332a2 100644 --- a/src/openhuman/agent/tools/todo.rs +++ b/src/openhuman/agent/tools/todo.rs @@ -255,6 +255,7 @@ fn patch_from_args(args: &serde_json::Value) -> anyhow::Result { evidence: optional_string_array(args, "evidence")?, notes: optional_string(args, "notes"), blocker: optional_string(args, "blocker"), + source_metadata: None, }) } diff --git a/src/openhuman/agent/triage/envelope.rs b/src/openhuman/agent/triage/envelope.rs index 53267a82c6..aa80e0382b 100644 --- a/src/openhuman/agent/triage/envelope.rs +++ b/src/openhuman/agent/triage/envelope.rs @@ -10,6 +10,18 @@ use chrono::{DateTime, Utc}; use serde_json::Value; +use crate::openhuman::todos::ops::BoardLocation; + +/// Links a trigger to the task-board card it concerns, so the triage +/// `apply_decision` arm can hand the card to the deterministic dispatcher +/// (claim + autonomous run + write-back) instead of the one-shot triage +/// sub-agent. `None` for triggers with no board card (composio/webhook/cron). +#[derive(Debug, Clone)] +pub struct TaskCardLink { + pub card_id: String, + pub location: BoardLocation, +} + /// Where the trigger came from, plus source-specific identifiers the /// triage prompt wants to surface (toolkit/trigger slug, cron job id, /// webhook tunnel id, etc.). @@ -84,6 +96,10 @@ pub struct TriggerEnvelope { /// pipeline can report a meaningful `latency_ms` when it /// publishes [`crate::core::event_bus::DomainEvent::TriggerEvaluated`]. pub received_at: DateTime, + + /// Set when this trigger corresponds to a task-board card, so the + /// triage escalation arm routes it through the deterministic dispatcher. + pub card_link: Option, } impl TriggerEnvelope { @@ -118,6 +134,7 @@ impl TriggerEnvelope { display_label: format!("composio/{toolkit}/{trigger}"), payload, received_at: Utc::now(), + card_link: None, } } @@ -136,6 +153,7 @@ impl TriggerEnvelope { display_label: format!("webhook/{method}/{path}"), payload, received_at: Utc::now(), + card_link: None, } } @@ -153,6 +171,7 @@ impl TriggerEnvelope { display_label: format!("cron/{job_name}"), payload: serde_json::json!({ "output": output }), received_at: Utc::now(), + card_link: None, } } @@ -171,8 +190,17 @@ impl TriggerEnvelope { display_label: format!("external/{caller_id}"), payload, received_at: Utc::now(), + card_link: None, } } + + /// Attach a task-board card link so the triage escalation arm dispatches + /// the card deterministically (claim + autonomous run + write-back). + #[must_use] + pub fn with_task_card(mut self, card_id: String, location: BoardLocation) -> Self { + self.card_link = Some(TaskCardLink { card_id, location }); + self + } } #[cfg(test)] @@ -202,6 +230,28 @@ mod tests { assert_eq!(env.payload["from"], "a@b.com"); } + #[test] + fn with_task_card_attaches_card_link() { + let env = TriggerEnvelope::from_external( + "task_sources:ts-1", + "external task ingested", + json!({}), + ); + assert!(env.card_link.is_none(), "no link by default"); + + let location = BoardLocation::Thread { + workspace_dir: std::path::PathBuf::from("/tmp/ws"), + thread_id: "task-sources".to_string(), + }; + let linked = env.with_task_card("card-1".to_string(), location); + let link = linked.card_link.expect("card_link attached"); + assert_eq!(link.card_id, "card-1"); + match link.location { + BoardLocation::Thread { thread_id, .. } => assert_eq!(thread_id, "task-sources"), + _ => panic!("expected Thread board location"), + } + } + #[test] fn composio_envelope_falls_back_to_metadata_id_when_uuid_missing() { let env = TriggerEnvelope::from_composio( diff --git a/src/openhuman/agent/triage/escalation.rs b/src/openhuman/agent/triage/escalation.rs index 2c38d61f47..419558fe4a 100644 --- a/src/openhuman/agent/triage/escalation.rs +++ b/src/openhuman/agent/triage/escalation.rs @@ -27,7 +27,7 @@ use crate::openhuman::agent::Agent; use crate::openhuman::config::Config; use super::decision::TriageAction; -use super::envelope::TriggerEnvelope; +use super::envelope::{TaskCardLink, TriggerEnvelope}; use super::evaluator::TriageRun; use super::events; @@ -57,6 +57,13 @@ pub async fn apply_decision(run: TriageRun, envelope: &TriggerEnvelope) -> anyho reason = %run.decision.reason, "[triage::escalation] DROP — no downstream work" ); + // A dropped trigger that carries a board card (proactive + // task-source ingest) must be terminally gated, or the board + // poller — which dispatches any `Todo`/`Ready` card regardless of + // the triage verdict — would re-run it on the next tick, silently + // breaking the noise-gating contract documented on + // `SourceTarget::AgentTodoProactive`. + gate_linked_card_terminal(envelope, "drop"); } TriageAction::Acknowledge => { tracing::info!( @@ -65,6 +72,9 @@ pub async fn apply_decision(run: TriageRun, envelope: &TriggerEnvelope) -> anyho reason = %run.decision.reason, "[triage::escalation] ACKNOWLEDGE — logged (memory-write is a future addition)" ); + // Acknowledge means "seen, no autonomous action needed" — same as + // drop, the linked card must not be picked up by the board poller. + gate_linked_card_terminal(envelope, "acknowledge"); } TriageAction::React | TriageAction::Escalate => { let target = run @@ -85,6 +95,44 @@ pub async fn apply_decision(run: TriageRun, envelope: &TriggerEnvelope) -> anyho "[triage::escalation] dispatching sub-agent" ); + // ── Unified task-board path ─────────────────────── + // A trigger linked to a board card is handed to the deterministic + // dispatcher (claim → autonomous run → write-back). The claim + // (todo→in_progress) deduplicates against the board poller, so + // firing both is safe. Non-card triggers (composio/webhook/cron) + // fall through to the one-shot triage sub-agent below. + if let Some(link) = &envelope.card_link { + use crate::openhuman::agent::task_dispatcher::DispatchOutcome; + match dispatch_linked_card(link).await { + Ok(DispatchOutcome::Running { run_id }) => { + tracing::info!( + card_id = %link.card_id, + run_id = %run_id, + "[triage::escalation] task-card dispatched to deterministic runner" + ); + events::publish_escalated(envelope, "task_dispatcher"); + } + Ok(DispatchOutcome::AwaitingApproval) => { + // Parked for plan approval (autonomy gate). Not an + // escalation yet — the approval flow resumes it. + tracing::info!( + card_id = %link.card_id, + "[triage::escalation] task-card parked awaiting plan approval" + ); + } + Err(reason) => { + // A failed claim (another card already in progress, or + // the card vanished) is benign — the poller retries. + tracing::info!( + card_id = %link.card_id, + reason = %reason, + "[triage::escalation] task-card dispatch skipped (claim failed?)" + ); + } + } + return Ok(()); + } + // ── External-effect approval gate (#1339) ───────── // React / Escalate fire a sub-agent that may call // external-effect tools on the user's behalf. Catching @@ -268,6 +316,77 @@ async fn dispatch_target_agent(agent_id: &str, prompt: &str) -> anyhow::Result Result { + let snapshot = crate::openhuman::todos::ops::list(&link.location)?; + let card = snapshot + .cards + .into_iter() + .find(|c| c.id == link.card_id) + .ok_or_else(|| format!("card `{}` not found on board", link.card_id))?; + crate::openhuman::agent::task_dispatcher::dispatch_card(link.location.clone(), card).await +} + +/// Terminally gate a card-linked trigger that triage decided to `drop` / +/// `acknowledge`, so the board poller (which dispatches any pending +/// `Todo`/`Ready` card) won't re-run it. Only a still-pending card is gated; +/// if it already advanced (the poller claimed it, or it's already terminal) +/// it is left untouched. Best-effort: a missing card or write failure is +/// logged, never propagated — the trigger was already evaluated. +fn gate_linked_card_terminal(envelope: &TriggerEnvelope, decision: &str) { + use crate::openhuman::agent::task_board::TaskCardStatus; + use crate::openhuman::todos::ops; + + let Some(link) = &envelope.card_link else { + return; + }; + + let current = match ops::list(&link.location) { + Ok(snapshot) => snapshot + .cards + .into_iter() + .find(|c| c.id == link.card_id) + .map(|c| c.status), + Err(e) => { + tracing::warn!( + card_id = %link.card_id, + error = %e, + "[triage::escalation] reload before gating linked card failed" + ); + return; + } + }; + + match current { + Some(TaskCardStatus::Todo | TaskCardStatus::Ready | TaskCardStatus::AwaitingApproval) => { + match ops::update_status(&link.location, &link.card_id, TaskCardStatus::Rejected) { + Ok(_) => tracing::info!( + card_id = %link.card_id, + decision = %decision, + "[triage::escalation] gated task-card → rejected (poller will skip)" + ), + Err(e) => tracing::warn!( + card_id = %link.card_id, + decision = %decision, + error = %e, + "[triage::escalation] failed to gate task-card (poller may re-dispatch)" + ), + } + } + other => tracing::debug!( + card_id = %link.card_id, + decision = %decision, + status = ?other, + "[triage::escalation] linked task-card not pending; no gating needed" + ), + } +} + #[cfg(test)] mod tests { use super::*; @@ -426,6 +545,75 @@ mod tests { ))); } + fn seed_task_card() -> ( + tempfile::TempDir, + crate::openhuman::todos::ops::BoardLocation, + String, + ) { + use crate::openhuman::todos::ops::{self, BoardLocation, CardPatch}; + let dir = tempfile::tempdir().unwrap(); + let location = BoardLocation::Thread { + workspace_dir: dir.path().to_path_buf(), + thread_id: "task-sources".to_string(), + }; + let card_id = ops::add(&location, "ingested issue", CardPatch::default()) + .unwrap() + .cards[0] + .id + .clone(); + (dir, location, card_id) + } + + #[tokio::test] + async fn apply_decision_drop_gates_linked_card_to_rejected() { + use crate::openhuman::agent::task_board::TaskCardStatus; + use crate::openhuman::todos::ops; + + let _events_guard = test_events_guard().await; + let _ = init_global(32); + let (_dir, location, card_id) = seed_task_card(); + + let envelope = envelope("esc-drop-card").with_task_card(card_id.clone(), location.clone()); + apply_decision(run(TriageAction::Drop), &envelope) + .await + .expect("drop should not fail"); + + let status = ops::list(&location) + .unwrap() + .cards + .into_iter() + .find(|c| c.id == card_id) + .map(|c| c.status); + assert_eq!( + status, + Some(TaskCardStatus::Rejected), + "a dropped card-linked trigger must be gated terminally so the board poller skips it" + ); + } + + #[tokio::test] + async fn apply_decision_acknowledge_gates_linked_card_to_rejected() { + use crate::openhuman::agent::task_board::TaskCardStatus; + use crate::openhuman::todos::ops; + + let _events_guard = test_events_guard().await; + let _ = init_global(32); + let (_dir, location, card_id) = seed_task_card(); + + let envelope = envelope("esc-ack-card").with_task_card(card_id.clone(), location.clone()); + apply_decision(run(TriageAction::Acknowledge), &envelope) + .await + .expect("acknowledge should not fail"); + + let status = ops::list(&location) + .unwrap() + .cards + .into_iter() + .find(|c| c.id == card_id) + .map(|c| c.status); + assert_eq!(status, Some(TaskCardStatus::Rejected)); + } + #[tokio::test] async fn apply_decision_react_failure_publishes_failed_event() { let _events_guard = test_events_guard().await; diff --git a/src/openhuman/channels/runtime/startup.rs b/src/openhuman/channels/runtime/startup.rs index 5970b921e2..5570bcce06 100644 --- a/src/openhuman/channels/runtime/startup.rs +++ b/src/openhuman/channels/runtime/startup.rs @@ -69,6 +69,9 @@ pub async fn start_channels(mut config: Config) -> Result<()> { // configured external sources onto the agent's todo board. crate::openhuman::task_sources::bus::register_task_sources_subscriber(); crate::openhuman::task_sources::start_periodic_poll(); + // Board poller: dispatch the highest-urgency `todo` card on the + // task-sources board (catch-all for cards without a proactive trigger). + crate::openhuman::agent::task_dispatcher::start_board_poller(); // Native request handlers. Re-registering is safe (latest wins) so // this is idempotent even if `bootstrap_core_runtime` also runs. // Must happen before `run_message_dispatch_loop` begins, because diff --git a/src/openhuman/notifications/rpc.rs b/src/openhuman/notifications/rpc.rs index 6d124d90c8..61ee4b5de3 100644 --- a/src/openhuman/notifications/rpc.rs +++ b/src/openhuman/notifications/rpc.rs @@ -104,6 +104,7 @@ pub async fn handle_ingest(params: Map) -> Result "raw": req.raw_payload, }), received_at: ingest_started_at, + card_link: None, }; match run_triage(&envelope).await { diff --git a/src/openhuman/task_sources/ops.rs b/src/openhuman/task_sources/ops.rs index 3207adea80..c70925f07d 100644 --- a/src/openhuman/task_sources/ops.rs +++ b/src/openhuman/task_sources/ops.rs @@ -44,6 +44,7 @@ pub async fn add( interval_secs: Option, target: Option, max_tasks_per_fetch: Option, + assigned_executor: Option, ) -> Result, String> { let defaults = &config.task_sources; let interval_secs = interval_secs.unwrap_or(defaults.default_interval_secs); @@ -66,9 +67,25 @@ pub async fn add( ) .map_err(|e| e.to_string())?; + // Apply the optional static executor routing (G7) as a follow-up patch so + // `add_source`'s signature (and its many callers) stays unchanged. + let source = match assigned_executor.filter(|s| !s.trim().is_empty()) { + Some(executor) => store::update_source( + config, + &source.id, + TaskSourcePatch { + assigned_executor: Some(executor), + ..Default::default() + }, + ) + .map_err(|e| e.to_string())?, + None => source, + }; + tracing::info!( source_id = %source.id, provider = %source.provider.as_str(), + assigned_executor = ?source.assigned_executor, "[task_sources:ops] add created source" ); Ok(RpcOutcome::new(source, vec![])) diff --git a/src/openhuman/task_sources/periodic.rs b/src/openhuman/task_sources/periodic.rs index c8cda8a998..5035f081c4 100644 --- a/src/openhuman/task_sources/periodic.rs +++ b/src/openhuman/task_sources/periodic.rs @@ -165,6 +165,7 @@ mod tests { interval_secs, target: SourceTarget::TodoOnly, max_tasks_per_fetch: 25, + assigned_executor: None, created_at: Utc::now(), last_fetch_at: None, last_status: None, diff --git a/src/openhuman/task_sources/route.rs b/src/openhuman/task_sources/route.rs index 115def3665..0dbce4fec3 100644 --- a/src/openhuman/task_sources/route.rs +++ b/src/openhuman/task_sources/route.rs @@ -19,7 +19,7 @@ use crate::openhuman::todos::ops::{ }; use crate::openhuman::{scheduler_gate, todos}; -use super::types::{EnrichedTask, SourceTarget, TaskSource}; +use super::types::{EnrichedTask, FilterSpec, SourceTarget, TaskSource}; /// Stable thread id whose board collects every ingested task. pub const TASK_SOURCES_THREAD_ID: &str = "task-sources"; @@ -44,7 +44,7 @@ pub async fn route_enriched( Ok(card_id) } SourceTarget::AgentTodoProactive => { - dispatch_triage(source, enriched).await?; + dispatch_triage(config, source, enriched, &card_id).await?; Ok(card_id) } } @@ -112,11 +112,36 @@ fn add_card( Some(notes_parts.join("\n")) }; + // Objective: the bare upstream title (the card `content`/title is the + // `[provider] title` display form; the objective is the clean goal the + // executing agent works toward). + let objective = { + let t = task.title.trim(); + (!t.is_empty()).then(|| t.to_string()) + }; + + // Stamp the source identifiers the downstream dispatcher / write-back + // needs (provider + repo + issue id + url) plus the enrichment urgency + // used for prioritisation. This is the only writer of `source_metadata`. + let source_metadata = build_source_metadata(source, enriched); + + // G7: pre-assign the card to the source's configured executor so the + // dispatcher runs it deterministically (no LLM router). Unset → unassigned. + let assigned_agent = source + .assigned_executor + .as_deref() + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(str::to_string); + let snapshot = todo_add( &location, &content, CardPatch { notes, + objective, + assigned_agent, + source_metadata: Some(source_metadata), ..Default::default() }, ) @@ -139,10 +164,43 @@ fn add_card( Ok(new_card_id) } +/// Build the card's `source_metadata` from the originating source + task: +/// the provider/repo/issue identifiers a later dispatcher or external +/// write-back needs to address the upstream item, plus the enrichment +/// urgency used to prioritise pickup. Repo is only present for GitHub +/// sources (the other providers don't carry a repo concept). +fn build_source_metadata(source: &TaskSource, enriched: &EnrichedTask) -> serde_json::Value { + let task = &enriched.task; + let mut meta = json!({ + "provider": task.provider, + "source_id": source.id, + "external_id": task.external_id, + "urgency": enriched.urgency, + }); + if let Some(url) = task.url.as_deref().map(str::trim).filter(|s| !s.is_empty()) { + meta["url"] = json!(url); + } + if let FilterSpec::Github { + repo: Some(repo), .. + } = &source.filter + { + let repo = repo.trim(); + if !repo.is_empty() { + meta["repo"] = json!(repo); + } + } + meta +} + /// Dispatch a triage turn for a proactive task, gated by scheduler /// capacity. Card creation already happened; a gated-off or deferred /// turn is non-fatal — the task still sits on the board. -async fn dispatch_triage(source: &TaskSource, enriched: &EnrichedTask) -> Result<(), String> { +async fn dispatch_triage( + config: &Config, + source: &TaskSource, + enriched: &EnrichedTask, + card_id: &str, +) -> Result<(), String> { // Respect background-AI throttling. When the gate denies capacity // (Off / paused), we keep the card but skip the proactive turn. let Some(_permit) = scheduler_gate::wait_for_capacity().await else { @@ -164,11 +222,19 @@ async fn dispatch_triage(source: &TaskSource, enriched: &EnrichedTask) -> Result "sourceId": source.id, }); + // Link the envelope to the board card so triage's escalation arm routes + // it through the deterministic dispatcher (claim → autonomous run → + // write-back) instead of the one-shot triage sub-agent. + let location = BoardLocation::Thread { + workspace_dir: config.workspace_dir.clone(), + thread_id: TASK_SOURCES_THREAD_ID.to_string(), + }; let envelope = TriggerEnvelope::from_external( &format!("task_sources:{}", source.id), "external task ingested", payload, - ); + ) + .with_task_card(card_id.to_string(), location); let outcome = run_triage(&envelope) .await @@ -228,6 +294,9 @@ pub fn board_cards( #[cfg(test)] mod tests { use super::*; + use crate::openhuman::task_sources::types::ProviderSlug; + use crate::openhuman::task_sources::NormalizedTask; + use chrono::Utc; #[test] fn provider_label_titlecases_known_and_unknown() { @@ -236,4 +305,146 @@ mod tests { assert_eq!(provider_label("asana"), "Asana"); assert_eq!(provider_label(""), ""); } + + fn github_source(repo: Option<&str>) -> TaskSource { + TaskSource { + id: "ts-1".into(), + provider: ProviderSlug::Github, + connection_id: None, + name: None, + enabled: true, + filter: FilterSpec::Github { + repo: repo.map(str::to_string), + labels: vec![], + assignee_is_me: true, + state: None, + extra: json!({}), + }, + interval_secs: 1800, + target: SourceTarget::AgentTodoProactive, + max_tasks_per_fetch: 25, + assigned_executor: None, + created_at: Utc::now(), + last_fetch_at: None, + last_status: None, + } + } + + fn enriched(external_id: &str, url: Option<&str>, urgency: f32) -> EnrichedTask { + let task = NormalizedTask { + external_id: external_id.into(), + provider: "github".into(), + title: "Fix the bug".into(), + url: url.map(str::to_string), + ..Default::default() + }; + EnrichedTask { + task, + summary: "Fix the bug".into(), + urgency, + linked_people: vec![], + linked_memory_ids: vec![], + agent_prompt: "do it".into(), + enriched_at: Utc::now(), + } + } + + #[test] + fn source_metadata_carries_github_repo_and_identifiers() { + let src = github_source(Some("octo/repo")); + let e = enriched("123", Some("https://github.com/octo/repo/issues/123"), 0.7); + let meta = build_source_metadata(&src, &e); + assert_eq!(meta["provider"], json!("github")); + assert_eq!(meta["source_id"], json!("ts-1")); + assert_eq!(meta["external_id"], json!("123")); + assert_eq!(meta["repo"], json!("octo/repo")); + assert_eq!( + meta["url"], + json!("https://github.com/octo/repo/issues/123") + ); + let urgency = meta["urgency"].as_f64().expect("urgency is a number"); + assert!((urgency - 0.7).abs() < 1e-6, "urgency was {urgency}"); + } + + #[test] + fn source_metadata_omits_absent_repo_and_url() { + let src = github_source(None); + let e = enriched("9", None, 0.4); + let meta = build_source_metadata(&src, &e); + assert!(meta.get("repo").is_none()); + assert!(meta.get("url").is_none()); + assert_eq!(meta["external_id"], json!("9")); + let urgency = meta["urgency"].as_f64().expect("urgency is a number"); + assert!((urgency - 0.4).abs() < 1e-6, "urgency was {urgency}"); + } + + fn temp_config() -> (tempfile::TempDir, Config) { + let tmp = tempfile::tempdir().unwrap(); + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + (tmp, config) + } + + #[test] + fn add_card_stamps_objective_assigned_agent_and_metadata() { + let (_tmp, config) = temp_config(); + let mut src = github_source(Some("octo/repo")); + // Whitespace around the executor must be trimmed into assigned_agent. + src.assigned_executor = Some(" agent-x ".into()); + let e = enriched("123", Some("https://github.com/octo/repo/issues/123"), 0.7); + + add_card(&config, &src, &e, None).expect("add_card succeeds"); + + let cards = board_cards(&config).expect("board_cards"); + assert_eq!(cards.len(), 1); + let card = &cards[0]; + // Display title is the `[provider] title` form; objective is the bare title. + assert_eq!(card.title, "[GitHub] Fix the bug"); + assert_eq!(card.objective.as_deref(), Some("Fix the bug")); + assert_eq!(card.assigned_agent.as_deref(), Some("agent-x")); + let meta = card + .source_metadata + .as_ref() + .expect("source_metadata present"); + assert_eq!(meta["external_id"], json!("123")); + assert_eq!(meta["repo"], json!("octo/repo")); + } + + #[test] + fn add_card_drops_whitespace_only_assigned_executor() { + let (_tmp, config) = temp_config(); + let mut src = github_source(None); + src.assigned_executor = Some(" ".into()); + let e = enriched("9", None, 0.4); + + add_card(&config, &src, &e, None).expect("add_card succeeds"); + + let cards = board_cards(&config).expect("board_cards"); + assert_eq!(cards.len(), 1); + assert!( + cards[0].assigned_agent.is_none(), + "whitespace-only executor should not assign the card" + ); + } + + #[test] + fn source_metadata_has_no_repo_for_non_github_provider() { + let mut src = github_source(Some("octo/repo")); + // A non-GitHub filter carries no repo concept. + src.provider = ProviderSlug::Linear; + src.filter = FilterSpec::Linear { + team_id: None, + assignee_is_me: true, + state: None, + extra: json!({}), + }; + let e = enriched("LIN-5", None, 0.5); + let meta = build_source_metadata(&src, &e); + assert!(meta.get("repo").is_none()); + assert_eq!(meta["source_id"], json!("ts-1")); + } } diff --git a/src/openhuman/task_sources/schemas.rs b/src/openhuman/task_sources/schemas.rs index e09a11e2d8..7d3ec292cb 100644 --- a/src/openhuman/task_sources/schemas.rs +++ b/src/openhuman/task_sources/schemas.rs @@ -169,6 +169,13 @@ pub fn schemas(function: &str) -> ControllerSchema { comment: "Per-fetch task cap (u32 range); defaults from config.", required: false, }, + FieldSchema { + name: "assigned_executor", + ty: TypeSchema::Option(Box::new(TypeSchema::String)), + comment: "Optional executor handle (personality/skill/agent id) every \ + card from this source is pre-assigned to.", + required: false, + }, ], outputs: vec![FieldSchema { name: "source", @@ -369,6 +376,7 @@ fn handle_add(params: Map) -> ControllerFuture { let interval_secs = read_optional::(¶ms, "interval_secs")?; let target = read_optional::(¶ms, "target")?; let max = read_optional_u32(¶ms, "max_tasks_per_fetch")?; + let assigned_executor = read_optional::(¶ms, "assigned_executor")?; to_json( ops::add( &config, @@ -379,6 +387,7 @@ fn handle_add(params: Map) -> ControllerFuture { interval_secs, target, max, + assigned_executor, ) .await?, ) diff --git a/src/openhuman/task_sources/store.rs b/src/openhuman/task_sources/store.rs index de2dce7c1e..e3998f4428 100644 --- a/src/openhuman/task_sources/store.rs +++ b/src/openhuman/task_sources/store.rs @@ -26,20 +26,26 @@ use super::types::{ }; /// Compute an edit-aware content hash for a task. Two fetches of the -/// same `external_id` whose title/body/status/updated_at differ produce +/// same `external_id` whose title/body/status/updated_at/url differ produce /// different hashes, so an *edited* upstream item re-ingests. /// +/// `url` is part of the canonical form because it is load-bearing downstream +/// (it lands in the card's `source_metadata`/notes and drives external +/// write-back); a provider that edits the URL without advancing `updated_at` +/// would otherwise leave a stale link on the board. +/// /// Uses SHA-256 over a canonical field-delimited representation so the /// digest is stable across Rust/toolchain versions (the dedup key is /// persisted on disk; a non-deterministic hasher would force spurious /// re-ingests after a toolchain bump). pub fn content_hash(task: &NormalizedTask) -> String { let canonical = format!( - "{}\u{1f}{}\u{1f}{}\u{1f}{}", + "{}\u{1f}{}\u{1f}{}\u{1f}{}\u{1f}{}", task.title, task.body.as_deref().unwrap_or(""), task.status.as_deref().unwrap_or(""), task.updated_at.as_deref().unwrap_or(""), + task.url.as_deref().unwrap_or(""), ); let digest = Sha256::digest(canonical.as_bytes()); format!("{digest:x}") @@ -166,6 +172,9 @@ pub fn update_source(config: &Config, id: &str, patch: TaskSourcePatch) -> Resul if let Some(connection_id) = patch.connection_id { source.connection_id = Some(connection_id).filter(|s| !s.trim().is_empty()); } + if let Some(assigned_executor) = patch.assigned_executor { + source.assigned_executor = Some(assigned_executor).filter(|s| !s.trim().is_empty()); + } let filter_json = serde_json::to_string(&source.filter).context("serialize filter")?; let target_json = serde_json::to_string(&source.target).context("serialize target")?; @@ -176,8 +185,9 @@ pub fn update_source(config: &Config, id: &str, patch: TaskSourcePatch) -> Resul conn.execute( "UPDATE task_sources SET provider = ?1, connection_id = ?2, name = ?3, enabled = ?4, filter = ?5, - interval_secs = ?6, target = ?7, max_tasks_per_fetch = ?8 - WHERE id = ?9", + interval_secs = ?6, target = ?7, max_tasks_per_fetch = ?8, + assigned_executor = ?9 + WHERE id = ?10", params![ source.provider.as_str(), source.connection_id, @@ -187,6 +197,7 @@ pub fn update_source(config: &Config, id: &str, patch: TaskSourcePatch) -> Resul interval_i64, target_json, i64::from(source.max_tasks_per_fetch), + source.assigned_executor, id, ], ) @@ -339,7 +350,8 @@ pub fn clear_all(config: &Config) -> Result { } const SELECT_SOURCE_COLUMNS: &str = "SELECT id, provider, connection_id, name, enabled, filter, \ - interval_secs, target, max_tasks_per_fetch, created_at, last_fetch_at, last_status \ + interval_secs, target, max_tasks_per_fetch, created_at, last_fetch_at, last_status, \ + assigned_executor \ FROM task_sources"; fn map_source_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { @@ -369,6 +381,7 @@ fn map_source_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { target, max_tasks_per_fetch: u32::try_from(row.get::<_, i64>(8)?) .map_err(|_| sql_conv("invalid max_tasks_per_fetch in task_sources DB"))?, + assigned_executor: row.get(12)?, created_at: parse_rfc3339(&created_at_raw).map_err(sql_conv)?, last_fetch_at: match last_fetch_raw { Some(raw) => Some(parse_rfc3339(&raw).map_err(sql_conv)?), @@ -421,7 +434,8 @@ fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) max_tasks_per_fetch INTEGER NOT NULL, created_at TEXT NOT NULL, last_fetch_at TEXT, - last_status TEXT + last_status TEXT, + assigned_executor TEXT ); CREATE TABLE IF NOT EXISTS ingested_tasks ( source_id TEXT NOT NULL, @@ -441,6 +455,8 @@ fn with_connection(config: &Config, f: impl FnOnce(&Connection) -> Result) // Additive migration: add card_id to existing databases that pre-date // this column. Tolerate "duplicate column" in case of a concurrent open. add_column_if_missing(&conn, "ingested_tasks", "card_id", "TEXT")?; + // G7: static executor routing on a source. + add_column_if_missing(&conn, "task_sources", "assigned_executor", "TEXT")?; f(&conn) } diff --git a/src/openhuman/task_sources/store_tests.rs b/src/openhuman/task_sources/store_tests.rs index c45f6f85bc..a17b1c2c82 100644 --- a/src/openhuman/task_sources/store_tests.rs +++ b/src/openhuman/task_sources/store_tests.rs @@ -215,6 +215,68 @@ fn dedup_detects_seen_and_edited_tasks() { assert_eq!(listed[0].external_id, "42"); } +#[tokio::test] +async fn add_with_assigned_executor_persists_and_filters_blank() { + use crate::openhuman::task_sources::ops; + + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + // Some(non-empty) → persisted via the follow-up update_source patch + // (exercises both ops::add's assigned-executor branch and store's + // update_source patch arm). The store layer preserves the value verbatim; + // route::add_card is what trims it when stamping a card's assigned_agent. + let out = ops::add( + &config, + ProviderSlug::Github, + None, + None, + github_filter(), + Some(1800), + Some(SourceTarget::TodoOnly), + Some(25), + Some("my-skill".into()), + ) + .await + .expect("add with executor"); + assert_eq!(out.value.assigned_executor.as_deref(), Some("my-skill")); + + // Re-read from disk to confirm persistence (not just the returned value). + let fetched = get_source(&config, &out.value.id).unwrap(); + assert_eq!(fetched.assigned_executor.as_deref(), Some("my-skill")); + + // Whitespace-only executor is filtered to None before the patch runs. + let blank = ops::add( + &config, + ProviderSlug::Github, + None, + None, + github_filter(), + Some(1800), + Some(SourceTarget::TodoOnly), + Some(25), + Some(" ".into()), + ) + .await + .expect("add with blank executor"); + assert_eq!(blank.value.assigned_executor, None); +} + +#[test] +fn content_hash_changes_when_only_url_changes() { + // `url` is load-bearing downstream (source_metadata / external write-back), + // so a URL-only upstream edit must produce a different hash and re-ingest — + // even if `updated_at` didn't advance (coarse-`updated_at` providers). + let base = sample_task("7", "Same title", "2025-01-01T00:00:00Z"); + let mut moved = base.clone(); + moved.url = Some("https://example.com/issues/7".into()); + assert_ne!( + content_hash(&base), + content_hash(&moved), + "a URL-only change must re-ingest" + ); +} + #[test] fn list_ingested_orders_newest_first() { let tmp = TempDir::new().unwrap(); diff --git a/src/openhuman/task_sources/types.rs b/src/openhuman/task_sources/types.rs index f060711f3f..b884a1e5a2 100644 --- a/src/openhuman/task_sources/types.rs +++ b/src/openhuman/task_sources/types.rs @@ -165,6 +165,12 @@ pub struct TaskSource { pub interval_secs: u64, pub target: SourceTarget, pub max_tasks_per_fetch: u32, + /// Static executor routing (G7): a personality / skill / agent handle that + /// every card from this source is pre-assigned to, so the dispatcher runs + /// it deterministically without the LLM router. `None` leaves cards + /// unassigned (router / poller decides). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub assigned_executor: Option, pub created_at: DateTime, #[serde(default, skip_serializing_if = "Option::is_none")] pub last_fetch_at: Option>, @@ -191,6 +197,8 @@ pub struct TaskSourcePatch { pub max_tasks_per_fetch: Option, #[serde(default)] pub connection_id: Option, + #[serde(default)] + pub assigned_executor: Option, } /// An enriched, agent-ready task produced by [`super::enrich`]. @@ -318,6 +326,7 @@ mod tests { interval_secs: 1800, target: SourceTarget::TodoOnly, max_tasks_per_fetch: 25, + assigned_executor: None, created_at: DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z") .unwrap() .with_timezone(&Utc), diff --git a/src/openhuman/threads/turn_state/mirror_tests.rs b/src/openhuman/threads/turn_state/mirror_tests.rs index de869b8b34..5dd16cd158 100644 --- a/src/openhuman/threads/turn_state/mirror_tests.rs +++ b/src/openhuman/threads/turn_state/mirror_tests.rs @@ -164,6 +164,7 @@ fn task_board_update_is_stored_and_flushed() { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 0, updated_at: "2026-05-15T00:00:00Z".into(), }], diff --git a/src/openhuman/todos/ops.rs b/src/openhuman/todos/ops.rs index f234a391b7..7c06ababde 100644 --- a/src/openhuman/todos/ops.rs +++ b/src/openhuman/todos/ops.rs @@ -36,11 +36,14 @@ fn maybe_scratch_lock(location: &BoardLocation) -> Option Result { match raw.trim().to_ascii_lowercase().as_str() { "todo" | "pending" => Ok(TaskCardStatus::Todo), + "awaiting_approval" | "awaiting-approval" => Ok(TaskCardStatus::AwaitingApproval), + "ready" | "approved" => Ok(TaskCardStatus::Ready), "in_progress" | "in-progress" | "inprogress" | "started" => Ok(TaskCardStatus::InProgress), "blocked" => Ok(TaskCardStatus::Blocked), "done" | "completed" | "complete" => Ok(TaskCardStatus::Done), + "rejected" | "denied" => Ok(TaskCardStatus::Rejected), other => Err(format!( - "invalid status '{other}' (expected todo|in_progress|blocked|done)" + "invalid status '{other}' (expected todo|awaiting_approval|ready|in_progress|blocked|done|rejected)" )), } } @@ -68,6 +71,9 @@ pub struct CardPatch { pub evidence: Option>, pub notes: Option, pub blocker: Option, + /// Provider/source identifiers for a task-source-ingested card. `Some` + /// sets the card's `source_metadata`; `None` leaves it untouched. + pub source_metadata: Option, } /// Where to load/save the working set of cards. @@ -158,10 +164,12 @@ pub fn render_markdown(cards: &[TaskBoardCard]) -> String { let mut out = String::new(); for card in cards { let marker = match card.status { - TaskCardStatus::Todo => "[ ]", + TaskCardStatus::Todo | TaskCardStatus::Ready => "[ ]", + TaskCardStatus::AwaitingApproval => "[?]", TaskCardStatus::InProgress => "[~]", TaskCardStatus::Blocked => "[!]", TaskCardStatus::Done => "[x]", + TaskCardStatus::Rejected => "[-]", }; out.push_str("- "); out.push_str(marker); @@ -261,6 +269,7 @@ pub fn add( evidence: patch.evidence.unwrap_or_default(), notes: patch.notes.and_then(non_empty), blocker: patch.blocker.and_then(non_empty), + source_metadata: patch.source_metadata, order: cards.len() as u32, updated_at: Utc::now().to_rfc3339(), }; @@ -322,6 +331,9 @@ pub fn edit(location: &BoardLocation, id: &str, patch: CardPatch) -> Result Result { + let cards = load_cards(location)?; + let current = cards + .iter() + .find(|c| c.id == id) + .ok_or_else(|| format!("todo id '{id}' not found"))?; + if current.status != TaskCardStatus::AwaitingApproval { + return Err(format!( + "card '{id}' is not awaiting approval (status: {})", + current.status.as_str() + )); + } + let new_status = if approve { + TaskCardStatus::Ready + } else { + TaskCardStatus::Rejected + }; + update_status(location, id, new_status) +} + /// Remove a card by id. Errors if `id` is unknown. pub fn remove(location: &BoardLocation, id: &str) -> Result { tracing::debug!( @@ -478,6 +518,13 @@ mod tests { ); assert_eq!(parse_status("blocked").unwrap(), TaskCardStatus::Blocked); assert_eq!(parse_status("done").unwrap(), TaskCardStatus::Done); + assert_eq!( + parse_status("awaiting_approval").unwrap(), + TaskCardStatus::AwaitingApproval + ); + assert_eq!(parse_status("ready").unwrap(), TaskCardStatus::Ready); + assert_eq!(parse_status("approved").unwrap(), TaskCardStatus::Ready); + assert_eq!(parse_status("rejected").unwrap(), TaskCardStatus::Rejected); assert!(parse_status("nope").is_err()); } @@ -533,6 +580,83 @@ mod tests { assert_eq!(snap.cards[0].title, "Refined plan"); } + #[test] + fn source_metadata_round_trips_through_add_and_edit() { + let dir = tempdir().unwrap(); + let loc = thread_loc(dir.path(), "t1"); + let added = add( + &loc, + "ingested task", + CardPatch { + source_metadata: Some(serde_json::json!({ + "provider": "github", + "external_id": "7", + })), + ..Default::default() + }, + ) + .unwrap(); + let id = added.cards[0].id.clone(); + assert_eq!( + added.cards[0].source_metadata.as_ref().unwrap()["external_id"], + serde_json::json!("7") + ); + + // A subsequent edit with `Some(..)` replaces the stamped metadata. + let snap = edit( + &loc, + &id, + CardPatch { + source_metadata: Some(serde_json::json!({ + "provider": "github", + "external_id": "8", + })), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + snap.cards[0].source_metadata.as_ref().unwrap()["external_id"], + serde_json::json!("8") + ); + + // An edit that leaves `source_metadata: None` preserves the value. + let snap2 = edit( + &loc, + &id, + CardPatch { + notes: Some("touch".into()), + ..Default::default() + }, + ) + .unwrap(); + assert_eq!( + snap2.cards[0].source_metadata.as_ref().unwrap()["external_id"], + serde_json::json!("8") + ); + } + + #[test] + fn decide_plan_approves_and_rejects_only_when_awaiting() { + let dir = tempdir().unwrap(); + let loc = thread_loc(dir.path(), "t1"); + let added = add(&loc, "task", CardPatch::default()).unwrap(); + let id = added.cards[0].id.clone(); + + // A todo card isn't awaiting approval yet → decision rejected. + assert!(decide_plan(&loc, &id, true).is_err()); + + // Park it, then approve → Ready. + update_status(&loc, &id, TaskCardStatus::AwaitingApproval).unwrap(); + let approved = decide_plan(&loc, &id, true).unwrap(); + assert_eq!(approved.cards[0].status, TaskCardStatus::Ready); + + // Re-park, then reject → Rejected. + update_status(&loc, &id, TaskCardStatus::AwaitingApproval).unwrap(); + let rejected = decide_plan(&loc, &id, false).unwrap(); + assert_eq!(rejected.cards[0].status, TaskCardStatus::Rejected); + } + #[test] fn edit_can_clear_approval_mode() { let dir = tempdir().unwrap(); @@ -609,6 +733,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 0, updated_at: String::new(), }, @@ -625,6 +750,7 @@ mod tests { evidence: Vec::new(), notes: None, blocker: None, + source_metadata: None, order: 1, updated_at: String::new(), }, diff --git a/src/openhuman/todos/schemas.rs b/src/openhuman/todos/schemas.rs index 466fd779bd..582d589390 100644 --- a/src/openhuman/todos/schemas.rs +++ b/src/openhuman/todos/schemas.rs @@ -19,6 +19,7 @@ pub fn all_controller_schemas() -> Vec { schemas("add"), schemas("edit"), schemas("update_status"), + schemas("decide_plan"), schemas("remove"), schemas("replace"), schemas("clear"), @@ -43,6 +44,10 @@ pub fn all_registered_controllers() -> Vec { schema: schemas("update_status"), handler: handle_update_status, }, + RegisteredController { + schema: schemas("decide_plan"), + handler: handle_decide_plan, + }, RegisteredController { schema: schemas("remove"), handler: handle_remove, @@ -137,7 +142,27 @@ pub fn schemas(function: &str) -> ControllerSchema { inputs: vec![ thread_id_input(), required_string("id", "Card identifier."), - required_string("status", "New status (todo|in_progress|blocked|done)."), + required_string( + "status", + "New status (todo|awaiting_approval|ready|in_progress|blocked|done|rejected).", + ), + ], + outputs: vec![snapshot_output()], + }, + "decide_plan" => ControllerSchema { + namespace: "todos", + function: "decide_plan", + description: "Approve or reject a card awaiting plan approval \ + (approve → ready/runnable; reject → rejected).", + inputs: vec![ + thread_id_input(), + required_string("id", "Card identifier."), + FieldSchema { + name: "approve", + ty: TypeSchema::Bool, + comment: "true to approve (card becomes runnable), false to reject.", + required: true, + }, ], outputs: vec![snapshot_output()], }, @@ -260,6 +285,13 @@ struct RemoveParams { id: String, } +#[derive(Debug, Deserialize)] +struct DecidePlanParams { + thread_id: String, + id: String, + approve: bool, +} + #[derive(Debug, Deserialize)] struct ReplaceParams { thread_id: String, @@ -291,6 +323,7 @@ fn handle_add(params: Map) -> ControllerFuture { evidence: p.evidence, notes: p.notes, blocker: p.blocker, + source_metadata: None, }; tracing::debug!(thread_id = %p.thread_id, "[rpc][todos] add entry"); snapshot_to_json(ops::add(&loc, &p.content, patch)?) @@ -314,6 +347,7 @@ fn handle_edit(params: Map) -> ControllerFuture { evidence: p.evidence, notes: p.notes, blocker: p.blocker, + source_metadata: None, }; tracing::debug!(thread_id = %p.thread_id, id = %p.id, "[rpc][todos] edit entry"); snapshot_to_json(ops::edit(&loc, &p.id, patch)?) @@ -335,6 +369,20 @@ fn handle_update_status(params: Map) -> ControllerFuture { }) } +fn handle_decide_plan(params: Map) -> ControllerFuture { + Box::pin(async move { + let p = parse::(params)?; + let loc = thread_location(&p.thread_id).await?; + tracing::debug!( + thread_id = %p.thread_id, + id = %p.id, + approve = p.approve, + "[rpc][todos] decide_plan entry" + ); + snapshot_to_json(ops::decide_plan(&loc, &p.id, p.approve)?) + }) +} + fn parse_approval_mode(raw: Option) -> Result, String> { let Some(raw) = raw else { return Ok(None);