Conversation
2f60526 to
c18d0b4
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2f60526a16
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Additional findings (outside current diff — PR may have been updated during review):
-
🟣
langfuse/_client/observe.py:721-734— This is a pre-existing bug in_ContextPreservedAsyncGeneratorWrapper.__anext__: the innerexcept TypeErrorblock, intended to detect Python < 3.10 whereasyncio.create_task()rejects thecontextkeyword, will also catch anyTypeErrorraised inside the user's async generator body on Python 3.10+. When that happens the exception is silently swallowed, the fallbackawait self.generator.__anext__()is called on an already-closed generator, which raisesStopAsyncIteration, causing the outer handler to call_finalize()and mark the span as successful — so the caller sees a false end-of-stream instead of the actual error. A narrower fix is to probe the Python version once at module level (_ASYNCIO_TASK_CONTEXT_SUPPORTED = sys.version_info >= (3, 10)) and branch on that flag instead of catchingTypeErrorfrom the task call.Extended reasoning...
What the bug is and how it manifests
In
_ContextPreservedAsyncGeneratorWrapper.__anext__(observe.py ~721-734) the code uses a try/except pattern to detect whether the running Python version supports thecontextkeyword ofasyncio.create_task():try: item = await asyncio.create_task( self.generator.__anext__(), context=self.context ) except TypeError: # Python < 3.10 fallback item = await self.generator.__anext__()
On Python 3.10+,
asyncio.create_task()fully accepts thecontextkeyword, so anyTypeErrorreaching theexceptclause can only have come from inside the user's async generator body. That user-facing error is silently swallowed and treated as a compatibility probe failure.The specific code path that triggers it
- Python >= 3.10 is in use (so
create_task(context=...)never raisesTypeErroritself). - The user's async generator body executes inside the task and raises
TypeError(e.g. a type-check failure, bad arithmetic, missing attribute that returns the wrong type, etc.). - The exception propagates from the awaited task to
await asyncio.create_task(...). - The inner
except TypeError:catches it — incorrectly treating it as the Python < 3.10 compatibility signal. - The fallback
await self.generator.__anext__()is called. Because the generator's frame was already unwound by theTypeError, this immediately raisesStopAsyncIteration. - The outer
except StopAsyncIteration:fires, calls_finalize(), marks the span as successful, and re-raisesStopAsyncIteration. - The caller receives
StopAsyncIteration— a false end-of-stream signal — and never sees the originalTypeError.
Why existing code does not prevent it
The
except TypeErrorguard was written for a legitimate purpose (runtime Python version detection) but is too broad. It has no way to distinguish aTypeErrorthat came fromcreate_task's own argument validation versus one that bubbled up from inside the generator coroutine. The outerexcept (Exception, asyncio.CancelledError)handler that would have correctly called_finalize_with_error()is never reached because the inner handler consumes the exception first.What the impact is
- Any
TypeErrorraised in user generator code is silently lost on Python 3.10+. - The associated Langfuse span is recorded as a successful completion rather than an error, so observability data is incorrect and debugging is made significantly harder.
- The caller receives
StopAsyncIterationand may believe the stream ended normally, potentially masking data-integrity issues.
How to fix it
Determine Python version support once at module level and branch on that flag instead of catching
TypeError:_ASYNCIO_TASK_CONTEXT_SUPPORTED = sys.version_info >= (3, 10) async def __anext__(self) -> Any: try: if _ASYNCIO_TASK_CONTEXT_SUPPORTED: item = await asyncio.create_task( self.generator.__anext__(), context=self.context ) else: item = await self.generator.__anext__() ...
This eliminates the ambiguous
except TypeErrorentirely so any exception from the generator body propagates naturally to the outer handlers.Step-by-step proof
async def gen(): yield 1 raise TypeError('internal error') # user code raises TypeError yield 2 wrapper = _ContextPreservedAsyncGeneratorWrapper(gen(), ctx, span, True, None) # Call 1: returns 1 — OK item = await wrapper.__anext__() # → 1 # Call 2 (Python 3.10+): # create_task runs gen.__anext__(), generator raises TypeError('internal error') # → propagates to 'await create_task(...)' # → caught by inner 'except TypeError:' ← BUG: wrong handler # fallback: await self.generator.__anext__() # → generator is closed, raises StopAsyncIteration # → caught by outer 'except StopAsyncIteration:' # → _finalize() called, span.end() called with no error # → StopAsyncIteration re-raised item = await wrapper.__anext__() # raises StopAsyncIteration (expected: TypeError) # span shows SUCCESS (expected: ERROR with 'internal error')
- Python >= 3.10 is in use (so
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9ee80d33d6
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 03ba99d3a7
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Summary
StreamingResponse.body_iteratorwrapping even whencapture_output=FalseVerification
uv run --frozen pytest tests/unit/test_observe.pyuv run --frozen ruff check langfuse/_client/observe.py tests/unit/test_observe.pyuv run --frozen ruff format --check langfuse/_client/observe.py tests/unit/test_observe.pyuv run --frozen mypy langfuse/_client/observe.py tests/unit/test_observe.py --no-error-summaryDisclaimer: Experimental PR review
Greptile Summary
This PR fixes a context-propagation bug where
capture_output=Falseon a generator-returning orStreamingResponse-returning function caused the span to be finalized immediately on function return, orphaning any child spans created during iteration. The fix moves generator detection unconditionally into the new_handle_observe_resulthelper so the span always stays open until the generator is exhausted or closed, while still skipping output buffering whencapture_output=False. Both wrapper classes gain idempotentclose/aclose/__del__finalizers to prevent spans from leaking on early termination or GC.Confidence Score: 5/5
Safe to merge; the fix is well-scoped and all remaining findings are P2 suggestions.
No P0/P1 issues found. The logic correctly gates output buffering on
capture_outputwhile always preserving span context for generator/streaming returns. Idempotency guards prevent double-finalization. The only note is a P2 documentation suggestion about the intentional best-effort async__del__behavior.No files require special attention.
Important Files Changed
_handle_observe_result; generators are now always wrapped (regardless ofcapture_output) so span context is preserved, with output buffering conditionally skipped; wrapper classes gain idempotentclose/aclose/__del__finalizers.capture_output=False,StreamingResponsewrapping, double-close idempotency, and__del__-based abandonment cleanup for both wrapper types.Flowchart
%%{init: {'theme': 'neutral'}}%% flowchart TD A[Decorated function returns] --> B{_handle_observe_result} B -->|isgenerator| C[_wrap_sync_generator_result\ncapture_output passed through] B -->|isasyncgen| D[_wrap_async_generator_result\ncapture_output passed through] B -->|StreamingResponse| E[wrap body_iterator\ncapture_output passed through] B -->|plain value + capture_output=True| F[span.update output=result] B -->|plain value + capture_output=False| G[no output update] C --> H[_ContextPreservedSyncGeneratorWrapper] D --> I[_ContextPreservedAsyncGeneratorWrapper] E --> I H -->|exhausted / close / __del__| J{capture_output?} I -->|exhausted / aclose / __del__| J J -->|True| K[span.update output=items\nspan.end] J -->|False| L[span.end only\nitems list stays empty] F --> M[is_return_type_generator=False\nspan.end in finally] G --> MPrompt To Fix All With AI
Reviews (1): Last reviewed commit: "fix(observe): finalize abandoned generat..." | Re-trigger Greptile