Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions packages/device-connect-agent-tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,27 @@ connect(
)
```

Portal-generated `.creds.json` files can also carry the broker URL and tenant.
When `NATS_CREDENTIALS_FILE` points to one of these bundles, `connect()` reads
the `nats.urls`, `nats.jwt`, `nats.nkey_seed`, optional TLS CA, and top-level
`tenant` fields, so agents and the MCP bridge use the same portal endpoint and
Device Connect namespace without passing those values separately:

```json
{
"tenant": "lab-a",
"nats": {
"urls": ["nats://portal.example:4222"],
"jwt": "...",
"nkey_seed": "..."
}
}
```

```bash
NATS_CREDENTIALS_FILE=./lab-a-agent.creds.json python my_agent.py
```

### Explicit Configuration

```python
Expand All @@ -344,15 +365,15 @@ connect(
| `MESSAGING_BACKEND` | `zenoh` (default), `nats`, or `mqtt` |
| `MESSAGING_URLS` | Broker URLs, comma-separated (generic) |
| `NATS_URL` | NATS broker URL (when using NATS backend) |
| `NATS_CREDENTIALS_FILE` | Path to `.creds.json` file |
| `NATS_CREDENTIALS_FILE` | Path to `.creds.json` file; portal bundles may also provide broker URLs and tenant |
| `NATS_JWT` + `NATS_NKEY_SEED` | Direct JWT auth |
| `NATS_TLS_CA_FILE` | CA certificate for TLS |
| `TENANT` | Device Connect zone/namespace (default: `"default"`) |
| `DEVICE_CONNECT_DISCOVERY_MODE` | Set to `d2d` to skip registry and discover via presence |

Resolution order: explicit parameter > environment variable > auto-discovery.

### Device-to-Device Mode (No Infrastructure)
### Device-to-Device Mode

With no endpoint URLs configured, the discovery tools automatically use D2D presence-based discovery (Zenoh multicast scouting) instead of querying the registry service. No Docker infrastructure needed:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,65 @@ def _auto_discover_credentials() -> Optional[Dict[str, Any]]:
return None


def _portal_credentials_path() -> Optional[str]:
"""Return the portal ``.creds.json`` path from env, if configured."""
for name in ("NATS_CREDENTIALS_FILE", "PORTAL_CREDENTIALS_FILE"):
path = os.getenv(name)
if path and path.endswith(".creds.json") and Path(path).expanduser().is_file():
return str(Path(path).expanduser())
return None


def load_portal_credentials_file(path: str | Path) -> Dict[str, Any]:
"""Load tenant, broker URLs, auth, and TLS from a portal ``.creds.json`` file."""
data = json.loads(Path(path).expanduser().read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"expected JSON object in credentials file: {path}")

nats_config = data.get("nats", {}) if isinstance(data.get("nats"), dict) else {}
urls = nats_config.get("urls", [])
if isinstance(urls, str):
urls = [urls]

auth: Dict[str, Any] = {}
if "jwt" in nats_config:
auth["jwt"] = nats_config["jwt"]
if "nkey_seed" in nats_config:
auth["nkey_seed"] = nats_config["nkey_seed"]

tls_config = None
if "tls_ca_file" in nats_config:
tls_config = {"ca_file": nats_config["tls_ca_file"]}

return {
"tenant": data.get("tenant"),
"device_id": data.get("device_id"),
"urls": [u.strip() for u in urls if isinstance(u, str) and u.strip()],
"auth": auth or None,
"tls": tls_config,
}


def _resolve_portal_credentials() -> Optional[Dict[str, Any]]:
path = _portal_credentials_path()
if not path:
return None
return load_portal_credentials_file(path)


def _backend_for_servers(servers: List[str]) -> Optional[str]:
if not servers:
return None
if all(
url.startswith(("nats://", "tls://", "nats+tls://", "ws://", "wss://"))
for url in servers
):
return "nats"
if all(url.startswith(("tcp/", "udp/", "tls/", "quic/")) for url in servers):
return "zenoh"
return None


def _auto_discover_tls() -> Optional[Dict[str, Any]]:
"""Search well-known paths for the CA certificate."""
root = _find_device_connect_root()
Expand Down Expand Up @@ -201,19 +260,41 @@ class DeviceConnection:
def __init__(
self,
nats_url: Optional[str] = None,
zone: str = "default",
zone: Optional[str] = None,
credentials: Optional[Dict[str, Any]] = None,
tls_config: Optional[Dict[str, Any]] = None,
request_timeout: float = 30.0,
*,
servers: Optional[List[str]] = None,
):
self.zone = zone
portal = _resolve_portal_credentials()

if zone is None:
zone = os.environ.get("TENANT") or (portal or {}).get("tenant")
self.zone = zone or "default"
self._request_timeout = request_timeout

explicit_servers: Optional[List[str]] = None
if servers:
explicit_servers = list(servers)
elif nats_url:
explicit_servers = [nats_url]
elif portal and portal.get("urls"):
explicit_servers = portal["urls"]

explicit_credentials = credentials or (portal or {}).get("auth")
explicit_tls = tls_config or (portal or {}).get("tls")

backend = os.getenv("MESSAGING_BACKEND")
if not backend and explicit_servers:
backend = _backend_for_servers(explicit_servers)

# Resolve config: explicit params -> env vars (via MessagingConfig) -> auto-discovery
config = MessagingConfig(
servers=[nats_url] if nats_url else None,
credentials=credentials,
tls_config=tls_config,
backend=backend,
servers=explicit_servers,
credentials=explicit_credentials,
tls_config=explicit_tls,
)

self._backend = config.backend # "nats", "zenoh", or "mqtt" (auto-detected)
Expand Down Expand Up @@ -642,46 +723,56 @@ def loop(self) -> asyncio.AbstractEventLoop:

def connect(
nats_url: Optional[str] = None,
zone: str = "default",
zone: Optional[str] = None,
credentials: Optional[Dict[str, Any]] = None,
tls_config: Optional[Dict[str, Any]] = None,
request_timeout: float = 30.0,
*,
servers: Optional[List[str]] = None,
) -> None:
"""Initialize the messaging connection.

The backend (NATS, Zenoh, MQTT) is auto-detected from environment
variables or can be set via MESSAGING_BACKEND.

When ``NATS_CREDENTIALS_FILE`` (or ``PORTAL_CREDENTIALS_FILE``) points at a
portal ``.creds.json``, broker URLs, JWT auth, TLS, and tenant are loaded
from that file automatically (same bundle as the MCP bridge).

Resolution order (for each setting):
1. Explicit parameter
2. Environment variable
3. Auto-discovery from well-known paths
2. Portal ``.creds.json`` (URLs, auth, tenant)
3. Environment variable
4. Auto-discovery from well-known paths

Environment variables:
- MESSAGING_BACKEND — "nats", "zenoh", or "mqtt" (auto-detected)
- MESSAGING_URLS — broker URLs (comma-separated)
- ZENOH_CONNECT — Zenoh endpoints (auto-selects zenoh backend)
- NATS_URL — NATS broker URL (legacy)
- NATS_CREDENTIALS_FILE — portal ``.creds.json`` (URLs + JWT + tenant)
- TENANT — Device Connect zone/namespace (default: "default")

Args:
nats_url: Broker URL (works for any backend despite the name).
zone: Device Connect tenant/zone namespace.
zone: Device Connect tenant/zone namespace. When omitted, uses ``TENANT``
env or the tenant field from a portal credentials file.
credentials: Auth credentials dict.
tls_config: TLS configuration dict.
request_timeout: Default timeout for device RPC calls.
servers: Broker URL list (overrides env defaults; use for multi-broker NATS).
"""
global _connection
with _lock:
if _connection is not None:
return
zone = zone or os.environ.get("TENANT", "default")
conn = DeviceConnection(
nats_url=nats_url,
zone=zone,
credentials=credentials,
tls_config=tls_config,
request_timeout=request_timeout,
servers=servers,
)
conn.connect()
_connection = conn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,19 @@ def from_credentials_file(cls, path: str) -> "BridgeConfig":
if "tls_ca_file" in nats_config:
tls_config = {"ca_file": nats_config["tls_ca_file"]}

backend = os.getenv("MESSAGING_BACKEND")
if not backend and urls:
if all(
u.startswith(("nats://", "tls://", "nats+tls://", "ws://", "wss://"))
for u in urls
):
backend = "nats"
elif all(u.startswith(("tcp/", "udp/", "tls/", "quic/")) for u in urls):
backend = "zenoh"

return cls(
messaging_urls=urls,
messaging_backend=backend,
messaging_auth=auth if auth else None,
messaging_tls=tls_config,
tenant=data.get("tenant", "default"),
Expand All @@ -161,7 +172,16 @@ def from_credentials_file(cls, path: str) -> "BridgeConfig":
def get_backend(self) -> str:
"""Determine messaging backend from explicit config or URL scheme."""
if self.messaging_backend:
return self.messaging_backend
return self.messaging_backend.lower()
if self.messaging_urls and all(
u.startswith(("nats://", "tls://", "nats+tls://", "ws://", "wss://"))
for u in self.messaging_urls
):
return "nats"
if self.messaging_urls and all(
u.startswith(("tcp/", "udp/", "tls/", "quic/")) for u in self.messaging_urls
):
return "zenoh"
from device_connect_edge.messaging.config import MessagingConfig
config = MessagingConfig(servers=self.messaging_urls)
return config.backend
Expand Down
73 changes: 73 additions & 0 deletions packages/device-connect-agent-tools/tests/test_connection_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_explicit_url_passed_to_config(self, MockConfig, _ad_creds, _ad_tls):

conn = conn_mod.DeviceConnection(nats_url="nats://myhost:4222")
MockConfig.assert_called_once_with(
backend="nats",
servers=["nats://myhost:4222"],
credentials=None,
tls_config=None,
Expand Down Expand Up @@ -94,6 +95,78 @@ def test_config_skips_autodiscovery_when_already_set(self, MockConfig, ad_creds,
# ── Auto-discovery helpers ───────────────────────────────────────


class TestPortalCredentialsFile:
def test_load_portal_credentials_file(self, tmp_path):
creds = tmp_path / "portal.creds.json"
creds.write_text(
json.dumps(
{
"device_id": "robot-1",
"tenant": "erivan01",
"nats": {
"urls": ["nats://portal.deviceconnect.dev:4222"],
"jwt": "eyJ-test",
"nkey_seed": "SUATEST",
},
}
)
)
meta = conn_mod.load_portal_credentials_file(creds)
assert meta["tenant"] == "erivan01"
assert meta["device_id"] == "robot-1"
assert meta["urls"] == ["nats://portal.deviceconnect.dev:4222"]
assert meta["auth"]["jwt"] == "eyJ-test"

@patch.object(conn_mod, "_auto_discover_tls", return_value=None)
@patch.object(conn_mod, "_auto_discover_credentials", return_value=None)
@patch("device_connect_agent_tools.connection.MessagingConfig")
def test_device_connection_uses_portal_urls_and_tenant(self, MockConfig, _ad_creds, _ad_tls, tmp_path):
creds = tmp_path / "portal.creds.json"
creds.write_text(
json.dumps(
{
"tenant": "erivan01",
"nats": {
"urls": ["nats://portal.deviceconnect.dev:4222"],
"jwt": "j",
"nkey_seed": "s",
},
}
)
)
mock_cfg = MagicMock()
mock_cfg.servers = ["nats://portal.deviceconnect.dev:4222"]
mock_cfg.credentials = {"jwt": "j", "nkey_seed": "s"}
mock_cfg.tls_config = None
mock_cfg.backend = "nats"
MockConfig.return_value = mock_cfg

with patch.dict(os.environ, {"NATS_CREDENTIALS_FILE": str(creds)}, clear=True):
conn = conn_mod.DeviceConnection()
MockConfig.assert_called_once()
kwargs = MockConfig.call_args.kwargs
assert kwargs["servers"] == ["nats://portal.deviceconnect.dev:4222"]
assert kwargs["backend"] == "nats"
assert conn.zone == "erivan01"
conn.close()

@patch.object(conn_mod, "_auto_discover_tls", return_value=None)
@patch.object(conn_mod, "_auto_discover_credentials", return_value=None)
@patch("device_connect_agent_tools.connection.MessagingConfig")
def test_device_connection_zone_from_tenant_env(self, MockConfig, _ad_creds, _ad_tls):
mock_cfg = MagicMock()
mock_cfg.servers = ["nats://localhost:4222"]
mock_cfg.credentials = None
mock_cfg.tls_config = None
mock_cfg.backend = "nats"
MockConfig.return_value = mock_cfg

with patch.dict(os.environ, {"TENANT": "erivan01"}, clear=True):
conn = conn_mod.DeviceConnection()
assert conn.zone == "erivan01"
conn.close()


class TestAutoDiscovery:
def test_find_device_connect_root_from_cwd(self, tmp_path):
"""Should find root when security_infra/credentials exists."""
Expand Down