Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 144 additions & 49 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextvars
import inspect
import logging
import os
Expand All @@ -21,25 +22,24 @@
from opentelemetry.util._decorator import _AgnosticContextManager
from typing_extensions import ParamSpec

from langfuse._client.environment_variables import (
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
)

from langfuse._client.constants import (
ObservationTypeLiteralNoEvent,
get_observation_types_list,
)
from langfuse._client.environment_variables import (
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
)
from langfuse._client.get_client import _set_current_public_key, get_client
from langfuse._client.span import (
LangfuseGeneration,
LangfuseSpan,
LangfuseAgent,
LangfuseTool,
LangfuseChain,
LangfuseRetriever,
LangfuseEvaluator,
LangfuseEmbedding,
LangfuseEvaluator,
LangfuseGeneration,
LangfuseGuardrail,
LangfuseRetriever,
LangfuseSpan,
LangfuseTool,
)
from langfuse.types import TraceContext

Expand Down Expand Up @@ -468,27 +468,69 @@ def _wrap_sync_generator_result(
generator: Generator,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Any:
items = []

try:
for item in generator:
items.append(item)

yield item

finally:
output: Any = items

if transform_to_string is not None:
output = transform_to_string(items)

elif all(isinstance(item, str) for item in items):
output = "".join(items)

langfuse_span_or_generation.update(output=output)
langfuse_span_or_generation.end()
# Capture the current context while the span is still active
preserved_context = contextvars.copy_context()
items: list[Any] = []

class ContextPreservedSyncGeneratorWrapper:
    """Sync generator wrapper that runs each iteration in a preserved context.

    The wrapped generator's ``__next__`` is executed via ``Context.run`` so
    that context variables captured while the observation span was active
    remain visible inside the generator body. Yielded items are accumulated
    in ``items``; when the generator is exhausted the accumulated items are
    written to the span as its output and the span is ended.
    """

    def __init__(
        self,
        generator: Generator,
        context: contextvars.Context,
        items: "list[Any]",
        span: "Union[LangfuseSpan, LangfuseGeneration, LangfuseAgent, LangfuseTool, LangfuseChain, LangfuseRetriever, LangfuseEvaluator, LangfuseEmbedding, LangfuseGuardrail]",
        transform_fn: Optional[Callable[[Iterable], str]],
    ) -> None:
        # All collaborators are injected; the wrapper holds no hidden state.
        self.generator = generator
        self.context = context
        self.items = items
        self.span = span
        self.transform_fn = transform_fn

    def __iter__(self) -> "ContextPreservedSyncGeneratorWrapper":
        return self

    def __next__(self) -> Any:
        try:
            # Run the generator's __next__ inside the preserved context so
            # contextvars set when the span was created stay visible.
            item = self.context.run(next, self.generator)
            self.items.append(item)
            return item

        except StopIteration:
            # Generator exhausted: derive the span output and close the span.
            output: Any = self.items

            if self.transform_fn is not None:
                output = self.transform_fn(self.items)
            elif all(isinstance(item, str) for item in self.items):
                # Pure string chunks are concatenated into a single string
                # (note: an empty items list also takes this branch -> "").
                output = "".join(self.items)

            # NOTE(review): if the consumer abandons iteration early (break /
            # GC), StopIteration never fires and the span is left open —
            # confirm whether explicit close() handling is needed.
            self.span.update(output=output)
            self.span.end()
            raise  # Re-raise StopIteration to terminate iteration.

return ContextPreservedSyncGeneratorWrapper(
generator,
preserved_context,
items,
langfuse_span_or_generation,
transform_to_string,
)

async def _wrap_async_generator_result(
def _wrap_async_generator_result(
self,
langfuse_span_or_generation: Union[
LangfuseSpan,
Expand All @@ -503,26 +545,79 @@ async def _wrap_async_generator_result(
],
generator: AsyncGenerator,
transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> AsyncGenerator:
items = []

try:
async for item in generator:
items.append(item)

yield item

finally:
output: Any = items

if transform_to_string is not None:
output = transform_to_string(items)

elif all(isinstance(item, str) for item in items):
output = "".join(items)

langfuse_span_or_generation.update(output=output)
langfuse_span_or_generation.end()
) -> Any:
import asyncio

# Capture the current context while the span is still active
preserved_context = contextvars.copy_context()
items: list[Any] = []

class ContextPreservedAsyncGeneratorWrapper:
    """Async generator wrapper that runs each iteration in a preserved context.

    The wrapped async generator's ``__anext__`` is scheduled as a task bound
    to the preserved context (where supported), so context variables captured
    while the observation span was active remain visible inside the generator
    body. Yielded items are accumulated in ``items``; when the generator is
    exhausted the accumulated items are written to the span as its output and
    the span is ended.
    """

    def __init__(
        self,
        generator: AsyncGenerator,
        context: contextvars.Context,
        items: "list[Any]",
        span: "Union[LangfuseSpan, LangfuseGeneration, LangfuseAgent, LangfuseTool, LangfuseChain, LangfuseRetriever, LangfuseEvaluator, LangfuseEmbedding, LangfuseGuardrail]",
        transform_fn: Optional[Callable[[Iterable], str]],
    ) -> None:
        # All collaborators are injected; the wrapper holds no hidden state.
        self.generator = generator
        self.context = context
        self.items = items
        self.span = span
        self.transform_fn = transform_fn

    def __aiter__(self) -> "ContextPreservedAsyncGeneratorWrapper":
        return self

    async def __anext__(self) -> Any:
        try:
            try:
                # asyncio.create_task accepts a ``context`` argument on
                # Python 3.11+ (not 3.10); it runs __anext__ inside the
                # preserved context.
                # NOTE(review): for native async generators __anext__()
                # returns an ``asend`` awaitable, not a coroutine, so
                # create_task may raise TypeError and take the fallback path
                # — confirm context propagation actually occurs here.
                item = await asyncio.create_task(
                    self.generator.__anext__(),  # type: ignore
                    context=self.context,
                )  # type: ignore
            except TypeError:
                # Fallback when the ``context`` parameter is unsupported or
                # the awaitable is not a coroutine: run without the context.
                item = await self.generator.__anext__()

            self.items.append(item)
            return item

        except StopAsyncIteration:
            # Generator exhausted: derive the span output and close the span.
            output: Any = self.items

            if self.transform_fn is not None:
                output = self.transform_fn(self.items)
            elif all(isinstance(item, str) for item in self.items):
                # Pure string chunks are concatenated into a single string
                # (note: an empty items list also takes this branch -> "").
                output = "".join(self.items)

            # NOTE(review): if the consumer abandons iteration early,
            # StopAsyncIteration never fires and the span is left open —
            # confirm whether explicit aclose() handling is needed.
            self.span.update(output=output)
            self.span.end()
            raise  # Re-raise StopAsyncIteration to terminate iteration.

return ContextPreservedAsyncGeneratorWrapper(
generator,
preserved_context,
items,
langfuse_span_or_generation,
transform_to_string,
)


_decorator = LangfuseDecorator()
Expand Down
Loading
Loading