Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/device-connect-agent-tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,16 @@ connect(

| Variable | Description |
|---|---|
| `ZENOH_CONNECT` | Zenoh endpoint (e.g., `tcp/localhost:7447`) |
| `ZENOH_CONNECT` | Zenoh endpoint (e.g., `tcp/localhost:7447`). Keeps D2D presence discovery active — use a direct unicast link to a peer when multicast is blocked |
| `ZENOH_MULTICAST_INTERFACE` | Pin multicast scouting to a NIC (name or IP) on multi-homed hosts where `auto` scouts the wrong interface |
| `MESSAGING_BACKEND` | `zenoh` (default), `nats`, or `mqtt` |
| `MESSAGING_URLS` | Broker URLs, comma-separated (generic) |
| `NATS_URL` | NATS broker URL (when using NATS backend) |
| `NATS_CREDENTIALS_FILE` | Path to `.creds.json` file |
| `NATS_JWT` + `NATS_NKEY_SEED` | Direct JWT auth |
| `NATS_TLS_CA_FILE` | CA certificate for TLS |
| `TENANT` | Device Connect zone/namespace (default: `"default"`) |
| `DEVICE_CONNECT_DISCOVERY_MODE` | Set to `d2d` to skip registry and discover via presence |
| `DEVICE_CONNECT_DISCOVERY_MODE` | `auto` (default — D2D when backend is Zenoh), `d2d`/`p2p` to force presence discovery, or `infra` to force the registry service |

Resolution order: explicit parameter > environment variable > auto-discovery.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,23 +238,39 @@ def __init__(
self._inbox: Dict[str, List[Dict[str, Any]]] = {}
self._sync_subs: Dict[str, Any] = {}

# D2D mode: discover devices via presence instead of registry
# D2D mode: discover devices via presence instead of a registry service.
#
# This mirrors device_connect_agent_tools.mcp.bridge._is_d2d_mode so the
# connect() path and the MCP bridge agree: "auto" (the default) means D2D
# whenever the backend is Zenoh — INCLUDING when an explicit ZENOH_CONNECT
# endpoint is supplied. Previously an explicit endpoint silently forced
# registry mode; against a zero-infra device (no registry to answer
# queries) that just timed out and returned [], which is the exact
# failure seen when pinning a direct unicast link to a peer on a
# multi-homed / Wi-Fi network where multicast scouting is unreliable.
# Opt into the registry path explicitly with
# DEVICE_CONNECT_DISCOVERY_MODE=infra.
no_explicit_urls = (
not nats_url
and not os.getenv("ZENOH_CONNECT")
and not os.getenv("MESSAGING_URLS")
and not os.getenv("NATS_URL")
and not os.getenv("NATS_URLS")
)
self._d2d_mode = (
os.getenv("DEVICE_CONNECT_DISCOVERY_MODE", "").lower() in ("d2d", "p2p")
or (self._backend == "zenoh" and no_explicit_urls)
)
discovery_mode = os.getenv("DEVICE_CONNECT_DISCOVERY_MODE", "").lower()
if discovery_mode in ("d2d", "p2p"):
self._d2d_mode = True
elif discovery_mode == "infra":
self._d2d_mode = False
else: # "auto" / unset
self._d2d_mode = self._backend == "zenoh"
self._d2d_collector = None # lazy-initialized PresenceCollector

# In D2D mode with Zenoh and no explicit URLs, use empty servers (multicast scouting).
# When DEVICE_CONNECT_DISCOVERY_MODE=d2d is forced alongside a router URL (ZENOH_CONNECT),
# keep the router URL so we can still communicate with devices connected to it.
# In D2D mode with Zenoh and no explicit URLs, use empty servers (multicast
# scouting). When an explicit endpoint is given (ZENOH_CONNECT — e.g. a
# direct unicast link to a peer on a network where multicast is blocked),
# keep it so we connect straight to that peer while still discovering it
# via presence.
if self._d2d_mode and self._backend == "zenoh" and no_explicit_urls:
self._servers = []

Expand Down
53 changes: 53 additions & 0 deletions packages/device-connect-agent-tools/tests/test_connection_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,59 @@ def test_config_skips_autodiscovery_when_already_set(self, MockConfig, ad_creds,
conn.close()


# ── D2D vs registry mode selection ──────────────────────────────


class TestD2DModeSelection:
"""DeviceConnection should mirror the MCP bridge's _is_d2d_mode logic.

Regression for the footgun where an explicit ZENOH_CONNECT endpoint
silently dropped out of D2D presence discovery into registry mode —
against a zero-infra device this returned [] after a 30s timeout.
"""

def _make_conn(self, env, backend="zenoh"):
with patch.object(conn_mod, "_auto_discover_tls", return_value=None), \
patch.object(conn_mod, "_auto_discover_credentials", return_value=None), \
patch("device_connect_agent_tools.connection.MessagingConfig") as MockConfig, \
patch.dict(os.environ, env, clear=True):
mock_cfg = MagicMock()
mock_cfg.backend = backend
mock_cfg.servers = (
[os.environ["ZENOH_CONNECT"]] if env.get("ZENOH_CONNECT") else []
)
mock_cfg.credentials = None
mock_cfg.tls_config = None
MockConfig.return_value = mock_cfg
return conn_mod.DeviceConnection()

def test_zenoh_no_urls_is_d2d(self):
conn = self._make_conn({})
assert conn._d2d_mode is True
assert conn._servers == [] # blanked for multicast scouting
conn.close()

def test_zenoh_with_explicit_connect_stays_d2d(self):
# The footgun: ZENOH_CONNECT alone must NOT drop out of D2D.
conn = self._make_conn({"ZENOH_CONNECT": "tcp/192.168.1.119:7447"})
assert conn._d2d_mode is True
# Unicast endpoint is kept so we connect straight to the peer.
assert conn._servers == ["tcp/192.168.1.119:7447"]
conn.close()

def test_infra_mode_opts_out_of_d2d(self):
conn = self._make_conn(
{"ZENOH_CONNECT": "tcp/router:7447", "DEVICE_CONNECT_DISCOVERY_MODE": "infra"}
)
assert conn._d2d_mode is False
conn.close()

def test_explicit_d2d_mode_forces_d2d(self):
conn = self._make_conn({"DEVICE_CONNECT_DISCOVERY_MODE": "d2d"})
assert conn._d2d_mode is True
conn.close()


# ── Auto-discovery helpers ───────────────────────────────────────


Expand Down
22 changes: 22 additions & 0 deletions packages/device-connect-edge/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,28 @@ DEVICE_CONNECT_DISCOVERY_MODE=d2d ZENOH_CONNECT=tcp/localhost:7447 DEVICE_CONNEC

**How it works:** Each device announces its presence (capabilities, identity, status) via `device-connect.{tenant}.{device_id}.presence` messages. Other devices subscribe to a wildcard and maintain an in-memory peer table. Device-to-device RPC works identically to infrastructure mode.

### Multi-homed hosts & unreliable multicast

D2D discovery relies on Zenoh UDP multicast scouting, which can fail on real-world networks for two reasons:

- **Multi-homed hosts.** Robots typically carry several interfaces (a private motor-control NIC such as `eth0` 192.168.123.x, plus `docker0`, VPN `utun*`, or Apple `awdl0`). Zenoh's default `interface="auto"` may bind the scout to the wrong NIC, so two peers on the same LAN never form a session — discovery returns `[]` and RPC reports `no responders`. Pin the scout to the LAN interface:

```bash
# name or local IP both work
ZENOH_MULTICAST_INTERFACE=wlan0 DEVICE_CONNECT_ALLOW_INSECURE=true python my_device.py
```

- **Multicast blocked entirely** (Wi-Fi AP/client isolation, managed switches). Bypass multicast with a direct unicast link — the device listens on a fixed TCP port and the agent connects straight to it:

```bash
# Device (dog):
ZENOH_LISTEN=tcp/0.0.0.0:7447 DEVICE_CONNECT_ALLOW_INSECURE=true python my_device.py
# Agent (user): just point at the device — still discovers via D2D presence
ZENOH_CONNECT=tcp/<device-ip>:7447 DEVICE_CONNECT_ALLOW_INSECURE=true python my_agent.py
```

> As of this release, setting `ZENOH_CONNECT` on an agent keeps D2D presence discovery active. (Previously it silently fell back to registry mode and returned `[]` against a registry-less device unless you also set `DEVICE_CONNECT_DISCOVERY_MODE=d2d`.) Opt into the registry path explicitly with `DEVICE_CONNECT_DISCOVERY_MODE=infra`.

**Trade-offs vs full infrastructure:**

| | Full Infrastructure | D2D Mode |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ async def connect(
**kwargs: Additional options:
- peer_mode: bool — force peer mode with scouting (default: False)
- listen: list[str] — endpoints to listen on
- multicast_interface: str — network interface name or IP to pin
multicast scouting to (also via ZENOH_MULTICAST_INTERFACE env).
Use on multi-homed hosts where interface="auto" scouts on the
wrong NIC.
"""
self._reconnect_cb = reconnect_cb
self._disconnect_cb = disconnect_cb
Expand Down Expand Up @@ -201,11 +205,29 @@ async def connect(
config_dict["listen"] = {"endpoints": listen_endpoints}

# Scouting config — enable multicast + gossip in peer mode
multicast_cfg: Dict[str, Any] = {
"enabled": peer_mode,
"autoconnect": {"router": ["peer", "router"], "peer": ["router", "peer"]},
}
# Pin the multicast scout to a specific network interface. On
# multi-homed hosts — robots with a separate internal motor-control
# NIC, plus docker0 / VPN (utun) / Apple AWDL interfaces — Zenoh's
# default interface="auto" can bind the scout socket to the wrong
# NIC, so two peers on the same LAN intermittently never form a
# session (discovery returns [] and RPC reports "no responders").
# Pinning to the LAN interface makes D2D discovery deterministic.
# Accepts an interface name (e.g. "wlan0") or a local IP address.
multicast_interface = kwargs.get("multicast_interface") or os.getenv(
"ZENOH_MULTICAST_INTERFACE"
)
if multicast_interface:
multicast_cfg["interface"] = multicast_interface
self._logger.info(
"Zenoh multicast scouting pinned to interface: %s",
multicast_interface,
)
config_dict["scouting"] = {
"multicast": {
"enabled": peer_mode,
"autoconnect": {"router": ["peer", "router"], "peer": ["router", "peer"]},
},
"multicast": multicast_cfg,
"gossip": {
"enabled": True,
"multihop": os.getenv("ZENOH_GOSSIP_MULTIHOP", "").lower() in ("true", "1", "yes"),
Expand Down
38 changes: 38 additions & 0 deletions packages/device-connect-edge/tests/test_zenoh_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import asyncio
import json
import os
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -197,6 +198,43 @@ async def test_connect_peer_mode_enables_scouting(self, mock_zenoh):
assert config_dict["scouting"]["gossip"]["multihop"] is False
# Verify d2d_mode flag is set
assert adapter._d2d_mode is True
# No interface pinned by default -> let Zenoh auto-select
assert "interface" not in config_dict["scouting"]["multicast"]

@pytest.mark.asyncio
@patch("device_connect_edge.messaging.zenoh_adapter.zenoh")
@patch("device_connect_edge.messaging.zenoh_adapter._ZENOH_AVAILABLE", True)
async def test_multicast_interface_kwarg_pins_scout(self, mock_zenoh):
session = _make_mock_session()
mock_zenoh.open = MagicMock(return_value=session)
mock_zenoh.Config.from_json5 = MagicMock(return_value=MagicMock())

from device_connect_edge.messaging.zenoh_adapter import ZenohAdapter

adapter = ZenohAdapter()
await adapter.connect(servers=["zenoh://"], multicast_interface="wlan0")

call_args = mock_zenoh.Config.from_json5.call_args[0][0]
config_dict = json.loads(call_args)
assert config_dict["scouting"]["multicast"]["interface"] == "wlan0"

@pytest.mark.asyncio
@patch("device_connect_edge.messaging.zenoh_adapter.zenoh")
@patch("device_connect_edge.messaging.zenoh_adapter._ZENOH_AVAILABLE", True)
async def test_multicast_interface_env_pins_scout(self, mock_zenoh):
session = _make_mock_session()
mock_zenoh.open = MagicMock(return_value=session)
mock_zenoh.Config.from_json5 = MagicMock(return_value=MagicMock())

from device_connect_edge.messaging.zenoh_adapter import ZenohAdapter

adapter = ZenohAdapter()
with patch.dict(os.environ, {"ZENOH_MULTICAST_INTERFACE": "eth1"}):
await adapter.connect(servers=["zenoh://"])

call_args = mock_zenoh.Config.from_json5.call_args[0][0]
config_dict = json.loads(call_args)
assert config_dict["scouting"]["multicast"]["interface"] == "eth1"

@pytest.mark.asyncio
@patch("device_connect_edge.messaging.zenoh_adapter.zenoh")
Expand Down
Loading