Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/core/event_bus/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ pub enum DomainEvent {
toolkit: String,
connection_id: String,
},
/// The connected Composio toolkit set changed (connect/revoke/config flip).
///
/// `toolkits` is the currently-active, sanitised slug list that should
/// drive orchestrator delegation schema rebuilds.
ComposioIntegrationsChanged { toolkits: Vec<String> },
/// A Composio action was executed (success or failure) via the backend.
ComposioActionExecuted {
tool: String,
Expand Down Expand Up @@ -795,6 +800,7 @@ impl DomainEvent {
Self::ComposioTriggerReceived { .. }
| Self::ComposioConnectionCreated { .. }
| Self::ComposioConnectionDeleted { .. }
| Self::ComposioIntegrationsChanged { .. }
| Self::ComposioActionExecuted { .. }
| Self::ComposioConfigChanged { .. } => "composio",

Expand Down Expand Up @@ -895,6 +901,7 @@ impl DomainEvent {
Self::ComposioTriggerReceived { .. } => "ComposioTriggerReceived",
Self::ComposioConnectionCreated { .. } => "ComposioConnectionCreated",
Self::ComposioConnectionDeleted { .. } => "ComposioConnectionDeleted",
Self::ComposioIntegrationsChanged { .. } => "ComposioIntegrationsChanged",
Self::ComposioActionExecuted { .. } => "ComposioActionExecuted",
Self::ComposioConfigChanged { .. } => "ComposioConfigChanged",
Self::TriggerEvaluated { .. } => "TriggerEvaluated",
Expand Down
6 changes: 6 additions & 0 deletions src/core/event_bus/events_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ fn all_variants_have_correct_domain() {
},
"composio",
),
(
DomainEvent::ComposioIntegrationsChanged {
toolkits: vec!["gmail".into(), "notion".into()],
},
"composio",
),
(
DomainEvent::ComposioConfigChanged {
mode: "direct".into(),
Expand Down
2 changes: 1 addition & 1 deletion src/openhuman/agent/harness/engine/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub(crate) async fn run_turn_engine(
}

// Caller-specific pre-dispatch work (e.g. Agent's ContextManager).
observer.before_dispatch(history, iteration).await?;
observer.before_dispatch(history, tools, iteration).await?;

tracing::debug!(iteration, "[agent_loop] sending LLM request");
let image_marker_count = multimodal::count_image_markers(history);
Expand Down
2 changes: 2 additions & 0 deletions src/openhuman/agent/harness/engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use anyhow::Result;
use async_trait::async_trait;

use super::tool_source::ToolSource;
use crate::openhuman::agent::harness::parse::ParsedToolCall;
use crate::openhuman::inference::provider::{ChatMessage, ToolCall, UsageInfo};

Expand All @@ -29,6 +30,7 @@ pub(crate) trait TurnObserver: Send {
async fn before_dispatch(
&mut self,
_history: &mut Vec<ChatMessage>,
_tools: &mut dyn ToolSource,
_iteration: usize,
) -> Result<()> {
Ok(())
Expand Down
18 changes: 18 additions & 0 deletions src/openhuman/agent/harness/engine/tool_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
//! subagent and `Agent` impls land in later phases.

use std::collections::HashSet;
use std::sync::Arc;

use async_trait::async_trait;

use super::super::payload_summarizer::PayloadSummarizer;
use super::progress::ProgressReporter;
use super::{run_one_tool, ToolRunResult};
use crate::openhuman::agent::harness::parse::ParsedToolCall;
use crate::openhuman::agent_tool_policy::ToolPolicySession;
use crate::openhuman::tools::policy::ToolPolicy;
use crate::openhuman::tools::{Tool, ToolSpec};

Expand All @@ -45,6 +47,22 @@ pub(crate) trait ToolSource: Send {
progress: &dyn ProgressReporter,
progress_call_id: &str,
) -> ToolRunResult;

/// Replace the caller-specific runtime snapshot after a dynamic refresh.
/// Default no-op for non-agent callers.
#[allow(clippy::too_many_arguments)]
fn sync_agent_surface(
&mut self,
_tools: Arc<Vec<Box<dyn Tool>>>,
_visible_tool_names: HashSet<String>,
_tool_policy_session: ToolPolicySession,
_payload_summarizer: Option<Arc<dyn PayloadSummarizer>>,
_prefer_markdown: bool,
_budget_bytes: usize,
_should_send_specs: bool,
_advertised_specs: Vec<ToolSpec>,
) {
}
}

/// The channel/CLI/triage tool source: a persistent `registry`, optional
Expand Down
4 changes: 4 additions & 0 deletions src/openhuman/agent/harness/session/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,12 @@ impl AgentBuilder {
Arc::new(crate::openhuman::agent::tool_policy::AllowAllToolPolicy)
}),
last_seen_integrations_hash: 0,
composio_integrations_rx: None,
announced_integrations: std::collections::HashSet::new(),
pending_integration_announcement: Vec::new(),
archivist_hook: self.archivist_hook,
synthesized_tool_names: std::collections::HashSet::new(),
pending_synthesized_tools_mask: std::collections::HashSet::new(),
})
}
}
Expand Down
152 changes: 152 additions & 0 deletions src/openhuman/agent/harness/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! (`MockProvider`, `RecordingProvider`, `MockTool`) are defined here.

use super::types::{Agent, AgentBuilder};
use crate::core::event_bus::{init_global, publish_global, DomainEvent};
use crate::openhuman::agent::dispatcher::{NativeToolDispatcher, XmlToolDispatcher};
use crate::openhuman::inference::provider::{ChatRequest, ConversationMessage, Provider};
use crate::openhuman::memory::Memory;
Expand Down Expand Up @@ -174,6 +175,22 @@ fn build_minimal_agent_with_definition_name(definition_name: Option<&str>) -> Ag
builder.build().expect("minimal agent build should succeed")
}

fn integration_delegate_toolkit_enum(agent: &Agent) -> Vec<String> {
let spec = agent
.tool_specs()
.iter()
.find(|spec| spec.name == "delegate_to_integrations_agent")
.expect("delegate_to_integrations_agent tool spec should be present");
let mut out: Vec<String> = spec.parameters["properties"]["toolkit"]["enum"]
.as_array()
.expect("toolkit enum should be an array")
.iter()
.filter_map(|v| v.as_str().map(ToString::to_string))
.collect();
out.sort();
out
}

/// Regression test for the `build_session_agent_inner` agent-id
/// threading bug.
///
Expand Down Expand Up @@ -256,6 +273,141 @@ fn set_connected_integrations_marks_session_initialized_and_updates_hash() {
);
}

#[test]
fn refresh_delegation_tools_updates_schema_even_when_tool_arc_is_shared() {
use crate::openhuman::agent::harness::AgentDefinitionRegistry;

AgentDefinitionRegistry::init_global_builtins().unwrap();
let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator"));
agent.set_connected_integrations(vec![
crate::openhuman::context::prompt::ConnectedIntegration {
toolkit: "gmail".into(),
description: "Email".into(),
tools: vec![],
gated_tools: vec![],
connected: true,
non_active_status: None,
},
]);

assert!(agent.refresh_delegation_tools());
assert_eq!(
integration_delegate_toolkit_enum(&agent),
vec!["gmail".to_string()]
);

// Simulate an in-flight turn holding a shared Arc clone.
let _shared_tools = agent.tools_arc();
agent.set_connected_integrations(vec![
crate::openhuman::context::prompt::ConnectedIntegration {
toolkit: "gmail".into(),
description: "Email".into(),
tools: vec![],
gated_tools: vec![],
connected: true,
non_active_status: None,
},
crate::openhuman::context::prompt::ConnectedIntegration {
toolkit: "notion".into(),
description: "Docs".into(),
tools: vec![],
gated_tools: vec![],
connected: true,
non_active_status: None,
},
]);

assert!(agent.refresh_delegation_tools());
assert_eq!(
integration_delegate_toolkit_enum(&agent),
vec!["gmail".to_string(), "notion".to_string()]
);
}

/// Regression for #3044: repeated mid-session connects while the `tools`
/// Arc stays shared (the normal `before_dispatch` path, where
/// `AgentToolSource` holds a clone) must not accumulate duplicate
/// synthesised `ToolSpec`s.
///
/// Before the fix, a failed `tools` reconcile rolled `synthesized_tool_names`
/// back to the *old* mask. On the next refresh the spec `retain` used that
/// stale mask and failed to drop the intervening refresh's specs, so the
/// synthesised delegate spec piled up once per connect.
#[test]
fn refresh_delegation_tools_no_duplicate_specs_across_shared_arc_connects() {
use crate::openhuman::agent::harness::AgentDefinitionRegistry;

AgentDefinitionRegistry::init_global_builtins().unwrap();
let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator"));

let conn = |slug: &str, desc: &str| crate::openhuman::context::prompt::ConnectedIntegration {
toolkit: slug.into(),
description: desc.into(),
tools: vec![],
gated_tools: vec![],
connected: true,
non_active_status: None,
};

let delegate_spec_count = |agent: &Agent| -> usize {
agent
.tool_specs()
.iter()
.filter(|s| s.name == "delegate_to_integrations_agent")
.count()
};

// Turn 1: gmail connects.
agent.set_connected_integrations(vec![conn("gmail", "Email")]);
assert!(agent.refresh_delegation_tools());

// Hold a shared clone across every subsequent refresh so `Arc::get_mut`
// always fails — exactly what happens during an in-flight turn.
let _shared_tools = agent.tools_arc();

// Turn 2: notion connects mid-session.
agent.set_connected_integrations(vec![conn("gmail", "Email"), conn("notion", "Docs")]);
assert!(agent.refresh_delegation_tools());

// Turn 3: slack connects mid-session — this is where the old code
// produced a duplicate `delegate_to_integrations_agent` spec.
agent.set_connected_integrations(vec![
conn("gmail", "Email"),
conn("notion", "Docs"),
conn("slack", "Chat"),
]);
assert!(agent.refresh_delegation_tools());

assert_eq!(
delegate_spec_count(&agent),
1,
"exactly one synthesised delegate spec must remain after repeated shared-Arc connects"
);
assert_eq!(
integration_delegate_toolkit_enum(&agent),
vec![
"gmail".to_string(),
"notion".to_string(),
"slack".to_string()
]
);
}

#[test]
fn composio_listener_drains_integrations_changed_events() {
let _ = init_global(64);
let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator"));
agent.ensure_composio_integrations_listener();
publish_global(DomainEvent::ComposioIntegrationsChanged {
toolkits: vec!["gmail".into()],
});
assert!(agent.drain_composio_integrations_changed_events());
assert!(
!agent.drain_composio_integrations_changed_events(),
"event queue should be drained after one pass"
);
}

#[tokio::test]
async fn turn_without_tools_returns_text() {
let workspace = tempfile::TempDir::new().expect("temp workspace");
Expand Down
Loading
Loading