From b81888599336960a0b53f33f31017b0462146208 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Wed, 15 Apr 2026 17:00:06 +0200 Subject: [PATCH 1/5] fix(openai): preserve native v1 stream contract --- langfuse/openai.py | 229 +++++++++++++++++++++++++++++++------- tests/unit/test_openai.py | 201 +++++++++++++++++++++++++++++++++ 2 files changed, 388 insertions(+), 42 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 16d293e73..8b165d6e1 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -830,6 +830,167 @@ def _is_streaming_response(response: Any) -> bool: ) +def _finalize_stream_response( + *, + resource: OpenAiDefinition, + items: list[Any], + generation: LangfuseGeneration, + completion_start_time: Optional[datetime], +) -> None: + try: + model, completion, usage, metadata = ( + _extract_streamed_response_api_response(items) + if resource.object == "Responses" or resource.object == "AsyncResponses" + else _extract_streamed_openai_response(resource, items) + ) + + _create_langfuse_update( + completion, + generation, + completion_start_time, + model=model, + usage=usage, + metadata=metadata, + ) + except Exception: + pass + finally: + generation.end() + + +async def _finalize_stream_response_async( + *, + resource: OpenAiDefinition, + items: list[Any], + generation: LangfuseGeneration, + completion_start_time: Optional[datetime], +) -> None: + _finalize_stream_response( + resource=resource, + items=items, + generation=generation, + completion_start_time=completion_start_time, + ) + + +def _instrument_openai_stream( + *, + resource: OpenAiDefinition, + response: Any, + generation: LangfuseGeneration, +) -> Any: + if not hasattr(response, "_iterator"): + return LangfuseResponseGeneratorSync( + resource=resource, + response=response, + generation=generation, + ) + + items: list[Any] = [] + raw_iterator = response._iterator + completion_start_time: Optional[datetime] = None + is_finalized = False + close = response.close + + def finalize_once() -> None: + nonlocal is_finalized + if is_finalized: + return + + is_finalized = True + _finalize_stream_response( + resource=resource, + items=items, + generation=generation, + completion_start_time=completion_start_time, + ) + + def traced_iterator() -> Any: + nonlocal completion_start_time + try: + for item in raw_iterator: + items.append(item) + + if completion_start_time is None: + completion_start_time = _get_timestamp() + + yield item + finally: + finalize_once() + + def traced_close() -> Any: + try: + return close() + finally: + finalize_once() + + response._iterator = traced_iterator() + response.close = traced_close + + return response + + +def _instrument_openai_async_stream( + *, + resource: OpenAiDefinition, + response: Any, + generation: LangfuseGeneration, +) -> Any: + if not hasattr(response, "_iterator"): + return LangfuseResponseGeneratorAsync( + resource=resource, + response=response, + generation=generation, + ) + + items: list[Any] = [] + raw_iterator = response._iterator + completion_start_time: Optional[datetime] = None + is_finalized = False + close = response.close + + async def finalize_once() -> None: + nonlocal is_finalized + if is_finalized: + return + + is_finalized = True + await _finalize_stream_response_async( + resource=resource, + items=items, + generation=generation, + completion_start_time=completion_start_time, + ) + + async def traced_iterator() -> Any: + nonlocal completion_start_time + try: + async for item in raw_iterator: + 
items.append(item) + + if completion_start_time is None: + completion_start_time = _get_timestamp() + + yield item + finally: + await finalize_once() + + async def traced_close() -> Any: + try: + return await close() + finally: + await finalize_once() + + async def traced_aclose() -> Any: + return await traced_close() + + response._iterator = traced_iterator() + response.close = traced_close + response.aclose = traced_aclose + + return response + + @_langfuse_wrapper def _wrap( open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any @@ -863,7 +1024,13 @@ def _wrap( try: openai_response = wrapped(**arg_extractor.get_openai_args()) - if _is_streaming_response(openai_response): + if _is_openai_v1() and isinstance(openai_response, openai.Stream): + return _instrument_openai_stream( + resource=open_ai_resource, + response=openai_response, + generation=generation, + ) + elif _is_streaming_response(openai_response): return LangfuseResponseGeneratorSync( resource=open_ai_resource, response=openai_response, @@ -934,7 +1101,13 @@ async def _wrap_async( try: openai_response = await wrapped(**arg_extractor.get_openai_args()) - if _is_streaming_response(openai_response): + if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream): + return _instrument_openai_async_stream( + resource=open_ai_resource, + response=openai_response, + generation=generation, + ) + elif _is_streaming_response(openai_response): return LangfuseResponseGeneratorAsync( resource=open_ai_resource, response=openai_response, @@ -1045,26 +1218,12 @@ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: pass def _finalize(self) -> None: - try: - model, completion, usage, metadata = ( - _extract_streamed_response_api_response(self.items) - if self.resource.object == "Responses" - or self.resource.object == "AsyncResponses" - else _extract_streamed_openai_response(self.resource, self.items) - ) - - _create_langfuse_update( - completion, - self.generation, - self.completion_start_time, - model=model, - usage=usage, - metadata=metadata, - ) - except Exception: - pass - finally: - self.generation.end() + _finalize_stream_response( + resource=self.resource, + items=self.items, + generation=self.generation, + completion_start_time=self.completion_start_time, + ) class LangfuseResponseGeneratorAsync: @@ -1116,26 +1275,12 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None pass async def _finalize(self) -> None: - try: - model, completion, usage, metadata = ( - _extract_streamed_response_api_response(self.items) - if self.resource.object == "Responses" - or self.resource.object == "AsyncResponses" - else _extract_streamed_openai_response(self.resource, self.items) - ) - - _create_langfuse_update( - completion, - self.generation, - self.completion_start_time, - model=model, - usage=usage, - metadata=metadata, - ) - except Exception: - pass - finally: - self.generation.end() + await _finalize_stream_response_async( + resource=self.resource, + items=self.items, + generation=self.generation, + completion_start_time=self.completion_start_time, + ) async def close(self) -> None: """Close the response and release the connection. 
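

The in-place instrumentation above boils down to a small, reusable pattern: keep the caller's object identity, swap the private iterator for a tracing generator, and wrap close() so that exhaustion, early exit, and explicit close all funnel into a single finalize step. The following is a minimal, self-contained sketch of that pattern; the Stream class, instrument() helper, and on_done callback are illustrative stand-ins, not the openai types or the patch's actual API.

    from typing import Any, Callable, Iterator


    class Stream:
        # Illustrative stand-in for a native SDK stream; not the openai class.
        def __init__(self, items: list[Any]) -> None:
            self._iterator: Iterator[Any] = iter(items)

        def __iter__(self) -> Iterator[Any]:
            yield from self._iterator

        def close(self) -> None:
            print("connection released")


    def instrument(stream: Stream, on_done: Callable[[list[Any]], None]) -> Stream:
        items: list[Any] = []
        raw_iterator = stream._iterator
        raw_close = stream.close
        done = False

        def finalize_once() -> None:
            nonlocal done
            if not done:
                done = True
                on_done(items)

        def traced() -> Iterator[Any]:
            try:
                for item in raw_iterator:
                    items.append(item)
                    yield item
            finally:
                finalize_once()  # exhaustion, break, and exceptions all land here

        def traced_close() -> None:
            try:
                raw_close()
            finally:
                finalize_once()  # close() without full consumption still finalizes

        stream._iterator = traced()
        stream.close = traced_close  # type: ignore[method-assign]
        return stream  # same object, so isinstance(stream, Stream) still holds


    s = instrument(Stream([1, 2]), on_done=lambda items: print("finalized:", items))
    assert isinstance(s, Stream)
    print(list(s))  # prints "finalized: [1, 2]" during exhaustion, then [1, 2]
    s.close()       # prints "connection released"; finalize_once is a no-op here

Because the function returns the very object it received, downstream isinstance checks, the .response attribute, and the context-manager protocol of the native stream keep working, which is what the tests below assert.
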
diff --git a/tests/unit/test_openai.py b/tests/unit/test_openai.py index 6ef51ff54..1b993631d 100644 --- a/tests/unit/test_openai.py +++ b/tests/unit/test_openai.py @@ -7,6 +7,75 @@ from langfuse.openai import openai as lf_openai +class DummySyncResponse: + def __init__(self) -> None: + self.closed = False + + def close(self) -> None: + self.closed = True + + +class DummyAsyncResponse: + def __init__(self) -> None: + self.closed = False + + async def aclose(self) -> None: + self.closed = True + + +class DummyOpenAIStream(lf_openai.Stream): + def __init__(self, items, response) -> None: + self.response = response + self._iterator = iter(items) + + +class DummyOpenAIAsyncStream(lf_openai.AsyncStream): + def __init__(self, items, response) -> None: + self.response = response + self._iterator = self._stream(items) + + async def _stream(self, items): + for item in items: + yield item + + +def _make_chat_stream_chunks(): + usage = SimpleNamespace(prompt_tokens=3, completion_tokens=1, total_tokens=4) + + return [ + SimpleNamespace( + model="gpt-4o-mini", + choices=[ + SimpleNamespace( + delta=SimpleNamespace( + role="assistant", + content="2", + function_call=None, + tool_calls=None, + ), + finish_reason=None, + ) + ], + usage=None, + ), + SimpleNamespace( + model="gpt-4o-mini", + choices=[ + SimpleNamespace( + delta=SimpleNamespace( + role=None, + content=None, + function_call=None, + tool_calls=None, + ), + finish_reason="stop", + ) + ], + usage=usage, + ), + ] + + def test_chat_completion_exports_generation_span( langfuse_memory_client, get_span, json_attr ): @@ -161,6 +230,48 @@ def test_chat_completion_error_marks_generation_error(langfuse_memory_client, ge assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in span.attributes +def test_openai_stream_preserves_original_stream_contract( + langfuse_memory_client, get_span, json_attr +): + openai_client = lf_openai.OpenAI(api_key="test") + raw_response = DummySyncResponse() + raw_stream = DummyOpenAIStream(_make_chat_stream_chunks(), raw_response) + + with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream): + stream = openai_client.chat.completions.create( + name="unit-openai-native-stream", + model="gpt-4o-mini", + messages=[{"role": "user", "content": "1 + 1 = ?"}], + temperature=0, + stream=True, + ) + + assert stream is raw_stream + assert isinstance(stream, lf_openai.Stream) + assert stream.response is raw_response + + chunks = list(stream) + stream.close() + + assert len(chunks) == 2 + assert raw_response.closed is True + + langfuse_memory_client.flush() + span = get_span("unit-openai-native-stream") + + assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2" + assert ( + span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME] + is not None + ) + assert span.attributes["langfuse.observation.metadata.finish_reason"] == "stop" + assert json_attr(span, LangfuseOtelSpanAttributes.OBSERVATION_USAGE_DETAILS) == { + "prompt_tokens": 3, + "completion_tokens": 1, + "total_tokens": 4, + } + + @pytest.mark.asyncio async def test_async_chat_completion_exports_generation_span( langfuse_memory_client, get_span, json_attr @@ -206,6 +317,96 @@ async def test_async_chat_completion_exports_generation_span( } +@pytest.mark.asyncio +async def test_openai_async_stream_preserves_original_stream_contract( + langfuse_memory_client, get_span, json_attr +): + openai_client = lf_openai.AsyncOpenAI(api_key="test") + raw_response = DummyAsyncResponse() + raw_stream = 
DummyOpenAIAsyncStream(_make_chat_stream_chunks(), raw_response) + + with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream): + stream = await openai_client.chat.completions.create( + name="unit-openai-native-async-stream", + model="gpt-4o-mini", + messages=[{"role": "user", "content": "1 + 1 = ?"}], + temperature=0, + stream=True, + ) + + assert stream is raw_stream + assert isinstance(stream, lf_openai.AsyncStream) + assert stream.response is raw_response + assert hasattr(stream, "aclose") + + chunks = [] + async for chunk in stream: + chunks.append(chunk) + + await stream.aclose() + + assert len(chunks) == 2 + assert raw_response.closed is True + + langfuse_memory_client.flush() + span = get_span("unit-openai-native-async-stream") + + assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2" + assert ( + span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME] + is not None + ) + assert span.attributes["langfuse.observation.metadata.finish_reason"] == "stop" + assert json_attr(span, LangfuseOtelSpanAttributes.OBSERVATION_USAGE_DETAILS) == { + "prompt_tokens": 3, + "completion_tokens": 1, + "total_tokens": 4, + } + + +@pytest.mark.asyncio +async def test_openai_async_stream_supports_anext( + langfuse_memory_client, get_span, json_attr +): + openai_client = lf_openai.AsyncOpenAI(api_key="test") + raw_stream = DummyOpenAIAsyncStream( + _make_chat_stream_chunks(), DummyAsyncResponse() + ) + + with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream): + stream = await openai_client.chat.completions.create( + name="unit-openai-native-async-anext", + model="gpt-4o-mini", + messages=[{"role": "user", "content": "1 + 1 = ?"}], + temperature=0, + stream=True, + ) + + first = await stream.__anext__() + second = await stream.__anext__() + + assert first.choices[0].delta.content == "2" + assert second.choices[0].finish_reason == "stop" + + with pytest.raises(StopAsyncIteration): + await stream.__anext__() + + langfuse_memory_client.flush() + span = get_span("unit-openai-native-async-anext") + + assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2" + assert ( + span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME] + is not None + ) + assert span.attributes["langfuse.observation.metadata.finish_reason"] == "stop" + assert json_attr(span, LangfuseOtelSpanAttributes.OBSERVATION_USAGE_DETAILS) == { + "prompt_tokens": 3, + "completion_tokens": 1, + "total_tokens": 4, + } + + def test_embedding_exports_dimensions_and_count( langfuse_memory_client, get_span, json_attr ): From 2dcc02b984d37e7a0bcf256299369dd30caf4103 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 16 Apr 2026 11:29:41 +0200 Subject: [PATCH 2/5] fix(openai): finalize sync stream on early break --- langfuse/openai.py | 46 ++++++++++++++++++++++++--------------- tests/unit/test_openai.py | 32 +++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 17 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 8b165d6e1..c242cc036 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -830,6 +830,30 @@ def _is_streaming_response(response: Any) -> bool: ) +_openai_stream_iter_hook_installed = False + + +def _install_openai_stream_iteration_hooks() -> None: + global _openai_stream_iter_hook_installed + + if not _is_openai_v1(): + return + + if not _openai_stream_iter_hook_installed: + original_iter = openai.Stream.__iter__ + + def 
traced_iter(self: Any) -> Any: + try: + yield from original_iter(self) + finally: + finalize_once = getattr(self, "_langfuse_finalize_once", None) + if finalize_once is not None: + finalize_once() + + openai.Stream.__iter__ = traced_iter + _openai_stream_iter_hook_installed = True + + def _finalize_stream_response( *, resource: OpenAiDefinition, @@ -858,21 +882,6 @@ def _finalize_stream_response( generation.end() -async def _finalize_stream_response_async( - *, - resource: OpenAiDefinition, - items: list[Any], - generation: LangfuseGeneration, - completion_start_time: Optional[datetime], -) -> None: - _finalize_stream_response( - resource=resource, - items=items, - generation=generation, - completion_start_time=completion_start_time, - ) - - def _instrument_openai_stream( *, resource: OpenAiDefinition, @@ -905,6 +914,8 @@ def finalize_once() -> None: completion_start_time=completion_start_time, ) + response._langfuse_finalize_once = finalize_once # type: ignore[attr-defined] + def traced_iterator() -> Any: nonlocal completion_start_time try: @@ -955,7 +966,7 @@ async def finalize_once() -> None: return is_finalized = True - await _finalize_stream_response_async( + _finalize_stream_response( resource=resource, items=items, generation=generation, @@ -1167,6 +1178,7 @@ def register_tracing() -> None: register_tracing() +_install_openai_stream_iteration_hooks() class LangfuseResponseGeneratorSync: @@ -1275,7 +1287,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None pass async def _finalize(self) -> None: - await _finalize_stream_response_async( + _finalize_stream_response( resource=self.resource, items=self.items, generation=self.generation, diff --git a/tests/unit/test_openai.py b/tests/unit/test_openai.py index 1b993631d..a01f644fd 100644 --- a/tests/unit/test_openai.py +++ b/tests/unit/test_openai.py @@ -272,6 +272,38 @@ def test_openai_stream_preserves_original_stream_contract( } +def test_openai_stream_break_still_finalizes_generation( + langfuse_memory_client, get_span +): + openai_client = lf_openai.OpenAI(api_key="test") + raw_response = DummySyncResponse() + raw_stream = DummyOpenAIStream(_make_chat_stream_chunks(), raw_response) + + with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream): + stream = openai_client.chat.completions.create( + name="unit-openai-native-stream-break", + model="gpt-4o-mini", + messages=[{"role": "user", "content": "1 + 1 = ?"}], + temperature=0, + stream=True, + ) + + for chunk in stream: + assert chunk.choices[0].delta.content == "2" + break + + assert raw_response.closed is False + + langfuse_memory_client.flush() + span = get_span("unit-openai-native-stream-break") + + assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2" + assert ( + span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME] + is not None + ) + + @pytest.mark.asyncio async def test_async_chat_completion_exports_generation_span( langfuse_memory_client, get_span, json_attr From b5a7071be72b52f36125f29878ce80ac51c5e7b7 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 16 Apr 2026 14:23:02 +0200 Subject: [PATCH 3/5] fix(openai): guard fallback stream finalization --- langfuse/openai.py | 10 ++++++ tests/unit/test_openai.py | 74 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/langfuse/openai.py b/langfuse/openai.py index c242cc036..6d5be27da 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ 
-1195,6 +1195,7 @@ def __init__( self.response = response self.generation = generation self.completion_start_time: Optional[datetime] = None + self._is_finalized = False def __iter__(self) -> Any: try: @@ -1230,6 +1231,10 @@ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: pass def _finalize(self) -> None: + if self._is_finalized: + return + + self._is_finalized = True _finalize_stream_response( resource=self.resource, items=self.items, @@ -1252,6 +1257,7 @@ def __init__( self.response = response self.generation = generation self.completion_start_time: Optional[datetime] = None + self._is_finalized = False async def __aiter__(self) -> Any: try: @@ -1287,6 +1293,10 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None pass async def _finalize(self) -> None: + if self._is_finalized: + return + + self._is_finalized = True _finalize_stream_response( resource=self.resource, items=self.items, diff --git a/tests/unit/test_openai.py b/tests/unit/test_openai.py index a01f644fd..10a731a31 100644 --- a/tests/unit/test_openai.py +++ b/tests/unit/test_openai.py @@ -3,6 +3,7 @@ import pytest +import langfuse.openai as lf_openai_module from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse.openai import openai as lf_openai @@ -39,6 +40,17 @@ async def _stream(self, items): yield item +class DummyGeneration: + def __init__(self) -> None: + self.end_calls = 0 + + def update(self, **kwargs): + return self + + def end(self) -> None: + self.end_calls += 1 + + def _make_chat_stream_chunks(): usage = SimpleNamespace(prompt_tokens=3, completion_tokens=1, total_tokens=4) @@ -76,6 +88,24 @@ def _make_chat_stream_chunks(): ] +def _make_single_chunk_stream(): + return SimpleNamespace( + model="gpt-4o-mini", + choices=[ + SimpleNamespace( + delta=SimpleNamespace( + role="assistant", + content="2", + function_call=None, + tool_calls=None, + ), + finish_reason="stop", + ) + ], + usage=None, + ) + + def test_chat_completion_exports_generation_span( langfuse_memory_client, get_span, json_attr ): @@ -439,6 +469,50 @@ async def test_openai_async_stream_supports_anext( } +def test_fallback_sync_stream_finalizes_once(): + resource = SimpleNamespace(object="Completions", type="chat") + generation = DummyGeneration() + + def fallback_stream(): + yield _make_single_chunk_stream() + + wrapper = lf_openai_module.LangfuseResponseGeneratorSync( + resource=resource, + response=fallback_stream(), + generation=generation, + ) + + list(wrapper) + + with pytest.raises(StopIteration): + next(wrapper) + + assert generation.end_calls == 1 + + +@pytest.mark.asyncio +async def test_fallback_async_stream_finalizes_once(): + resource = SimpleNamespace(object="Completions", type="chat") + generation = DummyGeneration() + + async def fallback_stream(): + yield _make_single_chunk_stream() + + wrapper = lf_openai_module.LangfuseResponseGeneratorAsync( + resource=resource, + response=fallback_stream(), + generation=generation, + ) + + async for _ in wrapper: + pass + + with pytest.raises(StopAsyncIteration): + await wrapper.__anext__() + + assert generation.end_calls == 1 + + def test_embedding_exports_dimensions_and_count( langfuse_memory_client, get_span, json_attr ): From 95c933d2922b93424ca01041401abdf82b2d3637 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:45:50 +0200 Subject: [PATCH 4/5] fix(openai): satisfy mypy for stream hooks --- langfuse/openai.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 6d5be27da..02bd7d678 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -850,7 +850,7 @@ def traced_iter(self: Any) -> Any: if finalize_once is not None: finalize_once() - openai.Stream.__iter__ = traced_iter + setattr(openai.Stream, "__iter__", traced_iter) _openai_stream_iter_hook_installed = True From bdf0d171acdb29feabed865ca39ba8f0869b3373 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 20 Apr 2026 17:29:15 +0200 Subject: [PATCH 5/5] fix(openai): finalize stream exits --- langfuse/openai.py | 58 +++++++++++++++++++-- tests/unit/test_openai.py | 106 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+), 5 deletions(-) diff --git a/langfuse/openai.py b/langfuse/openai.py index 02bd7d678..1ce09f754 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -21,7 +21,7 @@ from collections import defaultdict from dataclasses import dataclass from datetime import datetime -from inspect import isclass +from inspect import isawaitable, isclass from typing import Any, Optional, cast from openai._types import NotGiven @@ -841,6 +841,7 @@ def _install_openai_stream_iteration_hooks() -> None: if not _openai_stream_iter_hook_installed: original_iter = openai.Stream.__iter__ + original_aiter = openai.AsyncStream.__aiter__ def traced_iter(self: Any) -> Any: try: @@ -850,7 +851,17 @@ def traced_iter(self: Any) -> Any: if finalize_once is not None: finalize_once() + async def traced_aiter(self: Any) -> Any: + try: + async for item in original_aiter(self): + yield item + finally: + finalize_once = getattr(self, "_langfuse_finalize_once", None) + if finalize_once is not None: + await finalize_once() + setattr(openai.Stream, "__iter__", traced_iter) + setattr(openai.AsyncStream, "__aiter__", traced_aiter) _openai_stream_iter_hook_installed = True @@ -973,6 +984,8 @@ async def finalize_once() -> None: completion_start_time=completion_start_time, ) + response._langfuse_finalize_once = finalize_once # type: ignore[attr-defined] + async def traced_iterator() -> Any: nonlocal completion_start_time try: @@ -1228,7 +1241,16 @@ def __enter__(self) -> Any: return self.__iter__() def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: - pass + self.close() + + def close(self) -> None: + close = getattr(self.response, "close", None) + + try: + if callable(close): + close() + finally: + self._finalize() def _finalize(self) -> None: if self._is_finalized: @@ -1290,7 +1312,7 @@ async def __aenter__(self) -> Any: return self.__aiter__() async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: - pass + await self.aclose() async def _finalize(self) -> None: if self._is_finalized: @@ -1309,11 +1331,37 @@ async def close(self) -> None: Automatically called if the response body is read to completion. """ - await self.response.close() + close = getattr(self.response, "close", None) + aclose = getattr(self.response, "aclose", None) + + try: + if callable(close): + result = close() + if isawaitable(result): + await result + elif callable(aclose): + result = aclose() + if isawaitable(result): + await result + finally: + await self._finalize() async def aclose(self) -> None: """Close the response and release the connection. Automatically called if the response body is read to completion. 
""" - await self.response.aclose() + aclose = getattr(self.response, "aclose", None) + close = getattr(self.response, "close", None) + + try: + if callable(aclose): + result = aclose() + if isawaitable(result): + await result + elif callable(close): + result = close() + if isawaitable(result): + await result + finally: + await self._finalize() diff --git a/tests/unit/test_openai.py b/tests/unit/test_openai.py index 10a731a31..ce69b1c76 100644 --- a/tests/unit/test_openai.py +++ b/tests/unit/test_openai.py @@ -1,3 +1,4 @@ +import asyncio from types import SimpleNamespace from unittest.mock import patch @@ -51,6 +52,18 @@ def end(self) -> None: self.end_calls += 1 +class DummyFallbackAsyncResponse: + def __init__(self) -> None: + self.close_calls = 0 + self.aclose_calls = 0 + + async def close(self) -> None: + self.close_calls += 1 + + async def aclose(self) -> None: + self.aclose_calls += 1 + + def _make_chat_stream_chunks(): usage = SimpleNamespace(prompt_tokens=3, completion_tokens=1, total_tokens=4) @@ -469,6 +482,42 @@ async def test_openai_async_stream_supports_anext( } +@pytest.mark.asyncio +async def test_openai_async_stream_break_still_finalizes_generation( + langfuse_memory_client, get_span +): + openai_client = lf_openai.AsyncOpenAI(api_key="test") + raw_stream = DummyOpenAIAsyncStream( + _make_chat_stream_chunks(), DummyAsyncResponse() + ) + + with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream): + stream = await openai_client.chat.completions.create( + name="unit-openai-native-async-stream-break", + model="gpt-4o-mini", + messages=[{"role": "user", "content": "1 + 1 = ?"}], + temperature=0, + stream=True, + ) + + async for chunk in stream: + assert chunk.choices[0].delta.content == "2" + break + + # Async generator finalizers are scheduled across event-loop turns. 
+ for _ in range(5): + await asyncio.sleep(0) + + langfuse_memory_client.flush() + span = get_span("unit-openai-native-async-stream-break") + + assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2" + assert ( + span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME] + is not None + ) + + def test_fallback_sync_stream_finalizes_once(): resource = SimpleNamespace(object="Completions", type="chat") generation = DummyGeneration() @@ -490,6 +539,24 @@ def fallback_stream(): assert generation.end_calls == 1 +def test_fallback_sync_stream_exit_finalizes_once(): + resource = SimpleNamespace(object="Completions", type="chat") + generation = DummyGeneration() + + def fallback_stream(): + yield _make_single_chunk_stream() + + wrapper = lf_openai_module.LangfuseResponseGeneratorSync( + resource=resource, + response=fallback_stream(), + generation=generation, + ) + + wrapper.__exit__(None, None, None) + + assert generation.end_calls == 1 + + @pytest.mark.asyncio async def test_fallback_async_stream_finalizes_once(): resource = SimpleNamespace(object="Completions", type="chat") @@ -513,6 +580,45 @@ async def fallback_stream(): assert generation.end_calls == 1 +@pytest.mark.asyncio +async def test_fallback_async_stream_close_and_exit_finalize_once(): + resource = SimpleNamespace(object="Completions", type="chat") + generation = DummyGeneration() + response = DummyFallbackAsyncResponse() + + wrapper = lf_openai_module.LangfuseResponseGeneratorAsync( + resource=resource, + response=response, + generation=generation, + ) + + await wrapper.close() + await wrapper.__aexit__(None, None, None) + + assert generation.end_calls == 1 + assert response.close_calls == 1 + assert response.aclose_calls == 1 + + +@pytest.mark.asyncio +async def test_fallback_async_stream_aclose_finalizes_once(): + resource = SimpleNamespace(object="Completions", type="chat") + generation = DummyGeneration() + + async def fallback_stream(): + yield _make_single_chunk_stream() + + wrapper = lf_openai_module.LangfuseResponseGeneratorAsync( + resource=resource, + response=fallback_stream(), + generation=generation, + ) + + await wrapper.aclose() + + assert generation.end_calls == 1 + + def test_embedding_exports_dimensions_and_count( langfuse_memory_client, get_span, json_attr ):
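

The sleep loop in test_openai_async_stream_break_still_finalizes_generation exists because breaking out of an async for does not run the generator's finally block at the break itself: under CPython with asyncio's default async-generator hooks, aclose() is scheduled on the event loop once the generator is garbage-collected, so the finalizer runs a few loop turns later. A minimal, standalone repro of that behavior under those assumptions (the names here are illustrative and not part of the patch):

    import asyncio


    async def chunks():
        try:
            yield 1
            yield 2
        finally:
            print("finalized")  # runs inside aclose(), not at the break itself


    async def main() -> None:
        gen = chunks()
        async for item in gen:
            break  # the finally block has NOT run yet

        del gen  # last reference dropped; asyncio schedules gen.aclose()
        for _ in range(5):
            await asyncio.sleep(0)  # yield a few loop turns so aclose() can run
        print("done")  # printed after "finalized"


    asyncio.run(main())

This is why the test yields control with await asyncio.sleep(0) several times before flushing: the generation span is only ended inside that scheduled finalizer.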