diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 242fe57240..34c3314280 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -432,7 +432,16 @@ async def validation_exception_handler(
         )
     else:
         # For other endpoints, return the default FastAPI validation error
-        return JSONResponse(status_code=422, content={"detail": exc.errors()})
+        # Pydantic model_validator errors may contain unserializable objects
+        # in ctx['error'], so convert them to strings before JSON encoding
+        errors = []
+        for error in exc.errors():
+            error_dict = dict(error)
+            if "ctx" in error_dict and "error" in error_dict["ctx"]:
+                error_dict["ctx"] = dict(error_dict["ctx"])
+                error_dict["ctx"]["error"] = str(error_dict["ctx"]["error"])
+            errors.append(error_dict)
+        return JSONResponse(status_code=422, content={"detail": errors})
 
 
 def get_cors_origins():
     """Get allowed origins from global_args
diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 9e6fab9dbb..70b448d33c 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -19,9 +19,10 @@
     Depends,
     File,
     HTTPException,
+    Query,
     UploadFile,
 )
-from pydantic import BaseModel, ConfigDict, Field, field_validator
+from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
 
 from lightrag import LightRAG
 from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
@@ -240,6 +241,11 @@ class InsertTextRequest(BaseModel):
     file_source: Optional[str] = Field(
         default=None, min_length=0, description="File Source"
     )
+    doc_id: Optional[str] = Field(
+        default=None,
+        min_length=1,
+        description="Optional custom document ID. If not provided, an MD5 hash-based ID will be auto-generated from the content.",
+    )
 
     @field_validator("text", mode="after")
     @classmethod
@@ -251,11 +257,19 @@ def strip_text_after(cls, text: str) -> str:
     def normalize_source_before(cls, file_source: Optional[str]) -> str:
         return normalize_file_path(file_source)
 
+    @field_validator("doc_id", mode="after")
+    @classmethod
+    def strip_doc_id_after(cls, doc_id: Optional[str]) -> Optional[str]:
+        if doc_id is None:
+            return None
+        return doc_id.strip()
+
     model_config = ConfigDict(
         json_schema_extra={
             "example": {
                 "text": "This is a sample text to be inserted into the RAG system.",
                 "file_source": "Source of the text (optional)",
+                "doc_id": "my-custom-doc-id",
             }
         }
     )
@@ -276,6 +290,10 @@ class InsertTextsRequest(BaseModel):
     file_sources: Optional[list[str]] = Field(
         default=None, min_length=0, description="Sources of the texts"
     )
+    doc_ids: Optional[list[str]] = Field(
+        default=None,
+        description="Optional custom document IDs, one per text. If provided, length must match texts. If not provided, MD5 hash-based IDs will be auto-generated from content.",
+    )
 
     @field_validator("texts", mode="after")
     @classmethod
@@ -292,6 +310,30 @@ def normalize_sources_before(
         return [normalize_file_path(file_source) for file_source in file_sources]
 
+    @field_validator("doc_ids", mode="after")
+    @classmethod
+    def validate_doc_ids(cls, doc_ids: Optional[list[str]]) -> Optional[list[str]]:
+        if doc_ids is None:
+            return None
+        if not doc_ids:
+            raise ValueError("doc_ids list cannot be empty")
+        validated = []
+        for did in doc_ids:
+            if not did or not did.strip():
+                raise ValueError("Document IDs must not be empty or whitespace-only")
+            validated.append(did.strip())
+        if len(validated) != len(set(validated)):
+            raise ValueError("Document IDs must be unique within the request")
+        return validated
+
+    @model_validator(mode="after")
+    def check_doc_ids_length(self):
+        if self.doc_ids is not None and len(self.doc_ids) != len(self.texts):
+            raise ValueError(
+                f"doc_ids length ({len(self.doc_ids)}) must match texts length ({len(self.texts)})"
+            )
+        return self
+
     model_config = ConfigDict(
         json_schema_extra={
             "example": {
@@ -302,6 +344,10 @@ def normalize_sources_before(
                 "file_sources": [
                     "First file source (optional)",
                 ],
+                "doc_ids": [
+                    "my-doc-1",
+                    "my-doc-2",
+                ],
             }
         }
     )
@@ -1229,7 +1275,7 @@ def escape_sheet_title(title: str) -> str:
 
 
 async def pipeline_enqueue_file(
-    rag: LightRAG, file_path: Path, track_id: str = None
+    rag: LightRAG, file_path: Path, track_id: str = None, doc_id: str | None = None
 ) -> tuple[bool, str]:
     """Add a file to the queue for processing
@@ -1602,7 +1648,10 @@ async def pipeline_enqueue_file(
         try:
             await rag.apipeline_enqueue_documents(
-                content, file_paths=file_path.name, track_id=track_id
+                content,
+                ids=[doc_id] if doc_id else None,
+                file_paths=file_path.name,
+                track_id=track_id,
             )
 
             logger.info(
@@ -1686,17 +1735,20 @@ async def pipeline_enqueue_file(
             logger.error(f"Error deleting file {file_path}: {str(e)}")
 
 
-async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
+async def pipeline_index_file(
+    rag: LightRAG, file_path: Path, track_id: str = None, doc_id: str | None = None
+):
     """Index a file with track_id
 
     Args:
         rag: LightRAG instance
         file_path: Path to the saved file
         track_id: Optional tracking ID
+        doc_id: Optional custom document ID
     """
     try:
         success, returned_track_id = await pipeline_enqueue_file(
-            rag, file_path, track_id
+            rag, file_path, track_id, doc_id=doc_id
         )
         if success:
             await rag.apipeline_process_enqueue_documents()
@@ -1745,6 +1797,7 @@ async def pipeline_index_texts(
     texts: List[str],
     file_sources: List[str] = None,
     track_id: str = None,
+    doc_ids: List[str] | None = None,
 ):
     """Index a list of texts with track_id
@@ -1753,6 +1806,7 @@ async def pipeline_index_texts(
         rag: LightRAG instance
         texts: The texts to index
         file_sources: Sources of the texts
         track_id: Optional tracking ID
+        doc_ids: Optional custom document IDs, one per text
     """
     if not texts:
         return
@@ -1770,7 +1824,10 @@ async def pipeline_index_texts(
     )
 
     await rag.apipeline_enqueue_documents(
-        input=texts, file_paths=normalized_file_sources, track_id=track_id
+        input=texts,
+        ids=doc_ids,
+        file_paths=normalized_file_sources,
+        track_id=track_id,
     )
     await rag.apipeline_process_enqueue_documents()
@@ -2119,7 +2176,9 @@ async def scan_for_new_documents(background_tasks: BackgroundTasks):
     "/upload", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
 )
 async def upload_to_input_dir(
-    background_tasks: BackgroundTasks, file: UploadFile = File(...)
+    background_tasks: BackgroundTasks,
+    file: UploadFile = File(...),
+    doc_id: Optional[str] = Query(default=None, description="Optional custom document ID"),
 ):
     """
     Upload a file to the input directory and index it.
@@ -2173,6 +2232,9 @@ async def upload_to_input_dir(
         HTTPException: If the file type is not supported (400), file too large (413),
             or other errors occur (500).
     """
     try:
+        # Normalize doc_id early
+        doc_id = doc_id.strip() if doc_id else None
+
         # Sanitize filename to prevent Path Traversal attacks
         safe_filename = sanitize_filename(file.filename, doc_manager.input_dir)
@@ -2216,6 +2278,18 @@ async def upload_to_input_dir(
                 track_id=existing_track_id,
             )
 
+        # Check if custom doc_id already exists in doc_status storage
+        if doc_id:
+            existing_doc = await rag.doc_status.get_by_id(doc_id)
+            if existing_doc:
+                status = existing_doc.get("status", "unknown")
+                existing_track_id = existing_doc.get("track_id") or ""
+                return InsertResponse(
+                    status="duplicated",
+                    message=f"Document ID '{doc_id}' already exists in document storage (Status: {status}).",
+                    track_id=existing_track_id,
+                )
+
         file_path = doc_manager.input_dir / safe_filename
         # Check if file already exists in file system
         if file_path.exists():
@@ -2267,7 +2341,9 @@ async def upload_to_input_dir(
             track_id = generate_track_id("upload")
 
         # Add to background tasks and get track_id
-        background_tasks.add_task(pipeline_index_file, rag, file_path, track_id)
+        background_tasks.add_task(
+            pipeline_index_file, rag, file_path, track_id, doc_id=doc_id
+        )
 
         return InsertResponse(
             status="success",
@@ -2326,9 +2402,12 @@ async def insert_text(
                 track_id=existing_track_id,
             )
 
-        # Check if content already exists by computing content hash (doc_id)
-        sanitized_text = sanitize_text_for_encoding(request.text)
-        content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
+        # Check if content already exists
+        if request.doc_id:
+            content_doc_id = request.doc_id.strip()
+        else:
+            sanitized_text = sanitize_text_for_encoding(request.text)
+            content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
         existing_doc = await rag.doc_status.get_by_id(content_doc_id)
         if existing_doc:
             # Content already exists, return duplicated with existing track_id
@@ -2336,7 +2415,7 @@ async def insert_text(
             existing_track_id = existing_doc.get("track_id") or ""
             return InsertResponse(
                 status="duplicated",
-                message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
+                message=f"Document ID '{content_doc_id}' already exists in document storage (Status: {status}).",
                 track_id=existing_track_id,
             )
@@ -2349,6 +2428,7 @@ async def insert_text(
             [request.text],
             file_sources=[request.file_source],
             track_id=track_id,
+            doc_ids=[request.doc_id] if request.doc_id else None,
         )
 
         return InsertResponse(
@@ -2408,20 +2488,31 @@ async def insert_texts(
                 track_id=existing_track_id,
             )
 
-        # Check if any content already exists by computing content hash (doc_id)
-        for text in request.texts:
-            sanitized_text = sanitize_text_for_encoding(text)
-            content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
-            existing_doc = await rag.doc_status.get_by_id(content_doc_id)
-            if existing_doc:
-                # Content already exists, return duplicated with existing track_id
-                status = existing_doc.get("status", "unknown")
-                existing_track_id = existing_doc.get("track_id") or ""
-                return InsertResponse(
-                    status="duplicated",
-                    message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
-                    track_id=existing_track_id,
-                )
+        # Check if any content already exists
+        if request.doc_ids:
+            existing_docs = await rag.doc_status.get_by_ids(request.doc_ids)
+            for doc_id, existing_doc in zip(request.doc_ids, existing_docs):
+                if existing_doc is not None:
+                    status = existing_doc.get("status", "unknown")
+                    existing_track_id = existing_doc.get("track_id") or ""
+                    return InsertResponse(
+                        status="duplicated",
+                        message=f"Document ID '{doc_id}' already exists in document storage (Status: {status}).",
+                        track_id=existing_track_id,
+                    )
+        else:
+            for text in request.texts:
+                sanitized_text = sanitize_text_for_encoding(text)
+                content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
+                existing_doc = await rag.doc_status.get_by_id(content_doc_id)
+                if existing_doc:
+                    status = existing_doc.get("status", "unknown")
+                    existing_track_id = existing_doc.get("track_id") or ""
+                    return InsertResponse(
+                        status="duplicated",
+                        message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
+                        track_id=existing_track_id,
+                    )
 
         # Generate track_id for texts insertion
         track_id = generate_track_id("insert")
@@ -2432,6 +2523,7 @@ async def insert_texts(
             request.texts,
             file_sources=request.file_sources,
             track_id=track_id,
+            doc_ids=request.doc_ids,
         )
 
         return InsertResponse(
diff --git a/lightrag/api/routers/query_routes.py b/lightrag/api/routers/query_routes.py
index 22958158a1..aea307b579 100644
--- a/lightrag/api/routers/query_routes.py
+++ b/lightrag/api/routers/query_routes.py
@@ -148,6 +148,10 @@ class ReferenceItem(BaseModel):
     reference_id: str = Field(description="Unique reference identifier")
     file_path: str = Field(description="Path to the source file")
+    doc_id: Optional[str] = Field(
+        default=None,
+        description="Document ID associated with this reference",
+    )
     content: Optional[List[str]] = Field(
         default=None,
         description="List of chunk contents from this file (only present when include_chunk_content=True)",
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 8ca3f5d303..0a75139032 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -3553,6 +3553,7 @@ async def _get_vector_context(
             if "content" in result:
                 chunk_with_metadata = {
                     "content": result["content"],
+                    "full_doc_id": result.get("full_doc_id", ""),
                     "created_at": result.get("created_at", None),
                     "file_path": result.get("file_path", "unknown_source"),
                     "source_type": "vector",  # Mark the source type
@@ -4013,6 +4014,7 @@ async def _merge_all_chunks(
                 merged_chunks.append(
                     {
                         "content": chunk["content"],
+                        "full_doc_id": chunk.get("full_doc_id", ""),
                         "file_path": chunk.get("file_path", "unknown_source"),
                         "chunk_id": chunk_id,
                     }
@@ -4027,6 +4029,7 @@
                 merged_chunks.append(
                     {
                         "content": chunk["content"],
+                        "full_doc_id": chunk.get("full_doc_id", ""),
                         "file_path": chunk.get("file_path", "unknown_source"),
                         "chunk_id": chunk_id,
                     }
@@ -4041,6 +4044,7 @@
                 merged_chunks.append(
                     {
                         "content": chunk["content"],
+                        "full_doc_id": chunk.get("full_doc_id", ""),
                         "file_path": chunk.get("file_path", "unknown_source"),
                         "chunk_id": chunk_id,
                     }
diff --git a/lightrag/utils.py b/lightrag/utils.py
index 68421961ef..ab5e755f35 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -3309,6 +3309,7 @@ def convert_to_user_format(
                 "content": chunk.get("content", ""),
                 "file_path": chunk.get("file_path", "unknown_source"),
                 "chunk_id": chunk.get("chunk_id", ""),
+                "doc_id": chunk.get("full_doc_id", ""),
             }
             formatted_chunks.append(chunk_data)
@@ -3361,10 +3362,13 @@ def generate_reference_list_from_chunks(
     # 1. Extract all valid file_paths and count their occurrences
     file_path_counts = {}
+    file_path_to_doc_id = {}
     for chunk in chunks:
         file_path = chunk.get("file_path", "")
         if file_path and file_path != "unknown_source":
             file_path_counts[file_path] = file_path_counts.get(file_path, 0) + 1
+            if not file_path_to_doc_id.get(file_path):
+                file_path_to_doc_id[file_path] = chunk.get("full_doc_id", "")
 
     # 2. Sort file paths by frequency (descending), then by first appearance order
     # Create a list of (file_path, count, first_index) tuples
@@ -3399,6 +3403,12 @@
     # 5. Build reference_list
     reference_list = []
     for i, file_path in enumerate(unique_file_paths):
-        reference_list.append({"reference_id": str(i + 1), "file_path": file_path})
+        reference_list.append(
+            {
+                "reference_id": str(i + 1),
+                "file_path": file_path,
+                "doc_id": file_path_to_doc_id.get(file_path, ""),
+            }
+        )
 
     return reference_list, updated_chunks