From d0b80f1697b072ec7ec9a179a15c2543fffa8c21 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 18 Jun 2026 17:33:15 -0700 Subject: [PATCH 1/4] fix(jobs): route all terminal status writes through one guarded chokepoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #1337 is a lost-update race on the job status column. PR #1338 fixed one writer — `_update_job_progress` — by splitting the terminal status write out of the progress-blob save and performing it as a guarded, statement-scope UPDATE that only fires from a pre-terminal status. The other four terminal writers still did an unguarded full-row `save()` and could clobber a terminal status from the opposite direction: a cancel could overwrite a just-committed SUCCESS with REVOKED, and a stale `task_postrun` SUCCESS or `task_failure` FAILURE could resurrect a job another writer had already revoked. This change adds a single `Job._guarded_status_update(to_status, from_statuses, *, set_finished=False)` helper that performs the guarded UPDATE (no row lock, so it does not reintroduce the contention #1261 removed) and advances the in-memory instance only when the transition actually fires. The remaining terminal writers are routed through it: - `Job.cancel()`: CANCELING and REVOKED are now guarded UPDATEs. The `task.revoke()` and `cleanup_async_job_if_needed()` calls still run regardless of whether the guard fired, since a job may already be terminal but still need its NATS/Redis resources released. - `update_job_status` (task_postrun): only the terminal SUCCESS path is guarded; non-terminal celery states still flow through the dual-use `update_status()` unchanged. - `update_job_failure` (task_failure): the terminal FAILURE write is guarded, keeping the existing in-flight-async deferral guard intact. `_update_job_progress` and `_fail_job` are left as-is: the former is already guarded by #1338, and the latter is already safe via `select_for_update` plus a status precondition. After a guarded transition, callers persist `progress.summary.status` into the JSONB with a narrow `save(update_fields=["progress", ...])` rather than a full save, matching #1338 and avoiding clobbering other columns. The save only happens when the guard fired, so an already-terminal job keeps both its status column and its summary.status. One intentional behavior change: `update_job_failure` now sets `finished_at` when it marks FAILURE (it previously left it unset), making a failed terminal job consistent with `_fail_job` and the result handler. Adds sequential regression tests (postrun/failure cannot resurrect a REVOKED job; cancel of an already-SUCCESS job no-ops on status but still cleans up) and two real-concurrency tests that interleave cancel against a completing result batch in both directions. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/models.py | 77 ++++++++- ami/jobs/tasks.py | 30 +++- ami/jobs/tests/test_tasks.py | 293 +++++++++++++++++++++++++++++++++++ 3 files changed, 390 insertions(+), 10 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 784b1e8ab..c19ca56a0 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -1178,9 +1178,23 @@ def cancel(self): Cancel a job. For async_api jobs, clean up NATS/Redis resources and transition through CANCELING → REVOKED. For other jobs, revoke the Celery task. + + The CANCELING and REVOKED writes go through the guarded chokepoint + (issue #1337) so a cancel cannot clobber a status another writer already + moved to a terminal state. If the job has already completed (SUCCESS) or + otherwise left the cancellable states, the guarded UPDATE no-ops and we + leave the status untouched — but we still revoke the Celery task and run + the NATS/Redis teardown below, since those resources may need releasing + regardless of the final status. """ - self.status = JobState.CANCELING - self.save() + # CANCELING is a transient marker: only advance into it from a still-active + # state, never from an already-terminal one (don't un-finish a finished job). + canceling = self._guarded_status_update( + JobState.CANCELING, + from_statuses=JobState.finalizable_states(), + ) + if canceling: + self.save(update_fields=["progress", "updated_at"]) if self.task_id: task = run_job.AsyncResult(self.task_id) @@ -1188,15 +1202,64 @@ def cancel(self): task.revoke(terminate=True) if self.dispatch_mode == JobDispatchMode.ASYNC_API: # For async jobs we need to set the status to revoked here since the task already - # finished (it only queues the images). - self.status = JobState.REVOKED - self.save() + # finished (it only queues the images). CANCELING is included in the from-set so + # this completes the cancel's own CANCELING → REVOKED progression. + revoked = self._guarded_status_update( + JobState.REVOKED, + # CANCELING included so this completes the cancel's own progression. + from_statuses=JobState.finalizable_states() + [JobState.CANCELING], + set_finished=True, + ) + if revoked: + self.save(update_fields=["progress", "finished_at", "updated_at"]) else: - self.status = JobState.REVOKED - self.save() + revoked = self._guarded_status_update( + JobState.REVOKED, + # CANCELING included so this completes the cancel's own progression. + from_statuses=JobState.finalizable_states() + [JobState.CANCELING], + set_finished=True, + ) + if revoked: + self.save(update_fields=["progress", "finished_at", "updated_at"]) cleanup_async_job_if_needed(self) + def _guarded_status_update(self, to_status, from_statuses, *, set_finished=False): + """Atomically move the job to ``to_status`` only if it is still in one of + ``from_statuses``. + + This is a statement-scope ``UPDATE`` (it acquires no transaction-length + row lock, so it does not reintroduce the contention that #1261 removed) + whose ``status__in`` precondition a stale read-modify-write elsewhere + cannot clobber: if a concurrent writer has already moved the job to a + state outside ``from_statuses`` (e.g. another worker, the reaper, or a + cancel marked it terminal), the UPDATE matches zero rows and this call + is a no-op. + + It is the single chokepoint for terminal status writes (issue #1337) and + mirrors the guard added to ``_update_job_progress`` in #1338. Callers are + responsible for persisting ``progress.summary.status`` into the JSONB + afterwards with a narrow ``save(update_fields=["progress", "updated_at"])``, + and only when this method reported that the transition fired. + + Returns the number of rows updated (0 or 1). On a successful transition + the in-memory instance is advanced to match (``status``, optionally + ``finished_at``, and ``progress.summary.status``) so the caller can save + the progress blob without re-reading. + """ + fields = {"status": to_status} + now = None + if set_finished: + now = datetime.datetime.now() # Naive local time, matching finished_at writes elsewhere. + fields["finished_at"] = now + updated = Job.objects.filter(pk=self.pk, status__in=from_statuses).update(**fields) + if updated: + self.status = to_status + if set_finished: + self.finished_at = now + self.progress.summary.status = to_status + return updated + def update_status(self, status=None, save=True): """ Update the status of the job based on the status of the celery task. diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index dbf57fa38..8c69ede4e 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -1404,7 +1404,21 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs): ) return - job.update_status(state) + # Route the terminal SUCCESS write through the guarded chokepoint (issue + # #1337): a stale task_postrun must not resurrect a job another writer (a + # cancel, the reaper, or _update_job_progress) already moved to a terminal + # state. Non-terminal celery states (STARTED, RETRY, ...) still flow through + # the dual-use update_status() unchanged. + if state == JobState.SUCCESS: + transitioned = job._guarded_status_update( + JobState.SUCCESS, + from_statuses=JobState.finalizable_states(), + set_finished=True, + ) + if transitioned: + job.save(update_fields=["progress", "finished_at", "updated_at"]) + else: + job.update_status(state) # Clean up async resources for revoked jobs if state == JobState.REVOKED: @@ -1432,11 +1446,21 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): ) return - job.update_status(JobState.FAILURE, save=False) + # Route the terminal FAILURE write through the guarded chokepoint (issue + # #1337): a run_job failure must not clobber a status another writer already + # moved to a terminal state (e.g. a job a cancel just REVOKED, or one the + # result handler already marked SUCCESS). If the guard no-ops we still log + # and run the teardown below. + transitioned = job._guarded_status_update( + JobState.FAILURE, + from_statuses=JobState.finalizable_states(), + set_finished=True, + ) job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}') - job.save() + if transitioned: + job.save(update_fields=["progress", "finished_at", "updated_at"]) # Clean up async resources for failed jobs cleanup_async_job_if_needed(job) diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 8dc99da29..760af1809 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -1469,3 +1469,296 @@ def worker(): JobState.REVOKED.value, f"late results batch resurrected a REVOKED job to {self.job.status!r} (lost-update race)", ) + + +class TestTerminalTransitionChokepoint(TransactionTestCase): + """Regression tests for issue #1337, terminal-writer hardening. + + #1338 guarded the terminal write in ``_update_job_progress``. The other + terminal writers — ``Job.cancel()``, the ``task_postrun`` SUCCESS handler, + and the ``task_failure`` FAILURE handler — used to do an unguarded full-row + ``save()`` and could clobber a terminal status set by another writer. These + tests cover the sequential (non-interleaved) guarantees; the real concurrent + interleave for cancel-vs-complete is in ``TestCancelCompletionRace`` below. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Chokepoint Project") + self.pipeline = Pipeline.objects.create(name="CP Pipeline", slug="cp-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="CP Collection", project=self.project) + + def tearDown(self): + cache.clear() + + def _make_job(self, *, status=JobState.STARTED, task_id=None, dispatch_mode=JobDispatchMode.ASYNC_API) -> Job: + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Chokepoint Job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=dispatch_mode, + status=status, + ) + if task_id is not None: + job.task_id = task_id + job.save(update_fields=["task_id"]) + return job + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_postrun_success_cannot_resurrect_revoked_job(self, _mock_cleanup): + """A late ``task_postrun`` SUCCESS must not flip a REVOKED job to SUCCESS.""" + from ami.jobs.tasks import update_job_status + + job = self._make_job(status=JobState.STARTED, task_id="postrun-revoked-task") + # Drive all stages complete so the postrun SUCCESS guard (is_complete()) + # does not defer, exercising the terminal write path itself. + for stage in ("collect", "process", "results"): + _update_job_progress(job.pk, stage=stage, progress_percentage=1.0, complete_state=JobState.SUCCESS) + # Another writer (a cancel / the reaper) marks the job terminal first. + Job.objects.filter(pk=job.pk).update(status=JobState.REVOKED) + + task = MagicMock() + task.request.kwargs = {"job_id": job.pk} + update_job_status(sender=None, task_id="postrun-revoked-task", task=task, state=JobState.SUCCESS) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_postrun_success_fires_from_active_state(self, _mock_cleanup): + """The happy path still transitions an active job to SUCCESS with finished_at.""" + from ami.jobs.tasks import update_job_status + + job = self._make_job(status=JobState.STARTED, task_id="postrun-success-task") + for stage in ("collect", "process", "results"): + _update_job_progress(job.pk, stage=stage, progress_percentage=1.0, complete_state=JobState.SUCCESS) + # Reset the column to a pre-terminal state so we can observe the postrun + # handler making the SUCCESS transition itself. + Job.objects.filter(pk=job.pk).update(status=JobState.STARTED, finished_at=None) + + task = MagicMock() + task.request.kwargs = {"job_id": job.pk} + update_job_status(sender=None, task_id="postrun-success-task", task=task, state=JobState.SUCCESS) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.SUCCESS.value) + self.assertIsNotNone(job.finished_at) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_task_failure_cannot_resurrect_revoked_job(self, _mock_cleanup): + """A late ``task_failure`` FAILURE must not clobber an already-REVOKED job.""" + from ami.jobs.tasks import update_job_failure + + # SYNC_API so the in-flight async deferral guard does not short-circuit; + # we want to exercise the terminal FAILURE write itself. + job = self._make_job( + status=JobState.STARTED, task_id="failure-revoked-task", dispatch_mode=JobDispatchMode.SYNC_API + ) + Job.objects.filter(pk=job.pk).update(status=JobState.REVOKED) + + update_job_failure(sender=None, task_id="failure-revoked-task", exception=RuntimeError("late crash")) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + def test_cancel_of_already_success_job_is_noop_but_still_cleans_up(self, mock_cleanup): + """cancel() on a SUCCESS job leaves the status SUCCESS but still runs teardown. + + A job can complete (result handler sets SUCCESS) before an operator's + cancel lands. The guarded UPDATEs no-op (SUCCESS is outside their + from-sets), so the status is not regressed to CANCELING/REVOKED — but the + NATS/Redis teardown must still run, since a cancel is also the operator's + request to release any lingering async resources. + """ + job = self._make_job(status=JobState.SUCCESS, dispatch_mode=JobDispatchMode.ASYNC_API) + + job.cancel() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.SUCCESS.value) + mock_cleanup.assert_called_once() + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + def test_cancel_of_active_async_job_revokes(self, mock_cleanup): + """cancel() on an active ASYNC_API job (no celery task) transitions to REVOKED.""" + job = self._make_job(status=JobState.STARTED, dispatch_mode=JobDispatchMode.ASYNC_API) + + job.cancel() + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED.value) + self.assertIsNotNone(job.finished_at) + # Both sources of truth agree on the terminal state. + self.assertEqual(job.progress.summary.status, JobState.REVOKED) + mock_cleanup.assert_called_once() + + +class TestCancelCompletionRace(TransactionTestCase): + """Force the real concurrent interleave between cancel() and a completing + result batch (issue #1337). Mirrors ``TestConcurrentStatusRace``: a seam + parks one path mid-flight while the other commits its guarded UPDATE. + + The invariant: whichever path commits its guarded terminal UPDATE first + wins, and the loser's guard no-ops rather than clobbering with a full-row + save. A SUCCESS is never overwritten to REVOKED by a stale cancel, and a + REVOKED is never resurrected to SUCCESS by a late completion. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Cancel Race Project") + self.pipeline = Pipeline.objects.create(name="CRR Pipeline", slug="crr-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="CRR Collection", project=self.project) + self.job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Cancel Race Job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.ASYNC_API, + status=JobState.STARTED, + ) + + def tearDown(self): + cache.clear() + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_completion_committing_first_is_not_overwritten_by_stale_cancel(self, _mock_t_cleanup, _mock_m_cleanup): + """A cancel that read STARTED, then parked, must not regress a status the + result handler committed to SUCCESS while the cancel was parked.""" + # Bring the job to the brink of completion: collect + process done. + _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS) + _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + cancel_reached_seam = threading.Event() + completion_committed = threading.Event() + + # Park the cancel right before it issues its first guarded UPDATE. By + # patching run_job.AsyncResult (called between the CANCELING write and the + # REVOKED write) we can't seam cleanly; instead seam on _guarded_status_update + # itself for the CANCELING transition. + real_guarded = Job._guarded_status_update + + def seamed_guarded(self_job, to_status, *args, **kwargs): + if to_status == JobState.CANCELING: + cancel_reached_seam.set() + if not completion_committed.wait(timeout=10): + raise AssertionError("completion did not commit within timeout") + return real_guarded(self_job, to_status, *args, **kwargs) + + cancel_error: list[BaseException] = [] + + def canceller(): + from django.db import connection + + try: + fresh = Job.objects.get(pk=self.job.pk) # reads STARTED + with patch.object(Job, "_guarded_status_update", autospec=True, side_effect=seamed_guarded): + fresh.cancel() + except BaseException as exc: # noqa: BLE001 + cancel_error.append(exc) + finally: + connection.close() + + cancel_thread = threading.Thread(target=canceller, name="canceller") + cancel_thread.start() + + try: + if not cancel_reached_seam.wait(timeout=10): + raise AssertionError("cancel never reached the seam") + # The result handler completes the job (commits SUCCESS) while the + # cancel is parked holding its stale STARTED read. + _update_job_progress( + self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS + ) + completion_committed.set() + finally: + cancel_thread.join(timeout=15) + + if cancel_thread.is_alive(): + raise AssertionError("cancel thread deadlocked") + if cancel_error: + raise cancel_error[0] + + self.job.refresh_from_db() + # SUCCESS committed first; the cancel's guarded CANCELING/REVOKED UPDATEs + # no-op against the now-terminal SUCCESS, so the completion is preserved. + self.assertEqual( + self.job.status, + JobState.SUCCESS.value, + f"stale cancel clobbered a committed SUCCESS to {self.job.status!r}", + ) + # The progress blob was not full-row clobbered: it is recorded complete. + self.assertTrue(self.job.progress.is_complete()) + + @patch("ami.jobs.models.cleanup_async_job_if_needed") + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_cancel_committing_first_is_not_resurrected_by_late_completion(self, _mock_t_cleanup, _mock_m_cleanup): + """A cancel that commits REVOKED must not be resurrected to SUCCESS by a + result batch that read STARTED before the cancel committed.""" + # Bring the job to the brink of completion. + _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS) + _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + worker_reached_seam = threading.Event() + cancel_committed = threading.Event() + + real_get_counts = _update_job_progress.__globals__["_get_current_counts_from_job_progress"] + + def blocking_get_counts(job, stage): + counts = real_get_counts(job, stage) + worker_reached_seam.set() + if not cancel_committed.wait(timeout=10): + raise AssertionError("cancel did not commit within timeout") + return counts + + worker_error: list[BaseException] = [] + + def worker(): + from django.db import connection + + try: + with patch( + "ami.jobs.tasks._get_current_counts_from_job_progress", + side_effect=blocking_get_counts, + ): + _update_job_progress( + self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS + ) + except BaseException as exc: # noqa: BLE001 + worker_error.append(exc) + finally: + connection.close() + + worker_thread = threading.Thread(target=worker, name="late-results-worker") + worker_thread.start() + + try: + if not worker_reached_seam.wait(timeout=10): + raise AssertionError("worker never reached the seam") + # The cancel commits REVOKED while the result worker is parked. + fresh = Job.objects.get(pk=self.job.pk) + fresh.cancel() + cancel_committed.set() + finally: + worker_thread.join(timeout=15) + + if worker_thread.is_alive(): + raise AssertionError("worker thread deadlocked") + if worker_error: + raise worker_error[0] + + self.job.refresh_from_db() + # REVOKED committed first; the late completion's guarded UPDATE no-ops, so + # the job is not resurrected to SUCCESS. + self.assertEqual( + self.job.status, + JobState.REVOKED.value, + f"late completion resurrected a REVOKED job to {self.job.status!r}", + ) From f112e6d71d2358d14421c666d1f3bb162a62518e Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 19 Jun 2026 18:01:45 -0700 Subject: [PATCH 2/4] refactor(jobs): correct chokepoint claim and fold async skip-terminate into cancel - The reaper (check_stale_jobs) is a 6th terminal-status writer, lock-based, not routed through _guarded_status_update. Correct the docstring's false 'single chokepoint' claim: this helper is the chokepoint for the lock-free writers; _fail_job and the reaper enforce the same no-resurrect invariant under select_for_update (the reaper deliberately keeps a broader from-set so it can still force a stuck CANCELING/UNKNOWN job terminal as last resort). - Fold the one useful change from #1324: cancel() of an ASYNC_API job now revokes the local run_job WITHOUT terminate. That task only queues images and has usually finished; the remote ADC work is stopped by the NATS/Redis teardown, not by SIGTERM-ing the bootstrap. Sync/internal jobs still terminate. Refs #1337. Supersedes the cancel rewrite in #1324. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/models.py | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index c19ca56a0..7559a352f 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -1198,20 +1198,29 @@ def cancel(self): if self.task_id: task = run_job.AsyncResult(self.task_id) - if task: - task.revoke(terminate=True) if self.dispatch_mode == JobDispatchMode.ASYNC_API: - # For async jobs we need to set the status to revoked here since the task already - # finished (it only queues the images). CANCELING is included in the from-set so - # this completes the cancel's own CANCELING → REVOKED progression. + # The local run_job only queues images and has almost always + # finished by now, so terminating it does nothing about the work + # running on the remote ADC workers — that is stopped by tearing + # down the NATS stream + Redis state in cleanup below. Revoke + # without terminate so we never SIGTERM the bootstrap mid-flight on + # the rare occasion it is still running. (folds antenna#1324) + if task: + task.revoke() + # Set the status to revoked here. CANCELING is included in the + # from-set so this completes the cancel's own CANCELING → REVOKED + # progression. revoked = self._guarded_status_update( JobState.REVOKED, - # CANCELING included so this completes the cancel's own progression. from_statuses=JobState.finalizable_states() + [JobState.CANCELING], set_finished=True, ) if revoked: self.save(update_fields=["progress", "finished_at", "updated_at"]) + elif task: + # Sync / internal jobs do the actual work inside the celery task, + # so terminating it is the cancel mechanism. + task.revoke(terminate=True) else: revoked = self._guarded_status_update( JobState.REVOKED, @@ -1236,11 +1245,20 @@ def _guarded_status_update(self, to_status, from_statuses, *, set_finished=False cancel marked it terminal), the UPDATE matches zero rows and this call is a no-op. - It is the single chokepoint for terminal status writes (issue #1337) and - mirrors the guard added to ``_update_job_progress`` in #1338. Callers are - responsible for persisting ``progress.summary.status`` into the JSONB - afterwards with a narrow ``save(update_fields=["progress", "updated_at"])``, - and only when this method reported that the transition fired. + It is the chokepoint for the *lock-free* terminal writers — this method, + the result handler ``_update_job_progress`` (#1338), and the celery + task-signal handlers — which is why each carries the ``status__in`` + precondition. The other two terminal writers take a row lock instead and + enforce the same no-resurrect invariant that way: ``_fail_job`` and the + stale-job reaper (``check_stale_jobs``) both run under + ``select_for_update`` and check for a terminal/CANCELING status before + writing (the reaper deliberately keeps a broader from-set so it can still + force a genuinely stuck job out of CANCELING/UNKNOWN — that is its role as + last resort). See issue #1337. + + Callers are responsible for persisting ``progress.summary.status`` into + the JSONB afterwards with a narrow ``save(update_fields=["progress", + "updated_at"])``, and only when this method reported the transition fired. Returns the number of rows updated (0 or 1). On a successful transition the in-memory instance is advanced to match (``status``, optionally From f3cf90a5f9b58a6f1b030eef1eca2af923f9e677 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 22 Jun 2026 11:55:30 -0700 Subject: [PATCH 3/4] docs(jobs): clarify guarded-write comments per review Fix the double '#' in the issue reference and reword the task_postrun / task_failure comments in plainer, team-readable terms (drop session shorthand): describe the guarded write as only transitioning a job still in a non-terminal state. --- ami/jobs/tasks.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 8c69ede4e..b00213ebb 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -1404,11 +1404,11 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs): ) return - # Route the terminal SUCCESS write through the guarded chokepoint (issue - # #1337): a stale task_postrun must not resurrect a job another writer (a - # cancel, the reaper, or _update_job_progress) already moved to a terminal - # state. Non-terminal celery states (STARTED, RETRY, ...) still flow through - # the dual-use update_status() unchanged. + # Write the terminal SUCCESS through the guarded helper (see #1337): it only + # transitions a job that is still in a non-terminal state, so a slow + # task_postrun cannot revive a job that a cancel, the stale-job reaper, or the + # result handler already finished. Non-terminal Celery states (STARTED, + # RETRY, ...) still go through update_status() unchanged. if state == JobState.SUCCESS: transitioned = job._guarded_status_update( JobState.SUCCESS, @@ -1446,11 +1446,11 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs): ) return - # Route the terminal FAILURE write through the guarded chokepoint (issue - # #1337): a run_job failure must not clobber a status another writer already - # moved to a terminal state (e.g. a job a cancel just REVOKED, or one the - # result handler already marked SUCCESS). If the guard no-ops we still log - # and run the teardown below. + # Write the terminal FAILURE through the guarded helper (see #1337): it only + # transitions a job that is still in a non-terminal state, so a run_job + # failure cannot overwrite a status another writer already finished (e.g. a + # job a cancel just REVOKED, or one the result handler marked SUCCESS). If the + # guard makes no change we still log the error and run the teardown below. transitioned = job._guarded_status_update( JobState.FAILURE, from_statuses=JobState.finalizable_states(), From 51b8ec71c9730059aa4d15032f5b977ca0627be1 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 22 Jun 2026 16:30:17 -0700 Subject: [PATCH 4/4] docs(jobs): lead _guarded_status_update docstring with the user-facing effect Rewrite the helper's docstring to open with what it prevents (concurrent, stale status writes flipping a finished job back to running or a cancelled/failed job to SUCCESS, plus the knock-on bugs since job status drives claimability, cleanup, and the UI) before the implementation, and in plainer prose. --- ami/jobs/models.py | 58 ++++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 7559a352f..e662707f4 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -1234,31 +1234,39 @@ def cancel(self): cleanup_async_job_if_needed(self) def _guarded_status_update(self, to_status, from_statuses, *, set_finished=False): - """Atomically move the job to ``to_status`` only if it is still in one of - ``from_statuses``. - - This is a statement-scope ``UPDATE`` (it acquires no transaction-length - row lock, so it does not reintroduce the contention that #1261 removed) - whose ``status__in`` precondition a stale read-modify-write elsewhere - cannot clobber: if a concurrent writer has already moved the job to a - state outside ``from_statuses`` (e.g. another worker, the reaper, or a - cancel marked it terminal), the UPDATE matches zero rows and this call - is a no-op. - - It is the chokepoint for the *lock-free* terminal writers — this method, - the result handler ``_update_job_progress`` (#1338), and the celery - task-signal handlers — which is why each carries the ``status__in`` - precondition. The other two terminal writers take a row lock instead and - enforce the same no-resurrect invariant that way: ``_fail_job`` and the - stale-job reaper (``check_stale_jobs``) both run under - ``select_for_update`` and check for a terminal/CANCELING status before - writing (the reaper deliberately keeps a broader from-set so it can still - force a genuinely stuck job out of CANCELING/UNKNOWN — that is its role as - last resort). See issue #1337. - - Callers are responsible for persisting ``progress.summary.status`` into - the JSONB afterwards with a narrow ``save(update_fields=["progress", - "updated_at"])``, and only when this method reported the transition fired. + """Move the job to ``to_status`` only if it is still in one of + ``from_statuses``, in a single atomic step. + + Why this exists: a job's status is written from several places, and for + async (pull-based) jobs the result batches that drive those writes arrive + concurrently. Without a guard each writer reads the whole job row, edits + it, and writes it back, so a slower writer working from a stale copy can + overwrite a status another writer already advanced. That made a finished + job look like it was running again, or flipped a cancelled/failed job to + SUCCESS. Because a lot of behaviour keys off job status — whether the job + is still handed out for more work, whether its resources get cleaned up, + what the UI shows — those wrong statuses were not cosmetic; they caused + knock-on bugs. See issue #1337. + + The guard is a single conditional ``UPDATE ... WHERE status IN + (from_statuses)``. It holds no transaction-length row lock (so it does not + reintroduce the contention #1261 removed): if a concurrent writer has + already moved the job out of ``from_statuses``, the UPDATE matches zero + rows and this call is a no-op, leaving that writer's status intact. + + The lock-free status writers all go through this method — it, the result + handler ``_update_job_progress`` (#1338), and the two Celery task-signal + handlers — which is why each passes a ``from_statuses`` precondition. The + remaining two writers enforce the same "don't revive a finished job" rule + under a row lock instead: ``_fail_job`` and the stale-job reaper + (``check_stale_jobs``) run under ``select_for_update`` and check the + current status before writing. The reaper deliberately keeps a broader + from-set so it can still force a genuinely stuck job out of + CANCELING/UNKNOWN — that is its role as the last resort. + + Caller contract: persist ``progress.summary.status`` into the JSONB + afterwards with a narrow ``save(update_fields=["progress", "updated_at"])``, + and only when this method reports the transition fired. Returns the number of rows updated (0 or 1). On a successful transition the in-memory instance is advanced to match (``status``, optionally