Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 90 additions & 28 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,10 +975,15 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[
healthy. Default cutoff is :attr:`Job.STALLED_JOBS_MAX_MINUTES`.

For each stale job, checks Celery for a terminal task status. REVOKED is
always trusted. For async_api jobs, SUCCESS and FAILURE are only accepted
when job.progress.is_complete() — NATS workers may still be delivering
results after the Celery task finishes. All other cases result in revocation.
Async resources (NATS/Redis) are cleaned up in both branches.
always trusted. For async_api jobs, only SUCCESS is fast-pathed to a
terminal status, and only when AsyncJobStateManager.all_tasks_processed()
reports True (i.e. the Redis pending sets are drained); when Redis state is
unavailable it falls back to job.progress.is_complete(). Celery FAILURE is
not trusted for async_api jobs — update_job_failure() defers the terminal
outcome to the async result handler (FAILURE_THRESHOLD logic), so a stale
FAILED async_api job is revoked rather than forced to FAILURE. All other
cases result in revocation. Async resources (NATS/Redis) are cleaned up in
both branches.

Returns a list of dicts describing what was done to each job.
"""
Expand All @@ -1003,6 +1008,76 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[

results = []
for pk in stale_pks:
# Phase 1 — gather external Celery/Redis state WITHOUT holding the row
# lock. Both the Celery result lookup and the Redis completeness check
# are network round-trips; doing them under select_for_update() would
# extend the lock hold time and block the lock-free result handler that
# also writes this row (its guarded UPDATE since #1261). We re-check and
# write under a short lock in Phase 2 below.
try:
job = Job.objects.get(
pk=pk,
status__in=JobState.running_states(),
updated_at__lt=cutoff,
)
except Job.DoesNotExist:
# Another concurrent run already handled this job, or it progressed.
continue

celery_state = None
if job.task_id:
try:
celery_state = AsyncResult(job.task_id).state
except Exception:
logger.warning(
"Failed to fetch Celery state for stale job %s (task_id=%s)",
job.pk,
job.task_id,
exc_info=True,
)
# Treat as unknown state — job will be revoked below.

# Only trust terminal Celery states. For async_api jobs, a SUCCESS
# Celery state is only accepted when all NATS tasks are processed —
# workers may still be delivering results after the Celery task
# finishes. Consult Redis (source of truth for SREM completeness)
# directly rather than Job.progress.is_complete(), which mirrors a
# JSONB blob racy under concurrent _update_job_progress writes since
# #1261.
#
# Celery FAILURE is deliberately NOT fast-pathed to a terminal
# status here: update_job_failure() defers post-queue run_job
# failures for async_api jobs to the async result handler, which
# decides the terminal outcome from the final processed/failed
# counts against FAILURE_THRESHOLD (a drained-but-failed Celery task
# can still resolve to SUCCESS). Trusting Celery FAILURE here would
# force the job to FAILURE and bypass that threshold logic, so a
# stale async_api job whose Celery task ended FAILURE falls through
# to the revoke branch instead.
is_terminal = celery_state in states.READY_STATES
is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API
if is_async_api and celery_state == states.SUCCESS:
processed = AsyncJobStateManager(job.pk).all_tasks_processed()
if processed is False:
is_terminal = False
elif processed is None:
logger.warning(
"Reaper for job %s: Redis state unavailable, falling back to " "progress.is_complete()",
job.pk,
)
if not job.progress.is_complete():
is_terminal = False
# processed is True -> trust Celery SUCCESS
elif is_async_api and celery_state == states.FAILURE:
# Don't treat Celery FAILURE as authoritative for async_api jobs
# (see comment above); revoke instead of forcing FAILURE.
is_terminal = False

# Phase 2 — short locked transaction. Re-fetch under select_for_update()
# with the same staleness predicate: if the job progressed or was
# finalized while we were off the lock doing Celery/Redis I/O, the row no
# longer matches and we skip it. Otherwise the Phase 1 decision still
# holds and we persist it. No network I/O happens under this lock.
with transaction.atomic():
try:
job = Job.objects.select_for_update().get(
Expand All @@ -1011,36 +1086,20 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[
updated_at__lt=cutoff,
)
except Job.DoesNotExist:
# Another concurrent run already handled this job.
continue

celery_state = None
if job.task_id:
try:
celery_state = AsyncResult(job.task_id).state
except Exception:
logger.warning(
"Failed to fetch Celery state for stale job %s (task_id=%s)",
job.pk,
job.task_id,
exc_info=True,
)
# Treat as unknown state — job will be revoked below.

# Only trust terminal Celery states. For async_api jobs, SUCCESS and
# FAILURE are only accepted when progress is complete — NATS workers may
# still be delivering results after the Celery task finishes.
is_terminal = celery_state in states.READY_STATES
is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API
if is_async_api and celery_state in {states.SUCCESS, states.FAILURE} and not job.progress.is_complete():
is_terminal = False

previous_status = job.status
if is_terminal:
if not dry_run:
job.update_status(celery_state, save=False)
job.finished_at = datetime.datetime.now()
job.save()
# Narrow the write to the fields the reaper actually mutates.
# A full save() would re-write the whole row from the snapshot
# fetched at select_for_update() time, clobbering `logs` and
# `progress.errors` that a concurrent _update_job_progress (no
# row lock since #1261) may commit. update_status() touches
# status + progress.summary.status, so `progress` is included.
job.save(update_fields=["status", "progress", "finished_at", "updated_at"])
else:
# Per-job diagnostic: surface enough state at revoke time that an
# operator can answer "why was this stalled?" without grepping
Expand All @@ -1062,7 +1121,10 @@ def check_stale_jobs(minutes: int | None = None, dry_run: bool = False) -> list[
if not dry_run:
job.update_status(JobState.REVOKED, save=False)
job.finished_at = datetime.datetime.now()
job.save()
# See note on the terminal branch above: narrow the write so
# the reaper doesn't clobber a concurrent progress writer's
# `logs` / `progress.errors`.
job.save(update_fields=["status", "progress", "finished_at", "updated_at"])

# Async resource cleanup runs outside the transaction — it makes network
# calls (NATS/Redis) that should not hold the DB row lock.
Expand Down
Loading
Loading