diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 784b1e8ab..e662707f4 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -1178,25 +1178,114 @@ def cancel(self): Cancel a job. For async_api jobs, clean up NATS/Redis resources and transition through CANCELING → REVOKED. For other jobs, revoke the Celery task. + + The CANCELING and REVOKED writes go through the guarded chokepoint + (issue #1337) so a cancel cannot clobber a status another writer already + moved to a terminal state. If the job has already completed (SUCCESS) or + otherwise left the cancellable states, the guarded UPDATE no-ops and we + leave the status untouched — but we still revoke the Celery task and run + the NATS/Redis teardown below, since those resources may need releasing + regardless of the final status. """ - self.status = JobState.CANCELING - self.save() + # CANCELING is a transient marker: only advance into it from a still-active + # state, never from an already-terminal one (don't un-finish a finished job). + canceling = self._guarded_status_update( + JobState.CANCELING, + from_statuses=JobState.finalizable_states(), + ) + if canceling: + self.save(update_fields=["progress", "updated_at"]) if self.task_id: task = run_job.AsyncResult(self.task_id) - if task: - task.revoke(terminate=True) if self.dispatch_mode == JobDispatchMode.ASYNC_API: - # For async jobs we need to set the status to revoked here since the task already - # finished (it only queues the images). - self.status = JobState.REVOKED - self.save() + # The local run_job only queues images and has almost always + # finished by now, so terminating it does nothing about the work + # running on the remote ADC workers — that is stopped by tearing + # down the NATS stream + Redis state in cleanup below. Revoke + # without terminate so we never SIGTERM the bootstrap mid-flight on + # the rare occasion it is still running. (folds antenna#1324) + if task: + task.revoke() + # Set the status to revoked here. 
CANCELING is included in the + # from-set so this completes the cancel's own CANCELING → REVOKED + # progression. + revoked = self._guarded_status_update( + JobState.REVOKED, + from_statuses=JobState.finalizable_states() + [JobState.CANCELING], + set_finished=True, + ) + if revoked: + self.save(update_fields=["progress", "finished_at", "updated_at"]) + elif task: + # Sync / internal jobs do the actual work inside the celery task, + # so terminating it is the cancel mechanism. + task.revoke(terminate=True) else: - self.status = JobState.REVOKED - self.save() + revoked = self._guarded_status_update( + JobState.REVOKED, + # CANCELING included so this completes the cancel's own progression. + from_statuses=JobState.finalizable_states() + [JobState.CANCELING], + set_finished=True, + ) + if revoked: + self.save(update_fields=["progress", "finished_at", "updated_at"]) cleanup_async_job_if_needed(self) + def _guarded_status_update(self, to_status, from_statuses, *, set_finished=False): + """Move the job to ``to_status`` only if it is still in one of + ``from_statuses``, in a single atomic step. + + Why this exists: a job's status is written from several places, and for + async (pull-based) jobs the result batches that drive those writes arrive + concurrently. Without a guard each writer reads the whole job row, edits + it, and writes it back, so a slower writer working from a stale copy can + overwrite a status another writer already advanced. That made a finished + job look like it was running again, or flipped a cancelled/failed job to + SUCCESS. Because a lot of behaviour keys off job status — whether the job + is still handed out for more work, whether its resources get cleaned up, + what the UI shows — those wrong statuses were not cosmetic; they caused + knock-on bugs. See issue #1337. + + The guard is a single conditional ``UPDATE ... WHERE status IN + (from_statuses)``. 
It holds no transaction-length row lock (so it does not + reintroduce the contention #1261 removed): if a concurrent writer has + already moved the job out of ``from_statuses``, the UPDATE matches zero + rows and this call is a no-op, leaving that writer's status intact. + + The lock-free status writers all go through this method — ``cancel()``, the result + handler ``_update_job_progress`` (#1338), and the two Celery task-signal + handlers — which is why each passes a ``from_statuses`` precondition. The + remaining two writers enforce the same "don't revive a finished job" rule + under a row lock instead: ``_fail_job`` and the stale-job reaper + (``check_stale_jobs``) run under ``select_for_update`` and check the + current status before writing. The reaper deliberately keeps a broader + from-set so it can still force a genuinely stuck job out of + CANCELING/UNKNOWN — that is its role as the last resort. + + Caller contract: persist ``progress.summary.status`` into the JSONB + afterwards with a narrow ``save(update_fields=["progress", "updated_at"])``, + and only when this method reports the transition fired. + + Returns the number of rows updated (0 or 1). On a successful transition + the in-memory instance is advanced to match (``status``, optionally + ``finished_at``, and ``progress.summary.status``) so the caller can save + the progress blob without re-reading. + """ + fields = {"status": to_status} + now = None + if set_finished: + now = datetime.datetime.now() # Naive local time, matching finished_at writes elsewhere. + fields["finished_at"] = now + updated = Job.objects.filter(pk=self.pk, status__in=from_statuses).update(**fields) + if updated: + self.status = to_status + if set_finished: + self.finished_at = now + self.progress.summary.status = to_status + return updated + def update_status(self, status=None, save=True): """ Update the status of the job based on the status of the celery task.
diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index dbf57fa38..b00213ebb 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -1404,7 +1404,21 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs): ) return - job.update_status(state) + # Write the terminal SUCCESS through the guarded helper (see #1337): it only + # transitions a job that is still in a non-terminal state, so a slow + # task_postrun cannot revive a job that a cancel, the stale-job reaper, or the + # result handler already finished. Every other Celery state (STARTED, + # RETRY, REVOKED, ...) still goes through update_status() unchanged. + if state == JobState.SUCCESS: + transitioned = job._guarded_status_update( + JobState.SUCCESS, + from_statuses=JobState.finalizable_states(), + set_finished=True, + ) + if transitioned: + job.save(update_fields=["progress", "finished_at", "updated_at"]) + else: + job.update_status(state) # Clean up async resources for revoked jobs if state == JobState.REVOKED: @@ -1432,11 +1446,21 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): ) return - job.update_status(JobState.FAILURE, save=False) + # Write the terminal FAILURE through the guarded helper (see #1337): it only + # transitions a job that is still in a non-terminal state, so a run_job + # failure cannot overwrite a status another writer already finished (e.g. a + # job a cancel just REVOKED, or one the result handler marked SUCCESS). If the + # guard makes no change we still log the error and run the teardown below.
+ transitioned = job._guarded_status_update( + JobState.FAILURE, + from_statuses=JobState.finalizable_states(), + set_finished=True, + ) job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}') - job.save() + if transitioned: + job.save(update_fields=["progress", "finished_at", "updated_at"]) # Clean up async resources for failed jobs cleanup_async_job_if_needed(job) diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 8dc99da29..760af1809 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -1469,3 +1469,296 @@ def worker(): JobState.REVOKED.value, f"late results batch resurrected a REVOKED job to {self.job.status!r} (lost-update race)", ) + + +class TestTerminalTransitionChokepoint(TransactionTestCase): + """Regression tests for issue #1337, terminal-writer hardening. + + #1338 guarded the terminal write in ``_update_job_progress``. The other + terminal writers — ``Job.cancel()``, the ``task_postrun`` SUCCESS handler, + and the ``task_failure`` FAILURE handler — used to do an unguarded full-row + ``save()`` and could clobber a terminal status set by another writer. These + tests cover the sequential (non-interleaved) guarantees; the real concurrent + interleave for cancel-vs-complete is in ``TestCancelCompletionRace`` below. 
+ """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Chokepoint Project") + self.pipeline = Pipeline.objects.create(name="CP Pipeline", slug="cp-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="CP Collection", project=self.project) + + def tearDown(self): + cache.clear() + + def _make_job(self, *, status=JobState.STARTED, task_id=None, dispatch_mode=JobDispatchMode.ASYNC_API) -> Job: + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Chokepoint Job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=dispatch_mode, + status=status, + ) + if task_id is not None: + job.task_id = task_id + job.save(update_fields=["task_id"]) + return job + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_postrun_success_cannot_resurrect_revoked_job(self, _mock_cleanup): + """A late ``task_postrun`` SUCCESS must not flip a REVOKED job to SUCCESS.""" + from ami.jobs.tasks import update_job_status + + job = self._make_job(status=JobState.STARTED, task_id="postrun-revoked-task") + # Drive all stages complete so the postrun SUCCESS guard (is_complete()) + # does not defer, exercising the terminal write path itself. + for stage in ("collect", "process", "results"): + _update_job_progress(job.pk, stage=stage, progress_percentage=1.0, complete_state=JobState.SUCCESS) + # Another writer (a cancel / the reaper) marks the job terminal first. 
+ Job.objects.filter(pk=job.pk).update(status=JobState.REVOKED) + + task = MagicMock() + task.request.kwargs = {"job_id": job.pk} + update_job_status(sender=None, task_id="postrun-revoked-task", task=task, state=JobState.SUCCESS) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_postrun_success_fires_from_active_state(self, _mock_cleanup): + """The happy path still transitions an active job to SUCCESS with finished_at.""" + from ami.jobs.tasks import update_job_status + + job = self._make_job(status=JobState.STARTED, task_id="postrun-success-task") + for stage in ("collect", "process", "results"): + _update_job_progress(job.pk, stage=stage, progress_percentage=1.0, complete_state=JobState.SUCCESS) + # Reset the column to a pre-terminal state so we can observe the postrun + # handler making the SUCCESS transition itself. + Job.objects.filter(pk=job.pk).update(status=JobState.STARTED, finished_at=None) + + task = MagicMock() + task.request.kwargs = {"job_id": job.pk} + update_job_status(sender=None, task_id="postrun-success-task", task=task, state=JobState.SUCCESS) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.SUCCESS.value) + self.assertIsNotNone(job.finished_at) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_task_failure_cannot_resurrect_revoked_job(self, _mock_cleanup): + """A late ``task_failure`` FAILURE must not clobber an already-REVOKED job.""" + from ami.jobs.tasks import update_job_failure + + # SYNC_API so the in-flight async deferral guard does not short-circuit; + # we want to exercise the terminal FAILURE write itself. 
+ job = self._make_job( + status=JobState.STARTED, task_id="failure-revoked-task", dispatch_mode=JobDispatchMode.SYNC_API + ) + Job.objects.filter(pk=job.pk).update(status=JobState.REVOKED) + + update_job_failure(sender=None, task_id="failure-revoked-task", exception=RuntimeError("late crash")) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + def test_cancel_of_already_success_job_is_noop_but_still_cleans_up(self, mock_cleanup): + """cancel() on a SUCCESS job leaves the status SUCCESS but still runs teardown. + + A job can complete (result handler sets SUCCESS) before an operator's + cancel lands. The guarded UPDATEs no-op (SUCCESS is outside their + from-sets), so the status is not regressed to CANCELING/REVOKED — but the + NATS/Redis teardown must still run, since a cancel is also the operator's + request to release any lingering async resources. + """ + job = self._make_job(status=JobState.SUCCESS, dispatch_mode=JobDispatchMode.ASYNC_API) + + job.cancel() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.SUCCESS.value) + mock_cleanup.assert_called_once() + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + def test_cancel_of_active_async_job_revokes(self, mock_cleanup): + """cancel() on an active ASYNC_API job (no celery task) transitions to REVOKED.""" + job = self._make_job(status=JobState.STARTED, dispatch_mode=JobDispatchMode.ASYNC_API) + + job.cancel() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + self.assertIsNotNone(job.finished_at) + # Both sources of truth agree on the terminal state. + self.assertEqual(job.progress.summary.status, JobState.REVOKED) + mock_cleanup.assert_called_once() + + +class TestCancelCompletionRace(TransactionTestCase): + """Force the real concurrent interleave between cancel() and a completing + result batch (issue #1337). 
Mirrors ``TestConcurrentStatusRace``: a seam + parks one path mid-flight while the other commits its guarded UPDATE. + + The invariant: whichever path commits its guarded terminal UPDATE first + wins, and the loser's guard no-ops rather than clobbering with a full-row + save. A SUCCESS is never overwritten to REVOKED by a stale cancel, and a + REVOKED is never resurrected to SUCCESS by a late completion. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Cancel Race Project") + self.pipeline = Pipeline.objects.create(name="CRR Pipeline", slug="crr-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="CRR Collection", project=self.project) + self.job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Cancel Race Job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.ASYNC_API, + status=JobState.STARTED, + ) + + def tearDown(self): + cache.clear() + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_completion_committing_first_is_not_overwritten_by_stale_cancel(self, _mock_t_cleanup, _mock_m_cleanup): + """A cancel that read STARTED, then parked, must not regress a status the + result handler committed to SUCCESS while the cancel was parked.""" + # Bring the job to the brink of completion: collect + process done. + _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS) + _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + cancel_reached_seam = threading.Event() + completion_committed = threading.Event() + + # Park the cancel right before it issues its first guarded UPDATE. 
Patching + # run_job.AsyncResult (called between the CANCELING write and the REVOKED + # write) would not give a clean seam, so seam on _guarded_status_update + # itself for the CANCELING transition. + real_guarded = Job._guarded_status_update + + def seamed_guarded(self_job, to_status, *args, **kwargs): + if to_status == JobState.CANCELING: + cancel_reached_seam.set() + if not completion_committed.wait(timeout=10): + raise AssertionError("completion did not commit within timeout") + return real_guarded(self_job, to_status, *args, **kwargs) + + cancel_error: list[BaseException] = [] + + def canceller(): + from django.db import connection + + try: + fresh = Job.objects.get(pk=self.job.pk) # reads STARTED + with patch.object(Job, "_guarded_status_update", autospec=True, side_effect=seamed_guarded): + fresh.cancel() + except BaseException as exc: # noqa: BLE001 + cancel_error.append(exc) + finally: + connection.close() + + cancel_thread = threading.Thread(target=canceller, name="canceller") + cancel_thread.start() + + try: + if not cancel_reached_seam.wait(timeout=10): + raise AssertionError("cancel never reached the seam") + # The result handler completes the job (commits SUCCESS) while the + # cancel is parked holding its stale STARTED read. + _update_job_progress( + self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS + ) + completion_committed.set() + finally: + cancel_thread.join(timeout=15) + + if cancel_thread.is_alive(): + raise AssertionError("cancel thread deadlocked") + if cancel_error: + raise cancel_error[0] + + self.job.refresh_from_db() + # SUCCESS committed first; the cancel's guarded CANCELING/REVOKED UPDATEs + # no-op against the now-terminal SUCCESS, so the completion is preserved. + self.assertEqual( + self.job.status, + JobState.SUCCESS.value, + f"stale cancel clobbered a committed SUCCESS to {self.job.status!r}", + ) + # The progress blob was not full-row clobbered: it is recorded complete.
+ self.assertTrue(self.job.progress.is_complete()) + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_cancel_committing_first_is_not_resurrected_by_late_completion(self, _mock_t_cleanup, _mock_m_cleanup): + """A cancel that commits REVOKED must not be resurrected to SUCCESS by a + result batch that read STARTED before the cancel committed.""" + # Bring the job to the brink of completion. + _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS) + _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + worker_reached_seam = threading.Event() + cancel_committed = threading.Event() + + real_get_counts = _update_job_progress.__globals__["_get_current_counts_from_job_progress"] + + def blocking_get_counts(job, stage): + counts = real_get_counts(job, stage) + worker_reached_seam.set() + if not cancel_committed.wait(timeout=10): + raise AssertionError("cancel did not commit within timeout") + return counts + + worker_error: list[BaseException] = [] + + def worker(): + from django.db import connection + + try: + with patch( + "ami.jobs.tasks._get_current_counts_from_job_progress", + side_effect=blocking_get_counts, + ): + _update_job_progress( + self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS + ) + except BaseException as exc: # noqa: BLE001 + worker_error.append(exc) + finally: + connection.close() + + worker_thread = threading.Thread(target=worker, name="late-results-worker") + worker_thread.start() + + try: + if not worker_reached_seam.wait(timeout=10): + raise AssertionError("worker never reached the seam") + # The cancel commits REVOKED while the result worker is parked. 
+ fresh = Job.objects.get(pk=self.job.pk) + fresh.cancel() + cancel_committed.set() + finally: + worker_thread.join(timeout=15) + + if worker_thread.is_alive(): + raise AssertionError("worker thread deadlocked") + if worker_error: + raise worker_error[0] + + self.job.refresh_from_db() + # REVOKED committed first; the late completion's guarded UPDATE no-ops, so + # the job is not resurrected to SUCCESS. + self.assertEqual( + self.job.status, + JobState.REVOKED.value, + f"late completion resurrected a REVOKED job to {self.job.status!r}", + )