fix(observe): preserve streaming context without output capture#1634

Open
hassiebp wants to merge 3 commits into main from
hassieb/fix-observe-streaming-context-without-output-capture

Conversation

Contributor

@hassiebp hassiebp commented Apr 20, 2026

Summary

  • preserve generator and StreamingResponse.body_iterator wrapping even when capture_output=False
  • avoid buffering yielded stream items in memory when output capture is disabled
  • finalize wrapped generators on exhaustion, explicit close, and best-effort abandonment cleanup so spans do not stay open indefinitely
  • add unit regressions covering sync and async streaming behavior, disabled output capture, and early-terminated wrapper cleanup

Verification

  • uv run --frozen pytest tests/unit/test_observe.py
  • uv run --frozen ruff check langfuse/_client/observe.py tests/unit/test_observe.py
  • uv run --frozen ruff format --check langfuse/_client/observe.py tests/unit/test_observe.py
  • uv run --frozen mypy langfuse/_client/observe.py tests/unit/test_observe.py --no-error-summary

Disclaimer: Experimental PR review

Greptile Summary

This PR fixes a context-propagation bug where capture_output=False on a generator-returning or StreamingResponse-returning function caused the span to be finalized immediately on function return, orphaning any child spans created during iteration. The fix moves generator detection unconditionally into the new _handle_observe_result helper so the span always stays open until the generator is exhausted or closed, while still skipping output buffering when capture_output=False. Both wrapper classes gain idempotent close/aclose/__del__ finalizers to prevent spans from leaking on early termination or GC.

Confidence Score: 5/5

Safe to merge; the fix is well-scoped and all remaining findings are P2 suggestions.

No P0/P1 issues found. The logic correctly gates output buffering on capture_output while always preserving span context for generator/streaming returns. Idempotency guards prevent double-finalization. The only note is a P2 documentation suggestion about the intentional best-effort async __del__ behavior.

No files require special attention.

Important Files Changed

Filename Overview
langfuse/_client/observe.py Refactored result-handling into _handle_observe_result; generators are now always wrapped (regardless of capture_output) so span context is preserved, with output buffering conditionally skipped; wrapper classes gain idempotent close/aclose/__del__ finalizers.
tests/unit/test_observe.py New unit tests covering sync/async generator context-preservation with capture_output=False, StreamingResponse wrapping, double-close idempotency, and __del__-based abandonment cleanup for both wrapper types.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Decorated function returns] --> B{_handle_observe_result}
    B -->|isgenerator| C[_wrap_sync_generator_result\ncapture_output passed through]
    B -->|isasyncgen| D[_wrap_async_generator_result\ncapture_output passed through]
    B -->|StreamingResponse| E[wrap body_iterator\ncapture_output passed through]
    B -->|plain value + capture_output=True| F[span.update output=result]
    B -->|plain value + capture_output=False| G[no output update]
    C --> H[_ContextPreservedSyncGeneratorWrapper]
    D --> I[_ContextPreservedAsyncGeneratorWrapper]
    E --> I
    H -->|exhausted / close / __del__| J{capture_output?}
    I -->|exhausted / aclose / __del__| J
    J -->|True| K[span.update output=items\nspan.end]
    J -->|False| L[span.end only\nitems list stays empty]
    F --> M[is_return_type_generator=False\nspan.end in finally]
    G --> M
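The wrapper behavior in the flowchart can be sketched as a minimal, self-contained Python class. This is an illustrative reconstruction, not Langfuse's actual implementation: `SpanStub` and the class name are stand-ins, and real spans would be OTel objects.

```python
import contextvars
from typing import Any, Iterator, List


class SpanStub:
    """Stand-in for a real span; records output and end state."""

    def __init__(self) -> None:
        self.output: Any = None
        self.ended = False

    def update(self, output: Any) -> None:
        self.output = output

    def end(self) -> None:
        self.ended = True


class ContextPreservedSyncGeneratorWrapper:
    """Keeps the span open until the wrapped generator is exhausted or closed."""

    def __init__(
        self,
        generator: Iterator[Any],
        context: contextvars.Context,
        span: SpanStub,
        capture_output: bool,
    ) -> None:
        self.generator = generator
        self.context = context
        self.span = span
        self.capture_output = capture_output
        self.items: List[Any] = []
        self._finalized = False

    def __iter__(self) -> "ContextPreservedSyncGeneratorWrapper":
        return self

    def __next__(self) -> Any:
        try:
            # Run each step inside the captured context so child spans
            # created during iteration attach to the still-open span.
            item = self.context.run(self.generator.__next__)
        except StopIteration:
            self._finalize()
            raise
        if self.capture_output:
            # Buffer yielded items only when output capture is enabled.
            self.items.append(item)
        return item

    def close(self) -> None:
        self.generator.close()
        self._finalize()

    def __del__(self) -> None:
        # Best-effort cleanup if the wrapper is abandoned before exhaustion.
        self._finalize()

    def _finalize(self) -> None:
        if self._finalized:  # idempotency guard against double-finalization
            return
        self._finalized = True
        if self.capture_output:
            self.span.update(output=self.items)
        self.span.end()
```

With `capture_output=False`, the span still ends only after exhaustion, but `items` stays empty and `span.update` is never called, matching the `J -->|False|` branch above.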
This is a comment left during a code review.
Path: langfuse/_client/observe.py
Line: 708-709

Comment:
**Async generator cleanup skipped in `__del__`**

`__del__` calls `_finalize()` to end the span synchronously, but it never calls `await self.generator.aclose()`. Any `finally` blocks in the underlying async generator will therefore not run when the wrapper is abandoned — only Python's asyncgen hook (installed by the event loop) would eventually close it. This is workable as best-effort cleanup, but it means the span-end time and any generator-side resource cleanup are decoupled. A brief docstring or comment noting this limitation would help future readers understand the intentional trade-off.

How can I resolve this? If you propose a fix, please make it concise.

Reviews (1): Last reviewed commit: "fix(observe): finalize abandoned generat..."

@hassiebp hassiebp force-pushed the hassieb/fix-observe-streaming-context-without-output-capture branch from 2f60526 to c18d0b4 on April 20, 2026 at 13:57

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2f60526a16

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread langfuse/_client/observe.py Outdated
Comment thread langfuse/_client/observe.py Outdated

@claude claude bot left a comment


Additional findings (outside current diff — PR may have been updated during review):

  • 🟣 langfuse/_client/observe.py:721-734 — This is a pre-existing bug in _ContextPreservedAsyncGeneratorWrapper.__anext__: the inner except TypeError block, intended to detect Python < 3.10 where asyncio.create_task() rejects the context keyword, will also catch any TypeError raised inside the user's async generator body on Python 3.10+. When that happens the exception is silently swallowed, the fallback await self.generator.__anext__() is called on an already-closed generator, which raises StopAsyncIteration, causing the outer handler to call _finalize() and mark the span as successful — so the caller sees a false end-of-stream instead of the actual error. A narrower fix is to probe the Python version once at module level (_ASYNCIO_TASK_CONTEXT_SUPPORTED = sys.version_info >= (3, 10)) and branch on that flag instead of catching TypeError from the task call.

    Extended reasoning...

    What the bug is and how it manifests

    In _ContextPreservedAsyncGeneratorWrapper.__anext__ (observe.py ~721-734) the code uses a try/except pattern to detect whether the running Python version supports the context keyword of asyncio.create_task():

    try:
        item = await asyncio.create_task(
            self.generator.__anext__(), context=self.context
        )
    except TypeError:
        # Python < 3.10 fallback
        item = await self.generator.__anext__()

    On Python 3.10+, asyncio.create_task() fully accepts the context keyword, so any TypeError reaching the except clause can only have come from inside the user's async generator body. That user-facing error is silently swallowed and treated as a compatibility probe failure.

    The specific code path that triggers it

    1. Python >= 3.10 is in use (so create_task(context=...) never raises TypeError itself).
    2. The user's async generator body executes inside the task and raises TypeError (e.g. a type-check failure, bad arithmetic, missing attribute that returns the wrong type, etc.).
    3. The exception propagates from the awaited task to await asyncio.create_task(...).
    4. The inner except TypeError: catches it — incorrectly treating it as the Python < 3.10 compatibility signal.
    5. The fallback await self.generator.__anext__() is called. Because the generator's frame was already unwound by the TypeError, this immediately raises StopAsyncIteration.
    6. The outer except StopAsyncIteration: fires, calls _finalize(), marks the span as successful, and re-raises StopAsyncIteration.
    7. The caller receives StopAsyncIteration — a false end-of-stream signal — and never sees the original TypeError.

    Why existing code does not prevent it

    The except TypeError guard was written for a legitimate purpose (runtime Python version detection) but is too broad. It has no way to distinguish a TypeError that came from create_task's own argument validation versus one that bubbled up from inside the generator coroutine. The outer except (Exception, asyncio.CancelledError) handler that would have correctly called _finalize_with_error() is never reached because the inner handler consumes the exception first.

    What the impact is

    • Any TypeError raised in user generator code is silently lost on Python 3.10+.
    • The associated Langfuse span is recorded as a successful completion rather than an error, so observability data is incorrect and debugging is made significantly harder.
    • The caller receives StopAsyncIteration and may believe the stream ended normally, potentially masking data-integrity issues.

    How to fix it

    Determine Python version support once at module level and branch on that flag instead of catching TypeError:

    _ASYNCIO_TASK_CONTEXT_SUPPORTED = sys.version_info >= (3, 10)
    
    async def __anext__(self) -> Any:
        try:
            if _ASYNCIO_TASK_CONTEXT_SUPPORTED:
                item = await asyncio.create_task(
                    self.generator.__anext__(), context=self.context
                )
            else:
                item = await self.generator.__anext__()
            ...

    This eliminates the ambiguous except TypeError entirely so any exception from the generator body propagates naturally to the outer handlers.

    Step-by-step proof

    async def gen():
        yield 1
        raise TypeError('internal error')   # user code raises TypeError
        yield 2
    
    wrapper = _ContextPreservedAsyncGeneratorWrapper(gen(), ctx, span, True, None)
    
    # Call 1: returns 1 — OK
    item = await wrapper.__anext__()   # → 1
    
    # Call 2 (Python 3.10+):
    #   create_task runs gen.__anext__(), generator raises TypeError('internal error')
    #   → propagates to 'await create_task(...)'
    #   → caught by inner 'except TypeError:'   ← BUG: wrong handler
    #   fallback: await self.generator.__anext__()
    #   → generator is closed, raises StopAsyncIteration
    #   → caught by outer 'except StopAsyncIteration:'
    #   → _finalize() called, span.end() called with no error
    #   → StopAsyncIteration re-raised
    item = await wrapper.__anext__()   # raises StopAsyncIteration  (expected: TypeError)
    # span shows SUCCESS  (expected: ERROR with 'internal error')


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9ee80d33d6


Comment thread langfuse/_client/observe.py Outdated

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 03ba99d3a7


Comment on lines +703 to +704
except TypeError:
await self.context.run(asyncio.create_task, self.generator.aclose())

P2: Narrow TypeError fallback in async close

aclose() wraps await asyncio.create_task(..., context=self.context) in except TypeError, so it also catches TypeError raised inside self.generator.aclose() (e.g., from generator cleanup code), not just unsupported context kwargs. In that case the method incorrectly runs the fallback path and can suppress the original cleanup failure, then _finalize() marks the span as successfully ended. This makes real generator-finalization errors invisible in production and can hide broken cleanup logic whenever the async generator raises TypeError during close.
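The same version-probe approach suggested for `__anext__` applies here. Below is a hedged, self-contained sketch under assumed names (`AsyncWrapperSketch` and its attributes are illustrative), showing `aclose` branching on a one-time probe so a `TypeError` from generator cleanup code propagates instead of being swallowed:

```python
import asyncio
import contextvars
import sys

# Probe once at module level instead of catching TypeError at call sites.
_TASK_CONTEXT_SUPPORTED = sys.version_info >= (3, 10)


class AsyncWrapperSketch:
    """Illustrative stand-in for the async generator wrapper."""

    def __init__(self, generator, context: contextvars.Context) -> None:
        self.generator = generator
        self.context = context
        self.finalized = False

    def _finalize(self) -> None:
        # Stand-in for span.update(...) + span.end().
        self.finalized = True

    async def aclose(self) -> None:
        try:
            if _TASK_CONTEXT_SUPPORTED:
                # context= is valid on asyncio.create_task from 3.10 onward.
                await asyncio.create_task(
                    self.generator.aclose(), context=self.context
                )
            else:
                await self.generator.aclose()
        finally:
            # Span is always ended, but a TypeError raised by generator
            # cleanup code now propagates to the caller instead of being
            # mistaken for a missing context= keyword.
            self._finalize()
```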

