Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 108 additions & 39 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,56 @@ def __init__(
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn
self._span_ended = False

def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
return self

def _finalize(self) -> None:
if self._span_ended:
return

if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()
self._span_ended = True

def _finalize_with_error(self, error: BaseException) -> None:
if self._span_ended:
return

self.span.update(
level="ERROR", status_message=str(error) or type(error).__name__
).end()
self._span_ended = True

def close(self) -> None:
if self._span_ended:
return

try:
self.context.run(self.generator.close)
except (Exception, asyncio.CancelledError) as error:
self._finalize_with_error(error)
raise
else:
self._finalize()

def __del__(self) -> None:
try:
self.close()
except BaseException:
pass

def __next__(self) -> Any:
try:
# Run the generator's __next__ in the preserved context
Expand All @@ -577,27 +623,11 @@ def __next__(self) -> Any:
return item

except StopIteration:
# Handle output and span cleanup when generator is exhausted
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()

self._finalize()
raise # Re-raise StopIteration

except (Exception, asyncio.CancelledError) as e:
self.span.update(
level="ERROR", status_message=str(e) or type(e).__name__
).end()

self._finalize_with_error(e)
raise


Expand Down Expand Up @@ -628,47 +658,86 @@ def __init__(
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn
self._span_ended = False

def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
return self

def _finalize(self) -> None:
if self._span_ended:
return

if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()
self._span_ended = True

def _finalize_with_error(self, error: BaseException) -> None:
if self._span_ended:
return

self.span.update(
level="ERROR", status_message=str(error) or type(error).__name__
).end()
self._span_ended = True

async def aclose(self) -> None:
if self._span_ended:
return

try:
try:
await asyncio.create_task(
self.generator.aclose(),
context=self.context,
) # type: ignore
except TypeError:
await self.context.run(asyncio.create_task, self.generator.aclose())
Comment thread
hassiebp marked this conversation as resolved.
except (Exception, asyncio.CancelledError) as error:
self._finalize_with_error(error)
raise
else:
self._finalize()

async def close(self) -> None:
await self.aclose()

def __del__(self) -> None:
self._finalize()

async def __anext__(self) -> Any:
try:
# Run the generator's __anext__ in the preserved context
try:
# Python 3.10+ approach with context parameter
# Python 3.11+ approach with explicit task context
item = await asyncio.create_task(
self.generator.__anext__(), # type: ignore
context=self.context,
) # type: ignore
except TypeError:
# Python < 3.10 fallback - context parameter not supported
item = await self.generator.__anext__()
# Python 3.10 fallback - create the task inside the preserved context.
item = await self.context.run(
asyncio.create_task,
self.generator.__anext__(), # type: ignore
)

if self.capture_output:
self.items.append(item)

return item

except StopAsyncIteration:
# Handle output and span cleanup when generator is exhausted
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()

self._finalize()
raise # Re-raise StopAsyncIteration
except (Exception, asyncio.CancelledError) as e:
self.span.update(
level="ERROR", status_message=str(e) or type(e).__name__
).end()

self._finalize_with_error(e)
raise
Loading
Loading