11import asyncio
22import json
3+ import socket
4+ import time
35
46import sys
57from pathlib import Path
6-
78project_root = str (Path (__file__ ).parents [1 ])
89if project_root not in sys .path :
910 sys .path .append (project_root )
1011
1112from powersensor_local .async_event_emitter import AsyncEventEmitter
1213
13-
14- async def _send_subscribe (writer ):
15- writer .write (b'subscribe(60)\n ' )
16- await writer .drain ()
17-
18-
1914class PlugListener (AsyncEventEmitter ):
2015 """An interface class for accessing the event stream from a single plug.
2116 The following events may be emitted:
@@ -24,8 +19,11 @@ class PlugListener(AsyncEventEmitter):
2419 - ("disconnected") When a connection is dropped, be it intentional or not.
2520 - ("message",{...}) For each event message received from the plug. The
2621 plug's JSON message is decoded into a dict which is passed as the second
27- argument to the registered event handler(s). The event handlers must be
28- async.
22+ argument to the registered event handler(s).
23+ - ("malformed",line) If JSON decoding of a message fails. The raw line
24+ is included (as a byte string).
25+
26+ The event handlers must be async.
2927 """
3028
3129 def __init__ (self , ip , port = 49476 ):
@@ -72,14 +70,16 @@ async def _close_connection(self):
7270 await self .emit ('disconnected' )
7371
7472 async def _do_connection (self , backoff = 0 ):
73+ if self ._disconnecting :
74+ return
7575 if backoff < 9 :
7676 backoff += 1
7777 try :
7878 await self .emit ('connecting' )
7979 reader , writer = await asyncio .open_connection (self ._ip , self ._port )
8080 self ._connection = (reader , writer )
8181
82- await _send_subscribe (writer )
82+ await self . _send_subscribe (writer )
8383 backoff = 1
8484
8585 await self .emit ('connected' )
@@ -91,7 +91,7 @@ async def _do_connection(self, backoff = 0):
9191 # Handle disconnection and retry with exponential backoff
9292 await self ._close_connection ()
9393 if self ._disconnecting :
94- return None
94+ return
9595 await asyncio .sleep (min (5 * 60 , 2 ** backoff * 1 ))
9696 return await self ._do_connection (backoff )
9797
@@ -105,10 +105,14 @@ async def _process_line(self, reader, writer):
105105 typ = message ['type' ]
106106 if typ == 'subscription' :
107107 if message ['subtype' ] == 'warning' :
108- await _send_subscribe (writer )
108+ await self . _send_subscribe (writer )
109109 elif typ == 'discovery' :
110110 pass
111111 else :
112112 await self .emit ('message' , message )
113- except json .decoder .JSONDecodeError as ex :
114- print (f"JSON error { ex } from { data } " )
113+ except (json .decoder .JSONDecodeError ) as ex :
114+ await self .emit ('malformed' , data )
115+
116+ async def _send_subscribe (self , writer ):
117+ writer .write (b'subscribe(60)\n ' )
118+ await writer .drain ()
0 commit comments