From da9a69c85fae698fbb510a9fdca7f26db8cbd897 Mon Sep 17 00:00:00 2001 From: vinicius Date: Sun, 5 Apr 2026 08:17:45 -0400 Subject: [PATCH 1/2] Fix: allow --select-model to plan a model deletion When a model file is deleted locally but still exists in the deployed environment, --select-model now correctly plans the removal instead of raising PlanError. Fixes #5741 Signed-off-by: vinicius --- sqlmesh/core/context.py | 20 ++++- sqlmesh/core/selector.py | 83 ++++++++++++++----- tests/core/test_context.py | 38 +++++++++ tests/core/test_selector_native.py | 127 +++++++++++++++++++++++++++++ 4 files changed, 245 insertions(+), 23 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index dc51aad2a7..3e493bb4ff 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1605,6 +1605,10 @@ def plan_builder( backfill_models = None models_override: t.Optional[UniqueKeyDict[str, Model]] = None + # FQNs of models that are selected for deletion (present in the deployed environment but + # absent from local files). These are models whose files have been deleted; they will + # appear in context_diff.removed_snapshots rather than as backfill candidates. + selected_deletion_fqns: t.Set[str] = set() if select_models: try: models_override = model_selector.select_models( @@ -1622,12 +1626,24 @@ def plan_builder( # Only backfill selected models unless explicitly specified. backfill_models = model_selector.expand_model_selections(select_models) + if not backfill_models: + # The selection matched nothing locally. Check whether it matched models that exist + # in the deployed environment but have been deleted locally. If so, the selection is + # valid — the deletions will surface in context_diff.removed_snapshots. + env_selected = model_selector.expand_model_selections_with_env( + select_models, + environment, + fallback_env_name=create_from or c.PROD, + ensure_finalized_snapshots=self.config.plan.use_finalized_state, + ) + selected_deletion_fqns = env_selected - set(self._models) + expanded_restate_models = None if restate_models is not None: expanded_restate_models = model_selector.expand_model_selections(restate_models) if (restate_models is not None and not expanded_restate_models) or ( - backfill_models is not None and not backfill_models + backfill_models is not None and not backfill_models and not selected_deletion_fqns ): raise PlanError( "Selector did not return any models. Please check your model selection and try again." @@ -1636,7 +1652,7 @@ def plan_builder( if always_include_local_changes is None: # default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes force_no_diff = restate_models is not None or ( - backfill_models is not None and not backfill_models + backfill_models is not None and not backfill_models and not selected_deletion_fqns ) else: force_no_diff = not always_include_local_changes diff --git a/sqlmesh/core/selector.py b/sqlmesh/core/selector.py index 9eaf4995c8..f8c9fb6506 100644 --- a/sqlmesh/core/selector.py +++ b/sqlmesh/core/selector.py @@ -78,27 +78,9 @@ def select_models( Returns: A dictionary of models. """ - target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name)) - if target_env and target_env.expired: - target_env = None - - if not target_env and fallback_env_name: - target_env = self._state_reader.get_environment( - Environment.sanitize_name(fallback_env_name) - ) - - env_models: t.Dict[str, Model] = {} - if target_env: - environment_snapshot_infos = ( - target_env.snapshots - if not ensure_finalized_snapshots - else target_env.finalized_or_current_snapshots - ) - env_models = { - s.name: s.model - for s in self._state_reader.get_snapshots(environment_snapshot_infos).values() - if s.is_model - } + env_models = self._load_env_models( + target_env_name, fallback_env_name, ensure_finalized_snapshots + ) all_selected_models = self.expand_model_selections( model_selections, models={**env_models, **self._models} @@ -168,6 +150,65 @@ def get_model(fqn: str) -> t.Optional[Model]: return models + def expand_model_selections_with_env( + self, + model_selections: t.Iterable[str], + target_env_name: str, + fallback_env_name: t.Optional[str] = None, + ensure_finalized_snapshots: bool = False, + ) -> t.Set[str]: + """Expands model selections against both local models and the target environment. + + This allows selections to match models that have been deleted locally but still + exist in the deployed environment. + + Args: + model_selections: A set of selections. + target_env_name: The name of the target environment. + fallback_env_name: The name of the fallback environment that will be used if the target + environment doesn't exist. + ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized + environment state, or to use whatever snapshots are in the current environment state even if + the environment is not finalized. + + Returns: + A set of matched model FQNs. + """ + env_models = self._load_env_models( + target_env_name, fallback_env_name, ensure_finalized_snapshots + ) + return self.expand_model_selections(model_selections, models={**env_models, **self._models}) + + def _load_env_models( + self, + target_env_name: str, + fallback_env_name: t.Optional[str] = None, + ensure_finalized_snapshots: bool = False, + ) -> t.Dict[str, "Model"]: + """Loads models from the target environment, falling back to the fallback environment if needed.""" + target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name)) + if target_env and target_env.expired: + target_env = None + + if not target_env and fallback_env_name: + target_env = self._state_reader.get_environment( + Environment.sanitize_name(fallback_env_name) + ) + + if not target_env: + return {} + + environment_snapshot_infos = ( + target_env.snapshots + if not ensure_finalized_snapshots + else target_env.finalized_or_current_snapshots + ) + return { + s.name: s.model + for s in self._state_reader.get_snapshots(environment_snapshot_infos).values() + if s.is_model + } + def expand_model_selections( self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None ) -> t.Set[str]: diff --git a/tests/core/test_context.py b/tests/core/test_context.py index c3d88e205e..50e5f656a7 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -2273,6 +2273,44 @@ def test_plan_selector_expression_no_match(sushi_context: Context) -> None: sushi_context.plan("prod", restate_models=["*missing*"]) +def test_plan_select_model_deleted_model(sushi_context: Context) -> None: + """Selecting a model that has been deleted locally but still exists in the deployed + environment should produce a valid plan with the deletion, not raise PlanError.""" + # Pick a leaf model that can be safely deleted without breaking other models' rendering. + model_name = "sushi.top_waiters" + snapshot = sushi_context.get_snapshot(model_name) + assert snapshot is not None + + # Delete the model file from disk. + model = sushi_context.get_model(model_name) + assert model._path.exists() + model._path.unlink() + + # Reload the context so it no longer knows about the deleted model. + sushi_context.load() + assert model_name not in [m for m in sushi_context.models] + + # Planning with select_models for the deleted model should succeed (not raise PlanError). + plan = sushi_context.plan("prod", select_models=[model_name], no_prompts=True) + assert plan is not None + + # The deleted model should appear in removed_snapshots. + removed_names = {s.name for s in plan.context_diff.removed_snapshots.values()} + assert snapshot.name in removed_names + + +def test_plan_select_model_deleted_model_still_rejects_nonexistent( + sushi_context: Context, +) -> None: + """A model that neither exists locally nor in the deployed environment should still + raise PlanError.""" + with pytest.raises( + PlanError, + match="Selector did not return any models. Please check your model selection and try again.", + ): + sushi_context.plan("prod", select_models=["sushi.completely_nonexistent"]) + + def test_plan_on_virtual_update_this_model_in_macro(tmp_path: pathlib.Path): models_dir = pathlib.Path("models") macros_dir = pathlib.Path("macros") diff --git a/tests/core/test_selector_native.py b/tests/core/test_selector_native.py index 5889efadda..8c4886791e 100644 --- a/tests/core/test_selector_native.py +++ b/tests/core/test_selector_native.py @@ -801,6 +801,133 @@ def test_select_models_local_tags_take_precedence_over_remote( ) +def test_expand_model_selections_with_env(mocker: MockerFixture, make_snapshot): + """expand_model_selections_with_env should include models from the deployed environment, + even if they have been deleted locally.""" + local_model = SqlModel( + name="db.local_model", + query=d.parse_one("SELECT 1 AS a"), + ) + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 2 AS b"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + env_name = "test_env" + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.return_value = Environment( + name=env_name, + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + ) + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + local_models[local_model.fqn] = local_model + + selector = NativeSelector(state_reader_mock, local_models) + + # Expanding against local models only should NOT find the deleted model. + assert selector.expand_model_selections(["db.deleted_model"]) == set() + + # Expanding with env should find the deleted model. + result = selector.expand_model_selections_with_env(["db.deleted_model"], env_name) + assert deleted_model.fqn in result + + # Local model should also be reachable. + result = selector.expand_model_selections_with_env(["db.local_model"], env_name) + assert local_model.fqn in result + + # Selecting both should return both. + result = selector.expand_model_selections_with_env( + ["db.deleted_model", "db.local_model"], env_name + ) + assert result == {deleted_model.fqn, local_model.fqn} + + # Wildcard should match env models too. + result = selector.expand_model_selections_with_env(["*_model"], env_name) + assert result == {deleted_model.fqn, local_model.fqn} + + # Non-existent model should return empty. + result = selector.expand_model_selections_with_env(["db.nonexistent"], env_name) + assert result == set() + + +def test_expand_model_selections_with_env_fallback(mocker: MockerFixture, make_snapshot): + """expand_model_selections_with_env should fall back to the fallback environment.""" + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 1 AS a"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + fallback_env = Environment( + name="prod", + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + ) + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.side_effect = ( + lambda name: fallback_env if name == "prod" else None + ) + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + selector = NativeSelector(state_reader_mock, local_models) + + result = selector.expand_model_selections_with_env( + ["db.deleted_model"], "missing_env", fallback_env_name="prod" + ) + assert deleted_model.fqn in result + + +def test_expand_model_selections_with_env_expired(mocker: MockerFixture, make_snapshot): + """expand_model_selections_with_env should ignore expired environments.""" + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 1 AS a"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + expired_env = Environment( + name="test_env", + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + expiration_ts=now_timestamp() - 1, + ) + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.return_value = expired_env + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + selector = NativeSelector(state_reader_mock, local_models) + + result = selector.expand_model_selections_with_env(["db.deleted_model"], "test_env") + assert result == set() + + def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None: assert set(actual) == set(expected) for name, model in actual.items(): From feafd50c0de34eb14783b31edb4cc77d9016c58b Mon Sep 17 00:00:00 2001 From: vinicius Date: Fri, 10 Apr 2026 21:49:06 -0400 Subject: [PATCH 2/2] Fix: allow --select-model to plan a model deletion Refactor select_models to return matched FQNs alongside the model dict so context._plan can detect deletions without a separate env lookup. Remove expand_model_selections_with_env, add mixed selection test coverage. Fixes #5741 Signed-off-by: vinicius --- sqlmesh/core/context.py | 19 ++------ sqlmesh/core/selector.py | 35 ++------------ tests/core/test_selector_native.py | 75 ++++++++++++++++-------------- 3 files changed, 47 insertions(+), 82 deletions(-) diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 3e493bb4ff..f1a7657704 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1605,13 +1605,11 @@ def plan_builder( backfill_models = None models_override: t.Optional[UniqueKeyDict[str, Model]] = None - # FQNs of models that are selected for deletion (present in the deployed environment but - # absent from local files). These are models whose files have been deleted; they will - # appear in context_diff.removed_snapshots rather than as backfill candidates. + selected_fqns: t.Set[str] = set() selected_deletion_fqns: t.Set[str] = set() if select_models: try: - models_override = model_selector.select_models( + models_override, selected_fqns = model_selector.select_models( select_models, environment, fallback_env_name=create_from or c.PROD, @@ -1627,16 +1625,9 @@ def plan_builder( backfill_models = model_selector.expand_model_selections(select_models) if not backfill_models: - # The selection matched nothing locally. Check whether it matched models that exist - # in the deployed environment but have been deleted locally. If so, the selection is - # valid — the deletions will surface in context_diff.removed_snapshots. - env_selected = model_selector.expand_model_selections_with_env( - select_models, - environment, - fallback_env_name=create_from or c.PROD, - ensure_finalized_snapshots=self.config.plan.use_finalized_state, - ) - selected_deletion_fqns = env_selected - set(self._models) + # The selection matched nothing locally. Check whether it matched models + # in the deployed environment that were deleted locally. + selected_deletion_fqns = selected_fqns - set(self._models) expanded_restate_models = None if restate_models is not None: diff --git a/sqlmesh/core/selector.py b/sqlmesh/core/selector.py index f8c9fb6506..54b89d2680 100644 --- a/sqlmesh/core/selector.py +++ b/sqlmesh/core/selector.py @@ -62,7 +62,7 @@ def select_models( target_env_name: str, fallback_env_name: t.Optional[str] = None, ensure_finalized_snapshots: bool = False, - ) -> UniqueKeyDict[str, Model]: + ) -> t.Tuple[UniqueKeyDict[str, Model], t.Set[str]]: """Given a set of selections returns models from the current state with names matching the selection while sourcing the remaining models from the target environment. @@ -76,7 +76,7 @@ def select_models( the environment is not finalized. Returns: - A dictionary of models. + A tuple of (models dict, set of all matched FQNs including env models). """ env_models = self._load_env_models( target_env_name, fallback_env_name, ensure_finalized_snapshots @@ -148,36 +148,7 @@ def get_model(fqn: str) -> t.Optional[Model]: if needs_update: update_model_schemas(dag, models=models, cache_dir=self._cache_dir) - return models - - def expand_model_selections_with_env( - self, - model_selections: t.Iterable[str], - target_env_name: str, - fallback_env_name: t.Optional[str] = None, - ensure_finalized_snapshots: bool = False, - ) -> t.Set[str]: - """Expands model selections against both local models and the target environment. - - This allows selections to match models that have been deleted locally but still - exist in the deployed environment. - - Args: - model_selections: A set of selections. - target_env_name: The name of the target environment. - fallback_env_name: The name of the fallback environment that will be used if the target - environment doesn't exist. - ensure_finalized_snapshots: Whether to source environment snapshots from the latest finalized - environment state, or to use whatever snapshots are in the current environment state even if - the environment is not finalized. - - Returns: - A set of matched model FQNs. - """ - env_models = self._load_env_models( - target_env_name, fallback_env_name, ensure_finalized_snapshots - ) - return self.expand_model_selections(model_selections, models={**env_models, **self._models}) + return models, all_selected_models def _load_env_models( self, diff --git a/tests/core/test_selector_native.py b/tests/core/test_selector_native.py index 8c4886791e..0b741c1f51 100644 --- a/tests/core/test_selector_native.py +++ b/tests/core/test_selector_native.py @@ -309,7 +309,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot): selector = NativeSelector(state_reader_mock, local_models) - selected = selector.select_models(["db.parent"], env_name) + selected, _ = selector.select_models(["db.parent"], env_name) assert selected[local_child.fqn].render_query() != child.render_query() _assert_models_equal( @@ -320,7 +320,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot): }, ) - selected = selector.select_models(["db.child"], env_name) + selected, _ = selector.select_models(["db.child"], env_name) assert selected[local_child.fqn].data_hash == child.data_hash _assert_models_equal( @@ -343,12 +343,12 @@ def test_select_models_missing_env(mocker: MockerFixture, make_snapshot): selector = NativeSelector(state_reader_mock, local_models) - assert selector.select_models([model.name], "missing_env").keys() == {model.fqn} - assert not selector.select_models(["missing"], "missing_env") + assert selector.select_models([model.name], "missing_env")[0].keys() == {model.fqn} + assert not selector.select_models(["missing"], "missing_env")[0] assert selector.select_models( [model.name], "missing_env", fallback_env_name="another_missing_env" - ).keys() == {model.fqn} + )[0].keys() == {model.fqn} state_reader_mock.get_environment.assert_has_calls( [ @@ -789,7 +789,7 @@ def test_select_models_local_tags_take_precedence_over_remote( selector = NativeSelector(state_reader_mock, local_models) - selected = selector.select_models(["tag:a"], env_name) + selected, _ = selector.select_models(["tag:a"], env_name) # both should get selected because they both now have the 'a' tag locally, even though one exists in remote state without the 'a' tag _assert_models_equal( @@ -801,9 +801,9 @@ def test_select_models_local_tags_take_precedence_over_remote( ) -def test_expand_model_selections_with_env(mocker: MockerFixture, make_snapshot): - """expand_model_selections_with_env should include models from the deployed environment, - even if they have been deleted locally.""" +def test_select_models_returns_selected_fqns(mocker: MockerFixture, make_snapshot): + """select_models should return the set of all matched FQNs (including env-only models) + alongside the model dict.""" local_model = SqlModel( name="db.local_model", query=d.parse_one("SELECT 1 AS a"), @@ -835,34 +835,31 @@ def test_expand_model_selections_with_env(mocker: MockerFixture, make_snapshot): selector = NativeSelector(state_reader_mock, local_models) - # Expanding against local models only should NOT find the deleted model. - assert selector.expand_model_selections(["db.deleted_model"]) == set() + # Selecting a deleted model: selected_fqns includes it even though models dict won't. + _, selected_fqns = selector.select_models(["db.deleted_model"], env_name) + assert deleted_model.fqn in selected_fqns - # Expanding with env should find the deleted model. - result = selector.expand_model_selections_with_env(["db.deleted_model"], env_name) - assert deleted_model.fqn in result + # Selecting a local model: selected_fqns includes it. + _, selected_fqns = selector.select_models(["db.local_model"], env_name) + assert local_model.fqn in selected_fqns - # Local model should also be reachable. - result = selector.expand_model_selections_with_env(["db.local_model"], env_name) - assert local_model.fqn in result - - # Selecting both should return both. - result = selector.expand_model_selections_with_env( + # Mixed selection (active + deleted): both appear in selected_fqns. + _, selected_fqns = selector.select_models( ["db.deleted_model", "db.local_model"], env_name ) - assert result == {deleted_model.fqn, local_model.fqn} + assert selected_fqns == {deleted_model.fqn, local_model.fqn} - # Wildcard should match env models too. - result = selector.expand_model_selections_with_env(["*_model"], env_name) - assert result == {deleted_model.fqn, local_model.fqn} + # Wildcard should match both local and env models. + _, selected_fqns = selector.select_models(["*_model"], env_name) + assert selected_fqns == {deleted_model.fqn, local_model.fqn} - # Non-existent model should return empty. - result = selector.expand_model_selections_with_env(["db.nonexistent"], env_name) - assert result == set() + # Non-existent model should not appear. + _, selected_fqns = selector.select_models(["db.nonexistent"], env_name) + assert selected_fqns == set() -def test_expand_model_selections_with_env_fallback(mocker: MockerFixture, make_snapshot): - """expand_model_selections_with_env should fall back to the fallback environment.""" +def test_select_models_selected_fqns_fallback(mocker: MockerFixture, make_snapshot): + """select_models selected_fqns should include env models found via fallback environment.""" deleted_model = SqlModel( name="db.deleted_model", query=d.parse_one("SELECT 1 AS a"), @@ -890,14 +887,14 @@ def test_expand_model_selections_with_env_fallback(mocker: MockerFixture, make_s local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") selector = NativeSelector(state_reader_mock, local_models) - result = selector.expand_model_selections_with_env( + _, selected_fqns = selector.select_models( ["db.deleted_model"], "missing_env", fallback_env_name="prod" ) - assert deleted_model.fqn in result + assert deleted_model.fqn in selected_fqns -def test_expand_model_selections_with_env_expired(mocker: MockerFixture, make_snapshot): - """expand_model_selections_with_env should ignore expired environments.""" +def test_select_models_selected_fqns_expired(mocker: MockerFixture, make_snapshot): + """select_models should not match env models from expired environments.""" deleted_model = SqlModel( name="db.deleted_model", query=d.parse_one("SELECT 1 AS a"), @@ -924,11 +921,17 @@ def test_expand_model_selections_with_env_expired(mocker: MockerFixture, make_sn local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") selector = NativeSelector(state_reader_mock, local_models) - result = selector.expand_model_selections_with_env(["db.deleted_model"], "test_env") - assert result == set() + _, selected_fqns = selector.select_models(["db.deleted_model"], "test_env") + assert selected_fqns == set() -def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None: +def _assert_models_equal( + actual: t.Union[t.Dict[str, Model], t.Tuple[t.Dict[str, Model], t.Set[str]]], + expected: t.Dict[str, Model], +) -> None: + # select_models returns a tuple; unwrap if needed. + if isinstance(actual, tuple): + actual = actual[0] assert set(actual) == set(expected) for name, model in actual.items(): # Use dict() to make Pydantic V2 happy.