fix(openai): preserve native v1 stream contract#1627
Conversation
@claude review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b5a7071be7
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```python
async def traced_aiter(self: Any) -> Any:
    try:
        async for item in original_aiter(self):
            yield item
    finally:
        finalize_once = getattr(self, "_langfuse_finalize_once", None)
        if finalize_once is not None:
            await finalize_once()
```
🔴 The `traced_aiter` hook in `_install_openai_stream_iteration_hooks` is an async generator (`async for item in original_aiter(self): yield item`), so its `finally` block does not run synchronously when the caller does `async for chunk in stream: break` — Python defers async generator finalization to asyncio's `sys.set_asyncgen_hooks` mechanism (PEP 525), meaning `generation.end()` is called only after multiple event loop turns or never in non-asyncio environments. By contrast, the sync `traced_iter` uses `yield from original_iter(self)`, which propagates GeneratorExit synchronously through `yield from` and runs the `finally` block immediately on `break`. The test at lines 507-509 acknowledges this with a 5-turn `asyncio.sleep(0)` workaround that has no correctness guarantee; a `langfuse.flush()` call immediately after `async for chunk in stream: break` without that workaround will silently miss the generation end event.
Extended reasoning...
What the bug is and how it manifests
_install_openai_stream_iteration_hooks installs both a sync hook on openai.Stream.__iter__ and an async hook on openai.AsyncStream.__aiter__. The sync hook uses yield from original_iter(self) inside a try/finally block, which means when the outer generator is closed (due to for ... break), Python synchronously propagates GeneratorExit through yield from into the sub-generator, and the finally block calls finalize_once() deterministically. The async hook is implemented as an async generator with async for item in original_aiter(self): yield item, making the finally block's execution dependent on async generator finalization semantics defined by PEP 525 — which are non-deterministic.
The specific code path that triggers it
When a user does async for chunk in stream: break, Python calls stream.__aiter__() which returns the async generator object from traced_aiter. After the first chunk is yielded and break fires, Python needs to close this async generator. Unlike synchronous generators (where for ... break synchronously calls .close()), Python does not synchronously call .aclose() on abandoned async generators. Instead, PEP 525 specifies that async generator finalization is handled by sys.set_asyncgen_hooks, which asyncio registers to schedule .aclose() on future event loop iterations. The finally block in traced_aiter — which calls await finalize_once() — is therefore deferred.
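The asymmetry described above can be reproduced in a few self-contained lines, independent of the Langfuse/OpenAI code (behavior shown assumes CPython with asyncio's default asyncgen hooks):

```python
import asyncio

events = []

def sync_gen():
    try:
        yield from [1, 2, 3]
    finally:
        events.append("sync-finalized")

async def async_gen():
    try:
        for item in [1, 2, 3]:
            yield item
    finally:
        events.append("async-finalized")

async def main():
    # Sync path: abandoning the generator after `break` runs its
    # finally block immediately (GeneratorExit propagates synchronously).
    for _ in sync_gen():
        break
    sync_done = "sync-finalized" in events

    # Async path: `break` does NOT close the async generator here;
    # asyncio schedules aclose() for a later event loop turn (PEP 525).
    async for _ in async_gen():
        break
    async_done_immediately = "async-finalized" in events

    # The same heuristic the test uses: yield a few loop turns so the
    # scheduled aclose() can run the finally block.
    for _ in range(5):
        await asyncio.sleep(0)
    async_done_eventually = "async-finalized" in events

    return sync_done, async_done_immediately, async_done_eventually

result = asyncio.run(main())
print(result)  # (True, False, True) on CPython + asyncio
```

The middle `False` is the bug surface: anything (such as `langfuse.flush()`) that runs between the `break` and those extra loop turns observes an unfinalized generation.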
Why existing code does not prevent it
The is_finalized guard and response._langfuse_finalize_once are correctly set up, but they only matter once finalize_once is actually called. The problem is that the async finally block that calls finalize_once is not guaranteed to run promptly. In the sync path, yield from provides a deterministic propagation guarantee — GeneratorExit travels synchronously through the yield from chain and the finally block executes before the break statement returns control to the caller. No equivalent exists for async generators: there is no async yield from in Python.
What the impact would be
In asyncio environments, finalization is deferred by at least one event loop turn (and requires asyncio's asyncgen hooks to be registered). A user who calls langfuse.flush() immediately after async for chunk in stream: break without awaiting multiple event loop turns will miss the generation end event — the span will appear to be incomplete or missing. In non-asyncio environments (trio, anyio with a non-asyncio backend, or any context where asyncio hasn't registered its asyncgen hooks), finalization may never occur at all. The test itself documents this fragility at lines 507-509 with the comment "Async generator finalizers are scheduled across event-loop turns" and the 5x asyncio.sleep(0) heuristic, which has no formal correctness guarantee.
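The "may never occur at all" case can be seen directly with PEP 525's hook machinery: whatever finalizer hook is installed (or none) decides whether the `finally` block ever runs. A standalone, CPython-specific illustration, standing in for an environment without asyncio's hooks:

```python
import sys

seen = []

# Install PEP 525 hooks ourselves, standing in for an event loop.
# This finalizer only records the call — it never closes the generator.
sys.set_asyncgen_hooks(
    firstiter=lambda agen: seen.append("firstiter"),
    finalizer=lambda agen: seen.append("finalizer"),
)

async def agen():
    try:
        yield 1
    finally:
        seen.append("finally")

g = agen()
try:
    # Drive one step by hand (no event loop): __anext__() is awaitable.
    g.__anext__().send(None)
except StopIteration as exc:
    assert exc.value == 1  # the yielded value arrives via StopIteration

del g  # abandon the generator: the finalizer *hook* is called instead
print(seen)  # ['firstiter', 'finalizer'] — the finally block never ran

sys.set_asyncgen_hooks(firstiter=None, finalizer=None)  # restore
```

With no hook closing the abandoned generator, `generation.end()`'s analogue in the `finally` block is simply skipped — which is the failure mode for trio/anyio backends described above.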
Addressing the refutation
One verifier argues this is a pre-existing limitation because the old LangfuseResponseGeneratorAsync.__aiter__ also used an async generator with the same try/finally pattern. This is accurate: the behavior predates this PR. However, the PR explicitly fixed the sync path (via yield from) to be deterministic and acknowledges the break-early use case with a dedicated test (test_openai_stream_break_still_finalizes_generation). The async counterpart test (test_openai_async_stream_break_still_finalizes_generation) requires the 5x sleep workaround, confirming the asymmetry was introduced consciously but not resolved. The correctness gap is real and the workaround is fragile.
How to fix it
A true fix requires replacing the async generator approach in traced_aiter with one that provides deterministic finalization on break. One approach is to implement a custom async iterator class (not an async generator) with an __anext__ method that detects exhaustion/cancellation and calls finalize_once directly, and an aclose method that also calls finalize_once. Since there is no async equivalent of yield from that propagates GeneratorExit synchronously, any solution based on async generators will have this limitation.
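A minimal sketch of that suggestion (hypothetical names — `TracedAsyncIterator`, the `finalize_once` callable — following the review's description, not the actual Langfuse code). Note that no async iterator is closed automatically on `break`; the gain is that there is no hidden asyncgen finalization machinery, and `aclose()` gives callers a deterministic finalization point:

```python
import asyncio

class TracedAsyncIterator:
    """Custom async iterator (not an async generator) with
    deterministic finalization via aclose()."""

    def __init__(self, inner, finalize_once):
        self._inner = inner          # the original async iterator
        self._finalize = finalize_once
        self._finalized = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self._inner.__anext__()
        except StopAsyncIteration:
            await self._finalize_if_needed()  # finalize on exhaustion
            raise

    async def aclose(self):
        # Explicit close point for early-exit callers.
        try:
            inner_close = getattr(self._inner, "aclose", None)
            if inner_close is not None:
                await inner_close()
        finally:
            await self._finalize_if_needed()

    async def _finalize_if_needed(self):
        if not self._finalized:       # is_finalized-style guard
            self._finalized = True
            await self._finalize()

# Demo: exhaustion finalizes once; early exit finalizes via aclose().
calls = []

async def _finalize():
    calls.append("generation.end()")

async def _chunks():
    for i in range(3):
        yield i

async def demo():
    out = [c async for c in TracedAsyncIterator(_chunks(), _finalize)]
    it = TracedAsyncIterator(_chunks(), _finalize)
    async for c in it:
        break
    await it.aclose()                 # deterministic, no loop turns needed
    return out

out = asyncio.run(demo())
print(out, calls)
```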
Step-by-step proof
1. `_instrument_openai_async_stream` sets `response._langfuse_finalize_once = finalize_once` and replaces `response._iterator` with an async generator `traced_iterator()`.
2. `_install_openai_stream_iteration_hooks` replaces `openai.AsyncStream.__aiter__` with the async generator `traced_aiter`.
3. User writes: `async for chunk in stream: break`
4. Python calls `stream.__aiter__()` → returns async generator object G from `traced_aiter(stream)`.
5. Python fetches the first item from G (which fetches from `original_aiter`, which reads from `response._iterator = traced_iterator()`). First chunk yielded.
6. User `break` fires. Python needs to close G (the async generator from `traced_aiter`).
7. Per PEP 525 / asyncio: Python does NOT synchronously call `G.aclose()`. Instead asyncio's asyncgen finalizer hook is registered and `aclose` is scheduled for a future event loop turn.
8. Control returns to the caller immediately. `finalize_once` has NOT been called. `generation.end()` has NOT been called.
9. The test at lines 507-509 requires 5x `await asyncio.sleep(0)` before `langfuse.flush()` to give asyncio's hook time to schedule `G.aclose()`, which then runs the `finally` block. Without these yields, `generation.end()` would not be called.
```diff
  try:
      openai_response = wrapped(**arg_extractor.get_openai_args())

-     if _is_streaming_response(openai_response):
+     if _is_openai_v1() and isinstance(openai_response, openai.Stream):
+         return _instrument_openai_stream(
+             resource=open_ai_resource,
+             response=openai_response,
+             generation=generation,
+         )
+     elif _is_streaming_response(openai_response):
          return LangfuseResponseGeneratorSync(
              resource=open_ai_resource,
              response=openai_response,
```
🟡 The _is_streaming_response function contains two dead conditions: or (_is_openai_v1() and isinstance(response, openai.Stream)) and the AsyncStream equivalent. These can never be true when the function is called, because both call-sites (_wrap and _wrap_async) only reach the elif _is_streaming_response(...) branch after the preceding isinstance(openai_response, openai.Stream/AsyncStream) checks have already returned False. The dead conditions create a misleading impression that _is_streaming_response is the canonical handler for native OpenAI v1 streams; a future developer removing the explicit isinstance checks while relying on _is_streaming_response as a unified check would silently lose native-stream instrumentation.
Extended reasoning...
What the dead code is and how it manifests
_is_streaming_response (lines 830–835) includes four conditions joined by or. The first two (isinstance(response, types.GeneratorType) and isinstance(response, types.AsyncGeneratorType)) are live: they handle non-OpenAI generator-style streams used by the fallback wrapper path. The last two — or (_is_openai_v1() and isinstance(response, openai.Stream)) and or (_is_openai_v1() and isinstance(response, openai.AsyncStream)) — are always False at the only two call sites and therefore constitute dead code.
The specific code path that makes them unreachable
In _wrap (lines 1048–1060), the control flow is:

```python
if _is_openai_v1() and isinstance(openai_response, openai.Stream):
    return _instrument_openai_stream(...)
elif _is_streaming_response(openai_response):
    ...
```
The elif is only reached when the if branch did not execute, meaning (_is_openai_v1() and isinstance(openai_response, openai.Stream)) is False. Inside _is_streaming_response, the third condition — _is_openai_v1() and isinstance(response, openai.Stream) — is exactly the same predicate, so it is always False when evaluated. The same logic applies symmetrically for AsyncStream in _wrap_async (lines 1125–1134). This PR is entirely responsible for introducing this dead state: before it, the if isinstance(openai_response, openai.Stream) checks did not exist, and the _is_streaming_response conditions were the sole handlers.
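In miniature, the pattern looks like this (illustrative only, not the PR's code):

```python
def classify(x):
    if x > 10:
        return "big"
    # The `x > 10` below is dead: this branch only runs when the `if`
    # above was already False, so re-testing the same predicate can
    # never succeed — exactly the situation in _is_streaming_response.
    elif x > 10 or x < 0:
        return "negative"
    return "small"

print(classify(20), classify(-1), classify(5))
```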
Why existing code does not prevent the confusion
There is no comment or refactoring to signal that the Stream/AsyncStream branches inside _is_streaming_response are now stale. The function reads as if it is the single authoritative place that decides whether a response is a streaming response, including native OpenAI v1 streams. This is no longer true after this PR's changes.
What the impact would be
There is no runtime impact today. The dead conditions never fire, and the correct instrumentation paths (_instrument_openai_stream / _instrument_openai_async_stream) are invoked by the explicit isinstance checks in _wrap/_wrap_async. The risk is purely a maintenance one: a future developer reading _is_streaming_response in isolation might trust it as the canonical streaming check, remove the explicit isinstance guards in _wrap/_wrap_async (perhaps while trying to simplify the branching), and silently break native-stream instrumentation — with no failing tests unless stream-specific tests are in place.
Addressing the refutation
One verifier noted this is benign dead code with no incorrect runtime behavior. That is accurate, and severity nit reflects it. However, the misleading impression the code creates is a genuine maintenance hazard introduced directly by this PR's refactoring, making it worth flagging even though it is not blocking.
How to fix it
Remove the last two conditions from _is_streaming_response:

```python
def _is_streaming_response(response: Any) -> bool:
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
    )
```

This makes the function's actual contract explicit: it only handles legacy generator-style streams. Native OpenAI v1 streams are handled by the explicit isinstance checks in _wrap/_wrap_async.
Summary

Fix OpenAI v1 streaming instrumentation to preserve the original `openai.Stream` and `openai.AsyncStream` objects instead of replacing them with Langfuse wrapper types. This keeps the native stream contract intact for `_iterator`, `.response`, `close()`/`aclose()`, `async for`, and manual `__anext__()` while still collecting streamed chunks for Langfuse generation updates.

Changes
- … `async for` and `__anext__()`

Verification
- `uv run --frozen ruff check langfuse/openai.py tests/unit/test_openai.py`
- `uv run --frozen pytest tests/unit/test_openai.py`

Refs LFE-8788.
Disclaimer: Experimental PR review
Greptile Summary
This PR fixes OpenAI v1 streaming instrumentation by patching `_iterator` and `close`/`aclose` on the native `openai.Stream` and `openai.AsyncStream` objects in place, rather than replacing them with Langfuse wrapper types. The approach is sound: the `_iterator` replacement correctly intercepts all iteration paths (`__iter__`, `__next__`, `__anext__`, `async for`) as confirmed by the SDK's implementation, and the `is_finalized` flag reliably prevents double-finalization.

Confidence Score: 5/5
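The in-place patching idea can be sketched in miniature (hypothetical `DummyStream`/`instrument` names; the real code patches `openai.Stream` and also wraps `close()`/`aclose()`):

```python
# Miniature of the in-place patching approach: the object's type and
# public surface stay untouched, only its `_iterator` is swapped for a
# tracing generator that observes chunks and finalizes on exhaustion.
class DummyStream:
    def __init__(self, items):
        self._iterator = iter(items)

    def __iter__(self):
        # Mirrors the SDK pattern: all iteration goes through _iterator.
        for item in self._iterator:
            yield item

def instrument(stream, on_chunk, on_end):
    raw = stream._iterator

    def traced():
        try:
            for chunk in raw:
                on_chunk(chunk)   # collect for the generation update
                yield chunk
        finally:
            on_end()              # generation.end() analogue

    stream._iterator = traced()   # patched in place; type preserved
    return stream

seen, ended = [], []
s = instrument(DummyStream("abc"), seen.append, lambda: ended.append(True))
out = list(s)                     # native type, native iteration path
print(type(s).__name__, out, seen, ended)
```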
Safe to merge — core patching logic is correct, double-finalization is guarded, and tests cover all three iteration patterns. All remaining findings are P2 style suggestions. The `_finalize_stream_response_async` wrapper is harmless. No P0/P1 logic bugs found; the `_iterator` replacement approach is confirmed sound by the OpenAI SDK's implementation. No files require special attention.
Important Files Changed
- `_instrument_openai_stream`/`_instrument_openai_async_stream` monkey-patch `_iterator` and `close`/`aclose` on native `openai.Stream` objects, preserving type identity while still collecting chunks for Langfuse; logic is sound but `_finalize_stream_response_async` is a no-op async wrapper.
- … `__anext__` usage; `DummyOpenAIStream`/`DummyOpenAIAsyncStream` correctly bypass `super().__init__` and set `_iterator` directly, which is appropriate for unit-level testing.

Sequence Diagram
```mermaid
sequenceDiagram
    participant User
    participant Stream as openai.Stream
    participant Traced as traced_iterator (generator)
    participant Raw as raw_iterator
    participant Fin as finalize_once
    participant Gen as LangfuseGeneration
    Note over User,Gen: _instrument_openai_stream patches _iterator and close in place
    User->>Stream: for chunk in stream (or next())
    Stream->>Traced: __next__() via self._iterator
    Traced->>Raw: next item
    Raw-->>Traced: chunk
    Traced->>Traced: items.append(chunk), set completion_start_time
    Traced-->>Stream: yield chunk
    Stream-->>User: chunk
    Note over Traced: On exhaustion or close()
    Traced->>Fin: finally block (is_finalized guard)
    Fin->>Gen: _finalize_stream_response → generation.end()
    alt User calls stream.close() explicitly
        User->>Stream: close() [patched to traced_close]
        Stream->>Stream: await original_close() (closes HTTP)
        Stream->>Fin: finally in traced_close
        Fin->>Fin: is_finalized=True → no-op if already finalized
    end
```

Reviews (1): Last reviewed commit: "fix(openai): preserve native v1 stream c..."