-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathtesting.py
More file actions
530 lines (436 loc) · 19.7 KB
/
testing.py
File metadata and controls
530 lines (436 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
"""
Public testing utilities.
See also _lib._testing for additional private testing utilities.
"""
from __future__ import annotations
import contextlib
import enum
import warnings
from collections.abc import Callable, Generator, Iterator, Sequence
from functools import update_wrapper, wraps
from inspect import getattr_static
from types import FunctionType, ModuleType
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
from ._lib._utils._compat import is_dask_namespace, is_jax_namespace, is_torch_namespace
from ._lib._utils._helpers import JitLibrary, autojit, pickle_flatten, pickle_unflatten
__all__ = ["lazy_xp_function", "patch_lazy_xp_functions"]
if TYPE_CHECKING: # pragma: no cover
# TODO import override from typing (requires Python >=3.12)
import pytest
from dask.typing import Graph, Key, SchedulerGetCallable
from typing_extensions import override
else:
# Sphinx hacks
SchedulerGetCallable = object
def override(func):
return func
P = ParamSpec("P")
T = TypeVar("T")
_ufuncs_tags: dict[object, dict[str, Any]] = {}
class Deprecated(enum.Enum):
"""Unique type for deprecated parameters."""
DEPRECATED = 1
DEPRECATED = Deprecated.DEPRECATED
def _clone_function(f: Callable[..., Any]) -> Callable[..., Any]:
"""Returns a clone of an existing function."""
f_new = FunctionType(
f.__code__,
f.__globals__,
name=f.__name__,
argdefs=f.__defaults__,
closure=f.__closure__,
)
f_new.__kwdefaults__ = f.__kwdefaults__
return update_wrapper(f_new, f)
def lazy_xp_function(
func: Callable[..., Any] | tuple[type, str],
*,
allow_dask_compute: bool | int = False,
jax_jit: bool = True,
torch_compile: bool = True,
static_argnums: Deprecated = DEPRECATED,
static_argnames: Deprecated = DEPRECATED,
) -> None: # numpydoc ignore=GL07
"""
Tag a function to be tested on lazy backends.
Tag a function so that when any tests are executed with ``xp=jax.numpy`` the
function is replaced with a jitted version of itself, and when it is executed with
``xp=dask.array`` the function will raise if it attempts to materialize the graph.
This will be later expanded to provide test coverage for other lazy backends.
In order for the tag to be effective, the test or a fixture must call
:func:`patch_lazy_xp_functions`.
Parameters
----------
func : callable | tuple[type, str]
Function to be tested, or a tuple containing an (uninstantiated) class and a
method name to specify a class method to be tested.
allow_dask_compute : bool | int, optional
Whether `func` is allowed to internally materialize the Dask graph, or maximum
number of times it is allowed to do so. This is typically triggered by
``bool()``, ``float()``, or ``np.asarray()``.
Set to 1 if you are aware that `func` converts the input parameters to NumPy and
want to let it do so at least for the time being, knowing that it is going to be
extremely detrimental for performance.
If a test needs values higher than 1 to pass, it is a canary that the conversion
to NumPy/bool/float is happening multiple times, which translates to multiple
computations of the whole graph. Short of making the function fully lazy, you
should at least add explicit calls to ``np.asarray()`` early in the function.
*Note:* the counter of `allow_dask_compute` resets after each call to `func`, so
a test function that invokes `func` multiple times should still work with this
parameter set to 1.
Set to True to allow `func` to materialize the graph an unlimited number
of times.
Default: False, meaning that `func` must be fully lazy and never materialize the
graph.
jax_jit : bool, optional
Set to True to replace `func` with a smart variant of ``jax.jit(func)`` after
calling the :func:`patch_lazy_xp_functions` test helper with ``xp=jax.numpy``.
This is the default behaviour.
Set to False if `func` is only compatible with eager (non-jitted) JAX.
Unlike with vanilla ``jax.jit``, all arguments and return types that are not JAX
arrays are treated as static; the function can accept and return arbitrary
wrappers around JAX arrays. This difference is because, in real life, most users
won't wrap the function directly with ``jax.jit`` but rather they will use it
within their own code, which is itself then wrapped by ``jax.jit``, and
internally consume the function's outputs.
In other words, the pattern that is being tested is::
>>> @jax.jit
... def user_func(x):
... y = user_prepares_inputs(x)
... z = func(y, some_static_arg=True)
... return user_consumes(z)
Default: True.
static_argnums :
Deprecated; ignored
static_argnames :
Deprecated; ignored
See Also
--------
patch_lazy_xp_functions : Companion function to call from the test or fixture.
jax.jit : JAX function to compile a function for performance.
Examples
--------
In ``test_mymodule.py``::
from array_api_extra.testing import lazy_xp_function from mymodule import myfunc
lazy_xp_function(myfunc)
def test_myfunc(xp):
a = xp.asarray([1, 2])
# When xp=jax.numpy, this is similar to `b = jax.jit(myfunc)(a)`
# When xp=dask.array, crash on compute() or persist()
b = myfunc(a)
Notes
-----
In order for this tag to be effective, the test function must be imported into the
test module globals without its namespace; alternatively its namespace must be
declared in a ``lazy_xp_modules`` list in the test module globals.
Example 1::
from mymodule import myfunc
lazy_xp_function(myfunc)
def test_myfunc(xp):
x = myfunc(xp.asarray([1, 2]))
Example 2::
import mymodule
lazy_xp_modules = [mymodule]
lazy_xp_function(mymodule.myfunc)
def test_myfunc(xp):
x = mymodule.myfunc(xp.asarray([1, 2]))
A test function can circumvent this monkey-patching system by using a namespace
outside of the two above patterns. You need to sanitize your code to make sure this
only happens intentionally.
Example 1::
import mymodule
from mymodule import myfunc
lazy_xp_function(myfunc)
def test_myfunc(xp):
a = xp.asarray([1, 2])
b = myfunc(a) # This is wrapped when xp=jax.numpy or xp=dask.array
c = mymodule.myfunc(a) # This is not
Example 2::
import mymodule
class naked:
myfunc = mymodule.myfunc
lazy_xp_modules = [mymodule]
lazy_xp_function(mymodule.myfunc)
def test_myfunc(xp):
a = xp.asarray([1, 2])
b = mymodule.myfunc(a) # This is wrapped when xp=jax.numpy or xp=dask.array
c = naked.myfunc(a) # This is not
"""
if static_argnums is not DEPRECATED or static_argnames is not DEPRECATED:
warnings.warn(
(
"The `static_argnums` and `static_argnames` parameters are deprecated "
"and ignored. They will be removed in a future version."
),
DeprecationWarning,
stacklevel=2,
)
tags: dict[str, bool | int | type] = {
"allow_dask_compute": allow_dask_compute,
"jax_jit": jax_jit,
"torch_compile": torch_compile,
}
if isinstance(func, tuple):
# Replace the method with a clone before adding tags
# to avoid adding unwanted tags to a parent method when
# the method was inherited from a parent class.
# Note: can't just accept an unbound method `cls.method_name` because in
# case of inheritance it would be impossible to attribute it to the child class.
# This also makes it so tagged methods will appear in their class's ``__dict__``
# and thus findable by ``iter_tagged_modules`` below.
cls, method_name = func
# The method might be a staticmethod or classmethod so we need to do a dance
# to ensure that this is preserved.
raw_attr = getattr_static(cls, method_name)
method = getattr(cls, method_name)
if isinstance(raw_attr, classmethod):
method = method.__func__
cloned_method = _clone_function(method)
method_to_set: Any
if isinstance(raw_attr, staticmethod):
method_to_set = staticmethod(cloned_method)
elif isinstance(raw_attr, classmethod):
method_to_set = classmethod(cloned_method)
else:
method_to_set = cloned_method
setattr(cls, method_name, method_to_set)
f = getattr(cls, method_name)
if isinstance(raw_attr, classmethod):
f = f.__func__
# Annotate that cls owns this method so we can check that later.
tags["owner"] = cls
else:
f = func
try:
f._lazy_xp_function = tags # pylint: disable=protected-access # pyright: ignore[reportFunctionMemberAccess]
except AttributeError: # @cython.vectorize
_ufuncs_tags[f] = tags
def patch_lazy_xp_functions(
request: pytest.FixtureRequest,
monkeypatch: pytest.MonkeyPatch | None = None,
*,
xp: ModuleType,
) -> contextlib.AbstractContextManager[None]:
"""
Test lazy execution of functions tagged with :func:`lazy_xp_function`.
If ``xp==jax.numpy``, search for all functions and methods which have been tagged
with :func:`lazy_xp_function` in the globals of the module that defines the current
test, as well as in the ``lazy_xp_modules`` list in the globals of the same module,
and wrap them with :func:`jax.jit`.
Unwrap them at the end of the test.
If ``xp==dask.array``, wrap the functions with a decorator that disables
``compute()`` and ``persist()`` and ensures that exceptions and warnings are raised
eagerly.
This function should be typically called by your library's `xp` fixture that runs
tests on multiple backends::
@pytest.fixture(params=[
numpy,
array_api_strict,
pytest.param(jax.numpy, marks=pytest.mark.thread_unsafe),
pytest.param(dask.array, marks=pytest.mark.thread_unsafe),
])
def xp(request):
with patch_lazy_xp_functions(request, xp=request.param):
yield request.param
but it can be otherwise be called by the test itself too.
Parameters
----------
request : pytest.FixtureRequest
Pytest fixture, as acquired by the test itself or by one of its fixtures.
monkeypatch : pytest.MonkeyPatch
Deprecated
xp : array_namespace
Array namespace to be tested.
See Also
--------
lazy_xp_function : Tag a function to be tested on lazy backends.
pytest.FixtureRequest : `request` test function parameter.
Notes
-----
This context manager monkey-patches modules and as such is thread unsafe
on Dask and JAX. If you run your test suite with
`pytest-run-parallel <https://github.com/Quansight-Labs/pytest-run-parallel/>`_,
you should mark these backends with ``@pytest.mark.thread_unsafe``, as shown in
the example above.
"""
mod = cast(ModuleType, request.module)
search_targets: list[ModuleType | type] = [
mod,
*cast(list[ModuleType], getattr(mod, "lazy_xp_modules", [])),
]
# Also search for classes within the above modules which have had lazy_xp_function
# applied to methods through ``lazy_xp_function((cls, method_name))`` syntax.
# We might end up adding classes incidentally imported into modules, so using a
# set here to cut down on potential redundancy.
classes: set[type] = set()
for target in search_targets:
for obj in target.__dict__.values():
if isinstance(obj, type):
classes.add(obj)
search_targets.extend(classes)
to_revert: list[tuple[ModuleType | type, str, object]] = []
def temp_setattr(target: ModuleType | type, name: str, func: object) -> None:
"""
Variant of monkeypatch.setattr, which allows monkey-patching only selected
parameters of a test so that pytest-run-parallel can run on the remainder.
"""
assert hasattr(target, name)
# Need getattr_static because the attr could be a staticmethod or other
# descriptor and we don't want that to be stripped away.
original = getattr_static(target, name)
to_revert.append((target, name, original))
setattr(target, name, func)
if monkeypatch is not None:
warnings.warn(
(
"The `monkeypatch` parameter is deprecated and will be removed in a "
"future version. "
"Use `patch_lazy_xp_function` as a context manager instead."
),
DeprecationWarning,
stacklevel=2,
)
# Enable using patch_lazy_xp_function not as a context manager
temp_setattr = monkeypatch.setattr # type: ignore[assignment] # pyright: ignore[reportAssignmentType]
def iter_tagged() -> Iterator[
tuple[ModuleType | type, str, Any, Callable[..., Any], dict[str, Any]]
]:
for target in search_targets:
for name, attr in target.__dict__.items():
# attr might be a staticmethod or classmethod. If so we need
# to peel it back and wrap the underlying function and later
# make sure not to accidentally replace it with a regular
# method.
func: Any = (
attr.__func__
if isinstance(attr, (staticmethod, classmethod))
else attr
)
tags: dict[str, Any] | None = None
with contextlib.suppress(AttributeError):
tags = func._lazy_xp_function # pylint: disable=protected-access
if tags is None:
with contextlib.suppress(KeyError, TypeError):
tags = _ufuncs_tags[func]
if tags is not None:
if isinstance(target, type) and tags.get("owner") is not target:
# There's a common pattern to wrap functions in namespace
# classes to bypass lazy_xp_function like this:
#
# class naked:
# myfunc = mymodule.myfunc
#
# To ensure this still works when checking for tags in
# attributes of classes, ensure that target is the actual
# owning class where func was defined.
continue
# put attr, and func in the outputs so we can later tell
# if this was a staticmethod or classmethod.
yield target, name, attr, func, tags
wrapped: Any
if is_dask_namespace(xp):
for target, name, attr, func, tags in iter_tagged():
n = tags["allow_dask_compute"]
if n is True:
n = 1_000_000
elif n is False:
n = 0
wrapped = _dask_wrap(func, n)
# If we're dealing with a staticmethod or classmethod, make
# sure things stay that way.
if isinstance(attr, staticmethod):
wrapped = staticmethod(wrapped)
elif isinstance(attr, classmethod):
wrapped = classmethod(wrapped)
temp_setattr(target, name, wrapped)
elif is_jax_namespace(xp):
for target, name, attr, func, tags in iter_tagged():
if tags["jax_jit"]:
wrapped = autojit(func, JitLibrary.jax)
# If we're dealing with a staticmethod or classmethod, make
# sure things stay that way.
if isinstance(attr, staticmethod):
wrapped = staticmethod(wrapped)
elif isinstance(attr, classmethod):
wrapped = classmethod(wrapped)
temp_setattr(target, name, wrapped)
elif is_torch_namespace(xp):
for target, name, attr, func, tags in iter_tagged():
if tags["torch_compile"]:
wrapped = autojit(func, JitLibrary.torch)
# If we're dealing with a staticmethod or classmethod, make
# sure things stay that way.
if isinstance(attr, staticmethod):
wrapped = staticmethod(wrapped)
elif isinstance(attr, classmethod):
wrapped = classmethod(wrapped)
temp_setattr(target, name, wrapped)
# We can't just decorate patch_lazy_xp_functions with
# @contextlib.contextmanager because it would not work with the
# deprecated monkeypatch when not used as a context manager.
@contextlib.contextmanager
def revert_on_exit() -> Generator[None]:
try:
yield
finally:
for target, name, orig_func in to_revert:
setattr(target, name, orig_func)
return revert_on_exit()
class CountingDaskScheduler(SchedulerGetCallable):
"""
Dask scheduler that counts how many times `dask.compute` is called.
If the number of times exceeds 'max_count', it raises an error.
This is a wrapper around Dask's own 'synchronous' scheduler.
Parameters
----------
max_count : int
Maximum number of allowed calls to `dask.compute`.
msg : str
Assertion to raise when the count exceeds `max_count`.
"""
count: int
max_count: int
msg: str
def __init__(self, max_count: int, msg: str): # numpydoc ignore=GL08
self.count = 0
self.max_count = max_count
self.msg = msg
@override
def __call__(
self, dsk: Graph, keys: Sequence[Key] | Key, **kwargs: Any
) -> Any: # numpydoc ignore=GL08
import dask
self.count += 1
# This should yield a nice traceback to the
# offending line in the user's code
assert self.count <= self.max_count, self.msg
return dask.get(dsk, keys, **kwargs) # type: ignore[attr-defined] # pyright: ignore[reportPrivateImportUsage]
def _dask_wrap(
func: Callable[P, T], n: int
) -> Callable[P, T]: # numpydoc ignore=PR01,RT01
"""
Wrap `func` to raise if it attempts to call `dask.compute` more than `n` times.
After the function returns, materialize the graph in order to re-raise exceptions.
"""
import dask
import dask.array as da
func_name = getattr(func, "__name__", str(func))
n_str = f"only up to {n}" if n else "no"
msg = (
f"Called `dask.compute()` or `dask.persist()` {n + 1} times, "
f"but {n_str} calls are allowed. Set "
f"`lazy_xp_function({func_name}, allow_dask_compute={n + 1})` "
"to allow for more (but note that this will harm performance). "
)
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: # numpydoc ignore=GL08
scheduler = CountingDaskScheduler(n, msg)
with dask.config.set({"scheduler": scheduler}): # pyright: ignore[reportPrivateImportUsage]
out = func(*args, **kwargs)
# Block until the graph materializes and reraise exceptions. This allows
# `pytest.raises` and `pytest.warns` to work as expected. Note that this would
# not work on scheduler='distributed', as it would not block.
arrays, rest = pickle_flatten(out, da.Array)
arrays = dask.persist(arrays, scheduler="threads")[0] # type: ignore[attr-defined,no-untyped-call] # pyright: ignore[reportPrivateImportUsage]
return pickle_unflatten(arrays, rest) # pyright: ignore[reportUnknownArgumentType]
return wrapper