-
Notifications
You must be signed in to change notification settings - Fork 931
feat(openai): Add gen_ai.client.operation.time_to_first_chunk metric for streaming #4415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,52 +1,13 @@ | ||
| from opentelemetry.metrics import Histogram, Meter | ||
| from opentelemetry.semconv._incubating.metrics import gen_ai_metrics | ||
|
|
||
| _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ | ||
| 0.01, | ||
| 0.02, | ||
| 0.04, | ||
| 0.08, | ||
| 0.16, | ||
| 0.32, | ||
| 0.64, | ||
| 1.28, | ||
| 2.56, | ||
| 5.12, | ||
| 10.24, | ||
| 20.48, | ||
| 40.96, | ||
| 81.92, | ||
| ] | ||
|
|
||
| _GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [ | ||
| 1, | ||
| 4, | ||
| 16, | ||
| 64, | ||
| 256, | ||
| 1024, | ||
| 4096, | ||
| 16384, | ||
| 65536, | ||
| 262144, | ||
| 1048576, | ||
| 4194304, | ||
| 16777216, | ||
| 67108864, | ||
| ] | ||
| from opentelemetry.util.genai.instruments import ( | ||
| create_duration_histogram, | ||
| create_token_histogram, | ||
| create_ttfc_histogram, | ||
| ) | ||
|
|
||
|
|
||
| class Instruments: | ||
| def __init__(self, meter: Meter): | ||
| self.operation_duration_histogram: Histogram = meter.create_histogram( | ||
| name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION, | ||
| description="GenAI operation duration", | ||
| unit="s", | ||
| explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, | ||
| ) | ||
| self.token_usage_histogram: Histogram = meter.create_histogram( | ||
| name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE, | ||
| description="Measures number of input and output tokens used", | ||
| unit="{token}", | ||
| explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, | ||
| ) | ||
| self.operation_duration_histogram: Histogram = create_duration_histogram(meter) | ||
| self.token_usage_histogram: Histogram = create_token_histogram(meter) | ||
| self.ttfc_histogram: Histogram = create_ttfc_histogram(meter) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,7 +90,10 @@ def traced_method(wrapped, instance, args, kwargs): | |
| parsed_result = result | ||
| if is_streaming(kwargs): | ||
| return LegacyChatStreamWrapper( | ||
| parsed_result, span, logger, capture_content | ||
| parsed_result, span, logger, capture_content, | ||
| instruments=instruments, | ||
| start_time=start, | ||
| request_attributes=span_attributes, | ||
| ) | ||
|
|
||
| if span.is_recording(): | ||
|
|
@@ -195,7 +198,10 @@ async def traced_method(wrapped, instance, args, kwargs): | |
| parsed_result = result | ||
| if is_streaming(kwargs): | ||
| return LegacyChatStreamWrapper( | ||
| parsed_result, span, logger, capture_content | ||
| parsed_result, span, logger, capture_content, | ||
| instruments=instruments, | ||
| start_time=start, | ||
| request_attributes=span_attributes, | ||
| ) | ||
|
|
||
| if span.is_recording(): | ||
|
|
@@ -631,6 +637,8 @@ def __init__( | |
| self.choice_buffers = [] | ||
| self._started = False | ||
| self.capture_content = capture_content | ||
| self._first_token_received = False | ||
| self._first_token_time: Optional[float] = None | ||
| self._setup() | ||
|
|
||
| def _setup(self): | ||
|
|
@@ -752,8 +760,25 @@ def process_chunk(self, chunk): | |
| self.set_response_model(chunk) | ||
| self.set_response_service_tier(chunk) | ||
| self.build_streaming_response(chunk) | ||
| self._detect_first_token(chunk) | ||
| self.set_usage(chunk) | ||
|
|
||
| def _detect_first_token(self, chunk): | ||
| if self._first_token_received: | ||
| return | ||
| if getattr(chunk, "choices", None) is None: | ||
| return | ||
| for choice in chunk.choices: | ||
| if not choice.delta: | ||
| continue | ||
| if ( | ||
| choice.delta.content is not None | ||
| or choice.delta.tool_calls is not None | ||
| ): | ||
| self._first_token_received = True | ||
| self._first_token_time = default_timer() | ||
| return | ||
|
|
||
| def __getattr__(self, name): | ||
| return getattr(self.stream, name) | ||
|
|
||
|
|
@@ -777,10 +802,16 @@ def __init__( | |
| span: Span, | ||
| logger: Logger, | ||
| capture_content: bool, | ||
| instruments: Optional[Instruments] = None, | ||
| start_time: Optional[float] = None, | ||
| request_attributes: Optional[dict] = None, | ||
| ): | ||
| super().__init__(stream, capture_content=capture_content) | ||
| self.span = span | ||
| self.logger = logger | ||
| self._instruments = instruments | ||
| self._start_time = start_time | ||
| self._request_attributes = request_attributes or {} | ||
|
|
||
| def cleanup(self, error: Optional[BaseException] = None): | ||
| if not self._started: | ||
|
|
@@ -863,9 +894,43 @@ def cleanup(self, error: Optional[BaseException] = None): | |
| if error: | ||
| handle_span_exception(self.span, error) | ||
| else: | ||
| self._record_ttft() | ||
| self.span.end() | ||
| self._started = False | ||
|
|
||
| def _record_ttft(self): | ||
| if ( | ||
| self._instruments is None | ||
| or self._start_time is None | ||
| or self._first_token_time is None | ||
| ): | ||
| return | ||
| ttft = max(self._first_token_time - self._start_time, 0.0) | ||
|
Comment on lines
+901
to
+908
|
||
| common_attributes = { | ||
| GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value, | ||
| GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value, | ||
| } | ||
| if GenAIAttributes.GEN_AI_REQUEST_MODEL in self._request_attributes: | ||
| common_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] = ( | ||
| self._request_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] | ||
| ) | ||
| if self.response_model: | ||
| common_attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] = ( | ||
| self.response_model | ||
| ) | ||
| if ServerAttributes.SERVER_ADDRESS in self._request_attributes: | ||
| common_attributes[ServerAttributes.SERVER_ADDRESS] = ( | ||
| self._request_attributes[ServerAttributes.SERVER_ADDRESS] | ||
| ) | ||
| if ServerAttributes.SERVER_PORT in self._request_attributes: | ||
| common_attributes[ServerAttributes.SERVER_PORT] = ( | ||
| self._request_attributes[ServerAttributes.SERVER_PORT] | ||
| ) | ||
| self._instruments.ttfc_histogram.record( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should not happen in openai instrumentation, this logic is not openai specific and should live in utils |
||
| ttft, | ||
| attributes=common_attributes, | ||
| ) | ||
|
Comment on lines
+929
to
+932
|
||
|
|
||
|
|
||
| class ChatStreamWrapper(BaseStreamWrapper): | ||
| handler: TelemetryHandler | ||
|
|
@@ -941,6 +1006,15 @@ def cleanup(self, error: Optional[BaseException] = None): | |
| }, | ||
| ) | ||
|
|
||
| if ( | ||
| self._first_token_time is not None | ||
| and self.invocation.monotonic_start_s is not None | ||
| ): | ||
| self.invocation.time_to_first_token_s = max( | ||
| self._first_token_time - self.invocation.monotonic_start_s, | ||
| 0.0, | ||
| ) | ||
|
|
||
| self._set_output_messages() | ||
|
|
||
| if error: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| import pytest | ||
| from tests.test_utils import DEFAULT_MODEL, USER_ONLY_PROMPT | ||
|
|
||
| from opentelemetry.semconv._incubating.attributes import ( | ||
| gen_ai_attributes as GenAIAttributes, | ||
| ) | ||
| from opentelemetry.semconv._incubating.attributes import ( | ||
| server_attributes as ServerAttributes, | ||
| ) | ||
| from opentelemetry.util.genai.instruments import ( | ||
| GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK, | ||
| _GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS, | ||
| get_metric_data_points, | ||
| ) | ||
|
Comment on lines
+10
to
+14
|
||
|
|
||
|
|
||
| def test_streaming_chat_records_ttft_metric( | ||
| metric_reader, openai_client, instrument_with_content, vcr | ||
| ): | ||
| """TTFT metric is recorded for streaming chat completions.""" | ||
| with vcr.use_cassette("test_chat_completion_streaming.yaml"): | ||
| response = openai_client.chat.completions.create( | ||
| model=DEFAULT_MODEL, | ||
| messages=USER_ONLY_PROMPT, | ||
| stream=True, | ||
| stream_options={"include_usage": True}, | ||
| ) | ||
| for _ in response: | ||
| pass | ||
|
|
||
| data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) | ||
| assert len(data_points) == 1, ( | ||
| "expected exactly one TTFC data point for streaming" | ||
| ) | ||
|
|
||
| data_point = data_points[0] | ||
| assert data_point.sum >= 0 | ||
| assert data_point.count == 1 | ||
| assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS) | ||
|
|
||
| assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes | ||
| assert ( | ||
| data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME] | ||
| == GenAIAttributes.GenAiOperationNameValues.CHAT.value | ||
| ) | ||
| assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes | ||
| assert ( | ||
| data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] | ||
| == "gpt-4o-mini" | ||
| ) | ||
| assert ServerAttributes.SERVER_ADDRESS in data_point.attributes | ||
|
|
||
|
|
||
| @pytest.mark.asyncio() | ||
| async def test_async_streaming_chat_records_ttft_metric( | ||
| metric_reader, async_openai_client, instrument_with_content, vcr | ||
| ): | ||
| """TTFT metric is recorded for async streaming chat completions.""" | ||
| with vcr.use_cassette("test_async_chat_completion_streaming.yaml"): | ||
| response = await async_openai_client.chat.completions.create( | ||
| model=DEFAULT_MODEL, | ||
| messages=USER_ONLY_PROMPT, | ||
| stream=True, | ||
| stream_options={"include_usage": True}, | ||
| ) | ||
| async for _ in response: | ||
| pass | ||
|
|
||
| data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) | ||
| assert len(data_points) == 1, ( | ||
| "expected exactly one TTFC data point for async streaming" | ||
| ) | ||
|
|
||
| data_point = data_points[0] | ||
| assert data_point.sum >= 0 | ||
| assert data_point.count == 1 | ||
| assert data_point.explicit_bounds == tuple(_GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK_BUCKETS) | ||
|
|
||
|
|
||
| def test_non_streaming_chat_does_not_record_ttft_metric( | ||
| metric_reader, openai_client, instrument_with_content, vcr | ||
| ): | ||
| """TTFT metric should NOT be recorded for non-streaming requests.""" | ||
| with vcr.use_cassette("test_chat_completion_metrics.yaml"): | ||
| openai_client.chat.completions.create( | ||
| messages=USER_ONLY_PROMPT, model=DEFAULT_MODEL, stream=False | ||
| ) | ||
|
|
||
| data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) | ||
| assert len(data_points) == 0, ( | ||
| "gen_ai.client.operation.time_to_first_chunk metric should not be recorded for non-streaming" | ||
| ) | ||
|
|
||
|
|
||
| def test_streaming_tool_calls_records_ttft_metric( | ||
| metric_reader, openai_client, instrument_with_content, vcr | ||
| ): | ||
| """TTFT metric is recorded for streaming responses with tool calls.""" | ||
| with vcr.use_cassette( | ||
| "test_chat_completion_multiple_tools_streaming_with_content.yaml" | ||
| ): | ||
| response = openai_client.chat.completions.create( | ||
| model=DEFAULT_MODEL, | ||
| messages=[{"role": "user", "content": "What's the weather?"}], | ||
| stream=True, | ||
| stream_options={"include_usage": True}, | ||
| tools=[ | ||
| { | ||
| "type": "function", | ||
| "function": { | ||
| "name": "get_weather", | ||
| "parameters": { | ||
| "type": "object", | ||
| "properties": { | ||
| "location": {"type": "string"}, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
| ], | ||
| ) | ||
| for _ in response: | ||
| pass | ||
|
|
||
| data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_OPERATION_TIME_TO_FIRST_CHUNK) | ||
| assert len(data_points) == 1, ( | ||
| "expected exactly one TTFC data point for streaming tool calls" | ||
| ) | ||
|
|
||
| data_point = data_points[0] | ||
| assert data_point.sum >= 0 | ||
| assert data_point.count == 1 | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -113,6 +113,8 @@ def __init__( # pylint: disable=too-many-locals | |||||||||
| self.seed = seed | ||||||||||
| self.server_address = server_address | ||||||||||
| self.server_port = server_port | ||||||||||
| self.time_to_first_token_s: float | None = None | ||||||||||
| """Time to first token in seconds (streaming responses only).""" | ||||||||||
|
Comment on lines
+116
to
+117
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
| self._start() | ||||||||||
|
|
||||||||||
| def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]: | ||||||||||
|
|
@@ -283,6 +285,8 @@ class LLMInvocation: | |||||||||
| seed: int | None = None | ||||||||||
| server_address: str | None = None | ||||||||||
| server_port: int | None = None | ||||||||||
| time_to_first_token_s: float | None = None | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @eternalcuriouslearner if I remember correctly you were exploring having common streaming helpers - I imagine if we had them in utils, we wouldn't need instrumentation libs to provide this and would populate it through that common code. WDYT?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lmolkova I have a dumb question. I am assuming this attribute is going to available once after move to OpenTelemetry Semantic Conventions v1.38.0. Do we really need this pr?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know where 1.38.0 came from; the time-to-first-chunk metric was added in the upcoming 1.41.0 (not released yet), and there is more coming in open-telemetry/semantic-conventions#3607. I agree with you that stream helpers would be a better design choice. Also, given that open-telemetry/semantic-conventions#3607 is not merged yet, I think it would be best to close this PR.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see #3607 landed and #4443 is merged too, so once #4274 goes in and @eternalcuriouslearner moves the streaming helpers into utils, I can rebase this to plug the TTFT metric into that shared infrastructure instead of having it in the openai instrumentation directly. I'll keep this open for now and rework it once the streaming helpers are in place. Please let me know @lmolkova |
||||||||||
| """Time to first token in seconds (streaming responses only).""" | ||||||||||
|
|
||||||||||
| _inference_invocation: InferenceInvocation | None = field( | ||||||||||
| default=None, init=False, repr=False | ||||||||||
|
|
@@ -347,6 +351,7 @@ def _sync_to_invocation(self) -> None: | |||||||||
| inv.server_port = self.server_port | ||||||||||
| inv.attributes = self.attributes | ||||||||||
| inv.metric_attributes = self.metric_attributes | ||||||||||
| inv.time_to_first_token_s = self.time_to_first_token_s | ||||||||||
|
|
||||||||||
| @property | ||||||||||
| def span(self) -> Span: | ||||||||||
|
|
@@ -356,3 +361,10 @@ def span(self) -> Span: | |||||||||
| if self._inference_invocation is not None | ||||||||||
| else INVALID_SPAN | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| @property | ||||||||||
| def monotonic_start_s(self) -> float | None: | ||||||||||
| """Monotonic start time, delegated from the underlying InferenceInvocation.""" | ||||||||||
| if self._inference_invocation is not None: | ||||||||||
| return self._inference_invocation._monotonic_start_s | ||||||||||
| return None | ||||||||||
|
Comment on lines
+365
to
+370
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TTFC is specified as the time to the first output chunk received, but this detection waits until a chunk contains `delta.content` or `delta.tool_calls`. For OpenAI streaming, the first chunk can be role-only (e.g., `delta.role='assistant'`), so this will systematically over-measure TTFC. Record the timestamp on the first received streaming chunk (e.g., the first chunk with any `choices` entry / any `delta` present), not only when content/tool_calls appear.