6 changes: 3 additions & 3 deletions Dockerfile
@@ -46,7 +46,7 @@ COPY uv.lock .

# Install base, API, and offline extras without the project to improve caching
RUN --mount=type=cache,target=/root/.local/share/uv \
uv sync --frozen --no-dev --extra api --extra offline --no-install-project --no-editable
uv sync --frozen --no-dev --extra api --extra offline --extra observability --no-install-project --no-editable

# Copy project sources after dependency layer
COPY lightrag/ ./lightrag/
@@ -56,7 +56,7 @@ COPY --from=frontend-builder /app/lightrag/api/webui ./lightrag/api/webui

# Sync project in non-editable mode and ensure pip is available for runtime installs
RUN --mount=type=cache,target=/root/.local/share/uv \
uv sync --frozen --no-dev --extra api --extra offline --no-editable \
uv sync --frozen --no-dev --extra api --extra offline --extra observability --no-editable \
&& /app/.venv/bin/python -m ensurepip --upgrade

# Prepare offline cache directory and pre-populate tiktoken data
@@ -89,7 +89,7 @@ ENV PATH=/app/.venv/bin:/root/.local/bin:$PATH
# Install dependencies with uv sync (uses locked versions from uv.lock)
# And ensure pip is available for runtime installs
RUN --mount=type=cache,target=/root/.local/share/uv \
uv sync --frozen --no-dev --extra api --extra offline --no-editable \
uv sync --frozen --no-dev --extra api --extra offline --extra observability --no-editable \
&& /app/.venv/bin/python -m ensurepip --upgrade

# Create persistent data directories AFTER package installation
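The three `uv sync` invocations above now include `--extra observability`, so the Langfuse dependency is baked into the image. A hypothetical smoke test (not part of this PR) for verifying the built virtualenv:

```python
# Hypothetical check, not part of this diff: confirm the "observability"
# extra actually pulled langfuse into the image's virtualenv.
import importlib.util

if importlib.util.find_spec("langfuse") is None:
    raise SystemExit("langfuse missing; rebuild with --extra observability")
print("langfuse importable; tracing can be enabled via LANGFUSE_* env vars")
```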
2 changes: 1 addition & 1 deletion env.example
@@ -620,7 +620,7 @@ MEMGRAPH_DATABASE=memgraph

###########################################################
### Langfuse Observability Configuration
### Only works with LLM provided by OpenAI compatible API
### Works with ALL LLM providers (OpenAI, Ollama, Gemini, Bedrock, etc.)
### Install with: pip install lightrag-hku[observability]
### Sign up at: https://cloud.langfuse.com or self-host
###########################################################
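The updated comment reflects that tracing now wraps `llm_model_func` generically rather than patching only the OpenAI client. Langfuse itself is conventionally configured through `LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and optionally `LANGFUSE_HOST`; a minimal sketch of an enablement check, mirroring the env-var logic removed from `openai.py` (an assumption, not the actual `lightrag/tracing.py` code):

```python
# Sketch under stated assumptions: how is_tracing_enabled() could plausibly
# decide that Langfuse is configured (both API keys must be present).
import os

def langfuse_configured() -> bool:
    return bool(os.environ.get("LANGFUSE_PUBLIC_KEY")) and bool(
        os.environ.get("LANGFUSE_SECRET_KEY")
    )
```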
3 changes: 3 additions & 0 deletions lightrag/api/lightrag_server.py
@@ -33,6 +33,7 @@
update_uvicorn_mode_config,
get_default_host,
)
from lightrag.tracing import shutdown as shutdown_tracing
from lightrag.utils import get_env_value
from lightrag import LightRAG, __version__ as core_version
from lightrag.api import __api_version__
@@ -364,6 +365,8 @@ async def lifespan(app: FastAPI):
# Clean up database connections
await rag.finalize_storages()

shutdown_tracing()

if "LIGHTRAG_GUNICORN_MODE" not in os.environ:
# Only perform cleanup in Uvicorn single-process mode
logger.debug("Uvicorn Mode: finalizing shared storage...")
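`shutdown_tracing()` runs after `finalize_storages()` so buffered spans are not dropped when the server exits. The `lightrag/tracing.py` module is not included in this diff; a hedged sketch of what its `shutdown()` plausibly does:

```python
# Assumption: shutdown() likely flushes queued observations and stops the
# Langfuse client's background workers. Sketch only, not the repo's code.
def shutdown() -> None:
    try:
        from langfuse import get_client
    except ImportError:
        return  # observability extra not installed; nothing to flush
    client = get_client()
    client.flush()     # push pending spans/generations to the backend
    client.shutdown()  # stop background threads before the process exits
```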
81 changes: 81 additions & 0 deletions lightrag/lightrag.py
@@ -112,7 +112,27 @@
make_relation_chunk_key,
normalize_source_ids_limit_method,
)
from lightrag.tracing import (
create_traced_llm_wrapper,
is_tracing_enabled,
flush as flush_tracing,
)
from lightrag.types import KnowledgeGraph

try:
from langfuse import get_client as langfuse_client, observe as langfuse_observe
except ImportError:

def langfuse_observe(**kwargs): # type: ignore[misc]
def _identity(func):
return func

return _identity

def langfuse_client(): # type: ignore[misc]
return None


from dotenv import load_dotenv

# use the .env that is inside the current folder
@@ -751,6 +771,11 @@ def __post_init__(self):
)
)

if is_tracing_enabled():
self.llm_model_func = create_traced_llm_wrapper(
self.llm_model_func, model_name=self.llm_model_name
)

self._storages_status = StoragesStatus.CREATED

async def initialize_storages(self):
@@ -839,6 +864,8 @@ async def finalize_storages(self):
else:
logger.debug("All storages finalized successfully")

flush_tracing()

self._storages_status = StoragesStatus.FINALIZED

async def check_and_migrate_data(self):
Expand Down Expand Up @@ -1233,6 +1260,7 @@ def insert(
)
)

@langfuse_observe(name="insert", capture_input=False)
async def ainsert(
self,
input: str | list[str],
@@ -1261,6 +1289,14 @@ async def ainsert(
if track_id is None:
track_id = generate_track_id("insert")

doc_count = len(input) if isinstance(input, list) else 1
client = langfuse_client()
if client is not None:
client.update_current_span(
input={"doc_count": doc_count, "track_id": track_id},
metadata={"workspace": self.workspace},
)

await self.apipeline_enqueue_documents(input, ids, file_paths, track_id)
await self.apipeline_process_enqueue_documents(
split_by_character, split_by_character_only
@@ -1752,7 +1788,15 @@ async def apipeline_process_enqueue_documents(
4. Process each chunk for entity and relation extraction
5. Update the document status
"""
await self._apipeline_process_enqueue_documents_impl(
split_by_character, split_by_character_only
)

async def _apipeline_process_enqueue_documents_impl(
self,
split_by_character: str | None = None,
split_by_character_only: bool = False,
) -> None:
# Get pipeline status shared data and lock
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=self.workspace
@@ -1869,6 +1913,11 @@
# Create a semaphore to limit the number of concurrent file processing
semaphore = asyncio.Semaphore(self.max_parallel_insert)

@langfuse_observe(
name="lightrag-insert-doc",
capture_input=False,
capture_output=False,
)
async def process_document(
doc_id: str,
status_doc: DocProcessingStatus,
@@ -1889,6 +1938,13 @@ async def process_document(
chunks: dict[str, Any] = {}
content_data: dict[str, Any] | None = None

client = langfuse_client()
if client is not None:
client.update_current_span(
input={"doc_id": doc_id, "file_path": file_path},
metadata={"workspace": self.workspace},
)

def get_failed_chunk_snapshot() -> tuple[list[str], int]:
if chunks:
chunk_ids = list(chunks.keys())
@@ -1900,6 +1956,7 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]:
# Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = []
entity_relation_task = None

try:
# Resolve file_path from full_docs before honoring a queued
# cancellation so corrupted doc_status placeholders do not
@@ -2623,6 +2680,7 @@ def query_data(
loop = always_get_an_event_loop()
return loop.run_until_complete(self.aquery_data(query, param))

@langfuse_observe(name="query-data", capture_input=False)
async def aquery_data(
self,
query: str,
@@ -2734,6 +2792,19 @@ async def aquery_data(
actual data is nested under the 'data' field, with 'status' and 'message'
fields at the top level.
"""
client = langfuse_client()
if client is not None:
client.update_current_span(
input={"query": query[:200] if query else "", "mode": param.mode},
metadata={"workspace": self.workspace},
)
return await self._aquery_data_impl(query, param)

async def _aquery_data_impl(
self,
query: str,
param: QueryParam,
) -> dict[str, Any]:
global_config = asdict(self)

# Create a copy of param to avoid modifying the original
@@ -2830,6 +2901,7 @@ async def aquery_data(
await self._query_done()
return final_data

@langfuse_observe(name="query", capture_input=False)
async def aquery_llm(
self,
query: str,
@@ -2852,6 +2924,13 @@ async def aquery_llm(
"""
logger.debug(f"[aquery_llm] Query param: {param}")

client = langfuse_client()
if client is not None:
client.update_current_span(
input={"query": query if query else "", "mode": param.mode},
metadata={"workspace": self.workspace, "stream": param.stream},
)

global_config = asdict(self)

try:
@@ -2955,6 +3034,8 @@ async def aquery_llm(

except Exception as e:
logger.error(f"Query failed: {e}")
if client is not None:
client.update_current_span(level="ERROR", status_message=str(e))
# Return error response
return {
"status": "failure",
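Tracing is wired in at three levels in this file: `__post_init__` wraps `llm_model_func` via `create_traced_llm_wrapper` when tracing is enabled, the entry points (`ainsert`, per-document processing, `aquery_data`, `aquery_llm`) are decorated with `langfuse_observe` spans, and `finalize_storages` calls `flush_tracing()`. The wrapper itself lives in `lightrag/tracing.py`, which is not part of this diff; a hedged sketch of one plausible shape:

```python
# Illustrative sketch only (lightrag/tracing.py is not shown in this PR):
# wrap the async LLM callable so each call is recorded as a Langfuse
# generation named after the configured model.
import functools
from typing import Any, Awaitable, Callable

def create_traced_llm_wrapper(
    llm_func: Callable[..., Awaitable[Any]], model_name: str | None = None
) -> Callable[..., Awaitable[Any]]:
    try:
        from langfuse import observe
    except ImportError:
        return llm_func  # tracing unavailable: return the function unchanged

    @observe(as_type="generation", name=model_name or "llm")
    @functools.wraps(llm_func)
    async def traced(*args: Any, **kwargs: Any) -> Any:
        return await llm_func(*args, **kwargs)

    return traced
```

Because the wrapper sits on `llm_model_func` rather than inside a specific client, any provider binding (OpenAI, Ollama, Gemini, Bedrock) gets traced, which is what the `env.example` comment change advertises.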
57 changes: 27 additions & 30 deletions lightrag/llm/openai.py
@@ -37,31 +37,7 @@

from dotenv import load_dotenv

# Try to import Langfuse for LLM observability (optional)
# Falls back to standard OpenAI client if not available
# Langfuse requires proper configuration to work correctly
LANGFUSE_ENABLED = False
try:
# Check if required Langfuse environment variables are set
langfuse_public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
langfuse_secret_key = os.environ.get("LANGFUSE_SECRET_KEY")

# Only enable Langfuse if both keys are configured
if langfuse_public_key and langfuse_secret_key:
from langfuse.openai import AsyncOpenAI # type: ignore[import-untyped]

LANGFUSE_ENABLED = True
logger.info("Langfuse observability enabled for OpenAI client")
else:
from openai import AsyncOpenAI

logger.debug(
"Langfuse environment variables not configured, using standard OpenAI client"
)
except ImportError:
from openai import AsyncOpenAI

logger.debug("Langfuse not available, using standard OpenAI client")
from openai import AsyncOpenAI

# use the .env that is inside the current folder
# allows to use different .env file for each lightrag instance
@@ -457,17 +433,27 @@ async def inner():
cot_active = False

# After streaming is complete, track token usage
if token_tracker and final_chunk_usage:
# Use actual usage from the API
if final_chunk_usage:
token_counts = {
"prompt_tokens": getattr(final_chunk_usage, "prompt_tokens", 0),
"completion_tokens": getattr(
final_chunk_usage, "completion_tokens", 0
),
"total_tokens": getattr(final_chunk_usage, "total_tokens", 0),
}
token_tracker.add_usage(token_counts)
if token_tracker:
token_tracker.add_usage(token_counts)
logger.debug(f"Streaming token usage (from API): {token_counts}")

from lightrag.tracing import is_tracing_enabled, report_token_usage

if is_tracing_enabled():
report_token_usage(
{
"input": token_counts["prompt_tokens"],
"output": token_counts["completion_tokens"],
}
)
elif token_tracker:
logger.debug("No usage information available in streaming response")
except Exception as e:
@@ -609,15 +595,26 @@ async def inner():
if r"\u" in final_content:
final_content = safe_unicode_decode(final_content.encode("utf-8"))

if token_tracker and hasattr(response, "usage"):
if hasattr(response, "usage") and response.usage is not None:
token_counts = {
"prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
"completion_tokens": getattr(
response.usage, "completion_tokens", 0
),
"total_tokens": getattr(response.usage, "total_tokens", 0),
}
token_tracker.add_usage(token_counts)
if token_tracker:
token_tracker.add_usage(token_counts)

from lightrag.tracing import is_tracing_enabled, report_token_usage

if is_tracing_enabled():
report_token_usage(
{
"input": token_counts["prompt_tokens"],
"output": token_counts["completion_tokens"],
}
)

logger.debug(f"Response content len: {len(final_content)}")
verbose_debug(f"Response: {response}")
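Token accounting is now decoupled from `token_tracker`: usage is computed whenever the API returns it, handed to `token_tracker` if one was supplied, and forwarded to `report_token_usage` when tracing is enabled, in both the streaming and non-streaming paths. `report_token_usage` is defined in `lightrag/tracing.py`, not shown here; a hedged guess at its body:

```python
# Assumption: report_token_usage() plausibly attaches the counts to the
# current Langfuse observation. The exact helper is not shown in this diff.
def report_token_usage(usage: dict[str, int]) -> None:
    try:
        from langfuse import get_client
    except ImportError:
        return
    # Langfuse v3 expresses token counts as usage_details, e.g.
    # {"input": <prompt tokens>, "output": <completion tokens>}.
    get_client().update_current_generation(usage_details=usage)
```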