Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 99 additions & 86 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,42 +290,15 @@

try:
result = await func(*args, **kwargs)

if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True

return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

if inspect.isasyncgen(result):
is_return_type_generator = True

return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

# handle starlette.StreamingResponse
if type(result).__name__ == "StreamingResponse" and hasattr(
result, "body_iterator"
):
is_return_type_generator = True

result.body_iterator = (
self._wrap_async_generator_result(
langfuse_span_or_generation,
result.body_iterator,
transform_to_string,
)
)

langfuse_span_or_generation.update(output=result)

(
is_return_type_generator,
result,
) = self._handle_observe_result(
langfuse_span_or_generation,
result,
capture_output=capture_output,
transform_to_string=transform_to_string,
)
return result
except (Exception, asyncio.CancelledError) as e:
langfuse_span_or_generation.update(
Expand Down Expand Up @@ -408,42 +381,15 @@

try:
result = func(*args, **kwargs)

if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True

return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

if inspect.isasyncgen(result):
is_return_type_generator = True

return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

# handle starlette.StreamingResponse
if type(result).__name__ == "StreamingResponse" and hasattr(
result, "body_iterator"
):
is_return_type_generator = True

result.body_iterator = (
self._wrap_async_generator_result(
langfuse_span_or_generation,
result.body_iterator,
transform_to_string,
)
)

langfuse_span_or_generation.update(output=result)

(
is_return_type_generator,
result,
) = self._handle_observe_result(
langfuse_span_or_generation,
result,
capture_output=capture_output,
transform_to_string=transform_to_string,
)
return result
except (Exception, asyncio.CancelledError) as e:
langfuse_span_or_generation.update(
Expand Down Expand Up @@ -493,6 +439,7 @@
LangfuseGuardrail,
],
generator: Generator,
capture_output: bool,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Any:
preserved_context = contextvars.copy_context()
Expand All @@ -501,6 +448,7 @@
generator,
preserved_context,
langfuse_span_or_generation,
capture_output,
transform_to_string,
)

Expand All @@ -518,6 +466,7 @@
LangfuseGuardrail,
],
generator: AsyncGenerator,
capture_output: bool,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Any:
preserved_context = contextvars.copy_context()
Expand All @@ -526,9 +475,61 @@
generator,
preserved_context,
langfuse_span_or_generation,
capture_output,
transform_to_string,
)

def _handle_observe_result(
    self,
    langfuse_span_or_generation: Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ],
    result: Any,
    *,
    capture_output: bool,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Tuple[bool, Any]:
    """Post-process a decorated function's return value.

    Streaming results (sync/async generators and Starlette-style
    ``StreamingResponse`` objects) are wrapped so the observation can be
    finalized once the stream is exhausted; any other result is attached
    as the span output immediately when ``capture_output`` is enabled.

    Returns a tuple ``(is_streaming_result, possibly_wrapped_result)``.
    """
    if inspect.isgenerator(result):
        wrapped = self._wrap_sync_generator_result(
            langfuse_span_or_generation,
            result,
            capture_output,
            transform_to_string,
        )
        return True, wrapped

    if inspect.isasyncgen(result):
        wrapped = self._wrap_async_generator_result(
            langfuse_span_or_generation,
            result,
            capture_output,
            transform_to_string,
        )
        return True, wrapped

    # Duck-typed check for starlette.StreamingResponse: wrap its body
    # iterator in place and hand the response object itself back.
    is_streaming_response = type(result).__name__ == "StreamingResponse" and hasattr(
        result, "body_iterator"
    )
    if is_streaming_response:
        result.body_iterator = self._wrap_async_generator_result(
            langfuse_span_or_generation,
            result.body_iterator,
            capture_output,
            transform_to_string,
        )
        return True, result

    # Non-streaming result: record it right away (if requested) and signal
    # that the caller remains responsible for ending the observation.
    if capture_output is True:
        langfuse_span_or_generation.update(output=result)

    return False, result


_decorator = LangfuseDecorator()

Expand All @@ -553,39 +554,45 @@
LangfuseEmbedding,
LangfuseGuardrail,
],
capture_output: bool,
transform_fn: Optional[Callable[[Iterable], str]],
) -> None:
self.generator = generator
self.context = context
self.items: List[Any] = []
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn

def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
return self

def __next__(self) -> Any:
try:
# Run the generator's __next__ in the preserved context
item = self.context.run(next, self.generator)
self.items.append(item)
if self.capture_output:
self.items.append(item)

return item

except StopIteration:
# Handle output and span cleanup when generator is exhausted
output: Any = self.items
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

if self.transform_fn is not None:
output = self.transform_fn(self.items)
elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)
self.span.update(output=output)

self.span.update(output=output).end()
self.span.end()

raise # Re-raise StopIteration

Check failure on line 595 in langfuse/_client/observe.py

View check run for this annotation

Claude / Claude Code Review

Span leaks when capture_output=False generator is abandoned

After this PR, generators are wrapped unconditionally (including when `capture_output=False`), so `is_return_type_generator=True` and the `finally` block skips `span.end()`; if a consumer abandons the generator without exhausting it, the span leaks indefinitely. Before this PR, `capture_output=False` generators were never wrapped, so `span.end()` was always guaranteed by the `finally` block. Adding `close()`/`__del__` (and `aclose()` for async) to the wrapper classes that call `self.span.end()` would restore the guarantee.
Comment on lines 571 to 595
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 After this PR, generators are wrapped unconditionally (including when capture_output=False), so is_return_type_generator=True and the finally block skips span.end(); if a consumer abandons the generator without exhausting it, the span leaks indefinitely. Before this PR, capture_output=False generators were never wrapped, so span.end() was always guaranteed by the finally block. Adding close()/__del__ (and aclose() for async) to the wrapper classes that call self.span.end() would restore the guarantee.

Extended reasoning...

What the bug is

Before this PR, generator/async-generator wrapping was gated behind if capture_output is True. When capture_output=False, generators were returned unwrapped, is_return_type_generator remained False, and the finally block always called span.end() — guaranteeing cleanup even for abandoned generators.

This PR introduces _handle_observe_result, which unconditionally wraps every generator regardless of capture_output. As a result, is_return_type_generator is always set to True for generator returns, and the finally block's if not is_return_type_generator: span.end() guard is permanently skipped.

The specific code path

span.end() now lives exclusively in the wrapper's StopIteration/StopAsyncIteration handlers (lines 581 and 657 of the new file). If a consumer breaks early, never iterates, or raises outside the wrapper, neither handler is reached.

Neither _ContextPreservedSyncGeneratorWrapper nor _ContextPreservedAsyncGeneratorWrapper defines close(), __del__(), or aclose() methods. Python's generator protocol calls generator.close() on GC or explicit close(), but since the wrapper doesn't implement it, there is no hook to end the span.

Why existing code doesn't prevent it

The exception handler in __next__/__anext__ catches errors raised from within the generator, not errors raised by the consumer after receiving a value. When the consumer calls break or goes out of scope, Python calls wrapper.close() — which, because __class__.close is not overridden, uses the default implementation that just calls throw(GeneratorExit) on the underlying generator. self.span.end() is never invoked.

Concrete proof

Before this PR: span.end() is called in the finally block right after result = func(...) returns, because is_return_type_generator stays False.

After this PR: is_return_type_generator = True, finally is skipped, and StopIteration is never raised. The span remains open indefinitely.

Impact

Any @observe(capture_output=False)-decorated function returning a generator that is not fully exhausted (e.g., early break, timeout, error in calling code) will silently leak its span. In long-running services with many partial stream reads, this accumulates unbounded open spans.

How to fix

Add close() and __del__() to _ContextPreservedSyncGeneratorWrapper, and aclose() plus __del__() to _ContextPreservedAsyncGeneratorWrapper:

This matches the guarantee that existed for capture_output=False before this PR and makes both modes consistent.

except (Exception, asyncio.CancelledError) as e:
self.span.update(
level="ERROR", status_message=str(e) or type(e).__name__
Expand All @@ -612,12 +619,14 @@
LangfuseEmbedding,
LangfuseGuardrail,
],
capture_output: bool,
transform_fn: Optional[Callable[[Iterable], str]],
) -> None:
self.generator = generator
self.context = context
self.items: List[Any] = []
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn

def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
Expand All @@ -636,21 +645,25 @@
# Python < 3.10 fallback - context parameter not supported
item = await self.generator.__anext__()

self.items.append(item)
if self.capture_output:
self.items.append(item)

return item

except StopAsyncIteration:
# Handle output and span cleanup when generator is exhausted
output: Any = self.items
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

if self.transform_fn is not None:
output = self.transform_fn(self.items)
elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)
self.span.update(output=output)

self.span.update(output=output).end()
self.span.end()

raise # Re-raise StopAsyncIteration
except (Exception, asyncio.CancelledError) as e:
Expand Down
92 changes: 92 additions & 0 deletions tests/unit/test_observe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import sys

import pytest

from langfuse import observe
from langfuse._client.attributes import LangfuseOtelSpanAttributes


def _finished_spans_by_name(memory_exporter, name: str):
return [span for span in memory_exporter.get_finished_spans() if span.name == name]


def test_sync_generator_preserves_context_without_output_capture(
    langfuse_memory_client, memory_exporter
):
    """A sync generator returned under capture_output=False must still propagate
    the tracing context to nested observations while buffering no output items."""

    @observe(name="child_step")
    def child_step(index: int) -> str:
        return f"item_{index}"

    @observe(name="root", capture_output=False)
    def root():
        def body():
            for index in range(2):
                yield child_step(index)

        return body()

    stream = root()

    # Nothing should be finished before the generator is consumed.
    assert memory_exporter.get_finished_spans() == []

    consumed = list(stream)
    assert consumed == ["item_0", "item_1"]
    # With capture_output=False the wrapper keeps no item buffer.
    assert stream.items == []

    langfuse_memory_client.flush()

    root_span = _finished_spans_by_name(memory_exporter, "root")[0]
    child_spans = _finished_spans_by_name(memory_exporter, "child_step")

    assert len(child_spans) == 2
    for child in child_spans:
        assert child.parent is not None
        assert child.parent.span_id == root_span.context.span_id
        assert child.context.trace_id == root_span.context.trace_id
    assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in root_span.attributes


@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 11), reason="requires python3.11 or higher")
async def test_streaming_response_preserves_context_without_output_capture(
    langfuse_memory_client, memory_exporter
):
    """A StreamingResponse-style body iterator under capture_output=False must
    keep the tracing context for nested observations without buffering chunks."""

    class StreamingResponse:
        # Minimal stand-in for starlette's StreamingResponse: the decorator
        # detects it by class name plus the body_iterator attribute.
        def __init__(self, body_iterator):
            self.body_iterator = body_iterator

    @observe(name="stream_step")
    async def stream_step(index: int) -> str:
        return f"chunk_{index}"

    async def body():
        for index in range(2):
            yield await stream_step(index)

    @observe(name="endpoint", capture_output=False)
    async def endpoint():
        return StreamingResponse(body())

    response = await endpoint()

    # Nothing should be finished before the stream is consumed.
    assert memory_exporter.get_finished_spans() == []

    chunks = [item async for item in response.body_iterator]
    assert chunks == ["chunk_0", "chunk_1"]
    # With capture_output=False the wrapping iterator keeps no chunk buffer.
    assert response.body_iterator.items == []

    langfuse_memory_client.flush()

    endpoint_span = _finished_spans_by_name(memory_exporter, "endpoint")[0]
    step_spans = _finished_spans_by_name(memory_exporter, "stream_step")

    assert len(step_spans) == 2
    for step in step_spans:
        assert step.parent is not None
        assert step.parent.span_id == endpoint_span.context.span_id
        assert step.context.trace_id == endpoint_span.context.trace_id
    assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in endpoint_span.attributes
Loading