Skip to content

Commit f249957

Browse files
tests for observations
also includes some fixes for system helpers
1 parent 006eb07 commit f249957

6 files changed

Lines changed: 155 additions & 21 deletions

File tree

conSys/datamodels/api_utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
1+
from __future__ import annotations
12
from pydantic import BaseModel, Field, HttpUrl, field_validator
23

34

5+
class Link(BaseModel):
6+
href: HttpUrl = Field(...)
7+
rel: str = Field(None)
8+
type: str = Field(None)
9+
hreflang: str = Field(None)
10+
title: str = Field(None)
11+
uid: URI = Field(None)
12+
rt: URI = Field(None)
13+
interface: URI = Field(None, serialization_alias='if')
14+
15+
416
class UCUMCode(BaseModel):
517
code: str = Field(...)
618
label: str = Field(None)

conSys/datamodels/observations.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from datetime import datetime
2+
3+
from pydantic import BaseModel, Field
4+
from typing import Union, Optional, List
5+
6+
from conSys.datamodels.api_utils import Link
7+
8+
9+
class ObservationOMJSONInline(BaseModel):
10+
"""
11+
A class to represent an observation in OM-JSON format
12+
"""
13+
datastream_id: str = Field(None, serialization_alias="datastream@id")
14+
foi_id: str = Field(None, serialization_alias="foi@id")
15+
phenomenon_time: str = Field(None, serialization_alias="phenomenonTime")
16+
result_time: str = Field(datetime.now().isoformat(), serialization_alias="resultTime")
17+
parameters: dict = Field(None)
18+
result: Union[int, float, str, dict, list] = Field(...)
19+
result_links: List[Link] = Field(None, serialization_alias="result@links")
20+

conSys/datamodels/swe_components.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class AnySimpleComponentSchema(AnyComponentSchema):
103103
nil_values: list = Field(None, serialization_alias='nilValues')
104104
constraint: Any = Field(None)
105105
value: Any = Field(None)
106+
name: str = Field(...)
106107

107108

108109
class AnyScalarComponentSchema(AnySimpleComponentSchema):

conSys/part_1/systems.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ def list_all_systems(server_addr: HttpUrl, api_root: str = APITerms.API.value, h
2020
.for_resource_type(APITerms.SYSTEMS.value)
2121
.build_url_from_base()
2222
.with_headers(headers)
23+
.with_request_method('GET')
2324
.build())
24-
resp = requests.get(api_request.url, params=api_request.body, headers=api_request.headers)
25-
return resp.json()
25+
26+
return api_request.make_request()
2627

2728

2829
def create_new_systems(server_addr: HttpUrl, request_body: Union[str, dict], api_root: str = APITerms.API.value,

conSys/part_2/datastreams.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def list_all_datastreams_of_system(server_addr: HttpUrl, system_id: str, api_roo
3535
.with_api_root(api_root)
3636
.for_resource_type(APITerms.SYSTEMS.value)
3737
.with_resource_id(system_id)
38-
.for_resource_type(APITerms.DATASTREAMS.value)
38+
.for_sub_resource_type(APITerms.DATASTREAMS.value)
3939
.build_url_from_base()
4040
.with_headers(headers)
4141
.with_request_method('GET')

tests/test_script_full_suite.py

Lines changed: 118 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import random
2+
from datetime import datetime
23

34
from conSys import Systems, SamplingFeatures, Datastreams, SmlJSONBody, GeoJSONBody, model_utils, \
4-
DatastreamBodyJSON, ObservationFormat, URI, Procedures, Geometry, Deployments, ControlChannels
5+
DatastreamBodyJSON, ObservationFormat, URI, Procedures, Geometry, Deployments, ControlChannels, Observations
56
from conSys.datamodels.control_streams import ControlStreamJSONSchema, SWEControlChannelSchema, JSONControlChannelSchema
67
from conSys.datamodels.datastreams import SWEDatastreamSchema
78
from conSys.datamodels.encoding import JSONEncoding
89
from conSys.datamodels.swe_components import BooleanSchema, TimeSchema, DataRecordSchema, CountSchema
10+
from conSys.datamodels.observations import ObservationOMJSONInline
911

1012
server_url = "http://localhost:8282/sensorhub"
1113
geo_json_headers = {"Content-Type": "application/geo+json"}
@@ -19,6 +21,7 @@
1921
component_json = []
2022
command_json = []
2123
control_channel_json = []
24+
test_time_start = datetime.utcnow()
2225

2326
"""
2427
Setup Section
@@ -75,26 +78,25 @@ def test_list_systems():
7578
Tests the listing of systems using the Connected Systems API by listing all systems and all systems in a collection.
7679
:return:
7780
"""
78-
sys_list = Systems.list_all_systems(server_url)["items"]
79-
retrieved_systems.extend(sys_list)
80-
81-
for system in retrieved_systems:
82-
print(system)
81+
sys_list = Systems.list_all_systems(server_url)
82+
print(sys_list.json())
83+
retrieved_systems = sys_list.json()
8384

8485

8586
def test_retrieve_system():
8687
"""
8788
Tests the retrieval of a system using the Connected Systems API by retrieving a single system and a batch of systems.
8889
:return:
8990
"""
90-
system_id = retrieved_systems[0]['id']
91+
system_id = Systems.list_all_systems(server_url).json()['items'][0]['id']
9192
retrieved_system = Systems.retrieve_system_by_id(server_url, system_id)
9293
print(retrieved_system)
9394
assert retrieved_system is not None
9495
assert retrieved_system['id'] == system_id
9596

9697

9798
def test_update_systems():
99+
retrieved_systems = Systems.list_all_systems(server_url, headers=json_headers).json()['items']
98100
if retrieved_systems is None or len(retrieved_systems) == 0:
99101
raise ValueError("No systems to update")
100102
for system in retrieved_systems:
@@ -157,9 +159,8 @@ def test_update_deployment_by_id():
157159

158160
def test_add_systems_to_deployment():
159161
deployments = Deployments.list_all_deployments(server_url)
160-
systems = Systems.list_all_systems(server_url, headers=json_headers)
162+
systems = Systems.list_all_systems(server_url, headers=json_headers).json()
161163
system_link = {'href': f"{server_url}/api/systems/{systems['items'][0]['id']}"}
162-
# uri_list = str(system_links).replace("'", "\"")
163164
resp = Deployments.add_systems_to_deployment(server_url, deployments.json()['items'][0]['id'], str(system_link),
164165
headers=geo_json_headers)
165166
print(resp)
@@ -194,6 +195,7 @@ def test_add_systems_to_deployment():
194195

195196

196197
def test_create_sampling_feature():
198+
retrieved_systems = Systems.list_all_systems(server_url, headers=json_headers).json()['items']
197199
geo_sf = GeoJSONBody(type='Feature', id=str(random.randint(1000, 9999)),
198200
description="Test Insertion of Sampling Feature via GEOJSON",
199201
properties={
@@ -228,12 +230,14 @@ def test_list_sampling_features():
228230

229231

230232
def test_list_sampling_feature_by_system():
233+
retrieved_systems = Systems.list_all_systems(server_url, headers=json_headers).json()['items']
231234
sf_list = SamplingFeatures.list_sampling_features_of_system(server_url, retrieved_systems[0]['id'])
232235
print(sf_list.json())
233236
sf_id = sf_list.json()['items'][0]['id']
234237

235238

236239
def test_update_sampling_feature():
240+
retrieved_systems = Systems.list_all_systems(server_url, headers=json_headers).json()['items']
237241
sf_list = SamplingFeatures.list_sampling_features_of_system(server_url, retrieved_systems[0]['id'])
238242
print(sf_list.json())
239243
sf_id = sf_list.json()['items'][0]['id']
@@ -260,14 +264,16 @@ def test_retrieve_sampling_feature_by_id():
260264

261265

262266
"""
263-
Datastream Section
267+
Datastreams and Observations Section
264268
"""
265269

266270

267271
def test_create_datastreams():
268-
time_schema = TimeSchema(label="Test Datastream Time", definition="http://test.com/Time",
272+
retrieved_systems = Systems.list_all_systems(server_url, headers=json_headers).json()['items']
273+
time_schema = TimeSchema(label="Test Datastream Time", definition="http://test.com/Time", name="timestamp",
269274
uom=URI(href="http://test.com/TimeUOM"))
270-
bool_schema = BooleanSchema(label="Test Datastream Boolean", definition="http://test.com/Boolean")
275+
bool_schema = BooleanSchema(label="Test Datastream Boolean", definition="http://test.com/Boolean",
276+
name="testboolean")
271277
datarecord_schema = SWEDatastreamSchema(encoding=JSONEncoding(), obs_format=ObservationFormat.SWE_JSON.value,
272278
record_schema=DataRecordSchema(label="Test Datastream Record",
273279
definition="http://test.com/Record",
@@ -283,18 +289,109 @@ def test_create_datastreams():
283289
print(resp)
284290

285291

292+
def test_list_datastreams():
293+
ds_list = Datastreams.list_all_datastreams(server_url)
294+
print(ds_list.json())
295+
296+
297+
def test_list_datastreams_of_system():
298+
sys_list = Systems.list_all_systems(server_url, headers=json_headers).json()
299+
print(sys_list)
300+
ds_list = Datastreams.list_all_datastreams_of_system(server_url, sys_list['items'][0]['id'], headers=json_headers)
301+
# print(ds_list.json())
302+
print(ds_list)
303+
304+
305+
def test_retrieve_datastream_by_id():
306+
ds_list = Datastreams.list_all_datastreams(server_url).json()
307+
ds = Datastreams.retrieve_datastream_by_id(server_url, ds_list['items'][0]['id'])
308+
print(ds.json())
309+
310+
311+
def test_update_datastream_by_id():
312+
ds_list = Datastreams.list_all_datastreams(server_url).json()
313+
time_schema = TimeSchema(label="Test Datastream Time (Updated)", definition="http://test.com/Time",
314+
name="timestamp",
315+
uom=URI(href="http://test.com/TimeUOM"))
316+
count_schema = CountSchema(label="Test Datastream Count (Updated)", definition="http://test.com/Count",
317+
name="testcount")
318+
bool_schema = BooleanSchema(label="Test Datastream Boolean (Updated)", definition="http://test.com/Boolean",
319+
name="testboolean")
320+
321+
datarecord_schema = SWEDatastreamSchema(encoding=JSONEncoding(), obs_format=ObservationFormat.SWE_JSON.value,
322+
record_schema=DataRecordSchema(label="Test Datastream Record (Updated)",
323+
definition="http://test.com/Record",
324+
fields=[bool_schema]))
325+
print(f'Datastream Schema: {datarecord_schema.model_dump_json(exclude_none=True, by_alias=True)}')
326+
datastream_body = DatastreamBodyJSON(name="Test Datastream (Updated)", output_name="Test Output #1",
327+
schema=datarecord_schema)
328+
temp_test_json = datastream_body.model_dump_json(exclude_none=True, by_alias=True)
329+
print(f'Test Datastream JSON: {temp_test_json}')
330+
resp = Datastreams.update_datastream_by_id(server_url, ds_list['items'][0]['id'],
331+
datastream_body.model_dump_json(exclude_none=True, by_alias=True),
332+
headers=json_headers)
333+
print(resp)
334+
335+
336+
def test_add_observations_to_datastream():
337+
ds_list = Datastreams.list_all_datastreams(server_url).json()
338+
the_time = datetime.utcnow().isoformat() + 'Z' # for now just add the Z because I can't be bothered to validate results without a model
339+
time_millis = datetime.now().timestamp() * 1000
340+
time_millis = test_time_start.timestamp() * 1000
341+
obs = ObservationOMJSONInline(phenomenon_time=the_time,
342+
result_time=the_time,
343+
result={
344+
"timestamp": time_millis,
345+
"testboolean": True
346+
})
347+
print(f'Observation: {obs.model_dump_json(exclude_none=True, by_alias=True)}')
348+
resp = Observations.add_observations_to_datastream(server_url, ds_list['items'][0]['id'],
349+
obs.model_dump_json(exclude_none=True, by_alias=True),
350+
headers=json_headers)
351+
print(resp)
352+
353+
354+
def test_list_all_observations():
355+
obs_list = Observations.list_all_observations(server_url)
356+
print(obs_list.json())
357+
358+
359+
def test_list_observations_of_datastream():
360+
ds_list = Datastreams.list_all_datastreams(server_url).json()
361+
obs_list = Observations.list_observations_from_datastream(server_url, ds_list['items'][0]['id'])
362+
print(obs_list.json())
363+
364+
365+
def test_update_observation_by_id():
366+
# Fails because the test server asks for a datastream id, but the model provides one so no idea there for now
367+
obs_list = Observations.list_all_observations(server_url).json()
368+
the_time = datetime.utcnow().isoformat() + 'Z' # for now just add the Z because I can't be bothered to validate results without a model
369+
time_millis = test_time_start.timestamp() * 1000
370+
obs = ObservationOMJSONInline(phenomenon_time=the_time, datastream_id=obs_list['items'][0]['datastream@id'],
371+
result_time=the_time,
372+
result={
373+
"timestamp": time_millis,
374+
"testboolean": False
375+
})
376+
print(f'Observation: {obs.model_dump_json(exclude_none=True, by_alias=True)}')
377+
resp = Observations.update_observation_by_id(server_url, obs_list['items'][0]['id'],
378+
obs.model_dump_json(exclude_none=True, by_alias=True),
379+
headers=json_headers)
380+
print(resp)
381+
382+
286383
"""
287384
Command and Control Channel Section
288385
"""
289386

290387

291388
def test_create_control_channel():
292-
systems_list = Systems.list_all_systems(server_url)
389+
systems_list = Systems.list_all_systems(server_url, headers=json_headers).json()
293390
system_id = systems_list["items"][0]["id"]
294391

295-
time_schema = TimeSchema(label="Test Control Channel Time", definition="http://test.com/Time",
392+
time_schema = TimeSchema(label="Test Control Channel Time", definition="http://test.com/Time", name="timestamp",
296393
uom=URI(href="http://test.com/TimeUOM"))
297-
count_schema = CountSchema(label="Test Control Channel Count", definition="http://test.com/Count")
394+
count_schema = CountSchema(label="Test Control Channel Count", definition="http://test.com/Count", name="testcount")
298395

299396
control_schema = JSONControlChannelSchema(command_format=ObservationFormat.SWE_JSON.value,
300397
params_schema=DataRecordSchema(label="Test Control Channel Record",
@@ -316,7 +413,7 @@ def test_list_control_streams():
316413

317414

318415
def test_list_control_streams_of_system():
319-
systems_list = Systems.list_all_systems(server_url)
416+
systems_list = Systems.list_all_systems(server_url, headers=json_headers).json()
320417
system_id = systems_list["items"][0]["id"]
321418
control_streams = ControlChannels.list_control_streams_of_system(server_url, system_id)
322419
print(control_streams)
@@ -336,9 +433,12 @@ def test_update_control_stream_by_id():
336433
control_streams = ControlChannels.list_all_control_streams(server_url).json()
337434

338435
time_schema = TimeSchema(label="Test Control Channel Time (Updated)", definition="http://test.com/Time",
436+
name="timestamp",
339437
uom=URI(href="http://test.com/TimeUOM"))
340-
count_schema = CountSchema(label="Test Control Channel Count (Updated)", definition="http://test.com/Count")
341-
bool_schema = BooleanSchema(label="Test Control Channel Boolean (Updated)", definition="http://test.com/Boolean")
438+
count_schema = CountSchema(label="Test Control Channel Count (Updated)", definition="http://test.com/Count",
439+
name="testcount")
440+
bool_schema = BooleanSchema(label="Test Control Channel Boolean (Updated)", definition="http://test.com/Boolean",
441+
name="testboolean")
342442

343443
control_schema = JSONControlChannelSchema(command_format=ObservationFormat.SWE_JSON.value,
344444
params_schema=DataRecordSchema(

0 commit comments

Comments
 (0)