diff --git a/docs/AdvancedFeatures.md b/docs/AdvancedFeatures.md
index c4a58042a4..86ff4c9a56 100644
--- a/docs/AdvancedFeatures.md
+++ b/docs/AdvancedFeatures.md
@@ -20,7 +20,7 @@ Configure parser routing and external parser service endpoints in `.env`:
 ```bash
 LIGHTRAG_PARSER=pdf:mineru,docx:docling,pptx:docling,xlsx:docling,*:legacy
 MINERU_ENDPOINT=http://localhost:8000/api/v1/task
-DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
+DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
 ```
 
 Then upload documents through LightRAG Server. `LIGHTRAG_PARSER` rules match suffixes such as `pdf`, may be separated with commas or semicolons, and are evaluated from left to right. If a rule enables MinerU or Docling, the matching endpoint must be configured before server startup. Per-file hints such as `paper.[mineru].pdf` and `memo.[native].docx` override the default rules. Parsed multimodal sidecars are written by the pipeline and consumed by the normal indexing flow. See [File Processing Configuration](./FileProcessingConfiguration-zh.md) for detailed routing rules and examples.
diff --git a/docs/FileProcessingConfiguration-zh.md b/docs/FileProcessingConfiguration-zh.md
index 57201bd4c5..adcb199100 100644
--- a/docs/FileProcessingConfiguration-zh.md
+++ b/docs/FileProcessingConfiguration-zh.md
@@ -32,7 +32,7 @@ filename.[OPTIONS].ext
 ```bash
 LIGHTRAG_PARSER=pdf:mineru,docx:native-iet,*:legacy
 MINERU_ENDPOINT=http://localhost:8000/api/v1/task
-DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
+DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
 ```
 
 ```text
diff --git a/env.docker-compose-full b/env.docker-compose-full
index 40038c146a..3e4d4b0c7c 100644
--- a/env.docker-compose-full
+++ b/env.docker-compose-full
@@ -254,13 +254,15 @@ ENTITY_EXTRACTION_USE_JSON=true
 # MINERU_POLL_INTERVAL_SECONDS=2
 # MINERU_MAX_POLLS=180
 
-# DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
-# DOCLING_POLL_ENDPOINT=http://localhost:8081/v1/convert/file/async/{task_id}
+# DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
+# DOCLING_POLL_ENDPOINT=http://localhost:5001/v1/status/poll/{task_id}
 # DOCLING_POLL_METHOD=GET
 # DOCLING_ID_FIELD=task_id
-# DOCLING_STATUS_FIELD=status
+# DOCLING_STATUS_FIELD=task_status
 # DOCLING_RESULT_URL_FIELD=result_url
-# DOCLING_CONTENT_FIELD=content
+# DOCLING_RESULT_ENDPOINT=http://localhost:5001/v1/result/{task_id}
+# DOCLING_CONTENT_FIELD=document.md_content
+# DOCLING_FILE_FIELD=files
 # DOCLING_SUCCESS_VALUES=done,success,completed
 # DOCLING_FAILED_VALUES=failed,error,cancelled
 # DOCLING_POLL_INTERVAL_SECONDS=2
diff --git a/env.example b/env.example
index 31663c0bf6..4ca1cf24f2 100644
--- a/env.example
+++ b/env.example
@@ -265,13 +265,15 @@ ENTITY_EXTRACTION_USE_JSON=true
 # MINERU_POLL_INTERVAL_SECONDS=2
 # MINERU_MAX_POLLS=180
 
-# DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
-# DOCLING_POLL_ENDPOINT=http://localhost:8081/v1/convert/file/async/{task_id}
+# DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
+# DOCLING_POLL_ENDPOINT=http://localhost:5001/v1/status/poll/{task_id}
 # DOCLING_POLL_METHOD=GET
 # DOCLING_ID_FIELD=task_id
-# DOCLING_STATUS_FIELD=status
+# DOCLING_STATUS_FIELD=task_status
 # DOCLING_RESULT_URL_FIELD=result_url
-# DOCLING_CONTENT_FIELD=content
+# DOCLING_RESULT_ENDPOINT=http://localhost:5001/v1/result/{task_id}
+# DOCLING_CONTENT_FIELD=document.md_content
+# DOCLING_FILE_FIELD=files
 # DOCLING_SUCCESS_VALUES=done,success,completed
 # DOCLING_FAILED_VALUES=failed,error,cancelled
 # DOCLING_POLL_INTERVAL_SECONDS=2
diff --git a/lightrag/pipeline.py b/lightrag/pipeline.py
index 6c57d69e3a..e96873dcec 100644
--- a/lightrag/pipeline.py
+++ b/lightrag/pipeline.py
@@ -1945,8 +1945,7 @@ async def _finalize_doc_failure(
                 )
             else:
                 error_msg = (
-                    f"User cancelled {current_file_number}/"
-                    f"{total_files}: {file_path}"
+                    f"User cancelled {current_file_number}/{total_files}: {file_path}"
                 )
                 logger.warning(error_msg)
                 async with pipeline_status_lock:
@@ -2147,22 +2146,31 @@ async def parse_docling(
     endpoint = os.getenv("DOCLING_ENDPOINT", "").strip()
     if not endpoint:
         raise ValueError("DOCLING_ENDPOINT is required for Docling parsing")
+    docling_base = endpoint.rstrip("/")
+    convert_path = "/v1/convert/file/async"
+    if docling_base.endswith(convert_path):
+        docling_base = docling_base[: -len(convert_path)]
     protocol = {
         "upload_url": endpoint,
         "poll_url_template": os.getenv(
             "DOCLING_POLL_ENDPOINT",
-            endpoint + "/{task_id}",
+            docling_base + "/v1/status/poll/{task_id}",
         ),
         "poll_method": os.getenv("DOCLING_POLL_METHOD", "GET"),
         "id_field": os.getenv("DOCLING_ID_FIELD", "task_id"),
-        "status_field": os.getenv("DOCLING_STATUS_FIELD", "status"),
+        "status_field": os.getenv("DOCLING_STATUS_FIELD", "task_status"),
         "result_url_field": os.getenv("DOCLING_RESULT_URL_FIELD", "result_url"),
-        "content_field": os.getenv("DOCLING_CONTENT_FIELD", "content"),
+        "result_url_template": os.getenv(
+            "DOCLING_RESULT_ENDPOINT",
+            docling_base + "/v1/result/{task_id}",
+        ),
+        "content_field": os.getenv("DOCLING_CONTENT_FIELD", "document.md_content"),
+        "file_field": os.getenv("DOCLING_FILE_FIELD", "files"),
         "success_values": os.getenv(
             "DOCLING_SUCCESS_VALUES",
             "done,success,succeeded,completed,finished",
         ),
-        "failed_values": os.getenv("DOCLING_FAILED_VALUES", "failed,error"),
+        "failed_values": os.getenv("DOCLING_FAILED_VALUES", "failed,error,failure"),
         "poll_interval_seconds": float(
             os.getenv("DOCLING_POLL_INTERVAL_SECONDS", "2")
         ),
@@ -2224,7 +2232,9 @@ async def _call_protocol_parse_service(
     id_field = str(protocol.get("id_field", "id"))
     status_field = str(protocol.get("status_field", "status"))
     result_url_field = str(protocol.get("result_url_field", "result_url"))
+    result_url_template = str(protocol.get("result_url_template", "")).strip()
     content_field = str(protocol.get("content_field", "content"))
+    file_field = str(protocol.get("file_field", "file"))
     poll_url_tpl = str(protocol.get("poll_url_template", "")).strip()
     poll_method = str(protocol.get("poll_method", "GET")).upper()
     poll_interval = float(protocol.get("poll_interval_seconds", 2.0))
@@ -2244,11 +2254,28 @@ async def _call_protocol_parse_service(
         if x.strip()
     )
 
+    def _string_content(value: Any) -> str | None:
+        if value is None:
+            return None
+        if isinstance(value, str):
+            return value or None
+        return json.dumps(value, ensure_ascii=False)
+
+    def _extract_response_content(text: str) -> str | None:
+        if not text:
+            return None
+        try:
+            payload = json.loads(text)
+        except Exception:
+            return text
+        content_val = get_by_path(payload, content_field)
+        return _string_content(content_val) or text
+
     timeout = httpx.Timeout(120.0, connect=30.0)
     async with httpx.AsyncClient(timeout=timeout) as client:
         with open(file_path, "rb") as f:
             resp = await client.post(
-                upload_url, files={"file": (Path(file_path).name, f)}
+                upload_url, files={file_field: (Path(file_path).name, f)}
             )
         if resp.status_code >= 400:
             raise RuntimeError(
@@ -2258,7 +2285,7 @@ async def _call_protocol_parse_service(
         task_id = get_by_path(upload_payload, id_field)
         if not task_id:
             direct_content = get_by_path(upload_payload, content_field)
-            return str(direct_content) if direct_content else None
+            return _string_content(direct_content)
         task_id = str(task_id)
 
         poll_url = (
@@ -2279,12 +2306,16 @@ async def _call_protocol_parse_service(
             if status_val in success_values:
                 result_url = get_by_path(poll_payload, result_url_field)
+                if not result_url and result_url_template:
+                    result_url = result_url_template.format(
+                        task_id=task_id, trace_id=task_id, id=task_id
+                    )
                 if result_url:
                     dl = await client.get(str(result_url))
                     dl.raise_for_status()
-                    return dl.text
+                    return _extract_response_content(dl.text)
                 content_val = get_by_path(poll_payload, content_field)
-                return str(content_val) if content_val else None
+                return _string_content(content_val)
 
             if status_val in failed_values:
                 raise RuntimeError(
                     f"Parse service failed for task {task_id}: {poll_payload}"
diff --git a/tests/test_pipeline_release_closure.py b/tests/test_pipeline_release_closure.py
index 76bc2aee7a..dffbd37efe 100644
--- a/tests/test_pipeline_release_closure.py
+++ b/tests/test_pipeline_release_closure.py
@@ -5,6 +5,7 @@
 import numpy as np
 import pytest
 
+import lightrag.pipeline as pipeline_module
 from lightrag import LightRAG, ROLES, RoleLLMConfig
 from lightrag.base import DocStatus
 from lightrag.constants import (
@@ -2638,3 +2639,215 @@ async def _fake_service(protocol, file_path):
         await rag.finalize_storages()
 
     asyncio.run(_run())
+
+
+@pytest.mark.offline
+def test_parse_docling_uses_docling_serve_async_defaults(tmp_path, monkeypatch):
+    async def _run():
+        rag = _new_rag(tmp_path)
+        await rag.initialize_storages()
+
+        src_file = tmp_path / "demo.pdf"
+        src_file.write_bytes(b"fake-pdf")
+
+        captured_protocol = {}
+
+        async def _fake_service(protocol, file_path):
+            captured_protocol.update(protocol)
+            assert file_path == str(src_file)
+            return "# Extracted markdown"
+
+        monkeypatch.setattr(rag, "_call_protocol_parse_service", _fake_service)
+        monkeypatch.setenv(
+            "DOCLING_ENDPOINT", "http://localhost:5001/v1/convert/file/async"
+        )
+
+        parsed = await rag.parse_docling(
+            doc_id="doc-docling-defaults",
+            file_path=str(src_file),
+            content_data={"content": ""},
+        )
+
+        assert parsed["content"] == "# Extracted markdown"
+        assert (
+            captured_protocol["poll_url_template"]
+            == "http://localhost:5001/v1/status/poll/{task_id}"
+        )
+        assert (
+            captured_protocol["result_url_template"]
+            == "http://localhost:5001/v1/result/{task_id}"
+        )
+        assert captured_protocol["status_field"] == "task_status"
+        assert captured_protocol["content_field"] == "document.md_content"
+        assert captured_protocol["file_field"] == "files"
+        assert captured_protocol["failed_values"] == "failed,error,failure"
+
+        await rag.finalize_storages()
+
+    asyncio.run(_run())
+
+
+@pytest.mark.offline
+def test_protocol_parse_service_raises_on_docling_failure_status(tmp_path, monkeypatch):
+    class FakeTimeout:
+        def __init__(self, *args, **kwargs):
+            pass
+
+    class FakeResponse:
+        def __init__(self, payload):
+            self.status_code = 200
+            self.text = json.dumps(payload)
+
+        def json(self):
+            return json.loads(self.text)
+
+    class FakeAsyncClient:
+        def __init__(self, *args, **kwargs):
+            pass
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, exc_type, exc, tb):
+            return None
+
+        async def post(self, url, files=None, json=None):
+            del url, files, json
+            return FakeResponse({"task_id": "job-1", "task_status": "queued"})
+
+        async def get(self, url, params=None):
+            del url, params
+            return FakeResponse(
+                {
+                    "task_id": "job-1",
+                    "task_status": "failure",
+                    "error": "conversion failed",
+                }
+            )
+
+    class FakeHttpx:
+        Timeout = FakeTimeout
+        AsyncClient = FakeAsyncClient
+
+    async def _fake_sleep(delay):
+        del delay
+
+    async def _run():
+        monkeypatch.setattr(pipeline_module, "httpx", FakeHttpx)
+        monkeypatch.setattr(pipeline_module.asyncio, "sleep", _fake_sleep)
+
+        src_file = tmp_path / "demo.pdf"
+        src_file.write_bytes(b"fake-pdf")
+        rag = _new_rag(tmp_path / "work")
+
+        with pytest.raises(RuntimeError, match="Parse service failed"):
+            await rag._call_protocol_parse_service(
+                {
+                    "upload_url": "http://localhost:5001/v1/convert/file/async",
+                    "poll_url_template": "http://localhost:5001/v1/status/poll/{task_id}",
+                    "id_field": "task_id",
+                    "status_field": "task_status",
+                    "content_field": "document.md_content",
+                    "file_field": "files",
+                    "success_values": "success",
+                    "failed_values": "failed,error,failure",
+                    "poll_interval_seconds": 0,
+                    "max_polls": 1,
+                },
+                str(src_file),
+            )
+
+    asyncio.run(_run())
+
+
+@pytest.mark.offline
+def test_protocol_parse_service_extracts_docling_result_markdown(tmp_path, monkeypatch):
+    posted_files = []
+    requested_urls = []
+
+    class FakeTimeout:
+        def __init__(self, *args, **kwargs):
+            pass
+
+    class FakeResponse:
+        def __init__(self, payload):
+            self.status_code = 200
+            self.text = json.dumps(payload)
+
+        def json(self):
+            return json.loads(self.text)
+
+        def raise_for_status(self):
+            pass
+
+    class FakeAsyncClient:
+        def __init__(self, *args, **kwargs):
+            pass
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, exc_type, exc, tb):
+            return None
+
+        async def post(self, url, files=None, json=None):
+            del json
+            posted_files.append(files)
+            requested_urls.append(url)
+            return FakeResponse({"task_id": "task-1", "task_status": "queued"})
+
+        async def get(self, url, params=None):
+            del params
+            requested_urls.append(url)
+            if url.endswith("/v1/status/poll/task-1"):
+                return FakeResponse({"task_id": "task-1", "task_status": "success"})
+            return FakeResponse(
+                {
+                    "document": {
+                        "filename": "demo.pdf",
+                        "md_content": "# Docling markdown\n\nBody",
+                    },
+                    "status": "success",
+                }
+            )
+
+    class FakeHttpx:
+        Timeout = FakeTimeout
+        AsyncClient = FakeAsyncClient
+
+    async def _fake_sleep(delay):
+        del delay
+
+    async def _run():
+        monkeypatch.setattr(pipeline_module, "httpx", FakeHttpx)
+        monkeypatch.setattr(pipeline_module.asyncio, "sleep", _fake_sleep)
+
+        src_file = tmp_path / "demo.pdf"
+        src_file.write_bytes(b"fake-pdf")
+        rag = _new_rag(tmp_path / "work")
+
+        result = await rag._call_protocol_parse_service(
+            {
+                "upload_url": "http://localhost:5001/v1/convert/file/async",
+                "poll_url_template": "http://localhost:5001/v1/status/poll/{task_id}",
+                "result_url_template": "http://localhost:5001/v1/result/{task_id}",
+                "id_field": "task_id",
+                "status_field": "task_status",
+                "content_field": "document.md_content",
+                "file_field": "files",
+                "success_values": "success",
+                "poll_interval_seconds": 0,
+                "max_polls": 1,
+            },
+            str(src_file),
+        )
+
+        assert result == "# Docling markdown\n\nBody"
+        assert "files" in posted_files[0]
+        assert requested_urls == [
+            "http://localhost:5001/v1/convert/file/async",
+            "http://localhost:5001/v1/status/poll/task-1",
+            "http://localhost:5001/v1/result/task-1",
+        ]
+
+    asyncio.run(_run())