@@ -830,6 +830,30 @@ def _is_streaming_response(response: Any) -> bool:
830830 )
831831
832832
833+ _openai_stream_iter_hook_installed = False
834+
835+
836+ def _install_openai_stream_iteration_hooks () -> None :
837+ global _openai_stream_iter_hook_installed
838+
839+ if not _is_openai_v1 ():
840+ return
841+
842+ if not _openai_stream_iter_hook_installed :
843+ original_iter = openai .Stream .__iter__
844+
845+ def traced_iter (self : Any ) -> Any :
846+ try :
847+ yield from original_iter (self )
848+ finally :
849+ finalize_once = getattr (self , "_langfuse_finalize_once" , None )
850+ if finalize_once is not None :
851+ finalize_once ()
852+
853+ openai .Stream .__iter__ = traced_iter
854+ _openai_stream_iter_hook_installed = True
855+
856+
833857def _finalize_stream_response (
834858 * ,
835859 resource : OpenAiDefinition ,
@@ -858,21 +882,6 @@ def _finalize_stream_response(
858882 generation .end ()
859883
860884
861- async def _finalize_stream_response_async (
862- * ,
863- resource : OpenAiDefinition ,
864- items : list [Any ],
865- generation : LangfuseGeneration ,
866- completion_start_time : Optional [datetime ],
867- ) -> None :
868- _finalize_stream_response (
869- resource = resource ,
870- items = items ,
871- generation = generation ,
872- completion_start_time = completion_start_time ,
873- )
874-
875-
876885def _instrument_openai_stream (
877886 * ,
878887 resource : OpenAiDefinition ,
@@ -905,6 +914,8 @@ def finalize_once() -> None:
905914 completion_start_time = completion_start_time ,
906915 )
907916
917+ response ._langfuse_finalize_once = finalize_once # type: ignore[attr-defined]
918+
908919 def traced_iterator () -> Any :
909920 nonlocal completion_start_time
910921 try :
@@ -955,7 +966,7 @@ async def finalize_once() -> None:
955966 return
956967
957968 is_finalized = True
958- await _finalize_stream_response_async (
969+ _finalize_stream_response (
959970 resource = resource ,
960971 items = items ,
961972 generation = generation ,
@@ -1167,6 +1178,7 @@ def register_tracing() -> None:
11671178
11681179
11691180register_tracing ()
1181+ _install_openai_stream_iteration_hooks ()
11701182
11711183
11721184class LangfuseResponseGeneratorSync :
@@ -1275,7 +1287,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None
12751287 pass
12761288
12771289 async def _finalize (self ) -> None :
1278- await _finalize_stream_response_async (
1290+ _finalize_stream_response (
12791291 resource = self .resource ,
12801292 items = self .items ,
12811293 generation = self .generation ,
0 commit comments