-
Notifications
You must be signed in to change notification settings - Fork 4.9k
fix: prevent orphan graph edges when VDB upsert fails during merge #2941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1379,7 +1379,7 @@ async def _merge_entities_impl( | |
| } | ||
|
|
||
| # Apply relationship updates | ||
| logger.info(f"Entity Merge: updatign {len(relation_updates)} relations") | ||
| logger.info(f"Entity Merge: updating {len(relation_updates)} relations") | ||
| for rel_data in relation_updates.values(): | ||
| await chunk_entity_relation_graph.upsert_edge( | ||
| rel_data["graph_src"], rel_data["graph_tgt"], rel_data["data"] | ||
|
|
@@ -1412,6 +1412,9 @@ async def _merge_entities_impl( | |
| ) | ||
| await relationships_vdb.delete(relations_to_delete) | ||
|
|
||
| # Track edges whose VDB write fails so we can roll back the matching graph write. | ||
| # This prevents graph/VDB drift: either both stores have the edge, or neither does. | ||
| vdb_failed_edges: list[tuple[str, str]] = [] | ||
| for rel_data in relation_updates.values(): | ||
| edge_data = rel_data["data"] | ||
| normalized_src = rel_data["norm_src"] | ||
|
|
@@ -1438,12 +1441,31 @@ async def _merge_entities_impl( | |
| "file_path": edge_data.get("file_path", ""), | ||
| } | ||
| } | ||
| await relationships_vdb.upsert(relation_data_for_vdb) | ||
| logger.debug( | ||
| f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" | ||
| try: | ||
| await relationships_vdb.upsert(relation_data_for_vdb) | ||
| logger.debug( | ||
| f"Entity Merge: updating vdb `{normalized_src}`~`{normalized_tgt}`" | ||
| ) | ||
| except Exception as e: | ||
| logger.warning( | ||
| f"Entity Merge: VDB upsert failed for `{normalized_src}`~`{normalized_tgt}`: {e}. " | ||
| f"Rolling back graph edge to maintain graph/VDB consistency." | ||
| ) | ||
| vdb_failed_edges.append((rel_data["graph_src"], rel_data["graph_tgt"])) | ||
|
|
||
| # Roll back graph edges whose VDB upsert failed so the two stores stay in sync. | ||
| # Without this, source-entity deletion (step 10) leaves orphan edges in the graph. | ||
| if vdb_failed_edges: | ||
| logger.warning( | ||
| f"Entity Merge: rolling back {len(vdb_failed_edges)} graph edge(s) " | ||
| f"due to VDB upsert failures" | ||
| ) | ||
| await chunk_entity_relation_graph.remove_edges(vdb_failed_edges) | ||
|
|
||
| logger.info(f"Entity Merge: {len(relation_updates)} relations in vdb updated") | ||
| logger.info( | ||
| f"Entity Merge: {len(relation_updates) - len(vdb_failed_edges)}/{len(relation_updates)} " | ||
| f"relations in vdb updated" | ||
| ) | ||
|
|
||
| # 8. Update entity vector representation | ||
| description = merged_entity_data.get("description", "") | ||
|
|
@@ -1763,3 +1785,139 @@ async def get_relation_info( | |
| result["vector_data"] = vector_data | ||
|
|
||
| return result | ||
|
|
||
|
|
||
| async def check_graph_consistency( | ||
| chunk_entity_relation_graph, | ||
| relationships_vdb, | ||
| ) -> dict[str, Any]: | ||
| """Check consistency between graph storage and the relationships vector database. | ||
|
|
||
| An orphan graph edge is an edge that exists in the knowledge graph but has no | ||
| corresponding entry in the relationships VDB. This can happen when a merge or | ||
| delete operation writes the graph edge successfully but the subsequent VDB upsert | ||
| fails (e.g. embedder crash, context-length exceeded, network timeout). | ||
|
|
||
| Args: | ||
| chunk_entity_relation_graph: Graph storage instance. | ||
| relationships_vdb: Relationships vector database storage instance. | ||
|
|
||
| Returns: | ||
| A dict with the following keys: | ||
|
|
||
| - ``orphan_graph_edges``: list of ``(src, tgt)`` tuples present in the graph | ||
| but missing from the VDB. | ||
| - ``total_graph_edges``: total number of edges in the graph. | ||
| - ``total_vdb_relations``: number of those graph edges that have a matching | ||
| VDB entry. | ||
| """ | ||
| all_edges = await chunk_entity_relation_graph.get_all_edges() | ||
| if not all_edges: | ||
| return { | ||
| "orphan_graph_edges": [], | ||
| "total_graph_edges": 0, | ||
| "total_vdb_relations": 0, | ||
| } | ||
|
|
||
| # Some backends (e.g. PostgreSQL/AGE) return entity_id values wrapped in | ||
| # double-quotes when casting agtype to text. Strip them for consistency. | ||
| def _clean(value: Any) -> str: | ||
| s = str(value) if value is not None else "" | ||
| if len(s) >= 2 and s[0] == '"' and s[-1] == '"': | ||
| s = s[1:-1] | ||
| return s | ||
|
|
||
| # Build a mapping: VDB relation_id → (raw_src, raw_tgt) from the graph | ||
| id_to_pair: dict[str, tuple[str, str]] = {} | ||
| for edge in all_edges: | ||
| raw_src = edge.get("source", "") | ||
| raw_tgt = edge.get("target", "") | ||
| src = _clean(raw_src) | ||
| tgt = _clean(raw_tgt) | ||
| if not src or not tgt: | ||
| continue | ||
| normalized_src, normalized_tgt = sorted([src, tgt]) | ||
| relation_id = compute_mdhash_id(normalized_src + normalized_tgt, prefix="rel-") | ||
| # Keep the original (un-normalized) direction so callers can pass it | ||
| # directly to remove_edges() if needed. | ||
| id_to_pair.setdefault(relation_id, (src, tgt)) | ||
|
|
||
| if not id_to_pair: | ||
| return { | ||
| "orphan_graph_edges": [], | ||
| "total_graph_edges": len(all_edges), | ||
| "total_vdb_relations": 0, | ||
| } | ||
|
|
||
| relation_ids = list(id_to_pair.keys()) | ||
| vdb_results = await relationships_vdb.get_by_ids(relation_ids) | ||
|
|
||
| found_ids = { | ||
| relation_ids[i] for i, result in enumerate(vdb_results) if result is not None | ||
| } | ||
|
Comment on lines
+1855
to
+1857
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
|
|
||
| orphan_graph_edges = [ | ||
| id_to_pair[rid] for rid in relation_ids if rid not in found_ids | ||
| ] | ||
|
|
||
| return { | ||
| "orphan_graph_edges": orphan_graph_edges, | ||
| "total_graph_edges": len(all_edges), | ||
| "total_vdb_relations": len(found_ids), | ||
| } | ||
|
|
||
|
|
||
| async def repair_graph_consistency( | ||
| chunk_entity_relation_graph, | ||
| relationships_vdb, | ||
| *, | ||
| dry_run: bool = False, | ||
| ) -> dict[str, Any]: | ||
| """Detect and optionally repair graph ↔ VDB consistency issues. | ||
|
|
||
| Finds edges that exist in the knowledge graph but have no corresponding entry | ||
| in the relationships VDB (orphan edges), then removes them from the graph so | ||
| that both stores are back in sync. | ||
|
|
||
| This is safe to run on a live instance: it only ever *removes* edges from the | ||
| graph, and only edges that have no VDB counterpart (i.e. edges that could never | ||
| be retrieved by a query anyway). | ||
|
|
||
| Args: | ||
| chunk_entity_relation_graph: Graph storage instance. | ||
| relationships_vdb: Relationships vector database storage instance. | ||
| dry_run: When ``True``, report issues without making any changes. | ||
|
|
||
| Returns: | ||
| A dict with the following keys: | ||
|
|
||
| - ``orphan_graph_edges``: list of ``(src, tgt)`` tuples that were found | ||
| (and removed when ``dry_run=False``). | ||
| - ``total_graph_edges``: total number of edges before any repair. | ||
| - ``total_vdb_relations``: number of edges that had a matching VDB entry. | ||
| - ``repaired``: ``True`` if orphan edges were removed (always ``False`` | ||
| when ``dry_run=True``). | ||
| """ | ||
| report = await check_graph_consistency( | ||
| chunk_entity_relation_graph, | ||
| relationships_vdb, | ||
| ) | ||
| orphans = report["orphan_graph_edges"] | ||
| repaired = False | ||
|
|
||
| if orphans and not dry_run: | ||
| logger.warning( | ||
| f"Graph consistency repair: removing {len(orphans)} orphan graph edge(s)" | ||
| ) | ||
| await chunk_entity_relation_graph.remove_edges(orphans) | ||
| repaired = True | ||
| logger.info("Graph consistency repair: complete") | ||
| elif orphans: | ||
| logger.warning( | ||
| f"Graph consistency check (dry_run=True): found {len(orphans)} orphan edge(s) " | ||
| f"— rerun with dry_run=False to remove them" | ||
| ) | ||
| else: | ||
| logger.info("Graph consistency check: no orphan edges found") | ||
|
|
||
| return {**report, "repaired": repaired} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The consistency check hashes only
normalized_src + normalized_tgt, but the codebase still handles legacy reverse-order relation IDs elsewhere (e.g., deleting both permutations inutils_graph.py:783-786). If an existing relation is stored only under the reverse hash, this logic marks it orphaned and repair can remove a valid graph edge. Use both permutations (ormake_relation_vdb_ids) before classifying an edge as orphan.Useful? React with 👍 / 👎.