Skip to content

Commit 7aa632f

Browse files
authored
fix(openai): preserve native v1 stream contract (#1627)
1 parent 7f15dbf commit 7aa632f

2 files changed

Lines changed: 674 additions & 46 deletions

File tree

langfuse/openai.py

Lines changed: 261 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from collections import defaultdict
2222
from dataclasses import dataclass
2323
from datetime import datetime
24-
from inspect import isclass
24+
from inspect import isawaitable, isclass
2525
from typing import Any, Optional, cast
2626

2727
from openai._types import NotGiven
@@ -830,6 +830,191 @@ def _is_streaming_response(response: Any) -> bool:
830830
)
831831

832832

833+
_openai_stream_iter_hook_installed = False


def _install_openai_stream_iteration_hooks() -> None:
    """Patch ``openai.Stream.__iter__`` / ``openai.AsyncStream.__aiter__`` so a
    per-stream Langfuse finalizer runs when iteration of a natively returned
    stream finishes (exhaustion, error, or early abandonment).

    Idempotent: applied at most once per process, and only when the installed
    openai SDK is v1+.
    """
    global _openai_stream_iter_hook_installed

    if not _is_openai_v1():
        return

    if _openai_stream_iter_hook_installed:
        return

    wrapped_sync_iter = openai.Stream.__iter__
    wrapped_async_iter = openai.AsyncStream.__aiter__

    def patched_iter(self: Any) -> Any:
        # Delegate to the SDK iterator; on the way out, run the per-stream
        # finalizer that _instrument_openai_stream attached (if any).
        try:
            yield from wrapped_sync_iter(self)
        finally:
            finalizer = getattr(self, "_langfuse_finalize_once", None)
            if finalizer is not None:
                finalizer()

    async def patched_aiter(self: Any) -> Any:
        try:
            async for chunk in wrapped_async_iter(self):
                yield chunk
        finally:
            finalizer = getattr(self, "_langfuse_finalize_once", None)
            if finalizer is not None:
                await finalizer()

    openai.Stream.__iter__ = patched_iter  # type: ignore[method-assign]
    openai.AsyncStream.__aiter__ = patched_aiter  # type: ignore[method-assign]
    _openai_stream_iter_hook_installed = True
866+
867+
868+
def _finalize_stream_response(
    *,
    resource: OpenAiDefinition,
    items: list[Any],
    generation: LangfuseGeneration,
    completion_start_time: Optional[datetime],
) -> None:
    """Aggregate collected stream chunks, write the final update to the
    Langfuse generation, and end the generation.

    Extraction/update errors are deliberately swallowed — tracing must never
    break the caller — but ``generation.end()`` always runs.
    """
    try:
        if resource.object in ("Responses", "AsyncResponses"):
            model, completion, usage, metadata = (
                _extract_streamed_response_api_response(items)
            )
        else:
            model, completion, usage, metadata = _extract_streamed_openai_response(
                resource, items
            )

        _create_langfuse_update(
            completion,
            generation,
            completion_start_time,
            model=model,
            usage=usage,
            metadata=metadata,
        )
    except Exception:
        # Best-effort tracing: never propagate failures from here.
        pass
    finally:
        generation.end()
894+
895+
896+
def _instrument_openai_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Instrument a native ``openai.Stream`` in place: record every chunk and
    finalize the Langfuse generation exactly once, while preserving the
    stream's original type and public API for the caller.

    Falls back to the legacy wrapping generator when the response exposes no
    ``_iterator`` attribute to hook.
    """
    if not hasattr(response, "_iterator"):
        # Unknown stream shape — wrap instead of patching in place.
        return LangfuseResponseGeneratorSync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected_chunks: list[Any] = []
    inner_iterator = response._iterator
    completion_start_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    def finalize_once() -> None:
        # Idempotent: iterator exhaustion and close() may both trigger this.
        nonlocal finalized
        if finalized:
            return

        finalized = True
        _finalize_stream_response(
            resource=resource,
            items=collected_chunks,
            generation=generation,
            completion_start_time=completion_start_time,
        )

    # Exposed for the class-level __iter__ hook installed at import time.
    response._langfuse_finalize_once = finalize_once  # type: ignore[attr-defined]

    def recording_iterator() -> Any:
        nonlocal completion_start_time
        try:
            for chunk in inner_iterator:
                if completion_start_time is None:
                    # Timestamp of the first received chunk.
                    completion_start_time = _get_timestamp()

                collected_chunks.append(chunk)
                yield chunk
        finally:
            finalize_once()

    def closing_wrapper() -> Any:
        try:
            return original_close()
        finally:
            finalize_once()

    response._iterator = recording_iterator()
    response.close = closing_wrapper

    return response
953+
954+
955+
def _instrument_openai_async_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Async counterpart of ``_instrument_openai_stream``: instrument a native
    ``openai.AsyncStream`` in place, recording chunks and finalizing the
    Langfuse generation exactly once.

    Falls back to the legacy async wrapping generator when the response
    exposes no ``_iterator`` attribute to hook.
    """
    if not hasattr(response, "_iterator"):
        return LangfuseResponseGeneratorAsync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected_chunks: list[Any] = []
    inner_iterator = response._iterator
    completion_start_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    async def finalize_once() -> None:
        # Idempotent: exhaustion, close() and aclose() may all trigger this.
        nonlocal finalized
        if finalized:
            return

        finalized = True
        _finalize_stream_response(
            resource=resource,
            items=collected_chunks,
            generation=generation,
            completion_start_time=completion_start_time,
        )

    # Exposed for the class-level __aiter__ hook installed at import time.
    response._langfuse_finalize_once = finalize_once  # type: ignore[attr-defined]

    async def recording_iterator() -> Any:
        nonlocal completion_start_time
        try:
            async for chunk in inner_iterator:
                if completion_start_time is None:
                    # Timestamp of the first received chunk.
                    completion_start_time = _get_timestamp()

                collected_chunks.append(chunk)
                yield chunk
        finally:
            await finalize_once()

    async def closing_wrapper() -> Any:
        try:
            return await original_close()
        finally:
            await finalize_once()

    async def aclosing_wrapper() -> Any:
        return await closing_wrapper()

    response._iterator = recording_iterator()
    response.close = closing_wrapper
    response.aclose = aclosing_wrapper

    return response
1016+
1017+
8331018
@_langfuse_wrapper
8341019
def _wrap(
8351020
open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
@@ -863,7 +1048,13 @@ def _wrap(
8631048
try:
8641049
openai_response = wrapped(**arg_extractor.get_openai_args())
8651050

866-
if _is_streaming_response(openai_response):
1051+
if _is_openai_v1() and isinstance(openai_response, openai.Stream):
1052+
return _instrument_openai_stream(
1053+
resource=open_ai_resource,
1054+
response=openai_response,
1055+
generation=generation,
1056+
)
1057+
elif _is_streaming_response(openai_response):
8671058
return LangfuseResponseGeneratorSync(
8681059
resource=open_ai_resource,
8691060
response=openai_response,
@@ -934,7 +1125,13 @@ async def _wrap_async(
9341125
try:
9351126
openai_response = await wrapped(**arg_extractor.get_openai_args())
9361127

937-
if _is_streaming_response(openai_response):
1128+
if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
1129+
return _instrument_openai_async_stream(
1130+
resource=open_ai_resource,
1131+
response=openai_response,
1132+
generation=generation,
1133+
)
1134+
elif _is_streaming_response(openai_response):
9381135
return LangfuseResponseGeneratorAsync(
9391136
resource=open_ai_resource,
9401137
response=openai_response,
@@ -994,6 +1191,7 @@ def register_tracing() -> None:
9941191

9951192

9961193
register_tracing()
1194+
_install_openai_stream_iteration_hooks()
9971195

9981196

9991197
class LangfuseResponseGeneratorSync:
@@ -1010,6 +1208,7 @@ def __init__(
10101208
self.response = response
10111209
self.generation = generation
10121210
self.completion_start_time: Optional[datetime] = None
1211+
self._is_finalized = False
10131212

10141213
def __iter__(self) -> Any:
10151214
try:
@@ -1042,29 +1241,28 @@ def __enter__(self) -> Any:
10421241
return self.__iter__()
10431242

10441243
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
1045-
pass
1244+
self.close()
10461245

1047-
def _finalize(self) -> None:
1048-
try:
1049-
model, completion, usage, metadata = (
1050-
_extract_streamed_response_api_response(self.items)
1051-
if self.resource.object == "Responses"
1052-
or self.resource.object == "AsyncResponses"
1053-
else _extract_streamed_openai_response(self.resource, self.items)
1054-
)
1246+
def close(self) -> None:
1247+
close = getattr(self.response, "close", None)
10551248

1056-
_create_langfuse_update(
1057-
completion,
1058-
self.generation,
1059-
self.completion_start_time,
1060-
model=model,
1061-
usage=usage,
1062-
metadata=metadata,
1063-
)
1064-
except Exception:
1065-
pass
1249+
try:
1250+
if callable(close):
1251+
close()
10661252
finally:
1067-
self.generation.end()
1253+
self._finalize()
1254+
1255+
def _finalize(self) -> None:
1256+
if self._is_finalized:
1257+
return
1258+
1259+
self._is_finalized = True
1260+
_finalize_stream_response(
1261+
resource=self.resource,
1262+
items=self.items,
1263+
generation=self.generation,
1264+
completion_start_time=self.completion_start_time,
1265+
)
10681266

10691267

10701268
class LangfuseResponseGeneratorAsync:
@@ -1081,6 +1279,7 @@ def __init__(
10811279
self.response = response
10821280
self.generation = generation
10831281
self.completion_start_time: Optional[datetime] = None
1282+
self._is_finalized = False
10841283

10851284
async def __aiter__(self) -> Any:
10861285
try:
@@ -1113,40 +1312,56 @@ async def __aenter__(self) -> Any:
11131312
return self.__aiter__()
11141313

11151314
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
1116-
pass
1315+
await self.aclose()
11171316

11181317
async def _finalize(self) -> None:
1119-
try:
1120-
model, completion, usage, metadata = (
1121-
_extract_streamed_response_api_response(self.items)
1122-
if self.resource.object == "Responses"
1123-
or self.resource.object == "AsyncResponses"
1124-
else _extract_streamed_openai_response(self.resource, self.items)
1125-
)
1126-
1127-
_create_langfuse_update(
1128-
completion,
1129-
self.generation,
1130-
self.completion_start_time,
1131-
model=model,
1132-
usage=usage,
1133-
metadata=metadata,
1134-
)
1135-
except Exception:
1136-
pass
1137-
finally:
1138-
self.generation.end()
1318+
if self._is_finalized:
1319+
return
1320+
1321+
self._is_finalized = True
1322+
_finalize_stream_response(
1323+
resource=self.resource,
1324+
items=self.items,
1325+
generation=self.generation,
1326+
completion_start_time=self.completion_start_time,
1327+
)
11391328

11401329
async def close(self) -> None:
11411330
"""Close the response and release the connection.
11421331
11431332
Automatically called if the response body is read to completion.
11441333
"""
1145-
await self.response.close()
1334+
close = getattr(self.response, "close", None)
1335+
aclose = getattr(self.response, "aclose", None)
1336+
1337+
try:
1338+
if callable(close):
1339+
result = close()
1340+
if isawaitable(result):
1341+
await result
1342+
elif callable(aclose):
1343+
result = aclose()
1344+
if isawaitable(result):
1345+
await result
1346+
finally:
1347+
await self._finalize()
11461348

11471349
async def aclose(self) -> None:
11481350
"""Close the response and release the connection.
11491351
11501352
Automatically called if the response body is read to completion.
11511353
"""
1152-
await self.response.aclose()
1354+
aclose = getattr(self.response, "aclose", None)
1355+
close = getattr(self.response, "close", None)
1356+
1357+
try:
1358+
if callable(aclose):
1359+
result = aclose()
1360+
if isawaitable(result):
1361+
await result
1362+
elif callable(close):
1363+
result = close()
1364+
if isawaitable(result):
1365+
await result
1366+
finally:
1367+
await self._finalize()

0 commit comments

Comments
 (0)