Skip to content

Commit b818885

Browse files
committed
fix(openai): preserve native v1 stream contract
1 parent cfbe7a3 commit b818885

2 files changed

Lines changed: 388 additions & 42 deletions

File tree

langfuse/openai.py

Lines changed: 187 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,167 @@ def _is_streaming_response(response: Any) -> bool:
830830
)
831831

832832

833+
def _finalize_stream_response(
    *,
    resource: OpenAiDefinition,
    items: list[Any],
    generation: LangfuseGeneration,
    completion_start_time: Optional[datetime],
) -> None:
    """Record collected stream chunks on the generation, then end it.

    Parses the accumulated chunks with the extractor matching the resource
    kind and pushes model/completion/usage/metadata onto the generation.
    Any parsing or update failure is swallowed on purpose so telemetry can
    never break the caller's stream; ``generation.end()`` always runs.

    Args:
        resource: Definition of the wrapped OpenAI endpoint.
        items: All chunks yielded by the stream so far.
        generation: The Langfuse generation to update and close.
        completion_start_time: Timestamp of the first yielded chunk, if any.
    """
    try:
        # Responses-API streams have a different chunk shape than
        # chat/completions streams, so pick the matching extractor.
        if resource.object in ("Responses", "AsyncResponses"):
            parsed = _extract_streamed_response_api_response(items)
        else:
            parsed = _extract_streamed_openai_response(resource, items)

        model, completion, usage, metadata = parsed

        _create_langfuse_update(
            completion,
            generation,
            completion_start_time,
            model=model,
            usage=usage,
            metadata=metadata,
        )
    except Exception:
        # Best-effort: tracing errors must never propagate to the user.
        pass
    finally:
        generation.end()
859+
860+
861+
async def _finalize_stream_response_async(
    *,
    resource: OpenAiDefinition,
    items: list[Any],
    generation: LangfuseGeneration,
    completion_start_time: Optional[datetime],
) -> None:
    """Async facade over :func:`_finalize_stream_response`.

    The underlying finalization is synchronous (in-process parsing and a
    client-side update), so it is invoked inline without awaiting anything.
    """
    kwargs = {
        "resource": resource,
        "items": items,
        "generation": generation,
        "completion_start_time": completion_start_time,
    }
    _finalize_stream_response(**kwargs)
874+
875+
876+
def _instrument_openai_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Patch a native OpenAI v1 ``Stream`` in place to trace its chunks.

    The original stream object is returned unchanged in type, preserving
    the native v1 contract (context manager, ``close()``, attributes); only
    its ``_iterator`` and ``close`` are wrapped. Finalization runs exactly
    once — on iterator exhaustion/teardown or on ``close()``, whichever
    happens first.

    Falls back to ``LangfuseResponseGeneratorSync`` when the response does
    not expose the private ``_iterator`` attribute.
    """
    if not hasattr(response, "_iterator"):
        # Not a patchable native v1 stream — use the generic wrapper.
        return LangfuseResponseGeneratorSync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected: list[Any] = []
    inner_iterator = response._iterator
    first_chunk_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    def _finalize_once() -> None:
        # Idempotent guard: both exhaustion and close() funnel through here.
        nonlocal finalized
        if finalized:
            return
        finalized = True
        _finalize_stream_response(
            resource=resource,
            items=collected,
            generation=generation,
            completion_start_time=first_chunk_time,
        )

    def _tracing_iterator() -> Any:
        nonlocal first_chunk_time
        try:
            for chunk in inner_iterator:
                collected.append(chunk)
                # Time-to-first-token: stamp only the first chunk.
                if first_chunk_time is None:
                    first_chunk_time = _get_timestamp()
                yield chunk
        finally:
            _finalize_once()

    def _tracing_close() -> Any:
        try:
            return original_close()
        finally:
            _finalize_once()

    response._iterator = _tracing_iterator()
    response.close = _tracing_close

    return response
931+
932+
933+
def _instrument_openai_async_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Patch a native OpenAI v1 ``AsyncStream`` in place to trace its chunks.

    Async counterpart of ``_instrument_openai_stream``: the original stream
    object is returned with ``_iterator``, ``close`` and ``aclose`` wrapped,
    so the native v1 async contract is preserved. Finalization runs exactly
    once, on iterator teardown or on close — whichever comes first.

    Falls back to ``LangfuseResponseGeneratorAsync`` when the response does
    not expose the private ``_iterator`` attribute.
    """
    if not hasattr(response, "_iterator"):
        # Not a patchable native v1 async stream — use the generic wrapper.
        return LangfuseResponseGeneratorAsync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected: list[Any] = []
    inner_iterator = response._iterator
    first_chunk_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    async def _finalize_once() -> None:
        # Idempotent guard: exhaustion and close() both funnel through here.
        nonlocal finalized
        if finalized:
            return
        finalized = True
        await _finalize_stream_response_async(
            resource=resource,
            items=collected,
            generation=generation,
            completion_start_time=first_chunk_time,
        )

    async def _tracing_iterator() -> Any:
        nonlocal first_chunk_time
        try:
            async for chunk in inner_iterator:
                collected.append(chunk)
                # Time-to-first-token: stamp only the first chunk.
                if first_chunk_time is None:
                    first_chunk_time = _get_timestamp()
                yield chunk
        finally:
            await _finalize_once()

    async def _tracing_close() -> Any:
        try:
            return await original_close()
        finally:
            await _finalize_once()

    async def _tracing_aclose() -> Any:
        # aclose is an alias of close on the native stream; keep them in sync.
        return await _tracing_close()

    response._iterator = _tracing_iterator()
    response.close = _tracing_close
    response.aclose = _tracing_aclose

    return response
992+
993+
833994
@_langfuse_wrapper
834995
def _wrap(
835996
open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
@@ -863,7 +1024,13 @@ def _wrap(
8631024
try:
8641025
openai_response = wrapped(**arg_extractor.get_openai_args())
8651026

866-
if _is_streaming_response(openai_response):
1027+
if _is_openai_v1() and isinstance(openai_response, openai.Stream):
1028+
return _instrument_openai_stream(
1029+
resource=open_ai_resource,
1030+
response=openai_response,
1031+
generation=generation,
1032+
)
1033+
elif _is_streaming_response(openai_response):
8671034
return LangfuseResponseGeneratorSync(
8681035
resource=open_ai_resource,
8691036
response=openai_response,
@@ -934,7 +1101,13 @@ async def _wrap_async(
9341101
try:
9351102
openai_response = await wrapped(**arg_extractor.get_openai_args())
9361103

937-
if _is_streaming_response(openai_response):
1104+
if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
1105+
return _instrument_openai_async_stream(
1106+
resource=open_ai_resource,
1107+
response=openai_response,
1108+
generation=generation,
1109+
)
1110+
elif _is_streaming_response(openai_response):
9381111
return LangfuseResponseGeneratorAsync(
9391112
resource=open_ai_resource,
9401113
response=openai_response,
@@ -1045,26 +1218,12 @@ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
10451218
pass
10461219

10471220
def _finalize(self) -> None:
    """Delegate stream finalization to the shared module-level helper."""
    _finalize_stream_response(
        completion_start_time=self.completion_start_time,
        generation=self.generation,
        items=self.items,
        resource=self.resource,
    )
10681227

10691228

10701229
class LangfuseResponseGeneratorAsync:
@@ -1116,26 +1275,12 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None
11161275
pass
11171276

11181277
async def _finalize(self) -> None:
    """Delegate stream finalization to the shared async module-level helper."""
    await _finalize_stream_response_async(
        completion_start_time=self.completion_start_time,
        generation=self.generation,
        items=self.items,
        resource=self.resource,
    )
11391284

11401285
async def close(self) -> None:
11411286
"""Close the response and release the connection.

0 commit comments

Comments
 (0)