Skip to content

Commit 3ab5a85

Browse files
update control channels helpers for parity
1 parent 3207662 commit 3ab5a85

3 files changed

Lines changed: 107 additions & 36 deletions

File tree

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from __future__ import annotations
2+
3+
from typing import Union
4+
5+
from pydantic import BaseModel, Field, SerializeAsAny
6+
7+
from conSys import AnyComponentSchema
8+
from conSys.datamodels.encoding import Encoding
9+
10+
11+
class ControlStreamJSONSchema(BaseModel):
12+
"""
13+
A class to represent the schema of a control stream
14+
"""
15+
id: str = Field(None)
16+
name: str = Field(...)
17+
description: str = Field(None)
18+
deployment_link: str = Field(None, serialization_alias='deployment@link')
19+
ultimate_feature_of_interest_link: str = Field(None, serialization_alias='ultimateFeatureOfInterest@link')
20+
sampling_feature_link: str = Field(None, alias='samplingFeature@link')
21+
valid_time: list = Field(None, serialization_alias='validTime')
22+
input_name: str = Field(None, serialization_alias='inputName')
23+
links: list = Field(None)
24+
schema: SerializeAsAny[Union[SWEControlChannelSchema, JSONControlChannelSchema]] = Field(...)
25+
26+
27+
class SWEControlChannelSchema(BaseModel):
28+
"""
29+
A class to represent the schema of a control channel
30+
"""
31+
command_format: str = Field("application/swe+json", serialization_alias='commandFormat')
32+
encoding: SerializeAsAny[Encoding] = Field(...)
33+
record_schema: SerializeAsAny[AnyComponentSchema] = Field(..., serialization_alias='recordSchema')
34+
35+
36+
class JSONControlChannelSchema(BaseModel):
37+
command_format: str = Field("application/cmd+json", serialization_alias='commandFormat')
38+
params_schema: SerializeAsAny[AnyComponentSchema] = Field(..., serialization_alias='paramsSchema')

conSys/part_2/control_channels.py

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
from typing import Union
2+
13
import requests
24
from pydantic import HttpUrl
35

46
from conSys.con_sys_api import ConnectedSystemsRequestBuilder
57
from conSys.constants import APITerms
68

79

8-
def list_all_constrol_streams(server_addr: HttpUrl, api_root: str = APITerms.API.value):
10+
def list_all_control_streams(server_addr: HttpUrl, api_root: str = APITerms.API.value, headers: dict = None):
911
"""
1012
Lists all control streams
1113
:return:
@@ -15,12 +17,14 @@ def list_all_constrol_streams(server_addr: HttpUrl, api_root: str = APITerms.API
1517
.with_api_root(api_root)
1618
.for_resource_type(APITerms.CONTROL_STREAMS.value)
1719
.build_url_from_base()
20+
.with_headers(headers)
21+
.with_request_method('GET')
1822
.build())
19-
resp = requests.get(api_request.url, params=api_request.body, headers=api_request.headers)
20-
return resp.json()
23+
return api_request.make_request()
2124

2225

23-
def list_control_streams_of_system(server_addr: HttpUrl, system_id: str, api_root: str = APITerms.API.value):
26+
def list_control_streams_of_system(server_addr: HttpUrl, system_id: str, api_root: str = APITerms.API.value,
27+
headers=None):
2428
"""
2529
Lists all control streams of a system
2630
:return:
@@ -32,13 +36,14 @@ def list_control_streams_of_system(server_addr: HttpUrl, system_id: str, api_roo
3236
.with_resource_id(system_id)
3337
.for_resource_type(APITerms.CONTROL_STREAMS.value)
3438
.build_url_from_base()
39+
.with_headers(headers)
40+
.with_request_method('GET')
3541
.build())
36-
resp = requests.get(api_request.url, params=api_request.body, headers=api_request.headers)
37-
return resp.json()
42+
return api_request.make_request()
3843

3944

40-
def add_control_streams_to_system(server_addr: HttpUrl, system_id: str, request_body: dict,
41-
api_root: str = APITerms.API.value):
45+
def add_control_streams_to_system(server_addr: HttpUrl, system_id: str, request_body: Union[str, dict],
46+
api_root: str = APITerms.API.value, headers=None):
4247
"""
4348
Adds a control stream to a system by its id
4449
:return:
@@ -48,16 +53,17 @@ def add_control_streams_to_system(server_addr: HttpUrl, system_id: str, request_
4853
.with_api_root(api_root)
4954
.for_resource_type(APITerms.SYSTEMS.value)
5055
.with_resource_id(system_id)
51-
.for_resource_type(APITerms.CONTROL_STREAMS.value)
56+
.for_sub_resource_type(APITerms.CONTROL_STREAMS.value)
5257
.with_request_body(request_body)
5358
.build_url_from_base()
59+
.with_headers(headers)
60+
.with_request_method('POST')
5461
.build())
55-
resp = requests.post(api_request.url, params=api_request.body, headers=api_request.headers)
56-
return resp.json()
62+
return api_request.make_request()
5763

5864

5965
def retrieve_control_stream_description_by_id(server_addr: HttpUrl, control_stream_id: str,
60-
api_root: str = APITerms.API.value):
66+
api_root: str = APITerms.API.value, headers: dict = None):
6167
"""
6268
Retrieves a control stream by its id
6369
:return:
@@ -68,12 +74,15 @@ def retrieve_control_stream_description_by_id(server_addr: HttpUrl, control_stre
6874
.for_resource_type(APITerms.CONTROL_STREAMS.value)
6975
.with_resource_id(control_stream_id)
7076
.build_url_from_base()
77+
.with_headers(headers)
78+
.with_request_method('GET')
7179
.build())
72-
resp = requests.get(api_request.url, params=api_request.body, headers=api_request.headers)
73-
return resp.json()
80+
81+
return api_request.make_request()
7482

7583

76-
def update_control_stream_description_by_id(server_addr: HttpUrl, control_stream_id: str, request_body: dict,
84+
def update_control_stream_description_by_id(server_addr: HttpUrl, control_stream_id: str,
85+
request_body: Union[str, dict],
7786
api_root: str = APITerms.API.value):
7887
"""
7988
Updates a control stream by its id
@@ -91,7 +100,8 @@ def update_control_stream_description_by_id(server_addr: HttpUrl, control_stream
91100
return resp.json()
92101

93102

94-
def delete_control_stream_by_id(server_addr: HttpUrl, control_stream_id: str, api_root: str = APITerms.API.value):
103+
def delete_control_stream_by_id(server_addr: HttpUrl, control_stream_id: str, api_root: str = APITerms.API.value,
104+
headers=None):
95105
"""
96106
Deletes a control stream by its id
97107
:return:
@@ -102,13 +112,15 @@ def delete_control_stream_by_id(server_addr: HttpUrl, control_stream_id: str, ap
102112
.for_resource_type(APITerms.CONTROL_STREAMS.value)
103113
.with_resource_id(control_stream_id)
104114
.build_url_from_base()
115+
.with_headers(headers)
116+
.with_request_method('DELETE')
105117
.build())
106-
resp = requests.delete(api_request.url, params=api_request.body, headers=api_request.headers)
107-
return resp.json()
118+
119+
return api_request.make_request()
108120

109121

110122
def retrieve_control_stream_schema_by_id(server_addr: HttpUrl, control_stream_id: str,
111-
api_root: str = APITerms.API.value):
123+
api_root: str = APITerms.API.value, headers: dict = None):
112124
"""
113125
Retrieves a control stream schema by its id
114126
:return:
@@ -120,13 +132,15 @@ def retrieve_control_stream_schema_by_id(server_addr: HttpUrl, control_stream_id
120132
.with_resource_id(control_stream_id)
121133
.for_resource_type(APITerms.SCHEMA.value)
122134
.build_url_from_base()
135+
.with_headers(headers)
136+
.with_request_method('GET')
123137
.build())
124-
resp = requests.get(api_request.url, params=api_request.body, headers=api_request.headers)
125-
return resp.json()
138+
139+
return api_request.make_request()
126140

127141

128142
def update_control_stream_schema_by_id(server_addr: HttpUrl, control_stream_id: str, request_body: dict,
129-
api_root: str = APITerms.API.value):
143+
api_root: str = APITerms.API.value, headers: dict = None):
130144
"""
131145
Updates a control stream schema by its id
132146
:return:
@@ -139,6 +153,7 @@ def update_control_stream_schema_by_id(server_addr: HttpUrl, control_stream_id:
139153
.for_resource_type(APITerms.SCHEMA.value)
140154
.with_request_body(request_body)
141155
.build_url_from_base()
156+
.with_headers(headers)
157+
.with_request_method('PUT')
142158
.build())
143-
resp = requests.put(api_request.url, params=api_request.body, headers=api_request.headers)
144-
return resp.json()
159+
return api_request.make_request()

tests/test_script_full_suite.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import random
22

33
from conSys import Systems, SamplingFeatures, Datastreams, SmlJSONBody, GeoJSONBody, model_utils, \
4-
DatastreamBodyJSON, ObservationFormat, URI, Procedures, Geometry, Deployments
4+
DatastreamBodyJSON, ObservationFormat, URI, Procedures, Geometry, Deployments, ControlChannels
5+
from conSys.datamodels.control_streams import ControlStreamJSONSchema, SWEControlChannelSchema, JSONControlChannelSchema
56
from conSys.datamodels.datastreams import SWEDatastreamSchema
67
from conSys.datamodels.encoding import JSONEncoding
7-
from conSys.datamodels.swe_components import BooleanSchema, TimeSchema, DataRecordSchema
8+
from conSys.datamodels.swe_components import BooleanSchema, TimeSchema, DataRecordSchema, CountSchema
89

910
server_url = "http://localhost:8282/sensorhub"
1011
geo_json_headers = {"Content-Type": "application/geo+json"}
@@ -281,20 +282,37 @@ def test_create_datastreams():
281282
headers=json_headers)
282283
print(resp)
283284

285+
284286
"""
285287
Command and Control Channel Section
286288
"""
289+
290+
287291
def test_create_control_channel():
288-
geo_json_body = GeoJSONBody(type='Feature', id=str(random.randint(1000, 9999)),
289-
description="Test Insertion of Control Channel via GEOJSON",
290-
properties={
291-
"featureType": "http://www.w3.org/ns/ssn/ControlChannel",
292-
"name": f'Test Control Channel - GeoJSON',
293-
"uid": f'urn:test:client:geo-cc',
294-
"description": "A Test Control Channel inserted from the Python CSAPI Client",
295-
})
296-
resp = ControlChannels.create_new_control_channels(server_url, geo_json_body.model_dump_json(exclude_none=True, by_alias=True),
297-
headers=geo_json_headers)
292+
systems_list = Systems.list_all_systems(server_url)
293+
system_id = systems_list["items"][0]["id"]
294+
295+
time_schema = TimeSchema(label="Test Control Channel Time", definition="http://test.com/Time",
296+
uom=URI(href="http://test.com/TimeUOM"))
297+
count_schema = CountSchema(label="Test Control Channel Count", definition="http://test.com/Count")
298+
299+
control_schema = JSONControlChannelSchema(command_format=ObservationFormat.SWE_JSON.value,
300+
params_schema=DataRecordSchema(label="Test Control Channel Record",
301+
definition="http://test.com/Record",
302+
fields=[time_schema, count_schema]))
303+
request_body = ControlStreamJSONSchema(name="Test Control Channel", input_name="TestControlInput1",
304+
schema=control_schema)
305+
print(f'Request Body for Control Stream: {request_body.model_dump_json(exclude_none=True, by_alias=True)}')
306+
resp = ControlChannels.add_control_streams_to_system(server_url, system_id,
307+
request_body.model_dump_json(exclude_none=True,
308+
by_alias=True),
309+
headers=json_headers)
310+
print(resp)
311+
312+
313+
def test_list_control_streams():
314+
control_streams = ControlChannels.list_all_control_streams(server_url)
315+
print(control_streams)
298316

299317

300318
"""

0 commit comments

Comments
 (0)