diff --git a/src/sentry/apidocs/examples/workflow_engine_examples.py b/src/sentry/apidocs/examples/workflow_engine_examples.py index cec267775069e4..b84c4a877c46ad 100644 --- a/src/sentry/apidocs/examples/workflow_engine_examples.py +++ b/src/sentry/apidocs/examples/workflow_engine_examples.py @@ -201,6 +201,7 @@ class WorkflowEngineExamples: "extrapolationMode": "unknown", }, }, + "health": {"isHealthy": True, "message": None}, } ], "conditionGroup": { @@ -251,6 +252,7 @@ class WorkflowEngineExamples: "lastSeen": "2026-01-12T16:16:26.355334Z", }, "openIssues": 0, + "hasDataSourceError": False, }, status_codes=["200"], response_only=True, @@ -296,6 +298,7 @@ class WorkflowEngineExamples: "extrapolationMode": "unknown", }, }, + "health": {"isHealthy": True, "message": None}, } ], "conditionGroup": { @@ -338,6 +341,7 @@ class WorkflowEngineExamples: "lastSeen": "2026-01-09T18:48:15.250134Z", }, "openIssues": 0, + "hasDataSourceError": False, }, status_codes=["200"], response_only=True, @@ -395,6 +399,7 @@ class WorkflowEngineExamples: "lastSeen": "2026-01-08T21:23:45.723716Z", }, "openIssues": 100, + "hasDataSourceError": False, }, { "id": "234567891", @@ -425,6 +430,7 @@ class WorkflowEngineExamples: "timeoutMs": 5000, "traceSampling": False, }, + "health": {"isHealthy": True, "message": None}, } ], "conditionGroup": { @@ -456,6 +462,7 @@ class WorkflowEngineExamples: "enabled": True, "latestGroup": None, "openIssues": 0, + "hasDataSourceError": False, }, { "id": "1234567", @@ -494,6 +501,7 @@ class WorkflowEngineExamples: "extrapolationMode": "unknown", }, }, + "health": {"isHealthy": True, "message": None}, } ], "conditionGroup": { @@ -520,6 +528,7 @@ class WorkflowEngineExamples: "enabled": True, "latestGroup": None, "openIssues": 0, + "hasDataSourceError": False, }, ], status_codes=["200"], @@ -817,6 +826,7 @@ class WorkflowEngineExamples: "extrapolationMode": "unknown", }, }, + "health": {"isHealthy": True, "message": None}, } ], "conditionGroup": { @@ -833,6 +843,7 @@ class WorkflowEngineExamples: "enabled": True, "latestGroup": None, "openIssues": 0, + "hasDataSourceError": False, }, status_codes=["201"], response_only=True, diff --git a/src/sentry/snuba/models.py b/src/sentry/snuba/models.py index d0b2c4cbaa0ca9..e04fde005cbc9e 100644 --- a/src/sentry/snuba/models.py +++ b/src/sentry/snuba/models.py @@ -20,7 +20,7 @@ from sentry.models.team import Team from sentry.users.models.user import User from sentry.workflow_engine.registry import data_source_type_registry -from sentry.workflow_engine.types import DataSourceTypeHandler +from sentry.workflow_engine.types import DataSourceHealth, DataSourceTypeHandler if TYPE_CHECKING: from sentry.models.organization import Organization @@ -149,6 +149,10 @@ class Status(Enum): UPDATING = 2 DELETING = 3 DISABLED = 4 + # The subscription could not be created or updated after significant + # retrying and likely requires user intervention to fix (e.g., editing + # the detector's query). + BROKEN = 5 # NOTE: project fk SHOULD match AlertRule's fk project = FlexibleForeignKey("sentry.Project", db_constraint=False) @@ -239,3 +243,30 @@ def get_current_instance_count(org: Organization) -> int: @staticmethod def get_relocation_model_name() -> str: return "sentry.querysubscription" + + @override + @staticmethod + def bulk_get_health(data_sources: list[DataSource]) -> dict[int, DataSourceHealth]: + query_subscription_ids: list[int] = [] + for ds in data_sources: + try: + query_subscription_ids.append(int(ds.source_id)) + except ValueError: + pass + + qs_lookup: dict[str, QuerySubscription] = { + str(qs.id): qs for qs in QuerySubscription.objects.filter(id__in=query_subscription_ids) + } + + results: dict[int, DataSourceHealth] = {} + for ds in data_sources: + sub = qs_lookup.get(ds.source_id) + if sub is None: + results[ds.id] = DataSourceHealth( + is_healthy=False, message="Subscription not found" + ) + elif sub.status == QuerySubscription.Status.BROKEN.value: + results[ds.id] = DataSourceHealth(is_healthy=False) + else: + results[ds.id] = DataSourceHealth(is_healthy=True) + return results diff --git a/src/sentry/snuba/subscriptions.py b/src/sentry/snuba/subscriptions.py index e357c5d12d1907..40cc530f71a61f 100644 --- a/src/sentry/snuba/subscriptions.py +++ b/src/sentry/snuba/subscriptions.py @@ -3,6 +3,7 @@ from datetime import timedelta from django.db import router, transaction +from django.utils import timezone from sentry.models.environment import Environment from sentry.models.project import Project @@ -250,7 +251,9 @@ def update_snuba_subscription( :return: The QuerySubscription representing the subscription """ with transaction.atomic(router.db_for_write(QuerySubscription)): - subscription.update(status=QuerySubscription.Status.UPDATING.value) + subscription.update( + status=QuerySubscription.Status.UPDATING.value, date_updated=timezone.now() + ) transaction.on_commit( lambda: update_subscription_in_snuba.delay( @@ -333,7 +336,7 @@ def enable_snuba_subscription(subscription: QuerySubscription) -> None: :param subscription: The subscription to enable :return: """ - subscription.update(status=QuerySubscription.Status.CREATING.value) + subscription.update(status=QuerySubscription.Status.CREATING.value, date_updated=timezone.now()) transaction.on_commit( lambda: create_subscription_in_snuba.delay(query_subscription_id=subscription.id), diff --git a/src/sentry/snuba/tasks.py b/src/sentry/snuba/tasks.py index b41d2a2139da85..3c2d1b5db111f1 100644 --- a/src/sentry/snuba/tasks.py +++ b/src/sentry/snuba/tasks.py @@ -33,6 +33,7 @@ SUBSCRIPTION_STATUS_MAX_AGE = timedelta(minutes=10) +SUBSCRIPTION_BROKEN_MAX_AGE = timedelta(hours=1) class SubscriptionError(Exception): @@ -338,20 +339,41 @@ def _delete_from_snuba(dataset: Dataset, subscription_id: str, entity_key: Entit ) def subscription_checker(**kwargs: Any) -> None: """ - Checks for subscriptions stuck in a transition status and attempts to repair them + Checks for subscriptions stuck in a transition status and attempts to repair them. + Subscriptions stuck in CREATING or UPDATING for longer than SUBSCRIPTION_BROKEN_MAX_AGE + are marked as BROKEN so they stop retrying and can surface errors to users. """ count = 0 + broken_count = 0 + now = timezone.now() + broken_cutoff = now - SUBSCRIPTION_BROKEN_MAX_AGE + repair_cutoff = now - SUBSCRIPTION_STATUS_MAX_AGE + for subscription in QuerySubscription.objects.filter( status__in=( QuerySubscription.Status.CREATING.value, QuerySubscription.Status.UPDATING.value, QuerySubscription.Status.DELETING.value, ), - date_updated__lt=timezone.now() - SUBSCRIPTION_STATUS_MAX_AGE, + date_updated__lt=repair_cutoff, ): with sentry_sdk.start_span(op="repair_subscription") as span: span.set_data("subscription_id", subscription.id) span.set_data("status", subscription.status) + + if ( + subscription.status + in ( + QuerySubscription.Status.CREATING.value, + QuerySubscription.Status.UPDATING.value, + ) + and subscription.date_updated is not None + and subscription.date_updated < broken_cutoff + ): + subscription.update(status=QuerySubscription.Status.BROKEN.value) + broken_count += 1 + continue + count += 1 if subscription.status == QuerySubscription.Status.CREATING.value: create_subscription_in_snuba.delay(query_subscription_id=subscription.id) @@ -361,3 +383,4 @@ def subscription_checker(**kwargs: Any) -> None: delete_subscription_from_snuba.delay(query_subscription_id=subscription.id) metrics.incr("snuba.subscriptions.repair", amount=count) + metrics.incr("snuba.subscriptions.marked_broken", amount=broken_count) diff --git a/src/sentry/workflow_engine/endpoints/serializers/data_source_serializer.py b/src/sentry/workflow_engine/endpoints/serializers/data_source_serializer.py index 2471cd66c445b1..1bed24605b712b 100644 --- a/src/sentry/workflow_engine/endpoints/serializers/data_source_serializer.py +++ b/src/sentry/workflow_engine/endpoints/serializers/data_source_serializer.py @@ -4,7 +4,7 @@ from sentry.api.serializers import Serializer, register, serialize from sentry.workflow_engine.models import DataSource -from sentry.workflow_engine.types import DataSourceTypeHandler +from sentry.workflow_engine.types import DataSourceHealth, DataSourceTypeHandler @register(DataSource) @@ -18,6 +18,7 @@ def get_attrs( ds_by_type[item.type_handler].append(item) serialized_query_objs: dict[int, dict[str, Any]] = {} + health_by_ds_id: dict[int, DataSourceHealth] = {} for type_handler, ds_items in ds_by_type.items(): ds_query_objs = list(type_handler.bulk_get_query_object(ds_items).items()) @@ -30,18 +31,26 @@ def get_attrs( for (ds_id, query_obj), serialized_obj in zip(ds_query_objs, serialized) } ) + health_by_ds_id.update(type_handler.bulk_get_health(ds_items)) + for item in item_list: attrs[item]["query_obj"] = serialized_query_objs.get(item.id, []) + attrs[item]["health"] = health_by_ds_id.get(item.id, DataSourceHealth(is_healthy=True)) return attrs def serialize( self, obj: DataSource, attrs: Mapping[str, Any], user: Any, **kwargs: Any ) -> dict[str, Any]: + health: DataSourceHealth = attrs["health"] return { "id": str(obj.id), "organizationId": str(obj.organization_id), "type": obj.type, "sourceId": str(obj.source_id), "queryObj": attrs["query_obj"], + "health": { + "isHealthy": health.is_healthy, + "message": health.message, + }, } diff --git a/src/sentry/workflow_engine/endpoints/serializers/detector_serializer.py b/src/sentry/workflow_engine/endpoints/serializers/detector_serializer.py index 10c7238c53262a..1fc903c940bf87 100644 --- a/src/sentry/workflow_engine/endpoints/serializers/detector_serializer.py +++ b/src/sentry/workflow_engine/endpoints/serializers/detector_serializer.py @@ -47,6 +47,7 @@ class DetectorSerializerResponse(DetectorSerializerResponseOptional): config: dict[str, Any] enabled: bool openIssues: int + hasDataSourceError: bool @register(Detector) @@ -217,4 +218,8 @@ def serialize( "ruleId": alert_rule_mapping.get("rule_id"), "latestGroup": attrs.get("latest_group"), "openIssues": attrs.get("open_issues_count", 0), + "hasDataSourceError": any( + ds.get("health", {}).get("isHealthy") is False + for ds in (attrs.get("data_sources") or []) + ), } diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py index d783e26be1b1fc..d6a502de8fa69c 100644 --- a/src/sentry/workflow_engine/types.py +++ b/src/sentry/workflow_engine/types.py @@ -316,6 +316,12 @@ def execute(invocation: ActionInvocation) -> None: raise NotImplementedError +@dataclass(frozen=True) +class DataSourceHealth: + is_healthy: bool + message: str | None = None + + class DataSourceTypeHandler(ABC, Generic[T]): @staticmethod @abstractmethod @@ -364,6 +370,14 @@ def get_relocation_model_name() -> str: """ raise NotImplementedError + @staticmethod + def bulk_get_health(data_sources: list[DataSource]) -> dict[int, DataSourceHealth]: + """ + Returns health status for each DataSource. Implementations should override + this to report source-specific problems (e.g., broken subscriptions). + """ + return {ds.id: DataSourceHealth(is_healthy=True) for ds in data_sources} + class DataConditionHandler(Generic[T]): class Group(StrEnum): diff --git a/tests/sentry/incidents/test_metric_issue_detector_handler.py b/tests/sentry/incidents/test_metric_issue_detector_handler.py index 43ab2093eb4b98..b88d920c7bcd8f 100644 --- a/tests/sentry/incidents/test_metric_issue_detector_handler.py +++ b/tests/sentry/incidents/test_metric_issue_detector_handler.py @@ -75,6 +75,10 @@ def generate_evidence_data( "extrapolation_mode": "unknown", }, }, + "health": { + "is_healthy": True, + "message": None, + }, } ], } diff --git a/tests/sentry/monitors/endpoints/test_organization_detector_details.py b/tests/sentry/monitors/endpoints/test_organization_detector_details.py index 45d7588175a18b..3fd5c97608eb18 100644 --- a/tests/sentry/monitors/endpoints/test_organization_detector_details.py +++ b/tests/sentry/monitors/endpoints/test_organization_detector_details.py @@ -80,6 +80,10 @@ def test_get_monitor_incident_detector_details(self) -> None: "queryObj": serialize( self.monitor, user=self.user, serializer=MonitorSerializer() ), + "health": { + "isHealthy": True, + "message": None, + }, } ], "conditionGroup": None, @@ -89,6 +93,7 @@ def test_get_monitor_incident_detector_details(self) -> None: "ruleId": None, "latestGroup": None, "openIssues": 0, + "hasDataSourceError": False, } def test_update_monitor_incident_detector(self) -> None: @@ -127,6 +132,10 @@ def test_update_monitor_incident_detector(self) -> None: "type": DATA_SOURCE_CRON_MONITOR, "sourceId": str(self.monitor.id), "queryObj": serialize(self.monitor, user=self.user, serializer=MonitorSerializer()), + "health": { + "isHealthy": True, + "message": None, + }, } ] assert response.data["dataSources"] == expected_data_sources diff --git a/tests/sentry/monitors/endpoints/test_organization_detector_index.py b/tests/sentry/monitors/endpoints/test_organization_detector_index.py index 1c11c479bab60a..8809899285781e 100644 --- a/tests/sentry/monitors/endpoints/test_organization_detector_index.py +++ b/tests/sentry/monitors/endpoints/test_organization_detector_index.py @@ -70,6 +70,10 @@ def test_list_monitor_incident_detectors(self) -> None: "queryObj": serialize( self.monitor, user=self.user, serializer=MonitorSerializer() ), + "health": { + "isHealthy": True, + "message": None, + }, } ], "conditionGroup": detector_data["conditionGroup"], @@ -79,6 +83,7 @@ def test_list_monitor_incident_detectors(self) -> None: "ruleId": None, "latestGroup": None, "openIssues": 0, + "hasDataSourceError": False, } diff --git a/tests/sentry/snuba/test_models.py b/tests/sentry/snuba/test_models.py index c5c880fa7731e7..2da2a7e5101063 100644 --- a/tests/sentry/snuba/test_models.py +++ b/tests/sentry/snuba/test_models.py @@ -10,6 +10,7 @@ ) from sentry.snuba.subscriptions import create_snuba_query, create_snuba_subscription from sentry.testutils.cases import TestCase +from sentry.workflow_engine.types import DataSourceHealth class SnubaQueryEventTypesTest(TestCase): @@ -139,3 +140,22 @@ def test_get_current_instance_count(self) -> None: # Count should still be 3 as it only counts for the given org assert QuerySubscriptionDataSourceHandler.get_current_instance_count(new_org) == 3 + + def test_bulk_get_health_active(self) -> None: + result = QuerySubscriptionDataSourceHandler.bulk_get_health([self.data_source]) + assert result[self.data_source.id] == DataSourceHealth(is_healthy=True) + + def test_bulk_get_health_broken(self) -> None: + self.subscription.update(status=QuerySubscription.Status.BROKEN.value) + result = QuerySubscriptionDataSourceHandler.bulk_get_health([self.data_source]) + assert result[self.data_source.id] == DataSourceHealth(is_healthy=False) + + def test_bulk_get_health_missing_subscription(self) -> None: + ds_missing = self.create_data_source( + type="snuba_query_subscription", + source_id="999999", + ) + result = QuerySubscriptionDataSourceHandler.bulk_get_health([ds_missing]) + assert result[ds_missing.id] == DataSourceHealth( + is_healthy=False, message="Subscription not found" + ) diff --git a/tests/sentry/snuba/test_tasks.py b/tests/sentry/snuba/test_tasks.py index c7138e94be9932..40326a40b93103 100644 --- a/tests/sentry/snuba/test_tasks.py +++ b/tests/sentry/snuba/test_tasks.py @@ -30,6 +30,7 @@ from sentry.snuba.metrics.naming_layer.mri import SessionMRI from sentry.snuba.models import QuerySubscription, SnubaQuery, SnubaQueryEventType from sentry.snuba.tasks import ( + SUBSCRIPTION_BROKEN_MAX_AGE, SUBSCRIPTION_STATUS_MAX_AGE, SubscriptionError, create_subscription_in_snuba, @@ -1295,3 +1296,45 @@ def test_create_update(self) -> None: sub_new = QuerySubscription.objects.get(id=sub_new.id) assert sub_new.status == status.value assert sub_new.subscription_id is None + + def test_marks_creating_as_broken(self) -> None: + sub = self.create_subscription( + QuerySubscription.Status.CREATING, + date_updated=timezone.now() - SUBSCRIPTION_BROKEN_MAX_AGE * 2, + ) + subscription_checker() + sub = QuerySubscription.objects.get(id=sub.id) + assert sub.status == QuerySubscription.Status.BROKEN.value + + def test_marks_updating_as_broken(self) -> None: + sub = self.create_subscription( + QuerySubscription.Status.UPDATING, + date_updated=timezone.now() - SUBSCRIPTION_BROKEN_MAX_AGE * 2, + ) + subscription_checker() + sub = QuerySubscription.objects.get(id=sub.id) + assert sub.status == QuerySubscription.Status.BROKEN.value + + def test_does_not_mark_deleting_as_broken(self) -> None: + sub = self.create_subscription( + QuerySubscription.Status.DELETING, + date_updated=timezone.now() - SUBSCRIPTION_BROKEN_MAX_AGE * 2, + ) + with self.tasks(): + subscription_checker() + sub_refreshed = QuerySubscription.objects.filter(id=sub.id).first() + # DELETING subscriptions get re-triggered, not marked broken + assert ( + sub_refreshed is None or sub_refreshed.status != QuerySubscription.Status.BROKEN.value + ) + + def test_does_not_mark_broken_before_threshold(self) -> None: + sub = self.create_subscription( + QuerySubscription.Status.CREATING, + date_updated=timezone.now() - SUBSCRIPTION_STATUS_MAX_AGE * 2, + ) + with self.tasks(): + subscription_checker() + sub = QuerySubscription.objects.get(id=sub.id) + # Should be repaired (retried), not marked broken + assert sub.status == QuerySubscription.Status.ACTIVE.value diff --git a/tests/sentry/workflow_engine/endpoints/serializers/test_data_source_serializer.py b/tests/sentry/workflow_engine/endpoints/serializers/test_data_source_serializer.py index 5ec8970e592b20..7d9a8b9d3c701b 100644 --- a/tests/sentry/workflow_engine/endpoints/serializers/test_data_source_serializer.py +++ b/tests/sentry/workflow_engine/endpoints/serializers/test_data_source_serializer.py @@ -56,4 +56,8 @@ def test_serialize(self) -> None: "status": 1, "subscription": None, }, + "health": { + "isHealthy": True, + "message": None, + }, } diff --git a/tests/sentry/workflow_engine/endpoints/serializers/test_detector_serializer.py b/tests/sentry/workflow_engine/endpoints/serializers/test_detector_serializer.py index 68601a5e7938ea..c7035b1939e6f5 100644 --- a/tests/sentry/workflow_engine/endpoints/serializers/test_detector_serializer.py +++ b/tests/sentry/workflow_engine/endpoints/serializers/test_detector_serializer.py @@ -7,7 +7,7 @@ from sentry.models.group import GroupStatus from sentry.notifications.models.notificationaction import ActionTarget from sentry.snuba.dataset import Dataset -from sentry.snuba.models import QuerySubscriptionDataSourceHandler, SnubaQuery +from sentry.snuba.models import QuerySubscription, QuerySubscriptionDataSourceHandler, SnubaQuery from sentry.snuba.subscriptions import create_snuba_query, create_snuba_subscription from sentry.testutils.cases import TestCase from sentry.testutils.helpers.datetime import before_now @@ -51,6 +51,7 @@ def test_serialize_simple(self) -> None: "ruleId": None, "latestGroup": None, "openIssues": 0, + "hasDataSourceError": False, } def test_serialize_full(self) -> None: @@ -143,6 +144,10 @@ def test_serialize_full(self) -> None: "status": 1, "subscription": None, }, + "health": { + "isHealthy": True, + "message": None, + }, } ], "conditionGroup": { @@ -184,6 +189,7 @@ def test_serialize_full(self) -> None: "ruleId": None, "latestGroup": mock.ANY, "openIssues": 1, + "hasDataSourceError": False, } def test_serialize_latest_group(self) -> None: @@ -244,3 +250,33 @@ def test_serialize_bulk(self) -> None: assert len(result) == 2 assert all(d["name"] in ["Test Detector 0", "Test Detector 1"] for d in result) + + def test_serialize_has_data_source_error(self) -> None: + detector = self.create_detector( + project_id=self.project.id, name="Broken Detector", type=MetricIssue.slug + ) + snuba_query = create_snuba_query( + SnubaQuery.Type.ERROR, + Dataset.Events, + "hello", + "count()", + timedelta(minutes=1), + timedelta(minutes=1), + None, + ) + subscription = create_snuba_subscription( + self.project, INCIDENTS_SNUBA_SUBSCRIPTION_TYPE, snuba_query + ) + subscription.update(status=QuerySubscription.Status.BROKEN.value) + + type_name = data_source_type_registry.get_key(QuerySubscriptionDataSourceHandler) + data_source = self.create_data_source( + organization=self.organization, + type=type_name, + source_id=str(subscription.id), + ) + data_source.detectors.set([detector]) + + result = serialize(detector) + assert result["hasDataSourceError"] is True + assert result["dataSources"][0]["health"]["isHealthy"] is False