Skip to content
Merged
32 changes: 32 additions & 0 deletions scripts/run-dev-win.sh
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,19 @@ if [[ -z "$PNPM_EXE" ]]; then
exit 1
fi
echo "[run-dev-win] pnpm resolved to: $PNPM_EXE"

# `cargo tauri dev` runs its beforeDevCommand (`pnpm run dev`) via a native
# `cmd /S /C` that resolves bare `pnpm` off PATH. This script otherwise only
# ever calls pnpm by absolute path, so its dir was never on PATH and Tauri
# dies with "'pnpm' is not recognized". Prepend the resolved pnpm's dir — it
# ships pnpm.CMD alongside the bash shim, which cmd.exe uses.
# Split the dirname computation out of the export so a `dirname` failure
# surfaces with a non-zero exit (SC2155) instead of being swallowed by the
# enclosing `export`. `dirname` on a validated absolute path is reliable
# in practice, but the strict-mode posture is worth the extra line.
PNPM_DIR="$(dirname "$PNPM_EXE")"
export PATH="$PNPM_DIR:$PATH"
echo "[run-dev-win] pnpm dir prepended to PATH: $PNPM_DIR"
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
echo "[run-dev-win] node on bash PATH: $(command -v node 2>/dev/null || echo '<not found>')"
echo "[run-dev-win] node.exe on bash PATH: $(command -v node.exe 2>/dev/null || echo '<not found>')"

Expand Down Expand Up @@ -576,6 +589,25 @@ else
DEV_PORT=1420
fi

# Tauri spawns beforeDevCommand (`pnpm run dev`) via a native `cmd /S /C`
# inheriting THIS process's env. By here PATH has the full system PATH stacked
# several times over (vcvars rebuild + Git-Bash /etc/profile re-runs + pnpm
# .bin layering); the MSYS→Windows conversion overflows the process
# environment-block limit, so the child inherits an EMPTY PATH and Tauri dies
# with "'pnpm' is not recognized" (even `where` is gone). Collapse PATH to
# first-seen entries (clean POSIX `/c/...` entries, so ':' split is safe).
_dedup_seen=":"
_dedup_new=""
IFS=':' read -ra _dedup_parts <<< "$PATH"
for _dp in "${_dedup_parts[@]}"; do
[[ -z "$_dp" ]] && continue
case "$_dedup_seen" in *":$_dp:"*) continue ;; esac
_dedup_seen="${_dedup_seen}${_dp}:"
_dedup_new="${_dedup_new:+$_dedup_new:}$_dp"
done
export PATH="$_dedup_new"
echo "[run-dev-win] PATH de-duplicated: ${#_dedup_parts[@]} → $(awk -v RS=: 'END{print NR}' <<< "$_dedup_new") entries"

if (( DEV_PORT != 1420 )); then
echo "[run-dev-win] OPENHUMAN_DEV_PORT=$DEV_PORT — overriding tauri devUrl"
"$PNPM_EXE" tauri dev -c "{\"build\":{\"devUrl\":\"http://localhost:$DEV_PORT\"}}"
Expand Down
18 changes: 17 additions & 1 deletion src/openhuman/memory/tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,23 @@ async fn persist(
let written = tokio::task::spawn_blocking(move || -> Result<Option<usize>> {
use std::collections::{HashMap, HashSet};
store::with_connection(&config_owned, |conn| {
let tx = conn.unchecked_transaction()?;
// IMMEDIATE, not the default DEFERRED: this transaction reads
// (get_chunk_lifecycle_status_tx) before it writes
// (upsert_staged_chunks_tx). A DEFERRED tx takes only a read
// lock at BEGIN and tries to upgrade to a write lock on the
// first write; under contention with the memory_tree worker
// pool SQLite returns SQLITE_BUSY *immediately* for that
// upgrade and does NOT invoke the busy handler (deadlock
// avoidance), so the connection's 15s busy_timeout is bypassed
// and Gmail/Composio ingest fails every message with "database
// is locked", stalling composio_sync past its 30s RPC cap.
// IMMEDIATE acquires the write lock at BEGIN, where the busy
// handler / busy_timeout DOES apply, so writers serialise and
// wait instead of failing fast.
let tx = rusqlite::Transaction::new_unchecked(
conn,
rusqlite::TransactionBehavior::Immediate,
)?;

// Authoritative source-level gate (documents only).
//
Expand Down
260 changes: 241 additions & 19 deletions src/openhuman/memory/tree/jobs/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::openhuman::memory::tree::score::extract::build_summary_extractor;
use crate::openhuman::memory::tree::score::store as score_store;
use crate::openhuman::memory::tree::store as chunk_store;
use crate::openhuman::memory::tree::tree_global::digest::{self, DigestOutcome};
use crate::openhuman::memory::tree::tree_source::store as summary_store;
use crate::openhuman::memory::tree::tree_source::{
build_summariser, get_or_create_source_tree, LabelStrategy, LeafRef,
};
Expand Down Expand Up @@ -579,10 +580,22 @@ async fn handle_reembed_backfill(config: &Config, job: &Job) -> Result<JobOutcom
chunk_store::with_connection(config, |conn| {
let chunks: Vec<String> = {
let mut stmt = conn.prepare(
// The second NOT EXISTS — `mem_tree_chunk_reembed_skipped` —
// is the runaway-loop fix (#1574 §6): without it, rows whose
// body file is missing on disk (or whose embed failed
// terminally) keep matching the worklist on every batch
// because the failure path only LOG-skipped, never wrote
// anything persistent. The handler below now marks such
// rows in `mem_tree_chunk_reembed_skipped` so they're
// excluded here on the next batch and the chain can
// actually reach "fully covered".
"SELECT id FROM mem_tree_chunks c
WHERE NOT EXISTS (
SELECT 1 FROM mem_tree_chunk_embeddings e
WHERE e.chunk_id = c.id AND e.model_signature = ?1)
AND NOT EXISTS (
SELECT 1 FROM mem_tree_chunk_reembed_skipped s
WHERE s.chunk_id = c.id AND s.model_signature = ?1)
LIMIT ?2",
)?;
let ids = stmt
Expand All @@ -598,10 +611,16 @@ async fn handle_reembed_backfill(config: &Config, job: &Job) -> Result<JobOutcom
Vec::new()
} else {
let mut stmt = conn.prepare(
// Summary-side counterpart of the runaway-loop fix; see
// the chunks worklist above for the full rationale.
"SELECT id FROM mem_tree_summaries s
WHERE s.deleted = 0 AND NOT EXISTS (
WHERE s.deleted = 0
AND NOT EXISTS (
SELECT 1 FROM mem_tree_summary_embeddings e
WHERE e.summary_id = s.id AND e.model_signature = ?1)
AND NOT EXISTS (
SELECT 1 FROM mem_tree_summary_reembed_skipped sk
WHERE sk.summary_id = s.id AND sk.model_signature = ?1)
LIMIT ?2",
)?;
let ids = stmt
Expand All @@ -625,40 +644,98 @@ async fn handle_reembed_backfill(config: &Config, job: &Job) -> Result<JobOutcom

// Phase 2 (no tx held): embed each row's stored source text. Per-row
// errors are skipped (logged) so a single bad row can't strand memory.
//
// #1574 §6 fix: terminal failures (body file missing on disk, embed
// wrong dim, embed unrecoverable error) are *persistently* tombstoned
// via `mark_chunk_reembed_skipped` / `mark_summary_reembed_skipped`.
// The worklist queries above exclude these tombstones, so a single
// unembeddable row is attempted at most ONCE per signature instead of
// re-selected on every batch forever (the original bug: 16 orphans
// generating ~128k warns across ~8k defers, observed in the wild).
// The mark itself is best-effort — if its own SQLite write fails the
// row will be retried on a later batch, which is the desired fallback.
let embedder =
build_embedder_from_config(config).context("build embedder in reembed_backfill")?;
let mut chunk_vecs: Vec<(String, Vec<f32>)> = Vec::new();
for id in &chunk_ids {
match content_read::read_chunk_body(config, id) {
Ok(body) => match embedder.embed(&body).await {
Ok(v) if pack_checked(&v).is_ok() => chunk_vecs.push((id.clone(), v)),
Ok(_) => log::warn!(
"[memory_tree::jobs] reembed_backfill: chunk {id} embed wrong dim, skipping"
),
Err(e) => log::warn!(
"[memory_tree::jobs] reembed_backfill: chunk {id} embed failed: {e}; skipping"
),
Ok(_) => {
log::warn!(
"[memory_tree::jobs] reembed_backfill: chunk {id} embed wrong dim, skipping (sig={active_sig})"
);
let _ = chunk_store::mark_chunk_reembed_skipped(
config,
id,
&active_sig,
"embed wrong dim",
);
}
Err(e) => {
log::warn!(
"[memory_tree::jobs] reembed_backfill: chunk {id} embed failed: {e}; skipping (sig={active_sig})"
);
let _ = chunk_store::mark_chunk_reembed_skipped(
config,
id,
&active_sig,
&format!("embed failed: {e}"),
);
}
},
Err(e) => log::warn!(
"[memory_tree::jobs] reembed_backfill: chunk {id} body read failed: {e}; skipping"
),
Err(e) => {
log::warn!(
"[memory_tree::jobs] reembed_backfill: chunk {id} body read failed: {e}; skipping (sig={active_sig})"
);
let _ = chunk_store::mark_chunk_reembed_skipped(
config,
id,
&active_sig,
&format!("body read failed: {e}"),
);
}
}
}
let mut summary_vecs: Vec<(String, Vec<f32>)> = Vec::new();
for id in &summary_ids {
match content_read::read_summary_body(config, id) {
Ok(body) => match embedder.embed(&body).await {
Ok(v) if pack_checked(&v).is_ok() => summary_vecs.push((id.clone(), v)),
Ok(_) => log::warn!(
"[memory_tree::jobs] reembed_backfill: summary {id} embed wrong dim, skipping"
),
Err(e) => log::warn!(
"[memory_tree::jobs] reembed_backfill: summary {id} embed failed: {e}; skipping"
),
Ok(_) => {
log::warn!(
"[memory_tree::jobs] reembed_backfill: summary {id} embed wrong dim, skipping (sig={active_sig})"
);
let _ = summary_store::mark_summary_reembed_skipped(
config,
id,
&active_sig,
"embed wrong dim",
);
}
Err(e) => {
log::warn!(
"[memory_tree::jobs] reembed_backfill: summary {id} embed failed: {e}; skipping (sig={active_sig})"
);
let _ = summary_store::mark_summary_reembed_skipped(
config,
id,
&active_sig,
&format!("embed failed: {e}"),
);
}
},
Err(e) => log::warn!(
"[memory_tree::jobs] reembed_backfill: summary {id} body read failed: {e}; skipping"
),
Err(e) => {
log::warn!(
"[memory_tree::jobs] reembed_backfill: summary {id} body read failed: {e}; skipping (sig={active_sig})"
);
let _ = summary_store::mark_summary_reembed_skipped(
config,
id,
&active_sig,
&format!("body read failed: {e}"),
);
}
}
}

Expand Down Expand Up @@ -1147,6 +1224,151 @@ mod tests {
);
}

/// #1574 §6 regression gate: a terminal-failure chunk (its body file is
/// missing on disk, despite the metadata row staying staged) is
/// persistently tombstoned by `mark_chunk_reembed_skipped` on the first
/// pass, then excluded from the next batch's worklist so the chain
/// terminates (`Done`) instead of looping forever. Without this guard
/// the §6 runaway-loop fix would silently regress — the same 16 orphans
/// → ~8k defers → ~128k warns symptom observed in the wild before the
/// fix landed (see PR body and store.rs:1195).
///
/// What the test pins:
/// 1. Tombstone row is written for the failing chunk (exactly one).
/// 2. The next-batch worklist `NOT EXISTS … reembed_skipped` clause
/// excludes the tombstoned row — the handler returns `Done`.
/// 3. The `ensure_reembed_backfill` migration probe agrees the space
/// is covered (or the chain would re-arm on every config save).
#[tokio::test]
async fn reembed_backfill_tombstones_orphan_and_terminates() {
use crate::openhuman::memory::tree::store::{
get_chunk_content_path, get_chunk_embedding_for_signature, tree_active_signature,
upsert_chunks, upsert_staged_chunks_tx,
};
use crate::openhuman::memory::tree::types::{
chunk_id, Chunk, Metadata, SourceKind, SourceRef,
};

let (_tmp, cfg) = test_config();
let ts = chrono::Utc.timestamp_millis_opt(1_700_000_000_000).unwrap();
let chunk = Chunk {
id: chunk_id(SourceKind::Chat, "slack:#eng", 0, "orphan-seed"),
content: "memory content about the orphaned phoenix project".into(),
metadata: Metadata {
source_kind: SourceKind::Chat,
source_id: "slack:#eng".into(),
owner: "alice".into(),
timestamp: ts,
time_range: (ts, ts),
tags: vec![],
source_ref: Some(SourceRef::new("slack://x")),
},
token_count: 12,
seq_in_source: 0,
created_at: ts,
partial_message: false,
};
upsert_chunks(&cfg, &[chunk.clone()]).unwrap();

// Stage the body file + metadata, then DELETE the body file from
// disk while leaving the staged DB rows intact. Reproduces the
// in-wild failure mode: chunk row + path hash both present, but
// the body content was lost (user moved workspace dirs, partial
// backup restore, manual file cleanup). `stage_chunks` returns
// paths relative to `content_root`; resolve absolute before unlink.
let content_root = cfg.memory_tree_content_root();
std::fs::create_dir_all(&content_root).unwrap();
let staged = content_store::stage_chunks(&content_root, &[chunk.clone()]).unwrap();
with_connection(&cfg, |conn| {
let tx = conn.unchecked_transaction()?;
upsert_staged_chunks_tx(&tx, &staged)?;
tx.commit()?;
Ok(())
})
.unwrap();
let staged_rel = get_chunk_content_path(&cfg, &chunk.id)
.unwrap()
.expect("staged body path");
let body_abs = content_root.join(&staged_rel);
std::fs::remove_file(&body_abs).unwrap();

let sig = tree_active_signature(&cfg);
let job = mk_running_job(
JobKind::ReembedBackfill,
serde_json::to_string(&ReembedBackfillPayload {
signature: sig.clone(),
})
.unwrap(),
);

// Pass 1: worklist picks up the orphan, body read fails, tombstone
// written, `Defer` to revisit (the handler doesn't distinguish
// "all rows tombstoned" from "more rows pending" inside this batch).
let out1 = handle_reembed_backfill(&cfg, &job).await.unwrap();
assert!(
matches!(out1, JobOutcome::Defer { .. }),
"first pass should Defer after failing to read body, got {out1:?}"
);
assert!(
get_chunk_embedding_for_signature(&cfg, &chunk.id, &sig)
.unwrap()
.is_none(),
"orphan chunk must not have a sidecar vector after failure"
);

// (1) Tombstone row exists for exactly this (chunk, sig).
let tombstone_count: i64 = with_connection(&cfg, |conn| {
Ok(conn.query_row(
"SELECT COUNT(*) FROM mem_tree_chunk_reembed_skipped
WHERE chunk_id = ?1 AND model_signature = ?2",
params![chunk.id, sig],
|r| r.get(0),
)?)
})
.unwrap();
assert_eq!(
tombstone_count, 1,
"orphan chunk must be tombstoned exactly once"
);

// (2) Pass 2: worklist NOT EXISTS clause excludes the tombstoned
// row; both worklists empty; chain completes.
let out2 = handle_reembed_backfill(&cfg, &job).await.unwrap();
assert_eq!(
out2,
JobOutcome::Done,
"tombstoned-only state must complete the chain"
);

// (3) Migration probe in `ensure_reembed_backfill` must agree the
// space is covered, otherwise the chain re-arms on every config
// save and we're back to the original infinite-loop bug.
let probe_uncovered: bool = with_connection(&cfg, |conn| {
Ok(conn.query_row(
"SELECT EXISTS(
SELECT 1 FROM mem_tree_chunks c
WHERE NOT EXISTS (SELECT 1 FROM mem_tree_chunk_embeddings e
WHERE e.chunk_id = c.id AND e.model_signature = ?1)
AND NOT EXISTS (SELECT 1 FROM mem_tree_chunk_reembed_skipped sk
WHERE sk.chunk_id = c.id AND sk.model_signature = ?1))
OR EXISTS(
SELECT 1 FROM mem_tree_summaries s
WHERE s.deleted = 0
AND NOT EXISTS (SELECT 1 FROM mem_tree_summary_embeddings e
WHERE e.summary_id = s.id AND e.model_signature = ?1)
AND NOT EXISTS (SELECT 1 FROM mem_tree_summary_reembed_skipped sk
WHERE sk.summary_id = s.id AND sk.model_signature = ?1))",
params![sig],
|r| r.get(0),
)?)
})
.unwrap();
assert!(
!probe_uncovered,
"after tombstoning the only orphan, the ensure_reembed_backfill probe must report covered"
);
}

/// #1574 §4: `ensure_reembed_backfill` (the switch-path trigger) enqueues
/// exactly one chain when there is uncovered work, is idempotent on
/// re-call (per-signature dedupe), and enqueues nothing for an
Expand Down
Loading
Loading