From acbf5c4cafc3cd9434625b8e8ab00454565e36f2 Mon Sep 17 00:00:00 2001 From: Nik-Reddy Date: Thu, 16 Apr 2026 14:06:11 -0700 Subject: [PATCH] feat(openai): Add gen_ai.client.operation.time_to_first_chunk metric for streaming Implement the gen_ai.client.operation.time_to_first_chunk histogram metric as defined in OpenTelemetry Semantic Conventions v1.38.0. This metric records the time (in seconds) from request start to the first output chunk received during streaming chat completions. Changes: - Add time_to_first_token_s field to LLMInvocation dataclass - Add create_ttfc_histogram() factory with semconv-specified bucket boundaries - InvocationMetricsRecorder now creates and records TTFC histogram - First-token detection in stream wrappers for both new and legacy paths - 4 test cases: sync/async streaming, non-streaming exclusion, tool-call streaming Fixes #3932 --- .../instrumentation/openai_v2/instruments.py | 55 ++------ .../instrumentation/openai_v2/patch.py | 78 ++++++++++- .../tests/test_ttft_metrics.py | 132 ++++++++++++++++++ .../util/genai/_inference_invocation.py | 12 ++ .../opentelemetry/util/genai/instruments.py | 43 ++++++ .../src/opentelemetry/util/genai/metrics.py | 11 ++ .../src/opentelemetry/util/genai/types.py | 1 + 7 files changed, 283 insertions(+), 49 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_ttft_metrics.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py index 70c10055eb..83db77973b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py @@ -1,52 +1,13 @@ from opentelemetry.metrics import Histogram, Meter -from opentelemetry.semconv._incubating.metrics import gen_ai_metrics - -_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ - 0.01, - 0.02, - 0.04, - 0.08, - 0.16, - 0.32, - 0.64, - 1.28, - 2.56, - 5.12, - 10.24, - 20.48, - 40.96, - 81.92, -] - -_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [ - 1, - 4, - 16, - 64, - 256, - 1024, - 4096, - 16384, - 65536, - 262144, - 1048576, - 4194304, - 16777216, - 67108864, -] +from opentelemetry.util.genai.instruments import ( + create_duration_histogram, + create_token_histogram, + create_ttfc_histogram, +) class Instruments: def __init__(self, meter: Meter): - self.operation_duration_histogram: Histogram = meter.create_histogram( - name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION, - description="GenAI operation duration", - unit="s", - explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, - ) - self.token_usage_histogram: Histogram = meter.create_histogram( - name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE, - description="Measures number of input and output tokens used", - unit="{token}", - explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, - ) + self.operation_duration_histogram: Histogram = create_duration_histogram(meter) + self.token_usage_histogram: Histogram = create_token_histogram(meter) + self.ttfc_histogram: Histogram = create_ttfc_histogram(meter) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 30f74cfab1..7fd8357066 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -90,7 +90,10 @@ def traced_method(wrapped, instance, args, kwargs): parsed_result = result if is_streaming(kwargs): return LegacyChatStreamWrapper( - parsed_result, span, logger, capture_content + parsed_result, span, logger, capture_content, + instruments=instruments, + start_time=start, + request_attributes=span_attributes, ) if span.is_recording(): @@ -195,7 +198,10 @@ async def traced_method(wrapped, instance, args, kwargs): parsed_result = result if is_streaming(kwargs): return LegacyChatStreamWrapper( - parsed_result, span, logger, capture_content + parsed_result, span, logger, capture_content, + instruments=instruments, + start_time=start, + request_attributes=span_attributes, ) if span.is_recording(): @@ -631,6 +637,8 @@ def __init__( self.choice_buffers = [] self._started = False self.capture_content = capture_content + self._first_token_received = False + self._first_token_time: Optional[float] = None self._setup() def _setup(self): @@ -752,8 +760,25 @@ def process_chunk(self, chunk): self.set_response_model(chunk) self.set_response_service_tier(chunk) self.build_streaming_response(chunk) + self._detect_first_token(chunk) self.set_usage(chunk) + def _detect_first_token(self, chunk): + if self._first_token_received: + return + if getattr(chunk, "choices", None) is None: + return + for choice in chunk.choices: + if not choice.delta: + continue + if ( + choice.delta.content is not None + or choice.delta.tool_calls is not None + ): + self._first_token_received = True + self._first_token_time = default_timer() + return + def __getattr__(self, name): return getattr(self.stream, name) @@ -777,10 +802,16 @@ def __init__( span: Span, logger: Logger, capture_content: bool, + instruments: Optional[Instruments] = None, + start_time: Optional[float] = None, + request_attributes: Optional[dict] = None, ): super().__init__(stream, capture_content=capture_content) self.span = span self.logger = logger + self._instruments = instruments + self._start_time = start_time + self._request_attributes = request_attributes or {} def cleanup(self, error: Optional[BaseException] = None): if not self._started: @@ -863,9 +894,43 @@ def cleanup(self, error: Optional[BaseException] = None): if error: handle_span_exception(self.span, error) else: + self._record_ttft() self.span.end() self._started = False + def _record_ttft(self): + if ( + self._instruments is None + or self._start_time is None + or self._first_token_time is None + ): + return + ttft = max(self._first_token_time - self._start_time, 0.0) + common_attributes = { + GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value, + GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value, + } + if GenAIAttributes.GEN_AI_REQUEST_MODEL in self._request_attributes: + common_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] = ( + self._request_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] + ) + if self.response_model: + common_attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] = ( + self.response_model + ) + if ServerAttributes.SERVER_ADDRESS in self._request_attributes: + common_attributes[ServerAttributes.SERVER_ADDRESS] = ( + self._request_attributes[ServerAttributes.SERVER_ADDRESS] + ) + if ServerAttributes.SERVER_PORT in self._request_attributes: + common_attributes[ServerAttributes.SERVER_PORT] = ( + self._request_attributes[ServerAttributes.SERVER_PORT] + ) + self._instruments.ttfc_histogram.record( + ttft, + attributes=common_attributes, + ) + class ChatStreamWrapper(BaseStreamWrapper): handler: TelemetryHandler @@ -941,6 +1006,15 @@ def cleanup(self, error: Optional[BaseException] = None): }, ) + if ( + self._first_token_time is not None + and self.invocation.monotonic_start_s is not None + ): + self.invocation.time_to_first_token_s = max( + self._first_token_time - self.invocation.monotonic_start_s, + 0.0, + ) + self._set_output_messages() if error: diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_ttft_metrics.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_ttft_metrics.py new file mode 100644 index 0000000000..b3eabc447f --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_ttft_metrics.py @@ -0,0 +1,132 @@ +import pytest +from tests.test_utils import DEFAULT_MODEL, USER_ONLY_PROMPT + +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) +from opentelemetry.semconv._incubating.attributes import ( + server_attributes as ServerAttributes, +) +from opentelemetry.util.genai.instruments import ( + GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK, + _GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS, + get_metric_data_points, +) + + +def test_streaming_chat_records_ttft_metric( + metric_reader, openai_client, instrument_with_content, vcr +): + """TTFT metric is recorded for streaming chat completions.""" + with vcr.use_cassette("test_chat_completion_streaming.yaml"): + response = openai_client.chat.completions.create( + model=DEFAULT_MODEL, + messages=USER_ONLY_PROMPT, + stream=True, + stream_options={"include_usage": True}, + ) + for _ in response: + pass + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 1, ( + "expected exactly one TTFC data point for streaming" + ) + + data_point = data_points[0] + assert data_point.sum >= 0 + assert data_point.count == 1 + assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS) + + assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] + == GenAIAttributes.GenAiOperationNameValues.CHAT.value + ) + assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] + == "gpt-4o-mini" + ) + assert ServerAttributes.SERVER_ADDRESS in data_point.attributes + + +@pytest.mark.asyncio() +async def test_async_streaming_chat_records_ttft_metric( + metric_reader, async_openai_client, instrument_with_content, vcr +): + """TTFT metric is recorded for async streaming chat completions.""" + with vcr.use_cassette("test_async_chat_completion_streaming.yaml"): + response = await async_openai_client.chat.completions.create( + model=DEFAULT_MODEL, + messages=USER_ONLY_PROMPT, + stream=True, + stream_options={"include_usage": True}, + ) + async for _ in response: + pass + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 1, ( + "expected exactly one TTFC data point for async streaming" + ) + + data_point = data_points[0] + assert data_point.sum >= 0 + assert data_point.count == 1 + assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS) + + +def test_non_streaming_chat_does_not_record_ttft_metric( + metric_reader, openai_client, instrument_with_content, vcr +): + """TTFT metric should NOT be recorded for non-streaming requests.""" + with vcr.use_cassette("test_chat_completion_metrics.yaml"): + openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, model=DEFAULT_MODEL, stream=False + ) + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 0, ( + "gen_ai.client.operation.time_to_first_chunk metric should not be recorded for non-streaming" + ) + + +def test_streaming_tool_calls_records_ttft_metric( + metric_reader, openai_client, instrument_with_content, vcr +): + """TTFT metric is recorded for streaming responses with tool calls.""" + with vcr.use_cassette( + "test_chat_completion_multiple_tools_streaming_with_content.yaml" + ): + response = openai_client.chat.completions.create( + model=DEFAULT_MODEL, + messages=[{"role": "user", "content": "What's the weather?"}], + stream=True, + stream_options={"include_usage": True}, + tools=[ + { + "type": "function", + "function": { + "name": "get_weather", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string"}, + }, + }, + }, + } + ], + ) + for _ in response: + pass + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 1, ( + "expected exactly one TTFC data point for streaming tool calls" + ) + + data_point = data_points[0] + assert data_point.sum >= 0 + assert data_point.count == 1 diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py index a1bd55811c..13eada3a2d 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py @@ -113,6 +113,8 @@ def __init__( # pylint: disable=too-many-locals self.seed = seed self.server_address = server_address self.server_port = server_port + self.time_to_first_token_s: float | None = None + """Time to first token in seconds (streaming responses only).""" self._start() def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]: @@ -283,6 +285,8 @@ class LLMInvocation: seed: int | None = None server_address: str | None = None server_port: int | None = None + time_to_first_token_s: float | None = None + """Time to first token in seconds (streaming responses only).""" _inference_invocation: InferenceInvocation | None = field( default=None, init=False, repr=False @@ -347,6 +351,7 @@ def _sync_to_invocation(self) -> None: inv.server_port = self.server_port inv.attributes = self.attributes inv.metric_attributes = self.metric_attributes + inv.time_to_first_token_s = self.time_to_first_token_s @property def span(self) -> Span: @@ -356,3 +361,10 @@ def span(self) -> Span: if self._inference_invocation is not None else INVALID_SPAN ) + + @property + def monotonic_start_s(self) -> float | None: + """Monotonic start time, delegated from the underlying InferenceInvocation.""" + if self._inference_invocation is not None: + return self._inference_invocation._monotonic_start_s + return None diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py index d1bad284c1..2b852c69b5 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py @@ -35,6 +35,27 @@ 67108864, ] +GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK = ( + "gen_ai.client.operation.time_to_first_chunk" +) + +_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS = [ + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, + 20.48, + 40.96, + 81.92, +] + def create_duration_histogram(meter: Meter) -> Histogram: return meter.create_histogram( @@ -52,3 +73,25 @@ def create_token_histogram(meter: Meter) -> Histogram: unit="{token}", explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, ) + + +def create_ttfc_histogram(meter: Meter) -> Histogram: + return meter.create_histogram( + name=GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK, + description="Time to receive the first chunk in a streaming response", + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS, + ) + + +def get_metric_data_points(metric_reader, metric_name): + """Extract all data points for a given metric name from a metric reader.""" + results = [] + metrics = metric_reader.get_metrics_data().resource_metrics + if not metrics: + return results + for scope_metrics in metrics[0].scope_metrics: + for m in scope_metrics.metrics: + if m.name == metric_name: + results.extend(m.data.data_points) + return results diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py index 144f4663b6..6ac7e1c17b 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py @@ -9,9 +9,11 @@ from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) +from opentelemetry.semconv.attributes import error_attributes from opentelemetry.util.genai.instruments import ( create_duration_histogram, create_token_histogram, + create_ttfc_histogram, ) from ._invocation import GenAIInvocation @@ -23,6 +25,7 @@ class InvocationMetricsRecorder: def __init__(self, meter: Meter): self._duration_histogram: Histogram = create_duration_histogram(meter) self._token_histogram: Histogram = create_token_histogram(meter) + self._ttfc_histogram: Histogram = create_ttfc_histogram(meter) def record(self, invocation: GenAIInvocation) -> None: """Record duration and token metrics for an invocation if possible.""" @@ -50,5 +53,13 @@ def record(self, invocation: GenAIInvocation) -> None: context=invocation._span_context, ) + ttfc_s = getattr(invocation, 'time_to_first_token_s', None) + if ttfc_s is not None and error_attributes.ERROR_TYPE not in attributes: + self._ttfc_histogram.record( + ttfc_s, + attributes=attributes, + context=invocation._span_context, + ) + __all__ = ["InvocationMetricsRecorder"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 103c168023..19c6278d33 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -235,6 +235,7 @@ class OutputMessage: finish_reason: str | FinishReason + @dataclass class Error: message: str