Skip to content

Commit 1564b9e

Browse files
committed
added testing and created STUB to start responding to moonraker updates with the corresponding AMU method
1 parent 1b3ea8e commit 1564b9e

7 files changed

Lines changed: 319 additions & 12 deletions

File tree

BlocksScreen/devices/amu/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Network Manager Package
1+
"""AMU / HAPPY-Hare MMU package
22
33
Architecture:
44
AMUManager (manager.py)

BlocksScreen/devices/amu/manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,16 @@ def select_tool(self, tool: int) -> None:
199199
"""
200200
self.run_gcode_signal.emit(f"MMU_CHANGE_TOOL TOOL={tool}")
201201

202-
@QtCore.pyqtSlot(dict, name="update_mmu_state")
203-
def update_mmu_state(self, data: dict) -> None:
202+
def update_mmu_state(self, data: dict, name: str = "") -> None:
204203
"""Receive an MMU status dict from Moonraker and update internal state.
205204
206205
Called with either a full status response (on connect) or a diff
207206
(from notify_status_update). Builds or updates the MMUState and
208-
emits mmu_state_changed
207+
emits mmu_state_changed.
209208
210209
Args:
211-
data (dict): Raw MMU status or diff dict from Moonraker
210+
data: Raw MMU status or diff dict from Moonraker.
211+
name: Moonraker object name suffix (always empty for ``mmu``).
212212
"""
213213
if self._mmu_state is None:
214214
self._mmu_state = MMUState.from_status(data)

BlocksScreen/lib/panels/mainWindow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def __init__(self):
201201
self.query_object_list.connect(self.utilitiesPanel.on_object_list)
202202
self.printer.extruder_update.connect(self.on_extruder_update)
203203
self.printer.heater_bed_update.connect(self.on_heater_bed_update)
204-
self.printer.mmu_updated.connect(self.amu_manager.update_mmu_state)
204+
self.printer.register_callback("mmu", self.amu_manager.update_mmu_state)
205205
self.amu_manager.run_gcode_signal.connect(self.ws.api.run_gcode)
206206
self.run_gcode_signal.connect(self.ws.api.run_gcode)
207207

BlocksScreen/lib/printer.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,6 @@ class Printer(QtCore.QObject):
7979
z_tilt_update: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal(
8080
str, bool, name="z_tilt_update"
8181
)
82-
mmu_updated: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal(
83-
object, name="mmu-updated"
84-
)
85-
8682
config_subscription: typing.ClassVar[QtCore.pyqtSignal] = QtCore.pyqtSignal(
8783
[dict],
8884
[list],
@@ -107,6 +103,7 @@ class Printer(QtCore.QObject):
107103
printer_busy: bool = False
108104
current_loaded_file: str = ""
109105
current_loaded_file_metadata: str = ""
106+
110107

111108
def __init__(self, parent: QtCore.QObject, ws: MoonWebSocket, /) -> None:
112109
super(Printer, self).__init__(parent)
@@ -130,6 +127,7 @@ def __init__(self, parent: QtCore.QObject, ws: MoonWebSocket, /) -> None:
130127
self.request_available_objects_signal.connect(self.ws.api.get_available_objects)
131128
self.request_object_subscription_signal.connect(self.ws.api.object_subscription)
132129
self.query_printer_object.connect(self.ws.api.object_query)
130+
self._callbacks: dict[str, typing.Callable[[dict, str], None]] = {}
133131

134132
def clear_printer_objs(self) -> None:
135133
"""Clear all tracking of printer object"""
@@ -143,6 +141,34 @@ def clear_printer_objs(self) -> None:
143141
self.current_loaded_file = ""
144142
self.current_loaded_file_metadata = ""
145143

144+
145+
def __inject_callback(
146+
self, object_type: str, callback: typing.Callable[[dict, str], None]
147+
) -> None:
148+
"""Internal: insert a validated callback into the dispatch table."""
149+
if not object_type or not callable(callback):
150+
logger.warning(
151+
"register_callback: invalid args object_type=%r callable=%s",
152+
object_type,
153+
callable(callback),
154+
)
155+
return
156+
self._callbacks[object_type] = callback
157+
158+
def register_callback(
159+
self, object_type: str, callback: typing.Callable[[dict, str], None]
160+
) -> None:
161+
"""Register an external callback for a Moonraker object type.
162+
163+
When Moonraker sends an update for ``object_type``, ``callback(values, name)``
164+
is called directly — no intermediate signal required.
165+
166+
Args:
167+
object_type: Moonraker object key, e.g. ``"mmu"`` or ``"filament_switch_sensor"``.
168+
callback: Callable with signature ``(values: dict, name: str) -> None``.
169+
"""
170+
self.__inject_callback(object_type, callback)
171+
146172
@QtCore.pyqtSlot(str, name="on_klippy_status")
147173
def on_klippy_status(self, state: str):
148174
"""Handles klippy update status
@@ -275,6 +301,9 @@ def _check_callback(self, name: str, values: dict) -> bool:
275301
_object_type, _object_name = tuple(_split + [""] * max(0, 2 - len(_split)))
276302
if name.startswith("extruder"):
277303
_object_name = name
304+
if _object_type in self._callbacks:
305+
self._callbacks[_object_type](values, _object_name)
306+
return True
278307
if hasattr(self, f"_{_object_type}_object_updated"):
279308
_callback = getattr(self, f"_{_object_type}_object_updated")
280309
if callable(_callback):
@@ -745,5 +774,5 @@ def _load_filament_object_updated(self, values: dict, name: str) -> None:
745774
if "state" in values.keys():
746775
self.load_filament_update[bool].emit(values["state"])
747776

748-
def _mmu_object_updated(self, values: dict, name: str = "") -> None:
749-
self.mmu_updated.emit(values)
777+
778+

tests/amu/__init__.py

Whitespace-only changes.

tests/amu/test_manager_unit.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
"""Unit tests for BlocksScreen.devices.amu.manager."""
2+
3+
import pytest
4+
from unittest.mock import patch
5+
6+
from BlocksScreen.devices.amu.manager import AMUManager
7+
from BlocksScreen.devices.amu.models import MMUState
8+
9+
_COMMENTED_CFG = """\
10+
#[include mmu/base/*.cfg]
11+
#[include mmu/optional/client_macros.cfg]
12+
#[include filament_manager.cfg]
13+
"""
14+
15+
_UNCOMMENTED_CFG = """\
16+
[include mmu/base/*.cfg]
17+
[include mmu/optional/client_macros.cfg]
18+
[include filament_manager.cfg]
19+
"""
20+
21+
_FULL_STATUS: dict = {
22+
"enabled": True,
23+
"is_homed": True,
24+
"num_gates": 2,
25+
"tool": 0,
26+
"gate": 0,
27+
"filament": "Loaded",
28+
"action": "Idle",
29+
"print_state": "printing",
30+
"reason_for_pause": "",
31+
"gate_status": [1, -1],
32+
"gate_material": ["PLA", ""],
33+
"gate_color": ["ff0000", ""],
34+
"gate_color_rgb": [(1.0, 0.0, 0.0), (0.0, 0.0, 0.0)],
35+
"gate_spool_id": [42, -1],
36+
"ttg_map": [0, 1],
37+
}
38+
39+
40+
@pytest.fixture
41+
def manager(tmp_path, qapp):
42+
"""AMUManager with no config file (patched to a non-existent path)."""
43+
missing = tmp_path / "nonexistent.cfg"
44+
with patch("BlocksScreen.devices.amu.manager.CONFIG_PATH", missing):
45+
yield AMUManager()
46+
47+
48+
@pytest.fixture
49+
def manager_with_cfg(tmp_path, qapp):
50+
"""AMUManager pointing at a temp printer.cfg with commented includes."""
51+
cfg = tmp_path / "printer.cfg"
52+
cfg.write_text(_COMMENTED_CFG)
53+
with patch("BlocksScreen.devices.amu.manager.CONFIG_PATH", cfg):
54+
yield AMUManager(), cfg
55+
56+
57+
class TestAMUManagerInit:
58+
def test_initial_state_is_none(self, manager):
59+
assert manager.get_state() is None
60+
61+
def test_initial_amu_inactive(self, manager):
62+
assert manager.is_amu_active() is False
63+
64+
def test_no_config_file_sets_filename_none(self, manager):
65+
assert manager._config_filename is None
66+
67+
68+
class TestToggleAMUSystem:
69+
def test_no_config_emits_false(self, manager, qtbot):
70+
with qtbot.waitSignal(manager.amu_toggled) as blocker:
71+
manager.toggle_amu_system(True)
72+
assert blocker.args == [False]
73+
74+
def test_same_state_emits_false(self, manager_with_cfg, qtbot):
75+
mgr, _ = manager_with_cfg
76+
# _amu_state starts False; toggling to False again → no change
77+
with qtbot.waitSignal(mgr.amu_toggled) as blocker:
78+
mgr.toggle_amu_system(False)
79+
assert blocker.args == [False]
80+
81+
def test_activate_uncomments_includes(self, manager_with_cfg, qtbot):
82+
mgr, cfg = manager_with_cfg
83+
with qtbot.waitSignal(mgr.amu_toggled) as blocker:
84+
mgr.toggle_amu_system(True)
85+
assert blocker.args == [True]
86+
assert cfg.read_text() == _UNCOMMENTED_CFG
87+
88+
def test_deactivate_comments_includes(self, manager_with_cfg, qtbot):
89+
mgr, cfg = manager_with_cfg
90+
cfg.write_text(_UNCOMMENTED_CFG)
91+
mgr._amu_state = True # align internal state with file
92+
with qtbot.waitSignal(mgr.amu_toggled) as blocker:
93+
mgr.toggle_amu_system(False)
94+
assert blocker.args == [True]
95+
assert cfg.read_text() == _COMMENTED_CFG
96+
97+
def test_activate_sets_amu_active(self, manager_with_cfg, qtbot):
98+
mgr, _ = manager_with_cfg
99+
with qtbot.waitSignal(mgr.amu_toggled):
100+
mgr.toggle_amu_system(True)
101+
assert mgr.is_amu_active() is True
102+
103+
104+
class TestUpdateMMUState:
105+
def test_first_call_emits_state(self, manager, qtbot):
106+
with qtbot.waitSignal(manager.mmu_state_changed) as blocker:
107+
manager.update_mmu_state(_FULL_STATUS)
108+
assert isinstance(blocker.args[0], MMUState)
109+
110+
def test_first_call_stores_state(self, manager):
111+
manager.update_mmu_state(_FULL_STATUS)
112+
state = manager.get_state()
113+
assert state is not None
114+
assert state.num_gates == 2
115+
assert state.enabled is True
116+
117+
def test_diff_updates_scalar(self, manager, qtbot):
118+
manager.update_mmu_state(_FULL_STATUS)
119+
with qtbot.waitSignal(manager.mmu_state_changed) as blocker:
120+
manager.update_mmu_state({"tool": 1, "filament": "Unloaded"})
121+
state = blocker.args[0]
122+
assert state.tool == 1
123+
assert state.filament == "Unloaded"
124+
125+
def test_diff_preserves_unchanged_fields(self, manager):
126+
manager.update_mmu_state(_FULL_STATUS)
127+
manager.update_mmu_state({"tool": 1})
128+
assert manager.get_state().num_gates == 2
129+
assert manager.get_state().gates[0].material == "PLA"
130+
131+
def test_each_call_emits_signal(self, manager, qtbot):
132+
manager.update_mmu_state(_FULL_STATUS)
133+
with qtbot.waitSignal(manager.mmu_state_changed):
134+
manager.update_mmu_state({"tool": 1})
135+
136+
137+
class TestGcodeSignals:
138+
def test_set_gate_info(self, manager, qtbot):
139+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
140+
manager.set_gate_info(0, "PLA", "ff0000", 42)
141+
assert blocker.args == ["MMU_GATE_MAP gate=0 MATERIAL=PLA COLOR=ff0000 SPOOLID=42"]
142+
143+
def test_set_gate_material(self, manager, qtbot):
144+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
145+
manager.set_gate_material(1, "PETG")
146+
assert blocker.args == ["MMU_GATE_MAP gate=1 MATERIAL=PETG"]
147+
148+
def test_set_gate_color(self, manager, qtbot):
149+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
150+
manager.set_gate_color(2, "00ff00")
151+
assert blocker.args == ["MMU_GATE_MAP gate=2 COLOR=00ff00"]
152+
153+
def test_set_gate_spool(self, manager, qtbot):
154+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
155+
manager.set_gate_spool(0, 7)
156+
assert blocker.args == ["MMU_GATE_MAP gate=0 SPOOLID=7"]
157+
158+
def test_home_mmu(self, manager, qtbot):
159+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
160+
manager.home_mmu()
161+
assert blocker.args == ["MMU_HOME"]
162+
163+
def test_reset_mmu(self, manager, qtbot):
164+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
165+
manager.reset_mmu()
166+
assert blocker.args == ["MMU_RESET"]
167+
168+
def test_load_gate(self, manager, qtbot):
169+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
170+
manager.load_gate(3)
171+
assert blocker.args == ["MMU_SELECT gate=3\nMMU_LOAD"]
172+
173+
def test_unload(self, manager, qtbot):
174+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
175+
manager.unload()
176+
assert blocker.args == ["MMU_UNLOAD"]
177+
178+
def test_eject_gate(self, manager, qtbot):
179+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
180+
manager.eject_gate(1)
181+
assert blocker.args == ["MMU_EJECT GATE=1"]
182+
183+
def test_eject_all_gates(self, manager, qtbot):
184+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
185+
manager.eject_all_gates(3)
186+
assert blocker.args == ["MMU_EJECT GATE=0\nMMU_EJECT GATE=1\nMMU_EJECT GATE=2"]
187+
188+
def test_select_tool(self, manager, qtbot):
189+
with qtbot.waitSignal(manager.run_gcode_signal) as blocker:
190+
manager.select_tool(2)
191+
assert blocker.args == ["MMU_CHANGE_TOOL TOOL=2"]

tests/amu/test_models_unit.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""Unit tests for BlocksScreen.devices.amu.models."""
2+
3+
from BlocksScreen.devices.amu.models import GateInfo, GateStatus, MMUState
4+
5+
class TestGateStatus:
6+
def test_values(self):
7+
assert GateStatus.UNKNOWN == 0
8+
assert GateStatus.AVAILABLE == 1
9+
assert GateStatus.AVAILABLE_FROM_BUFFER == 2
10+
assert GateStatus.EMPTY == -1
11+
12+
def test_is_int_enum(self):
13+
assert isinstance(GateStatus.AVAILABLE, int)
14+
15+
class TestGateInfo:
16+
def _make(self, status: GateStatus) -> GateInfo:
17+
return GateInfo(index=0, status=status, material="PLA", color="ff0000", color_rgb=(1.0,0.0,0.0), spool_id=1)
18+
def test_is_available_true_for_available(self):
19+
assert self._make(GateStatus.AVAILABLE).is_available is True
20+
def test_is_available_true_for_buffer(self):
21+
assert self._make(GateStatus.AVAILABLE_FROM_BUFFER).is_available is True
22+
def test_is_available_false_for_empty(self):
23+
assert self._make(GateStatus.EMPTY).is_available is False
24+
def test_is_available_false_for_unknown(self):
25+
assert self._make(GateStatus.UNKNOWN).is_available is False
26+
27+
class TestMMUState:
28+
def _full_status(self, num_gates=2) -> dict:
29+
return {
30+
"enabled": True,
31+
"is_homed": True,
32+
"num_gates": num_gates,
33+
"tool": 0,
34+
"gate": 0,
35+
"filament": "Loaded",
36+
"action": "Idle",
37+
"print_state": "printing",
38+
"reason_for_pause": "",
39+
"gate_status": [1, -1],
40+
"gate_material": ["PLA", ""],
41+
"gate_color": ["ff0000", ""],
42+
"gate_color_rgb": [(1.0, 0.0, 0.0), (0.0, 0.0, 0.0)],
43+
"gate_spool_id": [42, -1],
44+
"ttg_map": [0, 1],
45+
}
46+
def test_from_status_builds_correctly(self):
47+
state = MMUState.from_status(self._full_status())
48+
assert state.num_gates == 2
49+
assert state.enabled == True
50+
assert len(state.gates) == 2
51+
assert state.gates[0].material == "PLA"
52+
assert state.gates[1].status == GateStatus.EMPTY
53+
assert state.ttg_map == (0, 1)
54+
def test_from_status_empty_dict_uses_default(self):
55+
state = MMUState.from_status({})
56+
assert state.num_gates == 0
57+
assert state.enabled == False
58+
assert state.gates == ()
59+
def test_is_paused_true(self):
60+
state = MMUState.from_status({**self._full_status(),"print_state": "pause"})
61+
assert state.is_paused is True
62+
def test_is_paused_false(self):
63+
state = MMUState.from_status(self._full_status())
64+
assert state.is_paused is False
65+
def test_current_gate_info(self):
66+
state = MMUState.from_status(self._full_status())
67+
assert state.current_gate_info is state.gates[0]
68+
def test_currrent_gate_info_none_when_no_gate(self):
69+
state = MMUState.from_status({**self._full_status(), "gate": -1})
70+
assert state.current_gate_info is None
71+
def test_gate_for_tool(self):
72+
state = MMUState.from_status(self._full_status())
73+
assert state.gate_for_tool(0) == 0
74+
assert state.gate_for_tool(1) == 1
75+
def test_gate_for_tool_out_of_range(self):
76+
state = MMUState.from_status(self._full_status())
77+
assert state.gate_for_tool(99) == -1
78+
def test_apply_diff_scalar(self):
79+
state = MMUState.from_status(self._full_status())
80+
updated = state.apply_diff({"tool": 1, "filament": "Unloaded"})
81+
assert updated.tool == 1
82+
assert updated.filament == "Unloaded"
83+
assert updated.gates == state.gates
84+
def test_apply_diff_gate_keys(self):
85+
state = MMUState.from_status(self._full_status())
86+
updated = state.apply_diff({"gate_status": [1, 1]})
87+
assert updated.gates[1].status == GateStatus.AVAILABLE

0 commit comments

Comments
 (0)