Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
PaginatedDatasetRuns,
)
from langfuse.api.resources.ingestion.types.score_body import ScoreBody
from langfuse.api.resources.ingestion.types.trace_body import TraceBody
from langfuse.api.resources.prompts.types import (
CreatePromptRequest_Chat,
CreatePromptRequest_Text,
Expand Down Expand Up @@ -2098,6 +2099,43 @@ def create_score(
f"Error creating score: Failed to process score event for trace_id={trace_id}, name={name}. Error: {e}"
)

def _create_trace_tags_via_ingestion(
    self,
    *,
    trace_id: str,
    tags: List[str],
) -> None:
    """Enqueue a trace update carrying *tags* through the ingestion API.

    No-op when tracing is disabled or when there are no tags to send.
    Any failure while building or enqueuing the event is logged and
    swallowed so callers are never disrupted.
    """
    # Nothing to do when tracing is off or the tag list is empty.
    if not self._tracing_enabled or len(tags) == 0:
        return

    try:
        event = {
            "id": self.create_trace_id(),
            "type": "trace-create",
            "timestamp": _get_timestamp(),
            "body": TraceBody(id=trace_id, tags=tags),
        }

        resources = self._resources
        if resources is not None:
            # force_sample=True so the tag update is not dropped by the
            # trace sampler in the resource manager.
            resources.add_trace_task(
                event,
                trace_id=trace_id,
                force_sample=True,
            )
    except Exception as e:
        langfuse_logger.exception(
            f"Error updating trace tags: Failed to process trace update event for trace_id={trace_id}. Error: {e}"
        )

@overload
def score_current_span(
self,
Expand Down Expand Up @@ -3115,8 +3153,10 @@ def run_batched_evaluation(
max_retries: int = 3,
evaluators: List[EvaluatorFunction],
composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
max_concurrency: int = 50,
max_concurrency: int = 5,
metadata: Optional[Dict[str, Any]] = None,
_add_observation_scores_to_trace: bool = False,
_additional_trace_tags: Optional[List[str]] = None,
resume_from: Optional[BatchEvaluationResumeToken] = None,
verbose: bool = False,
) -> BatchEvaluationResult:
Expand Down Expand Up @@ -3158,7 +3198,7 @@ def run_batched_evaluation(
items matching the filter. Useful for testing or limiting evaluation runs.
Default: None (process all).
max_concurrency: Maximum number of items to evaluate concurrently. Controls
parallelism and resource usage. Default: 50.
parallelism and resource usage. Default: 5.
composite_evaluator: Optional function that creates a composite score from
item-level evaluations. Receives the original item and its evaluations,
returns a single Evaluation. Useful for weighted averages or combined metrics.
Expand Down Expand Up @@ -3327,6 +3367,8 @@ def composite_evaluator(*, item, evaluations):
max_concurrency=max_concurrency,
composite_evaluator=composite_evaluator,
metadata=metadata,
_add_observation_scores_to_trace=_add_observation_scores_to_trace,
_additional_trace_tags=_additional_trace_tags,
max_retries=max_retries,
verbose=verbose,
resume_from=resume_from,
Expand Down
99 changes: 71 additions & 28 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,37 +306,49 @@ def reset(cls) -> None:

cls._instances.clear()

def _enqueue_ingestion_task(
    self,
    *,
    event: dict,
    trace_id: Optional[str],
    sampling_name: str = "ingestion",
    force_sample: bool = False,
) -> None:
    """Enqueue ingestion event with trace sampling aligned to the OTel sampler."""
    provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())

    # Enqueue unconditionally when sampling is forced, when no concrete
    # OTel provider is available (ProxyTracerProvider => default to
    # in-sample), or when the event is not tied to a trace.
    if (
        force_sample
        or isinstance(provider, otel_trace_api.ProxyTracerProvider)
        or trace_id is None
    ):
        self._score_ingestion_queue.put(event, block=False)
        return

    # Otherwise defer to the tracer's sampler so ingestion events follow
    # the same sampling decision as the trace they belong to.
    decision = provider.sampler.should_sample(
        parent_context=None,
        trace_id=int(trace_id, 16),
        name=sampling_name,
    ).decision

    if decision == Decision.RECORD_AND_SAMPLE:
        self._score_ingestion_queue.put(event, block=False)

def add_score_task(self, event: dict, *, force_sample: bool = False) -> None:
try:
# Sample scores with the same sampler that is used for tracing
tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
should_sample = (
force_sample
or isinstance(
tracer_provider, otel_trace_api.ProxyTracerProvider
) # default to in-sample if otel sampler is not available
or (
(
tracer_provider.sampler.should_sample(
parent_context=None,
trace_id=int(event["body"].trace_id, 16),
name="score",
).decision
== Decision.RECORD_AND_SAMPLE
if hasattr(event["body"], "trace_id")
else True
)
if event["body"].trace_id
is not None # do not sample out session / dataset run scores
else True
)
trace_id = event["body"].trace_id
langfuse_logger.debug(
f"Score: Enqueuing event type={event['type']} for trace_id={trace_id} name={event['body'].name} value={event['body'].value}"
)
self._enqueue_ingestion_task(
event=event,
trace_id=trace_id,
sampling_name="score",
force_sample=force_sample,
)

if should_sample:
langfuse_logger.debug(
f"Score: Enqueuing event type={event['type']} for trace_id={event['body'].trace_id} name={event['body'].name} value={event['body'].value}"
)
self._score_ingestion_queue.put(event, block=False)

except Full:
langfuse_logger.warning(
Expand All @@ -351,6 +363,37 @@ def add_score_task(self, event: dict, *, force_sample: bool = False) -> None:

return

def add_trace_task(
    self,
    event: dict,
    *,
    trace_id: Optional[str],
    force_sample: bool = False,
) -> None:
    """Queue a trace ingestion event; failures are logged, never raised."""
    try:
        langfuse_logger.debug(
            f"Trace: Enqueuing event type={event['type']} for trace_id={trace_id}"
        )
        self._enqueue_ingestion_task(
            event=event,
            trace_id=trace_id,
            sampling_name="trace",
            force_sample=force_sample,
        )
    except Full:
        # The ingestion queue is bounded: drop the update instead of
        # blocking the caller.
        langfuse_logger.warning(
            "System overload: Trace ingestion queue has reached capacity (100,000 items). Trace update will be dropped. Consider increasing flush frequency or decreasing event volume."
        )
    except Exception as e:
        # Best-effort delivery: never propagate ingestion errors upward.
        langfuse_logger.error(
            f"Unexpected error: Failed to process trace event. The trace update will be dropped. Error details: {e}"
        )

@property
def tracer(self) -> Optional[Tracer]:
    """Return the OTel tracer backing this resource manager (may be None)."""
    return self._otel_tracer
Expand Down
Loading
Loading