Skip to content

fix(agent): refresh orchestrator integration context on mid-session OAuth (#3044)#3153

Merged
graycyrus merged 11 commits into
tinyhumansai:mainfrom
oxoxDev:fix/3044-orchestrator-oauth-refresh
Jun 1, 2026
Merged

fix(agent): refresh orchestrator integration context on mid-session OAuth (#3044)#3153
graycyrus merged 11 commits into
tinyhumansai:mainfrom
oxoxDev:fix/3044-orchestrator-oauth-refresh

Conversation

@oxoxDev
Copy link
Copy Markdown
Contributor

@oxoxDev oxoxDev commented Jun 1, 2026

Summary

  • Refresh the orchestrator's delegate_to_integrations_agent schema mid-conversation when a Composio toolkit connects, so a toolkit OAuth'd while a chat thread is open is usable without a new thread or app restart.
  • New DomainEvent::ComposioIntegrationsChanged, published on connect + config-flip; the active session drains it in before_dispatch and re-syncs its live tool surface in the same turn.
  • Eliminates the prior shared-Arc no-op: tool_specs (the provider-facing schema holding the toolkit enum) now reconciles via copy-on-write, so the enum updates same-turn even when the tool list is shared. The frozen system-prompt prefix stays byte-identical (KV-cache preserved).
  • Safety net: skill_delegation::execute() does one live status re-check before rejecting an unknown toolkit.
  • First-ask mitigation: announce newly-connected toolkit(s) on the next user turn so the model acts on them on the first post-connect ask instead of refusing from stale chat context.

Problem

delegate_to_integrations_agent's toolkit enum is baked into the tool schema at instantiation and the system prompt is frozen after turn 1 (KV-cache). When a user connects an integration mid-conversation (e.g. Notion OAuth completes while a thread is open), the active session keeps a stale view: Composio/Settings show ACTIVE and memory sync works, but routed delegation fails with "toolkit not connected" until the next thread or restart. (Symptom reported in #3030.)

Solution

Five micro-commits, dependency-ordered:

  1. feat(events) — add DomainEvent::ComposioIntegrationsChanged { toolkits } (core/event_bus/events.rs, #[non_exhaustive]).
  2. feat(composio) — publish it from memory_sync/composio/bus.rs after the connect eager-warm and the config-changed invalidation.
  3. feat(agent) — mid-turn refresh: before_dispatch drains the event → refresh_delegation_tools_from_cached_integrationssync_agent_surface pushes the fresh Arc/specs into the live AgentToolSource. refresh_delegation_tools reconciles tool_specs via Arc::make_mut (COW) so the schema always updates; non-cloneable Box<dyn Tool> executables reconcile when uniquely owned. System prompt untouched.
  4. fix(skill-delegation) — live re-check before rejecting an unknown toolkit (mirrors the spawn_subagent pre-flight).
  5. feat(agent) — first-ask mitigation: a one-shot note ("X connected this session, use it now") parked on refresh and consumed when the next user message is built. It rides the user turn, not the system prompt, so the KV-cache prefix is unchanged; deduped via announced_integrations (seeded at turn 1).

Submission Checklist

If a section does not apply to this change, mark the item as N/A with a one-line reason. Do not delete items.

  • Tests added or updated (happy path + at least one failure / edge case) per Testing Strategydrain_composio_integrations_changed_events test, resolve_connected_toolkits_prefers_live_recheck_for_unknown_slug, integration_announcement_fires_once_for_new_toolkit (incl. dedup / no re-announce).
  • Diff coverage ≥ 80% — new/changed lines (the synthesized_tool_names/pending_synthesized_tools_mask split, the announcement-accumulation path, the mid-turn resync log, and the hermetic skill-delegation test hooks) are exercised by the added regression tests (refresh_delegation_tools_no_duplicate_specs_across_shared_arc_connects, integration_announcement_accumulates_two_connects_in_one_note) plus the existing session/skill_delegation suites; cargo-llvm-cov not run locally (heavy), the CI Coverage Gate enforces the threshold.
  • Coverage matrix updated — N/A: bug fix, no user-facing feature added/removed/renamed.
  • All affected feature IDs from the matrix are listed under ## Related — N/A: no feature behaviour added or moved.
  • No new external network dependencies introduced — reuses existing Composio status fetch + event bus.
  • Manual smoke checklist updated if this touches release-cut surfaces — N/A: agent-harness internal; no release-cut UI surface.
  • Linked issue closed via Closes #NNN in the ## Related section.

Impact

  • A toolkit connected mid-conversation becomes usable in the same thread, no restart — the reported Refresh orchestrator integration context after OAuth (fixes stale delegate_to_integrations_agent) #3044 symptom.
  • KV-cache prefix unchanged — the system prompt is never rebuilt; dynamic context (the connect announcement) rides the user message, the codebase's documented channel.
  • No migration / no schema change — additive event variant + two Agent fields.
  • Staging live-verification (2026-06-01): connected Notion mid-thread → ComposioIntegrationsChanged received (active_toolkits=[github,notion,slack]) → refresh_delegation_tools … tools_reconciled=truedelegate_to_integrations_agent(notion) accepted and dispatched in the same thread, repeated delegations succeeded, no restart. KV-cache prefix confirmed unchanged in logs. The first-ask announcement (commit 5) is unit-covered; the core mid-session refresh (commits 1–4) is the part exercised end-to-end live.

Related


AI Authored PR Metadata (required for Codex/Linear PRs)

Linear Issue

  • Key: N/A
  • URL: N/A

Commit & Branch

  • Branch: fix/3044-orchestrator-oauth-refresh
  • Commit SHA: 8216f2e50d3df47570e8d20c71148f84ba1c877c

Validation Run

  • pnpm --filter openhuman-app format:check — N/A: no frontend files changed; cargo fmt --all --check clean.
  • pnpm typecheck — N/A: Rust-only change.
  • Focused tests: cargo test --lib openhuman::agent::harness::session → 110 passed; …::skill_delegation → 9 passed.
  • Rust fmt/check (if changed): cargo check --lib clean; cargo clippy --lib --no-deps 0 new warnings on changed files.
  • Tauri fmt/check (if changed): N/A — app/src-tauri not touched.

Validation Blocked

  • command: N/A
  • error: N/A
  • impact: N/A

Behavior Changes

  • Intended behavior change: mid-session Composio connects refresh the orchestrator delegation schema in-thread; newly-connected toolkits are announced on the next user turn.
  • User-visible effect: a toolkit connected mid-conversation is usable immediately (no new thread / restart), and the orchestrator no longer refuses it on the first ask.

Summary by CodeRabbit

  • New Features
    • Connect/disconnect Composio integrations mid-conversation; agent updates capabilities in real time.
    • Newly connected toolkits are announced once (can batch multiple connects) and prepended to the next user message.
    • Live validation checks toolkit availability before execution and refreshes delegation tooling automatically.
  • Bug Fixes
    • Prevents duplicate synthesized tool specs when integrations update with shared tool ownership.

@oxoxDev oxoxDev requested a review from a team June 1, 2026 13:21
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 1, 2026

Review Change Stack

📝 Walkthrough

Walkthrough

Adds ComposioIntegrationsChanged events and per-session listeners to trigger mid-session delegation-tool schema refreshes; extends observer seam to pass ToolSource and sync engine tool-surface; adds live validation fallback and tests to ensure newly-connected toolkits are accepted without restarting a chat.

Changes

Dynamic Composio Integration Refresh

Layer / File(s) Summary
Event Infrastructure
src/core/event_bus/events.rs, src/core/event_bus/events_tests.rs
New DomainEvent::ComposioIntegrationsChanged { toolkits: Vec<String> } variant, mapped to domain "composio" with stable variant name; event test extended.
Turn-Engine Observer Seam
src/openhuman/agent/harness/engine/state.rs, src/openhuman/agent/harness/engine/core.rs, src/openhuman/agent/harness/engine/tool_source.rs
TurnObserver::before_dispatch now accepts &mut dyn ToolSource; ToolSource gains default sync_agent_surface hook; call-site updated to pass tools handle.
Event Publishing on Cache/Config
src/openhuman/memory_sync/composio/bus.rs
Post-OAuth cache warm and config-changed eager warm now publish ComposioIntegrationsChanged with sorted/deduped connected toolkit list.
Agent Session State
src/openhuman/agent/harness/session/types.rs, src/openhuman/agent/harness/session/builder.rs
Agent gains composio_integrations_rx, announced_integrations, pending_integration_announcement, and pending_synthesized_tools_mask; builder initializes them.
Agent::turn Integration & Reconciliation
src/openhuman/agent/harness/session/turn.rs
Lazily arms per-session listener, drains events, reconciles cached integrations via hash check, refactors refresh_delegation_tools for shared vs unique Arc ownership, and queues one-shot user announcement for newly connected toolkits.
Before-Dispatch Tool-Surface Sync
src/openhuman/agent/harness/session/turn_engine_adapter.rs
AgentToolSource::sync_agent_surface added; AgentObserver::before_dispatch calls it after reconciliation so engine mirror is up-to-date before dispatch.
Live Toolkit Validation Fallback
src/openhuman/agent_orchestration/tools/skill_delegation.rs
Adds snapshot-first, live-fallback resolve_connected_toolkits and one-time live re-check in execute; tests added/updated to cover acceptance and rejection paths.
Tests
src/openhuman/agent/harness/session/tests.rs, src/openhuman/agent/harness/session/turn_tests.rs
Added helpers and regression/unit tests for delegation-toolkit enum extraction, schema refresh with shared Arc, event drain behavior, and integration announcement semantics.

Sequence Diagram(s)

sequenceDiagram
  participant OAuth as OAuth Flow
  participant Bus as Composio Bus
  participant Session as Agent Session
  participant Engine as Turn Engine
  participant Tools as AgentToolSource
  participant SD as SkillDelegation
  
  OAuth->>Bus: connection reaches ACTIVE
  Bus->>Bus: cache warm, fetch integrations
  Bus->>Bus: publish ComposioIntegrationsChanged
  
  Session->>Session: listener drains event
  Session->>Session: refresh_delegation_tools_from_cached_integrations
  Session->>Session: set pending_integration_announcement
  
  Session->>Engine: Agent::turn with updated schema
  Engine->>Tools: before_dispatch(history, tools, iteration)
  Tools->>Tools: sync_agent_surface with refreshed state
  Tools-->>Engine: tool source synced
  
  Engine->>SD: user requests delegation
  SD->>SD: toolkit in schema ✓
  SD->>SD: execute with allowed toolkit
  SD-->>Engine: delegation result
  
  Engine->>Session: inject pending announcement to user message
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

bug

Suggested reviewers

  • senamakel
  • M3gA-Mind
  • graycyrus

🐰 I hopped through code, ears on the breeze,
A toolkit arrived mid-chat, with quiet ease.
I published the change and the agent awoke,
Synced tools, fresh schema — no restart, no yoke.
Happy hops — integrations now speak with a poke.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately summarizes the main change: refreshing orchestrator integration context when mid-session OAuth connects new toolkits. It is concise, specific, and directly matches the core objective.
Linked Issues check ✅ Passed The PR comprehensively addresses all acceptance criteria from #3044: publishes DomainEvent::ComposioIntegrationsChanged [bus.rs], active sessions drain events in before_dispatch [turn_engine_adapter.rs], tool_specs reconcile via Arc::make_mut [turn.rs], refresh_delegation_tools handles shared Arc [turn.rs], and live re-check before rejection [skill_delegation.rs]. Regression tests verify mid-session OAuth workflow, live recheck behavior, and announcement deduplication [turn_tests.rs, skill_delegation.rs, session/tests.rs].
Out of Scope Changes check ✅ Passed All changes are directly scoped to mid-session integration refresh: event publishing/consumption, tool schema updates, live rechecks, announcement management, and supporting infrastructure (ToolSource trait extension, TurnObserver signature update). No unrelated feature changes or refactorings detected.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot added feature Net-new user-facing capability or product behavior. agent Built-in agents, prompts, orchestration, and agent runtime in src/openhuman/agent/. memory Memory store, memory tree, recall, summarization, and embeddings in src/openhuman/memory/. labels Jun 1, 2026
Copy link
Copy Markdown
Contributor

@graycyrus graycyrus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oxoxDev heads up — CI is failing on this PR (PR Submission Checklist is red, several checks still pending), so i'll hold off on fully approving until those are sorted. i did a pass through the diff though and spotted one thing worth addressing before that:

The partial-refresh path in refresh_delegation_tools has a synthesized_tool_names state inconsistency that will cause duplicate ToolSpec entries in self.tool_specs under repeated mid-session integration connects.

Here's the scenario:

  1. Session starts, gmail connected — synthesized_tool_names = {gmail_delegate}.
  2. Mid-session, notion connects. before_dispatch calls refresh_delegation_tools. Because AgentToolSource is alive and holds an Arc::clone of agent.tools, Arc::get_mut always fails here — tools_reconciled = false. Specs are updated correctly (gmail removed, gmail+notion added), but synthesized_tool_names is restored to {gmail_delegate} (old_synth).
  3. Slack connects later. old_synth = take(synthesized_tool_names) = {gmail_delegate}. The retain removes gmail_delegate from specs — but notion_delegate (from step 2) is not in old_synth, so it stays. Then extend adds gmail+notion+slack. notion_delegate is now in tool_specs twice.

This isn't a rare race — it's the normal path: AgentToolSource always holds a clone during before_dispatch, so Arc::get_mut always fails in this call site. Every time a second integration connects in the same session, the spec list grows stale duplicates.

The fix is to track synthed_names in synthesized_tool_names even when tools_reconciled = false (since specs were successfully updated), and separately maintain a pending_tools_mask for the stale executable reconciliation. Alternatively, on a partial refresh, scan tool_specs for existing entries with the new synthesized names before extending to avoid duplicates.

Fix the CI failures first and i'll come back for a full pass after.

Comment thread src/openhuman/agent/harness/session/turn.rs Outdated
coderabbitai[bot]
coderabbitai Bot previously requested changes Jun 1, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/openhuman/agent_orchestration/tools/skill_delegation.rs (1)

334-350: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Keep execute_rejects_unknown_toolkit_with_allowed_list hermetic by preventing real Composio backend calls

#[tokio::test]
async fn execute_rejects_unknown_toolkit_with_allowed_list() {
    let tool = SkillDelegationTool::for_connected(vec![
        ("gmail".to_string(), "Email.".to_string()),
        ("notion".to_string(), "Docs.".to_string()),
    ])
    .unwrap();
    let result = tool
        .execute(json!({"toolkit": "slack", "prompt": "hi"}))
        .await
        .unwrap();
    assert!(result.is_error);
    let body = result.output();
    assert!(body.contains("slack"));
    assert!(body.contains("gmail"));
    assert!(body.contains("notion"));
}

SkillDelegationTool::execute triggers a live re-check for unknown slugs via fetch_live_connected_toolkit_slugs_once(), which unconditionally calls Config::load_or_init() and then fetch_connected_integrations_status(&config). That status only becomes Unavailable when the Composio client can’t be constructed (no backend session token); if a session token is present, the code path can hit the backend over HTTP (via IntegrationClient) and the “allowed” set (gmail/notion) may differ from the snapshot, making this unit test environment-dependent. Add a cfg(test)/mock/stub so this unit-test path always returns Unavailable (or inject a mocked Config/client) rather than relying on the host machine’s login/config state.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/agent_orchestration/tools/skill_delegation.rs` around lines 334
- 350, The test execute_rejects_unknown_toolkit_with_allowed_list is
non-hermetic because SkillDelegationTool::execute calls
fetch_live_connected_toolkit_slugs_once which in turn calls Config::load_or_init
and fetch_connected_integrations_status (via IntegrationClient) and can hit the
real Composio backend; to fix, add a test-only stub (cfg(test)) or dependency
injection so fetch_live_connected_toolkit_slugs_once returns an Unavailable
status during tests: either add a #[cfg(test)] override that makes
fetch_live_connected_toolkit_slugs_once return Unavailable, or modify
SkillDelegationTool (or its constructor for_connected) to accept an optional
mock/config provider used by execute; reference the functions
SkillDelegationTool::execute, fetch_live_connected_toolkit_slugs_once,
Config::load_or_init, fetch_connected_integrations_status and IntegrationClient
when making the change.
🧹 Nitpick comments (2)
src/openhuman/agent/harness/session/turn_engine_adapter.rs (1)

210-232: ⚡ Quick win

Add a diagnostic log when the delegation tool surface is resynced mid-turn.

This new mid-turn refresh path runs silently. Given the PR's goal of verifying "mid-session OAuth → immediate delegation," a debug/trace line with a stable prefix and correlation fields makes the path observable in the field (and confirms the #3030 symptom is actually fixed at runtime). _iteration is already available to use as a correlation field.

📝 Proposed logging
     async fn before_dispatch(
         &mut self,
         buf: &mut Vec<ChatMessage>,
         tools: &mut dyn crate::openhuman::agent::harness::engine::ToolSource,
-        _iteration: usize,
+        iteration: usize,
     ) -> Result<()> {
         if self.agent.drain_composio_integrations_changed_events() {
             let refreshed = self
                 .agent
                 .refresh_delegation_tools_from_cached_integrations("event");
             if refreshed {
+                tracing::debug!(
+                    iteration,
+                    "[agent_loop] composio integrations changed — resyncing delegation tool surface"
+                );
                 tools.sync_agent_surface(
                     Arc::clone(&self.agent.tools),

As per coding guidelines: "default to verbose diagnostics on new/changed flows using log/tracing at debug/trace levels with stable grep-friendly prefixes and correlation fields."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/agent/harness/session/turn_engine_adapter.rs` around lines 210
- 232, Add a diagnostic tracing debug line inside before_dispatch when the
mid-turn refresh path actually runs (i.e., inside the if refreshed { ... }
block) to make resync events observable; log using tracing::debug! (or the
project's tracing facade) with a stable grep-friendly prefix like
"midturn:resync-delegation-tools" and include correlation fields such as
_iteration and a stable agent identifier or context values available on
self.agent (for example self.agent.context/session id or visible_tool_names
length) so the log shows iteration and basic agent/tool state before calling
tools.sync_agent_surface.
src/openhuman/agent_orchestration/tools/skill_delegation.rs (1)

205-215: 💤 Low value

Optional: the known reconciliation can be flattened.

The branch reduces to a single assignment plus a conditional log, which reads more directly:

♻️ Proposed simplification
-        if known_after_recheck && !known {
-            log::info!(
-                "[skill-delegation] toolkit '{}' accepted after live re-check (session schema stale)",
-                slug
-            );
-            known = true;
-        } else {
-            known = known_after_recheck;
-        }
+        if known_after_recheck && !known {
+            log::info!(
+                "[skill-delegation] toolkit '{}' accepted after live re-check (session schema stale)",
+                slug
+            );
+        }
+        known = known_after_recheck;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/agent_orchestration/tools/skill_delegation.rs` around lines 205
- 215, The current reconciliation for `known` in the `skill_delegation` logic
can be simplified: replace the if/else that sets `known` based on
`(known_after_recheck, allowed)` with a direct assignment `known =
known_after_recheck` and move the log into a conditional that checks the
transition from false to true (e.g., `if known && !previous_known {
log::info!(...) }`), using the prior `known` value saved before calling
`resolve_connected_toolkits` to detect the state change; update references to
`resolve_connected_toolkits`, `self.connected_toolkits`, `slug`, and
`live_connected` accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/openhuman/agent/harness/session/types.rs`:
- Around line 204-208: The field pending_integration_announcement should be
changed from a single Option<String> to a structured collection of pending
toolkit identifiers (e.g., Vec<String> or HashSet<String>) so multiple
successive refreshes are accumulated and stale entries can be pruned; update the
code paths that write to it (notably
refresh_delegation_tools_from_cached_integrations) to push/merge toolkit slugs
instead of replacing the string, update the consumer that previously read
pending_integration_announcement to render human-readable prose at consume time
and clear only the consumed slugs, and add logic to remove slugs when a toolkit
disconnects so you never announce disconnected tools. Ensure all references to
pending_integration_announcement (reads/writes) are updated to the new type and
behavior across the module.

In `@src/openhuman/memory_sync/composio/bus.rs`:
- Around line 533-544: The duplicated block that builds, sorts, dedups, and
publishes connected toolkit slugs should be extracted into a single helper in
this module (e.g., fn publish_connected_toolkits_event(entries: &[impl
AsRef<IntegrationEntry>])) that: filters entries by .as_ref().connected, maps to
.as_ref().toolkit.clone(), sorts and dedups the Vec<String>, calls
crate::core::event_bus::publish_global(DomainEvent::ComposioIntegrationsChanged
{ toolkits }), and emits the same tracing::debug with active_toolkits; then
replace the duplicated code in the connection-created path and the
config-changed eager warm path with calls to publish_connected_toolkits_event
using the same entries slice so both use the identical logic and logging.

---

Outside diff comments:
In `@src/openhuman/agent_orchestration/tools/skill_delegation.rs`:
- Around line 334-350: The test
execute_rejects_unknown_toolkit_with_allowed_list is non-hermetic because
SkillDelegationTool::execute calls fetch_live_connected_toolkit_slugs_once which
in turn calls Config::load_or_init and fetch_connected_integrations_status (via
IntegrationClient) and can hit the real Composio backend; to fix, add a
test-only stub (cfg(test)) or dependency injection so
fetch_live_connected_toolkit_slugs_once returns an Unavailable status during
tests: either add a #[cfg(test)] override that makes
fetch_live_connected_toolkit_slugs_once return Unavailable, or modify
SkillDelegationTool (or its constructor for_connected) to accept an optional
mock/config provider used by execute; reference the functions
SkillDelegationTool::execute, fetch_live_connected_toolkit_slugs_once,
Config::load_or_init, fetch_connected_integrations_status and IntegrationClient
when making the change.

---

Nitpick comments:
In `@src/openhuman/agent_orchestration/tools/skill_delegation.rs`:
- Around line 205-215: The current reconciliation for `known` in the
`skill_delegation` logic can be simplified: replace the if/else that sets
`known` based on `(known_after_recheck, allowed)` with a direct assignment
`known = known_after_recheck` and move the log into a conditional that checks
the transition from false to true (e.g., `if known && !previous_known {
log::info!(...) }`), using the prior `known` value saved before calling
`resolve_connected_toolkits` to detect the state change; update references to
`resolve_connected_toolkits`, `self.connected_toolkits`, `slug`, and
`live_connected` accordingly.

In `@src/openhuman/agent/harness/session/turn_engine_adapter.rs`:
- Around line 210-232: Add a diagnostic tracing debug line inside
before_dispatch when the mid-turn refresh path actually runs (i.e., inside the
if refreshed { ... } block) to make resync events observable; log using
tracing::debug! (or the project's tracing facade) with a stable grep-friendly
prefix like "midturn:resync-delegation-tools" and include correlation fields
such as _iteration and a stable agent identifier or context values available on
self.agent (for example self.agent.context/session id or visible_tool_names
length) so the log shows iteration and basic agent/tool state before calling
tools.sync_agent_surface.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d572806e-b57e-4fd2-b045-baea55fbcda8

📥 Commits

Reviewing files that changed from the base of the PR and between a40cd7e and 8216f2e.

📒 Files selected for processing (13)
  • src/core/event_bus/events.rs
  • src/core/event_bus/events_tests.rs
  • src/openhuman/agent/harness/engine/core.rs
  • src/openhuman/agent/harness/engine/state.rs
  • src/openhuman/agent/harness/engine/tool_source.rs
  • src/openhuman/agent/harness/session/builder.rs
  • src/openhuman/agent/harness/session/tests.rs
  • src/openhuman/agent/harness/session/turn.rs
  • src/openhuman/agent/harness/session/turn_engine_adapter.rs
  • src/openhuman/agent/harness/session/turn_tests.rs
  • src/openhuman/agent/harness/session/types.rs
  • src/openhuman/agent_orchestration/tools/skill_delegation.rs
  • src/openhuman/memory_sync/composio/bus.rs

Comment thread src/openhuman/agent/harness/session/types.rs Outdated
Comment on lines +533 to +544
let mut toolkits: Vec<String> = entries
.iter()
.filter(|entry| entry.connected)
.map(|entry| entry.toolkit.clone())
.collect();
toolkits.sort();
toolkits.dedup();
crate::core::event_bus::publish_global(
DomainEvent::ComposioIntegrationsChanged {
toolkits: toolkits.clone(),
},
);
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Extract duplicated toolkit computation and publishing logic.

The same logic appears in both the connection-created path (lines 533–544) and the config-changed eager warm (lines 773–784): filter connected entries, map to toolkit slugs, sort, dedup, and publish ComposioIntegrationsChanged. Extracting this into a helper function will ensure consistency and make future changes easier.

♻️ Proposed helper extraction

Add a helper function in this module:

/// Computes the sorted, deduplicated list of connected toolkit slugs from
/// integration entries and publishes a `ComposioIntegrationsChanged` event.
fn publish_connected_toolkits_event(entries: &[impl AsRef<IntegrationEntry>]) {
    let mut toolkits: Vec<String> = entries
        .iter()
        .filter(|entry| entry.as_ref().connected)
        .map(|entry| entry.as_ref().toolkit.clone())
        .collect();
    toolkits.sort();
    toolkits.dedup();
    crate::core::event_bus::publish_global(
        DomainEvent::ComposioIntegrationsChanged {
            toolkits: toolkits.clone(),
        },
    );
    tracing::debug!(
        active_toolkits = ?toolkits,
        "[composio-cache] published ComposioIntegrationsChanged event"
    );
}

Then replace lines 533–550 with:

-                            let mut toolkits: Vec<String> = entries
-                                .iter()
-                                .filter(|entry| entry.connected)
-                                .map(|entry| entry.toolkit.clone())
-                                .collect();
-                            toolkits.sort();
-                            toolkits.dedup();
-                            crate::core::event_bus::publish_global(
-                                DomainEvent::ComposioIntegrationsChanged {
-                                    toolkits: toolkits.clone(),
-                                },
-                            );
-                            tracing::debug!(
-                                toolkit = %toolkit,
-                                connection_id = %connection_id,
-                                cached_entries = entries.len(),
-                                active_toolkits = ?toolkits,
-                                "[composio:bus] eagerly warmed integrations cache after connection became active"
-                            );
+                            publish_connected_toolkits_event(&entries);
+                            tracing::debug!(
+                                toolkit = %toolkit,
+                                connection_id = %connection_id,
+                                cached_entries = entries.len(),
+                                "[composio:bus] eagerly warmed integrations cache after connection became active"
+                            );

And replace lines 773–788 with:

-                    let mut toolkits: Vec<String> = entries
-                        .iter()
-                        .filter(|entry| entry.connected)
-                        .map(|entry| entry.toolkit.clone())
-                        .collect();
-                    toolkits.sort();
-                    toolkits.dedup();
-                    crate::core::event_bus::publish_global(
-                        DomainEvent::ComposioIntegrationsChanged {
-                            toolkits: toolkits.clone(),
-                        },
-                    );
-                    tracing::debug!(
-                        active_toolkits = ?toolkits,
-                        "[composio-cache] config changed eager warm complete; published integrations changed"
-                    );
+                    publish_connected_toolkits_event(&entries);
+                    tracing::debug!(
+                        "[composio-cache] config changed eager warm complete"
+                    );

Also applies to: 773-784

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/openhuman/memory_sync/composio/bus.rs` around lines 533 - 544, The
duplicated block that builds, sorts, dedups, and publishes connected toolkit
slugs should be extracted into a single helper in this module (e.g., fn
publish_connected_toolkits_event(entries: &[impl AsRef<IntegrationEntry>]))
that: filters entries by .as_ref().connected, maps to .as_ref().toolkit.clone(),
sorts and dedups the Vec<String>, calls
crate::core::event_bus::publish_global(DomainEvent::ComposioIntegrationsChanged
{ toolkits }), and emits the same tracing::debug with active_toolkits; then
replace the duplicated code in the connection-created path and the
config-changed eager warm path with calls to publish_connected_toolkits_event
using the same entries slice so both use the identical logic and logging.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — leaving this one for now. It's a pure readability refactor (the two publish blocks are already behaviourally identical), and extracting it would widen the diff into composio/bus.rs which is otherwise untouched by this PR's correctness fixes. Happy to fold it into the follow-up pass @graycyrus mentioned, or as a separate cleanup PR.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

oxoxDev added 5 commits June 1, 2026 19:46
…nects (tinyhumansai#3044)

When refresh_delegation_tools updates tool_specs but can't reconcile the
tools Vec (Arc shared during before_dispatch — the normal path), it used
to roll synthesized_tool_names back to the old mask. The next refresh's
spec retain then used a stale mask and left the intervening refresh's
delegate spec in place, accumulating duplicate ToolSpecs per connect.

Keep synthesized_tool_names in lock-step with tool_specs (always the
latest synthesised set) and track stale tool *instances* awaiting removal
in a separate pending_synthesized_tools_mask, consumed by the next
unique-owner refresh.
…c connects (tinyhumansai#3044)

Three mid-session connects (gmail → notion → slack) with the tools Arc
held shared throughout must leave exactly one delegate_to_integrations_agent
spec. Fails before the synthesized_tool_names/pending-mask split.
…sai#3044)

Add a cfg(test) override for fetch_live_connected_toolkit_slugs_once so
the reject-path tests don't read host config or reach the Composio
backend over HTTP. Also flatten the known-after-recheck reconciliation
per review.
…mansai#3044)

Emit a grep-friendly [agent_loop] midturn:resync-delegation-tools debug
line with iteration + visible-tool count when a composio integrations
change triggers an in-turn tool-surface resync.
…verwriting (tinyhumansai#3044)

Two integration connects between consecutive user turns previously
clobbered each other: the second refresh's note overwrote the first in
the Option<String> field, so the earlier toolkit was never announced.
Park the newly-connected slugs as an order-preserving deduped Vec and
render the prose at consume time. Splits integration_announcement into
newly_connected_slugs (dedup vs announced set) + integration_announcement_note
(render). Adds a two-connect regression test.
@oxoxDev
Copy link
Copy Markdown
Contributor Author

oxoxDev commented Jun 1, 2026

@graycyrus great catch on the duplicate-spec state inconsistency — you nailed the exact mechanism. Fixed across 16ab684 and the three commits before it:

Duplicate ToolSpec on repeated mid-session connects (your main finding): synthesized_tool_names now stays in lock-step with tool_specs (always the latest synthesised set, since specs always reconcile). The stale tool instances that can't be dropped while the tools Arc is shared are tracked in a separate pending_synthesized_tools_mask, drained by the next unique-owner refresh. So the spec retain never runs against a stale mask and duplicates can't accumulate. New test refresh_delegation_tools_no_duplicate_specs_across_shared_arc_connects reproduces your gmail→notion→slack scenario with the Arc held shared throughout and asserts exactly one delegate spec survives.

Also addressed the rest of the review:

  • CodeRabbit – pending_integration_announcement overwrite: now an order-preserving deduped Vec<String>; two connects between user turns both surface instead of the second clobbering the first. Rendered at consume time; regression test added.
  • CodeRabbit – mid-turn resync observability: added a grep-friendly [agent_loop] midturn:resync-delegation-tools debug line with iteration + visible-tool count.
  • CodeRabbit – skill_delegation known flatten: applied.
  • CodeRabbit – non-hermetic execute_rejects_unknown_toolkit_with_allowed_list: added a cfg(test) override for the live status fetch so the reject-path tests never read host config or hit the Composio backend; both affected tests now pin the live result deterministically.
  • CodeRabbit – composio/bus.rs publish-block extraction: deferred (pure readability nitpick, would widen the diff into an otherwise-untouched file) — flagged on the inline thread for the follow-up pass.

On CI: the PR Submission Checklist red was a single unchecked coverage item, now resolved (the changed lines are covered by the new regression tests). The Rust Core Coverage red is a pre-existing flake on mainhttp_models_and_chat_use_mocked_ollama... / local_service_assets... in inference_voice_http_round23_raw_coverage_e2e.rs, unrelated to this PR's files (agent harness / composio), and main's own PR CI run is red on the same job. Local gates clean: cargo check --lib, cargo fmt --check, cargo clippy --lib --no-deps (no new findings on changed files), session + skill_delegation suites green.

@coderabbitai coderabbitai Bot added the bug label Jun 1, 2026
Copy link
Copy Markdown
Contributor

@graycyrus graycyrus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oxoxDev hey! the code looks good to me — the major issue from last round is properly addressed, and the follow-on fixes are clean. holding off on the formal approval until CI is fully green.

what changed since review 1:

  • The synthesized_tool_names / duplicate-spec bug is fixed. tool_specs now always reconciles via Arc::make_mut (never blocked by a shared Arc), synthesized_tool_names always advances to the new set unconditionally, and stale executable instances are tracked separately in pending_synthesized_tools_mask. The regression test refresh_delegation_tools_no_duplicate_specs_across_shared_arc_connects (gmail → notion → slack, Arc held shared throughout) is exactly the right coverage.

  • pending_integration_announcement changed from Option<String> to Vec<String> with prose rendered at consume time — correct, handles the multi-connect-between-turns case without losing earlier announcements.

  • The hermetic test overrides in skill_delegation (thread_local LIVE_FETCH_OVERRIDE) are the right pattern and don't touch production paths.

still open (non-blocking):

The slug normalization / publish duplication in composio/bus.rs (the filter/sort/dedup/publish block duplicated in ComposioConnectionCreatedSubscriber and ComposioConfigChangedSubscriber) is still unaddressed — understood you want to handle it separately. That's fine, just make sure it lands before too long.

Once CI goes green I'll approve.

memory_sources_validation_and_sync_classification_edges asserted
toolkit_from_slug(" MICROSOFT_TEAMS_SEND ") == Some("microsoft"), but the
slug map deliberately yields "microsoft_teams" (tool_scope.rs:98) and the
function's own unit test (tool_scope.rs) already asserts "microsoft_teams".
Align the e2e assertion with documented behavior; pre-existing failure on
main, unrelated to this PR's cost-audit changes.
@graycyrus graycyrus merged commit 4ab9e98 into tinyhumansai:main Jun 1, 2026
18 of 22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

agent Built-in agents, prompts, orchestration, and agent runtime in src/openhuman/agent/. bug feature Net-new user-facing capability or product behavior. memory Memory store, memory tree, recall, summarization, and embeddings in src/openhuman/memory/.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Refresh orchestrator integration context after OAuth (fixes stale delegate_to_integrations_agent)

2 participants