|
| 1 | +import asyncio |
| 2 | +import json |
| 3 | +import socket |
| 4 | +import time |
| 5 | +import sys |
| 6 | + |
| 7 | +from pathlib import Path |
| 8 | +project_root = str(Path(__file__).parents[1]) |
| 9 | +if project_root not in sys.path: |
| 10 | + sys.path.append(project_root) |
| 11 | + |
| 12 | +from powersensor_local.async_event_emitter import AsyncEventEmitter |
| 13 | + |
| 14 | +class PlugListenerUdp(AsyncEventEmitter, asyncio.DatagramProtocol): |
| 15 | + """An interface class for accessing the event stream from a single plug. |
| 16 | + The following events may be emitted: |
| 17 | + - ("connecting") Whenever a connection attempt is made. |
| 18 | + - ("connected") When a connection is successful. |
| 19 | + - ("disconnected") When a connection is dropped, be it intentional or not. |
| 20 | + - ("message",{...}) For each event message received from the plug. The |
| 21 | + plug's JSON message is decoded into a dict which is passed as the second |
| 22 | + argument to the registered event handler(s). |
| 23 | + - ("malformed",line) If JSON decoding of a message fails. The raw line |
| 24 | + is included (as a byte string). |
| 25 | +
|
| 26 | + The event handlers must be async. |
| 27 | + """ |
| 28 | + |
| 29 | + def __init__(self, ip, port=49476): |
| 30 | + """Initialises a PlugListener object, bound to the given IP address. |
| 31 | + The port number may be overridden if necessary.""" |
| 32 | + super().__init__() |
| 33 | + self._ip = ip |
| 34 | + self._port = port |
| 35 | + self._backoff = 0 # exponential backoff |
| 36 | + self._transport = None # UDP transport/socket |
| 37 | + self._reconnect = None # reconnect timer |
| 38 | + self._disconnecting = False # disconnecting flag |
| 39 | + self._was_connected = False # 'disconnected' event armed? |
| 40 | + |
| 41 | + def connect(self): |
| 42 | + """Initiates the connection to the plug. The object will automatically |
| 43 | + retry as necessary if/when it can't connect to the plug, until such |
| 44 | + a time disconnect() is called.""" |
| 45 | + self._disconnecting = False |
| 46 | + self._backoff = 0 |
| 47 | + if self._transport is None: |
| 48 | + asyncio.create_task(self._do_connection()) |
| 49 | + |
| 50 | + async def disconnect(self): |
| 51 | + """Goes through the disconnection process towards a plug. No further |
| 52 | + automatic reconnects will take place, until connect() is called.""" |
| 53 | + self._disconnecting = True |
| 54 | + |
| 55 | + await self._close_connection() |
| 56 | + |
| 57 | + async def _close_connection(self, unsub = True): |
| 58 | + if self._reconnect is not None: |
| 59 | + self._reconnect.cancel() |
| 60 | + self._reconnect = None |
| 61 | + |
| 62 | + if self._transport is not None: |
| 63 | + if unsub: |
| 64 | + self._transport.sendto(b'subscribe(0)\n') |
| 65 | + self._transport.close() |
| 66 | + self._transport = None |
| 67 | + |
| 68 | + if self._was_connected: |
| 69 | + await self.emit('disconnected') |
| 70 | + self._was_connected = False |
| 71 | + |
| 72 | + if not self._disconnecting: |
| 73 | + await self._do_connection() |
| 74 | + |
| 75 | + def _retry(self): |
| 76 | + self._reconnect = None |
| 77 | + asyncio.create_task(self._do_connection()) |
| 78 | + |
| 79 | + async def _do_connection(self): |
| 80 | + if self._disconnecting: |
| 81 | + return |
| 82 | + if self._backoff < 9: |
| 83 | + self._backoff += 1 |
| 84 | + await self.emit('connecting') |
| 85 | + loop = asyncio.get_running_loop() |
| 86 | + await loop.create_datagram_endpoint( |
| 87 | + self.protocol_factory, |
| 88 | + family = socket.AF_INET, |
| 89 | + remote_addr = (self._ip, self._port)) |
| 90 | + self._reconnect = loop.call_later( |
| 91 | + min(5*60, 2**self._backoff + 2), self._retry) |
| 92 | + |
| 93 | + def _send_subscribe(self): |
| 94 | + if self._transport is not None: |
| 95 | + self._transport.sendto(b'subscribe(60)\n') |
| 96 | + |
| 97 | + # DatagramProtocol support below |
| 98 | + |
| 99 | + def protocol_factory(self): |
| 100 | + return self |
| 101 | + |
| 102 | + def connection_made(self, transport): |
| 103 | + self._transport = transport |
| 104 | + self._send_subscribe() |
| 105 | + |
| 106 | + def datagram_received(self, data, addr): |
| 107 | + if self._reconnect is not None: |
| 108 | + self._reconnect.cancel() |
| 109 | + self._reconnect = None |
| 110 | + self._backoff = 0 |
| 111 | + asyncio.create_task(self.emit('connected')) |
| 112 | + |
| 113 | + if not self._was_connected: |
| 114 | + self._was_connected = True |
| 115 | + |
| 116 | + lines = data.decode('utf-8').splitlines() |
| 117 | + for line in lines: |
| 118 | + try: |
| 119 | + message = json.loads(line) |
| 120 | + typ = message['type'] |
| 121 | + if typ == 'subscription': |
| 122 | + if message['subtype'] == 'warning': |
| 123 | + self._send_subscribe() |
| 124 | + elif typ == 'discovery': |
| 125 | + pass |
| 126 | + else: |
| 127 | + asyncio.create_task(self.emit('message', message)) |
| 128 | + except (json.decoder.JSONDecodeError) as ex: |
| 129 | + asyncio.create_task(self.emit('malformed', data)) |
| 130 | + |
| 131 | + def error_received(self, exc): |
| 132 | + asyncio.create_task(self._close_connection(False)) |
| 133 | + |
| 134 | + def connection_lost(self, exc): |
| 135 | + if self._transport is not None: |
| 136 | + asyncio.create_task(self._close_connection(False)) |
0 commit comments