diff --git a/api/experimentation/dataclasses.py b/api/experimentation/dataclasses.py new file mode 100644 index 000000000000..6cb10b9e0a49 --- /dev/null +++ b/api/experimentation/dataclasses.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass + + +@dataclass(frozen=True) +class WarehouseEventStats: + total_events_received: int + unique_events_count: int diff --git a/api/experimentation/models.py b/api/experimentation/models.py index c2c1f9712b1e..d4caa50e3c01 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -1,3 +1,5 @@ +import typing + from django.db import models from django.db.models import Q from django_lifecycle import ( # type: ignore[import-untyped] @@ -14,6 +16,9 @@ delete_environment_key_from_ingestion, ) +if typing.TYPE_CHECKING: + from experimentation.dataclasses import WarehouseEventStats + class WarehouseType(models.TextChoices): FLAGSMITH = "flagsmith", "Flagsmith" @@ -49,6 +54,10 @@ class WarehouseConnection(LifecycleModelMixin, SoftDeleteExportableModel): # ty ) created_at = models.DateTimeField(auto_now_add=True) + # Populated at serialization time for flagsmith connections from ClickHouse; + # never persisted to the database. + event_stats: "WarehouseEventStats | None" = None + class Meta: constraints = [ models.UniqueConstraint( diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index f0fefecf0a92..d9bd3f0bde6c 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -3,6 +3,7 @@ from rest_framework import serializers from environments.models import Environment +from experimentation.dataclasses import WarehouseEventStats from experimentation.models import ( Experiment, WarehouseConnection, @@ -17,10 +18,21 @@ class WarehouseConnectionSerializer(serializers.ModelSerializer): # type: ignore[type-arg] name = serializers.CharField(max_length=255, required=False) config = serializers.JSONField(default=None, required=False, allow_null=True) + total_events_received = serializers.SerializerMethodField() + unique_events_count = serializers.SerializerMethodField() class Meta: model = WarehouseConnection - fields = ("id", "warehouse_type", "status", "name", "config", "created_at") + fields = ( + "id", + "warehouse_type", + "status", + "name", + "config", + "created_at", + "total_events_received", + "unique_events_count", + ) read_only_fields = ("id", "status", "created_at") def validate(self, attrs: dict[str, Any]) -> dict[str, Any]: @@ -57,6 +69,14 @@ def create( result: WarehouseConnection = super().create(validated_data) return result + def get_total_events_received(self, obj: WarehouseConnection) -> int | None: + stats: WarehouseEventStats | None = obj.event_stats + return stats.total_events_received if stats else None + + def get_unique_events_count(self, obj: WarehouseConnection) -> int | None: + stats: WarehouseEventStats | None = obj.event_stats + return stats.unique_events_count if stats else None + @staticmethod def _generate_name(warehouse_type: str, environment: Environment) -> str: label = WarehouseType(warehouse_type).label diff --git a/api/experimentation/services.py b/api/experimentation/services.py index b7137658269a..f3cd247d4e1f 100644 --- a/api/experimentation/services.py +++ b/api/experimentation/services.py @@ -3,13 +3,22 @@ import typing from functools import lru_cache +import structlog from clickhouse_driver import Client +from clickhouse_driver.util.helpers import parse_url from django.conf import settings from django.utils import timezone from audit.models import AuditLog from audit.related_object_type import RelatedObjectType from experimentation.constants import EXPERIMENT_FLAG, WAREHOUSE_CONNECTION_FLAG +from experimentation.dataclasses import WarehouseEventStats +from experimentation.models import ( + VALID_STATUS_TRANSITIONS, + ExperimentStatus, + WarehouseConnectionStatus, + WarehouseType, +) from integrations.flagsmith.client import get_openfeature_client if typing.TYPE_CHECKING: @@ -17,6 +26,11 @@ from organisations.models import Organisation from users.models import FFAdminUser +logger = structlog.get_logger("warehouse") + +CLICKHOUSE_CONNECT_TIMEOUT_SECONDS = 5 +CLICKHOUSE_QUERY_TIMEOUT_SECONDS = 30 + def is_warehouse_feature_enabled(organisation: Organisation) -> bool: return get_openfeature_client().get_boolean_value( @@ -39,9 +53,13 @@ def _get_clickhouse_client() -> Client: """Build a clickhouse-driver client for the experimentation event store. The database is taken from the DSN path, so queries can reference the - `events` table unqualified. + `events` table unqualified. Connect and query timeouts are bounded unless the + DSN overrides them. """ - return Client.from_url(settings.EXPERIMENTATION_CLICKHOUSE_URL) + host, kwargs = parse_url(settings.EXPERIMENTATION_CLICKHOUSE_URL) + kwargs.setdefault("connect_timeout", CLICKHOUSE_CONNECT_TIMEOUT_SECONDS) + kwargs.setdefault("send_receive_timeout", CLICKHOUSE_QUERY_TIMEOUT_SECONDS) + return Client(host, **kwargs) def get_unique_event_names(environment_key: str) -> list[str]: @@ -56,6 +74,20 @@ def get_unique_event_names(environment_key: str) -> list[str]: return [row[0] for row in rows] +def get_warehouse_event_stats(environment_key: str) -> WarehouseEventStats: + """Return event counts recorded for `environment_key` in the warehouse.""" + rows = _get_clickhouse_client().execute( + "SELECT count() AS total, uniqExact(event) AS unique " + "FROM events WHERE environment_key = %(environment_key)s", + {"environment_key": environment_key}, + ) + total, unique = rows[0] if rows else (0, 0) + return WarehouseEventStats( + total_events_received=int(total), + unique_events_count=int(unique), + ) + + def _resolve_audit_log_author( user: FFAdminUser, ) -> dict[str, int | None]: @@ -107,8 +139,6 @@ def transition_experiment_status( target_status: str, user: FFAdminUser, ) -> Experiment: - from experimentation.models import VALID_STATUS_TRANSITIONS, ExperimentStatus - valid_targets = VALID_STATUS_TRANSITIONS.get(experiment.status, set()) if target_status not in valid_targets: raise ValueError( @@ -125,3 +155,59 @@ def transition_experiment_status( experiment.save() create_experiment_audit_log(experiment, user, action=target_status) return experiment + + +def mark_warehouse_pending_connection( + connection: WarehouseConnection, +) -> WarehouseConnection: + """Move a connection from created to pending_connection. No-op for any + other status.""" + if connection.status != WarehouseConnectionStatus.CREATED: + return connection + + connection.status = WarehouseConnectionStatus.PENDING_CONNECTION + connection.save(update_fields=["status"]) + logger.info( + "connection.test_event_sent", + environment__id=connection.environment_id, + organisation__id=connection.environment.project.organisation_id, + ) + return connection + + +def refresh_warehouse_connection_status( + connection: WarehouseConnection, + stats: WarehouseEventStats, +) -> WarehouseConnection: + """Set a pending connection to connected when the warehouse has received at + least one event. No-op otherwise.""" + if ( + connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + and stats.total_events_received > 0 + ): + connection.status = WarehouseConnectionStatus.CONNECTED + connection.save(update_fields=["status"]) + logger.info( + "connection.connected", + environment__id=connection.environment_id, + organisation__id=connection.environment.project.organisation_id, + ) + return connection + + +def annotate_warehouse_event_stats( + connection: WarehouseConnection, + environment_key: str, +) -> None: + """Attach live warehouse event stats to a flagsmith connection. No-op for + non-flagsmith connections or when no warehouse is configured; leaves stats + unset when the warehouse is unreachable. Read-only: never changes status.""" + if ( + connection.warehouse_type != WarehouseType.FLAGSMITH + or not settings.EXPERIMENTATION_CLICKHOUSE_URL + ): + return + try: + connection.event_stats = get_warehouse_event_stats(environment_key) + except Exception: + return diff --git a/api/experimentation/views.py b/api/experimentation/views.py index c24b812effc4..8615e6e88f1e 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -16,6 +16,7 @@ Experiment, ExperimentStatus, WarehouseConnection, + WarehouseType, ) from experimentation.permissions import ( ExperimentPermission, @@ -27,8 +28,11 @@ WarehouseConnectionSerializer, ) from experimentation.services import ( + annotate_warehouse_event_stats, create_experiment_audit_log, create_warehouse_audit_log, + mark_warehouse_pending_connection, + refresh_warehouse_connection_status, transition_experiment_status, ) from users.models import FFAdminUser @@ -71,6 +75,35 @@ def perform_destroy(self, instance: WarehouseConnection) -> None: ) instance.delete() + def list(self, request: Request, *args: object, **kwargs: object) -> Response: + environment_api_key: str = self.kwargs["environment_api_key"] + connections = list(self.filter_queryset(self.get_queryset())) + for connection in connections: + annotate_warehouse_event_stats(connection, environment_api_key) + serializer = self.get_serializer(connections, many=True) + return Response(serializer.data) + + def retrieve(self, request: Request, *args: object, **kwargs: object) -> Response: + connection = self.get_object() + annotate_warehouse_event_stats(connection, self.kwargs["environment_api_key"]) + serializer = self.get_serializer(connection) + return Response(serializer.data) + + @action(detail=True, methods=["post"], url_path="test-warehouse-connection") + def test_warehouse_connection(self, request: Request, **kwargs: object) -> Response: + connection: WarehouseConnection = self.get_object() + if connection.warehouse_type != WarehouseType.FLAGSMITH: + return Response( + {"detail": "Test events are only supported for Flagsmith warehouses."}, + status=status.HTTP_400_BAD_REQUEST, + ) + mark_warehouse_pending_connection(connection) + annotate_warehouse_event_stats(connection, self.kwargs["environment_api_key"]) + if connection.event_stats is not None: + refresh_warehouse_connection_status(connection, connection.event_stats) + serializer = self.get_serializer(connection) + return Response(serializer.data) + def create(self, request: Request, *args: object, **kwargs: object) -> Response: environment = self._get_environment() serializer = self.get_serializer(data=request.data) diff --git a/api/tests/unit/experimentation/test_serializers.py b/api/tests/unit/experimentation/test_serializers.py index 50a8e958e72a..a58666fb24fa 100644 --- a/api/tests/unit/experimentation/test_serializers.py +++ b/api/tests/unit/experimentation/test_serializers.py @@ -1,6 +1,7 @@ import pytest from environments.models import Environment +from experimentation.dataclasses import WarehouseEventStats from experimentation.models import ( WarehouseConnection, WarehouseConnectionStatus, @@ -130,3 +131,23 @@ def test_create__flagsmith_with_config__raises_validation_error( # When / Then assert not serializer.is_valid() assert "config" in serializer.errors + + +def test_warehouse_serializer__event_stats_attached__serializes_counts() -> None: + # Given + connection = WarehouseConnection( + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.CONNECTED, + ) + connection.event_stats = WarehouseEventStats( + total_events_received=7, + unique_events_count=2, + ) + + # When + data = WarehouseConnectionSerializer(connection).data + + # Then + assert data["total_events_received"] == 7 + assert data["unique_events_count"] == 2 diff --git a/api/tests/unit/experimentation/test_services.py b/api/tests/unit/experimentation/test_services.py index f4e9bd477187..966214a7a2b0 100644 --- a/api/tests/unit/experimentation/test_services.py +++ b/api/tests/unit/experimentation/test_services.py @@ -1,10 +1,19 @@ +import pytest from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture +from pytest_structlog import StructuredLogCapture +from environments.models import Environment from experimentation import services +from experimentation.dataclasses import WarehouseEventStats +from experimentation.models import ( + WarehouseConnection, + WarehouseConnectionStatus, + WarehouseType, +) -def test_get_clickhouse_client__configured_url__builds_client_from_url( +def test_get_clickhouse_client__configured_url__builds_client_with_timeouts( mocker: MockerFixture, settings: SettingsWrapper, ) -> None: @@ -12,17 +21,49 @@ def test_get_clickhouse_client__configured_url__builds_client_from_url( settings.EXPERIMENTATION_CLICKHOUSE_URL = ( "clickhouse://user:pass@ch.example.com:9440/flagsmith_exp?secure=True" ) - mock_from_url = mocker.patch("experimentation.services.Client.from_url") + mock_client_cls = mocker.patch("experimentation.services.Client") services._get_clickhouse_client.cache_clear() # When client = services._get_clickhouse_client() # Then - mock_from_url.assert_called_once_with( - "clickhouse://user:pass@ch.example.com:9440/flagsmith_exp?secure=True", + mock_client_cls.assert_called_once_with( + "ch.example.com", + port=9440, + database="flagsmith_exp", + user="user", + password="pass", + secure=True, + connect_timeout=services.CLICKHOUSE_CONNECT_TIMEOUT_SECONDS, + send_receive_timeout=services.CLICKHOUSE_QUERY_TIMEOUT_SECONDS, + ) + assert client is mock_client_cls.return_value + services._get_clickhouse_client.cache_clear() + + +def test_get_clickhouse_client__dsn_timeouts__are_preserved( + mocker: MockerFixture, + settings: SettingsWrapper, +) -> None: + # Given + settings.EXPERIMENTATION_CLICKHOUSE_URL = ( + "clickhouse://ch.example.com:9000/db?connect_timeout=1&send_receive_timeout=2" + ) + mock_client_cls = mocker.patch("experimentation.services.Client") + services._get_clickhouse_client.cache_clear() + + # When + services._get_clickhouse_client() + + # Then + mock_client_cls.assert_called_once_with( + "ch.example.com", + port=9000, + database="db", + connect_timeout=1, + send_receive_timeout=2, ) - assert client is mock_from_url.return_value services._get_clickhouse_client.cache_clear() @@ -66,3 +107,163 @@ def test_get_unique_event_names__no_events__returns_empty_list( # Then assert result == [] + + +@pytest.mark.parametrize( + "rows, expected_total, expected_unique", + [ + ([(42, 3)], 42, 3), + ([], 0, 0), + ], + ids=["events_present", "empty_result_set"], +) +def test_get_warehouse_event_stats__rows__returns_counts( + mocker: MockerFixture, + rows: list[tuple[int, int]], + expected_total: int, + expected_unique: int, +) -> None: + # Given + mock_client = mocker.Mock() + mock_client.execute.return_value = rows + mocker.patch( + "experimentation.services._get_clickhouse_client", + return_value=mock_client, + ) + + # When + result = services.get_warehouse_event_stats("env-key-123") + + # Then + assert result.total_events_received == expected_total + assert result.unique_events_count == expected_unique + mock_client.execute.assert_called_once_with( + "SELECT count() AS total, uniqExact(event) AS unique " + "FROM events WHERE environment_key = %(environment_key)s", + {"environment_key": "env-key-123"}, + ) + + +@pytest.mark.django_db +def test_mark_warehouse_pending_connection__created__transitions_to_pending( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + ) + + # When + result = services.mark_warehouse_pending_connection(connection) + + # Then + assert result.status == WarehouseConnectionStatus.PENDING_CONNECTION + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + assert log.events == [ + { + "level": "info", + "event": "connection.test_event_sent", + "environment__id": environment.id, + "organisation__id": environment.project.organisation_id, + } + ] + + +@pytest.mark.django_db +def test_mark_warehouse_pending_connection__already_pending__is_noop( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + + # When + result = services.mark_warehouse_pending_connection(connection) + + # Then + assert result.status == WarehouseConnectionStatus.PENDING_CONNECTION + assert log.events == [] + + +@pytest.mark.django_db +def test_refresh_warehouse_connection_status__events_exist__transitions_to_connected( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + stats = WarehouseEventStats(total_events_received=5, unique_events_count=1) + + # When + result = services.refresh_warehouse_connection_status(connection, stats) + + # Then + assert result.status == WarehouseConnectionStatus.CONNECTED + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.CONNECTED + assert log.events == [ + { + "level": "info", + "event": "connection.connected", + "environment__id": environment.id, + "organisation__id": environment.project.organisation_id, + } + ] + + +@pytest.mark.django_db +def test_refresh_warehouse_connection_status__no_events__stays_pending( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + stats = WarehouseEventStats(total_events_received=0, unique_events_count=0) + + # When + result = services.refresh_warehouse_connection_status(connection, stats) + + # Then + assert result.status == WarehouseConnectionStatus.PENDING_CONNECTION + assert log.events == [] + + +@pytest.mark.django_db +def test_refresh_warehouse_connection_status__already_connected__is_noop( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.CONNECTED, + ) + stats = WarehouseEventStats(total_events_received=99, unique_events_count=4) + + # When + result = services.refresh_warehouse_connection_status(connection, stats) + + # Then + assert result.status == WarehouseConnectionStatus.CONNECTED + assert log.events == [] diff --git a/api/tests/unit/experimentation/test_views.py b/api/tests/unit/experimentation/test_views.py index 16c4f142986e..078a9faca9fa 100644 --- a/api/tests/unit/experimentation/test_views.py +++ b/api/tests/unit/experimentation/test_views.py @@ -1,17 +1,43 @@ import pytest from django.urls import reverse +from pytest_django.fixtures import SettingsWrapper +from pytest_mock import MockerFixture from rest_framework import status from rest_framework.test import APIClient from audit.models import AuditLog from audit.related_object_type import RelatedObjectType from environments.models import Environment -from experimentation.models import WarehouseConnection +from experimentation import services +from experimentation.dataclasses import WarehouseEventStats +from experimentation.models import ( + WarehouseConnection, + WarehouseConnectionStatus, + WarehouseType, +) from tests.types import EnableFeaturesFixture pytestmark = pytest.mark.django_db +@pytest.fixture(autouse=True) +def mock_clickhouse_stats( + mocker: MockerFixture, + settings: SettingsWrapper, +) -> object: + """Default every view test to a configured, empty warehouse. Tests that need + events re-patch experimentation.services.get_warehouse_event_stats; tests for the + unconfigured/erroring paths override the setting / raise.""" + settings.EXPERIMENTATION_CLICKHOUSE_URL = "clickhouse://localhost:9000/test" + services._get_clickhouse_client.cache_clear() + mock_client = mocker.Mock() + mock_client.execute.return_value = [(0, 0)] + return mocker.patch( + "experimentation.services._get_clickhouse_client", + return_value=mock_client, + ) + + def test_post__valid_data__returns_201_and_creates_connection( admin_client: APIClient, environment: Environment, @@ -745,3 +771,334 @@ def test_patch__exists__creates_audit_log( related_object_type=RelatedObjectType.WAREHOUSE_CONNECTION.name, ) assert "updated" in audit_log.log + + +@pytest.mark.parametrize( + "warehouse_type, config, expected_total, expected_unique", + [ + (WarehouseType.FLAGSMITH, None, 12, 3), + ( + WarehouseType.SNOWFLAKE, + {"account_identifier": "xy12345.us-east-1"}, + None, + None, + ), + ], + ids=["flagsmith", "snowflake"], +) +def test_get__warehouse_type__returns_expected_event_stats( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection_url: str, + mocker: MockerFixture, + warehouse_type: str, + config: dict[str, str] | None, + expected_total: int | None, + expected_unique: int | None, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + WarehouseConnection.objects.create( + environment=environment, + warehouse_type=warehouse_type, + name="Warehouse", + config=config, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + return_value=WarehouseEventStats( + total_events_received=12, + unique_events_count=3, + ), + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json()[0] + assert data["total_events_received"] == expected_total + assert data["unique_events_count"] == expected_unique + + +def test_get__pending_connection_with_events__shows_stats_but_does_not_flip( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection_url: str, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + return_value=WarehouseEventStats( + total_events_received=1, + unique_events_count=1, + ), + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + data = response.json()[0] + assert data["total_events_received"] == 1 + assert data["unique_events_count"] == 1 + # GET is read-only: the flip happens on the POST endpoint, not here. + assert data["status"] == "pending_connection" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + + +@pytest.mark.parametrize( + "warehouse_type, config, expected_status", + [ + (WarehouseType.FLAGSMITH, None, status.HTTP_200_OK), + ( + WarehouseType.SNOWFLAKE, + {"account_identifier": "xy12345.us-east-1"}, + status.HTTP_400_BAD_REQUEST, + ), + ], + ids=["flagsmith", "snowflake"], +) +def test_test_warehouse_connection__warehouse_type__expected_status( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_type: str, + config: dict[str, str] | None, + expected_status: int, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=warehouse_type, + name="Warehouse", + config=config, + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == expected_status + if expected_status == status.HTTP_200_OK: + assert response.json()["status"] == "pending_connection" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + + +def test_test_warehouse_connection__pending_with_events__flips_to_connected( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + return_value=WarehouseEventStats( + total_events_received=1, + unique_events_count=1, + ), + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "connected" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.CONNECTED + + +def test_test_warehouse_connection__pending_no_events__stays_pending( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given (autouse mock returns 0 events) + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "pending_connection" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + + +def test_test_warehouse_connection__clickhouse_unreachable__stays_pending( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + side_effect=OSError("clickhouse unreachable"), + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["status"] == "pending_connection" + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + + +def test_test_warehouse_connection__already_connected__is_noop( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.CONNECTED, + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "connected" + + +def test_test_warehouse_connection__non_admin__returns_403( + staff_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection: WarehouseConnection, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, warehouse_connection.id], + ) + + # When + response = staff_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_get__clickhouse_unconfigured__returns_200_without_stats( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection: WarehouseConnection, + warehouse_connection_url: str, + settings: SettingsWrapper, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + settings.EXPERIMENTATION_CLICKHOUSE_URL = None + stats_spy = mocker.patch("experimentation.services.get_warehouse_event_stats") + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json()[0] + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + stats_spy.assert_not_called() + + +def test_get__clickhouse_errors__returns_200_without_stats( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection_url: str, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + side_effect=OSError("clickhouse unreachable"), + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json()[0] + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + # connection stays pending (GET never writes) + assert data["status"] == "pending_connection" diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index 3133424c66bd..3c7df584962c 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -442,6 +442,24 @@ Attributes: - `feature_name` - `sentry_action` +### `warehouse.connection.connected` + +Logged at `info` from: + - `api/experimentation/services.py:190` + +Attributes: + - `environment.id` + - `organisation.id` + +### `warehouse.connection.test_event_sent` + +Logged at `info` from: + - `api/experimentation/services.py:170` + +Attributes: + - `environment.id` + - `organisation.id` + ### `workflows.change_request.committed` Logged at `info` from: