Commit 40ac7e3 (parent: 7f107df)

feat(openai): Add gen_ai.client.time_to_first_token metric for streaming

Fixes #3932

6 files changed

Lines changed: 278 additions & 3 deletions

File tree

instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/instruments.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
from opentelemetry.metrics import Histogram, Meter
22
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
3+
from opentelemetry.util.genai.instruments import (
4+
GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN,
5+
GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN_BUCKETS,
6+
)
37

48
_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
59
0.01,
@@ -50,3 +54,9 @@ def __init__(self, meter: Meter):
5054
unit="{token}",
5155
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
5256
)
57+
self.ttft_histogram: Histogram = meter.create_histogram(
58+
name=GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN,
59+
description="Time to generate first token for successful responses",
60+
unit="s",
61+
explicit_bucket_boundaries_advisory=GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN_BUCKETS,
62+
)

instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/patch.py

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ def traced_method(wrapped, instance, args, kwargs):
9090
parsed_result = result
9191
if is_streaming(kwargs):
9292
return LegacyChatStreamWrapper(
93-
parsed_result, span, logger, capture_content
93+
parsed_result, span, logger, capture_content,
94+
instruments=instruments,
95+
start_time=start,
96+
request_attributes=span_attributes,
9497
)
9598

9699
if span.is_recording():
@@ -195,7 +198,10 @@ async def traced_method(wrapped, instance, args, kwargs):
195198
parsed_result = result
196199
if is_streaming(kwargs):
197200
return LegacyChatStreamWrapper(
198-
parsed_result, span, logger, capture_content
201+
parsed_result, span, logger, capture_content,
202+
instruments=instruments,
203+
start_time=start,
204+
request_attributes=span_attributes,
199205
)
200206

201207
if span.is_recording():
@@ -631,6 +637,8 @@ def __init__(
631637
self.choice_buffers = []
632638
self._started = False
633639
self.capture_content = capture_content
640+
self._first_token_received = False
641+
self._first_token_time: Optional[float] = None
634642
self._setup()
635643

636644
def _setup(self):
@@ -752,8 +760,25 @@ def process_chunk(self, chunk):
752760
self.set_response_model(chunk)
753761
self.set_response_service_tier(chunk)
754762
self.build_streaming_response(chunk)
763+
self._detect_first_token(chunk)
755764
self.set_usage(chunk)
756765

766+
def _detect_first_token(self, chunk):
767+
if self._first_token_received:
768+
return
769+
if getattr(chunk, "choices", None) is None:
770+
return
771+
for choice in chunk.choices:
772+
if not choice.delta:
773+
continue
774+
if (
775+
choice.delta.content is not None
776+
or choice.delta.tool_calls is not None
777+
):
778+
self._first_token_received = True
779+
self._first_token_time = default_timer()
780+
return
781+
757782
def __getattr__(self, name):
758783
return getattr(self.stream, name)
759784

@@ -777,10 +802,16 @@ def __init__(
777802
span: Span,
778803
logger: Logger,
779804
capture_content: bool,
805+
instruments: Optional[Instruments] = None,
806+
start_time: Optional[float] = None,
807+
request_attributes: Optional[dict] = None,
780808
):
781809
super().__init__(stream, capture_content=capture_content)
782810
self.span = span
783811
self.logger = logger
812+
self._instruments = instruments
813+
self._start_time = start_time
814+
self._request_attributes = request_attributes or {}
784815

785816
def cleanup(self, error: Optional[BaseException] = None):
786817
if not self._started:
@@ -863,9 +894,43 @@ def cleanup(self, error: Optional[BaseException] = None):
863894
if error:
864895
handle_span_exception(self.span, error)
865896
else:
897+
self._record_ttft()
866898
self.span.end()
867899
self._started = False
868900

901+
def _record_ttft(self):
902+
if (
903+
self._instruments is None
904+
or self._start_time is None
905+
or self._first_token_time is None
906+
):
907+
return
908+
ttft = max(self._first_token_time - self._start_time, 0.0)
909+
common_attributes = {
910+
GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value,
911+
GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.OPENAI.value,
912+
}
913+
if GenAIAttributes.GEN_AI_REQUEST_MODEL in self._request_attributes:
914+
common_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL] = (
915+
self._request_attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL]
916+
)
917+
if self.response_model:
918+
common_attributes[GenAIAttributes.GEN_AI_RESPONSE_MODEL] = (
919+
self.response_model
920+
)
921+
if ServerAttributes.SERVER_ADDRESS in self._request_attributes:
922+
common_attributes[ServerAttributes.SERVER_ADDRESS] = (
923+
self._request_attributes[ServerAttributes.SERVER_ADDRESS]
924+
)
925+
if ServerAttributes.SERVER_PORT in self._request_attributes:
926+
common_attributes[ServerAttributes.SERVER_PORT] = (
927+
self._request_attributes[ServerAttributes.SERVER_PORT]
928+
)
929+
self._instruments.ttft_histogram.record(
930+
ttft,
931+
attributes=common_attributes,
932+
)
933+
869934

870935
class ChatStreamWrapper(BaseStreamWrapper):
871936
handler: TelemetryHandler
@@ -941,6 +1006,15 @@ def cleanup(self, error: Optional[BaseException] = None):
9411006
},
9421007
)
9431008

1009+
if (
1010+
self._first_token_time is not None
1011+
and self.invocation.monotonic_start_s is not None
1012+
):
1013+
self.invocation.time_to_first_token_s = max(
1014+
self._first_token_time - self.invocation.monotonic_start_s,
1015+
0.0,
1016+
)
1017+
9441018
self._set_output_messages()
9451019

9461020
if error:
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import pytest
2+
from tests.test_utils import DEFAULT_MODEL, USER_ONLY_PROMPT
3+
4+
from opentelemetry.semconv._incubating.attributes import (
5+
gen_ai_attributes as GenAIAttributes,
6+
)
7+
from opentelemetry.semconv._incubating.attributes import (
8+
server_attributes as ServerAttributes,
9+
)
10+
from opentelemetry.util.genai.instruments import (
11+
GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN,
12+
GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN_BUCKETS,
13+
get_metric_data_points,
14+
)
15+
from opentelemetry.util.genai.utils import is_experimental_mode
16+
17+
18+
def test_streaming_chat_records_ttft_metric(
19+
metric_reader, openai_client, instrument_with_content, vcr
20+
):
21+
"""TTFT metric is recorded for streaming chat completions."""
22+
with vcr.use_cassette("test_chat_completion_streaming.yaml"):
23+
response = openai_client.chat.completions.create(
24+
model=DEFAULT_MODEL,
25+
messages=USER_ONLY_PROMPT,
26+
stream=True,
27+
stream_options={"include_usage": True},
28+
)
29+
for _ in response:
30+
pass
31+
32+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN)
33+
assert len(data_points) == 1, (
34+
"expected exactly one TTFT data point for streaming"
35+
)
36+
37+
data_point = data_points[0]
38+
assert data_point.sum >= 0
39+
assert data_point.count == 1
40+
assert data_point.explicit_bounds == tuple(GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN_BUCKETS)
41+
42+
latest_experimental_enabled = is_experimental_mode()
43+
assert GenAIAttributes.GEN_AI_OPERATION_NAME in data_point.attributes
44+
assert (
45+
data_point.attributes[GenAIAttributes.GEN_AI_OPERATION_NAME]
46+
== GenAIAttributes.GenAiOperationNameValues.CHAT.value
47+
)
48+
assert GenAIAttributes.GEN_AI_REQUEST_MODEL in data_point.attributes
49+
assert (
50+
data_point.attributes[GenAIAttributes.GEN_AI_REQUEST_MODEL]
51+
== "gpt-4o-mini"
52+
)
53+
assert ServerAttributes.SERVER_ADDRESS in data_point.attributes
54+
55+
56+
@pytest.mark.asyncio()
57+
async def test_async_streaming_chat_records_ttft_metric(
58+
metric_reader, async_openai_client, instrument_with_content, vcr
59+
):
60+
"""TTFT metric is recorded for async streaming chat completions."""
61+
with vcr.use_cassette("test_async_chat_completion_streaming.yaml"):
62+
response = await async_openai_client.chat.completions.create(
63+
model=DEFAULT_MODEL,
64+
messages=USER_ONLY_PROMPT,
65+
stream=True,
66+
stream_options={"include_usage": True},
67+
)
68+
async for _ in response:
69+
pass
70+
71+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN)
72+
assert len(data_points) == 1, (
73+
"expected exactly one TTFT data point for async streaming"
74+
)
75+
76+
data_point = data_points[0]
77+
assert data_point.sum >= 0
78+
assert data_point.count == 1
79+
assert data_point.explicit_bounds == tuple(GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN_BUCKETS)
80+
81+
82+
def test_non_streaming_chat_does_not_record_ttft_metric(
83+
metric_reader, openai_client, instrument_with_content, vcr
84+
):
85+
"""TTFT metric should NOT be recorded for non-streaming requests."""
86+
with vcr.use_cassette("test_chat_completion_metrics.yaml"):
87+
openai_client.chat.completions.create(
88+
messages=USER_ONLY_PROMPT, model=DEFAULT_MODEL, stream=False
89+
)
90+
91+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN)
92+
assert len(data_points) == 0, (
93+
"gen_ai.client.time_to_first_token metric should not be recorded for non-streaming"
94+
)
95+
96+
97+
def test_streaming_tool_calls_records_ttft_metric(
98+
metric_reader, openai_client, instrument_with_content, vcr
99+
):
100+
"""TTFT metric is recorded for streaming responses with tool calls."""
101+
with vcr.use_cassette(
102+
"test_chat_completion_multiple_tools_streaming_with_content.yaml"
103+
):
104+
response = openai_client.chat.completions.create(
105+
model=DEFAULT_MODEL,
106+
messages=[{"role": "user", "content": "What's the weather?"}],
107+
stream=True,
108+
stream_options={"include_usage": True},
109+
tools=[
110+
{
111+
"type": "function",
112+
"function": {
113+
"name": "get_weather",
114+
"parameters": {
115+
"type": "object",
116+
"properties": {
117+
"location": {"type": "string"},
118+
},
119+
},
120+
},
121+
}
122+
],
123+
)
124+
for _ in response:
125+
pass
126+
127+
data_points = get_metric_data_points(metric_reader, GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN)
128+
assert len(data_points) == 1, (
129+
"expected exactly one TTFT data point for streaming tool calls"
130+
)
131+
132+
data_point = data_points[0]
133+
assert data_point.sum >= 0
134+
assert data_point.count == 1

util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,27 @@
3535
67108864,
3636
]
3737

38+
GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN = "gen_ai.client.time_to_first_token"
39+
40+
GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN_BUCKETS = [
41+
0.001,
42+
0.005,
43+
0.01,
44+
0.02,
45+
0.04,
46+
0.06,
47+
0.08,
48+
0.1,
49+
0.25,
50+
0.5,
51+
0.75,
52+
1.0,
53+
2.5,
54+
5.0,
55+
7.5,
56+
10.0,
57+
]
58+
3859

3960
def create_duration_histogram(meter: Meter) -> Histogram:
4061
return meter.create_histogram(
@@ -52,3 +73,25 @@ def create_token_histogram(meter: Meter) -> Histogram:
5273
unit="{token}",
5374
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
5475
)
76+
77+
78+
def create_ttft_histogram(meter: Meter) -> Histogram:
79+
return meter.create_histogram(
80+
name=GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN,
81+
description="Time to generate first token for successful responses",
82+
unit="s",
83+
explicit_bucket_boundaries_advisory=GEN_AI_CLIENT_TIME_TO_FIRST_TOKEN_BUCKETS,
84+
)
85+
86+
87+
def get_metric_data_points(metric_reader, metric_name):
88+
"""Extract all data points for a given metric name from a metric reader."""
89+
results = []
90+
metrics = metric_reader.get_metrics_data().resource_metrics
91+
if not metrics:
92+
return results
93+
for scope_metrics in metrics[0].scope_metrics:
94+
for m in scope_metrics.metrics:
95+
if m.name == metric_name:
96+
results.extend(m.data.data_points)
97+
return results

util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from opentelemetry.util.genai.instruments import (
1818
create_duration_histogram,
1919
create_token_histogram,
20+
create_ttft_histogram,
2021
)
2122
from opentelemetry.util.genai.types import LLMInvocation
2223
from opentelemetry.util.types import AttributeValue
@@ -28,6 +29,7 @@ class InvocationMetricsRecorder:
2829
def __init__(self, meter: Meter):
2930
self._duration_histogram: Histogram = create_duration_histogram(meter)
3031
self._token_histogram: Histogram = create_token_histogram(meter)
32+
self._ttft_histogram: Histogram = create_ttft_histogram(meter)
3133

3234
def record(
3335
self,
@@ -105,5 +107,12 @@ def record(
105107
context=span_context,
106108
)
107109

110+
if invocation.time_to_first_token_s is not None and not error_type:
111+
self._ttft_histogram.record(
112+
invocation.time_to_first_token_s,
113+
attributes=attributes,
114+
context=span_context,
115+
)
116+
108117

109118
__all__ = ["InvocationMetricsRecorder"]

util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,12 @@ class LLMInvocation(GenAIInvocation):
335335
seed: int | None = None
336336
server_address: str | None = None
337337
server_port: int | None = None
338-
338+
time_to_first_token_s: float | None = None
339+
"""
340+
Time to first token in seconds. This is the time from the start of
341+
the request (monotonic_start_s) to when the first output token is
342+
received. Only populated for streaming responses.
343+
"""
339344

340345
@dataclass
341346
class EmbeddingInvocation(GenAIInvocation):

Commit comments (0)