Skip to content

Commit 77373f8

Browse files
bisgaard-itisthpiercexrmxtammy-baylis-swi
authored
Support passing tracer provider in aiohttp server instrumentation lib (#3819)
* allow to pass tracer_provider when instrumenting aiohttp server * minor fix * add note to indicate backwards compatibility * small fix * add test * update changelog * Only decode JSON input buffer in Anthropic Claude streaming (#3875) * Only decode JSON input buffer in Anthropic Claude streaming _decode_tool_use was only used when _tool_json_input_buf was found, but we were decoding the entire _content_block after adding _tool_json_input_buf to it. The _content_block overall which could contain non-JSON elements (e.g. {}), causing failures. To fix this, we have removed _decode_tool_use helper function and inlined JSON decoding logic directly into content_block_stop handler in _process_anthropic_claude_chunk, where we only use it to decode _tool_json_input_buf before appending to _content_block. * Update test_botocore_bedrock.py Fix lint: `opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py:2990:4: C0415: Import outside toplevel (opentelemetry.instrumentation.botocore.extensions.bedrock_utils.InvokeModelWithResponseStreamWrapper) (import-outside-toplevel)` * Update test_botocore_bedrock.py Remove extra line * fix lint issue * aiohttp-client: add support for url exclusions (#3850) * Stop using deprecated span.instrumentation_info * aiohttp-client: add support for OTEL_PYTHON_EXCLUDED_URLS / OTEL_PYTHON_HTTPX_EXCLUDED_URLS * Add docs * Add changelog * Please lint * Update CHANGELOG.md Co-authored-by: Tammy Baylis <96076570+tammy-baylis-swi@users.noreply.github.com> * Test for both env vars * Assert at each iteration --------- Co-authored-by: Tammy Baylis <96076570+tammy-baylis-swi@users.noreply.github.com> * update changelog * @tammy-baylis-swi add link to PR * create factory for instrumented application * run precommit hooks * update readthedocs * cleanup changelog after merge conflicts * make pre-commit happy * @tammy-baylis-swi add comment about optional inputs * fix issues caused by merge * precommit hooks * fix typehints * sort imports * @xrmx removed assert * Apply suggestions from code review --------- Co-authored-by: Thomas Pierce <thp@amazon.com> Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com> Co-authored-by: Tammy Baylis <96076570+tammy-baylis-swi@users.noreply.github.com>
1 parent a986d25 commit 77373f8

3 files changed

Lines changed: 153 additions & 78 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2121
([#4006](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4006))
2222
- `opentelemetry-instrumentation-flask`: Add support for 3.1+ streaming responses
2323
([#3938](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3938))
24+
- `opentelemetry-instrumentation-aiohttp-server`: Support passing `TracerProvider` when instrumenting.
25+
([#3819](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3819))
2426

2527
### Fixed
2628

instrumentation/opentelemetry-instrumentation-aiohttp-server/src/opentelemetry/instrumentation/aiohttp_server/__init__.py

Lines changed: 95 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,14 @@
2525
from opentelemetry.instrumentation.aiohttp_server import (
2626
AioHttpServerInstrumentor
2727
)
28+
from opentelemetry.sdk.resources import Resource
29+
from opentelemetry.sdk.trace import TracerProvider
30+
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
2831
29-
AioHttpServerInstrumentor().instrument()
32+
# Optional: configure non-default TracerProvider, resource, sampler
33+
resource = Resource(attributes={"service.name": "my-aiohttp-service"})
34+
sampler = ParentBased(root=TraceIdRatioBased(rate=0.25)) # sample 25% of traces
35+
AioHttpServerInstrumentor().instrument(tracer_provider=TracerProvider(resource=resource, sampler=sampler))
3036
3137
async def hello(request):
3238
return web.Response(text="Hello, world")
@@ -387,73 +393,95 @@ def keys(self, carrier: dict) -> list:
387393
getter = AiohttpGetter()
388394

389395

390-
@web.middleware
391-
async def middleware(request, handler):
392-
"""Middleware for aiohttp implementing tracing logic"""
393-
if not is_http_instrumentation_enabled() or _excluded_urls.url_disabled(
394-
request.url.path
395-
):
396-
return await handler(request)
397-
398-
span_name = get_default_span_name(request)
399-
400-
request_attrs = collect_request_attributes(request)
401-
duration_attrs = _parse_duration_attrs(request_attrs)
402-
active_requests_count_attrs = _parse_active_request_count_attrs(
403-
request_attrs
396+
def create_aiohttp_middleware(
397+
tracer_provider: trace.TracerProvider | None = None,
398+
):
399+
_tracer = (
400+
tracer_provider.get_tracer(__name__, __version__)
401+
if tracer_provider
402+
else tracer
404403
)
405404

406-
duration_histogram = meter.create_histogram(
407-
name=MetricInstruments.HTTP_SERVER_DURATION,
408-
unit="ms",
409-
description="Measures the duration of inbound HTTP requests.",
410-
)
405+
@web.middleware
406+
async def _middleware(request, handler):
407+
"""Middleware for aiohttp implementing tracing logic"""
408+
if (
409+
not is_http_instrumentation_enabled()
410+
or _excluded_urls.url_disabled(request.url.path)
411+
):
412+
return await handler(request)
413+
414+
span_name = get_default_span_name(request)
415+
416+
request_attrs = collect_request_attributes(request)
417+
duration_attrs = _parse_duration_attrs(request_attrs)
418+
active_requests_count_attrs = _parse_active_request_count_attrs(
419+
request_attrs
420+
)
411421

412-
active_requests_counter = meter.create_up_down_counter(
413-
name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS,
414-
unit="requests",
415-
description="measures the number of concurrent HTTP requests those are currently in flight",
416-
)
422+
duration_histogram = meter.create_histogram(
423+
name=MetricInstruments.HTTP_SERVER_DURATION,
424+
unit="ms",
425+
description="Measures the duration of inbound HTTP requests.",
426+
)
417427

418-
with tracer.start_as_current_span(
419-
span_name,
420-
context=extract(request, getter=getter),
421-
kind=trace.SpanKind.SERVER,
422-
) as span:
423-
if span.is_recording():
424-
request_headers_attributes = collect_request_headers_attributes(
425-
request
426-
)
427-
request_attrs.update(request_headers_attributes)
428-
span.set_attributes(request_attrs)
429-
start = default_timer()
430-
active_requests_counter.add(1, active_requests_count_attrs)
431-
try:
432-
resp = await handler(request)
433-
set_status_code(span, resp.status)
428+
active_requests_counter = meter.create_up_down_counter(
429+
name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS,
430+
unit="requests",
431+
description="measures the number of concurrent HTTP requests those are currently in flight",
432+
)
433+
434+
with _tracer.start_as_current_span(
435+
span_name,
436+
context=extract(request, getter=getter),
437+
kind=trace.SpanKind.SERVER,
438+
) as span:
434439
if span.is_recording():
435-
response_headers_attributes = (
436-
collect_response_headers_attributes(resp)
440+
request_headers_attributes = (
441+
collect_request_headers_attributes(request)
437442
)
438-
span.set_attributes(response_headers_attributes)
439-
except web.HTTPException as ex:
440-
set_status_code(span, ex.status_code)
441-
raise
442-
finally:
443-
duration = max((default_timer() - start) * 1000, 0)
444-
duration_histogram.record(duration, duration_attrs)
445-
active_requests_counter.add(-1, active_requests_count_attrs)
446-
return resp
447-
448-
449-
class _InstrumentedApplication(web.Application):
450-
"""Insert tracing middleware"""
451-
452-
def __init__(self, *args, **kwargs):
453-
middlewares = kwargs.pop("middlewares", [])
454-
middlewares.insert(0, middleware)
455-
kwargs["middlewares"] = middlewares
456-
super().__init__(*args, **kwargs)
443+
request_attrs.update(request_headers_attributes)
444+
span.set_attributes(request_attrs)
445+
start = default_timer()
446+
active_requests_counter.add(1, active_requests_count_attrs)
447+
try:
448+
resp = await handler(request)
449+
set_status_code(span, resp.status)
450+
if span.is_recording():
451+
response_headers_attributes = (
452+
collect_response_headers_attributes(resp)
453+
)
454+
span.set_attributes(response_headers_attributes)
455+
except web.HTTPException as ex:
456+
set_status_code(span, ex.status_code)
457+
raise
458+
finally:
459+
duration = max((default_timer() - start) * 1000, 0)
460+
duration_histogram.record(duration, duration_attrs)
461+
active_requests_counter.add(-1, active_requests_count_attrs)
462+
return resp
463+
464+
return _middleware
465+
466+
467+
middleware = create_aiohttp_middleware() # for backwards compatibility
468+
469+
470+
def create_instrumented_application(
471+
tracer_provider: trace.TracerProvider | None = None,
472+
):
473+
_middleware = create_aiohttp_middleware(tracer_provider=tracer_provider)
474+
475+
class _InstrumentedApplication(web.Application):
476+
"""Insert tracing middleware"""
477+
478+
def __init__(self, *args, **kwargs):
479+
middlewares = kwargs.pop("middlewares", [])
480+
middlewares.insert(0, _middleware)
481+
kwargs["middlewares"] = middlewares
482+
super().__init__(*args, **kwargs)
483+
484+
return _InstrumentedApplication
457485

458486

459487
class AioHttpServerInstrumentor(BaseInstrumentor):
@@ -464,6 +492,7 @@ class AioHttpServerInstrumentor(BaseInstrumentor):
464492
"""
465493

466494
def _instrument(self, **kwargs):
495+
tracer_provider = kwargs.get("tracer_provider", None)
467496
# update global values at instrument time so we can test them
468497
global _excluded_urls # pylint: disable=global-statement
469498
_excluded_urls = get_excluded_urls("AIOHTTP_SERVER")
@@ -475,6 +504,10 @@ def _instrument(self, **kwargs):
475504
meter = metrics.get_meter(__name__, __version__)
476505

477506
self._original_app = web.Application
507+
508+
_InstrumentedApplication = create_instrumented_application(
509+
tracer_provider=tracer_provider
510+
)
478511
setattr(web, "Application", _InstrumentedApplication)
479512

480513
def _uninstrument(self, **kwargs):

instrumentation/opentelemetry-instrumentation-aiohttp-server/tests/test_aiohttp_server_integration.py

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
AioHttpServerInstrumentor,
2727
)
2828
from opentelemetry.instrumentation.utils import suppress_http_instrumentation
29+
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
2930
from opentelemetry.semconv._incubating.attributes.http_attributes import (
3031
HTTP_METHOD,
3132
HTTP_STATUS_CODE,
@@ -101,9 +102,9 @@ def fixture_suppress():
101102

102103
@pytest_asyncio.fixture(name="server_fixture")
103104
async def fixture_server_fixture(tracer, aiohttp_server, suppress):
104-
_, memory_exporter = tracer
105+
tracer_provider, memory_exporter = tracer
105106

106-
AioHttpServerInstrumentor().instrument()
107+
AioHttpServerInstrumentor().instrument(tracer_provider=tracer_provider)
107108

108109
app = aiohttp.web.Application()
109110
app.add_routes([aiohttp.web.get("/test-path", default_handler)])
@@ -228,20 +229,6 @@ async def handler(request):
228229
memory_exporter.clear()
229230

230231

231-
def _get_sorted_metrics(metrics_data):
232-
resource_metrics = metrics_data.resource_metrics if metrics_data else []
233-
234-
all_metrics = []
235-
for metrics in resource_metrics:
236-
for scope_metrics in metrics.scope_metrics:
237-
all_metrics.extend(scope_metrics.metrics)
238-
239-
return sorted(
240-
all_metrics,
241-
key=lambda m: m.name,
242-
)
243-
244-
245232
@pytest.mark.asyncio
246233
@pytest.mark.parametrize(
247234
"env_var",
@@ -281,6 +268,59 @@ async def handler(request):
281268
AioHttpServerInstrumentor().uninstrument()
282269

283270

271+
@pytest.mark.asyncio
272+
@pytest.mark.parametrize(
273+
"tracer",
274+
[
275+
TestBase().create_tracer_provider(
276+
sampler=ParentBased(TraceIdRatioBased(0.05))
277+
)
278+
],
279+
)
280+
async def test_non_global_tracer_provider(
281+
tracer,
282+
server_fixture,
283+
aiohttp_client,
284+
):
285+
n_requests = 1000
286+
collection_ratio = 0.05
287+
n_expected_trace_ids = n_requests * collection_ratio
288+
289+
_, memory_exporter = tracer
290+
server, _ = server_fixture
291+
292+
assert len(memory_exporter.get_finished_spans()) == 0
293+
294+
client = await aiohttp_client(server)
295+
for _ in range(n_requests):
296+
await client.get("/test-path")
297+
298+
trace_ids = {
299+
span.context.trace_id
300+
for span in memory_exporter.get_finished_spans()
301+
if span.context is not None
302+
}
303+
assert (
304+
0.5 * n_expected_trace_ids
305+
<= len(trace_ids)
306+
<= 1.5 * n_expected_trace_ids
307+
)
308+
309+
310+
def _get_sorted_metrics(metrics_data):
311+
resource_metrics = metrics_data.resource_metrics if metrics_data else []
312+
313+
all_metrics = []
314+
for metrics in resource_metrics:
315+
for scope_metrics in metrics.scope_metrics:
316+
all_metrics.extend(scope_metrics.metrics)
317+
318+
return sorted(
319+
all_metrics,
320+
key=lambda m: m.name,
321+
)
322+
323+
284324
@pytest.mark.asyncio
285325
async def test_custom_request_headers(tracer, aiohttp_server, monkeypatch):
286326
# pylint: disable=too-many-locals

0 commit comments

Comments
 (0)