feat(merge): land async merge endpoint + worker on main#7
Conversation
Synchronous /clips/merge has been blowing past the ALB idle timeout
(default 60s, bumped to 300s as a band-aid) for any merge wider than
a couple of short clips — ffmpeg concat with libx264 preset=slow
CRF=18 is just slow enough on Fargate that the connection drops mid-
encode and the BN tool sees a 504.
This adds the canonical async pattern the rest of the pipeline already
uses (arq queue + worker pool):
POST /tasks/{task_id}/clips/merge_async
-> 202 {merge_job_id, status: "queued"}
Validation (ownership + clip existence + min 2 clips) runs
synchronously so bad input fails fast instead of burning a worker
slot to discover a typo.
GET /tasks/{task_id}/clips/merge_jobs/{merge_job_id}
-> {status, clip_id?, message?, error?}
status mirrors arq's JobStatus enum (queued | deferred |
in_progress | complete). On complete: clip_id+message on success,
error on worker exception.
Worker side is a thin wrapper around the existing
TaskService.merge_clips method — identical merge logic, just relocated
off the request thread. WorkerSettings.functions gets merge_clips_job
appended so the existing ECS worker service picks it up on next
deploy.
Old sync /clips/merge endpoint is left in place for back-compat —
docstring marks it deprecated. BN tools will migrate in a follow-up
PR; once that lands the sync route can be deleted.
Tests cover: happy enqueue, unknown clip rejection, single-clip
rejection, complete-with-result, complete-with-worker-error, and
unknown job_id.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
feat(merge): async /clips/merge_async endpoint + arq worker
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds async clip-merge enqueue and polling endpoints, an arq worker job for merging clips, JobQueue helpers to inspect job info/status/result, and integration tests covering enqueue, polling, and negative authorization/validation cases. ChangesAsync clip merge workflow
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/src/api/routes/tasks.py`:
- Around line 716-723: Persist and verify the task↔job association: after
calling JobQueue.enqueue_job("merge_clips_job", task_id, clip_ids) capture
merge_job_id and store it against the task (e.g., set a merge_job_id field on
the Task model or call a JobStore.link_job_to_task(task_id, merge_job_id))
before returning the response; then update the status/result lookup logic that
currently fetches by merge_job_id (the block around lines handling merge_job_id
status) to first load the Task by task_id and verify the stored merge_job_id
matches the provided merge_job_id (or reject/403 if not), ensuring you use the
same identifiers merge_job_id and task_id and the enqueue call
JobQueue.enqueue_job to locate where to add the persistence and the status-check
path to add the ownership check.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cf884762-1c9a-4459-8316-4f55bb45c3f2
📒 Files selected for processing (3)
backend/src/api/routes/tasks.pybackend/src/workers/tasks.pybackend/tests/integration/test_health_and_tasks.py
…k probe Addresses CodeRabbit on #7. Prior /clips/merge_jobs/{job_id} only verified the caller owned the path task — not that the job actually belonged to that task. A user who legitimately owns task A could craft a request to /tasks/{A}/clips/merge_jobs/{B-job-id} and learn task B's merged clip_id (or error). Bug, not just a smell. Fix avoids new persistence by using arq's existing JobDef storage: - New JobQueue.get_job_info(job_id) returns the JobDef (carries the original (task_id, clip_ids) args from enqueue). - Endpoint now requires: job exists + function == 'merge_clips_job' + args[0] == path task_id. Mismatch returns 404 (not 403) so we don't leak which job_ids exist or which worker functions are registered. The ownership check runs before get_job_status, so a foreign-job probe never even reads the status — verified by status_mock.assert_not_awaited in the new test. Two new tests: - test_get_merge_job_rejects_cross_task_probe — the exact path CR called out - test_get_merge_job_rejects_wrong_function — defends against using this as a generic job introspection oracle (e.g. process_video_task ids) 118 tests collected (was 116). 97 unit tests still pass; integration tests skip locally without DATABASE_URL. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
backend/src/workers/job_queue.py (1)
103-104: 💤 Low valueConsider adding return type annotation.
The method lacks a return type hint. Based on arq's API, this returns
Optional[JobDef]. Adding the annotation improves IDE support and documentation.- async def get_job_info(cls, job_id: str): + async def get_job_info(cls, job_id: str) -> Optional["JobDef"]:You may need to import
JobDeffromarq.jobsif using a concrete type, or useOptional[Any]ifJobDefisn't reliably exported.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/src/workers/job_queue.py` around lines 103 - 104, Add an explicit return type annotation to the classmethod get_job_info to indicate it returns Optional[JobDef] (or Optional[Any] if JobDef isn't exported) to improve IDE/type checking; update the signature of get_job_info to include -> Optional[JobDef], and add necessary imports (JobDef from arq.jobs and Optional from typing) at the top of the module so the annotation resolves.backend/src/api/routes/tasks.py (2)
709-714: 💤 Low valueConsider batching clip existence checks.
The loop issues N separate DB queries to validate clip ownership. For requests with many clips, a single query returning all matching clip IDs would reduce round-trips.
This is a minor optimization—current approach is correct and safe.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/src/api/routes/tasks.py` around lines 709 - 714, Replace the per-clip lookup loop that calls task_service.clip_repo.get_clip_by_id for each clip_id with a single batched query (e.g., add or use a method on clip_repo like get_clips_by_ids(db, clip_ids) that returns all clips for those IDs), then verify that every requested clip_id exists and belongs to the given task_id by comparing the returned clips' ids and task_id; if any are missing or have a different task_id, raise the same HTTPException for the offending ids. Use the existing symbols clip_ids, task_service.clip_repo (add get_clips_by_ids if necessary), db, and task_id to locate and replace the loop logic.
800-803: 💤 Low valueWorker exception details exposed to caller.
str(exc)is returned directly to the client. If the worker raises an exception containing internal paths or configuration details, those would leak. Consider sanitizing or truncating the error message for production.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/src/api/routes/tasks.py` around lines 800 - 803, The code currently assigns the raw exception string to response["error"] (in the except block handling worker failures), which can leak internal paths/config; replace this by returning a safe, generic error message (e.g., "worker failed" or include only a short, sanitized snippet) and record the full exception details to an internal logger; implement a sanitizer that strips file paths and tokens (use a regex to remove absolute paths and long tokens) and/or truncate to a max length (e.g., 200 chars) before exposing to the client, while logging str(exc) and traceback via the service logger for debugging.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/src/workers/job_queue.py`:
- Around line 115-119: The code uses await pool.job(job_id) which doesn't exist
on ArqRedis; update get_job_result, get_job_status, and get_job_info to
construct an arq.jobs.Job instance instead (e.g., Job(job_id=job_id,
redis=pool)) and then call await job.result(), await job.status(), or await
job.info() respectively; ensure you import Job from arq.jobs and replace all
pool.job(...) occurrences with the Job(...) instantiation using the existing
cls.get_pool() result.
---
Nitpick comments:
In `@backend/src/api/routes/tasks.py`:
- Around line 709-714: Replace the per-clip lookup loop that calls
task_service.clip_repo.get_clip_by_id for each clip_id with a single batched
query (e.g., add or use a method on clip_repo like get_clips_by_ids(db,
clip_ids) that returns all clips for those IDs), then verify that every
requested clip_id exists and belongs to the given task_id by comparing the
returned clips' ids and task_id; if any are missing or have a different task_id,
raise the same HTTPException for the offending ids. Use the existing symbols
clip_ids, task_service.clip_repo (add get_clips_by_ids if necessary), db, and
task_id to locate and replace the loop logic.
- Around line 800-803: The code currently assigns the raw exception string to
response["error"] (in the except block handling worker failures), which can leak
internal paths/config; replace this by returning a safe, generic error message
(e.g., "worker failed" or include only a short, sanitized snippet) and record
the full exception details to an internal logger; implement a sanitizer that
strips file paths and tokens (use a regex to remove absolute paths and long
tokens) and/or truncate to a max length (e.g., 200 chars) before exposing to the
client, while logging str(exc) and traceback via the service logger for
debugging.
In `@backend/src/workers/job_queue.py`:
- Around line 103-104: Add an explicit return type annotation to the classmethod
get_job_info to indicate it returns Optional[JobDef] (or Optional[Any] if JobDef
isn't exported) to improve IDE/type checking; update the signature of
get_job_info to include -> Optional[JobDef], and add necessary imports (JobDef
from arq.jobs and Optional from typing) at the top of the module so the
annotation resolves.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2a2a8cda-d423-4a07-ac9e-a2de8f49b0ad
📒 Files selected for processing (3)
backend/src/api/routes/tasks.pybackend/src/workers/job_queue.pybackend/tests/integration/test_health_and_tasks.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/tests/integration/test_health_and_tasks.py
CodeRabbit was right (verified against arq 0.26 source): ArqRedis does
NOT expose a .job(job_id) method. The pre-existing get_job_result and
get_job_status methods predated my PR and were never called from
production code, so the AttributeError was latent.
My new poll endpoint is the first real caller. Without this fix, the
first request to /clips/merge_jobs/{id} would crash with:
AttributeError: 'ArqRedis' object has no attribute 'job'
Fix: use the public Job(job_id=..., redis=pool) constructor from
arq.jobs. The Job handle is cheap (just a pair of references); Redis
reads happen lazily on info()/status()/result().
Also normalise get_job_status' handling of arq's JobStatus.not_found
to None at the JobQueue boundary, so callers in the routes don't need
to import arq internals to check for missing jobs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/src/workers/job_queue.py`:
- Around line 113-116: The method in job_queue.py is declared to return
Optional[str] and the docstring says it should return the JobStatus enum value
as a string, but it currently returns the enum object `status`; update the
return to return the computed `status_str` (and still return None for
"not_found") so the function returns a string as declared (verify the function
that contains `status_str = str(status).split(".")[-1].lower()` uses `return
status_str` instead of `return status` and update any related docstring or type
hints if needed).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dcca30fe-15af-41b2-9da4-679735afaf10
📒 Files selected for processing (1)
backend/src/workers/job_queue.py
I added the not_found-to-None normalisation but forgot to return the
converted string — the function still returned the enum object through
its Optional[str] signature. Worked by accident because the route did
its own str().split() conversion on top.
Now:
- JobQueue.get_job_status returns the lowercased status string (or
None) as advertised — single source of truth for the normalisation
- Route consumes the value directly without re-doing the split
- Tests' mock return values updated to match the new wire shape
("complete" instead of "JobStatus.complete")
97 unit tests still pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
backend/src/api/routes/tasks.py (1)
716-718: ⚡ Quick winHonor the configured queue adapter in the async merge flow.
create_task()already usesrequest.app.state.queue_adapter, but these new paths hardcodeJobQueue. Any injected adapter or test double will be ignored for merge enqueueing and polling.♻️ Suggested change
+ queue_adapter = getattr(request.app.state, "queue_adapter", JobQueue) - merge_job_id = await JobQueue.enqueue_job( + merge_job_id = await queue_adapter.enqueue_job( "merge_clips_job", task_id, clip_ids )+ queue_adapter = getattr(request.app.state, "queue_adapter", JobQueue) - info = await JobQueue.get_job_info(merge_job_id) + info = await queue_adapter.get_job_info(merge_job_id) if info is None: raise HTTPException( status_code=404, detail=f"Merge job {merge_job_id} not found" ) @@ - status_str = await JobQueue.get_job_status(merge_job_id) + status_str = await queue_adapter.get_job_status(merge_job_id) @@ - result = await JobQueue.get_job_result(merge_job_id) + result = await queue_adapter.get_job_result(merge_job_id)Also applies to: 757-779, 794-794
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/src/api/routes/tasks.py` around lines 716 - 718, The merge enqueue/poll paths currently call JobQueue.enqueue_job (e.g., assign to merge_job_id) which bypasses the injected adapter used by create_task(); replace direct JobQueue calls with the app-provided adapter (request.app.state.queue_adapter) so enqueueing and any polling use the configured/test double adapter — specifically swap JobQueue.enqueue_job(...) to request.app.state.queue_adapter.enqueue_job(...) and any JobQueue.poll_/get_/delete_ usages in the related merge flow (the blocks around merge_job_id and the sections noted ~757-779 and ~794) to the corresponding methods on request.app.state.queue_adapter (ensure the adapter exposes the same method names like enqueue_job and poll_job/get_job as used here).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/src/api/routes/tasks.py`:
- Around line 696-700: The route currently calls await request.json() and later
accesses payload and clip_ids, but a JSONDecodeError from request.json() is not
caught and bubbles up as a 500; update the handler around the await
request.json() call to catch JSONDecodeError (or ValueError raised by the JSON
parser) and raise HTTPException(status_code=400, detail="Malformed JSON body")
instead; ensure the same pattern is applied to the similar block that processes
payload and clip_ids (the code using payload and clip_ids in this file) so
malformed JSON consistently returns 400 rather than 500.
- Around line 739-745: Update the docstring for the "Poll a queued merge" route
to stop advertising a `not_found` status: state that status values mirror arq's
JobStatus enum (deferred | queued | in_progress | complete) and explicitly
document that missing jobs are returned as HTTP 404 (not a `not_found` job
status). Edit the docstring block around the polling handler (the "Poll a queued
merge" route) to remove or replace the `not_found` token and add a short
sentence noting the 404 behavior so generated API docs match the actual response
handling.
---
Nitpick comments:
In `@backend/src/api/routes/tasks.py`:
- Around line 716-718: The merge enqueue/poll paths currently call
JobQueue.enqueue_job (e.g., assign to merge_job_id) which bypasses the injected
adapter used by create_task(); replace direct JobQueue calls with the
app-provided adapter (request.app.state.queue_adapter) so enqueueing and any
polling use the configured/test double adapter — specifically swap
JobQueue.enqueue_job(...) to request.app.state.queue_adapter.enqueue_job(...)
and any JobQueue.poll_/get_/delete_ usages in the related merge flow (the blocks
around merge_job_id and the sections noted ~757-779 and ~794) to the
corresponding methods on request.app.state.queue_adapter (ensure the adapter
exposes the same method names like enqueue_job and poll_job/get_job as used
here).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 88a40723-5dba-4306-b289-dc70e0981228
📒 Files selected for processing (3)
backend/src/api/routes/tasks.pybackend/src/workers/job_queue.pybackend/tests/integration/test_health_and_tasks.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/src/workers/job_queue.py
- backend/tests/integration/test_health_and_tasks.py
Two minor CodeRabbit nits on #7: 1. POST /clips/merge_async wrapped request.json() in a bare except Exception → 500 that swallowed JSONDecodeError. Catch it explicitly and raise HTTPException(400, "Malformed JSON body"). Adds a test that posts non-JSON bytes. 2. GET /clips/merge_jobs/{id} docstring listed `not_found` as a status value, but missing jobs are converted to HTTP 404 by the handler itself. Generated OpenAPI docs would mislead clients. Update the docstring to explicitly call out 404-on-miss and drop `not_found` from the status enum list. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/src/api/routes/tasks.py`:
- Around line 697-704: After parsing JSON into payload (payload = await
request.json()), immediately validate that payload is a mapping: if not
isinstance(payload, dict): raise HTTPException(status_code=400,
detail="Top-level JSON body must be an object"); then continue to read clip_ids
= payload.get("clip_ids") or []. This prevents AttributeError when payload is an
array/string/number and ensures malformed shapes return 400 before accessing
payload.get.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b3ef2750-67e7-4a8a-9d62-a1109e85810c
📒 Files selected for processing (2)
backend/src/api/routes/tasks.pybackend/tests/integration/test_health_and_tasks.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/tests/integration/test_health_and_tasks.py
CR follow-up on #7. Last patch handled JSONDecodeError but a valid JSON array/scalar (e.g. POST body of "[\"a\", \"b\"]" or just "null") still slipped through and AttributeError'd on payload.get(...), which the bare-except turned back into a 500. Reject non-dict payloads explicitly with HTTPException(400). Test posts a top-level JSON array to confirm 400. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
What
Brings the async `/clips/merge_async` work (originally merged as #6 into `fix/concat-absolute-paths`) onto main. Same changes already reviewed in #6; this is just the merge-up.
Why a separate PR
#6 was stacked on `fix/concat-absolute-paths` (#5's branch) because the worker depends on the absolute-paths fix. When #5 merged to main first via squash, this branch diverged. Opening this PR resolves the divergence without losing #6.
Verify
```
diff main...fix/concat-absolute-paths
```
should show only the worker registration in `backend/src/workers/tasks.py`, the two new routes in `backend/src/api/routes/tasks.py`, and the new integration tests.
Summary by CodeRabbit
New Features
Tests