Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions datamasque/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@
FileOrContent = Union[str, bytes, TextIOBase, BufferedIOBase, Path]
_T = TypeVar("_T", bound=BaseModel)


def _build_session(verify_ssl: bool) -> requests.Session:
"""
Build a configured `requests.Session` for one client's lifetime.

Centralises the `verify` default so every call site inherits it
automatically — keeping the per-call code free of boilerplate and removing
the risk of forgetting the flag on a new endpoint.
"""

session = requests.Session()
session.verify = verify_ssl
return session


# Substrings (case-insensitive) that mark a key whose value should be redacted
# before logging on an error path, so that passwords, API tokens, and similar secrets don't
# end up in user-visible logs when a request fails.
Expand Down Expand Up @@ -71,6 +86,15 @@ class BaseClient:

Holds the connection config, cached auth token, and the core `make_request` dispatcher
used by all per-feature mixins that compose `DataMasqueClient`.

Uses a single `requests.Session` for the lifetime of the client so that
per-host TCP / TLS connections are pooled across calls (paginated list
endpoints and tight polling loops benefit most). Session-wide defaults
(`verify`) are set once on construction; per-call headers like
`Authorization` are merged at request time.

`requests.Session` is not thread-safe; do not share a client between
threads. Construct one per worker.
"""

token: str = ""
Expand All @@ -86,6 +110,7 @@ def __init__(self, connection_config: DataMasqueInstanceConfig) -> None:
self.password = connection_config.password
self.verify_ssl = connection_config.verify_ssl
self.token_source = connection_config.token_source
self._session = _build_session(self.verify_ssl)

@contextmanager
def _maybe_suppress_insecure_warning(self) -> Iterator[None]:
Expand Down Expand Up @@ -186,28 +211,32 @@ def make_request(
url = urljoin(self.base_url, path)

def send() -> Response:
headers: Optional[dict] = {"Authorization": self.token} if requires_authorization else None
headers = {"Authorization": self.token} if requires_authorization else None
try:
with self._maybe_suppress_insecure_warning():
if files:
files_payload = {f.field_name: (f.filename, f.content, f.content_type or "") for f in files}
return requests.request(
return self._session.request(
method,
url,
data=data,
params=params,
headers=headers,
files=files_payload,
verify=self.verify_ssl,
)
return requests.request(
method, url, json=data, params=params, headers=headers, verify=self.verify_ssl
)
return self._session.request(method, url, json=data, params=params, headers=headers)
except requests.RequestException as e:
raise DataMasqueTransportError(f"Failed to reach DataMasque server at {url}: {e}") from e

response = send()
if response.status_code == 401:
if response.status_code == 401 and requires_authorization:
# Token-expiry recovery: re-auth and replay. Only meaningful when the
# caller actually sent a token; on `requires_authorization=False`
# calls a 401 means the server itself is rejecting anonymous access
# (e.g. admin-install on an already-configured instance), and
# re-authing with whatever creds the client happens to hold would
# both misdiagnose the failure and emit a misleading
# "credentials are incorrect" error to the user.
logger.debug("Re-authenticating")
self.authenticate()
# Reset file pointers so the retry doesn't send empty files
Expand Down
14 changes: 7 additions & 7 deletions datamasque/client/ifm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pydantic import BaseModel
from requests import Response

from datamasque.client.base import suppress_insecure_warning_if_needed
from datamasque.client.base import _build_session, suppress_insecure_warning_if_needed
from datamasque.client.exceptions import (
DataMasqueApiError,
DataMasqueNotReadyError,
Expand Down Expand Up @@ -82,6 +82,9 @@ def __init__(self, connection_config: DataMasqueIfmInstanceConfig) -> None:
self.password = connection_config.password
self.verify_ssl = connection_config.verify_ssl
self.token_source = connection_config.token_source
# One session for both admin-server (JWT login/refresh) and IFM (data plane)
# traffic -- different hosts, but a single session handles per-host pooling.
self._session = _build_session(self.verify_ssl)

def authenticate(self) -> None:
"""Obtain an access (and refresh) token from the admin server, or via `token_source`."""
Expand All @@ -95,10 +98,9 @@ def authenticate(self) -> None:
login_url = urljoin(self.admin_server_base_url, "/api/auth/jwt/login/")
try:
with self._maybe_suppress_insecure_warning():
response = requests.post(
response = self._session.post(
login_url,
json={"username": self.username, "password": self.password},
verify=self.verify_ssl,
)
except requests.RequestException as e:
raise DataMasqueTransportError(f"Failed to reach admin server at {login_url}: {e}") from e
Expand All @@ -122,10 +124,9 @@ def _refresh_or_reauth(self) -> None:
refresh_url = urljoin(self.admin_server_base_url, "/api/auth/jwt/refresh/")
try:
with self._maybe_suppress_insecure_warning():
response = requests.post(
response = self._session.post(
refresh_url,
json={"refresh": self.refresh_token},
verify=self.verify_ssl,
)
except requests.RequestException as e:
raise DataMasqueTransportError(f"Failed to reach admin server at {refresh_url}: {e}") from e
Expand Down Expand Up @@ -187,13 +188,12 @@ def _make_request(
def send() -> Response:
try:
with self._maybe_suppress_insecure_warning():
return requests.request(
return self._session.request(
method,
url,
json=json_body,
params=params,
headers={"Authorization": f"Bearer {self.access_token}"},
verify=self.verify_ssl,
)
except requests.RequestException as e:
raise DataMasqueTransportError(f"Failed to reach IFM server at {url}: {e}") from e
Expand Down
58 changes: 40 additions & 18 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@ def test_healthcheck_transport_failure(client):


@pytest.mark.parametrize("verify_ssl", [True, False])
def test_make_request_verify_ssl_true_by_default(config, verify_ssl):
"""Verifies SSL setting is passed through to the `requests` call."""
def test_session_verify_reflects_config(config, verify_ssl):
"""
`verify_ssl` is applied to the client's `requests.Session` once at construction.

Every outgoing call then inherits it without per-call boilerplate.
"""
config_with_ssl = DataMasqueInstanceConfig(
base_url=config.base_url,
username=config.username,
Expand All @@ -84,24 +88,14 @@ def test_make_request_verify_ssl_true_by_default(config, verify_ssl):
)
client = DataMasqueClient(config_with_ssl)

with patch(
"datamasque.client.base.requests.request",
return_value=make_ok_response(),
) as mock_request:
client.make_request("GET", "/api/test/")

_, kwargs = mock_request.call_args
assert kwargs["verify"] is verify_ssl
assert client._session.verify is verify_ssl


def test_make_request_verify_ssl_true_does_not_touch_global_warning_filter(client):
"""With `verify_ssl=True`, the client should not modify `warnings.filters`."""
filters_before = list(warnings.filters)

with patch(
"datamasque.client.base.requests.request",
return_value=make_ok_response(),
):
with patch.object(client._session, "request", return_value=make_ok_response()):
client.make_request("GET", "/api/test/")

assert warnings.filters == filters_before
Expand All @@ -125,10 +119,7 @@ def raise_insecure_warning_then_respond(*_args, **_kwargs):

with warnings.catch_warnings(record=True) as captured:
warnings.simplefilter("always") # ensure we'd otherwise see the warning
with patch(
"datamasque.client.base.requests.request",
side_effect=raise_insecure_warning_then_respond,
):
with patch.object(client._session, "request", side_effect=raise_insecure_warning_then_respond):
client.make_request("GET", "/api/test/")

# The warning raised inside the request call was suppressed by the client.
Expand Down Expand Up @@ -289,6 +280,37 @@ def test_token_source_called_again_on_401_retry():
assert client.token == "Token t2"


def test_401_does_not_retry_when_requires_authorization_is_false(client):
"""
A 401 on an anonymous request must surface as-is, not trigger a re-auth retry.

`/api/users/admin-install/` returns 401 once any user exists -- the endpoint
is gated on "no user has been created yet" and DRF treats it as a normal
auth-required endpoint thereafter. Re-authing on that 401 would both
misdiagnose the failure ("login credentials are correct") and waste a
round-trip on a call the caller said doesn't need auth.
"""
with requests_mock.Mocker() as m:
m.post(
"http://test-server/api/users/admin-install/",
status_code=401,
json={"detail": "Authentication credentials were not provided."},
)

with pytest.raises(DataMasqueApiError) as excinfo:
client.make_request(
"POST",
"/api/users/admin-install/",
data={"email": "x@y", "username": "x", "password": "p", "re_password": "p", "allowed_hosts": []},
requires_authorization=False,
)

assert excinfo.value.response.status_code == 401
# Exactly one request: no re-auth roundtrip to /api/auth/token/login/ and no replay.
assert m.call_count == 1
assert m.request_history[0].path == "/api/users/admin-install/"


def test_token_source_callable_exception_propagates():
"""Errors from `token_source` are surfaced to the caller, not swallowed."""

Expand Down
Loading