Deploy workflow workers as an ephemeral ECS Fargate fleet — Closes #33#34
Draft
conradbzura wants to merge 7 commits into
Draft
Conversation
14 tasks
906c063 to
89f4685
Compare
c8585f1 to
b92c47a
Compare
abc625c to
94535ff
Compare
a92e44c to
b47e8cd
Compare
The ECS Fargate worker fleet needs boto3 at runtime for the S3 cache backend, the ECS provisioner, and the worker discovery loop. moto is added as a dev-only dependency so the unit tests can exercise the boto3 code paths without standing up LocalStack or hitting real AWS.
S3Cache is a CacheBackend implementation over boto3 with the same range-aware semantics as LocalFsCache (head_object, get_object with a Range header, upload_file, delete_object). Production points it at real S3; LocalStack-backed dev points it at the LocalStack endpoint via AWS_ENDPOINT_URL — only the endpoint differs. The backend supports an optional key prefix for sharing a single bucket across environments, rejects path-traversal segments, and yields an empty iterator on missing objects so router code can treat cache misses uniformly across backends.
EcsProvisioner is a thin boto3 RunTask wrapper that launches an ephemeral worker container per workflow, with awsvpc network configuration, concurrent-call dedup keyed on the workflow mutex key (so a burst of ensure_workflow calls on the same source doesn't fan out into multiple tasks), and a semaphore guarding the ~20 req/s RunTask rate limit. CapacityException covers both ClientError- and failures[].reason-shaped capacity / ENI errors so the executor can retry rather than hang. EcsDiscovery is a Wool DiscoveryLike poll-and-diff over list_tasks + describe_tasks: it filters on healthStatus HEALTHY, extracts the awsvpc IP, and emits worker-added / worker-dropped events to non- blocking subscribers via per-subscriber asyncio.Queue. State replay on subscribe means subscribers attached after startup observe the existing healthy fleet.
worker_main.py is the entrypoint baked into the worker container image. It starts a wool.LocalWorker so the API can dispatch routines to the task, exposes a tiny aiohttp /health endpoint that returns 503 during drain — so ECS marks the task unhealthy before stop_task kills the gRPC port — and installs SIGTERM/SIGINT handlers so a stop_task issued by the API or the Fargate scheduler shuts down cleanly. The idle-shutdown timeout is configurable so tasks self-terminate when their workflow completes. The entrypoint deliberately does not register itself with discovery; EcsDiscovery polls ECS directly and surfaces healthy tasks to the pool, so the worker only needs to be listening and HEALTHY.
Multi-hour preprocessing runs (samtools sort + index on a multi-GB BAM, tabix on a large interval file) routinely exceed the previous 1200 s (20 min) default and trip the asyncio.timeout in _run_workflow. The new 14400 s (4 h) default sizes the cap for the real workload profile without requiring every operator to set CFDB_WORKFLOW_DURATION_CAP_S explicitly. The cap remains env-driven so fixture-bound dev setups can lower it to keep test runs snappy. The accompanying docstring in executor.py is updated to match the new rationale.
ensure_workflow now requests a worker from an externally-injected
EcsProvisioner on a fresh claim, dedup-keyed on the workflow mutex so
two concurrent claims for the same source file share one RunTask and
one worker. The request is awaited in _run_workflow between
mark_running and the routine-stream open so a capacity / ENI /
throttling failure (surfaced as RetryableProvisionerError) routes
through the same FAILED terminal path as a stream-open failure, with
a "provisioner:" prefix preserved on the persisted error so the
operator can tell the two apart in /jobs/{id}.
The provisioner ctor arg defaults to None — the PoC dev profile that
relies on manually-started wool workers via LanDiscovery is unchanged.
The EcsProvisioner type is imported under TYPE_CHECKING so executor.py
imports stay boto3-free when the provisioner isn't in use.
TestWoolExecutorWithProvisioner covers the three observable shapes:
request issued on fresh claim with the workflow_key dedup_key, request
suppressed on attach to an already-claimed workflow, and capacity
failures landing as FAILED with the provisioner error preserved.
Add the AWS / ECS env config (AWS_ENDPOINT_URL, AWS_REGION, WORKFLOW_S3_BUCKET, WORKFLOW_S3_PREFIX, ECS_CLUSTER, ECS_WORKER_TASK_DEFINITION, ECS_WORKER_TASK_FAMILY, ECS_WORKER_SUBNETS, ECS_WORKER_SECURITY_GROUPS, ECS_WORKER_ASSIGN_PUBLIC_IP) to cfdb.api and switch the lifespan to pick the right runtime backends per env state. Three helpers gate the selection so each concern stays separable: _build_cache returns S3Cache when WORKFLOW_S3_BUCKET is set and LocalFsCache otherwise; _maybe_build_provisioner returns an EcsProvisioner when ECS_CLUSTER / task-def / subnets are all set and None otherwise; _build_discovery wraps EcsDiscovery in its async context (or yields LanDiscovery unchanged) so the lifespan's WorkerPool block opens against either discovery with the same shape. ECS_WORKER_TASK_FAMILY defaults to ECS_WORKER_TASK_DEFINITION with any :revision suffix stripped — ListTasks only accepts the family, RunTask accepts family[:revision]. The bare PoC profile (no AWS env set) keeps producing LocalFsCache + LanDiscovery + no provisioner, identical to the path before this change. The EXDEV cross-filesystem check is now gated on the LocalFsCache branch since S3 has no rename-atomicity precondition. Startup log reports the resolved cache / discovery / provisioner types instead of the LAN namespace so operators can see at a glance which profile activated.
b47e8cd to
950686b
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Add the runtime application substrate the workflow subsystem needs to run on the ECS Fargate fleet outlined in #32: an
EcsProvisionerthat launches ephemeral worker containers per workflow viaRunTask, anEcsDiscoverypoll-and-diff loop overListTasks+DescribeTasksthat surfaces healthy tasks to Wool's worker pool, anS3Cachebackend that mirrorsLocalFsCachesemantics over boto3, a worker-container entrypoint, and a workflow heartbeat that lets multi-hour preprocessing runs survive the stale-reclaim predicate. The same boto3-backed code targets real AWS in production and LocalStack in dev — onlyAWS_ENDPOINT_URLdiffers.Two
WoolExecutordefaults move with the multi-hour profile:DEFAULT_WORKFLOW_DURATION_CAP_SECONDSrises from 20 min to 4 h (env-driven), and_NO_WORKERS_RETRY_ATTEMPTSwidens from 5 to 60 to cover the Fargate cold-start budget.STALE_WORKFLOW_THRESHOLDdrops from 1 h to 15 min — paired with a 5-min heartbeat sidecar in the executor, that catches actually-dead workers fast without false-positive reclaiming legitimate long sorts.Scope intentionally narrows to the runtime application code so it lands self-contained with full unit coverage. Defer for follow-up PRs once the runtime classes have integration coverage and a deploy pipeline exists: LocalStack
docker-compose.yml, CloudFormation worker task-def updates, multi-stage worker Dockerfile + SOCI index, README documentation, and LocalStack-backed integration tests.Closes #33
Proposed changes
S3 cache backend
Add
S3Cache(src/cfdb/workflows/cache.py) as aCacheBackendover boto3 withhead_object/get_object(withRange) /upload_file/delete_object. Support an optional key prefix for sharing a single bucket across environments, reject path-traversal segments, and yield an empty iterator on missing objects to matchLocalFsCachesemantics. Centralize client construction inbuild_s3_clientso production, LocalStack-backed dev, and unit tests all produce the same shape of client — only the endpoint differs.ECS provisioner and worker discovery
Add
EcsProvisioner(src/cfdb/workflows/provisioner.py) wrappingRunTaskwith awsvpc network configuration. Dedupe concurrent calls keyed on the workflow mutex key so a burst ofensure_workflowcalls on the same source does not fan out into multiple tasks, and guard the ~20 req/sRunTaskrate limit with a semaphore.CapacityExceptioncovers bothClientError- andfailures[].reason-shaped capacity / ENI errors so the executor can surface them as retryable rather than hanging.Add
EcsDiscovery(src/cfdb/workflows/discovery.py) implementing Wool'sDiscoveryLikeprotocol with a poll-and-diff loop overlist_tasks+describe_tasks. Filter onhealthStatus: HEALTHY, extract the awsvpc IP, and emitworker-added/worker-droppedevents to non-blocking subscribers via per-subscriberasyncio.Queue. Replay state on subscribe so subscribers attached after startup observe the existing healthy fleet.Worker container entrypoint
Add
worker_main.py(src/cfdb/workflows/worker_main.py) startingwool.LocalWorker, exposing a tiny aiohttp/healthendpoint that returns 503 during drain so ECS marks the task unhealthy beforestop_taskkills gRPC, installing SIGTERM/SIGINT handlers, and self-terminating after a configurable idle timeout. Deliberately omit discovery registration —EcsDiscoverypolls ECS directly, so the worker only needs to bind its gRPC port and stay HEALTHY.Workflow heartbeat
Add
heartbeat_workflow(src/cfdb/workflows/lock.py) bumpingupdated_aton an active job record so a multi-hour run does not tripclaim_workflow's stale-reclaim predicate. Fence the update on an active status so a heartbeat against a terminal or stale-reclaimed record is a silent no-op — the executor uses theFalsereturn as a stop signal. ShortenSTALE_WORKFLOW_THRESHOLDfrom 1 h to 15 min and exposeDEFAULT_HEARTBEAT_INTERVAL_SECONDSas the cadence shared between executor and tests.Executor integration
Extend
WoolExecutorwithprovisionerandheartbeat_interval_secondsctor args. Invoke the provisioner fromensure_workflowon a fresh claim and surfaceCapacityExceptionas a terminalFAILEDjob with a retryable error string. Spawn a heartbeat sidecar task that bumpsupdated_atwhile the workflow body runs and cancel it infinallybefore the terminal release so the next tick cannot observe an active record while status is flipping. RaiseDEFAULT_WORKFLOW_DURATION_CAP_SECONDSfrom 1200 to 4 h and widen_NO_WORKERS_RETRY_ATTEMPTSfrom 5 to 60 (~60 s window) for Fargate cold-start. Drop theLambdaExecutormention from theJobExecutorABC docstring.API lifespan wiring
Add env vars to
src/cfdb/api/__init__.py:AWS_ENDPOINT_URL,AWS_REGION,WORKFLOW_S3_BUCKET/PREFIX,ECS_CLUSTER,ECS_WORKER_TASK_DEFINITION,ECS_WORKER_SUBNETS,ECS_WORKER_SECURITY_GROUPS,ECS_WORKER_ASSIGN_PUBLIC_IP,WORKFLOW_DURATION_CAP_SECONDS. Have the lifespan pickS3CacheoverLocalFsCachewhenWORKFLOW_S3_BUCKETis set and build anEcsProvisionerwhen ECS env config is present (helper_maybe_build_provisioner). Leave the bare PoC profile unchanged: with none of the new env set, the API continues to use the local filesystem cache plus the in-processWorkerPool.Build dependencies
Add
boto3to runtime deps andmoto[ecs,s3]to dev deps so the unit tests can exercise boto3 paths without LocalStack or real AWS.Test cases
TestS3CacheS3Cacheconstructed with empty bucketValueErroris raisedTestS3Cacheheadis awaitedNoneis returnedTestS3Cacheputheadis awaited for the same keyTestS3Cachegetis iterated without a byte rangeTestS3Cachegetis iterated with an inclusive byte rangeTestS3Cachegetis iteratedTestS3Cachedeleteis awaitedTrueis returned and the object is goneTestS3Cachedeleteis awaitedFalseis returnedTestS3Cache..segmentputis awaitedValueErroris raisedTestS3CacheS3Cacheconstructed with a key prefixTestEcsProvisionerValueErroris raisedTestEcsProvisionerValueErroris raisedTestEcsProvisionerValueErroris raisedTestEcsProvisionerrequestis awaited onceRunTaskis called once with the expected payloadTestEcsProvisionerrequestcalls await in parallelRunTaskis invoked once and all callers receive the same ARNTestEcsProvisionerrequestcalls await in parallelRunTaskis invoked once per keyTestEcsProvisionerClientErrorfor capacityrequestis awaitedCapacityExceptionis raisedTestEcsProvisionerRunTaskresponse with capacity infailures[]requestis awaitedCapacityExceptionis raisedTestEcsDiscoveryValueErroris raisedTestEcsDiscoveryValueErroris raisedTestEcsDiscovery_poll_onceis awaitedworker-addedevents are emitted with the expected addressesTestEcsDiscovery_poll_onceis awaitedTestEcsDiscovery_poll_onceis awaited againworker-droppedis emitted for the gone workerTestEcsDiscovery_poll_onceis awaited twiceTestEcsDiscoveryaddeventTestParseArgs_parse_argsruns without argsTestParseArgs_parse_argsruns without argsTestParseArgs_parse_argsruns with the flagsTestHeartbeatWorkflowupdated_athas been backdatedheartbeat_workflowis awaitedTrueis returned andupdated_atadvancesTestHeartbeatWorkflowCOMPLETEDheartbeat_workflowis awaitedFalseis returnedTestStaleWorkflowThresholdSTALE_WORKFLOW_THRESHOLDconstantTestWoolExecutorWithProvisionerensure_workflowis awaited for a fresh keyTestWoolExecutorWithProvisionerCapacityExceptionensure_workflowis awaitedFAILEDwith a "capacity"-prefixed errorTestWoolExecutorHeartbeatensure_workflowis awaited and the test polls during the runupdated_atadvances past the original claim timestamp