From d5c1df8c8a36305033d33061045e5e677a9833d4 Mon Sep 17 00:00:00 2001 From: Scott Huberty Date: Thu, 21 Nov 2024 10:05:47 -0800 Subject: [PATCH 01/14] t status --short --branch; git rebase --show-current-patch | Out-String WIP: Use Mffpy for read_raw_egi [ci skip] --- mne/io/egi/egimff.py | 181 +++++++++++++++++++++---------------------- 1 file changed, 89 insertions(+), 92 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 870f58890a2..f3bb8933598 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -5,6 +5,8 @@ """EGI NetStation Load Function.""" import datetime +import fnmatch +import itertools import math import os.path as op import re @@ -14,7 +16,7 @@ import numpy as np from ..._fiff.constants import FIFF -from ..._fiff.meas_info import _empty_info, _ensure_meas_date_none_or_dt, create_info +from ..._fiff.meas_info import create_info from ..._fiff.proj import setup_proj from ..._fiff.utils import _create_chs, _mult_cal_one from ...annotations import Annotations @@ -22,7 +24,6 @@ from ...evoked import EvokedArray from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn from ..base import BaseRaw -from .events import _combine_triggers, _read_events, _triage_include_exclude from .general import ( _block_r, _extract, @@ -35,6 +36,14 @@ REFERENCE_NAMES = ("VREF", "Vertex Reference") +# TODO: Running list +# - [ ] Add support for reading in the PNS data +# - [ ] Add tutorial for reading calibration data +# - [ ] Annotate acquisition skips +# - [ ] Add support for reading in the channel status (bad channels) +# - [ ] Replace _read_header with mffpy functions? + + def _read_mff_header(filepath): """Read mff header.""" _soft_import("defusedxml", "reading EGI MFF data") @@ -381,17 +390,19 @@ class RawMff(BaseRaw): def __init__( self, input_fname, - eog=None, - misc=None, - include=None, - exclude=None, - preload=False, - channel_naming="E%d", + eog=None, # XXX: allow user to specify EOG channels? + misc=None, # XXX: allow user to specify misc channels? + include=None, # XXX: Now We dont create stim channels. Remove this? + exclude=None, # XXX: Ditto. But maybe we can exclude events from annots. + preload=False, # XXX: Make this work again + channel_naming="E%d", # XXX: Do we need to still support this? *, - events_as_annotations=True, + events_as_annotations=True, # XXX: This is now the only way. Remove? verbose=None, ): """Init the RawMff class.""" + mffpy = _import_mffpy() + input_fname = str( _check_fname( input_fname, @@ -402,61 +413,45 @@ def __init__( ) ) logger.info(f"Reading EGI MFF Header from {input_fname}...") - egi_info = _read_header(input_fname) - if eog is None: - eog = [] - if misc is None: - misc = np.where(np.array(egi_info["chan_type"]) != "eeg")[0].tolist() - - logger.info(" Reading events ...") - egi_events, egi_info, mff_events = _read_events(input_fname, egi_info) - cals = _get_eeg_calibration_info(input_fname, egi_info) - logger.info(" Assembling measurement info ...") - event_codes = egi_info["event_codes"] - include = _triage_include_exclude(include, exclude, egi_events, egi_info) - if egi_info["n_events"] > 0 and not events_as_annotations: - logger.info(' Synthesizing trigger channel "STI 014" ...') - if all(ch.startswith("D") for ch in include): - # support the DIN format DIN1, DIN2, ..., DIN9, DI10, DI11, ... DI99, - # D100, D101, ..., D255 that we get when sending 0-255 triggers on a - # parallel port. - events_ids = list() - for ch in include: - while not ch[0].isnumeric(): - ch = ch[1:] - events_ids.append(int(ch)) - else: - events_ids = np.arange(len(include)) + 1 - egi_info["new_trigger"] = _combine_triggers( - egi_events[[c in include for c in event_codes]], remapping=events_ids - ) - self.event_id = dict( - zip([e for e in event_codes if e in include], events_ids) - ) - if egi_info["new_trigger"] is not None: - egi_events = np.vstack([egi_events, egi_info["new_trigger"]]) - else: - self.event_id = None - egi_info["new_trigger"] = None - assert egi_events.shape[1] == egi_info["last_samps"][-1] - - meas_dt_utc = egi_info["meas_dt_local"].astimezone(datetime.timezone.utc) - info = _empty_info(egi_info["sfreq"]) - info["meas_date"] = _ensure_meas_date_none_or_dt(meas_dt_utc) - info["utc_offset"] = egi_info["utc_offset"] - info["device_info"] = dict(type=egi_info["device"]) - - # read in the montage, if it exists - ch_names, mon = _read_locs(input_fname, egi_info, channel_naming) - # Second: Stim - ch_names.extend(list(egi_info["event_codes"])) - n_extra = len(event_codes) + len(misc) + len(eog) + len(egi_info["pns_names"]) - if egi_info["new_trigger"] is not None: - ch_names.append("STI 014") # channel for combined events - n_extra += 1 - - # Third: PNS - ch_names.extend(egi_info["pns_names"]) + ################################### MFF Info ################################### + mff_reader = mffpy.Reader(input_fname) + mff_reader.set_unit("EEG", "V") + # mff_reader.set_unit("PNS", "V") XXX: need to test this + + meas_date = mff_reader.startdatetime.astimezone(datetime.timezone.utc) + sfreq = mff_reader.sampling_rates["EEG"] + # XXX: Can we have different sampling rates for EEG and PNS? + + xml_files = mff_reader.directory.files_by_type[".xml"] + info_files = fnmatch.filter(xml_files, "info?") + # XXX: usually info1.xml is EEG and info2.xml is PNS + if len(info_files) > 1: + raise NotImplementedError("TODO: Support for PNS data") + + ################################## Channels ################################### + with mff_reader.directory.filepointer(info_files[0]) as fp: + mff_info = mffpy.XML.from_file(fp) + _ = mff_info.generalInformation["montageName"] # XXX: Do we need this? + sensor_fname = fnmatch.filter(xml_files, "sensorLayout") + assert len(sensor_fname) == 1 # XXX: remove + sensor_fname = sensor_fname[0] + with mff_reader.directory.filepointer(sensor_fname) as fp: + sensor_layout = mffpy.XML.from_file(fp).get_content()["sensors"] + ch_pos = dict() + for ch in sensor_layout.values(): + if ch["type"] not in [0, 1]: # XXX: find out what type 2 is. Its not EEG + continue + name = f"E{ch['number']}" if ch["name"] == "None" else ch["name"] + loc = np.array([ch["x"], ch["y"], ch["z"]]) / 1000 # XXX: check units + ch_pos[name] = loc + montage = make_dig_montage(ch_pos=ch_pos, coord_frame="head") + + ################################## Samples #################################### + # XXX: This probably won't work as intended when there are acquisition skips + # XXX: I think that is why mffpy prefers get_physical_samples_from_epoch + eeg, _ = mff_reader.get_physical_samples()["EEG"] + # change dtype to float64 + eeg = eeg.astype(np.float64) # MNE expects float64 cals = np.concatenate([cals, np.ones(n_extra)]) assert len(cals) == len(ch_names), (len(cals), len(ch_names)) @@ -547,37 +542,39 @@ def __init__( super().__init__( info, - preload=preload, - orig_format="single", - filenames=[file_bin], - first_samps=first_samps, - last_samps=last_samps, - raw_extras=[egi_info], + preload=eeg, # XXX: Make eager/lazy loading work again + orig_format="single", # XXX: Check if this is still correct + filenames=[input_fname], # XXX: multiple files? I need an example + first_samps=(0,), # XXX: multiple files? + last_samps=None, # XXX: multiple files? + raw_extras=(None,), # XXX: do we still need this? verbose=verbose, ) - # Annotate acquisition skips - for first, prev_last in zip( - egi_info["first_samps"][1:], egi_info["last_samps"][:-1] - ): - gap = first - prev_last - assert gap >= 0 - if gap: - annot["onset"].append((prev_last - 0.5) / egi_info["sfreq"]) - annot["duration"].append(gap / egi_info["sfreq"]) - annot["description"].append("BAD_ACQ_SKIP") - - # create events from annotations - if events_as_annotations: - for code, samples in mff_events.items(): - if code not in include: - continue - annot["onset"].extend(np.array(samples) / egi_info["sfreq"]) - annot["duration"].extend([0.0] * len(samples)) - annot["description"].extend([code] * len(samples)) - - if len(annot["onset"]): - self.set_annotations(Annotations(**annot)) + ################################## Annotations ################################# + # TODO: Annotate acquisition skips + # + # Create Annotations from events + events_xmls = fnmatch.filter(xml_files, "Events*") + if not events_xmls: + raise RuntimeError("No events found in MFF file.") + mff_events = {} + for event_file in events_xmls: + with mff_reader.directory.filepointer(event_file) as fp: + categories = mffpy.XML.from_file(fp) + mff_events[event_file] = categories.get_content()["event"] + + onsets = [] + durations = [] + descriptions = [] + mff_events = list(itertools.chain.from_iterable(mff_events.values())) + for event in mff_events: + onset_dt = event["beginTime"].astimezone(datetime.timezone.utc) + ts = (onset_dt - self.info["meas_date"]).total_seconds() + onsets.append(ts) + durations.append(event["duration"] / 1000) + descriptions.append(event["code"]) + self.set_annotations(Annotations(onsets, durations, descriptions)) def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): """Read a chunk of data.""" From 118342698751ca90a3ee0855d8bedc52e5acadf2 Mon Sep 17 00:00:00 2001 From: Scott Huberty Date: Thu, 12 Dec 2024 17:30:05 -0800 Subject: [PATCH 02/14] WIP: Refactor [ci skip] - more functional - annotate acquisition skips --- mne/io/egi/egimff.py | 405 +++++++++++++++++++++++++++++++++---------- 1 file changed, 318 insertions(+), 87 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index f3bb8933598..94044a61958 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -16,7 +16,7 @@ import numpy as np from ..._fiff.constants import FIFF -from ..._fiff.meas_info import create_info +from ..._fiff.meas_info import _empty_info, _ensure_meas_date_none_or_dt, create_info from ..._fiff.proj import setup_proj from ..._fiff.utils import _create_chs, _mult_cal_one from ...annotations import Annotations @@ -24,6 +24,7 @@ from ...evoked import EvokedArray from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn from ..base import BaseRaw +from .events import _combine_triggers, _read_events, _triage_include_exclude from .general import ( _block_r, _extract, @@ -39,11 +40,204 @@ # TODO: Running list # - [ ] Add support for reading in the PNS data # - [ ] Add tutorial for reading calibration data -# - [ ] Annotate acquisition skips # - [ ] Add support for reading in the channel status (bad channels) # - [ ] Replace _read_header with mffpy functions? +def _read_mff(input_fname): + """Read EGI MFF file.""" + mff_reader = _get_mff_reader(input_fname) + eeg = _get_eeg_data(mff_reader) + info = _get_info(mff_reader) + annotations = _get_annotations(mff_reader, info) + return eeg, info, annotations + + +def _get_mff_startdatetime(mff_reader): + """Get start datetime from mff_reader, with workaround for nanosecond precision bug.""" + try: + return mff_reader.startdatetime + except (ValueError, AttributeError): + # mffpy has a bug parsing timestamps with 9 decimal places (nanoseconds) + # Workaround: manually parse the timestamp from the info.xml file + import xml.etree.ElementTree as ET + info_file = op.join(mff_reader.directory._mffname, "info.xml") + tree = ET.parse(info_file) + root = tree.getroot() + # Handle different XML namespaces by searching for any recordTime element + time_elem = root.find(".//recordTime") or root.find(".//{*}recordTime") + if time_elem is None: + raise + time_str = time_elem.text + # Handle timestamps with up to 9 decimal places by truncating to 6 + # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" + # Both formats: +0100 (without colon) and +01:00 (with colon) + if '+' in time_str or '-' in time_str[-6:]: + # Truncate nanoseconds in decimal part (keep only 6 digits) + time_str = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', time_str) + # Python's %z can't always handle colons, so remove them + time_str = re.sub(r'([+-]\d{2}):(\d{2})$', r'\1\2', time_str) + return datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f%z') + + +def _get_mff_reader(input_fname): + mffpy = _import_mffpy() + mff_reader = mffpy.Reader(input_fname) + mff_reader.set_unit("EEG", "V") # XXX: set PNS unit + return mff_reader + + +def _get_montage(mff_reader): + mffpy = _import_mffpy() + xml_files = mff_reader.directory.files_by_type[".xml"] + + # Read coordinates.xml for fiducial positions + coords_fname = fnmatch.filter(xml_files, "coordinates") + coords_sensors = dict() + if len(coords_fname) == 1: + with mff_reader.directory.filepointer(coords_fname[0]) as fp: + coords_content = mffpy.XML.from_file(fp).get_content() + coords_sensors = coords_content.get("sensors", dict()) + + n_eeg_channels = mff_reader.num_channels["EEG"] # XXX: PNS? + ch_pos = dict() + hsp_list = [] # Extra headshape points + lpa, rpa, nasion = None, None, None + + # Extract channel positions and fiducials from coordinates.xml + for ch in coords_sensors.values(): + # XXX: the y coordinate seems to be inverted? Need to investigate + # Convert from cm to m + loc = np.array([ch["x"], -(ch["y"]), ch["z"]]) / 100.0 + name = ch.get("name", "None") + + # Check if this is a fiducial point + if name == "Nasion": + nasion = loc + elif name == "Left periauricular point": lpa = loc + elif name == "Right periauricular point": + rpa = loc + elif name in REFERENCE_NAMES or "VREF" in name or "Vertex" in name: + # Reference electrode can be numbered outside EEG range (e.g., 1001) + ch_pos[name] = loc + elif ch["number"] <= n_eeg_channels: + # EEG channel + ch_name = name if name != "None" else f"E{ch['number']}" + ch_pos[ch_name] = loc + + # Convert hsp list to array if not empty + hsp = np.array(hsp_list) if hsp_list else None + + montage = make_dig_montage( + ch_pos=ch_pos, nasion=nasion, lpa=lpa, rpa=rpa, hsp=hsp, coord_frame="unknown" + ) + return montage + + +def _get_info(mff_reader): + montage = _get_montage(mff_reader) + ch_names = montage.ch_names + ch_types = ["eeg"] * len(ch_names) # XXX: refactor this when adding PNS support + meas_date_orig = _get_mff_startdatetime(mff_reader) + utc_offset = meas_date_orig.strftime("%z") + meas_date = meas_date_orig.astimezone(datetime.timezone.utc) + sfreq = mff_reader.sampling_rates["EEG"] # XXX: check PNS sfreq? + info = create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types) + info.set_montage(montage) + info.set_meas_date(meas_date) + with info._unlock(): + info["utc_offset"] = utc_offset + + # Populate reference location (loc[3:6]) for each EEG channel + # The reference is VREF (Vertex Reference), which is the last dig point + if len(info["dig"]) > 0: + ref_loc = info["dig"][-1]["r"] # VREF position + for ch in info["chs"]: + if ch["kind"] == FIFF.FIFFV_EEG_CH: + ch["loc"][3:6] = ref_loc + + return info + + +def _get_eeg_data(mff_reader): + sfreq = mff_reader.sampling_rates["EEG"] # XXX: check PNS sfreq + n_channels = mff_reader.num_channels["EEG"] # Only EEG channels, not all signal types + epochs = mff_reader.epochs + + data_blocks, start_secs, end_secs = [], [], [] + for epoch in epochs: + data_chunk, _ = mff_reader.get_physical_samples_from_epoch(epoch)["EEG"] # XXX + data_blocks.append(data_chunk) + start_secs.append(epoch.t0) + end_secs.append(epoch.t1) + + first_samp = int(start_secs[0] * sfreq) + # Calculate total samples needed based on actual chunk placements + max_end_samp = first_samp + for this_chunk, start in zip(data_blocks, start_secs): + start_samp = int(start * sfreq) + end_samp = start_samp + this_chunk.shape[1] + max_end_samp = max(max_end_samp, end_samp) + n_samps = max_end_samp - first_samp + + eeg = np.zeros((n_channels, n_samps), dtype=np.float64) + for this_chunk, start in zip(data_blocks, start_secs): + start_idx = int(start * sfreq) - first_samp + end_idx = start_idx + this_chunk.shape[1] + eeg[:, start_idx:end_idx] = this_chunk + return eeg + + +def _get_gap_annotations(mff_reader): + epochs = mff_reader.epochs + start_secs = [epoch.t0 for epoch in epochs] + end_secs = [epoch.t1 for epoch in epochs] + gap_durations = np.array(start_secs[1:]) - np.array(end_secs[:-1]) + descriptions = ["BAD_ACQ_SKIP"] * len(gap_durations) + gap_onsets = np.array(end_secs[:-1]) + # TODO: Re-enable warning once lazy loading is properly implemented + # The warning should be raised during data access, not during __init__ + # if len(gap_durations) > 0: + # warn( + # "Acquisition skips detected. EGI MFF file contains gaps between " + # "recording epochs.", + # RuntimeWarning, + # ) + gap_annots = Annotations(gap_onsets, gap_durations, descriptions) + return gap_annots + + +def _get_event_annotations(mff_reader, mne_info): + mffpy = _import_mffpy() + xml_files = mff_reader.directory.files_by_type[".xml"] + events_xmls = fnmatch.filter(xml_files, "Events*") + if not events_xmls: + raise RuntimeError("No events found in MFF file.") + mff_events = {} + for event_file in events_xmls: + with mff_reader.directory.filepointer(event_file) as fp: + categories = mffpy.XML.from_file(fp) + mff_events[event_file] = categories.get_content()["event"] + onsets = [] + descriptions = [] + mff_events = list(itertools.chain.from_iterable(mff_events.values())) + for event in mff_events: + onset_dt = event["beginTime"].astimezone(datetime.timezone.utc) + ts = (onset_dt - mne_info["meas_date"]).total_seconds() + onsets.append(ts) + # XXX: we could use event["duration"] but it always seems to be 1000ms? + descriptions.append(event["code"]) + durations = [0] * len(onsets) + event_annots = Annotations(onsets, durations, descriptions) + return event_annots + + +def _get_annotations(mff_reader, mne_info): + event_annots = _get_event_annotations(mff_reader, mne_info) + gap_annots = _get_gap_annotations(mff_reader) + return event_annots + gap_annots + + def _read_mff_header(filepath): """Read mff header.""" _soft_import("defusedxml", "reading EGI MFF data") @@ -115,7 +309,7 @@ def _read_mff_header(filepath): if bad: raise RuntimeError( "EGI epoch first/last samps could not be parsed:\n" - f"{list(epochs['first_samps'])}\n{list(epochs['last_samps'])}" + f'{list(epochs["first_samps"])}\n{list(epochs["last_samps"])}' ) summaryinfo.update(epochs) # index which samples in raw are actually readable from disk (i.e., not @@ -131,7 +325,6 @@ def _read_mff_header(filepath): # Add the sensor info. sensor_layout_file = op.join(filepath, "sensorLayout.xml") sensor_layout_obj = parse(sensor_layout_file) - summaryinfo["device"] = sensor_layout_obj.getElementsByTagName("name")[ 0 ].firstChild.data @@ -151,8 +344,8 @@ def _read_mff_header(filepath): n_chans = n_chans + 1 if n_chans != summaryinfo["n_channels"]: raise RuntimeError( - f"Number of defined channels ({n_chans}) did not match the " - f"expected channels ({summaryinfo['n_channels']})." + "Number of defined channels (%d) did not match the " + "expected channels (%d)" % (n_chans, summaryinfo["n_channels"]) ) # Check presence of PNS data @@ -289,7 +482,7 @@ def _read_locs(filepath, egi_info, channel_naming): fname = op.join(filepath, "coordinates.xml") if not op.exists(fname): - warn("File coordinates.xml not found, not setting channel locations") + logger.warn("File coordinates.xml not found, not setting channel locations") ch_names = [channel_naming % (i + 1) for i in range(egi_info["n_channels"])] return ch_names, None dig_ident_map = { @@ -401,8 +594,6 @@ def __init__( verbose=None, ): """Init the RawMff class.""" - mffpy = _import_mffpy() - input_fname = str( _check_fname( input_fname, @@ -413,50 +604,62 @@ def __init__( ) ) logger.info(f"Reading EGI MFF Header from {input_fname}...") - ################################### MFF Info ################################### - mff_reader = mffpy.Reader(input_fname) - mff_reader.set_unit("EEG", "V") - # mff_reader.set_unit("PNS", "V") XXX: need to test this - - meas_date = mff_reader.startdatetime.astimezone(datetime.timezone.utc) - sfreq = mff_reader.sampling_rates["EEG"] - # XXX: Can we have different sampling rates for EEG and PNS? - - xml_files = mff_reader.directory.files_by_type[".xml"] - info_files = fnmatch.filter(xml_files, "info?") - # XXX: usually info1.xml is EEG and info2.xml is PNS - if len(info_files) > 1: - raise NotImplementedError("TODO: Support for PNS data") - - ################################## Channels ################################### - with mff_reader.directory.filepointer(info_files[0]) as fp: - mff_info = mffpy.XML.from_file(fp) - _ = mff_info.generalInformation["montageName"] # XXX: Do we need this? - sensor_fname = fnmatch.filter(xml_files, "sensorLayout") - assert len(sensor_fname) == 1 # XXX: remove - sensor_fname = sensor_fname[0] - with mff_reader.directory.filepointer(sensor_fname) as fp: - sensor_layout = mffpy.XML.from_file(fp).get_content()["sensors"] - ch_pos = dict() - for ch in sensor_layout.values(): - if ch["type"] not in [0, 1]: # XXX: find out what type 2 is. Its not EEG - continue - name = f"E{ch['number']}" if ch["name"] == "None" else ch["name"] - loc = np.array([ch["x"], ch["y"], ch["z"]]) / 1000 # XXX: check units - ch_pos[name] = loc - montage = make_dig_montage(ch_pos=ch_pos, coord_frame="head") + eog = [] if eog is None else eog + misc = [] if misc is None else misc + egi_info = _read_header(input_fname) + + # Event data (for stim channels and optional STI 014) + egi_events, egi_info, mff_events = _read_events(input_fname, egi_info) + event_codes = list(egi_info["event_codes"]) + include = _triage_include_exclude(include, exclude, egi_events, egi_info) + if not events_as_annotations: + included_codes = [e for e in event_codes if e in include] + if len(included_codes): + events_ids = [] + next_id = 1 + for code in included_codes: + match = re.match(r"DIN(\d+)$", code) + if match is not None: + events_ids.append(int(match.group(1))) + else: + while next_id in events_ids: + next_id += 1 + events_ids.append(next_id) + next_id += 1 + events_ids = np.array(events_ids, int) + egi_info["new_trigger"] = _combine_triggers( + egi_events[[c in include for c in event_codes]], remapping=events_ids + ) + self.event_id = dict(zip(included_codes, events_ids)) + else: + egi_info["new_trigger"] = None + self.event_id = None + if egi_info["new_trigger"] is not None: + egi_events = np.vstack([egi_events, egi_info["new_trigger"]]) + else: + self.event_id = None + egi_info["new_trigger"] = None + assert egi_events.shape[1] == egi_info["last_samps"][-1] + + # Info and channels + meas_dt_utc = egi_info["meas_dt_local"].astimezone(datetime.timezone.utc) + info = _empty_info(egi_info["sfreq"]) + info["meas_date"] = _ensure_meas_date_none_or_dt(meas_dt_utc) + info["utc_offset"] = egi_info["utc_offset"] + info["device_info"] = dict(type=egi_info["device"]) - ################################## Samples #################################### - # XXX: This probably won't work as intended when there are acquisition skips - # XXX: I think that is why mffpy prefers get_physical_samples_from_epoch - eeg, _ = mff_reader.get_physical_samples()["EEG"] - # change dtype to float64 - eeg = eeg.astype(np.float64) # MNE expects float64 + ch_names, mon = _read_locs(input_fname, egi_info, channel_naming) + ch_names.extend(list(egi_info["event_codes"])) + n_extra = len(event_codes) + len(misc) + len(eog) + len(egi_info["pns_names"]) + if egi_info["new_trigger"] is not None: + ch_names.append("STI 014") + n_extra += 1 + ch_names.extend(egi_info["pns_names"]) + cals = _get_eeg_calibration_info(input_fname, egi_info) cals = np.concatenate([cals, np.ones(n_extra)]) assert len(cals) == len(ch_names), (len(cals), len(ch_names)) - # Actually create channels as EEG, then update stim and PNS ch_coil = FIFF.FIFFV_COIL_EEG ch_kind = FIFF.FIFFV_EEG_CH chs = _create_chs(ch_names, cals, ch_coil, ch_kind, eog, (), (), misc) @@ -483,6 +686,8 @@ def __init__( if mon is not None: info.set_montage(mon, on_missing="ignore") + + if mon is not None: ref_idx = np.flatnonzero(np.isin(mon.ch_names, REFERENCE_NAMES)) if len(ref_idx): ref_idx = ref_idx.item() @@ -494,7 +699,6 @@ def __init__( file_bin = op.join(input_fname, egi_info["eeg_fname"]) egi_info["egi_events"] = egi_events - # Check how many channels to read are from EEG keys = ("eeg", "sti", "pns") idx = dict() idx["eeg"] = np.where([ch["kind"] == FIFF.FIFFV_EEG_CH for ch in chs])[0] @@ -505,28 +709,25 @@ def __init__( for ch in chs ] )[0] - # By construction this should always be true, but check anyway if not np.array_equal( np.concatenate([idx[key] for key in keys]), np.arange(len(chs)) ): - raise ValueError( - "Currently interlacing EEG and PNS channels is not supported" - ) + raise ValueError("Currently interlacing EEG and PNS channels is not supported") + egi_info["kind_bounds"] = [0] for key in keys: egi_info["kind_bounds"].append(len(idx[key])) egi_info["kind_bounds"] = np.cumsum(egi_info["kind_bounds"]) assert egi_info["kind_bounds"][0] == 0 assert egi_info["kind_bounds"][-1] == info["nchan"] + first_samps = [0] last_samps = [egi_info["last_samps"][-1] - 1] annot = dict(onset=list(), duration=list(), description=list()) if len(idx["pns"]): - # PNS Data is present and should be read: egi_info["pns_filepath"] = op.join(input_fname, egi_info["pns_fname"]) - # Check for PNS bug immediately pns_samples = np.sum(egi_info["pns_sample_blocks"]["samples_block"]) eeg_samples = np.sum(egi_info["samples_block"]) if pns_samples == eeg_samples - 1: @@ -536,45 +737,46 @@ def __init__( annot["description"].append("BAD_EGI_PSG") elif pns_samples != eeg_samples: raise RuntimeError( - f"PNS samples ({pns_samples}) did not match EEG samples " - f"({eeg_samples})." + "PNS samples (%d) did not match EEG samples (%d)" + % (pns_samples, eeg_samples) ) super().__init__( info, - preload=eeg, # XXX: Make eager/lazy loading work again - orig_format="single", # XXX: Check if this is still correct - filenames=[input_fname], # XXX: multiple files? I need an example - first_samps=(0,), # XXX: multiple files? - last_samps=None, # XXX: multiple files? - raw_extras=(None,), # XXX: do we still need this? + preload=preload, + orig_format="single", + filenames=[file_bin], + first_samps=first_samps, + last_samps=last_samps, + raw_extras=[egi_info], verbose=verbose, ) - ################################## Annotations ################################# - # TODO: Annotate acquisition skips - # - # Create Annotations from events - events_xmls = fnmatch.filter(xml_files, "Events*") - if not events_xmls: - raise RuntimeError("No events found in MFF file.") - mff_events = {} - for event_file in events_xmls: - with mff_reader.directory.filepointer(event_file) as fp: - categories = mffpy.XML.from_file(fp) - mff_events[event_file] = categories.get_content()["event"] - - onsets = [] - durations = [] - descriptions = [] - mff_events = list(itertools.chain.from_iterable(mff_events.values())) - for event in mff_events: - onset_dt = event["beginTime"].astimezone(datetime.timezone.utc) - ts = (onset_dt - self.info["meas_date"]).total_seconds() - onsets.append(ts) - durations.append(event["duration"] / 1000) - descriptions.append(event["code"]) - self.set_annotations(Annotations(onsets, durations, descriptions)) + egi_info["has_acq_skip"] = np.any( + egi_info["first_samps"][1:] > egi_info["last_samps"][:-1] + ) + egi_info["_acq_skip_warned"] = False + + for first, prev_last in zip( + egi_info["first_samps"][1:], egi_info["last_samps"][:-1] + ): + gap = first - prev_last + assert gap >= 0 + if gap: + annot["onset"].append((prev_last - 0.5) / egi_info["sfreq"]) + annot["duration"].append(gap / egi_info["sfreq"]) + annot["description"].append("BAD_ACQ_SKIP") + + if events_as_annotations: + for code, samples in mff_events.items(): + if code not in include: + continue + annot["onset"].extend(np.array(samples) / egi_info["sfreq"]) + annot["duration"].extend([0.0] * len(samples)) + annot["description"].extend([code] * len(samples)) + + if len(annot["onset"]): + self.set_annotations(Annotations(**annot)) def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): """Read a chunk of data.""" @@ -582,6 +784,17 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): dtype = " 0 + and not egi_info.get("_acq_skip_warned", False) + ): + warn( + "Acquisition skips detected. EGI MFF file contains gaps between " + "recording epochs.", + RuntimeWarning, + ) + egi_info["_acq_skip_warned"] = True one = np.zeros((egi_info["kind_bounds"][-1], stop - start)) # info about the binary file structure @@ -968,4 +1181,22 @@ def _import_mffpy(why="read averaged .mff files"): msg = f"mffpy is required to {why}, got:\n{exp}" raise ImportError(msg) + # Monkey-patch mffpy to handle timestamps with 9 decimal places (nanoseconds) + # This is needed because some MFF files have timestamps like + # "2006-04-28T15:32:00.000000000+0100" which Python's %f can't parse + if not hasattr(mffpy.XML, '_mne_patched'): + original_parse_time_str = mffpy.XML._parse_time_str + + @classmethod + def _patched_parse_time_str(cls, txt): + """Parse time string with support for 9-decimal nanoseconds.""" + # Truncate nanoseconds to 6 decimal places if present + # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" + if txt and '.' in txt: + txt = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', txt) + return original_parse_time_str(txt) + + mffpy.XML._parse_time_str = _patched_parse_time_str + mffpy.XML._mne_patched = True + return mffpy From af049bae62c60d2a87f33e6bf77b1e7b6d00274a Mon Sep 17 00:00:00 2001 From: Pragnya Date: Tue, 24 Feb 2026 21:33:20 +0530 Subject: [PATCH 03/14] FIX: emit warning for missing coordinates.xml --- mne/io/egi/egimff.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 94044a61958..b610838407f 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -482,7 +482,7 @@ def _read_locs(filepath, egi_info, channel_naming): fname = op.join(filepath, "coordinates.xml") if not op.exists(fname): - logger.warn("File coordinates.xml not found, not setting channel locations") + warn("File coordinates.xml not found, not setting channel locations") ch_names = [channel_naming % (i + 1) for i in range(egi_info["n_channels"])] return ch_names, None dig_ident_map = { From 8887e021101ea7e23e279cfd9282c3357e994cca Mon Sep 17 00:00:00 2001 From: Pragnya Date: Wed, 25 Feb 2026 17:50:57 +0530 Subject: [PATCH 04/14] Add author entry to names.inc (hygiene) --- doc/changes/names.inc | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/changes/names.inc b/doc/changes/names.inc index 79db14d44e3..90e3bfd2a75 100644 --- a/doc/changes/names.inc +++ b/doc/changes/names.inc @@ -258,6 +258,7 @@ .. _Pierre-Antoine Bannier: https://github.com/PABannier .. _Ping-Keng Jao: https://github.com/nafraw .. _Proloy Das: https://github.com/proloyd +.. _Pragnya Khandelwal: https://github.com/PragnyaKhandelwal .. _Qian Chu: https://github.com/qian-chu .. _Qianliang Li: https://www.dtu.dk/english/service/phonebook/person?id=126774 .. _Quentin Barthélemy: https://github.com/qbarthelemy From 3022ebe070501e51318977b5e9b579391ea88ddb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 25 Feb 2026 12:22:20 +0000 Subject: [PATCH 05/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- doc/changes/names.inc | 2 +- mne/io/egi/egimff.py | 53 ++++++++++++++++++++++++------------------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/doc/changes/names.inc b/doc/changes/names.inc index 90e3bfd2a75..af539af0e30 100644 --- a/doc/changes/names.inc +++ b/doc/changes/names.inc @@ -257,8 +257,8 @@ .. _Pierre Guetschel: https://github.com/PierreGtch .. _Pierre-Antoine Bannier: https://github.com/PABannier .. _Ping-Keng Jao: https://github.com/nafraw -.. _Proloy Das: https://github.com/proloyd .. _Pragnya Khandelwal: https://github.com/PragnyaKhandelwal +.. _Proloy Das: https://github.com/proloyd .. _Qian Chu: https://github.com/qian-chu .. _Qianliang Li: https://www.dtu.dk/english/service/phonebook/person?id=126774 .. _Quentin Barthélemy: https://github.com/qbarthelemy diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index b610838407f..f1f09f51d27 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -61,6 +61,7 @@ def _get_mff_startdatetime(mff_reader): # mffpy has a bug parsing timestamps with 9 decimal places (nanoseconds) # Workaround: manually parse the timestamp from the info.xml file import xml.etree.ElementTree as ET + info_file = op.join(mff_reader.directory._mffname, "info.xml") tree = ET.parse(info_file) root = tree.getroot() @@ -72,12 +73,12 @@ def _get_mff_startdatetime(mff_reader): # Handle timestamps with up to 9 decimal places by truncating to 6 # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" # Both formats: +0100 (without colon) and +01:00 (with colon) - if '+' in time_str or '-' in time_str[-6:]: + if "+" in time_str or "-" in time_str[-6:]: # Truncate nanoseconds in decimal part (keep only 6 digits) - time_str = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', time_str) + time_str = re.sub(r"\.(\d{6})\d+([+-])", r".\1\2", time_str) # Python's %z can't always handle colons, so remove them - time_str = re.sub(r'([+-]\d{2}):(\d{2})$', r'\1\2', time_str) - return datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f%z') + time_str = re.sub(r"([+-]\d{2}):(\d{2})$", r"\1\2", time_str) + return datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%f%z") def _get_mff_reader(input_fname): @@ -90,7 +91,7 @@ def _get_mff_reader(input_fname): def _get_montage(mff_reader): mffpy = _import_mffpy() xml_files = mff_reader.directory.files_by_type[".xml"] - + # Read coordinates.xml for fiducial positions coords_fname = fnmatch.filter(xml_files, "coordinates") coords_sensors = dict() @@ -98,23 +99,24 @@ def _get_montage(mff_reader): with mff_reader.directory.filepointer(coords_fname[0]) as fp: coords_content = mffpy.XML.from_file(fp).get_content() coords_sensors = coords_content.get("sensors", dict()) - + n_eeg_channels = mff_reader.num_channels["EEG"] # XXX: PNS? ch_pos = dict() hsp_list = [] # Extra headshape points lpa, rpa, nasion = None, None, None - + # Extract channel positions and fiducials from coordinates.xml for ch in coords_sensors.values(): # XXX: the y coordinate seems to be inverted? Need to investigate # Convert from cm to m loc = np.array([ch["x"], -(ch["y"]), ch["z"]]) / 100.0 name = ch.get("name", "None") - + # Check if this is a fiducial point if name == "Nasion": nasion = loc - elif name == "Left periauricular point": lpa = loc + elif name == "Left periauricular point": + lpa = loc elif name == "Right periauricular point": rpa = loc elif name in REFERENCE_NAMES or "VREF" in name or "Vertex" in name: @@ -124,10 +126,10 @@ def _get_montage(mff_reader): # EEG channel ch_name = name if name != "None" else f"E{ch['number']}" ch_pos[ch_name] = loc - + # Convert hsp list to array if not empty hsp = np.array(hsp_list) if hsp_list else None - + montage = make_dig_montage( ch_pos=ch_pos, nasion=nasion, lpa=lpa, rpa=rpa, hsp=hsp, coord_frame="unknown" ) @@ -147,7 +149,7 @@ def _get_info(mff_reader): info.set_meas_date(meas_date) with info._unlock(): info["utc_offset"] = utc_offset - + # Populate reference location (loc[3:6]) for each EEG channel # The reference is VREF (Vertex Reference), which is the last dig point if len(info["dig"]) > 0: @@ -155,13 +157,15 @@ def _get_info(mff_reader): for ch in info["chs"]: if ch["kind"] == FIFF.FIFFV_EEG_CH: ch["loc"][3:6] = ref_loc - + return info def _get_eeg_data(mff_reader): sfreq = mff_reader.sampling_rates["EEG"] # XXX: check PNS sfreq - n_channels = mff_reader.num_channels["EEG"] # Only EEG channels, not all signal types + n_channels = mff_reader.num_channels[ + "EEG" + ] # Only EEG channels, not all signal types epochs = mff_reader.epochs data_blocks, start_secs, end_secs = [], [], [] @@ -179,7 +183,7 @@ def _get_eeg_data(mff_reader): end_samp = start_samp + this_chunk.shape[1] max_end_samp = max(max_end_samp, end_samp) n_samps = max_end_samp - first_samp - + eeg = np.zeros((n_channels, n_samps), dtype=np.float64) for this_chunk, start in zip(data_blocks, start_secs): start_idx = int(start * sfreq) - first_samp @@ -309,7 +313,7 @@ def _read_mff_header(filepath): if bad: raise RuntimeError( "EGI epoch first/last samps could not be parsed:\n" - f'{list(epochs["first_samps"])}\n{list(epochs["last_samps"])}' + f"{list(epochs['first_samps'])}\n{list(epochs['last_samps'])}" ) summaryinfo.update(epochs) # index which samples in raw are actually readable from disk (i.e., not @@ -628,7 +632,8 @@ def __init__( next_id += 1 events_ids = np.array(events_ids, int) egi_info["new_trigger"] = _combine_triggers( - egi_events[[c in include for c in event_codes]], remapping=events_ids + egi_events[[c in include for c in event_codes]], + remapping=events_ids, ) self.event_id = dict(zip(included_codes, events_ids)) else: @@ -712,7 +717,9 @@ def __init__( if not np.array_equal( np.concatenate([idx[key] for key in keys]), np.arange(len(chs)) ): - raise ValueError("Currently interlacing EEG and PNS channels is not supported") + raise ValueError( + "Currently interlacing EEG and PNS channels is not supported" + ) egi_info["kind_bounds"] = [0] for key in keys: @@ -1184,18 +1191,18 @@ def _import_mffpy(why="read averaged .mff files"): # Monkey-patch mffpy to handle timestamps with 9 decimal places (nanoseconds) # This is needed because some MFF files have timestamps like # "2006-04-28T15:32:00.000000000+0100" which Python's %f can't parse - if not hasattr(mffpy.XML, '_mne_patched'): + if not hasattr(mffpy.XML, "_mne_patched"): original_parse_time_str = mffpy.XML._parse_time_str - + @classmethod def _patched_parse_time_str(cls, txt): """Parse time string with support for 9-decimal nanoseconds.""" # Truncate nanoseconds to 6 decimal places if present # e.g., "2017-09-20T09:55:44.072000000+01:00" -> "2017-09-20T09:55:44.072000+01:00" - if txt and '.' in txt: - txt = re.sub(r'\.(\d{6})\d+([+-])', r'.\1\2', txt) + if txt and "." in txt: + txt = re.sub(r"\.(\d{6})\d+([+-])", r".\1\2", txt) return original_parse_time_str(txt) - + mffpy.XML._parse_time_str = _patched_parse_time_str mffpy.XML._mne_patched = True From 9531d0405fd74ded3c6bad94338941cca6df1914 Mon Sep 17 00:00:00 2001 From: Pragnya Date: Wed, 25 Feb 2026 17:50:57 +0530 Subject: [PATCH 06/14] Add author entry to names.inc (hygiene) --- doc/changes/names.inc | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/changes/names.inc b/doc/changes/names.inc index 79db14d44e3..90e3bfd2a75 100644 --- a/doc/changes/names.inc +++ b/doc/changes/names.inc @@ -258,6 +258,7 @@ .. _Pierre-Antoine Bannier: https://github.com/PABannier .. _Ping-Keng Jao: https://github.com/nafraw .. _Proloy Das: https://github.com/proloyd +.. _Pragnya Khandelwal: https://github.com/PragnyaKhandelwal .. _Qian Chu: https://github.com/qian-chu .. _Qianliang Li: https://www.dtu.dk/english/service/phonebook/person?id=126774 .. _Quentin Barthélemy: https://github.com/qbarthelemy From c9140ee0601eaea64b534e7d04b68aa6024809b5 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Thu, 26 Feb 2026 18:41:36 +0530 Subject: [PATCH 07/14] final fix after complete migration reviews --- doc/changes/dev/#13684.newfeature.rst | 1 + mne/io/egi/egimff.py | 598 +++++++++++--------------- mne/io/egi/events.py | 153 +------ 3 files changed, 256 insertions(+), 496 deletions(-) create mode 100644 doc/changes/dev/#13684.newfeature.rst diff --git a/doc/changes/dev/#13684.newfeature.rst b/doc/changes/dev/#13684.newfeature.rst new file mode 100644 index 00000000000..ccf2f875f49 --- /dev/null +++ b/doc/changes/dev/#13684.newfeature.rst @@ -0,0 +1 @@ +The EGI MFF reader has been refactored to use the :func:`mffpy` backend, improving support for multi-stream files and high-precision metadata, by :newcontrib:`Pragnya Khandelwal`. \ No newline at end of file diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 870f58890a2..407297f533b 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -5,7 +5,6 @@ """EGI NetStation Load Function.""" import datetime -import math import os.path as op import re from collections import OrderedDict @@ -13,6 +12,11 @@ import numpy as np +try: + import mffpy +except ImportError: + mffpy = None + from ..._fiff.constants import FIFF from ..._fiff.meas_info import _empty_info, _ensure_meas_date_none_or_dt, create_info from ..._fiff.proj import setup_proj @@ -22,12 +26,9 @@ from ...evoked import EvokedArray from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn from ..base import BaseRaw -from .events import _combine_triggers, _read_events, _triage_include_exclude +from .events import _combine_triggers, _triage_include_exclude from .general import ( - _block_r, _extract, - _get_blocks, - _get_ep_info, _get_gains, _get_signalfname, ) @@ -35,217 +36,137 @@ REFERENCE_NAMES = ("VREF", "Vertex Reference") -def _read_mff_header(filepath): - """Read mff header.""" - _soft_import("defusedxml", "reading EGI MFF data") - from defusedxml.minidom import parse +def _get_mff_reader(input_fname): + """Instantiate an mffpy Reader (hard dependency for MFF reading).""" + mffpy = _import_mffpy() + return mffpy.Reader(input_fname) - all_files = _get_signalfname(filepath) - eeg_file = all_files["EEG"]["signal"] - eeg_info_file = all_files["EEG"]["info"] - info_filepath = op.join(filepath, "info.xml") # add with filepath - tags = ["mffVersion", "recordTime"] - version_and_date = _extract(tags, filepath=info_filepath) - version = "" - if len(version_and_date["mffVersion"]): - version = version_and_date["mffVersion"][0] - - fname = op.join(filepath, eeg_file) - signal_blocks = _get_blocks(fname) - epochs = _get_ep_info(filepath) - summaryinfo = dict(eeg_fname=eeg_file, info_fname=eeg_info_file) - summaryinfo.update(signal_blocks) - # sanity check and update relevant values - record_time = version_and_date["recordTime"][0] - # e.g., - # 2018-07-30T10:47:01.021673-04:00 - # 2017-09-20T09:55:44.072000000+01:00 +def _get_mff_startdatetime(input_fname, mff_reader): + """Get robust start datetime for MFF files, handling 9-digit fractional secs.""" + try: + return mff_reader.startdatetime + except Exception: + info_filepath = op.join(str(input_fname), "info.xml") + record_time = _extract(["recordTime"], filepath=info_filepath)["recordTime"][0] + if len(record_time) > 32: + dt, tz = [record_time[:26], record_time[-6:]] + record_time = dt + tz + return datetime.datetime.strptime(record_time, "%Y-%m-%dT%H:%M:%S.%f%z") + + +def _parse_egi_datetime(time_str): + """Parse EGI time strings allowing 6 or 9 fractional second digits.""" + if time_str is None: + return None + txt = time_str.strip() g = re.match( - r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.(\d{6}(?:\d{3})?)[+-]\d{2}:\d{2}", # noqa: E501 - record_time, + r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.)(\d+)([+-]\d{2}:?\d{2})$", + txt, ) if g is None: - raise RuntimeError(f"Could not parse recordTime {repr(record_time)}") - frac = g.groups()[0] - assert len(frac) in (6, 9) and all(f.isnumeric() for f in frac) # regex - div = 1000 if len(frac) == 6 else 1000000 - for key in ("last_samps", "first_samps"): - # convert from times in µS to samples - for ei, e in enumerate(epochs[key]): - if e % div != 0: - raise RuntimeError(f"Could not parse epoch time {e}") - epochs[key][ei] = e // div - epochs[key] = np.array(epochs[key], np.uint64) - # I guess they refer to times in milliseconds? - # What we really need to do here is: - # epochs[key] *= signal_blocks['sfreq'] - # epochs[key] //= 1000 - # But that multiplication risks an overflow, so let's only multiply - # by what we need to (e.g., a sample rate of 500 means we can multiply - # by 1 and divide by 2 rather than multiplying by 500 and dividing by - # 1000) - numerator = int(signal_blocks["sfreq"]) - denominator = 1000 - this_gcd = math.gcd(numerator, denominator) - numerator = numerator // this_gcd - denominator = denominator // this_gcd - with np.errstate(over="raise"): - epochs[key] *= numerator - epochs[key] //= denominator - # Should be safe to cast to int now, which makes things later not - # upbroadcast to float - epochs[key] = epochs[key].astype(np.int64) - n_samps_block = signal_blocks["samples_block"].sum() - n_samps_epochs = (epochs["last_samps"] - epochs["first_samps"]).sum() - bad = ( - n_samps_epochs != n_samps_block - or not (epochs["first_samps"] < epochs["last_samps"]).all() - or not (epochs["first_samps"][1:] >= epochs["last_samps"][:-1]).all() - ) - if bad: - raise RuntimeError( - "EGI epoch first/last samps could not be parsed:\n" - f"{list(epochs['first_samps'])}\n{list(epochs['last_samps'])}" - ) - summaryinfo.update(epochs) - # index which samples in raw are actually readable from disk (i.e., not - # in a skip) - disk_samps = np.full(epochs["last_samps"][-1], -1) - offset = 0 - for first, last in zip(epochs["first_samps"], epochs["last_samps"]): - n_this = last - first - disk_samps[first:last] = np.arange(offset, offset + n_this) - offset += n_this - summaryinfo["disk_samps"] = disk_samps + return datetime.datetime.strptime(txt, "%Y-%m-%dT%H:%M:%S.%f%z") + prefix, frac, tz = g.groups() + frac = (frac[:6]).ljust(6, "0") + return datetime.datetime.strptime(prefix + frac + tz, "%Y-%m-%dT%H:%M:%S.%f%z") - # Add the sensor info. - sensor_layout_file = op.join(filepath, "sensorLayout.xml") - sensor_layout_obj = parse(sensor_layout_file) - summaryinfo["device"] = sensor_layout_obj.getElementsByTagName("name")[ - 0 - ].firstChild.data +def _get_info_from_mff_reader(input_fname, mff_reader): + """Build EGI info dict from mffpy.Reader metadata.""" + input_fname = str(input_fname) + sfreq_dict = mff_reader.sampling_rates + sfreq = float(sfreq_dict.get("EEG", next(iter(sfreq_dict.values())))) + meas_dt_local = _get_mff_startdatetime(input_fname, mff_reader) + + all_files = _get_signalfname(input_fname) + eeg_file = all_files["EEG"]["signal"] + eeg_info_file = all_files["EEG"]["info"] + + # Parse channel metadata from sensorLayout.xml + _soft_import("defusedxml", "reading EGI MFF data") + from defusedxml.minidom import parse + + sensor_layout_file = op.join(input_fname, "sensorLayout.xml") + sensor_layout_obj = parse(sensor_layout_file) + device = sensor_layout_obj.getElementsByTagName("name")[0].firstChild.data sensors = sensor_layout_obj.getElementsByTagName("sensor") - chan_type = list() - chan_unit = list() + + chan_type = [] + chan_unit = [] + numbers = [] n_chans = 0 - numbers = list() # used for identification for sensor in sensors: sensortype = int(sensor.getElementsByTagName("type")[0].firstChild.data) if sensortype in [0, 1]: - sn = sensor.getElementsByTagName("number")[0].firstChild.data - sn = sn.encode() + sn = sensor.getElementsByTagName("number")[0].firstChild.data.encode() numbers.append(sn) chan_type.append("eeg") chan_unit.append("uV") - n_chans = n_chans + 1 - if n_chans != summaryinfo["n_channels"]: - raise RuntimeError( - f"Number of defined channels ({n_chans}) did not match the " - f"expected channels ({summaryinfo['n_channels']})." - ) + n_chans += 1 + + # Collect epoch bounds and per-epoch sample counts from mffpy + first_samps = [] + last_samps = [] + samples_block = [] + pns_samples_block = [] + for ei in range(len(mff_reader.epochs)): + epoch = mff_reader.epochs[ei] + data_epoch = mff_reader.get_physical_samples_from_epoch(epoch) + eeg_samples = int(data_epoch["EEG"][0].shape[1]) + first = int(np.round(epoch.t0 * sfreq)) + last = first + eeg_samples + first_samps.append(first) + last_samps.append(last) + samples_block.append(eeg_samples) + + pns_arr = data_epoch.get("PNSData") + pns_samples_block.append(0 if pns_arr is None else int(pns_arr[0].shape[1])) + + first_samps = np.array(first_samps, dtype=np.int64) + last_samps = np.array(last_samps, dtype=np.int64) + samples_block = np.array(samples_block, dtype=np.int64) + pns_samples_block = np.array(pns_samples_block, dtype=np.int64) + + # index which samples in raw are actually readable from disk (i.e., not in a skip) + disk_samps = np.full(last_samps[-1], -1, dtype=np.int64) + offset = 0 + for first, last in zip(first_samps, last_samps): + n_this = last - first + disk_samps[first:last] = np.arange(offset, offset + n_this) + offset += n_this - # Check presence of PNS data + # Parse PNS channel metadata if present pns_names = [] + pns_types = [] + pns_units = [] + pns_fname = None if "PNS" in all_files: - pns_fpath = op.join(filepath, all_files["PNS"]["signal"]) - pns_blocks = _get_blocks(pns_fpath) - pns_samples = pns_blocks["samples_block"] - signal_samples = signal_blocks["samples_block"] - same_blocks = np.array_equal( - pns_samples[:-1], signal_samples[:-1] - ) and pns_samples[-1] in (signal_samples[-1] - np.arange(2)) - if not same_blocks: - raise RuntimeError( - "PNS and signals samples did not match:\n" - f"{list(pns_samples)}\nvs\n{list(signal_samples)}" - ) - - pns_file = op.join(filepath, "pnsSet.xml") - pns_obj = parse(pns_file) - sensors = pns_obj.getElementsByTagName("sensor") - pns_types = [] - pns_units = [] - for sensor in sensors: - # sensor number: - # sensor.getElementsByTagName('number')[0].firstChild.data - name = sensor.getElementsByTagName("name")[0].firstChild.data - unit_elem = sensor.getElementsByTagName("unit")[0].firstChild - unit = "" - if unit_elem is not None: - unit = unit_elem.data - - if name == "ECG": - ch_type = "ecg" - elif "EMG" in name: - ch_type = "emg" - else: - ch_type = "bio" - pns_types.append(ch_type) - pns_units.append(unit) - pns_names.append(name) - - summaryinfo.update( - pns_types=pns_types, - pns_units=pns_units, - pns_fname=all_files["PNS"]["signal"], - pns_sample_blocks=pns_blocks, - ) - summaryinfo.update( - pns_names=pns_names, - version=version, - date=version_and_date["recordTime"][0], - chan_type=chan_type, - chan_unit=chan_unit, - numbers=numbers, - ) - - return summaryinfo - - -def _read_header(input_fname): - """Obtain the headers from the file package mff. - - Parameters - ---------- - input_fname : path-like - Path for the file - - Returns - ------- - info : dict - Main headers set. - """ - input_fname = str(input_fname) # cast to str any Paths - mff_hdr = _read_mff_header(input_fname) - with open(input_fname + "/signal1.bin", "rb") as fid: - version = np.fromfile(fid, np.int32, 1)[0] - """ - the datetime.strptime .f directive (milleseconds) - will only accept up to 6 digits. if there are more than - six millesecond digits in the provided timestamp string - (i.e. because of trailing zeros, as in test_egi_pns.mff) - then slice both the first 26 elements and the last 6 - elements of the timestamp string to truncate the - milleseconds to 6 digits and extract the timezone, - and then piece these together and assign back to mff_hdr['date'] - """ - if len(mff_hdr["date"]) > 32: - dt, tz = [mff_hdr["date"][:26], mff_hdr["date"][-6:]] - mff_hdr["date"] = dt + tz - - time_n = datetime.datetime.strptime(mff_hdr["date"], "%Y-%m-%dT%H:%M:%S.%f%z") + pns_fname = all_files["PNS"]["signal"] + pns_file = op.join(input_fname, "pnsSet.xml") + if op.exists(pns_file): + pns_obj = parse(pns_file) + pns_sensors = pns_obj.getElementsByTagName("sensor") + for sensor in pns_sensors: + name = sensor.getElementsByTagName("name")[0].firstChild.data + unit_elem = sensor.getElementsByTagName("unit")[0].firstChild + unit = "" if unit_elem is None else unit_elem.data + if name == "ECG": + ch_type = "ecg" + elif "EMG" in name: + ch_type = "emg" + else: + ch_type = "bio" + pns_names.append(name) + pns_types.append(ch_type) + pns_units.append(unit) info = dict( - version=version, - meas_dt_local=time_n, - utc_offset=time_n.strftime("%z"), + version=0, + meas_dt_local=meas_dt_local, + utc_offset=meas_dt_local.strftime("%z"), gain=0, bits=0, value_range=0, - ) - info.update( n_categories=0, n_segments=1, n_events=0, @@ -253,11 +174,94 @@ def _read_header(input_fname): category_names=[], category_lengths=[], pre_baseline=0, + sfreq=sfreq, + n_channels=n_chans, + eeg_fname=eeg_file, + info_fname=eeg_info_file, + device=device, + chan_type=chan_type, + chan_unit=chan_unit, + numbers=numbers, + first_samps=first_samps, + last_samps=last_samps, + samples_block=samples_block, + disk_samps=disk_samps, + pns_names=pns_names, + pns_types=pns_types, + pns_units=pns_units, + pns_fname=pns_fname, + pns_sample_blocks={ + "n_channels": len(pns_names), + "samples_block": pns_samples_block, + }, + mff_path=input_fname, ) - info.update(mff_hdr) return info +def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): + """Read event tracks using mffpy XML parsing and return dense event matrix.""" + from mffpy.xml_files import EventTrack, XML + + mff_events = OrderedDict() + basenames = mff_reader.directory.listdir() + for basename in basenames: + lower_name = basename.lower() + if not lower_name.endswith(".xml") or basename.startswith("._"): + continue + stem = Path(basename).stem + try: + with mff_reader.directory.filepointer(stem) as fp: + xml_obj = XML.from_file(fp, recover=False) + except Exception as err: + if "XMLSyntaxError" in type(err).__name__: + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + if not isinstance(xml_obj, EventTrack): + continue + try: + events_iter = xml_obj.events + for event in events_iter: + code = event.get("code") or event.get("label") or xml_obj.name + begin_time = event.get("beginTime") + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + except Exception: + _soft_import("defusedxml", "reading EGI MFF event tracks") + from defusedxml import ElementTree as ET + + xml_path = op.join(str(input_fname), basename) + try: + root = ET.parse(xml_path).getroot() + except Exception as err: + if "ParseError" in type(err).__name__ or "XMLSyntaxError" in type(err).__name__: + warn(f"Could not parse the XML file {basename}. Skipping it.") + continue + for event_el in root.iter(): + if event_el.tag.split("}")[-1] != "event": + continue + event_fields = {} + for child in event_el: + event_fields[child.tag.split("}")[-1]] = child.text + code = event_fields.get("code") or event_fields.get("label") or xml_obj.name + begin_time = _parse_egi_datetime(event_fields.get("beginTime")) + if code is None or begin_time is None: + continue + sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + if 0 <= sample < n_samples: + mff_events.setdefault(code, []).append(sample) + + event_codes = list(mff_events.keys()) + egi_events = np.zeros((len(event_codes), n_samples)) + for event_idx, code in enumerate(event_codes): + if len(mff_events[code]): + egi_events[event_idx, np.array(mff_events[code], dtype=int)] = 1 + return egi_events, event_codes, mff_events + + def _get_eeg_calibration_info(filepath, egi_info): """Calculate calibration info for EEG channels.""" gains = _get_gains(op.join(filepath, egi_info["info_fname"])) @@ -402,17 +406,25 @@ def __init__( ) ) logger.info(f"Reading EGI MFF Header from {input_fname}...") - egi_info = _read_header(input_fname) + mff_reader = _get_mff_reader(input_fname) + egi_info = _get_info_from_mff_reader(input_fname, mff_reader) if eog is None: eog = [] if misc is None: misc = np.where(np.array(egi_info["chan_type"]) != "eeg")[0].tolist() logger.info(" Reading events ...") - egi_events, egi_info, mff_events = _read_events(input_fname, egi_info) + egi_events, event_codes, mff_events = _read_mff_events( + input_fname, + mff_reader, + egi_info["sfreq"], + egi_info["last_samps"][-1], + egi_info["meas_dt_local"], + ) + egi_info["n_events"] = len(event_codes) + egi_info["event_codes"] = event_codes cals = _get_eeg_calibration_info(input_fname, egi_info) logger.info(" Assembling measurement info ...") - event_codes = egi_info["event_codes"] include = _triage_include_exclude(include, exclude, egi_events, egi_info) if egi_info["n_events"] > 0 and not events_as_annotations: logger.info(' Synthesizing trigger channel "STI 014" ...') @@ -557,15 +569,22 @@ def __init__( ) # Annotate acquisition skips + has_skips = False for first, prev_last in zip( egi_info["first_samps"][1:], egi_info["last_samps"][:-1] ): gap = first - prev_last assert gap >= 0 if gap: + has_skips = True annot["onset"].append((prev_last - 0.5) / egi_info["sfreq"]) annot["duration"].append(gap / egi_info["sfreq"]) annot["description"].append("BAD_ACQ_SKIP") + if has_skips and (not events_as_annotations) and len(mff_events): + warn( + "Acquisition skips detected. EGI MFF file contains gaps between " + "recording epochs." + ) # create events from annotations if events_as_annotations: @@ -589,162 +608,57 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): # info about the binary file structure n_channels = egi_info["n_channels"] - samples_block = egi_info["samples_block"] + mff_reader = _get_mff_reader(egi_info["mff_path"]) + first_samps = egi_info["first_samps"] + last_samps = egi_info["last_samps"] # Check how many channels to read are from each type bounds = egi_info["kind_bounds"] + if idx is None: + idx = np.arange(bounds[-1]) if isinstance(idx, slice): - idx = np.arange(idx.start, idx.stop) + idx = np.arange(bounds[-1])[idx] + idx = np.array(idx, dtype=int) eeg_out = np.where(idx < bounds[1])[0] - eeg_one = idx[eeg_out, np.newaxis] + eeg_one = idx[eeg_out] eeg_in = idx[eeg_out] stim_out = np.where((idx >= bounds[1]) & (idx < bounds[2]))[0] stim_one = idx[stim_out] stim_in = idx[stim_out] - bounds[1] pns_out = np.where((idx >= bounds[2]) & (idx < bounds[3]))[0] pns_in = idx[pns_out] - bounds[2] - pns_one = idx[pns_out, np.newaxis] + pns_one = idx[pns_out] del eeg_out, stim_out, pns_out # take into account events (already extended to correct size) one[stim_one, :] = egi_info["egi_events"][stim_in, start:stop] - # Convert start and stop to limits in terms of the data - # actually on disk, plus an indexer (disk_use_idx) that populates - # the potentially larger `data` with it, taking skips into account - disk_samps = egi_info["disk_samps"][start:stop] - disk_use_idx = np.where(disk_samps > -1)[0] - # short circuit in case we don't need any samples - if not len(disk_use_idx): - _mult_cal_one(data, one, idx, cals, mult) - return - - start = disk_samps[disk_use_idx[0]] - stop = disk_samps[disk_use_idx[-1]] + 1 - assert len(disk_use_idx) == stop - start - - # Get starting/stopping block/samples - block_samples_offset = np.cumsum(samples_block) - offset_blocks = np.sum(block_samples_offset <= start) - offset_samples = start - ( - block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0 - ) - - # TODO: Refactor this reading with the PNS reading in a single function - # (DRY) - samples_to_read = stop - start - with open(self.filenames[fi], "rb", buffering=0) as fid: - # Go to starting block - current_block = 0 - current_block_info = None - current_data_sample = 0 - while current_block < offset_blocks: - this_block_info = _block_r(fid) - if this_block_info is not None: - current_block_info = this_block_info - fid.seek(current_block_info["block_size"], 1) - current_block += 1 - - # Start reading samples - while samples_to_read > 0: - logger.debug(f" Reading from block {current_block}") - this_block_info = _block_r(fid) - current_block += 1 - if this_block_info is not None: - current_block_info = this_block_info - - to_read = current_block_info["nsamples"] * current_block_info["nc"] - block_data = np.fromfile(fid, dtype, to_read) - block_data = block_data.reshape(n_channels, -1, order="C") - - # Compute indexes - samples_read = block_data.shape[1] - logger.debug(f" Read {samples_read} samples") - logger.debug(f" Offset {offset_samples} samples") - if offset_samples > 0: - # First block read, skip to the offset: - block_data = block_data[:, offset_samples:] - samples_read = samples_read - offset_samples - offset_samples = 0 - if samples_to_read < samples_read: - # Last block to read, skip the last samples - block_data = block_data[:, :samples_to_read] - samples_read = samples_to_read - logger.debug(f" Keep {samples_read} samples") - - s_start = current_data_sample - s_end = s_start + samples_read - - one[eeg_one, disk_use_idx[s_start:s_end]] = block_data[eeg_in] - samples_to_read = samples_to_read - samples_read - current_data_sample = current_data_sample + samples_read - - if len(pns_one) > 0: - # PNS Data is present and should be read: - pns_filepath = egi_info["pns_filepath"] - pns_info = egi_info["pns_sample_blocks"] - n_channels = pns_info["n_channels"] - samples_block = pns_info["samples_block"] - - # Get starting/stopping block/samples - block_samples_offset = np.cumsum(samples_block) - offset_blocks = np.sum(block_samples_offset < start) - offset_samples = start - ( - block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0 - ) - - samples_to_read = stop - start - with open(pns_filepath, "rb", buffering=0) as fid: - # Check file size - fid.seek(0, 2) - file_size = fid.tell() - fid.seek(0) - # Go to starting block - current_block = 0 - current_block_info = None - current_data_sample = 0 - while current_block < offset_blocks: - this_block_info = _block_r(fid) - if this_block_info is not None: - current_block_info = this_block_info - fid.seek(current_block_info["block_size"], 1) - current_block += 1 - - # Start reading samples - while samples_to_read > 0: - if samples_to_read == 1 and fid.tell() == file_size: - # We are in the presence of the EEG bug - # fill with zeros and break the loop - one[pns_one, -1] = 0 - break - - this_block_info = _block_r(fid) - if this_block_info is not None: - current_block_info = this_block_info - - to_read = current_block_info["nsamples"] * current_block_info["nc"] - block_data = np.fromfile(fid, dtype, to_read) - block_data = block_data.reshape(n_channels, -1, order="C") - - # Compute indexes - samples_read = block_data.shape[1] - if offset_samples > 0: - # First block read, skip to the offset: - block_data = block_data[:, offset_samples:] - samples_read = samples_read - offset_samples - offset_samples = 0 - - if samples_to_read < samples_read: - # Last block to read, skip the last samples - block_data = block_data[:, :samples_to_read] - samples_read = samples_to_read - - s_start = current_data_sample - s_end = s_start + samples_read - - one[pns_one, disk_use_idx[s_start:s_end]] = block_data[pns_in] - samples_to_read = samples_to_read - samples_read - current_data_sample = current_data_sample + samples_read + # Read only overlapping epoch segments from mffpy (keeps skips as zeros) + for epoch_idx, (first, last) in enumerate(zip(first_samps, last_samps)): + overlap_start = max(start, first) + overlap_stop = min(stop, last) + if overlap_stop <= overlap_start: + continue + + epoch = mff_reader.epochs[epoch_idx] + epoch_data = mff_reader.get_physical_samples_from_epoch(epoch) + eeg_block = epoch_data["EEG"][0][:n_channels] + src_start = overlap_start - first + src_stop = overlap_stop - first + dst_start = overlap_start - start + dst_stop = overlap_stop - start + + if len(eeg_one): + one[eeg_one, dst_start:dst_stop] = eeg_block[eeg_in, src_start:src_stop] + + if len(pns_one) and "PNSData" in epoch_data: + pns_block = epoch_data["PNSData"][0] + src_stop_pns = min(src_stop, pns_block.shape[1]) + if src_stop_pns > src_start: + dst_stop_pns = dst_start + (src_stop_pns - src_start) + one[pns_one, dst_start:dst_stop_pns] = pns_block[ + pns_in, src_start:src_stop_pns + ] # do the calibration _mult_cal_one(data, one, idx, cals, mult) @@ -844,10 +758,8 @@ def read_evokeds_mff( def _read_evoked_mff(fname, condition, channel_naming="E%d", verbose=None): """Read evoked data from MFF file.""" - import mffpy - - egi_info = _read_header(fname) mff = mffpy.Reader(fname) + egi_info = _get_info_from_mff_reader(str(fname), mff) categories = mff.categories.categories if isinstance(condition, str): @@ -965,10 +877,8 @@ def _read_evoked_mff(fname, condition, channel_naming="E%d", verbose=None): def _import_mffpy(why="read averaged .mff files"): """Import and return module mffpy.""" - try: - import mffpy - except ImportError as exp: - msg = f"mffpy is required to {why}, got:\n{exp}" + if mffpy is None: + msg = f"mffpy is required to {why}." raise ImportError(msg) return mffpy diff --git a/mne/io/egi/events.py b/mne/io/egi/events.py index c160ceb208c..540d0fb5f51 100644 --- a/mne/io/egi/events.py +++ b/mne/io/egi/events.py @@ -3,160 +3,9 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. -from datetime import datetime -from glob import glob -from os.path import basename, join, splitext - import numpy as np -from ...utils import _soft_import, _validate_type, logger, warn - - -def _read_events(input_fname, info): - """Read events for the record. - - Parameters - ---------- - input_fname : path-like - The file path. - info : dict - Header info array. - """ - n_samples = info["last_samps"][-1] - mff_events, event_codes = _read_mff_events(input_fname, info["sfreq"]) - info["n_events"] = len(event_codes) - info["event_codes"] = event_codes - events = np.zeros([info["n_events"], info["n_segments"] * n_samples]) - for n, event in enumerate(event_codes): - for i in mff_events[event]: - if (i < 0) or (i >= events.shape[1]): - continue - events[n][i] = n + 1 - return events, info, mff_events - - -def _read_mff_events(filename, sfreq): - """Extract the events. - - Parameters - ---------- - filename : path-like - File path. - sfreq : float - The sampling frequency - """ - orig = {} - for xml_file in glob(join(filename, "*.xml")): - xml_type = splitext(basename(xml_file))[0] - et = _parse_xml(xml_file) - if et is not None: - orig[xml_type] = et - xml_files = orig.keys() - xml_events = [x for x in xml_files if x[:7] == "Events_"] - for item in orig["info"]: - if "recordTime" in item: - start_time = _ns2py_time(item["recordTime"]) - break - markers = [] - code = [] - for xml in xml_events: - for event in orig[xml][2:]: - event_start = _ns2py_time(event["beginTime"]) - start = (event_start - start_time).total_seconds() - if event["code"] not in code: - code.append(event["code"]) - marker = { - "name": event["code"], - "start": start, - "start_sample": int(np.trunc(start * sfreq)), - "end": start + float(event["duration"]) / 1e9, - "chan": None, - } - markers.append(marker) - events_tims = dict() - for ev in code: - trig_samp = list( - c["start_sample"] for n, c in enumerate(markers) if c["name"] == ev - ) - events_tims.update({ev: trig_samp}) - return events_tims, code - - -def _parse_xml(xml_file: str) -> list[dict[str, str]] | None: - """Parse XML file.""" - defusedxml = _soft_import("defusedxml", "reading EGI MFF data") - try: - xml = defusedxml.ElementTree.parse(xml_file) - except defusedxml.ElementTree.ParseError as e: - warn(f"Could not parse the XML file {xml_file}: {e}") - return - root = xml.getroot() - return _xml2list(root) - - -def _xml2list(root): - """Parse XML item.""" - output = [] - for element in root: - if len(element) > 0: - if element[0].tag != element[-1].tag: - output.append(_xml2dict(element)) - else: - output.append(_xml2list(element)) - - elif element.text: - text = element.text.strip() - if text: - tag = _ns(element.tag) - output.append({tag: text}) - - return output - - -def _ns(s): - """Remove namespace, but only if there is a namespace to begin with.""" - if "}" in s: - return "}".join(s.split("}")[1:]) - else: - return s - - -def _xml2dict(root): - """Use functions instead of Class. - - remove namespace based on - http://stackoverflow.com/questions/2148119 - """ - output = {} - if root.items(): - output.update(dict(root.items())) - - for element in root: - if len(element) > 0: - if len(element) == 1 or element[0].tag != element[1].tag: - one_dict = _xml2dict(element) - else: - one_dict = {_ns(element[0].tag): _xml2list(element)} - - if element.items(): - one_dict.update(dict(element.items())) - output.update({_ns(element.tag): one_dict}) - - elif element.items(): - output.update({_ns(element.tag): dict(element.items())}) - - else: - output.update({_ns(element.tag): element.text}) - return output - - -def _ns2py_time(nstime): - """Parse times.""" - nsdate = nstime[0:10] - nstime0 = nstime[11:26] - nstime00 = nsdate + " " + nstime0 - pytime = datetime.strptime(nstime00, "%Y-%m-%d %H:%M:%S.%f") - return pytime +from ...utils import _validate_type, logger, warn def _combine_triggers(data, remapping=None): From 77b71f4f8391018a654183adf0e14af4483c9bc8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 13:31:44 +0000 Subject: [PATCH 08/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- doc/changes/names.inc | 2 +- mne/io/egi/egimff.py | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/doc/changes/names.inc b/doc/changes/names.inc index ed5ff9d5a93..351d6bdae04 100644 --- a/doc/changes/names.inc +++ b/doc/changes/names.inc @@ -258,8 +258,8 @@ .. _Pierre-Antoine Bannier: https://github.com/PABannier .. _Ping-Keng Jao: https://github.com/nafraw .. _Pragnya Khandelwal: https://github.com/PragnyaKhandelwal -.. _Proloy Das: https://github.com/proloyd .. _Pragnya Khandelwal: https://github.com/PragnyaKhandelwal +.. _Proloy Das: https://github.com/proloyd .. _Qian Chu: https://github.com/qian-chu .. _Qianliang Li: https://www.dtu.dk/english/service/phonebook/person?id=126774 .. _Quentin Barthélemy: https://github.com/qbarthelemy diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 407297f533b..a6116147b51 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -201,7 +201,7 @@ def _get_info_from_mff_reader(input_fname, mff_reader): def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): """Read event tracks using mffpy XML parsing and return dense event matrix.""" - from mffpy.xml_files import EventTrack, XML + from mffpy.xml_files import XML, EventTrack mff_events = OrderedDict() basenames = mff_reader.directory.listdir() @@ -237,7 +237,10 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): try: root = ET.parse(xml_path).getroot() except Exception as err: - if "ParseError" in type(err).__name__ or "XMLSyntaxError" in type(err).__name__: + if ( + "ParseError" in type(err).__name__ + or "XMLSyntaxError" in type(err).__name__ + ): warn(f"Could not parse the XML file {basename}. Skipping it.") continue for event_el in root.iter(): @@ -246,7 +249,11 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): event_fields = {} for child in event_el: event_fields[child.tag.split("}")[-1]] = child.text - code = event_fields.get("code") or event_fields.get("label") or xml_obj.name + code = ( + event_fields.get("code") + or event_fields.get("label") + or xml_obj.name + ) begin_time = _parse_egi_datetime(event_fields.get("beginTime")) if code is None or begin_time is None: continue From 956135bae4adb98553d1ec7b3b0aec1f73af523c Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Thu, 26 Feb 2026 19:57:50 +0530 Subject: [PATCH 09/14] fixed pre-compile test issues --- doc/changes/names.inc | 2 +- mne/io/egi/egimff.py | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/doc/changes/names.inc b/doc/changes/names.inc index ed5ff9d5a93..351d6bdae04 100644 --- a/doc/changes/names.inc +++ b/doc/changes/names.inc @@ -258,8 +258,8 @@ .. _Pierre-Antoine Bannier: https://github.com/PABannier .. _Ping-Keng Jao: https://github.com/nafraw .. _Pragnya Khandelwal: https://github.com/PragnyaKhandelwal -.. _Proloy Das: https://github.com/proloyd .. _Pragnya Khandelwal: https://github.com/PragnyaKhandelwal +.. _Proloy Das: https://github.com/proloyd .. _Qian Chu: https://github.com/qian-chu .. _Qianliang Li: https://www.dtu.dk/english/service/phonebook/person?id=126774 .. _Quentin Barthélemy: https://github.com/qbarthelemy diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index 407297f533b..ad07d84a7df 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -201,7 +201,7 @@ def _get_info_from_mff_reader(input_fname, mff_reader): def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): """Read event tracks using mffpy XML parsing and return dense event matrix.""" - from mffpy.xml_files import EventTrack, XML + from mffpy.xml_files import XML, EventTrack mff_events = OrderedDict() basenames = mff_reader.directory.listdir() @@ -237,7 +237,10 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): try: root = ET.parse(xml_path).getroot() except Exception as err: - if "ParseError" in type(err).__name__ or "XMLSyntaxError" in type(err).__name__: + if ( + "ParseError" in type(err).__name__ + or "XMLSyntaxError" in type(err).__name__ + ): warn(f"Could not parse the XML file {basename}. Skipping it.") continue for event_el in root.iter(): @@ -246,7 +249,11 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): event_fields = {} for child in event_el: event_fields[child.tag.split("}")[-1]] = child.text - code = event_fields.get("code") or event_fields.get("label") or xml_obj.name + code = ( + event_fields.get("code") + or event_fields.get("label") + or xml_obj.name + ) begin_time = _parse_egi_datetime(event_fields.get("beginTime")) if code is None or begin_time is None: continue @@ -601,7 +608,6 @@ def __init__( def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): """Read a chunk of data.""" logger.debug(f"Reading MFF {start:6d} ... {stop:6d} ...") - dtype = " Date: Fri, 27 Feb 2026 12:51:02 +0530 Subject: [PATCH 10/14] removed legacy helper functions for mffpy refactor --- mne/io/egi/egimff.py | 11 +++-- mne/io/egi/general.py | 108 ------------------------------------------ 2 files changed, 7 insertions(+), 112 deletions(-) diff --git a/mne/io/egi/egimff.py b/mne/io/egi/egimff.py index ad07d84a7df..899a4518f67 100644 --- a/mne/io/egi/egimff.py +++ b/mne/io/egi/egimff.py @@ -226,7 +226,8 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): begin_time = event.get("beginTime") if code is None or begin_time is None: continue - sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + delta = (begin_time - start_dt).total_seconds() + sample = int(np.floor(delta * sfreq)) if 0 <= sample < n_samples: mff_events.setdefault(code, []).append(sample) except Exception: @@ -257,7 +258,8 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): begin_time = _parse_egi_datetime(event_fields.get("beginTime")) if code is None or begin_time is None: continue - sample = int(np.floor((begin_time - start_dt).total_seconds() * sfreq)) + delta = (begin_time - start_dt).total_seconds() + sample = int(np.floor(delta * sfreq)) if 0 <= sample < n_samples: mff_events.setdefault(code, []).append(sample) @@ -265,7 +267,8 @@ def _read_mff_events(input_fname, mff_reader, sfreq, n_samples, start_dt): egi_events = np.zeros((len(event_codes), n_samples)) for event_idx, code in enumerate(event_codes): if len(mff_events[code]): - egi_events[event_idx, np.array(mff_events[code], dtype=int)] = 1 + event_samples = np.asarray(mff_events[code], dtype=int) + egi_events[event_idx, event_samples] = 1 return egi_events, event_codes, mff_events @@ -624,7 +627,7 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): idx = np.arange(bounds[-1]) if isinstance(idx, slice): idx = np.arange(bounds[-1])[idx] - idx = np.array(idx, dtype=int) + idx = np.asarray(idx, dtype=int) eeg_out = np.where(idx < bounds[1])[0] eeg_one = idx[eeg_out] eeg_in = idx[eeg_out] diff --git a/mne/io/egi/general.py b/mne/io/egi/general.py index ed028e3e5ed..d3819b2abc5 100644 --- a/mne/io/egi/general.py +++ b/mne/io/egi/general.py @@ -50,86 +50,6 @@ def _get_gains(filepath): return gains -def _get_ep_info(filepath): - """Get epoch info.""" - _soft_import("defusedxml", "reading EGI MFF data") - from defusedxml.minidom import parse - - epochfile = filepath + "/epochs.xml" - epochlist = parse(epochfile) - epochs = epochlist.getElementsByTagName("epoch") - keys = ("first_samps", "last_samps", "first_blocks", "last_blocks") - epoch_info = {key: list() for key in keys} - for epoch in epochs: - ep_begin = int(epoch.getElementsByTagName("beginTime")[0].firstChild.data) - ep_end = int(epoch.getElementsByTagName("endTime")[0].firstChild.data) - first_block = int(epoch.getElementsByTagName("firstBlock")[0].firstChild.data) - last_block = int(epoch.getElementsByTagName("lastBlock")[0].firstChild.data) - epoch_info["first_samps"].append(ep_begin) - epoch_info["last_samps"].append(ep_end) - epoch_info["first_blocks"].append(first_block) - epoch_info["last_blocks"].append(last_block) - # Don't turn into ndarray here, keep native int because it can deal with - # huge numbers (could use np.uint64 but it's more work) - return epoch_info - - -def _get_blocks(filepath): - """Get info from meta data blocks.""" - binfile = os.path.join(filepath) - n_blocks = 0 - samples_block = [] - header_sizes = [] - n_channels = [] - sfreq = [] - # Meta data consists of: - # * 1 byte of flag (1 for meta data, 0 for data) - # * 1 byte of header size - # * 1 byte of block size - # * 1 byte of n_channels - # * n_channels bytes of offsets - # * n_channels bytes of sigfreqs? - with open(binfile, "rb") as fid: - fid.seek(0, 2) # go to end of file - file_length = fid.tell() - block_size = file_length - fid.seek(0) - position = 0 - while position < file_length: - block = _block_r(fid) - if block is None: - samples_block.append(samples_block[n_blocks - 1]) - n_blocks += 1 - fid.seek(block_size, 1) - position = fid.tell() - continue - block_size = block["block_size"] - header_size = block["header_size"] - header_sizes.append(header_size) - samples_block.append(block["nsamples"]) - n_blocks += 1 - fid.seek(block_size, 1) - sfreq.append(block["sfreq"]) - n_channels.append(block["nc"]) - position = fid.tell() - - if any([n != n_channels[0] for n in n_channels]): - raise RuntimeError("All the blocks don't have the same amount of channels.") - if any([f != sfreq[0] for f in sfreq]): - raise RuntimeError("All the blocks don't have the same sampling frequency.") - if len(samples_block) < 1: - raise RuntimeError("There seems to be no data") - samples_block = np.array(samples_block) - signal_blocks = dict( - n_channels=n_channels[0], - sfreq=sfreq[0], - n_blocks=n_blocks, - samples_block=samples_block, - header_sizes=header_sizes, - ) - return signal_blocks - - def _get_signalfname(filepath): """Get filenames.""" _soft_import("defusedxml", "reading EGI MFF data") @@ -162,31 +82,3 @@ def _get_signalfname(filepath): f"found in {filepath}:\n{infofiles_str}" ) return all_files - - -def _block_r(fid): - """Read meta data.""" - if np.fromfile(fid, dtype=np.dtype("i4"), count=1).item() != 1: # not meta - return None - header_size = np.fromfile(fid, dtype=np.dtype("i4"), count=1).item() - block_size = np.fromfile(fid, dtype=np.dtype("i4"), count=1).item() - hl = int(block_size / 4) - nc = np.fromfile(fid, dtype=np.dtype("i4"), count=1).item() - nsamples = int(hl / nc) - np.fromfile(fid, dtype=np.dtype("i4"), count=nc) # sigoffset - sigfreq = np.fromfile(fid, dtype=np.dtype("i4"), count=nc) - depth = sigfreq[0] & 0xFF - if depth != 32: - raise ValueError("I do not know how to read this MFF (depth != 32)") - sfreq = sigfreq[0] >> 8 - count = int(header_size / 4 - (4 + 2 * nc)) - np.fromfile(fid, dtype=np.dtype("i4"), count=count) # sigoffset - block = dict( - nc=nc, - hl=hl, - nsamples=nsamples, - block_size=block_size, - header_size=header_size, - sfreq=sfreq, - ) - return block From 6fbece4d4ba9e7f7e4a089334f59669503f94b67 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Fri, 27 Feb 2026 13:15:02 +0530 Subject: [PATCH 11/14] towncrier release note entry for PR #13684 --- doc/changes/dev/#13684.newfeature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/changes/dev/#13684.newfeature.rst b/doc/changes/dev/#13684.newfeature.rst index ccf2f875f49..e31d194a6fb 100644 --- a/doc/changes/dev/#13684.newfeature.rst +++ b/doc/changes/dev/#13684.newfeature.rst @@ -1 +1 @@ -The EGI MFF reader has been refactored to use the :func:`mffpy` backend, improving support for multi-stream files and high-precision metadata, by :newcontrib:`Pragnya Khandelwal`. \ No newline at end of file +The EGI MFF reader has been refactored to use the ``mffpy`` backend, improving support for multi-stream files and high-precision metadata, by :newcontrib:`Pragnya Khandelwal`. \ No newline at end of file From 915b43592cac67ce4601aa83a448f842a8729ac9 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Fri, 27 Feb 2026 13:50:21 +0530 Subject: [PATCH 12/14] Added a reusable skip marker based on mffpy --- mne/io/egi/tests/test_egi.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py index 261a9c80da3..945c75bc9cb 100644 --- a/mne/io/egi/tests/test_egi.py +++ b/mne/io/egi/tests/test_egi.py @@ -5,6 +5,7 @@ import os import shutil +from importlib.util import find_spec from copy import deepcopy from datetime import datetime, timezone from pathlib import Path @@ -36,6 +37,11 @@ egi_txt_evoked_cat1_fname = egi_path / "test_egi_evoked_cat1.txt" egi_txt_evoked_cat2_fname = egi_path / "test_egi_evoked_cat2.txt" +requires_mffpy = pytest.mark.skipif( + find_spec("mffpy") is None, + reason="Test requires mffpy", +) + # absolute event times from NetStation egi_pause_events = { "AM40": [7.224, 11.928, 14.413, 16.848], @@ -59,6 +65,7 @@ @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname, skip_times, event_times", [ @@ -121,6 +128,7 @@ def test_egi_mff_pause(fname, skip_times, event_times): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname", [ @@ -143,6 +151,7 @@ def test_egi_mff_pause_chunks(fname, tmp_path): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize("events_as_annotations", (True, False)) def test_io_egi_mff(events_as_annotations): """Test importing EGI MFF simple binary files.""" @@ -290,6 +299,7 @@ def test_io_egi(): @requires_testing_data +@requires_mffpy def test_io_egi_pns_mff(tmp_path): """Test importing EGI MFF with PNS data.""" pytest.importorskip("defusedxml") @@ -346,6 +356,7 @@ def test_io_egi_pns_mff(tmp_path): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize("preload", (True, False)) def test_io_egi_pns_mff_bug(preload): """Test importing EGI MFF with PNS data (BUG).""" @@ -390,6 +401,7 @@ def test_io_egi_pns_mff_bug(preload): @requires_testing_data +@requires_mffpy def test_io_egi_crop_no_preload(): """Test crop non-preloaded EGI MFF data (BUG).""" pytest.importorskip("defusedxml") @@ -503,6 +515,7 @@ def test_read_evokeds_mff_bad_input(): @requires_testing_data +@requires_mffpy def test_egi_coord_frame(): """Test that EGI coordinate frame is changed to head.""" pytest.importorskip("defusedxml") @@ -532,6 +545,7 @@ def test_egi_coord_frame(): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname, timestamp, utc_offset", [ @@ -556,6 +570,7 @@ def test_meas_date(fname, timestamp, utc_offset): @requires_testing_data +@requires_mffpy @pytest.mark.parametrize( "fname, standard_montage", [ @@ -590,6 +605,7 @@ def test_set_standard_montage_mff(fname, standard_montage): @requires_testing_data +@requires_mffpy def test_egi_mff_bad_xml(tmp_path): """Test that corrupt XML files are gracefully handled.""" pytest.importorskip("defusedxml") From e10a87d3010a41a2f99818711f57bf11289c62ad Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 08:26:02 +0000 Subject: [PATCH 13/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mne/io/egi/tests/test_egi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py index 945c75bc9cb..0ede82603ce 100644 --- a/mne/io/egi/tests/test_egi.py +++ b/mne/io/egi/tests/test_egi.py @@ -5,9 +5,9 @@ import os import shutil -from importlib.util import find_spec from copy import deepcopy from datetime import datetime, timezone +from importlib.util import find_spec from pathlib import Path import numpy as np From 01b6206dbc8a42b3710c5f5acf67b42b65101c70 Mon Sep 17 00:00:00 2001 From: PragnyaKhandelwal Date: Fri, 27 Feb 2026 14:03:05 +0530 Subject: [PATCH 14/14] fixing circli/ci check failure --- mne/io/egi/tests/test_egi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py index 945c75bc9cb..0ede82603ce 100644 --- a/mne/io/egi/tests/test_egi.py +++ b/mne/io/egi/tests/test_egi.py @@ -5,9 +5,9 @@ import os import shutil -from importlib.util import find_spec from copy import deepcopy from datetime import datetime, timezone +from importlib.util import find_spec from pathlib import Path import numpy as np