Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 209 additions & 42 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,178 @@
)


# Module-level flag so the Stream.__iter__ patch is applied at most once,
# even if this installer is invoked repeatedly.
_openai_stream_iter_hook_installed = False


def _install_openai_stream_iteration_hooks() -> None:
    """Patch ``openai.Stream.__iter__`` (openai>=1.0 only) so that a
    Langfuse finalizer attached to a stream instance runs once iteration
    stops — on exhaustion, ``break``, or generator close.

    No-op when the installed openai package is pre-v1 or the hook was
    already installed.
    """
    global _openai_stream_iter_hook_installed

    if not _is_openai_v1():
        # The Stream class being patched only exists in openai v1+.
        return

    if _openai_stream_iter_hook_installed:
        return

    wrapped_iter = openai.Stream.__iter__

    def traced_iter(self: Any) -> Any:
        try:
            yield from wrapped_iter(self)
        finally:
            # The finalizer is attached per-instance by
            # _instrument_openai_stream; absent on uninstrumented streams.
            finalizer = getattr(self, "_langfuse_finalize_once", None)
            if finalizer is not None:
                finalizer()

    openai.Stream.__iter__ = traced_iter
    _openai_stream_iter_hook_installed = True
Comment thread
claude[bot] marked this conversation as resolved.


def _finalize_stream_response(
    *,
    resource: OpenAiDefinition,
    items: list[Any],
    generation: LangfuseGeneration,
    completion_start_time: Optional[datetime],
) -> None:
    """Aggregate collected stream chunks into a completion and record it
    on *generation*, then end the generation.

    Extraction/update failures are swallowed on purpose: tracing must never
    break the caller's stream consumption. ``generation.end()`` runs
    unconditionally.
    """
    try:
        if resource.object in ("Responses", "AsyncResponses"):
            extracted = _extract_streamed_response_api_response(items)
        else:
            extracted = _extract_streamed_openai_response(resource, items)

        model, completion, usage, metadata = extracted

        _create_langfuse_update(
            completion,
            generation,
            completion_start_time,
            model=model,
            usage=usage,
            metadata=metadata,
        )
    except Exception:
        # Best-effort telemetry; never propagate into user code.
        pass
    finally:
        generation.end()


def _instrument_openai_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Wrap a native OpenAI v1 ``Stream`` in place so Langfuse finalization
    runs exactly once, whether the stream is fully consumed or closed early.

    Returns the same ``response`` object with its ``_iterator`` and
    ``close`` replaced; falls back to the legacy generator wrapper when the
    response does not expose a ``_iterator`` attribute.
    """
    if not hasattr(response, "_iterator"):
        # Not a v1 Stream shape — delegate to the generator-style wrapper.
        return LangfuseResponseGeneratorSync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected: list[Any] = []
    inner_iterator = response._iterator
    first_chunk_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    def finalize_once() -> None:
        # Idempotent: iterator exhaustion and close() may both trigger this.
        nonlocal finalized
        if finalized:
            return

        finalized = True
        _finalize_stream_response(
            resource=resource,
            items=collected,
            generation=generation,
            completion_start_time=first_chunk_time,
        )

    # Expose the finalizer so the patched Stream.__iter__ hook can call it.
    response._langfuse_finalize_once = finalize_once  # type: ignore[attr-defined]

    def wrapped_iterator() -> Any:
        nonlocal first_chunk_time
        try:
            for chunk in inner_iterator:
                collected.append(chunk)

                if first_chunk_time is None:
                    # Time-to-first-token marker.
                    first_chunk_time = _get_timestamp()

                yield chunk
        finally:
            finalize_once()

    def wrapped_close() -> Any:
        try:
            return original_close()
        finally:
            finalize_once()

    response._iterator = wrapped_iterator()
    response.close = wrapped_close

    return response


def _instrument_openai_async_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Async counterpart of ``_instrument_openai_stream``: wrap a native
    OpenAI v1 ``AsyncStream`` in place so Langfuse finalization runs exactly
    once, on exhaustion or early close.

    Returns the same ``response`` with ``_iterator``, ``close``, and
    ``aclose`` replaced; falls back to the legacy async generator wrapper
    when the response does not expose a ``_iterator`` attribute.
    """
    if not hasattr(response, "_iterator"):
        # Not a v1 AsyncStream shape — delegate to the async generator wrapper.
        return LangfuseResponseGeneratorAsync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected: list[Any] = []
    inner_iterator = response._iterator
    first_chunk_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    async def finalize_once() -> None:
        # Idempotent: iterator exhaustion and close()/aclose() may all
        # trigger this.
        nonlocal finalized
        if finalized:
            return

        finalized = True
        _finalize_stream_response(
            resource=resource,
            items=collected,
            generation=generation,
            completion_start_time=first_chunk_time,
        )

    async def wrapped_iterator() -> Any:
        nonlocal first_chunk_time
        try:
            async for chunk in inner_iterator:
                collected.append(chunk)

                if first_chunk_time is None:
                    # Time-to-first-token marker.
                    first_chunk_time = _get_timestamp()

                yield chunk
        finally:
            await finalize_once()

    async def wrapped_close() -> Any:
        try:
            return await original_close()
        finally:
            await finalize_once()

    async def wrapped_aclose() -> Any:
        # aclose delegates to the traced close so both paths finalize.
        return await wrapped_close()

    response._iterator = wrapped_iterator()
    response.close = wrapped_close
    response.aclose = wrapped_aclose

    return response


@_langfuse_wrapper
def _wrap(
open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
Expand Down Expand Up @@ -863,7 +1035,13 @@
try:
openai_response = wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.Stream):
return _instrument_openai_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorSync(
resource=open_ai_resource,
response=openai_response,
Comment on lines 1048 to 1060
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The _is_streaming_response function contains two dead conditions: or (_is_openai_v1() and isinstance(response, openai.Stream)) and the AsyncStream equivalent. These can never be true when the function is called, because both call-sites (_wrap and _wrap_async) only reach the elif _is_streaming_response(...) branch after the preceding isinstance(openai_response, openai.Stream/AsyncStream) checks have already returned False. The dead conditions create a misleading impression that _is_streaming_response is the canonical handler for native OpenAI v1 streams; a future developer removing the explicit isinstance checks while relying on _is_streaming_response as a unified check would silently lose native-stream instrumentation.

Extended reasoning...

What the dead code is and how it manifests

_is_streaming_response (lines 830–835) includes four conditions joined by or. The first two (isinstance(response, types.GeneratorType) and isinstance(response, types.AsyncGeneratorType)) are live: they handle non-OpenAI generator-style streams used by the fallback wrapper path. The last two — or (_is_openai_v1() and isinstance(response, openai.Stream)) and or (_is_openai_v1() and isinstance(response, openai.AsyncStream)) — are always False at the only two call sites and therefore constitute dead code.

The specific code path that makes them unreachable

In _wrap (lines 1048–1060), the control flow is:

if _is_openai_v1() and isinstance(openai_response, openai.Stream):
    return _instrument_openai_stream(...)
elif _is_streaming_response(openai_response):
    ...

The elif is only reached when the if branch did not execute, meaning (_is_openai_v1() and isinstance(openai_response, openai.Stream)) is False. Inside _is_streaming_response, the third condition — _is_openai_v1() and isinstance(response, openai.Stream) — is exactly the same predicate, so it is always False when evaluated. The same logic applies symmetrically for AsyncStream in _wrap_async (lines 1125–1134). This PR is entirely responsible for introducing this dead state: before it, the if isinstance(openai_response, openai.Stream) checks did not exist, and the _is_streaming_response conditions were the sole handlers.

Why existing code does not prevent the confusion

There is no comment or refactoring to signal that the Stream/AsyncStream branches inside _is_streaming_response are now stale. The function reads as if it is the single authoritative place that decides whether a response is a streaming response, including native OpenAI v1 streams. This is no longer true after this PR's changes.

What the impact would be

There is no runtime impact today. The dead conditions never fire, and the correct instrumentation paths (_instrument_openai_stream / _instrument_openai_async_stream) are invoked by the explicit isinstance checks in _wrap/_wrap_async. The risk is purely a maintenance one: a future developer reading _is_streaming_response in isolation might trust it as the canonical streaming check, remove the explicit isinstance guards in _wrap/_wrap_async (perhaps while trying to simplify the branching), and silently break native-stream instrumentation — with no failing tests unless stream-specific tests are in place.

Addressing the refutation

One verifier noted this is benign dead code with no incorrect runtime behavior. That is accurate, and severity nit reflects it. However, the misleading impression the code creates is a genuine maintenance hazard introduced directly by this PR's refactoring, making it worth flagging even though it is not blocking.

How to fix it

Remove the last two conditions from _is_streaming_response:

def _is_streaming_response(response: Any) -> bool:
    return (
        isinstance(response, types.GeneratorType)
        or isinstance(response, types.AsyncGeneratorType)
    )

This makes the function's actual contract explicit: it only handles legacy generator-style streams. Native OpenAI v1 streams are handled by the explicit isinstance checks in _wrap/_wrap_async.

Expand Down Expand Up @@ -934,7 +1112,13 @@
try:
openai_response = await wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
return _instrument_openai_async_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorAsync(
resource=open_ai_resource,
response=openai_response,
Expand Down Expand Up @@ -994,6 +1178,7 @@


register_tracing()
_install_openai_stream_iteration_hooks()


class LangfuseResponseGeneratorSync:
Expand All @@ -1010,6 +1195,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

def __iter__(self) -> Any:
try:
Expand Down Expand Up @@ -1045,26 +1231,16 @@
pass

def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
self.generation.end()
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)


class LangfuseResponseGeneratorAsync:
Expand All @@ -1081,6 +1257,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

async def __aiter__(self) -> Any:
try:
Expand Down Expand Up @@ -1113,32 +1290,22 @@
return self.__aiter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass

async def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
self.generation.end()
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)

async def close(self) -> None:
"""Close the response and release the connection.

Check notice on line 1308 in langfuse/openai.py

View check run for this annotation

Claude / Claude Code Review

Fallback async wrapper close/exit paths skip _finalize

The fallback async wrapper's `close()`, `aclose()`, and `__aexit__` methods never call `self._finalize()`, so if a user receives a `LangfuseResponseGeneratorAsync` wrapper (for non-OpenAI generator-style async streams) and closes or context-manages it without iterating, `generation.end()` is never called and the Langfuse span leaks. The same issue exists for `LangfuseResponseGeneratorSync.__exit__`. This is a pre-existing issue that predates this PR; the PR's `_is_finalized` guard improvements would make adding `_finalize()` calls to these paths safe and idempotent.
Comment thread
claude[bot] marked this conversation as resolved.
Outdated

Automatically called if the response body is read to completion.
"""
Expand Down
Loading
Loading