Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/experimentation/dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dataclasses import dataclass


@dataclass(frozen=True)
class WarehouseEventStats:
total_events_received: int
unique_events_count: int
9 changes: 9 additions & 0 deletions api/experimentation/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import typing

from django.db import models
from django.db.models import Q
from django_lifecycle import ( # type: ignore[import-untyped]
Expand All @@ -14,6 +16,9 @@
delete_environment_key_from_ingestion,
)

if typing.TYPE_CHECKING:
from experimentation.dataclasses import WarehouseEventStats


class WarehouseType(models.TextChoices):
FLAGSMITH = "flagsmith", "Flagsmith"
Expand Down Expand Up @@ -49,6 +54,10 @@ class WarehouseConnection(LifecycleModelMixin, SoftDeleteExportableModel): # ty
)
created_at = models.DateTimeField(auto_now_add=True)

# Populated at serialization time for flagsmith connections from ClickHouse;
# never persisted to the database.
event_stats: "WarehouseEventStats | None" = None

class Meta:
constraints = [
models.UniqueConstraint(
Expand Down
22 changes: 21 additions & 1 deletion api/experimentation/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from rest_framework import serializers

from environments.models import Environment
from experimentation.dataclasses import WarehouseEventStats
from experimentation.models import (
Experiment,
WarehouseConnection,
Expand All @@ -17,10 +18,21 @@
class WarehouseConnectionSerializer(serializers.ModelSerializer): # type: ignore[type-arg]
name = serializers.CharField(max_length=255, required=False)
config = serializers.JSONField(default=None, required=False, allow_null=True)
total_events_received = serializers.SerializerMethodField()
unique_events_count = serializers.SerializerMethodField()

class Meta:
model = WarehouseConnection
fields = ("id", "warehouse_type", "status", "name", "config", "created_at")
fields = (
"id",
"warehouse_type",
"status",
"name",
"config",
"created_at",
"total_events_received",
"unique_events_count",
)
read_only_fields = ("id", "status", "created_at")

def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -57,6 +69,14 @@ def create(
result: WarehouseConnection = super().create(validated_data)
return result

def get_total_events_received(self, obj: WarehouseConnection) -> int:
stats: WarehouseEventStats | None = obj.event_stats
return stats.total_events_received if stats else 0

def get_unique_events_count(self, obj: WarehouseConnection) -> int:
stats: WarehouseEventStats | None = obj.event_stats
return stats.unique_events_count if stats else 0

@staticmethod
def _generate_name(warehouse_type: str, environment: Environment) -> str:
label = WarehouseType(warehouse_type).label
Expand Down
88 changes: 86 additions & 2 deletions api/experimentation/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,30 @@
import typing
from functools import lru_cache

import structlog
from clickhouse_driver import Client
from django.conf import settings
from django.utils import timezone

from audit.models import AuditLog
from audit.related_object_type import RelatedObjectType
from experimentation.constants import EXPERIMENT_FLAG, WAREHOUSE_CONNECTION_FLAG
from experimentation.dataclasses import WarehouseEventStats
from experimentation.models import (
VALID_STATUS_TRANSITIONS,
ExperimentStatus,
WarehouseConnectionStatus,
WarehouseType,
)
from integrations.flagsmith.client import get_openfeature_client

if typing.TYPE_CHECKING:
from experimentation.models import Experiment, WarehouseConnection
from organisations.models import Organisation
from users.models import FFAdminUser

logger = structlog.get_logger("warehouse")


def is_warehouse_feature_enabled(organisation: Organisation) -> bool:
return get_openfeature_client().get_boolean_value(
Expand Down Expand Up @@ -56,6 +66,20 @@ def get_unique_event_names(environment_key: str) -> list[str]:
return [row[0] for row in rows]


def get_warehouse_event_stats(environment_key: str) -> WarehouseEventStats:
"""Return event counts recorded for `environment_key` in the warehouse."""
rows = _get_clickhouse_client().execute(
"SELECT count() AS total, uniqExact(event) AS unique "
"FROM events WHERE environment_key = %(environment_key)s",
{"environment_key": environment_key},
)
total, unique = rows[0] if rows else (0, 0)
return WarehouseEventStats(
total_events_received=int(total),
unique_events_count=int(unique),
)


def _resolve_audit_log_author(
user: FFAdminUser,
) -> dict[str, int | None]:
Expand Down Expand Up @@ -107,8 +131,6 @@ def transition_experiment_status(
target_status: str,
user: FFAdminUser,
) -> Experiment:
from experimentation.models import VALID_STATUS_TRANSITIONS, ExperimentStatus

valid_targets = VALID_STATUS_TRANSITIONS.get(experiment.status, set())
if target_status not in valid_targets:
raise ValueError(
Expand All @@ -125,3 +147,65 @@ def transition_experiment_status(
experiment.save()
create_experiment_audit_log(experiment, user, action=target_status)
return experiment


def mark_warehouse_pending_connection(
connection: WarehouseConnection,
) -> WarehouseConnection:
"""Move a connection from created to pending_connection. No-op for any
other status."""
if connection.status != WarehouseConnectionStatus.CREATED:
return connection

connection.status = WarehouseConnectionStatus.PENDING_CONNECTION
connection.save()
Comment thread
Zaimwa9 marked this conversation as resolved.
Outdated
logger.info(
"connection.test_event_sent",
environment__id=connection.environment_id,
organisation__id=connection.environment.project.organisation_id,
)
return connection


def refresh_warehouse_connection_status(
connection: WarehouseConnection,
stats: WarehouseEventStats,
) -> WarehouseConnection:
"""Set a pending connection to connected when the warehouse has received at
least one event. No-op otherwise."""
if (
connection.status == WarehouseConnectionStatus.PENDING_CONNECTION
and stats.total_events_received > 0
):
connection.status = WarehouseConnectionStatus.CONNECTED
connection.save()
Comment thread
Zaimwa9 marked this conversation as resolved.
Outdated
logger.info(
"connection.connected",
environment__id=connection.environment_id,
organisation__id=connection.environment.project.organisation_id,
)
return connection


def annotate_warehouse_event_stats(
connection: WarehouseConnection,
environment_key: str,
) -> None:
"""Attach warehouse event stats to a flagsmith connection and update its
status to match. No-op for non-flagsmith connections or when no warehouse
is configured; leaves the connection unchanged if the warehouse errors."""
if (
connection.warehouse_type != WarehouseType.FLAGSMITH
or not settings.EXPERIMENTATION_CLICKHOUSE_URL
):
return
try:
stats = get_warehouse_event_stats(environment_key)
except Exception:
logger.exception(
"connection.event_stats_unavailable",
environment__id=connection.environment_id,
)
return
Comment thread
Zaimwa9 marked this conversation as resolved.
connection.event_stats = stats
refresh_warehouse_connection_status(connection, stats)
29 changes: 29 additions & 0 deletions api/experimentation/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Experiment,
ExperimentStatus,
WarehouseConnection,
WarehouseType,
)
from experimentation.permissions import (
ExperimentPermission,
Expand All @@ -27,8 +28,10 @@
WarehouseConnectionSerializer,
)
from experimentation.services import (
annotate_warehouse_event_stats,
create_experiment_audit_log,
create_warehouse_audit_log,
mark_warehouse_pending_connection,
transition_experiment_status,
)
from users.models import FFAdminUser
Expand Down Expand Up @@ -71,6 +74,32 @@ def perform_destroy(self, instance: WarehouseConnection) -> None:
)
instance.delete()

def list(self, request: Request, *args: object, **kwargs: object) -> Response:
environment_api_key: str = self.kwargs["environment_api_key"]
connections = list(self.filter_queryset(self.get_queryset()))
for connection in connections:
annotate_warehouse_event_stats(connection, environment_api_key)
Comment thread
Zaimwa9 marked this conversation as resolved.
serializer = self.get_serializer(connections, many=True)
return Response(serializer.data)

def retrieve(self, request: Request, *args: object, **kwargs: object) -> Response:
connection = self.get_object()
annotate_warehouse_event_stats(connection, self.kwargs["environment_api_key"])
serializer = self.get_serializer(connection)
return Response(serializer.data)

@action(detail=True, methods=["post"], url_path="test-warehouse-connection")
def test_warehouse_connection(self, request: Request, **kwargs: object) -> Response:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes so this endpoint does 2 things:

  1. On the first call, it switches created -> pending_connection
  2. It is then polled from the warehouse page, if it has some events, it switches from pending_connection to connected

I'm not entirely satisfied but using a task would have been overkilled to me (especially while we are still likely to change the flush buffer).

I think for external warehouse this can be replaced with a sort of SELECT 1 in the external warehouse. For the flagsmith hosted one, this would always work so listening to an event on this specific environment is the real source of truth

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think polling is probably a bad idea since clickhouse calls are expensive

connection: WarehouseConnection = self.get_object()
if connection.warehouse_type != WarehouseType.FLAGSMITH:
return Response(
{"detail": "Test events are only supported for Flagsmith warehouses."},
status=status.HTTP_400_BAD_REQUEST,
)
mark_warehouse_pending_connection(connection)
serializer = self.get_serializer(connection)
return Response(serializer.data)

def create(self, request: Request, *args: object, **kwargs: object) -> Response:
environment = self._get_environment()
serializer = self.get_serializer(data=request.data)
Expand Down
21 changes: 21 additions & 0 deletions api/tests/unit/experimentation/test_serializers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest

from environments.models import Environment
from experimentation.dataclasses import WarehouseEventStats
from experimentation.models import (
WarehouseConnection,
WarehouseConnectionStatus,
Expand Down Expand Up @@ -130,3 +131,23 @@ def test_create__flagsmith_with_config__raises_validation_error(
# When / Then
assert not serializer.is_valid()
assert "config" in serializer.errors


def test_warehouse_serializer__event_stats_attached__serializes_counts() -> None:
# Given
connection = WarehouseConnection(
warehouse_type=WarehouseType.FLAGSMITH,
name="Flagsmith Warehouse",
status=WarehouseConnectionStatus.CONNECTED,
)
connection.event_stats = WarehouseEventStats(
total_events_received=7,
unique_events_count=2,
)

# When
data = WarehouseConnectionSerializer(connection).data

# Then
assert data["total_events_received"] == 7
assert data["unique_events_count"] == 2
Loading
Loading