Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 96 additions & 8 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import urllib.parse
from datetime import datetime
from hashlib import sha256
from time import time_ns
from typing import Any, Dict, List, Literal, Optional, Union, cast, overload

import backoff
Expand Down Expand Up @@ -37,7 +38,11 @@
LANGFUSE_TRACING_ENVIRONMENT,
)
from langfuse._client.resource_manager import LangfuseResourceManager
from langfuse._client.span import LangfuseGeneration, LangfuseSpan
from langfuse._client.span import (
LangfuseEvent,
LangfuseGeneration,
LangfuseSpan,
)
from langfuse._utils import _get_timestamp
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
Expand Down Expand Up @@ -945,6 +950,89 @@ def update_current_trace(
public=public,
)

def create_event(
self,
*,
trace_context: Optional[TraceContext] = None,
name: str,
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
version: Optional[str] = None,
level: Optional[SpanLevel] = None,
status_message: Optional[str] = None,
) -> LangfuseEvent:
"""Create a new Langfuse observation of type 'EVENT'.

The created Langfuse Event observation will be the child of the current span in the context.

Args:
trace_context: Optional context for connecting to an existing trace
name: Name of the span (e.g., function or operation name)
input: Input data for the operation (can be any JSON-serializable object)
output: Output data from the operation (can be any JSON-serializable object)
metadata: Additional metadata to associate with the span
version: Version identifier for the code or component
level: Importance level of the span (info, warning, error)
status_message: Optional status message for the span

Returns:
The Langfuse Event object

Example:
```python
event = langfuse.create_event(name="process-event")
```
"""
timestamp = time_ns()
attributes = create_span_attributes(
input=input,
output=output,
metadata=metadata,
version=version,
level=level,
status_message=status_message,
)

if trace_context:
trace_id = trace_context.get("trace_id", None)
parent_span_id = trace_context.get("parent_span_id", None)

if trace_id:
remote_parent_span = self._create_remote_parent_span(
trace_id=trace_id, parent_span_id=parent_span_id
)

with otel_trace_api.use_span(
cast(otel_trace_api.Span, remote_parent_span)
):
otel_span = self._otel_tracer.start_span(
name=name, attributes=attributes, start_time=timestamp
)
otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

return LangfuseEvent(
otel_span=otel_span,
langfuse_client=self,
input=input,
output=output,
metadata=metadata,
environment=self._environment,
).end(end_time=timestamp)

otel_span = self._otel_tracer.start_span(
name=name, attributes=attributes, start_time=timestamp
)

return LangfuseEvent(
otel_span=otel_span,
langfuse_client=self,
input=input,
output=output,
metadata=metadata,
environment=self._environment,
).end(end_time=timestamp)

def _create_remote_parent_span(
self, *, trace_id: str, parent_span_id: Optional[str]
):
Expand Down Expand Up @@ -1032,7 +1120,8 @@ def _create_observation_id(self, *, seed: Optional[str] = None) -> str:

return sha256(seed.encode("utf-8")).digest()[:8].hex()

def create_trace_id(self, *, seed: Optional[str] = None) -> str:
@staticmethod
def create_trace_id(*, seed: Optional[str] = None) -> str:
"""Create a unique trace ID for use with Langfuse.

This method generates a unique trace ID for use with various Langfuse APIs.
Expand Down Expand Up @@ -1076,7 +1165,7 @@ def create_trace_id(self, *, seed: Optional[str] = None) -> str:
if not seed:
trace_id_int = RandomIdGenerator().generate_trace_id()

return self._format_otel_trace_id(trace_id_int)
return Langfuse._format_otel_trace_id(trace_id_int)

return sha256(seed.encode("utf-8")).digest()[:16].hex()

Expand All @@ -1090,7 +1179,8 @@ def _get_otel_span_id(self, otel_span: otel_trace_api.Span):

return self._format_otel_span_id(span_context.span_id)

def _format_otel_span_id(self, span_id_int: int) -> str:
@staticmethod
def _format_otel_span_id(span_id_int: int) -> str:
"""Format an integer span ID to a 16-character lowercase hex string.

Internal method to convert an OpenTelemetry integer span ID to the standard
Expand All @@ -1104,7 +1194,8 @@ def _format_otel_span_id(self, span_id_int: int) -> str:
"""
return format(span_id_int, "016x")

def _format_otel_trace_id(self, trace_id_int: int) -> str:
@staticmethod
def _format_otel_trace_id(trace_id_int: int) -> str:
"""Format an integer trace ID to a 32-character lowercase hex string.

Internal method to convert an OpenTelemetry integer trace ID to the standard
Expand Down Expand Up @@ -1394,9 +1485,6 @@ def score_current_trace(
config_id=config_id,
)

def update_finished_trace(self):
pass

def flush(self):
"""Force flush all pending spans and events to the Langfuse API.

Expand Down
117 changes: 101 additions & 16 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
and scoring integration specific to Langfuse's observability platform.
"""

from abc import ABC, abstractmethod
from datetime import datetime
from time import time_ns
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -45,7 +45,7 @@
from langfuse.types import MapValue, ScoreDataType, SpanLevel


class LangfuseSpanWrapper(ABC):
class LangfuseSpanWrapper:
"""Abstract base class for all Langfuse span types.

This class provides common functionality for all Langfuse span types, including
Expand All @@ -64,7 +64,7 @@ def __init__(
*,
otel_span: otel_trace_api.Span,
langfuse_client: "Langfuse",
as_type: Literal["span", "generation"],
as_type: Literal["span", "generation", "event"],
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
Expand Down Expand Up @@ -133,18 +133,6 @@ def end(self, *, end_time: Optional[int] = None):

return self

@abstractmethod
def update(self, **kwargs) -> Union["LangfuseSpan", "LangfuseGeneration"]:
"""Update the span with new information.

Abstract method that must be implemented by subclasses to update
the span with new information during its lifecycle.

Args:
**kwargs: Subclass-specific update parameters
"""
pass

def update_trace(
self,
*,
Expand Down Expand Up @@ -352,7 +340,7 @@ def _set_processed_span_attributes(
self,
*,
span: otel_trace_api.Span,
as_type: Optional[Literal["span", "generation"]] = None,
as_type: Optional[Literal["span", "generation", "event"]] = None,
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
Expand Down Expand Up @@ -934,6 +922,69 @@ def start_as_current_generation(
),
)

def create_event(
self,
*,
name: str,
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
version: Optional[str] = None,
level: Optional[SpanLevel] = None,
status_message: Optional[str] = None,
) -> "LangfuseEvent":
"""Create a new Langfuse observation of type 'EVENT'.

Args:
name: Name of the span (e.g., function or operation name)
input: Input data for the operation (can be any JSON-serializable object)
output: Output data from the operation (can be any JSON-serializable object)
metadata: Additional metadata to associate with the span
version: Version identifier for the code or component
level: Importance level of the span (info, warning, error)
status_message: Optional status message for the span

Returns:
The LangfuseEvent object

Example:
```python
event = langfuse.create_event(name="process-event")
```
"""
timestamp = time_ns()
attributes = create_span_attributes(
input=input,
output=output,
metadata=metadata,
version=version,
level=level,
status_message=status_message,
)

with otel_trace_api.use_span(self._otel_span):
new_otel_span = self._langfuse_client._otel_tracer.start_span(
name=name, attributes=attributes, start_time=timestamp
)

if new_otel_span.is_recording:
self._set_processed_span_attributes(
span=new_otel_span,
as_type="event",
input=input,
output=output,
metadata=metadata,
)

return LangfuseEvent(
otel_span=new_otel_span,
langfuse_client=self._langfuse_client,
input=input,
output=output,
metadata=metadata,
environment=self._environment,
).end(end_time=timestamp)


class LangfuseGeneration(LangfuseSpanWrapper):
"""Specialized span implementation for AI model generations in Langfuse.
Expand Down Expand Up @@ -1069,3 +1120,37 @@ def update(
self._otel_span.set_attributes(attributes=attributes)

return self


class LangfuseEvent(LangfuseSpanWrapper):
"""Specialized span implementation for Langfuse Events."""

def __init__(
self,
*,
otel_span: otel_trace_api.Span,
langfuse_client: "Langfuse",
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
environment: Optional[str] = None,
):
"""Initialize a new LangfuseEvent span.

Args:
otel_span: The OpenTelemetry span to wrap
langfuse_client: Reference to the parent Langfuse client
input: Input data for the generation (e.g., prompts)
output: Output from the generation (e.g., completions)
Comment thread
hassiebp marked this conversation as resolved.
Outdated
metadata: Additional metadata to associate with the generation
environment: The tracing environment
"""
super().__init__(
otel_span=otel_span,
as_type="event",
langfuse_client=langfuse_client,
input=input,
output=output,
metadata=metadata,
environment=environment,
)