Source code for example_minimal_eeg_acquisition

"""EEG measurement example

Example how to get measurements using brainaccess library

Change Bluetooth device name to your device name (line 68)
"""

import matplotlib
import numpy as np
import time
import threading
import matplotlib.pyplot as plt
from scipy.signal import butter, sosfiltfilt
from brainaccess import core
from brainaccess.core.eeg_manager import EEGManager
import brainaccess.core.eeg_channel as eeg_channel
from brainaccess.core.gain_mode import (
    GainMode,
)


matplotlib.use("TKAgg", force=True)


[docs] def butter_bandpass( lowcut: float, highcut: float, fs: int, order: int = 2 ) -> np.ndarray: """Design a bandpass Butterworth filter.""" nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq sos = butter(order, [low, high], analog=False, btype="bandpass", output="sos") return sos
[docs] def butter_bandpass_filter( data: np.ndarray, lowcut: float, highcut: float, fs: int, order: int = 2 ) -> np.ndarray: """Apply a bandpass Butterworth filter to the data.""" sos = butter_bandpass(lowcut, highcut, fs, order=order) y = sosfiltfilt(sos, data) return y
def _acq_closure(ch_number: int = 1, buffer_length: int = 1000) -> tuple: """Acquisition callback closure.""" data = np.zeros((ch_number, buffer_length)) mutex = threading.Lock() def _acq_callback(chunk: list, chunk_size: int) -> None: nonlocal data nonlocal mutex with mutex: data = np.roll(data, -chunk_size) data[:, -chunk_size:] = chunk def get_data() -> np.ndarray: nonlocal data with mutex: return data.copy() return _acq_callback, get_data if __name__ == "__main__": # Change to your device name # Device name can be found on the back of the device
[docs] device_name = "BA HALO 001"
# init the core core.init() # scan for devices devices = core.scan() print("Found devices:", len(devices)) print(f"Devices: {[device.name for device in devices]}") # connect to the device with EEGManager() as mgr: print("Connecting to device:", device_name) _status = mgr.connect(device_name) if _status == 2: raise Exception("Stream is incompatible. Update the firmware.") elif _status > 0: raise Exception("Connection failed") # battery info print(f"battery level: {mgr.get_battery_info().level} %") # Get electrode count device_features = mgr.get_device_features() eeg_channels_number = device_features.electrode_count() print(f"Device has {eeg_channels_number} EEG channels") # set the channels ch_nr = 0 for i in range(0, eeg_channels_number): mgr.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True) # noqa ch_nr += 1 mgr.set_channel_gain( eeg_channel.ELECTRODE_MEASUREMENT + i, GainMode.X8 ) # noqa mgr.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True) # Keep track of enabled eeg channel number eeg_enabled_nr = ch_nr # check if the device has accelerometer has_accel = device_features.has_accel() if has_accel: print("Setting the accelerometer") mgr.set_channel_enabled(eeg_channel.ACCELEROMETER, True) ch_nr += 1 mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 1, True) ch_nr += 1 mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 2, True) ch_nr += 1 mgr.set_channel_enabled(eeg_channel.SAMPLE_NUMBER, True) ch_nr += 1 # set the streaming channel, shows 0 if Bluetooth connection # was lost and 0 was added to the data mgr.set_channel_enabled(eeg_channel.STREAMING, True) ch_nr += 1 # get the sample rate sr = mgr.get_sample_frequency() # define the callback for the acquisition duration = 10 buffer_time = int(sr * duration) # seconds _acq_callback, get_data = _acq_closure( ch_number=ch_nr, buffer_length=buffer_time ) mgr.set_callback_chunk(_acq_callback) # load defined configuration mgr.load_config() # start the stream mgr.start_stream() print("Stream started") # collect data time.sleep(4) for i in range(duration): time.sleep(1) print(f"Collecting data {i + 1}/{duration}") # get the data dat = get_data() # stop the stream mgr.stop_stream() print("Stream stopped") time.sleep(1) # The EEGManager destructor calls mgr.disconnect() # so we don't need to call it here print("Disconnected from the device") time.sleep(1) # close the core core.close() print("Core closed") # plot the data print("Plotting the data") # Apply bandpass filter to EEG data eeg_data = dat[1 : eeg_enabled_nr + 1, :] eeg_data = eeg_data - np.mean(eeg_data, axis=1, keepdims=True) eeg_data = butter_bandpass_filter(eeg_data, 1, 40, sr) # Add offsets for visualization eeg_data = eeg_data + np.arange(eeg_enabled_nr)[:, np.newaxis] # Create subplots fig, axs = plt.subplots(4, 1, figsize=(10, 10)) # Plot the data axs[0].plot(eeg_data.T) axs[0].set_ylabel("EEG Channels") axs[0].set_xlim([300, 2500]) if has_accel: axs[1].plot(dat[-4:-1, :].T) axs[1].set_ylabel("Accelerometer") else: axs[1].axis("off") # Hide the unused subplot axs[2].plot(dat[0, :]) axs[2].set_ylabel("Sample") axs[3].plot(dat[-1, :]) axs[3].set_ylabel("Streaming") plt.show()