Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 95 additions & 84 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,42 +290,15 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:

try:
result = await func(*args, **kwargs)

if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True

return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

if inspect.isasyncgen(result):
is_return_type_generator = True

return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

# handle starlette.StreamingResponse
if type(result).__name__ == "StreamingResponse" and hasattr(
result, "body_iterator"
):
is_return_type_generator = True

result.body_iterator = (
self._wrap_async_generator_result(
langfuse_span_or_generation,
result.body_iterator,
transform_to_string,
)
)

langfuse_span_or_generation.update(output=result)

(
is_return_type_generator,
result,
) = self._handle_observe_result(
langfuse_span_or_generation,
result,
capture_output=capture_output,
transform_to_string=transform_to_string,
)
return result
except (Exception, asyncio.CancelledError) as e:
langfuse_span_or_generation.update(
Expand Down Expand Up @@ -408,42 +381,15 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:

try:
result = func(*args, **kwargs)

if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True

return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

if inspect.isasyncgen(result):
is_return_type_generator = True

return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

# handle starlette.StreamingResponse
if type(result).__name__ == "StreamingResponse" and hasattr(
result, "body_iterator"
):
is_return_type_generator = True

result.body_iterator = (
self._wrap_async_generator_result(
langfuse_span_or_generation,
result.body_iterator,
transform_to_string,
)
)

langfuse_span_or_generation.update(output=result)

(
is_return_type_generator,
result,
) = self._handle_observe_result(
langfuse_span_or_generation,
result,
capture_output=capture_output,
transform_to_string=transform_to_string,
)
return result
except (Exception, asyncio.CancelledError) as e:
langfuse_span_or_generation.update(
Expand Down Expand Up @@ -493,6 +439,7 @@ def _wrap_sync_generator_result(
LangfuseGuardrail,
],
generator: Generator,
capture_output: bool,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Any:
preserved_context = contextvars.copy_context()
Expand All @@ -501,6 +448,7 @@ def _wrap_sync_generator_result(
generator,
preserved_context,
langfuse_span_or_generation,
capture_output,
transform_to_string,
)

Expand All @@ -518,6 +466,7 @@ def _wrap_async_generator_result(
LangfuseGuardrail,
],
generator: AsyncGenerator,
capture_output: bool,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Any:
preserved_context = contextvars.copy_context()
Expand All @@ -526,9 +475,61 @@ def _wrap_async_generator_result(
generator,
preserved_context,
langfuse_span_or_generation,
capture_output,
transform_to_string,
)

def _handle_observe_result(
    self,
    langfuse_span_or_generation: Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ],
    result: Any,
    *,
    capture_output: bool,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Tuple[bool, Any]:
    """Post-process a decorated function's return value.

    Generator-like results are wrapped so the observation context is
    preserved while the caller consumes them; the wrapper is then
    responsible for output capture and span cleanup. Plain results are
    recorded on the span immediately (when ``capture_output`` is set).

    Returns:
        A ``(is_generator, result)`` pair; ``is_generator`` tells the
        caller that span finalization is deferred to the wrapper.
    """
    # Sync generators: hand off to the context-preserving sync wrapper.
    if inspect.isgenerator(result):
        wrapped = self._wrap_sync_generator_result(
            langfuse_span_or_generation,
            result,
            capture_output,
            transform_to_string,
        )
        return True, wrapped

    # Async generators use the async wrapper instead.
    if inspect.isasyncgen(result):
        wrapped = self._wrap_async_generator_result(
            langfuse_span_or_generation,
            result,
            capture_output,
            transform_to_string,
        )
        return True, wrapped

    # Duck-typed detection of starlette.StreamingResponse (avoids a hard
    # dependency on starlette): wrap its body iterator in place.
    if type(result).__name__ == "StreamingResponse" and hasattr(
        result, "body_iterator"
    ):
        result.body_iterator = self._wrap_async_generator_result(
            langfuse_span_or_generation,
            result.body_iterator,
            capture_output,
            transform_to_string,
        )
        return True, result

    # Plain return value: record it directly if output capture is enabled.
    if capture_output is True:
        langfuse_span_or_generation.update(output=result)

    return False, result


_decorator = LangfuseDecorator()

Expand All @@ -553,12 +554,14 @@ def __init__(
LangfuseEmbedding,
LangfuseGuardrail,
],
capture_output: bool,
transform_fn: Optional[Callable[[Iterable], str]],
) -> None:
self.generator = generator
self.context = context
self.items: List[Any] = []
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn

def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
Expand All @@ -574,15 +577,18 @@ def __next__(self) -> Any:

except StopIteration:
# Handle output and span cleanup when generator is exhausted
output: Any = self.items
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

if self.transform_fn is not None:
output = self.transform_fn(self.items)
elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)
self.span.update(output=output)

self.span.update(output=output).end()
self.span.end()

raise # Re-raise StopIteration

Comment on lines 571 to 595
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 After this PR, generators are wrapped unconditionally (including when capture_output=False), so is_return_type_generator=True and the finally block skips span.end(); if a consumer abandons the generator without exhausting it, the span leaks indefinitely. Before this PR, capture_output=False generators were never wrapped, so span.end() was always guaranteed by the finally block. Adding close()/__del__ (and aclose() for async) to the wrapper classes that call self.span.end() would restore the guarantee.

Extended reasoning...

What the bug is

Before this PR, generator/async-generator wrapping was gated behind if capture_output is True. When capture_output=False, generators were returned unwrapped, is_return_type_generator remained False, and the finally block always called span.end() — guaranteeing cleanup even for abandoned generators.

This PR introduces _handle_observe_result, which unconditionally wraps every generator regardless of capture_output. As a result, is_return_type_generator is always set to True for generator returns, and the finally block's if not is_return_type_generator: span.end() guard is permanently skipped.

The specific code path

span.end() now lives exclusively in the wrapper's StopIteration/StopAsyncIteration handlers (lines 581 and 657 of the new file). If a consumer breaks early, never iterates, or raises outside the wrapper, neither handler is reached.

Neither _ContextPreservedSyncGeneratorWrapper nor _ContextPreservedAsyncGeneratorWrapper defines close(), __del__(), or aclose() methods. Python's generator protocol calls generator.close() on GC or explicit close(), but since the wrapper doesn't implement it, there is no hook to end the span.

Why existing code doesn't prevent it

The exception handler in __next__/__anext__ catches errors raised from within the generator, not errors raised by the consumer after receiving a value. When the consumer calls break or goes out of scope, Python calls wrapper.close() — which, because __class__.close is not overridden, uses the default implementation that just calls throw(GeneratorExit) on the underlying generator. self.span.end() is never invoked.

Concrete proof

Before this PR: span.end() is called in the finally block right after result = func(...) returns, because is_return_type_generator stays False.

After this PR: is_return_type_generator = True, finally is skipped, and StopIteration is never raised. The span remains open indefinitely.

Impact

Any @observe(capture_output=False)-decorated function returning a generator that is not fully exhausted (e.g., early break, timeout, error in calling code) will silently leak its span. In long-running services with many partial stream reads, this accumulates unbounded open spans.

How to fix

Add close() and __del__() to _ContextPreservedSyncGeneratorWrapper, and aclose() plus __del__() to _ContextPreservedAsyncGeneratorWrapper:

This matches the guarantee that existed for capture_output=False before this PR and makes both modes consistent.

Expand Down Expand Up @@ -612,12 +618,14 @@ def __init__(
LangfuseEmbedding,
LangfuseGuardrail,
],
capture_output: bool,
transform_fn: Optional[Callable[[Iterable], str]],
) -> None:
self.generator = generator
self.context = context
self.items: List[Any] = []
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn

def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
Expand All @@ -642,15 +650,18 @@ async def __anext__(self) -> Any:

except StopAsyncIteration:
# Handle output and span cleanup when generator is exhausted
output: Any = self.items
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

if self.transform_fn is not None:
output = self.transform_fn(self.items)
elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)
self.span.update(output=output)

self.span.update(output=output).end()
self.span.end()

raise # Re-raise StopAsyncIteration
except (Exception, asyncio.CancelledError) as e:
Expand Down
90 changes: 90 additions & 0 deletions tests/unit/test_observe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import sys

import pytest

from langfuse import observe
from langfuse._client.attributes import LangfuseOtelSpanAttributes


def _finished_spans_by_name(memory_exporter, name: str):
return [span for span in memory_exporter.get_finished_spans() if span.name == name]


def test_sync_generator_preserves_context_without_output_capture(
    langfuse_memory_client, memory_exporter
):
    """A sync generator returned from an observed function keeps the
    observation context for nested calls even with output capture off."""

    @observe(name="child_step")
    def child_step(index: int) -> str:
        return f"item_{index}"

    @observe(name="root", capture_output=False)
    def root():
        def body():
            for index in range(2):
                yield child_step(index)

        return body()

    stream = root()

    # Nothing finishes until the generator is actually consumed.
    assert memory_exporter.get_finished_spans() == []
    assert list(stream) == ["item_0", "item_1"]

    langfuse_memory_client.flush()

    root_span = _finished_spans_by_name(memory_exporter, "root")[0]
    child_spans = _finished_spans_by_name(memory_exporter, "child_step")

    assert len(child_spans) == 2
    for child in child_spans:
        # Each nested span must be parented to the root observation.
        assert child.parent is not None
        assert child.parent.span_id == root_span.context.span_id
        assert child.context.trace_id == root_span.context.trace_id

    # capture_output=False must leave the root span without an output attribute.
    assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in root_span.attributes


@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 11), reason="requires python3.11 or higher")
async def test_streaming_response_preserves_context_without_output_capture(
    langfuse_memory_client, memory_exporter
):
    """A StreamingResponse-like return value keeps the observation context
    for nested calls while its body streams, with output capture disabled."""

    class StreamingResponse:
        # Minimal stand-in matching the duck typing used by the decorator
        # (class name + ``body_iterator`` attribute).
        def __init__(self, body_iterator):
            self.body_iterator = body_iterator

    @observe(name="stream_step")
    async def stream_step(index: int) -> str:
        return f"chunk_{index}"

    async def body():
        for index in range(2):
            yield await stream_step(index)

    @observe(name="endpoint", capture_output=False)
    async def endpoint():
        return StreamingResponse(body())

    response = await endpoint()

    # The endpoint span stays open until the body iterator is drained.
    assert memory_exporter.get_finished_spans() == []

    chunks = [item async for item in response.body_iterator]
    assert chunks == ["chunk_0", "chunk_1"]

    langfuse_memory_client.flush()

    endpoint_span = _finished_spans_by_name(memory_exporter, "endpoint")[0]
    step_spans = _finished_spans_by_name(memory_exporter, "stream_step")

    assert len(step_spans) == 2
    for step in step_spans:
        # Streamed chunks must be parented to the endpoint observation.
        assert step.parent is not None
        assert step.parent.span_id == endpoint_span.context.span_id
        assert step.context.trace_id == endpoint_span.context.trace_id

    # capture_output=False must leave the endpoint span without an output attribute.
    assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in endpoint_span.attributes
Loading