11 changes: 10 additions & 1 deletion lightrag/api/lightrag_server.py
@@ -432,7 +432,16 @@ async def validation_exception_handler(
)
else:
# For other endpoints, return the default FastAPI validation error
return JSONResponse(status_code=422, content={"detail": exc.errors()})
# Pydantic model_validator errors may contain unserializable objects
# in ctx['error'], so convert them to strings before JSON encoding
errors = []
for error in exc.errors():
error_dict = dict(error)
if "ctx" in error_dict and "error" in error_dict["ctx"]:
error_dict["ctx"] = dict(error_dict["ctx"])
error_dict["ctx"]["error"] = str(error_dict["ctx"]["error"])
errors.append(error_dict)
return JSONResponse(status_code=422, content={"detail": errors})

def get_cors_origins():
"""Get allowed origins from global_args
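The handler change above matters because Pydantic v2 stores the original exception object under `ctx['error']`, which `JSONResponse` cannot encode. A minimal, self-contained sketch of the same stringification (the helper name is illustrative, not part of this PR):

```python
from fastapi.exceptions import RequestValidationError


def serializable_errors(exc: RequestValidationError) -> list[dict]:
    """Copy validation errors, replacing the exception object stored in
    ctx['error'] with its string form so the payload is JSON-safe."""
    errors = []
    for error in exc.errors():
        error_dict = dict(error)
        ctx = error_dict.get("ctx")
        if isinstance(ctx, dict) and "error" in ctx:
            ctx = dict(ctx)
            # e.g. the ValueError raised by a model_validator
            ctx["error"] = str(ctx["error"])
            error_dict["ctx"] = ctx
        errors.append(error_dict)
    return errors
```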
144 changes: 118 additions & 26 deletions lightrag/api/routers/document_routes.py
@@ -19,9 +19,10 @@
Depends,
File,
HTTPException,
Query,
UploadFile,
)
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

from lightrag import LightRAG
from lightrag.base import DeletionResult, DocProcessingStatus, DocStatus
@@ -240,6 +241,11 @@ class InsertTextRequest(BaseModel):
file_source: Optional[str] = Field(
default=None, min_length=0, description="File Source"
)
doc_id: Optional[str] = Field(
default=None,
min_length=1,
description="Optional custom document ID. If not provided, an MD5 hash-based ID will be auto-generated from the content.",
)

@field_validator("text", mode="after")
@classmethod
@@ -251,11 +257,19 @@ def strip_text_after(cls, text: str) -> str:
def normalize_source_before(cls, file_source: Optional[str]) -> str:
return normalize_file_path(file_source)

@field_validator("doc_id", mode="after")
@classmethod
def strip_doc_id_after(cls, doc_id: Optional[str]) -> Optional[str]:
if doc_id is None:
return None
return doc_id.strip()

model_config = ConfigDict(
json_schema_extra={
"example": {
"text": "This is a sample text to be inserted into the RAG system.",
"file_source": "Source of the text (optional)",
"doc_id": "my-custom-doc-id",
}
}
)
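Assuming the router is mounted under `/documents` as in the existing server, a client request using the new field might look like this (server address and file source are illustrative):

```python
import requests

# Hypothetical server address; adjust to your deployment.
resp = requests.post(
    "http://localhost:9621/documents/text",
    json={
        "text": "This is a sample text to be inserted into the RAG system.",
        "file_source": "notes.txt",
        "doc_id": "my-custom-doc-id",  # optional; omit to fall back to the MD5-based ID
    },
)
print(resp.status_code, resp.json())
```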
@@ -276,6 +290,10 @@ class InsertTextsRequest(BaseModel):
file_sources: Optional[list[str]] = Field(
default=None, min_length=0, description="Sources of the texts"
)
doc_ids: Optional[list[str]] = Field(
default=None,
description="Optional custom document IDs, one per text. If provided, length must match texts. If not provided, MD5 hash-based IDs will be auto-generated from content.",
)

@field_validator("texts", mode="after")
@classmethod
@@ -292,6 +310,30 @@ def normalize_sources_before(

return [normalize_file_path(file_source) for file_source in file_sources]

@field_validator("doc_ids", mode="after")
@classmethod
def validate_doc_ids(cls, doc_ids: Optional[list[str]]) -> Optional[list[str]]:
if doc_ids is None:
return None
if not doc_ids:
raise ValueError("doc_ids list cannot be empty")
validated = []
for did in doc_ids:
if not did or not did.strip():
raise ValueError("Document IDs must not be empty or whitespace-only")
validated.append(did.strip())
if len(validated) != len(set(validated)):
raise ValueError("Document IDs must be unique within the request")
return validated

@model_validator(mode="after")
def check_doc_ids_length(self):
if self.doc_ids is not None and len(self.doc_ids) != len(self.texts):
raise ValueError(
f"doc_ids length ({len(self.doc_ids)}) must match texts length ({len(self.texts)})"
)
return self

model_config = ConfigDict(
json_schema_extra={
"example": {
@@ -302,6 +344,10 @@ def normalize_sources_before(
"file_sources": [
"First file source (optional)",
],
"doc_ids": [
"my-doc-1",
"my-doc-2",
],
}
}
)
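A quick sketch of how the new validators behave, assuming the model is importable from this module (the import path simply mirrors the file location):

```python
from pydantic import ValidationError

from lightrag.api.routers.document_routes import InsertTextsRequest

# Valid: one ID per text, all unique and non-blank.
ok = InsertTextsRequest(texts=["first", "second"], doc_ids=["my-doc-1", "my-doc-2"])

# Invalid: the model_validator rejects a length mismatch.
try:
    InsertTextsRequest(texts=["only one"], doc_ids=["my-doc-1", "my-doc-2"])
except ValidationError as exc:
    print(exc.errors()[0]["msg"])  # reports the doc_ids/texts length mismatch
```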
@@ -1229,7 +1275,7 @@ def escape_sheet_title(title: str) -> str:


async def pipeline_enqueue_file(
rag: LightRAG, file_path: Path, track_id: str = None
rag: LightRAG, file_path: Path, track_id: str = None, doc_id: str | None = None
) -> tuple[bool, str]:
"""Add a file to the queue for processing

@@ -1602,7 +1648,10 @@ async def pipeline_enqueue_file(

try:
await rag.apipeline_enqueue_documents(
content, file_paths=file_path.name, track_id=track_id
content,
ids=[doc_id] if doc_id else None,
file_paths=file_path.name,
track_id=track_id,
)

logger.info(
@@ -1686,17 +1735,20 @@ async def pipeline_enqueue_file(
logger.error(f"Error deleting file {file_path}: {str(e)}")


async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None):
async def pipeline_index_file(
rag: LightRAG, file_path: Path, track_id: str = None, doc_id: str | None = None
):
"""Index a file with track_id

Args:
rag: LightRAG instance
file_path: Path to the saved file
track_id: Optional tracking ID
doc_id: Optional custom document ID
"""
try:
success, returned_track_id = await pipeline_enqueue_file(
rag, file_path, track_id
rag, file_path, track_id, doc_id=doc_id
)
if success:
await rag.apipeline_process_enqueue_documents()
@@ -1745,6 +1797,7 @@ async def pipeline_index_texts(
texts: List[str],
file_sources: List[str] = None,
track_id: str = None,
doc_ids: List[str] | None = None,
):
"""Index a list of texts with track_id

@@ -1753,6 +1806,7 @@
texts: The texts to index
file_sources: Sources of the texts
track_id: Optional tracking ID
doc_ids: Optional custom document IDs, one per text
"""
if not texts:
return
@@ -1770,7 +1824,10 @@
)

await rag.apipeline_enqueue_documents(
input=texts, file_paths=normalized_file_sources, track_id=track_id
input=texts,
ids=doc_ids,
file_paths=normalized_file_sources,
track_id=track_id,
)
await rag.apipeline_process_enqueue_documents()
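The `doc_ids` these helpers forward end up as the `ids` argument of `LightRAG.apipeline_enqueue_documents`; an equivalent direct call, sketched under the assumption of an already initialized `rag` instance inside an async function:

```python
# Sketch only: `rag` is an initialized LightRAG instance; run inside `async def`.
await rag.apipeline_enqueue_documents(
    input=["first text", "second text"],
    ids=["my-doc-1", "my-doc-2"],          # custom IDs instead of MD5-derived ones
    file_paths=["first.txt", "second.txt"],
    track_id="insert_example",             # illustrative tracking ID
)
await rag.apipeline_process_enqueue_documents()
```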

@@ -2119,7 +2176,9 @@ async def scan_for_new_documents(background_tasks: BackgroundTasks):
"/upload", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
)
async def upload_to_input_dir(
background_tasks: BackgroundTasks, file: UploadFile = File(...)
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
doc_id: Optional[str] = Query(default=None, description="Optional custom document ID"),
):
"""
Upload a file to the input directory and index it.
@@ -2173,6 +2232,9 @@ async def upload_to_input_dir(
HTTPException: If the file type is not supported (400), file too large (413), or other errors occur (500).
"""
try:
# Normalize doc_id early
doc_id = doc_id.strip() if doc_id else None

# Sanitize filename to prevent Path Traversal attacks
safe_filename = sanitize_filename(file.filename, doc_manager.input_dir)

@@ -2216,6 +2278,18 @@ async def upload_to_input_dir(
track_id=existing_track_id,
)

# Check if custom doc_id already exists in doc_status storage
if doc_id:
existing_doc = await rag.doc_status.get_by_id(doc_id)
if existing_doc:
status = existing_doc.get("status", "unknown")
existing_track_id = existing_doc.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"Document ID '{doc_id}' already exists in document storage (Status: {status}).",
track_id=existing_track_id,
)

file_path = doc_manager.input_dir / safe_filename
# Check if file already exists in file system
if file_path.exists():
@@ -2267,7 +2341,9 @@ async def upload_to_input_dir(
track_id = generate_track_id("upload")

# Add to background tasks and get track_id
background_tasks.add_task(pipeline_index_file, rag, file_path, track_id)
background_tasks.add_task(
pipeline_index_file, rag, file_path, track_id, doc_id=doc_id
)

return InsertResponse(
status="success",
@@ -2326,17 +2402,20 @@ async def insert_text(
track_id=existing_track_id,
)

# Check if content already exists by computing content hash (doc_id)
sanitized_text = sanitize_text_for_encoding(request.text)
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
# Check if content already exists
if request.doc_id:
content_doc_id = request.doc_id.strip()
else:
sanitized_text = sanitize_text_for_encoding(request.text)
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
existing_doc = await rag.doc_status.get_by_id(content_doc_id)
if existing_doc:
# Content already exists, return duplicated with existing track_id
status = existing_doc.get("status", "unknown")
existing_track_id = existing_doc.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
message=f"Document ID '{content_doc_id}' already exists in document storage (Status: {status}).",
track_id=existing_track_id,
)

@@ -2349,6 +2428,7 @@ async def insert_text(
[request.text],
file_sources=[request.file_source],
track_id=track_id,
doc_ids=[request.doc_id] if request.doc_id else None,
)

return InsertResponse(
@@ -2408,20 +2488,31 @@ async def insert_texts(
track_id=existing_track_id,
)

# Check if any content already exists by computing content hash (doc_id)
for text in request.texts:
sanitized_text = sanitize_text_for_encoding(text)
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
existing_doc = await rag.doc_status.get_by_id(content_doc_id)
if existing_doc:
# Content already exists, return duplicated with existing track_id
status = existing_doc.get("status", "unknown")
existing_track_id = existing_doc.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
track_id=existing_track_id,
)
# Check if any content already exists
if request.doc_ids:
existing_docs = await rag.doc_status.get_by_ids(request.doc_ids)
for doc_id, existing_doc in zip(request.doc_ids, existing_docs):
if existing_doc is not None:
status = existing_doc.get("status", "unknown")
existing_track_id = existing_doc.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"Document ID '{doc_id}' already exists in document storage (Status: {status}).",
track_id=existing_track_id,
)
else:
for text in request.texts:
sanitized_text = sanitize_text_for_encoding(text)
content_doc_id = compute_mdhash_id(sanitized_text, prefix="doc-")
existing_doc = await rag.doc_status.get_by_id(content_doc_id)
if existing_doc:
status = existing_doc.get("status", "unknown")
existing_track_id = existing_doc.get("track_id") or ""
return InsertResponse(
status="duplicated",
message=f"Identical content already exists in document storage (doc_id: {content_doc_id}, Status: {status}).",
track_id=existing_track_id,
)

# Generate track_id for texts insertion
track_id = generate_track_id("insert")
@@ -2432,6 +2523,7 @@
request.texts,
file_sources=request.file_sources,
track_id=track_id,
doc_ids=request.doc_ids,
)

return InsertResponse(
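Taken together with the upload endpoint changes, a client call exercising the new `doc_id` query parameter could look like this (server address, filename, and mount prefix are assumptions):

```python
import requests

with open("report.pdf", "rb") as f:
    resp = requests.post(
        "http://localhost:9621/documents/upload",
        params={"doc_id": "report-2024"},   # new optional query parameter
        files={"file": ("report.pdf", f)},
    )
# Expect status "success" on first upload and "duplicated" if the ID already exists.
print(resp.json())
```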
4 changes: 4 additions & 0 deletions lightrag/api/routers/query_routes.py
@@ -148,6 +148,10 @@ class ReferenceItem(BaseModel):

reference_id: str = Field(description="Unique reference identifier")
file_path: str = Field(description="Path to the source file")
doc_id: Optional[str] = Field(
default=None,
description="Document ID associated with this reference",
)
content: Optional[List[str]] = Field(
default=None,
description="List of chunk contents from this file (only present when include_chunk_content=True)",
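For reference, the extended model can be exercised in isolation like this (a sketch assuming Pydantic v2; only the fields visible in the diff are included):

```python
from typing import List, Optional

from pydantic import BaseModel, Field


class ReferenceItem(BaseModel):
    reference_id: str = Field(description="Unique reference identifier")
    file_path: str = Field(description="Path to the source file")
    doc_id: Optional[str] = Field(
        default=None, description="Document ID associated with this reference"
    )
    content: Optional[List[str]] = Field(default=None)


ref = ReferenceItem(reference_id="1", file_path="notes.txt", doc_id="my-doc-1")
print(ref.model_dump(exclude_none=True))
```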
4 changes: 4 additions & 0 deletions lightrag/operate.py
@@ -3553,6 +3553,7 @@ async def _get_vector_context(
if "content" in result:
chunk_with_metadata = {
"content": result["content"],
"full_doc_id": result.get("full_doc_id", ""),
"created_at": result.get("created_at", None),
"file_path": result.get("file_path", "unknown_source"),
"source_type": "vector", # Mark the source type
@@ -4013,6 +4014,7 @@ async def _merge_all_chunks(
merged_chunks.append(
{
"content": chunk["content"],
"full_doc_id": chunk.get("full_doc_id", ""),
"file_path": chunk.get("file_path", "unknown_source"),
"chunk_id": chunk_id,
}
@@ -4027,6 +4029,7 @@
merged_chunks.append(
{
"content": chunk["content"],
"full_doc_id": chunk.get("full_doc_id", ""),
"file_path": chunk.get("file_path", "unknown_source"),
"chunk_id": chunk_id,
}
@@ -4041,6 +4044,7 @@
merged_chunks.append(
{
"content": chunk["content"],
"full_doc_id": chunk.get("full_doc_id", ""),
"file_path": chunk.get("file_path", "unknown_source"),
"chunk_id": chunk_id,
}
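All three operate.py additions repeat the same pattern: carry `full_doc_id` on the merged chunk record, defaulting to an empty string for chunks indexed before the field existed. A mirror of that logic as a standalone helper (not code from this PR):

```python
def to_merged_chunk(chunk: dict, chunk_id: str) -> dict:
    """Build the merged chunk record, carrying full_doc_id through and
    defaulting to "" for chunks that predate the field."""
    return {
        "content": chunk["content"],
        "full_doc_id": chunk.get("full_doc_id", ""),
        "file_path": chunk.get("file_path", "unknown_source"),
        "chunk_id": chunk_id,
    }
```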