diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 37ad0c4c26..af8ddb4099 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -2106,6 +2106,417 @@ async def aget_docs_by_ids( # Return the dictionary containing statuses only for the found document IDs return found_statuses + async def _purge_doc_chunks_and_kg( + self, + doc_id: str, + chunk_ids: set[str], + *, + pipeline_status: dict, + pipeline_status_lock: Any, + ) -> None: + """Remove a document's chunks and clean up its knowledge-graph contributions. + + Used by: + - The pipeline resume branch in ``process_document`` when a + document whose content is already extracted is re-processed + under different ``process_options``: chunks must be wiped and + entities/relations rebuilt fresh. + - Future deletion paths that want a focused "purge KG only" + operation without the LLM-cache / doc_status / full_docs + cleanup that ``adelete_by_doc_id`` also performs. + + What this method does: + 1. Reads ``full_entities`` / ``full_relations`` to identify which + graph nodes / edges this document contributed to. + 2. For each affected entity / relation, intersects the doc's + ``chunk_ids`` with the union of chunk-tracking entries + (``entity_chunks`` / ``relation_chunks``) and graph + ``source_id`` lists, then classifies it as either + *delete-outright* (no remaining sources) or *rebuild* + (still references chunks from other documents). + 3. Deletes the chunks themselves from ``chunks_vdb`` and + ``text_chunks``. + 4. For *delete-outright* entries: removes the relationship / + entity from the graph storage, vector storage, and chunk + tracking. + 5. Calls :py:meth:`_insert_done` to persist graph changes + before rebuilding (so the rebuild step sees a consistent + state). + 6. Calls :func:`rebuild_knowledge_from_chunks` to rebuild any + *rebuild* entries from their remaining chunks (so other + documents that also contributed to the same entity / + relation keep their data intact). + 7. Deletes the per-doc ``full_entities`` / ``full_relations`` + index rows so subsequent re-extraction starts fresh. + + Does NOT touch: + - ``doc_status`` / ``full_docs`` records — caller manages those. + - ``llm_response_cache`` — orthogonal to KG cleanup. + - Pipeline busy-flag — assumes the caller already holds the + pipeline (i.e. this runs inside a pipeline run). + + Idempotent: passing an empty ``chunk_ids`` returns immediately + without touching storage. + """ + if not chunk_ids: + return + + # ---- 1. 
Analyze affected entities/relations from full_entities/full_relations ---- + entities_to_delete: set[str] = set() + entities_to_rebuild: dict[str, list[str]] = {} + relationships_to_delete: set[tuple[str, str]] = set() + relationships_to_rebuild: dict[tuple[str, str], list[str]] = {} + entity_chunk_updates: dict[str, list[str]] = {} + relation_chunk_updates: dict[tuple[str, str], list[str]] = {} + + try: + doc_entities_data = await self.full_entities.get_by_id(doc_id) + doc_relations_data = await self.full_relations.get_by_id(doc_id) + + affected_nodes: list[dict[str, Any]] = [] + affected_edges: list[dict[str, Any]] = [] + + if doc_entities_data and "entity_names" in doc_entities_data: + entity_names = doc_entities_data["entity_names"] + nodes_dict = await self.chunk_entity_relation_graph.get_nodes_batch( + entity_names + ) + for entity_name in entity_names: + node_data = nodes_dict.get(entity_name) + if node_data: + if "id" not in node_data: + node_data["id"] = entity_name + affected_nodes.append(node_data) + + if doc_relations_data and "relation_pairs" in doc_relations_data: + relation_pairs = doc_relations_data["relation_pairs"] + edge_pairs_dicts = [ + {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs + ] + edges_dict = await self.chunk_entity_relation_graph.get_edges_batch( + edge_pairs_dicts + ) + for pair in relation_pairs: + src, tgt = pair[0], pair[1] + edge_data = edges_dict.get((src, tgt)) + if edge_data: + if "source" not in edge_data: + edge_data["source"] = src + if "target" not in edge_data: + edge_data["target"] = tgt + affected_edges.append(edge_data) + except Exception as e: + logger.error( + f"[purge] Failed to analyze affected graph elements for {doc_id}: {e}" + ) + raise Exception(f"Failed to analyze graph dependencies: {e}") from e + + # ---- 2. 
Classify entities/relations into delete vs rebuild ---- + try: + for node_data in affected_nodes: + node_label = node_data.get("entity_id") + if not node_label: + continue + + existing_sources: list[str] = [] + graph_sources: list[str] = [] + if self.entity_chunks: + stored_chunks = await self.entity_chunks.get_by_id(node_label) + if stored_chunks and isinstance(stored_chunks, dict): + existing_sources = [ + chunk_id + for chunk_id in stored_chunks.get("chunk_ids", []) + if chunk_id + ] + + if node_data.get("source_id"): + graph_sources = [ + chunk_id + for chunk_id in node_data["source_id"].split(GRAPH_FIELD_SEP) + if chunk_id + ] + + if not existing_sources: + existing_sources = graph_sources + + if not existing_sources: + entities_to_delete.add(node_label) + entity_chunk_updates[node_label] = [] + continue + + remaining_sources = subtract_source_ids(existing_sources, chunk_ids) + graph_references_deleted_chunks = bool( + graph_sources and set(graph_sources) & chunk_ids + ) + + if not remaining_sources: + entities_to_delete.add(node_label) + entity_chunk_updates[node_label] = [] + elif ( + remaining_sources != existing_sources + or graph_references_deleted_chunks + ): + entities_to_rebuild[node_label] = remaining_sources + entity_chunk_updates[node_label] = remaining_sources + + async with pipeline_status_lock: + log_message = ( + f"[purge] {doc_id}: {len(entities_to_rebuild)} entity(ies) " + f"to rebuild, {len(entities_to_delete)} to delete" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + for edge_data in affected_edges: + src = edge_data.get("source") + tgt = edge_data.get("target") + if not src or not tgt or "source_id" not in edge_data: + continue + + edge_tuple = tuple(sorted((src, tgt))) + if ( + edge_tuple in relationships_to_delete + or edge_tuple in relationships_to_rebuild + ): + continue + + existing_sources = [] + graph_sources = [] + if self.relation_chunks: + storage_key = make_relation_chunk_key(src, tgt) + stored_chunks = await self.relation_chunks.get_by_id(storage_key) + if stored_chunks and isinstance(stored_chunks, dict): + existing_sources = [ + chunk_id + for chunk_id in stored_chunks.get("chunk_ids", []) + if chunk_id + ] + + if edge_data.get("source_id"): + graph_sources = [ + chunk_id + for chunk_id in edge_data["source_id"].split(GRAPH_FIELD_SEP) + if chunk_id + ] + + if not existing_sources: + existing_sources = graph_sources + + if not existing_sources: + relationships_to_delete.add(edge_tuple) + relation_chunk_updates[edge_tuple] = [] + continue + + remaining_sources = subtract_source_ids(existing_sources, chunk_ids) + graph_references_deleted_chunks = bool( + graph_sources and set(graph_sources) & chunk_ids + ) + + if not remaining_sources: + relationships_to_delete.add(edge_tuple) + relation_chunk_updates[edge_tuple] = [] + elif ( + remaining_sources != existing_sources + or graph_references_deleted_chunks + ): + relationships_to_rebuild[edge_tuple] = remaining_sources + relation_chunk_updates[edge_tuple] = remaining_sources + + async with pipeline_status_lock: + log_message = ( + f"[purge] {doc_id}: {len(relationships_to_rebuild)} relation(s) " + f"to rebuild, {len(relationships_to_delete)} to delete" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + # Update entity/relation chunk-tracking with the remaining sources. 
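+            # Worked example (illustrative only: the entity name and chunk IDs
+            # below are hypothetical, and subtract_source_ids(existing, removed)
+            # is assumed to return the entries of ``existing`` that are not in
+            # ``removed``). Suppose entity "Acme Corp" is tracked with
+            # chunk_ids ["c1", "c2", "c3"]:
+            #   - this doc owned {"c2", "c3"} -> remaining == ["c1"], so
+            #     "Acme Corp" was queued for rebuild above and its tracking row
+            #     is rewritten below with just ["c1"];
+            #   - this doc owned all three -> remaining == [], so "Acme Corp"
+            #     sits in entities_to_delete instead and its tracking row is
+            #     removed in step 5 rather than upserted here.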
+ current_time = int(time.time()) + if entity_chunk_updates and self.entity_chunks: + entity_upsert_payload = {} + for entity_name, remaining in entity_chunk_updates.items(): + if not remaining: + continue + entity_upsert_payload[entity_name] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } + if entity_upsert_payload: + await self.entity_chunks.upsert(entity_upsert_payload) + + if relation_chunk_updates and self.relation_chunks: + relation_upsert_payload = {} + for edge_tuple, remaining in relation_chunk_updates.items(): + if not remaining: + continue + storage_key = make_relation_chunk_key(*edge_tuple) + relation_upsert_payload[storage_key] = { + "chunk_ids": remaining, + "count": len(remaining), + "updated_at": current_time, + } + if relation_upsert_payload: + await self.relation_chunks.upsert(relation_upsert_payload) + except Exception as e: + logger.error( + f"[purge] Failed to process graph analysis results for {doc_id}: {e}" + ) + raise Exception(f"Failed to process graph dependencies: {e}") from e + + # ---- 3. Delete chunks themselves ---- + try: + await self.chunks_vdb.delete(chunk_ids) + await self.text_chunks.delete(chunk_ids) + async with pipeline_status_lock: + log_message = ( + f"[purge] {doc_id}: deleted {len(chunk_ids)} chunk(s) from storage" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + except Exception as e: + logger.error(f"[purge] Failed to delete chunks for {doc_id}: {e}") + raise Exception(f"Failed to delete document chunks: {e}") from e + + # ---- 4. Delete relationships with no remaining sources ---- + if relationships_to_delete: + try: + rel_ids_to_delete = [] + for src, tgt in relationships_to_delete: + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + await self.chunk_entity_relation_graph.remove_edges( + list(relationships_to_delete) + ) + if self.relation_chunks: + relation_storage_keys = [ + make_relation_chunk_key(src, tgt) + for src, tgt in relationships_to_delete + ] + await self.relation_chunks.delete(relation_storage_keys) + async with pipeline_status_lock: + log_message = ( + f"[purge] {doc_id}: deleted " + f"{len(relationships_to_delete)} relation(s)" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + except Exception as e: + logger.error( + f"[purge] Failed to delete relationships for {doc_id}: {e}" + ) + raise Exception(f"Failed to delete relationships: {e}") from e + + # ---- 5. 
Delete entities with no remaining sources ---- + if entities_to_delete: + try: + nodes_edges_dict = ( + await self.chunk_entity_relation_graph.get_nodes_edges_batch( + list(entities_to_delete) + ) + ) + + edges_to_delete: set[tuple[str, str]] = set() + for entity, edges in nodes_edges_dict.items(): + if edges: + for src, tgt in edges: + edges_to_delete.add(tuple(sorted((src, tgt)))) + + if edges_to_delete: + rel_ids_to_delete = [] + for src, tgt in edges_to_delete: + rel_ids_to_delete.extend( + [ + compute_mdhash_id(src + tgt, prefix="rel-"), + compute_mdhash_id(tgt + src, prefix="rel-"), + ] + ) + await self.relationships_vdb.delete(rel_ids_to_delete) + if self.relation_chunks: + relation_storage_keys = [ + make_relation_chunk_key(src, tgt) + for src, tgt in edges_to_delete + ] + await self.relation_chunks.delete(relation_storage_keys) + logger.info( + f"[purge] {doc_id}: cleaned {len(edges_to_delete)} residual " + f"edge(s) from VDB and chunk-tracking storage" + ) + + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) + ) + + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) + + if self.entity_chunks: + await self.entity_chunks.delete(list(entities_to_delete)) + + async with pipeline_status_lock: + log_message = ( + f"[purge] {doc_id}: deleted " + f"{len(entities_to_delete)} entity(ies)" + ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + except Exception as e: + logger.error(f"[purge] Failed to delete entities for {doc_id}: {e}") + raise Exception(f"Failed to delete entities: {e}") from e + + # ---- 6. Persist pre-rebuild changes ---- + try: + await self._insert_done() + except Exception as e: + logger.error(f"[purge] Failed to persist pre-rebuild changes: {e}") + raise Exception(f"Failed to persist pre-rebuild changes: {e}") from e + + # ---- 7. Rebuild entities/relations that still have remaining sources ---- + if entities_to_rebuild or relationships_to_rebuild: + try: + await rebuild_knowledge_from_chunks( + entities_to_rebuild=entities_to_rebuild, + relationships_to_rebuild=relationships_to_rebuild, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entities_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + text_chunks_storage=self.text_chunks, + llm_response_cache=self.llm_response_cache, + global_config=self._build_global_config(), + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + entity_chunks_storage=self.entity_chunks, + relation_chunks_storage=self.relation_chunks, + ) + except Exception as e: + logger.error(f"[purge] Failed to rebuild knowledge from chunks: {e}") + raise Exception(f"Failed to rebuild knowledge graph: {e}") from e + + # ---- 8. 
Delete per-doc full_entities / full_relations index rows ---- + try: + await self.full_entities.delete([doc_id]) + await self.full_relations.delete([doc_id]) + except Exception as e: + logger.error( + f"[purge] Failed to delete full_entities/full_relations rows for {doc_id}: {e}" + ) + raise Exception( + f"Failed to delete from full_entities/full_relations: {e}" + ) from e + async def adelete_by_doc_id( self, doc_id: str, delete_llm_cache: bool = False ) -> DeletionResult: diff --git a/lightrag/pipeline.py b/lightrag/pipeline.py index 0a9ff39c6e..a303cdf856 100644 --- a/lightrag/pipeline.py +++ b/lightrag/pipeline.py @@ -47,6 +47,7 @@ from lightrag.extraction.interchange import parse_interchange_jsonl from lightrag.parser_routing import ( canonicalize_parser_hinted_basename, + resolve_file_parser_directives, resolve_stored_document_parser_engine, ) from lightrag.utils import ( @@ -65,6 +66,7 @@ compute_file_content_hash, compute_text_content_hash, doc_status_field, + doc_status_transition_metadata, document_canonical_key, document_source_key, get_by_path, @@ -858,9 +860,11 @@ async def _validate_and_fix_document_consistency( "file_path": resolved_file_path, "track_id": getattr(status_doc, "track_id", ""), "content_hash": getattr(status_doc, "content_hash", None), - # Clear any error messages and processing metadata + # Clear transient error / processing fields but preserve + # long-lived per-doc metadata (process_options) seeded + # at enqueue time. "error_msg": "", - "metadata": {}, + "metadata": doc_status_transition_metadata(status_doc), } # Update the status in to_process_docs as well @@ -1201,6 +1205,9 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: "file_path": file_path, "track_id": status_doc.track_id, "content_hash": status_doc.content_hash, + "metadata": doc_status_transition_metadata( + status_doc + ), } } ) @@ -1272,6 +1279,9 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: "file_path": file_path, "track_id": status_doc.track_id, "content_hash": status_doc.content_hash, + "metadata": doc_status_transition_metadata( + status_doc + ), } } ) @@ -1297,6 +1307,97 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: (content_data or {}).get("process_options", "") ) + # ---- Resume guard ---- + # When the pipeline picks up a non-fresh document whose + # content has already been extracted into full_docs, we + # must purge any stale chunks / entities / relations + # from a previous interrupted attempt BEFORE re-running + # chunking + entity extraction under the *current* + # process_options. Skipping this would either leave + # orphaned chunk-IDs in the vector DB or mix old and + # new chunks together, neither of which is safe. + # + # Both pipeline entry points (worker-driven and inline) + # converge here, so this is the single canonical place + # to do the purge regardless of which path got us here. + content_already_extracted = isinstance( + content_data, dict + ) and ( + ( + content_data.get("format") + == FULL_DOCS_FORMAT_LIGHTRAG + and content_data.get("lightrag_document_path") + ) + or ( + content_data.get("format") == FULL_DOCS_FORMAT_RAW + and (content_data.get("content") or "").strip() + ) + ) + stored_chunk_ids = set( + chunk_id + for chunk_id in (status_doc.chunks_list or []) + if isinstance(chunk_id, str) and chunk_id + ) + if content_already_extracted: + # Engine-mismatch warning: changing the parser engine + # after extraction is *not* honoured — the extracted + # content is the source of truth. 
Users wanting to + # re-extract with a new engine must delete + + # re-upload. + intended_engine, _ = resolve_file_parser_directives( + file_path + ) + stored_engine = ( + content_data.get("parsed_engine") or "" + ).lower() + if ( + intended_engine + and stored_engine + and intended_engine != stored_engine + ): + log_message = ( + f"[resume] {doc_id}: filename hint / " + f"LIGHTRAG_PARSER implies engine=" + f"{intended_engine!r} but full_docs " + f"already has parsed_engine=" + f"{stored_engine!r}; keeping the existing " + f"extraction. Delete + re-upload to " + f"switch engines." + ) + logger.warning(log_message) + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append( + log_message + ) + + if stored_chunk_ids: + log_message = ( + f"[resume] {doc_id}: purging " + f"{len(stored_chunk_ids)} chunk(s) and " + f"associated KG entries from a previous run " + f"before rebuilding under current " + f"process_options" + ) + logger.info(log_message) + async with pipeline_status_lock: + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append( + log_message + ) + await self._purge_doc_chunks_and_kg( + doc_id, + stored_chunk_ids, + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + ) + # The status_doc carries chunks_list / chunks_count + # from the prior run; clear them so subsequent + # state-machine upserts don't accidentally + # re-write stale IDs. + status_doc.chunks_list = [] + status_doc.chunks_count = 0 + # Try to parse as interchange JSONL (smart extraction output) parsed_interchange = parse_interchange_jsonl( content, self.tokenizer @@ -1496,10 +1597,13 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id "content_hash": status_doc.content_hash, - "metadata": { - "processing_start_time": processing_start_time, - **extraction_meta, - }, + "metadata": doc_status_transition_metadata( + status_doc, + extra={ + "processing_start_time": processing_start_time, + **extraction_meta, + }, + ), } } ) @@ -1612,10 +1716,13 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id "content_hash": status_doc.content_hash, - "metadata": { - "processing_start_time": processing_start_time, - "processing_end_time": processing_end_time, - }, + "metadata": doc_status_transition_metadata( + status_doc, + extra={ + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + }, + ), } } ) @@ -1675,11 +1782,14 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id "content_hash": status_doc.content_hash, - "metadata": { - "processing_start_time": processing_start_time, - "processing_end_time": processing_end_time, - **extraction_meta, - }, + "metadata": doc_status_transition_metadata( + status_doc, + extra={ + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + **extraction_meta, + }, + ), } } ) @@ -1752,11 +1862,14 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id "content_hash": status_doc.content_hash, - "metadata": { - "processing_start_time": processing_start_time, - "processing_end_time": processing_end_time, - **extraction_meta, 
- }, + "metadata": doc_status_transition_metadata( + status_doc, + extra={ + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + **extraction_meta, + }, + ), } } ) @@ -1787,6 +1900,9 @@ async def parse_worker(engine: str, in_q: asyncio.Queue): "file_path": file_path_w, "track_id": status_doc_w.track_id, "content_hash": status_doc_w.content_hash, + "metadata": doc_status_transition_metadata( + status_doc_w + ), } } ) @@ -1849,6 +1965,9 @@ async def parse_worker(engine: str, in_q: asyncio.Queue): ), "track_id": status_doc_w.track_id, "content_hash": status_doc_w.content_hash, + "metadata": doc_status_transition_metadata( + status_doc_w + ), } } ) @@ -1880,6 +1999,9 @@ async def analyze_worker(): "file_path": file_path_w, "track_id": status_doc_w.track_id, "content_hash": status_doc_w.content_hash, + "metadata": doc_status_transition_metadata( + status_doc_w + ), } } ) @@ -2598,12 +2720,15 @@ async def _mark_duplicate_after_parse( "track_id": status_doc.track_id, "content_hash": content_hash, "error_msg": message, - "metadata": { - "is_duplicate": True, - "duplicate_kind": "content_hash", - "original_doc_id": original_doc_id, - "original_track_id": original_track_id, - }, + "metadata": doc_status_transition_metadata( + status_doc, + extra={ + "is_duplicate": True, + "duplicate_kind": "content_hash", + "original_doc_id": original_doc_id, + "original_track_id": original_track_id, + }, + ), } } ) diff --git a/lightrag/utils_pipeline.py b/lightrag/utils_pipeline.py index 7539c2dbd8..ad7f1e9fb8 100644 --- a/lightrag/utils_pipeline.py +++ b/lightrag/utils_pipeline.py @@ -133,6 +133,54 @@ def doc_status_field(doc: Any, field: str, default: Any = "") -> Any: return getattr(doc, field, default) +# Long-lived per-document metadata fields that must survive every +# doc_status state transition. ``process_options`` records the user's +# per-file processing strategy at enqueue time and is read by analyze / +# chunk / KG-skip stages and by admin/list APIs throughout the document's +# lifetime, so we cannot let an intermediate transition (PARSING / +# ANALYZING / PROCESSING / PROCESSED / FAILED upsert) clobber it. +_DOC_STATUS_METADATA_CARRY_OVER_KEYS: tuple[str, ...] = ("process_options",) + + +def doc_status_metadata_carry_over(status_doc: Any) -> dict[str, Any]: + """Return the subset of ``status_doc.metadata`` to preserve across upserts. + + ``doc_status`` storage backends generally treat the ``metadata`` field + as an opaque blob and **replace** it on every upsert, so callers must + explicitly carry forward fields they want to keep. This helper centralises + the list of fields we always carry: today only ``process_options``, but + new long-lived metadata can be added by extending + ``_DOC_STATUS_METADATA_CARRY_OVER_KEYS``. + """ + if status_doc is None: + return {} + raw_metadata = doc_status_field(status_doc, "metadata", {}) + if not isinstance(raw_metadata, dict): + return {} + carry: dict[str, Any] = {} + for key in _DOC_STATUS_METADATA_CARRY_OVER_KEYS: + if key in raw_metadata and raw_metadata[key] not in (None, ""): + carry[key] = raw_metadata[key] + return carry + + +def doc_status_transition_metadata( + status_doc: Any, + *, + extra: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build a doc_status ``metadata`` payload that preserves carry-over fields. 
+ + Use at every state-transition upsert site so the user's + ``process_options`` (and any future long-lived metadata fields) survive + PENDING → PARSING → ANALYZING → PROCESSING → PROCESSED / FAILED. + """ + payload = doc_status_metadata_carry_over(status_doc) + if extra: + payload.update(extra) + return payload + + def doc_status_value(doc: Any) -> str: status = doc_status_field(doc, "status", "") if isinstance(status, DocStatus): diff --git a/tests/test_doc_status_chunk_preservation.py b/tests/test_doc_status_chunk_preservation.py index 8cfc6d9db5..8d0d72f871 100644 --- a/tests/test_doc_status_chunk_preservation.py +++ b/tests/test_doc_status_chunk_preservation.py @@ -354,9 +354,20 @@ async def fail_extract(self, chunks, pipeline_status, pipeline_status_lock): @pytest.mark.asyncio -async def test_extract_failure_before_chunking_preserves_previous_chunk_snapshot( +async def test_extract_failure_before_chunking_clears_stale_chunk_snapshot( tmp_path, ): + """The resume branch of ``apipeline_process_enqueue_documents`` purges + any stale ``chunks_list`` from a previous interrupted run *before* + chunking starts (so the new run does not mix old and new chunks). + Therefore, when chunking subsequently fails on the retry, the failed + doc_status reflects the post-purge state — the previous snapshot is + intentionally not preserved any more. + + Earlier this test asserted the opposite ("preserve previous snapshot + across failure"), which conflicted with the documented resume rule + that "已抽取文档一律删掉所有的文本块,重新走多模态分析和实体关系提取". + """ rag = await _build_rag(tmp_path, "extract_failure_pre_chunking", _failing_chunking) try: content = "chunking failure document" @@ -390,8 +401,11 @@ async def test_extract_failure_before_chunking_preserves_previous_chunk_snapshot failed_status = await rag.doc_status.get_by_id(doc_id) assert failed_status is not None assert _status_to_text(failed_status["status"]) == "failed" - assert failed_status.get("chunks_list") == previous_chunks - assert failed_status.get("chunks_count") == len(previous_chunks) + # Resume purged the stale list before chunking; the failure record + # therefore shows zero chunks rather than the previous snapshot. + assert failed_status.get("chunks_list") == [] + assert failed_status.get("chunks_count") == 0 + assert "chunking fail sentinel" in (failed_status.get("error_msg") or "") finally: await rag.finalize_storages() diff --git a/tests/test_pipeline_release_closure.py b/tests/test_pipeline_release_closure.py index 96f33a7699..b027a3e446 100644 --- a/tests/test_pipeline_release_closure.py +++ b/tests/test_pipeline_release_closure.py @@ -256,6 +256,92 @@ def test_resolve_file_parser_directives_priority(monkeypatch): assert options == "!" +@pytest.mark.offline +def test_doc_status_metadata_carry_over_helper(): + """``doc_status_transition_metadata`` preserves long-lived per-doc fields + (currently ``process_options``) and layers in any transition-specific + extras passed via ``extra=``. Empty / missing carry-over fields are + dropped, not written as null. + """ + from lightrag.utils_pipeline import doc_status_transition_metadata + + class _StubStatusDoc: + def __init__(self, metadata): + self.metadata = metadata + + # Carries process_options forward. + md = doc_status_transition_metadata(_StubStatusDoc({"process_options": "iet"})) + assert md == {"process_options": "iet"} + + # Layers in transition extras while keeping the carry-over. 
+ md = doc_status_transition_metadata( + _StubStatusDoc({"process_options": "R!"}), + extra={"processing_start_time": 12345}, + ) + assert md == {"process_options": "R!", "processing_start_time": 12345} + + # No carry-over when metadata is missing or empty. + assert doc_status_transition_metadata(_StubStatusDoc({})) == {} + assert doc_status_transition_metadata(None) == {} + + # Empty / None process_options are not written as null. + assert doc_status_transition_metadata(_StubStatusDoc({"process_options": ""})) == {} + assert ( + doc_status_transition_metadata(_StubStatusDoc({"process_options": None})) == {} + ) + + +def _status_value_text(status): + """Helper: extract the value of a DocStatus enum or raw status string.""" + if hasattr(status, "value"): + return status.value + return str(status) + + +@pytest.mark.offline +def test_doc_status_metadata_survives_processed_transition(tmp_path): + """End-to-end: a document enqueued with process_options must keep + ``metadata.process_options`` set in ``doc_status`` after the pipeline + drives it all the way to PROCESSED. This exercises the full state + machine (PENDING → PARSING → ANALYZING → PROCESSING → PROCESSED) and + asserts the carry-over works at every transition. + """ + + async def _run(): + rag = _new_rag(tmp_path) + await rag.initialize_storages() + try: + await rag.apipeline_enqueue_documents( + "Some content body for chunking.", + file_paths="metadata_carry.txt", + track_id="track-md-carry", + process_options="iet!", + ) + + doc_id = compute_mdhash_id("metadata_carry.txt", prefix="doc-") + pending_status = await rag.doc_status.get_by_id(doc_id) + assert pending_status is not None + assert (pending_status.get("metadata") or {}).get( + "process_options" + ) == "iet!" + + # Run the pipeline through to PROCESSED. + await rag.apipeline_process_enqueue_documents() + + final_status = await rag.doc_status.get_by_id(doc_id) + assert final_status is not None + assert _status_value_text(final_status.get("status")) == "processed" + metadata = final_status.get("metadata") or {} + assert metadata.get("process_options") == "iet!", ( + f"process_options dropped during state-machine transitions; " + f"final metadata: {metadata!r}" + ) + finally: + await rag.finalize_storages() + + asyncio.run(_run()) + + @pytest.mark.offline def test_apipeline_enqueue_persists_process_options(tmp_path): async def _run(): @@ -290,6 +376,571 @@ async def _run(): asyncio.run(_run()) +@pytest.mark.offline +def test_purge_doc_chunks_and_kg_is_noop_for_empty_chunks(tmp_path): + """``_purge_doc_chunks_and_kg`` with an empty chunk_ids set must be a + no-op so callers (including the resume branch) can invoke it + unconditionally without first checking for non-empty chunks_list. + """ + + async def _run(): + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_namespace_lock, + ) + + rag = _new_rag(tmp_path) + await rag.initialize_storages() + try: + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=rag.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=rag.workspace + ) + # Empty set: must return immediately without touching storage. + await rag._purge_doc_chunks_and_kg( + "doc-empty", + set(), + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + ) + # No exceptions → success. Calling twice in a row is also fine + # since the helper is idempotent on the empty input. 
+ await rag._purge_doc_chunks_and_kg( + "doc-empty", + set(), + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + ) + finally: + await rag.finalize_storages() + + asyncio.run(_run()) + + +@pytest.mark.offline +def test_purge_doc_chunks_and_kg_clears_chunks_for_unknown_doc(tmp_path): + """When the doc has chunk_ids but no graph contributions yet + (full_entities / full_relations empty), the helper must still clear + the chunks from chunks_vdb / text_chunks without raising. This + exercises the resume path for documents whose previous run was + interrupted between chunking and entity extraction. + """ + + async def _run(): + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_namespace_lock, + ) + + rag = _new_rag(tmp_path) + await rag.initialize_storages() + try: + # Seed text_chunks + chunks_vdb with two stale chunks. + await rag.text_chunks.upsert( + { + "doc-X-chunk-0": { + "content": "stale chunk 0", + "chunk_order_index": 0, + "full_doc_id": "doc-X", + "tokens": 4, + "file_path": "x.txt", + }, + "doc-X-chunk-1": { + "content": "stale chunk 1", + "chunk_order_index": 1, + "full_doc_id": "doc-X", + "tokens": 4, + "file_path": "x.txt", + }, + } + ) + await rag.chunks_vdb.upsert( + { + "doc-X-chunk-0": { + "content": "stale chunk 0", + "chunk_order_index": 0, + "full_doc_id": "doc-X", + "tokens": 4, + "file_path": "x.txt", + }, + "doc-X-chunk-1": { + "content": "stale chunk 1", + "chunk_order_index": 1, + "full_doc_id": "doc-X", + "tokens": 4, + "file_path": "x.txt", + }, + } + ) + await rag.text_chunks.index_done_callback() + await rag.chunks_vdb.index_done_callback() + + pipeline_status = await get_namespace_data( + "pipeline_status", workspace=rag.workspace + ) + pipeline_status_lock = get_namespace_lock( + "pipeline_status", workspace=rag.workspace + ) + + await rag._purge_doc_chunks_and_kg( + "doc-X", + {"doc-X-chunk-0", "doc-X-chunk-1"}, + pipeline_status=pipeline_status, + pipeline_status_lock=pipeline_status_lock, + ) + + # Both chunks gone from text_chunks. + remaining = await rag.text_chunks.get_by_ids( + ["doc-X-chunk-0", "doc-X-chunk-1"] + ) + assert remaining == [None, None] + finally: + await rag.finalize_storages() + + asyncio.run(_run()) + + +@pytest.mark.offline +def test_resume_purges_old_chunks_when_content_already_extracted(tmp_path): + """When ``apipeline_process_enqueue_documents`` picks up a document + whose content is already extracted (full_docs.format=raw with content) + and whose doc_status carries a non-empty chunks_list from a previous + half-finished run, the resume branch must call + ``_purge_doc_chunks_and_kg`` with the old chunk-IDs *before* the + chunking and entity-extraction stages run. This test wraps the + helper so we can assert it is invoked exactly once with the expected + inputs, then bails out so we don't have to mock the whole VLM / + entity-extract stack. + """ + + async def _run(): + rag = _new_rag(tmp_path) + await rag.initialize_storages() + try: + doc_id = compute_mdhash_id("resume.txt", prefix="doc-") + + # Seed full_docs as if extraction already completed. + await rag.full_docs.upsert( + { + doc_id: { + "content": "previously extracted body", + "file_path": "resume.txt", + "canonical_basename": "resume.txt", + "format": "raw", + "parsed_engine": "legacy", + "content_hash": "deadbeef", + } + } + ) + # Seed doc_status as PROCESSING with chunks_list from a prior + # half-finished run so the resume branch has something to purge. 
+ stale_chunks = [f"{doc_id}-chunk-{i:03d}" for i in range(2)] + await rag.doc_status.upsert( + { + doc_id: { + "status": DocStatus.PROCESSING, + "content_summary": "previously extracted body", + "content_length": len("previously extracted body"), + "created_at": "2026-01-01T00:00:00+00:00", + "updated_at": "2026-01-01T00:00:01+00:00", + "file_path": "resume.txt", + "canonical_basename": "resume.txt", + "track_id": "track-resume", + "content_hash": "deadbeef", + "chunks_list": stale_chunks, + "chunks_count": len(stale_chunks), + } + } + ) + + # Wrap the helper to record invocations, and raise after the call + # so the test exits cleanly without exercising downstream stages. + calls: list[tuple[str, set[str]]] = [] + original = rag._purge_doc_chunks_and_kg + + class _ResumePurged(Exception): + pass + + async def _wrapped(doc_id_arg, chunk_ids_arg, **kwargs): + calls.append((doc_id_arg, set(chunk_ids_arg))) + # Run the real helper so the side-effects (chunks gone from + # storage) are observable, then short-circuit. + await original(doc_id_arg, chunk_ids_arg, **kwargs) + raise _ResumePurged() + + rag._purge_doc_chunks_and_kg = _wrapped # type: ignore[method-assign] + + # Pipeline will pick up the PROCESSING document, hit the resume + # branch, call our wrapped purge, and our wrapper raises. + await rag.apipeline_process_enqueue_documents() + + # Helper was invoked exactly once with the stale chunk-IDs. + assert len(calls) == 1 + invoked_doc_id, invoked_chunks = calls[0] + assert invoked_doc_id == doc_id + assert invoked_chunks == set(stale_chunks) + finally: + await rag.finalize_storages() + + asyncio.run(_run()) + + +@pytest.mark.offline +def test_resume_skips_purge_when_chunks_list_empty(tmp_path): + """If the doc was extracted but never chunked (chunks_list empty), + the resume branch must NOT call the purge helper — there's nothing + to clean up. + """ + + async def _run(): + rag = _new_rag(tmp_path) + await rag.initialize_storages() + try: + doc_id = compute_mdhash_id("noskip.txt", prefix="doc-") + + await rag.full_docs.upsert( + { + doc_id: { + "content": "fresh body", + "file_path": "noskip.txt", + "canonical_basename": "noskip.txt", + "format": "raw", + "parsed_engine": "legacy", + "content_hash": "fresh", + } + } + ) + await rag.doc_status.upsert( + { + doc_id: { + "status": DocStatus.PARSING, + "content_summary": "fresh body", + "content_length": len("fresh body"), + "created_at": "2026-01-01T00:00:00+00:00", + "updated_at": "2026-01-01T00:00:01+00:00", + "file_path": "noskip.txt", + "canonical_basename": "noskip.txt", + "track_id": "track-noskip", + "content_hash": "fresh", + "chunks_list": [], + "chunks_count": 0, + } + } + ) + + calls: list[tuple[str, set[str]]] = [] + + async def _spy(doc_id_arg, chunk_ids_arg, **kwargs): + calls.append((doc_id_arg, set(chunk_ids_arg))) + # Don't actually purge; just record the call and let the + # pipeline continue past this test boundary. + raise RuntimeError("test stop after purge check") + + rag._purge_doc_chunks_and_kg = _spy # type: ignore[method-assign] + + try: + await rag.apipeline_process_enqueue_documents() + except Exception: + # Whether the pipeline reaches our spy or fails downstream + # doesn't matter for this test; we only care that the spy + # was NOT called for an empty chunks_list. 
+                pass
+
+            assert (
+                calls == []
+            ), "purge helper should not be called when chunks_list is empty"
+        finally:
+            await rag.finalize_storages()
+
+    asyncio.run(_run())
+
+
 @pytest.mark.offline
 def test_apipeline_enqueue_allows_concurrent_with_busy(tmp_path):
     """``busy=True`` no longer blocks enqueue. Concurrent processing is