Skip to content
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7c4c768
feat(agent): add agent_memory domain and memory retrieval agent
senamakel Jun 5, 2026
a95e64b
refactor(agent_memory): move memory_loader into agent_memory domain
senamakel Jun 5, 2026
80fffce
feat(agent_memory): add call_memory_agent tool and move agent def int…
senamakel Jun 5, 2026
abab50c
fix(memory): handle XML-style tool calls in smart walk parser
senamakel Jun 6, 2026
71b0135
Merge remote-tracking branch 'upstream/main' into feat/agent-memory-d…
senamakel Jun 6, 2026
15bf6d8
test(memory): add e2e tests for smart walk with mock LLM responses
senamakel Jun 6, 2026
622e636
style: apply cargo fmt formatting
senamakel Jun 6, 2026
973c464
refactor(agents): replace memory_recall/memory_tree/query_memory with…
senamakel Jun 6, 2026
e3b8371
fix: address CodeRabbit review feedback
senamakel Jun 6, 2026
63c34d8
style: apply cargo fmt line wrapping
senamakel Jun 6, 2026
df4b3d0
fix(test): update orchestrator_lists_memory_tree_tools for call_memor…
senamakel Jun 6, 2026
f966cb9
refactor: split 5 large modules into sub-module directories
senamakel Jun 6, 2026
a3fb68a
refactor(composio): split ops.rs (1746 lines) into ops/ directory
senamakel Jun 6, 2026
551b9f3
refactor(config): split schemas.rs (2004 lines) into schemas/ directory
senamakel Jun 6, 2026
d5fbbde
refactor(agent): split session/turn.rs (1908 lines) into turn/ directory
senamakel Jun 6, 2026
a97d8b9
refactor(agent): split session/builder.rs (1789 lines) into builder/ …
senamakel Jun 6, 2026
b70f8b2
refactor(agent): split subagent_runner/ops.rs (1947 lines) into ops/ …
senamakel Jun 6, 2026
7fa554f
refactor(config): fix schemas/ test re-exports and controller imports
senamakel Jun 6, 2026
dc3d4b8
refactor(memory): split query/smart_walk.rs (1695 lines) into smart_w…
senamakel Jun 6, 2026
1cd656a
refactor(security): split policy.rs (1426 lines) into policy/ directory
senamakel Jun 6, 2026
90148ea
refactor(agent): split prompts/mod.rs (1517 lines) into sub-modules
senamakel Jun 6, 2026
070b43a
refactor(memory): split schema.rs (1458 lines) into schema/ directory
senamakel Jun 6, 2026
c21cbc3
refactor(mcp_server): split tools.rs (1438 lines) into tools/ directory
senamakel Jun 6, 2026
65b54c3
refactor(agent): split task_dispatcher.rs (1358 lines) into sub-modules
senamakel Jun 6, 2026
fa25875
refactor(workflows): split schemas.rs (1429 lines) into schemas/ dire…
senamakel Jun 6, 2026
e2f083b
refactor(voice): split factory.rs into factory/ directory
senamakel Jun 6, 2026
bb2c17b
refactor(channels): split controllers/ops.rs into ops/ directory
senamakel Jun 6, 2026
fa562ee
refactor(memory_store): split content/compose.rs into compose/ directory
senamakel Jun 6, 2026
509a83b
refactor(inference): split provider/ops.rs into ops/ directory
senamakel Jun 6, 2026
9b0ecc2
refactor(agent): split harness/archivist.rs into archivist/ directory
senamakel Jun 6, 2026
2c6ffac
style: apply cargo fmt to all split modules
senamakel Jun 6, 2026
36e289e
style: apply cargo fmt to smart_walk tests
senamakel Jun 6, 2026
9770d22
Merge remote-tracking branch 'upstream/main' into refactor/split-larg…
senamakel Jun 6, 2026
c4f0a3c
fix(docs): update README links for split module directories
senamakel Jun 6, 2026
5fce6f0
fix(visibility): make test-re-exported items pub(crate) in sub-modules
senamakel Jun 6, 2026
3bea299
Merge remote-tracking branch 'upstream/main' into refactor/split-larg…
senamakel Jun 6, 2026
e3a317d
Merge remote-tracking branch 'upstream/main' into refactor/split-larg…
senamakel Jun 6, 2026
7938798
fix: resolve merge conflict + fix remaining test compilation errors
senamakel Jun 6, 2026
3f31a08
style: apply cargo fmt ordering to test re-exports
senamakel Jun 6, 2026
5a9ebd8
fix: update rpcMethods test to use split schemas path
senamakel Jun 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,257 changes: 0 additions & 1,257 deletions src/openhuman/agent/harness/archivist.rs

This file was deleted.

143 changes: 143 additions & 0 deletions src/openhuman/agent/harness/archivist/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! Small utility functions used across archivist sub-modules.

use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::{SystemTime, UNIX_EPOCH};

/// Strip tool-call markup from an assistant response, leaving only the prose
/// text.
///
/// The archivist stores the full response (including `tool_calls_json`) in
/// the episodic log for diagnostic purposes. However, per the memory
/// ingestion policy, structured tool-call payloads must not reach the memory
/// tree — only the assistant's natural-language prose is ingested.
///
/// This is a lightweight heuristic, not a parser. It removes:
///
/// * every `<tool_call>…</tool_call>` span (an unclosed `<tool_call>` removes
///   everything from the tag to the end of the text);
/// * every line that contains `"tool_use"`, or whose trimmed form starts with
///   `{"tool_calls"` or `"tool_calls"`.
///
/// After removal, trailing whitespace is stripped from each line, runs of
/// blank lines are capped at two, and the whole result is trimmed.
///
/// A response containing none of the markers is returned unchanged (no
/// trimming or normalisation). The output may be empty if the entire response
/// was tool-call markup — callers should handle that case (empty text →
/// no-op ingest).
pub(super) fn strip_tool_calls_from_response(response: &str) -> String {
    const OPEN_TAG: &str = "<tool_call>";
    const CLOSE_TAG: &str = "</tool_call>";

    // Fast path: nothing that any later stage would remove. The bare
    // `"tool_calls"` check deliberately covers both `{"tool_calls"` and a
    // pretty-printed payload where the key starts its own line; the guard
    // must never be narrower than the line filter below, otherwise payload
    // lines slip through untouched.
    let has_marker = response.contains(OPEN_TAG)
        || response.contains("\"tool_calls\"")
        || response.contains("\"tool_use\"");
    if !has_marker {
        return response.to_string();
    }

    // Strip <tool_call>…</tool_call> spans (may span multiple lines). The
    // search restarts from the beginning after every removal so that a tag
    // formed by joining the text on either side of a removed span is caught
    // as well.
    let mut cleaned = response.to_string();
    while let Some(start) = cleaned.find(OPEN_TAG) {
        match cleaned[start..].find(CLOSE_TAG) {
            Some(rel_end) => {
                cleaned.drain(start..start + rel_end + CLOSE_TAG.len());
            }
            None => {
                // Unclosed tag — remove from the tag to end of string.
                cleaned.truncate(start);
                break;
            }
        }
    }

    // Single pass over the remaining lines: drop JSON / tool-use payload
    // lines the XML strip cannot catch, strip trailing whitespace, and cap
    // runs of blank lines left behind by the removals.
    let mut result = String::with_capacity(cleaned.len());
    let mut blank_run = 0usize;
    for raw in cleaned.lines() {
        let probe = raw.trim();
        let is_payload = probe.contains("\"tool_use\"")
            || probe.starts_with("{\"tool_calls\"")
            || probe.starts_with("\"tool_calls\"");
        if is_payload {
            // Dropped lines do not reset `blank_run`: blank lines on either
            // side of a removed payload count as one run.
            continue;
        }

        let line = raw.trim_end();
        if line.is_empty() {
            blank_run += 1;
            // Keep at most two consecutive blank lines.
            if blank_run <= 2 {
                result.push('\n');
            }
        } else {
            blank_run = 0;
            result.push_str(line);
            result.push('\n');
        }
    }

    result.trim().to_string()
}

/// Summarise which tools failed during a turn, without involving an LLM.
///
/// Returns `None` when no recorded call has `success == false` (including an
/// empty slice). Otherwise returns one sentence listing the names of the
/// failed calls in call order, comma-separated; a tool that failed several
/// times is listed once per failure.
pub(super) fn extract_lesson_from_tools(
    tool_calls: &[crate::openhuman::agent::hooks::ToolCallRecord],
) -> Option<String> {
    let mut failed_names: Vec<&str> = Vec::new();
    for call in tool_calls {
        if !call.success {
            failed_names.push(call.name.as_str());
        }
    }

    if failed_names.is_empty() {
        return None;
    }

    let listing = failed_names.join(", ");
    Some(format!("Tools that failed in this turn: {}", listing))
}

/// Build a short profile key of the form `{prefix}_{slug}` from event content.
///
/// The slug is made from the first four whitespace-separated words longer
/// than two bytes, joined with `_`, lowercased, and then reduced to ASCII
/// alphanumerics and underscores. When nothing survives that filtering the
/// key is `{prefix}_unknown`.
pub(crate) fn extract_profile_key(content: &str, prefix: &str) -> String {
    let joined = content
        .split_whitespace()
        .filter(|word| word.len() > 2)
        .take(4)
        .collect::<Vec<&str>>()
        .join("_");

    // Lowercase first, then filter: some non-ASCII characters lowercase to
    // ASCII ones and must be kept.
    let mut slug = joined.to_lowercase();
    slug.retain(|ch| ch.is_ascii_alphanumeric() || ch == '_');

    match slug.as_str() {
        "" => format!("{prefix}_unknown"),
        key => format!("{prefix}_{key}"),
    }
}

/// Generate a short, probably-unique identifier string.
///
/// Despite the name, the result is NOT an RFC 4122 version-4 UUID: it is the
/// current time since the Unix epoch in nanoseconds rendered as lowercase hex
/// (no padding), immediately followed by eight lowercase hex digits taken
/// from `rand_u32()`. There are no dashes and the length is not fixed. If the
/// system clock is before the epoch, the time component is zero.
pub(super) fn uuid_v4() -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let time_part = since_epoch.as_nanos();
    let random_part = rand_u32();
    format!("{:x}{:08x}", time_part, random_part)
}

/// Produce a pseudo-random `u32` without an external RNG crate.
///
/// A freshly created `RandomState` supplies randomly keyed hasher state; the
/// current time in nanoseconds (truncated to 64 bits) is fed through it and
/// the low 32 bits of the resulting hash are returned. Intended for
/// identifier suffixes, not for anything security-sensitive.
fn rand_u32() -> u32 {
    let now_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    let mut mixer = RandomState::new().build_hasher();
    // Truncation to u64 is intentional: the value is only hash input.
    mixer.write_u64(now_nanos as u64);
    let digest: u64 = mixer.finish();
    digest as u32
}
155 changes: 155 additions & 0 deletions src/openhuman/agent/harness/archivist/hook_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//! `PostTurnHook` implementation for `ArchivistHook`.

use super::helpers::extract_lesson_from_tools;
use super::types::ArchivistHook;
use crate::openhuman::agent::hooks::{PostTurnHook, TurnContext};
use crate::openhuman::memory_store::fts5::{self, EpisodicEntry};
use async_trait::async_trait;

#[async_trait]
impl PostTurnHook for ArchivistHook {
    /// Name this hook reports itself under.
    fn name(&self) -> &str {
        "archivist"
    }

    /// Archive one completed turn.
    ///
    /// Steps, in order:
    ///
    /// 1. Insert the user message and the assistant response as two FTS5
    ///    episodic rows (the assistant row is offset by 1ms so it sorts
    ///    after the user row).
    /// 2. If a config is present, dual-write the same two turns into
    ///    `memory_archivist::store`. This is best-effort: failures are
    ///    logged and ignored.
    /// 3. Run synchronous segment management and, if a segment was closed,
    ///    the async `on_segment_closed` processing for it.
    ///
    /// Does nothing and returns `Ok(())` when the hook is disabled or has no
    /// connection.
    ///
    /// # Errors
    ///
    /// Returns an error if either FTS5 episodic insert fails; in that case
    /// none of the later steps run.
    async fn on_turn_complete(&self, ctx: &TurnContext) -> anyhow::Result<()> {
        if !self.enabled {
            return Ok(());
        }

        let Some(conn) = &self.conn else {
            return Ok(());
        };

        let session_id = ctx.session_id.as_deref().unwrap_or("unknown");
        let timestamp = Self::now_timestamp();

        tracing::debug!(
            "[archivist] indexing turn: session={session_id}, tools={}, duration={}ms",
            ctx.tool_calls.len(),
            ctx.turn_duration_ms
        );

        // Index user message. The entry is kept so its owned fields can be
        // moved into the dual-write below instead of being rebuilt.
        let user_entry = EpisodicEntry {
            id: None,
            session_id: session_id.to_string(),
            timestamp,
            role: "user".to_string(),
            content: ctx.user_message.clone(),
            lesson: None,
            tool_calls_json: None,
            cost_microdollars: 0,
        };
        fts5::episodic_insert(conn, &user_entry)?;

        // Retrieve the inserted episodic ID for segment tracking.
        // NOTE(review): the lock is taken again here, after the insert has
        // returned, so this presumably relies on no other writer using the
        // same connection in between — confirm against `episodic_insert`.
        let current_episodic_id = {
            let db = conn.lock();
            db.query_row("SELECT last_insert_rowid()", [], |row| row.get::<_, i64>(0))
                .unwrap_or_else(|e| {
                    // Keep the historical fallback value, but make the
                    // failure visible instead of swallowing it.
                    tracing::warn!(
                        "[archivist] last_insert_rowid lookup failed, falling back to 1: {e}"
                    );
                    1
                })
        };

        // Index assistant response with tool call summary.
        let tool_calls_json = if ctx.tool_calls.is_empty() {
            None
        } else {
            Some(serde_json::to_string(&ctx.tool_calls).unwrap_or_default())
        };

        // Extract a simple lesson from tool failures (lightweight, no LLM needed).
        let lesson = extract_lesson_from_tools(&ctx.tool_calls);

        let assistant_entry = EpisodicEntry {
            id: None,
            session_id: session_id.to_string(),
            // Offset by 1ms so assistant entries sort after user entries within
            // the same turn. Relies on turn timestamps having >=1ms resolution.
            timestamp: timestamp + 0.001,
            role: "assistant".to_string(),
            content: ctx.assistant_response.clone(),
            lesson,
            tool_calls_json,
            cost_microdollars: 0,
        };
        fts5::episodic_insert(conn, &assistant_entry)?;

        tracing::debug!("[archivist] episodic rows written: session={session_id}");

        // Dual-write into memory_archivist::store (md-backed) so we can
        // validate the FTS5 → md migration before flipping the read side.
        // Best-effort: a write failure here must not break the turn. The
        // user turn's assigned seq is captured into `current_seq` so the
        // segment ops can store it alongside the FTS5 episodic id.
        let mut current_seq: Option<u32> = None;
        if let Some(cfg) = self.config.as_ref() {
            let ts_ms = (timestamp * 1000.0) as i64;
            let user_turn = crate::openhuman::memory_archivist::ArchivedTurn {
                session_id: user_entry.session_id,
                seq: 0, // assigned by record_turn
                timestamp_ms: ts_ms,
                role: user_entry.role,
                content: user_entry.content,
                lesson: None,
                tool_calls_json: None,
                cost_microdollars: 0,
            };
            match crate::openhuman::memory_archivist::store::record_turn(cfg, user_turn) {
                Ok(stored) => current_seq = Some(stored.seq),
                Err(e) => {
                    tracing::warn!("[archivist] memory_archivist user dual-write failed: {e}");
                }
            }
            // Assistant turn carries exactly the lesson + tool_calls_json the
            // FTS5 insert just wrote. Both FTS5 inserts have already
            // succeeded by this point, so the values are moved out of the
            // FTS5 entry rather than serialised and derived a second time.
            let assistant_turn = crate::openhuman::memory_archivist::ArchivedTurn {
                session_id: assistant_entry.session_id,
                seq: 0, // assigned by record_turn
                timestamp_ms: ts_ms + 1,
                role: assistant_entry.role,
                content: assistant_entry.content,
                lesson: assistant_entry.lesson,
                tool_calls_json: assistant_entry.tool_calls_json,
                cost_microdollars: 0,
            };
            if let Err(e) =
                crate::openhuman::memory_archivist::store::record_turn(cfg, assistant_turn)
            {
                tracing::warn!("[archivist] memory_archivist assistant dual-write failed: {e}");
            }
        }

        // Manage conversation segmentation (sync boundary detection + SQLite
        // operations). Returns the just-closed segment when a boundary fired.
        let closed_segment = self.manage_segment_sync(
            conn,
            session_id,
            timestamp,
            &ctx.user_message,
            current_episodic_id,
            current_seq,
        );

        // Run async recap + embed + segment-tree ingest on the closed segment
        // (if any). Per-turn tree ingest is intentionally absent — Phase 2
        // moves the tree write to segment granularity inside on_segment_closed.
        if let Some(ref segment) = closed_segment {
            let now = Self::now_timestamp();
            self.on_segment_closed(conn, segment, session_id, now).await;
        }

        tracing::debug!("[archivist] turn indexed successfully: session={session_id}");
        Ok(())
    }
}
Loading
Loading