Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/clickhouse/migrations/0002_identities_add_is_deleted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.db import migrations

_ADD_COLUMN_DDL = (
"ALTER TABLE IDENTITIES ADD COLUMN IF NOT EXISTS is_deleted Bool DEFAULT false"
)


class Migration(migrations.Migration):
    """Add the `is_deleted` column to the IDENTITIES table.

    Forward: runs `_ADD_COLUMN_DDL`. Reverse: drops the column if it exists.
    """

    # ClickHouse has no transactional DDL, so the migration must not be
    # wrapped in a transaction.
    atomic = False

    dependencies = [
        ("clickhouse", "0001_create_identities"),
    ]

    operations = [
        migrations.RunSQL(
            _ADD_COLUMN_DDL,
            reverse_sql="ALTER TABLE IDENTITIES DROP COLUMN IF EXISTS is_deleted",
        ),
    ]
13 changes: 13 additions & 0 deletions api/edge_api/identities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import typing
from contextlib import suppress

from django.conf import settings
from django.db.models import Prefetch, Q

from api_keys.user import APIKeyUser
Expand Down Expand Up @@ -194,6 +195,18 @@ def delete(self, user: FFAdminUser | APIKeyUser = None) -> None: # type: ignore
user=user,
)
self._reset_initial_state() # type: ignore[no-untyped-call]
if settings.CLICKHOUSE_ENABLED:
from segment_membership.tasks import (
write_identity_deletion_tombstone_to_clickhouse,
)

write_identity_deletion_tombstone_to_clickhouse.delay(
args=(
self.engine_identity_model.environment_api_key,
self.engine_identity_model.identifier,
self.engine_identity_model.composite_key,
)
)

def synchronise_features(self, valid_feature_names: typing.Collection[str]) -> None:
identity_feature_names = {
Expand Down
9 changes: 6 additions & 3 deletions api/segment_membership/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

from flagsmith_schemas import dynamodb

# (environment_id, identifier, identity_key, traits)
ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None]
# (environment_id, identifier, identity_key, traits, is_deleted)
ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None, bool]


def map_identity_document_to_clickhouse_row(
env_key: str,
identity_doc: dynamodb.Identity,
*,
is_deleted: bool = False,
) -> ClickHouseIdentityRow:
"""Project a Dynamo identity document onto an IDENTITIES row tuple
`(environment_id, identifier, identity_key, traits)`."""
`(environment_id, identifier, identity_key, traits, is_deleted)`."""
identifier = identity_doc["identifier"]
composite_key = identity_doc["composite_key"]
raw_traits = identity_doc.get("identity_traits")
Expand All @@ -21,6 +23,7 @@ def map_identity_document_to_clickhouse_row(
identifier,
composite_key,
traits,
is_deleted,
)


Expand Down
4 changes: 3 additions & 1 deletion api/segment_membership/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ def compute_segment_counts_for_project(
f"SELECT {seg.id} AS segment_id, "
f"i.environment_id AS env_key, count() AS c "
f"FROM IDENTITIES AS i FINAL "
f"WHERE i.environment_id IN %(env_keys)s AND ({predicate}) "
f"WHERE i.environment_id IN %(env_keys)s "
f"AND i.is_deleted = false "
f"AND ({predicate}) "
f"GROUP BY i.environment_id"
)

Expand Down
38 changes: 38 additions & 0 deletions api/segment_membership/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"identifier",
"identity_key",
"traits",
"is_deleted",
)

_INSERT_IDENTITIES_SQL = (
Expand Down Expand Up @@ -188,3 +189,40 @@ def refresh_project_segment_counts(project_id: int) -> None:
membership_counts__count=len(membership_counts),
stale_counts__count=stale_deleted,
)


@register_task_handler()
def write_identity_deletion_tombstone_to_clickhouse(
    env_key: str,
    identifier: str,
    identity_key: str,
) -> None:
    """Write a deletion tombstone for an identity into ClickHouse.

    One IDENTITIES row is inserted for the identity with NULL traits and
    `is_deleted=True`, so that the identity drops out of segment membership
    counts on the next refresh.

    The table is a ReplacingMergeTree versioned by `inserted_at`, which
    retains the newest row per (environment_id, identifier). The tombstone
    is written after the identity has been removed from Dynamo, so it is
    newer than any earlier live row and is the one FINAL deduplication
    surfaces.

    When `settings.CLICKHOUSE_ENABLED` is falsy, nothing is written and a
    `tombstone.skipped` event is logged instead.
    """
    if settings.CLICKHOUSE_ENABLED:
        # Column order follows the IDENTITIES insert statement:
        # (environment_id, identifier, identity_key, traits, is_deleted).
        tombstone_row = (env_key, identifier, identity_key, None, True)
        with open_clickhouse_cursor(
            log_comment=f"flagsmith:segment_membership:tombstone:env_{env_key}",
        ) as cursor:
            cursor.executemany(_INSERT_IDENTITIES_SQL, [tombstone_row])
        logger.info(
            "tombstone.written",
            env_key=env_key,
            identifier=identifier,
        )
        return

    # ClickHouse is switched off: record why no tombstone was written.
    logger.info(
        "tombstone.skipped",
        reason="clickhouse_not_configured",
        env_key=env_key,
        identifier=identifier,
    )
51 changes: 51 additions & 0 deletions api/tests/unit/edge_api/identities/test_edge_identity_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.utils import timezone
from freezegun import freeze_time
from pytest_django import DjangoAssertNumQueries
from pytest_django.fixtures import SettingsWrapper
from pytest_lazyfixture import lazy_fixture # type: ignore[import-untyped]
from pytest_mock import MockerFixture

Expand Down Expand Up @@ -509,6 +510,56 @@ def test_save__feature_override_updated__generates_audit_records(
)


def test_edge_identity_delete__clickhouse_enabled__dispatches_tombstone_task(
    mocker: MockerFixture,
    edge_identity_model: EdgeIdentity,
    edge_identity_dynamo_wrapper_mock: MagicMock,
    settings: SettingsWrapper,
) -> None:
    # Given ClickHouse is enabled and the tombstone task is replaced by a mock
    settings.CLICKHOUSE_ENABLED = True
    tombstone_task_mock = mocker.MagicMock()
    mocker.patch(
        "segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse",
        new=tombstone_task_mock,
    )

    # When the identity is deleted
    edge_identity_model.delete()

    # Then the Dynamo item is removed and the task is enqueued exactly once
    # with (environment key, identifier, composite key)
    edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once()
    expected_task_args = (
        edge_identity_model.environment_api_key,
        edge_identity_model.identifier,
        edge_identity_model.engine_identity_model.composite_key,
    )
    tombstone_task_mock.delay.assert_called_once_with(args=expected_task_args)


def test_edge_identity_delete__clickhouse_disabled__no_tombstone_dispatched(
    mocker: MockerFixture,
    edge_identity_model: EdgeIdentity,
    edge_identity_dynamo_wrapper_mock: MagicMock,
    settings: SettingsWrapper,
) -> None:
    # Given ClickHouse is disabled and the tombstone task is replaced by a mock
    settings.CLICKHOUSE_ENABLED = False
    tombstone_task_mock = mocker.MagicMock()
    mocker.patch(
        "segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse",
        new=tombstone_task_mock,
    )

    # When the identity is deleted
    edge_identity_model.delete()

    # Then the Dynamo item is still removed, but no tombstone task is enqueued
    edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once()
    tombstone_task_mock.delay.assert_not_called()


def test_get_all_feature_states__post_v2_versioning_migration__returns_latest_overrides(
environment: Environment,
feature: Feature,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
{"trait_key": "plan", "trait_value": "growth"},
],
},
("env-key", "alice", "env_x_alice", {"plan": "growth"}),
("env-key", "alice", "env_x_alice", {"plan": "growth"}, False),
id="single string trait",
),
pytest.param(
Expand All @@ -34,7 +34,7 @@
"created_date": "2026-05-08T00:00:00Z",
"identity_traits": [],
},
("env-key", "alice", "env_x_alice", None),
("env-key", "alice", "env_x_alice", None, False),
id="empty traits collapse to NULL",
),
pytest.param(
Expand All @@ -48,7 +48,7 @@
{"trait_key": "age", "trait_value": Decimal("18")},
],
},
("env-key", "alice", "env_x_alice", {"age": 18}),
("env-key", "alice", "env_x_alice", {"age": 18}, False),
id="whole-number Decimal narrows to int",
),
pytest.param(
Expand All @@ -62,7 +62,7 @@
{"trait_key": "score", "trait_value": Decimal("1.5")},
],
},
("env-key", "alice", "env_x_alice", {"score": 1.5}),
("env-key", "alice", "env_x_alice", {"score": 1.5}, False),
id="fractional Decimal narrows to float",
),
pytest.param(
Expand All @@ -82,16 +82,35 @@
"alice",
"env_x_alice",
{"plan": "growth", "team": "alpha"},
False,
),
id="multiple traits flatten to a single dict",
),
],
)
def test_map_identity_document_to_clickhouse_row__cases__return_expected(
doc: DynamoIdentity,
expected: tuple[str, str, str, dict[str, object] | None],
expected: tuple[str, str, str, dict[str, object] | None, bool],
) -> None:
# Given a Dynamo identity document
# When mapped onto an IDENTITIES row
# Then it lines up positionally with the IDENTITIES schema
assert map_identity_document_to_clickhouse_row("env-key", doc) == expected


def test_map_identity_document_to_clickhouse_row__is_deleted_true__sets_flag() -> None:
    # Given a trait-less Dynamo identity document
    identity_doc: DynamoIdentity = {
        "identity_uuid": UUID_A,
        "identifier": "alice",
        "environment_api_key": "env-key",
        "composite_key": "env_x_alice",
        "created_date": "2026-05-08T00:00:00Z",
        "identity_traits": [],
    }

    # When it is mapped as a deleted identity
    row = map_identity_document_to_clickhouse_row(
        "env-key",
        identity_doc,
        is_deleted=True,
    )

    # Then the last element of the row carries the deletion flag
    expected_row = ("env-key", "alice", "env_x_alice", None, True)
    assert row == expected_row
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_backfill_identities_to_clickhouse__happy_path__bulk_inserts(
sql, rows_arg = cursor.executemany.call_args.args
assert sql == (
"INSERT INTO IDENTITIES "
"(environment_id, identifier, identity_key, traits) VALUES"
"(environment_id, identifier, identity_key, traits, is_deleted) VALUES"
)
assert {row[0] for row in rows_arg} == {environment.api_key}
assert {row[1] for row in rows_arg} == {"a", "b"}
Expand Down Expand Up @@ -356,3 +356,58 @@ def test_refresh_project_segment_counts__never_matched_pair__no_row_written(
assert not SegmentMembershipCount.objects.filter(
segment=segment, environment=environment
).exists()


def test_write_identity_deletion_tombstone_to_clickhouse__clickhouse_disabled__skips(
    mocker: MockerFixture,
    settings: SettingsWrapper,
    log: StructuredLogCapture,
) -> None:
    # Given ClickHouse is disabled and cursor creation is observed
    settings.CLICKHOUSE_ENABLED = False
    open_cursor_mock = mocker.patch.object(tasks, "open_clickhouse_cursor")
    task_kwargs = {
        "env_key": "env-abc",
        "identifier": "alice",
        "identity_key": "env-abc_alice",
    }

    # When the task runs
    tasks.write_identity_deletion_tombstone_to_clickhouse(**task_kwargs)

    # Then no cursor is opened and the skip is logged
    open_cursor_mock.assert_not_called()
    assert "tombstone.skipped" in (entry["event"] for entry in log.events)


def test_write_identity_deletion_tombstone_to_clickhouse__clickhouse_enabled__writes_tombstone(
    mocker: MockerFixture,
    settings: SettingsWrapper,
    log: StructuredLogCapture,
) -> None:
    # Given ClickHouse is enabled and the cursor context manager yields a mock
    settings.CLICKHOUSE_ENABLED = True
    cursor_mock = MagicMock()
    open_cursor_mock = mocker.patch.object(tasks, "open_clickhouse_cursor")
    open_cursor_mock.return_value.__enter__.return_value = cursor_mock
    task_kwargs = {
        "env_key": "env-abc",
        "identifier": "alice",
        "identity_key": "env-abc_alice",
    }

    # When the task runs
    tasks.write_identity_deletion_tombstone_to_clickhouse(**task_kwargs)

    # Then exactly one row is inserted, flagged as deleted
    sql, rows_arg = cursor_mock.executemany.call_args.args
    expected_sql = (
        "INSERT INTO IDENTITIES "
        "(environment_id, identifier, identity_key, traits, is_deleted) VALUES"
    )
    assert sql == expected_sql
    assert len(rows_arg) == 1
    tombstone = rows_arg[0]
    # environment_id, identifier, identity_key
    assert (tombstone[0], tombstone[1], tombstone[2]) == (
        "env-abc",
        "alice",
        "env-abc_alice",
    )
    assert tombstone[3] is None  # traits are NULL for a tombstone
    assert tombstone[4] is True  # is_deleted
    assert "tombstone.written" in (entry["event"] for entry in log.events)
Loading
Loading