Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 102 additions & 36 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,56 @@ def __init__(
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn
self._span_ended = False

def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
return self

def _finalize(self) -> None:
if self._span_ended:
return

if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()
self._span_ended = True

def _finalize_with_error(self, error: BaseException) -> None:
if self._span_ended:
return

self.span.update(
level="ERROR", status_message=str(error) or type(error).__name__
).end()
self._span_ended = True

def close(self) -> None:
if self._span_ended:
return

try:
self.context.run(self.generator.close)
except (Exception, asyncio.CancelledError) as error:
self._finalize_with_error(error)
raise
else:
self._finalize()

def __del__(self) -> None:
try:
self.close()
except BaseException:
pass

def __next__(self) -> Any:
try:
# Run the generator's __next__ in the preserved context
Expand All @@ -577,27 +623,11 @@ def __next__(self) -> Any:
return item

except StopIteration:
# Handle output and span cleanup when generator is exhausted
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()

self._finalize()
raise # Re-raise StopIteration

except (Exception, asyncio.CancelledError) as e:
self.span.update(
level="ERROR", status_message=str(e) or type(e).__name__
).end()

self._finalize_with_error(e)
raise


Expand Down Expand Up @@ -628,10 +658,62 @@ def __init__(
self.span = span
self.capture_output = capture_output
self.transform_fn = transform_fn
self._span_ended = False

def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
return self

def _finalize(self) -> None:
if self._span_ended:
return

if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()
self._span_ended = True

def _finalize_with_error(self, error: BaseException) -> None:
if self._span_ended:
return

self.span.update(
level="ERROR", status_message=str(error) or type(error).__name__
).end()
self._span_ended = True

async def aclose(self) -> None:
if self._span_ended:
return

try:
try:
await asyncio.create_task(
self.generator.aclose(),
context=self.context,
) # type: ignore
except TypeError:
await self.generator.aclose()
Comment thread
hassiebp marked this conversation as resolved.
Outdated
except (Exception, asyncio.CancelledError) as error:
self._finalize_with_error(error)
raise
else:
self._finalize()

async def close(self) -> None:
await self.aclose()

def __del__(self) -> None:
self._finalize()

async def __anext__(self) -> Any:
try:
# Run the generator's __anext__ in the preserved context
Expand All @@ -651,24 +733,8 @@ async def __anext__(self) -> Any:
return item

except StopAsyncIteration:
# Handle output and span cleanup when generator is exhausted
if self.capture_output:
output: Any = self.items

if self.transform_fn is not None:
output = self.transform_fn(self.items)

elif all(isinstance(item, str) for item in self.items):
output = "".join(self.items)

self.span.update(output=output)

self.span.end()

self._finalize()
raise # Re-raise StopAsyncIteration
except (Exception, asyncio.CancelledError) as e:
self.span.update(
level="ERROR", status_message=str(e) or type(e).__name__
).end()

self._finalize_with_error(e)
raise
Loading
Loading