Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
591 changes: 363 additions & 228 deletions hindsight-api-slim/hindsight_api/engine/consolidation/consolidator.py

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions hindsight-api-slim/hindsight_api/engine/entity_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import asyncio
import json
import logging
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import UTC, datetime
Expand Down Expand Up @@ -230,7 +231,7 @@ async def resolve_entities_batch(
unit_event_date,
conn=None,
entity_labels: list | None = None,
) -> list[str]:
) -> list[uuid.UUID]:
"""
Resolve multiple entities in batch (MUCH faster than sequential).

Expand Down Expand Up @@ -271,7 +272,7 @@ async def _resolve_entities_batch_impl(
unit_event_date,
taxonomy_lookup: set[str] | None = None,
labels_cfg=None,
) -> list[str]:
) -> list[uuid.UUID]:
if self.entity_lookup == "trigram":
# Route to backend-specific fuzzy strategy.
# Non-PG backends (Oracle) use UTL_MATCH instead of pg_trgm.
Expand Down Expand Up @@ -311,7 +312,7 @@ async def _resolve_entities_batch_full(
unit_event_date,
taxonomy_lookup: set[str] | None = None,
labels_cfg=None,
) -> list[str]:
) -> list[uuid.UUID]:
"""Original strategy: load all bank entities then match in Python."""
# Query ALL candidates for this bank
all_entities = await conn.fetch(
Expand Down Expand Up @@ -395,7 +396,7 @@ async def _resolve_entities_batch_trigram(
unit_event_date,
taxonomy_lookup: set[str] | None = None,
labels_cfg=None,
) -> list[str]:
) -> list[uuid.UUID]:
"""
Trigram strategy: fetch only similar candidates per entity name using pg_trgm.

Expand Down Expand Up @@ -499,7 +500,7 @@ async def _resolve_entities_batch_oracle_fuzzy(
unit_event_date: datetime | None,
taxonomy_lookup: set[str] | None = None,
labels_cfg=None,
) -> list[str]:
) -> list[uuid.UUID]:
"""
Oracle strategy: fetch similar candidates using UTL_MATCH.JARO_WINKLER_SIMILARITY.

Expand Down Expand Up @@ -607,7 +608,7 @@ async def _resolve_from_candidates(
cooccurrence_map: dict[str, set[str]],
taxonomy_lookup: set[str] | None = None,
labels_cfg=None,
) -> list[str]:
) -> list[uuid.UUID]:
"""Shared scoring + upsert logic used by both lookup strategies."""

# Resolve each entity using pre-fetched candidates
Expand Down
590 changes: 436 additions & 154 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py

Large diffs are not rendered by default.

130 changes: 89 additions & 41 deletions hindsight-api-slim/hindsight_api/engine/retain/fact_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,34 +163,21 @@ async def ensure_bank_exists(conn, bank_id: str, ops=None) -> None:
await create_bank_vector_indexes(conn, bank_id, str(internal_id), ops=ops)


async def delete_stale_observations_for_memories(
async def _snapshot_stale_observations(
conn,
bank_id: str,
fact_ids: "list[str | uuid.UUID]",
ops=None,
) -> int:
"""Delete observations whose source memories are about to be removed.

Mirrors the cleanup performed by ``MemoryEngine.delete_document`` so that
every code path that removes ``memory_units`` also removes the
observations derived from them. Without this, ingesting a fresh version
of a document via the retain pipeline (which does a full-replace
``DELETE FROM documents`` cascade) used to leave orphan observations
pointing at memory IDs that no longer existed.

For each observation referencing any of ``fact_ids``:
1. Delete the observation row (its text is stale once even one source
memory disappears).
2. Reset ``consolidated_at = NULL`` on the surviving source memories so
they get re-consolidated under fresh observations on the next run.

Must be called within an active transaction, before the source memories
are deleted.
) -> "tuple[list, list]":
"""Find observations derived from ``fact_ids`` and the surviving co-sources to reset.

Returns the number of observations deleted.
Returns ``(obs_ids, remaining_source_ids)``. Read-only: performs no writes, so it
reads the affected observation rows by id array / junction (which survive source
deletion) and may run either before or after the source memories are removed.
Returns ``([], [])`` when ``fact_ids`` is empty or no observation references them.
"""
if not fact_ids:
return 0
return [], []

fact_uuids = [uuid.UUID(str(fid)) if not isinstance(fid, uuid.UUID) else fid for fid in fact_ids]

Expand Down Expand Up @@ -226,7 +213,7 @@ async def delete_stale_observations_for_memories(
)

if not affected_obs:
return 0
return [], []

deleted_set = {str(uid) for uid in fact_uuids}
obs_ids = [obs["id"] for obs in affected_obs]
Expand All @@ -239,6 +226,22 @@ async def delete_stale_observations_for_memories(
remaining_source_ids.append(src_id)
seen_remaining.add(src_str)

return obs_ids, remaining_source_ids


async def _apply_stale_observation_deletion(
conn,
bank_id: str,
obs_ids: list,
remaining_source_ids: list,
) -> int:
"""Delete the snapshotted observation rows and reset surviving co-sources.

Returns the number of observations deleted (0 when ``obs_ids`` is empty).
"""
if not obs_ids:
return 0

await conn.execute(
f"DELETE FROM {fq_table('memory_units')} WHERE id = ANY($1::uuid[])",
obs_ids,
Expand All @@ -262,6 +265,40 @@ async def delete_stale_observations_for_memories(
return len(obs_ids)


async def delete_stale_observations_for_memories(
conn,
bank_id: str,
fact_ids: "list[str | uuid.UUID]",
ops=None,
) -> int:
"""Delete observations whose source memories are about to be removed.

Mirrors the cleanup performed by ``MemoryEngine.delete_document`` so that
every code path that removes ``memory_units`` also removes the
observations derived from them. Without this, ingesting a fresh version
of a document via the retain pipeline (which does a full-replace
``DELETE FROM documents`` cascade) used to leave orphan observations
pointing at memory IDs that no longer existed.

For each observation referencing any of ``fact_ids``:
1. Delete the observation row (its text is stale once even one source
memory disappears).
2. Reset ``consolidated_at = NULL`` on the surviving source memories so
they get re-consolidated under fresh observations on the next run.

Must be called within an active transaction. Order-independent w.r.t. the source
delete: it snapshots the affected observations (by id array / junction, which
survive source deletion) before writing, so it is safe to invoke either before or
after the source memories are removed.

Returns the number of observations deleted.
"""
obs_ids, remaining_source_ids = await _snapshot_stale_observations(conn, bank_id, fact_ids, ops=ops)
if not obs_ids:
return 0
return await _apply_stale_observation_deletion(conn, bank_id, obs_ids, remaining_source_ids)


async def handle_document_tracking(
conn,
bank_id: str,
Expand Down Expand Up @@ -300,11 +337,13 @@ async def handle_document_tracking(

# Delete old document first (cascades to units and links).
# Only delete on the first batch to avoid deleting data we just inserted.
# Before the cascade, fan out to delete observations derived from the
# outgoing memory_units — otherwise the FK ON DELETE CASCADE removes the
# source memory_units but leaves observation rows pointing at IDs that
# no longer exist (consolidated_at on co-source memories also stays
# frozen). Same cleanup the explicit ``delete_document`` API performs.
# Snapshot the observations derived from the outgoing memory_units BEFORE the
# delete, then remove those observations (and reset co-source consolidated_at)
# AFTER the sources are gone. Otherwise the FK ON DELETE CASCADE removes the
# source memory_units but leaves observation rows pointing at IDs that no longer
# exist (and co-source consolidated_at stays frozen). Same cleanup the explicit
# ``delete_document`` API performs; sources are deleted first to keep the
# SOURCE -> OBSERVATION lock order.
preserved_created_at = None
if is_first_batch:
existing_unit_rows = await conn.fetch(
Expand All @@ -315,32 +354,41 @@ async def handle_document_tracking(
document_id,
)
existing_unit_ids = [row["id"] for row in existing_unit_rows]
obs_ids: list = []
remaining_source_ids: list = []
if existing_unit_ids:
invalidated = await delete_stale_observations_for_memories(conn, bank_id, existing_unit_ids, ops=ops)
if invalidated:
logger.info(
f"[RETAIN] Document {document_id} re-ingested: invalidated "
f"{invalidated} observation(s) derived from {len(existing_unit_ids)} outgoing memory_units"
)
# Snapshot affected observations BEFORE deleting sources (the PG array /
# Oracle junction are read here while still intact); apply the deletion
# AFTER the source delete so the lock order is SOURCE -> OBSERVATION,
# matching consolidation/invalidation and avoiding a cross-order deadlock.
obs_ids, remaining_source_ids = await _snapshot_stale_observations(
conn, bank_id, existing_unit_ids, ops=ops
)
# Capture link-recompute victims BEFORE the cascade. Same staleness
# applies on upsert as on explicit delete: surviving units in OTHER
# documents that linked to these doomed units are about to lose
# those links. ``ops`` may be None for older callers that haven't
# been wired up — skip enqueue in that case rather than crash.
# documents that linked to these doomed units are about to lose those
# links. ``ops`` may be None for older callers — skip enqueue in that case.
if ops is not None:
from ..graph_maintenance import enqueue_relink_victims

await enqueue_relink_victims(conn, bank_id, [str(uid) for uid in existing_unit_ids], ops=ops)
# Explicitly delete memory_units by document_id BEFORE deleting the
# document row. The CASCADE from documents→chunks→memory_units only
# catches units that have a non-NULL chunk_id FK. Units with chunk_id=NULL
# (e.g. from partial writes or edge cases) would survive the cascade.
# This explicit delete ensures complete cleanup.
# Delete source memory_units FIRST. The CASCADE from documents->chunks->
# memory_units only catches units with a non-NULL chunk_id FK; units with
# chunk_id=NULL would survive. Deleting sources before the derived
# observations also fixes the lock order (SOURCE -> OBSERVATION).
await conn.execute(
f"DELETE FROM {fq_table('memory_units')} WHERE document_id = $1 AND bank_id = $2",
document_id,
bank_id,
)
# Then delete the affected observations + reset surviving co-sources.
if obs_ids:
invalidated = await _apply_stale_observation_deletion(conn, bank_id, obs_ids, remaining_source_ids)
if invalidated:
logger.info(
f"[RETAIN] Document {document_id} re-ingested: invalidated "
f"{invalidated} observation(s) derived from {len(existing_unit_ids)} outgoing memory_units"
)
# Capture created_at before deletion so re-ingestion preserves it.
preserved_created_at = await conn.fetchval(
f"DELETE FROM {fq_table('documents')} WHERE id = $1 AND bank_id = $2 RETURNING created_at",
Expand Down
5 changes: 3 additions & 2 deletions hindsight-api-slim/hindsight_api/engine/retain/link_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import time
import uuid
from datetime import UTC, datetime, timedelta

from ..._vector_index import ann_search_tuning_settings, configured_vector_extension
Expand Down Expand Up @@ -300,7 +301,7 @@ async def resolve_entities_only(
llm_entities: list[list[dict]],
log_buffer: list[str] = None,
entity_labels: list | None = None,
) -> tuple[list[str], list[tuple], dict[str, list[str]]]:
) -> tuple[list[uuid.UUID], list[tuple], dict[str, list[uuid.UUID]]]:
"""
Phase 1 of entity processing: resolve entity names to canonical IDs.

Expand Down Expand Up @@ -350,7 +351,7 @@ async def resolve_entities_only(
)

# Build unit_to_entity_ids mapping
unit_to_entity_ids: dict[str, list[str]] = {}
unit_to_entity_ids: dict[str, list[uuid.UUID]] = {}
for idx, (unit_id, _local_idx, _fact_date) in enumerate(entity_to_unit):
if unit_id not in unit_to_entity_ids:
unit_to_entity_ids[unit_id] = []
Expand Down
Loading