-
Notifications
You must be signed in to change notification settings - Fork 244
Expand file tree
/
Copy pathtest_except.py
More file actions
126 lines (90 loc) · 3.28 KB
/
test_except.py
File metadata and controls
126 lines (90 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
import datetime
import gc
import tracemalloc
from pytest import raises
from msgpack import ExtType, FormatError, OutOfData, StackError, Unpacker, packb, unpackb
class DummyException(Exception):
pass
def test_raise_on_find_unsupported_value():
with raises(TypeError):
packb(datetime.datetime.now())
def test_raise_from_object_hook():
def hook(obj):
raise DummyException
raises(DummyException, unpackb, packb({}), object_hook=hook)
raises(DummyException, unpackb, packb({"fizz": "buzz"}), object_hook=hook)
raises(DummyException, unpackb, packb({"fizz": "buzz"}), object_pairs_hook=hook)
raises(DummyException, unpackb, packb({"fizz": {"buzz": "spam"}}), object_hook=hook)
raises(
DummyException,
unpackb,
packb({"fizz": {"buzz": "spam"}}),
object_pairs_hook=hook,
)
def test_raise_from_list_hook():
def hook(lst: list) -> list:
raise DummyException
with raises(DummyException):
unpackb(packb([1, 2, 3]), list_hook=hook)
with raises(DummyException):
unpacker = Unpacker(list_hook=hook)
unpacker.feed(packb([1, 2, 3]))
unpacker.unpack()
def test_raise_from_ext_hook():
def hook(code: int, data: bytes) -> ExtType:
raise DummyException
packed = packb(ExtType(42, b"hello"))
with raises(DummyException):
unpackb(packed, ext_hook=hook)
with raises(DummyException):
unpacker = Unpacker(ext_hook=hook)
unpacker.feed(packed)
unpacker.unpack()
def test_invalidvalue():
incomplete = b"\xd9\x97#DL_" # raw8 - length=0x97
with raises(ValueError):
unpackb(incomplete)
with raises(OutOfData):
unpacker = Unpacker()
unpacker.feed(incomplete)
unpacker.unpack()
with raises(FormatError):
unpackb(b"\xc1") # (undefined tag)
with raises(FormatError):
unpackb(b"\x91\xc1") # fixarray(len=1) [ (undefined tag) ]
with raises(StackError):
unpackb(b"\x91" * 3000) # nested fixarray(len=1)
def test_no_memory_leak_on_nested_invalid_tag() -> None:
"""Regression test: unpacking nested arrays containing an invalid tag must not leak objects."""
kwargs: dict = {
"raw": False,
"strict_map_key": False,
"max_array_len": 1 << 20,
"max_map_len": 1 << 20,
}
n = 1000
for depth in range(1, 15):
data = bytes([0x91] * depth + [0xC1])
gc.collect()
tracemalloc.start()
s1 = tracemalloc.take_snapshot()
for _ in range(n):
try:
unpackb(data, **kwargs)
except Exception:
pass
gc.collect()
s2 = tracemalloc.take_snapshot()
tracemalloc.stop()
leaked = sum(s.count_diff for s in s2.compare_to(s1, "lineno") if s.count_diff > 0)
per_call = leaked / n
assert per_call < 1.0, f"depth={depth}: {per_call:.2f} leaked objects/call (expected < 1)"
def test_strict_map_key():
valid = {"unicode": 1, b"bytes": 2}
packed = packb(valid, use_bin_type=True)
assert valid == unpackb(packed, raw=False, strict_map_key=True)
invalid = {42: 1}
packed = packb(invalid, use_bin_type=True)
with raises(ValueError):
unpackb(packed, raw=False, strict_map_key=True)