From b19ae4a73d86cdd5f3a40d12eb0196f83e9c6df0 Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Wed, 8 Apr 2026 13:35:31 +0200 Subject: [PATCH 1/6] Simplify task API --- .../organization/project/branch/__init__.py | 2 +- src/api/organization/project/branch/tasks.py | 61 +++++++------------ 2 files changed, 23 insertions(+), 40 deletions(-) diff --git a/src/api/organization/project/branch/__init__.py b/src/api/organization/project/branch/__init__.py index f53c5c8d..d5ea0972 100644 --- a/src/api/organization/project/branch/__init__.py +++ b/src/api/organization/project/branch/__init__.py @@ -100,7 +100,7 @@ get_control_in_progress_status, ) from .resize_tasks import dispatch_resize -from .tasks import task_api +from .tasks import api as task_api api = APIRouter(tags=["branch"]) diff --git a/src/api/organization/project/branch/tasks.py b/src/api/organization/project/branch/tasks.py index 7853a2f4..bd9c6da6 100644 --- a/src/api/organization/project/branch/tasks.py +++ b/src/api/organization/project/branch/tasks.py @@ -1,12 +1,5 @@ -"""Branch task list/detail endpoints. - -Exposes Celery task state (resize and control) under: - GET .../branches/{branch_id}/tasks - GET .../branches/{branch_id}/tasks/{task_id} -""" - from datetime import datetime -from typing import Any +from typing import Any, Literal from uuid import UUID from fastapi import APIRouter, HTTPException @@ -17,7 +10,9 @@ from .control_tasks import perform_control from .resize_tasks import finalize_resize -task_api = APIRouter(tags=["branch"]) +api = APIRouter(tags=["branch"]) + +TaskType = Literal["control", "resize"] _CELERY_STATE_TO_STATUS: dict[str, str] = { "PENDING": "PENDING", @@ -38,40 +33,29 @@ class BranchTaskPublic(BaseModel): date_done: datetime | None -def _build_resize_task_public(task_id: UUID) -> BranchTaskPublic: - result = finalize_resize.AsyncResult(str(task_id)) - state = result.state - status = _CELERY_STATE_TO_STATUS.get(state, state) - kwargs: dict = result.kwargs or {} - return BranchTaskPublic( - id=task_id, - task_type="resize", - status=status, - parameters=kwargs.get("effective_parameters", {}), - result=result.result if state == "SUCCESS" else None, - error=str(result.traceback) if state == "FAILURE" and result.traceback else None, - date_done=result.date_done, - ) - +def _build_task_public(task_id: UUID, task_type: TaskType) -> BranchTaskPublic: + tasks = { + "control": perform_control, + "resize": finalize_resize, + } + result = tasks[task_type].AsyncResult(str(task_id)) -def _build_control_task_public(task_id: UUID) -> BranchTaskPublic: - result = perform_control.AsyncResult(str(task_id)) state = result.state status = _CELERY_STATE_TO_STATUS.get(state, state) kwargs: dict = result.kwargs or {} - action = kwargs.get("action", "control") + task_type = task_type if task_type != "control" else kwargs["action"] return BranchTaskPublic( id=task_id, - task_type=action, + task_type=task_type, status=status, - parameters={"action": action}, + parameters=kwargs.get("effective_parameters", {}), result=result.result if state == "SUCCESS" else None, error=str(result.traceback) if state == "FAILURE" and result.traceback else None, date_done=result.date_done, ) -@task_api.get( +@api.get( "/", name="organizations:projects:branch:tasks:list", response_model=list[BranchTaskPublic], @@ -82,15 +66,14 @@ async def list_tasks( _project: ProjectDep, branch: BranchDep, ) -> list[BranchTaskPublic]: - tasks = [] - if branch.resize_task_id is not None: - tasks.append(_build_resize_task_public(branch.resize_task_id)) - if branch.control_task_id is not None: - tasks.append(_build_control_task_public(branch.control_task_id)) - return tasks + tasks: list[tuple[UUID | None, TaskType]] = [ + (branch.control_task_id, "control"), + (branch.resize_task_id, "resize"), + ] + return [_build_task_public(task_id, task_type) for task_id, task_type in tasks if task_id is not None] -@task_api.get( +@api.get( "/{task_id}", name="organizations:projects:branch:tasks:detail", response_model=BranchTaskPublic, @@ -103,7 +86,7 @@ async def get_task( task_id: UUID, ) -> BranchTaskPublic: if branch.resize_task_id == task_id: - return _build_resize_task_public(task_id) + return _build_task_public(task_id, "resize") if branch.control_task_id == task_id: - return _build_control_task_public(task_id) + return _build_task_public(task_id, "control") raise HTTPException(status_code=404, detail="Task not found") From 286aa38c69481a99ff55c358fc532e89600024ca Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Wed, 8 Apr 2026 14:07:14 +0200 Subject: [PATCH 2/6] Reorganize task module structure --- src/api/organization/project/branch/__init__.py | 4 ++-- .../project/branch/{tasks.py => tasks/__init__.py} | 12 ++++++++---- .../branch/{control_tasks.py => tasks/_control.py} | 14 +++++++------- .../branch/{resize_tasks.py => tasks/_resize.py} | 14 +++++++------- src/worker/__init__.py | 3 +-- 5 files changed, 25 insertions(+), 22 deletions(-) rename src/api/organization/project/branch/{tasks.py => tasks/__init__.py} (84%) rename src/api/organization/project/branch/{control_tasks.py => tasks/_control.py} (93%) rename src/api/organization/project/branch/{resize_tasks.py => tasks/_resize.py} (90%) diff --git a/src/api/organization/project/branch/__init__.py b/src/api/organization/project/branch/__init__.py index d5ea0972..a38b8d51 100644 --- a/src/api/organization/project/branch/__init__.py +++ b/src/api/organization/project/branch/__init__.py @@ -94,12 +94,12 @@ from ....settings import get_settings as get_api_settings from .api_keys import api as api_key_api from .auth import api as auth_api -from .control_tasks import ( +from .tasks import ( _CONTROL_TO_POWER_STATE, dispatch_control, + dispatch_resize, get_control_in_progress_status, ) -from .resize_tasks import dispatch_resize from .tasks import api as task_api api = APIRouter(tags=["branch"]) diff --git a/src/api/organization/project/branch/tasks.py b/src/api/organization/project/branch/tasks/__init__.py similarity index 84% rename from src/api/organization/project/branch/tasks.py rename to src/api/organization/project/branch/tasks/__init__.py index bd9c6da6..b52c14b2 100644 --- a/src/api/organization/project/branch/tasks.py +++ b/src/api/organization/project/branch/tasks/__init__.py @@ -5,10 +5,14 @@ from fastapi import APIRouter, HTTPException from pydantic import BaseModel -from ...._util import Forbidden, NotFound, Unauthenticated -from ....dependencies import BranchDep, OrganizationDep, ProjectDep -from .control_tasks import perform_control -from .resize_tasks import finalize_resize +from ....._util import Forbidden, NotFound, Unauthenticated +from .....dependencies import BranchDep, OrganizationDep, ProjectDep +from ._control import _CONTROL_TO_POWER_STATE as _CONTROL_TO_POWER_STATE +from ._control import dispatch_control as dispatch_control +from ._control import get_control_in_progress_status as get_control_in_progress_status +from ._control import perform_control +from ._resize import dispatch_resize as dispatch_resize +from ._resize import finalize_resize api = APIRouter(tags=["branch"]) diff --git a/src/api/organization/project/branch/control_tasks.py b/src/api/organization/project/branch/tasks/_control.py similarity index 93% rename from src/api/organization/project/branch/control_tasks.py rename to src/api/organization/project/branch/tasks/_control.py index 98466f32..06b068b5 100644 --- a/src/api/organization/project/branch/control_tasks.py +++ b/src/api/organization/project/branch/tasks/_control.py @@ -16,13 +16,13 @@ from sqlalchemy.exc import NoResultFound from ulid import ULID -from .....database import AsyncSessionLocal -from .....deployment import get_autoscaler_vm_identity -from .....deployment.health import query_deployment_status -from .....deployment.kubernetes.neonvm import Phase, PowerState, get_neon_vm, set_virtualmachine_power_state -from .....models.branch import BranchServiceStatus -from .....models.branch import lookup as branch_lookup -from .....worker import app +from ......database import AsyncSessionLocal +from ......deployment import get_autoscaler_vm_identity +from ......deployment.health import query_deployment_status +from ......deployment.kubernetes.neonvm import Phase, PowerState, get_neon_vm, set_virtualmachine_power_state +from ......models.branch import BranchServiceStatus +from ......models.branch import lookup as branch_lookup +from ......worker import app logger = logging.getLogger(__name__) diff --git a/src/api/organization/project/branch/resize_tasks.py b/src/api/organization/project/branch/tasks/_resize.py similarity index 90% rename from src/api/organization/project/branch/resize_tasks.py rename to src/api/organization/project/branch/tasks/_resize.py index 95c13867..1af9de99 100644 --- a/src/api/organization/project/branch/resize_tasks.py +++ b/src/api/organization/project/branch/tasks/_resize.py @@ -14,13 +14,13 @@ from celery import chord from ulid import ULID -from .....database import AsyncSessionLocal -from .....deployment.health import collect_branch_service_health, derive_branch_status_from_services -from .....deployment.resize import resize_cpu_memory, resize_database_pvc, resize_iops, resize_storage_pvc -from .....models.branch import Branch -from .....models.resources import ResourceLimitsPublic -from .....worker import app -from ...._util.resourcelimit import apply_branch_resource_allocation +from ......database import AsyncSessionLocal +from ......deployment.health import collect_branch_service_health, derive_branch_status_from_services +from ......deployment.resize import resize_cpu_memory, resize_database_pvc, resize_iops, resize_storage_pvc +from ......models.branch import Branch +from ......models.resources import ResourceLimitsPublic +from ......worker import app +from ....._util.resourcelimit import apply_branch_resource_allocation logger = logging.getLogger(__name__) diff --git a/src/worker/__init__.py b/src/worker/__init__.py index 0089540c..148c4322 100644 --- a/src/worker/__init__.py +++ b/src/worker/__init__.py @@ -22,6 +22,5 @@ class Settings(BaseSettings): app.conf.beat_schedule_filename = "/tmp/celerybeat-schedule" # Register tasks — must be imported after `app` is defined. -from ..api.organization.project.branch import control_tasks as _api_control_tasks # noqa: E402, F401 -from ..api.organization.project.branch import resize_tasks as _api_resize_tasks # noqa: E402, F401 +from ..api.organization.project.branch import tasks as _api_branch_tasks # noqa: E402, F401 from ..deployment import resize as _deployment_resize # noqa: E402, F401 From 6ad5cd2b3e55646932b7f1263fb23a16df5b322d Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Wed, 1 Apr 2026 13:37:39 +0200 Subject: [PATCH 3/6] Convert branch deletion to celery job --- .../organization/project/branch/__init__.py | 42 ++++----- .../project/branch/delete_tasks.py | 61 +++++++++++++ .../project/branch/tasks/__init__.py | 10 ++- src/deployment/delete.py | 54 ++++++++++++ src/models/branch.py | 3 +- .../b8e4c9f12d35_add_branch_delete_task_id.py | 28 ++++++ src/worker/__init__.py | 1 + tests/branches/test_delete.py | 85 +++++++++++++++++++ tests/conftest.py | 19 ++++- 9 files changed, 280 insertions(+), 23 deletions(-) create mode 100644 src/api/organization/project/branch/delete_tasks.py create mode 100644 src/deployment/delete.py create mode 100644 src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py create mode 100644 tests/branches/test_delete.py diff --git a/src/api/organization/project/branch/__init__.py b/src/api/organization/project/branch/__init__.py index a38b8d51..e35172bc 100644 --- a/src/api/organization/project/branch/__init__.py +++ b/src/api/organization/project/branch/__init__.py @@ -73,7 +73,7 @@ ) from .....models.resources import ResourceLimitsPublic, ResourceType from ...._util import Conflict, Forbidden, NotFound, Unauthenticated, url_path_for -from ...._util.backups import copy_branch_backup_schedules, delete_branch_backups, ensure_branch_pitr_schedule +from ...._util.backups import copy_branch_backup_schedules, ensure_branch_pitr_schedule from ...._util.resourcelimit import ( apply_branch_resource_allocation, check_available_resources_limits, @@ -97,6 +97,7 @@ from .tasks import ( _CONTROL_TO_POWER_STATE, dispatch_control, + dispatch_delete, dispatch_resize, get_control_in_progress_status, ) @@ -1682,31 +1683,34 @@ async def update( @instance_api.delete( "/", name="organizations:projects:branch:delete", - status_code=204, - responses={401: Unauthenticated, 403: Forbidden, 404: NotFound}, + status_code=202, + responses={401: Unauthenticated, 403: Forbidden, 404: NotFound, 400: {"description": "Delete already in progress"}}, ) async def delete( session: SessionDep, - _organization: OrganizationDep, - _project: ProjectDep, + request: Request, + organization: OrganizationDep, + project: ProjectDep, branch: BranchDep, ): - branch_id = branch.id - await _set_branch_status(session, branch, BranchServiceStatus.DELETING) - await delete_deployment(branch_id) - try: - await realm_admin("master").a_delete_realm(str(branch_id)) - except KeycloakError as exc: - if getattr(exc, "response_code", None) == 404: - logger.error("Keycloak realm not found for branch %s during delete; continuing.", branch_id, exc_info=True) - else: - raise - await delete_branch_provisioning(session, branch_id) - await delete_branch_backups(session, branch_id) - await session.delete(branch) + if branch.delete_task_id is not None: + raise HTTPException(status_code=400, detail="A delete operation is already in progress") + + task_id = dispatch_delete(str(branch.id)) + + branch.set_status(BranchServiceStatus.DELETING) + branch.delete_task_id = task_id await session.commit() - return Response(status_code=204) + task_url = url_path_for( + request, + "organizations:projects:branch:tasks:detail", + organization_id=await organization.awaitable_attrs.id, + project_id=await project.awaitable_attrs.id, + branch_id=await branch.awaitable_attrs.id, + task_id=task_id, + ) + return Response(status_code=202, headers={"Location": task_url}) @instance_api.post( diff --git a/src/api/organization/project/branch/delete_tasks.py b/src/api/organization/project/branch/delete_tasks.py new file mode 100644 index 00000000..0a9d32b0 --- /dev/null +++ b/src/api/organization/project/branch/delete_tasks.py @@ -0,0 +1,61 @@ +"""Celery chord tasks for async branch deletion. + +The chord dispatches three sub-tasks in parallel (K8s, Keycloak, backups) and +calls ``finalize_delete`` when all sub-tasks have settled. With +``task_chord_propagates = False`` the callback always fires, so the branch DB +record is removed regardless of individual sub-task outcomes (errors are logged +but do not abort the delete). +""" + +import logging +from uuid import UUID + +from asgiref.sync import async_to_sync +from celery import chord +from ulid import ULID + +from .....database import AsyncSessionLocal +from .....deployment.delete import delete_backup_snapshots, delete_k8s_deployment, delete_keycloak_realm +from .....models.branch import Branch +from .....worker import app +from ...._util.resourcelimit import delete_branch_provisioning + +logger = logging.getLogger(__name__) + + +async def _async_finalize_delete(job_results: list, branch_id: str) -> dict: + errors: list[str] = [] + for result in job_results: + if isinstance(result, Exception): + errors.append(str(result)) + logger.error("Delete sub-task error for branch %s: %s", branch_id, result) + + branch_ulid = ULID.from_str(branch_id) + async with AsyncSessionLocal() as session: + branch = await session.get(Branch, branch_ulid) + if branch is None: + logger.warning("Branch %s not found in finalize_delete; already deleted.", branch_id) + return {"errors": errors} + + await delete_branch_provisioning(session, branch_ulid, commit=False) + await session.delete(branch) + await session.commit() + + return {"errors": errors} + + +@app.task(name="simplyblock.vela.branch.delete.finalize") +def finalize_delete(job_results: list, branch_id: str) -> dict: + """Chord callback: delete provisioning records and the branch DB row.""" + return async_to_sync(_async_finalize_delete)(job_results, branch_id) + + +def dispatch_delete(branch_id: str) -> UUID: + """Dispatch the delete chord; return the chord result UUID.""" + jobs = [ + delete_k8s_deployment.s(branch_id), + delete_keycloak_realm.s(branch_id), + delete_backup_snapshots.s(branch_id), + ] + result = chord(jobs)(finalize_delete.s(branch_id=branch_id)) + return UUID(result.id) diff --git a/src/api/organization/project/branch/tasks/__init__.py b/src/api/organization/project/branch/tasks/__init__.py index b52c14b2..028d4aac 100644 --- a/src/api/organization/project/branch/tasks/__init__.py +++ b/src/api/organization/project/branch/tasks/__init__.py @@ -11,12 +11,13 @@ from ._control import dispatch_control as dispatch_control from ._control import get_control_in_progress_status as get_control_in_progress_status from ._control import perform_control +from ._delete import finalize_delete from ._resize import dispatch_resize as dispatch_resize from ._resize import finalize_resize api = APIRouter(tags=["branch"]) -TaskType = Literal["control", "resize"] +TaskType = Literal["control", "delete", "resize"] _CELERY_STATE_TO_STATUS: dict[str, str] = { "PENDING": "PENDING", @@ -40,6 +41,7 @@ class BranchTaskPublic(BaseModel): def _build_task_public(task_id: UUID, task_type: TaskType) -> BranchTaskPublic: tasks = { "control": perform_control, + "delete": finalize_delete, "resize": finalize_resize, } result = tasks[task_type].AsyncResult(str(task_id)) @@ -48,11 +50,12 @@ def _build_task_public(task_id: UUID, task_type: TaskType) -> BranchTaskPublic: status = _CELERY_STATE_TO_STATUS.get(state, state) kwargs: dict = result.kwargs or {} task_type = task_type if task_type != "control" else kwargs["action"] + parameters = {k: v for k, v in kwargs.items() if k not in {"branch_id", "action"}} return BranchTaskPublic( id=task_id, task_type=task_type, status=status, - parameters=kwargs.get("effective_parameters", {}), + parameters=parameters, result=result.result if state == "SUCCESS" else None, error=str(result.traceback) if state == "FAILURE" and result.traceback else None, date_done=result.date_done, @@ -72,6 +75,7 @@ async def list_tasks( ) -> list[BranchTaskPublic]: tasks: list[tuple[UUID | None, TaskType]] = [ (branch.control_task_id, "control"), + (branch.delete_task_id, "delete"), (branch.resize_task_id, "resize"), ] return [_build_task_public(task_id, task_type) for task_id, task_type in tasks if task_id is not None] @@ -93,4 +97,6 @@ async def get_task( return _build_task_public(task_id, "resize") if branch.control_task_id == task_id: return _build_task_public(task_id, "control") + if branch.delete_task_id == task_id: + return _build_task_public(task_id, "delete") raise HTTPException(status_code=404, detail="Task not found") diff --git a/src/deployment/delete.py b/src/deployment/delete.py new file mode 100644 index 00000000..35e79796 --- /dev/null +++ b/src/deployment/delete.py @@ -0,0 +1,54 @@ +"""Celery sub-tasks for async branch deletion (deployment layer). + +These tasks handle the infrastructure-level teardown steps that can run in +parallel: K8s namespace/VM deletion, Keycloak realm removal, and backup +snapshot cleanup. The API layer's ``delete_tasks.finalize_delete`` chord +callback then removes the DB record after all sub-tasks have settled. +""" + +import logging + +from asgiref.sync import async_to_sync +from keycloak.exceptions import KeycloakError +from ulid import ULID + +from ..api._util.backups import delete_branch_backups +from ..api.keycloak import realm_admin +from ..database import AsyncSessionLocal +from ..worker import app +from . import delete_deployment + +logger = logging.getLogger(__name__) + + +async def _delete_keycloak(branch_id: ULID) -> None: + try: + await realm_admin("master").a_delete_realm(str(branch_id)) + except KeycloakError as exc: + if getattr(exc, "response_code", None) == 404: + logger.warning("Keycloak realm not found for branch %s; skipping.", branch_id) + else: + raise + + +async def _delete_snapshots(branch_id: ULID) -> None: + async with AsyncSessionLocal() as session: + await delete_branch_backups(session, branch_id) + + +@app.task(name="simplyblock.vela.deployment.delete.k8s") +def delete_k8s_deployment(branch_id: str) -> None: + """Delete the K8s namespace and associated VM for a branch.""" + async_to_sync(delete_deployment)(ULID.from_str(branch_id)) + + +@app.task(name="simplyblock.vela.deployment.delete.keycloak") +def delete_keycloak_realm(branch_id: str) -> None: + """Delete the Keycloak realm for a branch (404 is treated as success).""" + async_to_sync(_delete_keycloak)(ULID.from_str(branch_id)) + + +@app.task(name="simplyblock.vela.deployment.delete.backups") +def delete_backup_snapshots(branch_id: str) -> None: + """Delete K8s volume snapshots for all backups belonging to a branch.""" + async_to_sync(_delete_snapshots)(ULID.from_str(branch_id)) diff --git a/src/models/branch.py b/src/models/branch.py index e1824306..70daee99 100644 --- a/src/models/branch.py +++ b/src/models/branch.py @@ -112,9 +112,10 @@ class Branch(AsyncAttrs, Model, table=True): sa_column=Column(JSONB, nullable=False, server_default=text("'{}'::jsonb")), ) pitr_enabled: bool = Field(default=False, sa_column=Column(Boolean, nullable=False, server_default=text("false"))) + control_task_id: uuid.UUID | None = None + delete_task_id: uuid.UUID | None = None resize_task_id: uuid.UUID | None = None db_port: int | None = None - control_task_id: uuid.UUID | None = None __table_args__ = (UniqueConstraint("project_id", "name", name="unique_branch_name_per_project"),) diff --git a/src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py b/src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py new file mode 100644 index 00000000..6be49b64 --- /dev/null +++ b/src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py @@ -0,0 +1,28 @@ +"""Add branch delete_task_id + +Revision ID: b8e4c9f12d35 +Revises: a1b2c3d4e5f6 +Create Date: 2026-04-01 00:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'b8e4c9f12d35' +down_revision: Union[str, Sequence[str], None] = 'a1b2c3d4e5f6' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.add_column('branch', sa.Column('delete_task_id', sa.Uuid(), nullable=True)) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_column('branch', 'delete_task_id') diff --git a/src/worker/__init__.py b/src/worker/__init__.py index 148c4322..f022af56 100644 --- a/src/worker/__init__.py +++ b/src/worker/__init__.py @@ -23,4 +23,5 @@ class Settings(BaseSettings): # Register tasks — must be imported after `app` is defined. from ..api.organization.project.branch import tasks as _api_branch_tasks # noqa: E402, F401 +from ..deployment import delete as _deployment_delete # noqa: E402, F401 from ..deployment import resize as _deployment_resize # noqa: E402, F401 diff --git a/tests/branches/test_delete.py b/tests/branches/test_delete.py new file mode 100644 index 00000000..2c28eb8b --- /dev/null +++ b/tests/branches/test_delete.py @@ -0,0 +1,85 @@ +"""Integration tests for async branch deletion. + +These tests verify: + - DELETE /branches/{id}/ returns 202 Accepted with a Location header + - A concurrent DELETE returns 400 while the first delete is in progress + - The branch is eventually gone (GET returns 404) after the task completes +""" + +import time + +import pytest +from conftest import BRANCH_TIMEOUT_SEC, wait_for_deletion + +pytestmark = pytest.mark.branch + +_POLL_INTERVAL = 10 + +_state: dict = {} + + +@pytest.fixture(scope="module") +def org(make_org): + return make_org("test-org-delete") + + +@pytest.fixture(scope="module") +def project(make_project, org): + return make_project(org, "test-project-delete") + + +@pytest.fixture(scope="module") +def branch_id(make_branch, org, project): + return make_branch(org, project, "test-branch-delete") + + +def test_delete_returns_202_with_location(client, org, project, branch_id): + r = client.delete(f"organizations/{org}/projects/{project}/branches/{branch_id}/") + assert r.status_code == 202 + assert "Location" in r.headers + _state["task_url"] = r.headers["Location"] + + +def test_task_listed_while_deleting(client, org, project, branch_id): + r = client.get(f"organizations/{org}/projects/{project}/branches/{branch_id}/tasks/") + if r.status_code == 404: + pytest.skip("Branch already deleted before task listing test could run") + assert r.status_code == 200 + tasks = r.json() + assert any(t["task_type"] == "delete" for t in tasks) + + +def test_concurrent_delete_returns_400(client, org, project, branch_id): + # While the branch has delete_task_id set the endpoint must reject a second delete. + # The branch may have been deleted already in fast CI environments (404 is also acceptable). + r = client.delete(f"organizations/{org}/projects/{project}/branches/{branch_id}/") + assert r.status_code in (400, 404) + + +def test_branch_gone_after_delete(client, org, project, branch_id): + # Poll until the branch returns 404 — the Celery task has completed and deleted it. + wait_for_deletion( + client, + f"organizations/{org}/projects/{project}/branches/{branch_id}/", + BRANCH_TIMEOUT_SEC, + ) + r = client.get(f"organizations/{org}/projects/{project}/branches/{branch_id}/") + assert r.status_code == 404 + + +def test_task_detail_unavailable_after_deletion(client): + task_url = _state.get("task_url") + if not task_url: + pytest.skip("No task URL stored from delete test") + # Once the branch is gone the task detail endpoint returns 404 (BranchDep lookup fails). + deadline = time.monotonic() + BRANCH_TIMEOUT_SEC + while True: + r = client.get(task_url, timeout=30) + if r.status_code == 404: + return + status = r.json().get("status") if r.status_code == 200 else None + if status in ("COMPLETED", "FAILED"): + return + if time.monotonic() >= deadline: + raise TimeoutError(f"Timed out waiting for task to complete; last status={status!r}") + time.sleep(_POLL_INTERVAL) diff --git a/tests/conftest.py b/tests/conftest.py index 2f77dceb..1985a66f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import contextlib import operator import os import time @@ -56,6 +57,18 @@ def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Re yield request +def wait_for_deletion(client: httpx.Client, url: str, timeout: int = BRANCH_TIMEOUT_SEC) -> None: + """Poll GET url until it returns 404 (resource deleted) or timeout.""" + deadline = time.monotonic() + timeout + while True: + r = client.get(url, timeout=30) + if r.status_code == 404: + return + if time.monotonic() >= deadline: + raise TimeoutError(f"Timed out waiting for deletion of {url}") + time.sleep(_POLL_INTERVAL) + + def wait_for_status( client: httpx.Client, url: str, @@ -177,4 +190,8 @@ def _factory(org_id: ULID, project_id: ULID, name: str, **overrides: object) -> yield _factory for org_id, project_id, uid in reversed(created): - client.delete(f"organizations/{org_id}/projects/{project_id}/branches/{uid}/") + r = client.delete(f"organizations/{org_id}/projects/{project_id}/branches/{uid}/") + if r.status_code == 202: + # Async delete — wait for the branch to be fully removed before proceeding + with contextlib.suppress(TimeoutError): + wait_for_deletion(client, f"organizations/{org_id}/projects/{project_id}/branches/{uid}/") From ac636e00fd5a91e7e07efaf6dd0d4c0da90f9a14 Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Wed, 1 Apr 2026 13:58:04 +0200 Subject: [PATCH 4/6] fixup! Convert branch deletion to celery job --- .../versions/b8e4c9f12d35_add_branch_delete_task_id.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py b/src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py index 6be49b64..4d84764c 100644 --- a/src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py +++ b/src/models/migrations/versions/b8e4c9f12d35_add_branch_delete_task_id.py @@ -1,8 +1,8 @@ """Add branch delete_task_id Revision ID: b8e4c9f12d35 -Revises: a1b2c3d4e5f6 -Create Date: 2026-04-01 00:00:00.000000 +Revises: 2b4e8f1a6c03 +Create Date: 2026-04-01 13:57:00.000000 """ from typing import Sequence, Union @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. revision: str = 'b8e4c9f12d35' -down_revision: Union[str, Sequence[str], None] = 'a1b2c3d4e5f6' +down_revision: Union[str, Sequence[str], None] = '2b4e8f1a6c03' branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None From a8548823bcbe56e52b0c706dad35c49524d18649 Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Wed, 1 Apr 2026 16:00:44 +0200 Subject: [PATCH 5/6] fixup! Convert branch deletion to celery job --- tests/branches/test_basic.py | 31 ++++++++++++- tests/branches/test_delete.py | 85 ----------------------------------- 2 files changed, 30 insertions(+), 86 deletions(-) delete mode 100644 tests/branches/test_delete.py diff --git a/tests/branches/test_basic.py b/tests/branches/test_basic.py index 41dbab9a..7760fed8 100644 --- a/tests/branches/test_basic.py +++ b/tests/branches/test_basic.py @@ -2,7 +2,7 @@ import psycopg import pytest -from conftest import BRANCH_TIMEOUT_SEC, wait_for_status +from conftest import BRANCH_TIMEOUT_SEC, wait_for_deletion, wait_for_status pytestmark = pytest.mark.branch @@ -182,3 +182,32 @@ def test_branch_apikey_list(client, org, project, branch_id): assert r.status_code == 200 ids = [k["id"] for k in r.json()] assert _state["api_key_id"] in ids + + +def test_branch_delete_lifecycle(client, org, project, branch_id): + base = f"organizations/{org}/projects/{project}/branches/{branch_id}" + + # DELETE returns 202 with a Location pointing at the task + r = client.delete(f"{base}/") + assert r.status_code == 202 + assert "Location" in r.headers + + # Branch detail reflects the DELETING status immediately + r = client.get(f"{base}/") + if r.status_code == 200: + assert r.json()["status"] == "DELETING" + + # Task is listed on the branch while deletion is in progress + r = client.get(f"{base}/tasks/") + if r.status_code == 200: + assert any(t["task_type"] == "delete" for t in r.json()) + + # A second DELETE while delete_task_id is set must be rejected + r = client.delete(f"{base}/") + assert r.status_code == 400 + + # Wait for the background job to finish — branch must disappear + wait_for_deletion(client, f"{base}/", BRANCH_TIMEOUT_SEC) + + r = client.get(f"{base}/") + assert r.status_code == 404 diff --git a/tests/branches/test_delete.py b/tests/branches/test_delete.py deleted file mode 100644 index 2c28eb8b..00000000 --- a/tests/branches/test_delete.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Integration tests for async branch deletion. - -These tests verify: - - DELETE /branches/{id}/ returns 202 Accepted with a Location header - - A concurrent DELETE returns 400 while the first delete is in progress - - The branch is eventually gone (GET returns 404) after the task completes -""" - -import time - -import pytest -from conftest import BRANCH_TIMEOUT_SEC, wait_for_deletion - -pytestmark = pytest.mark.branch - -_POLL_INTERVAL = 10 - -_state: dict = {} - - -@pytest.fixture(scope="module") -def org(make_org): - return make_org("test-org-delete") - - -@pytest.fixture(scope="module") -def project(make_project, org): - return make_project(org, "test-project-delete") - - -@pytest.fixture(scope="module") -def branch_id(make_branch, org, project): - return make_branch(org, project, "test-branch-delete") - - -def test_delete_returns_202_with_location(client, org, project, branch_id): - r = client.delete(f"organizations/{org}/projects/{project}/branches/{branch_id}/") - assert r.status_code == 202 - assert "Location" in r.headers - _state["task_url"] = r.headers["Location"] - - -def test_task_listed_while_deleting(client, org, project, branch_id): - r = client.get(f"organizations/{org}/projects/{project}/branches/{branch_id}/tasks/") - if r.status_code == 404: - pytest.skip("Branch already deleted before task listing test could run") - assert r.status_code == 200 - tasks = r.json() - assert any(t["task_type"] == "delete" for t in tasks) - - -def test_concurrent_delete_returns_400(client, org, project, branch_id): - # While the branch has delete_task_id set the endpoint must reject a second delete. - # The branch may have been deleted already in fast CI environments (404 is also acceptable). - r = client.delete(f"organizations/{org}/projects/{project}/branches/{branch_id}/") - assert r.status_code in (400, 404) - - -def test_branch_gone_after_delete(client, org, project, branch_id): - # Poll until the branch returns 404 — the Celery task has completed and deleted it. - wait_for_deletion( - client, - f"organizations/{org}/projects/{project}/branches/{branch_id}/", - BRANCH_TIMEOUT_SEC, - ) - r = client.get(f"organizations/{org}/projects/{project}/branches/{branch_id}/") - assert r.status_code == 404 - - -def test_task_detail_unavailable_after_deletion(client): - task_url = _state.get("task_url") - if not task_url: - pytest.skip("No task URL stored from delete test") - # Once the branch is gone the task detail endpoint returns 404 (BranchDep lookup fails). - deadline = time.monotonic() + BRANCH_TIMEOUT_SEC - while True: - r = client.get(task_url, timeout=30) - if r.status_code == 404: - return - status = r.json().get("status") if r.status_code == 200 else None - if status in ("COMPLETED", "FAILED"): - return - if time.monotonic() >= deadline: - raise TimeoutError(f"Timed out waiting for task to complete; last status={status!r}") - time.sleep(_POLL_INTERVAL) From 012efbf61d09269c504433f9a8e8197ccd8705cb Mon Sep 17 00:00:00 2001 From: Max Schettler Date: Tue, 7 Apr 2026 16:47:41 +0200 Subject: [PATCH 6/6] fixup! Convert branch deletion to celery job --- tests/branches/test_basic.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/branches/test_basic.py b/tests/branches/test_basic.py index 7760fed8..16d448db 100644 --- a/tests/branches/test_basic.py +++ b/tests/branches/test_basic.py @@ -202,9 +202,10 @@ def test_branch_delete_lifecycle(client, org, project, branch_id): if r.status_code == 200: assert any(t["task_type"] == "delete" for t in r.json()) - # A second DELETE while delete_task_id is set must be rejected + # A second DELETE while the branch is in DELETING state must be rejected. + # BranchDep raises 409 for any operation on a DELETING branch. r = client.delete(f"{base}/") - assert r.status_code == 400 + assert r.status_code == 409 # Wait for the background job to finish — branch must disappear wait_for_deletion(client, f"{base}/", BRANCH_TIMEOUT_SEC)