Skip to content

fix(jobs): fix dangling jobs from going to revoked#1276

Closed
mihow wants to merge 7 commits into
mainfrom
fix/job-complete-check
Closed

fix(jobs): fix dangling jobs from going to revoked#1276
mihow wants to merge 7 commits into
mainfrom
fix/job-complete-check

Conversation

@mihow

@mihow mihow commented Apr 29, 2026

Copy link
Copy Markdown
Collaborator

Summary

The stale-job reaper (check_stale_jobs) is the safety net that finalizes jobs stuck in a running state past the cutoff. For async (pull-based) ML jobs it was making two unsafe calls: it trusted a Celery task's terminal state too readily, and it judged completeness from Job.progress.is_complete() — a JSONB blob that has been racy under concurrent result writes since #1261 dropped the row lock. The visible symptom was healthy async jobs being REVOKED even though their work had actually finished (NATS and Redis fully drained).

This PR makes the reaper consult the real source of truth (Redis pending-set completeness) before accepting a success, stops it from forcing a terminal FAILURE that the result handler is supposed to decide, and stops its writes from clobbering fields a concurrent writer owns. It also keeps the reaper's network calls off the database row lock so it doesn't contend with the live result handler.

List of Changes

# Change (effect) How
1 A stalled async job whose work is actually done is finalized as SUCCESS instead of being wrongly revoked. For async_api jobs the reaper accepts a Celery SUCCESS only when AsyncJobStateManager.all_tasks_processed() reports the Redis pending sets are drained, instead of reading the racy progress.is_complete(). When Redis state is unavailable it falls back to progress.is_complete() and logs a warning.
2 A stalled async job whose Celery task ended in FAILURE is revoked rather than force-failed. Celery FAILURE is no longer fast-pathed to a terminal status for async_api jobs — update_job_failure() defers the terminal outcome to the result handler, which decides SUCCESS/FAILURE from the final processed/failed counts against FAILURE_THRESHOLD.
3 The reaper no longer overwrites a concurrent writer's logs / progress.errors. Terminal and revoke writes use save(update_fields=[...]) (status, progress, finished_at, updated_at) instead of a full-row save() from a stale snapshot.
4 The reaper no longer prolongs lock contention with the live result handler. Celery/Redis lookups now run in an unlocked first phase; a short select_for_update() second phase re-checks the staleness predicate and persists. If the job progressed or finalized in between, the row no longer matches and it is skipped.
5 New tri-state completeness helper. AsyncJobStateManager.all_tasks_processed()True (both pending sets empty / total 0), False (work remains), None (state unavailable: cleaned up, expired, never initialized, or RedisError).

Detailed Description

This sits alongside the status-write hardening in #1338 / #1342: those make the lock-free status writers safe; this makes the reaper's decision trustworthy and keeps its non-status JSONB writes (logs, progress.errors) from being clobbered. The reaper remains the deliberate last-resort writer — it keeps a broader from-set under its row lock and only accelerates a SUCCESS when Redis confirms the work is drained, otherwise it revokes.

all_tasks_processed() returns None for several distinct causes (missing keys vs transient RedisError). The single caller treats None as "cannot confirm" and falls back to progress.is_complete(), which is a safe degradation regardless of cause. If a future caller needs to distinguish causes, AsyncJobStateManager.diagnose_missing_state() (added in #1241) already exposes the richer detail.

How to Test the Changes

  • pytest ami/jobs/tests/test_tasks.py::TestCheckStaleJobsReaperGuard
  • Full ami/jobs/tests/test_tasks.py + ami/ml/tests.py pass locally (107 passed, 2 skipped).

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Refs #1337. Closes #1276's dangling-revoke symptom; complements #1338 / #1342.

Summary by CodeRabbit

  • Bug Fixes

    • Strengthened job status tracking for asynchronous operations by consulting multiple system state sources, preventing jobs from remaining in incorrect or stalled states when processing is disrupted.
  • Tests

    • Added extensive test coverage for asynchronous job state management, including scenarios where internal systems may fall out of sync.

Copilot AI review requested due to automatic review settings April 29, 2026 22:11
@netlify

netlify Bot commented Apr 29, 2026

Copy link
Copy Markdown

Deploy Preview for antenna-ssec canceled.

Name Link
🔨 Latest commit 27e24f8
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/6a39da26e1df6a0008479b2b

@netlify

netlify Bot commented Apr 29, 2026

Copy link
Copy Markdown

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 27e24f8
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/6a39da262ff6f1000859dd28

@coderabbitai

coderabbitai Bot commented Apr 29, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Warning

Review limit reached

@mihow, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 53 minutes and 14 seconds. Learn how PR review limits work.

Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file).

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits.

🚦 How do rate limits work?

CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan refill rate.

For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, the refill rate gradually slows as usage increases. The highest same-day bursts are limited more strictly.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ebe3dd43-89fa-492a-99c7-995268c4ad50

📥 Commits

Reviewing files that changed from the base of the PR and between 6c39917 and 27e24f8.

📒 Files selected for processing (4)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/orchestration/async_job_state.py
  • ami/ml/tests.py
📝 Walkthrough

Walkthrough

AsyncJobStateManager gains a new tri-state all_tasks_processed() method that pipelines Redis SCARD checks on both stage pending sets. check_stale_jobs is restructured into a lock-free state-fetch phase and a short locked transaction phase, now using all_tasks_processed() as the terminal oracle for ASYNC_API/Celery SUCCESS jobs. Celery FAILURE is demoted from terminal to revoke. DB writes are narrowed to specific fields to prevent JSONB clobbering.

Changes

Redis-backed terminal-state guard for stale job reaper

Layer / File(s) Summary
AsyncJobStateManager.all_tasks_processed() method and unit tests
ami/ml/orchestration/async_job_state.py, ami/ml/tests.py
Adds all_tasks_processed() returning True/False/None via a Redis pipeline over stage pending sets and job total. Tests cover fresh init, partial/full drain, zero-image, never-initialized, post-cleanup, and Redis error with WARNING fallback.
check_stale_jobs two-phase restructure and Redis-backed terminal eligibility
ami/jobs/tasks.py
Restructures check_stale_jobs into a lock-free phase (Celery + Redis drain check) and a short locked transaction. ASYNC_API/Celery SUCCESS defers to all_tasks_processed(); ASYNC_API/Celery FAILURE is forced to revoke; Redis-unavailable falls back to job.progress.is_complete(). Both terminal and revoke DB writes are narrowed to status, progress, finished_at, updated_at.
TestCheckStaleJobsReaperGuard integration tests
ami/jobs/tests/test_tasks.py
Adds TransactionTestCase with helpers for stale ASYNC_API job creation, updated_at backdating, and clobbered/complete progress shapes. Six tests cover all decision branches: Redis drained, Redis pending, Redis absent with complete/incomplete progress, SYNC_API bypass, and ASYNC_API Celery FAILURE → REVOKED.

Sequence Diagram(s)

sequenceDiagram
    participant Reaper as check_stale_jobs
    participant Celery as AsyncResult
    participant Redis as AsyncJobStateManager
    participant DB as Job (PostgreSQL)

    Reaper->>Celery: AsyncResult(job.task_id).state
    Celery-->>Reaper: SUCCESS / FAILURE / PENDING
    alt ASYNC_API and Celery SUCCESS
        Reaper->>Redis: all_tasks_processed()
        Redis-->>Reaper: True / False / None
        note over Reaper: None → WARNING + fallback to progress.is_complete()
    else ASYNC_API and Celery FAILURE
        note over Reaper: is_terminal = False (forced revoke)
    else SYNC_API or non-READY state
        note over Reaper: is_terminal from READY_STATES membership
    end
    Reaper->>DB: SELECT FOR UPDATE (re-check staleness)
    alt is_terminal
        Reaper->>DB: update(status, progress, finished_at, updated_at)
    else not terminal
        Reaper->>DB: update(status=REVOKED, progress, finished_at, updated_at)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

Possibly related PRs

  • RolnickLab/antenna#1114: The Redis-unavailable fallback path in the new check_stale_jobs calls job.progress.is_complete(), the helper introduced in this earlier PR.
  • RolnickLab/antenna#1169: Both PRs modify check_stale_jobs terminal-state and revoke decision logic in ami/jobs/tasks.py at the same function.
  • RolnickLab/antenna#1234: That PR adjusts Redis pending-set removal in AsyncJobStateManager.update_state(), directly affecting the drain signal that the new all_tasks_processed() reads.

Poem

🐇 Hippity-hop through the Redis rows,
A pending set drains wherever it goes.
SCARD returns zero — the tasks are done!
No JSONB clobbered, no false REVOKE won.
The reaper now asks Redis before the DB lock,
And stale jobs are handled right 'round the clock! 🕐

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately reflects the main change: it addresses the issue where async jobs incorrectly transition to REVOKED status despite completion.
Description check ✅ Passed The description includes all major required sections: Summary, List of Changes (in detailed table format), Related Issues (references #1337, #1276, #1338, #1342), Detailed Description, How to Test, and Checklist with most items completed.
Docstring Coverage ✅ Passed Docstring coverage is 80.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/job-complete-check

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Updates the async job reaper completion guard to consult Redis (pending-set drain) directly to avoid incorrect REVOKED outcomes caused by concurrent JSONB Job.progress clobbering.

Changes:

  • Add AsyncJobStateManager.all_tasks_processed() -> bool | None backed by Redis SCARD/GET to determine whether async NATS-tracked tasks are fully drained.
  • Update check_stale_jobs to use the Redis-backed completion check for async_api jobs, with a warning + progress.is_complete() fallback when Redis state is unavailable.
  • Add test coverage for the new tri-state Redis oracle and reaper guard behavior (including the “clobbered progress but drained Redis” case).

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
docs/claude/prompts/NEXT_SESSION_PROMPT.md Adds session/runbook notes for implementing and validating the Redis-backed reaper guard.
docs/claude/planning/job-complete-check.md Planning doc describing rationale, scope, risks, and acceptance criteria for the Redis-backed completion check.
ami/ml/orchestration/async_job_state.py Implements all_tasks_processed() using Redis to determine pending work across stages.
ami/ml/tests.py Adds unit tests covering all_tasks_processed() True/False/None behaviors, including RedisError.
ami/jobs/tasks.py Replaces async reaper guard from progress.is_complete() to Redis-backed all_tasks_processed() with fallback.
ami/jobs/tests/test_tasks.py Adds integration-style tests for the reaper guard outcomes across Redis drained/pending/absent and sync_api unaffected.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread ami/ml/orchestration/async_job_state.py
Comment thread ami/jobs/tasks.py Outdated
Comment thread ami/jobs/tasks.py Outdated
@mihow mihow changed the title fix(jobs): reaper checks Redis directly via all_tasks_processed fix(jobs): fix dangling jobs from going to revoked Apr 29, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
docs/claude/prompts/NEXT_SESSION_PROMPT.md (1)

211-215: Add a language to this fenced block.

markdownlint MD040 will keep flagging this unlabeled fence. text would work fine here since this is captured command output, not executable code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/claude/prompts/NEXT_SESSION_PROMPT.md` around lines 211 - 215, The
fenced block containing the git output (the lines starting with "$ git log
--oneline fix/job-complete-check ^main" and the two commit lines) is missing a
language tag and triggers markdownlint MD040; update the opening fence from ```
to ```text so the block is labeled as plain text. Ensure only the opening fence
is changed (leave the content and closing ``` intact) so the block is recognized
as non-executable text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 972-983: The current fast-path treats Celery states.SUCCESS and
states.FAILURE as terminal for is_async_api jobs by querying
AsyncJobStateManager(job.pk).all_tasks_processed(), which wrongly forces FAILURE
before the async result path can apply failure-threshold logic (see
update_job_failure()). Change the condition so only states.SUCCESS is considered
a terminal fast-path (i.e., `if is_async_api and celery_state is
states.SUCCESS:`), keep the same handling for processed==False/None and
job.progress.is_complete(), and do not treat states.FAILURE as authoritative
here; add a regression test that simulates celery_state=FAILURE with
Redis-drained (AsyncJobStateManager processed True) and verifies final status is
decided by the async result/failure-threshold logic instead of being forced to
FAILURE.

---

Nitpick comments:
In `@docs/claude/prompts/NEXT_SESSION_PROMPT.md`:
- Around line 211-215: The fenced block containing the git output (the lines
starting with "$ git log --oneline fix/job-complete-check ^main" and the two
commit lines) is missing a language tag and triggers markdownlint MD040; update
the opening fence from ``` to ```text so the block is labeled as plain text.
Ensure only the opening fence is changed (leave the content and closing ```
intact) so the block is recognized as non-executable text.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bd872a1d-acf2-4a53-9a3e-a5b5e6cefa12

📥 Commits

Reviewing files that changed from the base of the PR and between a347d55 and 36b17c4.

📒 Files selected for processing (6)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/orchestration/async_job_state.py
  • ami/ml/tests.py
  • docs/claude/planning/job-complete-check.md
  • docs/claude/prompts/NEXT_SESSION_PROMPT.md

Comment thread ami/jobs/tasks.py Outdated
mihow added a commit that referenced this pull request Apr 30, 2026
Low-priority follow-up to #1276. Three sources of truth for async_api job
lifecycle (Job model, Redis sets, NATS consumer state) lie differently;
unifying them into a diagnostic view catches bug classes Redis alone
can't see (#1247-style stranding, result-handler swallow, hung in-flight
workers). Doc proposes snapshot-on-terminal, diagnostics endpoint,
JobReconciler.diagnose(), and an admin UI panel.

Tracked in #1285.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow

mihow commented Jun 12, 2026

Copy link
Copy Markdown
Collaborator Author

Claude says: (posted on Michael's behalf — part of a structural review pass over the four open PSv2-readiness PRs: #1276, #1279, #1312, #1324, toward the umbrella #910.)

The direction here is right: the reaper was making a destructive decision (REVOKE) based on the one mirror known to lie (Job.progress after a concurrent-writer clobber), and re-pointing it at the atomic Redis tracker is the correct re-ordering of trust. The tri-state design is careful — distinguishing RedisError from genuinely-absent state and refusing to REVOKE on a Redis blip — and the tests are strong, including a faithful reproduction of the job-clobber incident with raw-update staleness. The dispatch-mode gate keeps sync jobs on the old path, and the updated_at-based staleness trigger remains the right one for long-running jobs.

Things worth discussing before merge:

  1. The is_terminal fast-path may widen a known-wrong behavior for the FAILURE case (echoing CodeRabbit's open comment, which looks legitimate). For async_api, celery's FAILURE on run_job is deliberately non-authoritative — update_job_failure defers it because the result handler owns the threshold-derived outcome. Post-PR, a job with drained Redis + celery FAILURE gets force-FAILED even when the failure count was below threshold and the natural outcome was SUCCESS. All five reaper tests use celery_states.SUCCESS, so the FAILURE branch is untested. Scoping the fast-path to SUCCESS-only looks like a two-line change that removes the concern.

  2. The fallback arm re-imports the bug this PR works around. The same clobber race has a second shape: writer A sees completion → sets SUCCESS → cleanup deletes the Redis keys; a slower writer B then commits a stale snapshot including status=STARTED, resurrecting the job after Redis is gone. The reaper then gets None from all_tasks_processed() and falls back to the clobbered progress.is_complete() — same wrong REVOKE, new WARNING line. Not an argument against this PR (it's an honest mitigation), but it does mean the underlying fix — making progress writes atomic instead of read-modify-write — should be tracked as a first-class issue rather than a "follow-up" note. As far as we can find, no open issue owns that fix today: feat(job): refactor job logging so it isn't a bottleneck #1256 contains the contention analysis (its option D), Review how the state and progress of jobs are tracked #1285 names the race but scopes itself to diagnostics. We'll file a dedicated issue and link it here.

  3. Interaction with Improve celery task dispatch and cancellation to prevent stuck jobs #1324 is worth a joint look before both merge. Improve celery task dispatch and cancellation to prevent stuck jobs #1324's acks_late/reject_on_worker_lost is built so a crashed worker's run_job redelivers and resumes; this PR's reaper REVOKEs anything stale past the cutoff, which quietly nullifies that resume for any outage longer than the window. Separately, Improve celery task dispatch and cancellation to prevent stuck jobs #1324's cancel path tears down Redis while the job sits in CANCELING — making this PR's "Redis absent → fallback" branch much more common than the rare case it's described as. The guards do compose without zombies as far as we can trace, but neither PR currently mentions the other, and the combined behavior should be a deliberate choice.

  4. The bundled docs/claude/planning/ and NEXT_SESSION_PROMPT.md files contain internal deployment specifics (instance log paths, references to internal infra docs) that probably shouldn't live in a public repo, and they make the reviewable diff much larger than the code change. Suggest moving them out or trimming before merge.

Smaller cleanups, low priority: the plan doc's race analysis says a just-missed SREM defers to the next reaper tick, but the code REVOKEs in the same tick when processed is False — tiny window and matches pre-PR behavior, just worth correcting in the doc; and the open bot threads (None-conflation, Redis I/O under the row lock, double-WARNING) could use a reply or a fix each.

With item 1 scoped to SUCCESS-only and the docs trimmed, this looks mergeable as the mitigation it sets out to be — and the atomic-progress-writes issue becomes the load-bearing follow-up for the PSv2 default flip.


Claude says: Filed the atomic-progress-writes root-cause issue as promised: #1337 ("Saving job progress concurrently is the root of multiple issues related to incorrect job statuses"). It collects this PR, #1324, #1287, #1288, the prior attempts (#981 closed, #1026 draft), and the appearance-vs-failure breakdown — including why a wrong status isn't just cosmetic (it keeps the job claimable via /next, see #1265/#1282).

@mihow mihow added the PSv2 Async & distributed ML backend (PSv2): job state, NATS dispatch, result handling. Umbrella #515. label Jun 16, 2026
@mihow mihow force-pushed the fix/job-complete-check branch from 563530f to 6c39917 Compare June 22, 2026 21:18

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 1063-1069: The code currently treats missing Redis state and Redis
errors identically by falling back to progress.is_complete(), which is unsafe
during transient Redis failures. Modify all_tasks_processed() to distinguish
between these two cases (missing key returns one value, Redis errors return
another), then update the conditional logic where processed is None is checked
to only use the progress.is_complete() fallback when the key is confirmed
absent, while skipping the job for the current tick when a Redis error occurs.
The fallback should only apply to confirmed missing state, not to transient
Redis errors.
- Around line 709-714: The current implementation at
job.save(update_fields=["progress", "updated_at"]) writes the entire progress
JSONB object from the in-memory snapshot, which can lose concurrent updates to
progress.errors or other sub-fields. Replace this save operation with a
server-side atomic JSONB patch that updates only the progress.summary.status
field using database-level operations (such as Django's F expressions with JSONB
update/CAS), ensuring that concurrent modifications to other progress sub-fields
are not clobbered. Apply this same change to the other occurrences mentioned in
the review at the specified line ranges to ensure consistent atomic updates
across all affected locations.
- Around line 1029-1039: When an exception occurs during the
AsyncResult(job.task_id).state call, add a continue statement in the except
block to skip the current job iteration instead of allowing celery_state to
remain None and fall through to the revoke branch. This prevents transient
Celery result-backend read failures from causing valid jobs to be revoked, and
allows the reaper to retry on the next tick instead.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3ce7c971-5acb-4189-b7b1-281c45799570

📥 Commits

Reviewing files that changed from the base of the PR and between 36b17c4 and 6c39917.

📒 Files selected for processing (4)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/orchestration/async_job_state.py
  • ami/ml/tests.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • ami/ml/orchestration/async_job_state.py
  • ami/jobs/tests/test_tasks.py

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's internal server error or limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 1063-1069: The code currently treats missing Redis state and Redis
errors identically by falling back to progress.is_complete(), which is unsafe
during transient Redis failures. Modify all_tasks_processed() to distinguish
between these two cases (missing key returns one value, Redis errors return
another), then update the conditional logic where processed is None is checked
to only use the progress.is_complete() fallback when the key is confirmed
absent, while skipping the job for the current tick when a Redis error occurs.
The fallback should only apply to confirmed missing state, not to transient
Redis errors.
- Around line 709-714: The current implementation at
job.save(update_fields=["progress", "updated_at"]) writes the entire progress
JSONB object from the in-memory snapshot, which can lose concurrent updates to
progress.errors or other sub-fields. Replace this save operation with a
server-side atomic JSONB patch that updates only the progress.summary.status
field using database-level operations (such as Django's F expressions with JSONB
update/CAS), ensuring that concurrent modifications to other progress sub-fields
are not clobbered. Apply this same change to the other occurrences mentioned in
the review at the specified line ranges to ensure consistent atomic updates
across all affected locations.
- Around line 1029-1039: When an exception occurs during the
AsyncResult(job.task_id).state call, add a continue statement in the except
block to skip the current job iteration instead of allowing celery_state to
remain None and fall through to the revoke branch. This prevents transient
Celery result-backend read failures from causing valid jobs to be revoked, and
allows the reaper to retry on the next tick instead.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3ce7c971-5acb-4189-b7b1-281c45799570

📥 Commits

Reviewing files that changed from the base of the PR and between 36b17c4 and 6c39917.

📒 Files selected for processing (4)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_tasks.py
  • ami/ml/orchestration/async_job_state.py
  • ami/ml/tests.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • ami/ml/orchestration/async_job_state.py
  • ami/jobs/tests/test_tasks.py
🛑 Comments failed to post (3)
ami/jobs/tasks.py (3)

709-714: 🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Use an atomic JSONB patch for progress.summary.status.

These saves still include progress, so they write the entire JSONB value from the in-memory snapshot. That avoids clobbering unrelated columns like logs, but it can still lose concurrent progress.errors or stage updates; update only the status columns here and patch progress.summary.status with a server-side JSONB update/CAS.

Based on learnings, jobs_job.progress is JSONB and job.save(update_fields=["progress", ...]) still writes the entire JSON dict, so it is not concurrency protection for sub-fields.

Also applies to: 1096-1102, 1124-1127

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@ami/jobs/tasks.py` around lines 709 - 714, The current implementation at
job.save(update_fields=["progress", "updated_at"]) writes the entire progress
JSONB object from the in-memory snapshot, which can lose concurrent updates to
progress.errors or other sub-fields. Replace this save operation with a
server-side atomic JSONB patch that updates only the progress.summary.status
field using database-level operations (such as Django's F expressions with JSONB
update/CAS), ensuring that concurrent modifications to other progress sub-fields
are not clobbered. Apply this same change to the other occurrences mentioned in
the review at the specified line ranges to ensure consistent atomic updates
across all affected locations.

Source: Learnings


1029-1039: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Skip the job when Celery state cannot be read.

After this exception path, celery_state stays None, so the same iteration falls through to the revoke branch. A transient Celery/result-backend read failure can therefore turn a stale-but-valid job into REVOKED; retry on the next reaper tick instead.

Suggested fix
             except Exception:
                 logger.warning(
                     "Failed to fetch Celery state for stale job %s (task_id=%s)",
                     job.pk,
                     job.task_id,
                     exc_info=True,
                 )
-                # Treat as unknown state — job will be revoked below.
+                # Celery state is inconclusive; avoid a destructive revoke and
+                # retry on the next reaper tick.
+                continue
🧰 Tools
🪛 Ruff (0.15.18)

[warning] 1031-1031: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@ami/jobs/tasks.py` around lines 1029 - 1039, When an exception occurs during
the AsyncResult(job.task_id).state call, add a continue statement in the except
block to skip the current job iteration instead of allowing celery_state to
remain None and fall through to the revoke branch. This prevents transient
Celery result-backend read failures from causing valid jobs to be revoked, and
allows the reaper to retry on the next tick instead.

Source: Linters/SAST tools


1063-1069: 🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Don’t fall back to racy Job.progress on transient Redis errors.

all_tasks_processed() returns None for both missing Redis state and RedisError, but only the missing-state case is safe to resolve via progress.is_complete(). On a Redis blip, this can still revoke an async_api/Celery-SUCCESS job based on the JSONB snapshot this PR is trying not to trust. Surface Redis errors separately and skip this job for the tick; reserve the fallback for confirmed absent keys.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@ami/jobs/tasks.py` around lines 1063 - 1069, The code currently treats
missing Redis state and Redis errors identically by falling back to
progress.is_complete(), which is unsafe during transient Redis failures. Modify
all_tasks_processed() to distinguish between these two cases (missing key
returns one value, Redis errors return another), then update the conditional
logic where processed is None is checked to only use the progress.is_complete()
fallback when the key is confirmed absent, while skipping the job for the
current tick when a Redis error occurs. The fallback should only apply to
confirmed missing state, not to transient Redis errors.

mihow and others added 6 commits June 22, 2026 16:57
Job 2521 ended REVOKED despite NATS+Redis both confirming all 4510 tasks
processed. Root cause: `_update_job_progress` writes the entire
`Job.progress` JSONB blob without a row lock since #1261 dropped
`select_for_update`. Concurrent workers raced; a slower committer
clobbered the faster committer's SUCCESS write, leaving progress at
`processed=4509 remaining=1 STARTED`. The reaper read the clobbered
`progress.is_complete()` -> False -> REVOKE.

Add `AsyncJobStateManager.all_tasks_processed() -> bool | None`
(tri-state) backed directly by Redis SCARD across both pending sets, and
inline it at the reaper guard in `check_stale_jobs`. Sync_api jobs
unchanged (dispatch_mode gate). When Redis state is absent (cleanup,
expiry, never initialized, or transient RedisError), fall back to
`progress.is_complete()` and emit a WARNING so monitoring can flag the
fallback path.

Other 5 `progress.is_complete()` callers stay as-is — only the reaper
guard is bitten by the clobber. Atomic-write follow-up tracked separately.

Co-Authored-By: Claude <noreply@anthropic.com>
Address review feedback on the stale-job reaper guard.

CodeRabbit (major): the reaper trusted both Celery SUCCESS and FAILURE
as terminal for drained async_api jobs. But update_job_failure() defers
post-queue run_job failures for async_api jobs to the async result
handler, which decides the final status from the processed/failed counts
against FAILURE_THRESHOLD (a drained-but-failed task can still resolve to
SUCCESS). Trusting Celery FAILURE here would force the job to FAILURE and
bypass that logic. Scope the Redis-backed fast-path to SUCCESS only; a
stale async_api job whose Celery task ended FAILURE now falls through to
the revoke branch instead. Add a regression test.

Copilot (nit): the fallback WARNING said "Redis state absent" but
all_tasks_processed() also returns None on transient RedisError. Reworded
to "Redis state unavailable" to cover both causes; updated the assertions.

Narrow both reaper save points to update_fields=[status, progress,
finished_at, updated_at]. A full save() re-writes the whole row from the
snapshot fetched at select_for_update() time, which can clobber `logs`
and `progress.errors` written by a concurrent _update_job_progress (no
row lock since #1261). Mirrors the narrowed-write pattern already used in
_update_job_progress.

Also remove three docs/claude session-artifact files that were added on
this branch. They contain production job IDs, internal infrastructure
references, and other operational detail that does not belong in a public
repository.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The stale-job reaper fetched Celery task state and queried Redis
(all_tasks_processed) while holding a select_for_update() row lock inside
transaction.atomic(). Those are network round-trips, and holding the lock
across them prolongs contention with the lock-free result handler that
writes the same row.

Split into two phases: gather the Celery/Redis decision with no row lock,
then re-fetch under a short select_for_update() with the same staleness
predicate and persist. If the job progressed or finalized while off the
lock, the row no longer matches and it is skipped.
…p ci]

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Two docstrings in TestCheckStaleJobsReaperGuard referenced past context
("the incident case", "pre-fix", "mimic the incident's") that only
parsed if the reader already knew the backstory. Rewrite them in
present-tense, condition-first form so each docstring describes what
the test verifies without requiring knowledge of any prior incident.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Add a deterministic test for the race window introduced by the two-phase
reaper refactor: a job that is stale at Phase-1 read but finalized by a
concurrent writer before Phase-2's select_for_update re-fetch must be
skipped entirely, with the concurrent writer's status preserved.

The test patches AsyncJobStateManager.all_tasks_processed so that its
side effect advances the job to SUCCESS (moving it outside running_states)
before returning, then asserts that check_stale_jobs makes no further
write and leaves the status as SUCCESS.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@mihow mihow force-pushed the fix/job-complete-check branch from 6c39917 to 3544558 Compare June 23, 2026 00:12
Comment thread ami/ml/migrations/0028_normalize_empty_endpoint_url_to_null.py
Comment thread ami/ml/migrations/0027_rename_last_checked_to_last_seen.py
These two migrations were incidentally reformatted by black (a blank line removed
after the Migration class declaration) while this branch was being tidied. Existing
migrations should not be modified, so this restores them byte-for-byte to the form
already on main. No behavior change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mihow

mihow commented Jun 23, 2026

Copy link
Copy Markdown
Collaborator Author

Claude says: Closing this and folding the reaper hardening into #1337, where the root fix (atomic progress writes) lives.

Why: the core idea here — the stale-job reaper consulting Redis (all_tasks_processed()) as the source of truth before it REVOKEs, instead of the clobberable progress.is_complete() — is sound and is genuinely not covered by the merged #1338 / #1342 (those guard the status-write side). But a review pass found two blockers that make it net-risky to land standalone:

  1. It introduces a cancel-override bug. The SUCCESS fast-path re-fetches with status__in=running_states() (which includes CANCELING) and writes via a plain update_status(), not the guarded transition. So a stale CANCELING job whose Celery task reports SUCCESS gets force-flipped to SUCCESS — overriding an in-flight cancel, the exact resurrection race Stop a finished job from being pulled back to running by a slower worker #1338 was built to stop.
  2. The "does not clobber concurrent progress.errors" claim is false. save(update_fields=[…, "progress", …]) rewrites the whole progress JSONB from the reapers stale snapshot, and errors lives inside progress — so the narrow write protects other columns but not the two fields the comment names.

Since it is a mitigation (not the root fix) and adds a new bug, the reaper-consults-Redis improvement is better done as part of #1337 Layer 2. Captured the salvageable design there. The all_tasks_processed() tri-state helper is a good starting point and can be lifted from this branch (fix/job-complete-check).

@mihow mihow closed this Jun 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

PSv2 Async & distributed ML backend (PSv2): job state, NATS dispatch, result handling. Umbrella #515.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants