diff --git a/api/clickhouse/migrations/0002_identities_add_is_deleted.py b/api/clickhouse/migrations/0002_identities_add_is_deleted.py
new file mode 100644
index 000000000000..fdb16e4b887b
--- /dev/null
+++ b/api/clickhouse/migrations/0002_identities_add_is_deleted.py
@@ -0,0 +1,17 @@
+from django.db import migrations
+
+_ADD_COLUMN_DDL = (
+    "ALTER TABLE IDENTITIES ADD COLUMN IF NOT EXISTS is_deleted Bool DEFAULT false"
+)
+
+
+class Migration(migrations.Migration):
+    # ClickHouse has no transactional DDL, so run this migration non-atomically.
+    atomic = False
+    dependencies = [("clickhouse", "0001_create_identities")]
+    operations = [
+        migrations.RunSQL(
+            _ADD_COLUMN_DDL,
+            reverse_sql=("ALTER TABLE IDENTITIES DROP COLUMN IF EXISTS is_deleted"),
+        )
+    ]
diff --git a/api/edge_api/identities/models.py b/api/edge_api/identities/models.py
index 718bf78bde55..11fecd27a87d 100644
--- a/api/edge_api/identities/models.py
+++ b/api/edge_api/identities/models.py
@@ -2,6 +2,7 @@
 import typing
 from contextlib import suppress
 
+from django.conf import settings
 from django.db.models import Prefetch, Q
 
 from api_keys.user import APIKeyUser
@@ -194,6 +195,18 @@ def delete(self, user: FFAdminUser | APIKeyUser = None) -> None:  # type: ignore
             user=user,
         )
         self._reset_initial_state()  # type: ignore[no-untyped-call]
+        if settings.CLICKHOUSE_ENABLED:
+            from segment_membership.tasks import (
+                write_identity_deletion_tombstone_to_clickhouse,
+            )
+
+            write_identity_deletion_tombstone_to_clickhouse.delay(
+                args=(
+                    self.engine_identity_model.environment_api_key,
+                    self.engine_identity_model.identifier,
+                    self.engine_identity_model.composite_key,
+                )
+            )
 
     def synchronise_features(self, valid_feature_names: typing.Collection[str]) -> None:
         identity_feature_names = {
diff --git a/api/segment_membership/mappers.py b/api/segment_membership/mappers.py
index 9ee572db4841..ba3675914b4d 100644
--- a/api/segment_membership/mappers.py
+++ b/api/segment_membership/mappers.py
@@ -2,16 +2,18 @@
 
 from flagsmith_schemas import dynamodb
 
-# (environment_id, identifier, identity_key, traits)
-ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None]
+# (environment_id, identifier, identity_key, traits, is_deleted)
+ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None, bool]
 
 
 def map_identity_document_to_clickhouse_row(
     env_key: str,
     identity_doc: dynamodb.Identity,
+    *,
+    is_deleted: bool = False,
 ) -> ClickHouseIdentityRow:
     """Project a Dynamo identity document onto an IDENTITIES row tuple
-    `(environment_id, identifier, identity_key, traits)`."""
+    `(environment_id, identifier, identity_key, traits, is_deleted)`."""
     identifier = identity_doc["identifier"]
     composite_key = identity_doc["composite_key"]
     raw_traits = identity_doc.get("identity_traits")
@@ -21,6 +23,7 @@ def map_identity_document_to_clickhouse_row(
         identifier,
         composite_key,
         traits,
+        is_deleted,
     )
 
 
diff --git a/api/segment_membership/services.py b/api/segment_membership/services.py
index 9be17c1baee9..59d765e52b76 100644
--- a/api/segment_membership/services.py
+++ b/api/segment_membership/services.py
@@ -104,7 +104,9 @@ def compute_segment_counts_for_project(
                 f"SELECT {seg.id} AS segment_id, "
                 f"i.environment_id AS env_key, count() AS c "
                 f"FROM IDENTITIES AS i FINAL "
-                f"WHERE i.environment_id IN %(env_keys)s AND ({predicate}) "
+                f"WHERE i.environment_id IN %(env_keys)s "
+                f"AND i.is_deleted = false "
+                f"AND ({predicate}) "
                 f"GROUP BY i.environment_id"
             )
 
diff --git a/api/segment_membership/tasks.py b/api/segment_membership/tasks.py
index 704ef7782ab6..9eb65c1eef7e 100644
--- a/api/segment_membership/tasks.py
+++ b/api/segment_membership/tasks.py
@@ -45,6 +45,7 @@
     "identifier",
     "identity_key",
     "traits",
+    "is_deleted",
 )
 
 _INSERT_IDENTITIES_SQL = (
@@ -188,3 +189,40 @@ def refresh_project_segment_counts(project_id: int) -> None:
         membership_counts__count=len(membership_counts),
         stale_counts__count=stale_deleted,
     )
+
+
+@register_task_handler()
+def write_identity_deletion_tombstone_to_clickhouse(
+    env_key: str,
+    identifier: str,
+    identity_key: str,
+) -> None:
+    """Insert a tombstone row for a deleted identity so it is excluded from
+    segment membership counts at the next refresh.
+
+    ReplacingMergeTree(inserted_at) keeps the row with the highest
+    inserted_at per (environment_id, identifier). This row is written
+    after the identity is removed from Dynamo, so it should supersede
+    earlier live rows under FINAL. NOTE(review): a backfill row inserted
+    after this task would win instead -- confirm that cannot happen.
+    """
+    if not settings.CLICKHOUSE_ENABLED:
+        logger.info(
+            "tombstone.skipped",
+            reason="clickhouse_not_configured",
+            env_key=env_key,
+            identifier=identifier,
+        )
+        return
+
+    log_comment = f"flagsmith:segment_membership:tombstone:env_{env_key}"
+    with open_clickhouse_cursor(log_comment=log_comment) as cursor:
+        cursor.executemany(
+            _INSERT_IDENTITIES_SQL,
+            [(env_key, identifier, identity_key, None, True)],
+        )
+    logger.info(
+        "tombstone.written",
+        env_key=env_key,
+        identifier=identifier,
+    )
diff --git a/api/tests/unit/edge_api/identities/test_edge_identity_models.py b/api/tests/unit/edge_api/identities/test_edge_identity_models.py
index 64c7740c44a8..d4510da7aac5 100644
--- a/api/tests/unit/edge_api/identities/test_edge_identity_models.py
+++ b/api/tests/unit/edge_api/identities/test_edge_identity_models.py
@@ -6,6 +6,7 @@
 from django.utils import timezone
 from freezegun import freeze_time
 from pytest_django import DjangoAssertNumQueries
+from pytest_django.fixtures import SettingsWrapper
 from pytest_lazyfixture import lazy_fixture  # type: ignore[import-untyped]
 from pytest_mock import MockerFixture
 
@@ -509,6 +510,56 @@ def test_save__feature_override_updated__generates_audit_records(
     )
 
 
+def test_edge_identity_delete__clickhouse_enabled__dispatches_tombstone_task(
+    mocker: MockerFixture,
+    edge_identity_model: EdgeIdentity,
+    edge_identity_dynamo_wrapper_mock: MagicMock,
+    settings: SettingsWrapper,
+) -> None:
+    # Given
+    settings.CLICKHOUSE_ENABLED = True
+    mock_tombstone_task = mocker.MagicMock()
+    mocker.patch(
+        "segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse",
+        mock_tombstone_task,
+    )
+
+    # When
+    edge_identity_model.delete()
+
+    # Then
+    edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once()
+    mock_tombstone_task.delay.assert_called_once_with(
+        args=(
+            edge_identity_model.environment_api_key,
+            edge_identity_model.identifier,
+            edge_identity_model.engine_identity_model.composite_key,
+        )
+    )
+
+
+def test_edge_identity_delete__clickhouse_disabled__no_tombstone_dispatched(
+    mocker: MockerFixture,
+    edge_identity_model: EdgeIdentity,
+    edge_identity_dynamo_wrapper_mock: MagicMock,
+    settings: SettingsWrapper,
+) -> None:
+    # Given
+    settings.CLICKHOUSE_ENABLED = False
+    mock_tombstone_task = mocker.MagicMock()
+    mocker.patch(
+        "segment_membership.tasks.write_identity_deletion_tombstone_to_clickhouse",
+        mock_tombstone_task,
+    )
+
+    # When
+    edge_identity_model.delete()
+
+    # Then
+    edge_identity_dynamo_wrapper_mock.delete_item.assert_called_once()
+    mock_tombstone_task.delay.assert_not_called()
+
+
 def test_get_all_feature_states__post_v2_versioning_migration__returns_latest_overrides(
     environment: Environment,
     feature: Feature,
diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py
index bd3cf464a27e..d515238191ee 100644
--- a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py
+++ b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py
@@ -22,7 +22,7 @@
                     {"trait_key": "plan", "trait_value": "growth"},
                 ],
             },
-            ("env-key", "alice", "env_x_alice", {"plan": "growth"}),
+            ("env-key", "alice", "env_x_alice", {"plan": "growth"}, False),
             id="single string trait",
         ),
         pytest.param(
@@ -34,7 +34,7 @@
                 "created_date": "2026-05-08T00:00:00Z",
                 "identity_traits": [],
             },
-            ("env-key", "alice", "env_x_alice", None),
+            ("env-key", "alice", "env_x_alice", None, False),
             id="empty traits collapse to NULL",
         ),
         pytest.param(
@@ -48,7 +48,7 @@
                     {"trait_key": "age", "trait_value": Decimal("18")},
                 ],
             },
-            ("env-key", "alice", "env_x_alice", {"age": 18}),
+            ("env-key", "alice", "env_x_alice", {"age": 18}, False),
             id="whole-number Decimal narrows to int",
         ),
         pytest.param(
@@ -62,7 +62,7 @@
                     {"trait_key": "score", "trait_value": Decimal("1.5")},
                 ],
             },
-            ("env-key", "alice", "env_x_alice", {"score": 1.5}),
+            ("env-key", "alice", "env_x_alice", {"score": 1.5}, False),
             id="fractional Decimal narrows to float",
         ),
         pytest.param(
@@ -82,6 +82,7 @@
                 "alice",
                 "env_x_alice",
                 {"plan": "growth", "team": "alpha"},
+                False,
             ),
             id="multiple traits flatten to a single dict",
         ),
@@ -89,9 +90,27 @@
 )
 def test_map_identity_document_to_clickhouse_row__cases__return_expected(
     doc: DynamoIdentity,
-    expected: tuple[str, str, str, dict[str, object] | None],
+    expected: tuple[str, str, str, dict[str, object] | None, bool],
 ) -> None:
     # Given a Dynamo identity document
     # When mapped onto an IDENTITIES row
     # Then it lines up positionally with the IDENTITIES schema
     assert map_identity_document_to_clickhouse_row("env-key", doc) == expected
+
+
+def test_map_identity_document_to_clickhouse_row__is_deleted_true__sets_flag() -> None:
+    # Given a Dynamo identity document and is_deleted=True
+    doc: DynamoIdentity = {
+        "identity_uuid": UUID_A,
+        "identifier": "alice",
+        "environment_api_key": "env-key",
+        "composite_key": "env_x_alice",
+        "created_date": "2026-05-08T00:00:00Z",
+        "identity_traits": [],
+    }
+
+    # When mapped with is_deleted=True
+    result = map_identity_document_to_clickhouse_row("env-key", doc, is_deleted=True)
+
+    # Then the flag is set in the returned tuple
+    assert result == ("env-key", "alice", "env_x_alice", None, True)
diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py
index 3eff7ba86c38..1b84be1dbe0a 100644
--- a/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py
+++ b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py
@@ -107,7 +107,7 @@ def test_backfill_identities_to_clickhouse__happy_path__bulk_inserts(
     sql, rows_arg = cursor.executemany.call_args.args
     assert sql == (
         "INSERT INTO IDENTITIES "
-        "(environment_id, identifier, identity_key, traits) VALUES"
+        "(environment_id, identifier, identity_key, traits, is_deleted) VALUES"
     )
     assert {row[0] for row in rows_arg} == {environment.api_key}
     assert {row[1] for row in rows_arg} == {"a", "b"}
@@ -356,3 +356,58 @@ def test_refresh_project_segment_counts__never_matched_pair__no_row_written(
     assert not SegmentMembershipCount.objects.filter(
         segment=segment, environment=environment
     ).exists()
+
+
+def test_write_identity_deletion_tombstone_to_clickhouse__clickhouse_disabled__skips(
+    mocker: MockerFixture,
+    settings: SettingsWrapper,
+    log: StructuredLogCapture,
+) -> None:
+    # Given
+    settings.CLICKHOUSE_ENABLED = False
+    spy = mocker.patch.object(tasks, "open_clickhouse_cursor")
+
+    # When
+    tasks.write_identity_deletion_tombstone_to_clickhouse(
+        env_key="env-abc",
+        identifier="alice",
+        identity_key="env-abc_alice",
+    )
+
+    # Then
+    spy.assert_not_called()
+    assert any(e["event"] == "tombstone.skipped" for e in log.events)
+
+
+def test_write_identity_deletion_tombstone_to_clickhouse__clickhouse_enabled__writes_tombstone(
+    mocker: MockerFixture,
+    settings: SettingsWrapper,
+    log: StructuredLogCapture,
+) -> None:
+    # Given
+    settings.CLICKHOUSE_ENABLED = True
+    cursor = MagicMock()
+    open_cursor = mocker.patch.object(tasks, "open_clickhouse_cursor")
+    open_cursor.return_value.__enter__.return_value = cursor
+
+    # When
+    tasks.write_identity_deletion_tombstone_to_clickhouse(
+        env_key="env-abc",
+        identifier="alice",
+        identity_key="env-abc_alice",
+    )
+
+    # Then — exactly one INSERT with is_deleted=True
+    sql, rows_arg = cursor.executemany.call_args.args
+    assert sql == (
+        "INSERT INTO IDENTITIES "
+        "(environment_id, identifier, identity_key, traits, is_deleted) VALUES"
+    )
+    assert len(rows_arg) == 1
+    row = rows_arg[0]
+    assert row[0] == "env-abc"  # environment_id
+    assert row[1] == "alice"  # identifier
+    assert row[2] == "env-abc_alice"  # identity_key
+    assert row[3] is None  # traits — NULL for tombstone
+    assert row[4] is True  # is_deleted
+    assert any(e["event"] == "tombstone.written" for e in log.events)
diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md
index 3133424c66bd..565353f2b0bc 100644
--- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md
+++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md
@@ -328,7 +328,7 @@ Attributes:
 ### `segment_membership.backfill.environment.completed`
 
 Logged at `info` from:
- - `api/segment_membership/tasks.py:110`
+ - `api/segment_membership/tasks.py:111`
 
 Attributes:
  - `environment.id`
@@ -338,7 +338,7 @@ Attributes:
 ### `segment_membership.backfill.environment.failed`
 
 Logged at `exception` from:
- - `api/segment_membership/tasks.py:103`
+ - `api/segment_membership/tasks.py:104`
 
 Attributes:
  - `environment.id`
@@ -347,8 +347,8 @@ Attributes:
 ### `segment_membership.backfill.skipped`
 
 Logged at `info` from:
- - `api/segment_membership/tasks.py:67`
- - `api/segment_membership/tasks.py:72`
+ - `api/segment_membership/tasks.py:68`
+ - `api/segment_membership/tasks.py:73`
 
 Attributes:
  - `reason`
@@ -366,7 +366,7 @@ Attributes:
 ### `segment_membership.refresh.project.completed`
 
 Logged at `info` from:
- - `api/segment_membership/tasks.py:185`
+ - `api/segment_membership/tasks.py:186`
 
 Attributes:
  - `membership_counts.count`
@@ -376,7 +376,7 @@ Attributes:
 ### `segment_membership.refresh.project.failed`
 
 Logged at `exception` from:
- - `api/segment_membership/tasks.py:158`
+ - `api/segment_membership/tasks.py:159`
 
 Attributes:
  - `project.id`
@@ -384,13 +384,32 @@ Attributes:
 ### `segment_membership.refresh.project.skipped`
 
 Logged at `info` from:
- - `api/segment_membership/tasks.py:129`
- - `api/segment_membership/tasks.py:138`
+ - `api/segment_membership/tasks.py:130`
+ - `api/segment_membership/tasks.py:139`
 
 Attributes:
  - `project.id`
  - `reason`
 
+### `segment_membership.tombstone.skipped`
+
+Logged at `info` from:
+ - `api/segment_membership/tasks.py:210`
+
+Attributes:
+ - `env_key`
+ - `identifier`
+ - `reason`
+
+### `segment_membership.tombstone.written`
+
+Logged at `info` from:
+ - `api/segment_membership/tasks.py:224`
+
+Attributes:
+ - `env_key`
+ - `identifier`
+
 ### `segments.serializers.segment_revision_created`
 
 Logged at `info` from: