diff --git a/default_config.json b/default_config.json index 7684bff0f..abb781b70 100644 --- a/default_config.json +++ b/default_config.json @@ -22,6 +22,29 @@ "target_pixel": [256, 256], "gps_type": "ublox", "gps_baud_rate": 9600, + "time_sync_enabled": false, + "time_sync_source_mode": "best", + "gps_time_sync": true, + "ntp_time_sync": true, + "ntp_server": "pool.ntp.org", + "ntp_server_custom": "", + "ntp_poll_interval_seconds": 300, + "ntp_timeout_seconds": 1.0, + "ntp_max_delay_ms": 1500, + "ntp_stale_seconds": 900, + "time_sync_system_clock": true, + "gps_time_sync_min_samples": 5, + "gps_time_sync_window_seconds": 120, + "gps_time_sync_stale_seconds": 30, + "gps_time_sync_max_tacc_ns": 1000000000, + "gps_time_sync_stable_jitter_ms": 250, + "gps_time_sync_stable_offset_ms": 1000, + "time_sync_system_clock_min_interval_seconds": 300, + "time_sync_system_clock_step_threshold_ms": 500, + "software_pps": false, + "software_pps_interval_seconds": 1.0, + "rtc_sync": false, + "rtc_sync_min_interval_seconds": 3600, "filter.selected_catalogs": [ "NGC", "M", diff --git a/docs/mf_time_sync_en.md b/docs/mf_time_sync_en.md new file mode 100644 index 000000000..8e4633509 --- /dev/null +++ b/docs/mf_time_sync_en.md @@ -0,0 +1,144 @@ +# MF PiFinder Time Sync + +This document describes PiFinder's integrated time-sync feature. GPS and NTP can both be used as time sources, and the selected time can optionally be used for Linux system clock and RTC sync requests. Software PPS is managed as a separate periodic event source. + +The whole feature is `Off` by default. When `Time Sync` is turned `On`, the default source mode is `Best`, which compares GPS and NTP and selects the source with the better estimated quality. If NTP networking is slow or unavailable, NTP is reported as `unavailable` or `low_quality`; usable GPS time can still be selected. + +## UI Settings + +Settings path: + +```text +Settings > Advanced > Time Sync +``` + +Status path: + +```text +Tools > Place & Time > Time Sync +``` + +Main UI items: + +| UI item | Config key | Default | Meaning | +| --- | --- | --- | --- | +| `Time Sync` | `time_sync_enabled` | `Off` | Master switch for integrated time sync | +| `Source Mode` | `time_sync_source_mode` | `Best` | Select `Best`, `GPS`, or `NTP` | +| `GPS Source` | `gps_time_sync` | `On` | Use GPS as a time source | +| `NTP Source` | `ntp_time_sync` | `On` | Use NTP as a time source | +| `NTP Server` | `ntp_server`, `ntp_server_custom` | `pool.ntp.org` | Select a known NTP server or enter a custom server | +| `System Clock` | `time_sync_system_clock` | `On` | Request Linux system clock sync from the selected time | +| `RTC Sync` | `rtc_sync` | `Off` | Request RTC sync from the selected time | +| `Software PPS` | `software_pps` | `Off` | Emit software periodic ticks | + +Default NTP server list: + +```text +pool.ntp.org +time.google.com +time.cloudflare.com +time.nist.gov +Custom +``` + +When `Custom` is selected, PiFinder opens the server entry screen immediately. After saving, `NTP Server` is automatically set to `Custom`, and the entered address is stored in `ntp_server_custom`. + +## Default Config + +Important defaults in `default_config.json`: + +```json +"time_sync_enabled": false, +"time_sync_source_mode": "best", +"gps_time_sync": true, +"ntp_time_sync": true, +"ntp_server": "pool.ntp.org", +"ntp_server_custom": "", +"ntp_poll_interval_seconds": 300, +"ntp_timeout_seconds": 1.0, +"ntp_max_delay_ms": 1500, +"ntp_stale_seconds": 900, +"time_sync_system_clock": true, +"rtc_sync": false, +"software_pps": false +``` + +## Source Selection + +In `Best` mode, PiFinder compares stable GPS candidates with valid NTP candidates. + +GPS is judged by `valid`, `tAcc`, recent sample jitter, and stale age. NTP is judged by response validity, stratum, round-trip delay, root dispersion, and stale age. + +When both GPS and NTP are usable, PiFinder selects the source with the smaller estimated quality value. If NTP delay is above `ntp_max_delay_ms`, NTP is marked `low_quality` and is not selected. + +## System Clock and RTC + +The main PiFinder service keeps normal user permissions. Actual system clock or RTC writes require the separate root helper service. + +Before final outdoor testing, start with dry-run mode: + +```bash +cd ~/PiFinder +./scripts/install_gps_time_sync_helper.sh enable-dry-run +``` + +Switch to real write mode only when dry-run results are correct: + +```bash +cd ~/PiFinder +./scripts/install_gps_time_sync_helper.sh enable +``` + +The helper validates each request before running `/usr/bin/date` or `/usr/sbin/hwclock`. It checks that the request belongs to the current boot, is fresh, has a valid selected time source, and comes from a `stable` monitor state. + +## Status Files + +The status path keeps the existing filename for compatibility: + +```text +~/PiFinder_data/gps_time_status.json +``` + +Important fields: + +| Field | Meaning | +| --- | --- | +| `state` | Integrated time-sync state | +| `selected` | Currently selected time source and time | +| `latest` | Last GPS time sample | +| `ntp` | Last NTP query result | +| `sources.gps` | GPS source state and candidate | +| `sources.ntp` | NTP source state and candidate | +| `system_clock_sync` | System clock sync request state | +| `rtc_sync` | RTC sync request state | +| `software_pps` | Software PPS tick state | +| `helper` | Last root-helper result | + +Helper request file: + +```text +~/PiFinder_data/gps_time_sync_request.json +``` + +Helper status file: + +```text +~/PiFinder_data/gps_time_sync_helper_status.json +``` + +The filenames are retained for compatibility with existing installs and the helper service. + +## Test + +Run unit tests: + +```bash +cd ~/PiFinder/python +pytest tests/test_gps_time_sync.py tests/test_gps_time_sync_helper.py tests/test_gps_time_sync_status_ui.py -q +``` + +Watch hardware status: + +```bash +watch -n 1 cat ~/PiFinder_data/gps_time_status.json +``` diff --git a/docs/mf_time_sync_ko.md b/docs/mf_time_sync_ko.md new file mode 100644 index 000000000..1e6340906 --- /dev/null +++ b/docs/mf_time_sync_ko.md @@ -0,0 +1,144 @@ +# MF PiFinder 시간 동기화 + +이 문서는 PiFinder의 통합 시간 동기화 기능을 설명합니다. 시간 소스는 GPS와 NTP를 함께 사용할 수 있고, 선택된 시간은 선택적으로 Linux system clock과 RTC 동기화 요청에 사용됩니다. 소프트웨어 PPS는 별도 주기 이벤트로 관리됩니다. + +기본값은 안전을 위해 전체 기능이 `Off`입니다. 전체 `Time Sync`를 `On`으로 바꾸면 기본 소스 모드는 `Best`이며, GPS와 NTP 중 추정 품질이 더 좋은 값을 선택합니다. NTP 네트워크가 느리거나 끊기면 NTP는 `unavailable` 또는 `low_quality`로 표시되고, 사용 가능한 GPS 시간이 있으면 GPS를 선택합니다. + +## UI 설정 + +설정 위치: + +```text +Settings > Advanced > Time Sync +``` + +상태 확인 위치: + +```text +Tools > Place & Time > Time Sync +``` + +주요 UI 항목: + +| UI 항목 | 설정 키 | 기본값 | 의미 | +| --- | --- | --- | --- | +| `Time Sync` | `time_sync_enabled` | `Off` | 통합 시간 동기화 전체 스위치 | +| `Source Mode` | `time_sync_source_mode` | `Best` | `Best`, `GPS`, `NTP` 중 선택 | +| `GPS Source` | `gps_time_sync` | `On` | GPS 시간 소스 사용 | +| `NTP Source` | `ntp_time_sync` | `On` | NTP 시간 소스 사용 | +| `NTP Server` | `ntp_server`, `ntp_server_custom` | `pool.ntp.org` | 기본 NTP 서버 목록 선택 또는 커스텀 서버 입력 | +| `System Clock` | `time_sync_system_clock` | `On` | 선택된 시간으로 Linux system clock 동기화 요청 | +| `RTC Sync` | `rtc_sync` | `Off` | 선택된 시간으로 RTC 동기화 요청 | +| `Software PPS` | `software_pps` | `Off` | 소프트웨어 주기 tick 생성 | + +NTP 서버 기본 목록: + +```text +pool.ntp.org +time.google.com +time.cloudflare.com +time.nist.gov +Custom +``` + +`Custom`을 선택하면 바로 서버 주소 입력 화면이 열립니다. 입력을 저장하면 `NTP Server`는 자동으로 `Custom`으로 설정되고, 입력한 주소는 `ntp_server_custom`에 저장됩니다. + +## 기본 설정 값 + +`default_config.json`의 주요 기본값은 다음과 같습니다. + +```json +"time_sync_enabled": false, +"time_sync_source_mode": "best", +"gps_time_sync": true, +"ntp_time_sync": true, +"ntp_server": "pool.ntp.org", +"ntp_server_custom": "", +"ntp_poll_interval_seconds": 300, +"ntp_timeout_seconds": 1.0, +"ntp_max_delay_ms": 1500, +"ntp_stale_seconds": 900, +"time_sync_system_clock": true, +"rtc_sync": false, +"software_pps": false +``` + +## 선택 방식 + +`Best` 모드에서는 안정적인 GPS 후보와 유효한 NTP 후보를 비교합니다. + +GPS는 `valid`, `tAcc`, 최근 샘플 jitter, stale 여부를 기준으로 판단합니다. NTP는 응답 유효성, stratum, 왕복 지연, root dispersion, stale 여부를 기준으로 판단합니다. + +GPS와 NTP가 모두 사용할 수 있으면 추정 품질값이 더 작은 소스를 선택합니다. NTP 지연이 `ntp_max_delay_ms`보다 크면 `low_quality`로 표시되고 선택 후보에서 제외됩니다. + +## System Clock과 RTC + +PiFinder 본체는 일반 사용자 권한으로 실행됩니다. system clock 또는 RTC를 실제로 쓰려면 별도 root helper 서비스가 필요합니다. + +실외 최종 테스트 전에는 dry-run으로 먼저 확인하는 것을 권장합니다. + +```bash +cd ~/PiFinder +./scripts/install_gps_time_sync_helper.sh enable-dry-run +``` + +실제 쓰기를 허용하려면 다음으로 전환합니다. + +```bash +cd ~/PiFinder +./scripts/install_gps_time_sync_helper.sh enable +``` + +helper는 요청 파일을 검증한 뒤에만 `/usr/bin/date` 또는 `/usr/sbin/hwclock`을 실행합니다. 요청은 같은 부팅 세션의 최신 요청인지, 선택된 시간 소스가 유효한지, 모니터 상태가 `stable`인지 확인됩니다. + +## 상태 파일 + +상태 파일은 기존 경로를 유지합니다. + +```text +~/PiFinder_data/gps_time_status.json +``` + +주요 항목: + +| 항목 | 의미 | +| --- | --- | +| `state` | 통합 시간 동기화 상태 | +| `selected` | 현재 선택된 시간 소스와 시간 | +| `latest` | 마지막 GPS 시간 샘플 | +| `ntp` | 마지막 NTP 조회 결과 | +| `sources.gps` | GPS 소스 상태와 후보 | +| `sources.ntp` | NTP 소스 상태와 후보 | +| `system_clock_sync` | system clock 동기화 요청 상태 | +| `rtc_sync` | RTC 동기화 요청 상태 | +| `software_pps` | 소프트웨어 PPS tick 상태 | +| `helper` | root helper의 마지막 처리 결과 | + +helper 요청 파일: + +```text +~/PiFinder_data/gps_time_sync_request.json +``` + +helper 상태 파일: + +```text +~/PiFinder_data/gps_time_sync_helper_status.json +``` + +파일명은 기존 설치와 helper 서비스 호환성을 위해 유지합니다. + +## 테스트 + +단위 테스트: + +```bash +cd ~/PiFinder/python +pytest tests/test_gps_time_sync.py tests/test_gps_time_sync_helper.py tests/test_gps_time_sync_status_ui.py -q +``` + +실기 상태 확인: + +```bash +watch -n 1 cat ~/PiFinder_data/gps_time_status.json +``` diff --git a/pi_config_files/pifinder_gps_time_sync.service b/pi_config_files/pifinder_gps_time_sync.service new file mode 100644 index 000000000..8e1d9b2ad --- /dev/null +++ b/pi_config_files/pifinder_gps_time_sync.service @@ -0,0 +1,15 @@ +[Unit] +Description=PiFinder Time Sync Helper +After=basic.target pifinder.service + +[Service] +Type=simple +User=root +WorkingDirectory=__PIFINDER_REPO_DIR__/python +Environment=PIFINDER_DATA_DIR=__PIFINDER_DATA_DIR__ +ExecStart=/usr/bin/python -m PiFinder.gps_time_sync_helper +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=multi-user.target diff --git a/python/PiFinder/gps_gpsd.py b/python/PiFinder/gps_gpsd.py index 96df6c435..93ec5f1fe 100644 --- a/python/PiFinder/gps_gpsd.py +++ b/python/PiFinder/gps_gpsd.py @@ -42,6 +42,43 @@ def is_tpv_accurate(tpv_dict): return False +def gpsd_time_message(tpv_dict, gps_locked=False): + """Build a GPS time message even before the position fix is accurate.""" + gps_time = tpv_dict.get("time") + if not gps_time: + return None + + content = { + "time": gps_time, + "source": "GPSD", + "mode": tpv_dict.get("mode", 0), + "lock": gps_locked, + } + if gps_locked: + content["error_in_m"] = error_in_m + return "time", content + + +def gpsd_sky_time_sample(sky_dict): + """Build a monitor-only GPS time candidate from SKY reports.""" + gps_time = sky_dict.get("time") + if not gps_time: + return None + + return ( + "time_sample", + { + "time": gps_time, + "source": "GPSD-SKY", + "valid": False, + "satellites_seen": sky_dict.get("nSat"), + "satellites_used": sky_dict.get("uSat"), + "hdop": sky_dict.get("hdop"), + "pdop": sky_dict.get("pdop"), + }, + ) + + def gps_main(gps_queue, console_queue, log_queue): global error_2d, error_3d, error_in_m MultiprocLogging.configurer(log_queue) @@ -54,8 +91,16 @@ def gps_main(gps_queue, console_queue, log_queue): for result in client.dict_stream( convert_datetime=True, filter=["TPV", "SKY"] ): - if result["class"] == "TPV" and is_tpv_accurate(result): - logger.debug("last reading is %s", result) + if result["class"] == "TPV": + gps_accurate = is_tpv_accurate(result) + + time_msg = gpsd_time_message(result, gps_accurate) + if time_msg is not None: + logger.debug("Setting GPSD time to %s", result.get("time")) + gps_queue.put(time_msg) + + if result["class"] == "TPV" and gps_accurate: + logger.debug("last accurate reading is %s", result) if ( result.get("lat") and result.get("lon") @@ -80,14 +125,12 @@ def gps_main(gps_queue, console_queue, log_queue): logger.debug("GPS fix: %s", msg) gps_queue.put(msg) - if result.get("time"): - msg = ("time", result.get("time")) - logger.debug("Setting time to %s", result.get("time")) - gps_queue.put(msg) - if result["class"] == "SKY": logger.debug("GPS: SKY: %s", result) print("GPS: SKY: %s", result) + time_sample = gpsd_sky_time_sample(result) + if time_sample is not None: + gps_queue.put(time_sample) if result["class"] == "SKY": error_2d = result.get("hdop", 999) error_3d = result.get("pdop", 999) diff --git a/python/PiFinder/gps_time_sync.py b/python/PiFinder/gps_time_sync.py new file mode 100644 index 000000000..a44d779f2 --- /dev/null +++ b/python/PiFinder/gps_time_sync.py @@ -0,0 +1,1385 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- +""" +Integrated time-source monitor for PiFinder. + +The monitor evaluates GPS and NTP candidates, selects the best available time +source, manages optional software PPS ticks, and writes constrained requests for +the privileged helper when system clock or RTC updates are enabled. +""" + +from __future__ import annotations + +import datetime +import json +import logging +import math +import os +import socket +import struct +import threading +import time +from collections import deque +from pathlib import Path +from typing import Any, Callable, Deque, Optional + +import pytz + +from PiFinder import utils + + +logger = logging.getLogger("GPS.TimeSync") + +DATA_DIR = Path(os.environ.get("PIFINDER_DATA_DIR", utils.data_dir)) +STATUS_FILE = DATA_DIR / "gps_time_status.json" +REQUEST_FILE = DATA_DIR / "gps_time_sync_request.json" +HELPER_STATUS_FILE = DATA_DIR / "gps_time_sync_helper_status.json" +NTP_EPOCH_DELTA = 2_208_988_800 +DEFAULT_NTP_SERVERS = ( + "pool.ntp.org", + "time.google.com", + "time.cloudflare.com", + "time.nist.gov", +) + + +def _read_boot_id() -> str: + try: + return Path("/proc/sys/kernel/random/boot_id").read_text().strip() + except OSError: + return "unknown" + + +class ClockSyncRequestWriter: + """Write requests for the privileged GPS time-sync helper.""" + + def __init__( + self, + request_file: Path = REQUEST_FILE, + boot_id_fn: Callable[[], str] = _read_boot_id, + ): + self.request_file = request_file + self.boot_id_fn = boot_id_fn + + def write_request(self, payload: dict[str, Any]) -> dict[str, Any]: + try: + utils.create_path(self.request_file.parent) + payload = dict(payload) + payload["boot_id"] = self.boot_id_fn() + tmp_file = self.request_file.with_name(self.request_file.name + ".tmp") + with open(tmp_file, "w", encoding="utf-8") as request_out: + json.dump(payload, request_out, indent=2, sort_keys=True) + tmp_file.replace(self.request_file) + except Exception as exc: + return {"ok": False, "message": str(exc)} + return {"ok": True, "message": f"request written to {self.request_file}"} + + def clear_request(self) -> None: + try: + self.request_file.unlink() + except FileNotFoundError: + return + except Exception: + logger.exception("Could not clear time sync request") + + +def _as_bool(value: Any, default: bool = False) -> bool: + if value is None: + return default + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() in ("1", "true", "yes", "on") + return bool(value) + + +def _as_float(value: Any, default: float) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _as_int(value: Any, default: int) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _utc_datetime(dt: datetime.datetime) -> datetime.datetime: + if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None: + return pytz.timezone("UTC").localize(dt) + return dt.astimezone(pytz.timezone("UTC")) + + +def _read_ntp_timestamp(packet: bytes, offset: int) -> float: + seconds, fraction = struct.unpack("!II", packet[offset : offset + 8]) + return seconds - NTP_EPOCH_DELTA + fraction / 2**32 + + +def _write_ntp_timestamp(packet: bytearray, offset: int, unix_time: float) -> None: + ntp_time = unix_time + NTP_EPOCH_DELTA + seconds = int(ntp_time) + fraction = int((ntp_time - seconds) * 2**32) + packet[offset : offset + 8] = struct.pack("!II", seconds, fraction) + + +def _read_ntp_short(packet: bytes, offset: int, signed: bool = False) -> float: + fmt = "!i" if signed else "!I" + raw_value = struct.unpack(fmt, packet[offset : offset + 4])[0] + return raw_value / 2**16 + + +class NtpClient: + """Small SNTP client used for opportunistic time-source checks.""" + + port = 123 + + def __init__(self, time_fn: Callable[[], float] = time.time): + self.time_fn = time_fn + + def query(self, server: str, timeout_seconds: float = 1.0) -> dict[str, Any]: + server = server.strip() + if not server: + raise ValueError("NTP server is empty") + + packet = bytearray(48) + packet[0] = 0x23 # LI=0, VN=4, mode=client + t1 = self.time_fn() + _write_ntp_timestamp(packet, 40, t1) + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.settimeout(timeout_seconds) + sock.sendto(packet, (server, self.port)) + data, address = sock.recvfrom(512) + t4 = self.time_fn() + + if len(data) < 48: + raise ValueError("NTP response was shorter than 48 bytes") + + leap = (data[0] >> 6) & 0x03 + version = (data[0] >> 3) & 0x07 + mode = data[0] & 0x07 + stratum = data[1] + t2 = _read_ntp_timestamp(data, 32) + t3 = _read_ntp_timestamp(data, 40) + root_delay = _read_ntp_short(data, 4, signed=True) + root_dispersion = _read_ntp_short(data, 8) + delay = (t4 - t1) - (t3 - t2) + offset = ((t2 - t1) + (t3 - t4)) / 2.0 + valid = mode in (4, 5) and 0 < stratum < 16 and t3 > 0 and leap != 3 + quality = max(0.0, delay) / 2.0 + max(0.0, root_dispersion) + + return { + "time": datetime.datetime.fromtimestamp(t4 + offset, tz=pytz.UTC), + "server": server, + "address": address[0] if address else None, + "valid": valid, + "version": version, + "mode": mode, + "stratum": stratum, + "leap": leap, + "offset_seconds": offset, + "delay_seconds": max(0.0, delay), + "root_delay_seconds": root_delay, + "root_dispersion_seconds": root_dispersion, + "quality_seconds": quality, + "received_unix": t4, + } + + +class GpsTimeSyncMonitor: + """Evaluate GPS time quality and optional software PPS ticks.""" + + def __init__( + self, + time_sync_enabled: Optional[bool] = None, + enabled: bool = False, + ntp_enabled: bool = False, + software_pps_enabled: bool = False, + system_clock_sync_enabled: bool = False, + rtc_sync_enabled: bool = False, + source_mode: str = "best", + ntp_server: str = DEFAULT_NTP_SERVERS[0], + ntp_server_custom: str = "", + ntp_poll_interval_seconds: float = 300.0, + ntp_timeout_seconds: float = 1.0, + ntp_max_delay_ms: float = 1500.0, + ntp_stale_seconds: float = 900.0, + min_samples: int = 5, + sample_window_seconds: float = 120.0, + stale_seconds: float = 30.0, + max_tacc_ns: int = 1_000_000_000, + stable_jitter_ms: float = 250.0, + stable_offset_ms: float = 1000.0, + status_write_interval_seconds: float = 5.0, + software_pps_interval_seconds: float = 1.0, + system_clock_sync_min_interval_seconds: float = 300.0, + system_clock_sync_step_threshold_ms: float = 500.0, + rtc_sync_min_interval_seconds: float = 3600.0, + status_file: Path = STATUS_FILE, + helper_status_file: Path = HELPER_STATUS_FILE, + time_fn: Callable[[], float] = time.time, + monotonic_fn: Callable[[], float] = time.monotonic, + request_writer: Optional[ClockSyncRequestWriter] = None, + ntp_client: Optional[NtpClient] = None, + ntp_async: bool = True, + ): + if time_sync_enabled is None: + time_sync_enabled = ( + enabled + or ntp_enabled + or software_pps_enabled + or system_clock_sync_enabled + or rtc_sync_enabled + ) + self.time_sync_enabled = time_sync_enabled + self.enabled = enabled + self.ntp_enabled = ntp_enabled + self.software_pps_enabled = software_pps_enabled + self.system_clock_sync_enabled = system_clock_sync_enabled + self.rtc_sync_enabled = rtc_sync_enabled + self.source_mode = source_mode if source_mode in ("best", "gps", "ntp") else "best" + self.ntp_server = ntp_server + self.ntp_server_custom = ntp_server_custom + self.ntp_poll_interval_seconds = max(5.0, ntp_poll_interval_seconds) + self.ntp_timeout_seconds = max(0.1, ntp_timeout_seconds) + self.ntp_max_delay_seconds = max(0.001, ntp_max_delay_ms / 1000.0) + self.ntp_stale_seconds = max(5.0, ntp_stale_seconds) + self.min_samples = max(1, min_samples) + self.sample_window_seconds = max(1.0, sample_window_seconds) + self.stale_seconds = max(1.0, stale_seconds) + self.max_tacc_ns = max_tacc_ns + self.stable_jitter_seconds = max(0.001, stable_jitter_ms / 1000.0) + self.stable_offset_seconds = max(0.001, stable_offset_ms / 1000.0) + self.status_write_interval_seconds = max(0.5, status_write_interval_seconds) + self.software_pps_interval_seconds = max(0.1, software_pps_interval_seconds) + self.system_clock_sync_min_interval_seconds = max( + 1.0, system_clock_sync_min_interval_seconds + ) + self.system_clock_sync_step_threshold_seconds = max( + 0.0, system_clock_sync_step_threshold_ms / 1000.0 + ) + self.rtc_sync_min_interval_seconds = max(1.0, rtc_sync_min_interval_seconds) + self.status_file = status_file + self.helper_status_file = helper_status_file + self.time_fn = time_fn + self.monotonic_fn = monotonic_fn + self.request_writer = request_writer or ClockSyncRequestWriter() + self.ntp_client = ntp_client or NtpClient(time_fn=time_fn) + self.ntp_async = ntp_async + + self.samples: Deque[dict[str, Any]] = deque() + self.state = "disabled" + self.message = "Time sync disabled" + self.gps_state = "disabled" + self.gps_message = "GPS time source disabled" + self.last_status_write_monotonic: Optional[float] = None + self.latest_sample: Optional[dict[str, Any]] = None + + self.ntp_state = "disabled" + self.ntp_message = "NTP time source disabled" + self.latest_ntp_sample: Optional[dict[str, Any]] = None + self.last_ntp_poll_monotonic: Optional[float] = None + self.ntp_query_in_progress = False + self.ntp_query_started_monotonic: Optional[float] = None + self.ntp_pending_result: Optional[dict[str, Any]] = None + self.ntp_lock = threading.Lock() + + self.selected_source: Optional[dict[str, Any]] = None + + self.pps_tick_count = 0 + self.last_pps_tick_monotonic: Optional[float] = None + self.last_pps_tick_estimated_utc: Optional[datetime.datetime] = None + self.next_pps_tick_monotonic: Optional[float] = None + + self.system_clock_sync_state = "disabled" + self.system_clock_sync_message = "System clock sync disabled" + self.system_clock_request_count = 0 + self.last_system_clock_request_monotonic: Optional[float] = None + self.last_system_clock_request_utc: Optional[str] = None + self.last_system_clock_offset_seconds: Optional[float] = None + + self.rtc_sync_state = "disabled" + self.rtc_sync_message = "RTC sync disabled" + self.rtc_request_count = 0 + self.last_rtc_request_monotonic: Optional[float] = None + self.last_rtc_request_utc: Optional[str] = None + + @classmethod + def from_config( + cls, + cfg, + status_file: Path = STATUS_FILE, + helper_status_file: Path = HELPER_STATUS_FILE, + ) -> "GpsTimeSyncMonitor": + return cls( + time_sync_enabled=_as_bool(cfg.get_option("time_sync_enabled", False)), + enabled=_as_bool(cfg.get_option("gps_time_sync", True)), + ntp_enabled=_as_bool(cfg.get_option("ntp_time_sync", True)), + software_pps_enabled=_as_bool(cfg.get_option("software_pps", False)), + system_clock_sync_enabled=_as_bool( + cfg.get_option( + "time_sync_system_clock", + cfg.get_option("gps_time_sync_system_clock", True), + ) + ), + rtc_sync_enabled=_as_bool(cfg.get_option("rtc_sync", False)), + source_mode=str(cfg.get_option("time_sync_source_mode", "best")), + ntp_server=str(cfg.get_option("ntp_server", DEFAULT_NTP_SERVERS[0])), + ntp_server_custom=str(cfg.get_option("ntp_server_custom", "")), + ntp_poll_interval_seconds=_as_float( + cfg.get_option("ntp_poll_interval_seconds", 300.0), 300.0 + ), + ntp_timeout_seconds=_as_float( + cfg.get_option("ntp_timeout_seconds", 1.0), 1.0 + ), + ntp_max_delay_ms=_as_float( + cfg.get_option("ntp_max_delay_ms", 1500.0), 1500.0 + ), + ntp_stale_seconds=_as_float( + cfg.get_option("ntp_stale_seconds", 900.0), 900.0 + ), + min_samples=_as_int(cfg.get_option("gps_time_sync_min_samples", 5), 5), + sample_window_seconds=_as_float( + cfg.get_option("gps_time_sync_window_seconds", 120.0), 120.0 + ), + stale_seconds=_as_float( + cfg.get_option("gps_time_sync_stale_seconds", 30.0), 30.0 + ), + max_tacc_ns=_as_int( + cfg.get_option("gps_time_sync_max_tacc_ns", 1_000_000_000), + 1_000_000_000, + ), + stable_jitter_ms=_as_float( + cfg.get_option("gps_time_sync_stable_jitter_ms", 250.0), 250.0 + ), + stable_offset_ms=_as_float( + cfg.get_option("gps_time_sync_stable_offset_ms", 1000.0), 1000.0 + ), + software_pps_interval_seconds=_as_float( + cfg.get_option("software_pps_interval_seconds", 1.0), 1.0 + ), + system_clock_sync_min_interval_seconds=_as_float( + cfg.get_option( + "time_sync_system_clock_min_interval_seconds", + cfg.get_option( + "gps_time_sync_system_clock_min_interval_seconds", 300.0 + ), + ), + 300.0, + ), + system_clock_sync_step_threshold_ms=_as_float( + cfg.get_option( + "time_sync_system_clock_step_threshold_ms", + cfg.get_option( + "gps_time_sync_system_clock_step_threshold_ms", 500.0 + ), + ), + 500.0, + ), + rtc_sync_min_interval_seconds=_as_float( + cfg.get_option("rtc_sync_min_interval_seconds", 3600.0), 3600.0 + ), + status_file=status_file, + helper_status_file=helper_status_file, + ) + + def update_config(self, cfg) -> None: + updated = self.from_config( + cfg, + status_file=self.status_file, + helper_status_file=self.helper_status_file, + ) + self.time_sync_enabled = updated.time_sync_enabled + self.enabled = updated.enabled + self.ntp_enabled = updated.ntp_enabled + self.software_pps_enabled = updated.software_pps_enabled + self.system_clock_sync_enabled = updated.system_clock_sync_enabled + self.rtc_sync_enabled = updated.rtc_sync_enabled + self.source_mode = updated.source_mode + self.ntp_server = updated.ntp_server + self.ntp_server_custom = updated.ntp_server_custom + self.ntp_poll_interval_seconds = updated.ntp_poll_interval_seconds + self.ntp_timeout_seconds = updated.ntp_timeout_seconds + self.ntp_max_delay_seconds = updated.ntp_max_delay_seconds + self.ntp_stale_seconds = updated.ntp_stale_seconds + self.min_samples = updated.min_samples + self.sample_window_seconds = updated.sample_window_seconds + self.stale_seconds = updated.stale_seconds + self.max_tacc_ns = updated.max_tacc_ns + self.stable_jitter_seconds = updated.stable_jitter_seconds + self.stable_offset_seconds = updated.stable_offset_seconds + self.status_write_interval_seconds = updated.status_write_interval_seconds + self.software_pps_interval_seconds = updated.software_pps_interval_seconds + self.system_clock_sync_min_interval_seconds = ( + updated.system_clock_sync_min_interval_seconds + ) + self.system_clock_sync_step_threshold_seconds = ( + updated.system_clock_sync_step_threshold_seconds + ) + self.rtc_sync_min_interval_seconds = updated.rtc_sync_min_interval_seconds + self._refresh_action_wait_states() + self.write_status(force=True) + + def _active(self) -> bool: + return self.time_sync_enabled and ( + self.enabled + or self.ntp_enabled + or self.software_pps_enabled + or self.system_clock_sync_enabled + or self.rtc_sync_enabled + ) + + def write_startup_status(self) -> None: + if not self.time_sync_enabled: + self._set_state("disabled", "Time sync disabled") + elif self.enabled: + self._set_state("waiting_for_time_source", "Waiting for time source") + elif self.software_pps_enabled: + self._set_state("software_pps_only", "Software PPS enabled") + else: + self._set_state("disabled", "Time sync has no enabled source") + + self._refresh_action_wait_states() + if self._active() or self.status_file.exists(): + self.write_status(force=True) + + def _set_state(self, state: str, message: str) -> bool: + changed = state != self.state or message != self.message + self.state = state + self.message = message + return changed + + def _set_gps_state(self, state: str, message: str) -> bool: + changed = state != self.gps_state or message != self.gps_message + self.gps_state = state + self.gps_message = message + return changed + + def _set_ntp_state(self, state: str, message: str) -> bool: + changed = state != self.ntp_state or message != self.ntp_message + self.ntp_state = state + self.ntp_message = message + return changed + + def _prune_samples(self, now_monotonic: float) -> None: + while ( + self.samples + and now_monotonic - self.samples[0]["monotonic"] > self.sample_window_seconds + ): + self.samples.popleft() + + def _offset_stats(self) -> dict[str, Optional[float]]: + offsets = [ + sample["offset_seconds"] + for sample in self.samples + if sample.get("offset_seconds") is not None + ] + if not offsets: + return { + "latest_seconds": None, + "mean_seconds": None, + "jitter_seconds": None, + "min_seconds": None, + "max_seconds": None, + } + latest = offsets[-1] + min_offset = min(offsets) + max_offset = max(offsets) + return { + "latest_seconds": latest, + "mean_seconds": sum(offsets) / len(offsets), + "jitter_seconds": max_offset - min_offset, + "min_seconds": min_offset, + "max_seconds": max_offset, + } + + def _extract_sample( + self, gps_content: Any + ) -> tuple[Optional[datetime.datetime], Optional[int], str, bool]: + if isinstance(gps_content, datetime.datetime): + return _utc_datetime(gps_content), None, "GPS", True + if not isinstance(gps_content, dict): + return None, None, "unknown", False + + gps_dt = gps_content.get("time") + if not isinstance(gps_dt, datetime.datetime): + return None, None, str(gps_content.get("source", "unknown")), False + + tacc = gps_content.get("tAcc") + if tacc is not None: + tacc = _as_int(tacc, -1) + return ( + _utc_datetime(gps_dt), + tacc, + str(gps_content.get("source", "GPS")), + _as_bool(gps_content.get("valid", True), True), + ) + + def observe_time(self, gps_content: Any, reference_dt: Any = None) -> None: + if not self.time_sync_enabled or not self.enabled: + return + + now_monotonic = self.monotonic_fn() + gps_dt, tacc_ns, source, valid = self._extract_sample(gps_content) + if gps_dt is None: + changed = self._set_gps_state( + "invalid_sample", "GPS time sample missing time" + ) + changed = self._evaluate_state() or changed + self.write_status(force=changed) + return + + ref_dt = None + offset_seconds = None + if isinstance(reference_dt, datetime.datetime): + ref_dt = _utc_datetime(reference_dt) + offset_seconds = (gps_dt - ref_dt).total_seconds() + + sample = { + "gps_time": gps_dt.isoformat(), + "source": source, + "valid": valid, + "tAcc_ns": tacc_ns, + "reference_time": ref_dt.isoformat() if ref_dt else None, + "offset_seconds": offset_seconds, + "system_offset_seconds": gps_dt.timestamp() - self.time_fn(), + "monotonic": now_monotonic, + "received_unix": self.time_fn(), + } + for key in ( + "message_class", + "lock_type", + "mode", + "satellites_seen", + "satellites_used", + "hdop", + "pdop", + ): + if isinstance(gps_content, dict) and key in gps_content: + sample[key] = gps_content[key] + self.latest_sample = sample + self.samples.append(sample) + self._prune_samples(now_monotonic) + + changed = self._evaluate_state() + changed = self._maybe_apply_sync_actions() or changed + self.write_status(force=changed or len(self.samples) == 1) + + def _evaluate_gps_state(self) -> bool: + if not self.enabled: + return self._set_gps_state("disabled", "GPS time source disabled") + + if self.latest_sample is None: + return self._set_gps_state("waiting_for_gps_time", "Waiting for GPS time") + + if self.monotonic_fn() - self.latest_sample["monotonic"] > self.stale_seconds: + return self._set_gps_state( + "stale", + f"No GPS time sample for more than {self.stale_seconds:.0f}s", + ) + + if not self.latest_sample.get("valid", True): + return self._set_gps_state( + "low_quality", + "GPS time candidate is present but is not valid yet", + ) + + tacc_ns = self.latest_sample.get("tAcc_ns") + if tacc_ns is not None and tacc_ns >= 0 and tacc_ns > self.max_tacc_ns: + return self._set_gps_state( + "low_quality", + f"GPS time accuracy {tacc_ns} ns exceeds {self.max_tacc_ns} ns", + ) + + stats = self._offset_stats() + if stats["latest_seconds"] is None: + return self._set_gps_state( + "no_reference", + "GPS time received before PiFinder internal time was available", + ) + + if len(self.samples) < self.min_samples: + return self._set_gps_state( + "collecting", + f"Collecting GPS time samples {len(self.samples)}/{self.min_samples}", + ) + + latest_offset = abs(stats["latest_seconds"] or 0.0) + jitter = stats["jitter_seconds"] or 0.0 + if ( + latest_offset <= self.stable_offset_seconds + and jitter <= self.stable_jitter_seconds + ): + return self._set_gps_state("stable", "GPS time is stable") + + return self._set_gps_state( + "unstable", + "GPS time offset or jitter is outside the configured threshold", + ) + + def _gps_quality_seconds(self) -> Optional[float]: + if self.latest_sample is None: + return None + tacc_ns = self.latest_sample.get("tAcc_ns") + if isinstance(tacc_ns, (int, float)) and tacc_ns >= 0: + return tacc_ns / 1_000_000_000.0 + jitter = self._offset_stats().get("jitter_seconds") + if isinstance(jitter, (int, float)): + return max(jitter, self.stable_jitter_seconds) + return self.stable_jitter_seconds + + def _gps_candidate(self) -> Optional[dict[str, Any]]: + if self.gps_state != "stable" or self.latest_sample is None: + return None + age = self.monotonic_fn() - self.latest_sample["monotonic"] + if age > self.stale_seconds: + return None + gps_dt = self._latest_gps_datetime() + if gps_dt is None: + return None + quality_seconds = self._gps_quality_seconds() + return { + "source": "GPS", + "time": gps_dt.isoformat(), + "valid": True, + "quality_seconds": quality_seconds, + "age_seconds": age, + "tAcc_ns": self.latest_sample.get("tAcc_ns"), + "message_class": self.latest_sample.get("message_class"), + "server": None, + } + + def _ntp_candidate(self) -> Optional[dict[str, Any]]: + if self.ntp_state != "stable" or self.latest_ntp_sample is None: + return None + age = self.monotonic_fn() - self.latest_ntp_sample["monotonic"] + if age > self.ntp_stale_seconds: + return None + ntp_time = self.latest_ntp_sample.get("time") + if not ntp_time: + return None + try: + ntp_dt = _utc_datetime(datetime.datetime.fromisoformat(ntp_time)) + except ValueError: + return None + return { + "source": "NTP", + "time": ntp_dt.isoformat(), + "valid": True, + "quality_seconds": self.latest_ntp_sample.get("quality_seconds"), + "age_seconds": age, + "server": self.latest_ntp_sample.get("server"), + "delay_seconds": self.latest_ntp_sample.get("delay_seconds"), + "stratum": self.latest_ntp_sample.get("stratum"), + } + + def _candidate_for_mode(self) -> list[dict[str, Any]]: + gps_candidate = self._gps_candidate() + ntp_candidate = self._ntp_candidate() + if self.source_mode == "gps": + return [gps_candidate] if gps_candidate else [] + if self.source_mode == "ntp": + return [ntp_candidate] if ntp_candidate else [] + return [ + candidate + for candidate in (gps_candidate, ntp_candidate) + if candidate is not None + ] + + def _evaluate_selected_source(self) -> bool: + candidates = self._candidate_for_mode() + if candidates: + selected = min( + candidates, + key=lambda candidate: ( + candidate.get("quality_seconds") + if isinstance(candidate.get("quality_seconds"), (int, float)) + else float("inf") + ), + ) + changed = selected != self.selected_source + self.selected_source = selected + changed = ( + self._set_state( + "stable", + "Selected {source} time source".format( + source=selected.get("source", "time") + ), + ) + or changed + ) + return changed + + previous_selected = self.selected_source + self.selected_source = None + changed = previous_selected is not None + + if self.source_mode == "gps" and self.enabled: + return self._set_state(self.gps_state, self.gps_message) or changed + if self.source_mode == "ntp" and self.ntp_enabled: + return self._set_state(self.ntp_state, self.ntp_message) or changed + if self.enabled and self.gps_state not in ("disabled", "waiting_for_gps_time"): + return self._set_state(self.gps_state, self.gps_message) or changed + if self.ntp_enabled: + return self._set_state(self.ntp_state, self.ntp_message) or changed + if self.enabled: + return self._set_state(self.gps_state, self.gps_message) or changed + if self.software_pps_enabled: + return ( + self._set_state("software_pps_only", "Software PPS enabled") or changed + ) + return self._set_state("disabled", "Time sync has no enabled source") or changed + + def _evaluate_state(self) -> bool: + if not self.time_sync_enabled: + self.selected_source = None + self._set_gps_state("disabled", "GPS time source disabled") + self._set_ntp_state("disabled", "NTP time source disabled") + return self._set_state("disabled", "Time sync disabled") + + changed = self._evaluate_gps_state() + changed = self._evaluate_selected_source() or changed + return changed + + def _set_system_clock_sync_state( + self, state: str, message: str, offset_seconds: Optional[float] = None + ) -> bool: + changed = ( + state != self.system_clock_sync_state + or message != self.system_clock_sync_message + or offset_seconds != self.last_system_clock_offset_seconds + ) + self.system_clock_sync_state = state + self.system_clock_sync_message = message + self.last_system_clock_offset_seconds = offset_seconds + return changed + + def _set_rtc_sync_state(self, state: str, message: str) -> bool: + changed = state != self.rtc_sync_state or message != self.rtc_sync_message + self.rtc_sync_state = state + self.rtc_sync_message = message + return changed + + def _latest_gps_datetime(self) -> Optional[datetime.datetime]: + if self.latest_sample is None: + return None + gps_time = self.latest_sample.get("gps_time") + if not gps_time: + return None + try: + return _utc_datetime(datetime.datetime.fromisoformat(gps_time)) + except ValueError: + return None + + def _selected_datetime(self) -> Optional[datetime.datetime]: + if not self.selected_source: + return None + selected_time = self.selected_source.get("time") + if not isinstance(selected_time, str) or not selected_time: + return None + try: + return _utc_datetime(datetime.datetime.fromisoformat(selected_time)) + except ValueError: + return None + + def _effective_ntp_server(self) -> str: + if self.ntp_server == "custom": + custom = self.ntp_server_custom.strip() + if custom: + return custom + server = self.ntp_server.strip() + return server if server and server != "custom" else DEFAULT_NTP_SERVERS[0] + + def _apply_ntp_result(self, result: dict[str, Any]) -> bool: + now_monotonic = self.monotonic_fn() + if not result.get("ok", True): + self.latest_ntp_sample = { + "server": result.get("server", self._effective_ntp_server()), + "valid": False, + "error": result.get("message", "NTP query failed"), + "monotonic": now_monotonic, + "received_unix": self.time_fn(), + } + return self._set_ntp_state( + "unavailable", str(result.get("message") or "NTP query failed") + ) + + ntp_dt = result.get("time") + if not isinstance(ntp_dt, datetime.datetime): + self.latest_ntp_sample = { + "server": result.get("server", self._effective_ntp_server()), + "valid": False, + "error": "NTP response did not include time", + "monotonic": now_monotonic, + "received_unix": self.time_fn(), + } + return self._set_ntp_state("invalid_sample", "NTP response missing time") + + ntp_dt = _utc_datetime(ntp_dt) + delay_seconds = result.get("delay_seconds") + valid = _as_bool(result.get("valid", True), True) + sample = { + "time": ntp_dt.isoformat(), + "server": result.get("server", self._effective_ntp_server()), + "address": result.get("address"), + "valid": valid, + "stratum": result.get("stratum"), + "leap": result.get("leap"), + "offset_seconds": result.get("offset_seconds"), + "delay_seconds": delay_seconds, + "root_delay_seconds": result.get("root_delay_seconds"), + "root_dispersion_seconds": result.get("root_dispersion_seconds"), + "quality_seconds": result.get("quality_seconds"), + "system_offset_seconds": ntp_dt.timestamp() - self.time_fn(), + "monotonic": now_monotonic, + "received_unix": result.get("received_unix", self.time_fn()), + } + self.latest_ntp_sample = sample + + if not valid: + return self._set_ntp_state( + "low_quality", "NTP server response was not valid" + ) + if ( + isinstance(delay_seconds, (int, float)) + and delay_seconds > self.ntp_max_delay_seconds + ): + return self._set_ntp_state( + "low_quality", + "NTP delay {delay:.3f}s exceeds {limit:.3f}s".format( + delay=delay_seconds, limit=self.ntp_max_delay_seconds + ), + ) + return self._set_ntp_state("stable", "NTP time is available") + + def _run_ntp_query(self, server: str) -> None: + try: + result = self.ntp_client.query(server, self.ntp_timeout_seconds) + result = dict(result) + result.setdefault("ok", True) + except Exception as exc: + result = {"ok": False, "server": server, "message": str(exc)} + + with self.ntp_lock: + self.ntp_pending_result = result + self.ntp_query_in_progress = False + + def _consume_ntp_result(self) -> bool: + with self.ntp_lock: + result = self.ntp_pending_result + self.ntp_pending_result = None + if result is None: + return False + return self._apply_ntp_result(result) + + def _poll_ntp(self, now_monotonic: float) -> bool: + changed = self._consume_ntp_result() + if not self.time_sync_enabled or not self.ntp_enabled: + changed = ( + self._set_ntp_state("disabled", "NTP time source disabled") or changed + ) + return changed + + if ( + self.latest_ntp_sample is not None + and now_monotonic - self.latest_ntp_sample["monotonic"] + > self.ntp_stale_seconds + ): + changed = self._set_ntp_state("stale", "NTP sample is stale") or changed + + if self.ntp_query_in_progress: + return ( + self._set_ntp_state("querying", "Querying NTP server") or changed + ) + + due = ( + self.last_ntp_poll_monotonic is None + or now_monotonic - self.last_ntp_poll_monotonic + >= self.ntp_poll_interval_seconds + ) + if not due: + return changed + + server = self._effective_ntp_server() + self.last_ntp_poll_monotonic = now_monotonic + self.ntp_query_started_monotonic = now_monotonic + self.ntp_query_in_progress = True + + if self.ntp_async: + thread = threading.Thread( + target=self._run_ntp_query, + args=(server,), + name="PiFinderNTP", + daemon=True, + ) + thread.start() + return self._set_ntp_state("querying", "Querying NTP server") or changed + + self._run_ntp_query(server) + return self._consume_ntp_result() or changed + + def _sync_block_reason(self) -> Optional[tuple[str, str]]: + if not self.time_sync_enabled: + return "disabled", "Time sync disabled" + if self.selected_source is None: + return "waiting_for_time_source", "Waiting for a stable time source" + if self.state != "stable": + return ( + "waiting_for_time_source", + f"Waiting for stable time source; current state is {self.state}", + ) + if self._selected_datetime() is None: + return "waiting_for_time_source", "Selected time could not be parsed" + return None + + def _cooldown_active( + self, last_monotonic: Optional[float], min_interval_seconds: float + ) -> bool: + if last_monotonic is None: + return False + return self.monotonic_fn() - last_monotonic < min_interval_seconds + + def _system_clock_request_action( + self, gps_dt: datetime.datetime + ) -> tuple[bool, Optional[dict[str, Any]]]: + if not self.system_clock_sync_enabled: + changed = self._set_system_clock_sync_state( + "disabled", "System clock sync disabled" + ) + return changed, None + + offset_seconds = gps_dt.timestamp() - self.time_fn() + if abs(offset_seconds) <= self.system_clock_sync_step_threshold_seconds: + changed = self._set_system_clock_sync_state( + "in_sync", + "System clock offset is within the configured threshold", + offset_seconds, + ) + return changed, None + + if self._cooldown_active( + self.last_system_clock_request_monotonic, + self.system_clock_sync_min_interval_seconds, + ): + changed = self._set_system_clock_sync_state( + "cooldown", + "Waiting before the next system clock sync request", + offset_seconds, + ) + return changed, None + + return False, { + "enabled": True, + "offset_seconds": offset_seconds, + "step_threshold_seconds": self.system_clock_sync_step_threshold_seconds, + "min_interval_seconds": self.system_clock_sync_min_interval_seconds, + } + + def _rtc_request_action( + self, gps_dt: datetime.datetime + ) -> tuple[bool, Optional[dict[str, Any]]]: + del gps_dt + if not self.rtc_sync_enabled: + changed = self._set_rtc_sync_state("disabled", "RTC sync disabled") + return changed, None + + if self._cooldown_active( + self.last_rtc_request_monotonic, self.rtc_sync_min_interval_seconds + ): + changed = self._set_rtc_sync_state( + "cooldown", "Waiting before the next RTC sync request" + ) + return changed, None + + return False, { + "enabled": True, + "min_interval_seconds": self.rtc_sync_min_interval_seconds, + } + + def _request_id(self, actions: dict[str, Any]) -> str: + action_names = "-".join(sorted(actions)) + return f"{int(self.monotonic_fn() * 1000)}-{action_names}" + + def _write_sync_request( + self, + sync_dt: datetime.datetime, + actions: dict[str, Any], + ) -> bool: + latest = self.latest_sample or {} + selected = self.selected_source or {} + payload = { + "version": 1, + "request_id": self._request_id(actions), + "created_monotonic": self.monotonic_fn(), + "created_unix": self.time_fn(), + "sync_time": sync_dt.isoformat(), + "gps_time": sync_dt.isoformat(), + "monitor_state": self.state, + "status_file": str(self.status_file), + "helper_status_file": str(self.helper_status_file), + "actions": actions, + "selected": { + "source": selected.get("source"), + "time": selected.get("time"), + "valid": selected.get("valid"), + "quality_seconds": selected.get("quality_seconds"), + "server": selected.get("server"), + "delay_seconds": selected.get("delay_seconds"), + "tAcc_ns": selected.get("tAcc_ns"), + }, + "latest": { + "source": latest.get("source"), + "valid": latest.get("valid"), + "tAcc_ns": latest.get("tAcc_ns"), + "message_class": latest.get("message_class"), + }, + "sources": { + "gps": self._gps_candidate(), + "ntp": self._ntp_candidate(), + }, + "samples": { + "count": len(self.samples), + "min_required": self.min_samples, + }, + } + result = self.request_writer.write_request(payload) + if not result.get("ok"): + message = str(result.get("message") or "Could not write sync request") + changed = False + if "system_clock" in actions: + changed = ( + self._set_system_clock_sync_state("request_error", message) + or changed + ) + if "rtc" in actions: + changed = self._set_rtc_sync_state("request_error", message) or changed + return changed + + now_monotonic = self.monotonic_fn() + sync_time = sync_dt.isoformat() + message = str(result.get("message") or "Sync request written") + changed = False + if "system_clock" in actions: + self.system_clock_request_count += 1 + self.last_system_clock_request_monotonic = now_monotonic + self.last_system_clock_request_utc = sync_time + changed = ( + self._set_system_clock_sync_state( + "requested", + "System clock sync requested for privileged helper", + actions["system_clock"].get("offset_seconds"), + ) + or changed + ) + if "rtc" in actions: + self.rtc_request_count += 1 + self.last_rtc_request_monotonic = now_monotonic + self.last_rtc_request_utc = sync_time + changed = ( + self._set_rtc_sync_state( + "requested", "RTC sync requested for privileged helper" + ) + or changed + ) + logger.info("Time sync helper request written: %s", message) + return changed + + def _clear_sync_request(self) -> None: + self.request_writer.clear_request() + + def _refresh_action_wait_states(self) -> bool: + changed = False + block_reason = self._sync_block_reason() + if block_reason is not None: + block_state, block_message = block_reason + if self.system_clock_sync_enabled: + changed = ( + self._set_system_clock_sync_state(block_state, block_message) + or changed + ) + else: + changed = ( + self._set_system_clock_sync_state( + "disabled", "System clock sync disabled" + ) + or changed + ) + if self.rtc_sync_enabled: + changed = self._set_rtc_sync_state(block_state, block_message) or changed + else: + changed = ( + self._set_rtc_sync_state("disabled", "RTC sync disabled") or changed + ) + return changed + + def _maybe_apply_sync_actions(self) -> bool: + block_changed = self._refresh_action_wait_states() + if self._sync_block_reason() is not None: + self._clear_sync_request() + return block_changed + + sync_dt = self._selected_datetime() + if sync_dt is None: + return block_changed + + changed, system_clock_action = self._system_clock_request_action(sync_dt) + changed = changed or block_changed + rtc_changed, rtc_action = self._rtc_request_action(sync_dt) + changed = rtc_changed or changed + + actions = {} + if system_clock_action is not None: + actions["system_clock"] = system_clock_action + if rtc_action is not None: + actions["rtc"] = rtc_action + + if actions: + changed = self._write_sync_request(sync_dt, actions) or changed + return changed + + def _estimated_utc_for_monotonic( + self, tick_monotonic: float + ) -> Optional[datetime.datetime]: + if self.latest_sample is None: + return None + gps_time = self.latest_sample.get("gps_time") + sample_monotonic = self.latest_sample.get("monotonic") + if not gps_time or sample_monotonic is None: + return None + try: + gps_dt = datetime.datetime.fromisoformat(gps_time) + except ValueError: + return None + return _utc_datetime(gps_dt) + datetime.timedelta( + seconds=tick_monotonic - sample_monotonic + ) + + def _poll_software_pps(self, now_monotonic: float) -> bool: + if not self.software_pps_enabled: + self.next_pps_tick_monotonic = None + return False + + if self.next_pps_tick_monotonic is None: + interval = self.software_pps_interval_seconds + self.next_pps_tick_monotonic = ( + math.floor(now_monotonic / interval) + 1 + ) * interval + return False + + ticked = False + while now_monotonic >= self.next_pps_tick_monotonic: + tick_monotonic = self.next_pps_tick_monotonic + self.pps_tick_count += 1 + self.last_pps_tick_monotonic = tick_monotonic + self.last_pps_tick_estimated_utc = self._estimated_utc_for_monotonic( + tick_monotonic + ) + self.next_pps_tick_monotonic += self.software_pps_interval_seconds + ticked = True + return ticked + + def poll(self) -> None: + if not self._active(): + return + + now_monotonic = self.monotonic_fn() + ntp_changed = self._poll_ntp(now_monotonic) + ticked = self._poll_software_pps(now_monotonic) + changed = False + + if self.enabled: + if self.latest_sample is None: + changed = ( + self._set_gps_state("waiting_for_gps_time", "Waiting for GPS time") + or changed + ) + elif now_monotonic - self.latest_sample["monotonic"] > self.stale_seconds: + changed = self._set_gps_state( + "stale", + f"No GPS time sample for more than {self.stale_seconds:.0f}s", + ) + elif self.software_pps_enabled: + changed = ( + self._set_state("software_pps_only", "Software PPS enabled") or changed + ) + + changed = self._evaluate_state() or changed or ntp_changed + changed = self._maybe_apply_sync_actions() or changed + self.write_status(force=changed or ticked) + + def note_reset(self) -> None: + if not self._active(): + return + self.samples.clear() + self.latest_sample = None + changed = self._set_gps_state("waiting_for_gps_time", "PiFinder datetime reset") + changed = self._evaluate_state() or changed + changed = self._refresh_action_wait_states() or changed + self.write_status(force=changed) + + def _read_helper_status(self) -> Optional[dict[str, Any]]: + try: + with open(self.helper_status_file, "r", encoding="utf-8") as helper_in: + payload = json.load(helper_in) + except FileNotFoundError: + return None + except Exception: + logger.exception("Could not read time sync helper status") + return {"state": "read_error"} + return payload if isinstance(payload, dict) else {"state": "invalid_status"} + + def status_payload(self) -> dict[str, Any]: + stats = self._offset_stats() + latest = self.latest_sample or {} + ntp_latest = self.latest_ntp_sample or {} + age = None + if latest.get("monotonic") is not None: + age = self.monotonic_fn() - latest["monotonic"] + ntp_age = None + if ntp_latest.get("monotonic") is not None: + ntp_age = self.monotonic_fn() - ntp_latest["monotonic"] + + return { + "enabled": self.time_sync_enabled, + "time_sync_enabled": self.time_sync_enabled, + "state": self.state, + "message": self.message, + "updated_unix": self.time_fn(), + "source_mode": self.source_mode, + "selected": self.selected_source, + "gps_time_sync_enabled": self.enabled, + "gps_time_sync_state": self.gps_state, + "gps_time_sync_message": self.gps_message, + "ntp_time_sync_enabled": self.ntp_enabled, + "ntp_time_sync_state": self.ntp_state, + "ntp_time_sync_message": self.ntp_message, + "system_clock_sync_enabled": self.system_clock_sync_enabled, + "system_clock_sync_state": self.system_clock_sync_state, + "rtc_sync_enabled": self.rtc_sync_enabled, + "rtc_sync_state": self.rtc_sync_state, + "samples": { + "count": len(self.samples), + "min_required": self.min_samples, + "window_seconds": self.sample_window_seconds, + "stale_seconds": self.stale_seconds, + }, + "latest": { + "gps_time": latest.get("gps_time"), + "source": latest.get("source"), + "valid": latest.get("valid"), + "tAcc_ns": latest.get("tAcc_ns"), + "message_class": latest.get("message_class"), + "lock_type": latest.get("lock_type"), + "mode": latest.get("mode"), + "satellites_seen": latest.get("satellites_seen"), + "satellites_used": latest.get("satellites_used"), + "hdop": latest.get("hdop"), + "pdop": latest.get("pdop"), + "reference_time": latest.get("reference_time"), + "offset_seconds": latest.get("offset_seconds"), + "system_offset_seconds": latest.get("system_offset_seconds"), + "age_seconds": age, + }, + "ntp": { + "enabled": self.ntp_enabled, + "state": self.ntp_state, + "message": self.ntp_message, + "server": ntp_latest.get("server", self._effective_ntp_server()), + "configured_server": self.ntp_server, + "custom_server": self.ntp_server_custom, + "time": ntp_latest.get("time"), + "valid": ntp_latest.get("valid"), + "stratum": ntp_latest.get("stratum"), + "leap": ntp_latest.get("leap"), + "offset_seconds": ntp_latest.get("offset_seconds"), + "delay_seconds": ntp_latest.get("delay_seconds"), + "root_delay_seconds": ntp_latest.get("root_delay_seconds"), + "root_dispersion_seconds": ntp_latest.get("root_dispersion_seconds"), + "quality_seconds": ntp_latest.get("quality_seconds"), + "system_offset_seconds": ntp_latest.get("system_offset_seconds"), + "age_seconds": ntp_age, + "last_poll_monotonic": self.last_ntp_poll_monotonic, + "poll_interval_seconds": self.ntp_poll_interval_seconds, + "timeout_seconds": self.ntp_timeout_seconds, + "max_delay_seconds": self.ntp_max_delay_seconds, + "error": ntp_latest.get("error"), + }, + "sources": { + "gps": { + "enabled": self.enabled, + "state": self.gps_state, + "message": self.gps_message, + "candidate": self._gps_candidate(), + }, + "ntp": { + "enabled": self.ntp_enabled, + "state": self.ntp_state, + "message": self.ntp_message, + "candidate": self._ntp_candidate(), + }, + }, + "offset": stats, + "thresholds": { + "max_tAcc_ns": self.max_tacc_ns, + "stable_jitter_seconds": self.stable_jitter_seconds, + "stable_offset_seconds": self.stable_offset_seconds, + "system_clock_sync_step_threshold_seconds": ( + self.system_clock_sync_step_threshold_seconds + ), + }, + "system_clock_sync": { + "enabled": self.system_clock_sync_enabled, + "state": self.system_clock_sync_state, + "message": self.system_clock_sync_message, + "request_count": self.system_clock_request_count, + "min_interval_seconds": self.system_clock_sync_min_interval_seconds, + "last_request_monotonic": self.last_system_clock_request_monotonic, + "last_request_utc": self.last_system_clock_request_utc, + "last_offset_seconds": self.last_system_clock_offset_seconds, + }, + "rtc_sync": { + "enabled": self.rtc_sync_enabled, + "state": self.rtc_sync_state, + "message": self.rtc_sync_message, + "request_count": self.rtc_request_count, + "min_interval_seconds": self.rtc_sync_min_interval_seconds, + "last_request_monotonic": self.last_rtc_request_monotonic, + "last_request_utc": self.last_rtc_request_utc, + }, + "helper": self._read_helper_status(), + "software_pps": { + "enabled": self.software_pps_enabled, + "interval_seconds": self.software_pps_interval_seconds, + "tick_count": self.pps_tick_count, + "last_tick_monotonic": self.last_pps_tick_monotonic, + "last_tick_estimated_utc": ( + self.last_pps_tick_estimated_utc.isoformat() + if self.last_pps_tick_estimated_utc + else None + ), + }, + } + + def write_status(self, force: bool = False) -> None: + if not self._active() and not force: + return + + now_monotonic = self.monotonic_fn() + if ( + not force + and self.last_status_write_monotonic is not None + and now_monotonic - self.last_status_write_monotonic + < self.status_write_interval_seconds + ): + return + + try: + utils.create_path(self.status_file.parent) + with open(self.status_file, "w", encoding="utf-8") as status_out: + json.dump(self.status_payload(), status_out, indent=2, sort_keys=True) + self.last_status_write_monotonic = now_monotonic + except Exception: + logger.exception("Could not write time sync status") diff --git a/python/PiFinder/gps_time_sync_helper.py b/python/PiFinder/gps_time_sync_helper.py new file mode 100644 index 000000000..b1f7d5905 --- /dev/null +++ b/python/PiFinder/gps_time_sync_helper.py @@ -0,0 +1,301 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- +""" +Privileged helper for time-source disciplined system clock and RTC updates. + +PiFinder itself runs as the normal PiFinder user and writes a constrained JSON +request. This helper is intended to run as root under systemd and performs only +the requested clock operations after validating the request. +""" + +from __future__ import annotations + +import argparse +import datetime +import json +import logging +import os +import subprocess +import time +from pathlib import Path +from typing import Any, Callable, Optional + +from PiFinder.gps_time_sync import ( + HELPER_STATUS_FILE, + REQUEST_FILE, + _read_boot_id, + _utc_datetime, +) +from PiFinder import utils + + +logger = logging.getLogger("GPS.TimeSync.Helper") + + +class ClockCommandRunner: + command_timeout_seconds = 10 + + def __init__(self, dry_run: bool = False): + self.dry_run = dry_run + + def _run(self, command: list[str]) -> dict[str, Any]: + if self.dry_run: + return {"ok": True, "message": "dry run: " + " ".join(command)} + + try: + result = subprocess.run( + command, + capture_output=True, + check=False, + text=True, + timeout=self.command_timeout_seconds, + ) + except Exception as exc: + return {"ok": False, "message": str(exc)} + + output = (result.stdout + result.stderr).strip() + if result.returncode == 0: + return {"ok": True, "message": output or "command completed"} + return { + "ok": False, + "message": output or f"command exited with {result.returncode}", + } + + def set_system_clock(self, gps_dt: datetime.datetime) -> dict[str, Any]: + gps_dt = _utc_datetime(gps_dt) + return self._run(["/usr/bin/date", "-u", "--set", f"@{gps_dt.timestamp():.6f}"]) + + def set_rtc(self, gps_dt: datetime.datetime) -> dict[str, Any]: + gps_dt = _utc_datetime(gps_dt) + rtc_date = gps_dt.strftime("%Y-%m-%d %H:%M:%S UTC") + return self._run(["/usr/sbin/hwclock", "--utc", "--set", "--date", rtc_date]) + + +class GpsTimeSyncHelper: + def __init__( + self, + request_file: Path = REQUEST_FILE, + status_file: Path = HELPER_STATUS_FILE, + max_request_age_seconds: float = 120.0, + poll_interval_seconds: float = 1.0, + runner: Optional[ClockCommandRunner] = None, + time_fn: Callable[[], float] = time.time, + monotonic_fn: Callable[[], float] = time.monotonic, + boot_id_fn: Callable[[], str] = _read_boot_id, + ): + self.request_file = request_file + self.status_file = status_file + self.max_request_age_seconds = max(1.0, max_request_age_seconds) + self.poll_interval_seconds = max(0.1, poll_interval_seconds) + self.runner = runner or ClockCommandRunner() + self.time_fn = time_fn + self.monotonic_fn = monotonic_fn + self.boot_id_fn = boot_id_fn + self.last_processed_request_id = self._last_processed_request_id() + + def _read_json_file(self, path: Path) -> Optional[dict[str, Any]]: + try: + with open(path, "r", encoding="utf-8") as file_in: + payload = json.load(file_in) + except FileNotFoundError: + return None + if not isinstance(payload, dict): + raise ValueError(f"{path} does not contain a JSON object") + return payload + + def _last_processed_request_id(self) -> Optional[str]: + try: + status = self._read_json_file(self.status_file) + except Exception: + return None + if not status: + return None + request_id = status.get("last_request_id") + return str(request_id) if request_id else None + + def _write_status(self, payload: dict[str, Any]) -> None: + status = { + "state": payload.get("state", "unknown"), + "message": payload.get("message", ""), + "updated_unix": self.time_fn(), + "updated_monotonic": self.monotonic_fn(), + "effective_uid": os.geteuid(), + "request_file": str(self.request_file), + } + status.update(payload) + + utils.create_path(self.status_file.parent) + tmp_file = self.status_file.with_name(self.status_file.name + ".tmp") + with open(tmp_file, "w", encoding="utf-8") as status_out: + json.dump(status, status_out, indent=2, sort_keys=True) + tmp_file.replace(self.status_file) + + def _validate_request(self, request: dict[str, Any]) -> tuple[dict[str, Any], str]: + if request.get("version") != 1: + raise ValueError("unsupported request version") + + request_id = str(request.get("request_id") or "") + if not request_id: + raise ValueError("request_id is missing") + + if request_id == self.last_processed_request_id: + return {}, "already_processed" + + boot_id = str(request.get("boot_id") or "") + if boot_id != self.boot_id_fn(): + raise ValueError("request was created during a different boot") + + created_monotonic = float(request.get("created_monotonic")) + age = self.monotonic_fn() - created_monotonic + if age < 0: + raise ValueError("request monotonic timestamp is in the future") + if age > self.max_request_age_seconds: + raise ValueError( + f"request is stale: {age:.1f}s > {self.max_request_age_seconds:.1f}s" + ) + + if request.get("monitor_state") != "stable": + raise ValueError("request monitor_state is not stable") + + selected = request.get("selected") + if not isinstance(selected, dict): + selected = request.get("latest") + if not isinstance(selected, dict) or selected.get("valid") is not True: + raise ValueError("request selected time source is not valid") + + actions = request.get("actions") + if not isinstance(actions, dict) or not actions: + raise ValueError("request contains no actions") + + sync_time = request.get("sync_time", request.get("gps_time")) + if not isinstance(sync_time, str): + raise ValueError("sync_time is missing") + try: + sync_dt = _utc_datetime(datetime.datetime.fromisoformat(sync_time)) + except ValueError as exc: + raise ValueError("sync_time is invalid") from exc + + return { + "request_id": request_id, + "sync_dt": sync_dt, + "selected": selected, + "actions": actions, + "age_seconds": age, + }, "ready" + + def _process_system_clock( + self, gps_dt: datetime.datetime, action: dict[str, Any] + ) -> dict[str, Any]: + threshold = float(action.get("step_threshold_seconds", 0.5)) + offset_seconds = gps_dt.timestamp() - self.time_fn() + if abs(offset_seconds) <= threshold: + return { + "state": "in_sync", + "message": "System clock offset is within threshold", + "offset_seconds": offset_seconds, + } + + result = self.runner.set_system_clock(gps_dt) + return { + "state": "synced" if result.get("ok") else "error", + "message": str(result.get("message") or ""), + "offset_seconds": offset_seconds, + } + + def _process_rtc( + self, gps_dt: datetime.datetime, action: dict[str, Any] + ) -> dict[str, Any]: + del action + result = self.runner.set_rtc(gps_dt) + return { + "state": "synced" if result.get("ok") else "error", + "message": str(result.get("message") or ""), + } + + def process_once(self) -> dict[str, Any]: + try: + request = self._read_json_file(self.request_file) + if request is None: + status = {"state": "idle", "message": "No sync request"} + self._write_status(status) + return status + + parsed, request_state = self._validate_request(request) + if request_state == "already_processed": + status = { + "state": "idle", + "message": "Request already processed", + "last_request_id": self.last_processed_request_id, + } + return status + except Exception as exc: + status = {"state": "invalid_request", "message": str(exc)} + self._write_status(status) + return status + + sync_dt = parsed["sync_dt"] + actions = parsed["actions"] + results = {} + + if actions.get("system_clock", {}).get("enabled"): + results["system_clock"] = self._process_system_clock( + sync_dt, actions["system_clock"] + ) + if actions.get("rtc", {}).get("enabled"): + results["rtc"] = self._process_rtc(sync_dt, actions["rtc"]) + + result_states = [result.get("state") for result in results.values()] + if any(state == "error" for state in result_states): + state = "partial_error" if any(state != "error" for state in result_states) else "error" + elif results: + state = "completed" + else: + state = "skipped" + + self.last_processed_request_id = parsed["request_id"] + status = { + "state": state, + "message": "Time sync request processed", + "last_request_id": parsed["request_id"], + "last_sync_time": sync_dt.isoformat(), + "last_gps_time": sync_dt.isoformat(), + "selected": parsed["selected"], + "request_age_seconds": parsed["age_seconds"], + "results": results, + } + self._write_status(status) + return status + + def run_forever(self) -> None: + self._write_status({"state": "idle", "message": "Helper started"}) + while True: + try: + self.process_once() + except Exception: + logger.exception("Unexpected time sync helper error") + time.sleep(self.poll_interval_seconds) + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--once", action="store_true", help="Process one request and exit") + parser.add_argument("--dry-run", action="store_true", help="Log commands without running them") + parser.add_argument("--interval", type=float, default=1.0, help="Polling interval in seconds") + parser.add_argument("--max-age", type=float, default=120.0, help="Maximum request age in seconds") + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO) + helper = GpsTimeSyncHelper( + max_request_age_seconds=args.max_age, + poll_interval_seconds=args.interval, + runner=ClockCommandRunner(dry_run=args.dry_run), + ) + if args.once: + status = helper.process_once() + print(json.dumps(status, indent=2, sort_keys=True)) + return + helper.run_forever() + + +if __name__ == "__main__": + main() diff --git a/python/PiFinder/gps_ubx.py b/python/PiFinder/gps_ubx.py index dc56bf07e..e54ca436d 100644 --- a/python/PiFinder/gps_ubx.py +++ b/python/PiFinder/gps_ubx.py @@ -15,6 +15,42 @@ MAX_GPS_ERROR = 50000 # 50 km +def _time_accuracy_ns(msg): + if "tAcc_ns" in msg: + try: + return int(msg["tAcc_ns"]) + except (TypeError, ValueError): + return -1 + + tacc = msg.get("tAcc", -1) + try: + tacc_value = float(tacc) + except (TypeError, ValueError): + return -1 + + if tacc_value < 0: + return -1 + return int(round(tacc_value * 1_000_000_000)) + + +def _gps_time_message(msg, info=None): + if not msg.get("time"): + return None + + valid = bool(msg.get("valid", True)) + return ( + "time" if valid else "time_sample", + { + "time": msg["time"], + "tAcc": _time_accuracy_ns(msg), + "source": "GPS" if not info else info, + "message_class": msg.get("class", "unknown"), + "lock_type": msg.get("mode"), + "valid": valid, + }, + ) + + async def process_messages( parser_iterator, gps_queue, console_queue, error_info, wait=0, info=None ): @@ -80,21 +116,17 @@ async def process_messages( logger.debug("GPS fix: %s", msg) elif msg_class == "NAV-TIMEGPS": - if "time" in msg and "valid" in msg and msg["valid"]: - gps_queue.put( - ( - "time", - { - "time": msg["time"], - "tAcc": msg["tAcc"] if "tAcc" in msg else -1, - "source": "GPS" if not info else info, - }, - ) - ) + time_msg = _gps_time_message(msg, info=info) + if time_msg is not None: + gps_queue.put(time_msg) else: - logger.debug(f"TIMEGPS message does not qualify: {msg}") + logger.debug("TIMEGPS message has no time: %s", msg) elif msg_class == "NAV-PVT": + time_msg = _gps_time_message(msg, info=info) + if time_msg is not None: + gps_queue.put(time_msg) + if all(k in msg for k in ["lat", "lon", "altHAE", "hAcc", "vAcc"]): if not gps_locked and msg["hAcc"] < MAX_GPS_ERROR: gps_locked = True diff --git a/python/PiFinder/gps_ubx_parser.py b/python/PiFinder/gps_ubx_parser.py index f0bb7da40..3979db6ec 100644 --- a/python/PiFinder/gps_ubx_parser.py +++ b/python/PiFinder/gps_ubx_parser.py @@ -419,6 +419,7 @@ def _parse_nav_timegps(self, data: bytes) -> dict: "leapSeconds": leapS, "valid": bool(valid & 0x01), "tAcc": tAcc * 1e-9, + "tAcc_ns": tAcc, } logger.debug(f"NAV-TIMEGPS result: {result}") return result @@ -473,8 +474,9 @@ def _parse_nav_pvt(self, data: bytes) -> dict: hour = data[8] minute = data[9] seconds = data[10] - tAcc = int.from_bytes(data[24:28], "little", signed=False) / 1e9 # nano seconds - nano = int.from_bytes(data[24:28], "little", signed=True) / 1e9 # nano seconds + valid = data[11] + tAcc_ns = int.from_bytes(data[12:16], "little", signed=False) + nano_ns = int.from_bytes(data[16:20], "little", signed=True) gpsFix = data[20] numSV = data[23] lon = int.from_bytes(data[24:28], "little", signed=True) / 1e7 @@ -494,6 +496,20 @@ def _parse_nav_pvt(self, data: bytes) -> dict: pDOP = ( int.from_bytes(data[76:78], "little", signed=False) / 100.0 ) # position DOP + valid_time = bool((valid & 0x03) == 0x03) + utc_time = None + try: + utc_time = datetime.datetime( + year, + month, + day, + hour, + minute, + seconds, + tzinfo=datetime.timezone.utc, + ) + datetime.timedelta(seconds=nano_ns * 1e-9) + except ValueError: + valid_time = False result = { "class": "NAV-PVT", @@ -503,8 +519,11 @@ def _parse_nav_pvt(self, data: bytes) -> dict: "UTChour": hour, "UTCminute": minute, "UTCseconds": seconds, - "UTCnano": nano, - "tAcc": tAcc, + "UTCnano": nano_ns, + "time": utc_time, + "valid": valid_time, + "tAcc": tAcc_ns * 1e-9, + "tAcc_ns": tAcc_ns, "mode": gpsFix, "lat": lat, "lon": lon, diff --git a/python/PiFinder/main.py b/python/PiFinder/main.py index 8754ff10d..1944fe579 100644 --- a/python/PiFinder/main.py +++ b/python/PiFinder/main.py @@ -37,6 +37,7 @@ from PiFinder import utils from PiFinder import server from PiFinder import keyboard_interface +from PiFinder import gps_time_sync import PiFinder.sound as sound from PiFinder.types.sound import Earcon, SetVolume @@ -407,6 +408,8 @@ def main( "integrator": integrator_command_queue, } cfg = config.Config() + gps_time_monitor = gps_time_sync.GpsTimeSyncMonitor.from_config(cfg) + gps_time_monitor.write_startup_status() # init screen screen_brightness = cfg.get_option("display_brightness") @@ -698,6 +701,8 @@ def main( # handles power-save by sleeping longer when asleep. sleep_for_framerate(shared_state) + gps_time_monitor.poll() + # GPS try: while True: # Consume from gps_queue until empty @@ -759,17 +764,25 @@ def main( gps_dt = gps_content else: gps_dt = gps_content["time"] + gps_time_monitor.observe_time( + gps_content, shared_state.datetime() + ) shared_state.set_datetime( gps_dt, force=(gps_msg == "time_force") ) if log_time: logger.info("GPS Time (logged only once): %s", gps_dt) log_time = False + if gps_msg == "time_sample": + gps_time_monitor.observe_time( + gps_content, shared_state.datetime() + ) if gps_msg == "reset": location.reset() shared_state.set_location(location) if gps_msg == "reset_datetime": shared_state.reset_datetime() + gps_time_monitor.note_reset() if gps_msg == "satellites": # logger.debug("Main: GPS nr sats seen: %s", gps_content) shared_state.set_sats(gps_content) @@ -787,6 +800,7 @@ def main( menu_manager.jump_to_label("recent") elif ui_command == "reload_config": cfg.load_config() + gps_time_monitor.update_config(cfg) elif ui_command == "catalogs_fully_loaded": logger.info( "All catalogs loaded - WDS and extended catalogs available" diff --git a/python/PiFinder/ui/callbacks.py b/python/PiFinder/ui/callbacks.py index dcdb35fb9..80f834750 100644 --- a/python/PiFinder/ui/callbacks.py +++ b/python/PiFinder/ui/callbacks.py @@ -102,6 +102,44 @@ def apply_sound_volume(ui_module: UIModule) -> None: ui_module.command_queues["ui_queue"].put("set_volume") +def reload_config(ui_module: UIModule) -> None: + """Ask the main loop to reload config-backed runtime services.""" + ui_module.command_queues["ui_queue"].put("reload_config") + ui_module.message(_("Config updated"), 1) + + +def get_custom_ntp_server_display(ui_module: UIModule) -> str: + server = ui_module.config_object.get_option("ntp_server_custom", "") + return f" ({server})" if server else "" + + +def edit_custom_ntp_server(ui_module: UIModule) -> None: + """Open a text-entry screen for a custom NTP server.""" + + def _save(server: str) -> None: + server = server.strip() + if not server: + ui_module.message(_("NTP server\nunchanged"), 2) + return + ui_module.config_object.set_option("ntp_server_custom", server) + ui_module.config_object.set_option("ntp_server", "custom") + ui_module.command_queues["ui_queue"].put("reload_config") + ui_module.message(_("NTP server\nsaved"), 2) + + item_definition = { + "name": _("Custom NTP Server"), + "class": UITextEntry, + "mode": "text_entry", + "initial_text": ui_module.config_object.get_option( + "ntp_server_custom", "pool.ntp.org" + ), + "entry_title": _("NTP Server:"), + "max_length": 64, + "callback": _save, + } + ui_module.add_to_stack(item_definition) + + def capture_exposure_sweep(ui_module: UIModule) -> None: """ Captures 100 images at different exposures for PID testing/calibration. diff --git a/python/PiFinder/ui/gps_time_sync_status.py b/python/PiFinder/ui/gps_time_sync_status.py new file mode 100644 index 000000000..28d11a6e6 --- /dev/null +++ b/python/PiFinder/ui/gps_time_sync_status.py @@ -0,0 +1,286 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- +""" +UI screen for time-source sync and helper status. +""" + +import datetime +import json +from pathlib import Path +from typing import Any, TYPE_CHECKING + +from PiFinder.gps_time_sync import HELPER_STATUS_FILE, REQUEST_FILE, STATUS_FILE +from PiFinder.ui.base import UIModule +from PiFinder.ui.layout import rows_below_titlebar +from PiFinder.ui.ui_utils import TextLayouter + +if TYPE_CHECKING: + + def _(a) -> Any: + return a + + +def _get(payload: dict[str, Any] | None, *keys: str, default: Any = None) -> Any: + current: Any = payload + for key in keys: + if not isinstance(current, dict): + return default + current = current.get(key, default) + return current + + +class UIGPSTimeSyncStatus(UIModule): + """Read-only time-sync status screen.""" + + __title__ = "TIME SYNC" + _display_mode_list = ["summary", "details"] + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.text_layout = TextLayouter( + "", + draw=self.draw, + color=self.colors.get(255), + colors=self.colors, + font=self.fonts.base, + available_lines=rows_below_titlebar(self.display_class, gap=1).max_visible, + ) + self._last_display_mode = self.display_mode + + def _read_json(self, path: Path) -> dict[str, Any] | None: + try: + with open(path, "r", encoding="utf-8") as file_in: + payload = json.load(file_in) + except FileNotFoundError: + return None + except Exception as exc: + return {"state": "read_error", "message": str(exc)} + return payload if isinstance(payload, dict) else {"state": "invalid_json"} + + def _format_bool(self, value: Any) -> str: + if value is True: + return _("Yes") + if value is False: + return _("No") + return "--" + + def _format_age(self, seconds: Any) -> str: + if not isinstance(seconds, (int, float)): + return "--" + if seconds < 60: + return f"{seconds:.0f}s" + if seconds < 3600: + return f"{seconds / 60:.1f}m" + return f"{seconds / 3600:.1f}h" + + def _format_offset(self, seconds: Any) -> str: + if not isinstance(seconds, (int, float)): + return "--" + abs_seconds = abs(seconds) + if abs_seconds >= 86400: + return f"{seconds / 86400:.1f}d" + if abs_seconds >= 1: + return f"{seconds:.1f}s" + return f"{seconds * 1000:.0f}ms" + + def _format_tacc(self, ns_value: Any) -> str: + if not isinstance(ns_value, (int, float)) or ns_value < 0: + return "--" + if ns_value >= 1_000_000_000: + return f"{ns_value / 1_000_000_000:.1f}s" + if ns_value >= 1_000_000: + return f"{ns_value / 1_000_000:.1f}ms" + if ns_value >= 1_000: + return f"{ns_value / 1_000:.1f}us" + return f"{ns_value:.0f}ns" + + def _format_time(self, value: Any) -> str: + if not isinstance(value, str) or not value: + return "--" + try: + dt = datetime.datetime.fromisoformat(value) + except ValueError: + return value + return dt.strftime("%Y-%m-%d %H:%M:%S") + + def _status_bundle(self) -> tuple[dict[str, Any] | None, dict[str, Any] | None, bool]: + status = self._read_json(STATUS_FILE) + helper = _get(status, "helper") + if not isinstance(helper, dict): + helper = self._read_json(HELPER_STATUS_FILE) + request_present = REQUEST_FILE.exists() + return status, helper, request_present + + def _summary_lines( + self, + status: dict[str, Any] | None, + helper: dict[str, Any] | None, + request_present: bool, + ) -> list[str]: + if status is None: + return [ + _("No time sync status"), + _("Helper: {state}").format( + state=_get(helper, "state", default="--") + ), + _("Request: {present}").format( + present=_("Yes") if request_present else _("No") + ), + _("{square} Details").format(square=self._SQUARE_), + ] + + latest = _get(status, "latest", default={}) + selected = _get(status, "selected", default={}) + ntp = _get(status, "ntp", default={}) + system_clock = _get(status, "system_clock_sync", default={}) + rtc = _get(status, "rtc_sync", default={}) + software_pps = _get(status, "software_pps", default={}) + + source = _get(latest, "source", default="--") + message_class = _get(latest, "message_class", default="") + source_text = f"{source} {message_class}".strip() + pps_text = ( + _("On {ticks}").format(ticks=_get(software_pps, "tick_count", default=0)) + if _get(software_pps, "enabled") + else _("Off") + ) + + return [ + _("State: {state}").format(state=_get(status, "state", default="--")), + _("Selected: {source}").format( + source=_get(selected, "source", default="--") + ), + _("GPS valid: {valid}").format( + valid=self._format_bool(_get(latest, "valid")) + ), + _("Source: {source}").format(source=source_text or "--"), + _("NTP: {state}").format(state=_get(ntp, "state", default="--")), + _("NTP srv: {server}").format(server=_get(ntp, "server", default="--")), + _("tAcc: {tacc}").format(tacc=self._format_tacc(_get(latest, "tAcc_ns"))), + _("Sys: {state}").format( + state=_get(system_clock, "state", default="--") + ), + _("RTC: {state}").format(state=_get(rtc, "state", default="--")), + _("Helper: {state}").format(state=_get(helper, "state", default="--")), + _("Request: {present}").format( + present=_("Yes") if request_present else _("No") + ), + _("PPS: {state}").format(state=pps_text), + _("{square} Details").format(square=self._SQUARE_), + ] + + def _detail_lines( + self, + status: dict[str, Any] | None, + helper: dict[str, Any] | None, + request_present: bool, + ) -> list[str]: + if status is None: + return [ + _("Status file missing"), + str(STATUS_FILE), + _("Helper file: {state}").format( + state=_get(helper, "state", default="--") + ), + _("{square} Summary").format(square=self._SQUARE_), + ] + + latest = _get(status, "latest", default={}) + selected = _get(status, "selected", default={}) + ntp = _get(status, "ntp", default={}) + samples = _get(status, "samples", default={}) + system_clock = _get(status, "system_clock_sync", default={}) + rtc = _get(status, "rtc_sync", default={}) + software_pps = _get(status, "software_pps", default={}) + helper_results = _get(helper, "results", default={}) + + lines = [ + _("State: {state}").format(state=_get(status, "state", default="--")), + _("Msg: {message}").format(message=_get(status, "message", default="--")), + _("Selected: {source}").format( + source=_get(selected, "source", default="--") + ), + _("Sel time: {time}").format( + time=self._format_time(_get(selected, "time")) + ), + _("GPS: {time}").format( + time=self._format_time(_get(latest, "gps_time")) + ), + _("Age: {age}").format(age=self._format_age(_get(latest, "age_seconds"))), + _("Valid: {valid}").format(valid=self._format_bool(_get(latest, "valid"))), + _("tAcc: {tacc}").format(tacc=self._format_tacc(_get(latest, "tAcc_ns"))), + _("Offset: {offset}").format( + offset=self._format_offset(_get(latest, "offset_seconds")) + ), + _("Sys off: {offset}").format( + offset=self._format_offset(_get(latest, "system_offset_seconds")) + ), + _("NTP: {state}").format(state=_get(ntp, "state", default="--")), + _("NTP srv: {server}").format(server=_get(ntp, "server", default="--")), + _("NTP time: {time}").format( + time=self._format_time(_get(ntp, "time")) + ), + _("NTP delay: {delay}").format( + delay=self._format_offset(_get(ntp, "delay_seconds")) + ), + _("Samples: {count}/{min_required}").format( + count=_get(samples, "count", default=0), + min_required=_get(samples, "min_required", default="--"), + ), + _("Sys req: {state}").format( + state=_get(system_clock, "state", default="--") + ), + _("RTC req: {state}").format(state=_get(rtc, "state", default="--")), + _("PPS ticks: {ticks}").format( + ticks=_get(software_pps, "tick_count", default=0) + ), + _("Helper: {state}").format(state=_get(helper, "state", default="--")), + _("Helper UID: {uid}").format( + uid=_get(helper, "effective_uid", default="--") + ), + _("Helper msg: {message}").format( + message=_get(helper, "message", default="--") + ), + _("Req file: {present}").format( + present=_("Yes") if request_present else _("No") + ), + ] + + if isinstance(helper_results, dict): + if "system_clock" in helper_results: + lines.append( + _("Sys result: {state}").format( + state=_get(helper_results, "system_clock", "state", default="--") + ) + ) + if "rtc" in helper_results: + lines.append( + _("RTC result: {state}").format( + state=_get(helper_results, "rtc", "state", default="--") + ) + ) + + lines.append(_("{square} Summary").format(square=self._SQUARE_)) + return lines + + def update(self, force=False): + status, helper, request_present = self._status_bundle() + self.clear_screen() + if self.display_mode == "summary": + lines = self._summary_lines(status, helper, request_present) + else: + lines = self._detail_lines(status, helper, request_present) + + reset_pointer = self.display_mode != self._last_display_mode + self._last_display_mode = self.display_mode + self.text_layout.set_text("\n".join(lines), reset_pointer=reset_pointer) + self.text_layout.draw(pos=(0, self.display_class.titlebar_height)) + return self.screen_update() + + def key_up(self): + self.text_layout.previous() + self.update() + + def key_down(self): + self.text_layout.next() + self.update() diff --git a/python/PiFinder/ui/menu_structure.py b/python/PiFinder/ui/menu_structure.py index 5a8b97f66..f2506ab9c 100644 --- a/python/PiFinder/ui/menu_structure.py +++ b/python/PiFinder/ui/menu_structure.py @@ -6,6 +6,7 @@ from PiFinder.ui.console import UIConsole from PiFinder.ui.software import UISoftware from PiFinder.ui.gpsstatus import UIGPSStatus +from PiFinder.ui.gps_time_sync_status import UIGPSTimeSyncStatus from PiFinder.ui.chart import UIChart from PiFinder.ui.align import UIAlign from PiFinder.ui.align_daytime import UIAlignDaytime @@ -1149,6 +1150,169 @@ def _(key: str) -> Any: }, ], }, + { + "name": _("Time Sync"), + "class": UITextMenu, + "select": "single", + "items": [ + { + "name": _("Time Sync"), + "class": UITextMenu, + "select": "single", + "config_option": "time_sync_enabled", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + { + "name": _("Source Mode"), + "class": UITextMenu, + "select": "single", + "config_option": "time_sync_source_mode", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("Best"), + "value": "best", + }, + { + "name": _("GPS"), + "value": "gps", + }, + { + "name": _("NTP"), + "value": "ntp", + }, + ], + }, + { + "name": _("GPS Source"), + "class": UITextMenu, + "select": "single", + "config_option": "gps_time_sync", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + { + "name": _("NTP Source"), + "class": UITextMenu, + "select": "single", + "config_option": "ntp_time_sync", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + { + "name": _("NTP Server"), + "class": UITextMenu, + "select": "single", + "config_option": "ntp_server", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("pool.ntp.org"), + "value": "pool.ntp.org", + }, + { + "name": _("time.google.com"), + "value": "time.google.com", + }, + { + "name": _("time.cloudflare.com"), + "value": "time.cloudflare.com", + }, + { + "name": _("time.nist.gov"), + "value": "time.nist.gov", + }, + { + "name": _("Custom"), + "value": "custom", + "callback": callbacks.edit_custom_ntp_server, + "name_suffix_callback": ( + callbacks.get_custom_ntp_server_display + ), + }, + ], + }, + { + "name": _("Software PPS"), + "class": UITextMenu, + "select": "single", + "config_option": "software_pps", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + { + "name": _("System Clock"), + "class": UITextMenu, + "select": "single", + "config_option": "time_sync_system_clock", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + { + "name": _("RTC Sync"), + "class": UITextMenu, + "select": "single", + "config_option": "rtc_sync", + "post_callback": callbacks.reload_config, + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + ], + }, ], }, { @@ -1200,6 +1364,10 @@ def _(key: str) -> Any: "name": _("GPS Status"), "class": UIGPSStatus, }, + { + "name": _("Time Sync"), + "class": UIGPSTimeSyncStatus, + }, { "name": _("Set Location"), "class": UITextMenu, diff --git a/python/PiFinder/ui/textentry.py b/python/PiFinder/ui/textentry.py index 284ec20d2..48b63fd40 100644 --- a/python/PiFinder/ui/textentry.py +++ b/python/PiFinder/ui/textentry.py @@ -90,6 +90,7 @@ def __init__(self, *args, **kwargs) -> None: self.text_entry_mode = self.item_definition.get("mode") == "text_entry" self.current_text = self.item_definition.get("initial_text", "") self.callback = self.item_definition.get("callback") + self.max_length = int(self.item_definition.get("max_length", 12)) self.width = self.display_class.resX self.height = self.display_class.resY @@ -319,7 +320,7 @@ def _perform_search(self, search_text, search_version): logger.error(f"Exception in _perform_search: {e}", exc_info=True) def add_char(self, char): - if len(self.current_text) >= 12: + if len(self.current_text) >= self.max_length: return self.current_text += char self.update_search_results() @@ -423,7 +424,9 @@ def update(self, force=False): # Set title based on mode (will be drawn by screen_update()) if self.text_entry_mode: - self.title = _("Enter Location Name:") + self.title = _( + self.item_definition.get("entry_title", "Enter Location Name:") + ) else: self.title = _("Search") self.draw_search_result_len() diff --git a/python/tests/test_gps_time_sources.py b/python/tests/test_gps_time_sources.py new file mode 100644 index 000000000..c261e8316 --- /dev/null +++ b/python/tests/test_gps_time_sources.py @@ -0,0 +1,167 @@ +import asyncio +import datetime +from queue import Queue + +import pytz + +from PiFinder.gps_gpsd import gpsd_sky_time_sample, gpsd_time_message +from PiFinder.gps_ubx import process_messages +from PiFinder.gps_ubx_parser import UBXParser + + +def test_gpsd_time_message_does_not_require_position_lock(): + gps_time = datetime.datetime(2026, 1, 1, 1, 2, 3, tzinfo=pytz.UTC) + + msg = gpsd_time_message({"time": gps_time, "mode": 1}, gps_locked=False) + + assert msg == ( + "time", + { + "time": gps_time, + "source": "GPSD", + "mode": 1, + "lock": False, + }, + ) + + +def test_gpsd_sky_time_is_monitor_only_sample(): + gps_time = datetime.datetime(2019, 4, 7, 14, 37, 23, tzinfo=pytz.UTC) + + msg = gpsd_sky_time_sample( + { + "time": gps_time, + "nSat": 1, + "uSat": 0, + "hdop": 99.99, + "pdop": 99.99, + } + ) + + assert msg == ( + "time_sample", + { + "time": gps_time, + "source": "GPSD-SKY", + "valid": False, + "satellites_seen": 1, + "satellites_used": 0, + "hdop": 99.99, + "pdop": 99.99, + }, + ) + + +def test_nav_pvt_parser_extracts_time_and_accuracy_from_correct_offsets(): + payload = bytearray(92) + payload[4:6] = (2026).to_bytes(2, "little") + payload[6] = 2 + payload[7] = 3 + payload[8] = 4 + payload[9] = 5 + payload[10] = 6 + payload[11] = 0x03 + payload[12:16] = (25_000_000).to_bytes(4, "little") + payload[16:20] = (123_000_000).to_bytes(4, "little", signed=True) + payload[20] = 0 + payload[23] = 7 + payload[24:28] = int(127.1234567 * 1e7).to_bytes(4, "little", signed=True) + payload[28:32] = int(37.1234567 * 1e7).to_bytes(4, "little", signed=True) + payload[32:36] = int(42_000).to_bytes(4, "little", signed=True) + payload[36:40] = int(41_000).to_bytes(4, "little", signed=True) + payload[40:44] = int(1500).to_bytes(4, "little") + payload[44:48] = int(2000).to_bytes(4, "little") + payload[76:78] = int(1.25 * 100).to_bytes(2, "little") + + parsed = UBXParser(log_queue=None)._parse_nav_pvt(bytes(payload)) + + assert parsed["valid"] is True + assert parsed["time"] == datetime.datetime( + 2026, 2, 3, 4, 5, 6, 123000, tzinfo=datetime.timezone.utc + ) + assert parsed["tAcc_ns"] == 25_000_000 + assert parsed["UTCnano"] == 123_000_000 + assert parsed["mode"] == 0 + assert parsed["numSV"] == 7 + + +def test_nav_pvt_parser_keeps_invalid_time_as_candidate(): + payload = bytearray(92) + payload[4:6] = (2021).to_bytes(2, "little") + payload[6] = 3 + payload[7] = 7 + payload[8] = 14 + payload[9] = 37 + payload[10] = 25 + payload[11] = 0xF0 + + parsed = UBXParser(log_queue=None)._parse_nav_pvt(bytes(payload)) + + assert parsed["valid"] is False + assert parsed["time"] == datetime.datetime( + 2021, 3, 7, 14, 37, 25, tzinfo=datetime.timezone.utc + ) + + +def test_ubx_process_messages_emits_nav_pvt_time_before_position_fix(): + gps_time = datetime.datetime(2026, 1, 1, 1, 2, 3, tzinfo=datetime.timezone.utc) + + async def stream(): + yield { + "class": "NAV-PVT", + "time": gps_time, + "valid": True, + "tAcc_ns": 50_000_000, + "mode": 0, + "lat": 37.0, + "lon": 127.0, + "altHAE": 42.0, + "hAcc": 99_999.0, + "vAcc": 99_999.0, + } + + gps_queue = Queue() + console_queue = Queue() + asyncio.run( + process_messages( + lambda: stream(), + gps_queue, + console_queue, + {"error_2d": 999, "error_3d": 999}, + ) + ) + + gps_msg, gps_content = gps_queue.get_nowait() + assert gps_msg == "time" + assert gps_content["time"] == gps_time + assert gps_content["tAcc"] == 50_000_000 + assert gps_content["message_class"] == "NAV-PVT" + + +def test_ubx_process_messages_emits_invalid_nav_pvt_time_as_sample(): + gps_time = datetime.datetime(2021, 3, 7, 14, 37, 25, tzinfo=datetime.timezone.utc) + + async def stream(): + yield { + "class": "NAV-PVT", + "time": gps_time, + "valid": False, + "tAcc_ns": -1, + "mode": 0, + } + + gps_queue = Queue() + console_queue = Queue() + asyncio.run( + process_messages( + lambda: stream(), + gps_queue, + console_queue, + {"error_2d": 999, "error_3d": 999}, + ) + ) + + gps_msg, gps_content = gps_queue.get_nowait() + assert gps_msg == "time_sample" + assert gps_content["time"] == gps_time + assert gps_content["valid"] is False diff --git a/python/tests/test_gps_time_sync.py b/python/tests/test_gps_time_sync.py new file mode 100644 index 000000000..0741a03d3 --- /dev/null +++ b/python/tests/test_gps_time_sync.py @@ -0,0 +1,507 @@ +import datetime +import json + +import pytz + +from PiFinder.gps_time_sync import GpsTimeSyncMonitor + + +class FakeClock: + def __init__(self, unix=1_700_000_000.0, monotonic=100.0): + self.unix = unix + self.monotonic = monotonic + + def time(self): + return self.unix + + def monotonic_time(self): + return self.monotonic + + def advance(self, seconds): + self.unix += seconds + self.monotonic += seconds + + +class FakeRequestWriter: + def __init__(self, ok=True): + self.ok = ok + self.requests = [] + self.clear_count = 0 + + def write_request(self, payload): + self.requests.append(payload) + if not self.ok: + return {"ok": False, "message": "request write failed"} + return {"ok": True, "message": "request write ok"} + + def clear_request(self): + self.clear_count += 1 + + +class FakeNtpClient: + def __init__(self, results): + self.results = list(results) + self.calls = [] + + def query(self, server, timeout_seconds=1.0): + self.calls.append((server, timeout_seconds)) + result = self.results.pop(0) + if isinstance(result, Exception): + raise result + return result + + +def utc(second): + return datetime.datetime(2026, 1, 1, 0, 0, second, tzinfo=pytz.UTC) + + +def read_status(path): + return json.loads(path.read_text()) + + +def test_gps_time_monitor_marks_stable_after_enough_samples(tmp_path): + clock = FakeClock() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + min_samples=3, + stable_jitter_ms=100, + stable_offset_ms=500, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ) + + for second, offset in [(1, 0.05), (2, 0.04), (3, 0.06)]: + gps_dt = utc(second) + reference_dt = gps_dt - datetime.timedelta(seconds=offset) + monitor.observe_time( + {"time": gps_dt, "tAcc": 10_000, "source": "GPS"}, reference_dt + ) + clock.advance(1) + + status = read_status(status_file) + assert status["state"] == "stable" + assert status["samples"]["count"] == 3 + assert status["offset"]["latest_seconds"] == 0.06 + assert status["offset"]["jitter_seconds"] < 0.03 + + +def test_gps_time_monitor_flags_low_quality_time_accuracy(tmp_path): + clock = FakeClock() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + max_tacc_ns=500_000, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ) + + gps_dt = utc(10) + monitor.observe_time( + {"time": gps_dt, "tAcc": 5_000_000, "source": "GPS"}, + gps_dt, + ) + + status = read_status(status_file) + assert status["state"] == "low_quality" + assert status["latest"]["tAcc_ns"] == 5_000_000 + + +def test_gps_time_monitor_flags_invalid_candidate(tmp_path): + clock = FakeClock() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ) + + gps_dt = utc(11) + monitor.observe_time( + { + "time": gps_dt, + "valid": False, + "source": "GPSD-SKY", + "satellites_seen": 1, + "satellites_used": 0, + }, + gps_dt, + ) + + status = read_status(status_file) + assert status["state"] == "low_quality" + assert status["latest"]["valid"] is False + assert status["latest"]["source"] == "GPSD-SKY" + assert status["latest"]["satellites_seen"] == 1 + assert status["latest"]["satellites_used"] == 0 + + +def test_gps_time_monitor_marks_samples_stale(tmp_path): + clock = FakeClock() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + stale_seconds=5, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ) + + gps_dt = utc(20) + monitor.observe_time({"time": gps_dt, "source": "GPS"}, gps_dt) + clock.advance(6) + monitor.poll() + + status = read_status(status_file) + assert status["state"] == "stale" + + +def test_software_pps_records_ticks(tmp_path): + clock = FakeClock(monotonic=100.2) + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + software_pps_enabled=True, + software_pps_interval_seconds=1.0, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ) + + monitor.poll() + clock.advance(0.9) + monitor.poll() + clock.advance(1.0) + monitor.poll() + + status = read_status(status_file) + assert status["software_pps"]["enabled"] is True + assert status["software_pps"]["tick_count"] == 2 + assert status["software_pps"]["last_tick_monotonic"] == 102.0 + + +def test_startup_status_clears_stale_file_when_disabled(tmp_path): + clock = FakeClock() + status_file = tmp_path / "gps_time_status.json" + status_file.write_text('{"state": "stable"}') + monitor = GpsTimeSyncMonitor( + enabled=False, + software_pps_enabled=False, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ) + + monitor.write_startup_status() + + status = read_status(status_file) + assert status["enabled"] is False + assert status["state"] == "disabled" + + +def test_system_clock_sync_writes_request_after_stable_gps(tmp_path): + first_gps = utc(1) + clock = FakeClock(unix=first_gps.timestamp() - 2.0) + request_writer = FakeRequestWriter() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + system_clock_sync_enabled=True, + min_samples=3, + stable_jitter_ms=100, + stable_offset_ms=500, + system_clock_sync_step_threshold_ms=100, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + request_writer=request_writer, + ) + + for second, offset in [(1, 0.05), (2, 0.04), (3, 0.06)]: + gps_dt = utc(second) + monitor.observe_time( + {"time": gps_dt, "tAcc": 10_000, "source": "GPS"}, + gps_dt - datetime.timedelta(seconds=offset), + ) + clock.advance(1) + + status = read_status(status_file) + assert status["state"] == "stable" + assert status["system_clock_sync"]["state"] == "requested" + assert status["system_clock_sync"]["request_count"] == 1 + assert status["system_clock_sync"]["last_offset_seconds"] == 2.0 + assert len(request_writer.requests) == 1 + request = request_writer.requests[0] + assert request["gps_time"] == utc(3).isoformat() + assert request["monitor_state"] == "stable" + assert request["actions"]["system_clock"]["offset_seconds"] == 2.0 + + +def test_system_clock_sync_waits_for_valid_stable_gps(tmp_path): + clock = FakeClock() + request_writer = FakeRequestWriter() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + system_clock_sync_enabled=True, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + request_writer=request_writer, + ) + + gps_dt = utc(10) + monitor.observe_time( + {"time": gps_dt, "valid": False, "source": "GPSD-SKY"}, + gps_dt, + ) + + status = read_status(status_file) + assert status["state"] == "low_quality" + assert status["system_clock_sync"]["state"] == "waiting_for_time_source" + assert request_writer.requests == [] + assert request_writer.clear_count == 1 + + +def test_system_clock_sync_skips_small_offset(tmp_path): + first_gps = utc(1) + clock = FakeClock(unix=first_gps.timestamp() - 0.05) + request_writer = FakeRequestWriter() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + system_clock_sync_enabled=True, + min_samples=2, + stable_jitter_ms=100, + stable_offset_ms=500, + system_clock_sync_step_threshold_ms=500, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + request_writer=request_writer, + ) + + for second in [1, 2]: + gps_dt = utc(second) + monitor.observe_time( + {"time": gps_dt, "source": "GPS"}, + gps_dt - datetime.timedelta(seconds=0.05), + ) + clock.advance(1) + + status = read_status(status_file) + assert status["state"] == "stable" + assert status["system_clock_sync"]["state"] == "in_sync" + assert request_writer.requests == [] + + +def test_rtc_sync_writes_request_after_stable_gps(tmp_path): + clock = FakeClock() + request_writer = FakeRequestWriter() + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + enabled=True, + rtc_sync_enabled=True, + min_samples=2, + stable_jitter_ms=100, + stable_offset_ms=500, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + request_writer=request_writer, + ) + + for second in [1, 2]: + gps_dt = utc(second) + monitor.observe_time( + {"time": gps_dt, "source": "GPS"}, + gps_dt - datetime.timedelta(seconds=0.05), + ) + clock.advance(1) + + status = read_status(status_file) + assert status["state"] == "stable" + assert status["rtc_sync"]["state"] == "requested" + assert status["rtc_sync"]["request_count"] == 1 + assert len(request_writer.requests) == 1 + request = request_writer.requests[0] + assert request["gps_time"] == utc(2).isoformat() + assert request["actions"]["rtc"]["enabled"] is True + + +def test_ntp_time_source_selected_when_gps_unavailable(tmp_path): + clock = FakeClock(unix=utc(10).timestamp(), monotonic=100.0) + ntp_dt = utc(11) + ntp_client = FakeNtpClient( + [ + { + "time": ntp_dt, + "server": "pool.ntp.org", + "valid": True, + "stratum": 2, + "offset_seconds": 1.0, + "delay_seconds": 0.08, + "root_dispersion_seconds": 0.01, + "quality_seconds": 0.05, + "received_unix": clock.unix, + } + ] + ) + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + time_sync_enabled=True, + enabled=True, + ntp_enabled=True, + ntp_async=False, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ntp_client=ntp_client, + ) + + monitor.poll() + + status = read_status(status_file) + assert status["state"] == "stable" + assert status["selected"]["source"] == "NTP" + assert status["selected"]["time"] == ntp_dt.isoformat() + assert status["ntp"]["state"] == "stable" + assert ntp_client.calls == [("pool.ntp.org", 1.0)] + + +def test_best_source_prefers_lower_quality_value(tmp_path): + clock = FakeClock(unix=utc(1).timestamp(), monotonic=100.0) + ntp_dt = utc(3) + ntp_client = FakeNtpClient( + [ + { + "time": ntp_dt, + "server": "time.google.com", + "valid": True, + "stratum": 2, + "offset_seconds": 0.0, + "delay_seconds": 0.2, + "root_dispersion_seconds": 0.01, + "quality_seconds": 0.11, + } + ] + ) + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + time_sync_enabled=True, + enabled=True, + ntp_enabled=True, + ntp_server="time.google.com", + ntp_async=False, + min_samples=2, + stable_jitter_ms=100, + stable_offset_ms=500, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ntp_client=ntp_client, + ) + + monitor.poll() + for second in [1, 2]: + gps_dt = utc(second) + monitor.observe_time( + {"time": gps_dt, "tAcc": 10_000_000, "source": "GPS"}, + gps_dt - datetime.timedelta(seconds=0.05), + ) + clock.advance(1) + + status = read_status(status_file) + assert status["state"] == "stable" + assert status["selected"]["source"] == "GPS" + assert status["selected"]["quality_seconds"] == 0.01 + + +def test_best_source_ignores_slow_ntp_and_uses_gps(tmp_path): + clock = FakeClock(unix=utc(1).timestamp(), monotonic=100.0) + ntp_client = FakeNtpClient( + [ + { + "time": utc(1), + "server": "pool.ntp.org", + "valid": True, + "stratum": 2, + "delay_seconds": 5.0, + "quality_seconds": 2.5, + } + ] + ) + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + time_sync_enabled=True, + enabled=True, + ntp_enabled=True, + ntp_async=False, + ntp_max_delay_ms=500, + min_samples=2, + stable_jitter_ms=100, + stable_offset_ms=500, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + ntp_client=ntp_client, + ) + + monitor.poll() + for second in [1, 2]: + gps_dt = utc(second) + monitor.observe_time( + {"time": gps_dt, "tAcc": 20_000_000, "source": "GPS"}, + gps_dt - datetime.timedelta(seconds=0.05), + ) + clock.advance(1) + + status = read_status(status_file) + assert status["ntp"]["state"] == "low_quality" + assert status["selected"]["source"] == "GPS" + + +def test_system_clock_request_uses_selected_ntp_time(tmp_path): + ntp_dt = utc(30) + clock = FakeClock(unix=ntp_dt.timestamp() - 3.0, monotonic=100.0) + request_writer = FakeRequestWriter() + ntp_client = FakeNtpClient( + [ + { + "time": ntp_dt, + "server": "pool.ntp.org", + "valid": True, + "stratum": 2, + "offset_seconds": 3.0, + "delay_seconds": 0.05, + "quality_seconds": 0.025, + } + ] + ) + status_file = tmp_path / "gps_time_status.json" + monitor = GpsTimeSyncMonitor( + time_sync_enabled=True, + enabled=False, + ntp_enabled=True, + system_clock_sync_enabled=True, + ntp_async=False, + status_file=status_file, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + request_writer=request_writer, + ntp_client=ntp_client, + ) + + monitor.poll() + + status = read_status(status_file) + assert status["state"] == "stable" + assert status["selected"]["source"] == "NTP" + assert status["system_clock_sync"]["state"] == "requested" + assert len(request_writer.requests) == 1 + request = request_writer.requests[0] + assert request["sync_time"] == ntp_dt.isoformat() + assert request["selected"]["source"] == "NTP" + assert request["actions"]["system_clock"]["offset_seconds"] == 3.0 diff --git a/python/tests/test_gps_time_sync_helper.py b/python/tests/test_gps_time_sync_helper.py new file mode 100644 index 000000000..b658825cf --- /dev/null +++ b/python/tests/test_gps_time_sync_helper.py @@ -0,0 +1,229 @@ +import datetime +import json + +import pytz + +from PiFinder.gps_time_sync_helper import GpsTimeSyncHelper + + +class FakeClock: + def __init__(self, unix=1_700_000_000.0, monotonic=100.0): + self.unix = unix + self.monotonic = monotonic + + def time(self): + return self.unix + + def monotonic_time(self): + return self.monotonic + + +class FakeRunner: + def __init__(self, system_ok=True, rtc_ok=True): + self.system_ok = system_ok + self.rtc_ok = rtc_ok + self.system_calls = [] + self.rtc_calls = [] + + def set_system_clock(self, gps_dt): + self.system_calls.append(gps_dt) + return {"ok": self.system_ok, "message": "system synced"} + + def set_rtc(self, gps_dt): + self.rtc_calls.append(gps_dt) + return {"ok": self.rtc_ok, "message": "rtc synced"} + + +def utc(second): + return datetime.datetime(2026, 1, 1, 0, 0, second, tzinfo=pytz.UTC) + + +def write_request(path, payload): + path.write_text(json.dumps(payload), encoding="utf-8") + + +def valid_request(clock, gps_dt, actions): + return { + "version": 1, + "request_id": "req-1", + "boot_id": "boot-a", + "created_monotonic": clock.monotonic, + "created_unix": clock.unix, + "gps_time": gps_dt.isoformat(), + "monitor_state": "stable", + "latest": {"valid": True, "source": "GPS", "tAcc_ns": 10_000}, + "actions": actions, + } + + +def test_helper_processes_valid_system_clock_and_rtc_request(tmp_path): + gps_dt = utc(5) + clock = FakeClock(unix=gps_dt.timestamp() - 2.0) + runner = FakeRunner() + request_file = tmp_path / "request.json" + status_file = tmp_path / "helper_status.json" + write_request( + request_file, + valid_request( + clock, + gps_dt, + { + "system_clock": { + "enabled": True, + "step_threshold_seconds": 0.1, + }, + "rtc": {"enabled": True}, + }, + ), + ) + helper = GpsTimeSyncHelper( + request_file=request_file, + status_file=status_file, + runner=runner, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + boot_id_fn=lambda: "boot-a", + ) + + status = helper.process_once() + + assert status["state"] == "completed" + assert runner.system_calls == [gps_dt] + assert runner.rtc_calls == [gps_dt] + saved_status = json.loads(status_file.read_text()) + assert saved_status["last_request_id"] == "req-1" + assert saved_status["results"]["system_clock"]["state"] == "synced" + assert saved_status["results"]["rtc"]["state"] == "synced" + + +def test_helper_skips_system_clock_when_already_in_sync(tmp_path): + gps_dt = utc(5) + clock = FakeClock(unix=gps_dt.timestamp() - 0.05) + runner = FakeRunner() + request_file = tmp_path / "request.json" + status_file = tmp_path / "helper_status.json" + write_request( + request_file, + valid_request( + clock, + gps_dt, + { + "system_clock": { + "enabled": True, + "step_threshold_seconds": 0.5, + } + }, + ), + ) + helper = GpsTimeSyncHelper( + request_file=request_file, + status_file=status_file, + runner=runner, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + boot_id_fn=lambda: "boot-a", + ) + + status = helper.process_once() + + assert status["state"] == "completed" + assert runner.system_calls == [] + saved_status = json.loads(status_file.read_text()) + assert saved_status["results"]["system_clock"]["state"] == "in_sync" + + +def test_helper_rejects_invalid_or_stale_request(tmp_path): + gps_dt = utc(5) + clock = FakeClock(unix=gps_dt.timestamp(), monotonic=500.0) + runner = FakeRunner() + request_file = tmp_path / "request.json" + status_file = tmp_path / "helper_status.json" + request = valid_request(clock, gps_dt, {"rtc": {"enabled": True}}) + request["created_monotonic"] = 100.0 + write_request(request_file, request) + helper = GpsTimeSyncHelper( + request_file=request_file, + status_file=status_file, + max_request_age_seconds=120.0, + runner=runner, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + boot_id_fn=lambda: "boot-a", + ) + + status = helper.process_once() + + assert status["state"] == "invalid_request" + assert "stale" in status["message"] + assert runner.rtc_calls == [] + + +def test_helper_does_not_reprocess_same_request_after_restart(tmp_path): + gps_dt = utc(5) + clock = FakeClock(unix=gps_dt.timestamp() - 2.0) + runner = FakeRunner() + request_file = tmp_path / "request.json" + status_file = tmp_path / "helper_status.json" + request = valid_request(clock, gps_dt, {"rtc": {"enabled": True}}) + write_request(request_file, request) + status_file.write_text( + json.dumps({"last_request_id": request["request_id"]}), + encoding="utf-8", + ) + helper = GpsTimeSyncHelper( + request_file=request_file, + status_file=status_file, + runner=runner, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + boot_id_fn=lambda: "boot-a", + ) + + status = helper.process_once() + + assert status["state"] == "idle" + assert runner.rtc_calls == [] + + +def test_helper_accepts_selected_ntp_time_source(tmp_path): + sync_dt = utc(8) + clock = FakeClock(unix=sync_dt.timestamp() - 2.0) + runner = FakeRunner() + request_file = tmp_path / "request.json" + status_file = tmp_path / "helper_status.json" + request = valid_request( + clock, + sync_dt, + { + "system_clock": { + "enabled": True, + "step_threshold_seconds": 0.1, + } + }, + ) + request["sync_time"] = sync_dt.isoformat() + request["selected"] = { + "source": "NTP", + "time": sync_dt.isoformat(), + "valid": True, + "quality_seconds": 0.02, + "server": "pool.ntp.org", + } + request["latest"] = {"valid": False, "source": "GPS"} + write_request(request_file, request) + helper = GpsTimeSyncHelper( + request_file=request_file, + status_file=status_file, + runner=runner, + time_fn=clock.time, + monotonic_fn=clock.monotonic_time, + boot_id_fn=lambda: "boot-a", + ) + + status = helper.process_once() + + assert status["state"] == "completed" + assert runner.system_calls == [sync_dt] + saved_status = json.loads(status_file.read_text()) + assert saved_status["selected"]["source"] == "NTP" + assert saved_status["last_sync_time"] == sync_dt.isoformat() diff --git a/python/tests/test_gps_time_sync_status_ui.py b/python/tests/test_gps_time_sync_status_ui.py new file mode 100644 index 000000000..79cd46770 --- /dev/null +++ b/python/tests/test_gps_time_sync_status_ui.py @@ -0,0 +1,192 @@ +import PiFinder.i18n # noqa: F401 + +from PiFinder.ui.gps_time_sync_status import UIGPSTimeSyncStatus +from PiFinder.ui import menu_structure + + +def _screen(): + return object.__new__(UIGPSTimeSyncStatus) + + +def _iter_menu_nodes(node): + if isinstance(node, dict): + yield node + for value in node.values(): + yield from _iter_menu_nodes(value) + elif isinstance(node, list): + for item in node: + yield from _iter_menu_nodes(item) + + +def test_gps_time_sync_status_menu_entry_exists(): + entries = [ + node + for node in _iter_menu_nodes(menu_structure.pifinder_menu) + if node.get("class") is UIGPSTimeSyncStatus + ] + + assert len(entries) == 1 + assert entries[0]["name"] == "Time Sync" + + +def test_gps_time_sync_settings_menu_entries_exist(): + expected_options = { + "time_sync_enabled", + "time_sync_source_mode", + "gps_time_sync", + "ntp_time_sync", + "ntp_server", + "software_pps", + "time_sync_system_clock", + "rtc_sync", + } + entries = { + node.get("config_option"): node + for node in _iter_menu_nodes(menu_structure.pifinder_menu) + if node.get("config_option") in expected_options + } + + assert set(entries) == expected_options + + for option in ( + "time_sync_enabled", + "gps_time_sync", + "ntp_time_sync", + "software_pps", + "time_sync_system_clock", + "rtc_sync", + ): + node = entries[option] + assert [item["value"] for item in node["items"]] == [False, True] + assert node["items"][0]["name"] == "Off" + assert node["items"][1]["name"] == "On" + assert node["post_callback"] is menu_structure.callbacks.reload_config + + assert [item["value"] for item in entries["time_sync_source_mode"]["items"]] == [ + "best", + "gps", + "ntp", + ] + assert [item["value"] for item in entries["ntp_server"]["items"]] == [ + "pool.ntp.org", + "time.google.com", + "time.cloudflare.com", + "time.nist.gov", + "custom", + ] + + +def test_custom_ntp_server_is_handled_from_ntp_server_menu(): + ntp_server_entries = [ + node + for node in _iter_menu_nodes(menu_structure.pifinder_menu) + if node.get("config_option") == "ntp_server" + ] + + assert len(ntp_server_entries) == 1 + custom_items = [ + item + for item in ntp_server_entries[0]["items"] + if item.get("value") == "custom" + ] + assert len(custom_items) == 1 + assert custom_items[0]["name"] == "Custom" + assert custom_items[0]["callback"] is menu_structure.callbacks.edit_custom_ntp_server + assert ( + custom_items[0]["name_suffix_callback"] + is menu_structure.callbacks.get_custom_ntp_server_display + ) + + standalone_entries = [ + node + for node in _iter_menu_nodes(menu_structure.pifinder_menu) + if node.get("name") == "Custom NTP Server" + ] + assert standalone_entries == [] + + +def test_gps_time_sync_status_summary_lines(): + screen = _screen() + status = { + "state": "low_quality", + "selected": None, + "latest": { + "valid": False, + "source": "GPS", + "message_class": "NAV-PVT", + "tAcc_ns": 4_294_967_295, + }, + "ntp": {"state": "unavailable", "server": "pool.ntp.org"}, + "system_clock_sync": {"state": "disabled"}, + "rtc_sync": {"state": "disabled"}, + "software_pps": {"enabled": True, "tick_count": 7}, + } + helper = {"state": "idle"} + + lines = screen._summary_lines(status, helper, request_present=False) + + assert "State: low_quality" in lines + assert "Selected: --" in lines + assert "GPS valid: No" in lines + assert "Source: GPS NAV-PVT" in lines + assert "NTP: unavailable" in lines + assert "Sys: disabled" in lines + assert "RTC: disabled" in lines + assert "Helper: idle" in lines + assert "Request: No" in lines + assert "PPS: On 7" in lines + + +def test_gps_time_sync_status_detail_lines_include_helper_results(): + screen = _screen() + status = { + "state": "stable", + "message": "Selected GPS time source", + "selected": { + "source": "GPS", + "time": "2026-06-27T01:58:23+00:00", + }, + "latest": { + "gps_time": "2026-06-27T01:58:23+00:00", + "age_seconds": 3.2, + "valid": True, + "tAcc_ns": 10_000, + "offset_seconds": 0.1, + "system_offset_seconds": 5.0, + }, + "ntp": { + "state": "stable", + "server": "pool.ntp.org", + "time": "2026-06-27T01:58:22+00:00", + "delay_seconds": 0.08, + }, + "samples": {"count": 5, "min_required": 5}, + "system_clock_sync": {"state": "requested"}, + "rtc_sync": {"state": "requested"}, + "software_pps": {"tick_count": 11}, + } + helper = { + "state": "completed", + "effective_uid": 0, + "message": "Time sync request processed", + "results": { + "system_clock": {"state": "synced"}, + "rtc": {"state": "synced"}, + }, + } + + lines = screen._detail_lines(status, helper, request_present=True) + + assert "State: stable" in lines + assert "Selected: GPS" in lines + assert "Sel time: 2026-06-27 01:58:23" in lines + assert "GPS: 2026-06-27 01:58:23" in lines + assert "NTP: stable" in lines + assert "Valid: Yes" in lines + assert "Sys req: requested" in lines + assert "RTC req: requested" in lines + assert "Helper: completed" in lines + assert "Helper UID: 0" in lines + assert "Req file: Yes" in lines + assert "Sys result: synced" in lines + assert "RTC result: synced" in lines diff --git a/scripts/install_gps_time_sync_helper.sh b/scripts/install_gps_time_sync_helper.sh new file mode 100755 index 000000000..2798e2f38 --- /dev/null +++ b/scripts/install_gps_time_sync_helper.sh @@ -0,0 +1,75 @@ +#!/usr/bin/env bash +# Install or manage the optional privileged GPS time-sync helper service. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PIFINDER_REPO_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" +source "${PIFINDER_REPO_DIR}/pifinder_paths.sh" + +SERVICE_NAME="pifinder_gps_time_sync.service" +SERVICE_TEMPLATE="${PIFINDER_REPO_DIR}/pi_config_files/${SERVICE_NAME}" +SERVICE_TARGET="/lib/systemd/system/${SERVICE_NAME}" +SERVICE_DROPIN_DIR="/etc/systemd/system/${SERVICE_NAME}.d" +DRY_RUN_DROPIN="${SERVICE_DROPIN_DIR}/dry-run.conf" + +install_service() { + pifinder_render_config "${SERVICE_TEMPLATE}" "${SERVICE_TARGET}" + sudo systemctl daemon-reload + echo "Installed ${SERVICE_NAME}" +} + +install_dry_run_override() { + sudo install -d -m 755 "${SERVICE_DROPIN_DIR}" + printf "%s\n" \ + "[Service]" \ + "ExecStart=" \ + "ExecStart=/usr/bin/python -m PiFinder.gps_time_sync_helper --dry-run" \ + | sudo tee "${DRY_RUN_DROPIN}" >/dev/null + sudo systemctl daemon-reload + echo "Installed dry-run override for ${SERVICE_NAME}" +} + +remove_dry_run_override() { + sudo rm -f "${DRY_RUN_DROPIN}" + sudo rmdir "${SERVICE_DROPIN_DIR}" 2>/dev/null || true + sudo systemctl daemon-reload +} + +case "${1:-install}" in + install) + install_service + echo "Service installed but not enabled." + echo "Run: $0 enable" + ;; + enable) + install_service + remove_dry_run_override + sudo systemctl enable --now "${SERVICE_NAME}" + ;; + enable-dry-run) + install_service + install_dry_run_override + sudo systemctl enable --now "${SERVICE_NAME}" + ;; + disable) + sudo systemctl disable --now "${SERVICE_NAME}" 2>/dev/null || true + remove_dry_run_override + ;; + restart) + install_service + sudo systemctl restart "${SERVICE_NAME}" + ;; + restart-dry-run) + install_service + install_dry_run_override + sudo systemctl restart "${SERVICE_NAME}" + ;; + status) + systemctl status "${SERVICE_NAME}" --no-pager + ;; + *) + echo "Usage: $0 {install|enable|enable-dry-run|disable|restart|restart-dry-run|status}" >&2 + exit 2 + ;; +esac