Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions backend/src/api/routes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,19 +771,43 @@ async def get_merge_job(
# needed.
info = await JobQueue.get_job_info(merge_job_id)
if info is None:
# Distinguishes "Redis doesn't know this job_id at all" from
# the function/args mismatch branches below. Each branch
# returns the same opaque 404 to callers to avoid leaking
# the existence (or shape) of other tasks' jobs, but the
# server log line tells us which case fired so we can debug
# disappearing-job reports (worker on different Redis, key
# eviction, race, etc.).
logger.warning(
"Merge job not found: info=None merge_job_id=%s task_id=%s",
merge_job_id,
task_id,
)
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)
if info.function != "merge_clips_job":
# Wrong fn => not a merge job at all; treat as not-found rather
# than leak which functions exist.
logger.warning(
"Merge job not found: function mismatch merge_job_id=%s task_id=%s function=%s",
merge_job_id,
task_id,
info.function,
)
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)
# args === (task_id, clip_ids) per merge_clips_job signature.
# Mismatch means the caller is asking about a job that belongs
# to a different task — pretend it doesn't exist.
if not info.args or info.args[0] != task_id:
logger.warning(
"Merge job not found: args mismatch merge_job_id=%s task_id=%s info_args=%r",
merge_job_id,
task_id,
info.args,
)
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)
Expand All @@ -795,6 +819,11 @@ async def get_merge_job(
if status_str is None:
# Race: arq evicted the job's status entry between our info()
# call and now. Treat as not-found.
logger.warning(
"Merge job not found: status=None after info OK merge_job_id=%s task_id=%s",
merge_job_id,
task_id,
)
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)
Expand Down
10 changes: 9 additions & 1 deletion backend/src/workers/job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,16 @@ def _job(cls, pool: ArqRedis, job_id: str) -> Job:
Job(job_id=..., redis=pool) constructor. The Job handle is
cheap to create — it's just a pair of references — and reading
from Redis happens lazily on info()/status()/result().

We pin `_queue_name` to our default queue so Job.status() can
find queued-but-not-yet-started jobs (status() reads the queue
ZSET via `zscore(self._queue_name, job_id)` to detect the
`queued` state). Without this, status() would default to
arq's `arq:queue` and return `not_found` for anything still
waiting in our `supoclip_tasks` queue. info() and result() are
queue-agnostic.
"""
return Job(job_id=job_id, redis=pool)
return Job(job_id=job_id, redis=pool, _queue_name=DEFAULT_QUEUE_NAME)

@classmethod
async def get_job_result(cls, job_id: str):
Expand Down
Loading