diff --git a/app/src/lib/i18n/ar.ts b/app/src/lib/i18n/ar.ts index 0e48bf86e1..44989ac634 100644 --- a/app/src/lib/i18n/ar.ts +++ b/app/src/lib/i18n/ar.ts @@ -4649,6 +4649,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'نافذة الذاكرة', 'memoryData.windowUpdated': 'تم تحديث نافذة الذاكرة', 'memoryData.windowUpdatedMsg': 'تم الضبط على {window}.', + + // Run queue + 'runQueue.mode.interrupt': 'مقاطعة', + 'runQueue.mode.steer': 'توجيه', + 'runQueue.mode.followup': 'متابعة', + 'runQueue.mode.collect': 'إضافة سياق', + 'runQueue.queued': 'تم وضع الرسالة في الانتظار', + 'runQueue.steerHint': 'وجّه الدور الحالي', + 'runQueue.followupHint': 'أضف إلى قائمة الانتظار كمتابعة', + 'runQueue.collectHint': 'إضافة كسياق إضافي', + 'runQueue.status': '{total} في الانتظار', + 'runQueue.cleared': 'تم مسح قائمة الانتظار', }; export default messages; diff --git a/app/src/lib/i18n/bn.ts b/app/src/lib/i18n/bn.ts index b9da099ee8..202d00b2a1 100644 --- a/app/src/lib/i18n/bn.ts +++ b/app/src/lib/i18n/bn.ts @@ -4739,6 +4739,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'মেমোরি উইন্ডো', 'memoryData.windowUpdated': 'মেমোরি উইন্ডো আপডেট হয়েছে', 'memoryData.windowUpdatedMsg': '{window}-এ সেট করা হয়েছে।', + + // Run queue + 'runQueue.mode.interrupt': 'বাধা দিন', + 'runQueue.mode.steer': 'পরিচালনা করুন', + 'runQueue.mode.followup': 'ফলো-আপ', + 'runQueue.mode.collect': 'প্রসঙ্গ যোগ করুন', + 'runQueue.queued': 'বার্তা সারিবদ্ধ হয়েছে', + 'runQueue.steerHint': 'বর্তমান পর্যায় পরিচালনা করুন', + 'runQueue.followupHint': 'ফলো-আপ হিসেবে সারিতে যোগ করুন', + 'runQueue.collectHint': 'অতিরিক্ত প্রসঙ্গ হিসেবে যোগ করুন', + 'runQueue.status': '{total}টি সারিবদ্ধ', + 'runQueue.cleared': 'সারি পরিষ্কার করা হয়েছে', }; export default messages; diff --git a/app/src/lib/i18n/de.ts b/app/src/lib/i18n/de.ts index 36d2fb6aca..11b2f93f5c 100644 --- a/app/src/lib/i18n/de.ts +++ b/app/src/lib/i18n/de.ts @@ -4874,6 +4874,18 @@ const messages: TranslationMap = { 
'memoryData.windowError': 'Speicherfenster', 'memoryData.windowUpdated': 'Speicherfenster aktualisiert', 'memoryData.windowUpdatedMsg': 'Auf {window} gesetzt.', + + // Run queue + 'runQueue.mode.interrupt': 'Unterbrechen', + 'runQueue.mode.steer': 'Steuern', + 'runQueue.mode.followup': 'Nachfassen', + 'runQueue.mode.collect': 'Kontext hinzufügen', + 'runQueue.queued': 'Nachricht in Warteschlange', + 'runQueue.steerHint': 'Aktuellen Durchlauf steuern', + 'runQueue.followupHint': 'Als Nachfassen einreihen', + 'runQueue.collectHint': 'Als zusätzlichen Kontext hinzufügen', + 'runQueue.status': '{total} in der Warteschlange', + 'runQueue.cleared': 'Warteschlange geleert', }; export default messages; diff --git a/app/src/lib/i18n/en.ts b/app/src/lib/i18n/en.ts index 8a2eff8501..7ca28bc37b 100644 --- a/app/src/lib/i18n/en.ts +++ b/app/src/lib/i18n/en.ts @@ -4985,6 +4985,18 @@ const en: TranslationMap = { // Monthly cost badge 'monthlyCost.badge': '${amount} this month', 'monthlyCost.noData': 'No syncs this month', + + // Run queue + 'runQueue.mode.interrupt': 'Interrupt', + 'runQueue.mode.steer': 'Steer', + 'runQueue.mode.followup': 'Follow-up', + 'runQueue.mode.collect': 'Add context', + 'runQueue.queued': 'Message queued', + 'runQueue.steerHint': 'Steer the current turn', + 'runQueue.followupHint': 'Queue as follow-up', + 'runQueue.collectHint': 'Add as extra context', + 'runQueue.status': '{total} queued', + 'runQueue.cleared': 'Queue cleared', }; export default en; diff --git a/app/src/lib/i18n/es.ts b/app/src/lib/i18n/es.ts index 88258b7de7..208b452253 100644 --- a/app/src/lib/i18n/es.ts +++ b/app/src/lib/i18n/es.ts @@ -4840,6 +4840,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'Ventana de memoria', 'memoryData.windowUpdated': 'Ventana de memoria actualizada', 'memoryData.windowUpdatedMsg': 'Establecida en {window}.', + + // Run queue + 'runQueue.mode.interrupt': 'Interrumpir', + 'runQueue.mode.steer': 'Dirigir', + 'runQueue.mode.followup': 
'Seguimiento', + 'runQueue.mode.collect': 'Añadir contexto', + 'runQueue.queued': 'Mensaje en cola', + 'runQueue.steerHint': 'Dirigir el turno actual', + 'runQueue.followupHint': 'Poner en cola como seguimiento', + 'runQueue.collectHint': 'Añadir como contexto adicional', + 'runQueue.status': '{total} en cola', + 'runQueue.cleared': 'Cola vaciada', }; export default messages; diff --git a/app/src/lib/i18n/fr.ts b/app/src/lib/i18n/fr.ts index 8c36a1d951..579d3e9b9f 100644 --- a/app/src/lib/i18n/fr.ts +++ b/app/src/lib/i18n/fr.ts @@ -4856,6 +4856,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'Fenêtre de mémoire', 'memoryData.windowUpdated': 'Fenêtre de mémoire mise à jour', 'memoryData.windowUpdatedMsg': 'Définie sur {window}.', + + // Run queue + 'runQueue.mode.interrupt': 'Interrompre', + 'runQueue.mode.steer': 'Orienter', + 'runQueue.mode.followup': 'Suivi', + 'runQueue.mode.collect': 'Ajouter du contexte', + 'runQueue.queued': "Message en file d'attente", + 'runQueue.steerHint': 'Orienter le tour en cours', + 'runQueue.followupHint': "Mettre en file d'attente comme suivi", + 'runQueue.collectHint': 'Ajouter comme contexte supplémentaire', + 'runQueue.status': '{total} en attente', + 'runQueue.cleared': "File d'attente vidée", }; export default messages; diff --git a/app/src/lib/i18n/hi.ts b/app/src/lib/i18n/hi.ts index 70093ef389..da112d2a00 100644 --- a/app/src/lib/i18n/hi.ts +++ b/app/src/lib/i18n/hi.ts @@ -4746,6 +4746,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'मेमोरी विंडो', 'memoryData.windowUpdated': 'मेमोरी विंडो अपडेट हुई', 'memoryData.windowUpdatedMsg': '{window} पर सेट किया गया।', + + // Run queue + 'runQueue.mode.interrupt': 'बाधित करें', + 'runQueue.mode.steer': 'दिशा दें', + 'runQueue.mode.followup': 'अनुसरण', + 'runQueue.mode.collect': 'संदर्भ जोड़ें', + 'runQueue.queued': 'संदेश कतार में है', + 'runQueue.steerHint': 'वर्तमान चरण को दिशा दें', + 'runQueue.followupHint': 'अनुसरण के रूप में कतार में 
जोड़ें', + 'runQueue.collectHint': 'अतिरिक्त संदर्भ के रूप में जोड़ें', + 'runQueue.status': '{total} कतार में', + 'runQueue.cleared': 'कतार साफ़ की गई', }; export default messages; diff --git a/app/src/lib/i18n/id.ts b/app/src/lib/i18n/id.ts index 6c1aeadd14..c630b76278 100644 --- a/app/src/lib/i18n/id.ts +++ b/app/src/lib/i18n/id.ts @@ -4758,6 +4758,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'Jendela memori', 'memoryData.windowUpdated': 'Jendela memori diperbarui', 'memoryData.windowUpdatedMsg': 'Diatur ke {window}.', + + // Run queue + 'runQueue.mode.interrupt': 'Interupsi', + 'runQueue.mode.steer': 'Arahkan', + 'runQueue.mode.followup': 'Tindak lanjut', + 'runQueue.mode.collect': 'Tambah konteks', + 'runQueue.queued': 'Pesan masuk antrean', + 'runQueue.steerHint': 'Arahkan giliran saat ini', + 'runQueue.followupHint': 'Masukkan ke antrean sebagai tindak lanjut', + 'runQueue.collectHint': 'Tambahkan sebagai konteks tambahan', + 'runQueue.status': '{total} dalam antrean', + 'runQueue.cleared': 'Antrean dikosongkan', }; export default messages; diff --git a/app/src/lib/i18n/it.ts b/app/src/lib/i18n/it.ts index cf14035936..c554b9c1dc 100644 --- a/app/src/lib/i18n/it.ts +++ b/app/src/lib/i18n/it.ts @@ -4830,6 +4830,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'Finestra di memoria', 'memoryData.windowUpdated': 'Finestra di memoria aggiornata', 'memoryData.windowUpdatedMsg': 'Impostata su {window}.', + + // Run queue + 'runQueue.mode.interrupt': 'Interrompi', + 'runQueue.mode.steer': 'Guida', + 'runQueue.mode.followup': 'Seguito', + 'runQueue.mode.collect': 'Aggiungi contesto', + 'runQueue.queued': 'Messaggio in coda', + 'runQueue.steerHint': 'Guida il turno corrente', + 'runQueue.followupHint': 'Metti in coda come seguito', + 'runQueue.collectHint': 'Aggiungi come contesto extra', + 'runQueue.status': '{total} in coda', + 'runQueue.cleared': 'Coda svuotata', }; export default messages; diff --git a/app/src/lib/i18n/ko.ts 
b/app/src/lib/i18n/ko.ts index 0382bec511..ccdc037bc6 100644 --- a/app/src/lib/i18n/ko.ts +++ b/app/src/lib/i18n/ko.ts @@ -4694,6 +4694,18 @@ const messages: TranslationMap = { 'memoryData.windowError': '메모리 창', 'memoryData.windowUpdated': '메모리 창 업데이트됨', 'memoryData.windowUpdatedMsg': '{window}(으)로 설정되었습니다.', + + // Run queue + 'runQueue.mode.interrupt': '중단', + 'runQueue.mode.steer': '방향 조정', + 'runQueue.mode.followup': '후속 조치', + 'runQueue.mode.collect': '컨텍스트 추가', + 'runQueue.queued': '메시지가 대기열에 추가됨', + 'runQueue.steerHint': '현재 턴 방향 조정', + 'runQueue.followupHint': '후속 조치로 대기열에 추가', + 'runQueue.collectHint': '추가 컨텍스트로 추가', + 'runQueue.status': '{total}개 대기 중', + 'runQueue.cleared': '대기열이 비워졌습니다', }; export default messages; diff --git a/app/src/lib/i18n/pl.ts b/app/src/lib/i18n/pl.ts index f556da9714..670cd37a66 100644 --- a/app/src/lib/i18n/pl.ts +++ b/app/src/lib/i18n/pl.ts @@ -4822,6 +4822,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'Okno pamięci', 'memoryData.windowUpdated': 'Okno pamięci zaktualizowane', 'memoryData.windowUpdatedMsg': 'Ustawiono na {window}.', + + // Run queue + 'runQueue.mode.interrupt': 'Przerwij', + 'runQueue.mode.steer': 'Kieruj', + 'runQueue.mode.followup': 'Kontynuacja', + 'runQueue.mode.collect': 'Dodaj kontekst', + 'runQueue.queued': 'Wiadomość w kolejce', + 'runQueue.steerHint': 'Kieruj bieżącą turą', + 'runQueue.followupHint': 'Dodaj do kolejki jako kontynuację', + 'runQueue.collectHint': 'Dodaj jako dodatkowy kontekst', + 'runQueue.status': '{total} w kolejce', + 'runQueue.cleared': 'Kolejka wyczyszczona', }; export default messages; diff --git a/app/src/lib/i18n/pt.ts b/app/src/lib/i18n/pt.ts index 827a57e11e..f9e5364478 100644 --- a/app/src/lib/i18n/pt.ts +++ b/app/src/lib/i18n/pt.ts @@ -4827,6 +4827,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'Janela de memória', 'memoryData.windowUpdated': 'Janela de memória atualizada', 'memoryData.windowUpdatedMsg': 'Definida para {window}.', + 
+ // Run queue + 'runQueue.mode.interrupt': 'Interromper', + 'runQueue.mode.steer': 'Direcionar', + 'runQueue.mode.followup': 'Acompanhamento', + 'runQueue.mode.collect': 'Adicionar contexto', + 'runQueue.queued': 'Mensagem na fila', + 'runQueue.steerHint': 'Direcionar o turno atual', + 'runQueue.followupHint': 'Colocar na fila como acompanhamento', + 'runQueue.collectHint': 'Adicionar como contexto extra', + 'runQueue.status': '{total} na fila', + 'runQueue.cleared': 'Fila limpa', }; export default messages; diff --git a/app/src/lib/i18n/ru.ts b/app/src/lib/i18n/ru.ts index e67cc2e541..b9c81001b9 100644 --- a/app/src/lib/i18n/ru.ts +++ b/app/src/lib/i18n/ru.ts @@ -4787,6 +4787,18 @@ const messages: TranslationMap = { 'memoryData.windowError': 'Окно памяти', 'memoryData.windowUpdated': 'Окно памяти обновлено', 'memoryData.windowUpdatedMsg': 'Установлено значение {window}.', + + // Run queue + 'runQueue.mode.interrupt': 'Прервать', + 'runQueue.mode.steer': 'Направить', + 'runQueue.mode.followup': 'Продолжение', + 'runQueue.mode.collect': 'Добавить контекст', + 'runQueue.queued': 'Сообщение в очереди', + 'runQueue.steerHint': 'Направить текущий ход', + 'runQueue.followupHint': 'Добавить в очередь как продолжение', + 'runQueue.collectHint': 'Добавить как дополнительный контекст', + 'runQueue.status': '{total} в очереди', + 'runQueue.cleared': 'Очередь очищена', }; export default messages; diff --git a/app/src/lib/i18n/zh-CN.ts b/app/src/lib/i18n/zh-CN.ts index a4c5a7622f..e7e435574c 100644 --- a/app/src/lib/i18n/zh-CN.ts +++ b/app/src/lib/i18n/zh-CN.ts @@ -4509,6 +4509,18 @@ const messages: TranslationMap = { 'memoryData.windowError': '记忆时间窗口', 'memoryData.windowUpdated': '记忆时间窗口已更新', 'memoryData.windowUpdatedMsg': '已设置为 {window}。', + + // Run queue + 'runQueue.mode.interrupt': '中断', + 'runQueue.mode.steer': '引导', + 'runQueue.mode.followup': '后续跟进', + 'runQueue.mode.collect': '添加上下文', + 'runQueue.queued': '消息已加入队列', + 'runQueue.steerHint': '引导当前轮次', + 
'runQueue.followupHint': '作为后续跟进加入队列', + 'runQueue.collectHint': '作为额外上下文添加', + 'runQueue.status': '已排队 {total} 条', + 'runQueue.cleared': '队列已清空', }; export default messages; diff --git a/app/src/services/chatService.ts b/app/src/services/chatService.ts index 4a64b4db86..01198515ef 100644 --- a/app/src/services/chatService.ts +++ b/app/src/services/chatService.ts @@ -942,6 +942,8 @@ export function subscribeChatEvents(listeners: ChatEventListeners): () => void { }; } +export type QueueMode = 'interrupt' | 'steer' | 'followup' | 'collect'; + export interface ChatSendParams { threadId: string; message: string; @@ -954,6 +956,13 @@ export interface ChatSendParams { * working unchanged. */ locale?: string | null; + /** + * Queue mode for concurrent messages. When a turn is already in + * flight: `steer` injects at the next iteration boundary, `followup` + * queues for after the turn, `collect` adds as context. `interrupt` + * (default) aborts the running turn. + */ + queueMode?: QueueMode | null; } /** @@ -979,6 +988,7 @@ export async function chatSend(params: ChatSendParams): Promise { model_override: params.model ?? undefined, profile_id: params.profileId ?? undefined, locale: params.locale ?? undefined, + queue_mode: params.queueMode ?? 
undefined, }, }); } diff --git a/app/src/store/__tests__/chatRuntimeSlice.queue.test.ts b/app/src/store/__tests__/chatRuntimeSlice.queue.test.ts new file mode 100644 index 0000000000..4899589598 --- /dev/null +++ b/app/src/store/__tests__/chatRuntimeSlice.queue.test.ts @@ -0,0 +1,115 @@ +import { describe, expect, it } from 'vitest'; + +import reducer, { + beginInferenceTurn, + clearAllChatRuntime, + clearQueueStatusForThread, + clearRuntimeForThread, + endInferenceTurn, + setQueueStatusForThread, +} from '../chatRuntimeSlice'; + +describe('chatRuntimeSlice — queue status', () => { + it('stores and clears per-thread queue status', () => { + const withStatus = reducer( + undefined, + setQueueStatusForThread({ + threadId: 'thread-1', + status: { active: true, steers: 1, followups: 0, collects: 2, total: 3 }, + }) + ); + + expect(withStatus.queueStatusByThread['thread-1']).toEqual({ + active: true, + steers: 1, + followups: 0, + collects: 2, + total: 3, + }); + + const cleared = reducer(withStatus, clearQueueStatusForThread({ threadId: 'thread-1' })); + expect(cleared.queueStatusByThread['thread-1']).toBeUndefined(); + }); + + it('updates queue status in place', () => { + let state = reducer( + undefined, + setQueueStatusForThread({ + threadId: 'thread-1', + status: { active: true, steers: 1, followups: 0, collects: 0, total: 1 }, + }) + ); + state = reducer( + state, + setQueueStatusForThread({ + threadId: 'thread-1', + status: { active: true, steers: 2, followups: 1, collects: 0, total: 3 }, + }) + ); + + expect(state.queueStatusByThread['thread-1']?.total).toBe(3); + expect(state.queueStatusByThread['thread-1']?.steers).toBe(2); + }); + + it('clearRuntimeForThread removes queue status', () => { + let state = reducer( + undefined, + setQueueStatusForThread({ + threadId: 'thread-1', + status: { active: true, steers: 1, followups: 0, collects: 0, total: 1 }, + }) + ); + state = reducer(state, beginInferenceTurn({ threadId: 'thread-1' })); + state = reducer(state, 
clearRuntimeForThread({ threadId: 'thread-1' })); + + expect(state.queueStatusByThread['thread-1']).toBeUndefined(); + expect(state.inferenceTurnLifecycleByThread['thread-1']).toBeUndefined(); + }); + + it('clearAllChatRuntime removes all queue statuses', () => { + let state = reducer( + undefined, + setQueueStatusForThread({ + threadId: 'thread-1', + status: { active: true, steers: 1, followups: 0, collects: 0, total: 1 }, + }) + ); + state = reducer( + state, + setQueueStatusForThread({ + threadId: 'thread-2', + status: { active: true, steers: 0, followups: 1, collects: 0, total: 1 }, + }) + ); + state = reducer(state, clearAllChatRuntime()); + + expect(Object.keys(state.queueStatusByThread)).toHaveLength(0); + }); + + it('inactive queue status has zero counts', () => { + const state = reducer( + undefined, + setQueueStatusForThread({ + threadId: 'thread-1', + status: { active: false, steers: 0, followups: 0, collects: 0, total: 0 }, + }) + ); + + expect(state.queueStatusByThread['thread-1']?.active).toBe(false); + expect(state.queueStatusByThread['thread-1']?.total).toBe(0); + }); + + it('endInferenceTurn does not clear queue status', () => { + let state = reducer( + undefined, + setQueueStatusForThread({ + threadId: 'thread-1', + status: { active: true, steers: 1, followups: 0, collects: 0, total: 1 }, + }) + ); + state = reducer(state, beginInferenceTurn({ threadId: 'thread-1' })); + state = reducer(state, endInferenceTurn({ threadId: 'thread-1' })); + + expect(state.queueStatusByThread['thread-1']).toBeDefined(); + }); +}); diff --git a/app/src/store/chatRuntimeSlice.ts b/app/src/store/chatRuntimeSlice.ts index f5eb248ddd..12992cc11d 100644 --- a/app/src/store/chatRuntimeSlice.ts +++ b/app/src/store/chatRuntimeSlice.ts @@ -242,9 +242,20 @@ interface ChatRuntimeState { * download / retry affordances (#2779). */ artifactsByThread: Record; + /** Per-thread run queue status. Updated from queue_status RPC responses. 
*/ + queueStatusByThread: Record; sessionTokenUsage: SessionTokenUsage; } +/** Snapshot of the active-run queue depth per lane. */ +export interface QueueStatus { + active: boolean; + steers: number; + followups: number; + collects: number; + total: number; +} + const initialState: ChatRuntimeState = { inferenceStatusByThread: {}, streamingAssistantByThread: {}, @@ -253,6 +264,7 @@ const initialState: ChatRuntimeState = { inferenceTurnLifecycleByThread: {}, pendingApprovalByThread: {}, artifactsByThread: {}, + queueStatusByThread: {}, sessionTokenUsage: { inputTokens: 0, outputTokens: 0, turns: 0, lastUpdated: 0 }, }; @@ -582,6 +594,15 @@ const chatRuntimeSlice = createSlice({ state.artifactsByThread[action.payload.threadId] = next; } }, + setQueueStatusForThread: ( + state, + action: PayloadAction<{ threadId: string; status: QueueStatus }> + ) => { + state.queueStatusByThread[action.payload.threadId] = action.payload.status; + }, + clearQueueStatusForThread: (state, action: PayloadAction<{ threadId: string }>) => { + delete state.queueStatusByThread[action.payload.threadId]; + }, beginInferenceTurn: (state, action: PayloadAction<{ threadId: string }>) => { state.inferenceTurnLifecycleByThread[action.payload.threadId] = 'started'; }, @@ -600,6 +621,7 @@ const chatRuntimeSlice = createSlice({ delete state.taskBoardByThread[action.payload.threadId]; delete state.inferenceTurnLifecycleByThread[action.payload.threadId]; delete state.pendingApprovalByThread[action.payload.threadId]; + delete state.queueStatusByThread[action.payload.threadId]; // Note: artifactsByThread intentionally NOT cleared here. 
The // ArtifactCard renders inline in the message timeline, so the // snapshot needs to survive turn boundaries — historic artifacts @@ -614,6 +636,7 @@ const chatRuntimeSlice = createSlice({ state.inferenceTurnLifecycleByThread = {}; state.pendingApprovalByThread = {}; state.artifactsByThread = {}; + state.queueStatusByThread = {}; }, recordChatTurnUsage: ( state, @@ -708,6 +731,8 @@ export const { upsertArtifactFailedForThread, clearArtifactsForThread, removeArtifactForThread, + setQueueStatusForThread, + clearQueueStatusForThread, beginInferenceTurn, markInferenceTurnStreaming, endInferenceTurn, diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index 3767dcf70d..406b38aa71 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -115,6 +115,32 @@ pub enum DomainEvent { reason: Option, }, + // ── Run Queue ────────────────────────────────────────────────────── + /// A message was queued into the active-run queue instead of interrupting. + RunQueueMessageQueued { + thread_id: String, + mode: String, + queue_depth: usize, + }, + /// A queued steer/collect message was delivered to the engine at an + /// iteration boundary. + RunQueueMessageDelivered { + thread_id: String, + mode: String, + iteration: u32, + }, + /// A queued followup message was dispatched as a fresh turn after the + /// current turn completed. + RunQueueFollowupDispatched { + thread_id: String, + followup_count: usize, + }, + /// The active turn was interrupted by a new message (default behavior). + RunQueueInterrupted { + thread_id: String, + cancelled_request_id: String, + }, + // ── Memory ────────────────────────────────────────────────────────── /// The configured embedding provider is unreachable or the requested model /// is not installed, so the memory pipeline fell back to an alternative. @@ -902,7 +928,11 @@ impl DomainEvent { | Self::AgentOrchestrationSpawned { .. } | Self::AgentOrchestrationCompleted { .. 
} | Self::AgentOrchestrationFailed { .. } - | Self::AgentOrchestrationClosed { .. } => "agent", + | Self::AgentOrchestrationClosed { .. } + | Self::RunQueueMessageQueued { .. } + | Self::RunQueueMessageDelivered { .. } + | Self::RunQueueFollowupDispatched { .. } + | Self::RunQueueInterrupted { .. } => "agent", Self::EmbeddingModelUnhealthy { .. } | Self::MemoryStored { .. } @@ -1024,6 +1054,10 @@ impl DomainEvent { Self::AgentOrchestrationCompleted { .. } => "AgentOrchestrationCompleted", Self::AgentOrchestrationFailed { .. } => "AgentOrchestrationFailed", Self::AgentOrchestrationClosed { .. } => "AgentOrchestrationClosed", + Self::RunQueueMessageQueued { .. } => "RunQueueMessageQueued", + Self::RunQueueMessageDelivered { .. } => "RunQueueMessageDelivered", + Self::RunQueueFollowupDispatched { .. } => "RunQueueFollowupDispatched", + Self::RunQueueInterrupted { .. } => "RunQueueInterrupted", Self::MemoryStored { .. } => "MemoryStored", Self::MemoryRecalled { .. } => "MemoryRecalled", Self::MemorySyncRequested { .. 
} => "MemorySyncRequested", diff --git a/src/core/event_bus/events_tests.rs b/src/core/event_bus/events_tests.rs index 3c1dc6a0f5..d05bcb0cb2 100644 --- a/src/core/event_bus/events_tests.rs +++ b/src/core/event_bus/events_tests.rs @@ -57,6 +57,37 @@ fn all_variants_have_correct_domain() { }, "agent", ), + // Run Queue + ( + DomainEvent::RunQueueMessageQueued { + thread_id: "t".into(), + mode: "steer".into(), + queue_depth: 1, + }, + "agent", + ), + ( + DomainEvent::RunQueueMessageDelivered { + thread_id: "t".into(), + mode: "steer".into(), + iteration: 2, + }, + "agent", + ), + ( + DomainEvent::RunQueueFollowupDispatched { + thread_id: "t".into(), + followup_count: 1, + }, + "agent", + ), + ( + DomainEvent::RunQueueInterrupted { + thread_id: "t".into(), + cancelled_request_id: "req-1".into(), + }, + "agent", + ), // Memory ( DomainEvent::MemoryStored { diff --git a/src/core/socketio.rs b/src/core/socketio.rs index 0894d6679e..b55c7d092a 100644 --- a/src/core/socketio.rs +++ b/src/core/socketio.rs @@ -285,6 +285,8 @@ struct ChatStartPayload { profile_id: Option, #[serde(default)] locale: Option, + #[serde(default)] + queue_mode: Option, } #[derive(Debug, Deserialize)] @@ -435,6 +437,7 @@ pub fn attach_socketio() -> (socketioxide::layer::SocketIoLayer, SocketIo) { payload.temperature, payload.profile_id, payload.locale, + payload.queue_mode, ) .await { diff --git a/src/openhuman/agent/harness/engine/core.rs b/src/openhuman/agent/harness/engine/core.rs index 52c7415c03..5740f84e32 100644 --- a/src/openhuman/agent/harness/engine/core.rs +++ b/src/openhuman/agent/harness/engine/core.rs @@ -19,6 +19,7 @@ use anyhow::Result; use std::fmt::Write as _; use std::io::Write as _; +use std::sync::Arc; use crate::openhuman::agent::cost::TurnCost; use crate::openhuman::agent::multimodal; @@ -30,6 +31,7 @@ use crate::openhuman::inference::provider::{ }; use super::super::parse::build_native_assistant_history; +use super::super::run_queue::RunQueue; use 
super::super::token_budget::trim_chat_messages_to_budget; use super::super::tool_loop::{RepeatFailureGuard, STREAM_CHUNK_MIN_CHARS}; use super::checkpoint::CheckpointStrategy; @@ -88,6 +90,7 @@ pub(crate) async fn run_turn_engine( max_iterations: usize, on_delta: Option>, early_exit_tool_names: &[&str], + run_queue: Option>, ) -> Result { let mut context_guard = context_window_for_model(model) .map(ContextGuard::with_context_window) @@ -114,6 +117,48 @@ pub(crate) async fn run_turn_engine( .iteration_started((iteration + 1) as u32, max_iterations as u32) .await; + // ── Run queue drain: inject steers/collects at safe boundary ── + if let Some(ref rq) = run_queue { + if rq.has_pending_injections().await { + let steers = rq.drain_steers().await; + let collects = rq.drain_collects().await; + for s in &steers { // every steer is appended before any collect, regardless of arrival order + log::info!( + "[run_queue] injecting steer iteration={} thread_id={} chars={}", + iteration + 1, + s.thread_id, + s.text.chars().count() // Unicode scalar count, not UTF-8 byte length, so `chars=` is accurate for non-ASCII text + ); + let steer_content = format!("[User steering message]: {}", s.text); + history.push(ChatMessage::user(steer_content)); + crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::RunQueueMessageDelivered { + thread_id: s.thread_id.clone(), + mode: "steer".to_string(), + iteration: (iteration + 1) as u32, + }, + ); + } + for c in &collects { + log::info!( + "[run_queue] injecting collect iteration={} thread_id={} chars={}", + iteration + 1, + c.thread_id, + c.text.chars().count() // Unicode scalar count, not UTF-8 byte length + ); + let collect_content = format!("[Additional context from user]: {}", c.text); + history.push(ChatMessage::user(collect_content)); + crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::RunQueueMessageDelivered { + thread_id: c.thread_id.clone(), + mode: "collect".to_string(), + iteration: (iteration + 1) as u32, + }, + ); + } + } + } + // ── Stop hooks: policy check before the next LLM call ── if !stop_hooks.is_empty() { let state = TurnState { diff --git a/src/openhuman/agent/harness/mod.rs 
b/src/openhuman/agent/harness/mod.rs index 9f76fc0f4a..6e246a9093 100644 --- a/src/openhuman/agent/harness/mod.rs +++ b/src/openhuman/agent/harness/mod.rs @@ -32,6 +32,7 @@ pub(crate) mod memory_context; pub(crate) mod memory_context_safety; mod parse; pub(crate) mod payload_summarizer; +pub mod run_queue; pub mod sandbox_context; pub(crate) mod self_healing; pub mod session; diff --git a/src/openhuman/agent/harness/run_queue/mod.rs b/src/openhuman/agent/harness/run_queue/mod.rs new file mode 100644 index 0000000000..0c50eaa401 --- /dev/null +++ b/src/openhuman/agent/harness/run_queue/mod.rs @@ -0,0 +1,109 @@ +//! Active-run queue for mid-turn message steering. +//! +//! When an agent turn is in flight, incoming messages can be routed into +//! one of three lanes instead of aborting the turn: +//! +//! - **steers** — injected at the next iteration boundary as a new user +//! instruction the agent must address immediately. +//! - **followups** — dispatched as a fresh turn after the current one completes. +//! - **collects** — injected as additional context at the next iteration +//! boundary without being a distinct instruction. +//! +//! The engine drains steers and collects at safe points (after tool results are +//! committed to history), preserving the tool-call / tool-result pairing invariant. + +pub mod types; + +use std::sync::Arc; +use tokio::sync::Mutex; + +pub use types::{QueueMode, QueueStatus, QueuedMessage}; + +/// Thread-safe run queue with three lanes. Wrapped in `Arc` for shared +/// ownership between the web channel producer and the engine consumer. +#[derive(Debug)] +pub struct RunQueue { + inner: Mutex, +} + +#[derive(Debug, Default)] +struct RunQueueInner { + steers: Vec, + followups: Vec, + collects: Vec, +} + +impl RunQueue { + pub fn new() -> Arc { + Arc::new(Self { + inner: Mutex::new(RunQueueInner::default()), + }) + } + + /// Push a message into the appropriate lane based on its mode. 
+ pub async fn push(&self, msg: QueuedMessage) { + let mut inner = self.inner.lock().await; + match msg.mode { + QueueMode::Steer => inner.steers.push(msg), + QueueMode::Followup => inner.followups.push(msg), + QueueMode::Collect => inner.collects.push(msg), + QueueMode::Interrupt => { // no lane exists for interrupts: `msg` is dropped here and only the warning below is logged + log::warn!( + "[run_queue] interrupt-mode message pushed to queue — should have been handled by caller" + ); + } + } + } + + /// Take every pending steer message in FIFO (push) order, leaving the steer lane empty. + pub async fn drain_steers(&self) -> Vec { + let mut inner = self.inner.lock().await; + std::mem::take(&mut inner.steers) + } + + /// Take every pending collect message in FIFO (push) order, leaving the collect lane empty. + pub async fn drain_collects(&self) -> Vec { + let mut inner = self.inner.lock().await; + std::mem::take(&mut inner.collects) + } + + /// Take every pending followup message in FIFO (push) order, leaving the followup lane empty. + pub async fn drain_followups(&self) -> Vec { + let mut inner = self.inner.lock().await; + std::mem::take(&mut inner.followups) + } + + /// Snapshot the current queue depth per lane; `total` is the sum of the three lanes. + pub async fn status(&self) -> QueueStatus { + let inner = self.inner.lock().await; + let steers = inner.steers.len(); + let followups = inner.followups.len(); + let collects = inner.collects.len(); + QueueStatus { + steers, + followups, + collects, + total: steers + followups + collects, + } + } + + /// Clear all three lanes and return how many messages were dropped across them. + pub async fn clear(&self) -> usize { + let mut inner = self.inner.lock().await; + let total = inner.steers.len() + inner.followups.len() + inner.collects.len(); + inner.steers.clear(); + inner.followups.clear(); + inner.collects.clear(); + total + } + + /// Check whether any steers or collects are pending (engine poll). 
+ pub async fn has_pending_injections(&self) -> bool { + let inner = self.inner.lock().await; + !inner.steers.is_empty() || !inner.collects.is_empty() + } +} + +#[cfg(test)] +#[path = "run_queue_tests.rs"] +mod tests; diff --git a/src/openhuman/agent/harness/run_queue/run_queue_tests.rs b/src/openhuman/agent/harness/run_queue/run_queue_tests.rs new file mode 100644 index 0000000000..af9a050b91 --- /dev/null +++ b/src/openhuman/agent/harness/run_queue/run_queue_tests.rs @@ -0,0 +1,165 @@ +use super::*; + +fn msg(text: &str, mode: QueueMode) -> QueuedMessage { + QueuedMessage { + text: text.to_string(), + mode, + client_id: "c1".to_string(), + thread_id: "t1".to_string(), + queued_at_ms: 0, + model_override: None, + temperature: None, + profile_id: None, + locale: None, + } +} + +#[tokio::test] +async fn new_queue_is_empty() { + let q = RunQueue::new(); + let s = q.status().await; + assert_eq!(s.total, 0); + assert_eq!(s.steers, 0); + assert_eq!(s.followups, 0); + assert_eq!(s.collects, 0); +} + +#[tokio::test] +async fn push_steer_routes_to_steer_lane() { + let q = RunQueue::new(); + q.push(msg("fix it", QueueMode::Steer)).await; + let s = q.status().await; + assert_eq!(s.steers, 1); + assert_eq!(s.followups, 0); + assert_eq!(s.collects, 0); + assert_eq!(s.total, 1); +} + +#[tokio::test] +async fn push_followup_routes_to_followup_lane() { + let q = RunQueue::new(); + q.push(msg("then do this", QueueMode::Followup)).await; + let s = q.status().await; + assert_eq!(s.steers, 0); + assert_eq!(s.followups, 1); + assert_eq!(s.collects, 0); +} + +#[tokio::test] +async fn push_collect_routes_to_collect_lane() { + let q = RunQueue::new(); + q.push(msg("btw", QueueMode::Collect)).await; + let s = q.status().await; + assert_eq!(s.steers, 0); + assert_eq!(s.followups, 0); + assert_eq!(s.collects, 1); +} + +#[tokio::test] +async fn drain_steers_returns_fifo_and_empties() { + let q = RunQueue::new(); + q.push(msg("first", QueueMode::Steer)).await; + q.push(msg("second", 
QueueMode::Steer)).await; + let drained = q.drain_steers().await; + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].text, "first"); + assert_eq!(drained[1].text, "second"); + assert_eq!(q.status().await.steers, 0); +} + +#[tokio::test] +async fn drain_collects_returns_fifo_and_empties() { + let q = RunQueue::new(); + q.push(msg("a", QueueMode::Collect)).await; + q.push(msg("b", QueueMode::Collect)).await; + let drained = q.drain_collects().await; + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].text, "a"); + assert_eq!(drained[1].text, "b"); + assert_eq!(q.status().await.collects, 0); +} + +#[tokio::test] +async fn drain_followups_returns_fifo_and_empties() { + let q = RunQueue::new(); + q.push(msg("x", QueueMode::Followup)).await; + let drained = q.drain_followups().await; + assert_eq!(drained.len(), 1); + assert_eq!(drained[0].text, "x"); + assert_eq!(q.status().await.followups, 0); +} + +#[tokio::test] +async fn clear_empties_all_lanes() { + let q = RunQueue::new(); + q.push(msg("s", QueueMode::Steer)).await; + q.push(msg("f", QueueMode::Followup)).await; + q.push(msg("c", QueueMode::Collect)).await; + let dropped = q.clear().await; + assert_eq!(dropped, 3); + assert_eq!(q.status().await.total, 0); +} + +#[tokio::test] +async fn has_pending_injections_true_for_steers() { + let q = RunQueue::new(); + assert!(!q.has_pending_injections().await); + q.push(msg("steer me", QueueMode::Steer)).await; + assert!(q.has_pending_injections().await); +} + +#[tokio::test] +async fn has_pending_injections_true_for_collects() { + let q = RunQueue::new(); + q.push(msg("ctx", QueueMode::Collect)).await; + assert!(q.has_pending_injections().await); +} + +#[tokio::test] +async fn has_pending_injections_false_for_followups_only() { + let q = RunQueue::new(); + q.push(msg("later", QueueMode::Followup)).await; + assert!(!q.has_pending_injections().await); +} + +#[tokio::test] +async fn drain_does_not_affect_other_lanes() { + let q = RunQueue::new(); + q.push(msg("s", 
QueueMode::Steer)).await; + q.push(msg("f", QueueMode::Followup)).await; + q.push(msg("c", QueueMode::Collect)).await; + let _ = q.drain_steers().await; + let s = q.status().await; + assert_eq!(s.steers, 0); + assert_eq!(s.followups, 1); + assert_eq!(s.collects, 1); +} + +#[tokio::test] +async fn multiple_pushes_accumulate() { + let q = RunQueue::new(); + for i in 0..5 { + q.push(msg(&format!("steer-{i}"), QueueMode::Steer)).await; + } + for i in 0..3 { + q.push(msg(&format!("follow-{i}"), QueueMode::Followup)) + .await; + } + let s = q.status().await; + assert_eq!(s.steers, 5); + assert_eq!(s.followups, 3); + assert_eq!(s.total, 8); +} + +#[tokio::test] +async fn queue_mode_display() { + assert_eq!(QueueMode::Interrupt.to_string(), "interrupt"); + assert_eq!(QueueMode::Steer.to_string(), "steer"); + assert_eq!(QueueMode::Followup.to_string(), "followup"); + assert_eq!(QueueMode::Collect.to_string(), "collect"); +} + +#[tokio::test] +async fn queue_mode_default_is_interrupt() { + assert_eq!(QueueMode::default(), QueueMode::Interrupt); +} diff --git a/src/openhuman/agent/harness/run_queue/types.rs b/src/openhuman/agent/harness/run_queue/types.rs new file mode 100644 index 0000000000..6c1f1dd1a6 --- /dev/null +++ b/src/openhuman/agent/harness/run_queue/types.rs @@ -0,0 +1,60 @@ +//! Types for the active-run queue model. + +use std::fmt; + +/// How a message arriving during an active agent turn should be handled. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum QueueMode { + /// Abort the in-flight turn and start fresh (default, backward-compatible). + Interrupt, + /// Inject the message at the next safe iteration boundary so the agent + /// sees it mid-turn without restarting. + Steer, + /// Queue the message as a follow-up turn that fires after the current + /// turn completes. 
+ Followup, + /// Silently collect the message as additional context; the agent sees it + /// at the next iteration boundary but does not treat it as a new instruction. + Collect, +} + +impl Default for QueueMode { + fn default() -> Self { + Self::Interrupt + } +} + +impl fmt::Display for QueueMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Interrupt => write!(f, "interrupt"), + Self::Steer => write!(f, "steer"), + Self::Followup => write!(f, "followup"), + Self::Collect => write!(f, "collect"), + } + } +} + +/// A message sitting in the run queue, tagged with its lane. +#[derive(Debug, Clone)] +pub struct QueuedMessage { + pub text: String, + pub mode: QueueMode, + pub client_id: String, + pub thread_id: String, + pub queued_at_ms: u64, + pub model_override: Option, + pub temperature: Option, + pub profile_id: Option, + pub locale: Option, +} + +/// Snapshot of the queue state for introspection. +#[derive(Debug, Clone, serde::Serialize)] +pub struct QueueStatus { + pub steers: usize, + pub followups: usize, + pub collects: usize, + pub total: usize, +} diff --git a/src/openhuman/agent/harness/session/builder.rs b/src/openhuman/agent/harness/session/builder.rs index 3ee332f01a..4c31b01e6b 100644 --- a/src/openhuman/agent/harness/session/builder.rs +++ b/src/openhuman/agent/harness/session/builder.rs @@ -602,6 +602,7 @@ impl AgentBuilder { cached_transcript_messages: None, context, on_progress: None, + run_queue: None, connected_integrations: Vec::new(), connected_integrations_initialized: false, integration_runtime_config: None, diff --git a/src/openhuman/agent/harness/session/runtime.rs b/src/openhuman/agent/harness/session/runtime.rs index 0a6ccd3fe3..7bc904905c 100644 --- a/src/openhuman/agent/harness/session/runtime.rs +++ b/src/openhuman/agent/harness/session/runtime.rs @@ -215,6 +215,14 @@ impl Agent { self.on_progress = tx; } + /// Attach an active-run queue for mid-turn steering. 
+ pub fn set_run_queue( + &mut self, + rq: Option>, + ) { + self.run_queue = rq; + } + /// Restrict which tools the main agent can see and call for this /// session. An empty set restores the default "all visible" behavior, /// still subject to the configured channel permission policy. diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index b9e48d5611..cf5a23849e 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -513,6 +513,7 @@ impl Agent { user_message: user_message.to_string(), max_iterations, }; + let turn_run_queue = self.run_queue.clone(); let cached_prefix = self.cached_transcript_messages.take(); let mut observer = AgentObserver { agent: self, @@ -556,6 +557,7 @@ impl Agent { max_iterations, None, // the web bridge streams via on_progress deltas, not on_delta &[], + turn_run_queue, )) .await?; diff --git a/src/openhuman/agent/harness/session/types.rs b/src/openhuman/agent/harness/session/types.rs index c6bae3ea17..aea1639227 100644 --- a/src/openhuman/agent/harness/session/types.rs +++ b/src/openhuman/agent/harness/session/types.rs @@ -131,6 +131,9 @@ pub struct Agent { /// this channel so callers (e.g. web channel) can surface live /// tool-call and iteration updates to the UI. pub(super) on_progress: Option>, + /// Optional active-run queue for mid-turn steering. When set, the + /// engine drains steers/collects at iteration boundaries. + pub(super) run_queue: Option>, /// Active Composio integrations the user has connected. 
Populated at /// agent build time and threaded into each agent's `prompt.rs` so /// the delegator / skill-executor voices can render their own diff --git a/src/openhuman/agent/harness/subagent_runner/ops.rs b/src/openhuman/agent/harness/subagent_runner/ops.rs index 2a3aa0ff17..9bd12a6b91 100644 --- a/src/openhuman/agent/harness/subagent_runner/ops.rs +++ b/src/openhuman/agent/harness/subagent_runner/ops.rs @@ -1473,6 +1473,7 @@ async fn run_inner_loop( max_iterations, None, // sub-agents don't stream a draft &["ask_user_clarification"], + None, // sub-agents don't support run-queue steering )) .await?; diff --git a/src/openhuman/agent/harness/tool_loop.rs b/src/openhuman/agent/harness/tool_loop.rs index 8a13305644..54f39377a6 100644 --- a/src/openhuman/agent/harness/tool_loop.rs +++ b/src/openhuman/agent/harness/tool_loop.rs @@ -310,6 +310,7 @@ pub(crate) async fn run_tool_call_loop( max_iterations, on_delta, &[], + None, ) .await .map(|outcome| outcome.text) diff --git a/src/openhuman/channels/bus.rs b/src/openhuman/channels/bus.rs index 27fdbe31f2..90598ad4e9 100644 --- a/src/openhuman/channels/bus.rs +++ b/src/openhuman/channels/bus.rs @@ -76,7 +76,7 @@ impl EventHandler for ChannelInboundSubscriber { crate::openhuman::channels::providers::web::subscribe_web_channel_events(); let request_id = match crate::openhuman::channels::providers::web::start_chat( - &client_id, &thread_id, message, None, None, None, None, + &client_id, &thread_id, message, None, None, None, None, None, ) .await { diff --git a/src/openhuman/channels/providers/web.rs b/src/openhuman/channels/providers/web.rs index d8f365dc9b..e7fe22625c 100644 --- a/src/openhuman/channels/providers/web.rs +++ b/src/openhuman/channels/providers/web.rs @@ -353,6 +353,7 @@ fn pick_target_agent_id(_config: &Config, profile: &AgentProfile) -> String { struct InFlightEntry { request_id: String, handle: tokio::task::JoinHandle<()>, + run_queue: Arc, } #[derive(Debug, Clone)] @@ -481,6 +482,7 @@ pub async fn 
start_chat( temperature: Option, profile_id: Option, locale: Option, + queue_mode: Option, ) -> Result { let client_id = client_id.trim().to_string(); let thread_id = thread_id.trim().to_string(); @@ -580,10 +582,79 @@ pub async fn start_chat( let map_key = key_for(&thread_id); + let parsed_mode = match queue_mode.as_deref() { + Some("steer") => crate::openhuman::agent::harness::run_queue::QueueMode::Steer, + Some("followup") => crate::openhuman::agent::harness::run_queue::QueueMode::Followup, + Some("collect") => crate::openhuman::agent::harness::run_queue::QueueMode::Collect, + _ => crate::openhuman::agent::harness::run_queue::QueueMode::Interrupt, + }; + + // Non-interrupt modes: push into the running turn's queue and return. + if !matches!( + parsed_mode, + crate::openhuman::agent::harness::run_queue::QueueMode::Interrupt + ) { + let in_flight = IN_FLIGHT.lock().await; + if let Some(existing) = in_flight.get(&map_key) { + let queued_msg = crate::openhuman::agent::harness::run_queue::QueuedMessage { + text: message.clone(), + mode: parsed_mode, + client_id: client_id.clone(), + thread_id: thread_id.clone(), + queued_at_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + model_override: model_override.clone(), + temperature, + profile_id: profile_id.clone(), + locale: locale.clone(), + }; + existing.run_queue.push(queued_msg).await; + let status = existing.run_queue.status().await; + log::info!( + "[web-channel] queued {} message thread_id={} request_id={} queue_depth={}", + parsed_mode, + thread_id, + request_id, + status.total + ); + crate::core::event_bus::publish_global(DomainEvent::RunQueueMessageQueued { + thread_id: thread_id.clone(), + mode: parsed_mode.to_string(), + queue_depth: status.total, + }); + return Ok(json!({ + "queued": true, + "queue_mode": parsed_mode.to_string(), + "client_id": client_id, + "thread_id": thread_id, + "request_id": request_id, + "queue_depth": status.total, 
+ }) + .to_string()); + } + // No in-flight turn — fall through to start a fresh turn. + log::info!( + "[web-channel] no in-flight turn for {} mode thread_id={} — starting fresh", + parsed_mode, + thread_id + ); + } + { let mut in_flight = IN_FLIGHT.lock().await; if let Some(existing) = in_flight.remove(&map_key) { existing.handle.abort(); + log::info!( + "[web-channel] interrupted in-flight turn thread_id={} cancelled_request_id={}", + thread_id, + existing.request_id + ); + crate::core::event_bus::publish_global(DomainEvent::RunQueueInterrupted { + thread_id: thread_id.clone(), + cancelled_request_id: existing.request_id.clone(), + }); publish_web_channel_event(WebChannelEvent { event: "chat_error".to_string(), client_id: client_id.clone(), @@ -616,6 +687,9 @@ pub async fn start_chat( } } + let turn_run_queue = crate::openhuman::agent::harness::run_queue::RunQueue::new(); + let turn_run_queue_task = turn_run_queue.clone(); + let client_id_task = client_id.clone(); let thread_id_task = thread_id.clone(); let request_id_task = request_id.clone(); @@ -654,6 +728,7 @@ pub async fn start_chat( temperature, profile_id, locale, + turn_run_queue_task, ), ), ) @@ -762,11 +837,37 @@ pub async fn start_chat( } } - let mut in_flight = IN_FLIGHT.lock().await; - if let Some(current) = in_flight.get(&map_key_task) { - if current.request_id == request_id_task { - in_flight.remove(&map_key_task); - } + // Drain followup messages queued during this turn. 
+ let followups = { + let mut in_flight = IN_FLIGHT.lock().await; + let followups = if let Some(current) = in_flight.get(&map_key_task) { + if current.request_id == request_id_task { + let fups = current.run_queue.drain_followups().await; + in_flight.remove(&map_key_task); + fups + } else { + Vec::new() + } + } else { + Vec::new() + }; + followups + }; + if !followups.is_empty() { + log::info!( + "[web-channel] dispatching {} followup(s) thread_id={}", + followups.len(), + thread_id_task + ); + crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::RunQueueFollowupDispatched { + thread_id: thread_id_task.clone(), + followup_count: followups.len(), + }, + ); + // Dispatch each followup as a fresh turn on a new task to avoid + // Send issues with the nested async closure. + dispatch_followups(followups); } }); @@ -777,6 +878,7 @@ pub async fn start_chat( InFlightEntry { request_id: request_id.clone(), handle, + run_queue: turn_run_queue, }, ); } @@ -784,6 +886,31 @@ pub async fn start_chat( Ok(request_id) } +fn dispatch_followups(followups: Vec) { + for fup in followups { + tokio::spawn(async move { + if let Err(err) = start_chat( + &fup.client_id, + &fup.thread_id, + &fup.text, + fup.model_override, + fup.temperature, + fup.profile_id, + fup.locale, + Some("followup".to_string()), + ) + .await + { + log::warn!( + "[web-channel] failed to dispatch followup thread_id={} err={}", + fup.thread_id, + err + ); + } + }); + } +} + /// Invalidate all cached agent sessions for the given thread ID. /// Called when a thread is deleted so stale sessions don't leak /// into reused thread IDs. @@ -885,6 +1012,7 @@ async fn run_chat_task( temperature: Option, profile_id: Option, locale: Option, + run_queue: Arc, ) -> Result { #[cfg(any(test, debug_assertions))] { @@ -1030,6 +1158,7 @@ async fn run_chat_task( // (instead of retroactively after the loop finishes). 
let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(64); agent.set_on_progress(Some(progress_tx)); + agent.set_run_queue(Some(run_queue)); let turn_state_store = TurnStateStore::new(config.workspace_dir.clone()); spawn_progress_bridge( progress_rx, @@ -1846,6 +1975,14 @@ struct WebChatParams { /// default language (English) so existing integrations don't /// silently change behaviour. locale: Option, + /// Queue mode for concurrent messages: `interrupt` (default), `steer`, + /// `followup`, or `collect`. + queue_mode: Option, +} + +#[derive(Debug, Deserialize)] +struct WebQueueParams { + thread_id: String, } #[derive(Debug, Deserialize)] @@ -1862,8 +1999,9 @@ pub async fn channel_web_chat( temperature: Option, profile_id: Option, locale: Option, + queue_mode: Option, ) -> Result, String> { - let request_id = start_chat( + let result = start_chat( client_id, thread_id, message, @@ -1871,20 +2009,89 @@ pub async fn channel_web_chat( temperature, profile_id, locale, + queue_mode, ) .await?; + // start_chat returns either a plain request_id string or a JSON string + // (for queued messages). Try to parse as JSON first. 
+ if let Ok(parsed) = serde_json::from_str::(&result) { + return Ok(RpcOutcome::single_log(parsed, "web channel message queued")); + } + Ok(RpcOutcome::single_log( json!({ "accepted": true, "client_id": client_id.trim(), "thread_id": thread_id.trim(), - "request_id": request_id, + "request_id": result, }), "web channel request accepted", )) } +pub async fn channel_web_queue_status(thread_id: &str) -> Result, String> { + let map_key = key_for(thread_id); + let in_flight = IN_FLIGHT.lock().await; + if let Some(entry) = in_flight.get(&map_key) { + let status = entry.run_queue.status().await; + Ok(RpcOutcome::single_log( + json!({ + "thread_id": thread_id.trim(), + "active": true, + "request_id": entry.request_id, + "steers": status.steers, + "followups": status.followups, + "collects": status.collects, + "total": status.total, + }), + "queue status retrieved", + )) + } else { + Ok(RpcOutcome::single_log( + json!({ + "thread_id": thread_id.trim(), + "active": false, + "steers": 0, + "followups": 0, + "collects": 0, + "total": 0, + }), + "no active turn for thread", + )) + } +} + +pub async fn channel_web_queue_clear(thread_id: &str) -> Result, String> { + let map_key = key_for(thread_id); + let in_flight = IN_FLIGHT.lock().await; + if let Some(entry) = in_flight.get(&map_key) { + let dropped = entry.run_queue.clear().await; + log::info!( + "[web-channel] cleared queue thread_id={} dropped={}", + thread_id, + dropped + ); + Ok(RpcOutcome::single_log( + json!({ + "thread_id": thread_id.trim(), + "cleared": true, + "dropped": dropped, + }), + "queue cleared", + )) + } else { + Ok(RpcOutcome::single_log( + json!({ + "thread_id": thread_id.trim(), + "cleared": false, + "dropped": 0, + }), + "no active turn for thread", + )) + } +} + pub async fn channel_web_cancel( client_id: &str, thread_id: &str, @@ -1903,7 +2110,12 @@ pub async fn channel_web_cancel( } pub fn all_web_channel_controller_schemas() -> Vec { - vec![schemas("chat"), schemas("cancel")] + vec![ + 
schemas("chat"), + schemas("cancel"), + schemas("queue_status"), + schemas("queue_clear"), + ] } pub fn all_web_channel_registered_controllers() -> Vec { @@ -1916,6 +2128,14 @@ pub fn all_web_channel_registered_controllers() -> Vec { schema: schemas("cancel"), handler: handle_cancel, }, + RegisteredController { + schema: schemas("queue_status"), + handler: handle_queue_status, + }, + RegisteredController { + schema: schemas("queue_clear"), + handler: handle_queue_clear, + }, ] } @@ -1936,6 +2156,10 @@ pub fn schemas(function: &str) -> ControllerSchema { "locale", "Optional BCP-47 UI locale (e.g. 'ar', 'zh-CN'). Drives the \"reply in this language\" system-prompt directive.", ), + optional_string( + "queue_mode", + "Queue mode: 'interrupt' (default), 'steer', 'followup', or 'collect'.", + ), ], outputs: vec![json_output("ack", "Acceptance payload.")], }, @@ -1949,6 +2173,20 @@ pub fn schemas(function: &str) -> ControllerSchema { ], outputs: vec![json_output("ack", "Cancellation payload.")], }, + "queue_status" => ControllerSchema { + namespace: "channel", + function: "web_queue_status", + description: "Get the run queue status for a thread.", + inputs: vec![required_string("thread_id", "Thread identifier.")], + outputs: vec![json_output("status", "Queue status payload.")], + }, + "queue_clear" => ControllerSchema { + namespace: "channel", + function: "web_queue_clear", + description: "Clear the run queue for a thread.", + inputs: vec![required_string("thread_id", "Thread identifier.")], + outputs: vec![json_output("result", "Queue clear result.")], + }, _ => ControllerSchema { namespace: "channel", function: "unknown", @@ -1976,12 +2214,27 @@ fn handle_chat(params: Map) -> ControllerFuture { p.temperature, p.profile_id, p.locale, + p.queue_mode, ) .await?, ) }) } +fn handle_queue_status(params: Map) -> ControllerFuture { + Box::pin(async move { + let p = deserialize_params::(params)?; + to_json(channel_web_queue_status(&p.thread_id).await?) 
+ }) +} + +fn handle_queue_clear(params: Map) -> ControllerFuture { + Box::pin(async move { + let p = deserialize_params::(params)?; + to_json(channel_web_queue_clear(&p.thread_id).await?) + }) +} + /// Map a frontend BCP-47 locale tag to a system-prompt directive /// instructing the agent to reply in that language. Returns `None` /// for English (the agent's default — adding "Respond in English" diff --git a/src/openhuman/channels/providers/web_tests.rs b/src/openhuman/channels/providers/web_tests.rs index 18eb3a1363..d1f087b780 100644 --- a/src/openhuman/channels/providers/web_tests.rs +++ b/src/openhuman/channels/providers/web_tests.rs @@ -24,17 +24,17 @@ static FORCED_ERROR_TEST_LOCK: Lazy> = Lazy::new(|| TokioMutex::n #[tokio::test] async fn start_chat_validates_required_fields() { - let err = start_chat("", "thread", "hello", None, None, None, None) + let err = start_chat("", "thread", "hello", None, None, None, None, None) .await .expect_err("client id should be required"); assert!(err.contains("client_id is required")); - let err = start_chat("client", "", "hello", None, None, None, None) + let err = start_chat("client", "", "hello", None, None, None, None, None) .await .expect_err("thread id should be required"); assert!(err.contains("thread_id is required")); - let err = start_chat("client", "thread", " ", None, None, None, None) + let err = start_chat("client", "thread", " ", None, None, None, None, None) .await .expect_err("message should be required"); assert!(err.contains("message is required")); @@ -50,6 +50,7 @@ async fn start_chat_rejects_prompt_injection_payload() { None, None, None, + None, ) .await .expect_err("prompt-injection payload should be rejected"); @@ -92,6 +93,7 @@ async fn start_chat_emits_sanitized_chat_error_on_inference_failure() { None, None, None, + None, ) .await .expect("start_chat should accept valid request"); @@ -502,6 +504,7 @@ async fn start_chat_chat_error_event_serializes_structured_fields_to_json_wire() None, None, 
None, + None, ) .await .expect("start_chat should accept valid request"); @@ -595,6 +598,7 @@ async fn start_chat_emits_structured_rate_limit_metadata_on_chat_error_event() { None, None, None, + None, ) .await .expect("start_chat should accept valid request"); @@ -1020,10 +1024,12 @@ fn web_channel_catalog_has_chat_and_cancel() { let s = all_web_channel_controller_schemas(); let c = all_web_channel_registered_controllers(); assert_eq!(s.len(), c.len()); - assert_eq!(s.len(), 2); + assert_eq!(s.len(), 4); let fns: Vec<&str> = s.iter().map(|x| x.function).collect(); assert!(fns.contains(&"web_chat")); assert!(fns.contains(&"web_cancel")); + assert!(fns.contains(&"web_queue_status")); + assert!(fns.contains(&"web_queue_clear")); } #[test] diff --git a/tests/channels_large_round25_raw_coverage_e2e.rs b/tests/channels_large_round25_raw_coverage_e2e.rs index 45b86c7591..4ffdf20978 100644 --- a/tests/channels_large_round25_raw_coverage_e2e.rs +++ b/tests/channels_large_round25_raw_coverage_e2e.rs @@ -114,7 +114,7 @@ async fn web_channel_validation_cancellation_and_error_events_are_observable() { ); assert!( - web::start_chat(" ", "thread", "hello", None, None, None, None) + web::start_chat(" ", "thread", "hello", None, None, None, None, None) .await .unwrap_err() .contains("client_id is required") @@ -137,6 +137,7 @@ async fn web_channel_validation_cancellation_and_error_events_are_observable() { Some(0.2), None, Some("en-US".to_string()), + None, ) .await .expect("start forced-error chat"); diff --git a/tests/channels_provider_deep_raw_coverage_e2e.rs b/tests/channels_provider_deep_raw_coverage_e2e.rs index 3a1837f78b..5e0af50096 100644 --- a/tests/channels_provider_deep_raw_coverage_e2e.rs +++ b/tests/channels_provider_deep_raw_coverage_e2e.rs @@ -229,16 +229,20 @@ async fn dispatch_harness_covers_error_context_compaction_and_timeout_paths() { #[tokio::test] async fn web_channel_validation_cancel_and_classifier_snapshots_are_publicly_exercised() { - 
assert!(start_chat("", "thread", "hello", None, None, None, None) - .await - .expect_err("empty client rejected") - .contains("client_id")); - assert!(start_chat("client", "", "hello", None, None, None, None) - .await - .expect_err("empty thread rejected") - .contains("thread_id")); assert!( - start_chat("client", "thread", " ", None, None, None, None) + start_chat("", "thread", "hello", None, None, None, None, None) + .await + .expect_err("empty client rejected") + .contains("client_id") + ); + assert!( + start_chat("client", "", "hello", None, None, None, None, None) + .await + .expect_err("empty thread rejected") + .contains("thread_id") + ); + assert!( + start_chat("client", "thread", " ", None, None, None, None, None) .await .expect_err("empty message rejected") .contains("message") @@ -261,6 +265,7 @@ async fn web_channel_validation_cancel_and_classifier_snapshots_are_publicly_exe None, None, None, + None, ) .await; assert!(blocked.is_err()); diff --git a/tests/channels_provider_leftovers_raw_coverage_e2e.rs b/tests/channels_provider_leftovers_raw_coverage_e2e.rs index 496b8455aa..6220faf012 100644 --- a/tests/channels_provider_leftovers_raw_coverage_e2e.rs +++ b/tests/channels_provider_leftovers_raw_coverage_e2e.rs @@ -346,6 +346,7 @@ async fn web_round19_covers_classifier_variants_and_cancel_cleanup() { None, None, None, + None, ) .await .expect("start forced web chat"); diff --git a/tests/channels_runtime_raw_coverage_e2e.rs b/tests/channels_runtime_raw_coverage_e2e.rs index bad3f881ae..e02d19e9aa 100644 --- a/tests/channels_runtime_raw_coverage_e2e.rs +++ b/tests/channels_runtime_raw_coverage_e2e.rs @@ -372,16 +372,20 @@ async fn yuanbao_public_channel_and_config_paths_are_isolated_from_network() { #[tokio::test] async fn web_channel_validation_cancel_and_event_subscription_are_fast() { - assert!(start_chat("", "thread", "hello", None, None, None, None) - .await - .expect_err("empty client rejected") - .contains("client_id")); - 
assert!(start_chat("client", "", "hello", None, None, None, None) - .await - .expect_err("empty thread rejected") - .contains("thread_id")); assert!( - start_chat("client", "thread", " ", None, None, None, None) + start_chat("", "thread", "hello", None, None, None, None, None) + .await + .expect_err("empty client rejected") + .contains("client_id") + ); + assert!( + start_chat("client", "", "hello", None, None, None, None, None) + .await + .expect_err("empty thread rejected") + .contains("thread_id") + ); + assert!( + start_chat("client", "thread", " ", None, None, None, None, None) .await .expect_err("empty message rejected") .contains("message") @@ -404,6 +408,7 @@ async fn web_channel_validation_cancel_and_event_subscription_are_fast() { None, None, None, + None, ) .await; assert!( diff --git a/tests/channels_web_startup_raw_coverage_e2e.rs b/tests/channels_web_startup_raw_coverage_e2e.rs index dfef20921b..d4e1849bf0 100644 --- a/tests/channels_web_startup_raw_coverage_e2e.rs +++ b/tests/channels_web_startup_raw_coverage_e2e.rs @@ -97,17 +97,23 @@ fn web_chat_lock() -> std::sync::MutexGuard<'static, ()> { async fn web_controllers_validate_inputs_and_emit_structured_forced_errors() { let _chat_lock = web_chat_lock(); let controller_schemas = all_web_channel_controller_schemas(); - assert_eq!(controller_schemas.len(), 2); + assert_eq!(controller_schemas.len(), 4); assert!(controller_schemas .iter() .any(|schema| schema.function == "web_chat")); assert!(controller_schemas .iter() .any(|schema| schema.function == "web_cancel")); - assert_eq!(all_web_channel_registered_controllers().len(), 2); + assert!(controller_schemas + .iter() + .any(|schema| schema.function == "web_queue_status")); + assert!(controller_schemas + .iter() + .any(|schema| schema.function == "web_queue_clear")); + assert_eq!(all_web_channel_registered_controllers().len(), 4); assert_eq!(schemas("missing").function, "unknown"); - let err = channel_web_chat("client", "thread", " ", None, None, None, 
None) + let err = channel_web_chat("client", "thread", " ", None, None, None, None, None) .await .expect_err("blank messages are rejected"); assert!(err.contains("message is required")); @@ -133,6 +139,7 @@ async fn web_controllers_validate_inputs_and_emit_structured_forced_errors() { Some(0.2), None, Some("zh-CN".to_string()), + None, ) .await .expect("chat request accepted") @@ -177,6 +184,7 @@ async fn web_chat_cancel_aborts_in_flight_thread_without_real_provider() { None, None, None, + None, ) .await .expect("start chat"); diff --git a/tests/channels_web_telegram_raw_coverage_e2e.rs b/tests/channels_web_telegram_raw_coverage_e2e.rs index f3fbc632c1..30ccf3709c 100644 --- a/tests/channels_web_telegram_raw_coverage_e2e.rs +++ b/tests/channels_web_telegram_raw_coverage_e2e.rs @@ -299,6 +299,7 @@ async fn web_channel_approval_bridge_forced_errors_and_newer_request_cancellatio Some(0.3), Some("missing-profile".to_string()), Some("en-US".to_string()), + None, ) .await .expect("forced chat accepted"); @@ -323,6 +324,7 @@ async fn web_channel_approval_bridge_forced_errors_and_newer_request_cancellatio None, None, None, + None, ) .await .expect("first chat accepted"); @@ -334,6 +336,7 @@ async fn web_channel_approval_bridge_forced_errors_and_newer_request_cancellatio None, None, None, + None, ) .await .expect("second chat accepted"); diff --git a/tests/channels_web_yuanbao_round22_raw_coverage_e2e.rs b/tests/channels_web_yuanbao_round22_raw_coverage_e2e.rs index 3498dd04f9..377ffd1efb 100644 --- a/tests/channels_web_yuanbao_round22_raw_coverage_e2e.rs +++ b/tests/channels_web_yuanbao_round22_raw_coverage_e2e.rs @@ -189,13 +189,13 @@ fn isolated_config() -> (tempfile::TempDir, Config) { #[tokio::test] async fn web_start_chat_validation_forced_error_and_cancel_paths_are_structured() { assert_eq!( - start_chat(" ", "thread", "hello", None, None, None, None) + start_chat(" ", "thread", "hello", None, None, None, None, None) .await .unwrap_err(), "client_id is required" ); 
assert_eq!( - start_chat("client", " ", "hello", None, None, None, None) + start_chat("client", " ", "hello", None, None, None, None, None) .await .unwrap_err(), "thread_id is required" @@ -214,6 +214,7 @@ async fn web_start_chat_validation_forced_error_and_cancel_paths_are_structured( Some(0.4), None, None, + None, ) .await .expect("accepted"); diff --git a/tests/tools_approval_channels_raw_coverage_e2e.rs b/tests/tools_approval_channels_raw_coverage_e2e.rs index 255b381864..f7dc8311cf 100644 --- a/tests/tools_approval_channels_raw_coverage_e2e.rs +++ b/tests/tools_approval_channels_raw_coverage_e2e.rs @@ -2073,7 +2073,7 @@ async fn web_channel_public_paths_cover_event_delivery_and_validation_errors() { assert_eq!( openhuman_core::openhuman::channels::web::start_chat( - "", "thread-1", "hello", None, None, None, None, + "", "thread-1", "hello", None, None, None, None, None, ) .await .expect_err("blank client_id"), @@ -2081,7 +2081,7 @@ async fn web_channel_public_paths_cover_event_delivery_and_validation_errors() { ); assert_eq!( openhuman_core::openhuman::channels::web::start_chat( - "client-1", "", "hello", None, None, None, None, + "client-1", "", "hello", None, None, None, None, None, ) .await .expect_err("blank thread_id"), @@ -2089,7 +2089,7 @@ async fn web_channel_public_paths_cover_event_delivery_and_validation_errors() { ); assert_eq!( openhuman_core::openhuman::channels::web::start_chat( - "client-1", "thread-1", " ", None, None, None, None, + "client-1", "thread-1", " ", None, None, None, None, None, ) .await .expect_err("blank message"), diff --git a/tests/tools_network_channels_raw_coverage_e2e.rs b/tests/tools_network_channels_raw_coverage_e2e.rs index 70f378ff88..26f759fa7e 100644 --- a/tests/tools_network_channels_raw_coverage_e2e.rs +++ b/tests/tools_network_channels_raw_coverage_e2e.rs @@ -550,13 +550,13 @@ async fn composio_direct_and_mouse_tools_cover_validation_policy_and_schema_path #[tokio::test] async fn 
web_channel_public_paths_cover_validation_cancel_schema_and_event_bus() { - assert_eq!(all_web_channel_controller_schemas().len(), 2); - assert_eq!(all_web_channel_registered_controllers().len(), 2); + assert_eq!(all_web_channel_controller_schemas().len(), 4); + assert_eq!(all_web_channel_registered_controllers().len(), 4); assert_eq!(web_channel_schema("chat").function, "web_chat"); assert_eq!(web_channel_schema("cancel").function, "web_cancel"); assert_eq!(web_channel_schema("missing").function, "unknown"); - let missing_client = start_chat(" ", "thread", "hello", None, None, None, None) + let missing_client = start_chat(" ", "thread", "hello", None, None, None, None, None) .await .expect_err("blank client"); assert_contains(&missing_client, "client_id is required");