diff --git a/packages/device-connect-agent-tools/README.md b/packages/device-connect-agent-tools/README.md index c3ad08d..ba38157 100644 --- a/packages/device-connect-agent-tools/README.md +++ b/packages/device-connect-agent-tools/README.md @@ -340,7 +340,8 @@ connect( | Variable | Description | |---|---| -| `ZENOH_CONNECT` | Zenoh endpoint (e.g., `tcp/localhost:7447`) | +| `ZENOH_CONNECT` | Zenoh endpoint (e.g., `tcp/localhost:7447`). Keeps D2D presence discovery active — use a direct unicast link to a peer when multicast is blocked | +| `ZENOH_MULTICAST_INTERFACE` | Pin multicast scouting to a NIC (name or IP) on multi-homed hosts where `auto` scouts the wrong interface | | `MESSAGING_BACKEND` | `zenoh` (default), `nats`, or `mqtt` | | `MESSAGING_URLS` | Broker URLs, comma-separated (generic) | | `NATS_URL` | NATS broker URL (when using NATS backend) | @@ -348,7 +349,7 @@ connect( | `NATS_JWT` + `NATS_NKEY_SEED` | Direct JWT auth | | `NATS_TLS_CA_FILE` | CA certificate for TLS | | `TENANT` | Device Connect zone/namespace (default: `"default"`) | -| `DEVICE_CONNECT_DISCOVERY_MODE` | Set to `d2d` to skip registry and discover via presence | +| `DEVICE_CONNECT_DISCOVERY_MODE` | `auto` (default — D2D when backend is Zenoh), `d2d`/`p2p` to force presence discovery, or `infra` to force the registry service | Resolution order: explicit parameter > environment variable > auto-discovery. diff --git a/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py b/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py index 482f5a1..9fadc37 100644 --- a/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py +++ b/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py @@ -238,7 +238,18 @@ def __init__( self._inbox: Dict[str, List[Dict[str, Any]]] = {} self._sync_subs: Dict[str, Any] = {} - # D2D mode: discover devices via presence instead of registry + # D2D mode: discover devices via presence instead of a registry service. + # + # This mirrors device_connect_agent_tools.mcp.bridge._is_d2d_mode so the + # connect() path and the MCP bridge agree: "auto" (the default) means D2D + # whenever the backend is Zenoh — INCLUDING when an explicit ZENOH_CONNECT + # endpoint is supplied. Previously an explicit endpoint silently forced + # registry mode; against a zero-infra device (no registry to answer + # queries) that just timed out and returned [], which is the exact + # failure seen when pinning a direct unicast link to a peer on a + # multi-homed / Wi-Fi network where multicast scouting is unreliable. + # Opt into the registry path explicitly with + # DEVICE_CONNECT_DISCOVERY_MODE=infra. no_explicit_urls = ( not nats_url and not os.getenv("ZENOH_CONNECT") @@ -246,15 +257,20 @@ def __init__( and not os.getenv("NATS_URL") and not os.getenv("NATS_URLS") ) - self._d2d_mode = ( - os.getenv("DEVICE_CONNECT_DISCOVERY_MODE", "").lower() in ("d2d", "p2p") - or (self._backend == "zenoh" and no_explicit_urls) - ) + discovery_mode = os.getenv("DEVICE_CONNECT_DISCOVERY_MODE", "").lower() + if discovery_mode in ("d2d", "p2p"): + self._d2d_mode = True + elif discovery_mode == "infra": + self._d2d_mode = False + else: # "auto" / unset + self._d2d_mode = self._backend == "zenoh" self._d2d_collector = None # lazy-initialized PresenceCollector - # In D2D mode with Zenoh and no explicit URLs, use empty servers (multicast scouting). - # When DEVICE_CONNECT_DISCOVERY_MODE=d2d is forced alongside a router URL (ZENOH_CONNECT), - # keep the router URL so we can still communicate with devices connected to it. + # In D2D mode with Zenoh and no explicit URLs, use empty servers (multicast + # scouting). When an explicit endpoint is given (ZENOH_CONNECT — e.g. a + # direct unicast link to a peer on a network where multicast is blocked), + # keep it so we connect straight to that peer while still discovering it + # via presence. if self._d2d_mode and self._backend == "zenoh" and no_explicit_urls: self._servers = [] diff --git a/packages/device-connect-agent-tools/tests/test_connection_unit.py b/packages/device-connect-agent-tools/tests/test_connection_unit.py index 306933e..09dcb91 100644 --- a/packages/device-connect-agent-tools/tests/test_connection_unit.py +++ b/packages/device-connect-agent-tools/tests/test_connection_unit.py @@ -91,6 +91,59 @@ def test_config_skips_autodiscovery_when_already_set(self, MockConfig, ad_creds, conn.close() +# ── D2D vs registry mode selection ────────────────────────────── + + +class TestD2DModeSelection: + """DeviceConnection should mirror the MCP bridge's _is_d2d_mode logic. + + Regression for the footgun where an explicit ZENOH_CONNECT endpoint + silently dropped out of D2D presence discovery into registry mode — + against a zero-infra device this returned [] after a 30s timeout. + """ + + def _make_conn(self, env, backend="zenoh"): + with patch.object(conn_mod, "_auto_discover_tls", return_value=None), \ + patch.object(conn_mod, "_auto_discover_credentials", return_value=None), \ + patch("device_connect_agent_tools.connection.MessagingConfig") as MockConfig, \ + patch.dict(os.environ, env, clear=True): + mock_cfg = MagicMock() + mock_cfg.backend = backend + mock_cfg.servers = ( + [os.environ["ZENOH_CONNECT"]] if env.get("ZENOH_CONNECT") else [] + ) + mock_cfg.credentials = None + mock_cfg.tls_config = None + MockConfig.return_value = mock_cfg + return conn_mod.DeviceConnection() + + def test_zenoh_no_urls_is_d2d(self): + conn = self._make_conn({}) + assert conn._d2d_mode is True + assert conn._servers == [] # blanked for multicast scouting + conn.close() + + def test_zenoh_with_explicit_connect_stays_d2d(self): + # The footgun: ZENOH_CONNECT alone must NOT drop out of D2D. + conn = self._make_conn({"ZENOH_CONNECT": "tcp/192.168.1.119:7447"}) + assert conn._d2d_mode is True + # Unicast endpoint is kept so we connect straight to the peer. + assert conn._servers == ["tcp/192.168.1.119:7447"] + conn.close() + + def test_infra_mode_opts_out_of_d2d(self): + conn = self._make_conn( + {"ZENOH_CONNECT": "tcp/router:7447", "DEVICE_CONNECT_DISCOVERY_MODE": "infra"} + ) + assert conn._d2d_mode is False + conn.close() + + def test_explicit_d2d_mode_forces_d2d(self): + conn = self._make_conn({"DEVICE_CONNECT_DISCOVERY_MODE": "d2d"}) + assert conn._d2d_mode is True + conn.close() + + # ── Auto-discovery helpers ─────────────────────────────────────── diff --git a/packages/device-connect-edge/README.md b/packages/device-connect-edge/README.md index 6d73b84..dc4c1de 100644 --- a/packages/device-connect-edge/README.md +++ b/packages/device-connect-edge/README.md @@ -162,6 +162,28 @@ DEVICE_CONNECT_DISCOVERY_MODE=d2d ZENOH_CONNECT=tcp/localhost:7447 DEVICE_CONNEC **How it works:** Each device announces its presence (capabilities, identity, status) via `device-connect.{tenant}.{device_id}.presence` messages. Other devices subscribe to a wildcard and maintain an in-memory peer table. Device-to-device RPC works identically to infrastructure mode. +### Multi-homed hosts & unreliable multicast + +D2D discovery relies on Zenoh UDP multicast scouting, which can fail on real-world networks for two reasons: + +- **Multi-homed hosts.** Robots typically carry several interfaces (a private motor-control NIC such as `eth0` 192.168.123.x, plus `docker0`, VPN `utun*`, or Apple `awdl0`). Zenoh's default `interface="auto"` may bind the scout to the wrong NIC, so two peers on the same LAN never form a session — discovery returns `[]` and RPC reports `no responders`. Pin the scout to the LAN interface: + + ```bash + # name or local IP both work + ZENOH_MULTICAST_INTERFACE=wlan0 DEVICE_CONNECT_ALLOW_INSECURE=true python my_device.py + ``` + +- **Multicast blocked entirely** (Wi-Fi AP/client isolation, managed switches). Bypass multicast with a direct unicast link — the device listens on a fixed TCP port and the agent connects straight to it: + + ```bash + # Device (dog): + ZENOH_LISTEN=tcp/0.0.0.0:7447 DEVICE_CONNECT_ALLOW_INSECURE=true python my_device.py + # Agent (user): just point at the device — still discovers via D2D presence + ZENOH_CONNECT=tcp/:7447 DEVICE_CONNECT_ALLOW_INSECURE=true python my_agent.py + ``` + + > As of this release, setting `ZENOH_CONNECT` on an agent keeps D2D presence discovery active. (Previously it silently fell back to registry mode and returned `[]` against a registry-less device unless you also set `DEVICE_CONNECT_DISCOVERY_MODE=d2d`.) Opt into the registry path explicitly with `DEVICE_CONNECT_DISCOVERY_MODE=infra`. + **Trade-offs vs full infrastructure:** | | Full Infrastructure | D2D Mode | diff --git a/packages/device-connect-edge/device_connect_edge/messaging/zenoh_adapter.py b/packages/device-connect-edge/device_connect_edge/messaging/zenoh_adapter.py index bf1c835..357ddc1 100644 --- a/packages/device-connect-edge/device_connect_edge/messaging/zenoh_adapter.py +++ b/packages/device-connect-edge/device_connect_edge/messaging/zenoh_adapter.py @@ -166,6 +166,10 @@ async def connect( **kwargs: Additional options: - peer_mode: bool — force peer mode with scouting (default: False) - listen: list[str] — endpoints to listen on + - multicast_interface: str — network interface name or IP to pin + multicast scouting to (also via ZENOH_MULTICAST_INTERFACE env). + Use on multi-homed hosts where interface="auto" scouts on the + wrong NIC. """ self._reconnect_cb = reconnect_cb self._disconnect_cb = disconnect_cb @@ -201,11 +205,29 @@ async def connect( config_dict["listen"] = {"endpoints": listen_endpoints} # Scouting config — enable multicast + gossip in peer mode + multicast_cfg: Dict[str, Any] = { + "enabled": peer_mode, + "autoconnect": {"router": ["peer", "router"], "peer": ["router", "peer"]}, + } + # Pin the multicast scout to a specific network interface. On + # multi-homed hosts — robots with a separate internal motor-control + # NIC, plus docker0 / VPN (utun) / Apple AWDL interfaces — Zenoh's + # default interface="auto" can bind the scout socket to the wrong + # NIC, so two peers on the same LAN intermittently never form a + # session (discovery returns [] and RPC reports "no responders"). + # Pinning to the LAN interface makes D2D discovery deterministic. + # Accepts an interface name (e.g. "wlan0") or a local IP address. + multicast_interface = kwargs.get("multicast_interface") or os.getenv( + "ZENOH_MULTICAST_INTERFACE" + ) + if multicast_interface: + multicast_cfg["interface"] = multicast_interface + self._logger.info( + "Zenoh multicast scouting pinned to interface: %s", + multicast_interface, + ) config_dict["scouting"] = { - "multicast": { - "enabled": peer_mode, - "autoconnect": {"router": ["peer", "router"], "peer": ["router", "peer"]}, - }, + "multicast": multicast_cfg, "gossip": { "enabled": True, "multihop": os.getenv("ZENOH_GOSSIP_MULTIHOP", "").lower() in ("true", "1", "yes"), diff --git a/packages/device-connect-edge/tests/test_zenoh_adapter.py b/packages/device-connect-edge/tests/test_zenoh_adapter.py index bb9e616..6409f05 100644 --- a/packages/device-connect-edge/tests/test_zenoh_adapter.py +++ b/packages/device-connect-edge/tests/test_zenoh_adapter.py @@ -10,6 +10,7 @@ import asyncio import json +import os from unittest.mock import MagicMock, patch import pytest @@ -197,6 +198,43 @@ async def test_connect_peer_mode_enables_scouting(self, mock_zenoh): assert config_dict["scouting"]["gossip"]["multihop"] is False # Verify d2d_mode flag is set assert adapter._d2d_mode is True + # No interface pinned by default -> let Zenoh auto-select + assert "interface" not in config_dict["scouting"]["multicast"] + + @pytest.mark.asyncio + @patch("device_connect_edge.messaging.zenoh_adapter.zenoh") + @patch("device_connect_edge.messaging.zenoh_adapter._ZENOH_AVAILABLE", True) + async def test_multicast_interface_kwarg_pins_scout(self, mock_zenoh): + session = _make_mock_session() + mock_zenoh.open = MagicMock(return_value=session) + mock_zenoh.Config.from_json5 = MagicMock(return_value=MagicMock()) + + from device_connect_edge.messaging.zenoh_adapter import ZenohAdapter + + adapter = ZenohAdapter() + await adapter.connect(servers=["zenoh://"], multicast_interface="wlan0") + + call_args = mock_zenoh.Config.from_json5.call_args[0][0] + config_dict = json.loads(call_args) + assert config_dict["scouting"]["multicast"]["interface"] == "wlan0" + + @pytest.mark.asyncio + @patch("device_connect_edge.messaging.zenoh_adapter.zenoh") + @patch("device_connect_edge.messaging.zenoh_adapter._ZENOH_AVAILABLE", True) + async def test_multicast_interface_env_pins_scout(self, mock_zenoh): + session = _make_mock_session() + mock_zenoh.open = MagicMock(return_value=session) + mock_zenoh.Config.from_json5 = MagicMock(return_value=MagicMock()) + + from device_connect_edge.messaging.zenoh_adapter import ZenohAdapter + + adapter = ZenohAdapter() + with patch.dict(os.environ, {"ZENOH_MULTICAST_INTERFACE": "eth1"}): + await adapter.connect(servers=["zenoh://"]) + + call_args = mock_zenoh.Config.from_json5.call_args[0][0] + config_dict = json.loads(call_args) + assert config_dict["scouting"]["multicast"]["interface"] == "eth1" @pytest.mark.asyncio @patch("device_connect_edge.messaging.zenoh_adapter.zenoh")