forked from linode/linode_api4-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_monitor.py
More file actions
313 lines (255 loc) · 9.92 KB
/
test_monitor.py
File metadata and controls
313 lines (255 loc) · 9.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import time
from test.integration.helpers import (
get_test_label,
send_request_when_resource_available,
wait_for_condition,
)
import pytest
from linode_api4 import LinodeClient, PaginatedList
from linode_api4.objects import (
AlertDefinition,
AlertDefinitionEntity,
ApiError,
MonitorDashboard,
MonitorMetricsDefinition,
MonitorService,
MonitorServiceToken,
)
from linode_api4.objects.monitor import AlertStatus
# List all dashboards
def test_get_all_dashboards(test_linode_client):
    """List dashboards, then load one by ID and filter by service type."""
    monitor = test_linode_client.monitor

    all_dashboards = monitor.dashboards()
    first_dashboard = all_dashboards[0]
    assert isinstance(first_dashboard, MonitorDashboard)

    # Reuse the service type of the first listed dashboard for the filter below.
    expected_service_type = first_dashboard.service_type

    # Fetch a dashboard by ID
    loaded_dashboard = test_linode_client.load(MonitorDashboard, 1)
    assert isinstance(loaded_dashboard, MonitorDashboard)
    assert loaded_dashboard.id == 1

    # Fetch dashboards by service_type
    filtered_dashboards = monitor.dashboards(
        service_type=expected_service_type
    )
    first_filtered = filtered_dashboards[0]
    assert isinstance(first_filtered, MonitorDashboard)
    assert first_filtered.service_type == expected_service_type
def test_filter_and_group_by(test_linode_client):
    """Check the types of ``group_by`` and ``filters`` on a linode widget.

    Uses the first widget of the first dashboard returned for the
    ``linode`` service type.
    """
    linode_dashboards = test_linode_client.monitor.dashboards(
        service_type="linode"
    )
    assert isinstance(linode_dashboards[0], MonitorDashboard)

    # Work with the first dashboard for the linode service type.
    first_dashboard = linode_dashboards[0]
    assert first_dashboard.service_type == "linode"

    # The dashboard must expose a non-empty widget collection.
    assert hasattr(
        first_dashboard, "widgets"
    ), "Dashboard should have widgets attribute"
    assert (
        first_dashboard.widgets is not None
    ), "Dashboard widgets should not be None"
    assert (
        len(first_dashboard.widgets) > 0
    ), "Dashboard should have at least one widget"

    first_widget = first_dashboard.widgets[0]

    # group_by: optional list of strings.
    grouping = first_widget.group_by
    assert grouping is None or isinstance(
        grouping, list
    ), "group_by should be None or list type"
    for group_key in grouping or []:
        assert isinstance(group_key, str), "group_by items should be strings"

    # filters: optional list of Filter objects.
    widget_filters = first_widget.filters
    assert widget_filters is None or isinstance(
        widget_filters, list
    ), "filters should be None or list type"
    if widget_filters is None:
        return

    from linode_api4.objects.monitor import Filter

    required_attributes = (
        ("dimension_label", "Filter should have dimension_label"),
        ("operator", "Filter should have operator"),
        ("value", "Filter should have value"),
    )
    for entry in widget_filters:
        assert isinstance(
            entry, Filter
        ), "filter items should be Filter objects"
        for attribute, message in required_attributes:
            assert hasattr(entry, attribute), message
# List supported services
def test_get_supported_services(test_linode_client):
    """List monitor services, load one, and fetch its metric definitions."""
    services = test_linode_client.monitor.services()
    assert isinstance(services[0], MonitorService)
    service_type = services[0].service_type

    # Details for that particular service
    loaded_service = test_linode_client.load(MonitorService, service_type)
    assert isinstance(loaded_service, MonitorService)
    assert loaded_service.service_type == service_type

    # Metric definitions for that particular service
    definitions = test_linode_client.monitor.metric_definitions(
        service_type=service_type
    )
    assert isinstance(definitions[0], MonitorMetricsDefinition)
def test_get_not_supported_service(test_linode_client):
    """Loading an unknown service type raises an error mentioning a 404."""
    with pytest.raises(RuntimeError) as raised:
        test_linode_client.load(MonitorService, "saas")

    # Checked outside the ``with`` block: code placed after the raising call
    # inside the block would never run.
    message = str(raised.value)
    assert "[404] Not found" in message
# Test Helpers
def get_db_engine_id(client: LinodeClient, engine: str):
    """Return the ID, as a string, of the database engine named *engine*.

    Every entry returned by ``client.database.engines()`` is inspected; when
    several entries share the requested engine name, the last one wins. An
    empty string is returned when nothing matches.
    """
    matching_ids = [
        candidate.id
        for candidate in client.database.engines()
        if candidate.engine == engine
    ]
    return str(matching_ids[-1]) if matching_ids else ""
@pytest.fixture(scope="session")
def test_create_and_test_db(test_linode_client):
    """Session fixture: create a MySQL database, yield it, then delete it."""
    database = test_linode_client.database.mysql_create(
        label=get_test_label() + "-sqldb",
        region="us-ord",
        engine=get_db_engine_id(test_linode_client, "mysql"),
        ltype="g6-standard-1",
        cluster_size=None,
    )

    # Provisioning takes 15-30 minutes, so wait for the "active" status.
    # NOTE(review): 60 and 2000 are presumably the poll interval and the
    # timeout in seconds -- confirm against wait_for_condition.
    wait_for_condition(60, 2000, lambda: database.status == "active")

    yield database

    # Teardown: runs after the tests using this fixture have finished.
    send_request_when_resource_available(300, database.delete)
def test_my_db_functionality(test_linode_client, test_create_and_test_db):
    """Create a monitor service token for the provisioned database.

    Relies on the ``test_create_and_test_db`` fixture for an active
    database whose ID is used as the token's entity.
    """
    client = test_linode_client
    assert test_create_and_test_db.status == "active"
    entity_id = test_create_and_test_db.id

    # Create a token for the dbaas service, scoped to this database.
    token = client.monitor.create_token(
        service_type="dbaas", entity_ids=[entity_id]
    )
    assert isinstance(token, MonitorServiceToken)
    # Check the attribute exists before reading it; in the opposite order a
    # missing attribute raises AttributeError and this message never shows.
    assert hasattr(token, "token"), "Response object has no 'token' attribute"
    assert len(token.token) > 0, "Token should not be empty"
def test_integration_create_get_update_delete_alert_definition(
    test_linode_client,
):
    """E2E: create an alert definition, fetch it, update it, then delete it.

    The created definition is deleted in a ``finally`` block so CI doesn't
    leak resources. The test is skipped when the account has no alert
    channel to attach to the definition.
    """
    client = test_linode_client
    service_type = "dbaas"

    # The timestamp suffix keeps the label unique across runs and makes it
    # end with an alphanumeric character.
    label = f"{get_test_label()}-e2e-alert-{int(time.time())}"
    description = "E2E alert created by SDK integration test"

    rule_criteria = {
        "rules": [
            {
                "aggregate_function": "avg",
                "dimension_filters": [
                    {
                        "dimension_label": "node_type",
                        "label": "Node Type",
                        "operator": "eq",
                        "value": "primary",
                    }
                ],
                "label": "Memory Usage",
                "metric": "memory_usage",
                "operator": "gt",
                "threshold": 90,
                "unit": "percent",
            }
        ]
    }
    trigger_conditions = {
        "criteria_condition": "ALL",
        "evaluation_period_seconds": 300,
        "polling_interval_seconds": 300,
        "trigger_occurrences": 1,
    }

    # Pick an existing alert channel to attach to the definition; skip if none
    channels = list(client.monitor.alert_channels())
    if not channels:
        pytest.skip(
            "No alert channels available on account for creating alert definitions"
        )

    created = None

    def wait_for_alert_ready(alert_id, service_type: str):
        """Poll the alert definition until it is enabled or time runs out.

        Returns the most recently fetched alert object either way; callers
        are not told whether the timeout was hit.
        """
        timeout = 360  # maximum time in seconds to wait for the alert
        interval = 1  # first sleep in seconds; doubled after every poll
        # Monotonic clock: elapsed time is immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout
        alert = client.load(AlertDefinition, alert_id, service_type)
        while (
            getattr(alert, "status", None)
            != AlertStatus.AlertDefinitionStatusEnabled
        ):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Clamp the sleep so the exponential backoff cannot overshoot
            # the deadline (an uncapped 256 s step would push the total
            # wait to roughly 511 s).
            time.sleep(min(interval, remaining))
            interval *= 2
            try:
                alert._api_get()
            except ApiError as e:
                # A 404 while polling is tolerated and polling continues;
                # any other API error is re-raised.
                if e.status != 404:
                    raise
        return alert

    try:
        # Create the alert definition using API-compliant top-level fields
        created = client.monitor.create_alert_definition(
            service_type=service_type,
            label=label,
            severity=1,
            description=description,
            channel_ids=[channels[0].id],
            rule_criteria=rule_criteria,
            trigger_conditions=trigger_conditions,
        )
        assert created.id
        assert getattr(created, "label", None) == label
        assert getattr(created, "entities", None) is not None

        created = wait_for_alert_ready(created.id, service_type)

        # Rename the definition through a freshly loaded object.
        updated = client.load(AlertDefinition, created.id, service_type)
        updated.label = f"{label}-updated"
        updated.save()
        assert getattr(updated, "entities", None) is not None

        updated = wait_for_alert_ready(updated.id, service_type)
        assert created.id == updated.id
        assert updated.label == f"{label}-updated"
    finally:
        if created:
            # Always attempt cleanup. Errors from the delete call are not
            # suppressed, so a failed cleanup shows up as a test error.
            delete_alert = client.load(
                AlertDefinition, created.id, service_type
            )
            delete_alert.delete()
def test_alert_definition_entities(test_linode_client):
    """List the entities attached to the first dbaas alert definition.

    Fails outright when the account has no dbaas alert definitions. When the
    entity list is non-empty, the first entity's type and fields are checked.
    """
    service_type = "dbaas"
    monitor = test_linode_client.monitor

    definitions = monitor.alert_definitions(service_type=service_type)
    if len(definitions) == 0:
        pytest.fail("No alert definitions available for dbaas service type")

    first_definition = definitions[0]
    assert getattr(first_definition, "entities", None) is not None

    entities = monitor.alert_definition_entities(
        service_type, first_definition.id
    )
    assert isinstance(entities, PaginatedList)

    # Nothing more to verify when the definition has no entities.
    if len(entities) == 0:
        return

    first_entity = entities[0]
    assert isinstance(first_entity, AlertDefinitionEntity)
    assert first_entity.id
    assert first_entity.label
    assert first_entity.url
    assert first_entity._type == service_type