async def acheck_graph_consistency(self) -> dict[str, Any]:
    """Report graph edges that have no entry in the relationships VDB.

    Delegates to :func:`lightrag.utils_graph.check_graph_consistency`,
    handing it this instance's ``chunk_entity_relation_graph`` and
    ``relationships_vdb``.

    Returns:
        The report dict produced by the utility, with keys:

        - ``orphan_graph_edges``: ``(src, tgt)`` tuples for graph edges
          whose relation ID was not found in the VDB.
        - ``total_graph_edges``: number of edges returned by the graph.
        - ``total_vdb_relations``: number of checked relation IDs for which
          the VDB returned a record.

    Example::

        report = await rag.acheck_graph_consistency()
        print(report["orphan_graph_edges"])  # [(src, tgt), ...]
    """
    # Function-scope import, resolved on first call.
    from lightrag.utils_graph import check_graph_consistency as _check

    graph_store = self.chunk_entity_relation_graph
    relation_store = self.relationships_vdb
    return await _check(graph_store, relation_store)


async def arepair_graph_consistency(
    self, *, dry_run: bool = False
) -> dict[str, Any]:
    """Find orphan graph edges and, unless ``dry_run``, remove them.

    Delegates to :func:`lightrag.utils_graph.repair_graph_consistency`,
    handing it this instance's ``chunk_entity_relation_graph`` and
    ``relationships_vdb``. An orphan edge is one that exists in the graph
    but has no matching entry in the relationships VDB.

    NOTE(review): detection and removal are two separate awaits in the
    utility, so whether this is safe while other writers are active on the
    same stores should be confirmed before relying on it.

    Args:
        dry_run: When ``True``, only report; nothing is removed.
            Defaults to ``False``.

    Returns:
        The consistency report (``orphan_graph_edges``,
        ``total_graph_edges``, ``total_vdb_relations``) plus ``repaired``,
        which is ``True`` only when orphan edges were removed.

    Example::

        # Inspect first
        report = await rag.arepair_graph_consistency(dry_run=True)
        print(f"{len(report['orphan_graph_edges'])} orphan edges found")

        # Then fix
        report = await rag.arepair_graph_consistency()
        print(f"Repaired: {report['repaired']}")
    """
    # Function-scope import, resolved on first call.
    from lightrag.utils_graph import repair_graph_consistency as _repair

    graph_store = self.chunk_entity_relation_graph
    relation_store = self.relationships_vdb
    return await _repair(graph_store, relation_store, dry_run=dry_run)


def check_graph_consistency(self) -> dict[str, Any]:
    """Blocking counterpart of :meth:`acheck_graph_consistency`.

    Runs the coroutine to completion on the loop returned by
    ``always_get_an_event_loop()`` and returns its report dict.
    """
    loop = always_get_an_event_loop()
    return loop.run_until_complete(self.acheck_graph_consistency())


def repair_graph_consistency(self, *, dry_run: bool = False) -> dict[str, Any]:
    """Blocking counterpart of :meth:`arepair_graph_consistency`.

    Args:
        dry_run: Forwarded unchanged to the asynchronous method.

    Returns:
        The report dict returned by the asynchronous method.
    """
    loop = always_get_an_event_loop()
    return loop.run_until_complete(self.arepair_graph_consistency(dry_run=dry_run))
def _found_relation_ids(relation_ids: list[str], vdb_results: list[Any]) -> set[str]:
    """Return the subset of ``relation_ids`` that the VDB lookup confirmed.

    A record's own ``"id"`` field is trusted first. Positional matching
    (result ``i`` belongs to ``relation_ids[i]``) is used only as a fallback
    and only when the result list has exactly one slot per requested ID.

    Raises:
        ValueError: If a record cannot be attributed to a requested ID, i.e.
            it carries no known ``"id"`` and the result list is not aligned
            with the request. Guessing here could mark live edges as orphans.
    """
    requested = set(relation_ids)
    aligned = len(vdb_results) == len(relation_ids)
    confirmed: set[str] = set()

    for position, record in enumerate(vdb_results):
        if record is None:
            continue
        record_id = record.get("id") if isinstance(record, dict) else None
        if isinstance(record_id, str) and record_id in requested:
            confirmed.add(record_id)
        elif aligned:
            confirmed.add(relation_ids[position])
        else:
            raise ValueError(
                "relationships_vdb.get_by_ids returned "
                f"{len(vdb_results)} result(s) for {len(relation_ids)} id(s) "
                "and a record without a recognisable 'id'; cannot decide "
                "which graph edges are present in the VDB"
            )
    return confirmed


async def _flush_graph_storage(graph_storage) -> None:
    """Await ``graph_storage.index_done_callback()`` when the storage has one.

    NOTE(review): presumably this is what persists graph edits on file-backed
    backends — confirm against the storage implementations. The call is
    skipped when the attribute is missing or does not return an awaitable.
    """
    import inspect

    flush = getattr(graph_storage, "index_done_callback", None)
    if not callable(flush):
        return
    outcome = flush()
    if inspect.isawaitable(outcome):
        await outcome


async def check_graph_consistency(
    chunk_entity_relation_graph,
    relationships_vdb,
) -> dict[str, Any]:
    """Check consistency between graph storage and the relationships VDB.

    An orphan graph edge is an edge that exists in the knowledge graph but
    has no corresponding entry in the relationships VDB. For every edge, the
    expected VDB ID is ``compute_mdhash_id(sorted_src + sorted_tgt,
    prefix="rel-")``; edges whose ID the VDB does not return are orphans.

    NOTE(review): only the sorted-endpoint ID is looked up. If any data was
    written under the unsorted ``src + tgt`` ID, those edges would be
    reported as orphans — confirm no such entries exist before repairing.

    Args:
        chunk_entity_relation_graph: Graph storage instance; must provide an
            awaitable ``get_all_edges()`` returning dicts with ``"source"``
            and ``"target"`` keys.
        relationships_vdb: Relationships vector storage; must provide an
            awaitable ``get_by_ids(ids)``.

    Returns:
        A dict with the following keys:

        - ``orphan_graph_edges``: list of ``(src, tgt)`` tuples present in
          the graph but missing from the VDB, in graph iteration order.
        - ``total_graph_edges``: number of edges returned by the graph.
        - ``total_vdb_relations``: number of distinct checked relation IDs
          that have a matching VDB entry.

    Raises:
        ValueError: If the VDB lookup result cannot be mapped back to the
            requested IDs (see :func:`_found_relation_ids`).
    """
    all_edges = await chunk_entity_relation_graph.get_all_edges()
    if not all_edges:
        return {
            "orphan_graph_edges": [],
            "total_graph_edges": 0,
            "total_vdb_relations": 0,
        }

    # Some backends (e.g. PostgreSQL/AGE) return entity_id values wrapped in
    # double-quotes when casting agtype to text. Strip them for consistency.
    def _clean(value: Any) -> str:
        s = str(value) if value is not None else ""
        if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
            s = s[1:-1]
        return s

    # Map each expected VDB relation ID to the (src, tgt) pair it came from.
    id_to_pair: dict[str, tuple[str, str]] = {}
    for edge in all_edges:
        src = _clean(edge.get("source", ""))
        tgt = _clean(edge.get("target", ""))
        if not src or not tgt:
            # An edge without both endpoints has no derivable relation ID.
            continue
        normalized_src, normalized_tgt = sorted([src, tgt])
        relation_id = compute_mdhash_id(normalized_src + normalized_tgt, prefix="rel-")
        # Keep the first-seen, un-sorted direction so callers can pass the
        # pair directly to remove_edges() if needed.
        id_to_pair.setdefault(relation_id, (src, tgt))

    if not id_to_pair:
        return {
            "orphan_graph_edges": [],
            "total_graph_edges": len(all_edges),
            "total_vdb_relations": 0,
        }

    relation_ids = list(id_to_pair)
    # Treat a None result as "nothing found" instead of failing on iteration.
    vdb_results = (await relationships_vdb.get_by_ids(relation_ids)) or []
    found_ids = _found_relation_ids(relation_ids, vdb_results)

    orphan_graph_edges = [
        id_to_pair[rid] for rid in relation_ids if rid not in found_ids
    ]

    return {
        "orphan_graph_edges": orphan_graph_edges,
        "total_graph_edges": len(all_edges),
        "total_vdb_relations": len(found_ids),
    }


async def repair_graph_consistency(
    chunk_entity_relation_graph,
    relationships_vdb,
    *,
    dry_run: bool = False,
) -> dict[str, Any]:
    """Detect and optionally repair graph ↔ VDB consistency issues.

    Runs :func:`check_graph_consistency` and, unless ``dry_run`` is set,
    removes every reported orphan edge from the graph via ``remove_edges``
    and then flushes the graph storage so the removal is not left pending.
    The VDB is never modified.

    NOTE(review): detection and removal are separate awaits; an edge written
    to the graph by a concurrent operation whose VDB upsert has not finished
    yet would look like an orphan. Confirm writers are quiesced (or that the
    caller holds the relevant lock) before running with ``dry_run=False``.

    Args:
        chunk_entity_relation_graph: Graph storage instance.
        relationships_vdb: Relationships vector database storage instance.
        dry_run: When ``True``, report issues without making any changes.

    Returns:
        The consistency report extended with one key:

        - ``orphan_graph_edges``: list of ``(src, tgt)`` tuples that were
          found (and removed when ``dry_run=False``).
        - ``total_graph_edges``: total number of edges before any repair.
        - ``total_vdb_relations``: number of edges with a matching VDB entry.
        - ``repaired``: ``True`` if orphan edges were removed (always
          ``False`` when ``dry_run=True`` or nothing was found).

    Raises:
        ValueError: Propagated from :func:`check_graph_consistency` when the
            VDB lookup result is ambiguous; nothing is removed in that case.
    """
    report = await check_graph_consistency(
        chunk_entity_relation_graph,
        relationships_vdb,
    )
    orphans = report["orphan_graph_edges"]

    if not orphans:
        logger.info("Graph consistency check: no orphan edges found")
        return {**report, "repaired": False}

    if dry_run:
        logger.warning(
            f"Graph consistency check (dry_run=True): found {len(orphans)} orphan edge(s) "
            f"— rerun with dry_run=False to remove them"
        )
        return {**report, "repaired": False}

    logger.warning(
        f"Graph consistency repair: removing {len(orphans)} orphan graph edge(s)"
    )
    await chunk_entity_relation_graph.remove_edges(orphans)
    await _flush_graph_storage(chunk_entity_relation_graph)
    logger.info("Graph consistency repair: complete")
    return {**report, "repaired": True}
+""" + +from __future__ import annotations + +import pytest +from unittest.mock import AsyncMock, MagicMock +from typing import Any + + +# --------------------------------------------------------------------------- +# Helpers / minimal stubs +# --------------------------------------------------------------------------- + + +def _make_graph_storage(edges: list[tuple[str, str]]) -> MagicMock: + """Return a mock BaseGraphStorage pre-populated with the given edges.""" + storage = MagicMock() + storage.get_all_edges = AsyncMock( + return_value=[{"source": s, "target": t} for s, t in edges] + ) + removed: list[tuple[str, str]] = [] + storage._removed = removed + + async def _remove_edges(pairs): + removed.extend(pairs) + + storage.remove_edges = _remove_edges + return storage + + +def _make_vdb(present_ids: set[str]) -> MagicMock: + """Return a mock BaseVectorStorage that knows about `present_ids`.""" + vdb = MagicMock() + + async def _get_by_ids(ids: list[str]) -> list[Any]: + return [{"id": i} if i in present_ids else None for i in ids] + + vdb.get_by_ids = _get_by_ids + vdb.upsert = AsyncMock() + vdb.delete = AsyncMock() + return vdb + + +# --------------------------------------------------------------------------- +# check_graph_consistency +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_check_consistency_no_edges(): + from lightrag.utils_graph import check_graph_consistency + + graph = _make_graph_storage([]) + vdb = _make_vdb(set()) + report = await check_graph_consistency(graph, vdb) + assert report["total_graph_edges"] == 0 + assert report["orphan_graph_edges"] == [] + + +@pytest.mark.asyncio +async def test_check_consistency_all_present(): + from lightrag.utils_graph import check_graph_consistency + from lightrag.utils import compute_mdhash_id + + edges = [("Alice", "Bob"), ("Bob", "Carol")] + # Build the expected VDB IDs (same normalisation the utility uses) + ids = set() + for s, t in 
edges: + ns, nt = sorted([s, t]) + ids.add(compute_mdhash_id(ns + nt, prefix="rel-")) + + graph = _make_graph_storage(edges) + vdb = _make_vdb(ids) + report = await check_graph_consistency(graph, vdb) + assert report["total_graph_edges"] == 2 + assert report["total_vdb_relations"] == 2 + assert report["orphan_graph_edges"] == [] + + +@pytest.mark.asyncio +async def test_check_consistency_detects_orphans(): + from lightrag.utils_graph import check_graph_consistency + from lightrag.utils import compute_mdhash_id + + edges = [("Alice", "Bob"), ("Bob", "Carol")] + # Only Alice-Bob is in the VDB + ns, nt = sorted(["Alice", "Bob"]) + present = {compute_mdhash_id(ns + nt, prefix="rel-")} + + graph = _make_graph_storage(edges) + vdb = _make_vdb(present) + report = await check_graph_consistency(graph, vdb) + assert report["total_graph_edges"] == 2 + assert report["total_vdb_relations"] == 1 + assert len(report["orphan_graph_edges"]) == 1 + orphan_src, orphan_tgt = report["orphan_graph_edges"][0] + assert {orphan_src, orphan_tgt} == {"Bob", "Carol"} + + +@pytest.mark.asyncio +async def test_check_consistency_strips_agtype_quotes(): + """PostgreSQL/AGE returns entity_id values wrapped in double-quotes.""" + from lightrag.utils_graph import check_graph_consistency + from lightrag.utils import compute_mdhash_id + + # Simulate AGE wrapping the values in extra double-quotes + graph = MagicMock() + graph.get_all_edges = AsyncMock( + return_value=[{"source": '"Alice"', "target": '"Bob"'}] + ) + + ns, nt = sorted(["Alice", "Bob"]) + present = {compute_mdhash_id(ns + nt, prefix="rel-")} + vdb = _make_vdb(present) + + report = await check_graph_consistency(graph, vdb) + assert report["orphan_graph_edges"] == [] + + +# --------------------------------------------------------------------------- +# repair_graph_consistency +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_repair_dry_run_does_not_mutate(): + from 
lightrag.utils_graph import repair_graph_consistency + + edges = [("Alice", "Bob")] + graph = _make_graph_storage(edges) + vdb = _make_vdb(set()) # No VDB entries → orphan + + report = await repair_graph_consistency(graph, vdb, dry_run=True) + assert len(report["orphan_graph_edges"]) == 1 + assert report["repaired"] is False + assert graph._removed == [] # Nothing was actually removed + + +@pytest.mark.asyncio +async def test_repair_removes_orphan_edges(): + from lightrag.utils_graph import repair_graph_consistency + + edges = [("Alice", "Bob"), ("Bob", "Carol")] + from lightrag.utils import compute_mdhash_id + + ns, nt = sorted(["Alice", "Bob"]) + present = {compute_mdhash_id(ns + nt, prefix="rel-")} + + graph = _make_graph_storage(edges) + vdb = _make_vdb(present) + + report = await repair_graph_consistency(graph, vdb) + assert report["repaired"] is True + assert len(report["orphan_graph_edges"]) == 1 + # The orphan edge was passed to remove_edges + assert len(graph._removed) == 1 + orphan_src, orphan_tgt = graph._removed[0] + assert {orphan_src, orphan_tgt} == {"Bob", "Carol"} + + +@pytest.mark.asyncio +async def test_repair_no_op_when_consistent(): + from lightrag.utils_graph import repair_graph_consistency + from lightrag.utils import compute_mdhash_id + + edges = [("Alice", "Bob")] + ns, nt = sorted(["Alice", "Bob"]) + present = {compute_mdhash_id(ns + nt, prefix="rel-")} + + graph = _make_graph_storage(edges) + vdb = _make_vdb(present) + + report = await repair_graph_consistency(graph, vdb) + assert report["repaired"] is False + assert graph._removed == [] + + +# --------------------------------------------------------------------------- +# VDB rollback inside _merge_entities_impl +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_merge_rolls_back_graph_edge_on_vdb_failure(): + """When a VDB upsert throws during merge, the matching graph edge must be + removed so graph and VDB stay in sync 
(no orphan edges).""" + from lightrag.utils_graph import _merge_entities_impl + + # --- graph storage --- + graph = MagicMock() + graph.has_node = AsyncMock(side_effect=lambda n: n in {"A", "B", "C"}) + + async def _get_node(name): + return { + "entity_id": name, + "description": name, + "entity_type": "UNKNOWN", + "source_id": "chunk1", + "file_path": "", + } + + graph.get_node = _get_node + graph.upsert_node = AsyncMock() + graph.get_node_edges = AsyncMock( + side_effect=lambda n: [("A", "C")] if n == "A" else [] + ) + + async def _get_edge(src, tgt): + return { + "description": "A relates to C", + "keywords": "test", + "source_id": "chunk1", + "file_path": "", + "weight": 1.0, + } + + graph.get_edge = _get_edge + + rolled_back: list[list[tuple[str, str]]] = [] + + async def _remove_edges(pairs): + rolled_back.append(list(pairs)) + + graph.remove_edges = _remove_edges + graph.delete_node = AsyncMock() + graph.index_done_callback = AsyncMock(return_value=True) + graph.upsert_edge = AsyncMock() + + # --- entities VDB --- + ent_vdb = MagicMock() + ent_vdb.upsert = AsyncMock() + ent_vdb.delete = AsyncMock() + ent_vdb.get_by_id = AsyncMock(return_value=None) + ent_vdb.index_done_callback = AsyncMock(return_value=True) + + # --- relationships VDB — upsert always fails --- + rel_vdb = MagicMock() + rel_vdb.delete = AsyncMock() + rel_vdb.upsert = AsyncMock(side_effect=Exception("embedder timeout")) + rel_vdb.index_done_callback = AsyncMock(return_value=True) + + # Run the merge (A → B, with C as a neighbour of A) + await _merge_entities_impl( + graph, + ent_vdb, + rel_vdb, + source_entities=["A"], + target_entity="B", + ) + + # The graph write happened (upsert_edge was called) + graph.upsert_edge.assert_awaited_once() + # The failed VDB upsert triggered a rollback via remove_edges + assert len(rolled_back) == 1, "remove_edges should have been called once" + rolled = rolled_back[0] + assert len(rolled) == 1 + src, tgt = rolled[0] + assert {src, tgt} == {"B", "C"}