Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion backend/src/api/routes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,12 @@ async def split_clip(
async def merge_clips(
task_id: str, request: Request, db: AsyncSession = Depends(get_db)
):
"""Merge multiple clips into one clip."""
"""Merge clips synchronously. Kept for back-compat — prefer /merge_async.

The ffmpeg concat-encode regularly exceeds the ALB idle timeout for
multi-clip composites and surfaces as a 504 here. New callers should
use the async variant.
"""
try:
payload = await request.json()
clip_ids = payload.get("clip_ids") or []
Expand All @@ -678,6 +683,137 @@ async def merge_clips(
raise HTTPException(status_code=500, detail=f"Error merging clips: {str(e)}")


@router.post("/{task_id}/clips/merge_async", status_code=202)
async def merge_clips_async(
task_id: str, request: Request, db: AsyncSession = Depends(get_db)
):
"""Enqueue a merge job and return immediately.

Poll status via GET /tasks/{task_id}/clips/merge_jobs/{merge_job_id}.
Validation (ownership, clip existence) runs synchronously so bad
requests fail fast instead of burning a worker slot.
"""
try:
payload = await request.json()
clip_ids = payload.get("clip_ids") or []
if not isinstance(clip_ids, list):
Comment thread
brad07 marked this conversation as resolved.
raise HTTPException(status_code=400, detail="clip_ids must be an array")
Comment thread
brad07 marked this conversation as resolved.
if len(clip_ids) < 2:
raise HTTPException(
status_code=400, detail="At least two clips are required to merge"
)

task_service = TaskService(db)
await _require_task_owner(request, task_service, db, task_id)

for clip_id in clip_ids:
clip = await task_service.clip_repo.get_clip_by_id(db, clip_id)
if not clip or clip["task_id"] != task_id:
raise HTTPException(
status_code=404, detail=f"Clip {clip_id} not found on task"
)

merge_job_id = await JobQueue.enqueue_job(
"merge_clips_job", task_id, clip_ids
)
logger.info(
f"Enqueued merge job {merge_job_id} task={task_id} clips={len(clip_ids)}"
)
return {"merge_job_id": merge_job_id, "status": "queued"}
except HTTPException:
Comment thread
brad07 marked this conversation as resolved.
raise
except Exception as e:
logger.error(f"Error enqueueing merge: {e}")
raise HTTPException(
status_code=500, detail=f"Error enqueueing merge: {str(e)}"
)


@router.get("/{task_id}/clips/merge_jobs/{merge_job_id}")
async def get_merge_job(
task_id: str,
merge_job_id: str,
request: Request,
db: AsyncSession = Depends(get_db),
):
"""Poll a queued merge.

Status values mirror arq's JobStatus enum (deferred | queued |
in_progress | complete | not_found). On `complete` the response
carries either `clip_id` + `message` (success) or `error` (worker
exception, surfaced as the str() of the raised exception).
"""
Comment thread
brad07 marked this conversation as resolved.
try:
task_service = TaskService(db)
await _require_task_owner(request, task_service, db, task_id)

# Bind the job to the path task before exposing its status/result.
# Owning *any* task isn't enough — without this check, a caller who
# guessed a merge_job_id could probe a job that belongs to a
# different task (and learn its merged clip_id). We verify via
# arq's stored JobDef which carries the original args we passed
# at enqueue (task_id, clip_ids) — no extra persistence layer
# needed.
info = await JobQueue.get_job_info(merge_job_id)
if info is None:
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)
if info.function != "merge_clips_job":
# Wrong fn => not a merge job at all; treat as not-found rather
# than leak which functions exist.
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)
# args === (task_id, clip_ids) per merge_clips_job signature.
# Mismatch means the caller is asking about a job that belongs
# to a different task — pretend it doesn't exist.
if not info.args or info.args[0] != task_id:
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)

# get_job_status normalises arq's JobStatus enum to a lowercase
# string (and returns None for not_found), so we can consume the
# value directly.
status_str = await JobQueue.get_job_status(merge_job_id)
if status_str is None:
# Race: arq evicted the job's status entry between our info()
# call and now. Treat as not-found.
raise HTTPException(
status_code=404, detail=f"Merge job {merge_job_id} not found"
)

response: Dict[str, Any] = {
"merge_job_id": merge_job_id,
"status": status_str,
}

if status_str == "complete":
try:
result = await JobQueue.get_job_result(merge_job_id)
if isinstance(result, dict):
response["clip_id"] = result.get("clip_id")
response["message"] = result.get("message")
else:
response["error"] = (
f"Unexpected worker result type: {type(result).__name__}"
)
except Exception as exc:
# arq raises the original worker exception when the job
# ended in failure; expose its string form to the caller.
response["error"] = str(exc)

return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching merge job status: {e}")
raise HTTPException(
status_code=500, detail=f"Error fetching merge job status: {str(e)}"
)


@router.patch("/{task_id}/clips/{clip_id}/captions")
async def update_clip_captions(
task_id: str, clip_id: str, request: Request, db: AsyncSession = Depends(get_db)
Expand Down
58 changes: 48 additions & 10 deletions backend/src/workers/job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Optional
from arq import create_pool
from arq.connections import RedisSettings, ArqRedis
from arq.jobs import Job
from ..config import get_config

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,20 +83,57 @@ async def enqueue_processing_job(
function_name, *args, _queue_name=queue_name, **kwargs
)

@classmethod
def _job(cls, pool: ArqRedis, job_id: str) -> Job:
"""Construct an arq Job handle for a given id.

ArqRedis itself has no `.job()` method (despite the obvious
name); the public API for looking up an existing job is the
Job(job_id=..., redis=pool) constructor. The Job handle is
cheap to create — it's just a pair of references — and reading
from Redis happens lazily on info()/status()/result().
"""
return Job(job_id=job_id, redis=pool)

@classmethod
async def get_job_result(cls, job_id: str):
"""Get the result of a completed job."""
"""Return the worker function's return value, or re-raise its exception."""
pool = await cls.get_pool()
job = await pool.job(job_id)
if job:
return await job.result()
return None
return await cls._job(pool, job_id).result()

@classmethod
async def get_job_status(cls, job_id: str) -> Optional[str]:
"""Get the status of a job."""
"""Return arq's JobStatus as a lowercase string, or None if unknown.

Normalising the enum -> str at the JobQueue boundary lets route
handlers consume the value directly without importing arq
internals (and prevents the easy bug of returning the enum object
through an Optional[str] signature).
"""
pool = await cls.get_pool()
status = await cls._job(pool, job_id).status()
if status is None:
return None
# arq.jobs.JobStatus renders as "JobStatus.complete" etc. Take
# the suffix and lowercase it for a stable wire shape.
status_str = str(status).split(".")[-1].lower()
# JobStatus.not_found is how arq signals a missing job — surface
# that as None at this boundary too.
if status_str == "not_found":
return None
return status_str

@classmethod
async def get_job_info(cls, job_id: str):
"""Return the JobDef (function name + args/kwargs) for a job.

Used to verify a polling request is authorised for the job it
names — callers can match args[N] against the path parameter
that should own the job, without needing a separate persistence
layer for the task↔job association. arq stores the job def in
Redis as long as the job exists or its result is still cached.

Returns None if the job is unknown to Redis.
"""
pool = await cls.get_pool()
job = await pool.job(job_id)
if job:
return await job.status()
return None
return await cls._job(pool, job_id).info()
38 changes: 37 additions & 1 deletion backend/src/workers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,42 @@ async def clip_ready_callback(
# Error will be caught by arq and task status will be updated
raise

async def merge_clips_job(
ctx: Dict[str, Any],
task_id: str,
clip_ids: list[str],
) -> Dict[str, Any]:
"""
Background worker task to merge clips.

The synchronous /tasks/{task_id}/clips/merge endpoint blocks the HTTP
request for the full ffmpeg concat-encode duration, which routinely
exceeds the ALB idle timeout (60s default, 300s after the band-aid
bump) and surfaces as a 504 to the caller. This worker variant is
enqueued by /tasks/{task_id}/clips/merge_async and polled via
/tasks/{task_id}/clips/merge_jobs/{job_id} so callers never hold an
HTTP connection open for the encode.

Returns the same dict shape as TaskService.merge_clips so arq's
job result storage carries the merged_clip_id straight to the poller.
"""
from ..database import AsyncSessionLocal
from ..runtime_settings import load_runtime_settings_cache
from ..services.task_service import TaskService

set_trace_id(f"merge-{task_id}")
logger.info(f"Worker merging {len(clip_ids)} clips for task {task_id}")

async with AsyncSessionLocal() as db:
await load_runtime_settings_cache(db)
task_service = TaskService(db)
result = await task_service.merge_clips(task_id, clip_ids)
logger.info(
f"Merge complete task={task_id} merged_clip_id={result.get('clip_id')}"
)
return result


# Worker configuration for arq
class WorkerSettings:
"""Configuration for arq worker."""
Expand All @@ -128,7 +164,7 @@ class WorkerSettings:
config = Config()

# Functions to run
functions = [process_video_task]
functions = [process_video_task, merge_clips_job]
queue_name = "supoclip_tasks"

# Redis settings from environment
Expand Down
Loading
Loading