Skip to content

Commit fccbffc

Browse files
authored
Merge branch 'main' into codex/langgraph-control-flow-resume-traces
2 parents ad2e408 + cd9812c commit fccbffc

File tree

3 files changed

+192
-87
lines changed

3 files changed

+192
-87
lines changed

.github/workflows/dependabot-merge.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
steps:
1616
- name: Dependabot metadata
1717
id: metadata
18-
uses: dependabot/fetch-metadata@ffa630c65fa7e0ecfa0625b5ceda64399aea1b36 # v3
18+
uses: dependabot/fetch-metadata@ffa630c65fa7e0ecfa0625b5ceda64399aea1b36 # v3.0.0
1919
with:
2020
github-token: "${{ secrets.GITHUB_TOKEN }}"
2121
- name: Enable auto-merge for Dependabot PRs

langfuse/_client/observe.py

Lines changed: 99 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -290,42 +290,15 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
290290

291291
try:
292292
result = await func(*args, **kwargs)
293-
294-
if capture_output is True:
295-
if inspect.isgenerator(result):
296-
is_return_type_generator = True
297-
298-
return self._wrap_sync_generator_result(
299-
langfuse_span_or_generation,
300-
result,
301-
transform_to_string,
302-
)
303-
304-
if inspect.isasyncgen(result):
305-
is_return_type_generator = True
306-
307-
return self._wrap_async_generator_result(
308-
langfuse_span_or_generation,
309-
result,
310-
transform_to_string,
311-
)
312-
313-
# handle starlette.StreamingResponse
314-
if type(result).__name__ == "StreamingResponse" and hasattr(
315-
result, "body_iterator"
316-
):
317-
is_return_type_generator = True
318-
319-
result.body_iterator = (
320-
self._wrap_async_generator_result(
321-
langfuse_span_or_generation,
322-
result.body_iterator,
323-
transform_to_string,
324-
)
325-
)
326-
327-
langfuse_span_or_generation.update(output=result)
328-
293+
(
294+
is_return_type_generator,
295+
result,
296+
) = self._handle_observe_result(
297+
langfuse_span_or_generation,
298+
result,
299+
capture_output=capture_output,
300+
transform_to_string=transform_to_string,
301+
)
329302
return result
330303
except (Exception, asyncio.CancelledError) as e:
331304
langfuse_span_or_generation.update(
@@ -408,42 +381,15 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
408381

409382
try:
410383
result = func(*args, **kwargs)
411-
412-
if capture_output is True:
413-
if inspect.isgenerator(result):
414-
is_return_type_generator = True
415-
416-
return self._wrap_sync_generator_result(
417-
langfuse_span_or_generation,
418-
result,
419-
transform_to_string,
420-
)
421-
422-
if inspect.isasyncgen(result):
423-
is_return_type_generator = True
424-
425-
return self._wrap_async_generator_result(
426-
langfuse_span_or_generation,
427-
result,
428-
transform_to_string,
429-
)
430-
431-
# handle starlette.StreamingResponse
432-
if type(result).__name__ == "StreamingResponse" and hasattr(
433-
result, "body_iterator"
434-
):
435-
is_return_type_generator = True
436-
437-
result.body_iterator = (
438-
self._wrap_async_generator_result(
439-
langfuse_span_or_generation,
440-
result.body_iterator,
441-
transform_to_string,
442-
)
443-
)
444-
445-
langfuse_span_or_generation.update(output=result)
446-
384+
(
385+
is_return_type_generator,
386+
result,
387+
) = self._handle_observe_result(
388+
langfuse_span_or_generation,
389+
result,
390+
capture_output=capture_output,
391+
transform_to_string=transform_to_string,
392+
)
447393
return result
448394
except (Exception, asyncio.CancelledError) as e:
449395
langfuse_span_or_generation.update(
@@ -493,6 +439,7 @@ def _wrap_sync_generator_result(
493439
LangfuseGuardrail,
494440
],
495441
generator: Generator,
442+
capture_output: bool,
496443
transform_to_string: Optional[Callable[[Iterable], str]] = None,
497444
) -> Any:
498445
preserved_context = contextvars.copy_context()
@@ -501,6 +448,7 @@ def _wrap_sync_generator_result(
501448
generator,
502449
preserved_context,
503450
langfuse_span_or_generation,
451+
capture_output,
504452
transform_to_string,
505453
)
506454

@@ -518,6 +466,7 @@ def _wrap_async_generator_result(
518466
LangfuseGuardrail,
519467
],
520468
generator: AsyncGenerator,
469+
capture_output: bool,
521470
transform_to_string: Optional[Callable[[Iterable], str]] = None,
522471
) -> Any:
523472
preserved_context = contextvars.copy_context()
@@ -526,9 +475,61 @@ def _wrap_async_generator_result(
526475
generator,
527476
preserved_context,
528477
langfuse_span_or_generation,
478+
capture_output,
529479
transform_to_string,
530480
)
531481

482+
def _handle_observe_result(
    self,
    langfuse_span_or_generation: Union[
        LangfuseSpan,
        LangfuseGeneration,
        LangfuseAgent,
        LangfuseTool,
        LangfuseChain,
        LangfuseRetriever,
        LangfuseEvaluator,
        LangfuseEmbedding,
        LangfuseGuardrail,
    ],
    result: Any,
    *,
    capture_output: bool,
    transform_to_string: Optional[Callable[[Iterable], str]] = None,
) -> Tuple[bool, Any]:
    """Post-process the return value of an observed function.

    Generators (sync and async) and starlette-style streaming responses are
    always wrapped — even when output capture is off — so that the observation
    span stays open until the stream is exhausted and the calling context is
    preserved while iterating. Plain return values are optionally recorded as
    the observation's output.

    Returns a ``(is_generator, result)`` pair where ``is_generator`` tells the
    caller that span cleanup is deferred to the wrapper.
    """
    if inspect.isgenerator(result):
        wrapped = self._wrap_sync_generator_result(
            langfuse_span_or_generation,
            result,
            capture_output,
            transform_to_string,
        )
        return True, wrapped

    if inspect.isasyncgen(result):
        wrapped = self._wrap_async_generator_result(
            langfuse_span_or_generation,
            result,
            capture_output,
            transform_to_string,
        )
        return True, wrapped

    # Duck-typed handling for starlette.StreamingResponse: swap its body
    # iterator for a context-preserving wrapper, but hand back the response
    # object itself.
    if hasattr(result, "body_iterator") and type(result).__name__ == "StreamingResponse":
        result.body_iterator = self._wrap_async_generator_result(
            langfuse_span_or_generation,
            result.body_iterator,
            capture_output,
            transform_to_string,
        )
        return True, result

    # Plain (non-streaming) result: record it only when capture is enabled.
    if capture_output is True:
        langfuse_span_or_generation.update(output=result)

    return False, result
532+
532533

533534
_decorator = LangfuseDecorator()
534535

@@ -553,12 +554,14 @@ def __init__(
553554
LangfuseEmbedding,
554555
LangfuseGuardrail,
555556
],
557+
capture_output: bool,
556558
transform_fn: Optional[Callable[[Iterable], str]],
557559
) -> None:
558560
self.generator = generator
559561
self.context = context
560562
self.items: List[Any] = []
561563
self.span = span
564+
self.capture_output = capture_output
562565
self.transform_fn = transform_fn
563566

564567
def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
@@ -568,21 +571,25 @@ def __next__(self) -> Any:
568571
try:
569572
# Run the generator's __next__ in the preserved context
570573
item = self.context.run(next, self.generator)
571-
self.items.append(item)
574+
if self.capture_output:
575+
self.items.append(item)
572576

573577
return item
574578

575579
except StopIteration:
576580
# Handle output and span cleanup when generator is exhausted
577-
output: Any = self.items
581+
if self.capture_output:
582+
output: Any = self.items
583+
584+
if self.transform_fn is not None:
585+
output = self.transform_fn(self.items)
578586

579-
if self.transform_fn is not None:
580-
output = self.transform_fn(self.items)
587+
elif all(isinstance(item, str) for item in self.items):
588+
output = "".join(self.items)
581589

582-
elif all(isinstance(item, str) for item in self.items):
583-
output = "".join(self.items)
590+
self.span.update(output=output)
584591

585-
self.span.update(output=output).end()
592+
self.span.end()
586593

587594
raise # Re-raise StopIteration
588595

@@ -612,12 +619,14 @@ def __init__(
612619
LangfuseEmbedding,
613620
LangfuseGuardrail,
614621
],
622+
capture_output: bool,
615623
transform_fn: Optional[Callable[[Iterable], str]],
616624
) -> None:
617625
self.generator = generator
618626
self.context = context
619627
self.items: List[Any] = []
620628
self.span = span
629+
self.capture_output = capture_output
621630
self.transform_fn = transform_fn
622631

623632
def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
@@ -636,21 +645,25 @@ async def __anext__(self) -> Any:
636645
# Python < 3.10 fallback - context parameter not supported
637646
item = await self.generator.__anext__()
638647

639-
self.items.append(item)
648+
if self.capture_output:
649+
self.items.append(item)
640650

641651
return item
642652

643653
except StopAsyncIteration:
644654
# Handle output and span cleanup when generator is exhausted
645-
output: Any = self.items
655+
if self.capture_output:
656+
output: Any = self.items
657+
658+
if self.transform_fn is not None:
659+
output = self.transform_fn(self.items)
646660

647-
if self.transform_fn is not None:
648-
output = self.transform_fn(self.items)
661+
elif all(isinstance(item, str) for item in self.items):
662+
output = "".join(self.items)
649663

650-
elif all(isinstance(item, str) for item in self.items):
651-
output = "".join(self.items)
664+
self.span.update(output=output)
652665

653-
self.span.update(output=output).end()
666+
self.span.end()
654667

655668
raise # Re-raise StopAsyncIteration
656669
except (Exception, asyncio.CancelledError) as e:

tests/unit/test_observe.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import sys
2+
3+
import pytest
4+
5+
from langfuse import observe
6+
from langfuse._client.attributes import LangfuseOtelSpanAttributes
7+
8+
9+
def _finished_spans_by_name(memory_exporter, name: str):
10+
return [span for span in memory_exporter.get_finished_spans() if span.name == name]
11+
12+
13+
def test_sync_generator_preserves_context_without_output_capture(
    langfuse_memory_client, memory_exporter
):
    """With capture_output=False, a returned sync generator is still wrapped:

    child observations created while iterating keep the root span as parent
    and share its trace, the wrapper buffers nothing, and no output attribute
    is written to the root span.
    """

    @observe(name="child_step")
    def child_step(index: int) -> str:
        return f"item_{index}"

    @observe(name="root", capture_output=False)
    def root():
        def produce():
            for index in range(2):
                yield child_step(index)

        return produce()

    wrapped_generator = root()

    # Nothing is exported until the generator has been fully consumed.
    assert memory_exporter.get_finished_spans() == []

    assert list(wrapped_generator) == ["item_0", "item_1"]
    # capture_output=False must prevent the wrapper from buffering items.
    assert wrapped_generator.items == []

    langfuse_memory_client.flush()

    root_span = _finished_spans_by_name(memory_exporter, "root")[0]
    child_spans = _finished_spans_by_name(memory_exporter, "child_step")

    assert len(child_spans) == 2
    for child in child_spans:
        assert child.parent is not None
        assert child.parent.span_id == root_span.context.span_id
        assert child.context.trace_id == root_span.context.trace_id
    assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in root_span.attributes
49+
50+
51+
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 11), reason="requires python3.11 or higher")
async def test_streaming_response_preserves_context_without_output_capture(
    langfuse_memory_client, memory_exporter
):
    """With capture_output=False, a StreamingResponse body iterator is still wrapped:

    observations created while streaming keep the endpoint span as parent and
    share its trace, the wrapper buffers nothing, and no output attribute is
    written to the endpoint span.
    """

    class StreamingResponse:
        def __init__(self, body_iterator):
            self.body_iterator = body_iterator

    @observe(name="stream_step")
    async def stream_step(index: int) -> str:
        return f"chunk_{index}"

    async def body():
        for index in range(2):
            yield await stream_step(index)

    @observe(name="endpoint", capture_output=False)
    async def endpoint():
        return StreamingResponse(body())

    response = await endpoint()

    # Nothing is exported until the stream has been fully consumed.
    assert memory_exporter.get_finished_spans() == []

    assert [item async for item in response.body_iterator] == ["chunk_0", "chunk_1"]
    # capture_output=False must prevent the wrapper from buffering chunks.
    assert response.body_iterator.items == []

    langfuse_memory_client.flush()

    endpoint_span = _finished_spans_by_name(memory_exporter, "endpoint")[0]
    step_spans = _finished_spans_by_name(memory_exporter, "stream_step")

    assert len(step_spans) == 2
    for step in step_spans:
        assert step.parent is not None
        assert step.parent.span_id == endpoint_span.context.span_id
        assert step.context.trace_id == endpoint_span.context.trace_id
    assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in endpoint_span.attributes

0 commit comments

Comments
 (0)