Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions src/api/organization/project/branch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
branch_service_name,
delete_deployment,
deploy_branch_environment,
ensure_branch_storage_class,
get_autoscaler_vm_identity,
kube_service,
resolve_branch_database_volume_size,
Expand Down Expand Up @@ -765,20 +764,18 @@ async def _clone_branch_environment_task(
initial_password: str | None,
) -> None:
await _persist_branch_status(branch_id, BranchServiceStatus.CREATING)
storage_class_name: str | None = None
if copy_data:
try:
storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops)
await clone_branch_database_volume(
source_branch_id=source_branch_id,
target_branch_id=branch_id,
snapshot_class=_VOLUME_SNAPSHOT_CLASS,
storage_class_name=storage_class_name,
snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS,
snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS,
pvc_timeout_seconds=_PVC_CLONE_TIMEOUT_SECONDS,
pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS,
database_size=parameters.database_size,
iops=parameters.iops,
pitr_enabled=pitr_enabled,
)
except VelaError:
Expand Down Expand Up @@ -842,22 +839,20 @@ async def _restore_branch_environment_task(
initial_password: str | None,
) -> None:
await _persist_branch_status(branch_id, BranchServiceStatus.CREATING)
storage_class_name: str | None = None
try:
storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops)
await restore_branch_database_volume_from_snapshot(
source_branch_id=source_branch_id,
target_branch_id=branch_id,
snapshot_namespace=snapshot_namespace,
snapshot_name=snapshot_name,
snapshot_content_name=snapshot_content_name,
snapshot_class=_VOLUME_SNAPSHOT_CLASS,
storage_class_name=storage_class_name,
snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS,
snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS,
pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS,
pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS,
database_size=restore_database_size,
iops=parameters.iops,
)
if wal_snapshot_name is not None:
await restore_branch_wal_volume_from_snapshot(
Expand All @@ -866,11 +861,11 @@ async def _restore_branch_environment_task(
snapshot_namespace=snapshot_namespace,
snapshot_name=wal_snapshot_name,
snapshot_class=_VOLUME_SNAPSHOT_CLASS,
storage_class_name=storage_class_name,
snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS,
snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS,
pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS,
pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS,
iops=parameters.iops,
)
except VelaError:
await _persist_branch_status(branch_id, BranchServiceStatus.ERROR)
Expand Down Expand Up @@ -937,20 +932,19 @@ async def _restore_branch_environment_in_place_task(
return

try:
storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops)
await restore_branch_database_volume_from_snapshot(
source_branch_id=source_branch_id,
target_branch_id=branch_id,
snapshot_namespace=snapshot_namespace,
snapshot_name=snapshot_name,
snapshot_content_name=snapshot_content_name,
snapshot_class=_VOLUME_SNAPSHOT_CLASS,
storage_class_name=storage_class_name,
snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS,
snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS,
pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS,
pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS,
database_size=parameters.database_size,
iops=parameters.iops,
)
except VelaError:
await _persist_branch_status(branch_id, BranchServiceStatus.ERROR)
Expand Down
131 changes: 41 additions & 90 deletions src/deployment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
SIMPLYBLOCK_CSI_CONFIGMAP = "simplyblock-csi-cm"
SIMPLYBLOCK_CSI_SECRET = "simplyblock-csi-secret"
SIMPLYBLOCK_CSI_STORAGE_CLASS = "simplyblock-csi-sc"
SIMPLYBLOCK_QOS_RW_IOPS_ANNOTATION = "simplybk/qos-rw-iops"
STORAGE_PVC_SUFFIX = "-storage-pvc"
DATABASE_PVC_SUFFIX = "-db-pvc"
AUTOSCALER_PVC_SUFFIX = "-block-data"
Expand All @@ -94,8 +95,8 @@
DATABASE_DNS_RECORD_TYPE: Literal["CNAME"] = "CNAME"


def branch_storage_class_name(branch_id: Identifier) -> str:
return f"sc-{str(branch_id).lower()}"
def simplyblock_iops_annotations(iops: int) -> dict[str, str]:
return {SIMPLYBLOCK_QOS_RW_IOPS_ANNOTATION: str(iops)}


def deployment_branch(namespace: str) -> ULID:
Expand Down Expand Up @@ -238,52 +239,6 @@ async def _initialize_autoscaler_overlay_endpoints(namespace: str) -> None:
await _ensure_autoscaler_overlay_endpoint_slices(namespace, overlay_ip)


def _build_storage_class_manifest(*, storage_class_name: str, iops: int, base_storage_class: Any) -> dict[str, Any]:
provisioner = getattr(base_storage_class, "provisioner", None)
if not provisioner:
raise VelaKubernetesError("Base storage class missing provisioner")

base_parameters = dict(getattr(base_storage_class, "parameters", {}) or {})
cluster_id = base_parameters.get("cluster_id")
if not cluster_id:
raise VelaKubernetesError("Base storage class missing required parameter 'cluster_id'")

parameters = {key: str(value) for key, value in base_parameters.items()}
parameters.update(
{
"qos_rw_iops": str(iops),
"qos_rw_mbytes": "0",
"qos_r_mbytes": "0",
"qos_w_mbytes": "0",
}
)

allow_volume_expansion = getattr(base_storage_class, "allow_volume_expansion", None)
volume_binding_mode = getattr(base_storage_class, "volume_binding_mode", None)
reclaim_policy = getattr(base_storage_class, "reclaim_policy", None)
mount_options = getattr(base_storage_class, "mount_options", None)

manifest: dict[str, Any] = {
"apiVersion": "storage.k8s.io/v1",
"kind": "StorageClass",
"metadata": {
"name": storage_class_name,
},
"provisioner": provisioner,
"parameters": parameters,
}
if reclaim_policy is not None:
manifest["reclaimPolicy"] = reclaim_policy
if volume_binding_mode is not None:
manifest["volumeBindingMode"] = volume_binding_mode
if allow_volume_expansion is not None:
manifest["allowVolumeExpansion"] = bool(allow_volume_expansion)
if mount_options:
manifest["mountOptions"] = list(mount_options)

return manifest


async def load_simplyblock_credentials() -> tuple[str, UUID, str, str]:
simplyblock_namespace = get_settings().simplyblock_csi_namespace
try:
Expand Down Expand Up @@ -354,34 +309,36 @@ async def resolve_branch_database_volume_size(branch_id: Identifier) -> int:

async def update_branch_volume_iops(branch_id: Identifier, iops: int) -> None:
namespace = deployment_namespace(branch_id)
_, autoscaler_vm_name = get_autoscaler_vm_identity(branch_id)
pvc_names = (
f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}",
f"{autoscaler_vm_name}{AUTOSCALER_WAL_PVC_SUFFIX}",
f"{autoscaler_vm_name}{STORAGE_PVC_SUFFIX}",
)

async def _resolve_existing_volume(pvc_name: str) -> UUID | None:
try:
volume, _ = await _resolve_volume_identifiers(namespace, pvc_name)
except (VelaDeploymentError, VelaKubernetesError) as exc:
if "not found" in str(exc).lower():
return None
raise
return volume

resolved_volumes = await asyncio.gather(*(_resolve_existing_volume(pvc_name) for pvc_name in pvc_names))
volumes = tuple(dict.fromkeys(volume for volume in resolved_volumes if volume is not None))
if not volumes:
raise VelaDeploymentError(f"Failed to resolve any Simplyblock volumes for branch {branch_id}")

volume, _ = await resolve_autoscaler_volume_identifiers(namespace)
try:
async with create_simplyblock_api() as sb_api:
await sb_api.update_volume(volume=volume, payload={"max_rw_iops": iops})
await asyncio.gather(
*(sb_api.update_volume(volume=volume, payload={"max_rw_iops": iops}) for volume in volumes)
)
except VelaSimplyblockAPIError as exc:
raise VelaDeploymentError("Failed to update volume") from exc

logger.info("Updated Simplyblock volume %s IOPS to %s", volume, iops)


async def ensure_branch_storage_class(branch_id: Identifier, *, iops: int) -> str:
storage_class_name = branch_storage_class_name(branch_id)
try:
await kube_service.get_storage_class(storage_class_name)
logger.info("StorageClass %s already exists; reusing", storage_class_name)
return storage_class_name
except VelaKubernetesError:
pass

base_storage_class = await kube_service.get_storage_class(SIMPLYBLOCK_CSI_STORAGE_CLASS)
storage_class_manifest = _build_storage_class_manifest(
storage_class_name=storage_class_name,
iops=iops,
base_storage_class=base_storage_class,
)
await kube_service.apply_storage_class(storage_class_manifest)
return storage_class_name
logger.info("Updated Simplyblock volumes %s IOPS to %s", list(volumes), iops)
Comment on lines +312 to +341
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two questions:

  • Why are we going through the simplyblock API here? In other places annotations to the PVC are sufficient.
  • If we do in fact have to go through the API, a helper to set the IOPS of a PVC would make this more readable.



def _load_chart_values(chart_root: Any) -> dict[str, Any]:
Expand All @@ -393,7 +350,6 @@ def _load_chart_values(chart_root: Any) -> dict[str, Any]:

async def _create_fresh_pvcs(
namespace: str,
storage_class_name: str,
parameters: DeploymentParameters,
*,
pitr_enabled: bool,
Expand All @@ -405,10 +361,13 @@ async def _create_fresh_pvcs(

def _pvc(name: str, size: str) -> kubernetes_client.V1PersistentVolumeClaim:
return kubernetes_client.V1PersistentVolumeClaim(
metadata=kubernetes_client.V1ObjectMeta(name=name),
metadata=kubernetes_client.V1ObjectMeta(
name=name,
annotations=simplyblock_iops_annotations(parameters.iops),
),
spec=kubernetes_client.V1PersistentVolumeClaimSpec(
access_modes=access_modes,
storage_class_name=storage_class_name,
storage_class_name=SIMPLYBLOCK_CSI_STORAGE_CLASS,
volume_mode="Block",
resources=kubernetes_client.V1VolumeResourceRequirements(requests={"storage": size}),
),
Expand All @@ -429,7 +388,6 @@ def _configure_vela_values(
jwt_secret: str,
database_admin_password: str,
pgbouncer_admin_password: str,
storage_class_name: str,
pgbouncer_config: Mapping[str, int] | None,
enable_file_storage: bool,
pitr_enabled: bool,
Expand Down Expand Up @@ -474,23 +432,26 @@ def _configure_vela_values(
storage_persistence["size"] = str(parameters.storage_size)
else:
storage_persistence.pop("size", None)
storage_persistence["storageClassName"] = storage_class_name
storage_persistence["storageClassName"] = SIMPLYBLOCK_CSI_STORAGE_CLASS
storage_persistence["annotations"] = simplyblock_iops_annotations(parameters.iops)
storage_spec["enabled"] = enable_file_storage

wal_archive_spec = values_content.pop("walArchive", None)
pg_wal_spec = values_content.setdefault("pg_wal", wal_archive_spec or {})
pg_wal_spec["enabled"] = pitr_enabled
wal_persistence = pg_wal_spec.setdefault("persistence", {})
wal_persistence["size"] = PITR_WAL_PVC_SIZE
wal_persistence["storageClassName"] = storage_class_name
wal_persistence["storageClassName"] = SIMPLYBLOCK_CSI_STORAGE_CLASS
wal_persistence["claimName"] = wal_persistence.get("claimName") or (
f"{_autoscaler_vm_name()}{AUTOSCALER_WAL_PVC_SUFFIX}"
)
wal_persistence["annotations"] = simplyblock_iops_annotations(parameters.iops)
wal_persistence.setdefault("accessModes", ["ReadWriteMany"])

db_persistence = db_spec.setdefault("persistence", {})
db_persistence["size"] = str(parameters.database_size)
db_persistence["storageClassName"] = storage_class_name
db_persistence["storageClassName"] = SIMPLYBLOCK_CSI_STORAGE_CLASS
db_persistence["annotations"] = simplyblock_iops_annotations(parameters.iops)

autoscaler_spec = values_content.setdefault("autoscalerVm", {})
autoscaler_spec["enabled"] = True
Expand All @@ -508,7 +469,8 @@ def _configure_vela_values(
autoscaler_persistence = autoscaler_spec.setdefault("persistence", {})
autoscaler_persistence["claimName"] = f"{_autoscaler_vm_name()}{AUTOSCALER_PVC_SUFFIX}"
autoscaler_persistence["size"] = str(parameters.database_size)
autoscaler_persistence["storageClassName"] = storage_class_name
autoscaler_persistence["storageClassName"] = SIMPLYBLOCK_CSI_STORAGE_CLASS
autoscaler_persistence["annotations"] = simplyblock_iops_annotations(parameters.iops)
autoscaler_persistence.setdefault("accessModes", ["ReadWriteMany"])

autoscaler_tls = autoscaler_spec.setdefault("tls", {})
Expand Down Expand Up @@ -548,18 +510,15 @@ async def create_vela_config(
postgresql_resource = resources.files(__package__).joinpath("postgresql.conf")
values_content = _load_chart_values(chart)

storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops)

if not use_existing_db_pvc:
await _create_fresh_pvcs(namespace, storage_class_name, parameters, pitr_enabled=pitr_enabled)
await _create_fresh_pvcs(namespace, parameters, pitr_enabled=pitr_enabled)

values_content = _configure_vela_values(
values_content,
parameters=parameters,
jwt_secret=jwt_secret,
database_admin_password=database_admin_password,
pgbouncer_admin_password=pgbouncer_admin_password,
storage_class_name=storage_class_name,
pgbouncer_config=pgbouncer_config,
enable_file_storage=parameters.enable_file_storage,
pitr_enabled=pitr_enabled,
Expand Down Expand Up @@ -639,7 +598,6 @@ async def _delete_autoscaler_vm(namespace: str) -> None:

async def delete_deployment(branch_id: Identifier) -> None:
namespace, _ = get_autoscaler_vm_identity(branch_id)
storage_class_name = branch_storage_class_name(branch_id)
await cleanup_branch_dns(branch_id)
await _delete_autoscaler_vm(namespace)
try:
Expand All @@ -653,13 +611,6 @@ async def delete_deployment(branch_id: Identifier) -> None:
await delete_vela_grafana_obj(branch_id)
except VelaGrafanaError:
logger.info("Grafana dashboard for branch %s not found", branch_id)
try:
await kube_service.delete_storage_class(storage_class_name)
except ApiException as exc:
if exc.status == 404:
logger.info("StorageClass %s not found", storage_class_name)
else:
raise


def get_autoscaler_vm_identity(branch_id: Identifier) -> tuple[str, str]:
Expand Down
Loading
Loading