Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .codespellignorelines
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
<pre><code>Code block\ndoes not\nrespect\nnewlines\n</code></pre>
"trough",
assert "task_instance_id" in route.dependant.path_param_names, (
assert "connection_test_id" in route.dependant.path_param_names, (
37 changes: 37 additions & 0 deletions airflow-core/docs/howto/connection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,43 @@ will be disabled (if you are testing in the UI).
Airflow CLI) have different libs or providers installed, test results *might* differ.


Asynchronous (worker-dispatched) connection testing
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The test described above runs on the API server. Airflow can also run the test on a worker instead, so
that connection credentials are only ever used on a worker -- the same place your tasks use them -- rather
than on the API server. This is useful when a connection is only reachable from workers, or when you would
rather not have credentials exercised on the API server.

This uses the same ``[core] test_connection`` enablement flag as the synchronous test and is driven through
the :doc:`Connections REST API </stable-rest-api-ref/>`: submit a test with ``POST /connections/enqueue-test``
(which returns a token), then poll for the result with ``GET /connections/enqueue-test``, passing the token
in the ``Airflow-Connection-Test-Token`` header. A result can only be read back with that token, and only by
users authorized for the connection; in multi-team deployments a test for a connection owned by another team
is not visible to you.

The worker that runs the test is authorized separately from the polling token above. The scheduler issues the
worker a short-lived JWT for that single request, whose subject is the connection-test request id and whose
scope is ``workload``. The Execution API endpoints the worker calls enforce a ``ct:self`` check -- the token
subject must match the connection-test id in the request path -- so the worker's token can only fetch the
connection for, and report the result of, the one request it was issued for; it cannot reach other connection
tests, task instances, or connections.

The behaviour is tuned under the :ref:`[connection_test] <config:connection_test>` section:

* :ref:`connection_test.timeout <config:connection_test__timeout>` -- how long a test may run before it is
reported as timed out (default ``60`` seconds).
* :ref:`connection_test.max_concurrency <config:connection_test__max_concurrency>` -- how many tests may run
at once; further requests wait until a slot is free (default ``4``).
* :ref:`connection_test.reaper_interval <config:connection_test__reaper_interval>` -- how often Airflow
checks for and fails tests that exceeded their timeout (default ``30.0`` seconds).

.. note::

Because the test runs on a worker, its result reflects the libraries, providers and network access
available to that worker, which may differ from the API server.


Custom connection types
^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
5 changes: 4 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``8812eb67b63c`` (head) | ``acc215baed80`` | ``3.3.0`` | Change Deadline interval to JSON. |
| ``a7e6d4c3b2f1`` (head) | ``8812eb67b63c`` | ``3.3.0`` | Add connection_test_request table for the deferred |
| | | | connection-test workflow. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``8812eb67b63c`` | ``acc215baed80`` | ``3.3.0`` | Change Deadline interval to JSON. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``acc215baed80`` | ``a1b2c3d4e5f6`` | ``3.3.0`` | Add team_name to trigger table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/62343.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add worker-dispatched (asynchronous) connection testing: connection tests can run on a worker instead of in-process on the API server (so credentials are exercised on a worker), submitted via ``POST /connections/enqueue-test`` (which returns a token) and polled via ``GET /connections/enqueue-test`` with the token in the ``Airflow-Connection-Test-Token`` header, configured under the new ``[connection_test]`` section (``timeout``, ``max_concurrency``, ``reaper_interval``); the synchronous ``POST /connections/test`` endpoint is unchanged.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import json
from collections.abc import Iterable, Mapping
from datetime import datetime
from typing import Annotated, Any

from pydantic import Field, field_validator, model_validator
Expand Down Expand Up @@ -78,12 +79,30 @@ class ConnectionCollectionResponse(BaseModel):


class ConnectionTestResponse(BaseModel):
"""Connection Test serializer for responses."""
"""Connection Test serializer for synchronous test responses."""

status: bool
message: str


class ConnectionTestQueuedResponse(BaseModel):
"""Response returned when a connection test has been enqueued for worker execution."""

token: str
connection_id: str
state: str


class AsyncConnectionTestResponse(BaseModel):
"""Response returned when polling for the status of an enqueued connection test."""

token: str
connection_id: str
state: str
result_message: str | None = None
created_at: datetime


class ConnectionHookFieldBehavior(BaseModel):
"""A class to store the behavior of each standard field of a Hook."""

Expand Down Expand Up @@ -210,3 +229,26 @@ def validate_team_name(self) -> ConnectionBody:


ConnectionBodyPartial = make_partial_model(ConnectionBody)


class ConnectionTestRequestBody(ConnectionBody):
"""
Request body for enqueueing a connection test on a worker.

Inherits ``connection_id`` pattern, ``extra`` JSON validation, and
``team_name`` handling from ``ConnectionBody`` so tested connections share
the same input contract as persisted ones.
"""

commit_on_success: bool = Field(
default=False,
description="If True, save or update the connection in the connection table when the test succeeds.",
)
executor: str | None = Field(
default=None,
description="Executor name to dispatch the connection test to.",
)
queue: str | None = Field(
default=None,
description="Worker queue to route the connection test to (executor-dependent).",
)
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,102 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/connections/enqueue-test:
get:
tags:
- Connection
summary: Get Connection Test
description: Poll for the status of an enqueued connection test by its token
(passed as a header).
operationId: get_connection_test
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
parameters:
- name: Airflow-Connection-Test-Token
in: header
required: true
schema:
type: string
title: Airflow-Connection-Test-Token
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/AsyncConnectionTestResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
post:
tags:
- Connection
summary: Enqueue Connection Test
description: Enqueue a connection test for deferred execution on a worker; returns
a polling token.
operationId: enqueue_connection_test
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionTestRequestBody'
responses:
'202':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionTestQueuedResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/api/v2/connections:
get:
tags:
Expand Down Expand Up @@ -11561,6 +11657,35 @@ components:
- created_date
title: AssetWatcherResponse
description: Asset watcher serializer for responses.
AsyncConnectionTestResponse:
properties:
token:
type: string
title: Token
connection_id:
type: string
title: Connection Id
state:
type: string
title: State
result_message:
anyOf:
- type: string
- type: 'null'
title: Result Message
created_at:
type: string
format: date-time
title: Created At
type: object
required:
- token
- connection_id
- state
- created_at
title: AsyncConnectionTestResponse
description: Response returned when polling for the status of an enqueued connection
test.
BackfillCollectionResponse:
properties:
backfills:
Expand Down Expand Up @@ -12652,6 +12777,108 @@ components:
- team_name
title: ConnectionResponse
description: Connection serializer for responses.
ConnectionTestQueuedResponse:
properties:
token:
type: string
title: Token
connection_id:
type: string
title: Connection Id
state:
type: string
title: State
type: object
required:
- token
- connection_id
- state
title: ConnectionTestQueuedResponse
description: Response returned when a connection test has been enqueued for
worker execution.
ConnectionTestRequestBody:
properties:
connection_id:
type: string
maxLength: 200
pattern: ^[\w.-]+$
title: Connection Id
conn_type:
type: string
title: Conn Type
description:
anyOf:
- type: string
- type: 'null'
title: Description
host:
anyOf:
- type: string
- type: 'null'
title: Host
login:
anyOf:
- type: string
- type: 'null'
title: Login
schema:
anyOf:
- type: string
- type: 'null'
title: Schema
port:
anyOf:
- type: integer
- type: 'null'
title: Port
password:
anyOf:
- type: string
- type: 'null'
title: Password
extra:
anyOf:
- type: string
- type: 'null'
title: Extra
team_name:
anyOf:
- type: string
maxLength: 50
- type: 'null'
title: Team Name
commit_on_success:
type: boolean
title: Commit On Success
description: If True, save or update the connection in the connection table
when the test succeeds.
default: false
executor:
anyOf:
- type: string
- type: 'null'
title: Executor
description: Executor name to dispatch the connection test to.
queue:
anyOf:
- type: string
- type: 'null'
title: Queue
description: Worker queue to route the connection test to (executor-dependent).
additionalProperties: false
type: object
required:
- connection_id
- conn_type
title: ConnectionTestRequestBody
description: 'Request body for enqueueing a connection test on a worker.


Inherits ``connection_id`` pattern, ``extra`` JSON validation, and

``team_name`` handling from ``ConnectionBody`` so tested connections share

the same input contract as persisted ones.'
ConnectionTestResponse:
properties:
status:
Expand All @@ -12665,7 +12892,7 @@ components:
- status
- message
title: ConnectionTestResponse
description: Connection Test serializer for responses.
description: Connection Test serializer for synchronous test responses.
CreateAssetEventsBody:
properties:
asset_id:
Expand Down
Loading
Loading