Skip to content

Commit 8b99325

Browse files
Add playback mode to OShConnect, DatasourceHandler, and DataSource
1 parent 0609d0c commit 8b99325

4 files changed

Lines changed: 79 additions & 54 deletions

File tree

external_models/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
# Author: Ian Patterson
55
# Contact Email: ian@botts-inc.com
66
# ==============================================================================
7+
from __future__ import annotations
8+
79
from typing import Any
810

911
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
10-
from pydantic._internal import _repr
1112
from shapely import Point
1213
from typing_extensions import Self
1314

@@ -62,6 +63,12 @@ def valid_time_period(cls, data) -> Any:
6263
def __repr__(self):
6364
return f'{[self.start, self.end]}'
6465

66+
def does_timeperiod_overlap(self, checked_timeperiod: TimePeriod) -> bool:
67+
if checked_timeperiod.start < self.end and checked_timeperiod.end > self.start:
68+
return True
69+
else:
70+
return False
71+
6572

6673

6774
class SecurityConstraints:

oshconnect/__init__.py

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,8 @@
66
# ==============================================================================
77

88
import base64
9-
import uuid
10-
from abc import ABC
11-
from dataclasses import dataclass, field
9+
from dataclasses import dataclass
1210
from enum import Enum
13-
import swecommondm as swe_common
14-
from conSys4Py import APIResourceTypes
15-
from conSys4Py.core.default_api_helpers import APIHelper
16-
from conSys4Py.datamodels.observations import ObservationOMJSONInline
17-
18-
from external_models.object_models import System as SystemResource
1911

2012

2113
@dataclass(kw_only=True)
@@ -26,32 +18,15 @@ class Endpoints:
2618

2719

2820
class TemporalModes(Enum):
29-
REAL_TIME = 0
30-
ARCHIVE = 1
31-
BATCH = 2
32-
RT_SYNC = 3
33-
ARCHIVE_SYNC = 4
21+
REAL_TIME = "realtime"
22+
ARCHIVE = "archive"
23+
BATCH = "batch"
24+
RT_SYNC = "realtimesync"
25+
ARCHIVE_SYNC = "archivesync"
3426

3527

3628
class Utilities:
3729

3830
@staticmethod
3931
def convert_auth_to_base64(username: str, password: str) -> str:
4032
return base64.b64encode(f"{username}:{password}".encode()).decode()
41-
42-
class StreamQueue:
43-
_queue: list[ObservationOMJSONInline]
44-
def __init__(self):
45-
self._queue = []
46-
47-
def push(self, item):
48-
self._queue.append(item)
49-
50-
def pop(self):
51-
return self._queue.pop(0)
52-
53-
def peek(self):
54-
return self._queue[0]
55-
56-
def is_empty(self):
57-
return len(self._queue) == 0

oshconnect/datasource/datasource.py

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,28 @@
1111
import websockets
1212
from conSys4Py.datamodels.observations import ObservationOMJSONInline
1313

14+
from external_models import TimePeriod
1415
from external_models.object_models import DatastreamResource
1516
from oshconnect import Utilities
1617
from oshconnect.datamodels.datamodels import System
1718
from oshconnect.datasource import Mode
19+
from oshconnect import TemporalModes
1820

1921

2022
class DataSource:
2123
"""
2224
DataSource: represents the active connection of a datastream object
2325
"""
2426

25-
def __init__(self, name: str, mode: str, properties: dict, datastream: DatastreamResource, parent_system: System):
27+
def __init__(self, name: str, datastream: DatastreamResource, parent_system: System):
2628
self._status = None
2729
self._id = f'datasource-{uuid4()}'
2830
self.name = name
29-
self.mode = mode
30-
self.properties = properties
3131
self._datastream = datastream
3232
self._websocket = None
3333
self._parent_system = parent_system
34+
self._playback_mode = None
3435
self._url = None
35-
if mode == "websocket":
36-
self._url = (f'ws://{self._parent_system.get_parent_node().get_address()}:'
37-
f'{self._parent_system.get_parent_node().get_port()}'
38-
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
39-
f'/observations?f=application%2Fjson')
4036
self._auth = None
4137
self._extra_headers = None
4238
if self._parent_system.get_parent_node().is_secure:
@@ -63,8 +59,9 @@ def update_properties(self, properties: dict):
6359
# TODO: need to stop in progress sub-processes and restart
6460
self.properties = properties
6561

66-
def set_mode(self, mode: str):
67-
self.mode = mode
62+
def set_mode(self, mode: TemporalModes):
63+
self._playback_mode = mode
64+
self.generate_url()
6865

6966
def initialize(self):
7067
if self._websocket.is_open():
@@ -73,14 +70,14 @@ def initialize(self):
7370
self._status = "initialized"
7471

7572
async def connect(self):
76-
if self.mode == "websocket":
73+
if self._playback_mode == TemporalModes.REAL_TIME:
7774
self._websocket = await websockets.connect(self._url, extra_headers=self._extra_headers)
7875
self._status = "connected"
7976
return self._websocket
80-
elif self.mode == "playback":
77+
elif self._playback_mode == TemporalModes.ARCHIVE:
8178
self._status = "connected"
8279
return "Playback mode is not yet implemented."
83-
elif self.mode == "live-batch":
80+
elif self._playback_mode == TemporalModes.BATCH:
8481
self._status = "connected"
8582
return "Live-batch mode is not yet implemented."
8683

@@ -97,16 +94,40 @@ def get_status(self):
9794
def get_ws_client(self):
9895
return self._websocket
9996

97+
def is_within_timeperiod(self, timeperiod: TimePeriod):
98+
return timeperiod.does_timeperiod_overlap(self._datastream.valid_time)
99+
100+
def generate_url(self):
101+
if self._playback_mode == TemporalModes.REAL_TIME:
102+
self._url = (f'ws://{self._parent_system.get_parent_node().get_address()}:'
103+
f'{self._parent_system.get_parent_node().get_port()}'
104+
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
105+
f'/observations?f=application%2Fjson')
106+
elif self._playback_mode == TemporalModes.ARCHIVE:
107+
self._url = (f'ws://{self._parent_system.get_parent_node().get_address()}:'
108+
f'{self._parent_system.get_parent_node().get_port()}'
109+
f'/sensorhub/api/datastreams/{self._datastream.ds_id}'
110+
f'/observations?f=application%2Fjson&resultTime={self._datastream.valid_time.start}/'
111+
f'{self._datastream.valid_time.end}')
112+
else:
113+
raise ValueError("Playback mode not set. Cannot generate URL for DataSource.")
114+
100115

101116
class DataSourceHandler:
102117
datasource_map: dict[str, DataSource]
103118
_message_list: MessageHandler
119+
_playback_mode: TemporalModes
104120

105-
def __init__(self):
121+
def __init__(self, playback_mode: TemporalModes = TemporalModes.REAL_TIME):
106122
self.datasource_map = {}
107123
self._message_list = MessageHandler()
124+
self._playback_mode = playback_mode
125+
126+
def set_playback_mode(self, mode: TemporalModes):
127+
self._playback_mode = mode
108128

109129
def add_datasource(self, datasource: DataSource):
130+
datasource.set_mode(self._playback_mode)
110131
self.datasource_map[datasource.get_id()] = datasource
111132

112133
def remove_datasource(self, datasource_id: str):
@@ -120,17 +141,27 @@ def initialize_all(self):
120141
# list comp is faster than for loop
121142
[ds.initialize() for ds in self.datasource_map.values()]
122143

123-
def set_ds_mode(self, mode: str):
124-
var = (ds.set_mode(mode) for ds in self.datasource_map.values())
144+
def set_ds_mode(self):
145+
var = (ds.set_mode(self._playback_mode) for ds in self.datasource_map.values())
125146

126147
async def connect_ds(self, datasource_id: str):
127148
ds = self.datasource_map.get(datasource_id)
128149
await ds.connect()
129150

130-
async def connect_all(self):
131-
# call connect for all datasources
132-
[(ds, await ds.connect()) for ds in self.datasource_map.values()]
133-
for ds in self.datasource_map.values():
151+
async def connect_all(self, timeperiod: TimePeriod):
152+
"""
153+
Connects all datasources, optionally within a provided TimePeriod
154+
:param timeperiod:
155+
:return:
156+
"""
157+
# search for datasources that fall within the timeperiod
158+
if timeperiod is not None:
159+
ds_matches = [ds for ds in self.datasource_map.values() if ds.is_within_timeperiod(timeperiod)]
160+
else:
161+
ds_matches = self.datasource_map.values()
162+
163+
[(ds, await ds.connect()) for ds in ds_matches]
164+
for ds in ds_matches:
134165
task = asyncio.create_task(self._handle_datastream_client(ds))
135166
# return task
136167

oshconnect/oshconnect.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from conSys4Py.core.default_api_helpers import APIHelper
1111
from conSys4Py.datamodels.observations import ObservationOMJSONInline
1212

13+
from oshconnect import TemporalModes
1314
from oshconnect.datamodels.datamodels import Node, System
1415
from oshconnect.datasource.datasource import DataSource, DataSourceHandler
1516
from oshconnect.datastore.datastore import DataStore
@@ -31,12 +32,20 @@ class OSHConnect:
3132
_datataskers: list[DataStore] = []
3233
_datagroups: list = []
3334
_tasks: list = []
35+
_playback_mode: TemporalModes = TemporalModes.REAL_TIME
3436

3537
def __init__(self, name: str, **kwargs):
38+
"""
39+
:param name: name of the OSHConnect instance, in the event that
40+
:param kwargs:
41+
- 'playback_mode': TemporalModes
42+
"""
3643
self._name = name
3744
if 'nodes' in kwargs:
3845
self._nodes = kwargs['nodes']
3946
self._datasource_handler = DataSourceHandler()
47+
if 'playback_mode' in kwargs:
48+
self._playback_mode = kwargs['playback_mode']
4049

4150
def get_name(self):
4251
return self._name
@@ -82,7 +91,7 @@ def select_temporal_mode(self, mode: str):
8291

8392
async def playback_streams(self, stream_ids: list = None):
8493
if stream_ids is None:
85-
await self._datasource_handler.connect_all()
94+
await self._datasource_handler.connect_all(None)
8695
else:
8796
for stream_id in stream_ids:
8897
await self._datasource_handler.connect_ds(stream_id)
@@ -100,7 +109,7 @@ def discover_datastreams(self):
100109
res_datastreams = system.discover_datastreams()
101110
# create DataSource(s)
102111
new_datasource = [
103-
DataSource(name=ds.name, mode="websocket", properties={}, datastream=ds, parent_system=system) for ds in
112+
DataSource(name=ds.name, datastream=ds, parent_system=system) for ds in
104113
res_datastreams]
105114
self._datafeeds.extend(new_datasource)
106115
list(map(self._datasource_handler.add_datasource, new_datasource))
@@ -122,3 +131,6 @@ def authenticate_user(self, user: dict):
122131

123132
def synchronize_streams(self, systems: list):
124133
pass
134+
135+
def set_playback_mode(self, mode: TemporalModes):
136+
self._datasource_handler.set_playback_mode(mode)

0 commit comments

Comments
 (0)