Skip to content

Commit 03ba99d

Browse files
committed
fix(observe): preserve async wrapper context on py310
1 parent 9ee80d3 commit 03ba99d

File tree

2 files changed

+50
-4
lines changed

2 files changed

+50
-4
lines changed

langfuse/_client/observe.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ async def aclose(self) -> None:
701701
context=self.context,
702702
) # type: ignore
703703
except TypeError:
704-
await self.generator.aclose()
704+
await self.context.run(asyncio.create_task, self.generator.aclose())
705705
except (Exception, asyncio.CancelledError) as error:
706706
self._finalize_with_error(error)
707707
raise
@@ -718,14 +718,17 @@ async def __anext__(self) -> Any:
718718
try:
719719
# Run the generator's __anext__ in the preserved context
720720
try:
721-
# Python 3.10+ approach with context parameter
721+
# Python 3.11+ approach with explicit task context
722722
item = await asyncio.create_task(
723723
self.generator.__anext__(), # type: ignore
724724
context=self.context,
725725
) # type: ignore
726726
except TypeError:
727-
# Python < 3.10 fallback - context parameter not supported
728-
item = await self.generator.__anext__()
727+
# Python 3.10 fallback - create the task inside the preserved context.
728+
item = await self.context.run(
729+
asyncio.create_task,
730+
self.generator.__anext__(), # type: ignore
731+
)
729732

730733
if self.capture_output:
731734
self.items.append(item)

tests/unit/test_observe.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,49 @@ async def generator() -> AsyncGenerator[str, None]:
247247
assert span.ended == 1
248248

249249

250+
@pytest.mark.asyncio
251+
async def test_async_generator_wrapper_fallback_preserves_context(
252+
monkeypatch: pytest.MonkeyPatch,
253+
) -> None:
254+
marker = contextvars.ContextVar("marker", default="ambient")
255+
seen: list[str] = []
256+
original_create_task = asyncio.create_task
257+
258+
def create_task_with_type_error(*args: Any, **kwargs: Any) -> asyncio.Task[Any]:
259+
if "context" in kwargs:
260+
raise TypeError("context argument unsupported")
261+
262+
return original_create_task(*args, **kwargs)
263+
264+
monkeypatch.setattr(asyncio, "create_task", create_task_with_type_error)
265+
266+
async def generator() -> AsyncGenerator[str, None]:
267+
try:
268+
yield marker.get()
269+
yield "item_1"
270+
finally:
271+
seen.append(marker.get())
272+
273+
span = SpanRecorder()
274+
context = contextvars.copy_context()
275+
context.run(marker.set, "preserved")
276+
wrapper = _ContextPreservedAsyncGeneratorWrapper(
277+
generator(),
278+
context,
279+
cast(Any, span),
280+
False,
281+
None,
282+
)
283+
284+
assert await wrapper.__anext__() == "preserved"
285+
marker.set("ambient-now")
286+
287+
await wrapper.aclose()
288+
289+
assert seen == ["preserved"]
290+
assert span.ended == 1
291+
292+
250293
@pytest.mark.asyncio
251294
async def test_async_generator_wrapper_del_ends_span_when_abandoned() -> None:
252295
async def generator() -> AsyncGenerator[str, None]:

0 commit comments

Comments
 (0)