Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 261 additions & 46 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from inspect import isclass
from inspect import isawaitable, isclass
from typing import Any, Optional, cast

from openai._types import NotGiven
Expand Down Expand Up @@ -830,6 +830,191 @@
)


_openai_stream_iter_hook_installed = False


def _install_openai_stream_iteration_hooks() -> None:
global _openai_stream_iter_hook_installed

if not _is_openai_v1():
return

if not _openai_stream_iter_hook_installed:
original_iter = openai.Stream.__iter__
original_aiter = openai.AsyncStream.__aiter__

def traced_iter(self: Any) -> Any:
try:
yield from original_iter(self)
finally:
finalize_once = getattr(self, "_langfuse_finalize_once", None)
if finalize_once is not None:
finalize_once()

async def traced_aiter(self: Any) -> Any:
try:
async for item in original_aiter(self):
yield item
finally:
finalize_once = getattr(self, "_langfuse_finalize_once", None)
if finalize_once is not None:
await finalize_once()

Check failure on line 862 in langfuse/openai.py

View check run for this annotation

Claude / Claude Code Review

Async break finalization non-deterministic: traced_aiter defers finalization to asyncio asyncgen hooks

The `traced_aiter` hook in `_install_openai_stream_iteration_hooks` is an async generator (`async for item in original_aiter(self): yield item`), so its `finally` block does **not** run synchronously when the caller does `async for chunk in stream: break` — Python defers async generator finalization to asyncio's `sys.set_asyncgen_hooks` mechanism (PEP 525), meaning `generation.end()` is called only after multiple event loop turns or never in non-asyncio environments. By contrast, the sync `traced_iter` uses `yield from`, which propagates `GeneratorExit` synchronously and runs its `finally` block immediately on break.
Comment on lines +854 to +862
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The traced_aiter hook in _install_openai_stream_iteration_hooks is an async generator (async for item in original_aiter(self): yield item), so its finally block does not run synchronously when the caller does async for chunk in stream: break — Python defers async generator finalization to asyncio's sys.set_asyncgen_hooks mechanism (PEP 525), meaning generation.end() is called only after multiple event loop turns or never in non-asyncio environments. By contrast, the sync traced_iter uses yield from original_iter(self), which propagates GeneratorExit synchronously through yield from and runs the finally block immediately on break. The test at line 507-509 acknowledges this with a 5-turn asyncio.sleep(0) workaround that has no correctness guarantee; a langfuse.flush() call immediately after async for chunk in stream: break without that workaround will silently miss the generation end event.

Extended reasoning...

What the bug is and how it manifests

_install_openai_stream_iteration_hooks installs both a sync hook on openai.Stream.__iter__ and an async hook on openai.AsyncStream.__aiter__. The sync hook uses yield from original_iter(self) inside a try/finally block, which means when the outer generator is closed (due to for ... break), Python synchronously propagates GeneratorExit through yield from into the sub-generator, and the finally block calls finalize_once() deterministically. The async hook is implemented as an async generator with async for item in original_aiter(self): yield item, making the finally block's execution dependent on async generator finalization semantics defined by PEP 525 — which are non-deterministic.

The specific code path that triggers it

When a user does async for chunk in stream: break, Python calls stream.__aiter__() which returns the async generator object from traced_aiter. After the first chunk is yielded and break fires, Python needs to close this async generator. Unlike synchronous generators (where for ... break synchronously calls .close()), Python does not synchronously call .aclose() on abandoned async generators. Instead, PEP 525 specifies that async generator finalization is handled by sys.set_asyncgen_hooks, which asyncio registers to schedule .aclose() on future event loop iterations. The finally block in traced_aiter — which calls await finalize_once() — is therefore deferred.

Why existing code does not prevent it

The is_finalized guard and response._langfuse_finalize_once are correctly set up, but they only matter once finalize_once is actually called. The problem is that the async finally block that calls finalize_once is not guaranteed to run promptly. In the sync path, yield from provides a deterministic propagation guarantee — GeneratorExit travels synchronously through the yield from chain and the finally block executes before the break statement returns control to the caller. No equivalent exists for async generators: there is no async yield from in Python.

What the impact would be

In asyncio environments, finalization is deferred by at least one event loop turn (and requires asyncio's asyncgen hooks to be registered). A user who calls langfuse.flush() immediately after async for chunk in stream: break without awaiting multiple event loop turns will miss the generation end event — the span will appear to be incomplete or missing. In non-asyncio environments (trio, anyio with a non-asyncio backend, or any context where asyncio hasn't registered its asyncgen hooks), finalization may never occur at all. The test itself documents this fragility at lines 507-509 with the comment "Async generator finalizers are scheduled across event-loop turns" and the 5x asyncio.sleep(0) heuristic, which has no formal correctness guarantee.

Addressing the refutation

One verifier argues this is a pre-existing limitation because the old LangfuseResponseGeneratorAsync.__aiter__ also used an async generator with the same try/finally pattern. This is accurate: the behavior predates this PR. However, the PR explicitly fixed the sync path (via yield from) to be deterministic and acknowledges the break-early use case with a dedicated test (test_openai_stream_break_still_finalizes_generation). The async counterpart test (test_openai_async_stream_break_still_finalizes_generation) requires the 5x sleep workaround, confirming the asymmetry was introduced consciously but not resolved. The correctness gap is real and the workaround is fragile.

How to fix it

A true fix requires replacing the async generator approach in traced_aiter with one that provides deterministic finalization on break. One approach is to implement a custom async iterator class (not an async generator) with an __anext__ method that detects exhaustion/cancellation and calls finalize_once directly, and an aclose method that also calls finalize_once. Since there is no async equivalent of yield from that propagates GeneratorExit synchronously, any solution based on async generators will have this limitation.

Step-by-step proof

  1. _instrument_openai_async_stream sets response._langfuse_finalize_once = finalize_once and replaces response._iterator with an async generator traced_iterator().
  2. _install_openai_stream_iteration_hooks replaces openai.AsyncStream.__aiter__ with the async generator traced_aiter.
  3. User writes: async for chunk in stream: break
  4. Python calls stream.__aiter__() → returns async generator object G from traced_aiter(stream).
  5. Python fetches first item from G (which fetches from original_aiter, which reads from response._iterator = traced_iterator()). First chunk yielded.
  6. User break fires. Python needs to close G (the async generator from traced_aiter).
  7. Per PEP 525 / asyncio: Python does NOT synchronously call G.aclose(). Instead asyncio's asyncgen finalizer hook is registered and aclose is scheduled for a future event loop turn.
  8. Control returns to the caller immediately. finalize_once has NOT been called. generation.end() has NOT been called.
  9. The test at line 507-509 requires 5x await asyncio.sleep(0) before langfuse.flush() to give asyncio's hook time to schedule G.aclose(), which then runs the finally block. Without these yields, generation.end() would not be called.

setattr(openai.Stream, "__iter__", traced_iter)
setattr(openai.AsyncStream, "__aiter__", traced_aiter)
_openai_stream_iter_hook_installed = True
Comment thread
claude[bot] marked this conversation as resolved.


def _finalize_stream_response(
*,
resource: OpenAiDefinition,
items: list[Any],
generation: LangfuseGeneration,
completion_start_time: Optional[datetime],
) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(items)
if resource.object == "Responses" or resource.object == "AsyncResponses"
else _extract_streamed_openai_response(resource, items)
)

_create_langfuse_update(
completion,
generation,
completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
generation.end()


def _instrument_openai_stream(
*,
resource: OpenAiDefinition,
response: Any,
generation: LangfuseGeneration,
) -> Any:
if not hasattr(response, "_iterator"):
return LangfuseResponseGeneratorSync(
resource=resource,
response=response,
generation=generation,
)

items: list[Any] = []
raw_iterator = response._iterator
completion_start_time: Optional[datetime] = None
is_finalized = False
close = response.close

def finalize_once() -> None:
nonlocal is_finalized
if is_finalized:
return

is_finalized = True
_finalize_stream_response(
resource=resource,
items=items,
generation=generation,
completion_start_time=completion_start_time,
)

response._langfuse_finalize_once = finalize_once # type: ignore[attr-defined]

def traced_iterator() -> Any:
nonlocal completion_start_time
try:
for item in raw_iterator:
items.append(item)

if completion_start_time is None:
completion_start_time = _get_timestamp()

yield item
finally:
finalize_once()

def traced_close() -> Any:
try:
return close()
finally:
finalize_once()

response._iterator = traced_iterator()
response.close = traced_close
Comment thread
claude[bot] marked this conversation as resolved.

return response


def _instrument_openai_async_stream(
*,
resource: OpenAiDefinition,
response: Any,
generation: LangfuseGeneration,
) -> Any:
if not hasattr(response, "_iterator"):
return LangfuseResponseGeneratorAsync(
resource=resource,
response=response,
generation=generation,
)

items: list[Any] = []
raw_iterator = response._iterator
completion_start_time: Optional[datetime] = None
is_finalized = False
close = response.close

async def finalize_once() -> None:
nonlocal is_finalized
if is_finalized:
return

is_finalized = True
_finalize_stream_response(
resource=resource,
items=items,
generation=generation,
completion_start_time=completion_start_time,
)

response._langfuse_finalize_once = finalize_once # type: ignore[attr-defined]

async def traced_iterator() -> Any:
nonlocal completion_start_time
try:
async for item in raw_iterator:
items.append(item)

if completion_start_time is None:
completion_start_time = _get_timestamp()

yield item
finally:
await finalize_once()

async def traced_close() -> Any:
try:
return await close()
finally:
await finalize_once()

async def traced_aclose() -> Any:
return await traced_close()

response._iterator = traced_iterator()
response.close = traced_close
response.aclose = traced_aclose
Comment thread
hassiebp marked this conversation as resolved.

return response


@_langfuse_wrapper
def _wrap(
open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
Expand Down Expand Up @@ -860,13 +1045,19 @@
prompt=langfuse_data.get("prompt", None),
)

try:
openai_response = wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.Stream):
return _instrument_openai_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorSync(
resource=open_ai_resource,
response=openai_response,

Check warning on line 1060 in langfuse/openai.py

View check run for this annotation

Claude / Claude Code Review

Dead code in _is_streaming_response: openai.Stream/AsyncStream conditions unreachable after PR

The `_is_streaming_response` function contains two dead conditions: `or (_is_openai_v1() and isinstance(response, openai.Stream))` and the `AsyncStream` equivalent. These can never be true when the function is called, because both call-sites (`_wrap` and `_wrap_async`) only reach the `elif _is_streaming_response(...)` branch after the preceding `isinstance(openai_response, openai.Stream/AsyncStream)` checks have already returned False. The dead conditions create a misleading impression that `_is_streaming_response` is the canonical handler for native OpenAI v1 streams.
Comment on lines 1048 to 1060
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The _is_streaming_response function contains two dead conditions: or (_is_openai_v1() and isinstance(response, openai.Stream)) and the AsyncStream equivalent. These can never be true when the function is called, because both call-sites (_wrap and _wrap_async) only reach the elif _is_streaming_response(...) branch after the preceding isinstance(openai_response, openai.Stream/AsyncStream) checks have already returned False. The dead conditions create a misleading impression that _is_streaming_response is the canonical handler for native OpenAI v1 streams; a future developer removing the explicit isinstance checks while relying on _is_streaming_response as a unified check would silently lose native-stream instrumentation.

Extended reasoning...

What the dead code is and how it manifests

_is_streaming_response (lines 830–835) includes four conditions joined by or. The first two (isinstance(response, types.GeneratorType) and isinstance(response, types.AsyncGeneratorType)) are live: they handle non-OpenAI generator-style streams used by the fallback wrapper path. The last two — or (_is_openai_v1() and isinstance(response, openai.Stream)) and or (_is_openai_v1() and isinstance(response, openai.AsyncStream)) — are always False at the only two call sites and therefore constitute dead code.

The specific code path that makes them unreachable

In _wrap (lines 1048–1060), the control flow is:

if _is_openai_v1() and isinstance(openai_response, openai.Stream):
    return _instrument_openai_stream(...)
elif _is_streaming_response(openai_response):
    ...

The elif is only reached when the if branch did not execute, meaning (_is_openai_v1() and isinstance(openai_response, openai.Stream)) is False. Inside _is_streaming_response, the third condition — _is_openai_v1() and isinstance(response, openai.Stream) — is exactly the same predicate, so it is always False when evaluated. The same logic applies symmetrically for AsyncStream in _wrap_async (lines 1125–1134). This PR is entirely responsible for introducing this dead state: before it, the if isinstance(openai_response, openai.Stream) checks did not exist, and the _is_streaming_response conditions were the sole handlers.

Why existing code does not prevent the confusion

There is no comment or refactoring to signal that the Stream/AsyncStream branches inside _is_streaming_response are now stale. The function reads as if it is the single authoritative place that decides whether a response is a streaming response, including native OpenAI v1 streams. This is no longer true after this PR's changes.

What the impact would be

There is no runtime impact today. The dead conditions never fire, and the correct instrumentation paths (_instrument_openai_stream / _instrument_openai_async_stream) are invoked by the explicit isinstance checks in _wrap/_wrap_async. The risk is purely a maintenance one: a future developer reading _is_streaming_response in isolation might trust it as the canonical streaming check, remove the explicit isinstance guards in _wrap/_wrap_async (perhaps while trying to simplify the branching), and silently break native-stream instrumentation — with no failing tests unless stream-specific tests are in place.

Addressing the refutation

One verifier noted this is benign dead code with no incorrect runtime behavior. That is accurate, and severity nit reflects it. However, the misleading impression the code creates is a genuine maintenance hazard introduced directly by this PR's refactoring, making it worth flagging even though it is not blocking.

How to fix it

Remove the last two conditions from _is_streaming_response:

def _is_streaming_response(response: Any) -> bool:
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
    )

This makes the function's actual contract explicit: it only handles legacy generator-style streams. Native OpenAI v1 streams are handled by the explicit isinstance checks in _wrap/_wrap_async.

generation=generation,
)

Expand Down Expand Up @@ -934,7 +1125,13 @@
try:
openai_response = await wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
return _instrument_openai_async_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorAsync(
resource=open_ai_resource,
response=openai_response,
Expand Down Expand Up @@ -994,6 +1191,7 @@


register_tracing()
_install_openai_stream_iteration_hooks()


class LangfuseResponseGeneratorSync:
Expand All @@ -1010,6 +1208,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

def __iter__(self) -> Any:
try:
Expand Down Expand Up @@ -1042,29 +1241,28 @@
return self.__iter__()

def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass
self.close()

def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)
def close(self) -> None:
close = getattr(self.response, "close", None)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
try:
if callable(close):
close()
finally:
self.generation.end()
self._finalize()

def _finalize(self) -> None:
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)


class LangfuseResponseGeneratorAsync:
Expand All @@ -1081,6 +1279,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

async def __aiter__(self) -> Any:
try:
Expand Down Expand Up @@ -1113,40 +1312,56 @@
return self.__aiter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass
await self.aclose()

async def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
self.generation.end()
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)

async def close(self) -> None:
"""Close the response and release the connection.

Automatically called if the response body is read to completion.
"""
await self.response.close()
close = getattr(self.response, "close", None)
aclose = getattr(self.response, "aclose", None)

try:
if callable(close):
result = close()
if isawaitable(result):
await result
elif callable(aclose):
result = aclose()
if isawaitable(result):
await result
finally:
await self._finalize()

async def aclose(self) -> None:
"""Close the response and release the connection.

Automatically called if the response body is read to completion.
"""
await self.response.aclose()
aclose = getattr(self.response, "aclose", None)
close = getattr(self.response, "close", None)

try:
if callable(aclose):
result = aclose()
if isawaitable(result):
await result
elif callable(close):
result = close()
if isawaitable(result):
await result
finally:
await self._finalize()
Loading
Loading