Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
from langfuse.api.resources.commons.errors.error import Error
from langfuse.api.resources.commons.errors.not_found_error import NotFoundError
from langfuse.api.resources.commons.types import DatasetRunWithItems
from langfuse.api.resources.datasets.types import (
DeleteDatasetRunResponse,
PaginatedDatasetRuns,
)
from langfuse.api.resources.commons.errors.not_found_error import NotFoundError
from langfuse.api.resources.ingestion.types.score_body import ScoreBody
from langfuse.api.resources.prompts.types import (
CreatePromptRequest_Chat,
Expand Down Expand Up @@ -3081,6 +3081,7 @@ def run_batched_evaluation(
mapper: MapperFunction,
filter: Optional[str] = None,
fetch_batch_size: int = 50,
fetch_trace_fields: Optional[str] = None,
max_items: Optional[int] = None,
max_retries: int = 3,
evaluators: List[EvaluatorFunction],
Expand Down Expand Up @@ -3123,6 +3124,7 @@ def run_batched_evaluation(
Default: None (fetches all items).
fetch_batch_size: Number of items to fetch per API call and hold in memory.
Larger values may be faster but use more memory. Default: 50.
fetch_trace_fields: Comma-separated list of fields to include when fetching traces. Available field groups: 'core' (always included), 'io' (input, output, metadata), 'scores', 'observations', 'metrics'. If not specified, all fields are returned. Example: 'core,scores,metrics'. Note: Excluded 'observations' or 'scores' fields return empty arrays; excluded 'metrics' returns -1 for 'totalCost' and 'latency'. Only relevant if scope is 'traces'.
Comment thread
hassiebp marked this conversation as resolved.
Outdated
max_items: Maximum total number of items to process. If None, processes all
items matching the filter. Useful for testing or limiting evaluation runs.
Default: None (process all).
Expand Down Expand Up @@ -3291,6 +3293,7 @@ def composite_evaluator(*, item, evaluations):
evaluators=evaluators,
filter=filter,
fetch_batch_size=fetch_batch_size,
fetch_trace_fields=fetch_trace_fields,
max_items=max_items,
max_concurrency=max_concurrency,
composite_evaluator=composite_evaluator,
Expand Down
6 changes: 6 additions & 0 deletions langfuse/batch_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ async def run_async(
evaluators: List[EvaluatorFunction],
filter: Optional[str] = None,
fetch_batch_size: int = 50,
fetch_trace_fields: Optional[str] = None,
max_items: Optional[int] = None,
max_concurrency: int = 50,
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
Expand All @@ -866,6 +867,7 @@ async def run_async(
evaluators: List of evaluation functions to run on each item.
filter: JSON filter string for querying items.
fetch_batch_size: Number of items to fetch per API call.
fetch_trace_fields: Comma-separated list of fields to include when fetching traces. Available field groups: 'core' (always included), 'io' (input, output, metadata), 'scores', 'observations', 'metrics'. If not specified, all fields are returned. Example: 'core,scores,metrics'. Note: Excluded 'observations' or 'scores' fields return empty arrays; excluded 'metrics' returns -1 for 'totalCost' and 'latency'. Only relevant if scope is 'traces'.
max_items: Maximum number of items to process (None = all).
max_concurrency: Maximum number of concurrent evaluations.
composite_evaluator: Optional function to create composite scores.
Expand Down Expand Up @@ -935,6 +937,7 @@ async def run_async(
page=page,
limit=fetch_batch_size,
max_retries=max_retries,
fields=fetch_trace_fields,
)
except Exception as e:
# Failed after max_retries - create resume token and return
Expand Down Expand Up @@ -1114,6 +1117,7 @@ async def _fetch_batch_with_retry(
page: int,
limit: int,
max_retries: int,
fields: Optional[str],
) -> List[Union[TraceWithFullDetails, ObservationsView]]:
"""Fetch a batch of items with retry logic.

Expand All @@ -1124,6 +1128,7 @@ async def _fetch_batch_with_retry(
limit: Number of items per page.
max_retries: Maximum number of retry attempts.
verbose: Whether to log retry attempts.
fields: Comma-separated list of trace fields to fetch (None = all fields). Only used when scope is 'traces'.

Returns:
List of items from the API.
Expand All @@ -1137,6 +1142,7 @@ async def _fetch_batch_with_retry(
limit=limit,
filter=filter,
request_options={"max_retries": max_retries},
fields=fields,
) # type: ignore
return list(response.data) # type: ignore
elif scope == "observations":
Expand Down
Loading