Source code for brainaccess.core.eeg_manager

import ctypes
import warnings
import threading
import numpy as np
import copy
from multimethod import multimethod
from typing import Callable, Union, Optional, Any

from brainaccess.utils.exceptions import _callback, _handle_error, BrainAccessException
from brainaccess.core import _dll
from brainaccess.core.battery_info import BatteryInfo
from brainaccess.core.device_info import DeviceInfo
from brainaccess.core.device_model import DeviceModel
from brainaccess.core.gain_mode import GainMode
from brainaccess.core.stream_rate import StreamRate
from brainaccess.core.annotation import Annotation
from brainaccess.core.polarity import Polarity
from brainaccess.core.impedance_measurement_mode import ImpedanceMeasurementMode  # noqa
from brainaccess.core.device_features import DeviceFeatures


# ctypes
# new_eeg_manager
_dll.ba_eeg_manager_new.argtypes = []
_dll.ba_eeg_manager_new.restype = ctypes.c_void_p
# destructor
_dll.ba_eeg_manager_free.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_free.restype = None
# connect(port)
_dll.ba_eeg_manager_connect.argtypes = [
    ctypes.c_void_p,
    ctypes.c_char_p,
    ctypes.CFUNCTYPE(None, ctypes.c_bool, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_connect.restype = ctypes.c_uint8
# is_connected()
_dll.ba_eeg_manager_is_connected.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_is_connected.restype = ctypes.c_bool
# disconnect()
_dll.ba_eeg_manager_disconnect.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_disconnect.restype = None
# start_stream()
_dll.ba_eeg_manager_start_stream.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_start_stream.restype = ctypes.c_uint8
# stop_stream()
_dll.ba_eeg_manager_stop_stream.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_stop_stream.restype = ctypes.c_uint8
# is_streaming()
_dll.ba_eeg_manager_is_streaming.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_is_streaming.restype = ctypes.c_bool
# load_config()
_dll.ba_eeg_manager_load_config.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_load_config.restype = ctypes.c_uint8
# get_battery_info()
_dll.ba_eeg_manager_get_battery_info.argtypes = [
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_get_battery_info.restype = BatteryInfo
# set_channel_enabled()
_dll.ba_eeg_manager_set_channel_enabled.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint16,
    ctypes.c_bool,
]
_dll.ba_eeg_manager_set_channel_enabled.restype = None
# set_channel_gain()
_dll.ba_eeg_manager_set_channel_gain.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint16,
    ctypes.c_uint8,
]
_dll.ba_eeg_manager_set_channel_gain.restype = None
# set_channel_bias()
_dll.ba_eeg_manager_set_channel_bias.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint16,
    ctypes.c_uint8,
]
_dll.ba_eeg_manager_set_channel_bias.restype = None
# set_impedance_mode()
_dll.ba_eeg_manager_set_impedance_mode.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint8,
]
_dll.ba_eeg_manager_set_impedance_mode.restype = None
# get_device_info()
_dll.ba_eeg_manager_get_device_info.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_get_device_info.restype = ctypes.POINTER(DeviceInfo)
# get_channel_index()
_dll.ba_eeg_manager_get_channel_index.argtypes = [ctypes.c_void_p, ctypes.c_uint16]
_dll.ba_eeg_manager_get_channel_index.restype = ctypes.c_size_t
# get_sample_frequency()
_dll.ba_eeg_manager_get_sample_frequency.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_get_sample_frequency.restype = ctypes.c_uint16
# set_callback_chunk()
_dll.ba_eeg_manager_set_callback_chunk.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(
        None,
        ctypes.POINTER(ctypes.c_void_p),
        ctypes.c_size_t,
        ctypes.c_void_p,
    ),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_set_callback_chunk.restype = None
# set_callback_battery()
_dll.ba_eeg_manager_set_callback_battery.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.POINTER(BatteryInfo), ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_set_callback_battery.restype = None
# set_callback_disconnect()
_dll.ba_eeg_manager_set_callback_disconnect.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_set_callback_disconnect.restype = None
# update firmware
_dll.ba_eeg_manager_start_update.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_size_t, ctypes.c_size_t),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_start_update.restype = ctypes.c_uint8
# annotate()
_dll.ba_eeg_manager_annotate.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
_dll.ba_eeg_manager_annotate.restype = ctypes.c_uint8
# get_annotations()
_dll.ba_eeg_manager_get_annotations.argtypes = [
    ctypes.c_void_p,
    ctypes.POINTER(ctypes.POINTER(Annotation)),
    ctypes.POINTER(ctypes.c_size_t),
]
_dll.ba_eeg_manager_get_annotations.restype = None
# clear_annotations()
_dll.ba_eeg_manager_clear_annotations.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_clear_annotations.restype = None

# Stream size type info super secret function thingy
_dll.ba_eeg_manager_get_stream_channel_data_types.argtypes = [
    ctypes.c_void_p,
    ctypes.POINTER(ctypes.POINTER(ctypes.c_uint8)),
    ctypes.POINTER(ctypes.c_size_t),
]
_dll.ba_eeg_manager_get_stream_channel_data_types.restype = None

# Set sample rate
_dll.ba_eeg_manager_set_data_stream_rate.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint8,
]
_dll.ba_eeg_manager_set_data_stream_rate.restype = ctypes.c_uint8

_managers_mtx = threading.Lock()
_managers: dict = dict()

_types_map = [
    ctypes.c_float,  # 0
    ctypes.c_uint8,  # 1
    ctypes.c_size_t,  # 2
    ctypes.c_double,  # 3
]


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_stop_stream(data: ctypes.c_void_p) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_stop_stream_mix:
                cbk = mgr._callback_stop_stream
                if cbk is not None:
                    cbk()


@ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_size_t, ctypes.c_size_t)
def _callback_ota_update(data: ctypes.c_void_p, progress: int, total: int) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_ota_update_mtx:
                cbk = mgr._callback_ota_update
                if cbk is not None:
                    cbk(progress, total)


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_start_stream(data: ctypes.c_void_p) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_start_stream_mix:
                cbk = mgr._callback_start_stream
                if cbk is not None:
                    cbk()


@ctypes.CFUNCTYPE(
    None, ctypes.POINTER(ctypes.c_void_p), ctypes.c_size_t, ctypes.c_void_p
)
def _callback_chunk(chunk_data: list, chunk_size: int, data: Any) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_chunk_mtx:
                cbk = mgr._callback_chunk
                if cbk is not None:
                    # Get channel sizes and type information
                    types_ptr = ctypes.POINTER(ctypes.c_uint8)()
                    types_size = ctypes.c_size_t()
                    _dll.ba_eeg_manager_get_stream_channel_data_types(
                        data, ctypes.byref(types_ptr), ctypes.byref(types_size)
                    )
                    types = [_types_map[types_ptr[i]] for i in range(types_size.value)]

                    chunk_arrays = []
                    for i, ctype in enumerate(types):
                        data_pointer = ctypes.cast(
                            chunk_data[i], ctypes.POINTER(ctype * chunk_size)
                        )
                        np_array = np.ctypeslib.as_array(
                            data_pointer.contents, shape=(chunk_size,)
                        )
                        chunk_arrays.append(np_array)

                    cbk(chunk_arrays, chunk_size)


@ctypes.CFUNCTYPE(None, ctypes.POINTER(BatteryInfo), ctypes.c_void_p)
def _callback_battery(b_info, data) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_battery_mtx:
                cbk = mgr._callback_battery
                if cbk is not None:
                    cbk(copy.copy(b_info[0]))


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_load_config(data) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_load_config_mtx:
                cbk = mgr._callback_load_config
                if cbk is not None:
                    cbk()


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_disconnect(data) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_disconnect_mtx:
                cbk = mgr._callback_disconnect
                if cbk is not None:
                    cbk()


[docs] class EEGManager: """Manages all communication with a BrainAccess device. The `EEGManager` is the primary interface for connecting to, configuring, and streaming data from a BrainAccess device. It handles the low-level details of Bluetooth communication and provides a high-level API for controlling the device. """ def __init__(self) -> None: """Initializes a new EEGManager instance. Warning ------- The BrainAccess Core library must be initialized with `init()` before creating an `EEGManager`. """
[docs] self.conenction_success: int = 0
self._callback_chunk_mtx = threading.Lock() self._callback_battery_mtx = threading.Lock() self._callback_disconnect_mtx = threading.Lock() self._callback_start_stream_mtx = threading.Lock() self._callback_stop_stream_mtx = threading.Lock() self._callback_load_config_mtx = threading.Lock() self._callback_ota_update_mtx = threading.Lock() self._manager = _dll.ba_eeg_manager_new() with _managers_mtx: _managers[self._manager] = self self._callback_disconnect = lambda: None _dll.ba_eeg_manager_set_callback_disconnect( self._manager, _callback_disconnect, self._manager ) def __enter__(self) -> "EEGManager": return self def __exit__(self, exc_type, exc_value, traceback) -> None: self.destroy()
[docs] def destroy(self) -> None: """Releases all resources associated with the EEG manager. This method should be called when the manager is no longer needed to ensure a clean shutdown. Warning ------- This method must be called exactly once. """ self.disconnect() # prevent callback deadlock by disconnecting first. with _managers_mtx: _dll.ba_eeg_manager_free(self._manager) del _managers[self._manager]
[docs] def disconnect(self) -> None: """Disconnects from the device, if a connection is active.""" _dll.ba_eeg_manager_disconnect(self._manager)
[docs] def connect(self, bt_device_name: str) -> int: """Connects to a BrainAccess device. Parameters ---------- bt_device_name : str The name of the device to connect to (e.g., "HALO 001", "MINI 001"). Returns ------- int - 0: Connection successful. - 1: Connection failed. - 2: Connection successful, but the data stream is incompatible. A firmware update is recommended. """ cbk, _ = _callback() self.connection_success = _dll.ba_eeg_manager_connect( self._manager, ctypes.c_char_p(bt_device_name.encode("ascii")), cbk, None ) if self.connection_success == 2: warnings.warn("Stream is incompatible. Update the firmware.") return self.connection_success
[docs] def is_connected(self) -> bool: """Checks if a connection to a device is currently active. Returns ------- bool True if connected, False otherwise. """ return _dll.ba_eeg_manager_is_connected(self._manager)
[docs] def start_stream(self, callback: Union[Callable, None] = None) -> bool: """Starts streaming data from the device. Parameters ---------- callback : callable, optional A function to be called when the stream has successfully started. Returns ------- bool True if the stream was started successfully. Raises ------ BrainAccessException If the stream is already running or could not be started. """ if self.connection_success == 2: raise BrainAccessException("Stream is incompatible. Update the firmware.") if callback is not None: with self._callback_start_stream_mtx: self._callback_start_stream = callback else: with self._callback_start_stream_mtx: self._callback_start_stream = lambda: None if self.is_streaming(): raise BrainAccessException("Stream already running") return _handle_error( _dll.ba_eeg_manager_start_stream( self._manager, _callback_start_stream, self._manager ) )
[docs] def stop_stream(self, callback: Union[Callable, None] = None) -> bool: """Stops the data stream from the device. Parameters ---------- callback : callable, optional A function to be called when the stream has successfully stopped. Returns ------- bool True if the stream was stopped successfully. Raises ------ BrainAccessException If the stream is not currently running or could not be stopped. """ if callback is not None: with self._callback_stop_stream_mtx: self._callback_stop_stream = callback else: with self._callback_stop_stream_mtx: self._callback_stop_stream = lambda: None if not self.is_connected(): return _handle_error(1) if not self.is_streaming(): raise BrainAccessException("Stream not running") return _handle_error( _dll.ba_eeg_manager_stop_stream(self._manager, _callback_stop_stream, None) )
[docs] def is_streaming(self) -> bool: """Checks if the device is currently streaming data. Returns ------- bool True if the stream is active, False otherwise. """ return _dll.ba_eeg_manager_is_streaming(self._manager)
[docs] def load_config(self, callback: Union[Callable, None] = None) -> None: """Applies the current channel and other settings to the device. Parameters ---------- callback : callable, optional A function to be called when the configuration has been successfully loaded onto the device. """ if callback is not None: with self._callback_load_config_mtx: self._callback_load_config = callback else: with self._callback_load_config_mtx: self._callback_load_config = lambda: None _handle_error( _dll.ba_eeg_manager_load_config( self._manager, _callback_load_config, self._manager ) )
[docs] def get_battery_info(self) -> BatteryInfo: """Retrieves the current battery status from the device. Returns ------- BatteryInfo An object containing battery level and charging status. """ return _dll.ba_eeg_manager_get_battery_info(self._manager)
[docs] def set_channel_enabled(self, channel: int, state: bool) -> None: """Enables or disables a specific data channel. Warning ------- Channel settings are reset when the stream is stopped. This method must be called before each stream start to configure the desired channels. Parameters ---------- channel : int The ID of the channel to enable or disable (see `brainaccess.core.eeg_channel`). state : bool True to enable the channel, False to disable it. Raises ------ BrainAccessException If the device is currently streaming. """ if self.is_streaming(): raise BrainAccessException("Cannot change channel state while streaming") _dll.ba_eeg_manager_set_channel_enabled( self._manager, ctypes.c_uint16(channel), ctypes.c_bool(state) )
[docs] def set_channel_gain(self, channel: int, gain: GainMode) -> None: """Sets the gain mode for a specific channel. Lower gain values increase the measurable voltage range at the cost of reduced amplitude resolution. A gain of X12 is optimal for most use cases. Warning ------- This setting takes effect on stream start and is reset on stream stop. It only affects channels that support gain control, such as electrode measurement channels. Parameters ---------- channel : int The ID of the channel to configure. gain : GainMode The desired gain mode. Raises ------ BrainAccessException If the device is streaming or the channel number is invalid. """ if channel < 0 or channel > 33: raise BrainAccessException("Invalid channel number") if self.is_streaming(): raise BrainAccessException("Cannot change channel gain while streaming") _dll.ba_eeg_manager_set_channel_gain( self._manager, ctypes.c_uint16(channel), ctypes.c_uint8(gain.value) )
@multimethod
[docs] def set_channel_bias(self, channel: int, bias: bool) -> None: """ DEPRECATED: Use the version with `Polarity` instead. Configures an electrode channel for bias feedback. The signals from bias channels are inverted and fed back into the bias electrode to reduce common-mode noise (e.g., from power lines). Warning ------- This setting takes effect on stream start and is reset on stream stop. Only select channels with good skin contact for bias feedback. Parameters ---------- channel : int The ID of the channel to use for bias feedback. bias : bool True to enable bias feedback for this channel, False to disable. Raises ------ BrainAccessException If the device is currently streaming. """ warnings.warn( "This function is deprecated, use the version with Polarity instead.", DeprecationWarning, ) if self.is_streaming(): raise BrainAccessException("Cannot change channel bias while streaming") self.set_channel_bias(channel, Polarity.BOTH if bias else Polarity.NONE)
@set_channel_bias.register # type: ignore def set_channel_bias(self, channel: int, p: Polarity) -> None: """Configures an electrode channel for bias feedback. The signals from bias channels are inverted and fed back into the bias electrode to reduce common-mode noise (e.g., from power lines). Warning ------- This setting takes effect on stream start and is reset on stream stop. Only select channels with good skin contact for bias feedback. Parameters ---------- channel : int The ID of the channel to use for bias feedback. p : Polarity The polarity to use for the bias signal. If the device is not bipolar, use `Polarity.BOTH`. Raises ------ BrainAccessException If the device is currently streaming. """ if self.is_streaming(): raise BrainAccessException("Cannot change channel bias while streaming") _dll.ba_eeg_manager_set_channel_bias( self._manager, ctypes.c_uint16(channel), ctypes.c_uint8(p.value) )
[docs] def set_impedance_mode(self, mode: ImpedanceMeasurementMode): """Configures the device for impedance measurement. This mode injects a small, known current through the bias electrodes and measures the resulting voltage at each electrode. The impedance can then be calculated using Ohm's law (Impedance = Voltage / Current). Warning ------- This setting takes effect on stream start and is reset on stream stop. Parameters ---------- mode : ImpedanceMeasurementMode The impedance measurement mode to set. Raises ------ BrainAccessException If the device is currently streaming. """ if self.is_streaming(): raise BrainAccessException("Cannot change impedance mode while streaming") _dll.ba_eeg_manager_set_impedance_mode( self._manager, ctypes.c_uint8(mode.value) )
[docs] def get_device_info(self) -> DeviceInfo: """Retrieves static information about the connected device. Warning ------- This method should only be called after a successful connection has been established. Returns ------- DeviceInfo An object containing the device model, hardware/firmware versions, and other static information. """ return _dll.ba_eeg_manager_get_device_info(self._manager).contents
[docs] def get_channel_index(self, channel: int) -> int: """Gets the index of a specific channel within the data chunk. This allows you to locate the data for a particular channel within the array provided by the chunk callback. Parameters ---------- channel : int The ID of the channel. Returns ------- int The index of the channel's data in the chunk array. Raises ------ BrainAccessException If the channel does not exist or is not currently being streamed. """ val = _dll.ba_eeg_manager_get_channel_index( self._manager, ctypes.c_uint16(channel) ) if val == ctypes.c_size_t(-1).value: raise BrainAccessException( "Channel does not exist or is not currently streaming" ) return val
[docs] def get_sample_frequency(self) -> int: """Gets the sampling frequency of the device. Returns ------- int The sampling frequency. """ rate_value = StreamRate(_dll.ba_eeg_manager_get_sample_frequency(self._manager)) return rate_value.to_hz
[docs] def set_callback_chunk(self, f: Callable) -> None: """Sets a callback function to be executed when a new data chunk is available. Warning ------- The callback may be executed in a different thread. Ensure that any shared data is properly synchronized and that the callback executes quickly to avoid blocking communication with the device. Parameters ---------- f : callable The function to be called. It should accept a list of NumPy arrays (one for each channel) and the chunk size as arguments. Set to `None` to disable the callback. """ with self._callback_chunk_mtx: self._callback_chunk = f _dll.ba_eeg_manager_set_callback_chunk( self._manager, _callback_chunk if f is not None else None, self._manager )
[docs] def set_callback_battery(self, callback: Union[Callable, None] = None) -> None: """Sets a callback function to be executed when the battery status is updated. Warning ------- The callback may be executed in a different thread. Ensure that any shared data is properly synchronized and that the callback executes quickly to avoid blocking communication with the device. Parameters ---------- callback : callable, optional The function to be called. It should accept a `BatteryInfo` object as an argument. Set to `None` to disable. Raises ------ BrainAccessException If the callback is `None`. """ if callback is not None: with self._callback_battery_mtx: self._callback_battery = callback else: raise BrainAccessException("Callback cannot be null") _dll.ba_eeg_manager_set_callback_battery( self._manager, _callback_battery, self._manager, )
[docs] def set_callback_disconnect(self, callback: Optional[Callable] = None) -> None: """Sets a callback function to be executed when the device disconnects. Warning ------- The callback may be executed in a different thread. Ensure that any shared data is properly synchronized and that the callback executes quickly. Parameters ---------- callback : callable, optional The function to be called on disconnect. Set to `None` to disable. """ if callback is None: with self._callback_disconnect_mtx: self._callback_disconnect = lambda: None else: with self._callback_disconnect_mtx: self._callback_disconnect = callback _dll.ba_eeg_manager_set_callback_disconnect( self._manager, _callback_disconnect, self._manager )
[docs] def annotate(self, annotation: str) -> None: """Adds a timestamped annotation to the data stream. Warning ------- Annotations are cleared when the device is disconnected. Parameters ---------- annotation : str The text of the annotation. Raises ------ BrainAccessException If the annotation is `None` or empty. """ if annotation is None: raise BrainAccessException("Annotation cannot be None") if len(annotation) == 0: raise BrainAccessException("Annotation cannot be empty") _handle_error( _dll.ba_eeg_manager_annotate( self._manager, ctypes.c_char_p(annotation.encode("ascii")) ) )
[docs] def get_device_features(self) -> DeviceFeatures: """Retrieves the features and capabilities of the connected device. Returns ------- DeviceFeatures An object with methods to query for features like gyroscope, accelerometer, bipolar electrodes, and electrode count. """ info = self.get_device_info() return DeviceFeatures(info)
[docs] def get_annotations(self) -> dict: """Retrieves all accumulated annotations. Warning ------- Annotations are cleared when the device is disconnected. Returns ------- dict A dictionary with two keys: - "annotations": A list of annotation strings. - "timestamps": A list of corresponding timestamps. """ ae = ctypes.POINTER(Annotation)() size = ctypes.c_size_t() _dll.ba_eeg_manager_get_annotations( self._manager, ctypes.pointer(ae), ctypes.pointer(size) ) annotations = [ae[i] for i in range(size.value)] timestamps = [x.timestamp for x in annotations] annotations = [x.annotation for x in annotations] return {"annotations": annotations, "timestamps": timestamps}
[docs] def clear_annotations(self) -> None: """Clears all existing annotations.""" _dll.ba_eeg_manager_clear_annotations(self._manager)
[docs] def start_update(self, callback: Union[Callable, None] = None) -> None: """Starts a firmware update for the device. Parameters ---------- callback : callable, optional A function to be called with the progress of the update. It should accept two arguments: the number of bytes sent and the total number of bytes. Raises ------ BrainAccessException If the update cannot be started. """ if callback is not None: with self._callback_ota_update_mtx: self._callback_ota_update = callback else: with self._callback_ota_update_mtx: self._callback_ota_update = lambda x, y: None _handle_error( _dll.ba_eeg_manager_start_update( self._manager, _callback_ota_update, self._manager ) )
[docs] def set_sample_rate(self, sample_rate: int) -> bool: """Sets the data stream sample rate for the device. The available sample rates may depend on the device model and firmware version. Refer to the `brainaccess.core.stream_rate.StreamRate` enum for available options. Warning ------- This setting takes effect on stream start and is reset on stream stop. Parameters ---------- sample_rate : int The desired data stream sample rate. Raises ------ BrainAccessException If the device is currently streaming. If the device sample rate is not possible """ if self.is_streaming(): raise BrainAccessException("Cannot change sample rate while streaming") _sample_rate = StreamRate.from_hz(sample_rate) if _sample_rate == StreamRate.UNKNOWN: raise BrainAccessException("Wrong sample rate") device_info = self.get_device_info() device_model = device_info.device_model if device_model in [DeviceModel.MAXI, DeviceModel.MIDI]: if sample_rate != StreamRate.X250Hz.to_hz: raise BrainAccessException( f"{device_model.name} only supports 250Hz sample rate." ) elif device_model in [DeviceModel.HALO, DeviceModel.MINI]: if sample_rate > StreamRate.X1kHz.to_hz: raise BrainAccessException( f"{device_model.name} supports sample rates up to 1kHz." ) return _handle_error( _dll.ba_eeg_manager_set_data_stream_rate( self._manager, ctypes.c_uint8(_sample_rate.value) ) )