"""
Flask Control Application for EEG, Motion, PPG, and Video/Stimuli Programs
This module serves as a centralized controller for multiple programs:
- Program 1 (p1): Camera control
- Program 2 (p2): Image stimuli control
- Program 4 (p4): Video stimuli control
It connects to a Muse EEG device via LSL streams, collects EEG, accelerometer, gyroscope,
and PPG data, and stores it in shared memory for visualization. It also exposes Flask
routes for starting/stopping recordings, controlling video feeds, and managing stimuli.
Key Features:
- Real-time data acquisition from Muse device (EEG, accelerometer, gyroscope, PPG)
- Shared memory for inter-process communication with visualization
- Flask REST API for user interaction (recording, visualization, stimuli control)
- Multiprocessing support for external programs (p1, p2, p4)
- Performance-optimized real-time visualization of EEG and motion data
"""
from flask import Flask, render_template, jsonify, request
import threading
import time
import json
from pylsl import StreamInlet, resolve_stream
import numpy as np
import multiprocessing
from multiprocessing import shared_memory, Manager
import matplotlib.pyplot as plt
import logging
from website import p1, p2, p4
import warnings
import logging
warnings.filterwarnings("ignore")
logging.getLogger('matplotlib').setLevel(logging.CRITICAL)
logging.getLogger('matplotlib.font_manager').setLevel(logging.WARNING)
app = Flask(__name__)
# Sample rates and parameters for Muse
SAMPLE_RATE = 256 # Hz for Muse EEG
ACC_SAMPLE_RATE = 52 # Hz for Muse accelerometer
GYRO_SAMPLE_RATE = 52 # Hz for Muse gyroscope
PPG_SAMPLE_RATE = 64 # Hz for Muse PPG
DISPLAY_TIME = 10 # seconds to display
BUFFER_SIZE = SAMPLE_RATE * DISPLAY_TIME # Buffer size for EEG data
PPG_BUFFER_SIZE = PPG_SAMPLE_RATE * DISPLAY_TIME # Buffer size for PPG data
# Global variables for recording
recording = False
recorded_data = {
"eeg": [],
"acc": [],
"gyro": [],
"ppg": []
}
# Initialize data buffers
eeg_data = np.zeros((4, BUFFER_SIZE)) # 4 EEG channels (TP9, AF7, AF8, TP10)
acc_data = np.zeros((3, BUFFER_SIZE)) # 3 accelerometer axes
gyro_data = np.zeros((3, BUFFER_SIZE)) # 3 gyroscope axes
ppg_data = np.zeros((3, PPG_BUFFER_SIZE)) # 3 PPG channels (PPG1, PPG2, PPG3)
# Initialize time buffers
time_buffer = np.linspace(-DISPLAY_TIME, 0, BUFFER_SIZE)
ppg_time_buffer = np.linspace(-DISPLAY_TIME, 0, PPG_BUFFER_SIZE)
# Connection status flags
eeg_connected = False
acc_connected = False
gyro_connected = False
ppg_connected = False
# Global variables for LSL inlets
eeg_inlet = None
acc_inlet = None
gyro_inlet = None
ppg_inlet = None
# Threading lock for thread safety
data_lock = threading.Lock()
# Configure logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
# Connect to Muse streams
[docs]
def connect_to_muse():
"""
Connect to Muse EEG, accelerometer, gyroscope, and PPG LSL streams.
Sets global connection flags and initializes StreamInlet objects
for each available stream. Prints connection status to console.
"""
global eeg_connected, acc_connected, gyro_connected, ppg_connected
global eeg_inlet, acc_inlet, gyro_inlet, ppg_inlet # Make inlets global
print("Looking for an EEG stream...")
eeg_streams = resolve_stream('type', 'EEG')
if eeg_streams:
print("Creating inlet for EEG stream...")
eeg_inlet = StreamInlet(eeg_streams[0]) # Assign to global variable
eeg_connected = True
# Look for accelerometer stream
acc_streams = resolve_stream('type', 'Accelerometer')
if acc_streams:
print("Creating inlet for Accelerometer stream...")
acc_inlet = StreamInlet(acc_streams[0]) # Assign to global variable
acc_connected = True
# Look for gyroscope stream
gyro_streams = resolve_stream('type', 'Gyroscope')
if gyro_streams:
print("Creating inlet for Gyroscope stream...")
gyro_inlet = StreamInlet(gyro_streams[0]) # Assign to global variable
gyro_connected = True
# Look for PPG stream
ppg_streams = resolve_stream('type', 'PPG')
if ppg_streams:
print("Creating inlet for PPG stream...")
ppg_inlet = StreamInlet(ppg_streams[0]) # Assign to global variable
ppg_connected = True
if eeg_connected or acc_connected or gyro_connected or ppg_connected:
print("Successfully connected to Muse streams.")
else:
print("No streams found. Please ensure the Muse device is connected and streaming.")
# Process data in a separate thread
[docs]
def process_data_thread():
"""
Continuous thread for reading Muse LSL streams and updating shared memory buffers.
- Reads EEG, accelerometer, gyroscope, and PPG chunks.
- Updates global buffers and shared memory arrays.
- Appends recorded data if recording is enabled.
"""
global recording, recorded_data
global eeg_inlet, acc_inlet, gyro_inlet, ppg_inlet
# Explicitly declare globals for data arrays
global eeg_data, acc_data, gyro_data, ppg_data
global shared_eeg_data, shared_acc_data, shared_gyro_data, shared_ppg_data
while True:
try:
if eeg_connected and eeg_inlet:
chunk, timestamps = eeg_inlet.pull_chunk()
if chunk:
with data_lock: # Acquire lock before modifying shared data
for i, sample in enumerate(chunk):
# Update the global buffer
eeg_data = np.roll(eeg_data, -1, axis=1)
for ch in range(4):
if ch < len(sample):
eeg_data[ch, -1] = sample[ch]
# Update the shared memory buffer
shared_eeg_data[:] = eeg_data[:]
if recording:
recorded_data["eeg"].append({
"timestamp": timestamps[i],
"values": sample[:4]
})
if acc_connected and acc_inlet:
chunk, timestamps = acc_inlet.pull_chunk()
if chunk:
with data_lock:
for i, sample in enumerate(chunk):
# Update global buffer
acc_data = np.roll(acc_data, -1, axis=1)
for axis in range(3):
if axis < len(sample):
acc_data[axis, -1] = sample[axis]
# Update shared memory buffer
shared_acc_data[:] = acc_data[:]
if recording:
recorded_data["acc"].append({
"timestamp": timestamps[i],
"values": sample[:3]
})
if gyro_connected and gyro_inlet:
chunk, timestamps = gyro_inlet.pull_chunk()
if chunk:
with data_lock:
for i, sample in enumerate(chunk):
# Update global buffer
gyro_data = np.roll(gyro_data, -1, axis=1)
for axis in range(3):
if axis < len(sample):
gyro_data[axis, -1] = sample[axis]
# Update shared memory buffer
shared_gyro_data[:] = gyro_data[:]
if recording:
recorded_data["gyro"].append({
"timestamp": timestamps[i],
"values": sample[:3]
})
if ppg_connected and ppg_inlet:
chunk, timestamps = ppg_inlet.pull_chunk()
if chunk:
with data_lock:
for i, sample in enumerate(chunk):
# Update global buffer
ppg_data = np.roll(ppg_data, -1, axis=1)
for ch in range(3):
if ch < len(sample):
ppg_data[ch, -1] = sample[ch]
# Update shared memory buffer
shared_ppg_data[:] = ppg_data[:]
if recording:
recorded_data["ppg"].append({
"timestamp": timestamps[i],
"values": sample[:3]
})
time.sleep(0.001)
except Exception as e:
logging.error(f"Error in processing thread: {e}")
time.sleep(0.1)
# Start the data processing thread
thread = threading.Thread(target=process_data_thread, daemon=True)
thread.start()
# Flask routes
@app.route("/")
def index():
return render_template("index.html")
# subject description start
@app.route("/subject_information", methods=["POST"])
def subject_information():
data = request.get_json() # Get JSON data from request
config = json.loads(data)
with open("data/subject_information.json", "w") as file:
json.dump(config, file)
print("Received Data:", data) # Debugging
return jsonify({"status": "Subject Information Saved"})
# subject description end
# command for video feed control
[docs]
def send_command(conn, command, json_data=None):
"""
Send a command to a child process via a Pipe connection.
Args:
conn (multiprocessing.Connection): Parent end of a Pipe connection.
command (str): Command string to send.
json_data (dict, optional): Optional JSON data to send after command.
Returns:
bool: True if command successfully sent and response received, False otherwise.
"""
try:
# Send the command
conn.send(command)
print(f"Sent command: {command}")
# If the command is 'save_config', send the JSON data, for program 2
if command == "save_config" or command == "save_video_config" and json_data is not None:
conn.send(json_data)
print(f"Sent JSON data: {json_data}")
# Wait for a response
response = conn.recv()
print(f"Received response: {response}")
return True
except Exception as e:
print(f"Error sending command: {e}")
return False
@app.route("/start_recording_video", methods=["POST"])
def start_recording_video():
global parent_conn
cmd = 'start_recording'
if send_command(parent_conn, cmd):
return jsonify({"status": f"Recording Camera Feed"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure camera program is running."})
@app.route("/stop_recording_video", methods=["POST"])
def stop_recording_video():
global parent_conn
cmd = 'stop_recording'
if send_command(parent_conn, cmd):
return jsonify({"status": f"Camera Feed Recording Stopped and Saved"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure camera program is running."})
@app.route("/open_visualization_video", methods=["POST"])
def open_visualization_video():
global parent_conn
cmd = 'show_feed'
if send_command(parent_conn, cmd):
return jsonify({"status": f"Showing Camera Feed"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure camera program is running."})
@app.route("/close_visualization_video", methods=["POST"])
def close_visualization_video():
global parent_conn
cmd = 'close_feed'
if send_command(parent_conn, cmd):
return jsonify({"status": f"Closed Video Feed"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure Program 2 is running."})
# command for muse control
@app.route("/start_recording", methods=["POST"])
def start_recording():
global recording, recorded_data
recording = True
recorded_data = {
"eeg": [],
"acc": [],
"gyro": [],
"ppg": []
}
return jsonify({"status": "Recording started"})
@app.route("/stop_recording", methods=["POST"])
def stop_recording():
global recording
recording = False
filename = f"data/recorded_data_{time.strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, "w") as f:
json.dump(recorded_data, f, indent=4)
return jsonify({"status": f"Recording stopped. Data saved to {filename}"})
@app.route("/open_visualization", methods=["POST"])
def open_visualization():
logging.debug("Opening visualization window")
# Pass the shared memory names to the visualization process
process = multiprocessing.Process(
target=start_visualization,
args=(shared_memory_names, time_buffer.tolist(), ppg_time_buffer.tolist(),
BUFFER_SIZE, PPG_BUFFER_SIZE)
)
process.start()
return jsonify({"status": "Visualization window opened"})
# command for image stimuli program start
@app.route("/load_stimuli_config", methods=["POST"])
def load_stimuli_config():
global parent_conn2
cmd = 'load_stimuli'
if send_command(parent_conn2, cmd):
return jsonify({"status": f"Loaded Stimuli Configuration"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure Program 2 is running."})
@app.route("/save_stimuli_config", methods=["POST"])
def save_stimuli_config():
global parent_conn2
data = request.get_json() # Get JSON data from request
print("Received Data:", data) # Debugging
cmd = 'save_config'
if send_command(parent_conn2, cmd, data):
return jsonify({"status": f"Saved Stimuli Configuration"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure Program 2 is running."})
@app.route("/start_stimuli", methods=["POST"])
def start_stimuli():
global parent_conn2
cmd = 'start_stimuli'
if send_command(parent_conn2, cmd):
return jsonify({"status": f"Started Stimuli"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure Program 2 is running."})
# command for image stimuli program end
# command for video stimuli program start
@app.route("/load_video_stimuli_config", methods=["POST"])
def load_video_stimuli_config():
global parent_conn4
cmd = 'load_video_stimuli'
if send_command(parent_conn4, cmd):
return jsonify({"status": f"Loaded Stimuli Configuration"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure Program 2 is running."})
@app.route("/save_video_stimuli_config", methods=["POST"])
def save_video_stimuli_config():
global parent_conn4
data = request.get_json() # Get JSON data from request
print("Received Data:", data) # Debugging
cmd = 'save_video_config'
if send_command(parent_conn4, cmd, data):
return jsonify({"status": f"Saved Stimuli Configuration"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure Program 2 is running."})
@app.route("/start_video_stimuli", methods=["POST"])
def start_video_stimuli():
global parent_conn4
cmd = 'start_video_stimuli'
if send_command(parent_conn4, cmd):
return jsonify({"status": f"Started Stimuli"})
else:
return jsonify({"status": f"Failed to send command: {cmd}. Make sure Program 2 is running."})
# command for video stimuli program end
[docs]
def start_visualization(shared_memory_names, time_buffer_list, ppg_time_buffer_list,
buffer_size, ppg_buffer_size):
"""
Start a Tkinter-based visualization window showing EEG, frequency bands,
accelerometer, gyroscope, and PPG signals.
Args:
shared_memory_names (dict): Names of shared memory blocks.
time_buffer_list (list): Time vector for EEG, ACC, GYRO plotting.
ppg_time_buffer_list (list): Time vector for PPG plotting.
buffer_size (int): Buffer size for EEG/ACC/GYRO.
ppg_buffer_size (int): Buffer size for PPG.
"""
import tkinter as tk
import numpy as np
from multiprocessing import shared_memory
import logging
import matplotlib
matplotlib.use("TkAgg") # Explicitly set backend
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
from scipy import signal
# Convert lists back to numpy arrays
time_buffer = np.array(time_buffer_list)
ppg_time_buffer = np.array(ppg_time_buffer_list)
# Attach to the shared memory created in the main process
eeg_shm = shared_memory.SharedMemory(name=shared_memory_names['eeg'])
acc_shm = shared_memory.SharedMemory(name=shared_memory_names['acc'])
gyro_shm = shared_memory.SharedMemory(name=shared_memory_names['gyro'])
ppg_shm = shared_memory.SharedMemory(name=shared_memory_names['ppg'])
# Create numpy arrays that use the shared memory
eeg_shape = (4, buffer_size)
acc_shape = (3, buffer_size)
gyro_shape = (3, buffer_size)
ppg_shape = (3, ppg_buffer_size)
eeg_data = np.ndarray(eeg_shape, dtype=np.float64, buffer=eeg_shm.buf)
acc_data = np.ndarray(acc_shape, dtype=np.float64, buffer=acc_shm.buf)
gyro_data = np.ndarray(gyro_shape, dtype=np.float64, buffer=gyro_shm.buf)
ppg_data = np.ndarray(ppg_shape, dtype=np.float64, buffer=ppg_shm.buf)
# Assume EEG sampling rate (adjust as needed)
fs = 256 # Hz, typical for EEG
# Calculate frequency bands for EEG
def extract_frequency_bands(eeg_data, fs):
# Define frequency bands
bands = {
'delta': (0.5, 4), # 0.5-4 Hz
'theta': (4, 8), # 4-8 Hz
'alpha': (8, 13), # 8-13 Hz
'beta': (13, 30), # 13-30 Hz
'gamma': (30, 45) # 30-45 Hz (or higher)
}
results = {}
# Design filters for each band
for band_name, (low_freq, high_freq) in bands.items():
# Normalize frequencies to Nyquist frequency
low = low_freq / (fs/2)
high = high_freq / (fs/2)
# Create bandpass filter
b, a = signal.butter(4, [low, high], btype='bandpass')
# Apply filter to each EEG channel
band_data = np.zeros_like(eeg_data)
for i in range(eeg_data.shape[0]):
# Apply filter and get filtered signal
band_data[i, :] = signal.filtfilt(b, a, eeg_data[i, :])
results[band_name] = band_data
return results
class VisualizationWindow:
def __init__(self):
# Use a more efficient matplotlib backend
plt.style.use('fast') # Use a faster style
self.root = tk.Tk()
self.root.title("Enhanced EEG Visualization")
self.root.geometry("1600x1000") # Larger window to accommodate all plots
# Create figure for plotting
self.fig = Figure(figsize=(16, 14), dpi=100)
self.setup_plots()
# Add figure to Tkinter window
self.canvas = FigureCanvasTkAgg(self.fig, master=self.root)
self.canvas_widget = self.canvas.get_tk_widget()
self.canvas_widget.pack(fill=tk.BOTH, expand=True)
# Reduce the number of points for faster rendering
self.downsample_factor = 2
# Pre-calculate fixed y-axis limits for better performance
self._calculate_fixed_limits()
# Start animation with reduced frequency
self.ani = FuncAnimation(
self.fig,
self.update_plot,
interval=200, # Slower refresh rate for better performance
blit=True, # Use blitting for faster rendering
cache_frame_data=False
)
# Handle window close
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
# Add performance optimizations for Tkinter
self.root.update_idletasks()
self.last_update_time = 0
def _calculate_fixed_limits(self):
"""Pre-calculate reasonable fixed y-axis limits for all plots"""
self.eeg_ylim = (-500, 500) # μV range for EEG
self.band_ylim = (-150, 150) # μV range for frequency bands
self.acc_ylim = (-1.2, 1.2) # m/s² range for accelerometer
self.gyro_ylim = (-250, 250) # deg/s range for gyroscope
self.ppg_ylim = (-2500, 2500) # Arbitrary units for PPG
# Apply fixed limits
for ax in self.eeg_axes:
ax.set_ylim(self.eeg_ylim)
for band_name, band_axes in self.band_axes.items():
for ax in band_axes:
ax.set_ylim(self.band_ylim)
self.acc_ax.set_ylim(self.acc_ylim)
self.gyro_ax.set_ylim(self.gyro_ylim)
self.ppg_ax.set_ylim(self.ppg_ylim)
# Set fixed x-axis limits
t_max = max(time_buffer.max() if len(time_buffer) > 0 else 10,
ppg_time_buffer.max() if len(ppg_time_buffer) > 0 else 10)
t_min = min(time_buffer.min() if len(time_buffer) > 0 else 0,
ppg_time_buffer.min() if len(ppg_time_buffer) > 0 else 0)
all_axes = self.eeg_axes[:]
for band_axes_list in self.band_axes.values():
all_axes.extend(band_axes_list)
all_axes.extend([self.acc_ax, self.gyro_ax, self.ppg_ax])
for ax in all_axes:
ax.set_xlim(t_min, t_max)
def setup_plots(self):
# Clear any existing plots
self.fig.clear()
# Create a 6x5 grid layout
gs = self.fig.add_gridspec(6, 5, hspace=0.5, wspace=0.4)
channel_names = ['TP9', 'AF7', 'AF8', 'TP10']
band_names = ['Raw', 'Delta', 'Theta', 'Alpha', 'Beta', 'Gamma']
# Define colors for different data types
band_colors = {
'Delta': 'purple',
'Theta': 'blue',
'Alpha': 'green',
'Beta': 'orange',
'Gamma': 'red',
'Raw': 'black'
}
# Initialize collections to store axes and lines
self.eeg_axes = []
self.eeg_lines = []
self.band_axes = {
'delta': [],
'theta': [],
'alpha': [],
'beta': [],
'gamma': []
}
self.band_lines = {
'delta': [],
'theta': [],
'alpha': [],
'beta': [],
'gamma': []
}
# Create the main 4x6 grid for EEG channels
for col, channel in enumerate(channel_names):
for row, band in enumerate(band_names):
ax = self.fig.add_subplot(gs[row, col])
# Set title only for the first row
if row == 0:
ax.set_title(f"{channel}")
# Set band label only for the first column
if col == 0:
ax.set_ylabel(f"{band}")
# Only show x-axis labels for the bottom row
if row < 5:
ax.set_xticklabels([])
else:
ax.set_xlabel("Time (s)")
ax.grid(True, alpha=0.3)
# Create line for this subplot with appropriate color
color = band_colors[band]
line, = ax.plot([], [], lw=1, color=color)
# Store the axes and lines appropriately
if band == 'Raw':
self.eeg_axes.append(ax)
self.eeg_lines.append(line)
else:
# Convert band name to lowercase to match original code's structure
band_lower = band.lower()
self.band_axes[band_lower].append(ax)
self.band_lines[band_lower].append(line)
# Create sensor plots in the 5th column
# Accelerometer (first row, fifth column)
self.acc_ax = self.fig.add_subplot(gs[0, 4])
self.acc_ax.set_title("Accelerometer")
self.acc_ax.set_ylabel("m/s²")
# self.acc_ax.set_xlabel("Time (s)")
self.acc_ax.grid(True, alpha=0.3)
# Gyroscope (second row, fifth column)
self.gyro_ax = self.fig.add_subplot(gs[1, 4])
self.gyro_ax.set_title("Gyroscope")
self.gyro_ax.set_ylabel("deg/s")
# self.gyro_ax.set_xlabel("Time (s)")
self.gyro_ax.grid(True, alpha=0.3)
# PPG (third row, fifth column)
self.ppg_ax = self.fig.add_subplot(gs[2, 4])
self.ppg_ax.set_title("PPG Signals")
self.ppg_ax.set_ylabel("A.U.")
# self.ppg_ax.set_xlabel("Time (s)")
self.ppg_ax.grid(True, alpha=0.3)
# Create lines for motion sensors and PPG
colors = ['r', 'g', 'b']
self.acc_lines = []
for i in range(3):
line, = self.acc_ax.plot([], [], lw=1, color=colors[i], label=f"{'XYZ'[i]}")
self.acc_lines.append(line)
self.acc_ax.legend(loc="upper right", ncol=3, fontsize='small')
self.gyro_lines = []
for i in range(3):
line, = self.gyro_ax.plot([], [], lw=1, color=colors[i], label=f"{'XYZ'[i]}")
self.gyro_lines.append(line)
self.gyro_ax.legend(loc="upper right", ncol=3, fontsize='small')
self.ppg_lines = []
for i in range(3):
line, = self.ppg_ax.plot([], [], lw=1, color=colors[i], label=f"PPG{i + 1}")
self.ppg_lines.append(line)
self.ppg_ax.legend(loc="lower right", ncol=3, fontsize='small')
# Add frequency band legend in the remaining space in the 5th column
legend_ax = self.fig.add_subplot(gs[3:, 4])
legend_ax.axis('off')
# Create legend entries for each band with their frequency ranges
band_ranges = {
'Delta': "0.5-4 Hz",
'Theta': "4-8 Hz",
'Alpha': "8-13 Hz",
'Beta': "13-30 Hz",
'Gamma': "30-45 Hz"
}
legend_handles = []
for band, color in band_colors.items():
if band != 'Raw':
legend_handles.append(matplotlib.lines.Line2D([0], [0], color=color, lw=2,
label=f"{band}: {band_ranges.get(band, '')}"))
legend_ax.legend(handles=legend_handles, loc='center', fontsize='medium')
# Add a main title
self.fig.suptitle("EEG Brain Wave Analysis", fontsize=16, y=0.99)
# Adjust the figure layout
self.fig.tight_layout(rect=[0, 0, 1, 0.98])
# Optimize figure rendering
self.fig.set_facecolor('white')
self.fig.set_dpi(90) # Lower DPI for better performance
def update_plot(self, frame):
# Down sample data for faster plotting
ds = self.downsample_factor
time_ds = time_buffer[::ds]
ppg_time_ds = ppg_time_buffer[::ds]
# Update EEG lines with data from shared memory
for i in range(4):
self.eeg_lines[i].set_data(time_ds, eeg_data[i][::ds])
# Calculate frequency bands
if len(time_ds) > 0: # Only process if we have data
bands_data = extract_frequency_bands(eeg_data, fs)
# Update band lines
for band_name, band_data in bands_data.items():
for i in range(4): # 4 EEG channels
self.band_lines[band_name][i].set_data(time_ds, band_data[i][::ds])
# Update accelerometer lines
for i in range(3):
self.acc_lines[i].set_data(time_ds, acc_data[i][::ds])
# Update gyroscope lines
for i in range(3):
self.gyro_lines[i].set_data(time_ds, gyro_data[i][::ds])
# Update PPG lines
for i in range(3):
self.ppg_lines[i].set_data(ppg_time_ds, ppg_data[i][::ds])
# Return all lines for blitting
all_lines = self.eeg_lines[:]
for band_lines in self.band_lines.values():
all_lines.extend(band_lines)
all_lines.extend(self.acc_lines + self.gyro_lines + self.ppg_lines)
return all_lines
def on_close(self):
logging.debug("Closing visualization window")
if hasattr(self, 'ani') and self.ani is not None:
self.ani.event_source.stop() # Stop the animation
# Clean up shared memory (only unlink, not close)
# Don't close/unlink the shared memory here because the main process is still using it
self.root.quit() # Stop the mainloop
self.root.destroy() # Close the window
# Create and run the Tkinter visualization window
visualization_window = VisualizationWindow()
try:
visualization_window.root.mainloop()
except Exception as e:
logging.error(f"Error in visualization: {str(e)}")
finally:
# Cleanup when done
eeg_shm.close()
acc_shm.close()
gyro_shm.close()
ppg_shm.close()
# Add a function to clean up shared memory resources
[docs]
def cleanup_shared_memory():
"""
Clean up shared memory resources and notify the video feed program.
This should be called when the Flask application exits to release system resources.
"""
global eeg_shm, acc_shm, gyro_shm, ppg_shm
eeg_shm.close()
acc_shm.close()
gyro_shm.close()
ppg_shm.close()
eeg_shm.unlink()
acc_shm.unlink()
gyro_shm.unlink()
ppg_shm.unlink()
global parent_conn
cmd = 'video_clean_up'
if send_command(parent_conn, cmd):
print('Video feed cleaned up')
else:
print('Failed to clean up video feed')
print(f"Shared memory cleaned up")
if __name__ == "__main__":
# Create shared memory for inter-process communication
# Create shared memory blocks
eeg_shm = shared_memory.SharedMemory(create=True, size=eeg_data.nbytes)
acc_shm = shared_memory.SharedMemory(create=True, size=acc_data.nbytes)
gyro_shm = shared_memory.SharedMemory(create=True, size=gyro_data.nbytes)
ppg_shm = shared_memory.SharedMemory(create=True, size=ppg_data.nbytes)
# Create numpy arrays that use the shared memory
shared_eeg_data = np.ndarray(eeg_data.shape, dtype=eeg_data.dtype, buffer=eeg_shm.buf)
shared_acc_data = np.ndarray(acc_data.shape, dtype=acc_data.dtype, buffer=acc_shm.buf)
shared_gyro_data = np.ndarray(gyro_data.shape, dtype=gyro_data.dtype, buffer=gyro_shm.buf)
shared_ppg_data = np.ndarray(ppg_data.shape, dtype=ppg_data.dtype, buffer=ppg_shm.buf)
# Initialize the shared memory arrays with the initial data
shared_eeg_data[:] = eeg_data[:]
shared_acc_data[:] = acc_data[:]
shared_gyro_data[:] = gyro_data[:]
shared_ppg_data[:] = ppg_data[:]
# Store shared memory names for the visualization process
shared_memory_names = {
'eeg': eeg_shm.name,
'acc': acc_shm.name,
'gyro': gyro_shm.name,
'ppg': ppg_shm.name
}
try:
connect_to_muse()
# Start Program 2 as a separate process
print("Program 'flask - control' started - will send commands to 'Camera Program p1'")
global parent_conn, child_conn # Make pipes global
parent_conn, child_conn = multiprocessing.Pipe()
# Start Program 2 as a separate process
p1_process = multiprocessing.Process(target=p1.main, args=(child_conn,))
p1_process.start()
# Wait for Program 2 to start (5 seconds)
print("Waiting for 'Camera Program p1' to start (5 seconds)...")
time.sleep(2)
send_command(parent_conn, 'initialize_camera')
time.sleep(3)
# Start Program 2 as a separate process
print("Program 'flask - control' started - will send commands to 'Image Stimuli Program p2'")
global parent_conn2, child_conn2 # Make pipes global
parent_conn2, child_conn2 = multiprocessing.Pipe()
# Start Program 2 as a separate process
p2_process = multiprocessing.Process(target=p2.main, args=(child_conn2,))
p2_process.start()
# Wait for Program 2 to start (5 seconds)
time.sleep(2)
# Start Program 4 as a separate process
print("Program 'flask - control' started - will send commands to Program 'Video Stimuli Program p4'")
global parent_conn4, child_conn4 # Make pipes global
parent_conn4, child_conn4 = multiprocessing.Pipe()
# Start Program 4 as a separate process
p4_process = multiprocessing.Process(target=p4.main, args=(child_conn4,))
p4_process.start()
# Wait for Program 4 to start (5 seconds)
time.sleep(2)
# run flask app
app.run(host='0.0.0.0', port=8082)
# app.run(port=8082)
finally:
# Clean up shared memory when the application exits
cleanup_shared_memory()