diff --git a/CHANGELOG.md b/CHANGELOG.md index 28f32f3a96..bc59ecc56c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -131,6 +131,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4210](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4210)) - Add stale PR GitHub Action ([#4220](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4220)) +- `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-psycopg`: Add sqlcommenter support for `instrument_connection` + ([#4267](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4267/)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 28896be138..648466993d 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -72,6 +72,12 @@ from opentelemetry.instrumentation.psycopg import PsycopgInstrumentor PsycopgInstrumentor().instrument(enable_commenter=True) + # OR with specific connection + cnx = psycopg.connect(database='Database') + instrumented_cnx = PsycopgInstrumentor().instrument_connection( + cnx, + enable_commenter=True, + ) SQLCommenter with commenter_options @@ -247,7 +253,11 @@ def _uninstrument(self, **kwargs: Any): # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql @staticmethod def instrument_connection( - connection: ConnectionT, tracer_provider: TracerProvider | None = None + connection: ConnectionT, + tracer_provider: TracerProvider | None = None, + enable_commenter: bool = False, + commenter_options: dict = None, + enable_attribute_commenter: bool = False, ) -> ConnectionT: """Enable instrumentation in a psycopg connection. @@ -257,6 +267,12 @@ def instrument_connection( tracer_provider: opentelemetry.trace.TracerProvider, optional The TracerProvider to use for instrumentation. If not provided, the global TracerProvider will be used. + enable_commenter: bool, optional + Optional flag to enable/disable sqlcommenter (default False). + commenter_options: dict, optional + Optional configurations for tags to be appended at the sql query. + enable_attribute_commenter: + Optional flag to enable/disable addition of sqlcomment to span attribute (default False). Requires enable_commenter=True. Returns: An instrumented psycopg connection object. @@ -270,11 +286,17 @@ def instrument_connection( ) if isinstance(connection, psycopg.AsyncConnection): connection.cursor_factory = _new_cursor_async_factory( - tracer_provider=tracer_provider + tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + enable_attribute_commenter=enable_attribute_commenter, ) else: connection.cursor_factory = _new_cursor_factory( - tracer_provider=tracer_provider + tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + enable_attribute_commenter=enable_attribute_commenter, ) connection._is_instrumented_by_opentelemetry = True else: @@ -362,6 +384,9 @@ def _new_cursor_factory( db_api: DatabaseApiIntegration | None = None, base_factory: type[psycopg.Cursor] | None = None, tracer_provider: TracerProvider | None = None, + enable_commenter: bool = False, + commenter_options: dict = None, + enable_attribute_commenter: bool = False, ): if not db_api: db_api = DatabaseApiIntegration( @@ -370,6 +395,10 @@ def _new_cursor_factory( connection_attributes=PsycopgInstrumentor._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + connect_module=psycopg, + enable_attribute_commenter=enable_attribute_commenter, ) base_factory = base_factory or psycopg.Cursor @@ -398,6 +427,9 @@ def _new_cursor_async_factory( db_api: DatabaseApiAsyncIntegration | None = None, base_factory: type[psycopg.AsyncCursor] | None = None, tracer_provider: TracerProvider | None = None, + enable_commenter: bool = False, + commenter_options: dict = None, + enable_attribute_commenter: bool = False, ): if not db_api: db_api = DatabaseApiAsyncIntegration( @@ -406,6 +438,10 @@ def _new_cursor_async_factory( connection_attributes=PsycopgInstrumentor._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + connect_module=psycopg, + enable_attribute_commenter=enable_attribute_commenter, ) base_factory = base_factory or psycopg.AsyncCursor _cursor_tracer = CursorTracer(db_api) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index 84d6709bbb..e9765ff88e 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -22,6 +22,9 @@ import opentelemetry.instrumentation.psycopg from opentelemetry.instrumentation.psycopg import PsycopgInstrumentor from opentelemetry.sdk import resources +from opentelemetry.semconv._incubating.attributes.db_attributes import ( + DB_STATEMENT, +) from opentelemetry.test.test_base import TestBase @@ -451,6 +454,152 @@ def test_sqlcommenter_enabled(self, event_mocked): kwargs = event_mocked.call_args[1] self.assertEqual(kwargs["enable_commenter"], True) + def test_sqlcommenter_enabled_instrument_connection_defaults(self): + with ( + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.__version__", + "foobar", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.pq.__build_version__", + "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.pq.version", + new=lambda: "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.threadsafety", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.apilevel", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.paramstyle", + "test", + ), + ): + cnx = psycopg.connect(database="test") + cnx = PsycopgInstrumentor().instrument_connection( + cnx, + enable_commenter=True, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + span_id = format(span.get_span_context().span_id, "016x") + trace_id = format(span.get_span_context().trace_id, "032x") + trace_flags = int(span.get_span_context().trace_flags) + self.assertEqual( + MockCursor.execute.call_args[0][0], + f"Select 1 /*db_driver='psycopg%%3Afoobar',dbapi_level='123',dbapi_threadsafety='123',driver_paramstyle='test',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + + def test_sqlcommenter_enabled_instrument_connection_stmt_enabled(self): + with ( + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.__version__", + "foobar", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.pq.__build_version__", + "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.pq.version", + new=lambda: "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.threadsafety", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.apilevel", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.paramstyle", + "test", + ), + ): + cnx = psycopg.connect(database="test") + cnx = PsycopgInstrumentor().instrument_connection( + cnx, + enable_commenter=True, + enable_attribute_commenter=True, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + span_id = format(span.get_span_context().span_id, "016x") + trace_id = format(span.get_span_context().trace_id, "032x") + trace_flags = int(span.get_span_context().trace_flags) + self.assertEqual( + MockCursor.execute.call_args[0][0], + f"Select 1 /*db_driver='psycopg%%3Afoobar',dbapi_level='123',dbapi_threadsafety='123',driver_paramstyle='test',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + self.assertEqual( + span.attributes[DB_STATEMENT], + f"Select 1 /*db_driver='psycopg%%3Afoobar',dbapi_level='123',dbapi_threadsafety='123',driver_paramstyle='test',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + + def test_sqlcommenter_enabled_instrument_connection_with_options(self): + with ( + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.__version__", + "foobar", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.pq.__build_version__", + "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.pq.version", + new=lambda: "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg.psycopg.threadsafety", + "123", + ), + ): + cnx = psycopg.connect(database="test") + cnx = PsycopgInstrumentor().instrument_connection( + cnx, + enable_commenter=True, + commenter_options={ + "dbapi_level": False, + "dbapi_threadsafety": True, + "driver_paramstyle": False, + "foo": "ignored", + }, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + span_id = format(span.get_span_context().span_id, "016x") + trace_id = format(span.get_span_context().trace_id, "032x") + trace_flags = int(span.get_span_context().trace_flags) + self.assertEqual( + MockCursor.execute.call_args[0][0], + f"Select 1 /*db_driver='psycopg%%3Afoobar',dbapi_threadsafety='123',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + @mock.patch("opentelemetry.instrumentation.dbapi.wrap_connect") def test_sqlcommenter_disabled(self, event_mocked): cnx = psycopg.connect(database="test") @@ -461,6 +610,45 @@ def test_sqlcommenter_disabled(self, event_mocked): kwargs = event_mocked.call_args[1] self.assertEqual(kwargs["enable_commenter"], False) + def test_sqlcommenter_disabled_default_instrument_connection(self): + cnx = psycopg.connect(database="test") + cnx = PsycopgInstrumentor().instrument_connection( + cnx, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + self.assertEqual( + MockCursor.execute.call_args[0][0], + "Select 1", + ) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + + def test_sqlcommenter_disabled_explicit_instrument_connection(self): + cnx = psycopg.connect(database="test") + cnx = PsycopgInstrumentor().instrument_connection( + cnx, + enable_commenter=False, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + self.assertEqual( + MockCursor.execute.call_args[0][0], + "Select 1", + ) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + class TestPostgresqlIntegrationAsync( PostgresqlIntegrationTestMixin, TestBase, IsolatedAsyncioTestCase diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py index 4c8b1b6e02..4f9e5aaf82 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py @@ -72,6 +72,12 @@ Psycopg2Instrumentor().instrument(enable_commenter=True) + # OR with specific connection + cnx = psycopg2.connect(database='Database') + instrumented_cnx = Psycopg2Instrumentor().instrument_connection( + cnx, + enable_commenter=True, + ) SQLCommenter with commenter_options *********************************** @@ -251,6 +257,9 @@ def _uninstrument(self, **kwargs): def instrument_connection( connection: PgConnection, tracer_provider: typing.Optional[trace_api.TracerProvider] = None, + enable_commenter: bool = False, + commenter_options: dict = None, + enable_attribute_commenter: bool = False, ) -> PgConnection: """Enable instrumentation in a psycopg2 connection. @@ -263,6 +272,12 @@ def instrument_connection( tracer_provider: opentelemetry.trace.TracerProvider, optional The TracerProvider to use for instrumentation. If not specified, the global TracerProvider will be used. + enable_commenter: bool, optional + Optional flag to enable/disable sqlcommenter (default False). + commenter_options: dict, optional + Optional configurations for tags to be appended at the sql query. + enable_attribute_commenter: + Optional flag to enable/disable addition of sqlcomment to span attribute (default False). Requires enable_commenter=True. Returns: An instrumented psycopg2 connection object. @@ -279,6 +294,9 @@ def instrument_connection( connection.cursor_factory = _new_cursor_factory( base_factory=original_cursor_factory, tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + enable_attribute_commenter=enable_attribute_commenter, ) Psycopg2Instrumentor._INSTRUMENTED_CONNECTIONS[connection] = ( original_cursor_factory @@ -348,7 +366,14 @@ def get_statement(self, cursor, args): return statement -def _new_cursor_factory(db_api=None, base_factory=None, tracer_provider=None): +def _new_cursor_factory( + db_api: dbapi.DatabaseApiIntegration = None, + base_factory: typing.Optional[typing.Type[pg_cursor]] = None, + tracer_provider: typing.Optional[trace_api.TracerProvider] = None, + enable_commenter: bool = False, + commenter_options: dict = None, + enable_attribute_commenter: bool = False, +): if not db_api: db_api = DatabaseApiIntegration( __name__, @@ -356,6 +381,10 @@ def _new_cursor_factory(db_api=None, base_factory=None, tracer_provider=None): connection_attributes=Psycopg2Instrumentor._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + enable_commenter=enable_commenter, + commenter_options=commenter_options, + connect_module=psycopg2, + enable_attribute_commenter=enable_attribute_commenter, ) base_factory = base_factory or pg_cursor diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py index 2b985dc180..4d67fb3042 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py @@ -21,6 +21,9 @@ from opentelemetry import trace from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor from opentelemetry.sdk import resources +from opentelemetry.semconv._incubating.attributes.db_attributes import ( + DB_STATEMENT, +) from opentelemetry.test.test_base import TestBase @@ -65,6 +68,7 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use return {"dbname": "test"} +# pylint: disable=too-many-public-methods class TestPostgresqlIntegration(TestBase): def setUp(self): super().setUp() @@ -351,6 +355,140 @@ def test_sqlcommenter_enabled(self, event_mocked): kwargs = event_mocked.call_args[1] self.assertEqual(kwargs["enable_commenter"], True) + def test_sqlcommenter_enabled_instrument_connection_defaults(self): + with ( + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.__version__", + "foobar", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.__libpq_version__", + "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.threadsafety", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.apilevel", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.paramstyle", + "test", + ), + ): + cnx = psycopg2.connect(database="test") + cnx = Psycopg2Instrumentor().instrument_connection( + cnx, + enable_commenter=True, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + span_id = format(span.get_span_context().span_id, "016x") + trace_id = format(span.get_span_context().trace_id, "032x") + trace_flags = int(span.get_span_context().trace_flags) + self.assertEqual( + MockCursor.execute.call_args[0][0], + f"Select 1 /*db_driver='psycopg2%%3Afoobar',dbapi_level='123',dbapi_threadsafety='123',driver_paramstyle='test',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + + def test_sqlcommenter_enabled_instrument_connection_stmt_enabled(self): + with ( + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.__version__", + "foobar", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.__libpq_version__", + "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.threadsafety", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.apilevel", + "123", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.paramstyle", + "test", + ), + ): + cnx = psycopg2.connect(database="test") + cnx = Psycopg2Instrumentor().instrument_connection( + cnx, + enable_commenter=True, + enable_attribute_commenter=True, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + span_id = format(span.get_span_context().span_id, "016x") + trace_id = format(span.get_span_context().trace_id, "032x") + trace_flags = int(span.get_span_context().trace_flags) + self.assertEqual( + MockCursor.execute.call_args[0][0], + f"Select 1 /*db_driver='psycopg2%%3Afoobar',dbapi_level='123',dbapi_threadsafety='123',driver_paramstyle='test',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + self.assertEqual( + span.attributes[DB_STATEMENT], + f"Select 1 /*db_driver='psycopg2%%3Afoobar',dbapi_level='123',dbapi_threadsafety='123',driver_paramstyle='test',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + + def test_sqlcommenter_enabled_instrument_connection_with_options(self): + with ( + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.__version__", + "foobar", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.__libpq_version__", + "foobaz", + ), + mock.patch( + "opentelemetry.instrumentation.psycopg2.psycopg2.threadsafety", + "123", + ), + ): + cnx = psycopg2.connect(database="test") + cnx = Psycopg2Instrumentor().instrument_connection( + cnx, + enable_commenter=True, + commenter_options={ + "dbapi_level": False, + "dbapi_threadsafety": True, + "driver_paramstyle": False, + "foo": "ignored", + }, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + span_id = format(span.get_span_context().span_id, "016x") + trace_id = format(span.get_span_context().trace_id, "032x") + trace_flags = int(span.get_span_context().trace_flags) + self.assertEqual( + MockCursor.execute.call_args[0][0], + f"Select 1 /*db_driver='psycopg2%%3Afoobar',dbapi_threadsafety='123',libpq_version='foobaz',traceparent='00-{trace_id}-{span_id}-{trace_flags:02x}'*/", + ) + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + @mock.patch("opentelemetry.instrumentation.dbapi.wrap_connect") def test_sqlcommenter_disabled(self, event_mocked): cnx = psycopg2.connect(database="test") @@ -361,6 +499,45 @@ def test_sqlcommenter_disabled(self, event_mocked): kwargs = event_mocked.call_args[1] self.assertEqual(kwargs["enable_commenter"], False) + def test_sqlcommenter_disabled_default_instrument_connection(self): + cnx = psycopg2.connect(database="test") + cnx = Psycopg2Instrumentor().instrument_connection( + cnx, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + self.assertEqual( + MockCursor.execute.call_args[0][0], + "Select 1", + ) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + + def test_sqlcommenter_disabled_explicit_instrument_connection(self): + cnx = psycopg2.connect(database="test") + cnx = Psycopg2Instrumentor().instrument_connection( + cnx, + enable_commenter=False, + ) + query = "Select 1" + cursor = cnx.cursor() + cursor.execute(query) + self.assertEqual( + MockCursor.execute.call_args[0][0], + "Select 1", + ) + spans_list = self.memory_exporter.get_finished_spans() + span = spans_list[0] + self.assertEqual( + span.attributes[DB_STATEMENT], + "Select 1", + ) + def test_no_op_tracer_provider(self): Psycopg2Instrumentor().instrument( tracer_provider=trace.NoOpTracerProvider()