diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py index 70c10055eb..83db77973b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py @@ -1,52 +1,13 @@ from opentelemetry.metrics import Histogram, Meter -from opentelemetry.semconv._incubating.metrics import gen_ai_metrics - -_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ - 0.01, - 0.02, - 0.04, - 0.08, - 0.16, - 0.32, - 0.64, - 1.28, - 2.56, - 5.12, - 10.24, - 20.48, - 40.96, - 81.92, -] - -_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [ - 1, - 4, - 16, - 64, - 256, - 1024, - 4096, - 16384, - 65536, - 262144, - 1048576, - 4194304, - 16777216, - 67108864, -] +from opentelemetry.util.genai.instruments import ( + create_duration_histogram, + create_token_histogram, + create_ttfc_histogram, +) class Instruments: def __init__(self, meter: Meter): - self.operation_duration_histogram: Histogram = meter.create_histogram( - name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION, - description="GenAI operation duration", - unit="s", - explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, - ) - self.token_usage_histogram: Histogram = meter.create_histogram( - name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE, - description="Measures number of input and output tokens used", - unit="{token}", - explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, - ) + self.operation_duration_histogram: Histogram = create_duration_histogram(meter) + self.token_usage_histogram: Histogram = create_token_histogram(meter) + self.ttfc_histogram: Histogram = create_ttfc_histogram(meter) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py index 30f74cfab1..7fd8357066 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py @@ -90,7 +90,10 @@ def traced_method(wrapped, instance, args, kwargs): parsed_result = result if is_streaming(kwargs): return LegacyChatStreamWrapper( - parsed_result, span, logger, capture_content + parsed_result, span, logger, capture_content, + instruments=instruments, + start_time=start, + request_attributes=span_attributes, ) if span.is_recording(): @@ -195,7 +198,10 @@ async def traced_method(wrapped, instance, args, kwargs): parsed_result = result if is_streaming(kwargs): return LegacyChatStreamWrapper( - parsed_result, span, logger, capture_content + parsed_result, span, logger, capture_content, + instruments=instruments, + start_time=start, + request_attributes=span_attributes, ) if span.is_recording(): @@ -631,6 +637,8 @@ def __init__( self.choice_buffers = [] self._started = False self.capture_content = capture_content + self._first_token_received = False + self._first_token_time: Optional[float] = None self._setup() def _setup(self): @@ -752,8 +760,25 @@ def process_chunk(self, chunk): self.set_response_model(chunk) self.set_response_service_tier(chunk) self.build_streaming_response(chunk) + self._detect_first_token(chunk) self.set_usage(chunk) + def _detect_first_token(self, chunk): + if self._first_token_received: + return + if getattr(chunk, "choices", None) is None: + return + for choice in chunk.choices: + if not choice.delta: + continue + if ( + choice.delta.content is not None + or choice.delta.tool_calls is not None + ): + self._first_token_received = True + self._first_token_time = default_timer() + return + def __getattr__(self, name): return getattr(self.stream, name) @@ -777,10 +802,16 @@ def __init__( span: Span, logger: Logger, capture_content: bool, + instruments: Optional[Instruments] = None, + start_time: Optional[float] = None, + request_attributes: Optional[dict] = None, ): super().__init__(stream, capture_content=capture_content) self.span = span self.logger = logger + self._instruments = instruments + self._start_time = start_time + self._request_attributes = request_attributes or {} def cleanup(self, error: Optional[BaseException] = None): if not self._started: @@ -863,9 +894,43 @@ def cleanup(self, error: Optional[BaseException] = None): if error: handle_span_exception(self.span, error) else: + self._record_ttft() self.span.end() self._started = False + def _record_ttft(self): + if ( + self._instruments is None + or self._start_time is None + or self._first_token_time is None + ): + return + ttft = max(self._first_token_time - self._start_time, 0.0) + common_attributes = { + GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value, + GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value, + } + if GenAIAttributes.GEN_AI_REQUEST_MODEL in self._request_attributes: + common_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] = ( + self._request_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] + ) + if self.response_model: + common_attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] = ( + self.response_model + ) + if ServerAttributes.SERVER_ADDRESS in self._request_attributes: + common_attributes[ServerAttributes.SERVER_ADDRESS] = ( + self._request_attributes[ServerAttributes.SERVER_ADDRESS] + ) + if ServerAttributes.SERVER_PORT in self._request_attributes: + common_attributes[ServerAttributes.SERVER_PORT] = ( + self._request_attributes[ServerAttributes.SERVER_PORT] + ) + self._instruments.ttfc_histogram.record( + ttft, + attributes=common_attributes, + ) + class ChatStreamWrapper(BaseStreamWrapper): handler: TelemetryHandler @@ -941,6 +1006,15 @@ def cleanup(self, error: Optional[BaseException] = None): }, ) + if ( + self._first_token_time is not None + and self.invocation.monotonic_start_s is not None + ): + self.invocation.time_to_first_token_s = max( + self._first_token_time - self.invocation.monotonic_start_s, + 0.0, + ) + self._set_output_messages() if error: diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_ttft_metrics.py b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_ttft_metrics.py new file mode 100644 index 0000000000..b3eabc447f --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-v2/tests/test_ttft_metrics.py @@ -0,0 +1,132 @@ +import pytest +from tests.test_utils import DEFAULT_MODEL, USER_ONLY_PROMPT + +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) +from opentelemetry.semconv._incubating.attributes import ( + server_attributes as ServerAttributes, +) +from opentelemetry.util.genai.instruments import ( + GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK, + _GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS, + get_metric_data_points, +) + + +def test_streaming_chat_records_ttft_metric( + metric_reader, openai_client, instrument_with_content, vcr +): + """TTFT metric is recorded for streaming chat completions.""" + with vcr.use_cassette("test_chat_completion_streaming.yaml"): + response = openai_client.chat.completions.create( + model=DEFAULT_MODEL, + messages=USER_ONLY_PROMPT, + stream=True, + stream_options={"include_usage": True}, + ) + for _ in response: + pass + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 1, ( + "expected exactly one TTFC data point for streaming" + ) + + data_point = data_points[0] + assert data_point.sum >= 0 + assert data_point.count == 1 + assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS) + + assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] + == GenAIAttributes.GenAiOperationNameValues.CHAT.value + ) + assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes + assert ( + data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] + == "gpt-4o-mini" + ) + assert ServerAttributes.SERVER_ADDRESS in data_point.attributes + + +@pytest.mark.asyncio() +async def test_async_streaming_chat_records_ttft_metric( + metric_reader, async_openai_client, instrument_with_content, vcr +): + """TTFT metric is recorded for async streaming chat completions.""" + with vcr.use_cassette("test_async_chat_completion_streaming.yaml"): + response = await async_openai_client.chat.completions.create( + model=DEFAULT_MODEL, + messages=USER_ONLY_PROMPT, + stream=True, + stream_options={"include_usage": True}, + ) + async for _ in response: + pass + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 1, ( + "expected exactly one TTFC data point for async streaming" + ) + + data_point = data_points[0] + assert data_point.sum >= 0 + assert data_point.count == 1 + assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS) + + +def test_non_streaming_chat_does_not_record_ttft_metric( + metric_reader, openai_client, instrument_with_content, vcr +): + """TTFT metric should NOT be recorded for non-streaming requests.""" + with vcr.use_cassette("test_chat_completion_metrics.yaml"): + openai_client.chat.completions.create( + messages=USER_ONLY_PROMPT, model=DEFAULT_MODEL, stream=False + ) + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 0, ( + "gen_ai.client.operation.time_to_first_chunk metric should not be recorded for non-streaming" + ) + + +def test_streaming_tool_calls_records_ttft_metric( + metric_reader, openai_client, instrument_with_content, vcr +): + """TTFT metric is recorded for streaming responses with tool calls.""" + with vcr.use_cassette( + "test_chat_completion_multiple_tools_streaming_with_content.yaml" + ): + response = openai_client.chat.completions.create( + model=DEFAULT_MODEL, + messages=[{"role": "user", "content": "What's the weather?"}], + stream=True, + stream_options={"include_usage": True}, + tools=[ + { + "type": "function", + "function": { + "name": "get_weather", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string"}, + }, + }, + }, + } + ], + ) + for _ in response: + pass + + data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) + assert len(data_points) == 1, ( + "expected exactly one TTFC data point for streaming tool calls" + ) + + data_point = data_points[0] + assert data_point.sum >= 0 + assert data_point.count == 1 diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py index a1bd55811c..13eada3a2d 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py @@ -113,6 +113,8 @@ def __init__( # pylint: disable=too-many-locals self.seed = seed self.server_address = server_address self.server_port = server_port + self.time_to_first_token_s: float | None = None + """Time to first token in seconds (streaming responses only).""" self._start() def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]: @@ -283,6 +285,8 @@ class LLMInvocation: seed: int | None = None server_address: str | None = None server_port: int | None = None + time_to_first_token_s: float | None = None + """Time to first token in seconds (streaming responses only).""" _inference_invocation: InferenceInvocation | None = field( default=None, init=False, repr=False @@ -347,6 +351,7 @@ def _sync_to_invocation(self) -> None: inv.server_port = self.server_port inv.attributes = self.attributes inv.metric_attributes = self.metric_attributes + inv.time_to_first_token_s = self.time_to_first_token_s @property def span(self) -> Span: @@ -356,3 +361,10 @@ def span(self) -> Span: if self._inference_invocation is not None else INVALID_SPAN ) + + @property + def monotonic_start_s(self) -> float | None: + """Monotonic start time, delegated from the underlying InferenceInvocation.""" + if self._inference_invocation is not None: + return self._inference_invocation._monotonic_start_s + return None diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py index d1bad284c1..2b852c69b5 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py @@ -35,6 +35,27 @@ 67108864, ] +GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK = ( + "gen_ai.client.operation.time_to_first_chunk" +) + +_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS = [ + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, + 20.48, + 40.96, + 81.92, +] + def create_duration_histogram(meter: Meter) -> Histogram: return meter.create_histogram( @@ -52,3 +73,25 @@ def create_token_histogram(meter: Meter) -> Histogram: unit="{token}", explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, ) + + +def create_ttfc_histogram(meter: Meter) -> Histogram: + return meter.create_histogram( + name=GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK, + description="Time to receive the first chunk in a streaming response", + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS, + ) + + +def get_metric_data_points(metric_reader, metric_name): + """Extract all data points for a given metric name from a metric reader.""" + results = [] + metrics = metric_reader.get_metrics_data().resource_metrics + if not metrics: + return results + for scope_metrics in metrics[0].scope_metrics: + for m in scope_metrics.metrics: + if m.name == metric_name: + results.extend(m.data.data_points) + return results diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py index 144f4663b6..6ac7e1c17b 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py @@ -9,9 +9,11 @@ from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) +from opentelemetry.semconv.attributes import error_attributes from opentelemetry.util.genai.instruments import ( create_duration_histogram, create_token_histogram, + create_ttfc_histogram, ) from ._invocation import GenAIInvocation @@ -23,6 +25,7 @@ class InvocationMetricsRecorder: def __init__(self, meter: Meter): self._duration_histogram: Histogram = create_duration_histogram(meter) self._token_histogram: Histogram = create_token_histogram(meter) + self._ttfc_histogram: Histogram = create_ttfc_histogram(meter) def record(self, invocation: GenAIInvocation) -> None: """Record duration and token metrics for an invocation if possible.""" @@ -50,5 +53,13 @@ def record(self, invocation: GenAIInvocation) -> None: context=invocation._span_context, ) + ttfc_s = getattr(invocation, 'time_to_first_token_s', None) + if ttfc_s is not None and error_attributes.ERROR_TYPE not in attributes: + self._ttfc_histogram.record( + ttfc_s, + attributes=attributes, + context=invocation._span_context, + ) + __all__ = ["InvocationMetricsRecorder"] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 103c168023..19c6278d33 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -235,6 +235,7 @@ class OutputMessage: finish_reason: str | FinishReason + @dataclass class Error: message: str