Skip to content

Commit ada75a7

Browse files
dspatoulasxrmx
andauthored
fix: Update PsycopgInstrumentor.instrument_connection to use async cursor factory (#3956)
* Fix instrument_connection to use async cursor factory for `AsyncConnection` * Update tests for psycopg async cursor factory change * Add `psycopg` async cursor fix to CHANGELOG.md * Fix pylint error for `MockAsyncConnection.cursor` * Update CHANGELOG.md --------- Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 97b762f commit ada75a7

3 files changed

Lines changed: 63 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424
([#3945](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3945))
2525
- `opentelemetry-util-http`, `opentelemetry-instrumentation-requests`, `opentelemetry-instrumentation-wsgi`, `opentelemetry-instrumentation-asgi`: normalize byte-valued user-agent headers before detecting synthetic sources so attributes are recorded reliably.
2626
([#4001](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/4001))
27+
- `opentelemetry-instrumentation-psycopg`: Fix `instrument_connection` method to use `_new_cursor_async_factory` on async connections.
28+
([#3956](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3956))
2729

2830
## Version 1.39.0/0.60b0 (2025-12-03)
2931

instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,14 @@ def instrument_connection(
268268
setattr(
269269
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
270270
)
271-
connection.cursor_factory = _new_cursor_factory(
272-
tracer_provider=tracer_provider
273-
)
271+
if isinstance(connection, psycopg.AsyncConnection):
272+
connection.cursor_factory = _new_cursor_async_factory(
273+
tracer_provider=tracer_provider
274+
)
275+
else:
276+
connection.cursor_factory = _new_cursor_factory(
277+
tracer_provider=tracer_provider
278+
)
274279
connection._is_instrumented_by_opentelemetry = True
275280
else:
276281
_logger.warning(

instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use
9999
return {"dbname": "test"}
100100

101101

102-
class MockAsyncConnection:
102+
class MockAsyncConnection(psycopg.AsyncConnection):
103103
commit = mock.MagicMock(spec=types.MethodType)
104104
commit.__name__ = "commit"
105105

@@ -113,11 +113,11 @@ def __init__(self, *args, **kwargs):
113113
async def connect(*args, **kwargs):
114114
return MockAsyncConnection(**kwargs)
115115

116-
def cursor(self):
116+
def cursor(self, *args, **kwargs):
117117
if self.cursor_factory:
118118
cur = self.cursor_factory(self)
119119
return cur
120-
return MockAsyncCursor()
120+
return MockAsyncCursor(*args, **kwargs)
121121

122122
def execute(self, query, params=None, *, prepare=None, binary=False):
123123
cur = self.cursor()
@@ -178,6 +178,8 @@ def test_instrumentor(self):
178178

179179
cnx = psycopg.connect(database="test")
180180

181+
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
182+
181183
cursor = cnx.cursor()
182184

183185
query = "SELECT * FROM test"
@@ -209,6 +211,8 @@ def test_instrumentor_with_connection_class(self):
209211

210212
cnx = psycopg.Connection.connect(database="test")
211213

214+
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
215+
212216
cursor = cnx.cursor()
213217

214218
query = "SELECT * FROM test"
@@ -239,6 +243,7 @@ def test_span_name(self):
239243

240244
cnx = psycopg.connect(database="test")
241245

246+
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
242247
cursor = cnx.cursor()
243248

244249
cursor.execute("Test query", ("param1Value", False))
@@ -267,6 +272,7 @@ def test_span_name(self):
267272
def test_span_params_attribute(self):
268273
PsycopgInstrumentor().instrument(capture_parameters=True)
269274
cnx = psycopg.connect(database="test")
275+
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
270276
query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s"
271277
params = ("test", 42)
272278

@@ -311,6 +317,7 @@ def test_custom_tracer_provider(self):
311317
PsycopgInstrumentor().instrument(tracer_provider=tracer_provider)
312318

313319
cnx = psycopg.connect(database="test")
320+
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
314321
cursor = cnx.cursor()
315322
query = "SELECT * FROM test"
316323
cursor.execute(query)
@@ -332,6 +339,9 @@ def test_instrument_connection(self):
332339
self.assertEqual(len(spans_list), 0)
333340

334341
cnx = PsycopgInstrumentor().instrument_connection(cnx)
342+
343+
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
344+
335345
cursor = cnx.cursor()
336346
cursor.execute(query)
337347

@@ -350,6 +360,7 @@ def test_instrument_connection_with_instrument(self):
350360

351361
PsycopgInstrumentor().instrument()
352362
cnx = PsycopgInstrumentor().instrument_connection(cnx)
363+
self.assertTrue(issubclass(cnx.cursor_factory, MockCursor))
353364
cursor = cnx.cursor()
354365
cursor.execute(query)
355366

@@ -422,6 +433,9 @@ async def test_wrap_async_connection_class_with_cursor(self):
422433
async def test_async_connection():
423434
acnx = await psycopg.AsyncConnection.connect("test")
424435
async with acnx as cnx:
436+
self.assertTrue(
437+
issubclass(cnx.cursor_factory, MockAsyncCursor)
438+
)
425439
async with cnx.cursor() as cursor:
426440
await cursor.execute("SELECT * FROM test")
427441

@@ -450,6 +464,9 @@ async def test_instrumentor_with_async_connection_class(self):
450464
async def test_async_connection():
451465
acnx = await psycopg.AsyncConnection.connect("test")
452466
async with acnx as cnx:
467+
self.assertTrue(
468+
issubclass(cnx.cursor_factory, MockAsyncCursor)
469+
)
453470
await cnx.execute("SELECT * FROM test")
454471

455472
await test_async_connection()
@@ -474,6 +491,7 @@ async def test_span_name_async(self):
474491
PsycopgInstrumentor().instrument()
475492

476493
cnx = await psycopg.AsyncConnection.connect("test")
494+
self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor))
477495
async with cnx.cursor() as cursor:
478496
await cursor.execute("Test query", ("param1Value", False))
479497
await cursor.execute(
@@ -500,6 +518,7 @@ async def test_span_name_async(self):
500518
async def test_span_params_attribute(self):
501519
PsycopgInstrumentor().instrument(capture_parameters=True)
502520
cnx = await psycopg.AsyncConnection.connect("test")
521+
self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor))
503522
query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s"
504523
params = ("test", 42)
505524
async with cnx.cursor() as cursor:
@@ -543,6 +562,7 @@ async def test_tracing_is_async(self):
543562

544563
async def test_async_connection():
545564
acnx = await psycopg.AsyncConnection.connect("test")
565+
self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor))
546566
async with acnx as cnx:
547567
async with cnx.cursor() as cursor:
548568
await cursor.execute("SELECT * FROM test", delay=delay)
@@ -557,3 +577,33 @@ async def test_async_connection():
557577
self.assertGreater(duration, delay * 1e9)
558578

559579
PsycopgInstrumentor().uninstrument()
580+
581+
async def test_instrument_connection_uses_async_cursor_factory(self):
582+
query = b"SELECT * FROM test"
583+
584+
acnx = await psycopg.AsyncConnection.connect("test")
585+
async with acnx:
586+
await acnx.execute(query)
587+
588+
spans_list = self.memory_exporter.get_finished_spans()
589+
self.assertEqual(len(spans_list), 0)
590+
591+
acnx = PsycopgInstrumentor().instrument_connection(acnx)
592+
593+
self.assertTrue(acnx._is_instrumented_by_opentelemetry)
594+
595+
# The new cursor_factory should be a subclass of MockAsyncCursor,
596+
# the async traced cursor factory returned by _new_cursor_async_factory
597+
self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor))
598+
599+
cursor = acnx.cursor()
600+
await cursor.execute(query)
601+
602+
spans_list = self.memory_exporter.get_finished_spans()
603+
self.assertEqual(len(spans_list), 1)
604+
span = spans_list[0]
605+
606+
# Check version and name in span's instrumentation info
607+
self.assertEqualSpanInstrumentationScope(
608+
span, opentelemetry.instrumentation.psycopg
609+
)

0 commit comments

Comments
 (0)