diff --git a/src/pykmp/__init__.py b/src/pykmp/__init__.py index 8af7c39..c2e7228 100644 --- a/src/pykmp/__init__.py +++ b/src/pykmp/__init__.py @@ -19,6 +19,7 @@ RegisterData, RegisterID, ) +from .registers import RegisterOutput # `PySerialClientCommunicator` is intentionally not re-exported from the package root. # Doing so would make the optional `pyserial` dependency an unconditional import-time diff --git a/src/pykmp/constants.py b/src/pykmp/constants.py index eb0955b..e6d8069 100644 --- a/src/pykmp/constants.py +++ b/src/pykmp/constants.py @@ -106,7 +106,7 @@ 267: "Cooling energy E3 hires", 346: "Module SW rev", 347: "Customer number", - 348: "Date and Time", # TODO: unknown unit 79, 28591984415535 + 348: "Date and Time", 355: "COP Year", 362: "Tariff TA4", 364: "Heat energy A1", # Heat energy with discount A1, t2 < t5 limit @@ -235,6 +235,7 @@ 64: "Datetime", 65: "imp/l", 66: "l/imp", + 79: "DST YY-MM-DD hh:mm:ss", 85: "%RH", 86: "%O\N{SUBSCRIPT TWO}", 87: "m/s", diff --git a/src/pykmp/registers.py b/src/pykmp/registers.py new file mode 100644 index 0000000..d44cc5e --- /dev/null +++ b/src/pykmp/registers.py @@ -0,0 +1,84 @@ +# SPDX-FileCopyrightText: 2023 Gert van Dijk +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import decimal +from typing import Final + +import attrs + +from . import codec, constants, messages + + +REGISTERS_NAMES_LEN_MAX: Final[int] = max( + len(name) for name in constants.REGISTERS.values() +) + + +@attrs.define(kw_only=True) +class RegisterOutput: + id_int: int + id_hex: str = attrs.field(init=False) + name: str = attrs.field(init=False) + unit_int: int + unit_hex: str = attrs.field(init=False) + unit_str: str = attrs.field(init=False) + value_float: float = attrs.field(init=False, default=None) + value_str: str = attrs.field() # best: uses decimal.Decimal without loss + value_dec: decimal.Decimal = attrs.field() + + def __attrs_post_init__(self) -> None: + self.id_hex = f"0x{self.id_int:04X}" + self.unit_hex = f"0x{self.unit_int:02X}" + self.name = constants.REGISTERS.get(self.id_int, f"") + self.unit_str = constants.UNITS_NAMES.get( + self.unit_int, f"" + ) + if self.value_dec is not None: + self.value_float = float(self.value_dec) + self.value_str = str(self.value_dec) + + @classmethod + def from_register_data(cls, reg: messages.RegisterData) -> Self: + value_dec = None + value_str = None + match reg.unit: + case 0x2f: + # hh:mm:ss + d = int.from_bytes(reg.value[2:], 'big') + value_str = f'{(d//10000):02}:{(d // 100 % 100):02}:{(d % 100):02}' + pass + case 0x30: + # yy-mm-dd + d = int.from_bytes(reg.value[2:], 'big') + value_str = f'{(2000 + d//10000):02}-{(d // 100 % 100):02}-{(d % 100):02}' + pass + case 0x32: + # mm-dd + d = int.from_bytes(reg.value[2:], 'big') + value_str = f'{(d // 100 % 100):02}-{(d % 100):02}' + case 0x4f: + # DST yy:mm:dd hh:mm:ss + dst = reg.value[2] + value_str = f'{(2000 + reg.value[3]):02}-{reg.value[4]:02}-{reg.value[5]:02} ' \ + + f'{reg.value[6]:02}:{reg.value[7]:02}:{reg.value[8]:02}' \ + + f'{"+" if dst > 0 else "-"}{(dst // 60):02}:{(dst % 60):02}' + case 0x36: + # ASCII + value_str = bytes(reg.value[2:]).decode('ascii') + case _: + value_dec = codec.FloatCodec.decode(reg.value) + return cls( + id_int=reg.id_, + unit_int=reg.unit, + value_dec=value_dec, + value_str=value_str, + ) + + def to_pretty_line(self) -> str: + return ( + f"{self.id_int!r:>4} → {self.name:<{REGISTERS_NAMES_LEN_MAX}} = " + f"{self.value_str} {self.unit_str}" + ) diff --git a/src/pykmp/tool/__main__.py b/src/pykmp/tool/__main__.py index 2e5e169..e01f1c9 100644 --- a/src/pykmp/tool/__main__.py +++ b/src/pykmp/tool/__main__.py @@ -13,7 +13,7 @@ import click import pykmp -from pykmp import client, codec, constants, messages +from pykmp import client, constants, messages, registers if TYPE_CHECKING: from collections.abc import Collection, Sequence @@ -147,49 +147,6 @@ def get_serial(ctx: click.Context) -> None: click.echo(f"Meter serial is: {response.serial}") -REGISTERS_NAMES_LEN_MAX: Final[int] = max( - len(name) for name in constants.REGISTERS.values() -) - - -@attrs.define(kw_only=True) -class RegisterOutput: - id_int: int - id_hex: str = attrs.field(init=False) - name: str = attrs.field(init=False) - unit_int: int - unit_hex: str = attrs.field(init=False) - unit_str: str = attrs.field(init=False) - value_float: float = attrs.field(init=False) - value_str: str = attrs.field(init=False) # best: uses decimal.Decimal without loss - value_dec: decimal.Decimal - - def __attrs_post_init__(self) -> None: - self.id_hex = f"0x{self.id_int:04X}" - self.unit_hex = f"0x{self.unit_int:02X}" - self.name = constants.REGISTERS.get(self.id_int, f"") - self.unit_str = constants.UNITS_NAMES.get( - self.unit_int, f"" - ) - self.value_float = float(self.value_dec) - self.value_str = str(self.value_dec) - - @classmethod - def from_register_data(cls, reg: messages.RegisterData) -> Self: - value_dec = codec.FloatCodec.decode(reg.value) - return cls( - id_int=reg.id_, - unit_int=reg.unit, - value_dec=value_dec, - ) - - def to_pretty_line(self) -> str: - return ( - f"{self.id_int!r:>4} → {self.name:<{REGISTERS_NAMES_LEN_MAX}} = " - f"{self.value_str} {self.unit_str}" - ) - - def warn_registers_unknowns( registers: Collection[messages.RegisterData], ) -> None: @@ -279,7 +236,7 @@ def get_register( warn_registers_unknowns(response.registers.values()) outputs = ( - RegisterOutput.from_register_data(reg) for reg in response.registers.values() + registers.RegisterOutput.from_register_data(reg) for reg in response.registers.values() ) match output_format: diff --git a/tests/test_registers.py b/tests/test_registers.py new file mode 100644 index 0000000..9596346 --- /dev/null +++ b/tests/test_registers.py @@ -0,0 +1,96 @@ +# SPDX-FileCopyrightText: 2026 Jan Kundrát +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest +import decimal + +from pykmp import messages, registers + +@pytest.mark.parametrize( + ("id_", "unit", "blob_with_size", "value_str", "value_dec", "unit_str"), + [ + pytest.param( + 1001, + 51, + '04 00 00 00 00 00', + '0', + decimal.Decimal('0'), + 'no unit (number)', + ), + pytest.param( + 60, + 8, + '04 43 00 05 c4 a6', + '378.022', + decimal.Decimal('378.022'), + 'GJ', + ), + pytest.param( + 1002, + 47, + '04 00 00 00 00 04', + '00:00:04', + None, + 'hh:mm:ss', + ), + pytest.param( + 1003, + 48, + '04 00 00 03 ae 4f', + '2024-12-31', + None, + 'yy:mm:dd', + ), + pytest.param( + 0, + 50, + '04 00 00 03 38', + '08-24', + None, + 'mm:dd', + ), + pytest.param( + 348, + 79, + '07 00 00 18 0c 1f 00 00 04', + '2024-12-31 00:00:04-00:00', + None, + 'DST YY-MM-DD hh:mm:ss', + ), + # FIXME: how to print the DST? This (unadjusted time, and a DST remark at the end) is what the vendor's + # documentation is using, but the LogView SW actually shows '07 00 3c 1a 04 15 0c 07 1e' as + # '2026-04-21 13:07:30', i.e., with the DST offset of one hour already added. + pytest.param( + 348, + 79, + '07 00 3C 10 06 1E 0E 30 37', + '2016-06-30 14:48:55+01:00', + None, + 'DST YY-MM-DD hh:mm:ss', + ), + pytest.param( + 254, + 54, + '0C 00 30 32 4B 35 32 41 43 31 41 37 43 5A', + '02K52AC1A7CZ', + None, + 'ASCII', + ), + ] +) +def test_register_parsing(id_, unit, blob_with_size, value_str, value_dec, unit_str): + data = messages.RegisterData(id_=id_, unit=unit, value=bytes.fromhex(blob_with_size)) + parsed = registers.RegisterOutput.from_register_data(data) + assert parsed.value_str == value_str + assert parsed.value_dec == value_dec + assert parsed.unit_str == unit_str + + +def test_register_pretty_line(): + data = messages.RegisterData(id_=1001, unit=51, value=bytes.fromhex('04 00 00 00 00 00')) + parsed = registers.RegisterOutput.from_register_data(data) + for part in ('1001', 'Fabrication No', ' 0 no unit (number)'): + assert part in parsed.to_pretty_line()