Skip to content

Commit 344415a

Browse files
committed
Raw interface to single plug.
1 parent 4148f6c commit 344415a

4 files changed

Lines changed: 194 additions & 0 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Issues = "https://github.com/DiUS/python-powersensor_local/issues"
2323
[project.scripts]
2424
ps-events = "powersensor_local.events:app"
2525
ps-rawfirehose = "powersensor_local.rawfirehose:app"
26+
ps-rawplug = "powersensor.rawplug:app"
2627

2728
[build-system]
2829
requires = [ "hatchling" ]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
class AsyncEventEmitter:
2+
"""Small helper class for pub/sub functionality with async handlers."""
3+
def __init__(self):
4+
self._listeners = {}
5+
6+
def subscribe(self, evstr, cb):
7+
"""Registers an event handler for the given event key. The handler must
8+
be async. Duplicate registrations are ignored."""
9+
if self._listeners.get(evstr) is None:
10+
self._listeners[evstr] = []
11+
if not cb in self._listeners[evstr]:
12+
self._listeners[evstr].append(cb)
13+
14+
def unsubscribe(self, evstr, cb):
15+
"""Unregisters the given event handler from the given event type."""
16+
if self._listeners.get(evstr) is None:
17+
return
18+
if cb in self._listeners[evstr]:
19+
self._listeners[evstr].remove(cb)
20+
21+
async def emit(self, evstr, *args):
22+
"""Emits an event to all registered listeners for that event type.
23+
Additional arguments may be supplied with event as appropriate. Each
24+
event handler is awaited before delivering the event to the next."""
25+
if self._listeners.get(evstr) is None:
26+
return
27+
for cb in self._listeners[evstr]:
28+
await cb(evstr, *args)
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import asyncio
2+
import json
3+
import socket
4+
import time
5+
6+
from async_event_emitter import AsyncEventEmitter
7+
8+
class PlugListener(AsyncEventEmitter):
9+
"""An interface class for accessing the event stream from a single plug.
10+
The following events may be emitted:
11+
- ("connecting") Whenever a connection attempt is made.
12+
- ("connected") When a connection is successful.
13+
- ("disconnected") When a connection is dropped, be it intentional or not.
14+
- ("message",{...}) For each event message received from the plug. The
15+
plug's JSON message is decoded into a dict which is passed as the second
16+
argument to the registered event handler(s). The event handlers must be
17+
async.
18+
"""
19+
20+
def __init__(self, ip, port=49476):
21+
"""Initialises a PlugListener object, bound to the given IP address.
22+
The port number may be overridden if necessary."""
23+
super().__init__()
24+
self._ip = ip
25+
self._port = port
26+
self._task = None
27+
self._connection = None
28+
self._disconnecting = False
29+
30+
def connect(self):
31+
"""Initiates the connection to the plug. The object will automatically
32+
retry as necessary if/when it can't connect to the plug, until such
33+
a time disconnect() is called."""
34+
if self._task is not None:
35+
raise RuntimeError("already connected/connecting")
36+
self._disconnecting = False
37+
self._task = asyncio.create_task(self._do_connection())
38+
39+
async def disconnect(self):
40+
"""Goes through the disconnection process towards a plug. No further
41+
automatic reconnects will take place, until connect() is called."""
42+
if self._task is None:
43+
return
44+
45+
self._disconnecting = True
46+
47+
await self._close_connection()
48+
49+
if self._task is not None:
50+
await self._task
51+
self._task = None
52+
53+
async def _close_connection(self):
54+
if self._connection is not None:
55+
(reader, writer) = self._connection
56+
self._connection = None
57+
58+
writer.close()
59+
await writer.wait_closed()
60+
61+
await self.emit('disconnected')
62+
63+
async def _do_connection(self, backoff = 0):
64+
if backoff < 9:
65+
backoff += 1
66+
try:
67+
await self.emit('connecting')
68+
reader, writer = await asyncio.open_connection(self._ip, self._port)
69+
self._connection = (reader, writer)
70+
71+
await self._send_subscribe(writer)
72+
backoff = 1
73+
74+
await self.emit('connected')
75+
76+
while not self._disconnecting:
77+
await self._process_line(reader, writer)
78+
79+
except (ConnectionResetError, asyncio.TimeoutError):
80+
# Handle disconnection and retry with exponential backoff
81+
await self._close_connection()
82+
if self._disconnecting:
83+
return
84+
await asyncio.sleep(min(5 * 60, 2**backoff * 1))
85+
return await self._do_connection(backoff)
86+
87+
async def _process_line(self, reader, writer):
88+
data = await reader.readline()
89+
if data == b'':
90+
raise ConnectionResetError
91+
if data != b'\n': # Silently ignore empty lines
92+
try:
93+
message = json.loads(data.decode('utf-8'))
94+
typ = message['type']
95+
if typ == 'subscription':
96+
if message['subtype'] == 'warning':
97+
await self._send_subscribe(writer)
98+
elif typ == 'discovery':
99+
pass
100+
else:
101+
await self.emit('message', message)
102+
except (json.decoder.JSONDecodeError) as ex:
103+
print(f"JSON error {ex} from {data}")
104+
105+
async def _send_subscribe(self, writer):
106+
writer.write(b'subscribe(60)\n')
107+
await writer.drain()

src/powersensor_local/rawplug.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#!/usr/bin/env python3
2+
3+
"""Utility script for accessing the raw plug subscription data from a single
4+
network-local Powersensor devices. Intended for advanced debugging use only."""
5+
6+
import asyncio
7+
import os
8+
import signal
9+
import sys
10+
from plug_listener import PlugListener
11+
12+
exiting = False
13+
plug = None
14+
15+
async def do_exit():
16+
global exiting
17+
global plug
18+
if plug != None:
19+
await plug.disconnect()
20+
del plug
21+
exiting = True
22+
23+
async def on_evt_msg(_, msg):
24+
print(msg)
25+
26+
async def on_evt(evt):
27+
print(evt)
28+
29+
async def main():
30+
if len(sys.argv) < 2:
31+
print(f"Syntax: {sys.argv[0]} <ip> [port]")
32+
sys.exit(1)
33+
34+
# Signal handler for Ctrl+C
35+
def handle_sigint(signum, frame):
36+
signal.signal(signal.SIGINT, signal.SIG_DFL)
37+
asyncio.create_task(do_exit())
38+
39+
signal.signal(signal.SIGINT, handle_sigint)
40+
41+
global plug
42+
plug = PlugListener(sys.argv[1], *sys.argv[2:2])
43+
plug.subscribe('message', on_evt_msg)
44+
plug.subscribe('connecting', on_evt)
45+
plug.subscribe('connecting', on_evt)
46+
plug.subscribe('connected', on_evt)
47+
plug.subscribe('disconnected', on_evt)
48+
plug.connect()
49+
50+
# Keep the event loop running until Ctrl+C is pressed
51+
while not exiting:
52+
await asyncio.sleep(1)
53+
54+
def app():
55+
asyncio.run(main())
56+
57+
if __name__ == "__main__":
58+
app()

0 commit comments

Comments
 (0)