From 83450a9da4682715ba44bf38786827c5e0a31358 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 12 May 2026 00:59:33 -0700 Subject: [PATCH 01/10] perf(api): denormalize SourceImageCollection counts as cached columns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The collection list endpoint previously ran 3 correlated count subqueries per row (source_images_count, source_images_with_detections_count, source_images_processed_count). On large collections each subquery scans the M2M and Detection tables. Reads are now O(1) against cached columns. - Add 3 IntegerField columns on SourceImageCollection with default=0 - Migration 0085 backfills via a single GROUP BY over the M2M with FILTER clauses; atomic=False so the long UPDATE over a prod-sized M2M runs outside a single migration transaction - update_calculated_fields() recomputes all 3 counts in one aggregate query - m2m_changed on images.through keeps counts fresh on add/remove (both directions; reverse post_clear documented as caller responsibility) - Detection post_save/post_delete recompute for collections containing the affected source image - pipeline.save_results() recomputes for affected collections after the bulk_create batch (which skips signals) - Drop the 3 with_*_count() annotation methods on SourceImageCollectionQuerySet and the chain in SourceImageCollectionViewSet.queryset - TestSourceImageCollectionCountsDenormalize covers initial state, add/remove, Detection create/delete, null-bbox processed-but-not-with-detections case, and update_calculated_fields() recovery from drift - TestSourceImageCollectionListQueryCount tightened: with_counts now <=10q (was <=15), ordering by source_images_count uses column directly Follow-up to PR #1300 — the paginator COUNT(*) win there is mostly invisible since the UI does not paginate collections; this is the actual UX-visible improvement. 
Co-Authored-By: Claude --- ami/main/api/views.py | 8 +- ...enormalize_sourceimagecollection_counts.py | 89 ++++++++ ami/main/models.py | 68 +++--- ami/main/signals.py | 30 ++- ami/main/tests.py | 195 ++++++++++++++++++ ami/ml/models/pipeline.py | 5 + 6 files changed, 347 insertions(+), 48 deletions(-) create mode 100644 ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py diff --git a/ami/main/api/views.py b/ami/main/api/views.py index c4ca76da8..84981fb42 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -712,13 +712,7 @@ class SourceImageCollectionViewSet(DefaultViewSet, ProjectMixin): Endpoint for viewing capture sets or samples of captures. """ - queryset = ( - SourceImageCollection.objects.all() - .with_source_images_count() # type: ignore - .with_source_images_with_detections_count() - .with_source_images_processed_count() - .prefetch_related("jobs") - ) + queryset = SourceImageCollection.objects.all().prefetch_related("jobs") serializer_class = SourceImageCollectionSerializer permission_classes = [ ObjectPermission, diff --git a/ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py b/ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py new file mode 100644 index 000000000..05fa2e49c --- /dev/null +++ b/ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py @@ -0,0 +1,89 @@ +""" +Denormalize three counts onto ``SourceImageCollection`` so the list endpoint +reads them in O(1) instead of running 3 correlated count subqueries per row. + +Backfill uses a single GROUP BY over the M2M with FILTER clauses to compute +all three counts in one pass. ``with_det`` checks for a valid (non-null / +non-empty) detection bbox to match the runtime +``NULL_DETECTIONS_FILTER`` semantics in ``ami/main/models.py``. + +``atomic = False`` so the long UPDATE can run outside a single transaction +on production-sized M2M tables. 
+""" + +from django.db import migrations, models + + +def backfill_counts(apps, schema_editor): + schema_editor.execute( + """ + UPDATE main_sourceimagecollection sc + SET source_images_count = c.total, + source_images_processed_count = c.processed, + source_images_with_detections_count = c.with_det + FROM ( + SELECT msci.sourceimagecollection_id AS coll_id, + COUNT(*) AS total, + COUNT(*) FILTER ( + WHERE EXISTS ( + SELECT 1 FROM main_detection d + WHERE d.source_image_id = si.id + ) + ) AS processed, + COUNT(*) FILTER ( + WHERE EXISTS ( + SELECT 1 FROM main_detection d + WHERE d.source_image_id = si.id + AND d.bbox IS NOT NULL + AND d.bbox::text <> '[]' + ) + ) AS with_det + FROM main_sourceimagecollection_images msci + INNER JOIN main_sourceimage si ON si.id = msci.sourceimage_id + GROUP BY msci.sourceimagecollection_id + ) c + WHERE sc.id = c.coll_id; + """ + ) + # Collections with no images get no row from the GROUP BY above (the old SELECTs returned 0 via Coalesce). + # AddField(default=0) already leaves them at 0, never NULL, so this UPDATE is only a defensive no-op. 
+ schema_editor.execute( + """ + UPDATE main_sourceimagecollection + SET source_images_count = 0, + source_images_processed_count = 0, + source_images_with_detections_count = 0 + WHERE source_images_count IS NULL; + """ + ) + + +def reverse_noop(apps, schema_editor): + pass + + +class Migration(migrations.Migration): + atomic = False + + dependencies = [ + ("main", "0084_revoke_delete_job_from_roles"), + ] + + operations = [ + migrations.AddField( + model_name="sourceimagecollection", + name="source_images_count", + field=models.IntegerField(default=0), + ), + migrations.AddField( + model_name="sourceimagecollection", + name="source_images_with_detections_count", + field=models.IntegerField(default=0), + ), + migrations.AddField( + model_name="sourceimagecollection", + name="source_images_processed_count", + field=models.IntegerField(default=0), + ), + migrations.RunPython(backfill_counts, reverse_noop), + ] diff --git a/ami/main/models.py b/ami/main/models.py index b30b4e645..b86880f60 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -4093,32 +4093,6 @@ def html(self) -> str: class SourceImageCollectionQuerySet(BaseQuerySet): - def with_source_images_count(self): - return self.annotate( - source_images_count=models.Count( - "images", - distinct=True, - ) - ) - - def with_source_images_with_detections_count(self): - return self.annotate( - source_images_with_detections_count=models.Count( - "images", - filter=(~models.Q(images__detections__bbox__isnull=True) & ~models.Q(images__detections__bbox=[])), - distinct=True, - ) - ) - - def with_source_images_processed_count(self): - return self.annotate( - source_images_processed_count=models.Count( - "images", - filter=models.Q(images__detections__isnull=False), - distinct=True, - ) - ) - def with_source_images_processed_by_algorithm_count(self, algorithm_id: int): return self.annotate( source_images_processed_by_algorithm_count=models.Count( @@ -4205,6 +4179,12 @@ class SourceImageCollection(BaseModel): 
default=dict, ) + # Denormalized counts. Kept in sync via m2m_changed and pipeline-completion + # hooks. Reads are O(1). + source_images_count = models.IntegerField(default=0) + source_images_with_detections_count = models.IntegerField(default=0) + source_images_processed_count = models.IntegerField(default=0) + objects = SourceImageCollectionManager() jobs: models.QuerySet["Job"] @@ -4219,19 +4199,6 @@ def infer_dataset_type(self): def dataset_type(self): return self.infer_dataset_type() - def source_images_count(self) -> int | None: - # This should always be pre-populated using queryset annotations - # return self.images.count() - return None - - def source_images_with_detections_count(self) -> int | None: - # This should always be pre-populated using queryset annotations - return None - - def source_images_processed_count(self) -> int | None: - # This should always be pre-populated using queryset annotations - return None - def occurrences_count(self) -> int | None: # This should always be pre-populated using queryset annotations return None @@ -4240,6 +4207,29 @@ def taxa_count(self) -> int | None: # This should always be pre-populated using queryset annotations return None + def get_source_image_counts(self) -> dict[str, int]: + """Compute the 3 source-image counts in a single query. 
No writes — testable.""" + valid_det = Detection.objects.filter(source_image=models.OuterRef("pk")).exclude(NULL_DETECTIONS_FILTER) + any_det = Detection.objects.filter(source_image=models.OuterRef("pk")) + counts = self.images.annotate( + _has_any_det=Exists(any_det), + _has_valid_det=Exists(valid_det), + ).aggregate( + source_images_count=models.Count("id"), + source_images_processed_count=models.Count("id", filter=models.Q(_has_any_det=True)), + source_images_with_detections_count=models.Count("id", filter=models.Q(_has_valid_det=True)), + ) + return counts + + def update_calculated_fields(self, save: bool = False) -> None: + """Recompute the 3 denormalized source-image count columns.""" + counts = self.get_source_image_counts() + self.source_images_count = counts["source_images_count"] + self.source_images_processed_count = counts["source_images_processed_count"] + self.source_images_with_detections_count = counts["source_images_with_detections_count"] + if save: + SourceImageCollection.objects.filter(pk=self.pk).update(**counts) + def get_queryset( self, *args, diff --git a/ami/main/signals.py b/ami/main/signals.py index e36e41937..718ff5650 100644 --- a/ami/main/signals.py +++ b/ami/main/signals.py @@ -2,11 +2,11 @@ from django.contrib.auth.models import Group from django.db import transaction -from django.db.models.signals import m2m_changed, post_save, pre_delete, pre_save +from django.db.models.signals import m2m_changed, post_delete, post_save, pre_delete, pre_save from django.dispatch import receiver from guardian.shortcuts import assign_perm -from ami.main.models import Project +from ami.main.models import Detection, Project, SourceImageCollection from ami.main.tasks import refresh_project_cached_counts from ami.users.roles import BasicMember, ProjectManager, create_roles_for_project @@ -197,3 +197,29 @@ def exclude_taxa_updated(sender, instance: Project, action, **kwargs): if action in ["post_add", "post_remove", "post_clear"]: logger.info(f"Exclude 
taxa updated for project {instance.pk} (action={action})") refresh_cached_counts_for_project(instance) + + +# ============================================================================ +# SourceImageCollection Denormalized Counts +# ============================================================================ + + +@receiver(m2m_changed, sender=SourceImageCollection.images.through) +def update_collection_counts_on_m2m(sender, instance, action, **kwargs): + """Recompute denormalized counts when images are added to or removed from a collection.""" + if action in ("post_add", "post_remove", "post_clear"): + instance.update_calculated_fields(save=True) + + +@receiver(post_save, sender=Detection) +@receiver(post_delete, sender=Detection) +def update_collection_counts_on_detection_change(sender, instance, **kwargs): + """Keep processed / with-detections counts fresh on per-row Detection writes. + + `bulk_create` skips signals, so ML pipelines must call `update_calculated_fields` + explicitly after their batch writes (see `ami.ml.models.pipeline.save_results`). + """ + if not instance.source_image_id: + return + for collection in SourceImageCollection.objects.filter(images__id=instance.source_image_id).distinct(): + collection.update_calculated_fields(save=True) diff --git a/ami/main/tests.py b/ami/main/tests.py index c31d56faa..71017988f 100644 --- a/ami/main/tests.py +++ b/ami/main/tests.py @@ -3275,6 +3275,201 @@ def test_list_query_count_does_not_scale_with_page_size(self): self.assertLessEqual(large, small + 5, f"Taxon list scaling: {small} -> {large} (likely N+1)") +@override_settings(CACHALOT_ENABLED=False) +class TestSourceImageCollectionListQueryCount(APITestCase): + """Audit SourceImageCollectionViewSet.list query counts. + + The three source-image counts are denormalized as columns on + SourceImageCollection (see migration 0085). 
The viewset no longer needs + per-row count subqueries, so list, with_counts, and ordering paths all run + against the column directly. + + Cachalot disabled so we measure cold query count, not warm cache. + """ + + def setUp(self): + self.project, self.deployment = setup_test_project() + create_taxa(self.project) + create_captures(deployment=self.deployment, num_nights=1, images_per_night=25) + create_occurrences(deployment=self.deployment, num=25, determination_score=0.9) + + images = list(SourceImage.objects.filter(deployment=self.deployment)) + # 30 collections so `limit=25` exercises a real page boundary; per-row + # subquery scaling regressions only show up once the page has >1 row. + for i in range(30): + c = SourceImageCollection.objects.create( + name=f"qcount-collection-{i}", + project=self.project, + method="manual", + kwargs={"image_ids": [img.pk for img in images]}, + ) + c.images.set(images) + + self.project.default_filters_score_threshold = 0.0 + self.project.save() + + self.user = User.objects.create_user( + email="qcount-collection@insectai.org", is_staff=False, is_superuser=False + ) + self.client.force_authenticate(user=self.user) + + def _list_query_count(self, url: str) -> int: + from django.core.cache import caches + from django.test.utils import CaptureQueriesContext + + caches["default"].clear() + with CaptureQueriesContext(connection) as ctx: + res = self.client.get(url) + self.assertEqual(res.status_code, status.HTTP_200_OK, res.content) + return len(ctx.captured_queries) + + def test_list_query_count_does_not_scale_with_page_size(self): + small = self._list_query_count(f"/api/v2/captures/collections/?project_id={self.project.pk}&limit=1") + large = self._list_query_count(f"/api/v2/captures/collections/?project_id={self.project.pk}&limit=25") + print(f"\n[AUDIT] Collection list: limit=1 -> {small}q, limit=25 -> {large}q") + self.assertLessEqual(large, small + 2, f"Collection list scaling: {small} -> {large} (likely N+1)") + + def 
test_list_query_count_with_counts(self): + url = f"/api/v2/captures/collections/?project_id={self.project.pk}&with_counts=true&limit=25" + # warmups equalise pool state / auth + self.client.get(url) + self.client.get(url) + count = self._list_query_count(url) + print(f"\n[AUDIT] Collection list with_counts=true limit=25 -> {count}q") + # 3 source-image counts now read from columns; with_counts only adds the + # occurrences/taxa subquery annotations. + self.assertLessEqual(count, 10, f"Collection list with_counts too many queries: {count}") + + def test_list_query_count_ordering_by_annotated_count(self): + url = f"/api/v2/captures/collections/?project_id={self.project.pk}" f"&limit=25&ordering=-source_images_count" + self.client.get(url) + self.client.get(url) + count = self._list_query_count(url) + print(f"\n[AUDIT] Collection list ordered by source_images_count limit=25 -> {count}q") + # Sort uses the cached column directly — no extra subquery. + self.assertLessEqual(count, 10, f"Collection list ordered by count too many queries: {count}") + + +class TestSourceImageCollectionCountsDenormalize(TestCase): + """Verify denormalized count columns stay in sync via signals and bulk hooks.""" + + def setUp(self): + self.project, self.deployment = setup_test_project() + create_taxa(self.project) + create_captures(deployment=self.deployment, num_nights=1, images_per_night=5) + self.images = list(SourceImage.objects.filter(deployment=self.deployment)) + self.collection = SourceImageCollection.objects.create( + name="denorm-test", + project=self.project, + method="manual", + kwargs={"image_ids": [img.pk for img in self.images]}, + ) + + def _refresh(self): + self.collection.refresh_from_db() + + def test_initial_counts_zero(self): + self._refresh() + self.assertEqual(self.collection.source_images_count, 0) + self.assertEqual(self.collection.source_images_processed_count, 0) + self.assertEqual(self.collection.source_images_with_detections_count, 0) + + def 
test_count_updates_on_image_add(self): + self.collection.images.set(self.images) + self._refresh() + self.assertEqual(self.collection.source_images_count, len(self.images)) + + def test_count_decrements_on_image_remove(self): + self.collection.images.set(self.images) + self.collection.images.remove(self.images[0]) + self._refresh() + self.assertEqual(self.collection.source_images_count, len(self.images) - 1) + + def test_with_detections_count_updates_on_detection_create(self): + self.collection.images.set(self.images) + Detection.objects.create( + source_image=self.images[0], + timestamp=self.images[0].timestamp, + bbox=[0.1, 0.1, 0.2, 0.2], + path="detections/d1.jpg", + ) + self._refresh() + self.assertEqual(self.collection.source_images_with_detections_count, 1) + self.assertEqual(self.collection.source_images_processed_count, 1) + + def test_with_detections_count_decrements_on_detection_delete(self): + self.collection.images.set(self.images) + det = Detection.objects.create( + source_image=self.images[0], + timestamp=self.images[0].timestamp, + bbox=[0.1, 0.1, 0.2, 0.2], + path="detections/d1.jpg", + ) + det.delete() + self._refresh() + self.assertEqual(self.collection.source_images_with_detections_count, 0) + self.assertEqual(self.collection.source_images_processed_count, 0) + + def test_null_bbox_detection_processed_but_no_with_detections(self): + """A null-bbox detection marks the image as processed but not 'with detections'.""" + self.collection.images.set(self.images) + Detection.objects.create( + source_image=self.images[0], + timestamp=self.images[0].timestamp, + bbox=None, + path="detections/null.jpg", + ) + self._refresh() + self.assertEqual(self.collection.source_images_processed_count, 1) + self.assertEqual(self.collection.source_images_with_detections_count, 0) + + def test_update_calculated_fields_recomputes_from_scratch(self): + self.collection.images.set(self.images) + Detection.objects.create( + source_image=self.images[0], + 
timestamp=self.images[0].timestamp, + bbox=[0.1, 0.1, 0.2, 0.2], + path="detections/d1.jpg", + ) + SourceImageCollection.objects.filter(pk=self.collection.pk).update( + source_images_count=999, + source_images_processed_count=999, + source_images_with_detections_count=999, + ) + self.collection.refresh_from_db() + self.collection.update_calculated_fields(save=True) + self._refresh() + self.assertEqual(self.collection.source_images_count, len(self.images)) + self.assertEqual(self.collection.source_images_with_detections_count, 1) + + def test_get_source_image_counts_returns_dict_without_writes(self): + self.collection.images.set(self.images) + Detection.objects.create( + source_image=self.images[0], + timestamp=self.images[0].timestamp, + bbox=[0.1, 0.1, 0.2, 0.2], + path="detections/d1.jpg", + ) + SourceImageCollection.objects.filter(pk=self.collection.pk).update( + source_images_count=0, + source_images_processed_count=0, + source_images_with_detections_count=0, + ) + self.collection.refresh_from_db() + counts = self.collection.get_source_image_counts() + self.assertEqual( + counts, + { + "source_images_count": len(self.images), + "source_images_processed_count": 1, + "source_images_with_detections_count": 1, + }, + ) + # No side effects — DB row unchanged. + self.collection.refresh_from_db() + self.assertEqual(self.collection.source_images_count, 0) + + class TestProjectDefaultTaxaFilter(APITestCase): """ Tests for project default taxa filtering (include/exclude lists). diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index c259e4aea..0c6c1dbb3 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -999,6 +999,11 @@ def save_results( for deployment in Deployment.objects.filter(pk__in=deployment_ids): deployment.update_calculated_fields(save=True) + # bulk_create above skips Detection signals; refresh affected collections explicitly. 
+ source_image_ids = [img.pk for img in source_images] + for collection in SourceImageCollection.objects.filter(images__id__in=source_image_ids).distinct(): + collection.update_calculated_fields(save=True) + total_time = time.time() - start_time job_logger.info(f"Saved results from pipeline {pipeline} in {total_time:.2f} seconds") From 457e090e18f066018576f39864b3b3f4d30935d0 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 13 May 2026 17:08:51 -0700 Subject: [PATCH 02/10] refactor(models): introduce CachedCountField marker, apply to all denormalized count columns Add `CachedCountField(IntegerField)` to `ami/base/models.py` as a marker subclass for cached aggregate columns. Swap it in for the 12 existing denormalized count fields across Deployment, Event, SourceImage, and SourceImageCollection. Discoverable via `Model._meta.get_fields()` + `isinstance(f, CachedCountField)` for future use (refresh tasks, admin display, list-endpoint defer()). The DB column type is unchanged (IntegerField -> IntegerField), so the generated migration `0086_use_cached_count_field` is a pure no-op at the SQL level; it only updates Django's recorded model state. Co-Authored-By: Claude --- ami/base/models.py | 12 +++ .../migrations/0086_use_cached_count_field.py | 73 +++++++++++++++++++ ami/main/models.py | 26 +++---- 3 files changed, 98 insertions(+), 13 deletions(-) create mode 100644 ami/main/migrations/0086_use_cached_count_field.py diff --git a/ami/base/models.py b/ami/base/models.py index 2f245b745..8f0dda76e 100644 --- a/ami/base/models.py +++ b/ami/base/models.py @@ -7,6 +7,18 @@ from ami.users.models import User +class CachedCountField(models.IntegerField): + """Denormalized count of related rows. + + Marker subclass so cached aggregate columns can be discovered via + ``Model._meta.get_fields()`` (e.g. for refresh tasks, admin display, or + list-endpoint defer()). 
Values may be stale or null between + ``update_calculated_fields`` calls — readers should not assume freshness. + """ + + description = "Cached count of related rows" + + def has_one_to_many_project_relation(model: type[models.Model]) -> bool: """ Returns True if the model has any ForeignKey or OneToOneField relationship to Project. diff --git a/ami/main/migrations/0086_use_cached_count_field.py b/ami/main/migrations/0086_use_cached_count_field.py new file mode 100644 index 000000000..c0afe21e7 --- /dev/null +++ b/ami/main/migrations/0086_use_cached_count_field.py @@ -0,0 +1,73 @@ +# Generated by Django 4.2.10 on 2026-05-13 20:05 + +import ami.base.models +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("main", "0085_denormalize_sourceimagecollection_counts"), + ] + + operations = [ + migrations.AlterField( + model_name="deployment", + name="captures_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="deployment", + name="detections_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="deployment", + name="events_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="deployment", + name="occurrences_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="deployment", + name="taxa_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="event", + name="captures_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="event", + name="detections_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="event", + name="occurrences_count", + field=ami.base.models.CachedCountField(blank=True, 
null=True), + ), + migrations.AlterField( + model_name="sourceimage", + name="detections_count", + field=ami.base.models.CachedCountField(blank=True, null=True), + ), + migrations.AlterField( + model_name="sourceimagecollection", + name="source_images_count", + field=ami.base.models.CachedCountField(default=0), + ), + migrations.AlterField( + model_name="sourceimagecollection", + name="source_images_processed_count", + field=ami.base.models.CachedCountField(default=0), + ), + migrations.AlterField( + model_name="sourceimagecollection", + name="source_images_with_detections_count", + field=ami.base.models.CachedCountField(default=0), + ), + ] diff --git a/ami/main/models.py b/ami/main/models.py index b86880f60..ac212a526 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -32,7 +32,7 @@ import ami.tasks import ami.utils from ami.base.fields import DateStringField -from ami.base.models import BaseModel, BaseQuerySet +from ami.base.models import BaseModel, BaseQuerySet, CachedCountField from ami.main import charts from ami.main.models_future.filters import ( build_occurrence_default_filters_q, @@ -754,11 +754,11 @@ class Deployment(BaseModel): # data_source_last_check_notes = models.TextField(max_length=255, blank=True, null=True) # Pre-calculated values - events_count = models.IntegerField(blank=True, null=True) - occurrences_count = models.IntegerField(blank=True, null=True) - captures_count = models.IntegerField(blank=True, null=True) - detections_count = models.IntegerField(blank=True, null=True) - taxa_count = models.IntegerField(blank=True, null=True) + events_count = CachedCountField(blank=True, null=True) + occurrences_count = CachedCountField(blank=True, null=True) + captures_count = CachedCountField(blank=True, null=True) + detections_count = CachedCountField(blank=True, null=True) + taxa_count = CachedCountField(blank=True, null=True) first_capture_timestamp = models.DateTimeField(blank=True, null=True) last_capture_timestamp = 
models.DateTimeField(blank=True, null=True) @@ -1155,9 +1155,9 @@ class Event(BaseModel): occurrences: models.QuerySet["Occurrence"] # Pre-calculated values - captures_count = models.IntegerField(blank=True, null=True) - detections_count = models.IntegerField(blank=True, null=True) - occurrences_count = models.IntegerField(blank=True, null=True) + captures_count = CachedCountField(blank=True, null=True) + detections_count = CachedCountField(blank=True, null=True) + occurrences_count = CachedCountField(blank=True, null=True) calculated_fields_updated_at = models.DateTimeField(blank=True, null=True) class Meta: @@ -1942,7 +1942,7 @@ class SourceImage(BaseModel): test_image = models.BooleanField(default=False) # Precaclulated values - detections_count = models.IntegerField(null=True, blank=True) + detections_count = CachedCountField(null=True, blank=True) project = models.ForeignKey(Project, on_delete=models.SET_NULL, null=True, related_name="captures") deployment = models.ForeignKey(Deployment, on_delete=models.SET_NULL, null=True, related_name="captures") @@ -4181,9 +4181,9 @@ class SourceImageCollection(BaseModel): # Denormalized counts. Kept in sync via m2m_changed and pipeline-completion # hooks. Reads are O(1). 
- source_images_count = models.IntegerField(default=0) - source_images_with_detections_count = models.IntegerField(default=0) - source_images_processed_count = models.IntegerField(default=0) + source_images_count = CachedCountField(default=0) + source_images_with_detections_count = CachedCountField(default=0) + source_images_processed_count = CachedCountField(default=0) objects = SourceImageCollectionManager() From 272559910ae3f3031ec69ee5b8eceb8b50c19d04 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 13 May 2026 17:09:05 -0700 Subject: [PATCH 03/10] chore: drop tautological count-default test, clarify reactive comments - Remove `test_initial_counts_zero` from `TestSourceImageCollectionCountsDenormalize`; it asserted the `default=0` field-level default rather than any code path we own. - Rewrite a handful of em-dash fragment comments added during PR review rounds (`get_source_image_counts` docstring, captures-list query-count assertion message, collection-list ordering comment, denormalize-test side-effect note) in normal prose. Co-Authored-By: Claude --- ami/main/models.py | 2 +- ami/main/tests.py | 15 ++++----------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/ami/main/models.py b/ami/main/models.py index ac212a526..64a19b9c6 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -4208,7 +4208,7 @@ def taxa_count(self) -> int | None: return None def get_source_image_counts(self) -> dict[str, int]: - """Compute the 3 source-image counts in a single query. No writes — testable.""" + """Return the 3 source-image counts as a dict. 
Single aggregate query; does not write to the DB.""" valid_det = Detection.objects.filter(source_image=models.OuterRef("pk")).exclude(NULL_DETECTIONS_FILTER) any_det = Detection.objects.filter(source_image=models.OuterRef("pk")) counts = self.images.annotate( diff --git a/ami/main/tests.py b/ami/main/tests.py index 71017988f..e622e7835 100644 --- a/ami/main/tests.py +++ b/ami/main/tests.py @@ -3226,12 +3226,11 @@ def test_list_response_shape_has_no_lazy_loads(self): for key in ("id", "url", "size_display", "deployment", "event", "detections_count", "path"): self.assertIn(key, row, f"missing field {key!r} in list response") self.assertIsNotNone(row["deployment"]["name"]) - # No lazy-load queries should fire after the main list SELECT. - # 1 list select + 1 detection prefetch (no, not in this call) + savepoints. + # After the main list SELECT, only prefetch + savepoint queries should fire; no per-row lazy loads. self.assertLessEqual( len(ctx.captured_queries), 6, - f"Unexpected extra queries — likely lazy-load from deferred field: {len(ctx.captured_queries)}", + f"Unexpected extra queries (likely lazy-load from a deferred field): {len(ctx.captured_queries)}", ) @@ -3346,7 +3345,7 @@ def test_list_query_count_ordering_by_annotated_count(self): self.client.get(url) count = self._list_query_count(url) print(f"\n[AUDIT] Collection list ordered by source_images_count limit=25 -> {count}q") - # Sort uses the cached column directly — no extra subquery. + # Sort uses the cached column directly, so no extra subquery is added. 
self.assertLessEqual(count, 10, f"Collection list ordered by count too many queries: {count}") @@ -3368,12 +3367,6 @@ def setUp(self): def _refresh(self): self.collection.refresh_from_db() - def test_initial_counts_zero(self): - self._refresh() - self.assertEqual(self.collection.source_images_count, 0) - self.assertEqual(self.collection.source_images_processed_count, 0) - self.assertEqual(self.collection.source_images_with_detections_count, 0) - def test_count_updates_on_image_add(self): self.collection.images.set(self.images) self._refresh() @@ -3465,7 +3458,7 @@ def test_get_source_image_counts_returns_dict_without_writes(self): "source_images_with_detections_count": 1, }, ) - # No side effects — DB row unchanged. + # Confirm the DB row was not updated. self.collection.refresh_from_db() self.assertEqual(self.collection.source_images_count, 0) From 1a242aa4d5ac9be2ae290918745850c9ac947c75 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 13 May 2026 18:50:04 -0700 Subject: [PATCH 04/10] refactor(counts): async signals + cached_counts integrity check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address opus takeaway-review findings on the SourceImageCollection denormalize work. Signal handlers no longer block on the recompute: - Detection post_save/post_delete dedupes affected source_image_ids on a per-connection set and fans out one task per affected collection via transaction.on_commit. A 10k-detection bulk_create-loop (where signals fire) now triggers at most one task per affected collection instead of 10k synchronous aggregates per detection. - m2m_changed on SourceImageCollection.images defers to a Celery task via transaction.on_commit. New ami.main.checks.cached_counts module catches drift from bulk write paths that skip signals (bulk_create, bulk_update, raw SQL, ML post- processors like small_size_filter). 
Generic across every model that declares a CachedCountField column — discovered via Model._meta.get_fields(). Layered defense: signals = best-effort sync, periodic reconcile_cached_counts_task = safety net. This also gives CachedCountField its first consumer, addressing the "marker class with no consumer" critique. Other tweaks from the same review pass: - CachedCountField docstring documents migration-friction tradeoff (future AlterField for unrelated IntegerFields near cached ones). - SourceImageCollection.update_calculated_fields docstring explains the .filter(pk=).update() vs .save() divergence on the other 3 models. - TestSourceImageCollectionCountsDenormalize → TransactionTestCase + eager Celery so on_commit hooks actually fire in tests. - TestCachedCountsIntegrityCheck (5 tests) covers discover / find / reconcile dry-run / reconcile repair / no-drift baseline. - Detection bbox fixtures: [0.1...] → [10, 10, 20, 20] for consistency with codebase pixel-space convention (CodeRabbit nit). Co-Authored-By: Claude --- ami/base/models.py | 10 ++- ami/main/checks/cached_counts.py | 105 +++++++++++++++++++++++++++++++ ami/main/models.py | 12 +++- ami/main/signals.py | 59 ++++++++++++++--- ami/main/tasks.py | 40 ++++++++++++ ami/main/tests.py | 101 ++++++++++++++++++++++++++--- 6 files changed, 308 insertions(+), 19 deletions(-) create mode 100644 ami/main/checks/cached_counts.py diff --git a/ami/base/models.py b/ami/base/models.py index 8f0dda76e..8ce1d61b0 100644 --- a/ami/base/models.py +++ b/ami/base/models.py @@ -11,9 +11,13 @@ class CachedCountField(models.IntegerField): """Denormalized count of related rows. Marker subclass so cached aggregate columns can be discovered via - ``Model._meta.get_fields()`` (e.g. for refresh tasks, admin display, or - list-endpoint defer()). Values may be stale or null between - ``update_calculated_fields`` calls — readers should not assume freshness. 
+ ``Model._meta.get_fields()`` (see ``ami.main.checks.cached_counts`` + for the periodic drift-reconciliation check). Column type is unchanged + from ``IntegerField`` — the AlterField migrations that introduce this + subclass are no-op SQL. Mixing ``CachedCountField`` and plain + ``IntegerField`` on the same model is fine, but a future contributor + adding a non-cached IntegerField next to a cached one will see an + AlterField in their migration; that's expected, not a bug. """ description = "Cached count of related rows" diff --git a/ami/main/checks/cached_counts.py b/ami/main/checks/cached_counts.py new file mode 100644 index 000000000..d2261e416 --- /dev/null +++ b/ami/main/checks/cached_counts.py @@ -0,0 +1,105 @@ +"""Reconcile drift on ``CachedCountField`` columns. + +Cached count columns (e.g. ``Deployment.captures_count``, +``SourceImageCollection.source_images_count``) are kept current via signals +and explicit ``update_calculated_fields`` calls. Bulk write paths that skip +signals — ``bulk_create``, ``bulk_update``, raw SQL, some ML post-processors +— silently drift the stored value out of sync with the underlying rows. + +This check discovers every model that declares one or more +``CachedCountField`` columns, finds rows whose stored values disagree with +a fresh recompute via ``instance.update_calculated_fields(save=False)``, +and either reports or repairs them. + +Run via ``manage.py check_data_integrity`` (when PR #1188 lands) or via +the ``reconcile_cached_counts`` Celery task. 
+""" + +from __future__ import annotations + +import logging +from collections.abc import Iterator + +from django.apps import apps + +from ami.base.models import BaseModel, CachedCountField +from ami.main.checks.schemas import IntegrityCheckResult + +logger = logging.getLogger(__name__) + + +def _cached_count_field_names(model: type[BaseModel]) -> list[str]: + fields = model._meta.get_fields() # type: ignore[attr-defined] + return [f.name for f in fields if isinstance(f, CachedCountField) and f.name] + + +def discover_cached_count_fields() -> dict[type[BaseModel], list[str]]: + """Return models that declare one or more ``CachedCountField`` columns.""" + result: dict[type[BaseModel], list[str]] = {} + for model in apps.get_models(): + if not issubclass(model, BaseModel): + continue + cached = _cached_count_field_names(model) + if cached: + result[model] = cached + return result + + +def _scope_to_project(qs, model: type[BaseModel], project_id: int | None): + if project_id is None: + return qs + project_accessor = model.get_project_accessor() + if project_accessor and project_accessor != "projects": + return qs.filter(**{f"{project_accessor}_id": project_id}) + return qs + + +def find_stale_cached_counts( + model: type[BaseModel], + project_id: int | None = None, +) -> Iterator[tuple[BaseModel, dict[str, int | None], dict[str, int | None]]]: + """Yield ``(instance, stored, computed)`` for rows whose cached counts drift. + + Iterates the queryset row-by-row and calls ``update_calculated_fields(save=False)`` + on a fresh copy so the stored row stays untouched. Heavy on large tables; + callers should scope by ``project_id`` whenever the check is interactive. 
+ """ + cached_fields = _cached_count_field_names(model) + if not cached_fields: + return + qs = _scope_to_project(model.objects.all(), model, project_id) + for instance in qs.iterator(): + stored = {f: getattr(instance, f) for f in cached_fields} + instance.update_calculated_fields(save=False) + computed = {f: getattr(instance, f) for f in cached_fields} + if stored != computed: + yield instance, stored, computed + + +def reconcile_cached_counts( + model: type[BaseModel] | None = None, + project_id: int | None = None, + dry_run: bool = True, +) -> IntegrityCheckResult: + """Repair stale cached counts. Pass ``model=None`` to sweep all models.""" + models_to_check = [model] if model else list(discover_cached_count_fields().keys()) + result = IntegrityCheckResult() + for m in models_to_check: + for instance, stored, computed in find_stale_cached_counts(m, project_id=project_id): + result.checked += 1 + logger.info( + "Stale cached counts on %s pk=%s: stored=%s computed=%s", + m.__name__, + instance.pk, + stored, + computed, + ) + if dry_run: + continue + try: + instance.update_calculated_fields(save=True) + result.fixed += 1 + except Exception: + logger.exception("Failed to reconcile %s pk=%s", m.__name__, instance.pk) + result.unfixable += 1 + return result diff --git a/ami/main/models.py b/ami/main/models.py index 64a19b9c6..1d46d4cc5 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -4222,7 +4222,17 @@ def get_source_image_counts(self) -> dict[str, int]: return counts def update_calculated_fields(self, save: bool = False) -> None: - """Recompute the 3 denormalized source-image count columns.""" + """Recompute the 3 denormalized source-image count columns. 
+ + Persists via ``.filter(pk=).update(**counts)`` rather than ``.save()`` + — cached-count refreshes shouldn't bump ``updated_at`` (semantically the + entity hasn't been modified) and shouldn't re-fire ``post_save``, which + on this model would re-enter ``m2m_changed`` if the handler later does + anything with ``self.images``. Deployment / Event / SourceImage take a + different path (``self.save(update_calculated_fields=False)``) because + their ``update_calculated_fields`` also writes non-cached fields that + downstream save-handlers expect to see updated. + """ counts = self.get_source_image_counts() self.source_images_count = counts["source_images_count"] self.source_images_processed_count = counts["source_images_processed_count"] diff --git a/ami/main/signals.py b/ami/main/signals.py index 718ff5650..b954b6960 100644 --- a/ami/main/signals.py +++ b/ami/main/signals.py @@ -1,13 +1,13 @@ import logging from django.contrib.auth.models import Group -from django.db import transaction +from django.db import connection, transaction from django.db.models.signals import m2m_changed, post_delete, post_save, pre_delete, pre_save from django.dispatch import receiver from guardian.shortcuts import assign_perm from ami.main.models import Detection, Project, SourceImageCollection -from ami.main.tasks import refresh_project_cached_counts +from ami.main.tasks import refresh_collection_cached_counts, refresh_project_cached_counts from ami.users.roles import BasicMember, ProjectManager, create_roles_for_project from .models import User @@ -202,24 +202,67 @@ def exclude_taxa_updated(sender, instance: Project, action, **kwargs): # ============================================================================ # SourceImageCollection Denormalized Counts # ============================================================================ +# +# Detection writes can fan out to many collections (one SourceImage may belong +# to multiple collections). 
We coalesce per-transaction so that, e.g., a 10k-row +# pipeline save fires the recompute task at most once per affected collection +# instead of once per detection. The dedup set lives on ``connection`` (thread- +# local in Django's default setup) and is drained by an ``on_commit`` hook. +# Bulk write paths that bypass signals (``bulk_create``, ``bulk_update``, raw +# SQL) still drift the cached counts; the ``reconcile_cached_counts_task`` in +# ``ami.main.tasks`` is the safety net for those. + +_PENDING_SOURCE_IMAGE_IDS_ATTR = "_pending_collection_count_refresh_source_image_ids" + + +def _flush_pending_collection_refreshes() -> None: + """Drain the per-connection dedup set and dispatch one task per collection.""" + source_image_ids: set[int] = getattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR, set()) + try: + delattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR) + except AttributeError: + pass + if not source_image_ids: + return + collection_ids = ( + SourceImageCollection.objects.filter(images__id__in=source_image_ids).values_list("pk", flat=True).distinct() + ) + for cid in collection_ids: + refresh_collection_cached_counts.delay(cid) + + +def _schedule_collection_refresh_for_source_image(source_image_id: int) -> None: + pending = getattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR, None) + is_new = pending is None + if is_new: + pending = set() + setattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR, pending) + pending.add(source_image_id) + if is_new: + # Outside an atomic block, ``on_commit`` fires synchronously at + # registration time — so the ``add`` above must precede it or the + # flush sees an empty set. 
+ transaction.on_commit(_flush_pending_collection_refreshes) @receiver(m2m_changed, sender=SourceImageCollection.images.through) def update_collection_counts_on_m2m(sender, instance, action, **kwargs): """Recompute denormalized counts when images are added to or removed from a collection.""" if action in ("post_add", "post_remove", "post_clear"): - instance.update_calculated_fields(save=True) + collection_pk = instance.pk + transaction.on_commit(lambda: refresh_collection_cached_counts.delay(collection_pk)) @receiver(post_save, sender=Detection) @receiver(post_delete, sender=Detection) def update_collection_counts_on_detection_change(sender, instance, **kwargs): - """Keep processed / with-detections counts fresh on per-row Detection writes. + """Schedule a collection-counts refresh for every collection containing the affected SourceImage. - `bulk_create` skips signals, so ML pipelines must call `update_calculated_fields` - explicitly after their batch writes (see `ami.ml.models.pipeline.save_results`). + The dedup + on_commit indirection means even tight per-row Detection write + loops fan out to at most one task per affected collection. ``bulk_create`` + / ``bulk_update`` skip signals entirely — those rely on the periodic + reconciliation task to repair drift. 
""" if not instance.source_image_id: return - for collection in SourceImageCollection.objects.filter(images__id=instance.source_image_id).distinct(): - collection.update_calculated_fields(save=True) + _schedule_collection_refresh_for_source_image(instance.source_image_id) diff --git a/ami/main/tasks.py b/ami/main/tasks.py index 16f927a3f..5fee8d0d8 100644 --- a/ami/main/tasks.py +++ b/ami/main/tasks.py @@ -23,3 +23,43 @@ def refresh_project_cached_counts(project_id: int) -> None: logger.info(f"Refreshing cached counts for project {project.pk} ({project.name})") project.update_related_calculated_fields() + + +@celery_app.task(ignore_result=True) +def refresh_collection_cached_counts(collection_id: int) -> None: + """Recompute the 3 denormalized image counts on one SourceImageCollection. + + Dispatched on_commit by the Detection and m2m signal handlers so that + high-volume write paths (ML pipeline, sample population) don't pay the + aggregate cost inline. + """ + from ami.main.models import SourceImageCollection + + try: + collection = SourceImageCollection.objects.get(pk=collection_id) + except SourceImageCollection.DoesNotExist: + logger.warning(f"SourceImageCollection {collection_id} not found; skipping cached-count refresh") + return + collection.update_calculated_fields(save=True) + + +@celery_app.task(ignore_result=True) +def reconcile_cached_counts_task(project_id: int | None = None, dry_run: bool = False) -> dict: + """Periodic drift check for every model with ``CachedCountField`` columns. + + Catches drift introduced by bulk write paths that skip signals + (``bulk_create``, ``bulk_update``, raw SQL, ML post-processors). Default + is repair mode; pass ``dry_run=True`` for report-only. 
+ """ + from ami.main.checks.cached_counts import reconcile_cached_counts + + result = reconcile_cached_counts(project_id=project_id, dry_run=dry_run) + logger.info( + "reconcile_cached_counts: checked=%d fixed=%d unfixable=%d (project_id=%s, dry_run=%s)", + result.checked, + result.fixed, + result.unfixable, + project_id, + dry_run, + ) + return {"checked": result.checked, "fixed": result.fixed, "unfixable": result.unfixable} diff --git a/ami/main/tests.py b/ami/main/tests.py index e622e7835..32e1554a2 100644 --- a/ami/main/tests.py +++ b/ami/main/tests.py @@ -6,7 +6,7 @@ from django.contrib.auth.models import AnonymousUser from django.core.files.uploadedfile import SimpleUploadedFile from django.db import connection, models -from django.test import TestCase, override_settings +from django.test import TestCase, TransactionTestCase, override_settings from guardian.shortcuts import assign_perm, get_perms, remove_perm from PIL import Image from rest_framework import status @@ -3349,8 +3349,15 @@ def test_list_query_count_ordering_by_annotated_count(self): self.assertLessEqual(count, 10, f"Collection list ordered by count too many queries: {count}") -class TestSourceImageCollectionCountsDenormalize(TestCase): - """Verify denormalized count columns stay in sync via signals and bulk hooks.""" +@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True) +class TestSourceImageCollectionCountsDenormalize(TransactionTestCase): + """Verify denormalized count columns stay in sync via signals and bulk hooks. + + Uses ``TransactionTestCase`` + eager Celery so ``transaction.on_commit`` + callbacks (registered by the signal handlers) actually fire inside the + test body and the dispatched ``refresh_collection_cached_counts`` task + runs inline. 
+ """ def setUp(self): self.project, self.deployment = setup_test_project() @@ -3383,7 +3390,7 @@ def test_with_detections_count_updates_on_detection_create(self): Detection.objects.create( source_image=self.images[0], timestamp=self.images[0].timestamp, - bbox=[0.1, 0.1, 0.2, 0.2], + bbox=[10, 10, 20, 20], path="detections/d1.jpg", ) self._refresh() @@ -3395,7 +3402,7 @@ def test_with_detections_count_decrements_on_detection_delete(self): det = Detection.objects.create( source_image=self.images[0], timestamp=self.images[0].timestamp, - bbox=[0.1, 0.1, 0.2, 0.2], + bbox=[10, 10, 20, 20], path="detections/d1.jpg", ) det.delete() @@ -3421,7 +3428,7 @@ def test_update_calculated_fields_recomputes_from_scratch(self): Detection.objects.create( source_image=self.images[0], timestamp=self.images[0].timestamp, - bbox=[0.1, 0.1, 0.2, 0.2], + bbox=[10, 10, 20, 20], path="detections/d1.jpg", ) SourceImageCollection.objects.filter(pk=self.collection.pk).update( @@ -3440,7 +3447,7 @@ def test_get_source_image_counts_returns_dict_without_writes(self): Detection.objects.create( source_image=self.images[0], timestamp=self.images[0].timestamp, - bbox=[0.1, 0.1, 0.2, 0.2], + bbox=[10, 10, 20, 20], path="detections/d1.jpg", ) SourceImageCollection.objects.filter(pk=self.collection.pk).update( @@ -3463,6 +3470,86 @@ def test_get_source_image_counts_returns_dict_without_writes(self): self.assertEqual(self.collection.source_images_count, 0) +class TestCachedCountsIntegrityCheck(TestCase): + """Verify the generic drift-detection check for ``CachedCountField`` columns. + + The check is the safety net for bulk write paths that skip signals + (``bulk_create``, ``bulk_update``, raw SQL, ML post-processors). + """ + + def setUp(self): + # reuse=False isolates this test from collections created on the shared + # test project by other test classes — we want exact-count assertions. 
+ self.project, self.deployment = setup_test_project(reuse=False) + create_taxa(self.project) + create_captures(deployment=self.deployment, num_nights=1, images_per_night=3) + # create_captures auto-creates a "Test Source Image Collection" with + # stale counts (signals defer to on_commit which doesn't fire in + # TestCase). Remove it so the project has exactly one collection. + SourceImageCollection.objects.filter(project=self.project).delete() + self.images = list(SourceImage.objects.filter(deployment=self.deployment)) + self.collection = SourceImageCollection.objects.create( + name="drift-test", + project=self.project, + method="manual", + kwargs={"image_ids": [img.pk for img in self.images]}, + ) + self.collection.images.set(self.images) + # Force the cached counts on this fresh collection to a known-correct state. + self.collection.update_calculated_fields(save=True) + + def test_discover_finds_models_with_cached_count_fields(self): + from ami.main.checks.cached_counts import discover_cached_count_fields + + discovered = discover_cached_count_fields() + model_names = {m.__name__ for m in discovered} + # Every model with at least one CachedCountField column should appear. + for expected in ("Deployment", "Event", "SourceImage", "SourceImageCollection"): + self.assertIn(expected, model_names, f"discover missed {expected}") + self.assertIn("source_images_count", discovered[SourceImageCollection]) + + def test_find_stale_yields_drift(self): + from ami.main.checks.cached_counts import find_stale_cached_counts + + # Drift in via raw UPDATE — same shape as bulk_update or signal-skipping writes. 
+ SourceImageCollection.objects.filter(pk=self.collection.pk).update( + source_images_count=999, source_images_processed_count=999, source_images_with_detections_count=999 + ) + drift = list(find_stale_cached_counts(SourceImageCollection, project_id=self.project.pk)) + self.assertEqual(len(drift), 1) + instance, stored, computed = drift[0] + self.assertEqual(instance.pk, self.collection.pk) + self.assertEqual(stored["source_images_count"], 999) + self.assertEqual(computed["source_images_count"], len(self.images)) + + def test_reconcile_dry_run_reports_without_writing(self): + from ami.main.checks.cached_counts import reconcile_cached_counts + + SourceImageCollection.objects.filter(pk=self.collection.pk).update(source_images_count=999) + result = reconcile_cached_counts(model=SourceImageCollection, project_id=self.project.pk, dry_run=True) + self.assertEqual(result.checked, 1) + self.assertEqual(result.fixed, 0) + self.collection.refresh_from_db() + self.assertEqual(self.collection.source_images_count, 999, "dry_run must not write") + + def test_reconcile_no_dry_run_repairs(self): + from ami.main.checks.cached_counts import reconcile_cached_counts + + SourceImageCollection.objects.filter(pk=self.collection.pk).update(source_images_count=999) + result = reconcile_cached_counts(model=SourceImageCollection, project_id=self.project.pk, dry_run=False) + self.assertEqual(result.checked, 1) + self.assertEqual(result.fixed, 1) + self.collection.refresh_from_db() + self.assertEqual(self.collection.source_images_count, len(self.images)) + + def test_reconcile_no_drift_returns_zero_checked(self): + from ami.main.checks.cached_counts import reconcile_cached_counts + + result = reconcile_cached_counts(model=SourceImageCollection, project_id=self.project.pk, dry_run=False) + self.assertEqual(result.checked, 0) + self.assertEqual(result.fixed, 0) + + class TestProjectDefaultTaxaFilter(APITestCase): """ Tests for project default taxa filtering (include/exclude lists). 
From 451c8be3212154074ecb7c7e62eee571b23a301e Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 14 May 2026 23:50:27 -0700 Subject: [PATCH 05/10] docs: cached-counts update_cached_counts method design Captures the design behind replacing PR #1301's per-source-table dedup state + per-model Celery refresh tasks with a generic instance/queryset method on BaseModel/BaseQuerySet. Wraps update_calculated_fields(save=True) with caller-controlled sync vs async (run_async=True default) and per- (model, pk) dedup keyed on the DB connection. Co-Authored-By: Claude --- ...5-14-cached-counts-update-method-design.md | 183 ++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-14-cached-counts-update-method-design.md diff --git a/docs/superpowers/specs/2026-05-14-cached-counts-update-method-design.md b/docs/superpowers/specs/2026-05-14-cached-counts-update-method-design.md new file mode 100644 index 000000000..5721776e6 --- /dev/null +++ b/docs/superpowers/specs/2026-05-14-cached-counts-update-method-design.md @@ -0,0 +1,183 @@ +# Cached counts: `update_cached_counts` method design + +**Date:** 2026-05-14 +**Context:** Follow-up to PR #1301 takeaway-review feedback. Replace per-source-table dedup state + per-model Celery refresh tasks with a generic instance/queryset method that wraps `update_calculated_fields(save=True)`. + +## Goals + +1. Single source of truth for "recompute and persist this row's cached counts" across the codebase. +2. Caller-controlled sync vs async (`run_async=True` default). +3. Per-(model, pk) dedup so high-volume signal fan-out collapses to one task per affected row, regardless of how many source-row writes triggered it. +4. No new concepts at the field declaration site: `CachedCountField` marker, model `update_calculated_fields` body, and the periodic reconcile task stay as-is. + +## Non-goals + +- Declarative `invalidate_on=[Detection, ...]` on field. 
Deferred to follow-up; the registry would sit on top of this method. +- Plugging the bulk_create / bulk_update / raw-SQL blind spot. That stays the responsibility of `reconcile_cached_counts_task` and inline calls in ML worker code (`pipeline.save_results`). +- Splitting `update_calculated_fields` into "just counts" vs "derived state (S3 sums, first/last timestamps)". Wrapper stays thin today; semantic split is a separate concern when refreshing one drift forces a full S3 scan and we notice. + +## Architecture + +### New module: `ami/base/cached_counts.py` + +Per-connection dedup set keyed by `(model_label, pk)`. One `transaction.on_commit` hook per connection drains the set and dispatches the generic Celery task once per unique `(model_label, pk)`. + +```python +_PENDING_ATTR = "_pending_cached_count_recomputes" + +def _schedule_recompute(model: type[models.Model], pk: Any) -> None: + pending = getattr(connection, _PENDING_ATTR, None) + is_new = pending is None + if is_new: + pending = set() + setattr(connection, _PENDING_ATTR, pending) + pending.add((model._meta.label, pk)) + if is_new: + # Outside an atomic block, on_commit fires synchronously at + # registration time — the add above must precede it. 
+ transaction.on_commit(_flush_pending_recomputes) + + +def _flush_pending_recomputes() -> None: + pending = getattr(connection, _PENDING_ATTR, set()) + try: + delattr(connection, _PENDING_ATTR) + except AttributeError: + pass + for label, pk in pending: + recompute_cached_counts_task.delay(label, pk) + + +@shared_task(ignore_result=True) +def recompute_cached_counts_task(model_label: str, pk: Any) -> None: + model = apps.get_model(model_label) + try: + instance = model.objects.get(pk=pk) + except model.DoesNotExist: + return + instance.update_calculated_fields(save=True) +``` + +### `BaseModel.update_cached_counts(run_async=True)` + +```python +class BaseModel(models.Model): + def update_cached_counts(self, run_async: bool = True) -> None: + if run_async: + _schedule_recompute(type(self), self.pk) + return + self.update_calculated_fields(save=True) +``` + +### `BaseQuerySet.update_cached_counts(run_async=True)` + +```python +class BaseQuerySet(QuerySet): + def update_cached_counts(self, run_async: bool = True) -> None: + for pk in self.values_list("pk", flat=True): + if run_async: + _schedule_recompute(self.model, pk) + else: + self.model.objects.get(pk=pk).update_calculated_fields(save=True) +``` + +## Call site changes + +### `ami/main/signals.py` + +Detection post_save/post_delete handler: + +```python +@receiver(post_save, sender=Detection) +@receiver(post_delete, sender=Detection) +def update_collection_counts_on_detection_change(sender, instance, **kwargs): + if not instance.source_image_id: + return + SourceImageCollection.objects.filter(images__id=instance.source_image_id).update_cached_counts() +``` + +m2m_changed on `SourceImageCollection.images.through`: + +```python +@receiver(m2m_changed, sender=SourceImageCollection.images.through) +def update_collection_counts_on_m2m(sender, instance, action, **kwargs): + if action in ("post_add", "post_remove", "post_clear"): + instance.update_cached_counts() +``` + +Project default-filter cascade (stays 
hand-rolled; cascade is to children, not parents): + +```python +def refresh_cached_counts_for_project(project: Project): + Event.objects.filter(project=project).update_cached_counts() + Deployment.objects.filter(project=project).update_cached_counts() + SourceImage.objects.filter(project=project).update_cached_counts() +``` + +### `ami/main/tasks.py` + +Drop `refresh_collection_cached_counts` entirely. `refresh_project_cached_counts` can also drop; the per-project cascade now schedules per-row tasks directly from the signal via the queryset method's `run_async=True` default. Reconcile task stays. + +### `ami/main/checks/cached_counts.py` reconcile loop + +```python +# before +instance.update_calculated_fields(save=True) +# after +instance.update_cached_counts(run_async=False) +``` + +Synchronous because reconcile already runs in a Celery task and we want the repair to complete before the result is reported. + +### `ami/ml/models/pipeline.py` (worker context) + +Stays as-is. Already runs in Celery and already dedupes via `.distinct()` on the collection queryset. Could optionally swap `collection.update_calculated_fields(save=True)` → `collection.update_cached_counts(run_async=False)` for stylistic unification — non-blocking on this PR. 
+ +## What goes away + +- `_PENDING_SOURCE_IMAGE_IDS_ATTR` constant +- `_flush_pending_collection_refreshes` helper +- `_schedule_collection_refresh_for_source_image` helper +- `refresh_collection_cached_counts` task +- `refresh_project_cached_counts` task (its body becomes 3 queryset calls in the signal handler) + +## What stays + +- `CachedCountField` marker class +- Per-model `update_calculated_fields(save=True)` bodies (the actual recompute logic) +- Periodic `reconcile_cached_counts_task` and the integrity check module +- Inline calls in `pipeline.save_results` (worker-context, already deduped) + +## Cost of adding the next cached count + +Before: new field + recompute in `update_calculated_fields` + per-connection dedup attr + flush helper + Celery task + signal handler wiring (~6 things, ~50 LOC). + +After: new field + recompute in `update_calculated_fields` + signal handler calling `.update_cached_counts()` (~3 things, ~10 LOC). + +## Risks + +1. **bulk_create / bulk_update skip signals.** Unchanged from current state. Reconcile task is the safety net. Cachalot accepts the same boundary at the SQL-compiler patch layer (raw cursor coverage is opt-in via `CACHALOT_INVALIDATE_RAW`). +2. **Project default-filter cascade fans out to thousands of children.** Today it's one task; under this design it becomes N small tasks. Net cost is slightly higher (more queue overhead) but each task is bounded and parallelizable. Separate issue from this PR. +3. **`update_calculated_fields` on Deployment does S3-sum + first/last timestamp work alongside the counts.** Refreshing drift on one count therefore triggers an S3 query. Acceptable today; flagged for future split. +4. **`async` is a Python reserved word.** Use `run_async` to match existing precedent (`process_single_source_image(run_async=True)`). + +## Migration path + +This PR (or a follow-up commit on PR #1301): + +1. 
Create `ami/base/cached_counts.py` with `_schedule_recompute`, `_flush_pending_recomputes`, `recompute_cached_counts_task`. +2. Add `update_cached_counts` to `BaseModel` and `BaseQuerySet` in `ami/base/models.py`. +3. Refactor `ami/main/signals.py` — drop dedup helpers, switch handlers to queryset method. +4. Refactor `ami/main/tasks.py` — drop `refresh_collection_cached_counts` and `refresh_project_cached_counts`. +5. Update `ami/main/checks/cached_counts.py` reconcile loop. +6. Run existing tests in `ami/main/tests.py` (`test_source_image_cached_counts_refresh_on_threshold_change` etc.) to confirm parity. + +## Tests + +Existing tests cover: +- Threshold-change signal triggers refresh +- Detection post_save triggers per-collection refresh +- m2m_changed triggers refresh +- Bulk write drift is caught by reconcile + +These should pass unchanged. One new test: per-connection dedup collapses N detection writes to ≤ M tasks, where M is the number of distinct affected target rows. (PR #1301 has the dedup test for the old code path; rewrite to assert against the new generic task.) From 9ac2ce173670481c1edcbbc5b1819977bb0d9a45 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 15 May 2026 00:20:03 -0700 Subject: [PATCH 06/10] refactor(counts): unify cached-count refresh as BaseModel/BaseQuerySet method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the per-source-table dedup state + per-model Celery refresh tasks introduced in 1a242aa4 with a generic `update_cached_counts(run_async=True)` method on `BaseModel` and `BaseQuerySet`. Wraps the existing `update_calculated_fields(save=True)` body; per-(model_label, pk) dedup keyed on the active DB connection collapses high-volume signal fan-out to one task per affected row.
Architecture: - `ami/base/cached_counts.py` — `schedule_recompute(label, pk)` queues a recompute in a per-connection dedup set; `_flush_pending_recomputes` is registered via `transaction.on_commit` (once per call, idempotent on the drain side so it survives rollback-then-fresh-transaction in tests). - `BaseModel.update_cached_counts(run_async=True)` — schedules a recompute for `self`. With `run_async=False` runs inline. - `BaseQuerySet.update_cached_counts(run_async=True)` — schedules one per row in the queryset. - `ami/main/tasks.py::recompute_cached_counts_task` — generic Celery task, loads model via `apps.get_model(label)` and calls `update_calculated_fields(save=True)`. Call site changes: - Detection signal handler is now one line: `SourceImageCollection.objects.filter(images__id=instance.source_image_id).update_cached_counts()` - Collection m2m handler: `instance.update_cached_counts()`. - `refresh_project_cached_counts` body keeps the existing `project.update_related_calculated_fields()` (preserves the bulk subquery UPDATE for `SourceImage.detections_count`); body no longer duplicates the Events/Deployments loop here. - Reconcile loop in `ami/main/checks/cached_counts.py` calls `instance.update_cached_counts(run_async=False)` for stylistic unity. Removed: - `_PENDING_SOURCE_IMAGE_IDS_ATTR`, `_flush_pending_collection_refreshes`, `_schedule_collection_refresh_for_source_image` (signals.py) - `refresh_collection_cached_counts` task (tasks.py) Cost of adding the next cached-count column drops from ~50 LOC (new field + recompute body + dedup attr + flush helper + Celery task + signal handler) to ~10 LOC (new field + recompute body + signal handler calling the method). Design rationale and trade-offs in `docs/superpowers/specs/2026-05-14-cached-counts-update-method-design.md`. 
Co-Authored-By: Claude --- ami/base/cached_counts.py | 60 +++++++++++++++++++++++++++++++ ami/base/models.py | 41 +++++++++++++++++++++ ami/main/checks/cached_counts.py | 2 +- ami/main/signals.py | 62 ++++++++------------------------ ami/main/tasks.py | 52 +++++++++++++++------------ ami/main/tests.py | 3 +- 6 files changed, 147 insertions(+), 73 deletions(-) create mode 100644 ami/base/cached_counts.py diff --git a/ami/base/cached_counts.py b/ami/base/cached_counts.py new file mode 100644 index 000000000..0faa203bb --- /dev/null +++ b/ami/base/cached_counts.py @@ -0,0 +1,60 @@ +"""Helpers for scheduling cached-count recomputes after a transaction commits. + +Wraps the per-connection dedup + ``transaction.on_commit`` plumbing that +``BaseModel.update_cached_counts`` and ``BaseQuerySet.update_cached_counts`` +build on. The actual recompute body lives in each model's +``update_calculated_fields(save=True)`` implementation; this module only +handles scheduling. + +Per-(model_label, pk) dedup means N writes affecting the same target row +collapse to one task, regardless of how many signal handlers fire in the +transaction. The dedup set lives on the active DB connection (thread-local +in Django's default setup) and is drained by a single ``on_commit`` hook. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from django.db import connection, transaction + +logger = logging.getLogger(__name__) + +_PENDING_ATTR = "_pending_cached_count_recomputes" + + +def schedule_recompute(model_label: str, pk: Any) -> None: + """Queue a ``(model_label, pk)`` for recompute at the next commit. + + The pending set lives on the active DB connection; ``transaction.on_commit`` + fires the flush. ``_flush_pending_recomputes`` is idempotent — the first + call drains the set; subsequent ones no-op — so we register on_commit on + every call. 
That keeps us correct across transaction rollbacks (which + discard registered on_commits but leave attributes on ``connection`` + untouched, e.g. between a rolled-back ``TestCase`` and a fresh + ``TransactionTestCase``). + + Outside an atomic block, ``on_commit`` fires synchronously at + registration time — so the ``add`` below must precede the + ``transaction.on_commit`` call or the flush sees an empty set. + """ + pending: set[tuple[str, Any]] | None = getattr(connection, _PENDING_ATTR, None) + if pending is None: + pending = set() + setattr(connection, _PENDING_ATTR, pending) + pending.add((model_label, pk)) + transaction.on_commit(_flush_pending_recomputes) + + +def _flush_pending_recomputes() -> None: + """Drain the per-connection dedup set; dispatch one task per ``(model, pk)``.""" + from ami.main.tasks import recompute_cached_counts_task + + pending: set[tuple[str, Any]] = getattr(connection, _PENDING_ATTR, set()) + try: + delattr(connection, _PENDING_ATTR) + except AttributeError: + pass + for model_label, pk in pending: + recompute_cached_counts_task.delay(model_label, pk) diff --git a/ami/base/models.py b/ami/base/models.py index 8ce1d61b0..6cc757c0f 100644 --- a/ami/base/models.py +++ b/ami/base/models.py @@ -56,6 +56,29 @@ def has_many_to_many_project_relation(model: type[models.Model]) -> bool: class BaseQuerySet(QuerySet): + def update_cached_counts(self, run_async: bool = True) -> None: + """Recompute cached count columns for every row in the queryset. + + With ``run_async=True`` (default), each row is queued for recompute + via ``ami.base.cached_counts.schedule_recompute`` and dispatched as + a single ``recompute_cached_counts_task`` per ``(model, pk)`` after + the surrounding transaction commits. Repeated calls within the same + transaction dedupe through a per-connection set. + + With ``run_async=False``, each row is loaded and recomputed inline. 
+ Suitable for Celery-worker contexts (reconcile task, ML pipeline + finalize) where the caller has already taken the latency hit. + """ + from ami.base.cached_counts import schedule_recompute + + model_label = self.model._meta.label + for pk in self.values_list("pk", flat=True): + if run_async: + schedule_recompute(model_label, pk) + else: + instance = self.model.objects.get(pk=pk) + instance.update_calculated_fields(save=True) + def visible_for_user(self, user: User | AnonymousUser) -> QuerySet: """ Filter queryset to include only objects whose related draft projects @@ -182,6 +205,24 @@ def update_calculated_fields(self, *args, **kwargs): """Update calculated fields specific to each model.""" pass + def update_cached_counts(self, run_async: bool = True) -> None: + """Recompute this row's cached count columns. + + With ``run_async=True`` (default), schedule a Celery task to run + after the surrounding transaction commits. Per-(model, pk) dedup + on the active DB connection collapses repeated calls within the + same transaction into a single task. + + With ``run_async=False``, recompute inline by calling + ``update_calculated_fields(save=True)`` directly. + """ + from ami.base.cached_counts import schedule_recompute + + if run_async: + schedule_recompute(self._meta.label, self.pk) + return + self.update_calculated_fields(save=True) + def _get_object_perms(self, user): """ Get the object-level permissions for the user on this instance. 
diff --git a/ami/main/checks/cached_counts.py b/ami/main/checks/cached_counts.py index d2261e416..b50b1d318 100644 --- a/ami/main/checks/cached_counts.py +++ b/ami/main/checks/cached_counts.py @@ -97,7 +97,7 @@ def reconcile_cached_counts( if dry_run: continue try: - instance.update_calculated_fields(save=True) + instance.update_cached_counts(run_async=False) result.fixed += 1 except Exception: logger.exception("Failed to reconcile %s pk=%s", m.__name__, instance.pk) diff --git a/ami/main/signals.py b/ami/main/signals.py index b954b6960..516b647c2 100644 --- a/ami/main/signals.py +++ b/ami/main/signals.py @@ -1,13 +1,13 @@ import logging from django.contrib.auth.models import Group -from django.db import connection, transaction +from django.db import transaction from django.db.models.signals import m2m_changed, post_delete, post_save, pre_delete, pre_save from django.dispatch import receiver from guardian.shortcuts import assign_perm from ami.main.models import Detection, Project, SourceImageCollection -from ami.main.tasks import refresh_collection_cached_counts, refresh_project_cached_counts +from ami.main.tasks import refresh_project_cached_counts from ami.users.roles import BasicMember, ProjectManager, create_roles_for_project from .models import User @@ -204,53 +204,19 @@ def exclude_taxa_updated(sender, instance: Project, action, **kwargs): # ============================================================================ # # Detection writes can fan out to many collections (one SourceImage may belong -# to multiple collections). We coalesce per-transaction so that, e.g., a 10k-row -# pipeline save fires the recompute task at most once per affected collection -# instead of once per detection. The dedup set lives on ``connection`` (thread- -# local in Django's default setup) and is drained by an ``on_commit`` hook. 
-# Bulk write paths that bypass signals (``bulk_create``, ``bulk_update``, raw -# SQL) still drift the cached counts; the ``reconcile_cached_counts_task`` in -# ``ami.main.tasks`` is the safety net for those. - -_PENDING_SOURCE_IMAGE_IDS_ATTR = "_pending_collection_count_refresh_source_image_ids" - - -def _flush_pending_collection_refreshes() -> None: - """Drain the per-connection dedup set and dispatch one task per collection.""" - source_image_ids: set[int] = getattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR, set()) - try: - delattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR) - except AttributeError: - pass - if not source_image_ids: - return - collection_ids = ( - SourceImageCollection.objects.filter(images__id__in=source_image_ids).values_list("pk", flat=True).distinct() - ) - for cid in collection_ids: - refresh_collection_cached_counts.delay(cid) - - -def _schedule_collection_refresh_for_source_image(source_image_id: int) -> None: - pending = getattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR, None) - is_new = pending is None - if is_new: - pending = set() - setattr(connection, _PENDING_SOURCE_IMAGE_IDS_ATTR, pending) - pending.add(source_image_id) - if is_new: - # Outside an atomic block, ``on_commit`` fires synchronously at - # registration time — so the ``add`` above must precede it or the - # flush sees an empty set. - transaction.on_commit(_flush_pending_collection_refreshes) +# to multiple collections). The queryset's ``update_cached_counts()`` method +# dedupes per-(model, pk) across the transaction via a per-connection set, so +# a 10k-row pipeline save fires the recompute task at most once per affected +# collection instead of once per detection. Bulk write paths that bypass +# signals (``bulk_create``, ``bulk_update``, raw SQL) still drift the cached +# counts; ``reconcile_cached_counts_task`` is the safety net for those. 
@receiver(m2m_changed, sender=SourceImageCollection.images.through) def update_collection_counts_on_m2m(sender, instance, action, **kwargs): """Recompute denormalized counts when images are added to or removed from a collection.""" if action in ("post_add", "post_remove", "post_clear"): - collection_pk = instance.pk - transaction.on_commit(lambda: refresh_collection_cached_counts.delay(collection_pk)) + instance.update_cached_counts() @receiver(post_save, sender=Detection) @@ -258,11 +224,11 @@ def update_collection_counts_on_m2m(sender, instance, action, **kwargs): def update_collection_counts_on_detection_change(sender, instance, **kwargs): """Schedule a collection-counts refresh for every collection containing the affected SourceImage. - The dedup + on_commit indirection means even tight per-row Detection write - loops fan out to at most one task per affected collection. ``bulk_create`` - / ``bulk_update`` skip signals entirely — those rely on the periodic - reconciliation task to repair drift. + The queryset method's per-(model, pk) dedup means tight per-row Detection + write loops fan out to at most one task per affected collection. + ``bulk_create`` / ``bulk_update`` skip signals entirely — those rely on + the periodic reconciliation task to repair drift. """ if not instance.source_image_id: return - _schedule_collection_refresh_for_source_image(instance.source_image_id) + SourceImageCollection.objects.filter(images__id=instance.source_image_id).update_cached_counts() diff --git a/ami/main/tasks.py b/ami/main/tasks.py index 5fee8d0d8..75337a065 100644 --- a/ami/main/tasks.py +++ b/ami/main/tasks.py @@ -1,17 +1,43 @@ import logging +from django.apps import apps + from config import celery_app logger = logging.getLogger(__name__) +@celery_app.task(ignore_result=True) +def recompute_cached_counts_task(model_label: str, pk: int) -> None: + """Recompute one row's cached count columns. 
+ + Dispatched by ``ami.base.cached_counts._flush_pending_recomputes`` after + a transaction commits. Generic across every model that declares + ``CachedCountField`` columns and implements ``update_calculated_fields``. + + Silent on missing rows: the row may have been deleted between when the + recompute was queued and when the task runs. + """ + model = apps.get_model(model_label) + try: + instance = model.objects.get(pk=pk) + except model.DoesNotExist: + logger.debug("recompute_cached_counts_task: %s pk=%s not found, skipping", model_label, pk) + return + instance.update_calculated_fields(save=True) + + @celery_app.task(ignore_result=True) def refresh_project_cached_counts(project_id: int) -> None: - """Refresh cached counts for all Events and Deployments in a project. + """Refresh cached counts on every Event, Deployment, and SourceImage in a project. - Dispatched from signals on ``Project.default_filters_*`` changes. The work - fans out to every Event and Deployment in the project, so it must not run - inline in the request/save path. + Dispatched from signals on ``Project.default_filters_*`` changes. The + cascade can touch tens of thousands of rows for a large project, so we + do the work inline in this single Celery task rather than queueing one + recompute task per row — that would flood the broker on a single filter + change. ``Project.update_related_calculated_fields()`` keeps the bulk + subquery UPDATE for ``SourceImage.detections_count`` while looping Events + and Deployments row-by-row. """ from ami.main.models import Project @@ -25,24 +51,6 @@ def refresh_project_cached_counts(project_id: int) -> None: project.update_related_calculated_fields() -@celery_app.task(ignore_result=True) -def refresh_collection_cached_counts(collection_id: int) -> None: - """Recompute the 3 denormalized image counts on one SourceImageCollection. 
- - Dispatched on_commit by the Detection and m2m signal handlers so that - high-volume write paths (ML pipeline, sample population) don't pay the - aggregate cost inline. - """ - from ami.main.models import SourceImageCollection - - try: - collection = SourceImageCollection.objects.get(pk=collection_id) - except SourceImageCollection.DoesNotExist: - logger.warning(f"SourceImageCollection {collection_id} not found; skipping cached-count refresh") - return - collection.update_calculated_fields(save=True) - - @celery_app.task(ignore_result=True) def reconcile_cached_counts_task(project_id: int | None = None, dry_run: bool = False) -> dict: """Periodic drift check for every model with ``CachedCountField`` columns. diff --git a/ami/main/tests.py b/ami/main/tests.py index 32e1554a2..d212bb083 100644 --- a/ami/main/tests.py +++ b/ami/main/tests.py @@ -3355,8 +3355,7 @@ class TestSourceImageCollectionCountsDenormalize(TransactionTestCase): Uses ``TransactionTestCase`` + eager Celery so ``transaction.on_commit`` callbacks (registered by the signal handlers) actually fire inside the - test body and the dispatched ``refresh_collection_cached_counts`` task - runs inline. + test body and the dispatched ``recompute_cached_counts_task`` runs inline. """ def setUp(self): From 1120396e5dedbafd668f968283df5e335fc5d4c0 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 19 May 2026 16:40:18 -0700 Subject: [PATCH 07/10] fix(serializers): mark SourceImageCollection cached counts read-only The three denormalized count columns (source_images_count, source_images_with_detections_count, source_images_processed_count) were exposed as writable model fields after the schema migration. A client with update permission could PATCH arbitrary values and persist drift until the next signal recompute touched the row. 
Co-Authored-By: Claude --- ami/main/api/serializers.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ami/main/api/serializers.py b/ami/main/api/serializers.py index 354df5459..dff62167a 100644 --- a/ami/main/api/serializers.py +++ b/ami/main/api/serializers.py @@ -1254,6 +1254,12 @@ class Meta: "created_at", "updated_at", ] + # Denormalized columns kept in sync by signals; never client-writable. + read_only_fields = [ + "source_images_count", + "source_images_with_detections_count", + "source_images_processed_count", + ] def get_permissions(self, instance, instance_data): request: Request = self.context["request"] From 5113ff2339c3dac4f239d817dbb86c325821ef7f Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 19 May 2026 16:40:33 -0700 Subject: [PATCH 08/10] refactor(migrations): split SourceImageCollection counts schema and backfill 0085 was non-atomic and combined irreversible AddField operations with a long RunPython backfill. If the backfill failed or was interrupted on production-sized data, the columns existed but the migration stayed unrecorded -- a retry would fail on duplicate column. Split into: - 0085: AddField x3 only, atomic. AddField with constant default is a metadata-only operation on PostgreSQL 11+, safe on large tables. - 0086_backfill: the GROUP BY / FILTER backfill, atomic=False. Writes absolute computed values, so naturally idempotent and safe to re-run if interrupted. - 0087_use_cached_count_field: renamed from 0086, dependency updated to point at the new backfill migration. 
Co-Authored-By: Claude --- ...enormalize_sourceimagecollection_counts.py | 68 +++---------------- ...6_backfill_sourceimagecollection_counts.py | 65 ++++++++++++++++++ ...ield.py => 0087_use_cached_count_field.py} | 2 +- 3 files changed, 75 insertions(+), 60 deletions(-) create mode 100644 ami/main/migrations/0086_backfill_sourceimagecollection_counts.py rename ami/main/migrations/{0086_use_cached_count_field.py => 0087_use_cached_count_field.py} (97%) diff --git a/ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py b/ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py index 05fa2e49c..503bdf1b3 100644 --- a/ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py +++ b/ami/main/migrations/0085_denormalize_sourceimagecollection_counts.py @@ -1,70 +1,21 @@ """ -Denormalize three counts onto ``SourceImageCollection`` so the list endpoint -reads them in O(1) instead of running 3 correlated count subqueries per row. +Add three denormalized count columns to ``SourceImageCollection`` so the list +endpoint reads them in O(1) instead of running 3 correlated count subqueries +per row. -Backfill uses a single GROUP BY over the M2M with FILTER clauses to compute -all three counts in one pass. ``with_det`` checks for a valid (non-null / -non-empty) detection bbox to match the runtime -``NULL_DETECTIONS_FILTER`` semantics in ``ami/main/models.py``. +Schema only. The backfill runs in the separate, non-atomic, re-runnable +migration ``0086_backfill_sourceimagecollection_counts`` so an interrupted +backfill on production-sized data cannot leave the schema half-applied +(columns added but migration unrecorded -> retry fails on duplicate column). -``atomic = False`` so the long UPDATE can run outside a single transaction -on production-sized M2M tables. +``AddField`` with a constant ``default`` is a metadata-only operation on +PostgreSQL 11+, so this is safe to run atomically even on large tables. 
""" from django.db import migrations, models -def backfill_counts(apps, schema_editor): - schema_editor.execute( - """ - UPDATE main_sourceimagecollection sc - SET source_images_count = c.total, - source_images_processed_count = c.processed, - source_images_with_detections_count = c.with_det - FROM ( - SELECT msci.sourceimagecollection_id AS coll_id, - COUNT(*) AS total, - COUNT(*) FILTER ( - WHERE EXISTS ( - SELECT 1 FROM main_detection d - WHERE d.source_image_id = si.id - ) - ) AS processed, - COUNT(*) FILTER ( - WHERE EXISTS ( - SELECT 1 FROM main_detection d - WHERE d.source_image_id = si.id - AND d.bbox IS NOT NULL - AND d.bbox::text <> '[]' - ) - ) AS with_det - FROM main_sourceimagecollection_images msci - INNER JOIN main_sourceimage si ON si.id = msci.sourceimage_id - GROUP BY msci.sourceimagecollection_id - ) c - WHERE sc.id = c.coll_id; - """ - ) - # Collections with no images: paginated SELECTs returned 0 via Coalesce; keep - # them populated rather than NULL so the column reads stay consistent. 
- schema_editor.execute( - """ - UPDATE main_sourceimagecollection - SET source_images_count = 0, - source_images_processed_count = 0, - source_images_with_detections_count = 0 - WHERE source_images_count IS NULL; - """ - ) - - -def reverse_noop(apps, schema_editor): - pass - - class Migration(migrations.Migration): - atomic = False - dependencies = [ ("main", "0084_revoke_delete_job_from_roles"), ] @@ -85,5 +36,4 @@ class Migration(migrations.Migration): name="source_images_processed_count", field=models.IntegerField(default=0), ), - migrations.RunPython(backfill_counts, reverse_noop), ] diff --git a/ami/main/migrations/0086_backfill_sourceimagecollection_counts.py b/ami/main/migrations/0086_backfill_sourceimagecollection_counts.py new file mode 100644 index 000000000..e40740186 --- /dev/null +++ b/ami/main/migrations/0086_backfill_sourceimagecollection_counts.py @@ -0,0 +1,65 @@ +""" +Backfill the denormalized ``SourceImageCollection`` count columns added in +0085. + +Split from the schema migration on purpose: this is the slow step on +production-sized M2M tables. ``atomic = False`` lets the UPDATE run outside a +single transaction, and the UPDATE writes absolute computed values (not +deltas) so it is idempotent — safe to re-run if interrupted. Collections with +no images keep the column ``default=0`` from 0085 (the GROUP BY only emits +rows for collections that have images). + +``with_det`` checks for a valid (non-null / non-empty) detection bbox to match +the runtime ``NULL_DETECTIONS_FILTER`` semantics in ``ami/main/models.py``. 
+""" + +from django.db import migrations + + +def backfill_counts(apps, schema_editor): + schema_editor.execute( + """ + UPDATE main_sourceimagecollection sc + SET source_images_count = c.total, + source_images_processed_count = c.processed, + source_images_with_detections_count = c.with_det + FROM ( + SELECT msci.sourceimagecollection_id AS coll_id, + COUNT(*) AS total, + COUNT(*) FILTER ( + WHERE EXISTS ( + SELECT 1 FROM main_detection d + WHERE d.source_image_id = si.id + ) + ) AS processed, + COUNT(*) FILTER ( + WHERE EXISTS ( + SELECT 1 FROM main_detection d + WHERE d.source_image_id = si.id + AND d.bbox IS NOT NULL + AND d.bbox::text <> '[]' + ) + ) AS with_det + FROM main_sourceimagecollection_images msci + INNER JOIN main_sourceimage si ON si.id = msci.sourceimage_id + GROUP BY msci.sourceimagecollection_id + ) c + WHERE sc.id = c.coll_id; + """ + ) + + +def reverse_noop(apps, schema_editor): + pass + + +class Migration(migrations.Migration): + atomic = False + + dependencies = [ + ("main", "0085_denormalize_sourceimagecollection_counts"), + ] + + operations = [ + migrations.RunPython(backfill_counts, reverse_noop), + ] diff --git a/ami/main/migrations/0086_use_cached_count_field.py b/ami/main/migrations/0087_use_cached_count_field.py similarity index 97% rename from ami/main/migrations/0086_use_cached_count_field.py rename to ami/main/migrations/0087_use_cached_count_field.py index c0afe21e7..5f568da60 100644 --- a/ami/main/migrations/0086_use_cached_count_field.py +++ b/ami/main/migrations/0087_use_cached_count_field.py @@ -6,7 +6,7 @@ class Migration(migrations.Migration): dependencies = [ - ("main", "0085_denormalize_sourceimagecollection_counts"), + ("main", "0086_backfill_sourceimagecollection_counts"), ] operations = [ From f0cc05559ec7b39785b9c6fa7c9fd51501d0e702 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 19 May 2026 16:40:46 -0700 Subject: [PATCH 09/10] refactor(counts): defer reconcile tooling to follow-up The reconcile tooling 
shipped with this branch was not production-wired: no Celery beat schedule registered the periodic task, the IntegrityCheck contract was violated (checked counted only drifted rows, not rows inspected), and the task default of (project_id=None, dry_run=False) was a full-table repair sweep across every CachedCountField model. Better to ship the perf win on its own and bring the reconcile/integrity layer in a follow-up that registers the periodic task, fixes the checked semantics, defaults to dry_run, and surfaces results on a dashboard. Removed: - ami/main/checks/cached_counts.py (discover_cached_count_fields, find_stale_cached_counts, reconcile_cached_counts) - reconcile_cached_counts_task in ami/main/tasks.py - TestCachedCountsIntegrityCheck in ami/main/tests.py Adjusted: - ami/main/signals.py: safety-net comment now describes the actual current coverage (signals + pipeline.save_results) and points at the follow-up planning doc instead of the removed reconcile task. - ami/base/models.py: CachedCountField docstring no longer references the deleted module. Co-Authored-By: Claude --- ami/base/models.py | 5 +- ami/main/checks/cached_counts.py | 105 ------------------------------- ami/main/signals.py | 4 +- ami/main/tasks.py | 22 ------- ami/main/tests.py | 80 ----------------------- 5 files changed, 6 insertions(+), 210 deletions(-) delete mode 100644 ami/main/checks/cached_counts.py diff --git a/ami/base/models.py b/ami/base/models.py index 6cc757c0f..62e09d6e4 100644 --- a/ami/base/models.py +++ b/ami/base/models.py @@ -11,8 +11,9 @@ class CachedCountField(models.IntegerField): """Denormalized count of related rows. Marker subclass so cached aggregate columns can be discovered via - ``Model._meta.get_fields()`` (see ``ami.main.checks.cached_counts`` - for the periodic drift-reconciliation check). Column type is unchanged + ``Model._meta.get_fields()`` + ``isinstance(f, CachedCountField)`` by + future cross-cutting tasks (admin display, periodic drift + reconciliation).
Column type is unchanged from ``IntegerField`` — the AlterField migrations that introduce this subclass are no-op SQL. Mixing ``CachedCountField`` and plain ``IntegerField`` on the same model is fine, but a future contributor diff --git a/ami/main/checks/cached_counts.py b/ami/main/checks/cached_counts.py deleted file mode 100644 index b50b1d318..000000000 --- a/ami/main/checks/cached_counts.py +++ /dev/null @@ -1,105 +0,0 @@ -"""Reconcile drift on ``CachedCountField`` columns. - -Cached count columns (e.g. ``Deployment.captures_count``, -``SourceImageCollection.source_images_count``) are kept current via signals -and explicit ``update_calculated_fields`` calls. Bulk write paths that skip -signals — ``bulk_create``, ``bulk_update``, raw SQL, some ML post-processors -— silently drift the stored value out of sync with the underlying rows. - -This check discovers every model that declares one or more -``CachedCountField`` columns, finds rows whose stored values disagree with -a fresh recompute via ``instance.update_calculated_fields(save=False)``, -and either reports or repairs them. - -Run via ``manage.py check_data_integrity`` (when PR #1188 lands) or via -the ``reconcile_cached_counts`` Celery task. 
-""" - -from __future__ import annotations - -import logging -from collections.abc import Iterator - -from django.apps import apps - -from ami.base.models import BaseModel, CachedCountField -from ami.main.checks.schemas import IntegrityCheckResult - -logger = logging.getLogger(__name__) - - -def _cached_count_field_names(model: type[BaseModel]) -> list[str]: - fields = model._meta.get_fields() # type: ignore[attr-defined] - return [f.name for f in fields if isinstance(f, CachedCountField) and f.name] - - -def discover_cached_count_fields() -> dict[type[BaseModel], list[str]]: - """Return models that declare one or more ``CachedCountField`` columns.""" - result: dict[type[BaseModel], list[str]] = {} - for model in apps.get_models(): - if not issubclass(model, BaseModel): - continue - cached = _cached_count_field_names(model) - if cached: - result[model] = cached - return result - - -def _scope_to_project(qs, model: type[BaseModel], project_id: int | None): - if project_id is None: - return qs - project_accessor = model.get_project_accessor() - if project_accessor and project_accessor != "projects": - return qs.filter(**{f"{project_accessor}_id": project_id}) - return qs - - -def find_stale_cached_counts( - model: type[BaseModel], - project_id: int | None = None, -) -> Iterator[tuple[BaseModel, dict[str, int | None], dict[str, int | None]]]: - """Yield ``(instance, stored, computed)`` for rows whose cached counts drift. - - Iterates the queryset row-by-row and calls ``update_calculated_fields(save=False)`` - on a fresh copy so the stored row stays untouched. Heavy on large tables; - callers should scope by ``project_id`` whenever the check is interactive. 
- """ - cached_fields = _cached_count_field_names(model) - if not cached_fields: - return - qs = _scope_to_project(model.objects.all(), model, project_id) - for instance in qs.iterator(): - stored = {f: getattr(instance, f) for f in cached_fields} - instance.update_calculated_fields(save=False) - computed = {f: getattr(instance, f) for f in cached_fields} - if stored != computed: - yield instance, stored, computed - - -def reconcile_cached_counts( - model: type[BaseModel] | None = None, - project_id: int | None = None, - dry_run: bool = True, -) -> IntegrityCheckResult: - """Repair stale cached counts. Pass ``model=None`` to sweep all models.""" - models_to_check = [model] if model else list(discover_cached_count_fields().keys()) - result = IntegrityCheckResult() - for m in models_to_check: - for instance, stored, computed in find_stale_cached_counts(m, project_id=project_id): - result.checked += 1 - logger.info( - "Stale cached counts on %s pk=%s: stored=%s computed=%s", - m.__name__, - instance.pk, - stored, - computed, - ) - if dry_run: - continue - try: - instance.update_cached_counts(run_async=False) - result.fixed += 1 - except Exception: - logger.exception("Failed to reconcile %s pk=%s", m.__name__, instance.pk) - result.unfixable += 1 - return result diff --git a/ami/main/signals.py b/ami/main/signals.py index 516b647c2..6bf4f0ea4 100644 --- a/ami/main/signals.py +++ b/ami/main/signals.py @@ -209,7 +209,9 @@ def exclude_taxa_updated(sender, instance: Project, action, **kwargs): # a 10k-row pipeline save fires the recompute task at most once per affected # collection instead of once per detection. Bulk write paths that bypass # signals (``bulk_create``, ``bulk_update``, raw SQL) still drift the cached -# counts; ``reconcile_cached_counts_task`` is the safety net for those. +# counts; ``pipeline.save_results()`` explicitly recomputes for the ML path. 
+# Generic periodic drift reconciliation across all CachedCountField models is +# tracked as a follow-up (see docs/claude/planning/cached-counts-reconcile-followup.md). @receiver(m2m_changed, sender=SourceImageCollection.images.through) diff --git a/ami/main/tasks.py b/ami/main/tasks.py index 75337a065..45637fb97 100644 --- a/ami/main/tasks.py +++ b/ami/main/tasks.py @@ -49,25 +49,3 @@ def refresh_project_cached_counts(project_id: int) -> None: logger.info(f"Refreshing cached counts for project {project.pk} ({project.name})") project.update_related_calculated_fields() - - -@celery_app.task(ignore_result=True) -def reconcile_cached_counts_task(project_id: int | None = None, dry_run: bool = False) -> dict: - """Periodic drift check for every model with ``CachedCountField`` columns. - - Catches drift introduced by bulk write paths that skip signals - (``bulk_create``, ``bulk_update``, raw SQL, ML post-processors). Default - is repair mode; pass ``dry_run=True`` for report-only. - """ - from ami.main.checks.cached_counts import reconcile_cached_counts - - result = reconcile_cached_counts(project_id=project_id, dry_run=dry_run) - logger.info( - "reconcile_cached_counts: checked=%d fixed=%d unfixable=%d (project_id=%s, dry_run=%s)", - result.checked, - result.fixed, - result.unfixable, - project_id, - dry_run, - ) - return {"checked": result.checked, "fixed": result.fixed, "unfixable": result.unfixable} diff --git a/ami/main/tests.py b/ami/main/tests.py index d212bb083..67572415d 100644 --- a/ami/main/tests.py +++ b/ami/main/tests.py @@ -3469,86 +3469,6 @@ def test_get_source_image_counts_returns_dict_without_writes(self): self.assertEqual(self.collection.source_images_count, 0) -class TestCachedCountsIntegrityCheck(TestCase): - """Verify the generic drift-detection check for ``CachedCountField`` columns. - - The check is the safety net for bulk write paths that skip signals - (``bulk_create``, ``bulk_update``, raw SQL, ML post-processors). 
- """ - - def setUp(self): - # reuse=False isolates this test from collections created on the shared - # test project by other test classes — we want exact-count assertions. - self.project, self.deployment = setup_test_project(reuse=False) - create_taxa(self.project) - create_captures(deployment=self.deployment, num_nights=1, images_per_night=3) - # create_captures auto-creates a "Test Source Image Collection" with - # stale counts (signals defer to on_commit which doesn't fire in - # TestCase). Remove it so the project has exactly one collection. - SourceImageCollection.objects.filter(project=self.project).delete() - self.images = list(SourceImage.objects.filter(deployment=self.deployment)) - self.collection = SourceImageCollection.objects.create( - name="drift-test", - project=self.project, - method="manual", - kwargs={"image_ids": [img.pk for img in self.images]}, - ) - self.collection.images.set(self.images) - # Force the cached counts on this fresh collection to a known-correct state. - self.collection.update_calculated_fields(save=True) - - def test_discover_finds_models_with_cached_count_fields(self): - from ami.main.checks.cached_counts import discover_cached_count_fields - - discovered = discover_cached_count_fields() - model_names = {m.__name__ for m in discovered} - # Every model with at least one CachedCountField column should appear. - for expected in ("Deployment", "Event", "SourceImage", "SourceImageCollection"): - self.assertIn(expected, model_names, f"discover missed {expected}") - self.assertIn("source_images_count", discovered[SourceImageCollection]) - - def test_find_stale_yields_drift(self): - from ami.main.checks.cached_counts import find_stale_cached_counts - - # Drift in via raw UPDATE — same shape as bulk_update or signal-skipping writes. 
- SourceImageCollection.objects.filter(pk=self.collection.pk).update( - source_images_count=999, source_images_processed_count=999, source_images_with_detections_count=999 - ) - drift = list(find_stale_cached_counts(SourceImageCollection, project_id=self.project.pk)) - self.assertEqual(len(drift), 1) - instance, stored, computed = drift[0] - self.assertEqual(instance.pk, self.collection.pk) - self.assertEqual(stored["source_images_count"], 999) - self.assertEqual(computed["source_images_count"], len(self.images)) - - def test_reconcile_dry_run_reports_without_writing(self): - from ami.main.checks.cached_counts import reconcile_cached_counts - - SourceImageCollection.objects.filter(pk=self.collection.pk).update(source_images_count=999) - result = reconcile_cached_counts(model=SourceImageCollection, project_id=self.project.pk, dry_run=True) - self.assertEqual(result.checked, 1) - self.assertEqual(result.fixed, 0) - self.collection.refresh_from_db() - self.assertEqual(self.collection.source_images_count, 999, "dry_run must not write") - - def test_reconcile_no_dry_run_repairs(self): - from ami.main.checks.cached_counts import reconcile_cached_counts - - SourceImageCollection.objects.filter(pk=self.collection.pk).update(source_images_count=999) - result = reconcile_cached_counts(model=SourceImageCollection, project_id=self.project.pk, dry_run=False) - self.assertEqual(result.checked, 1) - self.assertEqual(result.fixed, 1) - self.collection.refresh_from_db() - self.assertEqual(self.collection.source_images_count, len(self.images)) - - def test_reconcile_no_drift_returns_zero_checked(self): - from ami.main.checks.cached_counts import reconcile_cached_counts - - result = reconcile_cached_counts(model=SourceImageCollection, project_id=self.project.pk, dry_run=False) - self.assertEqual(result.checked, 0) - self.assertEqual(result.fixed, 0) - - class TestProjectDefaultTaxaFilter(APITestCase): """ Tests for project default taxa filtering (include/exclude lists). 
From 2533857737dcca1f63fd44a0cdbd1e7e16116825 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 19 May 2026 16:41:00 -0700 Subject: [PATCH 10/10] docs(planning): cached-counts reconcile follow-up + design space Captures what was removed from the reconcile layer and why (no beat registration, checked-counts semantics, unsafe defaults), what the follow-up needs to ship, and a design-space section for future expensive cached fields. Names the patterns under consideration (NULL-sentinel staleness, freshness-timestamp companion, aggregates/rollup table / materialized view) and proposes a marker-class hierarchy so a single discovery sweep can dispatch per-strategy reconcile (eager: set-based diff; lazy: freshness check; rollup-backed: refresh view). Cites the existing freshness-timestamp precedent on Event.calculated_fields_updated_at. Co-Authored-By: Claude --- .../cached-counts-reconcile-followup.md | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 docs/claude/planning/cached-counts-reconcile-followup.md diff --git a/docs/claude/planning/cached-counts-reconcile-followup.md b/docs/claude/planning/cached-counts-reconcile-followup.md new file mode 100644 index 000000000..ec4dc3c11 --- /dev/null +++ b/docs/claude/planning/cached-counts-reconcile-followup.md @@ -0,0 +1,148 @@ +# Follow-up: CachedCountField drift reconciliation + periodic task + dashboard + +**Status:** planned (split out of PR #1301 on 2026-05-15) +**Depends on:** PR #1301 (denormalized `SourceImageCollection` counts + `CachedCountField` marker) merged. + +## Why this is a separate PR + +#1301 ships the perf win (denormalized columns, signals, ML-path recompute, +`CachedCountField` marker, per-connection dedup scheduler). 
The generic +drift-reconciliation layer was removed from #1301 because it was not +production-wired and shipping it half-done is worse than not shipping it: + +- The reconcile Celery task had **no beat schedule** — `CELERY_BEAT_SCHEDULER` + is `django_celery_beat.schedulers:DatabaseScheduler` (`config/settings/base.py:413`), + so periodic tasks live in the DB. #1301 added no `PeriodicTask` registration, + so the "safety net" never actually ran. +- `reconcile_cached_counts` counted `checked` only for rows that **already + drifted**, contradicting the `IntegrityCheckResult` contract + (`ami/main/checks/schemas.py`: `checked` = rows inspected). A clean sweep + reported `checked=0`, indistinguishable from "didn't run". +- Task default was `project_id=None, dry_run=False` → a full-table, + repair-mode, per-row-subquery sweep across every `CachedCountField` model + (incl. `SourceImage`, millions of rows). Dangerous default if naively + scheduled. + +Removed from #1301 (recover from git history at branch +`perf/sourceimagecollection-cached-counts` pre-2026-05-15): +- `ami/main/checks/cached_counts.py` (`discover_cached_count_fields`, + `find_stale_cached_counts`, `reconcile_cached_counts`) +- `reconcile_cached_counts_task` in `ami/main/tasks.py` +- `TestCachedCountsIntegrityCheck` in `ami/main/tests.py` + +The `CachedCountField` marker, `discover`-via-`_meta.get_fields()` approach, +and the design doc (`docs/superpowers/specs/2026-05-14-cached-counts-update-method-design.md`) +stay in #1301 — the follow-up builds on the marker. + +## Scope of the follow-up + +1. **Reconcile module** — restore `find_stale_cached_counts` / + `reconcile_cached_counts`. Fix `checked` to increment per row inspected + (in the iteration, not the drift branch). Keep `dry_run` and `project_id` + scoping. +2. **Safe task defaults** — `reconcile_cached_counts_task` defaults to + `dry_run=True`; repair mode must be explicit. 
Require either `project_id` + or an explicit `model` for the repair path; refuse a full-table unscoped + repair without an explicit `force=True`. +3. **Periodic task registration** — data migration creating the + `django_celery_beat` `PeriodicTask` (dry-run mode, reasonable cadence, + per-project fan-out rather than one unscoped sweep). Document how to + enable repair mode per environment. +4. **Surface results** — dashboard / log destination for reconcile output + (checked / fixed / drift detail). Decide: admin page, structured logs to + the existing logging sink, or a lightweight status model. Drift events + should be visible without grepping worker logs. +5. **Tests** — restore the integrity-check tests; add one asserting + `checked` reflects rows inspected on a no-drift sweep (the bug the old + `test_reconcile_no_drift_returns_zero_checked` baked in). + +## Open questions + +- Cadence + scoping: per-project nightly vs. one global weekly sweep. Per-row + subquery recompute on `SourceImage` at prod scale is expensive — likely + needs the bulk-subquery UPDATE path (`Project.update_related_calculated_fields`) + rather than row-by-row `update_calculated_fields` for the big models. +- Whether reconcile should auto-repair or only alert + require a manual + trigger for repair (safer; drift usually signals a missing signal/hook + that should be fixed at the source, not papered over). + +## Reconciler compute strategy: read-only vs upsert vs invalidate + +We already have the read/write split on the eager path: +`SourceImageCollection.get_source_image_counts()` is pure-compute (one +aggregate, no writes); `update_calculated_fields(save=True)` is the +side-effecting upsert. That split is correct for the **signal** path but the +pure-compute read is the wrong primitive for a **sweep** — the removed +reconciler iterated rows calling it via `.iterator()`, which is N subquery +aggregates. Three named approaches, ranked: + +1. 
**Set-based diff (eager fields, chosen).** One GROUP BY producing the true + counts for every row in a single pass — this query already exists as the + `0086_backfill_sourceimagecollection_counts` SQL. Reconcile = run it, + compare to stored columns (`WHERE sc.x IS DISTINCT FROM c.x` to detect, + `UPDATE ... WHERE` to repair only divergent rows). O(passes), not O(rows). + Precedent: `Project.update_related_calculated_fields()` keeps a + bulk-subquery UPDATE for `SourceImage.detections_count` rather than + looping. **Implication:** the per-model "true value" query is the single + source of truth; backfill, reconcile, and (where cheap) the signal path + should all derive from it, closing the migration/runtime predicate-drift + gap flagged in the takeaway review. + +2. **Lazy invalidation (expensive fields, considered — see below).** Mark + stale on a write criterion; recompute on read or on a prioritized sweep. + Named pattern: *write-invalidate* / *cache-aside* with *TTL* or + *generation/version stamping*. Not warranted for the cheap count columns + (eager signal recompute is one aggregate per affected row), but it is the + right path for future expensive caches. + +3. **Generic per-row loop (diagnostic only).** Lowest-common-denominator, + works for any `CachedCountField` model, slow at scale. Survives only as a + scoped / `dry_run` diagnostic, never an unscoped repair sweep. + +## Design space for future expensive cached fields (considered, not in scope) + +Counts are cheap to recompute eagerly. Some cached values will not be — e.g. +a stored `best_machine_prediction_score` / `best_machine_prediction_taxon_id` +on `Occurrence` (today a queryset annotation, `with_best_machine_prediction`, +not a column). When a cached value is expensive, eager write-through stops +being viable and the field needs a **freshness signal**. Named options: + +- **NULL-sentinel staleness.** Nullable column where `NULL` = "not computed / + stale, recompute before trusting". 
Zero extra schema; this is *already* the + de-facto contract for `Deployment.*_count` / `Event.*_count` + (`blank=True, null=True`). Limitation: conflates "computed and genuinely + empty" with "stale". Fine for counts (`NULL` ≠ `0`) and for FK/score caches + (`NULL` = unknown). The "stale bit = set it to None" intuition is exactly + this pattern. + +- **Freshness-timestamp companion.** A sibling `*_computed_at` + datetime; staleness = `computed_at IS NULL OR computed_at <= source_changed_at` + (TTL or watermark). Precedent already in the codebase: + `Event.calculated_fields_updated_at` (`models.py:1161`) gates recompute via + `Q(calculated_fields_updated_at__isnull=True) | Q(...__lte=last_updated)` + (`models.py:1346`). Preferred for expensive fields: enables TTL, + oldest-first reconcile prioritization, and observability ("how stale is + this?"). This is the "value + stale bit, two-part field" idea — the second + part is a freshness timestamp, not a boolean. + +- **Aggregates / rollup table (or materialized view).** For very expensive + cross-table rollups, store the value out of the hot row entirely: a FK from + the entity to a summary row, or a DB **materialized view** refreshed on a + schedule. Data-warehouse name: *aggregate table* / *summary table*; the + DB-maintained variant is a *materialized view* (`REFRESH MATERIALIZED + VIEW`). Trades write-amplification for refresh latency; reconcile becomes + "refresh the view" rather than per-row diff. + +**Naming pathway (so future work has somewhere to land, no build now):** +keep `CachedCountField(IntegerField)` as the *eager write-through scalar* +marker. Reserve a sibling concept for *lazy / invalidatable* caches that +carry a freshness signal — a class such as `LazyCachedField` / +`InvalidatableCachedField`, distinguished by an associated `computed_at` +companion (or documented NULL-sentinel contract). 
The discovery mechanism +stays one `_meta.get_fields()` + `isinstance` sweep; the **marker class +hierarchy tells the reconciler which strategy applies**: eager → set-based +diff (approach 1); lazy → check freshness/NULL, then recompute or merely +invalidate (approach 2); rollup-backed → refresh the aggregate. One +enumeration, per-class strategy. This keeps #1301's marker the right shape +and leaves a clear, named path to the expensive cases without widening the +current PR.