diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index fce6580dcf..b4216c11bd 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -412,6 +412,11 @@ pub enum DomainEvent { toolkit: String, connection_id: String, }, + /// The connected Composio toolkit set changed (connect/revoke/config flip). + /// + /// `toolkits` is the currently-active, sanitised slug list that should + /// drive orchestrator delegation schema rebuilds. + ComposioIntegrationsChanged { toolkits: Vec }, /// A Composio action was executed (success or failure) via the backend. ComposioActionExecuted { tool: String, @@ -795,6 +800,7 @@ impl DomainEvent { Self::ComposioTriggerReceived { .. } | Self::ComposioConnectionCreated { .. } | Self::ComposioConnectionDeleted { .. } + | Self::ComposioIntegrationsChanged { .. } | Self::ComposioActionExecuted { .. } | Self::ComposioConfigChanged { .. } => "composio", @@ -895,6 +901,7 @@ impl DomainEvent { Self::ComposioTriggerReceived { .. } => "ComposioTriggerReceived", Self::ComposioConnectionCreated { .. } => "ComposioConnectionCreated", Self::ComposioConnectionDeleted { .. } => "ComposioConnectionDeleted", + Self::ComposioIntegrationsChanged { .. } => "ComposioIntegrationsChanged", Self::ComposioActionExecuted { .. } => "ComposioActionExecuted", Self::ComposioConfigChanged { .. } => "ComposioConfigChanged", Self::TriggerEvaluated { .. 
} => "TriggerEvaluated", diff --git a/src/core/event_bus/events_tests.rs b/src/core/event_bus/events_tests.rs index 96d5d0e91d..3c1dc6a0f5 100644 --- a/src/core/event_bus/events_tests.rs +++ b/src/core/event_bus/events_tests.rs @@ -312,6 +312,12 @@ fn all_variants_have_correct_domain() { }, "composio", ), + ( + DomainEvent::ComposioIntegrationsChanged { + toolkits: vec!["gmail".into(), "notion".into()], + }, + "composio", + ), ( DomainEvent::ComposioConfigChanged { mode: "direct".into(), diff --git a/src/openhuman/agent/harness/engine/core.rs b/src/openhuman/agent/harness/engine/core.rs index 5588fd5001..a7e30ce1ef 100644 --- a/src/openhuman/agent/harness/engine/core.rs +++ b/src/openhuman/agent/harness/engine/core.rs @@ -189,7 +189,7 @@ pub(crate) async fn run_turn_engine( } // Caller-specific pre-dispatch work (e.g. Agent's ContextManager). - observer.before_dispatch(history, iteration).await?; + observer.before_dispatch(history, tools, iteration).await?; tracing::debug!(iteration, "[agent_loop] sending LLM request"); let image_marker_count = multimodal::count_image_markers(history); diff --git a/src/openhuman/agent/harness/engine/state.rs b/src/openhuman/agent/harness/engine/state.rs index 359c6fc5b7..c7efae1896 100644 --- a/src/openhuman/agent/harness/engine/state.rs +++ b/src/openhuman/agent/harness/engine/state.rs @@ -18,6 +18,7 @@ use anyhow::Result; use async_trait::async_trait; +use super::tool_source::ToolSource; use crate::openhuman::agent::harness::parse::ParsedToolCall; use crate::openhuman::inference::provider::{ChatMessage, ToolCall, UsageInfo}; @@ -29,6 +30,7 @@ pub(crate) trait TurnObserver: Send { async fn before_dispatch( &mut self, _history: &mut Vec, + _tools: &mut dyn ToolSource, _iteration: usize, ) -> Result<()> { Ok(()) diff --git a/src/openhuman/agent/harness/engine/tool_source.rs b/src/openhuman/agent/harness/engine/tool_source.rs index 0e35a5879b..328ae36ae4 100644 --- a/src/openhuman/agent/harness/engine/tool_source.rs +++ 
b/src/openhuman/agent/harness/engine/tool_source.rs @@ -17,6 +17,7 @@ //! subagent and `Agent` impls land in later phases. use std::collections::HashSet; +use std::sync::Arc; use async_trait::async_trait; @@ -24,6 +25,7 @@ use super::super::payload_summarizer::PayloadSummarizer; use super::progress::ProgressReporter; use super::{run_one_tool, ToolRunResult}; use crate::openhuman::agent::harness::parse::ParsedToolCall; +use crate::openhuman::agent_tool_policy::ToolPolicySession; use crate::openhuman::tools::policy::ToolPolicy; use crate::openhuman::tools::{Tool, ToolSpec}; @@ -45,6 +47,22 @@ pub(crate) trait ToolSource: Send { progress: &dyn ProgressReporter, progress_call_id: &str, ) -> ToolRunResult; + + /// Replace the caller-specific runtime snapshot after a dynamic refresh. + /// Default no-op for non-agent callers. + #[allow(clippy::too_many_arguments)] + fn sync_agent_surface( + &mut self, + _tools: Arc>>, + _visible_tool_names: HashSet, + _tool_policy_session: ToolPolicySession, + _payload_summarizer: Option>, + _prefer_markdown: bool, + _budget_bytes: usize, + _should_send_specs: bool, + _advertised_specs: Vec, + ) { + } } /// The channel/CLI/triage tool source: a persistent `registry`, optional diff --git a/src/openhuman/agent/harness/session/builder.rs b/src/openhuman/agent/harness/session/builder.rs index 9c13ead4ca..fda8f74551 100644 --- a/src/openhuman/agent/harness/session/builder.rs +++ b/src/openhuman/agent/harness/session/builder.rs @@ -613,8 +613,12 @@ impl AgentBuilder { Arc::new(crate::openhuman::agent::tool_policy::AllowAllToolPolicy) }), last_seen_integrations_hash: 0, + composio_integrations_rx: None, + announced_integrations: std::collections::HashSet::new(), + pending_integration_announcement: Vec::new(), archivist_hook: self.archivist_hook, synthesized_tool_names: std::collections::HashSet::new(), + pending_synthesized_tools_mask: std::collections::HashSet::new(), }) } } diff --git a/src/openhuman/agent/harness/session/tests.rs 
b/src/openhuman/agent/harness/session/tests.rs index e570f45923..dcf3581e67 100644 --- a/src/openhuman/agent/harness/session/tests.rs +++ b/src/openhuman/agent/harness/session/tests.rs @@ -6,6 +6,7 @@ //! (`MockProvider`, `RecordingProvider`, `MockTool`) are defined here. use super::types::{Agent, AgentBuilder}; +use crate::core::event_bus::{init_global, publish_global, DomainEvent}; use crate::openhuman::agent::dispatcher::{NativeToolDispatcher, XmlToolDispatcher}; use crate::openhuman::inference::provider::{ChatRequest, ConversationMessage, Provider}; use crate::openhuman::memory::Memory; @@ -174,6 +175,22 @@ fn build_minimal_agent_with_definition_name(definition_name: Option<&str>) -> Ag builder.build().expect("minimal agent build should succeed") } +fn integration_delegate_toolkit_enum(agent: &Agent) -> Vec { + let spec = agent + .tool_specs() + .iter() + .find(|spec| spec.name == "delegate_to_integrations_agent") + .expect("delegate_to_integrations_agent tool spec should be present"); + let mut out: Vec = spec.parameters["properties"]["toolkit"]["enum"] + .as_array() + .expect("toolkit enum should be an array") + .iter() + .filter_map(|v| v.as_str().map(ToString::to_string)) + .collect(); + out.sort(); + out +} + /// Regression test for the `build_session_agent_inner` agent-id /// threading bug. 
/// @@ -256,6 +273,141 @@ fn set_connected_integrations_marks_session_initialized_and_updates_hash() { ); } +#[test] +fn refresh_delegation_tools_updates_schema_even_when_tool_arc_is_shared() { + use crate::openhuman::agent::harness::AgentDefinitionRegistry; + + AgentDefinitionRegistry::init_global_builtins().unwrap(); + let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator")); + agent.set_connected_integrations(vec![ + crate::openhuman::context::prompt::ConnectedIntegration { + toolkit: "gmail".into(), + description: "Email".into(), + tools: vec![], + gated_tools: vec![], + connected: true, + non_active_status: None, + }, + ]); + + assert!(agent.refresh_delegation_tools()); + assert_eq!( + integration_delegate_toolkit_enum(&agent), + vec!["gmail".to_string()] + ); + + // Simulate an in-flight turn holding a shared Arc clone. + let _shared_tools = agent.tools_arc(); + agent.set_connected_integrations(vec![ + crate::openhuman::context::prompt::ConnectedIntegration { + toolkit: "gmail".into(), + description: "Email".into(), + tools: vec![], + gated_tools: vec![], + connected: true, + non_active_status: None, + }, + crate::openhuman::context::prompt::ConnectedIntegration { + toolkit: "notion".into(), + description: "Docs".into(), + tools: vec![], + gated_tools: vec![], + connected: true, + non_active_status: None, + }, + ]); + + assert!(agent.refresh_delegation_tools()); + assert_eq!( + integration_delegate_toolkit_enum(&agent), + vec!["gmail".to_string(), "notion".to_string()] + ); +} + +/// Regression for #3044: repeated mid-session connects while the `tools` +/// Arc stays shared (the normal `before_dispatch` path, where +/// `AgentToolSource` holds a clone) must not accumulate duplicate +/// synthesised `ToolSpec`s. +/// +/// Before the fix, a failed `tools` reconcile rolled `synthesized_tool_names` +/// back to the *old* mask. 
On the next refresh the spec `retain` used that +/// stale mask and failed to drop the intervening refresh's specs, so the +/// synthesised delegate spec piled up once per connect. +#[test] +fn refresh_delegation_tools_no_duplicate_specs_across_shared_arc_connects() { + use crate::openhuman::agent::harness::AgentDefinitionRegistry; + + AgentDefinitionRegistry::init_global_builtins().unwrap(); + let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator")); + + let conn = |slug: &str, desc: &str| crate::openhuman::context::prompt::ConnectedIntegration { + toolkit: slug.into(), + description: desc.into(), + tools: vec![], + gated_tools: vec![], + connected: true, + non_active_status: None, + }; + + let delegate_spec_count = |agent: &Agent| -> usize { + agent + .tool_specs() + .iter() + .filter(|s| s.name == "delegate_to_integrations_agent") + .count() + }; + + // Turn 1: gmail connects. + agent.set_connected_integrations(vec![conn("gmail", "Email")]); + assert!(agent.refresh_delegation_tools()); + + // Hold a shared clone across every subsequent refresh so `Arc::get_mut` + // always fails — exactly what happens during an in-flight turn. + let _shared_tools = agent.tools_arc(); + + // Turn 2: notion connects mid-session. + agent.set_connected_integrations(vec![conn("gmail", "Email"), conn("notion", "Docs")]); + assert!(agent.refresh_delegation_tools()); + + // Turn 3: slack connects mid-session — this is where the old code + // produced a duplicate `delegate_to_integrations_agent` spec. 
+ agent.set_connected_integrations(vec![ + conn("gmail", "Email"), + conn("notion", "Docs"), + conn("slack", "Chat"), + ]); + assert!(agent.refresh_delegation_tools()); + + assert_eq!( + delegate_spec_count(&agent), + 1, + "exactly one synthesised delegate spec must remain after repeated shared-Arc connects" + ); + assert_eq!( + integration_delegate_toolkit_enum(&agent), + vec![ + "gmail".to_string(), + "notion".to_string(), + "slack".to_string() + ] + ); +} + +#[test] +fn composio_listener_drains_integrations_changed_events() { + let _ = init_global(64); + let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator")); + agent.ensure_composio_integrations_listener(); + publish_global(DomainEvent::ComposioIntegrationsChanged { + toolkits: vec!["gmail".into()], + }); + assert!(agent.drain_composio_integrations_changed_events()); + assert!( + !agent.drain_composio_integrations_changed_events(), + "event queue should be drained after one pass" + ); +} + #[tokio::test] async fn turn_without_tools_returns_text() { let workspace = tempfile::TempDir::new().expect("temp workspace"); diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index 32c73bcc48..b8ca29a6de 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -98,6 +98,45 @@ fn normalize_tool_call<'a>(call: &'a ParsedToolCall) -> Cow<'a, ParsedToolCall> }) } +/// Compute the slug delta behind the one-shot mid-session connect announcement. +/// +/// Given the toolkit slugs currently connected and the set of slugs already +/// announced to the model this session, returns the genuinely-new slugs for +/// the caller to render into a note (and records them in `announced` so they +/// are never re-announced). Returns an empty `Vec` when nothing new connected. +/// +/// Kept as a free function (no `&self`) so the delta logic is unit-testable +/// without standing up a full `Agent` — see `turn_tests.rs`.
+/// Returns the toolkit slugs in `connected` that have not yet been announced +/// this session, marking them announced. Empty when nothing is new. +fn newly_connected_slugs( + connected: &[String], + announced: &mut std::collections::HashSet, +) -> Vec { + let newly: Vec = connected + .iter() + .filter(|slug| !announced.contains(*slug)) + .cloned() + .collect(); + for slug in &newly { + announced.insert(slug.clone()); + } + newly +} + +/// Render the one-shot user-turn note for a set of freshly-connected slugs. +/// Empty input yields `None`. +fn integration_announcement_note(slugs: &[String]) -> Option { + if slugs.is_empty() { + return None; + } + Some(format!( + "[integration update] These integration(s) connected during this conversation and are available right now: {}. \ +Use delegate_to_integrations_agent with the matching toolkit slug to act on them immediately — do not tell the user to reconnect or restart.", + slugs.join(", ") + )) +} + impl Agent { /// Executes a single interaction "turn" with the agent. /// @@ -129,6 +168,7 @@ impl Agent { self.history.len(), self.config.max_tool_iterations ); + self.ensure_composio_integrations_listener(); // ── Session transcript resume ───────────────────────────────── // On a fresh session (empty history), look for a previous // transcript to pre-populate the exact provider messages for @@ -188,6 +228,13 @@ impl Agent { // Subsequent turns short-circuit unless this hash changes. self.last_seen_integrations_hash = crate::openhuman::composio::connected_set_hash(&self.connected_integrations); + // Seed the announced set with the startup connected toolkits so + // only genuinely-new mid-session connects get announced later. + self.announced_integrations = self + .connected_integrations + .iter() + .map(|i| i.toolkit.clone()) + .collect(); } else { // Deliberately do NOT rebuild the system prompt on subsequent // turns. 
The rendered prompt is the KV-cache prefix the inference @@ -226,41 +273,7 @@ impl Agent { // runtime `Config` snapshot directly, so this read avoids the // old `Config::load_or_init()` round-trip on every turn. // - if let Some(cfg) = self.integration_runtime_config.as_ref() { - if let Some(cache_view) = - crate::openhuman::composio::cached_active_integrations(cfg) - { - let new_hash = crate::openhuman::composio::connected_set_hash(&cache_view); - if new_hash != self.last_seen_integrations_hash { - log::info!( - "[agent_loop] connection set changed mid-session (hash {:x} -> {:x}); refreshing tool schema (system prompt left intact for KV cache)", - self.last_seen_integrations_hash, - new_hash - ); - // Snapshot the previous integration list so we - // can roll back if `refresh_delegation_tools` - // bails on a shared `Arc` — otherwise the - // agent's `connected_integrations` and its - // schema would disagree until the next - // event-driven refresh, and the - // `last_seen_integrations_hash` advance below - // would suppress retries. - let prev_integrations = - std::mem::replace(&mut self.connected_integrations, cache_view); - if self.refresh_delegation_tools() { - self.last_seen_integrations_hash = new_hash; - self.connected_integrations_initialized = true; - } else { - // Reconcile aborted (shared Arc) — restore - // the previous integration list so the - // next turn re-detects the same change and - // retries cleanly. We deliberately do NOT - // advance `last_seen_integrations_hash`. - self.connected_integrations = prev_integrations; - } - } - } - } + let _ = self.refresh_delegation_tools_from_cached_integrations("turn-boundary"); // Cache empty/expired or config unavailable => no signal. 
// We leave the current tool surface alone and pick up any // real change on the next turn after the UI's 5 s poll has @@ -458,6 +471,17 @@ impl Agent { } }; + // Consume any one-shot mid-session connect announcement parked by + // `refresh_delegation_tools_from_cached_integrations`. It rides on the + // user turn (NOT a system message — `trim_history` hoists system + // messages to the front and would bust the KV-cache prefix) and + // `std::mem::take` empties it so it fires exactly once. + let pending_slugs = std::mem::take(&mut self.pending_integration_announcement); + let enriched = match integration_announcement_note(&pending_slugs) { + Some(note) => format!("{note}\n\n{enriched}"), + None => enriched, + }; + self.history .push(ConversationMessage::Chat(ChatMessage::user(enriched))); @@ -1122,6 +1146,119 @@ impl Agent { self.connected_integrations_initialized = true; } + /// Lazily attach this session to the global event bus so it can + /// observe `ComposioIntegrationsChanged` notifications. + pub(super) fn ensure_composio_integrations_listener(&mut self) { + if self.composio_integrations_rx.is_some() { + return; + } + if let Some(bus) = crate::core::event_bus::global() { + self.composio_integrations_rx = Some(bus.raw_receiver()); + log::debug!( + "[agent_loop] armed composio integrations listener for session='{}'", + self.event_session_id + ); + } + } + + /// Drain pending `ComposioIntegrationsChanged` events. + /// + /// Returns `true` when we observed at least one relevant event (or lag) and + /// should re-check cached integrations before the next provider call.
+ pub(super) fn drain_composio_integrations_changed_events(&mut self) -> bool { + self.ensure_composio_integrations_listener(); + let Some(rx) = self.composio_integrations_rx.as_mut() else { + return false; + }; + use tokio::sync::broadcast::error::TryRecvError; + + let mut saw_signal = false; + let mut closed = false; + loop { + match rx.try_recv() { + Ok(crate::core::event_bus::DomainEvent::ComposioIntegrationsChanged { + toolkits, + }) => { + saw_signal = true; + log::info!( + "[agent_loop] received composio integrations changed event (active_toolkits={:?})", + toolkits + ); + } + Ok(_) => {} + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Lagged(skipped)) => { + saw_signal = true; + log::warn!( + "[agent_loop] composio integrations listener lagged by {} event(s); forcing cache re-check", + skipped + ); + } + Err(TryRecvError::Closed) => { + closed = true; + break; + } + } + } + if closed { + self.composio_integrations_rx = None; + } + saw_signal + } + + /// Reconcile the session's delegation schema against the latest cached + /// integrations snapshot. Returns `true` only when a refresh applied. 
+ pub(super) fn refresh_delegation_tools_from_cached_integrations( + &mut self, + trigger: &str, + ) -> bool { + let Some(cfg) = self.integration_runtime_config.as_ref() else { + return false; + }; + let Some(cache_view) = crate::openhuman::composio::cached_active_integrations(cfg) else { + return false; + }; + + let new_hash = crate::openhuman::composio::connected_set_hash(&cache_view); + if new_hash == self.last_seen_integrations_hash { + return false; + } + + log::info!( + "[agent_loop] composio set changed ({trigger}) hash {:x} -> {:x}; refreshing delegation schema (system prompt unchanged for KV cache)", + self.last_seen_integrations_hash, + new_hash + ); + + let prev_integrations = std::mem::replace(&mut self.connected_integrations, cache_view); + if self.refresh_delegation_tools() { + self.last_seen_integrations_hash = new_hash; + self.connected_integrations_initialized = true; + // Surface newly-connected toolkits onto the next user message so + // the model acts on them on the FIRST post-connect ask instead of + // refusing from stale chat context. Schema-only refresh already + // updated the enum; this closes the prose/decision gap. + let connected_slugs: Vec = self + .connected_integrations + .iter() + .map(|i| i.toolkit.clone()) + .collect(); + // Append (don't overwrite) so a second connect before the next + // user turn doesn't drop the first one's announcement. Slugs are + // already de-duped against `announced_integrations`, but guard the + // pending list too in case the same slug is re-queued. 
+ for slug in newly_connected_slugs(&connected_slugs, &mut self.announced_integrations) { + if !self.pending_integration_announcement.contains(&slug) { + self.pending_integration_announcement.push(slug); + } + } + true + } else { + self.connected_integrations = prev_integrations; + false + } + } + /// Re-synthesise `delegate_*` tools for the orchestrator's `subagents` /// declaration using the live `connected_integrations` slice, and /// reconcile the resulting set into `self.tools` / `self.tool_specs` / @@ -1156,23 +1293,17 @@ impl Agent { /// [`Self::last_seen_integrations_hash`] vs. /// [`crate::openhuman::composio::cached_active_integrations`]). /// - /// **Atomicity**: when `Arc::get_mut` fails (a sub-agent or other - /// caller has already captured a clone of the tool list), we restore - /// the previous `synthesized_tool_names` and bail. The next refresh - /// attempt will re-apply the full transition cleanly rather than - /// resuming from a partial state. This should never happen on a turn - /// boundary in production — sub-agents always drop their snapshots - /// before the parent's next turn — but it's defended against anyway. + /// **Shared-Arc behavior**: when `self.tools` is currently shared + /// (e.g. an in-flight turn cloned the Arc into its tool source), we + /// still refresh `self.tool_specs` / `self.visible_tool_specs` so the + /// provider-facing schema updates immediately. The executable tool + /// registry is refreshed only when `self.tools` has unique ownership. + /// This keeps same-turn routing unblocked while preserving ownership + /// safety for non-cloneable `Box` values. /// - /// **Return value** — `true` when the agent's tool surface is now - /// consistent with `self.connected_integrations` (either because a - /// successful reconcile applied, or because no reconcile was needed). 
- /// `false` only when the Arc was shared and the reconcile was - /// aborted; callers should treat this as "retry next turn" and - /// **not** advance any signal they use to gate future refreshes - /// (e.g. `last_seen_integrations_hash`) — otherwise a one-shot - /// shared-Arc collision could suppress further reconciliation until - /// another integration event happened to bump the hash again. + /// **Return value** — `true` when schema reconciliation succeeded (or + /// no reconcile was needed). Returns `false` only when a non-shared + /// reconcile path failed unexpectedly. pub fn refresh_delegation_tools(&mut self) -> bool { use crate::openhuman::agent::harness::definition::AgentDefinitionRegistry; use crate::openhuman::tools::orchestrator_tools::collect_orchestrator_tools; @@ -1201,40 +1332,58 @@ impl Agent { let synthed_specs: Vec = synthed.iter().map(|t| t.spec()).collect(); - // Skip the Arc mutation entirely when neither the previous nor - // the next synthesis produced any names — saves an - // `Arc::get_mut` probe on agents that don't declare `subagents`. + // Skip mutation when neither the previous nor the next synthesis + // produced any names — saves work on agents without dynamic + // delegation. if self.synthesized_tool_names.is_empty() && synthed_names.is_empty() { return true; } - // Mask used to drop the previous synthesis. We `take` it now so - // the restore-on-failure path below can put it back if the Arc - // turns out to be shared. + // Mask of the previous synthesis — the names whose `tool_specs` are + // currently live (this set is kept in lock-step with `tool_specs`). 
let old_synth = std::mem::take(&mut self.synthesized_tool_names); - match ( - Arc::get_mut(&mut self.tools), - Arc::get_mut(&mut self.tool_specs), - ) { - (Some(tools_vec), Some(specs_vec)) => { - tools_vec.retain(|t| !old_synth.contains(t.name())); - specs_vec.retain(|s| !old_synth.contains(&s.name)); - tools_vec.extend(synthed); - specs_vec.extend(synthed_specs); - } - _ => { - log::warn!( - "[agent] refresh_delegation_tools: tools/tool_specs Arc is shared — \ - cannot reconcile delegation surface (would have produced {} synthesised tool(s)). \ - Restoring previous synthesized_tool_names so the next refresh retries cleanly.", - synthed_names.len() - ); - self.synthesized_tool_names = old_synth; - return false; - } + // `tool_specs` are plain data and therefore cloneable; we can always + // reconcile schema even when the Arc is shared. Drop exactly the + // previous synthesised spec set, then append the fresh one. + { + let specs_vec = Arc::make_mut(&mut self.tool_specs); + specs_vec.retain(|s| !old_synth.contains(&s.name)); + specs_vec.extend(synthed_specs); } + // `tools` contains non-cloneable trait objects. Reconcile it only when + // uniquely owned. The set of stale synthesised *instances* to drop is + // the previous synthesis (`old_synth`) plus any instances a prior + // shared-Arc refresh couldn't remove (`pending_synthesized_tools_mask`). + let tools_remove_mask: std::collections::HashSet = old_synth + .iter() + .chain(self.pending_synthesized_tools_mask.iter()) + .cloned() + .collect(); + let tools_reconciled = if let Some(tools_vec) = Arc::get_mut(&mut self.tools) { + tools_vec.retain(|t| !tools_remove_mask.contains(t.name())); + tools_vec.extend(synthed); + // `tools` now matches `tool_specs` exactly — nothing pending. + self.pending_synthesized_tools_mask.clear(); + true + } else { + // Schema (`tool_specs`) was updated to the new set, but the stale + // tool *instances* still sit in `self.tools`. 
Record their names + // so the next unique-owner refresh removes them. Crucially we do + // NOT roll `synthesized_tool_names` back to `old_synth` here — that + // would desync it from `tool_specs` and cause duplicate specs on + // the following refresh (#3044). + self.pending_synthesized_tools_mask = tools_remove_mask; + log::warn!( + "[agent] refresh_delegation_tools: tools Arc is shared — refreshed schema only \ + ({} synthesised tool name(s)); {} stale tool instance(s) pending removal on the next unique-owner refresh", + synthed_names.len(), + self.pending_synthesized_tools_mask.len() + ); + false + }; + // `visible_tool_names` carries an explicit allowlist for // [`ToolScope::Named`] agents. Drop the previously-synthesised // names and add the new ones so the visible set tracks the @@ -1273,15 +1422,20 @@ impl Agent { .cloned() .collect(); - self.synthesized_tool_names = synthed_names; + // `tool_specs` always reconciled to the new set, so the name mask must + // track that set unconditionally — whether or not `tools` (the + // executable instances) could be reconciled this pass. 
+ self.synthesized_tool_names = synthed_names.clone(); log::info!( - "[agent] refresh_delegation_tools: reconciled delegation surface for agent '{}' (display='{}'); now {} synthesised tool(s); added={:?} removed={:?}", + "[agent] refresh_delegation_tools: reconciled delegation schema for agent '{}' (display='{}'); now {} synthesised tool name(s); added={:?} removed={:?} tools_reconciled={} pending_tool_instances={}", self.agent_definition_id, self.agent_definition_name, - self.synthesized_tool_names.len(), + synthed_names.len(), added, - removed + removed, + tools_reconciled, + self.pending_synthesized_tools_mask.len() ); true } diff --git a/src/openhuman/agent/harness/session/turn_engine_adapter.rs b/src/openhuman/agent/harness/session/turn_engine_adapter.rs index 03dd8e67d9..996d2394bf 100644 --- a/src/openhuman/agent/harness/session/turn_engine_adapter.rs +++ b/src/openhuman/agent/harness/session/turn_engine_adapter.rs @@ -144,6 +144,27 @@ impl ToolSource for AgentToolSource { success: exec_result.success, } } + + fn sync_agent_surface( + &mut self, + tools: Arc>>, + visible_tool_names: HashSet, + tool_policy_session: ToolPolicySession, + payload_summarizer: Option>, + prefer_markdown: bool, + budget_bytes: usize, + should_send_specs: bool, + advertised_specs: Vec, + ) { + self.tools = tools; + self.visible_tool_names = visible_tool_names; + self.tool_policy_session = tool_policy_session; + self.payload_summarizer = payload_summarizer; + self.prefer_markdown = prefer_markdown; + self.budget_bytes = budget_bytes; + self.should_send_specs = should_send_specs; + self.advertised_specs = advertised_specs; + } } /// Turn observer for `Agent::turn`: owns the typed-history rebuild, context @@ -189,8 +210,32 @@ impl TurnObserver for AgentObserver<'_> { async fn before_dispatch( &mut self, buf: &mut Vec, - _iteration: usize, + tools: &mut dyn crate::openhuman::agent::harness::engine::ToolSource, + iteration: usize, ) -> Result<()> { + if 
self.agent.drain_composio_integrations_changed_events() { + let refreshed = self + .agent + .refresh_delegation_tools_from_cached_integrations("event"); + if refreshed { + log::debug!( + "[agent_loop] midturn:resync-delegation-tools — composio integrations changed; resyncing tool surface (iteration={} visible_tools={})", + iteration, + self.agent.visible_tool_names.len() + ); + tools.sync_agent_surface( + Arc::clone(&self.agent.tools), + self.agent.visible_tool_names.clone(), + self.agent.tool_policy_session.clone(), + self.agent.payload_summarizer.clone(), + self.agent.context.prefer_markdown_tool_output(), + self.agent.context.tool_result_budget_bytes(), + self.agent.tool_dispatcher.should_send_tool_specs(), + self.agent.visible_tool_specs.as_ref().clone(), + ); + } + } + // Pre-dispatch token-budget trim on the typed history. if let Some(context_window) = context_window_for_model(&self.effective_model) { super::super::token_budget::trim_conversation_history_to_budget( diff --git a/src/openhuman/agent/harness/session/turn_tests.rs b/src/openhuman/agent/harness/session/turn_tests.rs index fe740cc5ad..fd0f702af3 100644 --- a/src/openhuman/agent/harness/session/turn_tests.rs +++ b/src/openhuman/agent/harness/session/turn_tests.rs @@ -1622,3 +1622,85 @@ fn bound_cached_transcript_messages_strips_multiple_trailing_envelopes() { "all trailing tool_calls envelopes must be stripped" ); } + +#[test] +fn integration_announcement_fires_once_for_new_toolkit() { + // Seed the announced set with the startup-connected toolkit, mirroring the + // turn-1 seed in `run_turn`. + let mut announced: HashSet = HashSet::new(); + announced.insert("gmail".to_string()); + + // A mid-session connect adds `slack`: it should be announced, and recorded + // so it never re-announces. 
+ let connected = vec!["gmail".to_string(), "slack".to_string()]; + let newly = newly_connected_slugs(&connected, &mut announced); + assert_eq!(newly, vec!["slack".to_string()]); + let note = integration_announcement_note(&newly) + .expect("a newly-connected toolkit must produce an announcement"); + assert!( + note.contains("slack"), + "announcement must name the new toolkit slug, got: {note}" + ); + assert!( + !note.contains("gmail"), + "already-announced toolkit must not be re-announced, got: {note}" + ); + assert!( + announced.contains("slack"), + "the new slug must be recorded as announced" + ); + + // A second refresh with the identical connected set parks nothing — every + // slug is now in `announced`. + let second = newly_connected_slugs(&connected, &mut announced); + assert!( + second.is_empty(), + "an unchanged connected set must not re-surface a slug, got: {second:?}" + ); + assert!(integration_announcement_note(&second).is_none()); +} + +#[test] +fn integration_announcement_accumulates_two_connects_in_one_note() { + // Two mid-session connects between consecutive user turns must BOTH be + // announced — the second must not overwrite the first (#3044 regression: + // the old `Option` field dropped the earlier note). + let mut announced: HashSet = HashSet::new(); + announced.insert("gmail".to_string()); + let mut pending: Vec = Vec::new(); + + // First connect: notion. + for slug in newly_connected_slugs(&["gmail".to_string(), "notion".to_string()], &mut announced) + { + if !pending.contains(&slug) { + pending.push(slug); + } + } + // Second connect before the user turn: slack. 
+ for slug in newly_connected_slugs( + &[ + "gmail".to_string(), + "notion".to_string(), + "slack".to_string(), + ], + &mut announced, + ) { + if !pending.contains(&slug) { + pending.push(slug); + } + } + + let note = integration_announcement_note(&pending).expect("two connects must produce a note"); + assert!( + note.contains("notion"), + "first connect must survive: {note}" + ); + assert!( + note.contains("slack"), + "second connect must be present: {note}" + ); + assert!( + !note.contains("gmail"), + "startup slug must not re-announce: {note}" + ); +} diff --git a/src/openhuman/agent/harness/session/types.rs b/src/openhuman/agent/harness/session/types.rs index da821f5a6f..43d7875ca2 100644 --- a/src/openhuman/agent/harness/session/types.rs +++ b/src/openhuman/agent/harness/session/types.rs @@ -190,6 +190,29 @@ pub struct Agent { /// dormant on session startup and only fires when integrations /// actually change mid-conversation. pub(super) last_seen_integrations_hash: u64, + /// Per-session raw receiver for `DomainEvent::ComposioIntegrationsChanged`. + /// Armed lazily on first turn when the global event bus is available. + /// Drained before each provider dispatch so a connection that flips to + /// ACTIVE mid-turn can refresh the delegation schema in the same thread. + pub(super) composio_integrations_rx: + Option>, + /// Toolkit slugs already surfaced to the model as freshly-connected + /// this session. Seeded at turn 1 with the startup connected set, then + /// extended whenever a mid-session connect is announced — so each new + /// toolkit is announced exactly once, never re-announced per turn. + pub(super) announced_integrations: std::collections::HashSet, + /// Toolkit slugs that connected mid-session and still need announcing on + /// the next user message ("X connected this session, use it now"). 
Parked + /// by `refresh_delegation_tools_from_cached_integrations` and rendered + + /// cleared when the next user message is built — the note rides on the + /// user turn (NOT the system prompt) so the KV-cache prefix stays + /// byte-identical. + /// + /// Accumulated as a list (not a single rendered string) so two connects + /// between consecutive user turns both surface: a second connect appends + /// its slug instead of overwriting the first's note. Order-preserving + + /// de-duped on insert. + pub(super) pending_integration_announcement: Vec, /// Optional reference to the `ArchivistHook` registered in /// `post_turn_hooks`. Kept separately so the turn loop can call /// `flush_open_segment` at session-memory-extraction time (the @@ -209,7 +232,28 @@ pub struct Agent { /// /// Populated by `refresh_delegation_tools` itself; empty at /// construction time. + /// + /// Invariant: this tracks the names whose **`tool_specs`** are currently + /// live. `tool_specs` reconcile on every refresh (they're cloneable + /// data), so this set always equals the most recent synthesised set — + /// even when the executable `tools` Vec could not be reconciled because + /// its `Arc` was shared. Removing stale `tools` entries is tracked + /// separately by [`Self::pending_synthesized_tools_mask`]. pub(super) synthesized_tool_names: std::collections::HashSet, + /// Names of synthesised tool *instances* still present in [`Agent::tools`] + /// that a future unique-owner refresh must drop. + /// + /// When `refresh_delegation_tools` updates `tool_specs` but cannot + /// reconcile `tools` (the `Arc` is shared — the normal case while + /// `AgentToolSource` holds a clone during `before_dispatch`), the + /// previously-synthesised tool objects remain in `tools`. 
Their names are + /// accumulated here so the next refresh that *does* own `tools` uniquely + /// removes them — instead of overloading `synthesized_tool_names` (which + /// must stay in sync with `tool_specs`) and corrupting the spec + /// reconciliation on the following refresh (duplicate `ToolSpec`s, #3044). + /// + /// Empty at construction time and whenever `tools` is fully reconciled. + pub(super) pending_synthesized_tools_mask: std::collections::HashSet, } /// A builder for creating `Agent` instances with custom configuration. diff --git a/src/openhuman/agent_orchestration/tools/skill_delegation.rs b/src/openhuman/agent_orchestration/tools/skill_delegation.rs index 08e082cd67..05d497c494 100644 --- a/src/openhuman/agent_orchestration/tools/skill_delegation.rs +++ b/src/openhuman/agent_orchestration/tools/skill_delegation.rs @@ -80,6 +80,73 @@ fn build_description(connected: &[(String, String)]) -> String { buf } +/// Test-only override for the live status fetch. When set, the live re-check +/// returns this value instead of touching `Config::load_or_init` / +/// `fetch_connected_integrations_status` — which would otherwise read the host +/// machine's login/config state and could hit the Composio backend over HTTP, +/// making the reject-path unit tests environment-dependent. `Some(None)` +/// forces the "Unavailable" outcome (no live data); `Some(Some(vec))` injects +/// a deterministic connected set. +#[cfg(test)] +thread_local! 
{ + static LIVE_FETCH_OVERRIDE: std::cell::RefCell>>> = + const { std::cell::RefCell::new(None) }; +} + +#[cfg(test)] +fn set_live_fetch_override(value: Option>) { + LIVE_FETCH_OVERRIDE.with(|o| *o.borrow_mut() = Some(value)); +} + +#[cfg(test)] +fn clear_live_fetch_override() { + LIVE_FETCH_OVERRIDE.with(|o| *o.borrow_mut() = None); +} + +async fn fetch_live_connected_toolkit_slugs_once() -> Option> { + #[cfg(test)] + { + if let Some(injected) = LIVE_FETCH_OVERRIDE.with(|o| o.borrow().clone()) { + return injected; + } + } + let config = crate::openhuman::config::Config::load_or_init() + .await + .ok()?; + match crate::openhuman::composio::fetch_connected_integrations_status(&config).await { + crate::openhuman::composio::FetchConnectedIntegrationsStatus::Authoritative(entries) => { + let mut toolkits: Vec = entries + .into_iter() + .filter(|entry| entry.connected) + .map(|entry| sanitise_slug(&entry.toolkit)) + .collect(); + toolkits.sort(); + toolkits.dedup(); + Some(toolkits) + } + crate::openhuman::composio::FetchConnectedIntegrationsStatus::Unavailable => None, + } +} + +fn resolve_connected_toolkits( + snapshot: &[(String, String)], + slug: &str, + live_connected: Option<&[String]>, +) -> (bool, Vec) { + if snapshot.iter().any(|(known_slug, _)| known_slug == slug) { + let allowed = snapshot.iter().map(|(slug, _)| slug.clone()).collect(); + return (true, allowed); + } + if let Some(live) = live_connected { + let known = live.iter().any(|s| s == slug); + return (known, live.to_vec()); + } + ( + false, + snapshot.iter().map(|(slug, _)| slug.clone()).collect(), + ) +} + #[async_trait] impl Tool for SkillDelegationTool { fn name(&self) -> &str { @@ -153,16 +220,27 @@ impl Tool for SkillDelegationTool { ))); } let slug = sanitise_slug(&raw_toolkit); - let known = self + let mut live_connected: Option> = None; + let mut known = self .connected_toolkits .iter() .any(|(known_slug, _)| known_slug == &slug); if !known { - let allowed: Vec<&str> = self - 
.connected_toolkits - .iter() - .map(|(slug, _)| slug.as_str()) - .collect(); + // Safety net for same-thread OAuth races: do one live status + // refresh before rejecting an unknown toolkit, mirroring the + // spawn_subagent integrations pre-flight. + live_connected = fetch_live_connected_toolkit_slugs_once().await; + } + let (known_after_recheck, allowed) = + resolve_connected_toolkits(&self.connected_toolkits, &slug, live_connected.as_deref()); + if known_after_recheck && !known { + log::info!( + "[skill-delegation] toolkit '{}' accepted after live re-check (session schema stale)", + slug + ); + } + known = known_after_recheck; + if !known { log::debug!( "[skill-delegation] reject: toolkit '{}' (sanitised='{}') not in connected set {:?}", raw_toolkit, @@ -282,6 +360,10 @@ mod tests { #[tokio::test] async fn execute_rejects_unknown_toolkit_with_allowed_list() { + // Force the live re-check to return "Unavailable" so the test never + // reads host config or reaches the Composio backend — the reject must + // come purely from the in-memory snapshot (gmail/notion, no slack). + set_live_fetch_override(None); let tool = SkillDelegationTool::for_connected(vec![ ("gmail".to_string(), "Email.".to_string()), ("notion".to_string(), "Docs.".to_string()), @@ -291,6 +373,7 @@ mod tests { .execute(json!({"toolkit": "slack", "prompt": "hi"})) .await .unwrap(); + clear_live_fetch_override(); assert!(result.is_error); let body = result.output(); assert!(body.contains("slack")); @@ -315,6 +398,10 @@ mod tests { async fn execute_normalises_toolkit_input_before_matching() { // Mixed-case + odd-character user input must collapse onto the // canonical slug before the connectedness check fires. + // Pin the live re-check to the same snapshot so the test is hermetic + // (no host config / backend read): `gmail` stays unknown, while the + // normalised `google_calendar` is accepted. 
+ set_live_fetch_override(Some(vec!["google_calendar".to_string()])); let tool = SkillDelegationTool::for_connected(vec![( "google_calendar".to_string(), "Calendar.".to_string(), @@ -360,5 +447,28 @@ mod tests { ); } } + clear_live_fetch_override(); + } + + #[test] + fn resolve_connected_toolkits_prefers_live_recheck_for_unknown_slug() { + let snapshot = vec![("gmail".to_string(), "Email".to_string())]; + + let (known_snapshot, allowed_snapshot) = + resolve_connected_toolkits(&snapshot, "gmail", None); + assert!(known_snapshot); + assert_eq!(allowed_snapshot, vec!["gmail".to_string()]); + + let live = vec!["gmail".to_string(), "notion".to_string()]; + let (known_live, allowed_live) = + resolve_connected_toolkits(&snapshot, "notion", Some(live.as_slice())); + assert!(known_live); + assert_eq!(allowed_live, live); + + let live_no_match = vec!["gmail".to_string(), "notion".to_string()]; + let (known_none, allowed_none) = + resolve_connected_toolkits(&snapshot, "slack", Some(live_no_match.as_slice())); + assert!(!known_none); + assert_eq!(allowed_none, live_no_match); } } diff --git a/src/openhuman/memory_sync/composio/bus.rs b/src/openhuman/memory_sync/composio/bus.rs index b78e381461..af087a2ed5 100644 --- a/src/openhuman/memory_sync/composio/bus.rs +++ b/src/openhuman/memory_sync/composio/bus.rs @@ -530,10 +530,23 @@ impl EventHandler for ComposioConnectionCreatedSubscriber { // incident triage. 
match ops::fetch_connected_integrations_status(ctx.config.as_ref()).await { FetchConnectedIntegrationsStatus::Authoritative(entries) => { + let mut toolkits: Vec = entries + .iter() + .filter(|entry| entry.connected) + .map(|entry| entry.toolkit.clone()) + .collect(); + toolkits.sort(); + toolkits.dedup(); + crate::core::event_bus::publish_global( + DomainEvent::ComposioIntegrationsChanged { + toolkits: toolkits.clone(), + }, + ); tracing::debug!( toolkit = %toolkit, connection_id = %connection_id, cached_entries = entries.len(), + active_toolkits = ?toolkits, "[composio:bus] eagerly warmed integrations cache after connection became active" ); } @@ -700,11 +713,13 @@ async fn wait_for_connection_active( /// (#1710). /// /// The subscriber is intentionally tiny: it only clears the cache, -/// which guarantees the very next `fetch_connected_integrations` / -/// `cached_active_integrations` read hits the new client. We don't -/// eagerly warm the cache here because we don't have a config handle -/// in the event payload and `load_config_with_timeout` would race the -/// concurrent `cfg_mut.save()` call that produced this event. +/// then attempts a best-effort eager warm + `ComposioIntegrationsChanged` +/// publish in a detached task so active sessions can refresh their +/// delegation schema without waiting for the next turn boundary. +/// +/// The warm/publish step is intentionally opportunistic: if config load +/// or backend access fails we leave the cache cold and rely on the +/// existing 5 s UI poll / next-turn fallback path. 
pub struct ComposioConfigChangedSubscriber; impl ComposioConfigChangedSubscriber { @@ -740,6 +755,45 @@ impl EventHandler for ComposioConfigChangedSubscriber { "[composio-cache] config changed — invalidating integrations cache" ); ops::invalidate_connected_integrations_cache(); + + tokio::spawn(async move { + let config = match config_rpc::load_config_with_timeout().await { + Ok(config) => config, + Err(error) => { + tracing::debug!( + error = %error, + "[composio-cache] config changed eager warm skipped: config load failed" + ); + return; + } + }; + + match ops::fetch_connected_integrations_status(&config).await { + FetchConnectedIntegrationsStatus::Authoritative(entries) => { + let mut toolkits: Vec = entries + .iter() + .filter(|entry| entry.connected) + .map(|entry| entry.toolkit.clone()) + .collect(); + toolkits.sort(); + toolkits.dedup(); + crate::core::event_bus::publish_global( + DomainEvent::ComposioIntegrationsChanged { + toolkits: toolkits.clone(), + }, + ); + tracing::debug!( + active_toolkits = ?toolkits, + "[composio-cache] config changed eager warm complete; published integrations changed" + ); + } + FetchConnectedIntegrationsStatus::Unavailable => { + tracing::debug!( + "[composio-cache] config changed eager warm skipped: backend unavailable" + ); + } + } + }); } } diff --git a/tests/memory_raw_coverage_e2e.rs b/tests/memory_raw_coverage_e2e.rs index 3547810fae..b810775f8e 100644 --- a/tests/memory_raw_coverage_e2e.rs +++ b/tests/memory_raw_coverage_e2e.rs @@ -368,7 +368,7 @@ fn memory_sources_validation_and_sync_classification_edges() { assert_eq!(classify_unknown("GMAIL_FETCH_EMAILS"), ToolScope::Read); assert_eq!( toolkit_from_slug(" MICROSOFT_TEAMS_SEND "), - Some("microsoft".into()) + Some("microsoft_teams".into()) ); assert_eq!(toolkit_from_slug(""), None); let catalog = [CuratedTool {