From cce35dd0010f6af9431a58872f0ebac8a39adbbd Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Fri, 27 Feb 2026 14:03:05 +0530 Subject: [PATCH 01/13] fixing circli/ci check failure --- mne/io/egi/tests/test_egi.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py index 261a9c80da3..2c6beaa3f66 100644 --- a/mne/io/egi/tests/test_egi.py +++ b/mne/io/egi/tests/test_egi.py @@ -7,6 +7,7 @@ import shutil from copy import deepcopy from datetime import datetime, timezone +from importlib.util import find_spec from pathlib import Path import numpy as np From ed398d86da9615c276a1b081b774eb602e51cc02 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Fri, 27 Feb 2026 13:50:21 +0530 Subject: [PATCH 02/13] Added a reusable skip marker based on mffpy --- mne/io/egi/tests/test_egi.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py index 2c6beaa3f66..8d2fc0b3699 100644 --- a/mne/io/egi/tests/test_egi.py +++ b/mne/io/egi/tests/test_egi.py @@ -5,6 +5,7 @@ import os import shutil +from importlib.util import find_spec from copy import deepcopy from datetime import datetime, timezone from importlib.util import find_spec @@ -37,6 +38,11 @@ egi_txt_evoked_cat1_fname = egi_path / "test_egi_evoked_cat1.txt" egi_txt_evoked_cat2_fname = egi_path / "test_egi_evoked_cat2.txt" +requires_mffpy = pytest.mark.skipif( + find_spec("mffpy") is None, + reason="Test requires mffpy", +) + # absolute event times from NetStation egi_pause_events = { "AM40": [7.224, 11.928, 14.413, 16.848], @@ -60,6 +66,7 @@ @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname, skip_times, event_times", [ @@ -122,6 +129,7 @@ def test_egi_mff_pause(fname, skip_times, event_times): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname", [ @@ -144,6 +152,7 @@ def test_egi_mff_pause_chunks(fname, tmp_path): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize("events_as_annotations", (True, False)) def test_io_egi_mff(events_as_annotations): """Test importing EGI MFF simple binary files.""" @@ -291,6 +300,7 @@ def test_io_egi(): @requires_testing_data +@requires_mffpy def test_io_egi_pns_mff(tmp_path): """Test importing EGI MFF with PNS data.""" pytest.importorskip("defusedxml") @@ -347,6 +357,7 @@ def test_io_egi_pns_mff(tmp_path): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize("preload", (True, False)) def test_io_egi_pns_mff_bug(preload): """Test importing EGI MFF with PNS data (BUG).""" @@ -391,6 +402,7 @@ def test_io_egi_pns_mff_bug(preload): @requires_testing_data +@requires_mffpy def test_io_egi_crop_no_preload(): """Test crop non-preloaded EGI MFF data (BUG).""" pytest.importorskip("defusedxml") @@ -504,6 +516,7 @@ def test_read_evokeds_mff_bad_input(): @requires_testing_data +@requires_mffpy def test_egi_coord_frame(): """Test that EGI coordinate frame is changed to head.""" pytest.importorskip("defusedxml") @@ -533,6 +546,7 @@ def test_egi_coord_frame(): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname, timestamp, utc_offset", [ @@ -557,6 +571,7 @@ def test_meas_date(fname, timestamp, utc_offset): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname, standard_montage", [ @@ -591,6 +606,7 @@ def test_set_standard_montage_mff(fname, standard_montage): @requires_testing_data +@requires_mffpy def test_egi_mff_bad_xml(tmp_path): """Test that corrupt XML files are gracefully handled.""" pytest.importorskip("defusedxml") From f5d4fd137429ec5d91121e1b8b0e0ebffdf11627 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Thu, 26 Feb 2026 19:57:50 +0530 Subject: [PATCH 03/13] fixed pre-compile test issues --- mne/io/egi/egimff.py | 69 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 870f58890a2..0db18272c48 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -256,6 +256,74 @@ def _read_header(input_fname): ) info.update(mff_hdr) return info +def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): + """Read event tracks using mffpy XML parsing and return dense event matrix.""" + from mffpy.xml_files import XML, EventTrack + + mff_events = OrderedDict() + basenames = mff_reader.directory.listdir() + for basename in basenames: + lower_name = basename.lower() + if not lower_name.endswith(".xml") or basename.startswith("._"): + continue + stem = Path(basename).stem + try: + with mff_reader.directory.filepointer(stem) as fp: + xml_obj = XML.from_file(fp, recover=False) + except Exception as err: + if "XMLSyntaxError" in type(err).__name__: + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + if not isinstance(xml_obj, EventTrack): + continue + try: + events_iter = xml_obj.events + for event in events_iter: + code = event.get("code") or event.get("label") or xml_obj.name + begin_time = event.get("beginTime") + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + except Exception: + _soft_import("defusedxml", "reading EGI MFF event tracks") + from defusedxml import ElementTree as ET + + xml_path = op.join(str(input_fname), basename) + try: + root = ET.parse(xml_path).getroot() + except Exception as err: + if ( + "ParseError" in type(err).__name__ + or "XMLSyntaxError" in type(err).__name__ + ): + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + for event_el in root.iter(): + if event_el.tag.split("}")[-1] != "event": + continue + event_fields = {} + for child in event_el: + event_fields[child.tag.split("}")[-1]] = child.text + code = ( + event_fields.get("code") + or event_fields.get("label") + or xml_obj.name + ) + begin_time = _parse_egi_datetime(event_fields.get("beginTime")) + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + + event_codes = list(mff_events.keys()) + egi_events = np.zeros((len(event_codes), n_samples)) + for event_idx, code in enumerate(event_codes): + if len(mff_events[code]): + egi_events[event_idx, np.array(mff_events[code], dtype=int)] = 1 + return egi_events, event_codes, mff_events def _get_eeg_calibration_info(filepath, egi_info): @@ -582,7 +650,6 @@ def __init__( def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): """Read a chunk of data.""" logger.debug(f"Reading MFF {start:6d} ... {stop:6d} ...") - dtype = " Date: Thu, 26 Feb 2026 18:41:36 +0530 Subject: [PATCH 04/13] final fix after complete migration reviews --- doc/changes/dev/#13684.newfeature.rst | 1 + mne/io/egi/egimff.py | 596 +++++++++++--------------- mne/io/egi/events.py | 153 +------ 3 files changed, 255 insertions(+), 495 deletions(-) create mode 100644 doc/changes/dev/#13684.newfeature.rst diff --git a/doc/changes/dev/#13684.newfeature.rst b/doc/changes/dev/#13684.newfeature.rst new file mode 100644 index 00000000000..ccf2f875f49 --- /dev/null +++ b/doc/changes/dev/#13684.newfeature.rst @@ -0,0 +1 @@ +The EGI MFF reader has been refactored to use the :func:`mffpy` backend, improving support for multi-stream files and high-precision metadata, by :newcontrib:`Pragnya Khandelwal`. \ No newline at end of file diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 0db18272c48..545713e54ea 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -5,7 +5,6 @@ """EGI NetStation Load Function.""" import datetime -import math import os.path as op import re from collections import OrderedDict @@ -13,6 +12,11 @@ import numpy as np +try: + import mffpy +except ImportError: + mffpy = None + from ..._fiff.constants import FIFF from ..._fiff.meas_info import _empty_info, _ensure_meas_date_none_or_dt, create_info from ..._fiff.proj import setup_proj @@ -22,12 +26,9 @@ from ...evoked import EvokedArray from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn from ..base import BaseRaw -from .events import _combine_triggers, _read_events, _triage_include_exclude +from .events import _combine_triggers, _triage_include_exclude from .general import ( - _block_r, _extract, - _get_blocks, - _get_ep_info, _get_gains, _get_signalfname, ) @@ -35,217 +36,137 @@ REFERENCE_NAMES = ("VREF", "Vertex Reference") -def _read_mff_header(filepath): - """Read mff header.""" - _soft_import("defusedxml", "reading EGI MFF data") - from defusedxml.minidom import parse +def _get_mff_reader(input_fname): + """Instantiate an mffpy Reader (hard dependency for MFF reading).""" + mffpy = _import_mffpy() + return mffpy.Reader(input_fname) - all_files = _get_signalfname(filepath) - eeg_file = all_files["EEG"]["signal"] - eeg_info_file = all_files["EEG"]["info"] - info_filepath = op.join(filepath, "info.xml") # add with filepath - tags = ["mffVersion", "recordTime"] - version_and_date = _extract(tags, filepath=info_filepath) - version = "" - if len(version_and_date["mffVersion"]): - version = version_and_date["mffVersion"][0] - - fname = op.join(filepath, eeg_file) - signal_blocks = _get_blocks(fname) - epochs = _get_ep_info(filepath) - summaryinfo = dict(eeg_fname=eeg_file, info_fname=eeg_info_file) - summaryinfo.update(signal_blocks) - # sanity check and update relevant values - record_time = version_and_date["recordTime"][0] - # e.g., - # 2018-07-30T10:47:01.021673-04:00 - # 2017-09-20T09:55:44.072000000+01:00 +def _get_mff_startdatetime(input_fname, mff_reader): + """Get robust start datetime for MFF files, handling 9-digit fractional secs.""" + try: + return mff_reader.startdatetime + except Exception: + info_filepath = op.join(str(input_fname), "info.xml") + record_time = _extract(["recordTime"], filepath=info_filepath)["recordTime"][0] + if len(record_time) > 32: + dt, tz = [record_time[:26], record_time[-6:]] + record_time = dt + tz + return datetime.datetime.strptime(record_time, "%Y-%m-%dT%H:%M:%S.%f%z") + + +def _parse_egi_datetime(time_str): + """Parse EGI time strings allowing 6 or 9 fractional second digits.""" + if time_str is None: + return None + txt = time_str.strip() g = re.match( - r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.(\d{6}(?:\d{3})?)[+-]\d{2}:\d{2}", # noqa: E501 - record_time, + r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.)(\d+)([+-]\d{2}:?\d{2})$", + txt, ) if g is None: - raise RuntimeError(f"Could not parse recordTime {repr(record_time)}") - frac = g.groups()[0] - assert len(frac) in (6, 9) and all(f.isnumeric() for f in frac) # regex - div = 1000 if len(frac) == 6 else 1000000 - for key in ("last_samps", "first_samps"): - # convert from times in µS to samples - for ei, e in enumerate(epochs[key]): - if e % div != 0: - raise RuntimeError(f"Could not parse epoch time {e}") - epochs[key][ei] = e // div - epochs[key] = np.array(epochs[key], np.uint64) - # I guess they refer to times in milliseconds? - # What we really need to do here is: - # epochs[key] *= signal_blocks['sfreq'] - # epochs[key] //= 1000 - # But that multiplication risks an overflow, so let's only multiply - # by what we need to (e.g., a sample rate of 500 means we can multiply - # by 1 and divide by 2 rather than multiplying by 500 and dividing by - # 1000) - numerator = int(signal_blocks["sfreq"]) - denominator = 1000 - this_gcd = math.gcd(numerator, denominator) - numerator = numerator // this_gcd - denominator = denominator // this_gcd - with np.errstate(over="raise"): - epochs[key] *= numerator - epochs[key] //= denominator - # Should be safe to cast to int now, which makes things later not - # upbroadcast to float - epochs[key] = epochs[key].astype(np.int64) - n_samps_block = signal_blocks["samples_block"].sum() - n_samps_epochs = (epochs["last_samps"] - epochs["first_samps"]).sum() - bad = ( - n_samps_epochs != n_samps_block - or not (epochs["first_samps"] < epochs["last_samps"]).all() - or not (epochs["first_samps"][1:] >= epochs["last_samps"][:-1]).all() - ) - if bad: - raise RuntimeError( - "EGI epoch first/last samps could not be parsed:\n" - f"{list(epochs['first_samps'])}\n{list(epochs['last_samps'])}" - ) - summaryinfo.update(epochs) - # index which samples in raw are actually readable from disk (i.e., not - # in a skip) - disk_samps = np.full(epochs["last_samps"][-1], -1) - offset = 0 - for first, last in zip(epochs["first_samps"], epochs["last_samps"]): - n_this = last - first - disk_samps[first:last] = np.arange(offset, offset + n_this) - offset += n_this - summaryinfo["disk_samps"] = disk_samps + return datetime.datetime.strptime(txt, "%Y-%m-%dT%H:%M:%S.%f%z") + prefix, frac, tz = g.groups() + frac = (frac[:6]).ljust(6, "0") + return datetime.datetime.strptime(prefix + frac + tz, "%Y-%m-%dT%H:%M:%S.%f%z") - # Add the sensor info. - sensor_layout_file = op.join(filepath, "sensorLayout.xml") - sensor_layout_obj = parse(sensor_layout_file) - summaryinfo["device"] = sensor_layout_obj.getElementsByTagName("name")[ - 0 - ].firstChild.data +def _get_info_from_mff_reader(input_fname, mff_reader): + """Build EGI info dict from mffpy.Reader metadata.""" + input_fname = str(input_fname) + sfreq_dict = mff_reader.sampling_rates + sfreq = float(sfreq_dict.get("EEG", next(iter(sfreq_dict.values())))) + meas_dt_local = _get_mff_startdatetime(input_fname, mff_reader) + + all_files = _get_signalfname(input_fname) + eeg_file = all_files["EEG"]["signal"] + eeg_info_file = all_files["EEG"]["info"] + + # Parse channel metadata from sensorLayout.xml + _soft_import("defusedxml", "reading EGI MFF data") + from defusedxml.minidom import parse + + sensor_layout_file = op.join(input_fname, "sensorLayout.xml") + sensor_layout_obj = parse(sensor_layout_file) + device = sensor_layout_obj.getElementsByTagName("name")[0].firstChild.data sensors = sensor_layout_obj.getElementsByTagName("sensor") - chan_type = list() - chan_unit = list() + + chan_type = [] + chan_unit = [] + numbers = [] n_chans = 0 - numbers = list() # used for identification for sensor in sensors: sensortype = int(sensor.getElementsByTagName("type")[0].firstChild.data) if sensortype in [0, 1]: - sn = sensor.getElementsByTagName("number")[0].firstChild.data - sn = sn.encode() + sn = sensor.getElementsByTagName("number")[0].firstChild.data.encode() numbers.append(sn) chan_type.append("eeg") chan_unit.append("uV") - n_chans = n_chans + 1 - if n_chans != summaryinfo["n_channels"]: - raise RuntimeError( - f"Number of defined channels ({n_chans}) did not match the " - f"expected channels ({summaryinfo['n_channels']})." - ) + n_chans += 1 + + # Collect epoch bounds and per-epoch sample counts from mffpy + first_samps = [] + last_samps = [] + samples_block = [] + pns_samples_block = [] + for ei in range(len(mff_reader.epochs)): + epoch = mff_reader.epochs[ei] + data_epoch = mff_reader.get_physical_samples_from_epoch(epoch) + eeg_samples = int(data_epoch["EEG"][0].shape[1]) + first = int(np.round(epoch.t0 * sfreq)) + last = first + eeg_samples + first_samps.append(first) + last_samps.append(last) + samples_block.append(eeg_samples) + + pns_arr = data_epoch.get("PNSData") + pns_samples_block.append(0 if pns_arr is None else int(pns_arr[0].shape[1])) + + first_samps = np.array(first_samps, dtype=np.int64) + last_samps = np.array(last_samps, dtype=np.int64) + samples_block = np.array(samples_block, dtype=np.int64) + pns_samples_block = np.array(pns_samples_block, dtype=np.int64) + + # index which samples in raw are actually readable from disk (i.e., not in a skip) + disk_samps = np.full(last_samps[-1], -1, dtype=np.int64) + offset = 0 + for first, last in zip(first_samps, last_samps): + n_this = last - first + disk_samps[first:last] = np.arange(offset, offset + n_this) + offset += n_this - # Check presence of PNS data + # Parse PNS channel metadata if present pns_names = [] + pns_types = [] + pns_units = [] + pns_fname = None if "PNS" in all_files: - pns_fpath = op.join(filepath, all_files["PNS"]["signal"]) - pns_blocks = _get_blocks(pns_fpath) - pns_samples = pns_blocks["samples_block"] - signal_samples = signal_blocks["samples_block"] - same_blocks = np.array_equal( - pns_samples[:-1], signal_samples[:-1] - ) and pns_samples[-1] in (signal_samples[-1] - np.arange(2)) - if not same_blocks: - raise RuntimeError( - "PNS and signals samples did not match:\n" - f"{list(pns_samples)}\nvs\n{list(signal_samples)}" - ) - - pns_file = op.join(filepath, "pnsSet.xml") - pns_obj = parse(pns_file) - sensors = pns_obj.getElementsByTagName("sensor") - pns_types = [] - pns_units = [] - for sensor in sensors: - # sensor number: - # sensor.getElementsByTagName('number')[0].firstChild.data - name = sensor.getElementsByTagName("name")[0].firstChild.data - unit_elem = sensor.getElementsByTagName("unit")[0].firstChild - unit = "" - if unit_elem is not None: - unit = unit_elem.data - - if name == "ECG": - ch_type = "ecg" - elif "EMG" in name: - ch_type = "emg" - else: - ch_type = "bio" - pns_types.append(ch_type) - pns_units.append(unit) - pns_names.append(name) - - summaryinfo.update( - pns_types=pns_types, - pns_units=pns_units, - pns_fname=all_files["PNS"]["signal"], - pns_sample_blocks=pns_blocks, - ) - summaryinfo.update( - pns_names=pns_names, - version=version, - date=version_and_date["recordTime"][0], - chan_type=chan_type, - chan_unit=chan_unit, - numbers=numbers, - ) - - return summaryinfo - - -def _read_header(input_fname): - """Obtain the headers from the file package mff. - - Parameters - ---------- - input_fname : path-like - Path for the file - - Returns - ------- - info : dict - Main headers set. - """ - input_fname = str(input_fname) # cast to str any Paths - mff_hdr = _read_mff_header(input_fname) - with open(input_fname + "/signal1.bin", "rb") as fid: - version = np.fromfile(fid, np.int32, 1)[0] - """ - the datetime.strptime .f directive (milleseconds) - will only accept up to 6 digits. if there are more than - six millesecond digits in the provided timestamp string - (i.e. because of trailing zeros, as in test_egi_pns.mff) - then slice both the first 26 elements and the last 6 - elements of the timestamp string to truncate the - milleseconds to 6 digits and extract the timezone, - and then piece these together and assign back to mff_hdr['date'] - """ - if len(mff_hdr["date"]) > 32: - dt, tz = [mff_hdr["date"][:26], mff_hdr["date"][-6:]] - mff_hdr["date"] = dt + tz - - time_n = datetime.datetime.strptime(mff_hdr["date"], "%Y-%m-%dT%H:%M:%S.%f%z") + pns_fname = all_files["PNS"]["signal"] + pns_file = op.join(input_fname, "pnsSet.xml") + if op.exists(pns_file): + pns_obj = parse(pns_file) + pns_sensors = pns_obj.getElementsByTagName("sensor") + for sensor in pns_sensors: + name = sensor.getElementsByTagName("name")[0].firstChild.data + unit_elem = sensor.getElementsByTagName("unit")[0].firstChild + unit = "" if unit_elem is None else unit_elem.data + if name == "ECG": + ch_type = "ecg" + elif "EMG" in name: + ch_type = "emg" + else: + ch_type = "bio" + pns_names.append(name) + pns_types.append(ch_type) + pns_units.append(unit) info = dict( - version=version, - meas_dt_local=time_n, - utc_offset=time_n.strftime("%z"), + version=0, + meas_dt_local=meas_dt_local, + utc_offset=meas_dt_local.strftime("%z"), gain=0, bits=0, value_range=0, - ) - info.update( n_categories=0, n_segments=1, n_events=0, @@ -253,8 +174,28 @@ def _read_header(input_fname): category_names=[], category_lengths=[], pre_baseline=0, + sfreq=sfreq, + n_channels=n_chans, + eeg_fname=eeg_file, + info_fname=eeg_info_file, + device=device, + chan_type=chan_type, + chan_unit=chan_unit, + numbers=numbers, + first_samps=first_samps, + last_samps=last_samps, + samples_block=samples_block, + disk_samps=disk_samps, + pns_names=pns_names, + pns_types=pns_types, + pns_units=pns_units, + pns_fname=pns_fname, + pns_sample_blocks={ + "n_channels": len(pns_names), + "samples_block": pns_samples_block, + }, + mff_path=input_fname, ) - info.update(mff_hdr) return info def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): """Read event tracks using mffpy XML parsing and return dense event matrix.""" @@ -326,6 +267,69 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): return egi_events, event_codes, mff_events +def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): + """Read event tracks using mffpy XML parsing and return dense event matrix.""" + from mffpy.xml_files import EventTrack, XML + + mff_events = OrderedDict() + basenames = mff_reader.directory.listdir() + for basename in basenames: + lower_name = basename.lower() + if not lower_name.endswith(".xml") or basename.startswith("._"): + continue + stem = Path(basename).stem + try: + with mff_reader.directory.filepointer(stem) as fp: + xml_obj = XML.from_file(fp, recover=False) + except Exception as err: + if "XMLSyntaxError" in type(err).__name__: + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + if not isinstance(xml_obj, EventTrack): + continue + try: + events_iter = xml_obj.events + for event in events_iter: + code = event.get("code") or event.get("label") or xml_obj.name + begin_time = event.get("beginTime") + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + except Exception: + _soft_import("defusedxml", "reading EGI MFF event tracks") + from defusedxml import ElementTree as ET + + xml_path = op.join(str(input_fname), basename) + try: + root = ET.parse(xml_path).getroot() + except Exception as err: + if "ParseError" in type(err).__name__ or "XMLSyntaxError" in type(err).__name__: + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + for event_el in root.iter(): + if event_el.tag.split("}")[-1] != "event": + continue + event_fields = {} + for child in event_el: + event_fields[child.tag.split("}")[-1]] = child.text + code = event_fields.get("code") or event_fields.get("label") or xml_obj.name + begin_time = _parse_egi_datetime(event_fields.get("beginTime")) + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + + event_codes = list(mff_events.keys()) + egi_events = np.zeros((len(event_codes), n_samples)) + for event_idx, code in enumerate(event_codes): + if len(mff_events[code]): + egi_events[event_idx, np.array(mff_events[code], dtype=int)] = 1 + return egi_events, event_codes, mff_events + + def _get_eeg_calibration_info(filepath, egi_info): """Calculate calibration info for EEG channels.""" gains = _get_gains(op.join(filepath, egi_info["info_fname"])) @@ -470,17 +474,25 @@ def __init__( ) ) logger.info(f"Reading EGI MFF Header from {input_fname}...") - egi_info = _read_header(input_fname) + mff_reader = _get_mff_reader(input_fname) + egi_info = _get_info_from_mff_reader(input_fname, mff_reader) if eog is None: eog = [] if misc is None: misc = np.where(np.array(egi_info["chan_type"]) != "eeg")[0].tolist() logger.info(" Reading events ...") - egi_events, egi_info, mff_events = _read_events(input_fname, egi_info) + egi_events, event_codes, mff_events = _read_mff_events( + input_fname, + mff_reader, + egi_info["sfreq"], + egi_info["last_samps"][-1], + egi_info["meas_dt_local"], + ) + egi_info["n_events"] = len(event_codes) + egi_info["event_codes"] = event_codes cals = _get_eeg_calibration_info(input_fname, egi_info) logger.info(" Assembling measurement info ...") - event_codes = egi_info["event_codes"] include = _triage_include_exclude(include, exclude, egi_events, egi_info) if egi_info["n_events"] > 0 and not events_as_annotations: logger.info(' Synthesizing trigger channel "STI 014" ...') @@ -625,15 +637,22 @@ def __init__( ) # Annotate acquisition skips + has_skips = False for first, prev_last in zip( egi_info["first_samps"][1:], egi_info["last_samps"][:-1] ): gap = first - prev_last assert gap >= 0 if gap: + has_skips = True annot["onset"].append((prev_last - 0.5) / egi_info["sfreq"]) annot["duration"].append(gap / egi_info["sfreq"]) annot["description"].append("BAD_ACQ_SKIP") + if has_skips and (not events_as_annotations) and len(mff_events): + warn( + "Acquisition skips detected. EGI MFF file contains gaps between " + "recording epochs." + ) # create events from annotations if events_as_annotations: @@ -656,162 +675,57 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): # info about the binary file structure n_channels = egi_info["n_channels"] - samples_block = egi_info["samples_block"] + mff_reader = _get_mff_reader(egi_info["mff_path"]) + first_samps = egi_info["first_samps"] + last_samps = egi_info["last_samps"] # Check how many channels to read are from each type bounds = egi_info["kind_bounds"] + if idx is None: + idx = np.arange(bounds[-1]) if isinstance(idx, slice): - idx = np.arange(idx.start, idx.stop) + idx = np.arange(bounds[-1])[idx] + idx = np.array(idx, dtype=int) eeg_out = np.where(idx < bounds[1])[0] - eeg_one = idx[eeg_out, np.newaxis] + eeg_one = idx[eeg_out] eeg_in = idx[eeg_out] stim_out = np.where((idx >= bounds[1]) & (idx < bounds[2]))[0] stim_one = idx[stim_out] stim_in = idx[stim_out] - bounds[1] pns_out = np.where((idx >= bounds[2]) & (idx < bounds[3]))[0] pns_in = idx[pns_out] - bounds[2] - pns_one = idx[pns_out, np.newaxis] + pns_one = idx[pns_out] del eeg_out, stim_out, pns_out # take into account events (already extended to correct size) one[stim_one, :] = egi_info["egi_events"][stim_in, start:stop] - # Convert start and stop to limits in terms of the data - # actually on disk, plus an indexer (disk_use_idx) that populates - # the potentially larger `data` with it, taking skips into account - disk_samps = egi_info["disk_samps"][start:stop] - disk_use_idx = np.where(disk_samps > -1)[0] - # short circuit in case we don't need any samples - if not len(disk_use_idx): - _mult_cal_one(data, one, idx, cals, mult) - return - - start = disk_samps[disk_use_idx[0]] - stop = disk_samps[disk_use_idx[-1]] + 1 - assert len(disk_use_idx) == stop - start - - # Get starting/stopping block/samples - block_samples_offset = np.cumsum(samples_block) - offset_blocks = np.sum(block_samples_offset <= start) - offset_samples = start - ( - block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0 - ) - - # TODO: Refactor this reading with the PNS reading in a single function - # (DRY) - samples_to_read = stop - start - with open(self.filenames[fi], "rb", buffering=0) as fid: - # Go to starting block - current_block = 0 - current_block_info = None - current_data_sample = 0 - while current_block < offset_blocks: - this_block_info = _block_r(fid) - if this_block_info is not None: - current_block_info = this_block_info - fid.seek(current_block_info["block_size"], 1) - current_block += 1 - - # Start reading samples - while samples_to_read > 0: - logger.debug(f" Reading from block {current_block}") - this_block_info = _block_r(fid) - current_block += 1 - if this_block_info is not None: - current_block_info = this_block_info - - to_read = current_block_info["nsamples"] * current_block_info["nc"] - block_data = np.fromfile(fid, dtype, to_read) - block_data = block_data.reshape(n_channels, -1, order="C") - - # Compute indexes - samples_read = block_data.shape[1] - logger.debug(f" Read {samples_read} samples") - logger.debug(f" Offset {offset_samples} samples") - if offset_samples > 0: - # First block read, skip to the offset: - block_data = block_data[:, offset_samples:] - samples_read = samples_read - offset_samples - offset_samples = 0 - if samples_to_read < samples_read: - # Last block to read, skip the last samples - block_data = block_data[:, :samples_to_read] - samples_read = samples_to_read - logger.debug(f" Keep {samples_read} samples") - - s_start = current_data_sample - s_end = s_start + samples_read - - one[eeg_one, disk_use_idx[s_start:s_end]] = block_data[eeg_in] - samples_to_read = samples_to_read - samples_read - current_data_sample = current_data_sample + samples_read - - if len(pns_one) > 0: - # PNS Data is present and should be read: - pns_filepath = egi_info["pns_filepath"] - pns_info = egi_info["pns_sample_blocks"] - n_channels = pns_info["n_channels"] - samples_block = pns_info["samples_block"] - - # Get starting/stopping block/samples - block_samples_offset = np.cumsum(samples_block) - offset_blocks = np.sum(block_samples_offset < start) - offset_samples = start - ( - block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0 - ) + # Read only overlapping epoch segments from mffpy (keeps skips as zeros) + for epoch_idx, (first, last) in enumerate(zip(first_samps, last_samps)): + overlap_start = max(start, first) + overlap_stop = min(stop, last) + if overlap_stop <= overlap_start: + continue - samples_to_read = stop - start - with open(pns_filepath, "rb", buffering=0) as fid: - # Check file size - fid.seek(0, 2) - file_size = fid.tell() - fid.seek(0) - # Go to starting block - current_block = 0 - current_block_info = None - current_data_sample = 0 - while current_block < offset_blocks: - this_block_info = _block_r(fid) - if this_block_info is not None: - current_block_info = this_block_info - fid.seek(current_block_info["block_size"], 1) - current_block += 1 - - # Start reading samples - while samples_to_read > 0: - if samples_to_read == 1 and fid.tell() == file_size: - # We are in the presence of the EEG bug - # fill with zeros and break the loop - one[pns_one, -1] = 0 - break - - this_block_info = _block_r(fid) - if this_block_info is not None: - current_block_info = this_block_info - - to_read = current_block_info["nsamples"] * current_block_info["nc"] - block_data = np.fromfile(fid, dtype, to_read) - block_data = block_data.reshape(n_channels, -1, order="C") - - # Compute indexes - samples_read = block_data.shape[1] - if offset_samples > 0: - # First block read, skip to the offset: - block_data = block_data[:, offset_samples:] - samples_read = samples_read - offset_samples - offset_samples = 0 - - if samples_to_read < samples_read: - # Last block to read, skip the last samples - block_data = block_data[:, :samples_to_read] - samples_read = samples_to_read - - s_start = current_data_sample - s_end = s_start + samples_read - - one[pns_one, disk_use_idx[s_start:s_end]] = block_data[pns_in] - samples_to_read = samples_to_read - samples_read - current_data_sample = current_data_sample + samples_read + epoch = mff_reader.epochs[epoch_idx] + epoch_data = mff_reader.get_physical_samples_from_epoch(epoch) + eeg_block = epoch_data["EEG"][0][:n_channels] + src_start = overlap_start - first + src_stop = overlap_stop - first + dst_start = overlap_start - start + dst_stop = overlap_stop - start + + if len(eeg_one): + one[eeg_one, dst_start:dst_stop] = eeg_block[eeg_in, src_start:src_stop] + + if len(pns_one) and "PNSData" in epoch_data: + pns_block = epoch_data["PNSData"][0] + src_stop_pns = min(src_stop, pns_block.shape[1]) + if src_stop_pns > src_start: + dst_stop_pns = dst_start + (src_stop_pns - src_start) + one[pns_one, dst_start:dst_stop_pns] = pns_block[ + pns_in, src_start:src_stop_pns + ] # do the calibration _mult_cal_one(data, one, idx, cals, mult) @@ -911,10 +825,8 @@ def read_evokeds_mff( def _read_evoked_mff(fname, condition, channel_naming="E%d", verbose=None): """Read evoked data from MFF file.""" - import mffpy - - egi_info = _read_header(fname) mff = mffpy.Reader(fname) + egi_info = _get_info_from_mff_reader(str(fname), mff) categories = mff.categories.categories if isinstance(condition, str): @@ -1032,10 +944,8 @@ def _read_evoked_mff(fname, condition, channel_naming="E%d", verbose=None): def _import_mffpy(why="read averaged .mff files"): """Import and return module mffpy.""" - try: - import mffpy - except ImportError as exp: - msg = f"mffpy is required to {why}, got:\n{exp}" + if mffpy is None: + msg = f"mffpy is required to {why}." raise ImportError(msg) return mffpy diff --git a/mne/io/egi/events.py b/mne/io/egi/events.py index c160ceb208c..540d0fb5f51 100644 --- a/mne/io/egi/events.py +++ b/mne/io/egi/events.py @@ -3,160 +3,9 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. -from datetime import datetime -from glob import glob -from os.path import basename, join, splitext - import numpy as np -from ...utils import _soft_import, _validate_type, logger, warn - - -def _read_events(input_fname, info): - """Read events for the record. - - Parameters - ---------- - input_fname : path-like - The file path. - info : dict - Header info array. - """ - n_samples = info["last_samps"][-1] - mff_events, event_codes = _read_mff_events(input_fname, info["sfreq"]) - info["n_events"] = len(event_codes) - info["event_codes"] = event_codes - events = np.zeros([info["n_events"], info["n_segments"] * n_samples]) - for n, event in enumerate(event_codes): - for i in mff_events[event]: - if (i < 0) or (i >= events.shape[1]): - continue - events[n][i] = n + 1 - return events, info, mff_events - - -def _read_mff_events(filename, sfreq): - """Extract the events. - - Parameters - ---------- - filename : path-like - File path. - sfreq : float - The sampling frequency - """ - orig = {} - for xml_file in glob(join(filename, "*.xml")): - xml_type = splitext(basename(xml_file))[0] - et = _parse_xml(xml_file) - if et is not None: - orig[xml_type] = et - xml_files = orig.keys() - xml_events = [x for x in xml_files if x[:7] == "Events_"] - for item in orig["info"]: - if "recordTime" in item: - start_time = _ns2py_time(item["recordTime"]) - break - markers = [] - code = [] - for xml in xml_events: - for event in orig[xml][2:]: - event_start = _ns2py_time(event["beginTime"]) - start = (event_start - start_time).total_seconds() - if event["code"] not in code: - code.append(event["code"]) - marker = { - "name": event["code"], - "start": start, - "start_sample": int(np.trunc(start * sfreq)), - "end": start + float(event["duration"]) / 1e9, - "chan": None, - } - markers.append(marker) - events_tims = dict() - for ev in code: - trig_samp = list( - c["start_sample"] for n, c in enumerate(markers) if c["name"] == ev - ) - events_tims.update({ev: trig_samp}) - return events_tims, code - - -def _parse_xml(xml_file: str) -> list[dict[str, str]] | None: - """Parse XML file.""" - defusedxml = _soft_import("defusedxml", "reading EGI MFF data") - try: - xml = defusedxml.ElementTree.parse(xml_file) - except defusedxml.ElementTree.ParseError as e: - warn(f"Could not parse the XML file {xml_file}: {e}") - return - root = xml.getroot() - return _xml2list(root) - - -def _xml2list(root): - """Parse XML item.""" - output = [] - for element in root: - if len(element) > 0: - if element[0].tag != element[-1].tag: - output.append(_xml2dict(element)) - else: - output.append(_xml2list(element)) - - elif element.text: - text = element.text.strip() - if text: - tag = _ns(element.tag) - output.append({tag: text}) - - return output - - -def _ns(s): - """Remove namespace, but only if there is a namespace to begin with.""" - if "}" in s: - return "}".join(s.split("}")[1:]) - else: - return s - - -def _xml2dict(root): - """Use functions instead of Class. - - remove namespace based on - http://stackoverflow.com/questions/2148119 - """ - output = {} - if root.items(): - output.update(dict(root.items())) - - for element in root: - if len(element) > 0: - if len(element) == 1 or element[0].tag != element[1].tag: - one_dict = _xml2dict(element) - else: - one_dict = {_ns(element[0].tag): _xml2list(element)} - - if element.items(): - one_dict.update(dict(element.items())) - output.update({_ns(element.tag): one_dict}) - - elif element.items(): - output.update({_ns(element.tag): dict(element.items())}) - - else: - output.update({_ns(element.tag): element.text}) - return output - - -def _ns2py_time(nstime): - """Parse times.""" - nsdate = nstime[0:10] - nstime0 = nstime[11:26] - nstime00 = nsdate + " " + nstime0 - pytime = datetime.strptime(nstime00, "%Y-%m-%d %H:%M:%S.%f") - return pytime +from ...utils import _validate_type, logger, warn def _combine_triggers(data, remapping=None): From 3741e719ad6dd0479bcf8b352ca0dda6a53b9777 Mon Sep 17 00:00:00 2001 From: Scott Huberty Date: Thu, 12 Dec 2024 17:30:05 -0800 Subject: [PATCH 05/13] WIP: Refactor [ci skip] - more functional - annotate acquisition skips --- mne/io/egi/egimff.py | 985 +++++++++++++++++++++++++++---------------- 1 file changed, 618 insertions(+), 367 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 545713e54ea..94044a61958 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -5,6 +5,9 @@ """EGI NetStation Load Function.""" import datetime +import fnmatch +import itertools +import math import os.path as op import re from collections import OrderedDict @@ -12,11 +15,6 @@ import numpy as np -try: - import mffpy -except ImportError: - mffpy = None - from ..._fiff.constants import FIFF from ..._fiff.meas_info import _empty_info, _ensure_meas_date_none_or_dt, create_info from ..._fiff.proj import setup_proj @@ -26,9 +24,12 @@ from ...evoked import EvokedArray from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn from ..base import BaseRaw -from .events import _combine_triggers, _triage_include_exclude +from .events import _combine_triggers, _read_events, _triage_include_exclude from .general import ( + _block_r, _extract, + _get_blocks, + _get_ep_info, _get_gains, _get_signalfname, ) @@ -36,137 +37,417 @@ REFERENCE_NAMES = ("VREF", "Vertex Reference") -def _get_mff_reader(input_fname): - """Instantiate an mffpy Reader (hard dependency for MFF reading).""" - mffpy = _import_mffpy() - return mffpy.Reader(input_fname) +# TODO: Running list +# - [ ] Add support for reading in the PNS data +# - [ ] Add tutorial for reading calibration data +# - [ ] Add support for reading in the channel status (bad channels) +# - [ ] Replace _read_header with mffpy functions? -def _get_mff_startdatetime(input_fname, mff_reader): - """Get robust start datetime for MFF files, handling 9-digit fractional secs.""" +def _read_mff(input_fname): + """Read EGI MFF file.""" + mff_reader = _get_mff_reader(input_fname) + eeg = _get_eeg_data(mff_reader) + info = _get_info(mff_reader) + annotations = _get_annotations(mff_reader, info) + return eeg, info, annotations + + +def _get_mff_startdatetime(mff_reader): + """Get start datetime from mff_reader, with workaround for nanosecond precision bug.""" try: return mff_reader.startdatetime - except Exception: - info_filepath = op.join(str(input_fname), "info.xml") - record_time = _extract(["recordTime"], filepath=info_filepath)["recordTime"][0] - if len(record_time) > 32: - dt, tz = [record_time[:26], record_time[-6:]] - record_time = dt + tz - return datetime.datetime.strptime(record_time, "%Y-%m-%dT%H:%M:%S.%f%z") - - -def _parse_egi_datetime(time_str): - """Parse EGI time strings allowing 6 or 9 fractional second digits.""" - if time_str is None: - return None - txt = time_str.strip() - g = re.match( - r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.)(\d+)([+-]\d{2}:?\d{2})$", - txt, + except (ValueError, AttributeError): + # mffpy has a bug parsing timestamps with 9 decimal places (nanoseconds) + # Workaround: manually parse the timestamp from the info.xml file + import xml.etree.ElementTree as ET + info_file = op.join(mff_reader.directory._mffname, "info.xml") + tree = ET.parse(info_file) + root = tree.getroot() + # Handle different XML namespaces by searching for any recordTime element + time_elem = root.find(".//recordTime") or root.find(".//{*}recordTime") + if time_elem is None: + raise + time_str = time_elem.text + # Handle timestamps with up to 9 decimal places by truncating to 6 + # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" + # Both formats: +0100 (without colon) and +01:00 (with colon) + if '+' in time_str or '-' in time_str[-6:]: + # Truncate nanoseconds in decimal part (keep only 6 digits) + time_str = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', time_str) + # Python's %z can't always handle colons, so remove them + time_str = re.sub(r'([+-]\d{2}):(\d{2})$', r'\1\2', time_str) + return datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f%z') + + +def _get_mff_reader(input_fname): + mffpy = _import_mffpy() + mff_reader = mffpy.Reader(input_fname) + mff_reader.set_unit("EEG", "V") # XXX: set PNS unit + return mff_reader + + +def _get_montage(mff_reader): + mffpy = _import_mffpy() + xml_files = mff_reader.directory.files_by_type[".xml"] + + # Read coordinates.xml for fiducial positions + coords_fname = fnmatch.filter(xml_files, "coordinates") + coords_sensors = dict() + if len(coords_fname) == 1: + with mff_reader.directory.filepointer(coords_fname[0]) as fp: + coords_content = mffpy.XML.from_file(fp).get_content() + coords_sensors = coords_content.get("sensors", dict()) + + n_eeg_channels = mff_reader.num_channels["EEG"] # XXX: PNS? + ch_pos = dict() + hsp_list = [] # Extra headshape points + lpa, rpa, nasion = None, None, None + + # Extract channel positions and fiducials from coordinates.xml + for ch in coords_sensors.values(): + # XXX: the y coordinate seems to be inverted? Need to investigate + # Convert from cm to m + loc = np.array([ch["x"], -(ch["y"]), ch["z"]]) / 100.0 + name = ch.get("name", "None") + + # Check if this is a fiducial point + if name == "Nasion": + nasion = loc + elif name == "Left periauricular point": lpa = loc + elif name == "Right periauricular point": + rpa = loc + elif name in REFERENCE_NAMES or "VREF" in name or "Vertex" in name: + # Reference electrode can be numbered outside EEG range (e.g., 1001) + ch_pos[name] = loc + elif ch["number"] <= n_eeg_channels: + # EEG channel + ch_name = name if name != "None" else f"E{ch['number']}" + ch_pos[ch_name] = loc + + # Convert hsp list to array if not empty + hsp = np.array(hsp_list) if hsp_list else None + + montage = make_dig_montage( + ch_pos=ch_pos, nasion=nasion, lpa=lpa, rpa=rpa, hsp=hsp, coord_frame="unknown" ) - if g is None: - return datetime.datetime.strptime(txt, "%Y-%m-%dT%H:%M:%S.%f%z") - prefix, frac, tz = g.groups() - frac = (frac[:6]).ljust(6, "0") - return datetime.datetime.strptime(prefix + frac + tz, "%Y-%m-%dT%H:%M:%S.%f%z") + return montage + + +def _get_info(mff_reader): + montage = _get_montage(mff_reader) + ch_names = montage.ch_names + ch_types = ["eeg"] * len(ch_names) # XXX: refactor this when adding PNS support + meas_date_orig = _get_mff_startdatetime(mff_reader) + utc_offset = meas_date_orig.strftime("%z") + meas_date = meas_date_orig.astimezone(datetime.timezone.utc) + sfreq = mff_reader.sampling_rates["EEG"] # XXX: check PNS sfreq? + info = create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types) + info.set_montage(montage) + info.set_meas_date(meas_date) + with info._unlock(): + info["utc_offset"] = utc_offset + + # Populate reference location (loc[3:6]) for each EEG channel + # The reference is VREF (Vertex Reference), which is the last dig point + if len(info["dig"]) > 0: + ref_loc = info["dig"][-1]["r"] # VREF position + for ch in info["chs"]: + if ch["kind"] == FIFF.FIFFV_EEG_CH: + ch["loc"][3:6] = ref_loc + + return info -def _get_info_from_mff_reader(input_fname, mff_reader): - """Build EGI info dict from mffpy.Reader metadata.""" - input_fname = str(input_fname) - sfreq_dict = mff_reader.sampling_rates - sfreq = float(sfreq_dict.get("EEG", next(iter(sfreq_dict.values())))) - meas_dt_local = _get_mff_startdatetime(input_fname, mff_reader) +def _get_eeg_data(mff_reader): + sfreq = mff_reader.sampling_rates["EEG"] # XXX: check PNS sfreq + n_channels = mff_reader.num_channels["EEG"] # Only EEG channels, not all signal types + epochs = mff_reader.epochs + + data_blocks, start_secs, end_secs = [], [], [] + for epoch in epochs: + data_chunk, _ = mff_reader.get_physical_samples_from_epoch(epoch)["EEG"] # XXX + data_blocks.append(data_chunk) + start_secs.append(epoch.t0) + end_secs.append(epoch.t1) + + first_samp = int(start_secs[0] * sfreq) + # Calculate total samples needed based on actual chunk placements + max_end_samp = first_samp + for this_chunk, start in zip(data_blocks, start_secs): + start_samp = int(start * sfreq) + end_samp = start_samp + this_chunk.shape[1] + max_end_samp = max(max_end_samp, end_samp) + n_samps = max_end_samp - first_samp + + eeg = np.zeros((n_channels, n_samps), dtype=np.float64) + for this_chunk, start in zip(data_blocks, start_secs): + start_idx = int(start * sfreq) - first_samp + end_idx = start_idx + this_chunk.shape[1] + eeg[:, start_idx:end_idx] = this_chunk + return eeg + + +def _get_gap_annotations(mff_reader): + epochs = mff_reader.epochs + start_secs = [epoch.t0 for epoch in epochs] + end_secs = [epoch.t1 for epoch in epochs] + gap_durations = np.array(start_secs[1:]) - np.array(end_secs[:-1]) + descriptions = ["BAD_ACQ_SKIP"] * len(gap_durations) + gap_onsets = np.array(end_secs[:-1]) + # TODO: Re-enable warning once lazy loading is properly implemented + # The warning should be raised during data access, not during __init__ + # if len(gap_durations) > 0: + # warn( + # "Acquisition skips detected. EGI MFF file contains gaps between " + # "recording epochs.", + # RuntimeWarning, + # ) + gap_annots = Annotations(gap_onsets, gap_durations, descriptions) + return gap_annots + + +def _get_event_annotations(mff_reader, mne_info): + mffpy = _import_mffpy() + xml_files = mff_reader.directory.files_by_type[".xml"] + events_xmls = fnmatch.filter(xml_files, "Events*") + if not events_xmls: + raise RuntimeError("No events found in MFF file.") + mff_events = {} + for event_file in events_xmls: + with mff_reader.directory.filepointer(event_file) as fp: + categories = mffpy.XML.from_file(fp) + mff_events[event_file] = categories.get_content()["event"] + onsets = [] + descriptions = [] + mff_events = list(itertools.chain.from_iterable(mff_events.values())) + for event in mff_events: + onset_dt = event["beginTime"].astimezone(datetime.timezone.utc) + ts = (onset_dt - mne_info["meas_date"]).total_seconds() + onsets.append(ts) + # XXX: we could use event["duration"] but it always seems to be 1000ms? + descriptions.append(event["code"]) + durations = [0] * len(onsets) + event_annots = Annotations(onsets, durations, descriptions) + return event_annots + + +def _get_annotations(mff_reader, mne_info): + event_annots = _get_event_annotations(mff_reader, mne_info) + gap_annots = _get_gap_annotations(mff_reader) + return event_annots + gap_annots + + +def _read_mff_header(filepath): + """Read mff header.""" + _soft_import("defusedxml", "reading EGI MFF data") + from defusedxml.minidom import parse - all_files = _get_signalfname(input_fname) + all_files = _get_signalfname(filepath) eeg_file = all_files["EEG"]["signal"] eeg_info_file = all_files["EEG"]["info"] - # Parse channel metadata from sensorLayout.xml - _soft_import("defusedxml", "reading EGI MFF data") - from defusedxml.minidom import parse + info_filepath = op.join(filepath, "info.xml") # add with filepath + tags = ["mffVersion", "recordTime"] + version_and_date = _extract(tags, filepath=info_filepath) + version = "" + if len(version_and_date["mffVersion"]): + version = version_and_date["mffVersion"][0] + + fname = op.join(filepath, eeg_file) + signal_blocks = _get_blocks(fname) + epochs = _get_ep_info(filepath) + summaryinfo = dict(eeg_fname=eeg_file, info_fname=eeg_info_file) + summaryinfo.update(signal_blocks) + # sanity check and update relevant values + record_time = version_and_date["recordTime"][0] + # e.g., + # 2018-07-30T10:47:01.021673-04:00 + # 2017-09-20T09:55:44.072000000+01:00 + g = re.match( + r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.(\d{6}(?:\d{3})?)[+-]\d{2}:\d{2}", # noqa: E501 + record_time, + ) + if g is None: + raise RuntimeError(f"Could not parse recordTime {repr(record_time)}") + frac = g.groups()[0] + assert len(frac) in (6, 9) and all(f.isnumeric() for f in frac) # regex + div = 1000 if len(frac) == 6 else 1000000 + for key in ("last_samps", "first_samps"): + # convert from times in µS to samples + for ei, e in enumerate(epochs[key]): + if e % div != 0: + raise RuntimeError(f"Could not parse epoch time {e}") + epochs[key][ei] = e // div + epochs[key] = np.array(epochs[key], np.uint64) + # I guess they refer to times in milliseconds? + # What we really need to do here is: + # epochs[key] *= signal_blocks['sfreq'] + # epochs[key] //= 1000 + # But that multiplication risks an overflow, so let's only multiply + # by what we need to (e.g., a sample rate of 500 means we can multiply + # by 1 and divide by 2 rather than multiplying by 500 and dividing by + # 1000) + numerator = int(signal_blocks["sfreq"]) + denominator = 1000 + this_gcd = math.gcd(numerator, denominator) + numerator = numerator // this_gcd + denominator = denominator // this_gcd + with np.errstate(over="raise"): + epochs[key] *= numerator + epochs[key] //= denominator + # Should be safe to cast to int now, which makes things later not + # upbroadcast to float + epochs[key] = epochs[key].astype(np.int64) + n_samps_block = signal_blocks["samples_block"].sum() + n_samps_epochs = (epochs["last_samps"] - epochs["first_samps"]).sum() + bad = ( + n_samps_epochs != n_samps_block + or not (epochs["first_samps"] < epochs["last_samps"]).all() + or not (epochs["first_samps"][1:] >= epochs["last_samps"][:-1]).all() + ) + if bad: + raise RuntimeError( + "EGI epoch first/last samps could not be parsed:\n" + f'{list(epochs["first_samps"])}\n{list(epochs["last_samps"])}' + ) + summaryinfo.update(epochs) + # index which samples in raw are actually readable from disk (i.e., not + # in a skip) + disk_samps = np.full(epochs["last_samps"][-1], -1) + offset = 0 + for first, last in zip(epochs["first_samps"], epochs["last_samps"]): + n_this = last - first + disk_samps[first:last] = np.arange(offset, offset + n_this) + offset += n_this + summaryinfo["disk_samps"] = disk_samps - sensor_layout_file = op.join(input_fname, "sensorLayout.xml") + # Add the sensor info. + sensor_layout_file = op.join(filepath, "sensorLayout.xml") sensor_layout_obj = parse(sensor_layout_file) - device = sensor_layout_obj.getElementsByTagName("name")[0].firstChild.data + summaryinfo["device"] = sensor_layout_obj.getElementsByTagName("name")[ + 0 + ].firstChild.data sensors = sensor_layout_obj.getElementsByTagName("sensor") - - chan_type = [] - chan_unit = [] - numbers = [] + chan_type = list() + chan_unit = list() n_chans = 0 + numbers = list() # used for identification for sensor in sensors: sensortype = int(sensor.getElementsByTagName("type")[0].firstChild.data) if sensortype in [0, 1]: - sn = sensor.getElementsByTagName("number")[0].firstChild.data.encode() + sn = sensor.getElementsByTagName("number")[0].firstChild.data + sn = sn.encode() numbers.append(sn) chan_type.append("eeg") chan_unit.append("uV") - n_chans += 1 - - # Collect epoch bounds and per-epoch sample counts from mffpy - first_samps = [] - last_samps = [] - samples_block = [] - pns_samples_block = [] - for ei in range(len(mff_reader.epochs)): - epoch = mff_reader.epochs[ei] - data_epoch = mff_reader.get_physical_samples_from_epoch(epoch) - eeg_samples = int(data_epoch["EEG"][0].shape[1]) - first = int(np.round(epoch.t0 * sfreq)) - last = first + eeg_samples - first_samps.append(first) - last_samps.append(last) - samples_block.append(eeg_samples) - - pns_arr = data_epoch.get("PNSData") - pns_samples_block.append(0 if pns_arr is None else int(pns_arr[0].shape[1])) - - first_samps = np.array(first_samps, dtype=np.int64) - last_samps = np.array(last_samps, dtype=np.int64) - samples_block = np.array(samples_block, dtype=np.int64) - pns_samples_block = np.array(pns_samples_block, dtype=np.int64) - - # index which samples in raw are actually readable from disk (i.e., not in a skip) - disk_samps = np.full(last_samps[-1], -1, dtype=np.int64) - offset = 0 - for first, last in zip(first_samps, last_samps): - n_this = last - first - disk_samps[first:last] = np.arange(offset, offset + n_this) - offset += n_this + n_chans = n_chans + 1 + if n_chans != summaryinfo["n_channels"]: + raise RuntimeError( + "Number of defined channels (%d) did not match the " + "expected channels (%d)" % (n_chans, summaryinfo["n_channels"]) + ) - # Parse PNS channel metadata if present + # Check presence of PNS data pns_names = [] - pns_types = [] - pns_units = [] - pns_fname = None if "PNS" in all_files: - pns_fname = all_files["PNS"]["signal"] - pns_file = op.join(input_fname, "pnsSet.xml") - if op.exists(pns_file): - pns_obj = parse(pns_file) - pns_sensors = pns_obj.getElementsByTagName("sensor") - for sensor in pns_sensors: - name = sensor.getElementsByTagName("name")[0].firstChild.data - unit_elem = sensor.getElementsByTagName("unit")[0].firstChild - unit = "" if unit_elem is None else unit_elem.data - if name == "ECG": - ch_type = "ecg" - elif "EMG" in name: - ch_type = "emg" - else: - ch_type = "bio" - pns_names.append(name) - pns_types.append(ch_type) - pns_units.append(unit) + pns_fpath = op.join(filepath, all_files["PNS"]["signal"]) + pns_blocks = _get_blocks(pns_fpath) + pns_samples = pns_blocks["samples_block"] + signal_samples = signal_blocks["samples_block"] + same_blocks = np.array_equal( + pns_samples[:-1], signal_samples[:-1] + ) and pns_samples[-1] in (signal_samples[-1] - np.arange(2)) + if not same_blocks: + raise RuntimeError( + "PNS and signals samples did not match:\n" + f"{list(pns_samples)}\nvs\n{list(signal_samples)}" + ) + + pns_file = op.join(filepath, "pnsSet.xml") + pns_obj = parse(pns_file) + sensors = pns_obj.getElementsByTagName("sensor") + pns_types = [] + pns_units = [] + for sensor in sensors: + # sensor number: + # sensor.getElementsByTagName('number')[0].firstChild.data + name = sensor.getElementsByTagName("name")[0].firstChild.data + unit_elem = sensor.getElementsByTagName("unit")[0].firstChild + unit = "" + if unit_elem is not None: + unit = unit_elem.data + + if name == "ECG": + ch_type = "ecg" + elif "EMG" in name: + ch_type = "emg" + else: + ch_type = "bio" + pns_types.append(ch_type) + pns_units.append(unit) + pns_names.append(name) + + summaryinfo.update( + pns_types=pns_types, + pns_units=pns_units, + pns_fname=all_files["PNS"]["signal"], + pns_sample_blocks=pns_blocks, + ) + summaryinfo.update( + pns_names=pns_names, + version=version, + date=version_and_date["recordTime"][0], + chan_type=chan_type, + chan_unit=chan_unit, + numbers=numbers, + ) + + return summaryinfo + + +def _read_header(input_fname): + """Obtain the headers from the file package mff. + + Parameters + ---------- + input_fname : path-like + Path for the file + + Returns + ------- + info : dict + Main headers set. + """ + input_fname = str(input_fname) # cast to str any Paths + mff_hdr = _read_mff_header(input_fname) + with open(input_fname + "/signal1.bin", "rb") as fid: + version = np.fromfile(fid, np.int32, 1)[0] + """ + the datetime.strptime .f directive (milleseconds) + will only accept up to 6 digits. if there are more than + six millesecond digits in the provided timestamp string + (i.e. because of trailing zeros, as in test_egi_pns.mff) + then slice both the first 26 elements and the last 6 + elements of the timestamp string to truncate the + milleseconds to 6 digits and extract the timezone, + and then piece these together and assign back to mff_hdr['date'] + """ + if len(mff_hdr["date"]) > 32: + dt, tz = [mff_hdr["date"][:26], mff_hdr["date"][-6:]] + mff_hdr["date"] = dt + tz + + time_n = datetime.datetime.strptime(mff_hdr["date"], "%Y-%m-%dT%H:%M:%S.%f%z") info = dict( - version=0, - meas_dt_local=meas_dt_local, - utc_offset=meas_dt_local.strftime("%z"), + version=version, + meas_dt_local=time_n, + utc_offset=time_n.strftime("%z"), gain=0, bits=0, value_range=0, + ) + info.update( n_categories=0, n_segments=1, n_events=0, @@ -174,160 +455,9 @@ def _get_info_from_mff_reader(input_fname, mff_reader): category_names=[], category_lengths=[], pre_baseline=0, - sfreq=sfreq, - n_channels=n_chans, - eeg_fname=eeg_file, - info_fname=eeg_info_file, - device=device, - chan_type=chan_type, - chan_unit=chan_unit, - numbers=numbers, - first_samps=first_samps, - last_samps=last_samps, - samples_block=samples_block, - disk_samps=disk_samps, - pns_names=pns_names, - pns_types=pns_types, - pns_units=pns_units, - pns_fname=pns_fname, - pns_sample_blocks={ - "n_channels": len(pns_names), - "samples_block": pns_samples_block, - }, - mff_path=input_fname, ) + info.update(mff_hdr) return info -def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): - """Read event tracks using mffpy XML parsing and return dense event matrix.""" - from mffpy.xml_files import XML, EventTrack - - mff_events = OrderedDict() - basenames = mff_reader.directory.listdir() - for basename in basenames: - lower_name = basename.lower() - if not lower_name.endswith(".xml") or basename.startswith("._"): - continue - stem = Path(basename).stem - try: - with mff_reader.directory.filepointer(stem) as fp: - xml_obj = XML.from_file(fp, recover=False) - except Exception as err: - if "XMLSyntaxError" in type(err).__name__: - warn(f"Could not parse the XML file {basename}. Skipping it.") - continue - if not isinstance(xml_obj, EventTrack): - continue - try: - events_iter = xml_obj.events - for event in events_iter: - code = event.get("code") or event.get("label") or xml_obj.name - begin_time = event.get("beginTime") - if code is None or begin_time is None: - continue - sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) - if 0 <= sample < n_samples: - mff_events.setdefault(code, []).append(sample) - except Exception: - _soft_import("defusedxml", "reading EGI MFF event tracks") - from defusedxml import ElementTree as ET - - xml_path = op.join(str(input_fname), basename) - try: - root = ET.parse(xml_path).getroot() - except Exception as err: - if ( - "ParseError" in type(err).__name__ - or "XMLSyntaxError" in type(err).__name__ - ): - warn(f"Could not parse the XML file {basename}. Skipping it.") - continue - for event_el in root.iter(): - if event_el.tag.split("}")[-1] != "event": - continue - event_fields = {} - for child in event_el: - event_fields[child.tag.split("}")[-1]] = child.text - code = ( - event_fields.get("code") - or event_fields.get("label") - or xml_obj.name - ) - begin_time = _parse_egi_datetime(event_fields.get("beginTime")) - if code is None or begin_time is None: - continue - sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) - if 0 <= sample < n_samples: - mff_events.setdefault(code, []).append(sample) - - event_codes = list(mff_events.keys()) - egi_events = np.zeros((len(event_codes), n_samples)) - for event_idx, code in enumerate(event_codes): - if len(mff_events[code]): - egi_events[event_idx, np.array(mff_events[code], dtype=int)] = 1 - return egi_events, event_codes, mff_events - - -def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): - """Read event tracks using mffpy XML parsing and return dense event matrix.""" - from mffpy.xml_files import EventTrack, XML - - mff_events = OrderedDict() - basenames = mff_reader.directory.listdir() - for basename in basenames: - lower_name = basename.lower() - if not lower_name.endswith(".xml") or basename.startswith("._"): - continue - stem = Path(basename).stem - try: - with mff_reader.directory.filepointer(stem) as fp: - xml_obj = XML.from_file(fp, recover=False) - except Exception as err: - if "XMLSyntaxError" in type(err).__name__: - warn(f"Could not parse the XML file {basename}. Skipping it.") - continue - if not isinstance(xml_obj, EventTrack): - continue - try: - events_iter = xml_obj.events - for event in events_iter: - code = event.get("code") or event.get("label") or xml_obj.name - begin_time = event.get("beginTime") - if code is None or begin_time is None: - continue - sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) - if 0 <= sample < n_samples: - mff_events.setdefault(code, []).append(sample) - except Exception: - _soft_import("defusedxml", "reading EGI MFF event tracks") - from defusedxml import ElementTree as ET - - xml_path = op.join(str(input_fname), basename) - try: - root = ET.parse(xml_path).getroot() - except Exception as err: - if "ParseError" in type(err).__name__ or "XMLSyntaxError" in type(err).__name__: - warn(f"Could not parse the XML file {basename}. Skipping it.") - continue - for event_el in root.iter(): - if event_el.tag.split("}")[-1] != "event": - continue - event_fields = {} - for child in event_el: - event_fields[child.tag.split("}")[-1]] = child.text - code = event_fields.get("code") or event_fields.get("label") or xml_obj.name - begin_time = _parse_egi_datetime(event_fields.get("beginTime")) - if code is None or begin_time is None: - continue - sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) - if 0 <= sample < n_samples: - mff_events.setdefault(code, []).append(sample) - - event_codes = list(mff_events.keys()) - egi_events = np.zeros((len(event_codes), n_samples)) - for event_idx, code in enumerate(event_codes): - if len(mff_events[code]): - egi_events[event_idx, np.array(mff_events[code], dtype=int)] = 1 - return egi_events, event_codes, mff_events def _get_eeg_calibration_info(filepath, egi_info): @@ -352,7 +482,7 @@ def _read_locs(filepath, egi_info, channel_naming): fname = op.join(filepath, "coordinates.xml") if not op.exists(fname): - warn("File coordinates.xml not found, not setting channel locations") + logger.warn("File coordinates.xml not found, not setting channel locations") ch_names = [channel_naming % (i + 1) for i in range(egi_info["n_channels"])] return ch_names, None dig_ident_map = { @@ -453,14 +583,14 @@ class RawMff(BaseRaw): def __init__( self, input_fname, - eog=None, - misc=None, - include=None, - exclude=None, - preload=False, - channel_naming="E%d", + eog=None, # XXX: allow user to specify EOG channels? + misc=None, # XXX: allow user to specify misc channels? + include=None, # XXX: Now We dont create stim channels. Remove this? + exclude=None, # XXX: Ditto. But maybe we can exclude events from annots. + preload=False, # XXX: Make this work again + channel_naming="E%d", # XXX: Do we need to still support this? *, - events_as_annotations=True, + events_as_annotations=True, # XXX: This is now the only way. Remove? verbose=None, ): """Init the RawMff class.""" @@ -474,45 +604,36 @@ def __init__( ) ) logger.info(f"Reading EGI MFF Header from {input_fname}...") - mff_reader = _get_mff_reader(input_fname) - egi_info = _get_info_from_mff_reader(input_fname, mff_reader) - if eog is None: - eog = [] - if misc is None: - misc = np.where(np.array(egi_info["chan_type"]) != "eeg")[0].tolist() - - logger.info(" Reading events ...") - egi_events, event_codes, mff_events = _read_mff_events( - input_fname, - mff_reader, - egi_info["sfreq"], - egi_info["last_samps"][-1], - egi_info["meas_dt_local"], - ) - egi_info["n_events"] = len(event_codes) - egi_info["event_codes"] = event_codes - cals = _get_eeg_calibration_info(input_fname, egi_info) - logger.info(" Assembling measurement info ...") + eog = [] if eog is None else eog + misc = [] if misc is None else misc + egi_info = _read_header(input_fname) + + # Event data (for stim channels and optional STI 014) + egi_events, egi_info, mff_events = _read_events(input_fname, egi_info) + event_codes = list(egi_info["event_codes"]) include = _triage_include_exclude(include, exclude, egi_events, egi_info) - if egi_info["n_events"] > 0 and not events_as_annotations: - logger.info(' Synthesizing trigger channel "STI 014" ...') - if all(ch.startswith("D") for ch in include): - # support the DIN format DIN1, DIN2, ..., DIN9, DI10, DI11, ... DI99, - # D100, D101, ..., D255 that we get when sending 0-255 triggers on a - # parallel port. - events_ids = list() - for ch in include: - while not ch[0].isnumeric(): - ch = ch[1:] - events_ids.append(int(ch)) + if not events_as_annotations: + included_codes = [e for e in event_codes if e in include] + if len(included_codes): + events_ids = [] + next_id = 1 + for code in included_codes: + match = re.match(r"DIN(\d+)$", code) + if match is not None: + events_ids.append(int(match.group(1))) + else: + while next_id in events_ids: + next_id += 1 + events_ids.append(next_id) + next_id += 1 + events_ids = np.array(events_ids, int) + egi_info["new_trigger"] = _combine_triggers( + egi_events[[c in include for c in event_codes]], remapping=events_ids + ) + self.event_id = dict(zip(included_codes, events_ids)) else: - events_ids = np.arange(len(include)) + 1 - egi_info["new_trigger"] = _combine_triggers( - egi_events[[c in include for c in event_codes]], remapping=events_ids - ) - self.event_id = dict( - zip([e for e in event_codes if e in include], events_ids) - ) + egi_info["new_trigger"] = None + self.event_id = None if egi_info["new_trigger"] is not None: egi_events = np.vstack([egi_events, egi_info["new_trigger"]]) else: @@ -520,28 +641,25 @@ def __init__( egi_info["new_trigger"] = None assert egi_events.shape[1] == egi_info["last_samps"][-1] + # Info and channels meas_dt_utc = egi_info["meas_dt_local"].astimezone(datetime.timezone.utc) info = _empty_info(egi_info["sfreq"]) info["meas_date"] = _ensure_meas_date_none_or_dt(meas_dt_utc) info["utc_offset"] = egi_info["utc_offset"] info["device_info"] = dict(type=egi_info["device"]) - # read in the montage, if it exists ch_names, mon = _read_locs(input_fname, egi_info, channel_naming) - # Second: Stim ch_names.extend(list(egi_info["event_codes"])) n_extra = len(event_codes) + len(misc) + len(eog) + len(egi_info["pns_names"]) if egi_info["new_trigger"] is not None: - ch_names.append("STI 014") # channel for combined events + ch_names.append("STI 014") n_extra += 1 - - # Third: PNS ch_names.extend(egi_info["pns_names"]) + cals = _get_eeg_calibration_info(input_fname, egi_info) cals = np.concatenate([cals, np.ones(n_extra)]) assert len(cals) == len(ch_names), (len(cals), len(ch_names)) - # Actually create channels as EEG, then update stim and PNS ch_coil = FIFF.FIFFV_COIL_EEG ch_kind = FIFF.FIFFV_EEG_CH chs = _create_chs(ch_names, cals, ch_coil, ch_kind, eog, (), (), misc) @@ -568,6 +686,8 @@ def __init__( if mon is not None: info.set_montage(mon, on_missing="ignore") + + if mon is not None: ref_idx = np.flatnonzero(np.isin(mon.ch_names, REFERENCE_NAMES)) if len(ref_idx): ref_idx = ref_idx.item() @@ -579,7 +699,6 @@ def __init__( file_bin = op.join(input_fname, egi_info["eeg_fname"]) egi_info["egi_events"] = egi_events - # Check how many channels to read are from EEG keys = ("eeg", "sti", "pns") idx = dict() idx["eeg"] = np.where([ch["kind"] == FIFF.FIFFV_EEG_CH for ch in chs])[0] @@ -590,28 +709,25 @@ def __init__( for ch in chs ] )[0] - # By construction this should always be true, but check anyway if not np.array_equal( np.concatenate([idx[key] for key in keys]), np.arange(len(chs)) ): - raise ValueError( - "Currently interlacing EEG and PNS channels is not supported" - ) + raise ValueError("Currently interlacing EEG and PNS channels is not supported") + egi_info["kind_bounds"] = [0] for key in keys: egi_info["kind_bounds"].append(len(idx[key])) egi_info["kind_bounds"] = np.cumsum(egi_info["kind_bounds"]) assert egi_info["kind_bounds"][0] == 0 assert egi_info["kind_bounds"][-1] == info["nchan"] + first_samps = [0] last_samps = [egi_info["last_samps"][-1] - 1] annot = dict(onset=list(), duration=list(), description=list()) if len(idx["pns"]): - # PNS Data is present and should be read: egi_info["pns_filepath"] = op.join(input_fname, egi_info["pns_fname"]) - # Check for PNS bug immediately pns_samples = np.sum(egi_info["pns_sample_blocks"]["samples_block"]) eeg_samples = np.sum(egi_info["samples_block"]) if pns_samples == eeg_samples - 1: @@ -621,8 +737,8 @@ def __init__( annot["description"].append("BAD_EGI_PSG") elif pns_samples != eeg_samples: raise RuntimeError( - f"PNS samples ({pns_samples}) did not match EEG samples " - f"({eeg_samples})." + "PNS samples (%d) did not match EEG samples (%d)" + % (pns_samples, eeg_samples) ) super().__init__( @@ -636,25 +752,21 @@ def __init__( verbose=verbose, ) - # Annotate acquisition skips - has_skips = False + egi_info["has_acq_skip"] = np.any( + egi_info["first_samps"][1:] > egi_info["last_samps"][:-1] + ) + egi_info["_acq_skip_warned"] = False + for first, prev_last in zip( egi_info["first_samps"][1:], egi_info["last_samps"][:-1] ): gap = first - prev_last assert gap >= 0 if gap: - has_skips = True annot["onset"].append((prev_last - 0.5) / egi_info["sfreq"]) annot["duration"].append(gap / egi_info["sfreq"]) annot["description"].append("BAD_ACQ_SKIP") - if has_skips and (not events_as_annotations) and len(mff_events): - warn( - "Acquisition skips detected. EGI MFF file contains gaps between " - "recording epochs." - ) - # create events from annotations if events_as_annotations: for code, samples in mff_events.items(): if code not in include: @@ -669,63 +781,180 @@ def __init__( def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): """Read a chunk of data.""" logger.debug(f"Reading MFF {start:6d} ... {stop:6d} ...") + dtype = " 0 + and not egi_info.get("_acq_skip_warned", False) + ): + warn( + "Acquisition skips detected. EGI MFF file contains gaps between " + "recording epochs.", + RuntimeWarning, + ) + egi_info["_acq_skip_warned"] = True one = np.zeros((egi_info["kind_bounds"][-1], stop - start)) # info about the binary file structure n_channels = egi_info["n_channels"] - mff_reader = _get_mff_reader(egi_info["mff_path"]) - first_samps = egi_info["first_samps"] - last_samps = egi_info["last_samps"] + samples_block = egi_info["samples_block"] # Check how many channels to read are from each type bounds = egi_info["kind_bounds"] - if idx is None: - idx = np.arange(bounds[-1]) if isinstance(idx, slice): - idx = np.arange(bounds[-1])[idx] - idx = np.array(idx, dtype=int) + idx = np.arange(idx.start, idx.stop) eeg_out = np.where(idx < bounds[1])[0] - eeg_one = idx[eeg_out] + eeg_one = idx[eeg_out, np.newaxis] eeg_in = idx[eeg_out] stim_out = np.where((idx >= bounds[1]) & (idx < bounds[2]))[0] stim_one = idx[stim_out] stim_in = idx[stim_out] - bounds[1] pns_out = np.where((idx >= bounds[2]) & (idx < bounds[3]))[0] pns_in = idx[pns_out] - bounds[2] - pns_one = idx[pns_out] + pns_one = idx[pns_out, np.newaxis] del eeg_out, stim_out, pns_out # take into account events (already extended to correct size) one[stim_one, :] = egi_info["egi_events"][stim_in, start:stop] - # Read only overlapping epoch segments from mffpy (keeps skips as zeros) - for epoch_idx, (first, last) in enumerate(zip(first_samps, last_samps)): - overlap_start = max(start, first) - overlap_stop = min(stop, last) - if overlap_stop <= overlap_start: - continue - - epoch = mff_reader.epochs[epoch_idx] - epoch_data = mff_reader.get_physical_samples_from_epoch(epoch) - eeg_block = epoch_data["EEG"][0][:n_channels] - src_start = overlap_start - first - src_stop = overlap_stop - first - dst_start = overlap_start - start - dst_stop = overlap_stop - start - - if len(eeg_one): - one[eeg_one, dst_start:dst_stop] = eeg_block[eeg_in, src_start:src_stop] - - if len(pns_one) and "PNSData" in epoch_data: - pns_block = epoch_data["PNSData"][0] - src_stop_pns = min(src_stop, pns_block.shape[1]) - if src_stop_pns > src_start: - dst_stop_pns = dst_start + (src_stop_pns - src_start) - one[pns_one, dst_start:dst_stop_pns] = pns_block[ - pns_in, src_start:src_stop_pns - ] + # Convert start and stop to limits in terms of the data + # actually on disk, plus an indexer (disk_use_idx) that populates + # the potentially larger `data` with it, taking skips into account + disk_samps = egi_info["disk_samps"][start:stop] + disk_use_idx = np.where(disk_samps > -1)[0] + # short circuit in case we don't need any samples + if not len(disk_use_idx): + _mult_cal_one(data, one, idx, cals, mult) + return + + start = disk_samps[disk_use_idx[0]] + stop = disk_samps[disk_use_idx[-1]] + 1 + assert len(disk_use_idx) == stop - start + + # Get starting/stopping block/samples + block_samples_offset = np.cumsum(samples_block) + offset_blocks = np.sum(block_samples_offset <= start) + offset_samples = start - ( + block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0 + ) + + # TODO: Refactor this reading with the PNS reading in a single function + # (DRY) + samples_to_read = stop - start + with open(self.filenames[fi], "rb", buffering=0) as fid: + # Go to starting block + current_block = 0 + current_block_info = None + current_data_sample = 0 + while current_block < offset_blocks: + this_block_info = _block_r(fid) + if this_block_info is not None: + current_block_info = this_block_info + fid.seek(current_block_info["block_size"], 1) + current_block += 1 + + # Start reading samples + while samples_to_read > 0: + logger.debug(f" Reading from block {current_block}") + this_block_info = _block_r(fid) + current_block += 1 + if this_block_info is not None: + current_block_info = this_block_info + + to_read = current_block_info["nsamples"] * current_block_info["nc"] + block_data = np.fromfile(fid, dtype, to_read) + block_data = block_data.reshape(n_channels, -1, order="C") + + # Compute indexes + samples_read = block_data.shape[1] + logger.debug(f" Read {samples_read} samples") + logger.debug(f" Offset {offset_samples} samples") + if offset_samples > 0: + # First block read, skip to the offset: + block_data = block_data[:, offset_samples:] + samples_read = samples_read - offset_samples + offset_samples = 0 + if samples_to_read < samples_read: + # Last block to read, skip the last samples + block_data = block_data[:, :samples_to_read] + samples_read = samples_to_read + logger.debug(f" Keep {samples_read} samples") + + s_start = current_data_sample + s_end = s_start + samples_read + + one[eeg_one, disk_use_idx[s_start:s_end]] = block_data[eeg_in] + samples_to_read = samples_to_read - samples_read + current_data_sample = current_data_sample + samples_read + + if len(pns_one) > 0: + # PNS Data is present and should be read: + pns_filepath = egi_info["pns_filepath"] + pns_info = egi_info["pns_sample_blocks"] + n_channels = pns_info["n_channels"] + samples_block = pns_info["samples_block"] + + # Get starting/stopping block/samples + block_samples_offset = np.cumsum(samples_block) + offset_blocks = np.sum(block_samples_offset < start) + offset_samples = start - ( + block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0 + ) + + samples_to_read = stop - start + with open(pns_filepath, "rb", buffering=0) as fid: + # Check file size + fid.seek(0, 2) + file_size = fid.tell() + fid.seek(0) + # Go to starting block + current_block = 0 + current_block_info = None + current_data_sample = 0 + while current_block < offset_blocks: + this_block_info = _block_r(fid) + if this_block_info is not None: + current_block_info = this_block_info + fid.seek(current_block_info["block_size"], 1) + current_block += 1 + + # Start reading samples + while samples_to_read > 0: + if samples_to_read == 1 and fid.tell() == file_size: + # We are in the presence of the EEG bug + # fill with zeros and break the loop + one[pns_one, -1] = 0 + break + + this_block_info = _block_r(fid) + if this_block_info is not None: + current_block_info = this_block_info + + to_read = current_block_info["nsamples"] * current_block_info["nc"] + block_data = np.fromfile(fid, dtype, to_read) + block_data = block_data.reshape(n_channels, -1, order="C") + + # Compute indexes + samples_read = block_data.shape[1] + if offset_samples > 0: + # First block read, skip to the offset: + block_data = block_data[:, offset_samples:] + samples_read = samples_read - offset_samples + offset_samples = 0 + + if samples_to_read < samples_read: + # Last block to read, skip the last samples + block_data = block_data[:, :samples_to_read] + samples_read = samples_to_read + + s_start = current_data_sample + s_end = s_start + samples_read + + one[pns_one, disk_use_idx[s_start:s_end]] = block_data[pns_in] + samples_to_read = samples_to_read - samples_read + current_data_sample = current_data_sample + samples_read # do the calibration _mult_cal_one(data, one, idx, cals, mult) @@ -825,8 +1054,10 @@ def read_evokeds_mff( def _read_evoked_mff(fname, condition, channel_naming="E%d", verbose=None): """Read evoked data from MFF file.""" + import mffpy + + egi_info = _read_header(fname) mff = mffpy.Reader(fname) - egi_info = _get_info_from_mff_reader(str(fname), mff) categories = mff.categories.categories if isinstance(condition, str): @@ -944,8 +1175,28 @@ def _read_evoked_mff(fname, condition, channel_naming="E%d", verbose=None): def _import_mffpy(why="read averaged .mff files"): """Import and return module mffpy.""" - if mffpy is None: - msg = f"mffpy is required to {why}." + try: + import mffpy + except ImportError as exp: + msg = f"mffpy is required to {why}, got:\n{exp}" raise ImportError(msg) + # Monkey-patch mffpy to handle timestamps with 9 decimal places (nanoseconds) + # This is needed because some MFF files have timestamps like + # "2006-04-28T15:32:00.000000000+0100" which Python's %f can't parse + if not hasattr(mffpy.XML, '_mne_patched'): + original_parse_time_str = mffpy.XML._parse_time_str + + @classmethod + def _patched_parse_time_str(cls, txt): + """Parse time string with support for 9-decimal nanoseconds.""" + # Truncate nanoseconds to 6 decimal places if present + # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" + if txt and '.' in txt: + txt = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', txt) + return original_parse_time_str(txt) + + mffpy.XML._parse_time_str = _patched_parse_time_str + mffpy.XML._mne_patched = True + return mffpy From 770103467082e9dadd565b3ce2ba1997845585a7 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Sun, 24 May 2026 10:11:03 +0530 Subject: [PATCH 06/13] EGI: restore mff event reader helper --- mne/io/egi/egimff.py | 76 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 94044a61958..033d8439553 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -24,7 +24,7 @@ from ...evoked import EvokedArray from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn from ..base import BaseRaw -from .events import _combine_triggers, _read_events, _triage_include_exclude +from .events import _combine_triggers, _triage_include_exclude from .general import ( _block_r, _extract, @@ -80,6 +80,80 @@ def _get_mff_startdatetime(mff_reader): return datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f%z') +def _read_events(input_fname, egi_info): + """Read EGI event tracks from an MFF directory.""" + from mffpy.xml_files import XML, EventTrack + + mff_reader = _get_mff_reader(input_fname) + start_dt = _get_mff_startdatetime(mff_reader) + sfreq = egi_info["sfreq"] + n_samples = egi_info["last_samps"][-1] + + mff_events = OrderedDict() + for basename in mff_reader.directory.listdir(): + lower_name = basename.lower() + if not lower_name.endswith(".xml") or basename.startswith("._"): + continue + stem = Path(basename).stem + try: + with mff_reader.directory.filepointer(stem) as fp: + xml_obj = XML.from_file(fp, recover=False) + except Exception as err: + if "XMLSyntaxError" in type(err).__name__: + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + if not isinstance(xml_obj, EventTrack): + continue + try: + for event in xml_obj.events: + code = event.get("code") or event.get("label") or xml_obj.name + begin_time = event.get("beginTime") + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + except Exception: + _soft_import("defusedxml", "reading EGI MFF event tracks") + from defusedxml import ElementTree as ET + + xml_path = op.join(str(input_fname), basename) + try: + root = ET.parse(xml_path).getroot() + except Exception as err: + if ( + "ParseError" in type(err).__name__ + or "XMLSyntaxError" in type(err).__name__ + ): + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + for event_el in root.iter(): + if event_el.tag.split("}")[-1] != "event": + continue + event_fields = {} + for child in event_el: + event_fields[child.tag.split("}")[-1]] = child.text + code = ( + event_fields.get("code") + or event_fields.get("label") + or xml_obj.name + ) + begin_time = _parse_egi_datetime(event_fields.get("beginTime")) + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + + event_codes = list(mff_events.keys()) + egi_events = np.zeros((len(event_codes), n_samples)) + for event_idx, code in enumerate(event_codes): + if len(mff_events[code]): + egi_events[event_idx, np.array(mff_events[code], dtype=int)] = 1 + egi_info["event_codes"] = event_codes + return egi_events, egi_info, mff_events + + def _get_mff_reader(input_fname): mffpy = _import_mffpy() mff_reader = mffpy.Reader(input_fname) From 63a7dc854c37adbbb66fe41cb96c4934c10464f9 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Sun, 24 May 2026 10:39:49 +0530 Subject: [PATCH 07/13] Remove stale changelog fragment --- doc/changes/dev/#13684.newfeature.rst | 1 - 1 file changed, 1 deletion(-) delete mode 100644 doc/changes/dev/#13684.newfeature.rst diff --git a/doc/changes/dev/#13684.newfeature.rst b/doc/changes/dev/#13684.newfeature.rst deleted file mode 100644 index ccf2f875f49..00000000000 --- a/doc/changes/dev/#13684.newfeature.rst +++ /dev/null @@ -1 +0,0 @@ -The EGI MFF reader has been refactored to use the :func:`mffpy` backend, improving support for multi-stream files and high-precision metadata, by :newcontrib:`Pragnya Khandelwal`. \ No newline at end of file From 4ccd865be29b867e171bffcace4315f4eb9d1884 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Sun, 24 May 2026 10:46:51 +0530 Subject: [PATCH 08/13] Resolve EGI test import conflict --- doc/changes/dev/#13914.newfeature.rst | 1 + mne/io/egi/tests/test_egi.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 doc/changes/dev/#13914.newfeature.rst diff --git a/doc/changes/dev/#13914.newfeature.rst b/doc/changes/dev/#13914.newfeature.rst new file mode 100644 index 00000000000..f0b2db774ce --- /dev/null +++ b/doc/changes/dev/#13914.newfeature.rst @@ -0,0 +1 @@ +The EGI MFF reader has been refactored to use the :func:`mffpy` backend, improving support for multi-stream files and high-precision metadata, by Pragnya Khandelwal. (:gh:`13914`) \ No newline at end of file diff --git a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py index 8d2fc0b3699..0ede82603ce 100644 --- a/mne/io/egi/tests/test_egi.py +++ b/mne/io/egi/tests/test_egi.py @@ -5,7 +5,6 @@ import os import shutil -from importlib.util import find_spec from copy import deepcopy from datetime import datetime, timezone from importlib.util import find_spec From b479a546e41d478bfb650d26c6e678b770f65d1e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 24 May 2026 05:17:36 +0000 Subject: [PATCH 09/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mne/io/egi/egimff.py | 53 +++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 033d8439553..85dcc6ba14e 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -61,6 +61,7 @@ def _get_mff_startdatetime(mff_reader): # mffpy has a bug parsing timestamps with 9 decimal places (nanoseconds) # Workaround: manually parse the timestamp from the info.xml file import xml.etree.ElementTree as ET + info_file = op.join(mff_reader.directory._mffname, "info.xml") tree = ET.parse(info_file) root = tree.getroot() @@ -72,12 +73,12 @@ def _get_mff_startdatetime(mff_reader): # Handle timestamps with up to 9 decimal places by truncating to 6 # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" # Both formats: +0100 (without colon) and +01:00 (with colon) - if '+' in time_str or '-' in time_str[-6:]: + if "+" in time_str or "-" in time_str[-6:]: # Truncate nanoseconds in decimal part (keep only 6 digits) - time_str = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', time_str) + time_str = re.sub(r"\.(\d{6})\d+([+-])", r".\1\2", time_str) # Python's %z can't always handle colons, so remove them - time_str = re.sub(r'([+-]\d{2}):(\d{2})$', r'\1\2', time_str) - return datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f%z') + time_str = re.sub(r"([+-]\d{2}):(\d{2})$", r"\1\2", time_str) + return datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f%z") def _read_events(input_fname, egi_info): @@ -164,7 +165,7 @@ def _get_mff_reader(input_fname): def _get_montage(mff_reader): mffpy = _import_mffpy() xml_files = mff_reader.directory.files_by_type[".xml"] - + # Read coordinates.xml for fiducial positions coords_fname = fnmatch.filter(xml_files, "coordinates") coords_sensors = dict() @@ -172,23 +173,24 @@ def _get_montage(mff_reader): with mff_reader.directory.filepointer(coords_fname[0]) as fp: coords_content = mffpy.XML.from_file(fp).get_content() coords_sensors = coords_content.get("sensors", dict()) - + n_eeg_channels = mff_reader.num_channels["EEG"] # XXX: PNS? ch_pos = dict() hsp_list = [] # Extra headshape points lpa, rpa, nasion = None, None, None - + # Extract channel positions and fiducials from coordinates.xml for ch in coords_sensors.values(): # XXX: the y coordinate seems to be inverted? Need to investigate # Convert from cm to m loc = np.array([ch["x"], -(ch["y"]), ch["z"]]) / 100.0 name = ch.get("name", "None") - + # Check if this is a fiducial point if name == "Nasion": nasion = loc - elif name == "Left periauricular point": lpa = loc + elif name == "Left periauricular point": + lpa = loc elif name == "Right periauricular point": rpa = loc elif name in REFERENCE_NAMES or "VREF" in name or "Vertex" in name: @@ -198,10 +200,10 @@ def _get_montage(mff_reader): # EEG channel ch_name = name if name != "None" else f"E{ch['number']}" ch_pos[ch_name] = loc - + # Convert hsp list to array if not empty hsp = np.array(hsp_list) if hsp_list else None - + montage = make_dig_montage( ch_pos=ch_pos, nasion=nasion, lpa=lpa, rpa=rpa, hsp=hsp, coord_frame="unknown" ) @@ -221,7 +223,7 @@ def _get_info(mff_reader): info.set_meas_date(meas_date) with info._unlock(): info["utc_offset"] = utc_offset - + # Populate reference location (loc[3:6]) for each EEG channel # The reference is VREF (Vertex Reference), which is the last dig point if len(info["dig"]) > 0: @@ -229,13 +231,15 @@ def _get_info(mff_reader): for ch in info["chs"]: if ch["kind"] == FIFF.FIFFV_EEG_CH: ch["loc"][3:6] = ref_loc - + return info def _get_eeg_data(mff_reader): sfreq = mff_reader.sampling_rates["EEG"] # XXX: check PNS sfreq - n_channels = mff_reader.num_channels["EEG"] # Only EEG channels, not all signal types + n_channels = mff_reader.num_channels[ + "EEG" + ] # Only EEG channels, not all signal types epochs = mff_reader.epochs data_blocks, start_secs, end_secs = [], [], [] @@ -253,7 +257,7 @@ def _get_eeg_data(mff_reader): end_samp = start_samp + this_chunk.shape[1] max_end_samp = max(max_end_samp, end_samp) n_samps = max_end_samp - first_samp - + eeg = np.zeros((n_channels, n_samps), dtype=np.float64) for this_chunk, start in zip(data_blocks, start_secs): start_idx = int(start * sfreq) - first_samp @@ -383,7 +387,7 @@ def _read_mff_header(filepath): if bad: raise RuntimeError( "EGI epoch first/last samps could not be parsed:\n" - f'{list(epochs["first_samps"])}\n{list(epochs["last_samps"])}' + f"{list(epochs['first_samps'])}\n{list(epochs['last_samps'])}" ) summaryinfo.update(epochs) # index which samples in raw are actually readable from disk (i.e., not @@ -702,7 +706,8 @@ def __init__( next_id += 1 events_ids = np.array(events_ids, int) egi_info["new_trigger"] = _combine_triggers( - egi_events[[c in include for c in event_codes]], remapping=events_ids + egi_events[[c in include for c in event_codes]], + remapping=events_ids, ) self.event_id = dict(zip(included_codes, events_ids)) else: @@ -786,7 +791,9 @@ def __init__( if not np.array_equal( np.concatenate([idx[key] for key in keys]), np.arange(len(chs)) ): - raise ValueError("Currently interlacing EEG and PNS channels is not supported") + raise ValueError( + "Currently interlacing EEG and PNS channels is not supported" + ) egi_info["kind_bounds"] = [0] for key in keys: @@ -1258,18 +1265,18 @@ def _import_mffpy(why="read averaged .mff files"): # Monkey-patch mffpy to handle timestamps with 9 decimal places (nanoseconds) # This is needed because some MFF files have timestamps like # "2006-04-28T15:32:00.000000000+0100" which Python's %f can't parse - if not hasattr(mffpy.XML, '_mne_patched'): + if not hasattr(mffpy.XML, "_mne_patched"): original_parse_time_str = mffpy.XML._parse_time_str - + @classmethod def _patched_parse_time_str(cls, txt): """Parse time string with support for 9-decimal nanoseconds.""" # Truncate nanoseconds to 6 decimal places if present # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" - if txt and '.' in txt: - txt = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', txt) + if txt and "." in txt: + txt = re.sub(r"\.(\d{6})\d+([+-])", r".\1\2", txt) return original_parse_time_str(txt) - + mffpy.XML._parse_time_str = _patched_parse_time_str mffpy.XML._mne_patched = True From 415ce4f06cc0f76633557571e8ba59f4bfe69f27 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Sun, 24 May 2026 10:49:44 +0530 Subject: [PATCH 10/13] Fix changelog fragment naming --- doc/changes/dev/{#13914.newfeature.rst => 13914.newfeature.rst} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename doc/changes/dev/{#13914.newfeature.rst => 13914.newfeature.rst} (71%) diff --git a/doc/changes/dev/#13914.newfeature.rst b/doc/changes/dev/13914.newfeature.rst similarity index 71% rename from doc/changes/dev/#13914.newfeature.rst rename to doc/changes/dev/13914.newfeature.rst index f0b2db774ce..107be2c1326 100644 --- a/doc/changes/dev/#13914.newfeature.rst +++ b/doc/changes/dev/13914.newfeature.rst @@ -1 +1 @@ -The EGI MFF reader has been refactored to use the :func:`mffpy` backend, improving support for multi-stream files and high-precision metadata, by Pragnya Khandelwal. (:gh:`13914`) \ No newline at end of file +The EGI MFF reader has been refactored to use the :func:`mffpy` backend, improving support for multi-stream files and high-precision metadata, by Pragnya Khandelwal. \ No newline at end of file From e49c2c26f3f30236e85bee058bf7d1e8261b1e1e Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Sun, 24 May 2026 11:09:00 +0530 Subject: [PATCH 11/13] Fix EGI datetime helper --- mne/io/egi/egimff.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 85dcc6ba14e..813290855e9 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -54,7 +54,7 @@ def _read_mff(input_fname): def _get_mff_startdatetime(mff_reader): - """Get start datetime from mff_reader, with workaround for nanosecond precision bug.""" + """Get start datetime from mff_reader with nanosecond workaround.""" try: return mff_reader.startdatetime except (ValueError, AttributeError): @@ -71,7 +71,8 @@ def _get_mff_startdatetime(mff_reader): raise time_str = time_elem.text # Handle timestamps with up to 9 decimal places by truncating to 6 - # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" + # e.g. "2017-09-20T09:55:44.072000000+01:00" -> + # "2017-09-20T09:55:44.072000+01:00" # Both formats: +0100 (without colon) and +01:00 (with colon) if "+" in time_str or "-" in time_str[-6:]: # Truncate nanoseconds in decimal part (keep only 6 digits) @@ -81,6 +82,16 @@ def _get_mff_startdatetime(mff_reader): return datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f%z") +def _parse_egi_datetime(time_str): + """Parse an EGI datetime string with the same nanosecond workaround.""" + if time_str is None: + return None + if "+" in time_str or "-" in time_str[-6:]: + time_str = re.sub(r"\.(\d{6})\d+([+-])", r".\1\2", time_str) + time_str = re.sub(r"([+-]\d{2}):(\d{2})$", r"\1\2", time_str) + return datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f%z") + + def _read_events(input_fname, egi_info): """Read EGI event tracks from an MFF directory.""" from mffpy.xml_files import XML, EventTrack @@ -422,8 +433,8 @@ def _read_mff_header(filepath): n_chans = n_chans + 1 if n_chans != summaryinfo["n_channels"]: raise RuntimeError( - "Number of defined channels (%d) did not match the " - "expected channels (%d)" % (n_chans, summaryinfo["n_channels"]) + f"Number of defined channels ({n_chans}) did not match the expected " + f"channels ({summaryinfo['n_channels']})" ) # Check presence of PNS data @@ -818,8 +829,8 @@ def __init__( annot["description"].append("BAD_EGI_PSG") elif pns_samples != eeg_samples: raise RuntimeError( - "PNS samples (%d) did not match EEG samples (%d)" - % (pns_samples, eeg_samples) + f"PNS samples ({pns_samples}) did not match EEG samples " + f"({eeg_samples})" ) super().__init__( @@ -1272,7 +1283,8 @@ def _import_mffpy(why="read averaged .mff files"): def _patched_parse_time_str(cls, txt): """Parse time string with support for 9-decimal nanoseconds.""" # Truncate nanoseconds to 6 decimal places if present - # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" + # e.g. "2017-09-20T09:55:44.072000000+01:00" -> + # "2017-09-20T09:55:44.072000+01:00" if txt and "." in txt: txt = re.sub(r"\.(\d{6})\d+([+-])", r".\1\2", txt) return original_parse_time_str(txt) From 43aab8b974f224caf926abc4a23bcbcf33c61238 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Sun, 24 May 2026 11:14:55 +0530 Subject: [PATCH 12/13] Remove unused EGI helper --- mne/io/egi/egimff.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 813290855e9..91c342f2480 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -44,15 +44,6 @@ # - [ ] Replace _read_header with mffpy functions? -def _read_mff(input_fname): - """Read EGI MFF file.""" - mff_reader = _get_mff_reader(input_fname) - eeg = _get_eeg_data(mff_reader) - info = _get_info(mff_reader) - annotations = _get_annotations(mff_reader, info) - return eeg, info, annotations - - def _get_mff_startdatetime(mff_reader): """Get start datetime from mff_reader with nanosecond workaround.""" try: From 6581fd8ef5f7a4fc0c7ba5285a79b7f24c6b444a Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Sun, 24 May 2026 12:39:08 +0530 Subject: [PATCH 13/13] EGI: restore _read_mff helper (mffpy-backed read path) --- mne/io/egi/egimff.py | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 91c342f2480..147e2ebf464 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -83,6 +83,13 @@ def _parse_egi_datetime(time_str): return datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f%z") +def _get_mff_reader(input_fname): + mffpy = _import_mffpy() + mff_reader = mffpy.Reader(input_fname) + mff_reader.set_unit("EEG", "V") # XXX: set PNS unit + return mff_reader + + def _read_events(input_fname, egi_info): """Read EGI event tracks from an MFF directory.""" from mffpy.xml_files import XML, EventTrack @@ -157,13 +164,6 @@ def _read_events(input_fname, egi_info): return egi_events, egi_info, mff_events -def _get_mff_reader(input_fname): - mffpy = _import_mffpy() - mff_reader = mffpy.Reader(input_fname) - mff_reader.set_unit("EEG", "V") # XXX: set PNS unit - return mff_reader - - def _get_montage(mff_reader): mffpy = _import_mffpy() xml_files = mff_reader.directory.files_by_type[".xml"] @@ -318,6 +318,25 @@ def _get_annotations(mff_reader, mne_info): return event_annots + gap_annots +def _read_mff(input_fname): + """Read EGI MFF file using the mffpy-backed helpers. + + Returns + ------- + eeg : array-like + Raw EEG data as returned by `_get_eeg_data`. + info : dict + MNE `info` structure built by `_get_info`. + annotations : instance of `mne.Annotations` + Annotations built from events and gaps via `_get_annotations`. + """ + mff_reader = _get_mff_reader(input_fname) + eeg = _get_eeg_data(mff_reader) + info = _get_info(mff_reader) + annotations = _get_annotations(mff_reader, info) + return eeg, info, annotations + + def _read_mff_header(filepath): """Read mff header.""" _soft_import("defusedxml", "reading EGI MFF data")