diff --git a/cloudsmith_cli/cli/commands/__init__.py b/cloudsmith_cli/cli/commands/__init__.py index ed80eac5..634add8f 100644 --- a/cloudsmith_cli/cli/commands/__init__.py +++ b/cloudsmith_cli/cli/commands/__init__.py @@ -4,6 +4,7 @@ auth, check, copy, + credential_helper, delete, dependencies, docs, diff --git a/cloudsmith_cli/cli/commands/credential_helper/__init__.py b/cloudsmith_cli/cli/commands/credential_helper/__init__.py new file mode 100644 index 00000000..93e5feae --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/__init__.py @@ -0,0 +1,39 @@ +# Copyright 2026 Cloudsmith Ltd +""" +Credential helper commands for Cloudsmith. + +This module provides credential helper commands for package managers +that follow their respective credential helper protocols. +""" + +import click + +from ..main import main +from .docker import docker as docker_cmd +from .manage import install_cmd, list_cmd, uninstall_cmd + + +@click.group() +def credential_helper(): + """ + Credential helpers for package managers. + + These commands provide credentials for package managers like Docker. + Use ``install`` to set up the on-PATH launcher and configure the package + manager automatically, or run the runtime command directly for debugging. + + Examples: + # Install Docker credential helper + $ cloudsmith credential-helper install docker + + # Test Docker credential helper directly + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + """ + + +credential_helper.add_command(docker_cmd, name="docker") +credential_helper.add_command(install_cmd, name="install") +credential_helper.add_command(uninstall_cmd, name="uninstall") +credential_helper.add_command(list_cmd, name="list") + +main.add_command(credential_helper, name="credential-helper") diff --git a/cloudsmith_cli/cli/commands/credential_helper/docker.py b/cloudsmith_cli/cli/commands/credential_helper/docker.py new file mode 100644 index 00000000..2829fa14 --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/docker.py @@ -0,0 +1,66 @@ +# Copyright 2026 Cloudsmith Ltd +""" +Docker credential helper command. + +Implements the Docker credential helper protocol for Cloudsmith registries. + +See: https://github.com/docker/docker-credential-helpers +""" + +import sys + +import click + +from ....credential_helpers.docker import execute +from ...decorators import common_api_auth_options, resolve_credentials + + +@click.command() +@click.argument("operation", required=False, default="get") +@common_api_auth_options +@resolve_credentials +def docker(opts, operation): + """ + Docker credential helper for Cloudsmith registries. + + Reads a Docker registry server URL from stdin and returns credentials in + JSON format. Implements the full Docker credential helper protocol + (get/store/erase/list). + + Provides credentials for all Cloudsmith Docker registries: ``*.cloudsmith.io``, + ``*.cloudsmith.com``, and any custom domains configured for the organisation + (requires CLOUDSMITH_ORG and a valid API key/token). + + Input (stdin): + Server URL as plain text (e.g. "docker.cloudsmith.io") + + Output (stdout): + JSON: {"Username": "token", "Secret": ""} + + Exit codes: + 0: Success + 1: Error (no credentials available, not a Cloudsmith registry, etc.) + + Examples: + # Manual testing + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + + # Called by Docker via launcher + $ echo "docker.cloudsmith.io" | docker-credential-cloudsmith get + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organisation slug (required for custom domain support) + """ + exit_code, stdout, stderr = execute( + operation, + sys.stdin, + credential=opts.credential, + api_host=opts.api_host, + ) + + if stdout is not None: + click.echo(stdout) + if stderr is not None: + click.echo(stderr, err=True) + sys.exit(exit_code) diff --git a/cloudsmith_cli/cli/commands/credential_helper/manage.py b/cloudsmith_cli/cli/commands/credential_helper/manage.py new file mode 100644 index 00000000..54d3a3ae --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/manage.py @@ -0,0 +1,299 @@ +# Copyright 2026 Cloudsmith Ltd +"""Install/uninstall/list commands for credential helpers. + +Provides ``credential-helper install``, ``credential-helper uninstall``, and +``credential-helper list`` to manage the on-PATH launcher binary and the +``config.json`` entries for each supported credential helper. +""" + +from __future__ import annotations + +import os +import sys + +import click + +from ....credential_helpers.docker.installer import DockerInstaller +from ... import utils +from ...decorators import ( + common_api_auth_options, + common_cli_config_options, + common_cli_output_options, + resolve_credentials, +) + +# --------------------------------------------------------------------------- +# Helper registry — extend here when new helpers are added +# --------------------------------------------------------------------------- + +_INSTALLERS: dict[str, type] = { + "docker": DockerInstaller, +} + + +def _get_installer(name: str): + """Return an instantiated installer for *name*, or exit with a clear error. + + Parameters + ---------- + name: + The helper name as supplied by the user (e.g. ``"docker"``). + + Returns + ------- + DockerInstaller + An instance of the appropriate installer class. + + Raises + ------ + SystemExit + If *name* is not in :data:`_INSTALLERS`. + """ + cls = _INSTALLERS.get(name) + if cls is None: + available = ", ".join(sorted(_INSTALLERS)) + click.echo( + f"Error: unknown helper {name!r}. Available helpers: {available}", + err=True, + ) + sys.exit(1) + return cls() + + +# --------------------------------------------------------------------------- +# install +# --------------------------------------------------------------------------- + + +@click.command("install") +@click.argument("helper") +@click.option( + "--bin-dir", default=None, help="Directory to install the launcher binary." +) +@click.option( + "--domain", + "domains", + multiple=True, + help="Additional registry hostname to configure (repeatable).", +) +@click.option( + "--dry-run", + is_flag=True, + default=False, + help="Show what would be done without making any changes.", +) +@click.option( + "--no-discover", + is_flag=True, + default=False, + help="Disable automatic discovery of custom Docker domains.", +) +@click.option( + "--refresh", + is_flag=True, + default=False, + help="Bypass the custom-domain cache and fetch fresh data from the API.", +) +@click.option( + "--org", + default=None, + help="Cloudsmith organisation slug for custom-domain discovery.", +) +@common_cli_config_options +@common_cli_output_options +@common_api_auth_options +@resolve_credentials +@click.pass_context +def install_cmd( + ctx, + opts, + helper: str, + bin_dir: str | None, + domains: tuple[str, ...], + dry_run: bool, + no_discover: bool, + refresh: bool, + org: str | None, +) -> None: + """Install a credential helper launcher and configure the package manager. + + HELPER is the name of the credential helper to install (e.g. ``docker``). + + Examples: + + \b + # Install Docker credential helper + $ cloudsmith credential-helper install docker + + \b + # Install with a custom domain + $ cloudsmith credential-helper install docker --domain my.registry.example.com + + \b + # Preview without making changes + $ cloudsmith credential-helper install docker --dry-run + + \b + # Disable automatic custom-domain discovery + $ cloudsmith credential-helper install docker --no-discover + """ + installer = _get_installer(helper) + org = org or os.environ.get("CLOUDSMITH_ORG", "").strip() or None + api_key = opts.credential.api_key if opts.credential else None + auth_type = ( + getattr(opts.credential, "auth_type", "api_key") + if opts.credential + else "api_key" + ) + try: + actions = installer.install( + bin_dir=bin_dir, + domains=domains, + dry_run=dry_run, + discover=not no_discover, + refresh=refresh, + org=org, + api_key=api_key, + auth_type=auth_type, + api_host=opts.api_host, + ) + except OSError as exc: + raise click.ClickException( + f"Failed to install {helper!r} credential helper: {exc}" + ) + + use_stderr = utils.should_use_stderr(opts) + warnings = [a for a in actions if a.startswith("WARNING")] + normal = [a for a in actions if not a.startswith("WARNING")] + data = { + "helper": helper, + "dry_run": dry_run, + "actions": normal, + "warnings": warnings, + } + if utils.maybe_print_as_json(opts, data): + return + + if dry_run: + click.echo("Dry run — no changes will be made:", err=use_stderr) + for action in normal: + click.echo(f" {action}" if dry_run else action, err=use_stderr) + for warning in warnings: + click.secho(f" {warning}" if dry_run else warning, err=True, fg="yellow") + + +# --------------------------------------------------------------------------- +# uninstall +# --------------------------------------------------------------------------- + + +@click.command("uninstall") +@click.argument("helper") +@click.option( + "--bin-dir", default=None, help="Directory where the launcher binary was installed." +) +@click.option( + "--dry-run", + is_flag=True, + default=False, + help="Show what would be done without making any changes.", +) +@common_cli_config_options +@common_cli_output_options +@click.pass_context +def uninstall_cmd(ctx, opts, helper: str, bin_dir: str | None, dry_run: bool) -> None: + """Uninstall a credential helper launcher and remove its config entries. + + HELPER is the name of the credential helper to uninstall (e.g. ``docker``). + + Examples: + + \b + # Uninstall Docker credential helper + $ cloudsmith credential-helper uninstall docker + + \b + # Preview without making changes + $ cloudsmith credential-helper uninstall docker --dry-run + """ + installer = _get_installer(helper) + try: + actions = installer.uninstall(bin_dir=bin_dir, dry_run=dry_run) + except OSError as exc: + raise click.ClickException( + f"Failed to uninstall {helper!r} credential helper: {exc}" + ) + + use_stderr = utils.should_use_stderr(opts) + warnings = [a for a in actions if a.startswith("WARNING")] + normal = [a for a in actions if not a.startswith("WARNING")] + data = { + "helper": helper, + "dry_run": dry_run, + "actions": normal, + "warnings": warnings, + } + if utils.maybe_print_as_json(opts, data): + return + + if dry_run: + click.echo("Dry run — no changes will be made:", err=use_stderr) + for action in normal: + click.echo(f" {action}" if dry_run else action, err=use_stderr) + for warning in warnings: + click.secho(f" {warning}" if dry_run else warning, err=True, fg="yellow") + + +# --------------------------------------------------------------------------- +# list +# --------------------------------------------------------------------------- + + +@click.command("list") +@common_cli_config_options +@common_cli_output_options +@click.pass_context +def list_cmd(ctx, opts) -> None: + """List available credential helpers and their installation status. + + Shows which helpers are available, whether their launcher binary is + present on PATH, and which registry hosts they are configured for. + + Example: + + \b + $ cloudsmith credential-helper list + """ + use_stderr = utils.should_use_stderr(opts) + + data = [] + for name, cls in sorted(_INSTALLERS.items()): + installer = cls() + st = installer.status() + data.append( + { + "helper": name, + "summary": installer.summary, + "launcher": st.get("launcher"), + "hosts": st.get("hosts", []), + } + ) + + if utils.maybe_print_as_json(opts, data): + return + + for entry in data: + name = entry["helper"] + launcher = entry["launcher"] + hosts = entry["hosts"] + summary = entry["summary"] + + click.echo(f"{name} ({summary})", err=use_stderr) + if launcher: + click.echo(f" launcher : {launcher}", err=use_stderr) + else: + click.echo(" launcher : not installed", err=use_stderr) + if hosts: + click.echo(f" hosts : {', '.join(hosts)}", err=use_stderr) + else: + click.echo(" hosts : none configured", err=use_stderr) diff --git a/cloudsmith_cli/cli/tests/commands/test_credential_helper.py b/cloudsmith_cli/cli/tests/commands/test_credential_helper.py new file mode 100644 index 00000000..98cf0a5a --- /dev/null +++ b/cloudsmith_cli/cli/tests/commands/test_credential_helper.py @@ -0,0 +1,564 @@ +"""Tests for the `cloudsmith credential-helper docker` command.""" + +import io +import json +from unittest.mock import patch + +import httpretty +import httpretty.core +import pytest + +from ....cli.commands.credential_helper.docker import docker +from ....core.credentials.models import CredentialResult +from ....credential_helpers.backends import BackendKind +from ....credential_helpers.custom_domains import ( + CustomDomain, + get_cache_path, + get_custom_domains, + get_format_domains, + read_cache, + write_cache, +) +from ....credential_helpers.docker.runtime import ( + _REFUSAL_MESSAGE, + execute, + get_credentials as helper_get_credentials, +) + +API_HOST = "https://api.cloudsmith.io" + + +# --------------------------------------------------------------------------- +# 1. runtime.execute — protocol matrix +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "operation,get_return,stdin_text,expected_code,expected_stdout,stderr_substr", + [ + # get-success: get_credentials returns a dict → (0, json, None) + ( + "get", + {"Username": "token", "Secret": "k_abc"}, + "docker.cloudsmith.io", + 0, + json.dumps({"Username": "token", "Secret": "k_abc"}), + None, + ), + # get-refusal: get_credentials returns None → (1, None, refusal) + ("get", None, "evil.example.com", 1, None, "Unable to retrieve credentials"), + # store: drains stdin, returns (0, None, None) + ("store", None, '{"ServerURL": "docker.cloudsmith.io"}', 0, None, None), + # erase: drains stdin, returns (0, None, None) + ("erase", None, '{"ServerURL": "docker.cloudsmith.io"}', 0, None, None), + # list: returns (0, '{}', None) + ("list", None, "", 0, "{}", None), + # unknown: returns (1, None, error containing operation name) + ("frobnicate", None, "", 1, None, "frobnicate"), + ], +) +def test_execute_protocol_matrix( + operation, get_return, stdin_text, expected_code, expected_stdout, stderr_substr +): + """execute() returns the expected (code, stdout, stderr) for each operation.""" + stdin = io.StringIO(stdin_text) + + if operation == "get" and get_return is not None: + with patch( + "cloudsmith_cli.credential_helpers.docker.runtime.get_credentials", + return_value=get_return, + ): + code, stdout, stderr = execute(operation, stdin) + elif operation == "get": + with patch( + "cloudsmith_cli.credential_helpers.docker.runtime.get_credentials", + return_value=get_return, + ): + code, stdout, stderr = execute(operation, stdin) + else: + code, stdout, stderr = execute(operation, stdin) + + assert code == expected_code + assert stdout == expected_stdout + if stderr_substr is None: + assert stderr is None + else: + assert stderr_substr in stderr + + +# --------------------------------------------------------------------------- +# 2. execute get-path boundary guards (broken-pipe + RuntimeError) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("scenario", ["raising_get_credentials", "broken_pipe_stdin"]) +def test_execute_get_boundary_guard(scenario): + """Exceptions inside execute('get', ...) never escape — degrade to (1, None, refusal). + + Retained guards: + - broken-pipe: an OSError from stdin.read() is caught at the protocol boundary. + - RuntimeError from get_credentials is caught the same way. + """ + if scenario == "raising_get_credentials": + stdin = io.StringIO("docker.cloudsmith.io") + with patch( + "cloudsmith_cli.credential_helpers.docker.runtime.get_credentials", + side_effect=RuntimeError("boom"), + ): + code, stdout, stderr = execute("get", stdin) + else: + # broken-pipe guard: stdin.read() itself raises an OSError + class BrokenPipeStdin: + def read(self): + raise OSError("broken pipe") + + credential = CredentialResult(api_key="k_abc", source_name="test") + code, stdout, stderr = execute( + "get", BrokenPipeStdin(), credential=credential, api_host=None + ) + + assert code == 1 + assert stdout is None + assert stderr == _REFUSAL_MESSAGE + + +# --------------------------------------------------------------------------- +# 3. get_credentials +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "server_url,credential,is_cloudsmith_return,expected", + [ + # cloudsmith domain + creds → dict + ( + "docker.cloudsmith.io", + CredentialResult(api_key="k_xyz", source_name="test"), + True, + {"Username": "token", "Secret": "k_xyz"}, + ), + # no credential → None (short-circuits before domain check) + ("docker.cloudsmith.io", None, None, None), + # non-cloudsmith domain → None + ( + "evil.example.com", + CredentialResult(api_key="k_xyz", source_name="test"), + False, + None, + ), + ], +) +def test_get_credentials(server_url, credential, is_cloudsmith_return, expected): + """get_credentials returns a creds dict for valid CS domains, None otherwise.""" + if is_cloudsmith_return is None: + # No domain check needed when credential is absent + result = helper_get_credentials(server_url, credential=credential) + else: + with patch( + "cloudsmith_cli.credential_helpers.docker.runtime.is_cloudsmith_domain", + return_value=is_cloudsmith_return, + ): + result = helper_get_credentials(server_url, credential=credential) + + assert result == expected + + +# --------------------------------------------------------------------------- +# 4. CLI wiring smoke test +# --------------------------------------------------------------------------- + + +def test_cli_no_arg_defaults_to_get(runner): + """Invoking docker with no OPERATION defaults to 'get', emitting creds JSON. + + Proves the click shim is correctly wired to execute(). + """ + fake_creds = {"Username": "token", "Secret": "k_abc"} + + with patch( + "cloudsmith_cli.credential_helpers.docker.runtime.get_credentials", + return_value=fake_creds, + ): + result = runner.invoke( + docker, + args=[], + input="docker.cloudsmith.io", + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert json.dumps(fake_creds) in result.stdout + + +def test_execute_get_empty_stdin_returns_exit_1(): + """execute get with empty stdin → (1, None, 'No server URL...').""" + stdin = io.StringIO("") + code, stdout, stderr = execute("get", stdin) + + assert code == 1 + assert stdout is None + assert "No server URL provided" in stderr + + +# --------------------------------------------------------------------------- +# 5. BackendKind load-bearing values +# --------------------------------------------------------------------------- + + +def test_backend_kind_values(): + """DOCKER == 6 and NPM == 9 are load-bearing (used as integer wire values).""" + assert BackendKind.DOCKER == 6 + assert BackendKind.NPM == 9 + + +# --------------------------------------------------------------------------- +# 6. get_custom_domains — HTTP status matrix +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _cache_dir(tmp_path, monkeypatch): + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + monkeypatch.setattr( + httpretty.core.fakesock.socket, + "shutdown", + lambda self, how: None, + raising=False, + ) + + +@pytest.mark.parametrize( + "status,expect_domains,expect_cached", + [ + # 200 → records built and cached + (200, True, True), + # 402 → [] and cached (feature not enabled) + (402, False, True), + # 404 → [] and cached (org not found) + (404, False, True), + # 403 → [] but NOT cached (may succeed after auth) + (403, False, False), + # 401 → [] but NOT cached (same branch as 403) + (401, False, False), + # 500 → [] and NOT cached + (500, False, False), + ], +) +@httpretty.activate(allow_net_connect=False) +def test_get_custom_domains_status_matrix( + tmp_path, monkeypatch, status, expect_domains, expect_cached +): + """get_custom_domains() caches or not based on HTTP status.""" + # redirect per-test (autouse fixture already set module-level path) + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + + if status == 200: + body = json.dumps( + [ + { + "host": "docker.acme.com", + "backend_kind": 6, + "domain_type": 1, + "enabled": True, + "validated": True, + } + ] + ) + else: + body = json.dumps({"detail": "error"}) + + httpretty.register_uri( + httpretty.GET, + f"{API_HOST}/orgs/acme/custom-domains/", + body=body, + status=status, + content_type="application/json", + ) + + result = get_custom_domains("acme", api_key="k_abc", api_host=API_HOST) + cache = read_cache(get_cache_path("acme")) + + if expect_domains: + assert len(result) == 1 + assert result[0].host == "docker.acme.com" + else: + assert result == [] + + if expect_cached: + assert cache is not None # [] is falsy but not None + else: + assert cache is None + + # For the 200 case specifically, verify the auth header (X-Api-Key) + if status == 200: + assert httpretty.last_request().headers.get("X-Api-Key") == "k_abc" + + +# --------------------------------------------------------------------------- +# 7. get_custom_domains — cache edge cases +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "scenario,expected", + [ + # legacy string-list cache → miss (re-fetch required); retains legacy-cache guard + ("legacy_string_list", None), + # empty [] cache → hit (valid cached 'no domains') + ("empty_list", []), + ], +) +def test_get_custom_domains_cache_edge(tmp_path, monkeypatch, scenario, expected): + """Cache format edge cases: legacy string-list is a miss; empty list is a hit.""" + import time + + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + + cache_path = get_cache_path("acme") + + if scenario == "legacy_string_list": + data = {"domains": ["docker.acme.com"], "cached_at": time.time()} + else: + data = {"domains": [], "cached_at": time.time()} + + cache_path.write_text(json.dumps(data), encoding="utf-8") + assert read_cache(cache_path) == expected + + +# --------------------------------------------------------------------------- +# 8. get_format_domains +# --------------------------------------------------------------------------- + + +@httpretty.activate(allow_net_connect=False) +def test_get_format_domains_filters_correctly(tmp_path, monkeypatch): + """get_format_domains returns only enabled+validated Docker hosts. + + Mixed body: docker enabled+validated (included), npm (excluded), + disabled docker (excluded), unvalidated docker (excluded), backend_kind=None (excluded). + """ + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + body = [ + # Included: Docker, enabled, validated + { + "host": "docker.acme.com", + "backend_kind": 6, + "domain_type": 1, + "enabled": True, + "validated": True, + }, + # Excluded: NPM backend + { + "host": "npm.acme.com", + "backend_kind": 9, + "domain_type": 1, + "enabled": True, + "validated": True, + }, + # Excluded: disabled + { + "host": "docker2.acme.com", + "backend_kind": 6, + "domain_type": 1, + "enabled": False, + "validated": True, + }, + # Excluded: unvalidated + { + "host": "docker3.acme.com", + "backend_kind": 6, + "domain_type": 1, + "enabled": True, + "validated": False, + }, + # Excluded: backend_kind=None (download domain) + { + "host": "dl.acme.com", + "backend_kind": None, + "domain_type": 0, + "enabled": True, + "validated": True, + }, + ] + httpretty.register_uri( + httpretty.GET, + f"{API_HOST}/orgs/acme/custom-domains/", + body=json.dumps(body), + status=200, + content_type="application/json", + ) + + hosts = get_format_domains( + "acme", BackendKind.DOCKER, api_key="k", api_host=API_HOST + ) + + assert hosts == ["docker.acme.com"] + + +# --------------------------------------------------------------------------- +# 9. is_cloudsmith_domain +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "host,env_org,cached_domains,backend_kind,expected", + [ + # Standard *.cloudsmith.io → True (no API) + ("docker.cloudsmith.io", None, None, None, True), + ("dl.cloudsmith.io", None, None, None, True), + # Standard *.cloudsmith.com → True (no API) + ("docker.cloudsmith.com", None, None, None, True), + # Non-cloudsmith → False + ("evil.example.com", None, None, None, False), + # Custom enabled+validated → True + ( + "docker.acme.com", + "acme", + [ + CustomDomain( + host="docker.acme.com", backend_kind=6, enabled=True, validated=True + ) + ], + None, + True, + ), + # Custom disabled → False + ( + "docker.acme.com", + "acme", + [ + CustomDomain( + host="docker.acme.com", + backend_kind=6, + enabled=False, + validated=True, + ) + ], + None, + False, + ), + # Custom enabled but unvalidated → False + ( + "docker.acme.com", + "acme", + [ + CustomDomain( + host="docker.acme.com", + backend_kind=6, + enabled=True, + validated=False, + ) + ], + None, + False, + ), + # backend_kind=DOCKER: docker custom domain → True + ( + "docker.acme.com", + "acme", + [ + CustomDomain( + host="docker.acme.com", backend_kind=6, enabled=True, validated=True + ) + ], + BackendKind.DOCKER, + True, + ), + # backend_kind=DOCKER: npm custom domain → False + ( + "npm.acme.com", + "acme", + [ + CustomDomain( + host="npm.acme.com", backend_kind=9, enabled=True, validated=True + ) + ], + BackendKind.DOCKER, + False, + ), + # UPPERCASE custom host row with backend_kind=DOCKER — guards the + # get_format_domains() casing path (the branch the lowercase fix touched) + ( + "DOCKER.ACME.COM", + "acme", + [ + CustomDomain( + host="docker.acme.com", backend_kind=6, enabled=True, validated=True + ) + ], + BackendKind.DOCKER, + True, + ), + ], +) +def test_is_cloudsmith_domain( + tmp_path, monkeypatch, host, env_org, cached_domains, backend_kind, expected +): + """is_cloudsmith_domain returns correct bool for standard, custom, and edge cases.""" + from ....credential_helpers.common import is_cloudsmith_domain + + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + + if env_org: + monkeypatch.setenv("CLOUDSMITH_ORG", env_org) + else: + monkeypatch.delenv("CLOUDSMITH_ORG", raising=False) + + if cached_domains is not None: + write_cache(get_cache_path(env_org), cached_domains) + + kwargs = {"api_key": "k_abc", "api_host": API_HOST} + if backend_kind is not None: + kwargs["backend_kind"] = backend_kind + + result = is_cloudsmith_domain(host, **kwargs) + assert result is expected + + +# --------------------------------------------------------------------------- +# 10. Docker runtime backend_kind wiring +# --------------------------------------------------------------------------- + + +def test_get_credentials_refuses_npm_custom_domain(tmp_path, monkeypatch): + """get_credentials for an NPM custom domain → None (runtime passes BackendKind.DOCKER). + + Proves the runtime passes backend_kind=DOCKER to is_cloudsmith_domain. + """ + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + monkeypatch.setenv("CLOUDSMITH_ORG", "acme") + + cache_path = get_cache_path("acme") + write_cache( + cache_path, + [ + CustomDomain( + host="npm.acme.com", backend_kind=9, enabled=True, validated=True + ) + ], + ) + + credential = CredentialResult(api_key="k_xyz", source_name="test") + result = helper_get_credentials( + "npm.acme.com", credential=credential, api_host=API_HOST + ) + + assert result is None diff --git a/cloudsmith_cli/cli/tests/commands/test_credential_helper_install.py b/cloudsmith_cli/cli/tests/commands/test_credential_helper_install.py new file mode 100644 index 00000000..60c8037b --- /dev/null +++ b/cloudsmith_cli/cli/tests/commands/test_credential_helper_install.py @@ -0,0 +1,718 @@ +# Copyright 2026 Cloudsmith Ltd +"""Tests for credential-helper install/uninstall/list commands and launchers.""" + +from __future__ import annotations + +import json +import os +import stat +from pathlib import Path +from unittest.mock import patch + +import click.testing +import pytest + +from ....credential_helpers.docker.installer import DockerInstaller +from ....credential_helpers.launchers import ( + _launcher_content, + _launcher_filename, + _user_bin_dir, + is_on_path, + remove_launcher, + resolve_bin_dir, + write_launcher, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def runner(): + """Return a CliRunner.""" + return click.testing.CliRunner() + + +# --------------------------------------------------------------------------- +# 1. launcher filename + content (pure, platform-parameterised) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "windows,expected_name,expected_content", + [ + ( + False, + "docker-credential-cloudsmith", + '#!/bin/sh\nexec cloudsmith credential-helper docker "$@"\n', + ), + ( + True, + "docker-credential-cloudsmith.cmd", + "@echo off\r\ncloudsmith credential-helper docker %*\r\n", + ), + ], +) +def test_launcher_filename_and_content(windows, expected_name, expected_content): + """Per-platform launcher name + body — guards the exact Windows .cmd bytes. + + Parameterised on ``windows`` rather than patching ``os.name`` so no + ``pathlib.Path`` is built under a faked platform (which raises + ``NotImplementedError`` on Python < 3.12). + """ + assert ( + _launcher_filename("docker-credential-cloudsmith", windows=windows) + == expected_name + ) + assert ( + _launcher_content("cloudsmith credential-helper docker", windows=windows) + == expected_content + ) + + +# --------------------------------------------------------------------------- +# 2. write_launcher / remove_launcher — end-to-end on the host platform +# --------------------------------------------------------------------------- + + +def test_write_launcher_writes_executable_script(tmp_path): + """write_launcher writes the shim with content + 0o755 on the host platform.""" + dest = write_launcher( + tmp_path, + "docker-credential-cloudsmith", + "cloudsmith credential-helper docker", + ) + expected = '#!/bin/sh\nexec cloudsmith credential-helper docker "$@"\n' + assert dest.read_text(encoding="utf-8") == expected + assert stat.S_IMODE(dest.stat().st_mode) == 0o755 + + +def test_remove_launcher(tmp_path): + """remove_launcher returns True + file gone when present, False when absent.""" + write_launcher( + tmp_path, + "docker-credential-cloudsmith", + "cloudsmith credential-helper docker", + ) + assert remove_launcher(tmp_path, "docker-credential-cloudsmith") is True + assert not (tmp_path / "docker-credential-cloudsmith").exists() + + # Second call: file is gone now + assert remove_launcher(tmp_path, "docker-credential-cloudsmith") is False + + +# --------------------------------------------------------------------------- +# 3. resolve_bin_dir + user-bin (no os.name faking) +# --------------------------------------------------------------------------- + + +def test_resolve_bin_dir_override(tmp_path): + """An explicit override is returned verbatim.""" + assert resolve_bin_dir(str(tmp_path)) == tmp_path + + +@pytest.mark.parametrize( + "windows,expected_suffix", + [ + (False, (".local", "bin")), + (True, ("Cloudsmith", "bin")), + ], +) +def test_user_bin_dir(tmp_path, monkeypatch, windows, expected_suffix): + """_user_bin_dir returns the per-platform user bin location. + + Parameterised on ``windows`` (not ``os.name``) to avoid building a + ``WindowsPath`` on a posix host. + """ + monkeypatch.setattr(Path, "home", staticmethod(lambda: tmp_path)) + monkeypatch.setenv("LOCALAPPDATA", str(tmp_path)) + result = _user_bin_dir(windows) + assert result.parts[-2:] == expected_suffix + + +# --------------------------------------------------------------------------- +# 4. is_on_path +# --------------------------------------------------------------------------- + + +def test_is_on_path(tmp_path, monkeypatch): + """is_on_path returns True when dir is in PATH, False when absent.""" + monkeypatch.setenv("PATH", str(tmp_path)) + assert is_on_path(tmp_path) is True + + monkeypatch.setenv("PATH", "/usr/bin:/usr/local/bin") + assert is_on_path(tmp_path) is False + + +# --------------------------------------------------------------------------- +# 5. DockerInstaller.install +# --------------------------------------------------------------------------- + + +def test_docker_installer_install(tmp_path, monkeypatch): + """install sets default+extra domains, preserves foreign entries, writes the launcher.""" + docker_dir = tmp_path / ".docker" + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + monkeypatch.setenv("PATH", str(bin_dir)) + + # Seed a config with foreign data that must be preserved + docker_dir.mkdir(parents=True) + (docker_dir / "config.json").write_text( + json.dumps( + { + "auths": {"ghcr.io": {"auth": "dG9rZW4="}}, + "credHelpers": {"ghcr.io": "gh"}, + }, + indent=2, + ) + + "\n", + encoding="utf-8", + ) + + installer = DockerInstaller() + installer.install(bin_dir=str(bin_dir), domains=("my.registry.example.com",)) + + cfg = json.loads((docker_dir / "config.json").read_text()) + + # Default host registered + assert cfg["credHelpers"]["docker.cloudsmith.io"] == "cloudsmith" + # Extra --domain host registered + assert cfg["credHelpers"]["my.registry.example.com"] == "cloudsmith" + # Foreign entries preserved + assert cfg["auths"] == {"ghcr.io": {"auth": "dG9rZW4="}} + assert cfg["credHelpers"]["ghcr.io"] == "gh" + # Launcher written + assert (bin_dir / "docker-credential-cloudsmith").exists() + + +# --------------------------------------------------------------------------- +# 6. install --dry-run +# --------------------------------------------------------------------------- + + +def test_docker_installer_dry_run(tmp_path, monkeypatch): + """dry_run=True: no launcher written, config.json absent, returns planned strings.""" + docker_dir = tmp_path / ".docker" + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + + installer = DockerInstaller() + actions = installer.install(bin_dir=str(bin_dir), dry_run=True) + + assert not (bin_dir / "docker-credential-cloudsmith").exists() + assert not (docker_dir / "config.json").exists() + assert any("would write launcher" in a for a in actions) + assert any("docker.cloudsmith.io" in a for a in actions) + + +# --------------------------------------------------------------------------- +# 7. install idempotent +# --------------------------------------------------------------------------- + + +def test_docker_installer_idempotent(tmp_path, monkeypatch): + """Second install run reports no change (config mtime unchanged, 'already up to date').""" + docker_dir = tmp_path / ".docker" + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + + installer = DockerInstaller() + installer.install(bin_dir=str(bin_dir)) + + mtime_before = (docker_dir / "config.json").stat().st_mtime + actions = installer.install(bin_dir=str(bin_dir)) + mtime_after = (docker_dir / "config.json").stat().st_mtime + + assert mtime_before == mtime_after + assert any("already up to date" in a for a in actions) + + +# --------------------------------------------------------------------------- +# 8. uninstall +# --------------------------------------------------------------------------- + + +def test_docker_installer_uninstall(tmp_path, monkeypatch): + """uninstall removes only cloudsmith entries (foreign kept) + removes launcher. + + Also verifies --bin-dir: install to custom dir, uninstall with same --bin-dir removes it. + """ + docker_dir = tmp_path / ".docker" + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + custom_bin_dir = tmp_path / "custom_bin" + + # Install to a custom bin dir + installer = DockerInstaller() + docker_dir.mkdir(parents=True) + cfg_path = docker_dir / "config.json" + cfg_path.write_text( + json.dumps( + { + "credHelpers": { + "docker.cloudsmith.io": "cloudsmith", + "ghcr.io": "gh", + } + }, + indent=2, + ) + + "\n", + encoding="utf-8", + ) + # Install launcher to custom_bin_dir + installer.install(bin_dir=str(custom_bin_dir)) + launcher = custom_bin_dir / "docker-credential-cloudsmith" + assert launcher.exists(), "Precondition: launcher must exist after install" + + # Uninstall — removes cloudsmith keys and launcher + installer.uninstall(bin_dir=str(custom_bin_dir)) + + cfg = json.loads(cfg_path.read_text()) + assert "docker.cloudsmith.io" not in cfg.get("credHelpers", {}) + assert cfg["credHelpers"]["ghcr.io"] == "gh" + assert not launcher.exists() + + +# --------------------------------------------------------------------------- +# 9. DockerInstaller.status — str-not-Path guard +# --------------------------------------------------------------------------- + + +def test_docker_installer_status_type_contract(tmp_path, monkeypatch): + """status()['launcher'] is str when installed and None when not — never a Path. + + Retained guard: the -F json Path-serialization regression. + """ + docker_dir = tmp_path / ".docker" + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + + installer = DockerInstaller() + + # Before install: launcher is None + with patch( + "cloudsmith_cli.credential_helpers.docker.installer.resolve_bin_dir", + return_value=bin_dir, + ): + result_before = installer.status() + + assert result_before["launcher"] is None + assert not isinstance(result_before["launcher"], Path) + + # After install: launcher is a non-None str + installer.install(bin_dir=str(bin_dir)) + with patch( + "cloudsmith_cli.credential_helpers.docker.installer.resolve_bin_dir", + return_value=bin_dir, + ): + result_after = installer.status() + + launcher = result_after["launcher"] + assert launcher is not None + assert isinstance( + launcher, str + ), f"status()['launcher'] must be str, got {type(launcher).__name__!r}" + assert launcher.endswith("docker-credential-cloudsmith") + assert not isinstance(launcher, Path) + + +# --------------------------------------------------------------------------- +# 10. autodiscovery +# --------------------------------------------------------------------------- + +_INSTALLER_GET_FORMAT_DOMAINS = ( + "cloudsmith_cli.credential_helpers.docker.installer.get_format_domains" +) + + +@pytest.mark.parametrize( + "scenario", + [ + "discovery_on", + "no_discover", + "missing_org", + "missing_api_key", + "discovery_raises", + ], +) +def test_autodiscovery(tmp_path, monkeypatch, scenario): + """Autodiscovery matrix: registered, skipped, defaults-only, or graceful failure. + + Retained guard: graceful-discovery-failure (exception → WARNING, no crash). + """ + docker_dir = tmp_path / ".docker" + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + monkeypatch.setenv("PATH", str(bin_dir)) + + if scenario == "discovery_on": + monkeypatch.setattr( + _INSTALLER_GET_FORMAT_DOMAINS, + lambda *_a, **_kw: ["docker.acme.com"], + ) + installer = DockerInstaller() + actions = installer.install( + bin_dir=str(bin_dir), discover=True, org="acme", api_key="k_test" + ) + cfg = json.loads((docker_dir / "config.json").read_text()) + assert cfg["credHelpers"]["docker.cloudsmith.io"] == "cloudsmith" + assert cfg["credHelpers"]["docker.acme.com"] == "cloudsmith" + assert any("discovered" in a and "1" in a for a in actions) + + elif scenario == "no_discover": + called = [] + + def _should_not_be_called(*_a, **_kw): + called.append(True) + return [] + + monkeypatch.setattr(_INSTALLER_GET_FORMAT_DOMAINS, _should_not_be_called) + installer = DockerInstaller() + installer.install( + bin_dir=str(bin_dir), discover=False, org="acme", api_key="k_test" + ) + assert not called, "get_format_domains must not be called when discover=False" + cfg = json.loads((docker_dir / "config.json").read_text()) + assert "docker.cloudsmith.io" in cfg["credHelpers"] + assert "docker.acme.com" not in cfg["credHelpers"] + + elif scenario in ("missing_org", "missing_api_key"): + called = [] + + def _should_not_be_called(*_a, **_kw): + called.append(True) + return [] + + monkeypatch.setattr(_INSTALLER_GET_FORMAT_DOMAINS, _should_not_be_called) + installer = DockerInstaller() + org = None if scenario == "missing_org" else "acme" + api_key = "k_test" if scenario == "missing_org" else None + installer.install(bin_dir=str(bin_dir), discover=True, org=org, api_key=api_key) + assert ( + not called + ), "get_format_domains must not be called when org/api_key absent" + cfg = json.loads((docker_dir / "config.json").read_text()) + assert cfg["credHelpers"]["docker.cloudsmith.io"] == "cloudsmith" + + else: # discovery_raises — graceful failure guard + + def _raise(*_a, **_kw): + raise RuntimeError("network down") + + monkeypatch.setattr(_INSTALLER_GET_FORMAT_DOMAINS, _raise) + installer = DockerInstaller() + # Must NOT raise + actions = installer.install( + bin_dir=str(bin_dir), discover=True, org="acme", api_key="k_test" + ) + cfg = json.loads((docker_dir / "config.json").read_text()) + assert cfg["credHelpers"]["docker.cloudsmith.io"] == "cloudsmith" + assert (bin_dir / "docker-credential-cloudsmith").exists() + warning_actions = [a for a in actions if a.startswith("WARNING")] + assert warning_actions, f"Expected a WARNING action, got: {actions}" + assert any("network down" in a for a in warning_actions) + + +# --------------------------------------------------------------------------- +# 11. --refresh +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("refresh", [False, True]) +def test_refresh_flag(tmp_path, monkeypatch, refresh): + """refresh=False uses the on-disk cache; refresh=True bypasses it and hits the API.""" + import time + + from ....credential_helpers.custom_domains import ( + CustomDomain, + get_cache_path, + get_custom_domains, + write_cache, + ) + + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + + cache_path = get_cache_path("acme") + cached_domain = CustomDomain( + host="docker.acme.com", backend_kind=6, enabled=True, validated=True + ) + write_cache(cache_path, [cached_domain]) + os.utime(cache_path, (time.time(), time.time())) + + fresh_domain = CustomDomain( + host="new.acme.com", backend_kind=6, enabled=True, validated=True + ) + api_calls = [] + + def _fake_list(*_a, **_kw): + api_calls.append(True) + return [ + { + "host": "new.acme.com", + "backend_kind": 6, + "enabled": True, + "validated": True, + } + ] + + with patch( + "cloudsmith_cli.credential_helpers.custom_domains.list_custom_domains", + _fake_list, + ): + result = get_custom_domains("acme", api_key="k", refresh=refresh) + + if refresh: + # API must have been called + assert api_calls, "API must be called when refresh=True" + assert result == [fresh_domain] + else: + # API must NOT have been called + assert ( + not api_calls + ), "API must not be called when refresh=False with valid cache" + assert result == [cached_domain] + + +# --------------------------------------------------------------------------- +# 12. manage CLI — unknown helper +# --------------------------------------------------------------------------- + + +def test_manage_cli_unknown_helper_exits_nonzero(runner): + """install/uninstall with an unknown helper name exits non-zero with a clear error.""" + from ....cli.commands.credential_helper.manage import install_cmd, uninstall_cmd + + for cmd in (install_cmd, uninstall_cmd): + result = runner.invoke(cmd, ["badhelper"]) + assert result.exit_code != 0 + + +# --------------------------------------------------------------------------- +# 13. manage CLI dry-run +# --------------------------------------------------------------------------- + + +def test_manage_cli_dry_run_exits_0(runner, tmp_path, monkeypatch): + """install docker --no-discover --dry-run exits 0.""" + monkeypatch.setenv("DOCKER_CONFIG", str(tmp_path / ".docker")) + + from ....cli.commands.credential_helper.manage import install_cmd + + result = runner.invoke( + install_cmd, + ["docker", "--no-discover", "--dry-run", "--bin-dir", str(tmp_path / "bin")], + ) + + assert result.exit_code == 0, result.output + assert "would" in result.output.lower() or "dry run" in result.output.lower() + + +# --------------------------------------------------------------------------- +# 14. PATH warning +# --------------------------------------------------------------------------- + + +def test_path_warning_when_bin_dir_not_on_path(tmp_path, monkeypatch): + """install returns a WARNING action when target bin_dir is not on PATH.""" + docker_dir = tmp_path / ".docker" + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + monkeypatch.setenv("PATH", "/usr/bin:/usr/local/bin") + + installer = DockerInstaller() + actions = installer.install(bin_dir=str(bin_dir)) + + warning_actions = [a for a in actions if a.startswith("WARNING")] + assert warning_actions, f"Expected a WARNING action, got: {actions}" + assert any("PATH" in a for a in warning_actions) + + +# --------------------------------------------------------------------------- +# 15. Unwritable dir → clean ClickException (no raw traceback) +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif( + os.name != "posix" or (hasattr(os, "geteuid") and os.geteuid() == 0), + reason="permission test only meaningful on POSIX as non-root", +) +def test_unwritable_bin_dir_gives_click_exception(runner, tmp_path, monkeypatch): + """install with an unwritable --bin-dir exits non-zero as ClickException/SystemExit, not raw OSError.""" + monkeypatch.setenv("DOCKER_CONFIG", str(tmp_path / ".docker")) + + from ....cli.commands.credential_helper.manage import install_cmd + + ro_dir = tmp_path / "readonly" + ro_dir.mkdir() + ro_dir.chmod(0o500) + + try: + result = runner.invoke(install_cmd, ["docker", "--bin-dir", str(ro_dir)]) + finally: + ro_dir.chmod(0o700) + + assert result.exit_code != 0 + assert not isinstance( + result.exception, OSError + ), f"Raw OSError escaped: {result.exception}" + + +# --------------------------------------------------------------------------- +# 16. -F output format +# --------------------------------------------------------------------------- + + +_STUB_STATUS = { + "launcher": "/some/bin/docker-credential-cloudsmith", + "hosts": ["docker.cloudsmith.io"], +} + + +def _stub_status_fn(_self): + return _STUB_STATUS + + +@pytest.mark.parametrize( + "cmd_name,cli_args,expected_helper,expect_dry_run_key", + [ + # install dry-run with -F json + ( + "install_cmd", + [ + "docker", + "--dry-run", + "--no-discover", + "--bin-dir", + "{bin_dir}", + "-F", + "json", + ], + "docker", + True, + ), + # uninstall dry-run with -F json + ( + "uninstall_cmd", + ["docker", "--dry-run", "-F", "json"], + "docker", + True, + ), + # list with -F json + ( + "list_cmd", + ["-F", "json"], + "docker", + False, + ), + ], +) +def test_output_format_json( + runner, + tmp_path, + monkeypatch, + cmd_name, + cli_args, + expected_helper, + expect_dry_run_key, +): + """-F json produces valid parseable JSON with expected top-level data shape. + + Retained guard: list -F json serialises a launcher path (str), not a Path object. + """ + monkeypatch.setenv("DOCKER_CONFIG", str(tmp_path / ".docker")) + monkeypatch.setattr(DockerInstaller, "status", _stub_status_fn) + + from ....cli.commands.credential_helper import manage as manage_mod + + cmd = getattr(manage_mod, cmd_name) + + # Replace {bin_dir} placeholder in args + resolved_args = [a.replace("{bin_dir}", str(tmp_path / "bin")) for a in cli_args] + + result = runner.invoke(cmd, resolved_args, catch_exceptions=False) + + assert result.exit_code == 0, result.output + # Pure JSON on stdout (no human text leaking before the JSON) + assert result.output.strip().startswith( + "{" + ), f"Output does not start with {{: {result.output[:100]!r}" + parsed = json.loads(result.output) + data = parsed["data"] + + if cmd_name == "list_cmd": + assert isinstance(data, list) + entry = next(e for e in data if e["helper"] == expected_helper) + assert "launcher" in entry + assert entry["launcher"] == "/some/bin/docker-credential-cloudsmith" + assert "hosts" in entry + else: + assert data["helper"] == expected_helper + assert isinstance(data["actions"], list) + assert isinstance(data["warnings"], list) + if expect_dry_run_key: + assert data["dry_run"] is True + + +def test_output_format_default_shows_human_text(runner, tmp_path, monkeypatch): + """Default (no -F) install dry-run shows human-readable text, not raw JSON.""" + monkeypatch.setenv("DOCKER_CONFIG", str(tmp_path / ".docker")) + + from ....cli.commands.credential_helper.manage import install_cmd + + result = runner.invoke( + install_cmd, + ["docker", "--dry-run", "--no-discover", "--bin-dir", str(tmp_path / "bin")], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.output + assert "dry run" in result.output.lower() or "would" in result.output.lower() + + +# --------------------------------------------------------------------------- +# 17. Malformed credHelpers robustness +# --------------------------------------------------------------------------- + + +def test_install_coerces_malformed_cred_helpers(tmp_path, monkeypatch): + """install coerces a non-dict credHelpers (list) rather than raising TypeError.""" + docker_dir = tmp_path / ".docker" + docker_dir.mkdir(parents=True) + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + + # Seed config with a malformed credHelpers value (list instead of dict) + (docker_dir / "config.json").write_text( + json.dumps({"credHelpers": ["not", "a", "dict"]}), + encoding="utf-8", + ) + + installer = DockerInstaller() + # Must not raise + installer.install(bin_dir=str(bin_dir), discover=False) + + cfg = json.loads((docker_dir / "config.json").read_text(encoding="utf-8")) + assert isinstance(cfg["credHelpers"], dict) + assert cfg["credHelpers"]["docker.cloudsmith.io"] == "cloudsmith" + + +def test_uninstall_tolerates_malformed_cred_helpers(tmp_path, monkeypatch): + """uninstall treats a non-dict credHelpers (string) as a no-op rather than raising.""" + docker_dir = tmp_path / ".docker" + docker_dir.mkdir(parents=True) + monkeypatch.setenv("DOCKER_CONFIG", str(docker_dir)) + bin_dir = tmp_path / "bin" + + # Seed config with a malformed credHelpers value (string instead of dict) + (docker_dir / "config.json").write_text( + json.dumps({"credHelpers": "garbage"}), + encoding="utf-8", + ) + + installer = DockerInstaller() + # Must not raise + installer.uninstall(bin_dir=str(bin_dir)) diff --git a/cloudsmith_cli/core/api/orgs.py b/cloudsmith_cli/core/api/orgs.py index 32077c8e..1b730401 100644 --- a/cloudsmith_cli/core/api/orgs.py +++ b/cloudsmith_cli/core/api/orgs.py @@ -13,6 +13,23 @@ def get_orgs_api(): return get_api_client(cloudsmith_api.OrgsApi) +def list_custom_domains(owner): + """List custom domains for an organization (raw SDK model dicts).""" + client = get_orgs_api() + + with catch_raise_api_exception(): + ( + domains, + _, + headers, + ) = client.orgs_custom_domains_list_with_http_info( # pylint: disable=no-member + org=owner + ) + + ratelimits.maybe_rate_limit(client, headers) + return [domain.to_dict() for domain in domains] + + def list_vulnerability_policies(owner, page, page_size): """List vulnerability policies in a namespace.""" client = get_orgs_api() diff --git a/cloudsmith_cli/core/cache_utils.py b/cloudsmith_cli/core/cache_utils.py new file mode 100644 index 00000000..c08e9a95 --- /dev/null +++ b/cloudsmith_cli/core/cache_utils.py @@ -0,0 +1,160 @@ +# Copyright 2026 Cloudsmith Ltd +"""Shared utilities for on-disk credential and cache storage.""" + +from __future__ import annotations + +import json +import os +import tempfile +from collections.abc import Callable +from typing import Any + + +def _atomic_write_text(dest: str, text: str, *, mode: int = 0o600) -> None: + """Atomically write *text* to *dest* using a sibling temp file. + + Caller is responsible for ensuring the parent directory exists. + """ + parent = os.path.dirname(dest) or "." + tmp_fd, tmp_path = tempfile.mkstemp(dir=parent, prefix=".tmp_", suffix=".json") + try: + with os.fdopen(tmp_fd, "w", encoding="utf-8") as f: + f.write(text) + f.flush() + os.fsync(f.fileno()) + os.chmod(tmp_path, mode) + os.replace(tmp_path, dest) + except (OSError, TypeError, ValueError): + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + +def atomic_write_json(path: str | os.PathLike, data: Any, *, mode: int = 0o600) -> None: + """Atomically write JSON to a file with restrictive permissions. + + Writes to a sibling temp file, fsyncs, sets mode, then renames over the + destination. Concurrent readers never see a partial file. Temp file is + removed on error. Caller is responsible for ensuring the parent directory + exists. + """ + dest = os.fspath(path) + _atomic_write_text(dest, json.dumps(data), mode=mode) + + +def merge_json_file( + path: str | os.PathLike, + mutate: Callable[[dict], None], + *, + backup: bool = True, + dry_run: bool = False, + mode: int = 0o600, +) -> bool: + """Read a JSON object file, apply *mutate* in place, and atomically write it back. + + Parameters + ---------- + path: + Path to the JSON file (e.g. ``~/.docker/config.json``). + mutate: + Callable that receives the loaded ``dict`` and modifies it in place. + It must not return a value; changes are applied to the dict directly. + backup: + When ``True`` (default) and the file already existed and content will + change, copy the prior file to ``{path}.bak`` before writing. + dry_run: + When ``True``, perform the read + mutate + change-detection but make + **no** writes (no temp file, no ``.bak``, no replace). + mode: + File-permission bits for the written file (default ``0o600``). + + Returns + ------- + bool + ``True`` if the file content changed (or would change under + ``dry_run``), ``False`` otherwise. + + Notes + ----- + * If the file is missing, empty, or does not parse as a JSON object + (``dict``), the starting value is ``{}``. + * Key order is preserved — ``sort_keys`` is **not** used. + * The on-disk form is ``json.dumps(data, indent=2, ensure_ascii=False) + "\\n"``. + * Parent directory is created (mode ``0o700``) if absent. + + Concurrency + ----------- + This is a single-writer, install-time helper (used by + ``credential-helper install/uninstall``). The read-modify-write is + last-writer-wins and is **NOT** safe against concurrent writers mutating + the same file; do not use it on a hot path. The atomic replace guarantees + the file is never left partially written, but concurrent merges can drop + each other's changes. + """ + dest = os.fspath(path) + + # ------------------------------------------------------------------ + # 1. Read existing content + # ------------------------------------------------------------------ + existing_text: str | None = None + try: + with open(dest, encoding="utf-8") as f: + existing_text = f.read() + except FileNotFoundError: + existing_text = None + + # ------------------------------------------------------------------ + # 2. Parse → dict (treat missing/empty/non-dict/malformed as {}) + # ------------------------------------------------------------------ + data: dict = {} + if existing_text: + try: + parsed = json.loads(existing_text) + if isinstance(parsed, dict): + data = parsed + except (json.JSONDecodeError, ValueError): + pass + + # ------------------------------------------------------------------ + # 3. Mutate in place + # ------------------------------------------------------------------ + mutate(data) + + # ------------------------------------------------------------------ + # 4. Stable serialisation + change detection + # ------------------------------------------------------------------ + new_text = json.dumps(data, indent=2, ensure_ascii=False) + "\n" + + if existing_text is not None: + # Normalise existing content for comparison: if the file already has + # the exact canonical form we produce, treat as no-change. + no_change = new_text == existing_text + else: + no_change = False # file didn't exist → always a change + + if no_change: + return False + + if dry_run: + return True + + # ------------------------------------------------------------------ + # 5. Ensure parent directory exists + # ------------------------------------------------------------------ + parent = os.path.dirname(dest) or "." + os.makedirs(parent, mode=0o700, exist_ok=True) + + # ------------------------------------------------------------------ + # 6. Backup (only when file existed and content changes) + # ------------------------------------------------------------------ + if backup and existing_text is not None: + _atomic_write_text(dest + ".bak", existing_text, mode=0o600) + + # ------------------------------------------------------------------ + # 7. Atomic write + # ------------------------------------------------------------------ + _atomic_write_text(dest, new_text, mode=mode) + + return True diff --git a/cloudsmith_cli/core/credentials/oidc/cache.py b/cloudsmith_cli/core/credentials/oidc/cache.py index 183a9615..a0892ca0 100644 --- a/cloudsmith_cli/core/credentials/oidc/cache.py +++ b/cloudsmith_cli/core/credentials/oidc/cache.py @@ -13,6 +13,8 @@ import os import time +from ...cache_utils import atomic_write_json + logger = logging.getLogger(__name__) EXPIRY_MARGIN_SECONDS = 60 @@ -187,9 +189,7 @@ def _store_on_disk(api_host: str, org: str, service_slug: str, data: dict) -> No cache_file = os.path.join(cache_dir, _cache_key(api_host, org, service_slug)) try: - fd = os.open(cache_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) - with os.fdopen(fd, "w") as f: - json.dump(data, f) + atomic_write_json(cache_file, data) logger.debug( "Stored OIDC token on disk (expires_at=%s)", data.get("expires_at") ) diff --git a/cloudsmith_cli/core/tests/test_cache_utils.py b/cloudsmith_cli/core/tests/test_cache_utils.py new file mode 100644 index 00000000..15c32369 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_cache_utils.py @@ -0,0 +1,375 @@ +# Copyright 2026 Cloudsmith Ltd +"""Tests for cloudsmith_cli.core.cache_utils.""" + +from __future__ import annotations + +import json +import os +import stat + +from cloudsmith_cli.core.cache_utils import atomic_write_json, merge_json_file + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _read_json(path: str) -> dict: + with open(path, encoding="utf-8") as f: + return json.load(f) + + +def _read_text(path: str) -> str: + with open(path, encoding="utf-8") as f: + return f.read() + + +def _perms(path: str) -> int: + return stat.S_IMODE(os.stat(path).st_mode) + + +# --------------------------------------------------------------------------- +# atomic_write_json — basic round-trip and permissions +# --------------------------------------------------------------------------- + + +class TestAtomicWriteJson: + def test_round_trip(self, tmp_path): + dest = str(tmp_path / "data.json") + data = {"key": "value", "nested": {"a": 1}} + atomic_write_json(dest, data) + assert _read_json(dest) == data + + def test_default_permissions(self, tmp_path): + dest = str(tmp_path / "data.json") + atomic_write_json(dest, {"x": 1}) + assert _perms(dest) == 0o600 + + def test_custom_permissions(self, tmp_path): + dest = str(tmp_path / "data.json") + atomic_write_json(dest, {"x": 1}, mode=0o644) + assert _perms(dest) == 0o644 + + def test_overwrites_existing(self, tmp_path): + dest = str(tmp_path / "data.json") + atomic_write_json(dest, {"v": 1}) + atomic_write_json(dest, {"v": 2}) + assert _read_json(dest) == {"v": 2} + + +# --------------------------------------------------------------------------- +# merge_json_file +# --------------------------------------------------------------------------- + + +def _add_cred_helper(host: str): + """Return a mutate function that adds a credHelpers entry.""" + + def mutate(data: dict) -> None: + data.setdefault("credHelpers", {})[host] = "cloudsmith" + + return mutate + + +class TestMergeJsonFileForeignKeyPreservation: + """Foreign-key preservation: only touched keys change.""" + + def test_existing_keys_preserved(self, tmp_path): + path = str(tmp_path / "config.json") + initial = { + "auths": {"registry.example.com": {"auth": "dXNlcjpwYXNz"}}, + "credHelpers": {"x.example.com": "y"}, + } + with open(path, "w", encoding="utf-8") as f: + json.dump(initial, f) + + changed = merge_json_file( + path, + _add_cred_helper("docker.cloudsmith.io"), + ) + + assert changed is True + result = _read_json(path) + assert result["auths"] == initial["auths"] + assert result["credHelpers"]["x.example.com"] == "y" + assert result["credHelpers"]["docker.cloudsmith.io"] == "cloudsmith" + + def test_key_order_not_sorted(self, tmp_path): + """Keys must stay in insertion order, not sorted.""" + path = str(tmp_path / "config.json") + # Write with z-first order via json module (which preserves insertion order) + with open(path, "w", encoding="utf-8") as f: + json.dump({"zzz": 1, "aaa": 2}, f) + + def noop(data: dict) -> None: + data["new_key"] = 3 + + merge_json_file(path, noop) + text = _read_text(path) + assert text.index('"zzz"') < text.index('"aaa"'), "Key order must be preserved" + + +class TestMergeJsonFileCreatesMissingFile: + """Creates a new file starting from {} when the file is absent.""" + + def test_creates_file_when_missing(self, tmp_path): + path = str(tmp_path / "subdir" / "config.json") + changed = merge_json_file(path, _add_cred_helper("docker.cloudsmith.io")) + assert changed is True + assert os.path.exists(path) + result = _read_json(path) + assert result == {"credHelpers": {"docker.cloudsmith.io": "cloudsmith"}} + + def test_creates_parent_directory(self, tmp_path): + path = str(tmp_path / "missing_dir" / "config.json") + assert not os.path.exists(os.path.dirname(path)) + merge_json_file(path, _add_cred_helper("x")) + assert os.path.isdir(os.path.dirname(path)) + + def test_parent_dir_permissions(self, tmp_path): + path = str(tmp_path / "newdir" / "config.json") + merge_json_file(path, _add_cred_helper("x")) + parent_perms = _perms(os.path.dirname(path)) + assert parent_perms == 0o700 + + def test_file_permissions_after_create(self, tmp_path): + path = str(tmp_path / "newdir" / "config.json") + merge_json_file(path, _add_cred_helper("x")) + assert _perms(path) == 0o600 + + +class TestMergeJsonFileBackup: + """Backup behaviour: .bak is created with prior content when writing.""" + + def test_backup_created_on_change(self, tmp_path): + path = str(tmp_path / "config.json") + initial = {"auths": {}} + with open(path, "w", encoding="utf-8") as f: + json.dump(initial, f) + + merge_json_file(path, _add_cred_helper("docker.cloudsmith.io")) + + bak_path = path + ".bak" + assert os.path.exists(bak_path), ".bak file should exist after a change" + assert _read_json(bak_path) == initial + + def test_no_backup_when_file_missing(self, tmp_path): + path = str(tmp_path / "config.json") + merge_json_file(path, _add_cred_helper("x")) + assert not os.path.exists(path + ".bak") + + def test_no_backup_when_no_change(self, tmp_path): + path = str(tmp_path / "config.json") + with open(path, "w", encoding="utf-8") as f: + f.write(json.dumps({"credHelpers": {"x": "cloudsmith"}}, indent=2) + "\n") + + def noop_already_set(data: dict) -> None: + data.setdefault("credHelpers", {})["x"] = "cloudsmith" + + changed = merge_json_file(path, noop_already_set) + assert changed is False + assert not os.path.exists(path + ".bak") + + def test_backup_is_mode_0o600_regardless_of_source_perms(self, tmp_path): + """The .bak must always be written 0o600 even when the source is 0o644.""" + path = str(tmp_path / "config.json") + with open(path, "w", encoding="utf-8") as f: + json.dump({"auths": {}}, f) + os.chmod(path, 0o644) + + merge_json_file(path, _add_cred_helper("docker.cloudsmith.io")) + + bak_path = path + ".bak" + assert os.path.exists(bak_path), ".bak must be created" + assert ( + _perms(bak_path) == 0o600 + ), f".bak perms should be 0o600, got {oct(_perms(bak_path))}" + + +class TestMergeJsonFileIdempotent: + """Running the same merge twice: second call returns False, no new .bak.""" + + def test_idempotent_returns_false_second_call(self, tmp_path): + path = str(tmp_path / "config.json") + mutate = _add_cred_helper("docker.cloudsmith.io") + + first = merge_json_file(path, mutate) + assert first is True + + second = merge_json_file(path, mutate) + assert second is False + + def test_idempotent_no_overwrite_bak(self, tmp_path): + path = str(tmp_path / "config.json") + initial = {"auths": {}} + with open(path, "w", encoding="utf-8") as f: + json.dump(initial, f) + + mutate = _add_cred_helper("docker.cloudsmith.io") + merge_json_file(path, mutate) # first: changes file, writes .bak + + bak_path = path + ".bak" + bak_mtime_after_first = os.path.getmtime(bak_path) + + merge_json_file(path, mutate) # second: no change + + bak_mtime_after_second = os.path.getmtime(bak_path) + assert ( + bak_mtime_after_first == bak_mtime_after_second + ), ".bak must not be refreshed" + + +class TestMergeJsonFileDryRun: + """dry_run=True: correct return value, no file written.""" + + def test_dry_run_returns_true_when_would_change(self, tmp_path): + path = str(tmp_path / "config.json") + with open(path, "w", encoding="utf-8") as f: + json.dump({}, f) + + result = merge_json_file(path, _add_cred_helper("x"), dry_run=True) + assert result is True + + def test_dry_run_file_unchanged(self, tmp_path): + path = str(tmp_path / "config.json") + original_text = json.dumps({}) + "\n" + # Write exactly the content we'll compare against + with open(path, "w", encoding="utf-8") as f: + f.write(json.dumps({"existing": True}, indent=2) + "\n") + original_text = _read_text(path) + + merge_json_file(path, _add_cred_helper("x"), dry_run=True) + + assert _read_text(path) == original_text, "dry_run must not modify the file" + + def test_dry_run_no_bak_created(self, tmp_path): + path = str(tmp_path / "config.json") + with open(path, "w", encoding="utf-8") as f: + json.dump({"existing": True}, f) + + merge_json_file(path, _add_cred_helper("x"), dry_run=True) + assert not os.path.exists(path + ".bak") + + def test_dry_run_returns_false_when_no_change(self, tmp_path): + path = str(tmp_path / "config.json") + # Pre-populate so mutate produces no change + with open(path, "w", encoding="utf-8") as f: + f.write(json.dumps({"credHelpers": {"x": "cloudsmith"}}, indent=2) + "\n") + + def already_set(data: dict) -> None: + data.setdefault("credHelpers", {})["x"] = "cloudsmith" + + result = merge_json_file(path, already_set, dry_run=True) + assert result is False + + def test_dry_run_missing_file_no_creation(self, tmp_path): + path = str(tmp_path / "ghost" / "config.json") + result = merge_json_file(path, _add_cred_helper("x"), dry_run=True) + assert result is True + assert not os.path.exists(path) + assert not os.path.exists(os.path.dirname(path)) + + +class TestMergeJsonFileMalformedInput: + """Malformed file → treated as {}.""" + + def test_malformed_json_treated_as_empty(self, tmp_path): + path = str(tmp_path / "config.json") + with open(path, "w", encoding="utf-8") as f: + f.write("not json") + + changed = merge_json_file(path, _add_cred_helper("docker.cloudsmith.io")) + assert changed is True + result = _read_json(path) + assert result == {"credHelpers": {"docker.cloudsmith.io": "cloudsmith"}} + + def test_empty_file_treated_as_empty_dict(self, tmp_path): + path = str(tmp_path / "config.json") + with open(path, "w", encoding="utf-8"): + pass # touch / create empty file + + merge_json_file(path, _add_cred_helper("x")) + result = _read_json(path) + assert "credHelpers" in result + + def test_json_array_treated_as_empty_dict(self, tmp_path): + path = str(tmp_path / "config.json") + with open(path, "w", encoding="utf-8") as f: + json.dump([1, 2, 3], f) + + merge_json_file(path, _add_cred_helper("x")) + result = _read_json(path) + assert isinstance(result, dict) + assert "credHelpers" in result + + +class TestMergeJsonFileStableSerialization: + """On-disk form is json.dumps(data, indent=2, ensure_ascii=False) + newline.""" + + def test_output_format(self, tmp_path): + path = str(tmp_path / "config.json") + merge_json_file(path, _add_cred_helper("docker.cloudsmith.io")) + text = _read_text(path) + expected = json.dumps( + {"credHelpers": {"docker.cloudsmith.io": "cloudsmith"}}, + indent=2, + ensure_ascii=False, + ) + assert text == expected + "\n" + + def test_trailing_newline(self, tmp_path): + path = str(tmp_path / "config.json") + merge_json_file(path, _add_cred_helper("x")) + text = _read_text(path) + assert text.endswith("\n") + + def test_non_ascii_host_raw_utf8_not_escaped(self, tmp_path): + """Non-ASCII chars must be written as raw UTF-8, not \\uXXXX escapes.""" + path = str(tmp_path / "config.json") + unicode_host = "café.docker.example.com" + mutate = _add_cred_helper(unicode_host) + + # First call: file is created (content changes → True) + first = merge_json_file(path, mutate) + assert first is True + + # The written file must contain the raw Unicode character + with open(path, "rb") as fh: + raw_bytes = fh.read() + assert "café".encode() in raw_bytes, "expected raw UTF-8, not \\uXXXX" + assert ( + b"\\u" not in raw_bytes + ), "must not use JSON unicode escapes for non-ASCII" + + # Second call: identical mutate → no change (idempotent) + bak_path = path + ".bak" + bak_mtime_before = ( + os.path.getmtime(bak_path) if os.path.exists(bak_path) else None + ) + + second = merge_json_file(path, mutate) + assert second is False + + # .bak must not have been touched on the no-op call + if bak_mtime_before is not None: + assert ( + os.path.getmtime(bak_path) == bak_mtime_before + ), ".bak must not refresh" + else: + assert not os.path.exists(bak_path), ".bak must not be created on no-op" + + +class TestMergeJsonFileReturnValue: + """Return True on change, False on no-change.""" + + def test_returns_true_on_actual_write(self, tmp_path): + path = str(tmp_path / "config.json") + result = merge_json_file(path, _add_cred_helper("x")) + assert result is True + + def test_returns_false_on_no_change(self, tmp_path): + path = str(tmp_path / "config.json") + mutate = _add_cred_helper("x") + merge_json_file(path, mutate) + result = merge_json_file(path, mutate) + assert result is False diff --git a/cloudsmith_cli/credential_helpers/__init__.py b/cloudsmith_cli/credential_helpers/__init__.py new file mode 100644 index 00000000..de2fb44d --- /dev/null +++ b/cloudsmith_cli/credential_helpers/__init__.py @@ -0,0 +1,7 @@ +# Copyright 2026 Cloudsmith Ltd +""" +Credential helpers for various package managers. + +This package provides credential helper implementations for Docker, pip, npm, etc. +Each helper follows its respective package manager's credential helper protocol. +""" diff --git a/cloudsmith_cli/credential_helpers/backends.py b/cloudsmith_cli/credential_helpers/backends.py new file mode 100644 index 00000000..d6c32b25 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/backends.py @@ -0,0 +1,41 @@ +# Copyright 2026 Cloudsmith Ltd +"""Backend-kind enumeration for credential helpers.""" + +from enum import IntEnum + + +class BackendKind(IntEnum): + """Mirror of the server-side BackendKind enum (cloudsmith/package/enums.py).""" + + DEB = 0 + RPM = 1 + RUBY = 2 + PYTHON = 3 + MAVEN = 4 + BOWER = 5 + DOCKER = 6 + RAW = 7 + CHOCOLATEY = 8 + NPM = 9 + NUGET = 10 + VAGRANT = 11 + COMPOSER = 12 + ALPINE = 13 + HELM = 14 + CONAN = 15 + CARGO = 16 + LUAROCKS = 17 + CRAN = 18 + GO = 19 + DART = 20 + COCOAPODS = 21 + TERRAFORM = 22 + P2 = 23 + CONDA = 24 + HEX = 25 + SWIFT = 26 + HUGGINGFACE = 27 + GENERIC = 28 + VSX = 29 + MCP = 30 + DEFAULT = 99 diff --git a/cloudsmith_cli/credential_helpers/common.py b/cloudsmith_cli/credential_helpers/common.py new file mode 100644 index 00000000..f7bcf85b --- /dev/null +++ b/cloudsmith_cli/credential_helpers/common.py @@ -0,0 +1,111 @@ +# Copyright 2026 Cloudsmith Ltd +""" +Shared utilities for credential helpers. + +Provides domain checking used by all credential helpers. +""" + +import logging +import os + +from .custom_domains import get_custom_domains, get_format_domains + +logger = logging.getLogger(__name__) + + +def extract_hostname(url): + """ + Extract bare hostname from any URL format. + + Handles protocols, sparse+ prefix, ports, paths, and trailing slashes. + + Args: + url: URL in any format (e.g., "sparse+https://cargo.cloudsmith.io/org/repo/") + + Returns: + str: Lowercase hostname (e.g., "cargo.cloudsmith.io") + """ + if not url: + return "" + + normalized = url.lower().strip() + + # Remove sparse+ prefix (Cargo) + if normalized.startswith("sparse+"): + normalized = normalized[7:] + + # Remove protocol + if "://" in normalized: + normalized = normalized.split("://", 1)[1] + + # Remove userinfo (user@host) + if "@" in normalized.split("/")[0]: + normalized = normalized.split("@", 1)[1] + + # Extract hostname (before first / or :) + hostname = normalized.split("/")[0].split(":")[0] + + return hostname + + +def is_cloudsmith_domain( + url, api_key=None, auth_type="api_key", api_host=None, backend_kind=None +): + """ + Check if a URL points to a Cloudsmith service. + + Checks standard *.cloudsmith.io domains first (no auth needed). + If not a standard domain, queries the Cloudsmith API for custom domains. + + Args: + url: URL or hostname to check + api_key: API key/token for authenticating custom domain lookups + auth_type: "api_key" (X-Api-Key header) or "bearer" (Authorization: Bearer) + api_host: Cloudsmith API host URL + backend_kind: If given, custom domains only match when their backend_kind + equals it (standard *.cloudsmith.io domains always match regardless). + When None (default), any enabled+validated custom domain matches. + + Returns: + bool: True if this is a Cloudsmith domain + """ + hostname = extract_hostname(url) + if not hostname: + return False + + # Standard Cloudsmith domains — no auth needed, always match regardless of backend_kind + if ( + hostname in ("cloudsmith.io", "cloudsmith.com") + or hostname.endswith(".cloudsmith.io") + or hostname.endswith(".cloudsmith.com") + ): + return True + + # Custom domains require org + auth + org = os.environ.get("CLOUDSMITH_ORG", "").strip() + if not org: + return False + + if not api_key: + return False + + if backend_kind is not None: + hosts = { + host.lower() + for host in get_format_domains( + org, + backend_kind, + api_key=api_key, + auth_type=auth_type, + api_host=api_host, + ) + } + else: + hosts = { + d.host.lower() + for d in get_custom_domains( + org, api_key=api_key, auth_type=auth_type, api_host=api_host + ) + if d.enabled and d.validated + } + return hostname in hosts diff --git a/cloudsmith_cli/credential_helpers/custom_domains.py b/cloudsmith_cli/credential_helpers/custom_domains.py new file mode 100644 index 00000000..e854b2d0 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/custom_domains.py @@ -0,0 +1,281 @@ +# Copyright 2026 Cloudsmith Ltd +""" +Helper for discovering Cloudsmith custom domains. + +This module provides functions to fetch custom domains from the Cloudsmith API +for use in credential helpers. Results are cached on the filesystem. +""" + +import json +import logging +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + +from ..cli.config import get_default_config_path +from ..core.api.exceptions import ApiException +from ..core.api.init import initialise_api +from ..core.api.orgs import list_custom_domains +from ..core.cache_utils import atomic_write_json +from ..core.credentials.models import CredentialResult + +logger = logging.getLogger(__name__) + +# Cache custom domains for 1 hour +CACHE_TTL_SECONDS = 3600 + + +@dataclass(frozen=True) +class CustomDomain: + """A structured Cloudsmith custom domain record.""" + + host: str + backend_kind: int | None + enabled: bool + validated: bool + + +def get_cache_dir() -> Path: + """ + Get the cache directory for custom domains. + """ + cache_dir = Path(get_default_config_path()) / "custom_domains_cache" + cache_dir.mkdir(mode=0o700, parents=True, exist_ok=True) + return cache_dir + + +def get_cache_path(org: str) -> Path: + """ + Get the cache file path for an organization's custom domains. + + Args: + org: Organization slug + + Returns: + Path to cache file + """ + cache_dir = get_cache_dir() + safe_org = "".join(c if c.isalnum() or c in "-_" else "_" for c in org) + return cache_dir / f"{safe_org}.json" + + +def is_cache_valid(cache_path: Path) -> bool: + """ + Check if a cache file exists and is still valid. + + Args: + cache_path: Path to cache file + + Returns: + bool: True if cache exists and hasn't expired + """ + if not cache_path.exists(): + return False + + try: + mtime = cache_path.stat().st_mtime + age = time.time() - mtime + return age < CACHE_TTL_SECONDS + except OSError: + return False + + +def read_cache(cache_path: Path) -> list[CustomDomain] | None: + """ + Read custom domains from cache file. + + Args: + cache_path: Path to cache file + + Returns: + List of CustomDomain records or None if cache invalid/missing + """ + if not is_cache_valid(cache_path): + return None + + try: + with open(cache_path, encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict) and "domains" in data: + domains = data["domains"] + if isinstance(domains, list): + # Detect legacy format: non-empty list of strings (old build stored + # domains as plain strings, not dicts). Treat as a cache miss so the + # caller re-fetches and rewrites in the current dict format. + if domains and not any(isinstance(d, dict) for d in domains): + logger.debug( + "Stale string-format cache detected at %s, treating as miss", + cache_path, + ) + return None + + records = [ + CustomDomain( + host=d["host"], + backend_kind=d.get("backend_kind"), + enabled=bool(d.get("enabled", False)), + validated=bool(d.get("validated", False)), + ) + for d in domains + if isinstance(d, dict) and d.get("host") + ] + logger.debug( + "Read %d domains from cache: %s", len(records), cache_path + ) + return records + except (OSError, json.JSONDecodeError) as exc: + logger.debug("Failed to read cache %s: %s", cache_path, exc) + + return None + + +def write_cache(cache_path: Path, domains: list[CustomDomain]) -> None: + """Write custom domains to cache file.""" + data = { + "domains": [ + { + "host": d.host, + "backend_kind": d.backend_kind, + "enabled": d.enabled, + "validated": d.validated, + } + for d in domains + ], + "cached_at": time.time(), + } + try: + atomic_write_json(cache_path, data) + logger.debug("Wrote %d domains to cache: %s", len(domains), cache_path) + except OSError as exc: + logger.debug("Failed to write cache %s: %s", cache_path, exc) + + +def get_custom_domains( # pylint: disable=too-many-return-statements + org: str, + *, + api_key: str | None = None, + auth_type: str = "api_key", + api_host: str | None = None, + refresh: bool = False, +) -> list[CustomDomain]: + """ + Fetch custom domains for a Cloudsmith organization. + + Results are cached on the filesystem for 1 hour to avoid excessive API calls. + + Args: + org: Organization slug + api_key: Optional API key/token for authentication + auth_type: "api_key" (uses X-Api-Key header) or "bearer" (uses Authorization: Bearer) + api_host: Cloudsmith API host URL (including version). Taken from the SDK + configuration default when not provided. + refresh: When ``True``, skip the cache read and always fetch from the API. + The fresh result is still written to the cache. + + Returns: + List of CustomDomain records. + Empty list if API call fails or org has no custom domains. + + Note: + Only ``ApiException`` is handled here (per-status). Network-layer errors + (DNS/timeout/SSL/urllib3) are intentionally NOT caught — they propagate to + the caller. The credential-helper protocol boundary (the click command) and + the installer are responsible for catching broadly and refusing gracefully, + so the library stays free of bare ``except Exception`` (reviewer feedback). + """ + cache_path = get_cache_path(org) + cached = None if refresh else read_cache(cache_path) + if cached is not None: + logger.debug("Using cached custom domains for %s", org) + return cached + + logger.debug("Fetching custom domains from API for %s", org) + + normalized_auth_type: Literal["api_key", "bearer"] = ( + "bearer" if auth_type == "bearer" else "api_key" + ) + credential = ( + CredentialResult( + api_key=api_key, + source_name="credential-helper", + auth_type=normalized_auth_type, + ) + if api_key + else None + ) + initialise_api(host=api_host, credential=credential) + + try: + raw_domains = list_custom_domains(org) + except ApiException as exc: + if exc.status in (401, 403): + # Don't cache auth failures - might work later once authenticated. + logger.debug( + "Custom domains API requires auth - assuming no custom domains for %s", + org, + ) + return [] + + if exc.status == 404: + # Cache empty result to avoid repeated lookups for a missing org. + logger.debug("Organization %s not found or has no custom domains", org) + write_cache(cache_path, []) + return [] + + if exc.status == 402: + # Custom domains product feature not enabled - treat as none. + logger.debug("Custom domains not enabled for %s", org) + write_cache(cache_path, []) + return [] + + logger.debug("Failed to fetch custom domains for %s: HTTP %s", org, exc.status) + return [] + + records = [ + CustomDomain( + host=d["host"], + backend_kind=d.get("backend_kind"), + enabled=bool(d.get("enabled", False)), + validated=bool(d.get("validated", False)), + ) + for d in raw_domains + if d.get("host") + ] + + logger.debug("Fetched %d custom domains for %s", len(records), org) + write_cache(cache_path, records) + return records + + +def get_format_domains( + org: str, + backend_kind: int, + *, + api_key: str | None = None, + auth_type: str = "api_key", + api_host: str | None = None, + refresh: bool = False, +) -> list[str]: + """ + Return enabled and validated custom domain hostnames for a specific backend format. + + Args: + org: Organization slug + backend_kind: BackendKind int value (e.g. BackendKind.DOCKER == 6) + api_key: Optional API key/token for authentication + auth_type: "api_key" or "bearer" + api_host: Cloudsmith API host URL + refresh: When ``True``, bypass the cache and fetch fresh data from the API. + + Returns: + List of hostnames that are enabled, validated, and match the given backend_kind. + """ + domains = get_custom_domains( + org, api_key=api_key, auth_type=auth_type, api_host=api_host, refresh=refresh + ) + return [ + d.host + for d in domains + if d.backend_kind == int(backend_kind) and d.enabled and d.validated + ] diff --git a/cloudsmith_cli/credential_helpers/docker/__init__.py b/cloudsmith_cli/credential_helpers/docker/__init__.py new file mode 100644 index 00000000..76414023 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2026 Cloudsmith Ltd +from .runtime import execute, get_credentials + +__all__ = ["execute", "get_credentials"] diff --git a/cloudsmith_cli/credential_helpers/docker/installer.py b/cloudsmith_cli/credential_helpers/docker/installer.py new file mode 100644 index 00000000..2609ed28 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/installer.py @@ -0,0 +1,331 @@ +# Copyright 2026 Cloudsmith Ltd +"""Installer for the Docker credential helper. + +Manages writing/removing the ``docker-credential-cloudsmith`` launcher and +patching ``~/.docker/config.json`` to enable the helper for Cloudsmith +registry hosts. +""" + +from __future__ import annotations + +import json +import logging +import os +from pathlib import Path + +from ...core.cache_utils import merge_json_file +from ..backends import BackendKind +from ..custom_domains import get_format_domains +from ..launchers import is_on_path, remove_launcher, resolve_bin_dir, write_launcher + +logger = logging.getLogger(__name__) + + +def _docker_config_path() -> Path: + """Return the path to the Docker client configuration file. + + Respects the ``DOCKER_CONFIG`` environment variable; otherwise returns + the platform default ``~/.docker/config.json``. + """ + docker_config_env = os.environ.get("DOCKER_CONFIG") + if docker_config_env: + return Path(docker_config_env) / "config.json" + return Path.home() / ".docker" / "config.json" + + +class DockerInstaller: + """Manages installation of the Docker credential helper for Cloudsmith. + + This installer writes a ``docker-credential-cloudsmith`` launcher binary + and patches ``~/.docker/config.json`` to route the configured registry + hosts through the Cloudsmith credential helper. + + Usage:: + + installer = DockerInstaller() + actions = installer.install(domains=["my-registry.example.com"]) + for action in actions: + print(action) + """ + + LAUNCHER_NAME = "docker-credential-cloudsmith" + TARGET_CMD = "cloudsmith credential-helper docker" + HELPER_VALUE = "cloudsmith" + DEFAULT_HOST = "docker.cloudsmith.io" + + name = "docker" + summary = "Docker credential helper for Cloudsmith registries" + + def install( + self, + *, + bin_dir: str | None = None, + domains: tuple[str, ...] = (), + discover: bool = True, + refresh: bool = False, + org: str | None = None, + api_key: str | None = None, + auth_type: str = "api_key", + api_host: str | None = None, + dry_run: bool = False, + ) -> list[str]: + """Install the Docker credential helper. + + Writes the launcher binary and registers Cloudsmith registry hosts in + ``~/.docker/config.json``. + + Parameters + ---------- + bin_dir: + Override for the directory to install the launcher. Defaults to + :func:`resolve_bin_dir` auto-detection. + domains: + Additional registry hostnames to configure (in addition to the + default ``docker.cloudsmith.io``). + discover: + When ``True`` (default), attempt to auto-discover Docker custom + domains via the Cloudsmith API. Discovery is best-effort and never + prevents the defaults from being registered. + refresh: + When ``True``, bypass the domain cache and fetch fresh data from + the API. Only meaningful when *discover* is also ``True``. + org: + Cloudsmith organisation slug used for custom-domain discovery. + api_key: + API key used for custom-domain discovery. + auth_type: + Credential type: ``"api_key"`` (default) or ``"bearer"``. + api_host: + Cloudsmith API host URL override. + dry_run: + When ``True``, compute and return planned actions without writing + any files. + + Returns + ------- + list[str] + Human-readable descriptions of actions taken (or planned, when + *dry_run* is ``True``). + """ + target_dir = resolve_bin_dir(bin_dir) + config_path = _docker_config_path() + + actions: list[str] = [] + + # Start with the default host plus any explicitly requested domains. + hosts: list[str] = [self.DEFAULT_HOST, *domains] + + # --- Custom-domain auto-discovery (best-effort) --- + if discover: + if org and api_key: + # Discovery boundary: network/SDK errors must never abort the + # default install. ApiException is already handled inside + # get_format_domains; this broad catch is the deliberate outer + # boundary (consistent with "boundary catches, library stays clean"). + # Note: BaseException subclasses (KeyboardInterrupt/SystemExit) + # intentionally propagate — they are not caught by `except Exception`. + try: + discovered = get_format_domains( + org, + BackendKind.DOCKER, + api_key=api_key, + auth_type=auth_type, + api_host=api_host, + refresh=refresh, + ) + except Exception as exc: # pylint: disable=broad-except + # Discovery is best-effort: never let it abort the install of + # the defaults. (Network/SDK errors degrade to a warning; + # ApiException is already handled inside.) + actions.append( + f"WARNING: custom-domain auto-discovery failed: {exc}" + ) + discovered = [] + new_hosts = [h for h in discovered if h not in hosts] + hosts.extend(discovered) + actions.append( + f"discovered {len(new_hosts)} new Docker custom domain(s)" + ) + else: + logger.debug( + "skipped auto-discovery" + " (no organization/credentials; pass --no-discover to silence)" + ) + + # De-duplicate while preserving order + seen: set[str] = set() + deduped: list[str] = [] + for h in hosts: + if h not in seen: + seen.add(h) + deduped.append(h) + hosts = deduped + + def mutate(config: dict) -> None: + helpers = config.get("credHelpers") + if not isinstance(helpers, dict): + helpers = config["credHelpers"] = {} + for host in hosts: + helpers[host] = self.HELPER_VALUE + + if dry_run: + if os.name == "nt": + launcher_path = target_dir / f"{self.LAUNCHER_NAME}.cmd" + else: + launcher_path = target_dir / self.LAUNCHER_NAME + actions.append(f"would write launcher {launcher_path}") + + would_change = merge_json_file(config_path, mutate, dry_run=True) + for host in hosts: + if would_change: + actions.append( + f"would set credHelpers[{host!r}]={self.HELPER_VALUE!r}" + f" in {config_path}" + ) + else: + actions.append( + f"credHelpers[{host!r}] already set" + f" in {config_path} (no change)" + ) + return actions + + # Real install + launcher_path = write_launcher(target_dir, self.LAUNCHER_NAME, self.TARGET_CMD) + actions.append(f"wrote launcher {launcher_path}") + + changed = merge_json_file(config_path, mutate) + if changed: + for host in hosts: + actions.append( + f"set credHelpers[{host!r}]={self.HELPER_VALUE!r}" + f" in {config_path}" + ) + else: + actions.append(f"config.json already up to date ({config_path})") + + if not is_on_path(target_dir): + actions.append( + f"WARNING: {target_dir} is not on PATH — " + "add it to your PATH so Docker can find docker-credential-cloudsmith" + ) + + return actions + + def uninstall( + self, *, bin_dir: str | None = None, dry_run: bool = False + ) -> list[str]: + """Uninstall the Docker credential helper. + + Removes the launcher binary and strips Cloudsmith-managed entries from + ``~/.docker/config.json``. + + Parameters + ---------- + bin_dir: + Override for the directory where the launcher was installed. + Defaults to :func:`resolve_bin_dir` auto-detection. Pass the same + value that was given to :meth:`install` so the correct launcher file + is found and removed. + dry_run: + When ``True``, return planned actions without writing any files. + + Returns + ------- + list[str] + Human-readable descriptions of actions taken (or planned). + """ + target_dir = resolve_bin_dir(bin_dir) + config_path = _docker_config_path() + + def mutate(config: dict) -> None: + helpers = config.get("credHelpers") + if not isinstance(helpers, dict): + return + removed = [k for k, v in helpers.items() if v == self.HELPER_VALUE] + for key in removed: + del helpers[key] + if removed and not helpers: + del config["credHelpers"] + + actions: list[str] = [] + + if os.name == "nt": + launcher_path = target_dir / f"{self.LAUNCHER_NAME}.cmd" + else: + launcher_path = target_dir / self.LAUNCHER_NAME + + if dry_run: + if launcher_path.exists(): + actions.append(f"would remove launcher {launcher_path}") + else: + actions.append( + f"launcher not found at {launcher_path} (nothing to remove)" + ) + + would_change = merge_json_file(config_path, mutate, dry_run=True) + if would_change: + actions.append( + f"would remove credHelpers entries with value" + f" {self.HELPER_VALUE!r} from {config_path}" + ) + else: + actions.append(f"no credHelpers entries to remove from {config_path}") + return actions + + # Real uninstall + removed = remove_launcher(target_dir, self.LAUNCHER_NAME) + if removed: + actions.append(f"removed launcher {launcher_path}") + else: + actions.append(f"launcher not found at {launcher_path} (nothing to remove)") + + changed = merge_json_file(config_path, mutate) + if changed: + actions.append( + f"removed credHelpers entries with value" + f" {self.HELPER_VALUE!r} from {config_path}" + ) + else: + actions.append(f"no credHelpers entries to remove from {config_path}") + + return actions + + def status(self) -> dict: + """Return current installation status. + + Returns + ------- + dict + A dict with keys: + + ``"launcher"`` + The :class:`~pathlib.Path` of the launcher if it exists, + else ``None``. + ``"hosts"`` + List of hostnames in ``config.json``'s ``credHelpers`` block + whose value equals ``"cloudsmith"``. + """ + target_dir = resolve_bin_dir() + if os.name == "nt": + launcher_path: Path | None = target_dir / f"{self.LAUNCHER_NAME}.cmd" + else: + launcher_path = target_dir / self.LAUNCHER_NAME + + if launcher_path is not None and not launcher_path.exists(): + launcher_path = None + + config_path = _docker_config_path() + hosts: list[str] = [] + if config_path.exists(): + try: + data = json.loads(config_path.read_text(encoding="utf-8")) + if isinstance(data, dict): + helpers = data.get("credHelpers", {}) + hosts = [k for k, v in helpers.items() if v == self.HELPER_VALUE] + except (json.JSONDecodeError, OSError): + pass + + return { + "launcher": str(launcher_path) if launcher_path is not None else None, + "hosts": hosts, + } diff --git a/cloudsmith_cli/credential_helpers/docker/runtime.py b/cloudsmith_cli/credential_helpers/docker/runtime.py new file mode 100644 index 00000000..b078216c --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/runtime.py @@ -0,0 +1,117 @@ +# Copyright 2026 Cloudsmith Ltd +""" +Docker credential helper runtime. + +Transport-light protocol logic for the Docker credential helper protocol. +This module is intentionally free of Click/sys imports so it can be unit-tested +without invoking the CLI machinery. + +See: https://github.com/docker/docker-credential-helpers +""" + +import json +import logging + +from ..backends import BackendKind +from ..common import is_cloudsmith_domain + +logger = logging.getLogger(__name__) + +_REFUSAL_MESSAGE = ( + "Error: Unable to retrieve credentials. " + "Provide credentials via the CLOUDSMITH_API_KEY environment variable, " + "credentials.ini, the system keyring, or an OIDC service. " + "Verify current authentication with `cloudsmith whoami --verbose`." +) + + +def get_credentials(server_url, credential=None, api_host=None): + """ + Get credentials for a Cloudsmith Docker registry. + + Verifies the URL is a Cloudsmith registry (including custom domains) + and returns credentials if available. + + Args: + server_url: The Docker registry server URL + credential: Pre-resolved CredentialResult from the provider chain + api_host: Cloudsmith API host URL + + Returns: + dict: Credentials with 'Username' and 'Secret' keys, or None + """ + if not credential or not credential.api_key: + return None + + if not is_cloudsmith_domain( + server_url, + api_key=credential.api_key, + auth_type=getattr(credential, "auth_type", "api_key"), + api_host=api_host, + backend_kind=BackendKind.DOCKER, + ): + return None + + return {"Username": "token", "Secret": credential.api_key} + + +def _execute_get(stdin, credential, api_host) -> tuple[int, str | None, str | None]: + """Handle the 'get' operation of the Docker credential helper protocol.""" + try: + server_url = stdin.read().strip() + if not server_url: + return (1, None, "Error: No server URL provided on stdin") + + creds = get_credentials(server_url, credential=credential, api_host=api_host) + if creds is None: + return (1, None, _REFUSAL_MESSAGE) + + return (0, json.dumps(creds), None) + except Exception as exc: # pylint: disable=broad-except + # Protocol boundary: a credential helper must never crash `docker pull`/`push`. + # Covers: broken-pipe OSError from stdin.read(), network/SDK errors from + # get_credentials, and TypeError from json.dumps — all degrade to a clean + # refusal (exit 1), not a traceback. + # This is the ONLY intentional broad except in this feature. + # (Exception does not catch KeyboardInterrupt/SystemExit, which is correct.) + logger.debug("docker credential-helper get failed: %s", exc, exc_info=True) + return (1, None, _REFUSAL_MESSAGE) + + +def execute( + operation, stdin, credential=None, api_host=None +) -> tuple[int, str | None, str | None]: + """ + Execute a Docker credential helper protocol operation. + + Args: + operation: One of 'get', 'store', 'erase', 'list' + stdin: A file-like object to read the server URL from (for 'get') + credential: Pre-resolved CredentialResult from the provider chain + api_host: Cloudsmith API host URL + + Returns: + A (exit_code, stdout_text, stderr_text) tuple. Either text value may + be None if there is nothing to write to that stream. + """ + if operation in ("store", "erase"): + # Drain stdin to keep Docker happy; guard against tty/pipe errors. + try: + if not stdin.isatty(): + stdin.read() + except (OSError, ValueError, AttributeError): + pass + return (0, None, None) + + if operation == "list": + return (0, "{}", None) + + if operation == "get": + return _execute_get(stdin, credential, api_host) + + return ( + 1, + None, + f"Error: Unknown operation '{operation}'. " + "Valid operations: get, store, erase, list", + ) diff --git a/cloudsmith_cli/credential_helpers/launchers.py b/cloudsmith_cli/credential_helpers/launchers.py new file mode 100644 index 00000000..acf978ef --- /dev/null +++ b/cloudsmith_cli/credential_helpers/launchers.py @@ -0,0 +1,175 @@ +# Copyright 2026 Cloudsmith Ltd +"""Launcher writer/remover for credential-helper on-PATH binaries. + +Creates a thin shell script (Unix) or .cmd batch file (Windows) named +``docker-credential-cloudsmith`` (or similar) that forwards every call to the +single ``cloudsmith`` binary already installed on the user's PATH. + +The platform-specific bits (file name, script body, user bin directory) live in +small pure helpers parameterised on ``windows`` so they can be unit-tested +without monkeypatching ``os.name`` — faking ``os.name`` on a posix host makes +``pathlib.Path`` raise ``NotImplementedError`` on Python < 3.12. +""" + +from __future__ import annotations + +import os +import shutil +import sys +from pathlib import Path + + +def _is_windows() -> bool: + """Return True when running on Windows.""" + return os.name == "nt" + + +def _launcher_filename(name: str, *, windows: bool) -> str: + """Return the launcher file name for the platform (``.cmd`` on Windows).""" + return f"{name}.cmd" if windows else name + + +def _launcher_content(target_cmd: str, *, windows: bool) -> str: + """Return the launcher script body for the platform. + + Windows uses a ``.cmd`` batch file (``@echo off`` keeps stdout clean for + Docker's credential JSON); Unix uses a ``#!/bin/sh`` script that ``exec``s + the target so signals and the exit code pass straight through. + """ + if windows: + return f"@echo off\r\n{target_cmd} %*\r\n" + return f'#!/bin/sh\nexec {target_cmd} "$@"\n' + + +def _user_bin_dir(windows: bool) -> Path: + """Return the user-local bin directory for the platform. + + Unix → ``~/.local/bin``; Windows → ``%LOCALAPPDATA%\\Cloudsmith\\bin`` + (falling back to the home directory when ``LOCALAPPDATA`` is unset). + """ + if windows: + localappdata = os.environ.get("LOCALAPPDATA") + base = Path(localappdata) if localappdata else Path.home() + return base / "Cloudsmith" / "bin" + return Path.home() / ".local" / "bin" + + +def write_launcher(bin_dir: Path, name: str, target_cmd: str) -> Path: + """Write a launcher script for *name* in *bin_dir* that execs *target_cmd*. + + Parameters + ---------- + bin_dir: + Directory in which to create the launcher. Created if absent. + name: + Base name of the helper binary (e.g. ``docker-credential-cloudsmith``). + target_cmd: + The command the launcher forwards to (e.g. + ``cloudsmith credential-helper docker``). + + Returns + ------- + Path + The path of the written file. + """ + windows = _is_windows() + bin_dir = Path(bin_dir) + bin_dir.mkdir(parents=True, exist_ok=True) + + dest = bin_dir / _launcher_filename(name, windows=windows) + dest.write_text( + _launcher_content(target_cmd, windows=windows), encoding="utf-8", newline="" + ) + if not windows: + dest.chmod(0o755) + + return dest + + +def remove_launcher(bin_dir: Path, name: str) -> bool: + """Remove a launcher previously created by :func:`write_launcher`. + + Parameters + ---------- + bin_dir: + Directory that contains (or contained) the launcher. + name: + Base name of the helper binary (without extension). + + Returns + ------- + bool + ``True`` if a file was removed, ``False`` if no file was found. + """ + target = Path(bin_dir) / _launcher_filename(name, windows=_is_windows()) + + if target.exists(): + target.unlink() + return True + return False + + +def resolve_bin_dir(override: str | None = None) -> Path: + """Determine the best directory in which to place a launcher. + + Resolution order + ---------------- + 1. *override* → ``Path(override)``. + 2. The directory of the running ``cloudsmith`` executable — if that + directory is writable. + 3. The user-local bin directory (see :func:`_user_bin_dir`). + + The chosen directory is **not** created here; that happens when the + launcher is written via :func:`write_launcher`. + + Parameters + ---------- + override: + Explicit path supplied by the caller (e.g. ``--bin-dir`` CLI option). + + Returns + ------- + Path + The resolved directory. + """ + if override is not None: + return Path(override) + + # Option 2: beside the running cloudsmith binary (if writable) + cloudsmith_path = shutil.which("cloudsmith") + if cloudsmith_path: + candidate = Path(os.path.dirname(os.path.realpath(cloudsmith_path))) + else: + candidate = Path(os.path.dirname(os.path.realpath(sys.argv[0]))) + + if os.access(candidate, os.W_OK | os.X_OK): + return candidate + + # Option 3: user-local bin + return _user_bin_dir(_is_windows()) + + +def is_on_path(directory: Path) -> bool: + """Return True if *directory* is an entry in the current ``$PATH``. + + Comparison is case-insensitive on Windows (``os.path.normcase``) and + normalised via ``os.path.normpath`` on all platforms. + + Parameters + ---------- + directory: + The directory to check. + + Returns + ------- + bool + ``True`` if *directory* appears in ``$PATH``. + """ + needle = os.path.normcase(os.path.normpath(str(directory))) + path_env = os.environ.get("PATH", "") + for entry in path_env.split(os.pathsep): + if not entry: + continue + if os.path.normcase(os.path.normpath(entry)) == needle: + return True + return False diff --git a/requirements.txt b/requirements.txt index 1b192af2..28fce239 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,7 +45,7 @@ click-didyoumean==0.3.1 # via cloudsmith-cli (setup.py) click-spinner==0.1.10 # via cloudsmith-cli (setup.py) -cloudsmith-api==2.0.25 +cloudsmith-api==2.0.27 # via cloudsmith-cli (setup.py) configparser==7.2.0 # via click-configfile diff --git a/setup.py b/setup.py index 83dd3034..3a2800a0 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,9 @@ def get_long_description(): ], }, entry_points={ - "console_scripts": ["cloudsmith=cloudsmith_cli.cli.commands.main:main"] + "console_scripts": [ + "cloudsmith=cloudsmith_cli.cli.commands.main:main", + ] }, keywords=["cloudsmith", "cli", "devops"], classifiers=[