From 2c90b7b86e6152a4e812a9241055d1e95639c93a Mon Sep 17 00:00:00 2001 From: wadii Date: Tue, 2 Jun 2026 15:44:39 +0200 Subject: [PATCH 1/7] feat(experimentation): add warehouse event stats and status services --- api/experimentation/dataclasses.py | 7 + api/experimentation/models.py | 9 + api/experimentation/services.py | 88 +++++++- .../unit/experimentation/test_services.py | 198 ++++++++++++++++++ .../observability/_events-catalogue.md | 18 ++ 5 files changed, 318 insertions(+), 2 deletions(-) create mode 100644 api/experimentation/dataclasses.py diff --git a/api/experimentation/dataclasses.py b/api/experimentation/dataclasses.py new file mode 100644 index 000000000000..6cb10b9e0a49 --- /dev/null +++ b/api/experimentation/dataclasses.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass + + +@dataclass(frozen=True) +class WarehouseEventStats: + total_events_received: int + unique_events_count: int diff --git a/api/experimentation/models.py b/api/experimentation/models.py index c2c1f9712b1e..d4caa50e3c01 100644 --- a/api/experimentation/models.py +++ b/api/experimentation/models.py @@ -1,3 +1,5 @@ +import typing + from django.db import models from django.db.models import Q from django_lifecycle import ( # type: ignore[import-untyped] @@ -14,6 +16,9 @@ delete_environment_key_from_ingestion, ) +if typing.TYPE_CHECKING: + from experimentation.dataclasses import WarehouseEventStats + class WarehouseType(models.TextChoices): FLAGSMITH = "flagsmith", "Flagsmith" @@ -49,6 +54,10 @@ class WarehouseConnection(LifecycleModelMixin, SoftDeleteExportableModel): # ty ) created_at = models.DateTimeField(auto_now_add=True) + # Populated at serialization time for flagsmith connections from ClickHouse; + # never persisted to the database. + event_stats: "WarehouseEventStats | None" = None + class Meta: constraints = [ models.UniqueConstraint( diff --git a/api/experimentation/services.py b/api/experimentation/services.py index b7137658269a..934c005f4c77 100644 --- a/api/experimentation/services.py +++ b/api/experimentation/services.py @@ -3,6 +3,7 @@ import typing from functools import lru_cache +import structlog from clickhouse_driver import Client from django.conf import settings from django.utils import timezone @@ -10,6 +11,13 @@ from audit.models import AuditLog from audit.related_object_type import RelatedObjectType from experimentation.constants import EXPERIMENT_FLAG, WAREHOUSE_CONNECTION_FLAG +from experimentation.dataclasses import WarehouseEventStats +from experimentation.models import ( + VALID_STATUS_TRANSITIONS, + ExperimentStatus, + WarehouseConnectionStatus, + WarehouseType, +) from integrations.flagsmith.client import get_openfeature_client if typing.TYPE_CHECKING: @@ -17,6 +25,8 @@ from organisations.models import Organisation from users.models import FFAdminUser +logger = structlog.get_logger("warehouse") + def is_warehouse_feature_enabled(organisation: Organisation) -> bool: return get_openfeature_client().get_boolean_value( @@ -56,6 +66,20 @@ def get_unique_event_names(environment_key: str) -> list[str]: return [row[0] for row in rows] +def get_warehouse_event_stats(environment_key: str) -> WarehouseEventStats: + """Return event counts recorded for `environment_key` in the warehouse.""" + rows = _get_clickhouse_client().execute( + "SELECT count() AS total, uniqExact(event) AS unique " + "FROM events WHERE environment_key = %(environment_key)s", + {"environment_key": environment_key}, + ) + total, unique = rows[0] if rows else (0, 0) + return WarehouseEventStats( + total_events_received=int(total), + unique_events_count=int(unique), + ) + + def _resolve_audit_log_author( user: FFAdminUser, ) -> dict[str, int | None]: @@ -107,8 +131,6 @@ def transition_experiment_status( target_status: str, user: FFAdminUser, ) -> Experiment: - from experimentation.models import VALID_STATUS_TRANSITIONS, ExperimentStatus - valid_targets = VALID_STATUS_TRANSITIONS.get(experiment.status, set()) if target_status not in valid_targets: raise ValueError( @@ -125,3 +147,65 @@ def transition_experiment_status( experiment.save() create_experiment_audit_log(experiment, user, action=target_status) return experiment + + +def mark_warehouse_pending_connection( + connection: WarehouseConnection, +) -> WarehouseConnection: + """Move a connection from created to pending_connection. No-op for any + other status.""" + if connection.status != WarehouseConnectionStatus.CREATED: + return connection + + connection.status = WarehouseConnectionStatus.PENDING_CONNECTION + connection.save() + logger.info( + "connection.test_event_sent", + environment__id=connection.environment_id, + organisation__id=connection.environment.project.organisation_id, + ) + return connection + + +def refresh_warehouse_connection_status( + connection: WarehouseConnection, + stats: WarehouseEventStats, +) -> WarehouseConnection: + """Set a pending connection to connected when the warehouse has received at + least one event. No-op otherwise.""" + if ( + connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + and stats.total_events_received > 0 + ): + connection.status = WarehouseConnectionStatus.CONNECTED + connection.save() + logger.info( + "connection.connected", + environment__id=connection.environment_id, + organisation__id=connection.environment.project.organisation_id, + ) + return connection + + +def annotate_warehouse_event_stats( + connection: WarehouseConnection, + environment_key: str, +) -> None: + """Attach warehouse event stats to a flagsmith connection and update its + status to match. No-op for non-flagsmith connections or when no warehouse + is configured; leaves the connection unchanged if the warehouse errors.""" + if ( + connection.warehouse_type != WarehouseType.FLAGSMITH + or not settings.EXPERIMENTATION_CLICKHOUSE_URL + ): + return + try: + stats = get_warehouse_event_stats(environment_key) + except Exception: + logger.exception( + "connection.event_stats_unavailable", + environment__id=connection.environment_id, + ) + return + connection.event_stats = stats + refresh_warehouse_connection_status(connection, stats) diff --git a/api/tests/unit/experimentation/test_services.py b/api/tests/unit/experimentation/test_services.py index f4e9bd477187..ba17fb650582 100644 --- a/api/tests/unit/experimentation/test_services.py +++ b/api/tests/unit/experimentation/test_services.py @@ -1,7 +1,16 @@ +import pytest from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture +from pytest_structlog import StructuredLogCapture +from environments.models import Environment from experimentation import services +from experimentation.dataclasses import WarehouseEventStats +from experimentation.models import ( + WarehouseConnection, + WarehouseConnectionStatus, + WarehouseType, +) def test_get_clickhouse_client__configured_url__builds_client_from_url( @@ -66,3 +75,192 @@ def test_get_unique_event_names__no_events__returns_empty_list( # Then assert result == [] + + +def test_get_warehouse_event_stats__events_present__returns_counts( + mocker: MockerFixture, +) -> None: + # Given + mock_client = mocker.Mock() + mock_client.execute.return_value = [(42, 3)] + mocker.patch( + "experimentation.services._get_clickhouse_client", + return_value=mock_client, + ) + + # When + result = services.get_warehouse_event_stats("env-key-123") + + # Then + assert result.total_events_received == 42 + assert result.unique_events_count == 3 + mock_client.execute.assert_called_once_with( + "SELECT count() AS total, uniqExact(event) AS unique " + "FROM events WHERE environment_key = %(environment_key)s", + {"environment_key": "env-key-123"}, + ) + + +def test_get_warehouse_event_stats__non_int_clickhouse_values__casts_to_int( + mocker: MockerFixture, +) -> None: + # Given + mock_client = mocker.Mock() + mock_client.execute.return_value = [("42", "3")] + mocker.patch( + "experimentation.services._get_clickhouse_client", + return_value=mock_client, + ) + + # When + result = services.get_warehouse_event_stats("env-key-123") + + # Then + assert result.total_events_received == 42 + assert result.unique_events_count == 3 + assert isinstance(result.total_events_received, int) + assert isinstance(result.unique_events_count, int) + + +def test_get_warehouse_event_stats__empty_result_set__returns_zeroes( + mocker: MockerFixture, +) -> None: + # Given + mock_client = mocker.Mock() + mock_client.execute.return_value = [] + mocker.patch( + "experimentation.services._get_clickhouse_client", + return_value=mock_client, + ) + + # When + result = services.get_warehouse_event_stats("env-key-123") + + # Then + assert result.total_events_received == 0 + assert result.unique_events_count == 0 + + +@pytest.mark.django_db +def test_mark_warehouse_pending_connection__created__transitions_to_pending( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + ) + + # When + result = services.mark_warehouse_pending_connection(connection) + + # Then + assert result.status == WarehouseConnectionStatus.PENDING_CONNECTION + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + assert log.events == [ + { + "level": "info", + "event": "connection.test_event_sent", + "environment__id": environment.id, + "organisation__id": environment.project.organisation_id, + } + ] + + +@pytest.mark.django_db +def test_mark_warehouse_pending_connection__already_pending__is_noop( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + + # When + result = services.mark_warehouse_pending_connection(connection) + + # Then + assert result.status == WarehouseConnectionStatus.PENDING_CONNECTION + assert log.events == [] + + +@pytest.mark.django_db +def test_refresh_warehouse_connection_status__events_exist__transitions_to_connected( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + stats = WarehouseEventStats(total_events_received=5, unique_events_count=1) + + # When + result = services.refresh_warehouse_connection_status(connection, stats) + + # Then + assert result.status == WarehouseConnectionStatus.CONNECTED + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.CONNECTED + assert log.events == [ + { + "level": "info", + "event": "connection.connected", + "environment__id": environment.id, + "organisation__id": environment.project.organisation_id, + } + ] + + +@pytest.mark.django_db +def test_refresh_warehouse_connection_status__no_events__stays_pending( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + stats = WarehouseEventStats(total_events_received=0, unique_events_count=0) + + # When + result = services.refresh_warehouse_connection_status(connection, stats) + + # Then + assert result.status == WarehouseConnectionStatus.PENDING_CONNECTION + assert log.events == [] + + +@pytest.mark.django_db +def test_refresh_warehouse_connection_status__already_connected__is_noop( + environment: Environment, + log: StructuredLogCapture, +) -> None: + # Given + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.CONNECTED, + ) + stats = WarehouseEventStats(total_events_received=99, unique_events_count=4) + + # When + result = services.refresh_warehouse_connection_status(connection, stats) + + # Then + assert result.status == WarehouseConnectionStatus.CONNECTED + assert log.events == [] diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index 3133424c66bd..d1b52a42db0c 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -442,6 +442,24 @@ Attributes: - `feature_name` - `sentry_action` +### `warehouse.connection.connected` + +Logged at `info` from: + - `api/experimentation/services.py:186` + +Attributes: + - `environment.id` + - `organisation.id` + +### `warehouse.connection.test_event_sent` + +Logged at `info` from: + - `api/experimentation/services.py:164` + +Attributes: + - `environment.id` + - `organisation.id` + ### `workflows.change_request.committed` Logged at `info` from: From 518c78ac3cb8b160d1506efcea458dcd55eb8c66 Mon Sep 17 00:00:00 2001 From: wadii Date: Tue, 2 Jun 2026 15:45:09 +0200 Subject: [PATCH 2/7] feat(experimentation): add event stats to serializer --- api/experimentation/serializers.py | 22 ++++++++++- .../unit/experimentation/test_serializers.py | 37 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index f0fefecf0a92..d9bd3f0bde6c 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -3,6 +3,7 @@ from rest_framework import serializers from environments.models import Environment +from experimentation.dataclasses import WarehouseEventStats from experimentation.models import ( Experiment, WarehouseConnection, @@ -17,10 +18,21 @@ class WarehouseConnectionSerializer(serializers.ModelSerializer): # type: ignore[type-arg] name = serializers.CharField(max_length=255, required=False) config = serializers.JSONField(default=None, required=False, allow_null=True) + total_events_received = serializers.SerializerMethodField() + unique_events_count = serializers.SerializerMethodField() class Meta: model = WarehouseConnection - fields = ("id", "warehouse_type", "status", "name", "config", "created_at") + fields = ( + "id", + "warehouse_type", + "status", + "name", + "config", + "created_at", + "total_events_received", + "unique_events_count", + ) read_only_fields = ("id", "status", "created_at") def validate(self, attrs: dict[str, Any]) -> dict[str, Any]: @@ -57,6 +69,14 @@ def create( result: WarehouseConnection = super().create(validated_data) return result + def get_total_events_received(self, obj: WarehouseConnection) -> int | None: + stats: WarehouseEventStats | None = obj.event_stats + return stats.total_events_received if stats else None + + def get_unique_events_count(self, obj: WarehouseConnection) -> int | None: + stats: WarehouseEventStats | None = obj.event_stats + return stats.unique_events_count if stats else None + @staticmethod def _generate_name(warehouse_type: str, environment: Environment) -> str: label = WarehouseType(warehouse_type).label diff --git a/api/tests/unit/experimentation/test_serializers.py b/api/tests/unit/experimentation/test_serializers.py index 50a8e958e72a..4ccea58e0602 100644 --- a/api/tests/unit/experimentation/test_serializers.py +++ b/api/tests/unit/experimentation/test_serializers.py @@ -1,6 +1,7 @@ import pytest from environments.models import Environment +from experimentation.dataclasses import WarehouseEventStats from experimentation.models import ( WarehouseConnection, WarehouseConnectionStatus, @@ -130,3 +131,39 @@ def test_create__flagsmith_with_config__raises_validation_error( # When / Then assert not serializer.is_valid() assert "config" in serializer.errors + + +def test_warehouse_serializer__event_stats_attached__serializes_counts() -> None: + # Given + connection = WarehouseConnection( + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.CONNECTED, + ) + connection.event_stats = WarehouseEventStats( + total_events_received=7, + unique_events_count=2, + ) + + # When + data = WarehouseConnectionSerializer(connection).data + + # Then + assert data["total_events_received"] == 7 + assert data["unique_events_count"] == 2 + + +def test_warehouse_serializer__no_event_stats__serializes_nulls() -> None: + # Given + connection = WarehouseConnection( + warehouse_type=WarehouseType.SNOWFLAKE, + name="Snowflake Warehouse", + status=WarehouseConnectionStatus.CREATED, + ) + + # When + data = WarehouseConnectionSerializer(connection).data + + # Then + assert data["total_events_received"] is None + assert data["unique_events_count"] is None From 0300658328a5c0e09914fb9dee33d26623ec62d1 Mon Sep 17 00:00:00 2001 From: wadii Date: Tue, 2 Jun 2026 15:45:34 +0200 Subject: [PATCH 3/7] feat(experimentation): annotate stats and test connection endpoint --- api/experimentation/views.py | 33 ++ api/tests/unit/experimentation/test_views.py | 323 ++++++++++++++++++- 2 files changed, 355 insertions(+), 1 deletion(-) diff --git a/api/experimentation/views.py b/api/experimentation/views.py index c24b812effc4..7b18c53533ba 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -16,6 +16,7 @@ Experiment, ExperimentStatus, WarehouseConnection, + WarehouseType, ) from experimentation.permissions import ( ExperimentPermission, @@ -27,8 +28,10 @@ WarehouseConnectionSerializer, ) from experimentation.services import ( + annotate_warehouse_event_stats, create_experiment_audit_log, create_warehouse_audit_log, + mark_warehouse_pending_connection, transition_experiment_status, ) from users.models import FFAdminUser @@ -71,6 +74,36 @@ def perform_destroy(self, instance: WarehouseConnection) -> None: ) instance.delete() + def list(self, request: Request, *args: object, **kwargs: object) -> Response: + environment_api_key: str = self.kwargs["environment_api_key"] + connections = list(self.filter_queryset(self.get_queryset())) + for connection in connections: + annotate_warehouse_event_stats(connection, environment_api_key) + serializer = self.get_serializer(connections, many=True) + return Response(serializer.data) + + def retrieve(self, request: Request, *args: object, **kwargs: object) -> Response: + connection = self.get_object() + annotate_warehouse_event_stats( + connection, self.kwargs["environment_api_key"] + ) + serializer = self.get_serializer(connection) + return Response(serializer.data) + + @action(detail=True, methods=["post"], url_path="test-warehouse-connection") + def test_warehouse_connection( + self, request: Request, **kwargs: object + ) -> Response: + connection: WarehouseConnection = self.get_object() + if connection.warehouse_type != WarehouseType.FLAGSMITH: + return Response( + {"detail": "Test events are only supported for Flagsmith warehouses."}, + status=status.HTTP_400_BAD_REQUEST, + ) + mark_warehouse_pending_connection(connection) + serializer = self.get_serializer(connection) + return Response(serializer.data) + def create(self, request: Request, *args: object, **kwargs: object) -> Response: environment = self._get_environment() serializer = self.get_serializer(data=request.data) diff --git a/api/tests/unit/experimentation/test_views.py b/api/tests/unit/experimentation/test_views.py index 16c4f142986e..6de8f6597ec9 100644 --- a/api/tests/unit/experimentation/test_views.py +++ b/api/tests/unit/experimentation/test_views.py @@ -1,17 +1,44 @@ import pytest from django.urls import reverse +from pytest_django.fixtures import SettingsWrapper +from pytest_mock import MockerFixture +from pytest_structlog import StructuredLogCapture from rest_framework import status from rest_framework.test import APIClient from audit.models import AuditLog from audit.related_object_type import RelatedObjectType from environments.models import Environment -from experimentation.models import WarehouseConnection +from experimentation import services +from experimentation.dataclasses import WarehouseEventStats +from experimentation.models import ( + WarehouseConnection, + WarehouseConnectionStatus, + WarehouseType, +) from tests.types import EnableFeaturesFixture pytestmark = pytest.mark.django_db +@pytest.fixture(autouse=True) +def mock_clickhouse_stats( + mocker: MockerFixture, + settings: SettingsWrapper, +) -> object: + """Default every view test to a configured, empty warehouse. Tests that need + events re-patch experimentation.services.get_warehouse_event_stats; tests for the + unconfigured/erroring paths override the setting / raise.""" + settings.EXPERIMENTATION_CLICKHOUSE_URL = "clickhouse://localhost:9000/test" + services._get_clickhouse_client.cache_clear() + mock_client = mocker.Mock() + mock_client.execute.return_value = [(0, 0)] + return mocker.patch( + "experimentation.services._get_clickhouse_client", + return_value=mock_client, + ) + + def test_post__valid_data__returns_201_and_creates_connection( admin_client: APIClient, environment: Environment, @@ -745,3 +772,297 @@ def test_patch__exists__creates_audit_log( related_object_type=RelatedObjectType.WAREHOUSE_CONNECTION.name, ) assert "updated" in audit_log.log + + +def test_get__flagsmith_connection__includes_event_stats( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection: WarehouseConnection, + warehouse_connection_url: str, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + return_value=WarehouseEventStats( + total_events_received=12, + unique_events_count=3, + ), + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json()[0] + assert data["total_events_received"] == 12 + assert data["unique_events_count"] == 3 + + +def test_get__pending_connection_with_events__flips_to_connected( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection_url: str, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + return_value=WarehouseEventStats( + total_events_received=1, + unique_events_count=1, + ), + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.json()[0]["status"] == "connected" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.CONNECTED + + +def test_get__pending_connection_no_events__stays_pending( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection_url: str, +) -> None: + # Given (autouse mock returns 0 events) + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.json()[0]["status"] == "pending_connection" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + + +def test_get__snowflake_connection__skips_clickhouse_and_nulls_stats( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection_url: str, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.SNOWFLAKE, + name="Snowflake Warehouse", + config={"account_identifier": "xy12345.us-east-1"}, + status=WarehouseConnectionStatus.CREATED, + ) + stats_spy = mocker.patch( + "experimentation.services.get_warehouse_event_stats", + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + data = response.json()[0] + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + stats_spy.assert_not_called() + + +def test_get_detail__flagsmith_connection__includes_event_stats( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection: WarehouseConnection, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + return_value=WarehouseEventStats( + total_events_received=8, + unique_events_count=2, + ), + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-detail", + args=[environment.api_key, warehouse_connection.id], + ) + + # When + response = admin_client.get(url) + + # Then + assert response.json()["total_events_received"] == 8 + assert response.json()["unique_events_count"] == 2 + + +def _test_warehouse_connection_url(environment: Environment, connection_id: int) -> str: + return reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection_id], + ) + + +def test_test_warehouse_connection__created_flagsmith__returns_200_and_pending( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection: WarehouseConnection, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + url = _test_warehouse_connection_url(environment, warehouse_connection.id) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "pending_connection" + warehouse_connection.refresh_from_db() + assert warehouse_connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + + +def test_test_warehouse_connection__snowflake__returns_400( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.SNOWFLAKE, + name="Snowflake Warehouse", + config={"account_identifier": "xy12345.us-east-1"}, + ) + url = _test_warehouse_connection_url(environment, connection.id) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_test_warehouse_connection__already_connected__is_noop( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.CONNECTED, + ) + url = _test_warehouse_connection_url(environment, connection.id) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "connected" + + +def test_test_warehouse_connection__non_admin__returns_403( + staff_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection: WarehouseConnection, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + url = _test_warehouse_connection_url(environment, warehouse_connection.id) + + # When + response = staff_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_403_FORBIDDEN + + +def test_get__clickhouse_unconfigured__returns_200_without_stats( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection: WarehouseConnection, + warehouse_connection_url: str, + settings: SettingsWrapper, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + settings.EXPERIMENTATION_CLICKHOUSE_URL = None + stats_spy = mocker.patch("experimentation.services.get_warehouse_event_stats") + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json()[0] + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + stats_spy.assert_not_called() + + +def test_get__clickhouse_errors__returns_200_without_stats_and_logs( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + warehouse_connection_url: str, + mocker: MockerFixture, + log: StructuredLogCapture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + side_effect=OSError("clickhouse unreachable"), + ) + + # When + response = admin_client.get(warehouse_connection_url) + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json()[0] + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + # connection stays pending (no flip on error) + assert data["status"] == "pending_connection" + # an actionable event was logged + assert any( + e["event"] == "connection.event_stats_unavailable" for e in log.events + ) From b9cfa772587b9c07e2da12c964eb24155ab1c49a Mon Sep 17 00:00:00 2001 From: wadii Date: Tue, 2 Jun 2026 16:03:19 +0200 Subject: [PATCH 4/7] feat(experimentation): improved tests --- api/experimentation/serializers.py | 8 +- .../unit/experimentation/test_serializers.py | 16 -- .../unit/experimentation/test_services.py | 59 ++---- api/tests/unit/experimentation/test_views.py | 168 +++++++----------- 4 files changed, 84 insertions(+), 167 deletions(-) diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index d9bd3f0bde6c..46309675ba84 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -69,13 +69,13 @@ def create( result: WarehouseConnection = super().create(validated_data) return result - def get_total_events_received(self, obj: WarehouseConnection) -> int | None: + def get_total_events_received(self, obj: WarehouseConnection) -> int: stats: WarehouseEventStats | None = obj.event_stats - return stats.total_events_received if stats else None + return stats.total_events_received if stats else 0 - def get_unique_events_count(self, obj: WarehouseConnection) -> int | None: + def get_unique_events_count(self, obj: WarehouseConnection) -> int: stats: WarehouseEventStats | None = obj.event_stats - return stats.unique_events_count if stats else None + return stats.unique_events_count if stats else 0 @staticmethod def _generate_name(warehouse_type: str, environment: Environment) -> str: diff --git a/api/tests/unit/experimentation/test_serializers.py b/api/tests/unit/experimentation/test_serializers.py index 4ccea58e0602..a58666fb24fa 100644 --- a/api/tests/unit/experimentation/test_serializers.py +++ b/api/tests/unit/experimentation/test_serializers.py @@ -151,19 +151,3 @@ def test_warehouse_serializer__event_stats_attached__serializes_counts() -> None # Then assert data["total_events_received"] == 7 assert data["unique_events_count"] == 2 - - -def test_warehouse_serializer__no_event_stats__serializes_nulls() -> None: - # Given - connection = WarehouseConnection( - warehouse_type=WarehouseType.SNOWFLAKE, - name="Snowflake Warehouse", - status=WarehouseConnectionStatus.CREATED, - ) - - # When - data = WarehouseConnectionSerializer(connection).data - - # Then - assert data["total_events_received"] is None - assert data["unique_events_count"] is None diff --git a/api/tests/unit/experimentation/test_services.py b/api/tests/unit/experimentation/test_services.py index ba17fb650582..a4b36bbffce4 100644 --- a/api/tests/unit/experimentation/test_services.py +++ b/api/tests/unit/experimentation/test_services.py @@ -77,12 +77,23 @@ def test_get_unique_event_names__no_events__returns_empty_list( assert result == [] -def test_get_warehouse_event_stats__events_present__returns_counts( +@pytest.mark.parametrize( + "rows, expected_total, expected_unique", + [ + ([(42, 3)], 42, 3), + ([], 0, 0), + ], + ids=["events_present", "empty_result_set"], +) +def test_get_warehouse_event_stats__rows__returns_counts( mocker: MockerFixture, + rows: list[tuple[int, int]], + expected_total: int, + expected_unique: int, ) -> None: # Given mock_client = mocker.Mock() - mock_client.execute.return_value = [(42, 3)] + mock_client.execute.return_value = rows mocker.patch( "experimentation.services._get_clickhouse_client", return_value=mock_client, @@ -92,8 +103,8 @@ def test_get_warehouse_event_stats__events_present__returns_counts( result = services.get_warehouse_event_stats("env-key-123") # Then - assert result.total_events_received == 42 - assert result.unique_events_count == 3 + assert result.total_events_received == expected_total + assert result.unique_events_count == expected_unique mock_client.execute.assert_called_once_with( "SELECT count() AS total, uniqExact(event) AS unique " "FROM events WHERE environment_key = %(environment_key)s", @@ -101,46 +112,6 @@ def test_get_warehouse_event_stats__events_present__returns_counts( ) -def test_get_warehouse_event_stats__non_int_clickhouse_values__casts_to_int( - mocker: MockerFixture, -) -> None: - # Given - mock_client = mocker.Mock() - mock_client.execute.return_value = [("42", "3")] - mocker.patch( - "experimentation.services._get_clickhouse_client", - return_value=mock_client, - ) - - # When - result = services.get_warehouse_event_stats("env-key-123") - - # Then - assert result.total_events_received == 42 - assert result.unique_events_count == 3 - assert isinstance(result.total_events_received, int) - assert isinstance(result.unique_events_count, int) - - -def test_get_warehouse_event_stats__empty_result_set__returns_zeroes( - mocker: MockerFixture, -) -> None: - # Given - mock_client = mocker.Mock() - mock_client.execute.return_value = [] - mocker.patch( - "experimentation.services._get_clickhouse_client", - return_value=mock_client, - ) - - # When - result = services.get_warehouse_event_stats("env-key-123") - - # Then - assert result.total_events_received == 0 - assert result.unique_events_count == 0 - - @pytest.mark.django_db def test_mark_warehouse_pending_connection__created__transitions_to_pending( environment: Environment, diff --git a/api/tests/unit/experimentation/test_views.py b/api/tests/unit/experimentation/test_views.py index 6de8f6597ec9..09743de08927 100644 --- a/api/tests/unit/experimentation/test_views.py +++ b/api/tests/unit/experimentation/test_views.py @@ -774,16 +774,38 @@ def test_patch__exists__creates_audit_log( assert "updated" in audit_log.log -def test_get__flagsmith_connection__includes_event_stats( +@pytest.mark.parametrize( + "warehouse_type, config, expected_total, expected_unique", + [ + (WarehouseType.FLAGSMITH, None, 12, 3), + ( + WarehouseType.SNOWFLAKE, + {"account_identifier": "xy12345.us-east-1"}, + 0, + 0, + ), + ], + ids=["flagsmith", "snowflake"], +) +def test_get__warehouse_type__returns_expected_event_stats( admin_client: APIClient, environment: Environment, enable_features: EnableFeaturesFixture, - warehouse_connection: WarehouseConnection, warehouse_connection_url: str, mocker: MockerFixture, + warehouse_type: str, + config: dict[str, str] | None, + expected_total: int, + expected_unique: int, ) -> None: # Given enable_features("experimentation_warehouse_connection") + WarehouseConnection.objects.create( + environment=environment, + warehouse_type=warehouse_type, + name="Warehouse", + config=config, + ) mocker.patch( "experimentation.services.get_warehouse_event_stats", return_value=WarehouseEventStats( @@ -798,8 +820,8 @@ def test_get__flagsmith_connection__includes_event_stats( # Then assert response.status_code == status.HTTP_200_OK data = response.json()[0] - assert data["total_events_received"] == 12 - assert data["unique_events_count"] == 3 + assert data["total_events_received"] == expected_total + assert data["unique_events_count"] == expected_unique def test_get__pending_connection_with_events__flips_to_connected( @@ -858,112 +880,48 @@ def test_get__pending_connection_no_events__stays_pending( assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION -def test_get__snowflake_connection__skips_clickhouse_and_nulls_stats( +@pytest.mark.parametrize( + "warehouse_type, config, expected_status", + [ + (WarehouseType.FLAGSMITH, None, status.HTTP_200_OK), + ( + WarehouseType.SNOWFLAKE, + {"account_identifier": "xy12345.us-east-1"}, + status.HTTP_400_BAD_REQUEST, + ), + ], + ids=["flagsmith", "snowflake"], +) +def test_test_warehouse_connection__warehouse_type__expected_status( admin_client: APIClient, environment: Environment, enable_features: EnableFeaturesFixture, - warehouse_connection_url: str, - mocker: MockerFixture, + warehouse_type: str, + config: dict[str, str] | None, + expected_status: int, ) -> None: # Given enable_features("experimentation_warehouse_connection") - WarehouseConnection.objects.create( + connection = WarehouseConnection.objects.create( environment=environment, - warehouse_type=WarehouseType.SNOWFLAKE, - name="Snowflake Warehouse", - config={"account_identifier": "xy12345.us-east-1"}, - status=WarehouseConnectionStatus.CREATED, - ) - stats_spy = mocker.patch( - "experimentation.services.get_warehouse_event_stats", - ) - - # When - response = admin_client.get(warehouse_connection_url) - - # Then - data = response.json()[0] - assert data["total_events_received"] is None - assert data["unique_events_count"] is None - stats_spy.assert_not_called() - - -def test_get_detail__flagsmith_connection__includes_event_stats( - admin_client: APIClient, - environment: Environment, - enable_features: EnableFeaturesFixture, - warehouse_connection: WarehouseConnection, - mocker: MockerFixture, -) -> None: - # Given - enable_features("experimentation_warehouse_connection") - mocker.patch( - "experimentation.services.get_warehouse_event_stats", - return_value=WarehouseEventStats( - total_events_received=8, - unique_events_count=2, - ), + warehouse_type=warehouse_type, + name="Warehouse", + config=config, ) url = reverse( - "api-v1:environments:experimentation:warehouse-connections-detail", - args=[environment.api_key, warehouse_connection.id], - ) - - # When - response = admin_client.get(url) - - # Then - assert response.json()["total_events_received"] == 8 - assert response.json()["unique_events_count"] == 2 - - -def _test_warehouse_connection_url(environment: Environment, connection_id: int) -> str: - return reverse( "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", - args=[environment.api_key, connection_id], + args=[environment.api_key, connection.id], ) - -def test_test_warehouse_connection__created_flagsmith__returns_200_and_pending( - admin_client: APIClient, - environment: Environment, - enable_features: EnableFeaturesFixture, - warehouse_connection: WarehouseConnection, -) -> None: - # Given - enable_features("experimentation_warehouse_connection") - url = _test_warehouse_connection_url(environment, warehouse_connection.id) - # When response = admin_client.post(url, format="json") # Then - assert response.status_code == status.HTTP_200_OK - assert response.json()["status"] == "pending_connection" - warehouse_connection.refresh_from_db() - assert warehouse_connection.status == WarehouseConnectionStatus.PENDING_CONNECTION - - -def test_test_warehouse_connection__snowflake__returns_400( - admin_client: APIClient, - environment: Environment, - enable_features: EnableFeaturesFixture, -) -> None: - # Given - enable_features("experimentation_warehouse_connection") - connection = WarehouseConnection.objects.create( - environment=environment, - warehouse_type=WarehouseType.SNOWFLAKE, - name="Snowflake Warehouse", - config={"account_identifier": "xy12345.us-east-1"}, - ) - url = _test_warehouse_connection_url(environment, connection.id) - - # When - response = admin_client.post(url, format="json") - - # Then - assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.status_code == expected_status + if expected_status == status.HTTP_200_OK: + assert response.json()["status"] == "pending_connection" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION def test_test_warehouse_connection__already_connected__is_noop( @@ -979,7 +937,10 @@ def test_test_warehouse_connection__already_connected__is_noop( name="Flagsmith Warehouse", status=WarehouseConnectionStatus.CONNECTED, ) - url = _test_warehouse_connection_url(environment, connection.id) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) # When response = admin_client.post(url, format="json") @@ -997,7 +958,10 @@ def test_test_warehouse_connection__non_admin__returns_403( ) -> None: # Given enable_features("experimentation_warehouse_connection") - url = _test_warehouse_connection_url(environment, warehouse_connection.id) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, warehouse_connection.id], + ) # When response = staff_client.post(url, format="json") @@ -1026,8 +990,8 @@ def test_get__clickhouse_unconfigured__returns_200_without_stats( # Then assert response.status_code == status.HTTP_200_OK data = response.json()[0] - assert data["total_events_received"] is None - assert data["unique_events_count"] is None + assert data["total_events_received"] == 0 + assert data["unique_events_count"] == 0 stats_spy.assert_not_called() @@ -1058,11 +1022,9 @@ def test_get__clickhouse_errors__returns_200_without_stats_and_logs( # Then assert response.status_code == status.HTTP_200_OK data = response.json()[0] - assert data["total_events_received"] is None - assert data["unique_events_count"] is None + assert data["total_events_received"] == 0 + assert data["unique_events_count"] == 0 # connection stays pending (no flip on error) assert data["status"] == "pending_connection" # an actionable event was logged - assert any( - e["event"] == "connection.event_stats_unavailable" for e in log.events - ) + assert any(e["event"] == "connection.event_stats_unavailable" for e in log.events) From 3262f68c88850cb56e772871f89bef2265a6ea2e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 2 Jun 2026 14:04:24 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- api/experimentation/views.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/api/experimentation/views.py b/api/experimentation/views.py index 7b18c53533ba..78ca96b046df 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -84,16 +84,12 @@ def list(self, request: Request, *args: object, **kwargs: object) -> Response: def retrieve(self, request: Request, *args: object, **kwargs: object) -> Response: connection = self.get_object() - annotate_warehouse_event_stats( - connection, self.kwargs["environment_api_key"] - ) + annotate_warehouse_event_stats(connection, self.kwargs["environment_api_key"]) serializer = self.get_serializer(connection) return Response(serializer.data) @action(detail=True, methods=["post"], url_path="test-warehouse-connection") - def test_warehouse_connection( - self, request: Request, **kwargs: object - ) -> Response: + def test_warehouse_connection(self, request: Request, **kwargs: object) -> Response: connection: WarehouseConnection = self.get_object() if connection.warehouse_type != WarehouseType.FLAGSMITH: return Response( From 4fea97c8ce2100bcc7903a45e3f09c48b2d3f237 Mon Sep 17 00:00:00 2001 From: wadii Date: Tue, 2 Jun 2026 16:08:06 +0200 Subject: [PATCH 6/7] feat(experimentation): regenerated catalogue --- .../observability/_events-catalogue.md | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index d1b52a42db0c..e273ba8f23a8 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -445,16 +445,24 @@ Attributes: ### `warehouse.connection.connected` Logged at `info` from: - - `api/experimentation/services.py:186` + - `api/experimentation/services.py:182` Attributes: - `environment.id` - `organisation.id` +### `warehouse.connection.event_stats_unavailable` + +Logged at `exception` from: + - `api/experimentation/services.py:205` + +Attributes: + - `environment.id` + ### `warehouse.connection.test_event_sent` Logged at `info` from: - - `api/experimentation/services.py:164` + - `api/experimentation/services.py:162` Attributes: - `environment.id` From 69c60e0bed184de22979826cd604e43ea55d049b Mon Sep 17 00:00:00 2001 From: wadii Date: Tue, 2 Jun 2026 17:21:54 +0200 Subject: [PATCH 7/7] feat(experimentation): move connection status flip out of GET --- api/experimentation/serializers.py | 8 +- api/experimentation/services.py | 30 ++-- api/experimentation/views.py | 4 + .../unit/experimentation/test_services.py | 42 ++++- api/tests/unit/experimentation/test_views.py | 154 +++++++++++++----- .../observability/_events-catalogue.md | 12 +- 6 files changed, 177 insertions(+), 73 deletions(-) diff --git a/api/experimentation/serializers.py b/api/experimentation/serializers.py index 46309675ba84..d9bd3f0bde6c 100644 --- a/api/experimentation/serializers.py +++ b/api/experimentation/serializers.py @@ -69,13 +69,13 @@ def create( result: WarehouseConnection = super().create(validated_data) return result - def get_total_events_received(self, obj: WarehouseConnection) -> int: + def get_total_events_received(self, obj: WarehouseConnection) -> int | None: stats: WarehouseEventStats | None = obj.event_stats - return stats.total_events_received if stats else 0 + return stats.total_events_received if stats else None - def get_unique_events_count(self, obj: WarehouseConnection) -> int: + def get_unique_events_count(self, obj: WarehouseConnection) -> int | None: stats: WarehouseEventStats | None = obj.event_stats - return stats.unique_events_count if stats else 0 + return stats.unique_events_count if stats else None @staticmethod def _generate_name(warehouse_type: str, environment: Environment) -> str: diff --git a/api/experimentation/services.py b/api/experimentation/services.py index 934c005f4c77..f3cd247d4e1f 100644 --- a/api/experimentation/services.py +++ b/api/experimentation/services.py @@ -5,6 +5,7 @@ import structlog from clickhouse_driver import Client +from clickhouse_driver.util.helpers import parse_url from django.conf import settings from django.utils import timezone @@ -27,6 +28,9 @@ logger = structlog.get_logger("warehouse") +CLICKHOUSE_CONNECT_TIMEOUT_SECONDS = 5 +CLICKHOUSE_QUERY_TIMEOUT_SECONDS = 30 + def is_warehouse_feature_enabled(organisation: Organisation) -> bool: return get_openfeature_client().get_boolean_value( @@ -49,9 +53,13 @@ def _get_clickhouse_client() -> Client: """Build a clickhouse-driver client for the experimentation event store. The database is taken from the DSN path, so queries can reference the - `events` table unqualified. + `events` table unqualified. Connect and query timeouts are bounded unless the + DSN overrides them. """ - return Client.from_url(settings.EXPERIMENTATION_CLICKHOUSE_URL) + host, kwargs = parse_url(settings.EXPERIMENTATION_CLICKHOUSE_URL) + kwargs.setdefault("connect_timeout", CLICKHOUSE_CONNECT_TIMEOUT_SECONDS) + kwargs.setdefault("send_receive_timeout", CLICKHOUSE_QUERY_TIMEOUT_SECONDS) + return Client(host, **kwargs) def get_unique_event_names(environment_key: str) -> list[str]: @@ -158,7 +166,7 @@ def mark_warehouse_pending_connection( return connection connection.status = WarehouseConnectionStatus.PENDING_CONNECTION - connection.save() + connection.save(update_fields=["status"]) logger.info( "connection.test_event_sent", environment__id=connection.environment_id, @@ -178,7 +186,7 @@ def refresh_warehouse_connection_status( and stats.total_events_received > 0 ): connection.status = WarehouseConnectionStatus.CONNECTED - connection.save() + connection.save(update_fields=["status"]) logger.info( "connection.connected", environment__id=connection.environment_id, @@ -191,21 +199,15 @@ def annotate_warehouse_event_stats( connection: WarehouseConnection, environment_key: str, ) -> None: - """Attach warehouse event stats to a flagsmith connection and update its - status to match. No-op for non-flagsmith connections or when no warehouse - is configured; leaves the connection unchanged if the warehouse errors.""" + """Attach live warehouse event stats to a flagsmith connection. No-op for + non-flagsmith connections or when no warehouse is configured; leaves stats + unset when the warehouse is unreachable. Read-only: never changes status.""" if ( connection.warehouse_type != WarehouseType.FLAGSMITH or not settings.EXPERIMENTATION_CLICKHOUSE_URL ): return try: - stats = get_warehouse_event_stats(environment_key) + connection.event_stats = get_warehouse_event_stats(environment_key) except Exception: - logger.exception( - "connection.event_stats_unavailable", - environment__id=connection.environment_id, - ) return - connection.event_stats = stats - refresh_warehouse_connection_status(connection, stats) diff --git a/api/experimentation/views.py b/api/experimentation/views.py index 78ca96b046df..8615e6e88f1e 100644 --- a/api/experimentation/views.py +++ b/api/experimentation/views.py @@ -32,6 +32,7 @@ create_experiment_audit_log, create_warehouse_audit_log, mark_warehouse_pending_connection, + refresh_warehouse_connection_status, transition_experiment_status, ) from users.models import FFAdminUser @@ -97,6 +98,9 @@ def test_warehouse_connection(self, request: Request, **kwargs: object) -> Respo status=status.HTTP_400_BAD_REQUEST, ) mark_warehouse_pending_connection(connection) + annotate_warehouse_event_stats(connection, self.kwargs["environment_api_key"]) + if connection.event_stats is not None: + refresh_warehouse_connection_status(connection, connection.event_stats) serializer = self.get_serializer(connection) return Response(serializer.data) diff --git a/api/tests/unit/experimentation/test_services.py b/api/tests/unit/experimentation/test_services.py index a4b36bbffce4..966214a7a2b0 100644 --- a/api/tests/unit/experimentation/test_services.py +++ b/api/tests/unit/experimentation/test_services.py @@ -13,7 +13,7 @@ ) -def test_get_clickhouse_client__configured_url__builds_client_from_url( +def test_get_clickhouse_client__configured_url__builds_client_with_timeouts( mocker: MockerFixture, settings: SettingsWrapper, ) -> None: @@ -21,17 +21,49 @@ def test_get_clickhouse_client__configured_url__builds_client_from_url( settings.EXPERIMENTATION_CLICKHOUSE_URL = ( "clickhouse://user:pass@ch.example.com:9440/flagsmith_exp?secure=True" ) - mock_from_url = mocker.patch("experimentation.services.Client.from_url") + mock_client_cls = mocker.patch("experimentation.services.Client") services._get_clickhouse_client.cache_clear() # When client = services._get_clickhouse_client() # Then - mock_from_url.assert_called_once_with( - "clickhouse://user:pass@ch.example.com:9440/flagsmith_exp?secure=True", + mock_client_cls.assert_called_once_with( + "ch.example.com", + port=9440, + database="flagsmith_exp", + user="user", + password="pass", + secure=True, + connect_timeout=services.CLICKHOUSE_CONNECT_TIMEOUT_SECONDS, + send_receive_timeout=services.CLICKHOUSE_QUERY_TIMEOUT_SECONDS, + ) + assert client is mock_client_cls.return_value + services._get_clickhouse_client.cache_clear() + + +def test_get_clickhouse_client__dsn_timeouts__are_preserved( + mocker: MockerFixture, + settings: SettingsWrapper, +) -> None: + # Given + settings.EXPERIMENTATION_CLICKHOUSE_URL = ( + "clickhouse://ch.example.com:9000/db?connect_timeout=1&send_receive_timeout=2" + ) + mock_client_cls = mocker.patch("experimentation.services.Client") + services._get_clickhouse_client.cache_clear() + + # When + services._get_clickhouse_client() + + # Then + mock_client_cls.assert_called_once_with( + "ch.example.com", + port=9000, + database="db", + connect_timeout=1, + send_receive_timeout=2, ) - assert client is mock_from_url.return_value services._get_clickhouse_client.cache_clear() diff --git a/api/tests/unit/experimentation/test_views.py b/api/tests/unit/experimentation/test_views.py index 09743de08927..078a9faca9fa 100644 --- a/api/tests/unit/experimentation/test_views.py +++ b/api/tests/unit/experimentation/test_views.py @@ -2,7 +2,6 @@ from django.urls import reverse from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture -from pytest_structlog import StructuredLogCapture from rest_framework import status from rest_framework.test import APIClient @@ -781,8 +780,8 @@ def test_patch__exists__creates_audit_log( ( WarehouseType.SNOWFLAKE, {"account_identifier": "xy12345.us-east-1"}, - 0, - 0, + None, + None, ), ], ids=["flagsmith", "snowflake"], @@ -795,8 +794,8 @@ def test_get__warehouse_type__returns_expected_event_stats( mocker: MockerFixture, warehouse_type: str, config: dict[str, str] | None, - expected_total: int, - expected_unique: int, + expected_total: int | None, + expected_unique: int | None, ) -> None: # Given enable_features("experimentation_warehouse_connection") @@ -824,7 +823,7 @@ def test_get__warehouse_type__returns_expected_event_stats( assert data["unique_events_count"] == expected_unique -def test_get__pending_connection_with_events__flips_to_connected( +def test_get__pending_connection_with_events__shows_stats_but_does_not_flip( admin_client: APIClient, environment: Environment, enable_features: EnableFeaturesFixture, @@ -851,31 +850,11 @@ def test_get__pending_connection_with_events__flips_to_connected( response = admin_client.get(warehouse_connection_url) # Then - assert response.json()[0]["status"] == "connected" - connection.refresh_from_db() - assert connection.status == WarehouseConnectionStatus.CONNECTED - - -def test_get__pending_connection_no_events__stays_pending( - admin_client: APIClient, - environment: Environment, - enable_features: EnableFeaturesFixture, - warehouse_connection_url: str, -) -> None: - # Given (autouse mock returns 0 events) - enable_features("experimentation_warehouse_connection") - connection = WarehouseConnection.objects.create( - environment=environment, - warehouse_type=WarehouseType.FLAGSMITH, - name="Flagsmith Warehouse", - status=WarehouseConnectionStatus.PENDING_CONNECTION, - ) - - # When - response = admin_client.get(warehouse_connection_url) - - # Then - assert response.json()[0]["status"] == "pending_connection" + data = response.json()[0] + assert data["total_events_received"] == 1 + assert data["unique_events_count"] == 1 + # GET is read-only: the flip happens on the POST endpoint, not here. + assert data["status"] == "pending_connection" connection.refresh_from_db() assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION @@ -924,6 +903,104 @@ def test_test_warehouse_connection__warehouse_type__expected_status( assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION +def test_test_warehouse_connection__pending_with_events__flips_to_connected( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + return_value=WarehouseEventStats( + total_events_received=1, + unique_events_count=1, + ), + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "connected" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.CONNECTED + + +def test_test_warehouse_connection__pending_no_events__stays_pending( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given (autouse mock returns 0 events) + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "pending_connection" + connection.refresh_from_db() + assert connection.status == WarehouseConnectionStatus.PENDING_CONNECTION + + +def test_test_warehouse_connection__clickhouse_unreachable__stays_pending( + admin_client: APIClient, + environment: Environment, + enable_features: EnableFeaturesFixture, + mocker: MockerFixture, +) -> None: + # Given + enable_features("experimentation_warehouse_connection") + connection = WarehouseConnection.objects.create( + environment=environment, + warehouse_type=WarehouseType.FLAGSMITH, + name="Flagsmith Warehouse", + status=WarehouseConnectionStatus.PENDING_CONNECTION, + ) + mocker.patch( + "experimentation.services.get_warehouse_event_stats", + side_effect=OSError("clickhouse unreachable"), + ) + url = reverse( + "api-v1:environments:experimentation:warehouse-connections-test-warehouse-connection", + args=[environment.api_key, connection.id], + ) + + # When + response = admin_client.post(url, format="json") + + # Then + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["status"] == "pending_connection" + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + + def test_test_warehouse_connection__already_connected__is_noop( admin_client: APIClient, environment: Environment, @@ -990,18 +1067,17 @@ def test_get__clickhouse_unconfigured__returns_200_without_stats( # Then assert response.status_code == status.HTTP_200_OK data = response.json()[0] - assert data["total_events_received"] == 0 - assert data["unique_events_count"] == 0 + assert data["total_events_received"] is None + assert data["unique_events_count"] is None stats_spy.assert_not_called() -def test_get__clickhouse_errors__returns_200_without_stats_and_logs( +def test_get__clickhouse_errors__returns_200_without_stats( admin_client: APIClient, environment: Environment, enable_features: EnableFeaturesFixture, warehouse_connection_url: str, mocker: MockerFixture, - log: StructuredLogCapture, ) -> None: # Given enable_features("experimentation_warehouse_connection") @@ -1022,9 +1098,7 @@ def test_get__clickhouse_errors__returns_200_without_stats_and_logs( # Then assert response.status_code == status.HTTP_200_OK data = response.json()[0] - assert data["total_events_received"] == 0 - assert data["unique_events_count"] == 0 - # connection stays pending (no flip on error) + assert data["total_events_received"] is None + assert data["unique_events_count"] is None + # connection stays pending (GET never writes) assert data["status"] == "pending_connection" - # an actionable event was logged - assert any(e["event"] == "connection.event_stats_unavailable" for e in log.events) diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index e273ba8f23a8..3c7df584962c 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -445,24 +445,16 @@ Attributes: ### `warehouse.connection.connected` Logged at `info` from: - - `api/experimentation/services.py:182` + - `api/experimentation/services.py:190` Attributes: - `environment.id` - `organisation.id` -### `warehouse.connection.event_stats_unavailable` - -Logged at `exception` from: - - `api/experimentation/services.py:205` - -Attributes: - - `environment.id` - ### `warehouse.connection.test_event_sent` Logged at `info` from: - - `api/experimentation/services.py:162` + - `api/experimentation/services.py:170` Attributes: - `environment.id`