Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/sentry/snuba/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from sentry.models.team import Team
from sentry.users.models.user import User
from sentry.workflow_engine.registry import data_source_type_registry
from sentry.workflow_engine.types import DataSourceTypeHandler
from sentry.workflow_engine.types import DataSourceHealth, DataSourceTypeHandler

if TYPE_CHECKING:
from sentry.models.organization import Organization
Expand Down Expand Up @@ -149,6 +149,10 @@ class Status(Enum):
UPDATING = 2
DELETING = 3
DISABLED = 4
# The subscription could not be created or updated after significant
# retrying and likely requires user intervention to fix (e.g., editing
# the detector's query).
BROKEN = 5

# NOTE: project fk SHOULD match AlertRule's fk
project = FlexibleForeignKey("sentry.Project", db_constraint=False)
Expand Down Expand Up @@ -239,3 +243,30 @@ def get_current_instance_count(org: Organization) -> int:
@staticmethod
def get_relocation_model_name() -> str:
return "sentry.querysubscription"

@override
@staticmethod
def bulk_get_health(data_sources: list[DataSource]) -> dict[int, DataSourceHealth]:
    """
    Report the health of each DataSource backed by a QuerySubscription.

    A data source is unhealthy when its ``source_id`` does not resolve to an
    existing QuerySubscription (message "Subscription not found") or when the
    subscription's status is BROKEN. Any other status is reported as healthy.

    Returns a mapping of DataSource id to DataSourceHealth containing one
    entry per input data source.
    """
    # Parse every source_id exactly once and key both the query and the
    # lookup by the parsed integer. Looking up by the raw string instead
    # would miss ids that int() accepts but that are not in canonical form
    # (e.g. surrounding whitespace or leading zeros).
    subscription_id_by_ds_id: dict[int, int] = {}
    for ds in data_sources:
        try:
            subscription_id_by_ds_id[ds.id] = int(ds.source_id)
        except ValueError:
            # A non-numeric source_id cannot reference a subscription; it is
            # reported as "Subscription not found" below.
            continue

    subscriptions_by_id: dict[int, QuerySubscription] = {
        qs.id: qs
        for qs in QuerySubscription.objects.filter(
            id__in=list(subscription_id_by_ds_id.values())
        )
    }

    results: dict[int, DataSourceHealth] = {}
    for ds in data_sources:
        subscription_id = subscription_id_by_ds_id.get(ds.id)
        sub = subscriptions_by_id.get(subscription_id) if subscription_id is not None else None
        if sub is None:
            results[ds.id] = DataSourceHealth(
                is_healthy=False, message="Subscription not found"
            )
        elif sub.status == QuerySubscription.Status.BROKEN.value:
            results[ds.id] = DataSourceHealth(is_healthy=False)
        else:
            results[ds.id] = DataSourceHealth(is_healthy=True)
    return results
26 changes: 24 additions & 2 deletions src/sentry/snuba/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@


SUBSCRIPTION_STATUS_MAX_AGE = timedelta(minutes=10)
SUBSCRIPTION_BROKEN_MAX_AGE = timedelta(hours=1)


class SubscriptionError(Exception):
Expand Down Expand Up @@ -338,20 +339,40 @@ def _delete_from_snuba(dataset: Dataset, subscription_id: str, entity_key: Entit
)
def subscription_checker(**kwargs: Any) -> None:
"""
Checks for subscriptions stuck in a transition status and attempts to repair them
Checks for subscriptions stuck in a transition status and attempts to repair them.
Subscriptions stuck in CREATING or UPDATING for longer than SUBSCRIPTION_BROKEN_MAX_AGE
are marked as BROKEN so they stop retrying and can surface errors to users.
"""
count = 0
broken_count = 0
now = timezone.now()
broken_cutoff = now - SUBSCRIPTION_BROKEN_MAX_AGE
repair_cutoff = now - SUBSCRIPTION_STATUS_MAX_AGE

for subscription in QuerySubscription.objects.filter(
status__in=(
QuerySubscription.Status.CREATING.value,
QuerySubscription.Status.UPDATING.value,
QuerySubscription.Status.DELETING.value,
),
date_updated__lt=timezone.now() - SUBSCRIPTION_STATUS_MAX_AGE,
date_updated__lt=repair_cutoff,
):
with sentry_sdk.start_span(op="repair_subscription") as span:
span.set_data("subscription_id", subscription.id)
span.set_data("status", subscription.status)

if (
subscription.status
in (
QuerySubscription.Status.CREATING.value,
QuerySubscription.Status.UPDATING.value,
)
and subscription.date_updated < broken_cutoff
):
subscription.update(status=QuerySubscription.Status.BROKEN.value)
Comment thread
kcons marked this conversation as resolved.
broken_count += 1
continue

count += 1
if subscription.status == QuerySubscription.Status.CREATING.value:
create_subscription_in_snuba.delay(query_subscription_id=subscription.id)
Expand All @@ -361,3 +382,4 @@ def subscription_checker(**kwargs: Any) -> None:
delete_subscription_from_snuba.delay(query_subscription_id=subscription.id)

metrics.incr("snuba.subscriptions.repair", amount=count)
metrics.incr("snuba.subscriptions.marked_broken", amount=broken_count)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from sentry.api.serializers import Serializer, register, serialize
from sentry.workflow_engine.models import DataSource
from sentry.workflow_engine.types import DataSourceTypeHandler
from sentry.workflow_engine.types import DataSourceHealth, DataSourceTypeHandler


@register(DataSource)
Expand All @@ -18,6 +18,7 @@ def get_attrs(
ds_by_type[item.type_handler].append(item)

serialized_query_objs: dict[int, dict[str, Any]] = {}
health_by_ds_id: dict[int, DataSourceHealth] = {}

for type_handler, ds_items in ds_by_type.items():
ds_query_objs = list(type_handler.bulk_get_query_object(ds_items).items())
Expand All @@ -30,18 +31,26 @@ def get_attrs(
for (ds_id, query_obj), serialized_obj in zip(ds_query_objs, serialized)
}
)
health_by_ds_id.update(type_handler.bulk_get_health(ds_items))

for item in item_list:
attrs[item]["query_obj"] = serialized_query_objs.get(item.id, [])
attrs[item]["health"] = health_by_ds_id.get(item.id, DataSourceHealth(is_healthy=True))

return attrs

def serialize(
    self, obj: DataSource, attrs: Mapping[str, Any], user: Any, **kwargs: Any
) -> dict[str, Any]:
    """
    Build the API payload for a single DataSource.

    Identifier fields are emitted as strings, the pre-serialized query object
    is taken from ``attrs["query_obj"]``, and the health report found in
    ``attrs["health"]`` is flattened into a camelCase dict.
    """
    health: DataSourceHealth = attrs["health"]
    health_payload = {
        "isHealthy": health.is_healthy,
        "message": health.message,
    }

    payload: dict[str, Any] = {}
    payload["id"] = str(obj.id)
    payload["organizationId"] = str(obj.organization_id)
    payload["type"] = obj.type
    payload["sourceId"] = str(obj.source_id)
    payload["queryObj"] = attrs["query_obj"]
    payload["health"] = health_payload
    return payload
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class DetectorSerializerResponse(DetectorSerializerResponseOptional):
config: dict[str, Any]
enabled: bool
openIssues: int
hasDataSourceError: bool


@register(Detector)
Expand Down Expand Up @@ -217,4 +218,8 @@ def serialize(
"ruleId": alert_rule_mapping.get("rule_id"),
"latestGroup": attrs.get("latest_group"),
"openIssues": attrs.get("open_issues_count", 0),
"hasDataSourceError": any(
ds.get("health", {}).get("isHealthy") is False
for ds in (attrs.get("data_sources") or [])
),
}
14 changes: 14 additions & 0 deletions src/sentry/workflow_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ def execute(invocation: ActionInvocation) -> None:
raise NotImplementedError


@dataclass(frozen=True)
class DataSourceHealth:
    """
    Immutable health report for a single DataSource.
    """

    # False when a problem has been detected with the data source.
    is_healthy: bool
    # Optional detail describing the problem; None when no detail is given.
    message: str | None = None


class DataSourceTypeHandler(ABC, Generic[T]):
@staticmethod
@abstractmethod
Expand Down Expand Up @@ -364,6 +370,14 @@ def get_relocation_model_name() -> str:
"""
raise NotImplementedError

@staticmethod
def bulk_get_health(data_sources: list[DataSource]) -> dict[int, DataSourceHealth]:
    """
    Default health report: every given DataSource is considered healthy.

    Handlers whose sources can fail in source-specific ways (e.g., broken
    subscriptions) should override this. The result maps each DataSource id
    to its DataSourceHealth.
    """
    health_by_id: dict[int, DataSourceHealth] = {}
    for data_source in data_sources:
        health_by_id[data_source.id] = DataSourceHealth(is_healthy=True)
    return health_by_id


class DataConditionHandler(Generic[T]):
class Group(StrEnum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def generate_evidence_data(
"extrapolation_mode": "unknown",
},
},
"health": {
"is_healthy": True,
"message": None,
},
}
],
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ def test_get_monitor_incident_detector_details(self) -> None:
"queryObj": serialize(
self.monitor, user=self.user, serializer=MonitorSerializer()
),
"health": {
"isHealthy": True,
"message": None,
},
}
],
"conditionGroup": None,
Expand All @@ -89,6 +93,7 @@ def test_get_monitor_incident_detector_details(self) -> None:
"ruleId": None,
"latestGroup": None,
"openIssues": 0,
"hasDataSourceError": False,
}

def test_update_monitor_incident_detector(self) -> None:
Expand Down Expand Up @@ -127,6 +132,10 @@ def test_update_monitor_incident_detector(self) -> None:
"type": DATA_SOURCE_CRON_MONITOR,
"sourceId": str(self.monitor.id),
"queryObj": serialize(self.monitor, user=self.user, serializer=MonitorSerializer()),
"health": {
"isHealthy": True,
"message": None,
},
}
]
assert response.data["dataSources"] == expected_data_sources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def test_list_monitor_incident_detectors(self) -> None:
"queryObj": serialize(
self.monitor, user=self.user, serializer=MonitorSerializer()
),
"health": {
"isHealthy": True,
"message": None,
},
}
],
"conditionGroup": detector_data["conditionGroup"],
Expand All @@ -79,6 +83,7 @@ def test_list_monitor_incident_detectors(self) -> None:
"ruleId": None,
"latestGroup": None,
"openIssues": 0,
"hasDataSourceError": False,
}


Expand Down
20 changes: 20 additions & 0 deletions tests/sentry/snuba/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from sentry.snuba.subscriptions import create_snuba_query, create_snuba_subscription
from sentry.testutils.cases import TestCase
from sentry.workflow_engine.types import DataSourceHealth


class SnubaQueryEventTypesTest(TestCase):
Expand Down Expand Up @@ -139,3 +140,22 @@ def test_get_current_instance_count(self) -> None:

# Count should still be 3 as it only counts for the given org
assert QuerySubscriptionDataSourceHandler.get_current_instance_count(new_org) == 3

def test_bulk_get_health_active(self) -> None:
    """The fixture data source, left unmodified, is reported as healthy."""
    # NOTE(review): presumably the fixture subscription is ACTIVE; confirm in setUp.
    expected = DataSourceHealth(is_healthy=True)
    health_by_id = QuerySubscriptionDataSourceHandler.bulk_get_health([self.data_source])
    assert health_by_id[self.data_source.id] == expected

def test_bulk_get_health_broken(self) -> None:
    """A BROKEN subscription yields an unhealthy report with no message."""
    broken_status = QuerySubscription.Status.BROKEN.value
    self.subscription.update(status=broken_status)

    health_by_id = QuerySubscriptionDataSourceHandler.bulk_get_health([self.data_source])

    expected = DataSourceHealth(is_healthy=False)
    assert health_by_id[self.data_source.id] == expected

def test_bulk_get_health_missing_subscription(self) -> None:
    """A source_id with no matching subscription yields "Subscription not found"."""
    # NOTE(review): assumes no QuerySubscription with id 999999 exists in the test DB.
    orphan_source = self.create_data_source(
        type="snuba_query_subscription",
        source_id="999999",
    )

    health_by_id = QuerySubscriptionDataSourceHandler.bulk_get_health([orphan_source])

    expected = DataSourceHealth(is_healthy=False, message="Subscription not found")
    assert health_by_id[orphan_source.id] == expected
43 changes: 43 additions & 0 deletions tests/sentry/snuba/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from sentry.snuba.metrics.naming_layer.mri import SessionMRI
from sentry.snuba.models import QuerySubscription, SnubaQuery, SnubaQueryEventType
from sentry.snuba.tasks import (
SUBSCRIPTION_BROKEN_MAX_AGE,
SUBSCRIPTION_STATUS_MAX_AGE,
SubscriptionError,
create_subscription_in_snuba,
Expand Down Expand Up @@ -1295,3 +1296,45 @@ def test_create_update(self) -> None:
sub_new = QuerySubscription.objects.get(id=sub_new.id)
assert sub_new.status == status.value
assert sub_new.subscription_id is None

def test_marks_creating_as_broken(self) -> None:
    """A CREATING subscription older than the broken threshold is marked BROKEN."""
    # Twice SUBSCRIPTION_BROKEN_MAX_AGE puts the subscription well past the cutoff.
    stale_since = timezone.now() - SUBSCRIPTION_BROKEN_MAX_AGE * 2
    stale_sub = self.create_subscription(
        QuerySubscription.Status.CREATING,
        date_updated=stale_since,
    )

    subscription_checker()

    reloaded = QuerySubscription.objects.get(id=stale_sub.id)
    assert reloaded.status == QuerySubscription.Status.BROKEN.value

def test_marks_updating_as_broken(self) -> None:
    """An UPDATING subscription older than the broken threshold is marked BROKEN."""
    # Twice SUBSCRIPTION_BROKEN_MAX_AGE puts the subscription well past the cutoff.
    stale_since = timezone.now() - SUBSCRIPTION_BROKEN_MAX_AGE * 2
    stale_sub = self.create_subscription(
        QuerySubscription.Status.UPDATING,
        date_updated=stale_since,
    )

    subscription_checker()

    reloaded = QuerySubscription.objects.get(id=stale_sub.id)
    assert reloaded.status == QuerySubscription.Status.BROKEN.value

def test_does_not_mark_deleting_as_broken(self) -> None:
    """A stale DELETING subscription is never transitioned to BROKEN."""
    stale_since = timezone.now() - SUBSCRIPTION_BROKEN_MAX_AGE * 2
    deleting_sub = self.create_subscription(
        QuerySubscription.Status.DELETING,
        date_updated=stale_since,
    )

    with self.tasks():
        subscription_checker()

    # DELETING subscriptions get re-triggered, not marked broken, so the row
    # may be gone entirely; only check the status if it still exists.
    remaining = QuerySubscription.objects.filter(id=deleting_sub.id).first()
    if remaining is not None:
        assert remaining.status != QuerySubscription.Status.BROKEN.value

def test_does_not_mark_broken_before_threshold(self) -> None:
    """A CREATING subscription past the repair age but not the broken age is retried."""
    # Twice SUBSCRIPTION_STATUS_MAX_AGE makes the subscription eligible for repair.
    # NOTE(review): relies on this still being younger than SUBSCRIPTION_BROKEN_MAX_AGE.
    stuck_since = timezone.now() - SUBSCRIPTION_STATUS_MAX_AGE * 2
    stuck_sub = self.create_subscription(
        QuerySubscription.Status.CREATING,
        date_updated=stuck_since,
    )

    with self.tasks():
        subscription_checker()

    # Should be repaired (retried), not marked broken
    reloaded = QuerySubscription.objects.get(id=stuck_sub.id)
    assert reloaded.status == QuerySubscription.Status.ACTIVE.value
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,8 @@ def test_serialize(self) -> None:
"status": 1,
"subscription": None,
},
"health": {
"isHealthy": True,
"message": None,
},
}
Loading
Loading