Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 122 additions & 5 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
Union,
cast,
overload,
Generator,
)

import backoff
import httpx
from opentelemetry import trace
from opentelemetry import trace as otel_trace_api
from opentelemetry import (
baggage as otel_baggage_api,
trace as otel_trace_api,
context as otel_context_api,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.util._decorator import (
Expand All @@ -44,6 +48,9 @@
ObservationTypeLiteralNoEvent,
ObservationTypeSpanLike,
get_observation_types_list,
LANGFUSE_CTX_USER_ID,
LANGFUSE_CTX_SESSION_ID,
LANGFUSE_CTX_METADATA,
)
from langfuse._client.datasets import DatasetClient, DatasetItemClient
from langfuse._client.environment_variables import (
Expand Down Expand Up @@ -189,6 +196,7 @@ class Langfuse:
_resources: Optional[LangfuseResourceManager] = None
_mask: Optional[MaskFunction] = None
_otel_tracer: otel_trace_api.Tracer
_host: str

def __init__(
self,
Expand All @@ -211,8 +219,10 @@ def __init__(
additional_headers: Optional[Dict[str, str]] = None,
tracer_provider: Optional[TracerProvider] = None,
):
self._host = host or cast(
str, os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
self._host = (
host
if host is not None
else os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com")
)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original version is more readable, and to my understanding both versions do the same thing. If that is true, should we revert this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, they are both equivalent

self._environment = environment or cast(
str, os.environ.get(LANGFUSE_TRACING_ENVIRONMENT)
Expand Down Expand Up @@ -350,6 +360,108 @@ def start_span(
status_message=status_message,
)

@_agnosticcontextmanager
def with_attributes(
self,
session_id: Optional[str] = None,
user_id: Optional[str] = None,
metadata: Optional[dict[str, str]] = None,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we somehow highlight that this metadata should only be used if it is meant to be set on all children? I'm wondering whether people understand, and make use of, the difference between setting metadata on the current span and setting it on all spans.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with the sentiment! Options:

  • with_trace_attributes
  • with_context_attributes
  • with_propagated_attributes

...
Let's quickly sync on this. Nailing this is key.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should think about this from the perspective of how it is consumed, namely via a context manager, so the 'with' is already there.

from langfuse import get_client

langfuse = get_client()

...

with langfuse.correlation_context(session_id='my-session'):
    # do stuff
    pass

I prefer correlation_context or set_correlation_context, as the name makes clear that

  • this is not necessarily related to a 'trace'
  • this has some propagation included ('context')
  • is used for aggregating / correlating multiple spans ('correlation')

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • propagation_context
  • propagated_context
  • distributed_context
  • trace_scope
  • trace_context
  • shared_attributes

as_baggage: bool = False,
) -> Generator[None, None, None]:
"""Creates a context manager that propagates the given attributes to all spans created within the context.

Args:
session_id (str): Session identifier.
user_id (str): User identifier.
metadata (dict): Additional metadata to associate with all spans in the context. Values must be strings and are truncated to 200 characters.
as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage
for cross-service propagation. If False, stores only in local context for
current-service propagation. Defaults to False.
Copy link
Copy Markdown
Member

@maxdeichmann maxdeichmann Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a sentence here that this will be added to the headers of all outgoing HTTP requests. I would add the warning from below up here as not everyone is reading everything.


Returns:
Context manager that sets values on all spans created within its scope.

Warning:
When as_baggage=True, the values will be included in HTTP headers of any
outbound requests made within this context. Only use this for non-sensitive
identifiers that are safe to transmit across service boundaries.

Example:
```python
# Local context only (default)
with langfuse.with_attributes(session_id="session_123"):
with langfuse.start_as_current_span(name="process-request") as span:
# This span and all its children will have session_id="session_123"
child_span = langfuse.start_span(name="child-operation")

# Cross-service propagation (use with caution)
with langfuse.with_attributes(session_id="session_123", as_baggage=True):
# session_id will be propagated to external service calls
response = requests.get("https://api.example.com/data")
```
"""
current_context = otel_context_api.get_current()
current_span = otel_trace_api.get_current_span()

# Process session_id
if session_id is not None:
current_context = otel_context_api.set_value(
LANGFUSE_CTX_SESSION_ID, session_id, current_context
)
if current_span is not None and current_span.is_recording():
current_span.set_attribute("session.id", session_id)
if as_baggage:
current_context = otel_baggage_api.set_baggage(
"session.id", session_id, current_context
)

# Process user_id
if user_id is not None:
current_context = otel_context_api.set_value(
LANGFUSE_CTX_USER_ID, user_id, current_context
)
if current_span is not None and current_span.is_recording():
current_span.set_attribute("user.id", user_id)
if as_baggage:
current_context = otel_baggage_api.set_baggage(
"user.id", user_id, current_context
)

# Process metadata
if metadata is not None:
# Truncate values longer than 200 characters to 200 characters and emit a warning that includes the key
for k, v in metadata.items():
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also have an overall limit of, e.g., 4k characters?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maxdeichmann IMO that would be complicated to enforce, and ultimately this is an issue that will surface quickly on the client side. I would highlight it in the documentation, but not complicate the happy path further.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the risk here is that our users run into failing APIs because the header is too large to be processed. I thought we could add some simple logic, such as keeping a count of the characters added to the header: if the threshold is exceeded, stop adding key/value pairs to the metadata.

if not isinstance(v, str):
# Ignore unreachable mypy warning as this runtime guard should make sense either way
warnings.warn( # type: ignore[unreachable]
f"Metadata values must be strings, got {type(v)} for key '{k}'"
)
del metadata[k]
if len(v) > 200:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We ran into an error here in production where v is None and this length check failed.

  File "/app/documents_parser/telemetry/middleware.py", line 41, in dispatch
    with client.with_attributes(
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/opentelemetry/util/_decorator.py", line 61, in __enter__
    return next(self.gen)  # type: ignore
           ^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/langfuse/_client/client.py", line 440, in with_attributes
    if len(v) > 200:
       ^^^^^^
TypeError: object of type 'NoneType' has no len()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bug on our side, but I think the package should probably warn and fail gracefully.

warnings.warn(
f"Metadata value for key '{k}' exceeds 200 characters and will be truncated."
)
metadata[k] = v[:200]

current_context = otel_context_api.set_value(
LANGFUSE_CTX_METADATA, metadata, current_context
)
if current_span is not None and current_span.is_recording():
for k, v in metadata.items():
current_span.set_attribute(f"langfuse.metadata.{k}", v)
if as_baggage:
for k, v in metadata.items():
current_context = otel_baggage_api.set_baggage(
f"langfuse.metadata.{k}", str(v), current_context
)

# Activate context, execute, and detach context
token = otel_context_api.attach(current_context)
try:
yield
finally:
otel_context_api.detach(token)

def start_as_current_span(
self,
*,
Expand Down Expand Up @@ -1667,6 +1779,11 @@ def update_current_trace(
span.update(output=response)
```
"""
warnings.warn(
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ",
DeprecationWarning,
stacklevel=2,
)
if not self._tracing_enabled:
langfuse_logger.debug(
"Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode."
Expand Down Expand Up @@ -1811,7 +1928,7 @@ def _create_remote_parent_span(
is_remote=False,
)

return trace.NonRecordingSpan(span_context)
return otel_trace_api.NonRecordingSpan(span_context)

def _is_valid_trace_id(self, trace_id: str) -> bool:
pattern = r"^[0-9a-f]{32}$"
Expand Down
5 changes: 5 additions & 0 deletions langfuse/_client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

LANGFUSE_TRACER_NAME = "langfuse-sdk"

# Context key constants for Langfuse context propagation.
# These are the keys under which propagated values are stored in the
# OpenTelemetry context. The user and session keys map to span attributes by
# dropping the "langfuse.ctx." prefix ("user.id", "session.id"); the metadata
# key holds a dict whose entries become individual "langfuse.metadata.<key>"
# span attributes.
LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id"
LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id"
LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata"


"""Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat"""
ObservationTypeGenerationLike: TypeAlias = Literal[
Expand Down
5 changes: 5 additions & 0 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ def update_trace(
tags: List of tags to categorize the trace
public: Whether the trace should be publicly accessible
"""
warnings.warn(
Comment thread
Steffen911 marked this conversation as resolved.
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ",
DeprecationWarning,
stacklevel=2,
)
if not self._otel_span.is_recording():
return self

Expand Down
95 changes: 93 additions & 2 deletions langfuse/_client/span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@
"""

import base64
import json
import os
from typing import Dict, List, Optional

from opentelemetry import baggage, context as context_api
from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from langfuse._client.constants import LANGFUSE_TRACER_NAME
from langfuse._client.constants import (
LANGFUSE_TRACER_NAME,
LANGFUSE_CTX_USER_ID,
LANGFUSE_CTX_SESSION_ID,
LANGFUSE_CTX_METADATA,
)
from langfuse._client.environment_variables import (
LANGFUSE_FLUSH_AT,
LANGFUSE_FLUSH_INTERVAL,
Expand Down Expand Up @@ -114,6 +122,89 @@ def __init__(
else None,
)

def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
    """Copy Langfuse-propagated values onto a span as it starts.

    Attributes are collected from three sources; when two sources produce the
    same attribute name, the later one wins:

    1. Baggage entries named ``user.id`` or ``session.id``, or prefixed with
       ``langfuse.metadata.``. All other baggage entries are ignored.
    2. The ``langfuse.ctx.user.id`` / ``langfuse.ctx.session.id`` context
       values, written as ``user.id`` / ``session.id``.
    3. The ``langfuse.ctx.metadata`` context dict, whose entries are written
       as individual ``langfuse.metadata.<key>`` attributes. Values that are
       not ``str``/``int``/``float``/``bool`` are JSON-encoded.

    Reading is best-effort: failures are logged at debug level and never
    prevent the span from starting.

    Args:
        span: The span that is starting.
        parent_context: Context the span was created in. When ``None``, the
            currently active context is used.
    """
    # Compare against None instead of relying on truthiness: an explicitly
    # passed but empty context must not be replaced by the ambient context.
    current_context = (
        parent_context if parent_context is not None else context_api.get_current()
    )

    propagated_attributes = {}

    # 1. Baggage: only the keys Langfuse itself writes are copied, so unrelated
    #    baggage set by other libraries does not leak onto spans.
    for key, value in baggage.get_all(context=current_context).items():
        if key in ("user.id", "session.id") or key.startswith("langfuse.metadata."):
            propagated_attributes[key] = value
            langfuse_logger.debug(
                "Propagated baggage key '%s' = '%s' to span '%s'",
                key,
                value,
                span.name,
            )

    # 2. Scalar langfuse.ctx.* context variables.
    for ctx_key in (LANGFUSE_CTX_USER_ID, LANGFUSE_CTX_SESSION_ID):
        try:
            value = context_api.get_value(ctx_key, context=current_context)
        except Exception as e:
            # Best-effort: a broken context must not stop the span from starting.
            langfuse_logger.debug("Could not read context key '%s': %s", ctx_key, e)
            continue

        if value is not None:
            # Span attribute name is the context key without its langfuse.ctx. prefix
            attr_key = ctx_key.replace("langfuse.ctx.", "")
            propagated_attributes[attr_key] = value
            langfuse_logger.debug(
                "Propagated context key '%s' = '%s' to span '%s'",
                ctx_key,
                value,
                span.name,
            )

    # 3. langfuse.ctx.metadata: fan the dict out into one attribute per key.
    try:
        metadata_dict = context_api.get_value(
            LANGFUSE_CTX_METADATA, context=current_context
        )
    except Exception as e:
        langfuse_logger.debug("Could not read metadata from context: %s", e)
        metadata_dict = None

    if isinstance(metadata_dict, dict):
        for key, value in metadata_dict.items():
            if isinstance(value, (str, int, float, bool)):
                attr_value = value
            else:
                try:
                    attr_value = json.dumps(value)
                except (TypeError, ValueError) as e:
                    # Skip only the offending key so one unserializable value
                    # does not drop the remaining metadata.
                    langfuse_logger.debug(
                        "Could not serialize metadata key '%s': %s", key, e
                    )
                    continue

            propagated_attributes[f"langfuse.metadata.{key}"] = attr_value
            langfuse_logger.debug(
                "Propagated metadata key '%s' = '%s' to span '%s'",
                key,
                attr_value,
                span.name,
            )

    if propagated_attributes:
        langfuse_logger.debug(
            "Propagated %d attributes to span '%s': %s",
            len(propagated_attributes),
            span.name,
            list(propagated_attributes),
        )

    for key, value in propagated_attributes.items():
        span.set_attribute(key, value)  # type: ignore[arg-type]

    return super().on_start(span, parent_context)

def on_end(self, span: ReadableSpan) -> None:
# Only export spans that belong to the scoped project
# This is important to not send spans to wrong project in multi-project setups
Expand Down
Loading