Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/AdvancedFeatures.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Configure parser routing and external parser service endpoints in `.env`:
```bash
LIGHTRAG_PARSER=pdf:mineru,docx:docling,pptx:docling,xlsx:docling,*:legacy
MINERU_ENDPOINT=http://localhost:8000/api/v1/task
DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
```

Then upload documents through LightRAG Server. `LIGHTRAG_PARSER` rules match suffixes such as `pdf`, may be separated with commas or semicolons, and are evaluated from left to right. If a rule enables MinerU or Docling, the matching endpoint must be configured before server startup. Per-file hints such as `paper.[mineru].pdf` and `memo.[native].docx` override the default rules. Parsed multimodal sidecars are written by the pipeline and consumed by the normal indexing flow. See [File Processing Configuration](./FileProcessingConfiguration-zh.md) for detailed routing rules and examples.
Expand Down
2 changes: 1 addition & 1 deletion docs/FileProcessingConfiguration-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ filename.[OPTIONS].ext
```bash
LIGHTRAG_PARSER=pdf:mineru,docx:native-iet,*:legacy
MINERU_ENDPOINT=http://localhost:8000/api/v1/task
DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
```

```text
Expand Down
10 changes: 6 additions & 4 deletions env.docker-compose-full
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,15 @@ ENTITY_EXTRACTION_USE_JSON=true
# MINERU_POLL_INTERVAL_SECONDS=2
# MINERU_MAX_POLLS=180

# DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
# DOCLING_POLL_ENDPOINT=http://localhost:8081/v1/convert/file/async/{task_id}
# DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
# DOCLING_POLL_ENDPOINT=http://localhost:5001/v1/status/poll/{task_id}
# DOCLING_POLL_METHOD=GET
# DOCLING_ID_FIELD=task_id
# DOCLING_STATUS_FIELD=status
# DOCLING_STATUS_FIELD=task_status
# DOCLING_RESULT_URL_FIELD=result_url
# DOCLING_CONTENT_FIELD=content
# DOCLING_RESULT_ENDPOINT=http://localhost:5001/v1/result/{task_id}
# DOCLING_CONTENT_FIELD=document.md_content
# DOCLING_FILE_FIELD=files
# DOCLING_SUCCESS_VALUES=done,success,completed
# DOCLING_FAILED_VALUES=failed,error,cancelled
# DOCLING_POLL_INTERVAL_SECONDS=2
Expand Down
10 changes: 6 additions & 4 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,15 @@ ENTITY_EXTRACTION_USE_JSON=true
# MINERU_POLL_INTERVAL_SECONDS=2
# MINERU_MAX_POLLS=180

# DOCLING_ENDPOINT=http://localhost:8081/v1/convert/file/async
# DOCLING_POLL_ENDPOINT=http://localhost:8081/v1/convert/file/async/{task_id}
# DOCLING_ENDPOINT=http://localhost:5001/v1/convert/file/async
# DOCLING_POLL_ENDPOINT=http://localhost:5001/v1/status/poll/{task_id}
# DOCLING_POLL_METHOD=GET
# DOCLING_ID_FIELD=task_id
# DOCLING_STATUS_FIELD=status
# DOCLING_STATUS_FIELD=task_status
# DOCLING_RESULT_URL_FIELD=result_url
# DOCLING_CONTENT_FIELD=content
# DOCLING_RESULT_ENDPOINT=http://localhost:5001/v1/result/{task_id}
# DOCLING_CONTENT_FIELD=document.md_content
# DOCLING_FILE_FIELD=files
# DOCLING_SUCCESS_VALUES=done,success,completed
# DOCLING_FAILED_VALUES=failed,error,cancelled
# DOCLING_POLL_INTERVAL_SECONDS=2
Expand Down
51 changes: 41 additions & 10 deletions lightrag/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1945,8 +1945,7 @@ async def _finalize_doc_failure(
)
else:
error_msg = (
f"User cancelled {current_file_number}/"
f"{total_files}: {file_path}"
f"User cancelled {current_file_number}/{total_files}: {file_path}"
)
logger.warning(error_msg)
async with pipeline_status_lock:
Expand Down Expand Up @@ -2147,22 +2146,31 @@ async def parse_docling(
endpoint = os.getenv("DOCLING_ENDPOINT", "").strip()
if not endpoint:
raise ValueError("DOCLING_ENDPOINT is required for Docling parsing")
docling_base = endpoint.rstrip("/")
convert_path = "/v1/convert/file/async"
if docling_base.endswith(convert_path):
docling_base = docling_base[: -len(convert_path)]
protocol = {
"upload_url": endpoint,
"poll_url_template": os.getenv(
"DOCLING_POLL_ENDPOINT",
endpoint + "/{task_id}",
docling_base + "/v1/status/poll/{task_id}",
),
"poll_method": os.getenv("DOCLING_POLL_METHOD", "GET"),
"id_field": os.getenv("DOCLING_ID_FIELD", "task_id"),
"status_field": os.getenv("DOCLING_STATUS_FIELD", "status"),
"status_field": os.getenv("DOCLING_STATUS_FIELD", "task_status"),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Recognize Docling failure task status

When this default now reads Docling's task_status, failed async conversions from docling-serve report failure (the API docs list failure as the failed task state), but the default DOCLING_FAILED_VALUES below still only includes failed,error. In that scenario the failure branch is never reached, so a bad conversion keeps polling until DOCLING_MAX_POLLS and is reported as a timeout instead of surfacing the parser error; include failure in the default/env example failure values.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Updated the Docling default failed states to include failure and added a regression test so that a task_status: failure response raises the parser-service error immediately instead of polling until timeout.

Validation:

  • python -m uv run pytest tests\test_pipeline_release_closure.py::test_parse_docling_uses_docling_serve_async_defaults tests\test_pipeline_release_closure.py::test_protocol_parse_service_extracts_docling_result_markdown tests\test_pipeline_release_closure.py::test_protocol_parse_service_raises_on_docling_failure_status tests\test_pipeline_release_closure.py::test_parse_docling_empty_service_result_raises_without_fallback tests\test_pipeline_release_closure.py::test_parse_mineru_empty_service_result_raises_without_fallback -q
  • python -m uv run ruff check lightrag\pipeline.py tests\test_pipeline_release_closure.py
  • python -m uv run ruff format lightrag\pipeline.py tests\test_pipeline_release_closure.py --check
  • python -m py_compile lightrag\pipeline.py tests\test_pipeline_release_closure.py
  • git diff --check upstream/dev..HEAD

"result_url_field": os.getenv("DOCLING_RESULT_URL_FIELD", "result_url"),
"content_field": os.getenv("DOCLING_CONTENT_FIELD", "content"),
"result_url_template": os.getenv(
"DOCLING_RESULT_ENDPOINT",
docling_base + "/v1/result/{task_id}",
),
"content_field": os.getenv("DOCLING_CONTENT_FIELD", "document.md_content"),
"file_field": os.getenv("DOCLING_FILE_FIELD", "files"),
"success_values": os.getenv(
"DOCLING_SUCCESS_VALUES",
"done,success,succeeded,completed,finished",
),
"failed_values": os.getenv("DOCLING_FAILED_VALUES", "failed,error"),
"failed_values": os.getenv("DOCLING_FAILED_VALUES", "failed,error,failure"),
"poll_interval_seconds": float(
os.getenv("DOCLING_POLL_INTERVAL_SECONDS", "2")
),
Expand Down Expand Up @@ -2224,7 +2232,9 @@ async def _call_protocol_parse_service(
id_field = str(protocol.get("id_field", "id"))
status_field = str(protocol.get("status_field", "status"))
result_url_field = str(protocol.get("result_url_field", "result_url"))
result_url_template = str(protocol.get("result_url_template", "")).strip()
content_field = str(protocol.get("content_field", "content"))
file_field = str(protocol.get("file_field", "file"))
poll_url_tpl = str(protocol.get("poll_url_template", "")).strip()
poll_method = str(protocol.get("poll_method", "GET")).upper()
poll_interval = float(protocol.get("poll_interval_seconds", 2.0))
Expand All @@ -2244,11 +2254,28 @@ async def _call_protocol_parse_service(
if x.strip()
)

def _string_content(value: Any) -> str | None:
if value is None:
return None
if isinstance(value, str):
return value or None
return json.dumps(value, ensure_ascii=False)

def _extract_response_content(text: str) -> str | None:
if not text:
return None
try:
payload = json.loads(text)
except Exception:
return text
content_val = get_by_path(payload, content_field)
return _string_content(content_val) or text

timeout = httpx.Timeout(120.0, connect=30.0)
async with httpx.AsyncClient(timeout=timeout) as client:
with open(file_path, "rb") as f:
resp = await client.post(
upload_url, files={"file": (Path(file_path).name, f)}
upload_url, files={file_field: (Path(file_path).name, f)}
)
if resp.status_code >= 400:
raise RuntimeError(
Expand All @@ -2258,7 +2285,7 @@ async def _call_protocol_parse_service(
task_id = get_by_path(upload_payload, id_field)
if not task_id:
direct_content = get_by_path(upload_payload, content_field)
return str(direct_content) if direct_content else None
return _string_content(direct_content)
task_id = str(task_id)

poll_url = (
Expand All @@ -2279,12 +2306,16 @@ async def _call_protocol_parse_service(

if status_val in success_values:
result_url = get_by_path(poll_payload, result_url_field)
if not result_url and result_url_template:
result_url = result_url_template.format(
task_id=task_id, trace_id=task_id, id=task_id
)
if result_url:
dl = await client.get(str(result_url))
dl.raise_for_status()
return dl.text
return _extract_response_content(dl.text)
content_val = get_by_path(poll_payload, content_field)
return str(content_val) if content_val else None
return _string_content(content_val)
if status_val in failed_values:
raise RuntimeError(
f"Parse service failed for task {task_id}: {poll_payload}"
Expand Down
213 changes: 213 additions & 0 deletions tests/test_pipeline_release_closure.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
import pytest

import lightrag.pipeline as pipeline_module
from lightrag import LightRAG, ROLES, RoleLLMConfig
from lightrag.base import DocStatus
from lightrag.constants import (
Expand Down Expand Up @@ -2638,3 +2639,215 @@ async def _fake_service(protocol, file_path):
await rag.finalize_storages()

asyncio.run(_run())


@pytest.mark.offline
def test_parse_docling_uses_docling_serve_async_defaults(tmp_path, monkeypatch):
    """Verify parse_docling derives docling-serve protocol defaults from DOCLING_ENDPOINT.

    Only DOCLING_ENDPOINT is set; the test asserts that the poll/result URL
    templates and field names passed to _call_protocol_parse_service match the
    docling-serve async API defaults (status poll path, result path,
    ``task_status`` status field, ``document.md_content`` content path,
    ``files`` upload field, and a failure list including ``failure``).
    """

    async def _run():
        rag = _new_rag(tmp_path)
        await rag.initialize_storages()

        # Minimal fake PDF on disk; content is never parsed because the
        # protocol service call below is stubbed out.
        src_file = tmp_path / "demo.pdf"
        src_file.write_bytes(b"fake-pdf")

        captured_protocol = {}

        async def _fake_service(protocol, file_path):
            # Capture the protocol dict built by parse_docling so the
            # defaults can be asserted after the call.
            captured_protocol.update(protocol)
            assert file_path == str(src_file)
            return "# Extracted markdown"

        monkeypatch.setattr(rag, "_call_protocol_parse_service", _fake_service)
        # Only the upload endpoint is configured; everything else must fall
        # back to the docling-serve defaults derived from this base URL.
        monkeypatch.setenv(
            "DOCLING_ENDPOINT", "http://localhost:5001/v1/convert/file/async"
        )

        parsed = await rag.parse_docling(
            doc_id="doc-docling-defaults",
            file_path=str(src_file),
            content_data={"content": ""},
        )

        # The stubbed service result becomes the parsed content.
        assert parsed["content"] == "# Extracted markdown"
        # URL templates are rebuilt from the endpoint base (convert path stripped).
        assert (
            captured_protocol["poll_url_template"]
            == "http://localhost:5001/v1/status/poll/{task_id}"
        )
        assert (
            captured_protocol["result_url_template"]
            == "http://localhost:5001/v1/result/{task_id}"
        )
        assert captured_protocol["status_field"] == "task_status"
        assert captured_protocol["content_field"] == "document.md_content"
        assert captured_protocol["file_field"] == "files"
        # Regression guard: "failure" must be among the failed states so a
        # failed conversion raises instead of polling until timeout.
        assert captured_protocol["failed_values"] == "failed,error,failure"

        await rag.finalize_storages()

    asyncio.run(_run())


@pytest.mark.offline
def test_protocol_parse_service_raises_on_docling_failure_status(tmp_path, monkeypatch):
    """Regression test: a ``task_status: failure`` poll response must raise.

    Replaces the pipeline module's httpx with in-memory fakes whose poll
    response reports ``failure``. Because ``failure`` is listed in
    ``failed_values``, _call_protocol_parse_service should raise RuntimeError
    immediately rather than polling until max_polls and timing out.
    """

    class FakeTimeout:
        # Stands in for httpx.Timeout; arguments are irrelevant here.
        def __init__(self, *args, **kwargs):
            pass

    class FakeResponse:
        # Minimal response object: status_code, raw text, and .json().
        def __init__(self, payload):
            self.status_code = 200
            self.text = json.dumps(payload)

        def json(self):
            return json.loads(self.text)

    class FakeAsyncClient:
        # Async context manager mimicking httpx.AsyncClient.
        def __init__(self, *args, **kwargs):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, files=None, json=None):
            # Upload: return a queued task so the code proceeds to polling.
            del url, files, json
            return FakeResponse({"task_id": "job-1", "task_status": "queued"})

        async def get(self, url, params=None):
            # Poll: report the docling-serve failed state ("failure").
            del url, params
            return FakeResponse(
                {
                    "task_id": "job-1",
                    "task_status": "failure",
                    "error": "conversion failed",
                }
            )

    class FakeHttpx:
        Timeout = FakeTimeout
        AsyncClient = FakeAsyncClient

    async def _fake_sleep(delay):
        # Skip real poll-interval sleeps to keep the test fast.
        del delay

    async def _run():
        monkeypatch.setattr(pipeline_module, "httpx", FakeHttpx)
        monkeypatch.setattr(pipeline_module.asyncio, "sleep", _fake_sleep)

        src_file = tmp_path / "demo.pdf"
        src_file.write_bytes(b"fake-pdf")
        rag = _new_rag(tmp_path / "work")

        # max_polls=1: the failure must surface on the first poll, not as a
        # timeout after exhausting polls.
        with pytest.raises(RuntimeError, match="Parse service failed"):
            await rag._call_protocol_parse_service(
                {
                    "upload_url": "http://localhost:5001/v1/convert/file/async",
                    "poll_url_template": "http://localhost:5001/v1/status/poll/{task_id}",
                    "id_field": "task_id",
                    "status_field": "task_status",
                    "content_field": "document.md_content",
                    "file_field": "files",
                    "success_values": "success",
                    "failed_values": "failed,error,failure",
                    "poll_interval_seconds": 0,
                    "max_polls": 1,
                },
                str(src_file),
            )

    asyncio.run(_run())


@pytest.mark.offline
def test_protocol_parse_service_extracts_docling_result_markdown(tmp_path, monkeypatch):
    """End-to-end happy path against a faked docling-serve async API.

    Fakes upload -> status poll -> result download, and asserts that
    _call_protocol_parse_service (a) posts the file under the configured
    ``files`` field, (b) hits the expected three URLs in order, and (c)
    extracts the markdown from the ``document.md_content`` path of the
    JSON result body rather than returning the raw JSON text.
    """

    # Recorded side effects of the fake client, asserted at the end.
    posted_files = []
    requested_urls = []

    class FakeTimeout:
        # Stands in for httpx.Timeout; arguments are irrelevant here.
        def __init__(self, *args, **kwargs):
            pass

    class FakeResponse:
        # Minimal response object: status_code, raw text, .json(), no-op
        # raise_for_status (needed by the result-download branch).
        def __init__(self, payload):
            self.status_code = 200
            self.text = json.dumps(payload)

        def json(self):
            return json.loads(self.text)

        def raise_for_status(self):
            pass

    class FakeAsyncClient:
        # Async context manager mimicking httpx.AsyncClient.
        def __init__(self, *args, **kwargs):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, files=None, json=None):
            # Upload: record the multipart field name and return a queued task.
            del json
            posted_files.append(files)
            requested_urls.append(url)
            return FakeResponse({"task_id": "task-1", "task_status": "queued"})

        async def get(self, url, params=None):
            # First GET is the status poll (success); second GET is the
            # result download carrying the markdown payload.
            del params
            requested_urls.append(url)
            if url.endswith("/v1/status/poll/task-1"):
                return FakeResponse({"task_id": "task-1", "task_status": "success"})
            return FakeResponse(
                {
                    "document": {
                        "filename": "demo.pdf",
                        "md_content": "# Docling markdown\n\nBody",
                    },
                    "status": "success",
                }
            )

    class FakeHttpx:
        Timeout = FakeTimeout
        AsyncClient = FakeAsyncClient

    async def _fake_sleep(delay):
        # Skip real poll-interval sleeps to keep the test fast.
        del delay

    async def _run():
        monkeypatch.setattr(pipeline_module, "httpx", FakeHttpx)
        monkeypatch.setattr(pipeline_module.asyncio, "sleep", _fake_sleep)

        src_file = tmp_path / "demo.pdf"
        src_file.write_bytes(b"fake-pdf")
        rag = _new_rag(tmp_path / "work")

        result = await rag._call_protocol_parse_service(
            {
                "upload_url": "http://localhost:5001/v1/convert/file/async",
                "poll_url_template": "http://localhost:5001/v1/status/poll/{task_id}",
                "result_url_template": "http://localhost:5001/v1/result/{task_id}",
                "id_field": "task_id",
                "status_field": "task_status",
                "content_field": "document.md_content",
                "file_field": "files",
                "success_values": "success",
                "poll_interval_seconds": 0,
                "max_polls": 1,
            },
            str(src_file),
        )

        # Markdown is extracted from document.md_content, not raw JSON text.
        assert result == "# Docling markdown\n\nBody"
        # The upload used the configured multipart field name.
        assert "files" in posted_files[0]
        # Exactly upload -> poll -> result, in that order.
        assert requested_urls == [
            "http://localhost:5001/v1/convert/file/async",
            "http://localhost:5001/v1/status/poll/task-1",
            "http://localhost:5001/v1/result/task-1",
        ]

    asyncio.run(_run())
Loading