From 2b27144f95fa55489b18e3421301432dea430351 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 29 Apr 2026 14:49:53 -0700 Subject: [PATCH 1/7] fix(jobs): reaper checks Redis directly via all_tasks_processed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Job 2521 ended REVOKED despite NATS+Redis both confirming all 4510 tasks processed. Root cause: `_update_job_progress` writes the entire `Job.progress` JSONB blob without a row lock since #1261 dropped `select_for_update`. Concurrent workers raced; a slower committer clobbered the faster committer's SUCCESS write, leaving progress at `processed=4509 remaining=1 STARTED`. The reaper read the clobbered `progress.is_complete()` -> False -> REVOKE. Add `AsyncJobStateManager.all_tasks_processed() -> bool | None` (tri-state) backed directly by Redis SCARD across both pending sets, and inline it at the reaper guard in `check_stale_jobs`. Sync_api jobs unchanged (dispatch_mode gate). When Redis state is absent (cleanup, expiry, never initialized, or transient RedisError), fall back to `progress.is_complete()` and emit a WARNING so monitoring can flag the fallback path. Other 5 `progress.is_complete()` callers stay as-is — only the reaper guard is bitten by the clobber. Atomic-write follow-up tracked separately. Co-Authored-By: Claude --- ami/jobs/tasks.py | 28 +++- ami/jobs/tests/test_tasks.py | 192 ++++++++++++++++++++++++ ami/ml/orchestration/async_job_state.py | 30 ++++ ami/ml/tests.py | 54 +++++++ 4 files changed, 297 insertions(+), 7 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index b00213ebb..2cce502b6 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -976,9 +976,10 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[ For each stale job, checks Celery for a terminal task status. REVOKED is always trusted. 
For async_api jobs, SUCCESS and FAILURE are only accepted - when job.progress.is_complete() — NATS workers may still be delivering - results after the Celery task finishes. All other cases result in revocation. - Async resources (NATS/Redis) are cleaned up in both branches. + when AsyncJobStateManager.all_tasks_processed() reports True (i.e. the + Redis pending sets are drained). When Redis state is absent, falls back to + job.progress.is_complete(). All other cases result in revocation. Async + resources (NATS/Redis) are cleaned up in both branches. Returns a list of dicts describing what was done to each job. """ @@ -1028,12 +1029,25 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[ # Treat as unknown state — job will be revoked below. # Only trust terminal Celery states. For async_api jobs, SUCCESS and - # FAILURE are only accepted when progress is complete — NATS workers may - # still be delivering results after the Celery task finishes. + # FAILURE are only accepted when all NATS tasks are processed — workers + # may still be delivering results after the Celery task finishes. + # Consult Redis (source of truth for SREM completeness) directly rather + # than Job.progress.is_complete(), which mirrors a JSONB blob racy under + # concurrent _update_job_progress writes since #1261. 
is_terminal = celery_state in states.READY_STATES is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API - if is_async_api and celery_state in {states.SUCCESS, states.FAILURE} and not job.progress.is_complete(): - is_terminal = False + if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}: + processed = AsyncJobStateManager(job.pk).all_tasks_processed() + if processed is False: + is_terminal = False + elif processed is None: + logger.warning( + "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()", + job.pk, + ) + if not job.progress.is_complete(): + is_terminal = False + # processed is True -> trust Celery's terminal state previous_status = job.status if is_terminal: diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 760af1809..bfc808c2b 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -1762,3 +1762,195 @@ def worker(): JobState.REVOKED.value, f"late completion resurrected a REVOKED job to {self.job.status!r}", ) + + +class TestCheckStaleJobsReaperGuard(TransactionTestCase): + """Reaper guard for async_api jobs: the reaper consults Redis directly + (AsyncJobStateManager.all_tasks_processed) rather than + Job.progress.is_complete() to decide whether a Celery SUCCESS on a stale + job is trustworthy. 
This class verifies the Redis-direct path, the + absent-state fallback to progress.is_complete() with a WARNING log, and + that sync_api jobs are unaffected by the guard.""" + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Reaper Guard Project") + self.pipeline = Pipeline.objects.create(name="Reaper Pipeline", slug="reaper-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="Reaper Coll", project=self.project) + + def tearDown(self): + cache.clear() + + def _stale_async_job(self, *, task_id: str = "reaper-task") -> Job: + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="reaper async job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.ASYNC_API, + ) + job.task_id = task_id + job.update_status(JobState.STARTED, save=True) + return job + + def _mark_stale(self, job: Job) -> None: + """Push updated_at back past STALLED_JOBS_MAX_MINUTES via raw update so + Job.save() side-effects (auto_now) don't undo it. 
Call AFTER any helper + that touches the job model.""" + Job.objects.filter(pk=job.pk).update( + updated_at=datetime.datetime.now() - datetime.timedelta(minutes=Job.STALLED_JOBS_MAX_MINUTES + 1) + ) + job.refresh_from_db() + + def _set_progress_clobbered(self, job: Job, total: int, processed: int) -> None: + """Mimic the incident's Job.progress shape: process stage at processed/total + with status STARTED, even though Redis was actually fully drained.""" + progress = job.progress + collect = progress.get_stage("collect") + collect.progress = 1.0 + collect.status = JobState.SUCCESS + progress.update_stage( + "process", + progress=processed / total, + status=JobState.STARTED, + processed=processed, + remaining=total - processed, + failed=0, + ) + progress.update_stage( + "results", + progress=processed / total, + status=JobState.STARTED, + captures=processed, + ) + job.save() + + def _set_progress_complete(self, job: Job) -> None: + progress = job.progress + for key in ("collect", "process", "results"): + stage = progress.get_stage(key) + stage.progress = 1.0 + stage.status = JobState.SUCCESS + job.save() + + @patch("celery.result.AsyncResult") + def test_async_celery_success_redis_empty_progress_clobbered_lands_success(self, mock_async_result): + """The incident case. Pre-fix, this came back REVOKED because the reaper + consulted progress.is_complete() (False, due to clobber). Post-fix, + Redis says all_tasks_processed() → True, so SUCCESS is honored.""" + from ami.jobs.tasks import check_stale_jobs + + job = self._stale_async_job() + ids = [str(i) for i in range(10)] + manager = AsyncJobStateManager(job.pk) + manager.initialize_job(ids) + manager.update_state(set(ids), stage="process") + manager.update_state(set(ids), stage="results") + # Clobber: progress shows mid-flight even though Redis is drained. 
+ self._set_progress_clobbered(job, total=10, processed=9) + self._mark_stale(job) + + from celery import states as celery_states + + mock_async_result.return_value.state = celery_states.SUCCESS + + check_stale_jobs() + + job.refresh_from_db() + self.assertEqual( + job.status, + JobState.SUCCESS.value, + f"clobbered progress should not block SUCCESS when Redis says drained; got {job.status}", + ) + + @patch("celery.result.AsyncResult") + def test_async_celery_success_redis_pending_lands_revoked(self, mock_async_result): + """Redis still has pending ids → genuine in-flight; reaper revokes.""" + from ami.jobs.tasks import check_stale_jobs + + job = self._stale_async_job() + ids = [str(i) for i in range(10)] + AsyncJobStateManager(job.pk).initialize_job(ids) + # No SREMs — pending sets still full. + self._mark_stale(job) + + from celery import states as celery_states + + mock_async_result.return_value.state = celery_states.SUCCESS + + check_stale_jobs() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + + @patch("celery.result.AsyncResult") + def test_async_celery_success_redis_absent_progress_complete_lands_success(self, mock_async_result): + """Redis state is gone (cleaned up / never initialized). Reaper falls + back to progress.is_complete(). Complete progress → SUCCESS + WARNING.""" + from ami.jobs.tasks import check_stale_jobs + + job = self._stale_async_job() + # Don't initialize Redis state. 
+ self._set_progress_complete(job) + self._mark_stale(job) + + from celery import states as celery_states + + mock_async_result.return_value.state = celery_states.SUCCESS + + with self.assertLogs("ami.jobs.tasks", level="WARNING") as cm: + check_stale_jobs() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.SUCCESS.value) + self.assertTrue(any("Redis state absent" in m for m in cm.output)) + + @patch("celery.result.AsyncResult") + def test_async_celery_success_redis_absent_progress_incomplete_lands_revoked(self, mock_async_result): + """Redis absent + progress incomplete → REVOKED via fallback + WARNING.""" + from ami.jobs.tasks import check_stale_jobs + + job = self._stale_async_job() + # Don't initialize Redis. Default progress is fresh (not complete). + self._mark_stale(job) + + from celery import states as celery_states + + mock_async_result.return_value.state = celery_states.SUCCESS + + with self.assertLogs("ami.jobs.tasks", level="WARNING") as cm: + check_stale_jobs() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + self.assertTrue(any("Redis state absent" in m for m in cm.output)) + + @patch("celery.result.AsyncResult") + def test_sync_api_celery_success_lands_success_without_redis_check(self, mock_async_result): + """sync_api jobs skip the Redis guard entirely — Celery's terminal + state is authoritative. 
No Redis state initialized; if the new path + leaked into sync_api this would REVOKE.""" + from ami.jobs.tasks import check_stale_jobs + + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="reaper sync job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.SYNC_API, + ) + job.task_id = "reaper-sync-task" + job.update_status(JobState.STARTED, save=True) + self._mark_stale(job) + + from celery import states as celery_states + + mock_async_result.return_value.state = celery_states.SUCCESS + + check_stale_jobs() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.SUCCESS.value) diff --git a/ami/ml/orchestration/async_job_state.py b/ami/ml/orchestration/async_job_state.py index 26e7c3024..4c9502a60 100644 --- a/ami/ml/orchestration/async_job_state.py +++ b/ami/ml/orchestration/async_job_state.py @@ -228,6 +228,36 @@ def get_pending_image_ids(self) -> set[str]: return set() return {m.decode() if isinstance(m, (bytes, bytearray)) else str(m) for m in members} + def all_tasks_processed(self) -> bool | None: + """Tri-state truth signal for NATS-task SREM completeness across both + process and results pending sets. + + True — both pending sets empty AND total > 0 (or total == 0) + False — at least one pending set has members + None — Redis state absent (cleaned up, expired, never initialized, + or transient RedisError) + + Scope: tracks NATS task lifecycle only; does not know about `collect` + or any future post-results stages. 
+ """ + try: + redis = self._get_redis() + with redis.pipeline() as pipe: + for stage in self.STAGES: + pipe.scard(self._get_pending_key(stage)) + pipe.get(self._total_key) + results = pipe.execute() + except RedisError as e: + logger.warning(f"Redis error reading all_tasks_processed for job {self.job_id}: {e}") + return None + + *pending_counts, total_raw = results + if total_raw is None: + return None + if int(total_raw) == 0: + return True + return all(count == 0 for count in pending_counts) + def cleanup(self) -> None: """ Delete all Redis keys associated with this job. diff --git a/ami/ml/tests.py b/ami/ml/tests.py index ea375135e..5de78aaff 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -1630,6 +1630,60 @@ def test_update_state_returns_none_when_state_genuinely_missing(self): progress = self.manager.update_state({"img1", "img2"}, "process") self.assertIsNone(progress) + def test_all_tasks_processed_fresh_init_returns_false(self): + """Just-initialized job has all images pending in both stages.""" + self._init_and_verify(self.image_ids) + self.assertFalse(self.manager.all_tasks_processed()) + + def test_all_tasks_processed_after_drain_returns_true(self): + """SREM-ing every id from both stages → True.""" + self._init_and_verify(self.image_ids) + self.manager.update_state(set(self.image_ids), "process") + self.manager.update_state(set(self.image_ids), "results") + self.assertTrue(self.manager.all_tasks_processed()) + + def test_all_tasks_processed_partial_stage_returns_false(self): + """Process drained but results still pending → False.""" + self._init_and_verify(self.image_ids) + self.manager.update_state(set(self.image_ids), "process") + # results stage untouched + self.assertFalse(self.manager.all_tasks_processed()) + + def test_all_tasks_processed_zero_total_returns_true(self): + """A zero-images job is trivially complete.""" + self.manager.initialize_job([]) + self.assertTrue(self.manager.all_tasks_processed()) + + def 
test_all_tasks_processed_never_initialized_returns_none(self): + """No init → total key absent → None (caller falls back).""" + # Do NOT call initialize_job + self.assertIsNone(self.manager.all_tasks_processed()) + + def test_all_tasks_processed_after_cleanup_returns_none(self): + """After cleanup() the total key is gone → None.""" + self._init_and_verify(self.image_ids) + self.manager.cleanup() + self.assertIsNone(self.manager.all_tasks_processed()) + + def test_all_tasks_processed_returns_none_on_redis_error(self): + """Transient RedisError → WARNING + None (caller falls back).""" + from unittest.mock import MagicMock, patch + + from redis.exceptions import RedisError + + self._init_and_verify(self.image_ids) + + pipe = MagicMock() + pipe.execute.side_effect = RedisError("Connection reset by peer") + fake_redis = MagicMock() + fake_redis.pipeline.return_value.__enter__.return_value = pipe + + with patch.object(self.manager, "_get_redis", return_value=fake_redis): + with self.assertLogs("ami.ml.orchestration.async_job_state", level="WARNING") as cm: + result = self.manager.all_tasks_processed() + self.assertIsNone(result) + self.assertTrue(any("Redis error reading all_tasks_processed" in m for m in cm.output)) + class TestSaveResultsRefreshesDeploymentCounts(TestCase): """save_results must refresh Deployment cached counts, not just Event counts. From c1fd65e5150cca66e2eaa081496f0b8c60176d58 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 17 Jun 2026 16:27:28 -0700 Subject: [PATCH 2/7] fix(jobs): scope reaper fast-path to Celery SUCCESS, not FAILURE Address review feedback on the stale-job reaper guard. CodeRabbit (major): the reaper trusted both Celery SUCCESS and FAILURE as terminal for drained async_api jobs. 
But update_job_failure() defers post-queue run_job failures for async_api jobs to the async result handler, which decides the final status from the processed/failed counts against FAILURE_THRESHOLD (a drained-but-failed task can still resolve to SUCCESS). Trusting Celery FAILURE here would force the job to FAILURE and bypass that logic. Scope the Redis-backed fast-path to SUCCESS only; a stale async_api job whose Celery task ended FAILURE now falls through to the revoke branch instead. Add a regression test. Copilot (nit): the fallback WARNING said "Redis state absent" but all_tasks_processed() also returns None on transient RedisError. Reworded to "Redis state unavailable" to cover both causes; updated the assertions. Narrow both reaper save points to update_fields=[status, progress, finished_at, updated_at]. A full save() re-writes the whole row from the snapshot fetched at select_for_update() time, which can clobber `logs` and `progress.errors` written by a concurrent _update_job_progress (no row lock since #1261). Mirrors the narrowed-write pattern already used in _update_job_progress. Also remove three docs/claude session-artifact files that were added on this branch. They contain production job IDs, internal infrastructure references, and other operational detail that does not belong in a public repository. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tasks.py | 60 ++++++++++++++++++++++++++---------- ami/jobs/tests/test_tasks.py | 53 ++++++++++++++++++++++++++----- 2 files changed, 89 insertions(+), 24 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 2cce502b6..a751a8aba 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -975,11 +975,15 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[ healthy. Default cutoff is :attr:`Job.STALLED_JOBS_MAX_MINUTES`. For each stale job, checks Celery for a terminal task status. REVOKED is - always trusted. 
For async_api jobs, SUCCESS and FAILURE are only accepted - when AsyncJobStateManager.all_tasks_processed() reports True (i.e. the - Redis pending sets are drained). When Redis state is absent, falls back to - job.progress.is_complete(). All other cases result in revocation. Async - resources (NATS/Redis) are cleaned up in both branches. + always trusted. For async_api jobs, only SUCCESS is fast-pathed to a + terminal status, and only when AsyncJobStateManager.all_tasks_processed() + reports True (i.e. the Redis pending sets are drained); when Redis state is + unavailable it falls back to job.progress.is_complete(). Celery FAILURE is + not trusted for async_api jobs — update_job_failure() defers the terminal + outcome to the async result handler (FAILURE_THRESHOLD logic), so a stale + FAILED async_api job is revoked rather than forced to FAILURE. All other + cases result in revocation. Async resources (NATS/Redis) are cleaned up in + both branches. Returns a list of dicts describing what was done to each job. """ @@ -1028,33 +1032,54 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[ ) # Treat as unknown state — job will be revoked below. - # Only trust terminal Celery states. For async_api jobs, SUCCESS and - # FAILURE are only accepted when all NATS tasks are processed — workers - # may still be delivering results after the Celery task finishes. - # Consult Redis (source of truth for SREM completeness) directly rather - # than Job.progress.is_complete(), which mirrors a JSONB blob racy under - # concurrent _update_job_progress writes since #1261. + # Only trust terminal Celery states. For async_api jobs, a SUCCESS + # Celery state is only accepted when all NATS tasks are processed — + # workers may still be delivering results after the Celery task + # finishes. 
Consult Redis (source of truth for SREM completeness) + # directly rather than Job.progress.is_complete(), which mirrors a + # JSONB blob racy under concurrent _update_job_progress writes since + # #1261. + # + # Celery FAILURE is deliberately NOT fast-pathed to a terminal + # status here: update_job_failure() defers post-queue run_job + # failures for async_api jobs to the async result handler, which + # decides the terminal outcome from the final processed/failed + # counts against FAILURE_THRESHOLD (a drained-but-failed Celery task + # can still resolve to SUCCESS). Trusting Celery FAILURE here would + # force the job to FAILURE and bypass that threshold logic, so a + # stale async_api job whose Celery task ended FAILURE falls through + # to the revoke branch instead. is_terminal = celery_state in states.READY_STATES is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API - if is_async_api and celery_state in {states.SUCCESS, states.FAILURE}: + if is_async_api and celery_state == states.SUCCESS: processed = AsyncJobStateManager(job.pk).all_tasks_processed() if processed is False: is_terminal = False elif processed is None: logger.warning( - "Reaper for job %s: Redis state absent, falling back to " "progress.is_complete()", + "Reaper for job %s: Redis state unavailable, falling back to " "progress.is_complete()", job.pk, ) if not job.progress.is_complete(): is_terminal = False - # processed is True -> trust Celery's terminal state + # processed is True -> trust Celery SUCCESS + elif is_async_api and celery_state == states.FAILURE: + # Don't treat Celery FAILURE as authoritative for async_api jobs + # (see comment above); revoke instead of forcing FAILURE. + is_terminal = False previous_status = job.status if is_terminal: if not dry_run: job.update_status(celery_state, save=False) job.finished_at = datetime.datetime.now() - job.save() + # Narrow the write to the fields the reaper actually mutates. 
+ # A full save() would re-write the whole row from the snapshot + # fetched at select_for_update() time, clobbering `logs` and + # `progress.errors` that a concurrent _update_job_progress (no + # row lock since #1261) may commit. update_status() touches + # status + progress.summary.status, so `progress` is included. + job.save(update_fields=["status", "progress", "finished_at", "updated_at"]) else: # Per-job diagnostic: surface enough state at revoke time that an # operator can answer "why was this stalled?" without grepping @@ -1076,7 +1101,10 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[ if not dry_run: job.update_status(JobState.REVOKED, save=False) job.finished_at = datetime.datetime.now() - job.save() + # See note on the terminal branch above: narrow the write so + # the reaper doesn't clobber a concurrent progress writer's + # `logs` / `progress.errors`. + job.save(update_fields=["status", "progress", "finished_at", "updated_at"]) # Async resource cleanup runs outside the transaction — it makes network # calls (NATS/Redis) that should not hold the DB row lock. diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index bfc808c2b..7f99b9f38 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -1765,12 +1765,15 @@ def worker(): class TestCheckStaleJobsReaperGuard(TransactionTestCase): - """Reaper guard for async_api jobs: the reaper consults Redis directly - (AsyncJobStateManager.all_tasks_processed) rather than - Job.progress.is_complete() to decide whether a Celery SUCCESS on a stale - job is trustworthy. This class verifies the Redis-direct path, the - absent-state fallback to progress.is_complete() with a WARNING log, and - that sync_api jobs are unaffected by the guard.""" + """Reaper guard for async_api jobs: a SUCCESS Celery state is only accepted + when AsyncJobStateManager.all_tasks_processed() reports True. 
The earlier + guard read Job.progress.is_complete() — racy under concurrent + _update_job_progress writes since #1261 dropped select_for_update. A + production job once landed REVOKED with NATS+Redis fully drained because a + slower committer clobbered the SUCCESS write. This class verifies the new + Redis-direct path, the unavailable-state fallback to progress.is_complete() + with a WARNING, that Celery FAILURE is not fast-pathed to a terminal + FAILURE for async_api jobs, and that sync_api jobs are unaffected.""" def setUp(self): cache.clear() @@ -1905,7 +1908,7 @@ def test_async_celery_success_redis_absent_progress_complete_lands_success(self, job.refresh_from_db() self.assertEqual(job.status, JobState.SUCCESS.value) - self.assertTrue(any("Redis state absent" in m for m in cm.output)) + self.assertTrue(any("Redis state unavailable" in m for m in cm.output)) @patch("celery.result.AsyncResult") def test_async_celery_success_redis_absent_progress_incomplete_lands_revoked(self, mock_async_result): @@ -1925,7 +1928,7 @@ def test_async_celery_success_redis_absent_progress_incomplete_lands_revoked(sel job.refresh_from_db() self.assertEqual(job.status, JobState.REVOKED.value) - self.assertTrue(any("Redis state absent" in m for m in cm.output)) + self.assertTrue(any("Redis state unavailable" in m for m in cm.output)) @patch("celery.result.AsyncResult") def test_sync_api_celery_success_lands_success_without_redis_check(self, mock_async_result): @@ -1954,3 +1957,37 @@ def test_sync_api_celery_success_lands_success_without_redis_check(self, mock_as job.refresh_from_db() self.assertEqual(job.status, JobState.SUCCESS.value) + + @patch("celery.result.AsyncResult") + def test_async_celery_failure_redis_drained_lands_revoked_not_failure(self, mock_async_result): + """A Celery FAILURE on an async_api job is never fast-pathed to a + terminal FAILURE here, even when Redis reports the pending sets drained. 
+ + update_job_failure() deliberately defers post-queue run_job failures for + async_api jobs to the async result handler, which decides the terminal + outcome from the final processed/failed counts against FAILURE_THRESHOLD + (a drained-but-failed task can still resolve to SUCCESS). The reaper must + not pre-empt that by forcing FAILURE, so the stale job is REVOKED instead. + """ + from ami.jobs.tasks import check_stale_jobs + + job = self._stale_async_job(task_id="reaper-failure-task") + ids = [str(i) for i in range(10)] + manager = AsyncJobStateManager(job.pk) + manager.initialize_job(ids) + manager.update_state(set(ids), stage="process") + manager.update_state(set(ids), stage="results") + self._mark_stale(job) + + from celery import states as celery_states + + mock_async_result.return_value.state = celery_states.FAILURE + + check_stale_jobs() + + job.refresh_from_db() + self.assertEqual( + job.status, + JobState.REVOKED.value, + f"Celery FAILURE must not be forced to terminal FAILURE for async_api; got {job.status}", + ) From 6148387f96d4f615313b8c7d8cc5d47126ad02d5 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 22 Jun 2026 14:18:10 -0700 Subject: [PATCH 3/7] refactor(jobs): keep reaper Celery/Redis I/O off the row lock The stale-job reaper fetched Celery task state and queried Redis (all_tasks_processed) while holding a select_for_update() row lock inside transaction.atomic(). Those are network round-trips, and holding the lock across them prolongs contention with the lock-free result handler that writes the same row. Split into two phases: gather the Celery/Redis decision with no row lock, then re-fetch under a short select_for_update() with the same staleness predicate and persist. If the job progressed or was finalized while the reaper was off the lock, the row no longer matches and the job is skipped.
--- ami/jobs/tasks.py | 120 +++++++++++++++++++++++++++------------------- 1 file changed, 70 insertions(+), 50 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index a751a8aba..6f39f8255 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -1008,6 +1008,76 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[ results = [] for pk in stale_pks: + # Phase 1 — gather external Celery/Redis state WITHOUT holding the row + # lock. Both the Celery result lookup and the Redis completeness check + # are network round-trips; doing them under select_for_update() would + # extend the lock hold time and block the lock-free result handler that + # also writes this row (its guarded UPDATE since #1261). We re-check and + # write under a short lock in Phase 2 below. + try: + job = Job.objects.get( + pk=pk, + status__in=JobState.running_states(), + updated_at__lt=cutoff, + ) + except Job.DoesNotExist: + # Another concurrent run already handled this job, or it progressed. + continue + + celery_state = None + if job.task_id: + try: + celery_state = AsyncResult(job.task_id).state + except Exception: + logger.warning( + "Failed to fetch Celery state for stale job %s (task_id=%s)", + job.pk, + job.task_id, + exc_info=True, + ) + # Treat as unknown state — job will be revoked below. + + # Only trust terminal Celery states. For async_api jobs, a SUCCESS + # Celery state is only accepted when all NATS tasks are processed — + # workers may still be delivering results after the Celery task + # finishes. Consult Redis (source of truth for SREM completeness) + # directly rather than Job.progress.is_complete(), which mirrors a + # JSONB blob racy under concurrent _update_job_progress writes since + # #1261. 
+ # + # Celery FAILURE is deliberately NOT fast-pathed to a terminal + # status here: update_job_failure() defers post-queue run_job + # failures for async_api jobs to the async result handler, which + # decides the terminal outcome from the final processed/failed + # counts against FAILURE_THRESHOLD (a drained-but-failed Celery task + # can still resolve to SUCCESS). Trusting Celery FAILURE here would + # force the job to FAILURE and bypass that threshold logic, so a + # stale async_api job whose Celery task ended FAILURE falls through + # to the revoke branch instead. + is_terminal = celery_state in states.READY_STATES + is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API + if is_async_api and celery_state == states.SUCCESS: + processed = AsyncJobStateManager(job.pk).all_tasks_processed() + if processed is False: + is_terminal = False + elif processed is None: + logger.warning( + "Reaper for job %s: Redis state unavailable, falling back to " "progress.is_complete()", + job.pk, + ) + if not job.progress.is_complete(): + is_terminal = False + # processed is True -> trust Celery SUCCESS + elif is_async_api and celery_state == states.FAILURE: + # Don't treat Celery FAILURE as authoritative for async_api jobs + # (see comment above); revoke instead of forcing FAILURE. + is_terminal = False + + # Phase 2 — short locked transaction. Re-fetch under select_for_update() + # with the same staleness predicate: if the job progressed or was + # finalized while we were off the lock doing Celery/Redis I/O, the row no + # longer matches and we skip it. Otherwise the Phase 1 decision still + # holds and we persist it. No network I/O happens under this lock. with transaction.atomic(): try: job = Job.objects.select_for_update().get( @@ -1016,58 +1086,8 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[ updated_at__lt=cutoff, ) except Job.DoesNotExist: - # Another concurrent run already handled this job. 
continue - celery_state = None - if job.task_id: - try: - celery_state = AsyncResult(job.task_id).state - except Exception: - logger.warning( - "Failed to fetch Celery state for stale job %s (task_id=%s)", - job.pk, - job.task_id, - exc_info=True, - ) - # Treat as unknown state — job will be revoked below. - - # Only trust terminal Celery states. For async_api jobs, a SUCCESS - # Celery state is only accepted when all NATS tasks are processed — - # workers may still be delivering results after the Celery task - # finishes. Consult Redis (source of truth for SREM completeness) - # directly rather than Job.progress.is_complete(), which mirrors a - # JSONB blob racy under concurrent _update_job_progress writes since - # #1261. - # - # Celery FAILURE is deliberately NOT fast-pathed to a terminal - # status here: update_job_failure() defers post-queue run_job - # failures for async_api jobs to the async result handler, which - # decides the terminal outcome from the final processed/failed - # counts against FAILURE_THRESHOLD (a drained-but-failed Celery task - # can still resolve to SUCCESS). Trusting Celery FAILURE here would - # force the job to FAILURE and bypass that threshold logic, so a - # stale async_api job whose Celery task ended FAILURE falls through - # to the revoke branch instead. 
- is_terminal = celery_state in states.READY_STATES - is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API - if is_async_api and celery_state == states.SUCCESS: - processed = AsyncJobStateManager(job.pk).all_tasks_processed() - if processed is False: - is_terminal = False - elif processed is None: - logger.warning( - "Reaper for job %s: Redis state unavailable, falling back to " "progress.is_complete()", - job.pk, - ) - if not job.progress.is_complete(): - is_terminal = False - # processed is True -> trust Celery SUCCESS - elif is_async_api and celery_state == states.FAILURE: - # Don't treat Celery FAILURE as authoritative for async_api jobs - # (see comment above); revoke instead of forcing FAILURE. - is_terminal = False - previous_status = job.status if is_terminal: if not dry_run: From f81b6b975fa28320c348dd496dd220668b41cdaf Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 22 Jun 2026 17:11:01 -0700 Subject: [PATCH 4/7] style(migrations): apply black formatting to two migration files [skip ci] Co-Authored-By: Claude Fable 5 --- ami/ml/migrations/0027_rename_last_checked_to_last_seen.py | 1 - ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py | 1 - 2 files changed, 2 deletions(-) diff --git a/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py b/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py index 4f14eee7c..9d429daf3 100644 --- a/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py +++ b/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py @@ -2,7 +2,6 @@ class Migration(migrations.Migration): - dependencies = [ ("ml", "0026_make_processing_service_endpoint_url_nullable"), ] diff --git a/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py b/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py index 510cc8104..8eee31c1d 100644 --- a/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py +++ b/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py @@ -7,7 +7,6 @@ def 
normalize_empty_endpoint_url(apps, schema_editor): class Migration(migrations.Migration): - dependencies = [ ("ml", "0027_rename_last_checked_to_last_seen"), ] From 5465da4c7d560af08261399a06c651f907bc1ca9 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 22 Jun 2026 17:11:38 -0700 Subject: [PATCH 5/7] docs(tests): make reaper guard test docstrings self-contained [skip ci] Two docstrings in TestCheckStaleJobsReaperGuard referenced past context ("the incident case", "pre-fix", "mimic the incident's") that only parsed if the reader already knew the backstory. Rewrite them in present-tense, condition-first form so each docstring describes what the test verifies without requiring knowledge of any prior incident. Co-Authored-By: Claude Fable 5 --- ami/jobs/tests/test_tasks.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 7f99b9f38..454785cc9 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -1808,8 +1808,10 @@ def _mark_stale(self, job: Job) -> None: job.refresh_from_db() def _set_progress_clobbered(self, job: Job, total: int, processed: int) -> None: - """Mimic the incident's Job.progress shape: process stage at processed/total - with status STARTED, even though Redis was actually fully drained.""" + """Write a progress blob that looks mid-flight (process stage at processed/total, + status STARTED) even though the caller has already drained all pending ids from + Redis. 
Simulates a concurrent writer clobbering the completion record between + the SREM that drained the last id and the DB save that would have recorded it.""" progress = job.progress collect = progress.get_stage("collect") collect.progress = 1.0 @@ -1840,9 +1842,9 @@ def _set_progress_complete(self, job: Job) -> None: @patch("celery.result.AsyncResult") def test_async_celery_success_redis_empty_progress_clobbered_lands_success(self, mock_async_result): - """The incident case. Pre-fix, this came back REVOKED because the reaper - consulted progress.is_complete() (False, due to clobber). Post-fix, - Redis says all_tasks_processed() → True, so SUCCESS is honored.""" + """When Redis pending sets are drained but progress.is_complete() returns False + (because a concurrent writer clobbered the completion record), the reaper trusts + the Redis result and lands SUCCESS rather than REVOKED.""" from ami.jobs.tasks import check_stale_jobs job = self._stale_async_job() From 3544558789a98d770b8610c3490e071c144c12b6 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 22 Jun 2026 17:11:50 -0700 Subject: [PATCH 6/7] test(jobs): cover Phase-1/Phase-2 skip path in stale-job reaper Add a deterministic test for the race window introduced by the two-phase reaper refactor: a job that is stale at Phase-1 read but finalized by a concurrent writer before Phase-2's select_for_update re-fetch must be skipped entirely, with the concurrent writer's status preserved. The test patches AsyncJobStateManager.all_tasks_processed so that its side effect advances the job to SUCCESS (moving it outside running_states) before returning, then asserts that check_stale_jobs makes no further write and leaves the status as SUCCESS. 
Co-Authored-By: Claude Fable 5 --- ami/jobs/tests/test_tasks.py | 54 ++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 454785cc9..1162103d5 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -1993,3 +1993,57 @@ def test_async_celery_failure_redis_drained_lands_revoked_not_failure(self, mock JobState.REVOKED.value, f"Celery FAILURE must not be forced to terminal FAILURE for async_api; got {job.status}", ) + + @patch("celery.result.AsyncResult") + def test_phase2_skip_when_job_finalized_between_phases(self, mock_async_result): + """A job finalized by another writer between Phase 1 and Phase 2 is skipped. + + Phase 1 reads the job as stale and decides it is terminal (Celery SUCCESS, + Redis drained). Between Phase 1's decision and Phase 2's select_for_update + re-fetch, a concurrent writer moves the job to SUCCESS. Phase 2's predicate + (status__in=running_states()) no longer matches, so Job.DoesNotExist is + raised and the reaper skips the job without further modification. The status + the concurrent writer set must survive unchanged. + """ + from ami.jobs.tasks import check_stale_jobs + from ami.ml.orchestration.async_job_state import AsyncJobStateManager as _ASM + + job = self._stale_async_job(task_id="reaper-phase2-skip-task") + ids = [str(i) for i in range(5)] + manager = AsyncJobStateManager(job.pk) + manager.initialize_job(ids) + manager.update_state(set(ids), stage="process") + manager.update_state(set(ids), stage="results") + self._mark_stale(job) + + from celery import states as celery_states + + mock_async_result.return_value.state = celery_states.SUCCESS + + # During Phase 1, when all_tasks_processed() is called, simulate a + # concurrent writer finalizing the job to SUCCESS. The reaper's Phase-2 + # select_for_update will then find the job outside running_states() and + # skip it. 
+ real_all_tasks_processed = _ASM.all_tasks_processed + + def finalizing_all_tasks_processed(self_asm): + result = real_all_tasks_processed(self_asm) + # Side effect: another writer finalizes the job while the reaper + # has not yet entered its Phase-2 lock. + Job.objects.filter(pk=self_asm.job_id).update( + status=JobState.SUCCESS, + updated_at=datetime.datetime.now(), + ) + return result + + with patch.object(_ASM, "all_tasks_processed", side_effect=finalizing_all_tasks_processed, autospec=True): + check_stale_jobs() + + job.refresh_from_db() + # The concurrent writer's SUCCESS must survive: the reaper skipped the + # job in Phase 2 rather than applying its own status write. + self.assertEqual( + job.status, + JobState.SUCCESS.value, + f"Phase-2 skip must leave the concurrent writer's status intact; got {job.status}", + ) From 27e24f8b6a7ff3761aa43406431e786f460ff5fb Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 22 Jun 2026 17:58:08 -0700 Subject: [PATCH 7/7] chore: restore ml migrations 0027/0028 to their committed form These two migrations were incidentally reformatted by black (a blank line removed after the Migration class declaration) while this branch was being tidied. Existing migrations should not be modified, so this restores them byte-for-byte to the form already on main. No behavior change. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/ml/migrations/0027_rename_last_checked_to_last_seen.py | 1 + ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py | 1 + 2 files changed, 2 insertions(+) diff --git a/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py b/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py index 9d429daf3..4f14eee7c 100644 --- a/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py +++ b/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py @@ -2,6 +2,7 @@ class Migration(migrations.Migration): + dependencies = [ ("ml", "0026_make_processing_service_endpoint_url_nullable"), ] diff --git a/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py b/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py index 8eee31c1d..510cc8104 100644 --- a/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py +++ b/ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py @@ -7,6 +7,7 @@ def normalize_empty_endpoint_url(apps, schema_editor): class Migration(migrations.Migration): + dependencies = [ ("ml", "0027_rename_last_checked_to_last_seen"), ]