44import json
55import logging
66from datetime import datetime
7- from typing import Any , Dict , TypeVar , Union
7+ from typing import Any , TypeVar , Union
88
99from fastapi import APIRouter , WebSocket , WebSocketDisconnect
10- from sqlmodel import select
10+ from sqlmodel import Session , select
1111
1212import murfey .server .prometheus as prom
1313from murfey .server .murfey_db import get_murfey_db_session
2222
2323class ConnectionManager :
2424 def __init__ (self ):
25- self .active_connections : Dict [int | str , WebSocket ] = {}
25+ self .active_connections : dict [int | str , WebSocket ] = {}
2626
2727 async def connect (
28- self , websocket : WebSocket , client_id : int | str , register_client : bool = True
28+ self ,
29+ websocket : WebSocket ,
30+ client_id : Union [int , str ],
31+ register_client : bool = True ,
2932 ):
3033 await websocket .accept ()
3134 self .active_connections [client_id ] = websocket
@@ -38,16 +41,17 @@ async def connect(
3841
3942 @staticmethod
4043 def _register_new_client (client_id : int ):
44+ log .debug (f"Registering new client with ID { client_id } " )
4145 new_client = ClientEnvironment (client_id = client_id , connected = True )
42- murfey_db = next (get_murfey_db_session ())
46+ murfey_db : Session = next (get_murfey_db_session ())
4347 murfey_db .add (new_client )
4448 murfey_db .commit ()
4549 murfey_db .close ()
4650
47- def disconnect (self , client_id : int | str , unregister_client : bool = True ):
51+ def disconnect (self , client_id : Union [ int , str ] , unregister_client : bool = True ):
4852 self .active_connections .pop (client_id )
4953 if unregister_client :
50- murfey_db = next (get_murfey_db_session ())
54+ murfey_db : Session = next (get_murfey_db_session ())
5155 client_env = murfey_db .exec (
5256 select (ClientEnvironment ).where (
5357 ClientEnvironment .client_id == client_id
@@ -73,7 +77,7 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
7377 while True :
7478 data = await websocket .receive_text ()
7579 try :
76- json_data = json .loads (data )
80+ json_data : dict = json .loads (data )
7781 if json_data ["type" ] == "log" : # and isinstance(json_data, dict)
7882 json_data .pop ("type" )
7983 await forward_log (json_data , websocket )
@@ -92,15 +96,16 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
9296
9397@ws .websocket ("/connect/{client_id}" )
9498async def websocket_connection_endpoint (
95- websocket : WebSocket , client_id : Union [int , str ]
99+ websocket : WebSocket ,
100+ client_id : Union [int , str ],
96101):
97102 await manager .connect (websocket , client_id , register_client = False )
98103 await manager .broadcast (f"Client { client_id } joined" )
99104 try :
100105 while True :
101106 data = await websocket .receive_text ()
102107 try :
103- json_data = json .loads (data )
108+ json_data : dict = json .loads (data )
104109 if json_data .get ("type" ) == "log" : # and isinstance(json_data, dict)
105110 json_data .pop ("type" )
106111 await forward_log (json_data , websocket )
@@ -115,12 +120,12 @@ async def websocket_connection_endpoint(
115120 await manager .broadcast (f"Client #{ client_id } disconnected" )
116121
117122
118- async def check_connections (active_connections ):
123+ async def check_connections (active_connections : list [ WebSocket ] ):
119124 log .info ("Checking connections" )
120125 for connection in active_connections :
121126 log .info ("Checking response" )
122127 try :
123- await asyncio .wait_for (connection .receive (), timeout = 6 )
128+ await asyncio .wait_for (connection .receive (), timeout = 10 )
124129 except asyncio .TimeoutError :
125130 log .info (f"Disconnecting Client { connection [0 ]} " )
126131 manager .disconnect (connection [0 ], connection [1 ])
@@ -139,7 +144,7 @@ async def forward_log(logrecord: dict[str, Any], websocket: WebSocket):
139144
140145@ws .delete ("/test/{client_id}" )
141146async def close_ws_connection (client_id : int ):
142- murfey_db = next (get_murfey_db_session ())
147+ murfey_db : Session = next (get_murfey_db_session ())
143148 client_env = murfey_db .exec (
144149 select (ClientEnvironment ).where (ClientEnvironment .client_id == client_id )
145150 ).one ()
0 commit comments