Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 99 additions & 10 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,25 +1178,114 @@ def cancel(self):
Cancel a job. For async_api jobs, clean up NATS/Redis resources
and transition through CANCELING → REVOKED. For other jobs,
revoke the Celery task.

The CANCELING and REVOKED writes go through the guarded chokepoint
(issue #1337) so a cancel cannot clobber a status another writer already
moved to a terminal state. If the job has already completed (SUCCESS) or
otherwise left the cancellable states, the guarded UPDATE no-ops and we
leave the status untouched — but we still revoke the Celery task and run
the NATS/Redis teardown below, since those resources may need releasing
regardless of the final status.
"""
self.status = JobState.CANCELING
self.save()
# CANCELING is a transient marker: only advance into it from a still-active
# state, never from an already-terminal one (don't un-finish a finished job).
canceling = self._guarded_status_update(
JobState.CANCELING,
from_statuses=JobState.finalizable_states(),
)
if canceling:
self.save(update_fields=["progress", "updated_at"])

if self.task_id:
task = run_job.AsyncResult(self.task_id)
if task:
task.revoke(terminate=True)
if self.dispatch_mode == JobDispatchMode.ASYNC_API:
# For async jobs we need to set the status to revoked here since the task already
# finished (it only queues the images).
self.status = JobState.REVOKED
self.save()
# The local run_job only queues images and has almost always
# finished by now, so terminating it does nothing about the work
# running on the remote ADC workers — that is stopped by tearing
# down the NATS stream + Redis state in cleanup below. Revoke
# without terminate so we never SIGTERM the bootstrap mid-flight on
# the rare occasion it is still running. (folds antenna#1324)
if task:
task.revoke()
# Set the status to revoked here. CANCELING is included in the
# from-set so this completes the cancel's own CANCELING → REVOKED
# progression.
revoked = self._guarded_status_update(
JobState.REVOKED,
from_statuses=JobState.finalizable_states() + [JobState.CANCELING],
set_finished=True,
)
if revoked:
self.save(update_fields=["progress", "finished_at", "updated_at"])
elif task:
# Sync / internal jobs do the actual work inside the celery task,
# so terminating it is the cancel mechanism.
task.revoke(terminate=True)
else:
self.status = JobState.REVOKED
self.save()
revoked = self._guarded_status_update(
JobState.REVOKED,
# CANCELING included so this completes the cancel's own progression.
from_statuses=JobState.finalizable_states() + [JobState.CANCELING],
set_finished=True,
)
if revoked:
self.save(update_fields=["progress", "finished_at", "updated_at"])

cleanup_async_job_if_needed(self)

def _guarded_status_update(self, to_status, from_statuses, *, set_finished=False):
"""Move the job to ``to_status`` only if it is still in one of
``from_statuses``, in a single atomic step.

Why this exists: a job's status is written from several places, and for
async (pull-based) jobs the result batches that drive those writes arrive
concurrently. Without a guard each writer reads the whole job row, edits
it, and writes it back, so a slower writer working from a stale copy can
overwrite a status another writer already advanced. That made a finished
job look like it was running again, or flipped a cancelled/failed job to
SUCCESS. Because a lot of behaviour keys off job status — whether the job
is still handed out for more work, whether its resources get cleaned up,
what the UI shows — those wrong statuses were not cosmetic; they caused
knock-on bugs. See issue #1337.

The guard is a single conditional ``UPDATE ... WHERE status IN
(from_statuses)``. It holds no transaction-length row lock (so it does not
reintroduce the contention #1261 removed): if a concurrent writer has
already moved the job out of ``from_statuses``, the UPDATE matches zero
rows and this call is a no-op, leaving that writer's status intact.

The lock-free status writers all go through this method — it, the result
handler ``_update_job_progress`` (#1338), and the two Celery task-signal
handlers — which is why each passes a ``from_statuses`` precondition. The
remaining two writers enforce the same "don't revive a finished job" rule
under a row lock instead: ``_fail_job`` and the stale-job reaper
(``check_stale_jobs``) run under ``select_for_update`` and check the
current status before writing. The reaper deliberately keeps a broader
from-set so it can still force a genuinely stuck job out of
CANCELING/UNKNOWN — that is its role as the last resort.

Caller contract: persist ``progress.summary.status`` into the JSONB
afterwards with a narrow ``save(update_fields=["progress", "updated_at"])``,
and only when this method reports the transition fired.

Returns the number of rows updated (0 or 1). On a successful transition
the in-memory instance is advanced to match (``status``, optionally
``finished_at``, and ``progress.summary.status``) so the caller can save
the progress blob without re-reading.
"""
fields = {"status": to_status}
now = None
if set_finished:
now = datetime.datetime.now() # Naive local time, matching finished_at writes elsewhere.
fields["finished_at"] = now
updated = Job.objects.filter(pk=self.pk, status__in=from_statuses).update(**fields)
if updated:
self.status = to_status
if set_finished:
self.finished_at = now
self.progress.summary.status = to_status
return updated

def update_status(self, status=None, save=True):
"""
Update the status of the job based on the status of the celery task.
Expand Down
30 changes: 27 additions & 3 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,21 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs):
)
return

job.update_status(state)
# Write the terminal SUCCESS through the guarded helper (see #1337): it only
# transitions a job that is still in a non-terminal state, so a slow
# task_postrun cannot revive a job that a cancel, the stale-job reaper, or the
# result handler already finished. Non-terminal Celery states (STARTED,
# RETRY, ...) still go through update_status() unchanged.
if state == JobState.SUCCESS:
transitioned = job._guarded_status_update(
JobState.SUCCESS,
from_statuses=JobState.finalizable_states(),
set_finished=True,
)
if transitioned:
job.save(update_fields=["progress", "finished_at", "updated_at"])
else:
job.update_status(state)

# Clean up async resources for revoked jobs
if state == JobState.REVOKED:
Expand Down Expand Up @@ -1432,11 +1446,21 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs):
)
return

job.update_status(JobState.FAILURE, save=False)
# Write the terminal FAILURE through the guarded helper (see #1337): it only
# transitions a job that is still in a non-terminal state, so a run_job
# failure cannot overwrite a status another writer already finished (e.g. a
# job a cancel just REVOKED, or one the result handler marked SUCCESS). If the
# guard makes no change we still log the error and run the teardown below.
transitioned = job._guarded_status_update(
JobState.FAILURE,
from_statuses=JobState.finalizable_states(),
set_finished=True,
)

job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}')

job.save()
if transitioned:
job.save(update_fields=["progress", "finished_at", "updated_at"])

# Clean up async resources for failed jobs
cleanup_async_job_if_needed(job)
Expand Down
Loading
Loading