Skip to content

Commit cc29015

Browse files
convert results to Observation objects
1 parent 91a1723 commit cc29015

6 files changed

Lines changed: 132 additions & 28 deletions

File tree

oshconnect/__init__.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import swecommondm as swe_common
1414
from conSys4Py import APIResourceTypes
1515
from conSys4Py.core.default_api_helpers import APIHelper
16+
from conSys4Py.datamodels.observations import ObservationOMJSONInline
1617

1718
from external_models.object_models import System as SystemResource
1819

@@ -24,11 +25,33 @@ class Endpoints:
2425
connected_systems: str = f"{root}/api"
2526

2627

27-
28-
2928
class TemporalModes(Enum):
3029
REAL_TIME = 0
3130
ARCHIVE = 1
3231
BATCH = 2
3332
RT_SYNC = 3
3433
ARCHIVE_SYNC = 4
34+
35+
36+
class Utilities:
37+
38+
@staticmethod
39+
def convert_auth_to_base64(username: str, password: str) -> str:
40+
return base64.b64encode(f"{username}:{password}".encode()).decode()
41+
42+
class StreamQueue:
43+
_queue: list[ObservationOMJSONInline]
44+
def __init__(self):
45+
self._queue = []
46+
47+
def push(self, item):
48+
self._queue.append(item)
49+
50+
def pop(self):
51+
return self._queue.pop(0)
52+
53+
def peek(self):
54+
return self._queue[0]
55+
56+
def is_empty(self):
57+
return len(self._queue) == 0

oshconnect/datamodels/datamodels.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
@dataclass(kw_only=True)
2222
class Node:
2323
_id: str
24+
protocol: str
2425
address: str
2526
port: int
2627
endpoints: Endpoints
@@ -29,15 +30,16 @@ class Node:
2930
_api_helper: APIHelper
3031
_system_ids: list[uuid] = field(default_factory=list)
3132

32-
def __init__(self, address: str, port: int, username: str = None, password: str = None, **kwargs: dict):
33+
def __init__(self, protocol: str, address: str, port: int, username: str = None, password: str = None, **kwargs: dict):
3334
self._id = f'node-{uuid.uuid4()}'
35+
self.protocol = protocol
3436
self.address = address
3537
self.port = port
3638
self.is_secure = username is not None and password is not None
3739
if self.is_secure:
3840
self.add_basicauth(username, password)
3941
self.endpoints = Endpoints()
40-
self._api_helper = APIHelper(server_url=f'{self.address}:{self.port}',
42+
self._api_helper = APIHelper(server_url=f'{self.protocol}://{self.address}:{self.port}',
4143
api_root=self.endpoints.connected_systems, username=username, password=password)
4244
if self.is_secure:
4345
self._api_helper.user_auth = True
@@ -106,16 +108,17 @@ def __init__(self, name: str, label: str, **kwargs):
106108
def update_parent_node(self, node: Node):
107109
self._parent_node = node
108110

111+
def get_parent_node(self) -> Node:
112+
return self._parent_node
113+
109114
def discover_datastreams(self):
110115
res = self._parent_node.get_api_helper().retrieve_resource(APIResourceTypes.DATASTREAM, req_headers={})
111116
datastream_json = res.json()['items']
112-
print(f'Result of datastream discovery: {datastream_json}')
113117
datastreams = []
114118
for ds in datastream_json:
115119
datastream_objs = DatastreamResource.model_validate(ds)
116120
datastreams.append(datastream_objs)
117121

118-
print(f'Found datastreams: {datastreams}')
119122
return datastreams
120123

121124
@staticmethod
@@ -132,10 +135,15 @@ def from_system_resource(system_resource: SystemResource):
132135

133136

134137
class Datastream:
138+
should_poll: bool
139+
_datastream_resource: DatastreamResource
135140

136-
def __init__(self):
141+
def __init__(self, datastream_resource: DatastreamResource):
137142
pass
138143

144+
def get_id(self):
145+
return self._datastream_resource.ds_id
146+
139147

140148
class ControlChannel:
141149

oshconnect/datasource/datasource.py

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,42 @@
33
# Author: Ian Patterson <ian@botts-inc.com>
44
#
55
# Contact Email: ian@botts-inc.com
6-
6+
import asyncio
77
from uuid import uuid4
88

9+
import websockets
10+
11+
from external_models.object_models import DatastreamResource
12+
from oshconnect import Utilities
13+
from oshconnect.datamodels.datamodels import System
914
from oshconnect.datasource import Mode
1015

1116

1217
class DataSource:
18+
"""
19+
DataSource: represents the active connection of a datastream object
20+
"""
1321

14-
def __init__(self, name: str, mode: Mode, properties: dict):
22+
def __init__(self, name: str, mode: str, properties: dict, datastream: DatastreamResource, parent_system: System):
23+
self._status = None
1524
self._id = f'datasource-{uuid4()}'
1625
self.name = name
1726
self.mode = mode
1827
self.properties = properties
28+
self._datastream = datastream
29+
self._websocket = None
30+
self._parent_system = parent_system
31+
self._url = None
32+
if mode == "websocket":
33+
self._url = (f'ws://{self._parent_system.get_parent_node().get_address()}:'
34+
f'{self._parent_system.get_parent_node().get_port()}'
35+
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
36+
f'/observations?f=application%2Fjson')
37+
self._auth = None
38+
self._extra_headers = None
39+
if self._parent_system.get_parent_node().is_secure:
40+
self._auth = self._parent_system.get_parent_node().get_decoded_auth()
41+
self._extra_headers = {'Authorization': f'Basic {self._auth}'}
1942

2043
def get_id(self) -> str:
2144
return self._id
@@ -40,20 +63,26 @@ def update_properties(self, properties: dict):
4063
def initialize(self):
4164
pass
4265

43-
def connect(self):
44-
pass
66+
async def connect(self):
67+
if self.mode == "websocket":
68+
self._websocket = await websockets.connect(self._url, extra_headers=self._extra_headers)
69+
self._status = "connected"
70+
return self._websocket
4571

46-
def disconnect(self):
47-
pass
4872

49-
def reset(self):
50-
pass
73+
def disconnect(self):
74+
pass
75+
76+
77+
def reset(self):
78+
pass
79+
5180

52-
def get_status(self):
53-
return self.status
81+
def get_status(self):
82+
return self.status
5483

5584

56-
class DatasourceHandler:
85+
class DataSourceHandler:
5786
datasource_map: dict[str, DataSource]
5887

5988
def __init__(self):
@@ -70,12 +99,13 @@ def initialize_all(self):
7099
# list comp is faster than for loop
71100
[ds.initialize() for ds in self.datasource_map.values()]
72101

73-
def connect_ds(self, datasource_id: str):
102+
async def connect_ds(self, datasource_id: str):
74103
ds = self.datasource_map.get(datasource_id)
75-
ds.connect()
104+
await ds.connect()
76105

77-
def connect_all(self):
78-
[ds.connect() for ds in self.datasource_map.values()]
106+
async def connect_all(self):
107+
results = await asyncio.gather(*(ds.connect() for ds in self.datasource_map.values()))
108+
return results
79109

80110
def disconnect_ds(self, datasource_id: str):
81111
ds = self.datasource_map.get(datasource_id)

oshconnect/oshconnect.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,39 @@
44
# Author: Ian Patterson
55
# Contact email: ian@botts-inc.com
66
# ==============================================================================
7+
import asyncio
8+
import json
79

810
from conSys4Py.core.default_api_helpers import APIHelper
11+
from conSys4Py.datamodels.observations import ObservationOMJSONInline
912

1013
from oshconnect.datamodels.datamodels import Node, System
11-
from oshconnect.datasource.datasource import DataSource
14+
from oshconnect.datasource.datasource import DataSource, DataSourceHandler
1215
from oshconnect.datastore.datastore import DataStore
1316
from oshconnect.styling.styling import Styling
1417
from oshconnect.timemanagement.timemanagement import TimeManagement
1518

1619

1720
class OSHConnect:
1821
_name: str = None
19-
datasource: DataSource = None
22+
# datasource: DataSource = None
2023
datastore: DataStore = None
2124
styling: Styling = None
2225
timestream: TimeManagement = None
2326
_nodes: list[Node] = []
2427
_systems: list[System] = []
2528
_cs_api_builder: APIHelper = None
29+
_datasource_handler: DataSourceHandler = None
2630
_datafeeds: list[DataSource] = []
2731
_datataskers: list[DataStore] = []
2832
_datagroups: list = []
33+
_tasks: list = []
2934

3035
def __init__(self, name: str, **kwargs):
3136
self._name = name
3237
if 'nodes' in kwargs:
3338
self._nodes = kwargs['nodes']
39+
self._datasource_handler = DataSourceHandler()
3440

3541
def get_name(self):
3642
return self._name
@@ -74,8 +80,18 @@ def select_temporal_mode(self, mode: str):
7480
"""
7581
pass
7682

77-
def playback_streams(self, streams: list):
78-
pass
83+
async def playback_streams(self, stream_ids: list = None):
84+
if stream_ids is None:
85+
clients = await self._datasource_handler.connect_all()
86+
for client in clients:
87+
task = asyncio.create_task(self._handle_datastream_client(client))
88+
self._tasks.append(task)
89+
else:
90+
for stream_id in stream_ids:
91+
clients = await self._datasource_handler.connect_ds(stream_id)
92+
for client in clients:
93+
msg = await client.recv()
94+
print(msg)
7995

8096
def visualize_streams(self, streams: list):
8197
pass
@@ -85,10 +101,15 @@ def get_visualization_recommendations(self, streams: list):
85101
pass
86102

87103
def discover_datastreams(self):
104+
# NOTE: This will need to check to prevent dupes in the future
88105
for system in self._systems:
89106
res_datastreams = system.discover_datastreams()
90-
print(f'Datastreams found: {res_datastreams}')
91-
self._datafeeds.extend(res_datastreams)
107+
# create DataSource(s)
108+
new_datasource = [
109+
DataSource(name=ds.name, mode="websocket", properties={}, datastream=ds, parent_system=system) for ds in
110+
res_datastreams]
111+
self._datafeeds.extend(new_datasource)
112+
list(map(self._datasource_handler.add_datasource, new_datasource))
92113

93114
def discover_systems(self, nodes: list[str] = None):
94115
search_nodes = self._nodes
@@ -107,3 +128,12 @@ def authenticate_user(self, user: dict):
107128

108129
def synchronize_streams(self, systems: list):
109130
pass
131+
132+
async def _handle_datastream_client(self, client):
133+
try:
134+
async for msg in client:
135+
msg_dict = json.loads(msg.decode('utf-8'))
136+
obs = ObservationOMJSONInline.model_validate(msg_dict)
137+
138+
except Exception as e:
139+
print(f"An error occurred while reading from websocket: {e}")

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ consys4py = { git = "https://github.com/Botts-Innovative-Research/CSAPI4Py.git",
1111
swecommondm = { git = "https://github.com/ChainReaction31/SWECommonDMPython.git", branch = "master" }
1212
pydantic = "^2.7.4"
1313
shapely = "^2.0.4"
14+
websockets = "^12.0"
1415

1516
[tool.poetry.dev-dependencies]
1617
pytest = "^8.2.2"

tests/test_oshconnect.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
# ==============================================================================
77

88
import pytest
9+
import requests
10+
import websockets
911

1012
from oshconnect.oshconnect import OSHConnect
1113
from oshconnect.datamodels.datamodels import Node, System
@@ -45,3 +47,13 @@ def test_oshconnect_find_datastreams(self):
4547
app.discover_datastreams()
4648
assert len(app._datafeeds) > 0
4749

50+
async def test_obs_ws_stream(self):
51+
ds_url = ("ws://localhost:8585/sensorhub/api/datastreams/e07n5sbjqvalm/observations?f=application%2Fjson"
52+
"&resultTime=latest/2025-06-18T15:46:32Z")
53+
54+
# stream = requests.get(ds_url, stream=True, auth=('admin', 'admin'))
55+
async with websockets.connect(ds_url, extra_headers={'Authorization': 'Basic YWRtaW46YWRtaW4='}) as stream:
56+
async for message in stream:
57+
print(message)
58+
59+

0 commit comments

Comments
 (0)