Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,16 +631,22 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
is_flag=True,
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
)
@click.option(
"--force-delete",
is_flag=True,
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
"Any objects that could not be dropped become orphaned and must be removed manually.",
)
@click.pass_context
@error_handler
@cli_analytics
def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
"""
Run the janitor process on-demand.

The janitor cleans up old environments and expired snapshots.
"""
ctx.obj.run_janitor(ignore_ttl, **kwargs)
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)


@cli.command("destroy")
Expand Down
62 changes: 43 additions & 19 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,12 +887,12 @@ def _has_environment_changed() -> bool:
return completion_status

@python_api_analytics
def run_janitor(self, ignore_ttl: bool) -> bool:
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
success = False

if self.console.start_cleanup(ignore_ttl):
try:
self._run_janitor(ignore_ttl)
self._run_janitor(ignore_ttl, force_delete=force_delete)
success = True
finally:
self.console.stop_cleanup(success=success)
Expand Down Expand Up @@ -2896,24 +2896,43 @@ def _destroy(self) -> bool:

return True

def _run_janitor(self, ignore_ttl: bool = False) -> None:
def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
current_ts = now_timestamp()
failures: t.List[str] = []

# Clean up expired environments by removing their views and schemas
self._cleanup_environments(current_ts=current_ts)
failures.extend(
self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
)

delete_expired_snapshots(
self.state_sync,
self.snapshot_evaluator,
current_ts=current_ts,
ignore_ttl=ignore_ttl,
console=self.console,
batch_size=self.config.janitor.expired_snapshots_batch_size,
failures.extend(
delete_expired_snapshots(
self.state_sync,
self.snapshot_evaluator,
current_ts=current_ts,
ignore_ttl=ignore_ttl,
force_delete=force_delete,
console=self.console,
batch_size=self.config.janitor.expired_snapshots_batch_size,
)
)
self.state_sync.compact_intervals()

def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
if failures:
failure_string = "\n - ".join(failures)
summary = f"Janitor completed with failures:\n {failure_string}"
Comment on lines +2922 to +2923
if force_delete:
summary += "\nState records have been deleted, but the underlying objects may still exist in the database.\nPlease investigate and clean up manually the above if necessary."
if self.config.janitor.warn_on_delete_failure:
self.console.log_warning(summary)
else:
raise SQLMeshError(summary)

def _cleanup_environments(
self, current_ts: t.Optional[int] = None, force_delete: bool = False
) -> t.List[str]:
current_ts = current_ts or now_timestamp()
failures: t.List[str] = []

expired_environments_summaries = self.state_sync.get_expired_environments(
current_ts=current_ts
Expand All @@ -2923,15 +2942,20 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
expired_env = self.state_reader.get_environment(expired_env_summary.name)

if expired_env:
cleanup_expired_views(
default_adapter=self.engine_adapter,
engine_adapters=self.engine_adapters,
environments=[expired_env],
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
console=self.console,
failures.extend(
cleanup_expired_views(
default_adapter=self.engine_adapter,
engine_adapters=self.engine_adapters,
environments=[expired_env],
console=self.console,
)
)

self.state_sync.delete_expired_environments(current_ts=current_ts)
# we want to retry on the next janitor pass if drops failed, unless
# force_delete is set in which case we purge state records regardless
if not failures or force_delete:
self.state_sync.delete_expired_environments(current_ts=current_ts)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case where a schema was permanently deleted by the user, prior to the janitor running, won't this cause the environment to never expire? I feel like we need an escape hatch.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this pr unblocks the janitor so it doesn't accumulate stale snapshots in the case of one failure and it can continue with the rest of the operations

this guard to not delete the expired environments in this case of a failure is matching the current behaviour we have of retrying the next time that the janitor runs rather than deleting the expired environments and leaving orphans in the db. the test: test_janitor_cleanup_order is an example of this behaviour that we currently want

I see the value of an escape hatch for the cases where drops persistently fail, but this would require a more deliberate mechanism. I can't think of a simple way to do it as part of this pr without changing the janitor design substantially unless you have a suggestion of a simple way

return failures

def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
connection_name = connection_name.capitalize()
Expand Down
74 changes: 44 additions & 30 deletions sqlmesh/core/janitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
RowBoundary,
ExpiredBatchRange,
)
from sqlmesh.utils.errors import SQLMeshError


def cleanup_expired_views(
default_adapter: EngineAdapter,
engine_adapters: t.Dict[str, EngineAdapter],
environments: t.List[Environment],
warn_on_delete_failure: bool = False,
console: t.Optional[Console] = None,
) -> None:
) -> t.List[str]:
failures: t.List[str] = []

expired_schema_or_catalog_environments = [
environment
for environment in environments
Expand Down Expand Up @@ -85,10 +85,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
console.update_cleanup_progress(expired_view)
except Exception as e:
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e
logger.warning(message)
Comment thread
georgesittas marked this conversation as resolved.
failures.append(message)
Comment on lines 87 to +89

# Drop the schemas for the expired environments
for engine_adapter, schema in schemas_to_drop:
Expand All @@ -102,10 +100,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
except Exception as e:
message = f"Failed to drop the expired environment schema '{schema}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e
logger.warning(message)
failures.append(message)
Comment on lines 101 to +104

# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
Expand All @@ -117,10 +113,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
console.update_cleanup_progress(catalog)
except Exception as e:
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e
logger.warning(message)
failures.append(message)
Comment on lines 114 to +117

return failures


def delete_expired_snapshots(
Expand All @@ -129,9 +125,10 @@ def delete_expired_snapshots(
*,
current_ts: int,
ignore_ttl: bool = False,
force_delete: bool = False,
batch_size: t.Optional[int] = None,
console: t.Optional[Console] = None,
) -> None:
) -> t.List[str]:
"""Delete all expired snapshots in batches.

This helper function encapsulates the logic for deleting expired snapshots in batches,
Expand All @@ -142,12 +139,14 @@ def delete_expired_snapshots(
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
current_ts: Timestamp used to evaluate expiration.
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
force_delete: If True, delete snapshot state records even when physical table cleanup fails.
batch_size: Maximum number of snapshots to fetch per batch.
console: Optional console for reporting progress.

Returns:
The total number of deleted expired snapshots.
List of failure messages so callers can surface them at the end of the janitor run.
"""
failures: t.List[str] = []
num_expired_snapshots = 0
for batch in iter_expired_snapshot_batches(
state_reader=state_sync,
Expand All @@ -165,17 +164,32 @@ def delete_expired_snapshots(
len(batch.expired_snapshot_ids),
end_info,
)
snapshot_evaluator.cleanup(
target_snapshots=batch.cleanup_tasks,
on_complete=console.update_cleanup_progress if console else None,
)
state_sync.delete_expired_snapshots(
batch_range=ExpiredBatchRange(
start=RowBoundary.lowest_boundary(),
end=batch.batch_range.end,
),
ignore_ttl=ignore_ttl,
)
logger.info("Cleaned up expired snapshots batch")
num_expired_snapshots += len(batch.expired_snapshot_ids)
cleanup_succeeded = True
try:
snapshot_evaluator.cleanup(
target_snapshots=batch.cleanup_tasks,
on_complete=console.update_cleanup_progress if console else None,
)
except Exception as failed_drops:
message = f"Failed to clean up: {failed_drops}"
logger.warning(message)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

failures.append(message)
cleanup_succeeded = False

if cleanup_succeeded or force_delete:
try:
state_sync.delete_expired_snapshots(
batch_range=ExpiredBatchRange(
start=RowBoundary.lowest_boundary(),
end=batch.batch_range.end,
),
ignore_ttl=ignore_ttl,
)
logger.info("Cleaned up expired snapshots batch")
num_expired_snapshots += len(batch.expired_snapshot_ids)
except Exception as e:
message = f"Failed to delete expired snapshot state records: {e}"
logger.warning(message)
failures.append(message)
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
return failures
6 changes: 5 additions & 1 deletion sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ def cleanup(
t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets
}
with self.concurrent_context():
concurrent_apply_to_snapshots(
errors, _ = concurrent_apply_to_snapshots(
[t.snapshot for t in filtered_targets],
lambda s: self._cleanup_snapshot(
s,
Expand All @@ -557,7 +557,11 @@ def cleanup(
),
self.ddl_concurrent_tasks,
reverse_order=True,
raise_on_error=False,
)
if errors:
errored_snapshots = "\n".join(f" {e.node.name}: {e.__cause__}" for e in errors)
raise SQLMeshError(f"\n{errored_snapshots}")
Comment on lines +562 to +564

def audit(
self,
Expand Down
Loading
Loading