1+ #!/usr/bin/env python
2+
3+ #simpleOmniscan450Example.py
4+ from brping import definitions
5+ from brping import Omniscan450
6+ from brping import PingMessage
7+ import time
8+ import argparse
9+
10+ from builtins import input
11+
12+ import signal
13+ import sys
14+ import os
15+ from datetime import datetime
16+
17+ ##Parse Command line options
18+ ############################
19+
20+ parser = argparse .ArgumentParser (description = "Ping python library example." )
21+ parser .add_argument ('--device' , action = "store" , required = False , type = str , help = "Ping device port. E.g: /dev/ttyUSB0" )
22+ parser .add_argument ('--baudrate' , action = "store" , type = int , default = 115200 , help = "Ping device baudrate. E.g: 115200" )
23+ parser .add_argument ('--udp' , action = "store" , required = False , type = str , help = "Omniscan IP:Port. E.g: 192.168.2.92:51200" )
24+ parser .add_argument ('--tcp' , action = "store" , required = False , type = str , help = "Omniscan IP:Port. E.g: 192.168.2.92:51200" )
25+ parser .add_argument ('--range' , action = "store" , required = False , type = str , help = "Set range. E.g: 5000 or 0:5000" )
26+ parser .add_argument ('--log' , action = "store" , nargs = '?' , const = True , type = str , help = "Log filename. Will log if it doesn't exist, or replay if it does. Blank creates new log." )
27+ args = parser .parse_args ()
28+ if args .device is None and args .udp is None and args .tcp is None and args .log is None :
29+ parser .print_help ()
30+ exit (1 )
31+
32+ # Signal handler to stop pinging on the Omniscan450
33+ def signal_handler (sig , frame ):
34+ print ("Stopping pinging on Omniscan450..." )
35+ myOmniscan450 .control_os_ping_params (enable = 0 )
36+ if myOmniscan450 .iodev :
37+ try :
38+ myOmniscan450 .iodev .close ()
39+ except Exception as e :
40+ print (f"Failed to close socket: { e } " )
41+ sys .exit (0 )
42+
43+ signal .signal (signal .SIGINT , signal_handler )
44+
45+ # Make a new Omniscan450
46+ myOmniscan450 = Omniscan450 ()
47+ if args .device is not None :
48+ myOmniscan450 .connect_serial (args .device , args .baudrate )
49+ elif args .udp is not None :
50+ (host , port ) = args .udp .split (':' )
51+ myOmniscan450 .connect_udp (host , int (port ))
52+ elif args .tcp is not None :
53+ (host , port ) = args .tcp .split (':' )
54+ myOmniscan450 .connect_tcp (host , int (port ))
55+
56+ # Check for log argument
57+ # If no log is specified, create one using date and time
58+ # If a log is specified, existing log will be opened
59+ new_log = False
60+ if args .log is not None :
61+ if args .log is True :
62+ log_filename = datetime .now ().strftime ("omniscan-%Y-%m-%d_%H-%M-%S.txt" )
63+ elif isinstance (args .log , str ):
64+ log_filename = args .log
65+ else :
66+ log_filename = None
67+
68+ if log_filename :
69+ log_path = os .path .join ("logs/omniscan" , log_filename )
70+ if os .path .exists (log_path ):
71+ print (f"Replaying from existing log file: { log_filename } " )
72+ new_log = False
73+ else :
74+ print (f"Logging to new file: { log_filename } " )
75+ new_log = True
76+
77+ if args .log is None or new_log :
78+ if myOmniscan450 .initialize () is False :
79+ print ("Failed to initialize Omniscan450!" )
80+ exit (1 )
81+
82+ data1 = myOmniscan450 .get_device_information ()
83+ print ("Device type: %s" % data1 ["device_type" ])
84+
85+ print ("------------------------------------" )
86+ print ("Starting Omniscan450.." )
87+ print ("Press CTRL+C to exit" )
88+ print ("------------------------------------" )
89+
90+ input ("Press Enter to continue..." )
91+
92+ # Running Omniscan from existing log file
93+ if args .log is not None and not new_log :
94+ os .makedirs ("logs/omniscan" , exist_ok = True )
95+ log_path = os .path .join ("logs/omniscan" , log_filename )
96+ with open (log_path , 'rb' ) as f :
97+ raw_bytes = f .read ()
98+ data = PingMessage (msg_data = raw_bytes )
99+
100+ if data :
101+ scaled_result = Omniscan450 .scale_power (data )
102+ for i in range (len (scaled_result )):
103+ print (f"{ i + 1 } : Raw: { data .pwr_results [i ]} \t Scaled: { scaled_result [i ]} dB" )
104+ print (f"Min power: { data .min_pwr_db } dB" )
105+ print (f"Max power: { data .max_pwr_db } dB" )
106+ else :
107+ print ("Failed to get report" )
108+
109+ # Connected to physical omniscan
110+ else :
111+ if args .range is not None :
112+ parts = args .range .split (':' )
113+
114+ if len (parts ) == 2 :
115+ myOmniscan450 .control_os_ping_params (
116+ start_mm = int (parts [0 ]),
117+ length_mm = int (parts [1 ]),
118+ enable = 1
119+ )
120+ elif len (parts ) == 1 :
121+ myOmniscan450 .control_os_ping_params (
122+ start_mm = 0 ,
123+ length_mm = int (parts [0 ]),
124+ enable = 1
125+ )
126+ else :
127+ print ("Invalid range input, using default range" )
128+ myOmniscan450 .control_os_ping_params (enable = 1 )
129+ else :
130+ # For default settings, just set enable pinging
131+ myOmniscan450 .control_os_ping_params (enable = 1 )
132+
133+ # For a custom ping rate
134+ custom_msec_per_ping = Omniscan450 .calc_msec_per_ping (1000 ) # 1000 Hz
135+
136+ # To find pulse length percent
137+ custom_pulse_length = Omniscan450 .calc_pulse_length_pc (0.2 ) # 0.2%
138+
139+ ## Set these attributes like this
140+ # myOmniscan450.control_os_ping_params(
141+ # msec_per_ping=custom_msec_per_ping,
142+ # pulse_len_percent=custom_pulse_length,
143+ # num_results=200,
144+ # enable=1
145+ # )
146+
147+ # View power results
148+ data = myOmniscan450 .wait_message ([definitions .OMNISCAN450_OS_MONO_PROFILE ])
149+ if data :
150+ scaled_result = Omniscan450 .scale_power (data )
151+ for i in range (len (scaled_result )):
152+ print (f"{ i + 1 } : Raw: { data .pwr_results [i ]} \t Scaled: { scaled_result [i ]} dB" )
153+ print (f"Min power: { data .min_pwr_db } dB" )
154+ print (f"Max power: { data .max_pwr_db } dB" )
155+ else :
156+ print ("Failed to get report" )
157+
158+ # Disable pinging and close socket
159+ myOmniscan450 .control_os_ping_params (enable = 0 )
160+ if myOmniscan450 .iodev :
161+ try :
162+ myOmniscan450 .iodev .close ()
163+ except Exception as e :
164+ print (f"Failed to close socket: { e } " )
165+
166+ if new_log :
167+ os .makedirs ("logs/omniscan" , exist_ok = True )
168+ log_path = os .path .join ("logs/omniscan" , log_filename )
169+ with open (log_path , 'ab' ) as f :
170+ f .write (data .msg_data )
0 commit comments