Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/sentry/apidocs/examples/workflow_engine_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class WorkflowEngineExamples:
"extrapolationMode": "unknown",
},
},
"health": {"isHealthy": True, "message": None},
}
],
"conditionGroup": {
Expand Down Expand Up @@ -251,6 +252,7 @@ class WorkflowEngineExamples:
"lastSeen": "2026-01-12T16:16:26.355334Z",
},
"openIssues": 0,
"hasDataSourceError": False,
},
status_codes=["200"],
response_only=True,
Expand Down Expand Up @@ -296,6 +298,7 @@ class WorkflowEngineExamples:
"extrapolationMode": "unknown",
},
},
"health": {"isHealthy": True, "message": None},
}
],
"conditionGroup": {
Expand Down Expand Up @@ -338,6 +341,7 @@ class WorkflowEngineExamples:
"lastSeen": "2026-01-09T18:48:15.250134Z",
},
"openIssues": 0,
"hasDataSourceError": False,
},
status_codes=["200"],
response_only=True,
Expand Down Expand Up @@ -395,6 +399,7 @@ class WorkflowEngineExamples:
"lastSeen": "2026-01-08T21:23:45.723716Z",
},
"openIssues": 100,
"hasDataSourceError": False,
},
{
"id": "234567891",
Expand Down Expand Up @@ -425,6 +430,7 @@ class WorkflowEngineExamples:
"timeoutMs": 5000,
"traceSampling": False,
},
"health": {"isHealthy": True, "message": None},
}
],
"conditionGroup": {
Expand Down Expand Up @@ -456,6 +462,7 @@ class WorkflowEngineExamples:
"enabled": True,
"latestGroup": None,
"openIssues": 0,
"hasDataSourceError": False,
},
{
"id": "1234567",
Expand Down Expand Up @@ -494,6 +501,7 @@ class WorkflowEngineExamples:
"extrapolationMode": "unknown",
},
},
"health": {"isHealthy": True, "message": None},
}
],
"conditionGroup": {
Expand All @@ -520,6 +528,7 @@ class WorkflowEngineExamples:
"enabled": True,
"latestGroup": None,
"openIssues": 0,
"hasDataSourceError": False,
},
],
status_codes=["200"],
Expand Down Expand Up @@ -817,6 +826,7 @@ class WorkflowEngineExamples:
"extrapolationMode": "unknown",
},
},
"health": {"isHealthy": True, "message": None},
}
],
"conditionGroup": {
Expand All @@ -833,6 +843,7 @@ class WorkflowEngineExamples:
"enabled": True,
"latestGroup": None,
"openIssues": 0,
"hasDataSourceError": False,
},
status_codes=["201"],
response_only=True,
Expand Down
33 changes: 32 additions & 1 deletion src/sentry/snuba/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from sentry.models.team import Team
from sentry.users.models.user import User
from sentry.workflow_engine.registry import data_source_type_registry
from sentry.workflow_engine.types import DataSourceTypeHandler
from sentry.workflow_engine.types import DataSourceHealth, DataSourceTypeHandler

if TYPE_CHECKING:
from sentry.models.organization import Organization
Expand Down Expand Up @@ -149,6 +149,10 @@ class Status(Enum):
UPDATING = 2
DELETING = 3
DISABLED = 4
# The subscription could not be created or updated after significant
# retrying and likely requires user intervention to fix (e.g., editing
# the detector's query).
BROKEN = 5

# NOTE: project fk SHOULD match AlertRule's fk
project = FlexibleForeignKey("sentry.Project", db_constraint=False)
Expand Down Expand Up @@ -239,3 +243,30 @@ def get_current_instance_count(org: Organization) -> int:
@staticmethod
def get_relocation_model_name() -> str:
return "sentry.querysubscription"

@override
@staticmethod
def bulk_get_health(data_sources: list[DataSource]) -> dict[int, DataSourceHealth]:
query_subscription_ids: list[int] = []
for ds in data_sources:
try:
query_subscription_ids.append(int(ds.source_id))
except ValueError:
pass

qs_lookup: dict[str, QuerySubscription] = {
str(qs.id): qs for qs in QuerySubscription.objects.filter(id__in=query_subscription_ids)
}

results: dict[int, DataSourceHealth] = {}
for ds in data_sources:
sub = qs_lookup.get(ds.source_id)
if sub is None:
results[ds.id] = DataSourceHealth(
is_healthy=False, message="Subscription not found"
)
elif sub.status == QuerySubscription.Status.BROKEN.value:
results[ds.id] = DataSourceHealth(is_healthy=False)
else:
results[ds.id] = DataSourceHealth(is_healthy=True)
return results
7 changes: 5 additions & 2 deletions src/sentry/snuba/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import timedelta

from django.db import router, transaction
from django.utils import timezone

from sentry.models.environment import Environment
from sentry.models.project import Project
Expand Down Expand Up @@ -250,7 +251,9 @@ def update_snuba_subscription(
:return: The QuerySubscription representing the subscription
"""
with transaction.atomic(router.db_for_write(QuerySubscription)):
subscription.update(status=QuerySubscription.Status.UPDATING.value)
subscription.update(
status=QuerySubscription.Status.UPDATING.value, date_updated=timezone.now()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly overkill but there is an auto_now=True field you can set on the django model

)

transaction.on_commit(
lambda: update_subscription_in_snuba.delay(
Expand Down Expand Up @@ -333,7 +336,7 @@ def enable_snuba_subscription(subscription: QuerySubscription) -> None:
:param subscription: The subscription to enable
:return:
"""
subscription.update(status=QuerySubscription.Status.CREATING.value)
subscription.update(status=QuerySubscription.Status.CREATING.value, date_updated=timezone.now())

transaction.on_commit(
lambda: create_subscription_in_snuba.delay(query_subscription_id=subscription.id),
Expand Down
27 changes: 25 additions & 2 deletions src/sentry/snuba/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@


SUBSCRIPTION_STATUS_MAX_AGE = timedelta(minutes=10)
SUBSCRIPTION_BROKEN_MAX_AGE = timedelta(hours=1)


class SubscriptionError(Exception):
Expand Down Expand Up @@ -338,20 +339,41 @@ def _delete_from_snuba(dataset: Dataset, subscription_id: str, entity_key: Entit
)
def subscription_checker(**kwargs: Any) -> None:
"""
Checks for subscriptions stuck in a transition status and attempts to repair them
Checks for subscriptions stuck in a transition status and attempts to repair them.
Subscriptions stuck in CREATING or UPDATING for longer than SUBSCRIPTION_BROKEN_MAX_AGE
are marked as BROKEN so they stop retrying and can surface errors to users.
"""
count = 0
broken_count = 0
now = timezone.now()
broken_cutoff = now - SUBSCRIPTION_BROKEN_MAX_AGE
repair_cutoff = now - SUBSCRIPTION_STATUS_MAX_AGE

for subscription in QuerySubscription.objects.filter(
status__in=(
QuerySubscription.Status.CREATING.value,
QuerySubscription.Status.UPDATING.value,
QuerySubscription.Status.DELETING.value,
),
date_updated__lt=timezone.now() - SUBSCRIPTION_STATUS_MAX_AGE,
date_updated__lt=repair_cutoff,
):
with sentry_sdk.start_span(op="repair_subscription") as span:
span.set_data("subscription_id", subscription.id)
span.set_data("status", subscription.status)

if (
subscription.status
in (
QuerySubscription.Status.CREATING.value,
QuerySubscription.Status.UPDATING.value,
)
and subscription.date_updated is not None
and subscription.date_updated < broken_cutoff
):
subscription.update(status=QuerySubscription.Status.BROKEN.value)
Comment thread
kcons marked this conversation as resolved.
broken_count += 1
continue

count += 1
if subscription.status == QuerySubscription.Status.CREATING.value:
create_subscription_in_snuba.delay(query_subscription_id=subscription.id)
Expand All @@ -361,3 +383,4 @@ def subscription_checker(**kwargs: Any) -> None:
delete_subscription_from_snuba.delay(query_subscription_id=subscription.id)

metrics.incr("snuba.subscriptions.repair", amount=count)
metrics.incr("snuba.subscriptions.marked_broken", amount=broken_count)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from sentry.api.serializers import Serializer, register, serialize
from sentry.workflow_engine.models import DataSource
from sentry.workflow_engine.types import DataSourceTypeHandler
from sentry.workflow_engine.types import DataSourceHealth, DataSourceTypeHandler


@register(DataSource)
Expand All @@ -18,6 +18,7 @@ def get_attrs(
ds_by_type[item.type_handler].append(item)

serialized_query_objs: dict[int, dict[str, Any]] = {}
health_by_ds_id: dict[int, DataSourceHealth] = {}

for type_handler, ds_items in ds_by_type.items():
ds_query_objs = list(type_handler.bulk_get_query_object(ds_items).items())
Expand All @@ -30,18 +31,26 @@ def get_attrs(
for (ds_id, query_obj), serialized_obj in zip(ds_query_objs, serialized)
}
)
health_by_ds_id.update(type_handler.bulk_get_health(ds_items))

for item in item_list:
attrs[item]["query_obj"] = serialized_query_objs.get(item.id, [])
attrs[item]["health"] = health_by_ds_id.get(item.id, DataSourceHealth(is_healthy=True))

return attrs

def serialize(
self, obj: DataSource, attrs: Mapping[str, Any], user: Any, **kwargs: Any
) -> dict[str, Any]:
health: DataSourceHealth = attrs["health"]
return {
"id": str(obj.id),
"organizationId": str(obj.organization_id),
"type": obj.type,
"sourceId": str(obj.source_id),
"queryObj": attrs["query_obj"],
"health": {
"isHealthy": health.is_healthy,
"message": health.message,
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class DetectorSerializerResponse(DetectorSerializerResponseOptional):
config: dict[str, Any]
enabled: bool
openIssues: int
hasDataSourceError: bool


@register(Detector)
Expand Down Expand Up @@ -217,4 +218,8 @@ def serialize(
"ruleId": alert_rule_mapping.get("rule_id"),
"latestGroup": attrs.get("latest_group"),
"openIssues": attrs.get("open_issues_count", 0),
"hasDataSourceError": any(
ds.get("health", {}).get("isHealthy") is False
for ds in (attrs.get("data_sources") or [])
),
}
14 changes: 14 additions & 0 deletions src/sentry/workflow_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ def execute(invocation: ActionInvocation) -> None:
raise NotImplementedError


@dataclass(frozen=True)
class DataSourceHealth:
is_healthy: bool
message: str | None = None


class DataSourceTypeHandler(ABC, Generic[T]):
@staticmethod
@abstractmethod
Expand Down Expand Up @@ -364,6 +370,14 @@ def get_relocation_model_name() -> str:
"""
raise NotImplementedError

@staticmethod
def bulk_get_health(data_sources: list[DataSource]) -> dict[int, DataSourceHealth]:
"""
Returns health status for each DataSource. Implementations should override
this to report source-specific problems (e.g., broken subscriptions).
"""
return {ds.id: DataSourceHealth(is_healthy=True) for ds in data_sources}


class DataConditionHandler(Generic[T]):
class Group(StrEnum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def generate_evidence_data(
"extrapolation_mode": "unknown",
},
},
"health": {
"is_healthy": True,
"message": None,
},
}
],
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ def test_get_monitor_incident_detector_details(self) -> None:
"queryObj": serialize(
self.monitor, user=self.user, serializer=MonitorSerializer()
),
"health": {
"isHealthy": True,
"message": None,
},
}
],
"conditionGroup": None,
Expand All @@ -89,6 +93,7 @@ def test_get_monitor_incident_detector_details(self) -> None:
"ruleId": None,
"latestGroup": None,
"openIssues": 0,
"hasDataSourceError": False,
}

def test_update_monitor_incident_detector(self) -> None:
Expand Down Expand Up @@ -127,6 +132,10 @@ def test_update_monitor_incident_detector(self) -> None:
"type": DATA_SOURCE_CRON_MONITOR,
"sourceId": str(self.monitor.id),
"queryObj": serialize(self.monitor, user=self.user, serializer=MonitorSerializer()),
"health": {
"isHealthy": True,
"message": None,
},
}
]
assert response.data["dataSources"] == expected_data_sources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def test_list_monitor_incident_detectors(self) -> None:
"queryObj": serialize(
self.monitor, user=self.user, serializer=MonitorSerializer()
),
"health": {
"isHealthy": True,
"message": None,
},
}
],
"conditionGroup": detector_data["conditionGroup"],
Expand All @@ -79,6 +83,7 @@ def test_list_monitor_incident_detectors(self) -> None:
"ruleId": None,
"latestGroup": None,
"openIssues": 0,
"hasDataSourceError": False,
}


Expand Down
Loading
Loading