Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 91 additions & 10 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,25 +1178,106 @@ def cancel(self):
Cancel a job. For async_api jobs, clean up NATS/Redis resources
and transition through CANCELING → REVOKED. For other jobs,
revoke the Celery task.

The CANCELING and REVOKED writes go through the guarded chokepoint
(issue #1337) so a cancel cannot clobber a status another writer already
moved to a terminal state. If the job has already completed (SUCCESS) or
otherwise left the cancellable states, the guarded UPDATE no-ops and we
leave the status untouched — but we still revoke the Celery task and run
the NATS/Redis teardown below, since those resources may need releasing
regardless of the final status.
"""
self.status = JobState.CANCELING
self.save()
# CANCELING is a transient marker: only advance into it from a still-active
# state, never from an already-terminal one (don't un-finish a finished job).
canceling = self._guarded_status_update(
JobState.CANCELING,
from_statuses=JobState.finalizable_states(),
)
if canceling:
self.save(update_fields=["progress", "updated_at"])

if self.task_id:
task = run_job.AsyncResult(self.task_id)
if task:
task.revoke(terminate=True)
if self.dispatch_mode == JobDispatchMode.ASYNC_API:
# For async jobs we need to set the status to revoked here since the task already
# finished (it only queues the images).
self.status = JobState.REVOKED
self.save()
# The local run_job only queues images and has almost always
# finished by now, so terminating it does nothing about the work
# running on the remote ADC workers — that is stopped by tearing
# down the NATS stream + Redis state in cleanup below. Revoke
# without terminate so we never SIGTERM the bootstrap mid-flight on
# the rare occasion it is still running. (folds antenna#1324)
if task:
task.revoke()
# Set the status to revoked here. CANCELING is included in the
# from-set so this completes the cancel's own CANCELING → REVOKED
# progression.
revoked = self._guarded_status_update(
JobState.REVOKED,
from_statuses=JobState.finalizable_states() + [JobState.CANCELING],
set_finished=True,
)
if revoked:
self.save(update_fields=["progress", "finished_at", "updated_at"])
elif task:
# Sync / internal jobs do the actual work inside the celery task,
# so terminating it is the cancel mechanism.
task.revoke(terminate=True)
else:
self.status = JobState.REVOKED
self.save()
revoked = self._guarded_status_update(
JobState.REVOKED,
# CANCELING included so this completes the cancel's own progression.
from_statuses=JobState.finalizable_states() + [JobState.CANCELING],
set_finished=True,
)
if revoked:
self.save(update_fields=["progress", "finished_at", "updated_at"])

cleanup_async_job_if_needed(self)

def _guarded_status_update(self, to_status, from_statuses, *, set_finished=False):
"""Atomically move the job to ``to_status`` only if it is still in one of
``from_statuses``.

This is a statement-scope ``UPDATE`` (it acquires no transaction-length
row lock, so it does not reintroduce the contention that #1261 removed)
whose ``status__in`` precondition a stale read-modify-write elsewhere
cannot clobber: if a concurrent writer has already moved the job to a
state outside ``from_statuses`` (e.g. another worker, the reaper, or a
cancel marked it terminal), the UPDATE matches zero rows and this call
is a no-op.

It is the chokepoint for the *lock-free* terminal writers — this method,
the result handler ``_update_job_progress`` (#1338), and the celery
task-signal handlers — which is why each carries the ``status__in``
precondition. The other two terminal writers take a row lock instead and
enforce the same no-resurrect invariant that way: ``_fail_job`` and the
stale-job reaper (``check_stale_jobs``) both run under
``select_for_update`` and check for a terminal/CANCELING status before
writing (the reaper deliberately keeps a broader from-set so it can still
force a genuinely stuck job out of CANCELING/UNKNOWN — that is its role as
last resort). See issue #1337.

Callers are responsible for persisting ``progress.summary.status`` into
the JSONB afterwards with a narrow ``save(update_fields=["progress",
"updated_at"])``, and only when this method reported the transition fired.

Returns the number of rows updated (0 or 1). On a successful transition
the in-memory instance is advanced to match (``status``, optionally
``finished_at``, and ``progress.summary.status``) so the caller can save
the progress blob without re-reading.
"""
fields = {"status": to_status}
now = None
if set_finished:
now = datetime.datetime.now() # Naive local time, matching finished_at writes elsewhere.
fields["finished_at"] = now
updated = Job.objects.filter(pk=self.pk, status__in=from_statuses).update(**fields)
if updated:
self.status = to_status
if set_finished:
self.finished_at = now
self.progress.summary.status = to_status
return updated

def update_status(self, status=None, save=True):
"""
Update the status of the job based on the status of the celery task.
Expand Down
30 changes: 27 additions & 3 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,21 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs):
)
return

job.update_status(state)
# Route the terminal SUCCESS write through the guarded chokepoint (issue
# #1337): a stale task_postrun must not resurrect a job another writer (a
# cancel, the reaper, or _update_job_progress) already moved to a terminal
# state. Non-terminal celery states (STARTED, RETRY, ...) still flow through
# the dual-use update_status() unchanged.
Comment thread
mihow marked this conversation as resolved.
Outdated
if state == JobState.SUCCESS:
transitioned = job._guarded_status_update(
JobState.SUCCESS,
from_statuses=JobState.finalizable_states(),
set_finished=True,
)
if transitioned:
job.save(update_fields=["progress", "finished_at", "updated_at"])
else:
job.update_status(state)

# Clean up async resources for revoked jobs
if state == JobState.REVOKED:
Expand Down Expand Up @@ -1432,11 +1446,21 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs):
)
return

job.update_status(JobState.FAILURE, save=False)
# Route the terminal FAILURE write through the guarded chokepoint (issue
# #1337): a run_job failure must not clobber a status another writer already
# moved to a terminal state (e.g. a job a cancel just REVOKED, or one the
# result handler already marked SUCCESS). If the guard no-ops we still log
# and run the teardown below.
Comment thread
mihow marked this conversation as resolved.
Outdated
transitioned = job._guarded_status_update(
JobState.FAILURE,
from_statuses=JobState.finalizable_states(),
set_finished=True,
)

job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}')

job.save()
if transitioned:
job.save(update_fields=["progress", "finished_at", "updated_at"])

# Clean up async resources for failed jobs
cleanup_async_job_if_needed(job)
Expand Down
Loading
Loading