Skip to content
Merged
42 changes: 42 additions & 0 deletions scripts/run-dev-win.sh
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,29 @@ if [[ -z "$PNPM_EXE" ]]; then
exit 1
fi
echo "[run-dev-win] pnpm resolved to: $PNPM_EXE"

# `cargo tauri dev` runs its beforeDevCommand (`pnpm run dev`) via a native
# `cmd /S /C` that resolves bare `pnpm` off PATH. This script otherwise only
# ever calls pnpm by absolute path, so its dir was never on PATH and Tauri
# dies with "'pnpm' is not recognized". Prepend the resolved pnpm's dir — it
# ships pnpm.CMD alongside the bash shim, which cmd.exe uses.
# Split the dirname computation out of the export so a `dirname` failure
# surfaces with a non-zero exit (SC2155) instead of being swallowed by the
# enclosing `export`. `dirname` on a validated absolute path is reliable
# in practice, but the strict-mode posture is worth the extra line.
PNPM_DIR="$(dirname "$PNPM_EXE")"
# `dirname` returns `.` for a bare filename (e.g. if PNPM_EXE somehow
# resolved to just "pnpm" without a path component). Prepending `.` would
# inject the current working directory into PATH on a Windows dev machine
# — a privilege-escalation-flavoured surprise. Skip the prepend in that
# case (and on the also-degenerate empty result); the absolute-path call
# sites elsewhere in this script still work.
if [[ -n "$PNPM_DIR" && "$PNPM_DIR" != "." ]]; then
export PATH="$PNPM_DIR:$PATH"
echo "[run-dev-win] pnpm dir prepended to PATH: $PNPM_DIR"
else
echo "[run-dev-win] pnpm dir not prepended to PATH (PNPM_EXE has no path component: $PNPM_EXE)"
fi
echo "[run-dev-win] node on bash PATH: $(command -v node 2>/dev/null || echo '<not found>')"
echo "[run-dev-win] node.exe on bash PATH: $(command -v node.exe 2>/dev/null || echo '<not found>')"

Expand Down Expand Up @@ -576,6 +599,25 @@ else
DEV_PORT=1420
fi

# Tauri spawns beforeDevCommand (`pnpm run dev`) via a native `cmd /S /C`
# inheriting THIS process's env. By here PATH has the full system PATH stacked
# several times over (vcvars rebuild + Git-Bash /etc/profile re-runs + pnpm
# .bin layering); the MSYS→Windows conversion overflows the process
# environment-block limit, so the child inherits an EMPTY PATH and Tauri dies
# with "'pnpm' is not recognized" (even `where` is gone). Collapse PATH to
# first-seen entries (clean POSIX `/c/...` entries, so ':' split is safe).
_dedup_seen=":"
_dedup_new=""
IFS=':' read -ra _dedup_parts <<< "$PATH"
for _dp in "${_dedup_parts[@]}"; do
[[ -z "$_dp" ]] && continue
case "$_dedup_seen" in *":$_dp:"*) continue ;; esac
_dedup_seen="${_dedup_seen}${_dp}:"
_dedup_new="${_dedup_new:+$_dedup_new:}$_dp"
done
export PATH="$_dedup_new"
echo "[run-dev-win] PATH de-duplicated: ${#_dedup_parts[@]} → $(awk -v RS=: 'END{print NR}' <<< "$_dedup_new") entries"

if (( DEV_PORT != 1420 )); then
echo "[run-dev-win] OPENHUMAN_DEV_PORT=$DEV_PORT — overriding tauri devUrl"
"$PNPM_EXE" tauri dev -c "{\"build\":{\"devUrl\":\"http://localhost:$DEV_PORT\"}}"
Expand Down
300 changes: 216 additions & 84 deletions src/openhuman/memory/sync_status/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@
//! Single SQL query against `mem_tree_chunks`. Two layers of metrics:
//!
//! * **Lifetime** — `chunks_synced` (total ingested), `chunks_pending`
//! (`embedding IS NULL` = still in the extract+embed queue, not
//! yet appended to the source-tree buffer).
//! (no row in the `mem_tree_chunk_embeddings` sidecar = still in the
//! extract+embed queue, not yet appended to the source-tree buffer).
//!
//! NOTE: "embedded" is keyed off the per-(chunk,model) sidecar table
//! `mem_tree_chunk_embeddings` (#1574), NOT the legacy inline
//! `mem_tree_chunks.embedding` column. The #1574 §7 migration copied
//! every vector into the sidecar and stopped writing the inline
//! column, so it now reads back NULL for every chunk. Keying pending /
//! processed off the inline column made this RPC report 100% of chunks
//! as pending and `0` processed forever, regardless of real progress.
//!
//! * **Active sync wave** — `batch_total` / `batch_processed`. The
//! wave is identified by a *time-cluster anchor*: the earliest
Expand All @@ -27,6 +35,7 @@
use crate::openhuman::config::Config;
use crate::openhuman::memory::tree::store::with_connection;
use crate::rpc::RpcOutcome;
use rusqlite::Connection;

use super::types::{FreshnessLabel, MemorySyncStatus, StatusListResponse};

Expand All @@ -44,89 +53,8 @@ pub async fn status_list_rpc(config: &Config) -> Result<RpcOutcome<StatusListRes
let config = config.clone();
let statuses: Vec<MemorySyncStatus> = match tokio::task::spawn_blocking(move || {
with_connection(&config, |conn| -> anyhow::Result<Vec<MemorySyncStatus>> {
// Provider parsed from `source_id` prefix (substring before
// first ':'); falls back to `source_kind` when no prefix.
//
// `provider_chunks` projects per-row provider + the columns
// we need. `provider_pending` flags providers that still
// have at least one chunk waiting for an embedding —
// `wave_anchors` is gated on this so a fully-drained
// provider gets `batch_total = batch_processed = 0` (the
// UI then hides the progress bar instead of rendering a
// completed one for an idle connection). `wave_anchors`
// finds the earliest chunk within WAVE_WINDOW_MS of the
// most recent — the wave's start. The outer SELECT joins
// back to count both lifetime and in-wave totals.
let mut stmt = conn.prepare(
"WITH provider_chunks AS ( \
SELECT \
CASE \
WHEN INSTR(source_id, ':') > 0 \
THEN SUBSTR(source_id, 1, INSTR(source_id, ':') - 1) \
ELSE source_kind \
END AS provider, \
created_at_ms, \
embedding, \
timestamp_ms \
FROM mem_tree_chunks \
), \
provider_max AS ( \
SELECT provider, MAX(created_at_ms) AS max_created \
FROM provider_chunks \
GROUP BY provider \
), \
provider_pending AS ( \
SELECT provider, \
SUM(CASE WHEN embedding IS NULL THEN 1 ELSE 0 END) AS pending \
FROM provider_chunks \
GROUP BY provider \
), \
wave_anchors AS ( \
SELECT p.provider, MIN(p.created_at_ms) AS anchor \
FROM provider_chunks p \
JOIN provider_max m ON p.provider = m.provider \
JOIN provider_pending pp ON p.provider = pp.provider \
WHERE pp.pending > 0 \
AND p.created_at_ms >= m.max_created - ?1 \
GROUP BY p.provider \
) \
SELECT \
p.provider, \
COUNT(*) AS chunks_synced, \
SUM(CASE WHEN p.embedding IS NULL THEN 1 ELSE 0 END) AS chunks_pending, \
SUM(CASE WHEN w.anchor IS NOT NULL \
AND p.created_at_ms >= w.anchor \
THEN 1 ELSE 0 END) AS batch_total, \
SUM(CASE WHEN w.anchor IS NOT NULL \
AND p.created_at_ms >= w.anchor \
AND p.embedding IS NOT NULL \
THEN 1 ELSE 0 END) AS batch_processed, \
MAX(p.timestamp_ms) AS last_chunk_at_ms \
FROM provider_chunks p \
LEFT JOIN wave_anchors w ON p.provider = w.provider \
GROUP BY p.provider \
ORDER BY last_chunk_at_ms DESC",
)?;
let now_ms = chrono::Utc::now().timestamp_millis();
let iter = stmt.query_map([WAVE_WINDOW_MS], |row| {
let provider: String = row.get(0)?;
let chunks_synced: i64 = row.get(1)?;
let chunks_pending: i64 = row.get(2)?;
let batch_total: i64 = row.get(3)?;
let batch_processed: i64 = row.get(4)?;
let last_chunk_at_ms: Option<i64> = row.get(5)?;
Ok(MemorySyncStatus {
provider,
chunks_synced: chunks_synced.max(0) as u64,
chunks_pending: chunks_pending.max(0) as u64,
batch_total: batch_total.max(0) as u64,
batch_processed: batch_processed.max(0) as u64,
last_chunk_at_ms,
freshness: FreshnessLabel::from_age_ms(last_chunk_at_ms, now_ms),
})
})?;
let out = iter.collect::<Result<Vec<_>, _>>()?;
Ok(out)
Ok(query_sync_statuses(conn, now_ms)?)
})
})
.await
Expand Down Expand Up @@ -159,6 +87,99 @@ pub async fn status_list_rpc(config: &Config) -> Result<RpcOutcome<StatusListRes
Ok(RpcOutcome::new(StatusListResponse { statuses }, vec![]))
}

/// Run the per-provider lifetime + active-wave aggregation against `conn`.
///
/// Split out from [`status_list_rpc`] so it can be unit-tested against a
/// tempdir-backed connection without the async / spawn_blocking wrapper.
///
/// "Embedded" is decided by the presence of a row in the
/// `mem_tree_chunk_embeddings` sidecar (any model signature), NOT the legacy
/// inline `mem_tree_chunks.embedding` column — see the module header.
fn query_sync_statuses(conn: &Connection, now_ms: i64) -> rusqlite::Result<Vec<MemorySyncStatus>> {
// Provider parsed from `source_id` prefix (substring before first ':');
// falls back to `source_kind` when no prefix.
//
// `provider_chunks` projects per-row provider + an `embedded` flag (sidecar
// row present). `provider_pending` flags providers that still have at least
// one un-embedded chunk — `wave_anchors` is gated on this so a fully-drained
// provider gets `batch_total = batch_processed = 0` (the UI then hides the
// progress bar instead of rendering a completed one for an idle connection).
// `wave_anchors` finds the earliest chunk within WAVE_WINDOW_MS of the most
// recent — the wave's start. The outer SELECT joins back to count both
// lifetime and in-wave totals.
let mut stmt = conn.prepare(
"WITH provider_chunks AS ( \
SELECT \
CASE \
WHEN INSTR(source_id, ':') > 0 \
THEN SUBSTR(source_id, 1, INSTR(source_id, ':') - 1) \
ELSE source_kind \
END AS provider, \
created_at_ms, \
CASE WHEN EXISTS ( \
SELECT 1 FROM mem_tree_chunk_embeddings e \
WHERE e.chunk_id = c.id \
) THEN 1 ELSE 0 END AS embedded, \
timestamp_ms \
FROM mem_tree_chunks c \
), \
provider_max AS ( \
SELECT provider, MAX(created_at_ms) AS max_created \
FROM provider_chunks \
GROUP BY provider \
), \
provider_pending AS ( \
SELECT provider, \
SUM(CASE WHEN embedded = 0 THEN 1 ELSE 0 END) AS pending \
FROM provider_chunks \
GROUP BY provider \
), \
wave_anchors AS ( \
SELECT p.provider, MIN(p.created_at_ms) AS anchor \
FROM provider_chunks p \
JOIN provider_max m ON p.provider = m.provider \
JOIN provider_pending pp ON p.provider = pp.provider \
WHERE pp.pending > 0 \
AND p.created_at_ms >= m.max_created - ?1 \
GROUP BY p.provider \
) \
SELECT \
p.provider, \
COUNT(*) AS chunks_synced, \
SUM(CASE WHEN p.embedded = 0 THEN 1 ELSE 0 END) AS chunks_pending, \
SUM(CASE WHEN w.anchor IS NOT NULL \
AND p.created_at_ms >= w.anchor \
THEN 1 ELSE 0 END) AS batch_total, \
SUM(CASE WHEN w.anchor IS NOT NULL \
AND p.created_at_ms >= w.anchor \
AND p.embedded = 1 \
THEN 1 ELSE 0 END) AS batch_processed, \
MAX(p.timestamp_ms) AS last_chunk_at_ms \
FROM provider_chunks p \
LEFT JOIN wave_anchors w ON p.provider = w.provider \
GROUP BY p.provider \
ORDER BY last_chunk_at_ms DESC",
)?;
let iter = stmt.query_map([WAVE_WINDOW_MS], |row| {
let provider: String = row.get(0)?;
let chunks_synced: i64 = row.get(1)?;
let chunks_pending: i64 = row.get(2)?;
let batch_total: i64 = row.get(3)?;
let batch_processed: i64 = row.get(4)?;
let last_chunk_at_ms: Option<i64> = row.get(5)?;
Ok(MemorySyncStatus {
provider,
chunks_synced: chunks_synced.max(0) as u64,
chunks_pending: chunks_pending.max(0) as u64,
batch_total: batch_total.max(0) as u64,
batch_processed: batch_processed.max(0) as u64,
last_chunk_at_ms,
freshness: FreshnessLabel::from_age_ms(last_chunk_at_ms, now_ms),
})
})?;
iter.collect()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -195,4 +216,115 @@ mod tests {
assert!(json.get("result").is_none(), "must not be double-wrapped");
assert!(json.get("logs").is_none(), "must not be double-wrapped");
}

/// Regression for the legacy-column bug: pending / processed must be
/// derived from the `mem_tree_chunk_embeddings` sidecar, not the inline
/// `mem_tree_chunks.embedding` column (which is always NULL post-#1574).
/// A chunk with a sidecar row counts as processed even though its inline
/// column is NULL.
#[test]
fn pending_and_processed_key_off_sidecar_not_inline_column() {
use crate::openhuman::memory::tree::store::with_connection;
use rusqlite::params;
use tempfile::TempDir;

let tmp = TempDir::new().expect("tempdir");
let mut cfg = Config::default();
cfg.workspace_dir = tmp.path().to_path_buf();

let now = chrono::Utc::now().timestamp_millis();

with_connection(&cfg, |conn| {
let insert_chunk = |id: &str, source_id: &str, created: i64| {
conn.execute(
"INSERT INTO mem_tree_chunks \
(id, source_kind, source_id, owner, timestamp_ms, \
time_range_start_ms, time_range_end_ms, content, \
token_count, seq_in_source, created_at_ms) \
VALUES (?1, 'email', ?2, 'me@x.com', ?3, ?3, ?3, 'body', 10, 0, ?3)",
params![id, source_id, created],
)
.unwrap();
};
let embed = |id: &str| {
conn.execute(
"INSERT INTO mem_tree_chunk_embeddings \
(chunk_id, model_signature, vector, dim, created_at) \
VALUES (?1, 'sig', X'00000000', 1, 0.0)",
params![id],
)
.unwrap();
};

// gmail: 3 chunks inside the active wave; 2 embedded (sidecar), 1 not.
insert_chunk("g1", "gmail:acct", now - 1_000);
insert_chunk("g2", "gmail:acct", now - 2_000);
insert_chunk("g3", "gmail:acct", now - 3_000);
embed("g1");
embed("g2");

let statuses = query_sync_statuses(conn, now).unwrap();
let gmail = statuses
.iter()
.find(|s| s.provider == "gmail")
.expect("gmail provider row");

assert_eq!(gmail.chunks_synced, 3, "all three ingested");
assert_eq!(
gmail.chunks_pending, 1,
"only g3 lacks a sidecar embedding (inline column is NULL for all)"
);
assert_eq!(gmail.batch_total, 3, "all three are within the wave window");
assert_eq!(
gmail.batch_processed, 2,
"g1 and g2 have sidecar rows, so they count as processed"
);
Ok(())
})
.unwrap();
}

/// A provider with every chunk embedded must report zero wave (the UI
/// hides the progress bar): `batch_total = batch_processed = 0`.
#[test]
fn fully_embedded_provider_reports_no_active_wave() {
use crate::openhuman::memory::tree::store::with_connection;
use rusqlite::params;
use tempfile::TempDir;

let tmp = TempDir::new().expect("tempdir");
let mut cfg = Config::default();
cfg.workspace_dir = tmp.path().to_path_buf();
let now = chrono::Utc::now().timestamp_millis();

with_connection(&cfg, |conn| {
conn.execute(
"INSERT INTO mem_tree_chunks \
(id, source_kind, source_id, owner, timestamp_ms, \
time_range_start_ms, time_range_end_ms, content, \
token_count, seq_in_source, created_at_ms) \
VALUES ('s1', 'slack', 'slack:eng', 'me@x.com', ?1, ?1, ?1, 'b', 10, 0, ?1)",
params![now - 5_000],
)
.unwrap();
conn.execute(
"INSERT INTO mem_tree_chunk_embeddings \
(chunk_id, model_signature, vector, dim, created_at) \
VALUES ('s1', 'sig', X'00000000', 1, 0.0)",
[],
)
.unwrap();

let statuses = query_sync_statuses(conn, now).unwrap();
let slack = statuses
.iter()
.find(|s| s.provider == "slack")
.expect("slack provider row");
assert_eq!(slack.chunks_pending, 0);
assert_eq!(slack.batch_total, 0, "no pending chunks ⇒ no active wave");
assert_eq!(slack.batch_processed, 0);
Ok(())
})
.unwrap();
}
}
Loading
Loading