14 changes: 14 additions & 0 deletions langfuse/__init__.py
@@ -1,5 +1,13 @@
""".. include:: ../README.md"""

from langfuse.batch_evaluation import (
    BatchEvaluationResult,
    BatchEvaluationResumeToken,
    CompositeEvaluatorFunction,
    EvaluatorInputs,
    EvaluatorStats,
    MapperFunction,
)
from langfuse.experiment import Evaluation

from ._client import client as _client_module
@@ -41,6 +49,12 @@
"LangfuseRetriever",
"LangfuseGuardrail",
"Evaluation",
"EvaluatorInputs",
"MapperFunction",
"CompositeEvaluatorFunction",
"EvaluatorStats",
"BatchEvaluationResumeToken",
"BatchEvaluationResult",
"experiment",
"api",
]
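
The new exports make the batch-evaluation types importable directly from `langfuse`. A minimal sketch of what that enables (field names follow the `run_batched_evaluation` docstring in `client.py` below; the mapper and evaluator themselves are hypothetical):

```python
from langfuse import Evaluation, EvaluatorInputs

def trace_mapper(trace):
    # EvaluatorInputs carries input, output, expected_output, and metadata,
    # per the run_batched_evaluation docstring below.
    return EvaluatorInputs(
        input=trace.input,
        output=trace.output,
        expected_output=None,
        metadata={"trace_id": trace.id},
    )

def exact_match_evaluator(*, input, output, expected_output, metadata):
    # Evaluation's name/value fields appear throughout the docstring examples.
    return Evaluation(name="exact_match", value=1.0 if output == expected_output else 0.0)
```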
236 changes: 236 additions & 0 deletions langfuse/_client/client.py
@@ -86,6 +92,12 @@
    Prompt_Chat,
    Prompt_Text,
)
from langfuse.batch_evaluation import (
    BatchEvaluationResult,
    BatchEvaluationResumeToken,
    CompositeEvaluatorFunction,
    MapperFunction,
)
from langfuse.experiment import (
    Evaluation,
    EvaluatorFunction,
@@ -2919,6 +2925,236 @@ def _create_experiment_run_name(

return f"{name} - {iso_timestamp}"

    def run_batched_evaluation(
        self,
        *,
        scope: Literal["traces", "observations", "sessions"],
        mapper: MapperFunction,
        evaluators: List[EvaluatorFunction],
        filter: Optional[str] = None,
        fetch_batch_size: int = 50,
        max_items: Optional[int] = None,
        max_concurrency: int = 50,
        composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
        metadata: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        verbose: bool = False,
        resume_from: Optional[BatchEvaluationResumeToken] = None,
    ) -> BatchEvaluationResult:
"""Fetch traces, observations, or sessions and run evaluations on each item.

This method provides a powerful way to evaluate existing data in Langfuse at scale.
It fetches items based on filters, transforms them using a mapper function, runs
evaluators on each item, and creates scores that are linked back to the original
entities. This is ideal for:

- Running evaluations on production traces after deployment
- Backtesting new evaluation metrics on historical data
- Batch scoring of observations for quality monitoring
- Periodic evaluation runs on recent data

The method uses a streaming/pipeline approach to process items in batches, making
it memory-efficient for large datasets. It includes comprehensive error handling,
retry logic, and resume capability for long-running evaluations.

Args:
scope: The type of items to evaluate. Must be one of:
- "traces": Evaluate complete traces with all their observations
- "observations": Evaluate individual observations (spans, generations, events)
- "sessions": Evaluate entire sessions with multiple traces
mapper: Function that transforms API response objects into evaluator inputs.
Receives a trace/observation/session object and returns an EvaluatorInputs
instance with input, output, expected_output, and metadata fields.
Can be sync or async.
evaluators: List of evaluation functions to run on each item. Each evaluator
receives the mapped inputs and returns Evaluation object(s). Evaluator
failures are logged but don't stop the batch evaluation.
filter: Optional JSON filter string for querying items (same format as Langfuse API). Examples:
- '{"tags": ["production"]}'
- '{"user_id": "user123", "timestamp": {"operator": ">", "value": "2024-01-01"}}'
Default: None (fetches all items).
fetch_batch_size: Number of items to fetch per API call and hold in memory.
Larger values may be faster but use more memory. Default: 50.
max_items: Maximum total number of items to process. If None, processes all
items matching the filter. Useful for testing or limiting evaluation runs.
Default: None (process all).
max_concurrency: Maximum number of items to evaluate concurrently. Controls
parallelism and resource usage. Default: 50.
composite_evaluator: Optional function that creates a composite score from
item-level evaluations. Receives the original item and its evaluations,
returns a single Evaluation. Useful for weighted averages or combined metrics.
Default: None.
metadata: Optional metadata dict to add to all created scores. Useful for
tracking evaluation runs, versions, or other context. Default: None.
max_retries: Maximum number of retry attempts for failed batch fetches.
Uses exponential backoff (1s, 2s, 4s). Default: 3.
verbose: If True, logs progress information to console. Useful for monitoring
long-running evaluations. Default: False.
resume_from: Optional resume token from a previous incomplete run. Allows
continuing evaluation after interruption or failure. Default: None.


Returns:
BatchEvaluationResult containing:
- total_items_fetched: Number of items fetched from API
- total_items_processed: Number of items successfully evaluated
- total_items_failed: Number of items that failed evaluation
- total_scores_created: Scores created by item-level evaluators
- total_composite_scores_created: Scores created by composite evaluator
- total_evaluations_failed: Individual evaluator failures
- evaluator_stats: Per-evaluator statistics (success rate, scores created)
- resume_token: Token for resuming if incomplete (None if completed)
- completed: True if all items processed
- duration_seconds: Total execution time
- failed_item_ids: IDs of items that failed
- error_summary: Error types and counts
- has_more_items: True if max_items reached but more exist

Raises:
ValueError: If invalid scope is provided.

        Examples:
            Basic trace evaluation:
            ```python
            from langfuse import Langfuse, EvaluatorInputs, Evaluation

            client = Langfuse()

            # Define a mapper to extract fields from traces
            def trace_mapper(trace):
                return EvaluatorInputs(
                    input=trace.input,
                    output=trace.output,
                    expected_output=None,
                    metadata={"trace_id": trace.id},
                )

            # Define an evaluator
            def length_evaluator(*, input, output, expected_output, metadata):
                return Evaluation(
                    name="output_length",
                    value=len(output) if output else 0,
                )

            # Run the batch evaluation
            result = client.run_batched_evaluation(
                scope="traces",
                mapper=trace_mapper,
                evaluators=[length_evaluator],
                filter='{"tags": ["production"]}',
                max_items=1000,
                verbose=True,
            )

            print(f"Processed {result.total_items_processed} traces")
            print(f"Created {result.total_scores_created} scores")
            ```

            Evaluation with composite scorer:
            ```python
            def accuracy_evaluator(*, input, output, expected_output, metadata):
                # ... evaluation logic
                return Evaluation(name="accuracy", value=0.85)

            def relevance_evaluator(*, input, output, expected_output, metadata):
                # ... evaluation logic
                return Evaluation(name="relevance", value=0.92)

            def composite_evaluator(*, item, evaluations):
                # Weighted average of the item-level evaluations
                weights = {"accuracy": 0.6, "relevance": 0.4}
                total = sum(
                    e.value * weights.get(e.name, 0)
                    for e in evaluations
                    if isinstance(e.value, (int, float))
                )
                return Evaluation(
                    name="composite_score",
                    value=total,
                    comment=f"Weighted average of {len(evaluations)} metrics",
                )

            result = client.run_batched_evaluation(
                scope="traces",
                mapper=trace_mapper,
                evaluators=[accuracy_evaluator, relevance_evaluator],
                composite_evaluator=composite_evaluator,
                filter='{"user_id": "important_user"}',
                verbose=True,
            )
            ```
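
            The resume example below references an obs_mapper; a hypothetical
            definition, assuming observation objects expose input, output, and
            id fields the way traces do:
            ```python
            def obs_mapper(obs):
                return EvaluatorInputs(
                    input=obs.input,
                    output=obs.output,
                    expected_output=None,
                    metadata={"observation_id": obs.id},
                )
            ```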

            Handling incomplete runs with resume:
            ```python
            # Initial run that may fail or time out
            result = client.run_batched_evaluation(
                scope="observations",
                mapper=obs_mapper,
                evaluators=[my_evaluator],
                max_items=10000,
                verbose=True,
            )

            # Check whether the run completed
            if not result.completed and result.resume_token:
                print(f"Processed {result.resume_token.items_processed} items before interruption")

                # Resume from where it left off
                result = client.run_batched_evaluation(
                    scope="observations",
                    mapper=obs_mapper,
                    evaluators=[my_evaluator],
                    resume_from=result.resume_token,
                    verbose=True,
                )

            print(f"Total items processed: {result.total_items_processed}")
            ```
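
            Constructing filters programmatically (a sketch that only builds
            the JSON string from the shapes shown in the Args section; no
            operators beyond those are assumed):
            ```python
            import json

            # Same shape as the filter examples in Args
            filter_obj = {
                "user_id": "user123",
                "timestamp": {"operator": ">", "value": "2024-01-01"},
            }

            result = client.run_batched_evaluation(
                scope="traces",
                mapper=trace_mapper,
                evaluators=[length_evaluator],
                filter=json.dumps(filter_obj),
            )
            ```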

            Monitoring evaluator performance:
            ```python
            result = client.run_batched_evaluation(...)

            for stats in result.evaluator_stats:
                # Guard against division by zero if an evaluator never ran
                success_rate = (
                    stats.successful_runs / stats.total_runs if stats.total_runs else 0.0
                )
                print(f"{stats.name}:")
                print(f"  Success rate: {success_rate:.1%}")
                print(f"  Scores created: {stats.total_scores_created}")

                if stats.failed_runs > 0:
                    print(f"  ⚠️ Failed {stats.failed_runs} times")
            ```
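
            Async mapper (a minimal sketch; the Args section states the mapper
            can be sync or async, so the evaluator here stays synchronous):
            ```python
            async def async_trace_mapper(trace):
                # Same fields as trace_mapper above, but awaitable
                return EvaluatorInputs(
                    input=trace.input,
                    output=trace.output,
                    expected_output=None,
                    metadata={"trace_id": trace.id},
                )

            result = client.run_batched_evaluation(
                scope="traces",
                mapper=async_trace_mapper,
                evaluators=[length_evaluator],
            )
            ```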

        Note:
            - Evaluator failures are logged but don't stop the batch evaluation
            - Individual item failures are tracked but don't stop processing
            - Fetch failures are retried with exponential backoff
            - All scores are automatically flushed to Langfuse at the end
            - The resume mechanism uses timestamp-based filtering to avoid duplicates
        """
        from langfuse.batch_evaluation import BatchEvaluationRunner

        runner = BatchEvaluationRunner(self)

        # The runner is async; run_async_safely drives it to completion from
        # this synchronous entry point and returns its result.
        return cast(
            BatchEvaluationResult,
            run_async_safely(
                runner.run_async(
                    scope=scope,
                    mapper=mapper,
                    evaluators=evaluators,
                    filter=filter,
                    fetch_batch_size=fetch_batch_size,
                    max_items=max_items,
                    max_concurrency=max_concurrency,
                    composite_evaluator=composite_evaluator,
                    metadata=metadata,
                    max_retries=max_retries,
                    verbose=verbose,
                    resume_from=resume_from,
                )
            ),
        )

    def auth_check(self) -> bool:
        """Check if the provided credentials (public and secret key) are valid.
