Conversation

@claude review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 75eec09658
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
LGTM — the item-accumulation concern from the bot review is already addressed in this diff.
Overview
The PR touches two files: `langfuse/_client/observe.py` (core decorator logic) and a new `tests/unit/test_observe.py`. It extracts a `_handle_observe_result` helper that unconditionally wraps generator/StreamingResponse results regardless of `capture_output`, threading the flag into the wrapper classes so they skip `span.update(output=…)` while still deferring `span.end()` until stream exhaustion.
Security risks
None. The change does not touch authentication, authorization, cryptography, or external data boundaries.
Level of scrutiny
Low-to-medium. This is a targeted bug fix for a regression in observable tracing behavior. The code paths are well-understood and the change has clear before/after semantics. The existing test suite plus the two new regression tests provide adequate coverage.
Other factors
The bot raised a P1 concern about unbounded item buffering when `capture_output=False`. That concern is already resolved in this PR — both `_ContextPreservedSyncGeneratorWrapper` and `_ContextPreservedAsyncGeneratorWrapper` guard `self.items.append(item)` behind `if self.capture_output:`. No outstanding issues remain.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8db48f3e37
```diff
 try:
     # Run the generator's __next__ in the preserved context
     item = self.context.run(next, self.generator)
-    self.items.append(item)
+    if self.capture_output:
+        self.items.append(item)
     return item
 except StopIteration:
     # Handle output and span cleanup when generator is exhausted
-    output: Any = self.items
-    if self.transform_fn is not None:
-        output = self.transform_fn(self.items)
-    elif all(isinstance(item, str) for item in self.items):
-        output = "".join(self.items)
-    self.span.update(output=output).end()
+    if self.capture_output:
+        output: Any = self.items
+        if self.transform_fn is not None:
+            output = self.transform_fn(self.items)
+        elif all(isinstance(item, str) for item in self.items):
+            output = "".join(self.items)
+        self.span.update(output=output)
+    self.span.end()
     raise  # Re-raise StopIteration
```
🔴 After this PR, generators are wrapped unconditionally (including when capture_output=False), so is_return_type_generator=True and the finally block skips span.end(); if a consumer abandons the generator without exhausting it, the span leaks indefinitely. Before this PR, capture_output=False generators were never wrapped, so span.end() was always guaranteed by the finally block. Adding close()/__del__ (and aclose() for async) to the wrapper classes that call self.span.end() would restore the guarantee.
What the bug is
Before this PR, generator/async-generator wrapping was gated behind if capture_output is True. When capture_output=False, generators were returned unwrapped, is_return_type_generator remained False, and the finally block always called span.end() — guaranteeing cleanup even for abandoned generators.
This PR introduces _handle_observe_result, which unconditionally wraps every generator regardless of capture_output. As a result, is_return_type_generator is always set to True for generator returns, and the finally block's if not is_return_type_generator: span.end() guard is permanently skipped.
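The control flow described above can be sketched as follows. This is a minimal stand-in, not the library's actual code: `make_observed`, `wrap_generator`, and the toy `Span` class are hypothetical names introduced for illustration.

```python
import inspect

class Span:
    """Toy stand-in for a tracing span."""
    def __init__(self):
        self.ended = False

    def end(self):
        self.ended = True

def make_observed(func, span, wrap_generator):
    # Sketch of the decorator flow described above; names are stand-ins.
    def observed():
        is_return_type_generator = False
        try:
            result = func()
            if inspect.isgenerator(result):
                # After this PR the wrap is unconditional, so this flag is
                # always True for generator returns...
                is_return_type_generator = True
                result = wrap_generator(result)
            return result
        finally:
            # ...which means this span.end() guard is permanently skipped
            # for generators; cleanup is delegated entirely to the wrapper.
            if not is_return_type_generator:
                span.end()
    return observed

span = Span()
gen = make_observed(lambda: (i for i in range(3)), span, lambda g: g)()
# The generator path defers span.end() to the wrapper, so the span stays open:
assert span.ended is False
```

For a non-generator return, the `finally` block still ends the span as before; only generator returns now rely on the wrapper for cleanup.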
The specific code path
span.end() now lives exclusively in the wrapper's StopIteration/StopAsyncIteration handlers (lines 581 and 657 of the new file). If a consumer breaks early, never iterates, or raises outside the wrapper, neither handler is reached.
Neither _ContextPreservedSyncGeneratorWrapper nor _ContextPreservedAsyncGeneratorWrapper defines close(), __del__(), or aclose() methods. Python's generator protocol calls generator.close() on GC or explicit close(), but since the wrapper doesn't implement it, there is no hook to end the span.
Why existing code doesn't prevent it
The exception handler in __next__/__anext__ only catches exhaustion raised from within the generator, not abandonment by the consumer. When the consumer breaks out of the loop or lets the wrapper go out of scope, Python eventually garbage-collects the underlying generator and closes it by throwing GeneratorExit into it — but nothing runs on the wrapper itself, because the wrapper overrides neither close() nor __del__, so self.span.end() is never invoked.
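The abandonment scenario can be reproduced with a minimal stand-in (the `Span` class and `SyncGeneratorWrapper` below are simplified stand-ins for the wrapper classes discussed above, showing only the relevant methods):

```python
class Span:
    """Toy stand-in for a tracing span."""
    def __init__(self):
        self.ended = False

    def end(self):
        self.ended = True

class SyncGeneratorWrapper:
    """Stand-in for a wrapper whose only cleanup path is StopIteration."""

    def __init__(self, generator, span):
        self.generator = generator
        self.span = span

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self.generator)
        except StopIteration:
            self.span.end()  # the only place the span is ever ended
            raise

span = Span()
wrapped = SyncGeneratorWrapper((i for i in range(10)), span)
for item in wrapped:
    break  # consumer abandons the stream after the first item
# No close()/__del__ hook exists on the wrapper, so the span leaks:
assert span.ended is False
```

Fully exhausting the wrapper does reach the StopIteration handler and end the span; only partial consumption leaks.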
Concrete proof
Before this PR: span.end() is called in the finally block right after result = func(...) returns, because is_return_type_generator stays False.
After this PR: is_return_type_generator = True, finally is skipped, and StopIteration is never raised. The span remains open indefinitely.
Impact
Any @observe(capture_output=False)-decorated function returning a generator that is not fully exhausted (e.g., early break, timeout, error in calling code) will silently leak its span. In long-running services with many partial stream reads, this accumulates unbounded open spans.
How to fix
Add close() and __del__() to _ContextPreservedSyncGeneratorWrapper, and aclose() plus __del__() to _ContextPreservedAsyncGeneratorWrapper:
This matches the guarantee that existed for capture_output=False before this PR and makes both modes consistent.
Summary
- Restores `StreamingResponse.body_iterator` wrapping even when `capture_output=False`

Verification
- `uv run --frozen pytest tests/unit/test_observe.py`
- `uv run --frozen ruff check langfuse/_client/observe.py tests/unit/test_observe.py`
- `uv run --frozen ruff format --check langfuse/_client/observe.py tests/unit/test_observe.py`
- `uv run --frozen mypy langfuse/_client/observe.py --no-error-summary`

Disclaimer: Experimental PR review
Greptile Summary
This PR fixes a regression where `capture_output=False` inadvertently skipped generator and `StreamingResponse.body_iterator` wrapping entirely, causing spans to close prematurely instead of deferring finalization until stream exhaustion, and breaking context propagation for nested `@observe` calls. The fix extracts a `_handle_observe_result` helper that unconditionally wraps all streaming result types, passing `capture_output` into the wrapper classes so they skip `span.update(output=…)` without affecting `span.end()` or context preservation.

Confidence Score: 5/5
Safe to merge — the fix is logically correct and well-targeted, with only a minor memory efficiency suggestion remaining.
All identified findings are P2: unconditional item accumulation when capture_output=False wastes memory but does not affect correctness or observable behavior, and test coverage for the direct async-generator path is a nice-to-have. No P0/P1 issues found. The core fix (unconditional wrapping, conditional output recording) is sound.
No files require special attention.
Flowchart
```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["func returns result"] --> B["_handle_observe_result()"]
    B --> C{inspect.isgenerator?}
    C -- Yes --> D["_wrap_sync_generator_result(capture_output)"]
    D --> E["_ContextPreservedSyncGeneratorWrapper"]
    E --> F{Item yielded}
    F -- next item --> G["context.run(next, generator)\nappend to items"]
    G --> F
    F -- StopIteration --> H{capture_output?}
    H -- True --> I["span.update(output=items)\nspan.end()"]
    H -- False --> J["span.end()"]
    C -- No --> K{inspect.isasyncgen?}
    K -- Yes --> L["_wrap_async_generator_result(capture_output)"]
    L --> M["_ContextPreservedAsyncGeneratorWrapper"]
    M --> N{Item yielded}
    N -- next item --> O["create_task(__anext__, context)\nappend to items"]
    O --> N
    N -- StopAsyncIteration --> P{capture_output?}
    P -- True --> Q["span.update(output=items)\nspan.end()"]
    P -- False --> R["span.end()"]
    K -- No --> S{StreamingResponse?}
    S -- Yes --> T["wrap body_iterator\nreturn True, result"]
    S -- No --> U{capture_output?}
    U -- True --> V["span.update(output=result)\nreturn False, result"]
    U -- False --> W["return False, result"]
```
Reviews (1): Last reviewed commit: "fix(observe): preserve streaming context..."