Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/core/event_bus/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ pub enum DomainEvent {
toolkit: String,
connection_id: String,
},
/// The connected Composio toolkit set changed (connect/revoke/config flip).
///
/// `toolkits` is the currently-active, sanitised slug list that should
/// drive orchestrator delegation schema rebuilds.
ComposioIntegrationsChanged { toolkits: Vec<String> },
/// A Composio action was executed (success or failure) via the backend.
ComposioActionExecuted {
tool: String,
Expand Down Expand Up @@ -795,6 +800,7 @@ impl DomainEvent {
Self::ComposioTriggerReceived { .. }
| Self::ComposioConnectionCreated { .. }
| Self::ComposioConnectionDeleted { .. }
| Self::ComposioIntegrationsChanged { .. }
| Self::ComposioActionExecuted { .. }
| Self::ComposioConfigChanged { .. } => "composio",

Expand Down Expand Up @@ -895,6 +901,7 @@ impl DomainEvent {
Self::ComposioTriggerReceived { .. } => "ComposioTriggerReceived",
Self::ComposioConnectionCreated { .. } => "ComposioConnectionCreated",
Self::ComposioConnectionDeleted { .. } => "ComposioConnectionDeleted",
Self::ComposioIntegrationsChanged { .. } => "ComposioIntegrationsChanged",
Self::ComposioActionExecuted { .. } => "ComposioActionExecuted",
Self::ComposioConfigChanged { .. } => "ComposioConfigChanged",
Self::TriggerEvaluated { .. } => "TriggerEvaluated",
Expand Down
6 changes: 6 additions & 0 deletions src/core/event_bus/events_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,12 @@ fn all_variants_have_correct_domain() {
},
"composio",
),
(
DomainEvent::ComposioIntegrationsChanged {
toolkits: vec!["gmail".into(), "notion".into()],
},
"composio",
),
(
DomainEvent::ComposioConfigChanged {
mode: "direct".into(),
Expand Down
2 changes: 1 addition & 1 deletion src/openhuman/agent/harness/engine/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub(crate) async fn run_turn_engine(
}

// Caller-specific pre-dispatch work (e.g. Agent's ContextManager).
observer.before_dispatch(history, iteration).await?;
observer.before_dispatch(history, tools, iteration).await?;

tracing::debug!(iteration, "[agent_loop] sending LLM request");
let image_marker_count = multimodal::count_image_markers(history);
Expand Down
2 changes: 2 additions & 0 deletions src/openhuman/agent/harness/engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use anyhow::Result;
use async_trait::async_trait;

use super::tool_source::ToolSource;
use crate::openhuman::agent::harness::parse::ParsedToolCall;
use crate::openhuman::inference::provider::{ChatMessage, ToolCall, UsageInfo};

Expand All @@ -29,6 +30,7 @@ pub(crate) trait TurnObserver: Send {
async fn before_dispatch(
&mut self,
_history: &mut Vec<ChatMessage>,
_tools: &mut dyn ToolSource,
_iteration: usize,
) -> Result<()> {
Ok(())
Expand Down
18 changes: 18 additions & 0 deletions src/openhuman/agent/harness/engine/tool_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
//! subagent and `Agent` impls land in later phases.

use std::collections::HashSet;
use std::sync::Arc;

use async_trait::async_trait;

use super::super::payload_summarizer::PayloadSummarizer;
use super::progress::ProgressReporter;
use super::{run_one_tool, ToolRunResult};
use crate::openhuman::agent::harness::parse::ParsedToolCall;
use crate::openhuman::agent_tool_policy::ToolPolicySession;
use crate::openhuman::tools::policy::ToolPolicy;
use crate::openhuman::tools::{Tool, ToolSpec};

Expand All @@ -45,6 +47,22 @@ pub(crate) trait ToolSource: Send {
progress: &dyn ProgressReporter,
progress_call_id: &str,
) -> ToolRunResult;

/// Replace the caller-specific runtime snapshot after a dynamic refresh.
/// Default no-op for non-agent callers.
#[allow(clippy::too_many_arguments)]
fn sync_agent_surface(
&mut self,
_tools: Arc<Vec<Box<dyn Tool>>>,
_visible_tool_names: HashSet<String>,
_tool_policy_session: ToolPolicySession,
_payload_summarizer: Option<Arc<dyn PayloadSummarizer>>,
_prefer_markdown: bool,
_budget_bytes: usize,
_should_send_specs: bool,
_advertised_specs: Vec<ToolSpec>,
) {
}
}

/// The channel/CLI/triage tool source: a persistent `registry`, optional
Expand Down
3 changes: 3 additions & 0 deletions src/openhuman/agent/harness/session/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,9 @@ impl AgentBuilder {
Arc::new(crate::openhuman::agent::tool_policy::AllowAllToolPolicy)
}),
last_seen_integrations_hash: 0,
composio_integrations_rx: None,
announced_integrations: std::collections::HashSet::new(),
pending_integration_announcement: None,
archivist_hook: self.archivist_hook,
synthesized_tool_names: std::collections::HashSet::new(),
})
Expand Down
83 changes: 83 additions & 0 deletions src/openhuman/agent/harness/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! (`MockProvider`, `RecordingProvider`, `MockTool`) are defined here.

use super::types::{Agent, AgentBuilder};
use crate::core::event_bus::{init_global, publish_global, DomainEvent};
use crate::openhuman::agent::dispatcher::{NativeToolDispatcher, XmlToolDispatcher};
use crate::openhuman::inference::provider::{ChatRequest, ConversationMessage, Provider};
use crate::openhuman::memory::Memory;
Expand Down Expand Up @@ -174,6 +175,22 @@ fn build_minimal_agent_with_definition_name(definition_name: Option<&str>) -> Ag
builder.build().expect("minimal agent build should succeed")
}

fn integration_delegate_toolkit_enum(agent: &Agent) -> Vec<String> {
let spec = agent
.tool_specs()
.iter()
.find(|spec| spec.name == "delegate_to_integrations_agent")
.expect("delegate_to_integrations_agent tool spec should be present");
let mut out: Vec<String> = spec.parameters["properties"]["toolkit"]["enum"]
.as_array()
.expect("toolkit enum should be an array")
.iter()
.filter_map(|v| v.as_str().map(ToString::to_string))
.collect();
out.sort();
out
}

/// Regression test for the `build_session_agent_inner` agent-id
/// threading bug.
///
Expand Down Expand Up @@ -256,6 +273,72 @@ fn set_connected_integrations_marks_session_initialized_and_updates_hash() {
);
}

#[test]
fn refresh_delegation_tools_updates_schema_even_when_tool_arc_is_shared() {
use crate::openhuman::agent::harness::AgentDefinitionRegistry;

AgentDefinitionRegistry::init_global_builtins().unwrap();
let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator"));
agent.set_connected_integrations(vec![
crate::openhuman::context::prompt::ConnectedIntegration {
toolkit: "gmail".into(),
description: "Email".into(),
tools: vec![],
gated_tools: vec![],
connected: true,
non_active_status: None,
},
]);

assert!(agent.refresh_delegation_tools());
assert_eq!(
integration_delegate_toolkit_enum(&agent),
vec!["gmail".to_string()]
);

// Simulate an in-flight turn holding a shared Arc clone.
let _shared_tools = agent.tools_arc();
agent.set_connected_integrations(vec![
crate::openhuman::context::prompt::ConnectedIntegration {
toolkit: "gmail".into(),
description: "Email".into(),
tools: vec![],
gated_tools: vec![],
connected: true,
non_active_status: None,
},
crate::openhuman::context::prompt::ConnectedIntegration {
toolkit: "notion".into(),
description: "Docs".into(),
tools: vec![],
gated_tools: vec![],
connected: true,
non_active_status: None,
},
]);

assert!(agent.refresh_delegation_tools());
assert_eq!(
integration_delegate_toolkit_enum(&agent),
vec!["gmail".to_string(), "notion".to_string()]
);
}

#[test]
fn composio_listener_drains_integrations_changed_events() {
let _ = init_global(64);
let mut agent = build_minimal_agent_with_definition_name(Some("orchestrator"));
agent.ensure_composio_integrations_listener();
publish_global(DomainEvent::ComposioIntegrationsChanged {
toolkits: vec!["gmail".into()],
});
assert!(agent.drain_composio_integrations_changed_events());
assert!(
!agent.drain_composio_integrations_changed_events(),
"event queue should be drained after one pass"
);
}

#[tokio::test]
async fn turn_without_tools_returns_text() {
let workspace = tempfile::TempDir::new().expect("temp workspace");
Expand Down
Loading
Loading