-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGUIComp_StreamMgmt.py
More file actions
361 lines (297 loc) · 14.5 KB
/
GUIComp_StreamMgmt.py
File metadata and controls
361 lines (297 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
import logging
import os
import traceback
import mne
import numpy as np
from datetime import datetime
import time
import pyedflib
from pyqtgraph.Qt import QtCore, QtWidgets
from mne_lsl.lsl import resolve_streams
from mne_lsl.stream import StreamLSL
from GUIComp_Utils import GUI_Utils
from pathlib import Path
import yaml
# Module-level side effect: set MNE's log level to WARNING at import time.
mne.set_log_level('WARNING')
class DeviceInfo:
    """Describe one EEG device: its name, the channels to pick, and its sampling rate.

    Attributes:
        channel_picks: Channel names to read from the stream.
        sample_freq: Sampling frequency in Hz.
        name: Human-readable device name (empty string when not given).
    """

    def __init__(self, channel_picks, sample_freq, name=""):
        self.name = name
        self.sample_freq = sample_freq
        self.channel_picks = channel_picks

    def __repr__(self):
        # Assemble the field list first, then wrap it in the class name.
        parts = (
            f"name={self.name}",
            f"channel_picks={self.channel_picks}",
            f"sample_freq={self.sample_freq} Hz",
        )
        return "DeviceInfo(" + ", ".join(parts) + ")"
class DeviceInfoDatabase:
    """Catalogue of the supported devices.

    Every entry is a ``DeviceInfo`` holding the channel names to pick, the
    sampling frequency in Hz and a display name. The ``*_ALL`` variants list
    several channels of a device; the plain variants list a single channel.
    """

    # --- Muse ---------------------------------------------------------------
    MUSE_ALL = DeviceInfo(
        channel_picks=["AF7", "AF8", "TP10"],
        sample_freq=256,
        name="Muse All",
    )
    MUSE = DeviceInfo(channel_picks=["AF7"], sample_freq=256, name="Muse")

    # Muse S is intentionally not listed: Muse S + BlueMuse + the mne-lsl
    # viewer hangs after a few minutes because of a frequency configuration
    # problem. BlueMuse shows 512 Hz, the stream is marked 256 Hz, and the
    # actual push rate is 512 Hz, so an entry would have to look like
    #   MUSE_S = DeviceInfo(["AF7"], 512, "Muse S")

    # --- MNE-LSL Player -----------------------------------------------------
    PLAYER_ALL = DeviceInfo(
        channel_picks=["EEG Fpz-Cz", "EEG Pz-Oz"],
        sample_freq=100,
        name="Player All",
    )
    PLAYER = DeviceInfo(channel_picks=["EEG Fpz-Cz"], sample_freq=100, name="Player")

    # --- TGAM ---------------------------------------------------------------
    TGMA_ALL = DeviceInfo(
        channel_picks=["Fp1", "Fp2"],
        sample_freq=512,
        name="TGMA All",
    )
    TGMA = DeviceInfo(channel_picks=["Fp1"], sample_freq=512, name="TGAM")

    # --- Flexolink ----------------------------------------------------------
    FLEXOLINK_ALL = DeviceInfo(
        channel_picks=["Fpz-Raw", "Fpz-Filtered"],
        sample_freq=250,
        name="Flexo",
    )
    FLEXOLINK = DeviceInfo(channel_picks=["Fpz-Filtered"], sample_freq=250, name="Flexo")
class EEGStreamManager:
    """Connect to an LSL EEG stream, feed the GUI indicators and record to EDF+.

    The manager polls the stream from a Qt timer, hands every new chunk to
    ``main_window.loaded_indicators`` and, while recording, buffers the chunk
    and writes it to an EDF+ file one second at a time. Status messages go to
    both ``eeg_stream.log`` and the main window's status bar.
    """

    # Period of the polling timer, in milliseconds.
    _POLL_INTERVAL_MS = 50

    def __init__(self, main_window, debug_mode=False):
        """Initialise the idle state and open the append-mode log file.

        Args:
            main_window: GUI main window. ``status_bar`` is read here and
                ``loaded_indicators`` is read on every poll.
            debug_mode: When true, print the received sample count roughly
                once per second.
        """
        self.main_window = main_window
        self.status_bar = main_window.status_bar
        self.stream: StreamLSL = None  # LSL stream instance (None while disconnected)
        self.timer = None  # QTimer driving check_newdata_and_process
        self.recording = False  # Whether a recording is in progress
        self.record_file = None  # pyedflib writer of the current recording
        self.record_file_name = None  # Path of the current/last recording
        self.device_info = None  # DeviceInfo of the selected device
        self.record_button = None  # Toolbar button, set by add_record_menu_on_toolbar
        self.data_buffer = None  # (channels, samples) array awaiting a full record
        self.debug_mode = debug_mode
        self.debug_counter = 0
        self.debug_sample_counter = 0
        self.debug_last_print_time = time.time()
        self.write_count = 0  # Number of records written to the current file
        self.first_write_time = None  # Wall-clock reference for the write-rate log
        self.total_written_samples = 0  # Samples per channel written so far
        # Explicit encoding so the log does not depend on the platform locale.
        self.log_file = open("eeg_stream.log", "a", encoding="utf-8")
        self.log_message("EEGStreamManager initialized")

    def add_conn_menu_on_toolbar(self, toolbar):
        """Add the device-connection menu to ``toolbar`` as a tool button."""
        connect_menu = QtWidgets.QMenu("🎧", toolbar)
        connect_menu.addAction("* Muse 2016 (via BlueMuse)").triggered.connect(
            lambda: self.connect_eeg_stream(DeviceInfoDatabase.MUSE))
        # The Muse S entry is disabled; see the note in DeviceInfoDatabase.
        connect_menu.addAction("* LSL Player (via MNE-LSL)").triggered.connect(
            lambda: self.connect_eeg_stream(DeviceInfoDatabase.PLAYER))
        connect_menu.addAction("* TGMA (via TGAM-LSL-Bridge)").triggered.connect(
            lambda: self.connect_eeg_stream(DeviceInfoDatabase.TGMA))
        connect_menu.addAction("* FlexoLink (via FlexoTool)").triggered.connect(
            lambda: self.connect_eeg_stream(DeviceInfoDatabase.FLEXOLINK))
        connect_button = GUI_Utils.transform_menu_to_toolbutton("🔗", connect_menu)
        toolbar.addWidget(connect_button)

    def add_record_menu_on_toolbar(self, toolbar):
        """Add the recording menu to ``toolbar`` as a tool button."""
        record_menu = QtWidgets.QMenu("recording", toolbar)
        self.record_channel_action = record_menu.addAction("Current Channel")
        self.record_channel_action.triggered.connect(self.record_current_channel)
        # A "Whole Stream" entry (record_whole_stream) is not wired up yet.
        self.record_button = GUI_Utils.transform_menu_to_toolbutton("🔴", record_menu)
        toolbar.addWidget(self.record_button)

    def record_current_channel(self):
        """Toggle recording of the currently picked channels."""
        if self.recording:
            self.recording = False
            self.record_button.setText("🔴")
            self.log_message("Recording stopped")
            self.close_recording_file()
            return

        if self.stream is None:
            self.log_message("Please connect stream before recording")
            return

        # Open the file before flipping any state: if opening fails, the
        # manager and the button stay in the idle state.
        self.open_recording_file()
        self.recording = True
        self.record_button.setText("🟥")
        self.record_channel_action.setText("Current Channel")
        self.log_message(
            f"Recording channels {self.device_info.channel_picks} to {self.record_file_name}")

    def record_whole_stream(self):
        """Handle the 'record whole stream' option (placeholder dialog only)."""
        title = "To Be Implemented"
        text = "Your contribution to the source repository is more than welcome!"
        QtWidgets.QMessageBox.information(None, title, text)

    @staticmethod
    def _build_signal_headers(channel_names, sample_rate):
        """Return one EDF signal-header dict per channel name."""
        return [
            {
                'label': label if label else f"Channel_{index}",
                'dimension': 'uV',
                'sample_frequency': sample_rate,
                'physical_min': -327.68*2,
                'physical_max': 327.67*2,
                'digital_min': -32768,
                'digital_max': 32767,
                'prefilter': '',
                'transducer': '',
            }
            for index, label in enumerate(channel_names)
        ]

    def open_recording_file(self):
        """Create a timestamped EDF+ file under ``./data_recorded`` and reset counters."""
        os.makedirs("./data_recorded", exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.record_file_name = f"./data_recorded/{self.device_info.name}_case_{timestamp}.edf"

        channel_names = self.device_info.channel_picks
        channel_count = len(channel_names)
        # Build the headers before touching the disk so the fallible window
        # between creating the file and configuring it is as small as possible.
        signal_headers = self._build_signal_headers(channel_names, self.device_info.sample_freq)

        self.record_file = pyedflib.EdfWriter(
            self.record_file_name, channel_count, file_type=pyedflib.FILETYPE_EDFPLUS)
        self.record_file.setSignalHeaders(signal_headers)

        # Fresh statistics and an empty (channels, 0) buffer for the new file.
        self.write_count = 0
        self.first_write_time = time.time()
        self.total_written_samples = 0
        self.data_buffer = np.empty((channel_count, 0))

    def _report_debug_rate(self, n_samples):
        """Accumulate ``n_samples`` and print the total once per elapsed second."""
        self.debug_sample_counter += n_samples
        now = time.time()
        if now - self.debug_last_print_time >= 1.0:
            timestamp = datetime.now().strftime("%H:%M:%S")
            print(f"DEBUG:[{timestamp}] Received samples/s: {self.debug_sample_counter}")
            self.debug_sample_counter = 0
            self.debug_last_print_time = now

    def check_newdata_and_process(self):
        """Timer slot: fetch new samples, update the indicators, record if active.

        Any exception is printed and reported through ``log_message`` so that
        a failing poll never propagates into the Qt event loop.
        """
        try:
            if self.stream is None or self.stream.n_new_samples < 1:
                return
            data, _ = self.get_new_data_from_stream()
            if data is None:
                return
            if self.debug_mode:
                self._report_debug_rate(data.shape[1])
            for handler in self.main_window.loaded_indicators:
                handler.process_new_data_and_update_plot(data)
            if self.recording and self.record_file is not None:
                self.save_data_to_file(data)
        except Exception:
            traceback.print_exc()
            self.log_message("failed to process new data")

    def save_data_to_file(self, data):
        """Buffer ``data`` and write every complete one-second record to the EDF+ file.

        Args:
            data: Array of shape (channels, samples) with the same channel
                count as ``device_info.channel_picks``.

        Raises:
            ValueError: If ``device_info.sample_freq`` is not positive.
        """
        # Lazy initialisation; NumPy 2.x cannot hstack onto a None buffer.
        if self.data_buffer is None:
            channel_count = len(self.device_info.channel_picks)
            self.data_buffer = np.empty((channel_count, 0))
            self.write_count = 0
            self.total_written_samples = 0
            self.first_write_time = time.time()

        required_samples = self.device_info.sample_freq
        if required_samples < 1:
            # A non-positive record length would make the drain loop spin forever.
            raise ValueError("sample_freq must be a positive number of samples per second")

        self.data_buffer = np.hstack((self.data_buffer, data))

        # Drain every complete record, not just the first one, so a large
        # chunk cannot leave the buffer growing.
        while self.data_buffer.shape[1] >= required_samples:
            data_to_write = self.data_buffer[:, :required_samples]
            self.data_buffer = self.data_buffer[:, required_samples:]

            # NOTE(review): values are written as received; no unit conversion
            # happens here although the headers declare 'uV' - confirm the
            # stream already delivers microvolts.
            self.record_file.writeSamples(list(data_to_write))

            self.total_written_samples += data_to_write.shape[1]
            self.write_count += 1
            if self.first_write_time is None:
                self.first_write_time = time.time()
            # Report the effective write rate every 10 records.
            if self.write_count % 10 == 0:
                elapsed_time = time.time() - self.first_write_time
                if elapsed_time > 0:
                    sample_rate = self.total_written_samples / elapsed_time
                    self.log_message(f"Write sampling rate: {sample_rate:.2f} samples/sec")

    def close_recording_file(self):
        """Close the EDF+ file and append the end timestamp to its name."""
        if self.record_file:
            self.record_file.close()
            self.record_file = None
            end_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            base_name, ext = os.path.splitext(self.record_file_name)
            new_file_name = f"{base_name}_to_{end_timestamp}{ext}"
            os.rename(self.record_file_name, new_file_name)
            self.record_file_name = new_file_name
            self.log_message(f"Data saved: {self.record_file_name}")

    def _sync_indicator_sample_freq(self, real_freq):
        """Rewrite ``STREAM.sample_freq`` in the indicator config if it differs."""
        config_path = Path(__file__).parent / 'indicators/indicator_global_config.yaml'
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        if config['STREAM']['sample_freq'] == real_freq:
            return
        config['STREAM']['sample_freq'] = real_freq
        # Same encoding as the read above.
        with open(config_path, 'w', encoding='utf-8') as f:
            yaml.dump(config, f, sort_keys=False)
        self.log_message(f" indicator_global_config.yaml updated: {real_freq}Hz")
        QtCore.QCoreApplication.processEvents()  # make sure the message is displayed

    def connect_eeg_stream(self, deviceInfo):
        """Connect to the first EEG stream found on the network.

        Any previous stream, timer and recording are released first. On
        success the polling timer is started; on failure the manager stays
        disconnected and "stream connection failed" is reported.

        Args:
            deviceInfo: ``DeviceInfo`` describing the channels and sampling
                frequency of the device being connected.
        """
        # Release the previous session so its stream and timer do not leak
        # and an open recording is not fed with another device's channels.
        try:
            self.disconnect_stream()
        except Exception:
            traceback.print_exc()
            self.log_message("failed to release previous stream")

        self.device_info = deviceInfo
        self._sync_indicator_sample_freq(deviceInfo.sample_freq)

        try:
            stream_list = resolve_streams(stype='EEG') + resolve_streams(stype='eeg')
            if not stream_list:
                self.log_message("No stream found")
                return
            sinfo = stream_list[0]
            stream = StreamLSL(bufsize=1, name=sinfo.name, stype=sinfo.stype, source_id=sinfo.source_id)
            stream.connect()
            stream.pick("eeg")
            # Explicit check instead of ``assert`` (asserts vanish under -O):
            # add the reference channel only when it is not already present.
            if "CPz" not in stream.ch_names:
                stream.add_reference_channels("CPz")
            # Publish the stream only once it is fully configured.
            self.stream = stream
            self.start_timer()
            self.log_message(f"connected to {deviceInfo.channel_picks}")
        except Exception:
            traceback.print_exc()
            self.log_message("stream connection failed")

    def disconnect_stream(self):
        """Stop polling, finish any recording and disconnect the stream."""
        # Stop the timer first; otherwise it keeps polling a dead stream.
        if self.timer is not None:
            self.timer.stop()
            self.timer = None
        if self.recording:
            self.recording = False
            if self.record_button is not None:
                self.record_button.setText("🔴")
            self.close_recording_file()
        if self.stream is not None:
            # Clear the attribute before disconnecting so the manager is in
            # the disconnected state even if disconnect() raises.
            stream, self.stream = self.stream, None
            stream.disconnect()

    def start_timer(self):
        """Start (or restart) the timer that polls the stream."""
        if self.timer is not None:
            self.timer.stop()
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.check_newdata_and_process)
        self.timer.start(self._POLL_INTERVAL_MS)

    def get_new_data_from_stream(self):
        """Fetch the not-yet-read samples of the picked channels.

        Returns:
            Whatever ``stream.get_data`` returns; ``check_newdata_and_process``
            unpacks it as a ``(data, _)`` pair.
        """
        # get_data takes a window in seconds, so convert the sample count.
        # NOTE(review): the conversion uses the configured sample_freq, not
        # the stream's own rate - confirm they always agree.
        secs_for_new_data = self.stream.n_new_samples / self.device_info.sample_freq
        return self.stream.get_data(winsize=secs_for_new_data, picks=self.device_info.channel_picks)

    def get_selected_channel_data(self, data):
        """Return the row of ``data`` that belongs to the first picked channel.

        Raises:
            ValueError: If no channels are picked.
        """
        if not self.device_info.channel_picks:
            # Raise instead of exit(1): a helper must not terminate the GUI,
            # and ``exit`` is only provided by the ``site`` module.
            raise ValueError("need to pick channels")
        # The first pick is always row 0 of the picked data.
        return data[0, :]

    def log_message(self, message):
        """Append ``message`` with a timestamp to the log file and show it in the status bar."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] {message}\n"
        # Skip the file once it has been closed; the status bar still updates.
        if self.log_file and not self.log_file.closed:
            self.log_file.write(log_entry)
            self.log_file.flush()
        self.status_bar.showMessage(message)

    def __del__(self):
        """Close the log file when the manager is garbage collected."""
        if hasattr(self, 'log_file') and self.log_file:
            self.log_file.close()