Usage#
This guide demonstrates how to connect to a BrainAccess device and acquire EEG data. The high-level EEG utility is the recommended starting point.
Quickstart: Data Acquisition with EEG Utility#
The brainaccess.utils.acquisition module offers a high-level EEG class that streamlines the process of connecting to a device, acquiring data, and converting it into an MNE-Python Raw object.
The typical workflow involves these steps:
1. Initialize the EEG object: Create an instance of the EEG class.
2. Set up the device: Use the setup() method to scan for and connect to your BrainAccess device.
3. Start acquisition: Call start_acquisition() to begin streaming data.
4. Retrieve data: Use get_mne() to get the collected data as an MNE Raw object for analysis.
5. Stop acquisition: Call stop_acquisition() to halt the data stream.
Example Usage
Here is a practical example of how to use the EEG class to acquire data:
Continuous Data Acquisition with MNE#
This example shows how to stream data and integrate it with the MNE-Python library for more advanced analysis.

"""EEG measurement example

Example of how to get measurements and save them to fif format
using the acquisition class from brainaccess.utils.

Change the Bluetooth device name (device_name below) to match your device.
"""

import time

import matplotlib
import matplotlib.pyplot as plt

from brainaccess.core.eeg_manager import EEGManager
from brainaccess.utils import acquisition

matplotlib.use("TKAgg", force=True)

eeg = acquisition.EEG()

# define electrode locations (device channel index -> electrode label)
cap: dict = {
    0: "Fp1",
    1: "Fp2",
    2: "O1",
    3: "O2",
}

# define device name
device_name = "BA HALO 036"

# start EEG acquisition setup
with EEGManager() as mgr:
    eeg.setup(mgr, device_name=device_name, cap=cap, sfreq=1000)

    # start acquiring data
    eeg.start_acquisition()
    print("Acquisition started")
    time.sleep(3)

    # annotate once per second until the time limit (in seconds) is reached
    start_time = time.time()
    annotation = 1
    while time.time() - start_time < 10000:
        time.sleep(1)
        # send annotation to the device
        print(f"Sending annotation {annotation} to the device")
        eeg.annotate(str(annotation))
        annotation += 1

    print("Preparing to plot data")
    time.sleep(2)

    # get all EEG data and stop acquisition
    eeg.get_mne()
    eeg.stop_acquisition()
    mgr.disconnect()

# save EEG data to MNE fif format
eeg.data.save(f'{time.strftime("%Y%m%d_%H%M")}-raw.fif')

# close the brainaccess library
eeg.close()

# show recorded data, band-pass filtered to 1-40 Hz
eeg.data.mne_raw.filter(1, 40).plot(scalings="auto", verbose=False)
plt.show()
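The timed annotation loop in the example can be factored into a small helper so that it can be exercised without a device attached. The sketch below is only an illustration: `annotate_periodically` and its `clock` and `sleep` parameters are hypothetical names introduced here and are not part of the brainaccess API. The helper takes any callable that accepts a string, such as `eeg.annotate` from the example.

```python
import time


def annotate_periodically(annotate, duration_s, interval_s=1.0,
                          clock=time.time, sleep=time.sleep):
    """Send the labels "1", "2", ... to `annotate`, one per interval.

    Hypothetical helper (not part of brainaccess). `clock` and `sleep`
    are injectable so the loop can be tested without real waiting.
    Returns the list of labels that were sent.
    """
    sent = []
    start = clock()
    label = 1
    # Same loop shape as the example: wait first, then annotate
    while clock() - start < duration_s:
        sleep(interval_s)
        annotate(str(label))
        sent.append(str(label))
        label += 1
    return sent
```

With a connected device, the call would presumably look like `annotate_periodically(eeg.annotate, duration_s=10)`, placed between `eeg.start_acquisition()` and `eeg.get_mne()`.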
Key Methods in the `EEG` Class
* __init__(mode="accumulate"): Initializes the acquisition object.
  * mode: Can be "accumulate" (gathers all data) or "roll" (keeps a rolling buffer of the latest data).
* setup(mgr, device_name, cap, ...): Scans for devices, connects, and configures channels based on the provided cap layout.
* start_acquisition(): Starts the EEG data stream.
* stop_acquisition(): Stops the EEG data stream.
* get_mne(tim=None, samples=None): Returns the acquired data as an MNE Raw object.
  * tim: If provided, returns the last tim seconds of data.
  * samples: If provided, returns the last samples number of samples.
* start_impedance_measurement() / stop_impedance_measurement(): Methods to control impedance checking mode.
* calc_impedances(tim=4): Calculates the impedance for each electrode using the last tim seconds of data.
This utility simplifies data handling by managing the underlying data buffers (EEGData and EEGData_roll classes) and providing a straightforward interface for acquisition.
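To make the difference between the two modes concrete, here is a minimal pure-Python sketch of an accumulating buffer and a rolling buffer, plus the conversion from a duration in seconds to a sample count that a `tim`-style argument implies. The names `AccumulateBuffer`, `RollBuffer`, and `seconds_to_samples` are hypothetical and only illustrate the idea; this is not how the library's EEGData and EEGData_roll classes are implemented.

```python
from collections import deque


def _tail(samples, n):
    """Return the last n items of a list, or all of them when n is None."""
    if n is None:
        return list(samples)
    return list(samples[-n:]) if n > 0 else []


class AccumulateBuffer:
    """Keeps every sample received so far (the idea behind "accumulate")."""

    def __init__(self):
        self._samples = []

    def extend(self, chunk):
        self._samples.extend(chunk)

    def tail(self, n=None):
        return _tail(self._samples, n)


class RollBuffer:
    """Keeps only the newest `maxlen` samples (the idea behind "roll")."""

    def __init__(self, maxlen):
        self._samples = deque(maxlen=maxlen)

    def extend(self, chunk):
        # Once the deque is full, the oldest samples are discarded
        self._samples.extend(chunk)

    def tail(self, n=None):
        return _tail(list(self._samples), n)


def seconds_to_samples(tim, sfreq):
    """Convert a duration in seconds to a whole number of samples."""
    return int(round(tim * sfreq))


# Feed three chunks of four samples into both buffers
acc, roll = AccumulateBuffer(), RollBuffer(maxlen=5)
for start in (0, 4, 8):
    chunk = list(range(start, start + 4))
    acc.extend(chunk)
    roll.extend(chunk)
```

After the loop, `acc.tail()` holds all twelve samples, while `roll.tail()` holds only the five newest. The trade-off is memory against history: an accumulating buffer grows for as long as the stream runs, whereas a rolling buffer has a fixed footprint but cannot return data older than its window.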
Advanced Usage: The Core API#
For users who need fine-grained control over the device, the brainaccess.core module provides a low-level interface.
Minimal Data Acquisition#
A basic example of connecting, streaming, and plotting EEG data.

"""EEG measurement example

Example of how to get measurements using the brainaccess library.

Change the Bluetooth device name to your device name (device_name below).
"""

import threading
import time

import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import butter, sosfiltfilt

import brainaccess.core.eeg_channel as eeg_channel
from brainaccess import core
from brainaccess.core.eeg_manager import EEGManager
from brainaccess.core.gain_mode import GainMode


def butter_bandpass(lowcut, highcut, fs, order=2):
    # normalize the cutoff frequencies by the Nyquist frequency
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    sos = butter(order, [low, high], analog=False, btype="bandpass", output="sos")
    return sos


def butter_bandpass_filter(data, lowcut, highcut, fs, order=2):
    sos = butter_bandpass(lowcut, highcut, fs, order=order)
    # zero-phase filtering along the last (time) axis
    y = sosfiltfilt(sos, data)
    return y


def _acq_closure(ch_number: int = 1, buffer_length: int = 1000):
    # rolling buffer of shape (channels, samples) shared with the callback
    data = np.zeros((ch_number, buffer_length))
    mutex = threading.Lock()

    def _acq_callback(chunk, chunk_size):
        nonlocal data
        with mutex:
            # shift old samples left and write the new chunk at the end
            data = np.roll(data, -chunk_size, axis=1)
            data[:, -chunk_size:] = chunk

    def get_data():
        with mutex:
            return data.copy()

    return _acq_callback, get_data


if __name__ == "__main__":
    # Change to your device name
    # Device name can be found on the back of the device
    device_name = "BA HALO 001"

    # init the core
    core.init()

    # scan for devices
    devices = core.scan()

    # find the defined one
    print("Found devices:", len(devices))
    for device in devices:
        if device_name in device.name:
            port = device.name
            break
    else:
        raise Exception(f"Device {device_name} not found")

    # connect to the device
    with EEGManager() as mgr:
        print("Connecting to device:", port)
        _status = mgr.connect(port)
        if _status == 1:
            raise Exception("Connection failed")
        elif _status == 2:
            raise Exception("Stream is incompatible. Update the firmware.")

        # battery info
        print(f"battery level: {mgr.get_battery_info().level} %")

        # get electrode count
        device_features = mgr.get_device_features()
        eeg_channels_number = device_features.electrode_count()
        print(f"Device has {eeg_channels_number} EEG channels")

        # enable every EEG channel; the labels and slicing used for
        # plotting below assume that all of them are present in the data
        ch_nr = 0
        for i in range(eeg_channels_number):
            mgr.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True)
            ch_nr += 1
            mgr.set_channel_gain(eeg_channel.ELECTRODE_MEASUREMENT + i, GainMode.X8)
            mgr.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True)

        # check if the device has an accelerometer
        has_accel = device_features.has_accel()
        if has_accel:
            print("Setting the accelerometer")
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 1, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 2, True)
            ch_nr += 1

        # enable the sample counter channel
        mgr.set_channel_enabled(eeg_channel.SAMPLE_NUMBER, True)
        ch_nr += 1

        # enable the streaming channel; it reads 0 where the Bluetooth
        # connection was lost and zeros were inserted into the data
        mgr.set_channel_enabled(eeg_channel.STREAMING, True)
        ch_nr += 1

        # get the sample rate
        sr = mgr.get_sample_frequency()

        # define the callback for the acquisition
        duration = 10  # seconds of data kept in the buffer
        buffer_samples = int(sr * duration)  # buffer length in samples
        _acq_callback, get_data = _acq_closure(
            ch_number=ch_nr, buffer_length=buffer_samples
        )
        mgr.set_callback_chunk(_acq_callback)

        # load defined configuration
        mgr.load_config()

        # start the stream
        mgr.start_stream()
        print("Stream started")

        # collect data
        time.sleep(4)
        for i in range(duration):
            time.sleep(1)
            print(f"Collecting data {i + 1}/{duration}")

        # get the data
        dat = get_data()

        # stop the stream
        mgr.stop_stream()
        print("Stream stopped")
        time.sleep(1)

    # The EEGManager destructor calls mgr.disconnect(),
    # so we don't need to call it here
    print("Disconnected from the device")
    time.sleep(1)

    # close the core
    core.close()
    print("Core closed")

    # plot the data
    print("Plotting the data")

    # create channel labels in the row order of the data array
    ch = ["sample"]
    ch.extend([f"ch_{i}" for i in range(eeg_channels_number)])
    if has_accel:
        ch.extend(["accel_x", "accel_y", "accel_z"])
    ch.extend(["streaming"])

    # apply bandpass filter to EEG data (rows 1..eeg_channels_number)
    eeg_data = dat[1 : eeg_channels_number + 1, :]
    eeg_data = butter_bandpass_filter(eeg_data, 1, 40, sr)

    # normalize EEG data
    eps = 1e-20  # to avoid division by zero in this example
    eeg_data = (eeg_data - np.mean(eeg_data, axis=0)) / (np.std(eeg_data, axis=0) + eps)

    # add per-channel offsets for visualization
    eeg_data = eeg_data + np.arange(eeg_channels_number)[:, np.newaxis]

    # create subplots
    fig, axs = plt.subplots(4, 1, figsize=(10, 10))

    # plot the data
    axs[0].plot(eeg_data.T)
    axs[0].set_ylabel("EEG Channels")

    if has_accel:
        # the three accelerometer rows sit just before the streaming row
        axs[1].plot(dat[len(ch) - 4 : -1, :].T)
        axs[1].set_ylabel("Accelerometer")
    else:
        axs[1].axis("off")  # hide the unused subplot

    axs[2].plot(dat[0, :])
    axs[2].set_ylabel("Sample")

    axs[3].plot(dat[-1, :])
    axs[3].set_ylabel("Streaming")

    plt.show()
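The device lookup and the connection status check in the example above can be pulled into two small functions, which makes the error handling easier to reuse and to test without hardware. This is a sketch under stated assumptions: `find_device`, `check_connect_status`, and `CONNECT_ERRORS` are hypothetical names, not part of brainaccess. Only the status codes 1 and 2 that the example handles are mapped to errors, and any other value is treated as success, mirroring the example rather than a documented contract. The scanned device list is a stand-in with made-up names.

```python
from types import SimpleNamespace

# Failure codes handled in the example above; other values pass through
CONNECT_ERRORS = {
    1: "Connection failed",
    2: "Stream is incompatible. Update the firmware.",
}


def find_device(devices, device_name):
    """Return the full name of the first device whose name contains device_name."""
    for device in devices:
        if device_name in device.name:
            return device.name
    raise LookupError(f"Device {device_name} not found")


def check_connect_status(status):
    """Raise ConnectionError if status is one of the known failure codes."""
    if status in CONNECT_ERRORS:
        raise ConnectionError(CONNECT_ERRORS[status])


# Stand-in for the objects returned by core.scan(); only .name is used here
scanned = [SimpleNamespace(name="BA MINI 012"), SimpleNamespace(name="BA HALO 001")]
port = find_device(scanned, "HALO 001")
check_connect_status(0)  # not a known failure code, so nothing is raised
```

In the real script, `scanned` would be replaced by the result of `core.scan()`, and `check_connect_status(mgr.connect(port))` would replace the `if`/`elif` chain.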
Module Reference#
For detailed information on all available classes and methods, refer to the brainaccess.core module reference.