Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1605,9 +1605,11 @@ def plan_builder(
backfill_models = None

models_override: t.Optional[UniqueKeyDict[str, Model]] = None
selected_fqns: t.Set[str] = set()
selected_deletion_fqns: t.Set[str] = set()
if select_models:
try:
models_override = model_selector.select_models(
models_override, selected_fqns = model_selector.select_models(
select_models,
environment,
fallback_env_name=create_from or c.PROD,
Expand All @@ -1622,12 +1624,17 @@ def plan_builder(
# Only backfill selected models unless explicitly specified.
backfill_models = model_selector.expand_model_selections(select_models)

if not backfill_models:
# The selection matched nothing locally. Check whether it matched models
# in the deployed environment that were deleted locally.
selected_deletion_fqns = selected_fqns - set(self._models)

expanded_restate_models = None
if restate_models is not None:
expanded_restate_models = model_selector.expand_model_selections(restate_models)

if (restate_models is not None and not expanded_restate_models) or (
backfill_models is not None and not backfill_models
backfill_models is not None and not backfill_models and not selected_deletion_fqns
):
raise PlanError(
"Selector did not return any models. Please check your model selection and try again."
Expand All @@ -1636,7 +1643,7 @@ def plan_builder(
if always_include_local_changes is None:
# default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes
force_no_diff = restate_models is not None or (
backfill_models is not None and not backfill_models
backfill_models is not None and not backfill_models and not selected_deletion_fqns
)
else:
force_no_diff = not always_include_local_changes
Expand Down
60 changes: 36 additions & 24 deletions sqlmesh/core/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def select_models(
target_env_name: str,
fallback_env_name: t.Optional[str] = None,
ensure_finalized_snapshots: bool = False,
) -> UniqueKeyDict[str, Model]:
) -> t.Tuple[UniqueKeyDict[str, Model], t.Set[str]]:
"""Given a set of selections returns models from the current state with names matching the
selection while sourcing the remaining models from the target environment.

Expand All @@ -76,29 +76,11 @@ def select_models(
the environment is not finalized.

Returns:
A dictionary of models.
A tuple of (models dict, set of all matched FQNs including env models).
"""
target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
if target_env and target_env.expired:
target_env = None

if not target_env and fallback_env_name:
target_env = self._state_reader.get_environment(
Environment.sanitize_name(fallback_env_name)
)

env_models: t.Dict[str, Model] = {}
if target_env:
environment_snapshot_infos = (
target_env.snapshots
if not ensure_finalized_snapshots
else target_env.finalized_or_current_snapshots
)
env_models = {
s.name: s.model
for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
if s.is_model
}
env_models = self._load_env_models(
target_env_name, fallback_env_name, ensure_finalized_snapshots
)

all_selected_models = self.expand_model_selections(
model_selections, models={**env_models, **self._models}
Expand Down Expand Up @@ -166,7 +148,37 @@ def get_model(fqn: str) -> t.Optional[Model]:
if needs_update:
update_model_schemas(dag, models=models, cache_dir=self._cache_dir)

return models
return models, all_selected_models

def _load_env_models(
self,
target_env_name: str,
fallback_env_name: t.Optional[str] = None,
ensure_finalized_snapshots: bool = False,
) -> t.Dict[str, "Model"]:
"""Loads models from the target environment, falling back to the fallback environment if needed."""
target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name))
if target_env and target_env.expired:
target_env = None

if not target_env and fallback_env_name:
target_env = self._state_reader.get_environment(
Environment.sanitize_name(fallback_env_name)
)

if not target_env:
return {}

environment_snapshot_infos = (
target_env.snapshots
if not ensure_finalized_snapshots
else target_env.finalized_or_current_snapshots
)
return {
s.name: s.model
for s in self._state_reader.get_snapshots(environment_snapshot_infos).values()
if s.is_model
}

def expand_model_selections(
self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None
Expand Down
38 changes: 38 additions & 0 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,44 @@ def test_plan_selector_expression_no_match(sushi_context: Context) -> None:
sushi_context.plan("prod", restate_models=["*missing*"])


def test_plan_select_model_deleted_model(sushi_context: Context) -> None:
"""Selecting a model that has been deleted locally but still exists in the deployed
environment should produce a valid plan with the deletion, not raise PlanError."""
# Pick a leaf model that can be safely deleted without breaking other models' rendering.
model_name = "sushi.top_waiters"
snapshot = sushi_context.get_snapshot(model_name)
assert snapshot is not None

# Delete the model file from disk.
model = sushi_context.get_model(model_name)
assert model._path.exists()
model._path.unlink()

# Reload the context so it no longer knows about the deleted model.
sushi_context.load()
assert model_name not in [m for m in sushi_context.models]

# Planning with select_models for the deleted model should succeed (not raise PlanError).
plan = sushi_context.plan("prod", select_models=[model_name], no_prompts=True)
assert plan is not None

# The deleted model should appear in removed_snapshots.
removed_names = {s.name for s in plan.context_diff.removed_snapshots.values()}
assert snapshot.name in removed_names


def test_plan_select_model_deleted_model_still_rejects_nonexistent(
sushi_context: Context,
) -> None:
"""A model that neither exists locally nor in the deployed environment should still
raise PlanError."""
with pytest.raises(
PlanError,
match="Selector did not return any models. Please check your model selection and try again.",
):
sushi_context.plan("prod", select_models=["sushi.completely_nonexistent"])


def test_plan_on_virtual_update_this_model_in_macro(tmp_path: pathlib.Path):
models_dir = pathlib.Path("models")
macros_dir = pathlib.Path("macros")
Expand Down
144 changes: 137 additions & 7 deletions tests/core/test_selector_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot):

selector = NativeSelector(state_reader_mock, local_models)

selected = selector.select_models(["db.parent"], env_name)
selected, _ = selector.select_models(["db.parent"], env_name)
assert selected[local_child.fqn].render_query() != child.render_query()

_assert_models_equal(
Expand All @@ -320,7 +320,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot):
},
)

selected = selector.select_models(["db.child"], env_name)
selected, _ = selector.select_models(["db.child"], env_name)
assert selected[local_child.fqn].data_hash == child.data_hash

_assert_models_equal(
Expand All @@ -343,12 +343,12 @@ def test_select_models_missing_env(mocker: MockerFixture, make_snapshot):

selector = NativeSelector(state_reader_mock, local_models)

assert selector.select_models([model.name], "missing_env").keys() == {model.fqn}
assert not selector.select_models(["missing"], "missing_env")
assert selector.select_models([model.name], "missing_env")[0].keys() == {model.fqn}
assert not selector.select_models(["missing"], "missing_env")[0]

assert selector.select_models(
[model.name], "missing_env", fallback_env_name="another_missing_env"
).keys() == {model.fqn}
)[0].keys() == {model.fqn}

state_reader_mock.get_environment.assert_has_calls(
[
Expand Down Expand Up @@ -789,7 +789,7 @@ def test_select_models_local_tags_take_precedence_over_remote(

selector = NativeSelector(state_reader_mock, local_models)

selected = selector.select_models(["tag:a"], env_name)
selected, _ = selector.select_models(["tag:a"], env_name)

# both should get selected because they both now have the 'a' tag locally, even though one exists in remote state without the 'a' tag
_assert_models_equal(
Expand All @@ -801,7 +801,137 @@ def test_select_models_local_tags_take_precedence_over_remote(
)


def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None:
def test_select_models_returns_selected_fqns(mocker: MockerFixture, make_snapshot):
"""select_models should return the set of all matched FQNs (including env-only models)
alongside the model dict."""
local_model = SqlModel(
name="db.local_model",
query=d.parse_one("SELECT 1 AS a"),
)
deleted_model = SqlModel(
name="db.deleted_model",
query=d.parse_one("SELECT 2 AS b"),
)

deleted_model_snapshot = make_snapshot(deleted_model)
deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

env_name = "test_env"

state_reader_mock = mocker.Mock()
state_reader_mock.get_environment.return_value = Environment(
name=env_name,
snapshots=[deleted_model_snapshot.table_info],
start_at="2023-01-01",
end_at="2023-02-01",
plan_id="test_plan_id",
)
state_reader_mock.get_snapshots.return_value = {
deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
}

local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
local_models[local_model.fqn] = local_model

selector = NativeSelector(state_reader_mock, local_models)

# Selecting a deleted model: selected_fqns includes it even though models dict won't.
_, selected_fqns = selector.select_models(["db.deleted_model"], env_name)
assert deleted_model.fqn in selected_fqns

# Selecting a local model: selected_fqns includes it.
_, selected_fqns = selector.select_models(["db.local_model"], env_name)
assert local_model.fqn in selected_fqns

# Mixed selection (active + deleted): both appear in selected_fqns.
_, selected_fqns = selector.select_models(
["db.deleted_model", "db.local_model"], env_name
)
assert selected_fqns == {deleted_model.fqn, local_model.fqn}

# Wildcard should match both local and env models.
_, selected_fqns = selector.select_models(["*_model"], env_name)
assert selected_fqns == {deleted_model.fqn, local_model.fqn}

# Non-existent model should not appear.
_, selected_fqns = selector.select_models(["db.nonexistent"], env_name)
assert selected_fqns == set()


def test_select_models_selected_fqns_fallback(mocker: MockerFixture, make_snapshot):
"""select_models selected_fqns should include env models found via fallback environment."""
deleted_model = SqlModel(
name="db.deleted_model",
query=d.parse_one("SELECT 1 AS a"),
)

deleted_model_snapshot = make_snapshot(deleted_model)
deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

fallback_env = Environment(
name="prod",
snapshots=[deleted_model_snapshot.table_info],
start_at="2023-01-01",
end_at="2023-02-01",
plan_id="test_plan_id",
)

state_reader_mock = mocker.Mock()
state_reader_mock.get_environment.side_effect = (
lambda name: fallback_env if name == "prod" else None
)
state_reader_mock.get_snapshots.return_value = {
deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
}

local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
selector = NativeSelector(state_reader_mock, local_models)

_, selected_fqns = selector.select_models(
["db.deleted_model"], "missing_env", fallback_env_name="prod"
)
assert deleted_model.fqn in selected_fqns


def test_select_models_selected_fqns_expired(mocker: MockerFixture, make_snapshot):
"""select_models should not match env models from expired environments."""
deleted_model = SqlModel(
name="db.deleted_model",
query=d.parse_one("SELECT 1 AS a"),
)

deleted_model_snapshot = make_snapshot(deleted_model)
deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

expired_env = Environment(
name="test_env",
snapshots=[deleted_model_snapshot.table_info],
start_at="2023-01-01",
end_at="2023-02-01",
plan_id="test_plan_id",
expiration_ts=now_timestamp() - 1,
)

state_reader_mock = mocker.Mock()
state_reader_mock.get_environment.return_value = expired_env
state_reader_mock.get_snapshots.return_value = {
deleted_model_snapshot.snapshot_id: deleted_model_snapshot,
}

local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
selector = NativeSelector(state_reader_mock, local_models)

_, selected_fqns = selector.select_models(["db.deleted_model"], "test_env")
assert selected_fqns == set()


def _assert_models_equal(
actual: t.Union[t.Dict[str, Model], t.Tuple[t.Dict[str, Model], t.Set[str]]],
expected: t.Dict[str, Model],
) -> None:
# select_models returns a tuple; unwrap if needed.
if isinstance(actual, tuple):
actual = actual[0]
assert set(actual) == set(expected)
for name, model in actual.items():
# Use dict() to make Pydantic V2 happy.
Expand Down