Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2898,22 +2898,34 @@ def _destroy(self) -> bool:

def _run_janitor(self, ignore_ttl: bool = False) -> None:
current_ts = now_timestamp()
failures: t.List[str] = []

# Clean up expired environments by removing their views and schemas
self._cleanup_environments(current_ts=current_ts)

delete_expired_snapshots(
self.state_sync,
self.snapshot_evaluator,
current_ts=current_ts,
ignore_ttl=ignore_ttl,
console=self.console,
batch_size=self.config.janitor.expired_snapshots_batch_size,
failures.extend(self._cleanup_environments(current_ts=current_ts))

failures.extend(
delete_expired_snapshots(
self.state_sync,
self.snapshot_evaluator,
current_ts=current_ts,
ignore_ttl=ignore_ttl,
console=self.console,
batch_size=self.config.janitor.expired_snapshots_batch_size,
)
)
self.state_sync.compact_intervals()

def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
if failures:
failure_string = "\n - ".join(failures)
summary = f"Janitor completed with failures:\n {failure_string}"
Comment on lines +2922 to +2923
if self.config.janitor.warn_on_delete_failure:
self.console.log_warning(summary)
else:
raise SQLMeshError(summary)

def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> t.List[str]:
current_ts = current_ts or now_timestamp()
failures: t.List[str] = []

expired_environments_summaries = self.state_sync.get_expired_environments(
current_ts=current_ts
Expand All @@ -2923,15 +2935,19 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
expired_env = self.state_reader.get_environment(expired_env_summary.name)

if expired_env:
cleanup_expired_views(
default_adapter=self.engine_adapter,
engine_adapters=self.engine_adapters,
environments=[expired_env],
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
console=self.console,
failures.extend(
cleanup_expired_views(
default_adapter=self.engine_adapter,
engine_adapters=self.engine_adapters,
environments=[expired_env],
console=self.console,
)
)

self.state_sync.delete_expired_environments(current_ts=current_ts)
# we want to retry on the next janitor pass if drops failed
if not failures:
self.state_sync.delete_expired_environments(current_ts=current_ts)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case where a schema was permanently deleted by the user, prior to the janitor running, won't this cause the environment to never expire? I feel like we need an escape hatch.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this pr unblocks the janitor so it doesn't accumulate stale snapshots in the case of one failure and it can continue with the rest of the operations

this guard to not delete the expired environments in this case of a failure is matching the current behaviour we have of retrying the next time that the janitor runs rather than deleting the expired environments and leaving orphans in the db. the test: test_janitor_cleanup_order is an example of this behaviour that we currently want

I see the value of an escape hatch for the cases where drops persistently fail, but this would require a more deliberate mechanism. I can't think of a simple way to do it as part of this pr without changing the janitor design substantially unless you have a suggestion of a simple way

return failures

def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
connection_name = connection_name.capitalize()
Expand Down
63 changes: 33 additions & 30 deletions sqlmesh/core/janitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
RowBoundary,
ExpiredBatchRange,
)
from sqlmesh.utils.errors import SQLMeshError


def cleanup_expired_views(
default_adapter: EngineAdapter,
engine_adapters: t.Dict[str, EngineAdapter],
environments: t.List[Environment],
warn_on_delete_failure: bool = False,
console: t.Optional[Console] = None,
) -> None:
) -> t.List[str]:
failures: t.List[str] = []

expired_schema_or_catalog_environments = [
environment
for environment in environments
Expand Down Expand Up @@ -85,10 +85,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
console.update_cleanup_progress(expired_view)
except Exception as e:
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e
logger.warning(message)
Comment thread
georgesittas marked this conversation as resolved.
failures.append(message)
Comment on lines 87 to +89

# Drop the schemas for the expired environments
for engine_adapter, schema in schemas_to_drop:
Expand All @@ -102,10 +100,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
except Exception as e:
message = f"Failed to drop the expired environment schema '{schema}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e
logger.warning(message)
failures.append(message)
Comment on lines 101 to +104

# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
Expand All @@ -117,10 +113,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
console.update_cleanup_progress(catalog)
except Exception as e:
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
if warn_on_delete_failure:
logger.warning(message)
else:
raise SQLMeshError(message) from e
logger.warning(message)
failures.append(message)
Comment on lines 114 to +117

return failures


def delete_expired_snapshots(
Expand All @@ -131,7 +127,7 @@ def delete_expired_snapshots(
ignore_ttl: bool = False,
batch_size: t.Optional[int] = None,
console: t.Optional[Console] = None,
) -> None:
) -> t.List[str]:
"""Delete all expired snapshots in batches.

This helper function encapsulates the logic for deleting expired snapshots in batches,
Expand All @@ -146,8 +142,9 @@ def delete_expired_snapshots(
console: Optional console for reporting progress.

Returns:
The total number of deleted expired snapshots.
List of failure messages so callers can surface them at the end of the janitor run.
"""
failures: t.List[str] = []
num_expired_snapshots = 0
for batch in iter_expired_snapshot_batches(
state_reader=state_sync,
Expand All @@ -165,17 +162,23 @@ def delete_expired_snapshots(
len(batch.expired_snapshot_ids),
end_info,
)
snapshot_evaluator.cleanup(
target_snapshots=batch.cleanup_tasks,
on_complete=console.update_cleanup_progress if console else None,
)
state_sync.delete_expired_snapshots(
batch_range=ExpiredBatchRange(
start=RowBoundary.lowest_boundary(),
end=batch.batch_range.end,
),
ignore_ttl=ignore_ttl,
)
logger.info("Cleaned up expired snapshots batch")
num_expired_snapshots += len(batch.expired_snapshot_ids)
try:
snapshot_evaluator.cleanup(
target_snapshots=batch.cleanup_tasks,
on_complete=console.update_cleanup_progress if console else None,
)
state_sync.delete_expired_snapshots(
batch_range=ExpiredBatchRange(
start=RowBoundary.lowest_boundary(),
end=batch.batch_range.end,
),
ignore_ttl=ignore_ttl,
)
logger.info("Cleaned up expired snapshots batch")
num_expired_snapshots += len(batch.expired_snapshot_ids)
except Exception as e:
message = f"Failed to clean up an expired snapshots batch: {e}"
logger.warning(message)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

failures.append(message)
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
return failures
64 changes: 62 additions & 2 deletions tests/core/integration/test_aux_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ModelDefaultsConfig,
DuckDBConnectionConfig,
)
from sqlmesh.core.config.janitor import JanitorConfig
from sqlmesh.core.context import Context
from sqlmesh.core.model import (
SqlModel,
Expand Down Expand Up @@ -146,7 +147,8 @@ def setup_scenario():
# Case 2: Assume that the view cleanup yields an error, the enviroment
# record should still exist
mocker.patch(
"sqlmesh.core.context.cleanup_expired_views", side_effect=Exception("view cleanup error")
"sqlmesh.core.context.cleanup_expired_views",
return_value=["view cleanup error"],
)
ctx, model1_snapshot = setup_scenario()

Expand All @@ -157,13 +159,71 @@ def setup_scenario():
assert ctx.state_sync.get_environment("dev")

# - Run the janitor again, this time it should succeed
mocker.patch("sqlmesh.core.context.cleanup_expired_views")
mocker.patch("sqlmesh.core.context.cleanup_expired_views", return_value=[])
ctx._run_janitor(ignore_ttl=True)

# - Check that the environment record does not exist in the state sync anymore
assert not ctx.state_sync.get_environment("dev")


def test_janitor_aggregates_failures_into_single_error(mocker: MockerFixture, tmp_path: Path):
models_dir = tmp_path / "models"
models_dir.mkdir()
(models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")

ctx = Context(
paths=[tmp_path],
config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")),
)
ctx.plan("dev", no_prompts=True, auto_apply=True)
ctx.invalidate_environment("dev")

mocker.patch(
"sqlmesh.core.context.cleanup_expired_views",
return_value=["view drop error A", "view drop error B"],
)
mocker.patch(
"sqlmesh.core.janitor.iter_expired_snapshot_batches",
return_value=iter([]),
)

with pytest.raises(SQLMeshError, match="Janitor completed with failures"):
ctx._run_janitor(ignore_ttl=True)


def test_janitor_warn_on_delete_failure_downgrades_aggregated_error(
mocker: MockerFixture, tmp_path: Path
):
models_dir = tmp_path / "models"
models_dir.mkdir()
(models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")

ctx = Context(
paths=[tmp_path],
config=Config(
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
janitor=JanitorConfig(warn_on_delete_failure=True),
),
)
ctx.plan("dev", no_prompts=True, auto_apply=True)
ctx.invalidate_environment("dev")

mocker.patch(
"sqlmesh.core.context.cleanup_expired_views",
return_value=["view drop error"],
)
mocker.patch(
"sqlmesh.core.janitor.iter_expired_snapshot_batches",
return_value=iter([]),
)

warn_spy = mocker.patch.object(ctx.console, "log_warning")

ctx._run_janitor(ignore_ttl=True)
assert warn_spy.called
assert "Janitor completed with failures" in warn_spy.call_args[0][0]


@use_terminal_console
def test_destroy(copy_to_temp_path):
# Testing project with two gateways to verify cleanup is performed across engines
Expand Down
61 changes: 55 additions & 6 deletions tests/core/test_janitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
)
from sqlmesh.core.janitor import cleanup_expired_views, delete_expired_snapshots
from sqlmesh.utils.date import now_timestamp
from sqlmesh.utils.errors import SQLMeshError

pytestmark = pytest.mark.slow

Expand Down Expand Up @@ -101,7 +100,7 @@ def test_cleanup_expired_views(mocker: MockerFixture, make_snapshot: t.Callable)
@pytest.mark.parametrize(
"suffix_target", [EnvironmentSuffixTarget.SCHEMA, EnvironmentSuffixTarget.TABLE]
)
def test_cleanup_expired_environment_schema_warn_on_delete_failure(
def test_cleanup_expired_views_collects_failures(
mocker: MockerFixture, make_snapshot: t.Callable, suffix_target: EnvironmentSuffixTarget
):
adapter = mocker.MagicMock()
Expand All @@ -124,17 +123,67 @@ def test_cleanup_expired_environment_schema_warn_on_delete_failure(
catalog_name_override="catalog_override",
)

with pytest.raises(SQLMeshError, match="Failed to drop the expired environment .*"):
cleanup_expired_views(adapter, {}, [schema_environment], warn_on_delete_failure=False)

cleanup_expired_views(adapter, {}, [schema_environment], warn_on_delete_failure=True)
# Janitor is now best-effort: failures are returned, not raised.
failures = cleanup_expired_views(adapter, {}, [schema_environment])
assert len(failures) == 1
assert "Failed to drop the expired environment" in failures[0]

if suffix_target == EnvironmentSuffixTarget.SCHEMA:
assert adapter.drop_schema.called
else:
assert adapter.drop_view.called


def test_cleanup_expired_views_continues_past_failures(
mocker: MockerFixture, make_snapshot: t.Callable
):
adapter = mocker.MagicMock()
adapter.dialect = None

snapshot_failing = make_snapshot(
SqlModel(name="catalog.schema.failing", query=parse_one("select 1"))
)
snapshot_failing.categorize_as(SnapshotChangeCategory.BREAKING)
snapshot_succeeding = make_snapshot(
SqlModel(name="catalog.schema.succeeding", query=parse_one("select 1"))
)
snapshot_succeeding.categorize_as(SnapshotChangeCategory.BREAKING)

failing_env = Environment(
name="failing_env",
suffix_target=EnvironmentSuffixTarget.TABLE,
snapshots=[snapshot_failing.table_info],
start_at="2022-01-01",
end_at="2022-01-01",
plan_id="p",
previous_plan_id="p",
)
succeeding_env = Environment(
name="succeeding_env",
suffix_target=EnvironmentSuffixTarget.TABLE,
snapshots=[snapshot_succeeding.table_info],
start_at="2022-01-01",
end_at="2022-01-01",
plan_id="p",
previous_plan_id="p",
)

def drop_view_side_effect(view, ignore_if_not_exists=True):
if "failing" in str(view):
raise Exception("boom")

adapter.drop_view.side_effect = drop_view_side_effect

failures = cleanup_expired_views(adapter, {}, [failing_env, succeeding_env])

# Both drops were attempted
assert adapter.drop_view.call_count == 2

# Only the failing one is reported
assert len(failures) == 1
assert "failing" in failures[0]


def test_delete_expired_snapshots_common_function_batching(
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable, mocker: MockerFixture
):
Expand Down
Loading