feat(processing_services): v2 worker mode for minimal stub#1252
mihow
left a comment
See my inline comments. Make it clear in the PR description and/or design doc what mirrors or diverges from the implementation in the AMI Data Companion worker. Also review this previous implementation for adding a worker to the docker compose stack, but for connecting to Celery directly. We will stick with the NATS / API version, but I'm curious for you to read the details of this implementation and write up a comparison. #1011
…, simpler loop
Apply feedback from PR #1252 review:
- Consolidate schemas into `processing_services/minimal/api/schemas.py`. v1 push and v2 worker now share a single source of truth; the separate `worker/schemas.py` mirror is removed. Client/runner/register/loop use real Pydantic types in signatures (PipelineProcessingTask, PipelineTaskResult, ProcessingServiceClientInfo, AsyncPipelineRegistrationRequest).
- Move defaults into `processing_services/minimal/.env.dev`. Strip the hard-coded `os.environ.get(..., "<default>")` fallbacks in register.py and worker_main.py; replace with `os.environ[...]`. The inline environment block in docker-compose.yml becomes `env_file:`.
- Replace the loop's job→slug reverse-lookup with per-slug iteration: the outer loop variable IS the slug, so `_slug_for_job` is gone.
- Fix register.py docstring: the PS is identified by the Authorization header's user (main) or the API key (#1194), not by `processing_service_name`. That field just labels the DB row `get_or_create(name=...)` lands on.
- Import pipelines directly in register.py instead of HTTP-GETting /info from the co-located FastAPI. Removes the FastAPI readiness wait entirely, and means register.py works in MODE=worker (where FastAPI isn't running at all).
Co-Authored-By: Claude <noreply@anthropic.com>
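A minimal sketch of what the strict `os.environ[...]` lookups buy, assuming a hypothetical helper `load_settings` (not a function in this PR). `ANTENNA_USER` and `WORKER_POLL_INTERVAL` are variable names that appear elsewhere in this thread; the returned dict shape is illustrative only.

```python
import os
from typing import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Read required settings with strict lookups (hypothetical helper).

    env[...] raises KeyError when a variable is missing, so a misconfigured
    container fails at startup instead of silently running on a default
    that is duplicated somewhere in Python code.
    """
    return {
        "user": env["ANTENNA_USER"],
        "poll_interval": float(env["WORKER_POLL_INTERVAL"]),
    }
```

With this shape, the defaults live only in the env file, and a missing entry surfaces as a `KeyError` naming the variable.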
Claude says: Follow-up in
Live-stack e2e is up next.
Adds a design document for extending processing_services/minimal/ with a v2 worker mode so agents and CI can exercise the pull path without spinning up the heavyweight ami-data-companion worker.
Co-Authored-By: Claude <noreply@anthropic.com>
Extends processing_services/minimal/ to support Antenna's v2 pull/async/worker
processing-service paradigm alongside the existing v1 push mode. One container
image, three modes via the MODE env var:
MODE=api v1 FastAPI only (unchanged, CI default)
MODE=worker v2 HTTP poll loop (reserve tasks -> run stub pipeline -> post results)
MODE=api+worker both (local dev default)
The worker talks HTTP-only to Antenna's job queue API (POST /jobs/{id}/tasks/,
POST /jobs/{id}/result/), matching the ADC contract. Pipelines are the same
stubs used by /process (ConstantPipeline, RandomDetectionRandomSpeciesPipeline),
so v1 and v2 produce identical detections.
Out-of-the-box automation:
- ami/main/management/commands/ensure_default_project.py: idempotent bootstrap
that creates the default superuser + "Default Project" on Django startup.
Guarded by ENSURE_DEFAULT_PROJECT=1, set in .envs/.local/.django and
.envs/.ci/.django.
- compose/local/django/start runs it before starting gunicorn.
- processing_services/minimal/register.py self-provisions a ProcessingService
and registers the stub pipelines with the default project. Supports user
token auth (current main) and has TODOs for API-key auth (PR #1194).
- processing_services/docker-compose.yml sets MODE=api+worker by default.
This unblocks agent-driven e2e work on the async_api path without needing to
stand up the external ami-data-companion worker.
Design doc: docs/claude/planning/2026-04-17-minimal-worker-design.md
Co-Authored-By: Claude <noreply@anthropic.com>
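The three modes in the commit message above can be read as a small lookup table. The sketch below is illustrative Python only: the actual dispatcher is `start.sh`, the function name `components_for_mode` is hypothetical, and running `register` in `MODE=worker` is an assumption based on the note that register.py works without FastAPI.

```python
def components_for_mode(mode: str) -> list[str]:
    """Map a MODE value to the processes to start, in start order."""
    table = {
        "api": ["api"],
        # Assumption: registration also runs in worker-only mode.
        "worker": ["register", "worker"],
        "api+worker": ["api", "register", "worker"],
    }
    try:
        return table[mode]
    except KeyError:
        # Reject typos early rather than starting nothing.
        raise ValueError(f"unsupported MODE: {mode!r}") from None
```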
- Add "Comparison with prior/external implementations" section covering what the stub mirrors vs. diverges from the ADC worker, and why the NATS/API path was picked over the Celery-direct path in PR #1011.
- Update the env-var table to reflect the `.env.dev` file as the single source of defaults (no code fallbacks).
- Update directory layout and poll-loop pseudocode to match the current per-slug iteration (no job→slug reverse-lookup).
Co-Authored-By: Claude <noreply@anthropic.com>
…name my_slugs
Follow-up to the previous review round: v2-only classes go in worker/schemas.py so the v1 push path doesn't have to know about pull-mode types. The v1 schemas and v1/v2 shared types (PipelineResultsResponse, PipelineConfigResponse, SourceImageResponse, ...) stay in api/schemas.py.
Also rename the awkward `my_slugs` in loop.py. The list comes directly from `pipeline_choices` (the dict of stub pipelines exported by api.api), so iterate it directly instead of copying keys into a local variable.
Co-Authored-By: Claude <noreply@anthropic.com>
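A rough sketch of the per-slug iteration described above, under stated assumptions: `JobClient` and `poll_once` are hypothetical names, payloads are plain dicts rather than the Pydantic types the worker uses, and posting one batch of results per job is a simplification. It also shows stub failures being reported as error results instead of being dropped.

```python
from typing import Callable, Protocol


class JobClient(Protocol):
    """Hypothetical client interface; the real wrapper is worker/client.py."""

    def list_started_jobs(self, slug: str) -> list[int]: ...
    def reserve_tasks(self, job_id: int) -> list[dict]: ...
    def post_results(self, job_id: int, results: list[dict]) -> None: ...


def poll_once(
    client: JobClient,
    pipeline_choices: dict[str, Callable[[dict], dict]],
) -> int:
    """Run one pass over every pipeline slug; return the number of tasks handled."""
    handled = 0
    # The outer loop variable is the slug, so no job -> slug lookup is needed.
    for slug, pipeline in pipeline_choices.items():
        for job_id in client.list_started_jobs(slug):
            results = []
            for task in client.reserve_tasks(job_id):
                try:
                    results.append({"task": task, "result": pipeline(task)})
                except Exception as exc:
                    # Report the failure so the server can still settle the task.
                    results.append({"task": task, "error": str(exc)})
            if results:
                client.post_results(job_id, results)
                handled += len(results)
    return handled
```

A caller would presumably invoke `poll_once` in a loop and sleep for `WORKER_POLL_INTERVAL` seconds between passes.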
The /jobs/ endpoint's `pipeline` filter is a ModelChoiceFilter expecting a DB id, so slug values get rejected with 400 "Select a valid choice". The slug-based alias exposed by JobFilterSet is `pipeline__slug`.
Caught by running the minimal worker end-to-end against a live Antenna stack: the poll loop was logging 400 warnings every WORKER_POLL_INTERVAL. After the fix, two async_api test jobs (pipelines `constant` and `random-detection-random-species`) flow through worker → reserve tasks → process → submit results → NATS ACK → Job.SUCCESS.
Co-Authored-By: Claude <noreply@anthropic.com>
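For illustration, a small sketch of building the poll URL with the slug-based filter. The function name `started_jobs_url` and the base URL in the usage note are hypothetical; the path and the `status` / `ids_only` parameters are taken from the PR description.

```python
from urllib.parse import urlencode


def started_jobs_url(base_url: str, slug: str) -> str:
    """Build the poll URL for STARTED jobs of one pipeline slug."""
    query = urlencode(
        # `pipeline__slug` accepts a slug; plain `pipeline` expects a DB id.
        {"pipeline__slug": slug, "status": "STARTED", "ids_only": "true"}
    )
    return f"{base_url.rstrip('/')}/api/v2/jobs/?{query}"
```

For example, `started_jobs_url("http://localhost:8000", "constant")` yields a URL whose query string starts with `pipeline__slug=constant`.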
…ng [skip ci]
The docstring referenced processing_services/.env.example, which does not exist; the real file is processing_services/minimal/.env.dev.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…efault_project
Make the local/CI bootstrap genuinely zero-config for the minimal v2 worker. The worker opens each image and reads its pixel dimensions, so it needs reachable images, not path-only rows.
ensure_default_project now seeds a small object-store-backed "Default Collection" into the default project by reusing the test fixtures (generate images -> object store -> sync captures). Idempotent on the collection (skips when a non-empty "Default Collection" already exists; keyed on the collection, not on any source images, since a project may have images from other sources yet still lack the collection the worker needs). Best-effort (a failure such as no object store is logged and swallowed so it never breaks startup), and opt-out via --skip-seed.
Adds tests for the seed, idempotency, and the skip flag.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
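The seeding rules above (idempotent on the collection, best-effort, opt-out) can be sketched without Django. Everything here is a stand-in: `seed_default_collection` is a hypothetical function, a plain dict replaces the database, and `generate_images` replaces the test-fixture pipeline.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def seed_default_collection(
    collections: dict[str, list[str]],
    generate_images: Callable[[], list[str]],
    name: str = "Default Collection",
    skip_seed: bool = False,
) -> bool:
    """Seed `name` once; return True only when new images were added."""
    if skip_seed:
        return False
    if collections.get(name):
        # Keyed on the collection itself: other images in the project
        # do not count as "already seeded".
        return False
    try:
        images = generate_images()
    except Exception:
        # Best-effort: log and continue so startup is never blocked.
        logger.exception("Seeding %r failed; continuing without it", name)
        return False
    collections[name] = images
    return bool(images)
```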
Claude says: Rebased onto current main (was 56 behind; clean except one Zero-config seed. E2E result (local,
So the full v2 pull lifecycle is exercised end-to-end without the heavyweight ADC worker. With the seed, a fresh stack boots straight into a processable state. This is ready to come out of draft for review whenever you like; the |
ffd0767 to
1e0da75
Compare
Claude says: Review for merging this as the foundational step — a correct, copy-able reference for the v2 (pull/async) API contract via a worker in the app stack. (Deeper lifecycle hardening — failure injection, concurrent-load + chaos — is intentionally out of scope here and tracked separately.) Should-fix before merge (cheap; they protect the contract this PR exists to demonstrate):
Notes / small doc adds (prevent misuse of the reference):
Already addressed: rebased onto current main (the Net: mergeable as the foundation. The only two I would gate on are #1 and #2 — both cheap, both protect the contract-reference value. |

Summary
Extends `processing_services/minimal/` to support Antenna's v2 pull/async/worker paradigm alongside the existing v1 push mode, so agents and CI can exercise the NATS/Redis/Celery lifecycle without spinning up the external ami-data-companion (ADC) worker.
One container, three modes via the `MODE` env var:
- `MODE=api`: v1 FastAPI only (unchanged; CI default)
- `MODE=worker`: v2 HTTP poll loop
- `MODE=api+worker`: both (local dev default)
Intention & context
Antenna supports two processing-service paradigms:
- v1 (push): Antenna sends a `PipelineRequest` to `/process`. Good for interactive single-image inference, `/api/v2/docs/` schema exposure, and admin smoke tests.
- v2 (pull/async): the worker reserves tasks via `POST /api/v2/jobs/{id}/tasks/`, processes them, and submits results via `POST /api/v2/jobs/{id}/result/`. Antenna proxies NATS internally so workers can live behind firewalls.
Today the only v2 implementation is the external ami-data-companion (heavyweight: conda + torch + CUDA warmup). This PR adds a deterministic stub analogous to what `minimal/` already is for v1.
Design doc: `docs/claude/planning/2026-04-17-minimal-worker-design.md`. It includes an explicit comparison to the ADC worker (what this mirrors vs. diverges) and to PR #1011 (the earlier Celery-direct attempt, and why NATS/API was chosen over it).
What changed
Processing service
- `processing_services/minimal/start.sh`: MODE-dispatching orchestrator with signal forwarding for `TERM`/`INT`. In `api+worker` mode: FastAPI first (so `/process` stays available while register.py retries), then register.py, then the worker loop.
- `processing_services/minimal/register.py`: self-registration. Auth priority: `ANTENNA_API_KEY` → `ANTENNA_API_AUTH_TOKEN` → login with `ANTENNA_USER`/`ANTENNA_PASSWORD`. Pipelines are imported directly from the `api` module (no HTTP round-trip to self). Sends both `processing_service_name` (main's serializer expects it as the label for `ProcessingService.get_or_create(name=...)`) and `client_info` (read by PR #1194, "feat: API key auth and identity for processing services").
- `processing_services/minimal/worker_main.py`: entry for `MODE=worker`. Reads the cached auth header that register.py drops at `/tmp/antenna_auth_header`, falls back to env-based login if absent.
- `processing_services/minimal/worker/`: three small modules: `client.py` (HTTP wrapper with retry), `loop.py` (per-slug poll), `runner.py` (task → result via the existing `api/pipelines.py` stubs). No duplicated pipeline or schema code.
- `processing_services/minimal/worker/schemas.py`: v2-only worker classes (`PipelineProcessingTask`, `PipelineTaskResult`, `PipelineResultsError`, `ProcessingServiceClientInfo`, `AsyncPipelineRegistrationRequest`), importing the shared wire-format types (`PipelineResultsResponse`, `PipelineConfigResponse`, ...) from `api/schemas.py`. Keeping the pull-mode types out of `api/schemas.py` means the v1 FastAPI side stays free of v2 types.
- `processing_services/minimal/.env.dev`: dev defaults for MODE, Antenna target, auth fallback, and worker tuning. Loaded via `env_file:` in `processing_services/docker-compose.yml`. No more duplicated defaults in Python code or in `environment:` blocks.
- `processing_services/minimal/Dockerfile`: `ENV MODE=api` (CI default), `CMD ["/app/start.sh"]`.
Out-of-the-box sequencing
- `ami/main/management/commands/ensure_default_project.py`: idempotent bootstrap. Creates the default superuser (`createsuperuser` from `DJANGO_SUPERUSER_*` env vars) and a named project (default `"Default Project"`). Looks up by name, not PK, so long-lived dev DBs that already have PK 1 claimed are safe.
- `compose/local/django/start`: runs the command when `ENSURE_DEFAULT_PROJECT=1`.
- `.envs/.local/.django`, `.envs/.ci/.django`: set `ENSURE_DEFAULT_PROJECT=1` and (for CI) `DJANGO_SUPERUSER_*`.
Review-round changes (2026-04-17)
Applied @mihow's review feedback:
- `client.py`, `runner.py`, and `register.py` take/return real Pydantic types rather than `dict`s. The v2-only worker classes live in `worker/schemas.py` (importing the shared wire-format types from `api/schemas.py`), so the v1 FastAPI side stays free of pull-mode types.
- Defaults moved into `processing_services/minimal/.env.dev`. `os.environ.get(..., "<default>")` fallbacks in `register.py` and `worker_main.py` replaced with `os.environ[...]`; inline `environment:` duplication in `docker-compose.yml` removed.
- `loop.py` iterates `for slug in my_slugs` at the outer level so `runner.process_task(task, slug)` gets the slug as a loop variable. The prior `_slug_for_job` reverse-lookup is gone. (A later commit drops `my_slugs` and iterates `pipeline_choices` directly.)
- `register.py` docstring corrected to describe the current behavior (`processing_service_name` just labels the DB row via `get_or_create(name=...)`) and the PR #1194 transition (API key identifies the PS directly).
The flow on a cold boot
- `ensure_default_project` creates/finds a superuser + "Default Project".
- `start.sh` runs FastAPI in the background.
- `register.py` imports its pipelines from `api.api`, logs in to Antenna (or uses `ANTENNA_API_AUTH_TOKEN`), resolves the project by name, and POSTs pipelines to `/api/v2/projects/{id}/pipelines/`.
- `worker_main.py` starts polling `GET /api/v2/jobs/?pipeline__slug=<slug>&status=STARTED&ids_only=true` once per slug. When a STARTED async_api job matches, it reserves a batch via `POST /jobs/{id}/tasks/`, runs the stub pipeline, and POSTs `PipelineTaskResult` items back.
Errors during stub execution are captured and sent as `PipelineResultsError` so the NATS ACK path still fires, which matters for exercising retry / stale-job-cutoff paths.
What's verified vs. what still needs verification
- `docker compose -f processing_services/docker-compose.yml build ml_backend_minimal`: builds clean
- `MODE=api` container: backward compatible with existing CI (same `/process`, `/info`, `/livez`, `/readyz`)
- `MODE=api+worker` container: boots, runs register.py, retries login when Antenna unreachable
- `python -m py_compile`: all new Python compiles
- … `constant` or `random-detection-random-species`, confirm worker picks it up, processes it, and `process_nats_pipeline_result` fires. Not yet run.
- `ensure_default_project` idempotence across repeated Django boots
- … `MODE=api` via ENV (same behavior, different surface)
- … `processing_service_name`)
Test plan (for reviewers)
Out of scope (called out in design doc)
- `example/` to `global_moths/complete`: deferred follow-up.
Related
- `example/` service evolution
What has been tested
Unit (`ami/main/tests.py::TestEnsureDefaultProjectSeed`, 3 passed): the seed creates a non-empty `Default Collection`, is idempotent (re-running does not re-seed; keyed on the collection), and `--skip-seed` creates no images. Worker modules compile; lint green.
Local end-to-end (deployed on this branch, `MODE=api+worker`, no ami-data-companion): the worker registered both stub pipelines to the seeded `Default Project` and polled. One `async_api` job (`constant`, 6 images) ran the full pull lifecycle: STARTED → NATS task claim (`POST /jobs/{id}/tasks/`) → stub inference → `POST /jobs/{id}/result/` ×6 → `process_nats_pipeline_result` ×6 → NATS ACK + consumer/stream teardown → SUCCESS in ~4s, producing 6 detections and 6 occurrences with `progress.errors == []`. No code changes were needed.
Auth used in this run: the worker authenticates via the `Authorization` header (DRF). `register.py` resolves it in priority order: `ANTENNA_API_KEY` (`Authorization: Api-Key …`, the #1194 path) → `ANTENNA_API_AUTH_TOKEN` (`Authorization: Token …`) → fallback login with `ANTENNA_USER`/`ANTENNA_PASSWORD` at `/api/v2/auth/token/login/` to obtain a DRF token. With no key/token set, this run used the user/password → `Token` fallback; the resolved header is cached and reused by the poll loop.
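The priority order can be sketched as a single function. This is an illustration under assumptions, not the code in `register.py`: `resolve_auth_header` is a hypothetical name, and `login` is a caller-supplied callable standing in for the request to `/api/v2/auth/token/login/`.

```python
from typing import Callable, Mapping


def resolve_auth_header(
    env: Mapping[str, str],
    login: Callable[[str, str], str],
) -> str:
    """Return the Authorization header value, trying the cheapest source first."""
    if env.get("ANTENNA_API_KEY"):
        return f"Api-Key {env['ANTENNA_API_KEY']}"
    if env.get("ANTENNA_API_AUTH_TOKEN"):
        return f"Token {env['ANTENNA_API_AUTH_TOKEN']}"
    # Fallback: exchange user/password for a DRF token via `login`.
    token = login(env["ANTENNA_USER"], env["ANTENNA_PASSWORD"])
    return f"Token {token}"
```

Passing `login` in as a parameter keeps the sketch free of HTTP dependencies; the network call only happens when neither a key nor a token is configured.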