Skip to content

Commit fb390b4

Browse files
committed
feat(amu): initial commit
1 parent edce55a commit fb390b4

3 files changed

Lines changed: 208 additions & 0 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
"""Network Manager Package
2+
3+
Architecture:
4+
AMUManager (manager.py)
5+
└── Main AMU manager its responsible for the AMU backend
6+
Models (models.py)
7+
└── Data classes for type safety
8+
└── Enums for states and types
9+
"""
10+
11+
from .manager import AMUManager
12+
from .models import (
13+
GateInfo,
14+
GateStatus,
15+
MMUState,
16+
)
17+
18+
__all__: list[str] = [
19+
# manager
20+
"AMUManager",
21+
# models
22+
"GateInfo",
23+
"GateStatus",
24+
"MMUState",
25+
]
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import logging
2+
import re
3+
from pathlib import Path
4+
5+
from PyQt6 import QtCore
6+
7+
logger: logging.Logger = logging.getLogger(__name__)
8+
9+
CONFIG_PATH: Path = Path("~/printer_data/config/printer.cfg").expanduser()
10+
AMU_FILES: list[str] = [
11+
"mmu/base/*.cfg",
12+
"mmu/optional/client_macros.cfg",
13+
"filament_manager.cfg",
14+
]
15+
16+
AMU_PATTERNS: list[re.Pattern[str]] = [
17+
re.compile(rf"^(#?)(\[include {re.escape(f)}\])", re.MULTILINE) for f in AMU_FILES
18+
]
19+
20+
21+
class AMUManager(QtCore.QObject):
22+
"""Main manager of the AMU system"""
23+
24+
def __init__(self, parent: QtCore.QObject | None = None) -> None:
25+
super().__init__(parent)
26+
self._amu_state = False
27+
self.__setup_configfile()
28+
29+
def __setup_configfile(self) -> None:
30+
"""
31+
Sets up local configfile variable
32+
33+
Raises:
34+
FileNotFoundError: File Not Found
35+
"""
36+
self._config_filename = CONFIG_PATH
37+
if not self._config_filename.exists():
38+
raise FileNotFoundError(("Config file not found %s", self._config_filename))
39+
40+
def _apply_patterns(self, state: bool) -> bool:
41+
"""Method that comments/uncomments the AMU_FILES from the printer.cfg according with state value
42+
43+
Args:
44+
state (bool): True: Uncomment, False: Comment
45+
46+
Returns:
47+
bool: True: Sucess, False: Failed
48+
"""
49+
# signal to say that it was changed AMU para mainwindow
50+
if self._amu_state == state:
51+
return
52+
53+
replacement = r"\2"
54+
if not state:
55+
replacement = r"#\2"
56+
try:
57+
text: str = self._config_filename.read_text()
58+
for file in AMU_PATTERNS:
59+
text = file.sub(replacement, text)
60+
self._config_filename.write_text(text)
61+
self._amu_state = state
62+
return True
63+
except OSError as e:
64+
logger.error(
65+
"Failed to apply(state=%s) AMU System: could not read/write %s\n%e",
66+
state,
67+
self._config_filename,
68+
e,
69+
)
70+
return False
71+
72+
def toggle_amu_system(self, activate: bool) -> bool:
73+
"""Method that comments/uncomments the AMU_FILES from the printer.cfg according with state value
74+
75+
Args:
76+
state (bool): True: Uncomment, False: Comment
77+
78+
Returns:
79+
bool: True: Sucess, False: Failed
80+
"""
81+
return self._apply_patterns(activate)
82+
83+
84+
if __name__ == "__main__":
85+
manager = AMUManager()
86+
print(manager.toggle_amu_system(True))

BlocksScreen/devices/AMU/models.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import dataclasses
2+
from dataclasses import dataclass
3+
from enum import IntEnum
4+
5+
6+
class GateStatus(IntEnum):
7+
UNKNOWN = 0
8+
AVAILABLE = 1
9+
AVAILABLE_FROM_BUFFER = 2
10+
EMPTY = -1
11+
12+
13+
@dataclass(frozen=True, slots=True)
14+
class GateInfo:
15+
index: int
16+
status: GateStatus
17+
material: str
18+
color: str
19+
color_rgb: tuple[float, float, float]
20+
spool_id: int
21+
22+
23+
@dataclass(frozen=True, slots=True)
24+
class MMUState:
25+
enabled: bool
26+
is_homed: bool
27+
num_gates: int
28+
tool: int
29+
gate: int
30+
filament: str # "Loaded" | "Unloaded" | "Unknown"
31+
action: str
32+
print_state: str
33+
reason_for_pause: str
34+
gates: tuple[GateInfo, ...]
35+
ttg_map: tuple[int, ...]
36+
37+
@classmethod
38+
def from_status(cls, data: dict) -> "MMUState":
39+
num_gates = data.get("num_gates", 0)
40+
41+
statuses = data.get("gate_status", [GateStatus.UNKNOWN] * num_gates)
42+
material = data.get("gate_material", [""] * num_gates)
43+
colors = data.get("gate_color", [""] * num_gates)
44+
rgbs = data.get("gate_color_rgb", [(0.0, 0.0, 0.0)] * num_gates)
45+
spool_ids = data.get("gate_spool_id", [-1] * num_gates)
46+
47+
gates: tuple[GateInfo, ...] = tuple(
48+
GateInfo(
49+
index=i,
50+
status=GateStatus(statuses[i]),
51+
material=material[i],
52+
color=colors[i],
53+
color_rgb=tuple(rgbs[i]),
54+
spool_id=spool_ids[i],
55+
)
56+
for i in range(num_gates)
57+
)
58+
return cls(
59+
enabled=data.get("enabled", False),
60+
is_homed=data.get("is_homed", False),
61+
num_gates=num_gates,
62+
tool=data.get("tool", -1),
63+
gate=data.get("gate", -1),
64+
filament=data.get("filament", "Unknown"),
65+
action=data.get("action", ""),
66+
print_state=data.get("print_state", ""),
67+
reason_for_pause=data.get("reason_for_pause", ""),
68+
gates=gates,
69+
ttg_map=tuple(data.get("ttg_map", [])),
70+
)
71+
72+
def apply_diff(self, diff: dict) -> "MMUState":
73+
gate_keys: set[str] = {
74+
"gate_status",
75+
"gate_material",
76+
"gate_color",
77+
"gate_color_rgb",
78+
"gate_spool_id",
79+
}
80+
if gate_keys.isdisjoint(diff):
81+
# No changes
82+
scalar_fields = {
83+
k: v for k, v in diff.items() if k in MMUState.__dataclass_fields__
84+
}
85+
return dataclasses.replace(self, **scalar_fields)
86+
87+
# Gate arrays changed — need full rebuild, but we lost the raw arrays
88+
# Pass current gate data + diff into from_status
89+
gate_data = {
90+
"gate_status": [g.status for g in self.gates],
91+
"gate_material": [g.material for g in self.gates],
92+
"gate_color": [g.color for g in self.gates],
93+
"gate_color_rgb": [g.color_rgb for g in self.gates],
94+
"gate_spool_id": [g.spool_id for g in self.gates],
95+
}
96+
merged = {**dataclasses.asdict(self), **gate_data, **diff}
97+
return MMUState.from_status(merged)

0 commit comments

Comments
 (0)