diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md index 767dfcc7ed..a23d8b33d5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add LangChain workflow span support and refactor LLM invocation + ([#4449](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4449)) - Fix compatibility with wrapt 2.x by using positional arguments in `wrap_function_wrapper()` calls ([#4445](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4445)) - Added span support for genAI langchain llm invocation. diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py new file mode 100644 index 0000000000..b48865ee21 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py @@ -0,0 +1,113 @@ +""" +LangGraph StateGraph example with an LLM node. + +Similar to the manual example (../manual/main.py) but uses LangGraph's StateGraph +with a node that calls ChatOpenAI. OpenTelemetry LangChain instrumentation traces +the LLM calls made from within the graph node. +""" + +from typing import Annotated + +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_openai import ChatOpenAI +from langgraph.graph import END, START, StateGraph +from langgraph.graph.message import add_messages +from typing_extensions import TypedDict + +from opentelemetry import _logs, metrics, trace +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.instrumentation.langchain import LangChainInstrumentor +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +# Configure tracing +trace.set_tracer_provider(TracerProvider()) +span_processor = BatchSpanProcessor(OTLPSpanExporter()) +trace.get_tracer_provider().add_span_processor(span_processor) + +# Configure logging +_logs.set_logger_provider(LoggerProvider()) +_logs.get_logger_provider().add_log_record_processor( + BatchLogRecordProcessor(OTLPLogExporter()) +) + +# Configure metrics +metrics.set_meter_provider( + MeterProvider( + metric_readers=[ + PeriodicExportingMetricReader( + OTLPMetricExporter(), + ), + ] + ) +) + + +class GraphState(TypedDict): + """State for the graph; messages are accumulated with add_messages.""" + + messages: Annotated[list, add_messages] + + +def build_graph(llm: ChatOpenAI): + """Build a StateGraph with a single LLM node.""" + + def llm_node(state: GraphState) -> dict: + """Node that invokes the LLM with the current messages.""" + response = llm.invoke(state["messages"]) + return {"messages": [response]} + + builder = StateGraph(GraphState) + builder.add_node("llm", llm_node) + builder.add_edge(START, "llm") + builder.add_edge("llm", END) + return builder.compile() + + +def main(): + # Set up instrumentation (traces LLM calls from within graph nodes) + LangChainInstrumentor().instrument() + + # ChatOpenAI setup + llm = ChatOpenAI( + model="gpt-3.5-turbo", + temperature=0.1, + max_tokens=100, + top_p=0.9, + frequency_penalty=0.5, + presence_penalty=0.5, + stop_sequences=["\n", "Human:", "AI:"], + seed=100, + ) + + graph = build_graph(llm) + + initial_messages = [ + SystemMessage(content="You are a helpful assistant!"), + HumanMessage(content="What is the capital of France?"), + ] + + result = graph.invoke({"messages": initial_messages}) + + print("LangGraph output (messages):") + for msg in result.get("messages", []): + print(f" {type(msg).__name__}: {msg.content}") + + # Un-instrument after use + LangChainInstrumentor().uninstrument() + + +if __name__ == "__main__": + main() diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt new file mode 100644 index 0000000000..f27cb4a3c1 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt @@ -0,0 +1,8 @@ +langchain==0.3.21 +langchain_openai +langgraph +opentelemetry-sdk>=1.39.0 +opentelemetry-exporter-otlp-proto-grpc>=1.39.0 + +# Uncomment after langchain instrumentation is released +# opentelemetry-instrumentation-langchain~=2.0b0.dev \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml index 80a406b7da..a87b82f999 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml @@ -25,7 +25,8 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "opentelemetry-instrumentation ~= 0.57b0", + "opentelemetry-instrumentation ~= 0.60b0", + "opentelemetry-util-genai >= 0.4b0.dev", ] [project.optional-dependencies] diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 5235af3c7f..3c214d9e65 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -25,10 +25,12 @@ _InvocationManager, ) from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.invocation import ( + InferenceInvocation, + WorkflowInvocation, +) from opentelemetry.util.genai.types import ( - Error, InputMessage, - LLMInvocation, # TODO: migrate to InferenceInvocation MessagePart, OutputMessage, Text, @@ -45,6 +47,82 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None: self._telemetry_handler = telemetry_handler self._invocation_manager = _InvocationManager() + def on_chain_start( + self, + serialized: dict[str, Any], + inputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + tags: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> Any: + payload = serialized or {} + name_source = ( + payload.get("name") + or payload.get("id") + or kwargs.get("name") + or (metadata.get("langgraph_node") if metadata else None) + ) + name = str(name_source or "chain") + + if parent_run_id is None: + workflow_name_override = ( + metadata.get("workflow_name") if metadata else None + ) + wf = self._telemetry_handler.start_workflow( + name=workflow_name_override or name + ) + self._invocation_manager.add_invocation_state(run_id, None, wf) + return + else: + # TODO: For agent invocation + self._invocation_manager.add_invocation_state( + run_id, + parent_run_id, + None, # type: ignore[arg-type] + ) + + def on_chain_end( + self, + outputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> Any: + invocation = self._invocation_manager.get_invocation(run_id=run_id) + if invocation is None or not isinstance( + invocation, WorkflowInvocation + ): + # If the invocation does not exist, we cannot set attributes or end it + return + + invocation.stop() + + if not invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id) + + def on_chain_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> Any: + invocation = self._invocation_manager.get_invocation(run_id=run_id) + if invocation is None or not isinstance( + invocation, WorkflowInvocation + ): + # If the invocation does not exist, we cannot set attributes or end it + return + + invocation.fail(error) + if not invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id=run_id) + def on_chat_model_start( self, serialized: dict[str, Any], @@ -140,25 +218,22 @@ def on_chat_model_start( ) ) - llm_invocation = LLMInvocation( + llm_invocation = self._telemetry_handler.start_inference( + provider, request_model=request_model, - input_messages=input_messages, - provider=provider, - top_p=top_p, - frequency_penalty=frequency_penalty, - presence_penalty=presence_penalty, - stop_sequences=stop_sequences, - seed=seed, - temperature=temperature, - max_tokens=max_tokens, - ) - llm_invocation = self._telemetry_handler.start_llm( - invocation=llm_invocation ) + llm_invocation.input_messages = input_messages + llm_invocation.top_p = top_p + llm_invocation.frequency_penalty = frequency_penalty + llm_invocation.presence_penalty = presence_penalty + llm_invocation.stop_sequences = stop_sequences + llm_invocation.seed = seed + llm_invocation.temperature = temperature + llm_invocation.max_tokens = max_tokens self._invocation_manager.add_invocation_state( run_id=run_id, parent_run_id=parent_run_id, - invocation=llm_invocation, # pyright: ignore[reportArgumentType] + invocation=llm_invocation, ) def on_llm_end( @@ -172,7 +247,7 @@ def on_llm_end( llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) if llm_invocation is None or not isinstance( llm_invocation, - LLMInvocation, + InferenceInvocation, ): # If the invocation does not exist, we cannot set attributes or end it return @@ -247,10 +322,8 @@ def on_llm_end( if response_id is not None: llm_invocation.response_id = str(response_id) - llm_invocation = self._telemetry_handler.stop_llm( - invocation=llm_invocation - ) - if llm_invocation.span and not llm_invocation.span.is_recording(): + llm_invocation.stop() + if not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) def on_llm_error( @@ -264,14 +337,11 @@ def on_llm_error( llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) if llm_invocation is None or not isinstance( llm_invocation, - LLMInvocation, + InferenceInvocation, ): # If the invocation does not exist, we cannot set attributes or end it return - error_otel = Error(message=str(error), type=type(error)) - llm_invocation = self._telemetry_handler.fail_llm( - invocation=llm_invocation, error=error_otel - ) - if llm_invocation.span and not llm_invocation.span.is_recording(): + llm_invocation.fail(error) + if not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py index e8d2293bae..139e9f4a6c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py @@ -23,7 +23,7 @@ @dataclass class _InvocationState: - invocation: GenAIInvocation + invocation: Optional[GenAIInvocation] children: List[UUID] = field(default_factory=lambda: list()) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py new file mode 100644 index 0000000000..e1ce91a564 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py @@ -0,0 +1,583 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests verifying CSA propagation through the LangChain callback handler. + +These tests exercise the OpenTelemetryLangChainCallbackHandler directly using +mock LangChain callback payloads, so they do not require live models or VCR +cassettes. The key behaviour under test: + + on_chain_start(parent_run_id=None) + → WorkflowInvocation created + → TelemetryHandler.start() called + → gen_ai.workflow.name written to context-scoped attributes (CSA) + + on_chat_model_start(...) + → LLMInvocation created + → TelemetryHandler.start() called + → gen_ai.workflow.name read from CSA and stamped on the LLM invocation + + on_llm_end(...) + → LLM span closed with gen_ai.workflow.name attribute set + + on_chain_end(...) + → workflow span closed, CSA scope ends +""" + +from __future__ import annotations + +import os +import uuid +from unittest import TestCase +from unittest.mock import patch + +from langchain_core.messages import AIMessage +from langchain_core.outputs import ChatGeneration, LLMResult + +from opentelemetry import baggage +from opentelemetry.instrumentation.langchain.callback_handler import ( + OpenTelemetryLangChainCallbackHandler, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.trace import SpanKind +from opentelemetry.trace.status import StatusCode +from opentelemetry.util.genai.context_attributes import ( + get_context_scoped_attributes, +) +from opentelemetry.util.genai.handler import TelemetryHandler + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _make_serialized(name: str) -> dict: + """Minimal serialized dict that on_chain_start / on_chat_model_start expect.""" + return {"name": name} + + +def _make_llm_result(content: str = "hello") -> LLMResult: + """Minimal LLMResult with one generation.""" + msg = AIMessage(content=content) + gen = ChatGeneration(message=msg, text=content) + gen.generation_info = {"finish_reason": "stop"} + return LLMResult( + generations=[[gen]], llm_output={"model_name": "gpt-3.5-turbo"} + ) + + +def _make_chat_invocation_params(model_name: str = "gpt-3.5-turbo") -> dict: + """kwargs dict that on_chat_model_start receives for a ChatOpenAI call.""" + return { + "invocation_params": { + "model_name": model_name, + "params": {"model_name": model_name}, + } + } + + +# --------------------------------------------------------------------------- +# Base test class +# --------------------------------------------------------------------------- + + +class _CallbackHandlerTestBase(TestCase): + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + telemetry_handler = TelemetryHandler(tracer_provider=tracer_provider) + self.handler = OpenTelemetryLangChainCallbackHandler( + telemetry_handler=telemetry_handler + ) + + def _finished_spans(self): + return self.span_exporter.get_finished_spans() + + def _spans_by_kind(self, kind: SpanKind): + return [s for s in self._finished_spans() if s.kind == kind] + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestWorkflowSpanCreation(_CallbackHandlerTestBase): + """Verify that a workflow span is created for top-level chains.""" + + def test_workflow_span_created_for_top_level_chain(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow MyChain") + + def test_no_workflow_span_for_nested_chain(self) -> None: + """Chains with a parent_run_id are nested — no extra workflow span.""" + parent_run_id = uuid.uuid4() + child_run_id = uuid.uuid4() + + # Start parent (top-level) + self.handler.on_chain_start( + serialized=_make_serialized("ParentChain"), + inputs={}, + run_id=parent_run_id, + parent_run_id=None, + ) + # Start child (nested — should NOT create a workflow span) + self.handler.on_chain_start( + serialized=_make_serialized("ChildChain"), + inputs={}, + run_id=child_run_id, + parent_run_id=parent_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=child_run_id, + parent_run_id=parent_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=parent_run_id, + parent_run_id=None, + ) + + # Only one INTERNAL (workflow) span — for the parent, not the child + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow ParentChain") + + +class TestLLMSpanGetsWorkflowName(_CallbackHandlerTestBase): + """Verify gen_ai.workflow.name is propagated to the LLM span via CSA.""" + + def test_llm_span_inside_chain_gets_workflow_name(self) -> None: + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyPipeline"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_end( + response=_make_llm_result(), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(client_spans), 1) + self.assertEqual( + client_spans[0].attributes.get("gen_ai.workflow.name"), + "MyPipeline", + ) + + def test_workflow_name_from_metadata_override(self) -> None: + """metadata['workflow_name'] overrides the serialized chain name.""" + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("InternalChainName"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + metadata={"workflow_name": "my_custom_wf"}, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_end( + response=_make_llm_result(), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual( + internal_spans[0].name, "invoke_workflow my_custom_wf" + ) + + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(client_spans), 1) + self.assertEqual( + client_spans[0].attributes.get("gen_ai.workflow.name"), + "my_custom_wf", + ) + + +class TestCSANotLeakedToBaggage(_CallbackHandlerTestBase): + """Verify that gen_ai.workflow.name is NOT written to W3C Baggage by default.""" + + def test_csa_not_leaked_to_baggage(self) -> None: + env = { + k: v + for k, v in os.environ.items() + if k != "OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE" + } + with patch.dict(os.environ, env, clear=True): + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("BaggageTestChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + try: + # While workflow is active, baggage should NOT contain the workflow name + baggage_value = baggage.get_baggage("gen_ai.workflow.name") + self.assertIsNone( + baggage_value, + "gen_ai.workflow.name must not be leaked to W3C Baggage by default", + ) + finally: + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + +class TestCSAScopeEndsAfterChain(_CallbackHandlerTestBase): + """Verify that the CSA is no longer visible after the chain ends.""" + + def test_csa_not_visible_outside_workflow_scope(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("ScopedChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + # After on_chain_end the context token is detached — CSA should be gone + attrs = get_context_scoped_attributes() + self.assertIsNone( + attrs.get("gen_ai.workflow.name"), + "gen_ai.workflow.name should not be visible after workflow scope ends", + ) + + +class TestWorkflowErrorPath(_CallbackHandlerTestBase): + """Verify on_chain_error records error status and cleans up state.""" + + def test_workflow_span_has_error_status_on_chain_error(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=ValueError("something went wrong"), + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + span = internal_spans[0] + self.assertEqual(span.name, "invoke_workflow FailingChain") + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertIn("something went wrong", span.status.description) + + def test_workflow_span_error_type_attribute(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=RuntimeError("boom"), + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual( + internal_spans[0].attributes.get("error.type"), "RuntimeError" + ) + + def test_chain_error_cleans_up_invocation_state(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=ValueError("oops"), + run_id=chain_run_id, + parent_run_id=None, + ) + + # Invocation should have been removed — a second error call is a no-op + self.handler.on_chain_error( + error=ValueError("duplicate"), + run_id=chain_run_id, + parent_run_id=None, + ) + # Only one span — the second call was silently ignored + self.assertEqual(len(self._finished_spans()), 1) + + def test_chain_error_csa_scope_ends(self) -> None: + """CSA should be gone after on_chain_error, same as on_chain_end.""" + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=ValueError("oops"), + run_id=chain_run_id, + parent_run_id=None, + ) + + attrs = get_context_scoped_attributes() + self.assertIsNone( + attrs.get("gen_ai.workflow.name"), + "gen_ai.workflow.name should not be visible after workflow error", + ) + + def test_chain_error_unknown_run_id_is_noop(self) -> None: + """on_chain_error with an unknown run_id must not raise.""" + self.handler.on_chain_error( + error=ValueError("no matching invocation"), + run_id=uuid.uuid4(), + parent_run_id=None, + ) + self.assertEqual(len(self._finished_spans()), 0) + + def test_chain_end_unknown_run_id_is_noop(self) -> None: + """on_chain_end with an unknown run_id must not raise.""" + self.handler.on_chain_end( + outputs={}, + run_id=uuid.uuid4(), + parent_run_id=None, + ) + self.assertEqual(len(self._finished_spans()), 0) + + +class TestLLMErrorInsideWorkflow(_CallbackHandlerTestBase): + """Verify on_llm_error inside a workflow doesn't break the parent workflow span.""" + + def test_llm_error_inside_workflow_records_error_on_llm_span(self) -> None: + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyPipeline"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_error( + error=RuntimeError("model timeout"), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(client_spans), 1) + self.assertEqual(client_spans[0].status.status_code, StatusCode.ERROR) + + # Workflow span still finishes (not in error state) + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertNotEqual( + internal_spans[0].status.status_code, StatusCode.ERROR + ) + + def test_llm_error_inside_workflow_llm_span_is_child_of_workflow( + self, + ) -> None: + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyPipeline"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_end( + response=_make_llm_result(), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(len(client_spans), 1) + + workflow_span = internal_spans[0] + llm_span = client_spans[0] + self.assertEqual( + llm_span.context.trace_id, + workflow_span.context.trace_id, + "LLM span and workflow span must share the same trace", + ) + self.assertEqual( + llm_span.parent.span_id, + workflow_span.context.span_id, + "LLM span must be a child of the workflow span", + ) + + +class TestWorkflowNameFallback(_CallbackHandlerTestBase): + """Verify the name resolution fallback chain in on_chain_start.""" + + def test_name_falls_back_to_id_list(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized={"id": ["pkg", "mod", "MyRunnableClass"]}, + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end( + outputs={}, run_id=chain_run_id, parent_run_id=None + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + # id is a list — _safe_str(list) produces a string; just verify a span was created + self.assertTrue(internal_spans[0].name.startswith("invoke_workflow ")) + + def test_name_falls_back_to_langgraph_node(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized={}, + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + metadata={"langgraph_node": "my_node"}, + ) + self.handler.on_chain_end( + outputs={}, run_id=chain_run_id, parent_run_id=None + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow my_node") + + def test_name_defaults_to_chain_when_nothing_provided(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized={}, + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end( + outputs={}, run_id=chain_run_id, parent_run_id=None + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow chain") diff --git a/tests/opentelemetry-docker-tests/tests/test-requirements.txt b/tests/opentelemetry-docker-tests/tests/test-requirements.txt index 434f44d597..50dfa259c9 100644 --- a/tests/opentelemetry-docker-tests/tests/test-requirements.txt +++ b/tests/opentelemetry-docker-tests/tests/test-requirements.txt @@ -58,7 +58,7 @@ python-dotenv==0.21.1 pytz==2024.1 PyYAML==5.3.1 redis==5.0.1 -requests==2.25.0 +requests==2.32.4 six==1.16.0 SQLAlchemy==1.4.52 texttable==1.7.0 diff --git a/uv.lock b/uv.lock index 14d5ea35b6..b532c37e77 100644 --- a/uv.lock +++ b/uv.lock @@ -3530,6 +3530,7 @@ name = "opentelemetry-instrumentation-langchain" source = { editable = "instrumentation-genai/opentelemetry-instrumentation-langchain" } dependencies = [ { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-util-genai" }, ] [package.optional-dependencies] @@ -3541,6 +3542,7 @@ instruments = [ requires-dist = [ { name = "langchain", marker = "extra == 'instruments'", specifier = ">=0.3.21" }, { name = "opentelemetry-instrumentation", editable = "opentelemetry-instrumentation" }, + { name = "opentelemetry-util-genai", editable = "util/opentelemetry-util-genai" }, ] provides-extras = ["instruments"]