19 changes: 18 additions & 1 deletion README.md
@@ -569,7 +569,7 @@ Required environment variables:

Optional tunables (with defaults):

- `CFDB_WORKFLOW_DURATION_CAP_S` — per-workflow wall-clock cap (default `1200`).
- `CFDB_WORKFLOW_DURATION_CAP_S` — per-workflow wall-clock cap (default `14400`, i.e. 4 h — sized for multi-hour preprocessing runs; lower it for fixture-bound dev).
- `CFDB_WORKFLOW_DISPATCH_WAIT_S` — how long `ensure_workflow` waits for a free worker before giving up (default `60`).
- `CFDB_WORKFLOW_HEARTBEAT_INTERVAL_S` — cadence at which the wool routine emits heartbeat events during quiet stages so the API can refresh `JobRecord.updated_at` (default `300`). The stale-reclaim threshold below is sized as `2 × heartbeat + safety_margin`; lowering this knob without also lowering the threshold widens the false-reclaim window.
- `CFDB_WORKFLOW_STALE_THRESHOLD_S` — `updated_at` age beyond which an active row is reclaimable (default `900`; sized as `2 × heartbeat_interval + safety_margin` so a single missed heartbeat does not falsely reclaim a healthy worker).
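
The sizing rule quoted for the stale-reclaim threshold can be checked with a few lines of arithmetic. The sketch below is illustrative only: both helper names are hypothetical and not part of cfdb, and the 300 s safety margin is inferred from the documented defaults (900 = 2 × 300 + 300) rather than stated anywhere in the source.

```python
def stale_threshold_s(heartbeat_interval_s: int, safety_margin_s: int) -> int:
    """Apply the documented rule: 2 x heartbeat interval + safety margin."""
    return 2 * heartbeat_interval_s + safety_margin_s


def implied_safety_margin_s(heartbeat_interval_s: int, threshold_s: int) -> int:
    """Margin left over once two heartbeat intervals are subtracted."""
    return threshold_s - 2 * heartbeat_interval_s


# Documented defaults: heartbeat 300 s, threshold 900 s.
default_threshold = stale_threshold_s(300, 300)
default_margin = implied_safety_margin_s(300, 900)
```

When changing either knob, recomputing the implied margin this way is a quick check that it is still positive.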
@@ -578,6 +578,23 @@ Optional tunables (with defaults):

Required tools on `PATH` for the **worker pool** (not the API): `samtools`, `bgzip`, `tabix`, `bcftools`, `gffread`, `bigBedToBed`. The `api` Docker image already installs all of these — the simplest local-dev / single-host deployment is to reuse `Dockerfile.api` as the worker image and override the `CMD` (or run the wool worker entrypoint via `python -m wool`). On the worker host, set `WORKFLOW_POOL_NAMESPACE` to match the API's value.

#### ECS Fargate profile

When the API runs on ECS Fargate (or LocalStack-backed dev that mirrors prod end-to-end), the lifespan switches from `LocalFsCache` + `LanDiscovery` to `S3Cache` + `EcsDiscovery` + `EcsProvisioner`. The selection is env-driven; with none of the variables below set, the API runs the local PoC profile unchanged.

- `AWS_ENDPOINT_URL` — boto3 endpoint override. Unset in production (boto3 hits real AWS); set to `http://localstack:4566` (or similar) for LocalStack-backed dev. The same application code runs in both environments — only this variable differs.
- `AWS_REGION` — AWS region for the boto3 client (default `us-east-1`).
- `WORKFLOW_S3_BUCKET` — when set, the lifespan instantiates `S3Cache` instead of `LocalFsCache`. The bucket must already exist (creation is out of band). When unset, the API stays on the local filesystem cache.
- `WORKFLOW_S3_PREFIX` — optional key prefix the S3 backend prepends to every cache key (default empty). Lets a single bucket host multiple environments (`dev/`, `staging/`, `prod/`) without collisions.
- `ECS_CLUSTER` — ECS cluster name or ARN. Gates the ECS-backed provisioner and discovery profile; unset means the PoC profile stays on `LanDiscovery` with no provisioner.
- `ECS_WORKER_TASK_DEFINITION` — task definition for the worker container, as a family name (`cfdb-worker`) or `family:revision`. The provisioner passes it through to `RunTask` verbatim.
- `ECS_WORKER_TASK_FAMILY` — family used by `EcsDiscovery` to filter `ListTasks`. Defaults to `ECS_WORKER_TASK_DEFINITION` with any `:revision` suffix stripped; set explicitly only when the discovery family differs from the provisioner task-def family (rare).
- `ECS_WORKER_SUBNETS` — comma-separated awsvpc subnet IDs the worker ENIs land in. Required for the ECS profile; an empty list with `ECS_CLUSTER` set is a misconfiguration.
- `ECS_WORKER_SECURITY_GROUPS` — comma-separated awsvpc security group IDs. Optional — when empty, ECS applies the VPC default SG.
- `ECS_WORKER_ASSIGN_PUBLIC_IP` — `ENABLED` or `DISABLED` (default `DISABLED`). Production should leave this disabled and reach AWS via VPC endpoints; LocalStack accepts either value.
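
The selection rules above can be summarised in a small sketch. It is not the actual lifespan code: `select_profile` is a hypothetical helper that only returns component names, and it assumes the cache choice depends solely on `WORKFLOW_S3_BUCKET` while the discovery/provisioner choice depends solely on `ECS_CLUSTER`, which is how the bullets read.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def select_profile(env: Mapping[str, str] = os.environ) -> dict[str, str | None]:
    """Return the component names the documented rules would select."""
    # The cache backend is gated on the bucket variable alone.
    cache = "S3Cache" if env.get("WORKFLOW_S3_BUCKET") else "LocalFsCache"

    # Without a cluster, the PoC profile stays on LAN discovery, no provisioner.
    if not env.get("ECS_CLUSTER"):
        return {"cache": cache, "discovery": "LanDiscovery", "provisioner": None}

    # With a cluster set, an empty subnet list is a misconfiguration.
    raw_subnets = env.get("ECS_WORKER_SUBNETS", "")
    subnets = [item.strip() for item in raw_subnets.split(",") if item.strip()]
    if not subnets:
        raise ValueError("ECS_CLUSTER is set but ECS_WORKER_SUBNETS is empty")

    return {"cache": cache, "discovery": "EcsDiscovery", "provisioner": "EcsProvisioner"}
```

Passing a plain dict instead of `os.environ` makes the rules easy to exercise in a test without touching the process environment.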

The worker container's `CMD` is `python -m cfdb.workflows.worker_main`. Worker-side knobs (gRPC port, health port, max lifetime, drain grace) are documented under `--help` on that command; their env vars are `CFDB_WORKER_GRPC_PORT`, `CFDB_WORKER_HEALTH_PORT`, `CFDB_WORKER_MAX_LIFETIME_SECONDS`, and `CFDB_WORKER_DRAIN_GRACE_SECONDS`. The worker task definition MUST declare a `healthCheck` against the gRPC port; without one ECS reports `healthStatus: UNKNOWN` indefinitely and the worker is never advertised to discovery.
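
For orientation, a container-level `healthCheck` has the shape sketched below, written as the Python dict one would place inside a container definition passed to boto3's `register_task_definition`. Everything specific here is an assumption: `build_health_check` is a hypothetical helper, the probe is a plain TCP connect rather than a real gRPC health call, and the port and timing values are placeholders, not values taken from the cfdb task definition.

```python
def build_health_check(grpc_port: int) -> dict:
    """Build an ECS container healthCheck that probes a local TCP port."""
    # A TCP connect only shows that something is listening on the port;
    # a deployment may prefer a dedicated gRPC health probe instead.
    probe = (
        'python -c "import socket; '
        f"socket.create_connection(('127.0.0.1', {grpc_port}), timeout=2).close()\""
    )
    return {
        "command": ["CMD-SHELL", probe],  # run through the container's shell
        "interval": 30,      # seconds between checks
        "timeout": 5,        # seconds before a single check counts as failed
        "retries": 3,        # consecutive failures before UNHEALTHY
        "startPeriod": 60,   # grace period while the worker boots
    }


# 50051 is a placeholder port, not the documented cfdb default.
example_health_check = build_health_check(50051)
```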

#### Running a local worker pool

For single-host development, start a wool worker pool in a separate process before launching the API, with `WORKFLOW_POOL_NAMESPACE` matching what the API uses. The API connects via LAN discovery (zeroconf/mDNS) and dispatches workflows to whatever workers are publishing under that namespace. With no worker pool running, `/data` and `/index` requests for processable formats will hang on the dispatch retry budget (60s by default) before failing with `NoWorkersAvailable`.
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -11,6 +11,7 @@ classifiers = [
]
dependencies = [
    "aiohttp",
    "boto3",
    "click",
    "debugpy",
    "fastapi",
@@ -30,7 +31,7 @@ readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"

[project.optional-dependencies]
-dev = ["allpairspy", "debugpy", "httpx", "hypothesis", "mongomock-motor", "pytest-asyncio", "pytest-mock", "ruff"]
+dev = ["allpairspy", "debugpy", "httpx", "hypothesis", "mongomock-motor", "moto[ecs,s3]", "pytest-asyncio", "pytest-mock", "ruff"]

[project.scripts]
cfdb = "cfdb.cli:cli"
135 changes: 133 additions & 2 deletions src/cfdb/api/__init__.py
@@ -4,6 +4,8 @@

from motor.motor_asyncio import AsyncIOMotorDatabase

from cfdb.workflows import WORKFLOW_DURATION_CAP_S

if TYPE_CHECKING:
    from cfdb.workflows.cache import CacheBackend
    from cfdb.workflows.executor import JobExecutor
@@ -79,10 +81,139 @@ def _parse_int_env(name: str, default: int, *, minimum: int = 0) -> int:
#: same string; otherwise the API's discovery service won't see the
#: worker registrations and the pool will start with zero leasable
#: workers. The default ``"cfdb-workers"`` matches what the worker pool
-#: needs to publish under; an ECS-aware variant may supplant LAN
-#: discovery in a future PR.
+#: needs to publish under. Ignored when the ECS-discovery profile is
+#: active (see ``ECS_CLUSTER`` / ``ECS_WORKER_TASK_DEFINITION``); the
+#: ECS path discovers workers by polling the ECS control plane directly
+#: and does not need a shared zeroconf namespace.
WORKFLOW_POOL_NAMESPACE: Final = os.getenv("WORKFLOW_POOL_NAMESPACE", "cfdb-workers")

# AWS / ECS profile. These knobs are optional — when none of them are
# set the API runs the PoC profile (``LocalFsCache`` + ``LanDiscovery``
# + no worker provisioner) and behaves exactly as it did before the
# Fargate work landed. Production / LocalStack-backed dev sets the
# bucket + cluster + task-def + subnets to activate the ECS-backed
# profile; the same code path serves both because boto3 honors
# ``AWS_ENDPOINT_URL`` and redirects requests to LocalStack when it is set.

#: boto3 endpoint override. In production this is unset and boto3
#: targets real AWS endpoints; LocalStack-backed dev sets it to
#: ``http://localstack:4566`` so the same code talks to the local
#: container. Threaded through to ``_build_s3_client`` /
#: ``build_ecs_client`` (in ``cfdb.workflows.cache`` /
#: ``cfdb.workflows.provisioner``) via the boto3 ``Session`` default
#: chain, so no per-client wiring is needed here.
AWS_ENDPOINT_URL: Final = os.getenv("AWS_ENDPOINT_URL")

#: AWS region. Defaults to ``us-east-1`` so a missing ``AWS_REGION`` in
#: dev doesn't surface as an opaque boto3 ``NoRegionError`` at first
#: request — operators get a working default and override it in
#: production deployments.
AWS_REGION: Final = os.getenv("AWS_REGION", "us-east-1")

#: When set, the lifespan instantiates ``S3Cache`` instead of
#: ``LocalFsCache``. Unset means the API stays on the local
#: filesystem cache.
WORKFLOW_S3_BUCKET: Final = os.getenv("WORKFLOW_S3_BUCKET")

#: Optional key prefix the S3 backend prepends to every cache key.
#: Lets a single bucket host multiple environments (``dev/``,
#: ``staging/``, ``prod/``) without collisions.
WORKFLOW_S3_PREFIX: Final = os.getenv("WORKFLOW_S3_PREFIX", "")

#: ECS cluster name or ARN. Gates the ECS-backed provisioner and
#: discovery profile; unset means the PoC profile stays on
#: ``LanDiscovery`` with no provisioner.
ECS_CLUSTER: Final = os.getenv("ECS_CLUSTER")

#: ECS worker task definition. Accepts either a family name
#: (``cfdb-worker``) or a ``family:revision`` string. The provisioner
#: passes it through to ``RunTask`` verbatim; the discovery loop
#: strips any ``:revision`` suffix to derive its
#: ``family`` filter (see ``ECS_WORKER_TASK_FAMILY`` override below).
ECS_WORKER_TASK_DEFINITION: Final = os.getenv("ECS_WORKER_TASK_DEFINITION")


def _ecs_default_task_family() -> str | None:
    """Derive the discovery ``family`` filter from the task definition.

    ``RunTask`` accepts ``family[:revision]``; ``ListTasks`` accepts
    only the family (no revision). The default split strips the
    revision when present, with ``ECS_WORKER_TASK_FAMILY`` available as
    an explicit override for environments that pin a non-default
    family name.
    """
    explicit = os.getenv("ECS_WORKER_TASK_FAMILY")
    if explicit:
        return explicit
    if ECS_WORKER_TASK_DEFINITION:
        return ECS_WORKER_TASK_DEFINITION.split(":", 1)[0]
    return None
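
The derivation above can be exercised in isolation with a parameterised copy. This is only an illustrative sketch: `task_family` is a hypothetical stand-in that takes its inputs as arguments instead of reading the environment, so the behaviour is easy to check by hand.

```python
from __future__ import annotations


def task_family(task_definition: str | None, explicit: str | None = None) -> str | None:
    """Mirror the derivation: an explicit override wins, else strip ':revision'."""
    if explicit:
        return explicit
    if task_definition:
        # Keep only the text before the first colon.
        return task_definition.split(":", 1)[0]
    return None


derived = task_family("cfdb-worker:7")
unchanged = task_family("cfdb-worker")
overridden = task_family("cfdb-worker:7", explicit="other-family")
```

One consequence of splitting on the first colon: a full task-definition ARN would be reduced to `arn`, so a deployment that passes an ARN would presumably need the explicit `ECS_WORKER_TASK_FAMILY` override.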


#: Family used by ``EcsDiscovery`` to filter ``ListTasks``. Derived
#: from ``ECS_WORKER_TASK_DEFINITION`` by default; set explicitly via
#: ``ECS_WORKER_TASK_FAMILY`` only when the discovery family differs
#: from the provisioner task-def family (rare).
ECS_WORKER_TASK_FAMILY: Final = _ecs_default_task_family()


def _parse_csv_env(name: str, default: str = "") -> list[str]:
    """Parse a comma-separated env var into a list of trimmed strings.

    Empty strings are dropped so a trailing comma or double comma
    doesn't propagate as an empty subnet/SG entry that boto3 would
    later reject with a less informative error.
    """
    raw = os.getenv(name, default)
    return [item.strip() for item in raw.split(",") if item.strip()]
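
As a quick illustration of the trimming and empty-entry handling described in the docstring, the same comprehension can be applied to a raw string directly. `parse_csv` is a hypothetical helper used only for this sketch, and the subnet IDs are made up.

```python
def parse_csv(raw: str) -> list[str]:
    """Split on commas, trim whitespace, and drop empty entries."""
    return [item.strip() for item in raw.split(",") if item.strip()]


# Trailing and doubled commas do not produce empty entries.
subnets = parse_csv("subnet-aaa, subnet-bbb,,")
# An unset or empty variable yields an empty list.
no_subnets = parse_csv("")
```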


#: Awsvpc subnet IDs the worker ENIs land in. Required for the ECS
#: profile; an empty list with ``ECS_CLUSTER`` set is a misconfiguration
#: that the lifespan refuses to start under.
ECS_WORKER_SUBNETS: Final = _parse_csv_env("ECS_WORKER_SUBNETS")

#: Awsvpc security groups attached to the worker ENIs. Optional —
#: when empty, ECS applies the VPC default SG.
ECS_WORKER_SECURITY_GROUPS: Final = _parse_csv_env("ECS_WORKER_SECURITY_GROUPS")

_ASSIGN_PUBLIC_IP_VALUES = frozenset({"ENABLED", "DISABLED"})


def _parse_assign_public_ip(name: str, default: str) -> str:
    """Parse an ECS ``assignPublicIp`` env var with explicit validation.

    ECS rejects anything other than ``ENABLED`` / ``DISABLED``; we
    surface the misconfiguration at module-import time so the lifespan
    doesn't get to Mongo + S3 init before tripping on it. Pairs with
    :data:`cfdb.workflows.provisioner._ASSIGN_PUBLIC_IP_VALUES`.
    """
    raw = os.getenv(name)
    if raw is None or raw == "":
        return default
    if raw not in _ASSIGN_PUBLIC_IP_VALUES:
        raise ImportError(
            f"Environment variable {name}={raw!r} must be one of "
            f"{sorted(_ASSIGN_PUBLIC_IP_VALUES)}"
        )
    return raw


#: Whether the worker ENI gets a public IPv4 address. Production
#: should leave this DISABLED and reach AWS via VPC endpoints;
#: LocalStack accepts either value.
ECS_WORKER_ASSIGN_PUBLIC_IP: Final = _parse_assign_public_ip(
    "ECS_WORKER_ASSIGN_PUBLIC_IP", "DISABLED"
)

#: Per-deployment override for the workflow runtime cap. Threads
#: through to :class:`cfdb.workflows.executor.WoolExecutor` via the
#: lifespan. Defaults to :data:`cfdb.workflows.WORKFLOW_DURATION_CAP_S`
#: (the env-driven workflows-package default, currently 4 h).
WORKFLOW_DURATION_CAP_SECONDS: Final = _parse_int_env(
    "WORKFLOW_DURATION_CAP_SECONDS", default=WORKFLOW_DURATION_CAP_S, minimum=1
)

db: AsyncIOMotorDatabase | None = None
cache: "CacheBackend | None" = None
executor: "JobExecutor | None" = None