Skip to content

Commit dfda98f

Browse files
update test scripts for commands, awaiting checks for MQTT subs on osh-core for further testing
1 parent 19a8191 commit dfda98f

5 files changed

Lines changed: 403 additions & 1 deletion

File tree

conSys/comm/__init__.py

Whitespace-only changes.

conSys/comm/mqtt.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
import paho.mqtt.client as mqtt
2+
3+
4+
class MQTTClient:
5+
def __init__(self, url, port=1883, username=None, password=None, path='mqtt', client_id="", transport='tcp'):
6+
"""
7+
Wraps a paho mqtt client to provide a simple interface for interacting with the mqtt server that is customized
8+
for this library.
9+
10+
:param url: url of the mqtt server
11+
:param port: port the mqtt server is communicating over, default is 1883 or whichever port the main node is
12+
using if in websocket mode
13+
:param username: used if node is requiring authentication to access this service
14+
:param password: used if node is requiring authentication to access this service
15+
:param path: used for setting the path when using websockets (usually sensorhub/mqtt by default)
16+
"""
17+
self.__url = url
18+
self.__port = port
19+
self.__path = path
20+
self.__client_id = client_id
21+
self.__transport = transport
22+
23+
self.__client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
24+
25+
if self.__transport == 'websockets':
26+
self.__client.ws_set_options(path=self.__path)
27+
28+
if username is not None and password is not None:
29+
self.__client.username_pw_set(username, password)
30+
self.__client.tls_set()
31+
32+
self.__client.on_connect = self.on_connect
33+
self.__client.on_subscribe = self.on_subscribe
34+
self.__client.on_message = self.on_message
35+
self.__client.on_publish = self.on_publish
36+
self.__client.on_log = self.on_log
37+
self.__client.on_disconnect = self.on_disconnect
38+
39+
self.__is_connected = False
40+
41+
@staticmethod
42+
def on_connect(client, userdata, flags, rc, properties):
43+
print(f'Connected with result code: {rc}')
44+
print(f'{properties}')
45+
46+
@staticmethod
47+
def on_subscribe(client, userdata, mid, granted_qos, properties):
48+
print(f'Subscribed: {mid} {granted_qos}')
49+
50+
@staticmethod
51+
def on_message(client, userdata, msg):
52+
print(f'{msg.payload.decode("utf-8")}')
53+
54+
@staticmethod
55+
def on_publish(client, userdata, mid):
56+
print(f'Published: {mid}')
57+
58+
@staticmethod
59+
def on_log(client, userdata, level, buf):
60+
print(f'Log: {buf}')
61+
62+
@staticmethod
63+
def on_disconnect(client, userdata, dc_flag, rc, properties):
64+
print(f'Disconnected: {rc}')
65+
66+
def connect(self):
67+
print(f'Connecting to {self.__url}:{self.__port}')
68+
self.__client.connect(self.__url, self.__port)
69+
70+
def subscribe(self, topic, qos=0, msg_callback=None):
71+
"""
72+
Subscribe to a topic, and optionally set a callback for when a message is received on that topic. To actually
73+
retrieve any information you must set a callback.
74+
75+
:param topic: MQTT topic to subscribe to (example/topic)
76+
:param qos: quality of service, 0, 1, or 2
77+
:param msg_callback: callback with the form: callback(client, userdata, msg)
78+
:return:
79+
"""
80+
self.__client.subscribe(topic, qos)
81+
if msg_callback is not None:
82+
self.__client.message_callback_add(topic, msg_callback)
83+
84+
def publish(self, topic, payload=None, qos=0, retain=False):
85+
self.__client.publish(topic, payload, qos, retain)
86+
87+
def disconnect(self):
88+
self.__client.disconnect()
89+
90+
def set_on_connect(self, on_connect):
91+
"""
92+
Set the on_connect callback for the MQTT client.
93+
94+
:param on_connect:
95+
:return:
96+
"""
97+
self.__client.on_connect = on_connect
98+
99+
def set_on_disconnect(self, on_disconnect):
100+
"""
101+
Set the on_disconnect callback for the MQTT client.
102+
103+
:param on_disconnect:
104+
:return:
105+
"""
106+
self.__client.on_disconnect = on_disconnect
107+
108+
def set_on_subscribe(self, on_subscribe):
109+
"""
110+
Set the on_subscribe callback for the MQTT client.
111+
112+
:param on_subscribe:
113+
:return:
114+
"""
115+
self.__client.on_subscribe = on_subscribe
116+
117+
def set_on_unsubscribe(self, on_unsubscribe):
118+
"""
119+
Set the on_unsubscribe callback for the MQTT client.
120+
121+
:param on_unsubscribe:
122+
:return:
123+
"""
124+
self.__client.on_unsubscribe = on_unsubscribe
125+
126+
def set_on_publish(self, on_publish):
127+
"""
128+
Set the on_publish callback for the MQTT client.
129+
130+
:param on_publish:
131+
:return:
132+
"""
133+
self.__client.on_publish = on_publish
134+
135+
def set_on_message(self, on_message):
136+
"""
137+
Set the on_message callback for the MQTT client. It is recommended to set individual callbacks for each
138+
subscribed topic.
139+
140+
:param on_message:
141+
:return:
142+
"""
143+
self.__client.on_message = on_message
144+
145+
def set_on_log(self, on_log):
146+
"""
147+
Set the on_log callback for the MQTT client.
148+
149+
:param on_log:
150+
:return:
151+
"""
152+
self.__client.on_log = on_log
153+
154+
def set_on_message_callback(self, sub, on_message_callback):
155+
"""
156+
Set the on_message callback for a specific topic.
157+
:param sub:
158+
:param on_message_callback:
159+
:return:
160+
"""
161+
self.__client.message_callback_add(sub, on_message_callback)
162+
163+
def start(self):
164+
"""
165+
Start the MQTT client in a separate thread. This is required for the client to be able to receive messages.
166+
167+
:return:
168+
"""
169+
self.__client.loop_start()
170+
171+
def stop(self):
172+
"""
173+
Stop the MQTT client.\
174+
175+
:return:
176+
"""
177+
self.__client.loop_stop()
178+
179+
def __toggle_is_connected(self):
180+
self.__is_connected = not self.__is_connected
181+
182+
def is_connected(self):
183+
return self.__is_connected
184+
185+
def publish(self, topic, msg):
186+
self.__client.publish(topic, msg, 1)
187+
188+
@staticmethod
189+
def publish_single(self, topic, msg):
190+
self.__client.single(topic, msg, 0)
191+
192+
@staticmethod
193+
def publish_multiple(self, topic, msgs):
194+
self.__client.multiple(msgs, )

tests/conftest.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,10 @@
1-
import pytest
1+
import pytest
2+
3+
sever_url = None
4+
5+
6+
@pytest.fixture(autouse=True)
7+
def server_url():
8+
print('Setting up server url')
9+
global server_url
10+
server_url = 'http://localhost:8282/sensorhub'

tests/test_commands.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import json
2+
import time
3+
from datetime import datetime
4+
5+
from conSys import GeoJSONBody, Systems, ControlChannels, ObservationFormat, Commands
6+
from conSys.datamodels.commands import CommandJSON
7+
from conSys.datamodels.control_streams import ControlStreamJSONSchema, JSONControlChannelSchema
8+
from conSys.datamodels.swe_components import DataRecordSchema, TimeSchema, CountSchema, URI
9+
from conSys.comm.mqtt import MQTTClient
10+
11+
server_url = "http://localhost:8282/sensorhub"
12+
geo_json_headers = {"Content-Type": "application/geo+json"}
13+
sml_json_headers = {"Content-Type": "application/sml+json"}
14+
json_headers = {"Content-Type": "application/json"}
15+
systems_list = []
16+
17+
18+
def test_setup():
19+
geo_temp = GeoJSONBody(type='Feature', id='12345-commanded',
20+
description="Test Insertion of System via GEOJSON",
21+
properties={
22+
"featureType": "http://www.w3.org/ns/ssn/System",
23+
"name": f'Test System - GeoJSON',
24+
"uid": f'urn:test:client:geo-single',
25+
"description": "A Test System inserted from the Python Connected Systems API Client",
26+
})
27+
resp = Systems.create_new_systems(server_url, geo_temp.model_dump_json(exclude_none=True, by_alias=True),
28+
uname="admin",
29+
pword="admin",
30+
headers=geo_json_headers)
31+
print(resp)
32+
33+
systems_list = Systems.list_all_systems(server_url, headers=json_headers).json()
34+
system_id = systems_list["items"][0]["id"]
35+
36+
time_schema = TimeSchema(label="Test Control Channel Time", definition="http://test.com/Time", name="timestamp",
37+
uom=URI(href="http://test.com/TimeUOM"))
38+
count_schema = CountSchema(label="Test Control Channel Count", definition="http://test.com/Count", name="testcount")
39+
40+
control_schema = JSONControlChannelSchema(command_format=ObservationFormat.SWE_JSON.value,
41+
params_schema=DataRecordSchema(label="Test Control Channel Record",
42+
definition="http://test.com/Record",
43+
fields=[time_schema, count_schema]))
44+
request_body = ControlStreamJSONSchema(name="Test Control Channel", input_name="TestControlInput1",
45+
schema=control_schema)
46+
ctl_resp = ControlChannels.add_control_streams_to_system(server_url, system_id,
47+
request_body.model_dump_json(exclude_none=True,
48+
by_alias=True),
49+
headers=json_headers)
50+
print(ctl_resp)
51+
52+
control_streams = ControlChannels.list_all_control_streams(server_url).json()
53+
command_json = CommandJSON(control_id=control_streams["items"][0]["id"],
54+
issue_time=datetime.now().isoformat() + 'Z',
55+
params={"timestamp": datetime.now().timestamp() * 1000, "testcount": 1})
56+
57+
print(f'Issuing Command: {command_json.model_dump_json(exclude_none=True, by_alias=True)}')
58+
cmd_resp = Commands.send_commands_to_specific_control_stream(server_url, control_streams["items"][0]["id"],
59+
command_json.model_dump_json(exclude_none=True,
60+
by_alias=True),
61+
headers=json_headers)
62+
print(cmd_resp)
63+
64+
65+
def test_subscribe_and_command():
66+
mqtt_client = MQTTClient(url='localhost')
67+
68+
control_streams = ControlChannels.list_all_control_streams(server_url).json()
69+
control_id = control_streams["items"][0]["id"]
70+
71+
mqtt_client.connect()
72+
73+
def on_message_command(client, userdata, msg):
74+
print("Received Command")
75+
print(f'{msg.payload.decode("utf-8")}')
76+
control_stream_id = control_id
77+
resp = {
78+
'id': '*******',
79+
'command@id': control_stream_id,
80+
'statusCode': 'COMPLETED'
81+
}
82+
# client.publish(f'/api/controls/{control_stream_id}/status', payload=json.dumps(resp), qos=1)
83+
84+
def on_message_all(client, userdata, msg):
85+
print(f'\nReceived Message:{msg}')
86+
print(f'{msg.payload.decode("utf-8")}')
87+
print(f'Topic: {msg.topic}\n')
88+
89+
mqtt_client.set_on_message_callback(f'/api/controls/{control_id}/commands', on_message_command)
90+
mqtt_client.set_on_message_callback('#', on_message_all)
91+
mqtt_client.subscribe('#')
92+
mqtt_client.subscribe(f'/api/controls/{control_id}/commands')
93+
mqtt_client.start()
94+
95+
time.sleep(2)
96+
command_json = CommandJSON(control_id=control_streams["items"][0]["id"],
97+
issue_time=datetime.now().isoformat() + 'Z',
98+
params={"timestamp": datetime.now().timestamp() * 1000, "testcount": 1})
99+
100+
print(f'Issuing Command: {command_json.model_dump_json(exclude_none=True, by_alias=True)}')
101+
cmd_resp = Commands.send_commands_to_specific_control_stream(server_url, control_streams["items"][0]["id"],
102+
command_json.model_dump_json(exclude_none=True,
103+
by_alias=True),
104+
headers=json_headers)
105+
print(f'\n*****Command Response: {cmd_resp}*****')
106+
status_resp = {
107+
'id': '*******',
108+
'command@id': "unknown",
109+
'statusCode': 'COMPLETED'
110+
}
111+
Commands.add_command_status_reports(server_url, "0", json.dumps(status_resp))

0 commit comments

Comments
 (0)