Skip to content

Commit 6c910c9

Browse files
- use MQTT as default communication protocol
- update some URl construction methods
1 parent 298c77c commit 6c910c9

8 files changed

Lines changed: 383 additions & 125 deletions

File tree

src/oshconnect/csapi4py/default_api_helpers.py

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from .constants import APIResourceTypes, EncodingSchema, APITerms
1717

1818

19+
# TODO: rework to make the first resource in the endpoint the primary key for URL construction, currently, the implementation is a bit on the confusing side with what is being generated and why.
20+
1921
def determine_parent_type(res_type: APIResourceTypes):
2022
match res_type:
2123
case APIResourceTypes.SYSTEM:
@@ -84,6 +86,7 @@ class APIHelper(ABC):
8486
server_url: str = None
8587
port: int = None
8688
protocol: str = "https"
89+
server_root: str = "sensorhub"
8790
api_root: str = "api"
8891
username: str = None
8992
password: str = None
@@ -178,52 +181,54 @@ def delete_resource(self, res_type: APIResourceTypes, res_id: str, parent_res_id
178181
return api_request.make_request()
179182

180183
# Helpers
181-
def resource_url_resolver(self, res_type: APIResourceTypes, res_id: str = None, parent_res_id: str = None,
184+
def resource_url_resolver(self, subresource_type: APIResourceTypes, subresource_id: str = None,
185+
resource_id: str = None,
182186
from_collection: bool = False):
183187
"""
184188
Helper to generate a URL endpoint for a given resource type and id by matching the resource type to an
185189
appropriate parent endpoint and inserting the resource ids as necessary.
186-
:param res_type:
187-
:param res_id:
188-
:param parent_res_id:
190+
:param subresource_type:
191+
:param subresource_id:
192+
:param resource_id:
189193
:param from_collection:
190194
:return:
191195
"""
192-
if res_type is None:
196+
if subresource_type is None:
193197
raise ValueError('Resource type must contain a valid APIResourceType')
194-
if res_type is APIResourceTypes.COLLECTION and from_collection:
198+
if subresource_type is APIResourceTypes.COLLECTION and from_collection:
195199
raise ValueError('Collections are not sub-resources of other collections')
196200

197201
parent_type = None
198-
if parent_res_id and not from_collection:
199-
parent_type = determine_parent_type(res_type)
200-
elif parent_res_id and from_collection:
202+
if resource_id and not from_collection:
203+
parent_type = determine_parent_type(subresource_type)
204+
elif resource_id and from_collection:
201205
parent_type = APIResourceTypes.COLLECTION
202206

203-
return self.construct_url(parent_type, res_id, res_type, parent_res_id)
207+
return self.construct_url(parent_type, subresource_id, subresource_type, resource_id)
204208

205-
def construct_url(self, parent_type, res_id, res_type, parent_res_id, for_socket: bool = False):
209+
def construct_url(self, resource_type: APIResourceTypes, subresource_id, subresource_type, resource_id,
210+
for_socket: bool = False):
206211
"""
207212
Constructs an API endpoint url from the given parameters
208-
:param parent_type:
209-
:param res_id:
210-
:param res_type:
211-
:param parent_res_id:
213+
:param resource_type:
214+
:param subresource_id:
215+
:param subresource_type:
216+
:param resource_id:
212217
:param for_socket: If true, will construct a WebSocket URL (ws:// or wss://) instead of HTTP/HTTPS.
213218
:return:
214219
"""
215220
# TODO: Test for less common cases to ensure that the URL is being constructed correctly
216221
base_url = self.get_api_root_url(socket=for_socket)
217222

218-
resource_endpoint = resource_type_to_endpoint(res_type, parent_type)
223+
resource_endpoint = resource_type_to_endpoint(subresource_type, resource_type)
219224
url = f'{base_url}/{resource_endpoint}'
220225

221-
if parent_type:
222-
parent_endpoint = resource_type_to_endpoint(parent_type)
223-
url = f'{base_url}/{parent_endpoint}/{parent_res_id}/{resource_endpoint}'
226+
if resource_type:
227+
parent_endpoint = resource_type_to_endpoint(resource_type)
228+
url = f'{base_url}/{parent_endpoint}/{resource_id}/{resource_endpoint}'
224229

225-
if res_id:
226-
url = f'{url}/{res_id}'
230+
if subresource_id:
231+
url = f'{url}/{subresource_id}'
227232

228233
return url
229234

@@ -244,13 +249,27 @@ def get_api_root_url(self, socket: bool = False):
244249
:param socket: If true, will return a WebSocket URL (ws:// or wss://) instead of HTTP/HTTPS.
245250
:return:
246251
"""
247-
return f'{self.get_base_url(socket=socket)}/{self.api_root}'
252+
return f'{self.get_base_url(socket=socket)}/{self.server_root}/{self.api_root}'
248253

249254
def set_protocol(self, protocol: str):
250255
if protocol not in ['http', 'https', 'ws', 'wss']:
251256
raise ValueError('Protocol must be either "http" or "https"')
252257
self.protocol = protocol
253258

259+
def get_mqtt_topic(self, resource_type, subresource_type, resource_id: str,
260+
for_socket: bool = False):
261+
"""
262+
Returns the MQTT topic for the resource type, if applicable.
263+
:return:
264+
"""
265+
resource_endpoint = f'/{resource_type_to_endpoint(subresource_type, resource_type)}'
266+
parent_endpoint = "" if resource_type is None else f'/{resource_type_to_endpoint(resource_type)}'
267+
parent_id = "" if resource_id is None else f'/{resource_id}'
268+
topic_locator = f'/{self.api_root}{parent_endpoint}{parent_id}{resource_endpoint}'
269+
print(f'MQTT Topic: {topic_locator}')
270+
271+
return topic_locator
272+
254273

255274
@dataclass(kw_only=True)
256275
class ResponseParserHelper:

src/oshconnect/csapi4py/mqtt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def start(self):
173173

174174
def stop(self):
175175
"""
176-
Stop the MQTT client.\
176+
Stop the MQTT client.
177177
178178
:return:
179179
"""

src/oshconnect/encoding.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from pydantic import BaseModel, Field
1+
from pydantic import BaseModel, Field, ConfigDict
22

33

44
class Encoding(BaseModel):
5+
model_config = ConfigDict(populate_by_name=True)
56
id: str = Field(None)
67
type: str = Field(...)
78
vector_as_arrays: bool = Field(False, alias='vectorAsArrays')

src/oshconnect/eventbus.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# =============================================================================
2+
# Copyright (c) 2025 Botts Innovative Research Inc.
3+
# Date: 2025/10/6
4+
# Author: Ian Patterson
5+
# Contact Email: ian@botts-inc.com
6+
# =============================================================================
7+
import collections
8+
from abc import ABC
9+
10+
11+
class EventBus(ABC):
12+
"""
13+
A base class for an event bus system.
14+
"""
15+
_deque: collections.deque

src/oshconnect/oshconnectapi.py

Lines changed: 50 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
# ==============================================================================
77
import logging
88
import shelve
9+
from uuid import UUID
910

1011
from .csapi4py.default_api_helpers import APIHelper
11-
from .resource_datamodels import DatastreamResource
12-
from .datasource import DataStream, MessageWrapper
12+
from .datasource import MessageWrapper
1313
from .datastore import DataStore
14+
from .resource_datamodels import DatastreamResource
1415
from .streamableresource import Node, System, SessionManager, Datastream
1516
from .styling import Styling
1617
from .timemanagement import TemporalModes, TimeManagement, TimePeriod
@@ -25,7 +26,7 @@ class OSHConnect:
2526
_systems: list[System] = []
2627
_cs_api_builder: APIHelper = None
2728
# _datasource_handler: DataStreamHandler = None
28-
_datastreams: list[DataStream] = []
29+
_datastreams: list[Datastream] = []
2930
_datataskers: list[DataStore] = []
3031
_datagroups: list = []
3132
_tasks: list = []
@@ -38,15 +39,6 @@ def __init__(self, name: str, **kwargs):
3839
:param kwargs:
3940
"""
4041
self._name = name
41-
# if 'nodes' in kwargs:
42-
# self._nodes = kwargs['nodes']
43-
# self._playback_mode = kwargs['playback_mode']
44-
# self._datasource_handler.set_playback_mode(self._playback_mode)
45-
# self._datasource_handler = DataStreamHandler()
46-
# if 'playback_mode' in kwargs:
47-
# self._playback_mode = kwargs['playback_mode']
48-
# self._datasource_handler.set_playback_mode(self._playback_mode)
49-
5042
logging.info(f"OSHConnect instance {name} created")
5143
self._session_manager = SessionManager()
5244

@@ -116,48 +108,20 @@ def select_temporal_mode(self, mode: str):
116108
"""
117109
pass
118110

119-
async def playback_streams(self, stream_ids: list = None):
120-
"""
121-
Begins playback of the datastreams that have been connected to the app. The method of playback is determined
122-
by the temporal mode that has been set.
123-
:param stream_ids:
124-
:return:
125-
"""
126-
if stream_ids is None:
127-
await self._datasource_handler.connect_all(
128-
self.timestream.get_time_range())
129-
else:
130-
for stream_id in stream_ids:
131-
await self._datasource_handler.connect_ds(stream_id)
132-
133111
def visualize_streams(self, streams: list):
134112
pass
135113

136114
# Second Level Use Cases
137115
def get_visualization_recommendations(self, streams: list):
138116
pass
139117

140-
def dep_discover_datastreams(self):
141-
"""
142-
Discover datastreams of the current systems of the OSHConnect instance and create objects for them that are
143-
stored in the DataSourceHandler.
144-
:return:
145-
"""
146-
# NOTE: This will need to check to prevent dupes in the future
147-
for system in self._systems:
148-
res_datastreams = system.discover_datastreams()
149-
# create DataSource(s)
150-
new_datasource = [
151-
DataStream(name=ds.name, datastream=ds, parent_system=system.get_system_resource())
152-
for ds in
153-
res_datastreams]
154-
self._datastreams.extend(new_datasource)
155-
list(map(self._datasource_handler.add_datasource, new_datasource))
156-
157118
def discover_datastreams(self):
158119
for system in self._systems:
159120
res_datastreams = system.discover_datastreams()
160-
datastreams = list(map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds), res_datastreams))
121+
datastreams = list(
122+
map(lambda ds: Datastream(parent_node=system.get_parent_node(), id=ds.ds_id, datastream_resource=ds),
123+
res_datastreams))
124+
datastreams = [ds.set_parent_resource_id(system.get_underlying_resource().system_id) for ds in datastreams]
161125
self._datastreams.extend(datastreams)
162126

163127
def discover_systems(self, nodes: list[str] = None):
@@ -279,13 +243,54 @@ def remove_system(self, system_id: str):
279243
pass
280244

281245
# DataStream Helpers
282-
def get_datastreams(self) -> list[DataStream]:
246+
def get_datastreams(self) -> list[Datastream]:
283247
return self._datastreams
284248

249+
def get_datastream_ids(self) -> list[UUID]:
250+
return [ds.get_internal_id() for ds in self._datastreams]
251+
285252
def connect_session_streams(self, session_id: str):
286253
"""
287254
Connects all datastreams that are associated with the given session ID.
288255
:param session_id:
289256
:return:
290257
"""
291258
self._session_manager.start_session_streams(session_id)
259+
260+
def get_resource_group(self, resource_ids: list[UUID]) -> tuple[list[System], list[Datastream]]:
261+
"""
262+
Get a group of resources by their IDs. Can be any mix of systems, datastreams, and controlstreams.
263+
:param resource_ids: list of resource IDs (internal UUID)
264+
"""
265+
systems = [system for system in self._systems if system.get_internal_id() in resource_ids]
266+
datastreams = [ds for ds in self._datastreams if ds.get_internal_id() in resource_ids]
267+
return systems, datastreams
268+
269+
def initialize_resource_groups(self, resource_ids: list = None):
270+
"""
271+
Initializes the datastreams that are specified.
272+
"""
273+
systems, datastreams = self.get_resource_group(resource_ids)
274+
275+
if systems:
276+
for system in systems:
277+
system.initialize()
278+
if datastreams:
279+
for ds in datastreams:
280+
ds.initialize()
281+
282+
def start_datastreams(self, dsid_list: list = None):
283+
"""
284+
Starts the datastreams that are specified.
285+
"""
286+
datastreams = self.get_resource_group(dsid_list)[1]
287+
for ds in datastreams:
288+
ds.start()
289+
290+
def start_systems(self, sysid_list: list = None):
291+
"""
292+
Starts the systems that are specified.
293+
"""
294+
systems = self.get_resource_group(sysid_list)[0]
295+
for system in systems:
296+
system.start()

src/oshconnect/resource_datamodels.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818

1919
class BoundingBox(BaseModel):
20-
model_config = ConfigDict(arbitrary_types_allowed=True)
20+
model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)
2121

2222
lower_left_corner: Point = Field(..., description="The lower left corner of the bounding box.")
2323
upper_right_corner: Point = Field(..., description="The upper right corner of the bounding box.")
@@ -88,7 +88,7 @@ class ProcessMethod:
8888

8989

9090
class BaseResource(BaseModel):
91-
model_config = ConfigDict(arbitrary_types_allowed=True)
91+
model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)
9292

9393
id: str = Field(..., alias="id")
9494
name: str = Field(...)
@@ -98,7 +98,7 @@ class BaseResource(BaseModel):
9898

9999

100100
class SystemResource(BaseModel):
101-
model_config = ConfigDict(arbitrary_types_allowed=True)
101+
model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)
102102

103103
feature_type: str = Field(None, alias="type")
104104
system_id: str = Field(None, alias="id")
@@ -139,7 +139,7 @@ class DatastreamResource(BaseModel):
139139
that, depending on the format of the request, the fields needed may differ. There may be derived models in a later
140140
release that will have different sets of required fields to ease the validation process for users.
141141
"""
142-
# model_config = ConfigDict(populate_by_name=True)
142+
model_config = ConfigDict(populate_by_name=True)
143143

144144
ds_id: str = Field(..., alias="id")
145145
name: str = Field(...)
@@ -176,6 +176,7 @@ def handle_aliases(cls, values):
176176

177177

178178
class ObservationResource(BaseModel):
179+
model_config = ConfigDict(populate_by_name=True)
179180
sampling_feature_id: str = Field(None, alias="samplingFeature@Id")
180181
procedure_link: Link = Field(None, alias="procedure@link")
181182
phenomenon_time: DateTimeSchema = Field(None, alias="phenomenonTime")
@@ -186,6 +187,7 @@ class ObservationResource(BaseModel):
186187

187188

188189
class ControlStreamResource(BaseModel):
190+
model_config = ConfigDict(populate_by_name=True)
189191
name: str = Field(...)
190192
description: str = Field(None)
191193
valid_time: TimePeriod = Field(..., alias="validTime")

0 commit comments

Comments
 (0)