From 4c72a6f7b0c1f7cc78e02f301db718e22ac1928e Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 22 May 2025 14:12:57 +0200 Subject: [PATCH 1/3] feat(core): add support for observation events --- langfuse/_client/client.py | 90 +++++++++++++++++++++++++++- langfuse/_client/span.py | 117 ++++++++++++++++++++++++++++++++----- 2 files changed, 190 insertions(+), 17 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index b2c82a30a..ddd9cd883 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -9,6 +9,7 @@ import urllib.parse from datetime import datetime from hashlib import sha256 +from time import time_ns from typing import Any, Dict, List, Literal, Optional, Union, cast, overload import backoff @@ -37,7 +38,11 @@ LANGFUSE_TRACING_ENVIRONMENT, ) from langfuse._client.resource_manager import LangfuseResourceManager -from langfuse._client.span import LangfuseGeneration, LangfuseSpan +from langfuse._client.span import ( + LangfuseEvent, + LangfuseGeneration, + LangfuseSpan, +) from langfuse._utils import _get_timestamp from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache @@ -945,6 +950,89 @@ def update_current_trace( public=public, ) + def create_event( + self, + *, + trace_context: Optional[TraceContext] = None, + name: str, + input: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Any] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + ) -> LangfuseEvent: + """Create a new Langfuse observation of type 'EVENT'. + + The created Langfuse Event observation will be the child of the current span in the context. + + Args: + trace_context: Optional context for connecting to an existing trace + name: Name of the span (e.g., function or operation name) + input: Input data for the operation (can be any JSON-serializable object) + output: Output data from the operation (can be any JSON-serializable object) + metadata: Additional metadata to associate with the span + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span + + Returns: + The Langfuse Event object + + Example: + ```python + event = langfuse.create_event(name="process-event") + ``` + """ + timestamp = time_ns() + attributes = create_span_attributes( + input=input, + output=output, + metadata=metadata, + version=version, + level=level, + status_message=status_message, + ) + + if trace_context: + trace_id = trace_context.get("trace_id", None) + parent_span_id = trace_context.get("parent_span_id", None) + + if trace_id: + remote_parent_span = self._create_remote_parent_span( + trace_id=trace_id, parent_span_id=parent_span_id + ) + + with otel_trace_api.use_span( + cast(otel_trace_api.Span, remote_parent_span) + ): + otel_span = self._otel_tracer.start_span( + name=name, attributes=attributes, start_time=timestamp + ) + otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) + + return LangfuseEvent( + otel_span=otel_span, + langfuse_client=self, + input=input, + output=output, + metadata=metadata, + environment=self._environment, + ).end(end_time=timestamp) + + otel_span = self._otel_tracer.start_span( + name=name, attributes=attributes, start_time=timestamp + ) + + return LangfuseEvent( + otel_span=otel_span, + langfuse_client=self, + input=input, + output=output, + metadata=metadata, + environment=self._environment, + ).end(end_time=timestamp) + def _create_remote_parent_span( self, *, trace_id: str, parent_span_id: Optional[str] ): diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 99327d763..7f65358f8 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -13,8 +13,8 @@ and scoring integration specific to Langfuse's observability platform. """ -from abc import ABC, abstractmethod from datetime import datetime +from time import time_ns from typing import ( TYPE_CHECKING, Any, @@ -45,7 +45,7 @@ from langfuse.types import MapValue, ScoreDataType, SpanLevel -class LangfuseSpanWrapper(ABC): +class LangfuseSpanWrapper: """Abstract base class for all Langfuse span types. This class provides common functionality for all Langfuse span types, including @@ -64,7 +64,7 @@ def __init__( *, otel_span: otel_trace_api.Span, langfuse_client: "Langfuse", - as_type: Literal["span", "generation"], + as_type: Literal["span", "generation", "event"], input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, @@ -133,18 +133,6 @@ def end(self, *, end_time: Optional[int] = None): return self - @abstractmethod - def update(self, **kwargs) -> Union["LangfuseSpan", "LangfuseGeneration"]: - """Update the span with new information. - - Abstract method that must be implemented by subclasses to update - the span with new information during its lifecycle. - - Args: - **kwargs: Subclass-specific update parameters - """ - pass - def update_trace( self, *, @@ -352,7 +340,7 @@ def _set_processed_span_attributes( self, *, span: otel_trace_api.Span, - as_type: Optional[Literal["span", "generation"]] = None, + as_type: Optional[Literal["span", "generation", "event"]] = None, input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, @@ -934,6 +922,69 @@ def start_as_current_generation( ), ) + def create_event( + self, + *, + name: str, + input: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Any] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + ) -> "LangfuseEvent": + """Create a new Langfuse observation of type 'EVENT'. + + Args: + name: Name of the span (e.g., function or operation name) + input: Input data for the operation (can be any JSON-serializable object) + output: Output data from the operation (can be any JSON-serializable object) + metadata: Additional metadata to associate with the span + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span + + Returns: + The LangfuseEvent object + + Example: + ```python + event = langfuse.create_event(name="process-event") + ``` + """ + timestamp = time_ns() + attributes = create_span_attributes( + input=input, + output=output, + metadata=metadata, + version=version, + level=level, + status_message=status_message, + ) + + with otel_trace_api.use_span(self._otel_span): + new_otel_span = self._langfuse_client._otel_tracer.start_span( + name=name, attributes=attributes, start_time=timestamp + ) + + if new_otel_span.is_recording: + self._set_processed_span_attributes( + span=new_otel_span, + as_type="event", + input=input, + output=output, + metadata=metadata, + ) + + return LangfuseEvent( + otel_span=new_otel_span, + langfuse_client=self._langfuse_client, + input=input, + output=output, + metadata=metadata, + environment=self._environment, + ).end(end_time=timestamp) + class LangfuseGeneration(LangfuseSpanWrapper): """Specialized span implementation for AI model generations in Langfuse. @@ -1069,3 +1120,37 @@ def update( self._otel_span.set_attributes(attributes=attributes) return self + + +class LangfuseEvent(LangfuseSpanWrapper): + """Specialized span implementation for Langfuse Events.""" + + def __init__( + self, + *, + otel_span: otel_trace_api.Span, + langfuse_client: "Langfuse", + input: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Any] = None, + environment: Optional[str] = None, + ): + """Initialize a new LangfuseEvent span. + + Args: + otel_span: The OpenTelemetry span to wrap + langfuse_client: Reference to the parent Langfuse client + input: Input data for the generation (e.g., prompts) + output: Output from the generation (e.g., completions) + metadata: Additional metadata to associate with the generation + environment: The tracing environment + """ + super().__init__( + otel_span=otel_span, + as_type="event", + langfuse_client=langfuse_client, + input=input, + output=output, + metadata=metadata, + environment=environment, + ) From 7f2be0e27c5827cae46a3fc7c17dbe56e5314c18 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 22 May 2025 16:26:08 +0200 Subject: [PATCH 2/3] chore: make create_trace_id static --- langfuse/_client/client.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index ddd9cd883..d262f5047 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -1120,7 +1120,8 @@ def _create_observation_id(self, *, seed: Optional[str] = None) -> str: return sha256(seed.encode("utf-8")).digest()[:8].hex() - def create_trace_id(self, *, seed: Optional[str] = None) -> str: + @staticmethod + def create_trace_id(*, seed: Optional[str] = None) -> str: """Create a unique trace ID for use with Langfuse. This method generates a unique trace ID for use with various Langfuse APIs. @@ -1164,7 +1165,7 @@ def create_trace_id(self, *, seed: Optional[str] = None) -> str: if not seed: trace_id_int = RandomIdGenerator().generate_trace_id() - return self._format_otel_trace_id(trace_id_int) + return Langfuse._format_otel_trace_id(trace_id_int) return sha256(seed.encode("utf-8")).digest()[:16].hex() @@ -1178,7 +1179,8 @@ def _get_otel_span_id(self, otel_span: otel_trace_api.Span): return self._format_otel_span_id(span_context.span_id) - def _format_otel_span_id(self, span_id_int: int) -> str: + @staticmethod + def _format_otel_span_id(span_id_int: int) -> str: """Format an integer span ID to a 16-character lowercase hex string. Internal method to convert an OpenTelemetry integer span ID to the standard @@ -1192,7 +1194,8 @@ def _format_otel_span_id(self, span_id_int: int) -> str: """ return format(span_id_int, "016x") - def _format_otel_trace_id(self, trace_id_int: int) -> str: + @staticmethod + def _format_otel_trace_id(trace_id_int: int) -> str: """Format an integer trace ID to a 32-character lowercase hex string. Internal method to convert an OpenTelemetry integer trace ID to the standard @@ -1482,9 +1485,6 @@ def score_current_trace( config_id=config_id, ) - def update_finished_trace(self): - pass - def flush(self): """Force flush all pending spans and events to the Langfuse API. From 16409248bec28f705adfd317622afa8675b44eff Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 22 May 2025 18:01:22 +0200 Subject: [PATCH 3/3] Update langfuse/_client/span.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- langfuse/_client/span.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 7f65358f8..39d5c62eb 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -1140,8 +1140,8 @@ def __init__( Args: otel_span: The OpenTelemetry span to wrap langfuse_client: Reference to the parent Langfuse client - input: Input data for the generation (e.g., prompts) - output: Output from the generation (e.g., completions) + input: Input data for the event + output: Output from the event metadata: Additional metadata to associate with the generation environment: The tracing environment """