
fix(observe): preserve streaming context without output capture#1628

Merged
hassiebp merged 3 commits into main from hassieb/fix-observe-streaming-context-without-output-capture
Apr 20, 2026

Conversation


@hassiebp hassiebp commented Apr 16, 2026

Summary

  • preserve generator and StreamingResponse.body_iterator wrapping even when capture_output=False
  • keep deferred span finalization tied to stream consumption while still skipping output capture when disabled
  • add unit regressions for sync generators and async streaming responses with disabled output capture

Verification

  • uv run --frozen pytest tests/unit/test_observe.py
  • uv run --frozen ruff check langfuse/_client/observe.py tests/unit/test_observe.py
  • uv run --frozen ruff format --check langfuse/_client/observe.py tests/unit/test_observe.py
  • uv run --frozen mypy langfuse/_client/observe.py --no-error-summary

Disclaimer: Experimental PR review

Greptile Summary

This PR fixes a regression where capture_output=False inadvertently skipped generator and StreamingResponse.body_iterator wrapping entirely. As a result, spans closed prematurely instead of deferring finalization until stream exhaustion, and context propagation broke for nested @observe calls. The fix extracts a _handle_observe_result helper that unconditionally wraps all streaming result types, passing capture_output into the wrapper classes so they skip span.update(output=…) without affecting span.end() or context preservation.
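The described behavior (wrap streaming results unconditionally, record output only when enabled) can be sketched in a self-contained toy. `DummySpan`, `SyncGeneratorWrapper`, and `handle_result` are illustrative names for this sketch, not the Langfuse API:

```python
import inspect
from typing import Any, Iterator, List


class DummySpan:
    """Stand-in for a tracing span; illustrative only."""

    def __init__(self) -> None:
        self.output: Any = None
        self.ended = False

    def update(self, output: Any) -> None:
        self.output = output

    def end(self) -> None:
        self.ended = True


class SyncGeneratorWrapper:
    """Defers span finalization until the generator is exhausted;
    records output only when capture_output is True."""

    def __init__(self, gen: Iterator[Any], span: DummySpan, capture_output: bool) -> None:
        self.gen = gen
        self.span = span
        self.capture_output = capture_output
        self.items: List[Any] = []

    def __iter__(self) -> "SyncGeneratorWrapper":
        return self

    def __next__(self) -> Any:
        try:
            item = next(self.gen)
            if self.capture_output:  # avoid buffering when capture is disabled
                self.items.append(item)
            return item
        except StopIteration:
            if self.capture_output:
                out: Any = self.items
                if all(isinstance(i, str) for i in self.items):
                    out = "".join(self.items)
                self.span.update(output=out)
            self.span.end()  # always finalize, even without output capture
            raise


def handle_result(result: Any, span: DummySpan, capture_output: bool) -> Any:
    # Wrap generators unconditionally so finalization stays deferred.
    if inspect.isgenerator(result):
        return SyncGeneratorWrapper(result, span, capture_output)
    if capture_output:
        span.update(output=result)
    span.end()
    return result
```

With this shape, exhausting a wrapped generator always calls `span.end()`, while `span.output` stays unset when capture is disabled.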

Confidence Score: 5/5

Safe to merge — the fix is logically correct and well-targeted, with only a minor memory efficiency suggestion remaining.

All identified findings are P2: unconditional item accumulation when capture_output=False wastes memory but does not affect correctness or observable behavior, and test coverage for the direct async-generator path is a nice-to-have. No P0/P1 issues found. The core fix (unconditional wrapping, conditional output recording) is sound.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| `langfuse/_client/observe.py` | Extracts `_handle_observe_result` to unconditionally wrap generators/StreamingResponses regardless of `capture_output`, propagating the flag into the wrapper classes to conditionally skip output recording while preserving deferred span finalization and context propagation. |
| `tests/unit/test_observe.py` | New unit test file with regression tests for a sync generator and a StreamingResponse with `capture_output=False`; covers context propagation, deferred span finalization, and absence of an output attribute. The direct async-generator path (`inspect.isasyncgen`) is not explicitly tested with `capture_output=False`. |

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["func returns result"] --> B["_handle_observe_result()"]
    B --> C{inspect.isgenerator?}
    C -- Yes --> D["_wrap_sync_generator_result(capture_output)"]
    D --> E["_ContextPreservedSyncGeneratorWrapper"]
    E --> F{Item yielded}
    F -- next item --> G["context.run(next, generator)\nappend to items"]
    G --> F
    F -- StopIteration --> H{capture_output?}
    H -- True --> I["span.update(output=items)\nspan.end()"]
    H -- False --> J["span.end()"]
    C -- No --> K{inspect.isasyncgen?}
    K -- Yes --> L["_wrap_async_generator_result(capture_output)"]
    L --> M["_ContextPreservedAsyncGeneratorWrapper"]
    M --> N{Item yielded}
    N -- next item --> O["create_task(__anext__, context)\nappend to items"]
    O --> N
    N -- StopAsyncIteration --> P{capture_output?}
    P -- True --> Q["span.update(output=items)\nspan.end()"]
    P -- False --> R["span.end()"]
    K -- No --> S{StreamingResponse?}
    S -- Yes --> T["wrap body_iterator\nreturn True, result"]
    S -- No --> U{capture_output?}
    U -- True --> V["span.update(output=result)\nreturn False, result"]
    U -- False --> W["return False, result"]
```
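The `context.run(next, generator)` step in the chart is what lets code inside the generator keep seeing the span that was active when the wrapper was created. A minimal self-contained illustration (all names here are invented for the sketch):

```python
import contextvars

# Context variable standing in for "the currently active span".
current_span = contextvars.ContextVar("current_span", default=None)


def traced_stream():
    # Nested code sees whatever span the wrapper's captured context holds.
    yield current_span.get()
    yield current_span.get()


class ContextPreservingIterator:
    """Runs each __next__ inside a context snapshot taken at wrap time,
    mirroring the context.run(next, generator) step above."""

    def __init__(self, generator):
        self.generator = generator
        self.context = contextvars.copy_context()  # snapshot at wrap time

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration raised by the generator propagates through run().
        return self.context.run(next, self.generator)


token = current_span.set("span-123")
wrapped = ContextPreservingIterator(traced_stream())
current_span.reset(token)  # caller's context no longer holds the span

# The generator still observes the span captured at wrap time.
assert list(wrapped) == ["span-123", "span-123"]
```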
Path: langfuse/_client/observe.py
Line: 574

Comment:
**Unnecessary item accumulation when `capture_output=False`**

Both `_ContextPreservedSyncGeneratorWrapper` and `_ContextPreservedAsyncGeneratorWrapper` unconditionally append every yielded item to `self.items`, even when `capture_output=False`. When output capture is disabled, `self.items` is never read, so this allocates memory proportional to the full stream size for no purpose. For large LLM streaming outputs this is wasteful.

Guard the append in both wrappers:
```python
if self.capture_output:
    self.items.append(item)
```

The same fix applies to `_ContextPreservedAsyncGeneratorWrapper.__anext__` at the equivalent line (~line 647).



@github-actions

@claude review


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 75eec09658

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread langfuse/_client/observe.py

@claude claude Bot left a comment


LGTM — the item-accumulation concern from the bot review is already addressed in this diff.

Extended reasoning...

Overview

The PR touches two files: langfuse/_client/observe.py (core decorator logic) and a new tests/unit/test_observe.py. It extracts a _handle_observe_result helper that unconditionally wraps generator/StreamingResponse results regardless of capture_output, threading the flag into the wrapper classes so they skip span.update(output=…) while still deferring span.end() until stream exhaustion.

Security risks

None. The change does not touch authentication, authorization, cryptography, or external data boundaries.

Level of scrutiny

Low-to-medium. This is a targeted bug fix for a regression in observable tracing behavior. The code paths are well-understood and the change has clear before/after semantics. The existing test suite plus the two new regression tests provide adequate coverage.

Other factors

The bot raised a P1 concern about unbounded item buffering when capture_output=False. That concern is already resolved in this PR: both _ContextPreservedSyncGeneratorWrapper and _ContextPreservedAsyncGeneratorWrapper guard the append behind capture_output. No outstanding issues remain.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8db48f3e37


Comment thread langfuse/_client/observe.py
@hassiebp hassiebp merged commit 26ab5d9 into main Apr 20, 2026
16 of 17 checks passed
@hassiebp hassiebp deleted the hassieb/fix-observe-streaming-context-without-output-capture branch April 20, 2026 12:52
Comment on lines 571 to 595
```diff
 try:
     # Run the generator's __next__ in the preserved context
     item = self.context.run(next, self.generator)
-    self.items.append(item)
+    if self.capture_output:
+        self.items.append(item)

     return item

 except StopIteration:
     # Handle output and span cleanup when generator is exhausted
-    output: Any = self.items
+    if self.capture_output:
+        output: Any = self.items

-    if self.transform_fn is not None:
-        output = self.transform_fn(self.items)
+        if self.transform_fn is not None:
+            output = self.transform_fn(self.items)

-    elif all(isinstance(item, str) for item in self.items):
-        output = "".join(self.items)
+        elif all(isinstance(item, str) for item in self.items):
+            output = "".join(self.items)
+        self.span.update(output=output)

-    self.span.update(output=output).end()
+    self.span.end()

     raise  # Re-raise StopIteration
```


🔴 After this PR, generators are wrapped unconditionally (including when capture_output=False), so is_return_type_generator is True and the finally block skips span.end(). If a consumer abandons the generator without exhausting it, the span leaks indefinitely. Before this PR, capture_output=False generators were never wrapped, so span.end() was always guaranteed by the finally block. Adding close()/__del__ (and aclose() for the async wrapper) that call self.span.end() would restore the guarantee.

Extended reasoning...

What the bug is

Before this PR, generator/async-generator wrapping was gated behind if capture_output is True. When capture_output=False, generators were returned unwrapped, is_return_type_generator remained False, and the finally block always called span.end() — guaranteeing cleanup even for abandoned generators.

This PR introduces _handle_observe_result, which unconditionally wraps every generator regardless of capture_output. As a result, is_return_type_generator is always set to True for generator returns, and the finally block's if not is_return_type_generator: span.end() guard is permanently skipped.

The specific code path

span.end() now lives exclusively in the wrapper's StopIteration/StopAsyncIteration handlers (lines 581 and 657 of the new file). If a consumer breaks early, never iterates, or raises outside the wrapper, neither handler is reached.

Neither _ContextPreservedSyncGeneratorWrapper nor _ContextPreservedAsyncGeneratorWrapper defines close(), __del__(), or aclose() methods. Python's generator protocol calls generator.close() on GC or explicit close(), but since the wrapper doesn't implement it, there is no hook to end the span.

Why existing code doesn't prevent it

The exception handler in __next__/__anext__ catches errors raised from within the generator, not errors raised by the consumer after receiving a value. When the consumer breaks out of the loop or drops the wrapper, self.span.end() is never invoked: the wrapper defines no close() or __del__ hook, so at most the inner generator's own finalization runs (GeneratorExit delivered during garbage collection).

Concrete proof

Before this PR: span.end() is called in the finally block right after result = func(...) returns, because is_return_type_generator stays False.

After this PR: is_return_type_generator = True, finally is skipped, and StopIteration is never raised. The span remains open indefinitely.

Impact

Any @observe(capture_output=False)-decorated function returning a generator that is not fully exhausted (e.g., early break, timeout, error in calling code) will silently leak its span. In long-running services with many partial stream reads, this accumulates unbounded open spans.

How to fix

Add close() and __del__() to _ContextPreservedSyncGeneratorWrapper, and aclose() plus __del__() to _ContextPreservedAsyncGeneratorWrapper:

This matches the guarantee that existed for capture_output=False before this PR and makes both modes consistent.

Comment thread tests/unit/test_observe.py