6 changes: 3 additions & 3 deletions Dockerfile
@@ -46,7 +46,7 @@ COPY uv.lock .

 # Install base, API, and offline extras without the project to improve caching
 RUN --mount=type=cache,target=/root/.local/share/uv \
-    uv sync --frozen --no-dev --extra api --extra offline --no-install-project --no-editable
+    uv sync --frozen --no-dev --extra api --extra offline --extra observability --no-install-project --no-editable

 # Copy project sources after dependency layer
 COPY lightrag/ ./lightrag/
@@ -56,7 +56,7 @@ COPY --from=frontend-builder /app/lightrag/api/webui ./lightrag/api/webui

 # Sync project in non-editable mode and ensure pip is available for runtime installs
 RUN --mount=type=cache,target=/root/.local/share/uv \
-    uv sync --frozen --no-dev --extra api --extra offline --no-editable \
+    uv sync --frozen --no-dev --extra api --extra offline --extra observability --no-editable \
     && /app/.venv/bin/python -m ensurepip --upgrade

 # Prepare offline cache directory and pre-populate tiktoken data
@@ -89,7 +89,7 @@ ENV PATH=/app/.venv/bin:/root/.local/bin:$PATH

 # Install dependencies with uv sync (uses locked versions from uv.lock)
 # And ensure pip is available for runtime installs
 RUN --mount=type=cache,target=/root/.local/share/uv \
-    uv sync --frozen --no-dev --extra api --extra offline --no-editable \
+    uv sync --frozen --no-dev --extra api --extra offline --extra observability --no-editable \
     && /app/.venv/bin/python -m ensurepip --upgrade

 # Create persistent data directories AFTER package installation
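All three build stages pass the new observability extra to uv sync in the same form, so the Langfuse SDK is baked into the image at build time rather than pip-installed at runtime. A quick probe to confirm the extra landed, run with the image's /app/.venv/bin/python (a sketch, assuming the PyPI distribution name is langfuse):

    # Sketch: prints the installed Langfuse SDK version, or raises
    # PackageNotFoundError if the observability extra was not installed.
    import importlib.metadata

    print(importlib.metadata.version("langfuse"))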
2 changes: 1 addition & 1 deletion env.example
@@ -620,7 +620,7 @@ MEMGRAPH_DATABASE=memgraph

 ###########################################################
 ### Langfuse Observability Configuration
-### Only works with LLM provided by OpenAI compatible API
+### Works with ALL LLM providers (OpenAI, Ollama, Gemini, Bedrock, etc.)
 ### Install with: pip install lightrag-hku[observability]
 ### Sign up at: https://cloud.langfuse.com or self-host
 ###########################################################
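The corrected header reflects the new design: tracing now wraps llm_model_func generically (see lightrag.py below) instead of patching the OpenAI client, so any provider is covered. For reference, a sketch of the enablement check, assuming the same LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY pair that the removed openai.py block used to read:

    import os

    # Sketch: tracing can only activate when both Langfuse keys are set,
    # mirroring the guard the old openai.py ran at import time.
    def langfuse_configured() -> bool:
        return bool(
            os.environ.get("LANGFUSE_PUBLIC_KEY")
            and os.environ.get("LANGFUSE_SECRET_KEY")
        )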
77 changes: 77 additions & 0 deletions lightrag/lightrag.py
@@ -112,7 +112,25 @@
     make_relation_chunk_key,
     normalize_source_ids_limit_method,
 )
+from lightrag.tracing import (
+    create_traced_llm_wrapper,
+    is_tracing_enabled,
+    shutdown as shutdown_tracing,
+)
 from lightrag.types import KnowledgeGraph
+
+try:
+    from langfuse import get_client as langfuse_client, observe as langfuse_observe
+except ImportError:
+
+    def langfuse_observe(**kwargs):  # type: ignore[misc]
+        def _identity(func):
+            return func
+
+        return _identity
+
+    def langfuse_client():  # type: ignore[misc]
+        return None
 from dotenv import load_dotenv

 # use the .env that is inside the current folder
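The ImportError branch keeps lightrag importable without the observability extra: langfuse_observe degrades to an identity decorator factory and langfuse_client() returns None, so every `if client is not None:` guard below short-circuits. A standalone sketch of the same fallback pattern:

    # Sketch of the fallback: the decorator factory accepts any keyword
    # arguments and returns the decorated function unchanged.
    def observe(**kwargs):
        def _identity(func):
            return func

        return _identity

    @observe(name="insert", capture_input=False)
    def f(x):
        return x + 1

    assert f(1) == 2  # tracing disabled, behavior untouched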
@@ -751,6 +769,11 @@ def __post_init__(self):
             )
         )

+        if is_tracing_enabled():
+            self.llm_model_func = create_traced_llm_wrapper(
+                self.llm_model_func, model_name=self.llm_model_name
+            )
+
         self._storages_status = StoragesStatus.CREATED

     async def initialize_storages(self):
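create_traced_llm_wrapper and is_tracing_enabled come from the new lightrag/tracing.py module, which is not part of this diff. A sketch of one plausible shape, assuming the Langfuse SDK's observe() decorator with as_type="generation"; the real implementation may differ:

    from langfuse import observe  # requires the observability extra

    def create_traced_llm_wrapper(llm_func, model_name: str):
        # Record each call to the model function as a Langfuse generation
        # named after the configured model; observe() preserves async signatures.
        return observe(name=f"llm:{model_name}", as_type="generation")(llm_func)

Wrapping once in __post_init__ means downstream callers keep calling self.llm_model_func unchanged.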
@@ -839,6 +862,8 @@ async def finalize_storages(self):
         else:
             logger.debug("All storages finalized successfully")

+        shutdown_tracing()
+
         self._storages_status = StoragesStatus.FINALIZED

     async def check_and_migrate_data(self):
@@ -1233,6 +1258,7 @@ def insert(
             )
         )

+    @langfuse_observe(name="insert", capture_input=False)
     async def ainsert(
         self,
         input: str | list[str],
@@ -1261,6 +1287,14 @@ async def ainsert(
         if track_id is None:
             track_id = generate_track_id("insert")

+        doc_count = len(input) if isinstance(input, list) else 1
+        client = langfuse_client()
+        if client is not None:
+            client.update_current_span(
+                input={"doc_count": doc_count, "track_id": track_id},
+                metadata={"workspace": self.workspace},
+            )
+
         await self.apipeline_enqueue_documents(input, ids, file_paths, track_id)
         await self.apipeline_process_enqueue_documents(
             split_by_character, split_by_character_only
@@ -1752,7 +1786,15 @@ async def apipeline_process_enqueue_documents(
         4. Process each chunk for entity and relation extraction
         5. Update the document status
         """
+        await self._apipeline_process_enqueue_documents_impl(
+            split_by_character, split_by_character_only
+        )
+
+    async def _apipeline_process_enqueue_documents_impl(
+        self,
+        split_by_character: str | None = None,
+        split_by_character_only: bool = False,
+    ) -> None:
         # Get pipeline status shared data and lock
         pipeline_status = await get_namespace_data(
             "pipeline_status", workspace=self.workspace
@@ -1798,6 +1840,7 @@
             )
             return

+
         try:
             # Process documents until no more documents or requests
             while True:
@@ -1869,6 +1912,7 @@
             # Create a semaphore to limit the number of concurrent file processing
             semaphore = asyncio.Semaphore(self.max_parallel_insert)

+            @langfuse_observe(name="lightrag-insert-doc", capture_input=False, capture_output=False)
             async def process_document(
                 doc_id: str,
                 status_doc: DocProcessingStatus,
@@ -1889,6 +1933,13 @@ async def process_document(
                 chunks: dict[str, Any] = {}
                 content_data: dict[str, Any] | None = None

+                client = langfuse_client()
+                if client is not None:
+                    client.update_current_span(
+                        input={"doc_id": doc_id, "file_path": file_path},
+                        metadata={"workspace": self.workspace},
+                    )
+
                 def get_failed_chunk_snapshot() -> tuple[list[str], int]:
                     if chunks:
                         chunk_ids = list(chunks.keys())
@@ -1900,6 +1951,7 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]:
                 # Initialize to prevent UnboundLocalError in error handling
                 first_stage_tasks = []
                 entity_relation_task = None
+
                 try:
                     # Resolve file_path from full_docs before honoring a queued
                     # cancellation so corrupted doc_status placeholders do not
@@ -2251,6 +2303,7 @@
                     }
                 )

+
             # Create processing tasks for all documents
             doc_tasks = []
             for doc_id, status_doc in to_process_docs.items():
@@ -2623,6 +2676,7 @@ def query_data(
         loop = always_get_an_event_loop()
         return loop.run_until_complete(self.aquery_data(query, param))

+    @langfuse_observe(name="query-data", capture_input=False)
     async def aquery_data(
         self,
         query: str,
@@ -2734,6 +2788,19 @@ async def aquery_data(
         actual data is nested under the 'data' field, with 'status' and 'message'
         fields at the top level.
         """
+        client = langfuse_client()
+        if client is not None:
+            client.update_current_span(
+                input={"query": query[:200] if query else "", "mode": param.mode},
+                metadata={"workspace": self.workspace},
+            )
+        return await self._aquery_data_impl(query, param)
+
+    async def _aquery_data_impl(
+        self,
+        query: str,
+        param: QueryParam,
+    ) -> dict[str, Any]:
         global_config = asdict(self)

         # Create a copy of param to avoid modifying the original
@@ -2830,6 +2897,7 @@ async def aquery_data(
         await self._query_done()
         return final_data

+    @langfuse_observe(name="query", capture_input=False)
     async def aquery_llm(
         self,
         query: str,
@@ -2852,6 +2920,13 @@ async def aquery_llm(
         """
         logger.debug(f"[aquery_llm] Query param: {param}")

+        client = langfuse_client()
+        if client is not None:
+            client.update_current_span(
+                input={"query": query if query else "", "mode": param.mode},
+                metadata={"workspace": self.workspace, "stream": param.stream},
+            )
+
         global_config = asdict(self)

         try:
@@ -2955,6 +3030,8 @@

         except Exception as e:
             logger.error(f"Query failed: {e}")
+            if client is not None:
+                client.update_current_span(level="ERROR", status_message=str(e))
             # Return error response
             return {
                 "status": "failure",
51 changes: 21 additions & 30 deletions lightrag/llm/openai.py
@@ -37,31 +37,7 @@

 from dotenv import load_dotenv

-# Try to import Langfuse for LLM observability (optional)
-# Falls back to standard OpenAI client if not available
-# Langfuse requires proper configuration to work correctly
-LANGFUSE_ENABLED = False
-try:
-    # Check if required Langfuse environment variables are set
-    langfuse_public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
-    langfuse_secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
-
-    # Only enable Langfuse if both keys are configured
-    if langfuse_public_key and langfuse_secret_key:
-        from langfuse.openai import AsyncOpenAI  # type: ignore[import-untyped]
-
-        LANGFUSE_ENABLED = True
-        logger.info("Langfuse observability enabled for OpenAI client")
-    else:
-        from openai import AsyncOpenAI
-
-        logger.debug(
-            "Langfuse environment variables not configured, using standard OpenAI client"
-        )
-except ImportError:
-    from openai import AsyncOpenAI
-
-    logger.debug("Langfuse not available, using standard OpenAI client")
+from openai import AsyncOpenAI

 # use the .env that is inside the current folder
 # allows to use different .env file for each lightrag instance
@@ -457,17 +433,24 @@ async def inner():
                     cot_active = False

                 # After streaming is complete, track token usage
-                if token_tracker and final_chunk_usage:
-                    # Use actual usage from the API
+                if final_chunk_usage:
                     token_counts = {
                         "prompt_tokens": getattr(final_chunk_usage, "prompt_tokens", 0),
                         "completion_tokens": getattr(
                             final_chunk_usage, "completion_tokens", 0
                         ),
                         "total_tokens": getattr(final_chunk_usage, "total_tokens", 0),
                     }
-                    token_tracker.add_usage(token_counts)
+                    if token_tracker:
+                        token_tracker.add_usage(token_counts)
                     logger.debug(f"Streaming token usage (from API): {token_counts}")

+                    from lightrag.tracing import is_tracing_enabled, report_token_usage
+                    if is_tracing_enabled():
+                        report_token_usage({
+                            "input": token_counts["prompt_tokens"],
+                            "output": token_counts["completion_tokens"],
+                        })
                 elif token_tracker:
                     logger.debug("No usage information available in streaming response")
             except Exception as e:
@@ -609,15 +592,23 @@ async def inner():
             if r"\u" in final_content:
                 final_content = safe_unicode_decode(final_content.encode("utf-8"))

-            if token_tracker and hasattr(response, "usage"):
+            if hasattr(response, "usage") and response.usage is not None:
                 token_counts = {
                     "prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
                     "completion_tokens": getattr(
                         response.usage, "completion_tokens", 0
                     ),
                     "total_tokens": getattr(response.usage, "total_tokens", 0),
                 }
-                token_tracker.add_usage(token_counts)
+                if token_tracker:
+                    token_tracker.add_usage(token_counts)
+
+                from lightrag.tracing import is_tracing_enabled, report_token_usage
+                if is_tracing_enabled():
+                    report_token_usage({
+                        "input": token_counts["prompt_tokens"],
+                        "output": token_counts["completion_tokens"],
+                    })

             logger.debug(f"Response content len: {len(final_content)}")
             verbose_debug(f"Response: {response}")
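report_token_usage also lives in lightrag/tracing.py and is not shown in this diff; the local import keeps lightrag.llm.openai free of a hard dependency on the tracing stack. A plausible sketch, assuming the Langfuse v3 client API, where the {"input": ..., "output": ...} dict built by both call sites above matches the keys Langfuse accepts as usage_details:

    from langfuse import get_client

    def report_token_usage(usage: dict[str, int]) -> None:
        # Attach prompt/completion token counts to the active observation
        # so usage and cost appear on the current trace.
        get_client().update_current_generation(usage_details=usage)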