44#
55# Contact Email: ian@botts-inc.com
66from __future__ import annotations
7+
78import asyncio
89import json
910from uuid import uuid4
1011
12+ import requests
1113import websockets
14+ from conSys4Py import APIResourceTypes
1215from conSys4Py .datamodels .observations import ObservationOMJSONInline
1316
1417from external_models import TimePeriod
1518from external_models .object_models import DatastreamResource
16- from oshconnect import Utilities
17- from oshconnect .datamodels .datamodels import System
18- from oshconnect .datasource import Mode
1919from oshconnect import TemporalModes
20+ from oshconnect .datamodels .datamodels import System
2021
2122
2223class DataSource :
@@ -55,10 +56,6 @@ def terminate_process(self):
5556 def subscribe (self ):
5657 pass
5758
58- def update_properties (self , properties : dict ):
59- # TODO: need to stop in progress sub-processes and restart
60- self .properties = properties
61-
6259 def set_mode (self , mode : TemporalModes ):
6360 self ._playback_mode = mode
6461 self .generate_url ()
@@ -75,11 +72,13 @@ async def connect(self):
7572 self ._status = "connected"
7673 return self ._websocket
7774 elif self ._playback_mode == TemporalModes .ARCHIVE :
75+ self ._websocket = await websockets .connect (self ._url , extra_headers = self ._extra_headers )
7876 self ._status = "connected"
79- return "Playback mode is not yet implemented."
77+ return self . _websocket
8078 elif self ._playback_mode == TemporalModes .BATCH :
79+ self ._websocket = await websockets .connect (self ._url , extra_headers = self ._extra_headers )
8180 self ._status = "connected"
82- return "Live-batch mode is not yet implemented."
81+ return self . _websocket
8382
8483 def disconnect (self ):
8584 self ._websocket .close ()
@@ -91,13 +90,17 @@ def reset(self):
9190 def get_status (self ):
9291 return self ._status
9392
93+ def get_parent_system (self ):
94+ return self ._parent_system
95+
9496 def get_ws_client (self ):
9597 return self ._websocket
9698
9799 def is_within_timeperiod (self , timeperiod : TimePeriod ):
98100 return timeperiod .does_timeperiod_overlap (self ._datastream .valid_time )
99101
100102 def generate_url (self ):
103+ # TODO: need to specify secure vs insecure protocols
101104 if self ._playback_mode == TemporalModes .REAL_TIME :
102105 self ._url = (f'ws://{ self ._parent_system .get_parent_node ().get_address ()} :'
103106 f'{ self ._parent_system .get_parent_node ().get_port ()} '
@@ -109,6 +112,13 @@ def generate_url(self):
109112 f'/sensorhub/api/datastreams/{ self ._datastream .ds_id } '
110113 f'/observations?f=application%2Fjson&resultTime={ self ._datastream .valid_time .start } /'
111114 f'{ self ._datastream .valid_time .end } ' )
115+ elif self ._playback_mode == TemporalModes .BATCH :
116+ # TODO: need to allow for batch counts selection through DS Handler or TimeManager
117+ self ._url = (f'wss://{ self ._parent_system .get_parent_node ().get_address ()} :'
118+ f'{ self ._parent_system .get_parent_node ().get_port ()} '
119+ f'/sensorhub/api/datastreams/{ self ._datastream .ds_id } '
120+ f'/observations?f=application%2Fjson&resultTime={ self ._datastream .valid_time .start } /'
121+ f'{ self ._datastream .valid_time .end } ' )
112122 else :
113123 raise ValueError ("Playback mode not set. Cannot generate URL for DataSource." )
114124
@@ -160,10 +170,15 @@ async def connect_all(self, timeperiod: TimePeriod):
160170 else :
161171 ds_matches = self .datasource_map .values ()
162172
163- [(ds , await ds .connect ()) for ds in ds_matches ]
164- for ds in ds_matches :
165- task = asyncio .create_task (self ._handle_datastream_client (ds ))
166- # return task
173+ if self ._playback_mode == TemporalModes .REAL_TIME :
174+ [(ds , await ds .connect ()) for ds in ds_matches ]
175+ for ds in ds_matches :
176+ task = asyncio .create_task (self ._handle_datastream_client (ds ))
177+ elif self ._playback_mode == TemporalModes .ARCHIVE :
178+ pass
179+ elif self ._playback_mode == TemporalModes .BATCH :
180+ for ds in ds_matches :
181+ task = asyncio .create_task (self .handle_http_batching (ds ))
167182
168183 def disconnect_ds (self , datasource_id : str ):
169184 ds = self .datasource_map .get (datasource_id )
@@ -183,6 +198,33 @@ async def _handle_datastream_client(self, datasource: DataSource):
183198 except Exception as e :
184199 print (f"An error occurred while reading from websocket: { e } " )
185200
201+ async def handle_http_batching (self , datasource : DataSource , offset : int = None , query_params : dict = None ,
202+ next_link : str = None ):
203+ # access api_helper
204+ api_helper = datasource .get_parent_system ().get_parent_node ().get_api_helper ()
205+ # needs to create a new call to make a request to the server if there is a link to a next page
206+ resp = None
207+ if next_link is None :
208+ resp = api_helper .retrieve_resource (APIResourceTypes .OBSERVATION ,
209+ parent_res_id = datasource ._datastream .ds_id ,
210+ req_headers = {'Content-Type' : 'application/json' })
211+ elif next_link is not None :
212+ resp = requests .get (next_link , auth = (datasource ._parent_system .get_parent_node ()._api_helper .username ,
213+ datasource ._parent_system .get_parent_node ()._api_helper .password ))
214+ results = resp .json ()
215+ if 'links' in results :
216+ for link in results ['links' ]:
217+ if link ['rel' ] == 'next' :
218+ new_offset = link ['href' ].split ('=' )[- 1 ]
219+ asyncio .create_task (self .handle_http_batching (datasource , next_link = link ['href' ]))
220+
221+ # print(results)
222+ for obs in results ['items' ]:
223+ obs_obj = ObservationOMJSONInline .model_validate (obs )
224+ msg_wrapper = MessageWrapper (datasource = datasource , message = obs_obj )
225+ self ._message_list .add_message (msg_wrapper )
226+ return resp .json ()
227+
186228
187229class MessageHandler :
188230 _message_list : list [MessageWrapper ]
0 commit comments