Usage#

This guide demonstrates how to connect to a BrainAccess device and acquire EEG data. The high-level EEG utility is the recommended starting point.

Quickstart: Data Acquisition with EEG Utility#

The brainaccess.utils.acquisition module offers a high-level EEG class that streamlines the process of connecting to a device, acquiring data, and converting it into an MNE-Python Raw object.

The typical workflow involves these steps:

  1. Initialize the EEG object: Create an instance of the EEG class.

  2. Set up the device: Use the setup() method to scan for and connect to your BrainAccess device.

  3. Start acquisition: Call start_acquisition() to begin streaming data.

  4. Retrieve data: Use get_mne() to get the collected data as an MNE Raw object for analysis.

  5. Stop acquisition: Call stop_acquisition() to halt the data stream.

Example Usage

Here is a practical example of how to use the EEG class to acquire data:

Continuous Data Acquisition with MNE#

This example shows how to stream data and integrate it with the MNE-Python library for more advanced analysis.

""" EEG measurement example

Example how to get measurements and
save to fif format
using acquisition class from brainaccess.utils

Change Bluetooth device name (line 23)
"""

import matplotlib.pyplot as plt
import matplotlib
import time

from brainaccess.utils import acquisition
from brainaccess.core.eeg_manager import EEGManager

matplotlib.use("TKAgg", force=True)

eeg = acquisition.EEG()

# define electrode locations
cap: dict = {
    0: "Fp1",
    1: "Fp2",
    2: "O1",
    3: "O2",
}

# define device name
device_name = "BA HALO 036"

# start EEG acquisition setup
with EEGManager() as mgr:
    eeg.setup(mgr, device_name=device_name, cap=cap, sfreq=1000)

    # Start acquiring data
    eeg.start_acquisition()
    print("Acquisition started")
    time.sleep(3)

    start_time = time.time()
    annotation = 1
    while time.time() - start_time < 10000:
        time.sleep(1)
        # send annotation to the device
        print(f"Sending annotation {annotation} to the device")
        eeg.annotate(str(annotation))
        annotation += 1

    print("Preparing to plot data")
    time.sleep(2)

    # get all eeg data and stop acquisition
    eeg.get_mne()
    eeg.stop_acquisition()
    mgr.disconnect()

# save EEG data to MNE fif format
eeg.data.save(f'{time.strftime("%Y%m%d_%H%M")}-raw.fif')
# Close brainaccess library
eeg.close()
# Show recorded data
eeg.data.mne_raw.filter(1, 40).plot(scalings="auto", verbose=False)
plt.show()

Key Methods in the `EEG` Class

  • __init__(mode="accumulate"): Initializes the acquisition object. * mode: Can be "accumulate" (gathers all data) or "roll" (keeps a rolling buffer of the latest data).

  • setup(mgr, device_name, cap, ...): Scans for devices, connects, and configures channels based on the provided cap layout.

  • start_acquisition(): Starts the EEG data stream.

  • stop_acquisition(): Stops the EEG data stream.

  • get_mne(tim=None, samples=None): Returns the acquired data as an MNE Raw object. * tim: If provided, returns the last tim seconds of data. * samples: If provided, returns the last samples number of samples.

  • start_impedance_measurement() / stop_impedance_measurement(): Methods to control impedance checking mode.

  • calc_impedances(tim=4): Calculates the impedance for each electrode using the last tim seconds of data.

This utility simplifies data handling by managing the underlying data buffers (EEGData and EEGData_roll classes) and providing a straightforward interface for acquisition.

Advanced Usage: The Core API#

For users who need fine-grained control over the device, the brainaccess.core module provides a low-level interface.

Minimal Data Acquisition#

A basic example of connecting, streaming, and plotting EEG data.

"""EEG measurement example

Example how to get measurements using brainaccess library

Change Bluetooth device name to your device name (line 57)
"""

import numpy as np
import time
import threading
import matplotlib.pyplot as plt
from scipy.signal import butter, sosfiltfilt
from brainaccess import core
from brainaccess.core.eeg_manager import EEGManager
import brainaccess.core.eeg_channel as eeg_channel
from brainaccess.core.gain_mode import (
    GainMode,
)


def butter_bandpass(lowcut, highcut, fs, order=2):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    sos = butter(order, [low, high], analog=False, btype="bandpass", output="sos")
    return sos


def butter_bandpass_filter(data, lowcut, highcut, fs, order=2):
    sos = butter_bandpass(lowcut, highcut, fs, order=order)
    y = sosfiltfilt(sos, data)
    return y


def _acq_closure(ch_number: int = 1, buffer_length: int = 1000):
    data = np.zeros((ch_number, buffer_length))
    mutex = threading.Lock()

    def _acq_callback(chunk, chunk_size):
        nonlocal data
        nonlocal mutex
        with mutex:
            data = np.roll(data, -chunk_size)
            data[:, -chunk_size:] = chunk

    def get_data():
        nonlocal data
        with mutex:
            return data.copy()

    return _acq_callback, get_data


if __name__ == "__main__":
    # Change to your device name
    # Device name can be found on the back of the device
    device_name = "BA HALO 001"

    # init the core
    core.init()

    # scan for devices
    devices = core.scan()
    # find the defined one
    print("Found devices:", len(devices))
    for device in devices:
        if device_name in device.name:
            port = device.name
            break
    else:
        raise Exception(f"Device {device_name} not found")

    # connect to the device
    with EEGManager() as mgr:
        print("Connecting to device:", port)
        _status = mgr.connect(port)
        if _status == 1:
            raise Exception("Connection failed")
        elif _status == 2:
            raise Exception("Stream is incompatible. Update the firmware.")

        # battery info
        print(f"battery level: {mgr.get_battery_info().level} %")

        # Get electrode count
        device_features = mgr.get_device_features()
        eeg_channels_number = device_features.electrode_count()
        print(f"Device has {eeg_channels_number} EEG channels")

        # set the channels
        ch_nr = 0
        # for i in range(eeg_channels_number):
        for i in range(3, eeg_channels_number):
            mgr.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True)
            ch_nr += 1
            mgr.set_channel_gain(eeg_channel.ELECTRODE_MEASUREMENT + i, GainMode.X8)
        mgr.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True)

        # check if the device has accelerometer
        has_accel = device_features.has_accel()
        if has_accel:
            print("Setting the accelerometer")
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 1, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 2, True)
            ch_nr += 1

        mgr.set_channel_enabled(eeg_channel.SAMPLE_NUMBER, True)
        ch_nr += 1

        # set the streaming channel, shows 0 if Bluetooth connection
        # was lost and 0 was added to the data
        mgr.set_channel_enabled(eeg_channel.STREAMING, True)
        ch_nr += 1

        # get the sample rate
        sr = mgr.get_sample_frequency()

        # define the callback for the acquisition
        duration = 10
        buffer_time = int(sr * duration)  # seconds
        _acq_callback, get_data = _acq_closure(
            ch_number=ch_nr, buffer_length=buffer_time
        )
        mgr.set_callback_chunk(_acq_callback)

        # load defined configuration
        mgr.load_config()

        # start the stream
        mgr.start_stream()
        print("Stream started")

        # collect data
        time.sleep(4)
        for i in range(duration):
            time.sleep(1)
            print(f"Collecting data {i + 1}/{duration}")

        # get the data
        dat = get_data()

        # stop the stream
        mgr.stop_stream()
        print("Stream stopped")
        time.sleep(1)

        # The EEGManager destructor calls mgr.disconnect()
        # so we don't need to call it here

    print("Disconnected from the device")
    time.sleep(1)

    # close the core
    core.close()
    print("Core closed")

    # plot the data
    print("Plotting the data")

    # Create channel labels
    ch = ["sample"]
    ch.extend([f"ch_{i}" for i in range(eeg_channels_number)])
    if has_accel:
        ch.extend(["accel_x", "accel_y", "accel_z"])
    ch.extend(["streaming"])

    # Apply bandpass filter to EEG data
    eeg_data = dat[1 : eeg_channels_number + 1, :]
    eeg_data = butter_bandpass_filter(eeg_data, 1, 40, sr)

    # Normalize EEG data
    eps = 1e-20  # To avoid division by zero in this example
    eeg_data = (eeg_data - np.mean(eeg_data, axis=0)) / (np.std(eeg_data, axis=0) + eps)
    # Add offsets for visualization
    eeg_data = eeg_data + np.arange(eeg_channels_number)[:, np.newaxis]

    # Create subplots
    fig, axs = plt.subplots(4, 1, figsize=(10, 10))

    # Plot the data
    axs[0].plot(eeg_data.T)
    axs[0].set_ylabel("EEG Channels")

    if has_accel:
        axs[1].plot(dat[len(ch) - 4 : -1, :].T)
        axs[1].set_ylabel("Accelerometer")
    else:
        axs[1].axis("off")  # Hide the unused subplot

    axs[2].plot(dat[0, :])
    axs[2].set_ylabel("Sample")

    axs[3].plot(dat[-1, :])
    axs[3].set_ylabel("Streaming")

    plt.show()

Module Reference#

For detailed information on all available classes and methods, refer to the brainaccess.core module reference.