From 7e698e2a95b8049c48761affdcd8290bf12863db Mon Sep 17 00:00:00 2001 From: Eric Larson Date: Fri, 22 May 2026 14:27:57 -0400 Subject: [PATCH 1/5] ENH: Add support for reading embedded CT and FC --- doc/changes/dev/newfeature.rst | 1 + mne/_fiff/meas_info.py | 73 +++++++-- mne/_fiff/proc_history.py | 223 +++++++++++++++------------ mne/_fiff/proj.py | 6 +- mne/_fiff/tests/test_meas_info.py | 38 ++++- mne/_fiff/tests/test_proc_history.py | 4 +- mne/_fiff/write.py | 6 +- mne/cov.py | 2 +- mne/preprocessing/maxwell.py | 2 +- 9 files changed, 223 insertions(+), 132 deletions(-) create mode 100644 doc/changes/dev/newfeature.rst diff --git a/doc/changes/dev/newfeature.rst b/doc/changes/dev/newfeature.rst new file mode 100644 index 00000000000..93f3fe8d868 --- /dev/null +++ b/doc/changes/dev/newfeature.rst @@ -0,0 +1 @@ +Add support for reading cross-talk and fine-calibration embedded in FIF files during acquisition into ``raw.info["cross_talk"]`` and ``raw.info["fine_calibration"]``, respectively, by `Eric Larson`_. \ No newline at end of file diff --git a/mne/_fiff/meas_info.py b/mne/_fiff/meas_info.py index 01a36292cd7..376b6975e40 100644 --- a/mne/_fiff/meas_info.py +++ b/mne/_fiff/meas_info.py @@ -58,7 +58,12 @@ get_channel_type_constants, pick_types, ) -from .proc_history import _read_proc_history, _write_proc_history +from .proc_history import ( + _read_mf_data, + _read_proc_history, + _write_mf_data, + _write_proc_history, +) from .proj import ( Projection, _normalize_proj, @@ -1071,6 +1076,12 @@ class HeliumInfo(ValidatedDict): types="numeric", cast=float, ), + "gantry_angle": partial( + _check_types, + name='helium_info["gantry_angle"]', + types="numeric", + cast=int, + ), "helium_level": partial( _check_types, name='helium_info["helium_level"]', @@ -1314,6 +1325,8 @@ class Info(ValidatedDict, SetChannelsMixin, MontageMixin, ContainsMixin): comps : list of dict CTF software gradient compensation data. See Notes for more information. + cross_talk : dict | None + Cross-talk information added at acquisition time by MEGIN systems. ctf_head_t : Transform | None The transformation from 4D/CTF head coordinates to Neuromag head coordinates. This is only present in 4D/CTF data. @@ -1344,7 +1357,9 @@ class Info(ValidatedDict, SetChannelsMixin, MontageMixin, ContainsMixin): Name of the person that ran the experiment. file_id : dict | None The FIF globally unique ID. See Notes for more information. - gantry_angle : float | None + fine_calibration : dict | None + Fine calibration information added at acquisition time by MEGIN systems. + gantry_angle : int | None Tilt angle of the gantry in degrees. helium_info : dict | None Information about the device helium. See Notes for details. @@ -1698,6 +1713,7 @@ class Info(ValidatedDict, SetChannelsMixin, MontageMixin, ContainsMixin): "comps": "comps cannot be set directly. " "Please use method Raw.apply_gradient_compensation() " "instead.", + "cross_talk": "cross_talk cannot be set directly.", "ctf_head_t": "ctf_head_t cannot be set directly.", "custom_ref_applied": "custom_ref_applied cannot be set directly. " "Please use method inst.set_eeg_reference() " @@ -1711,6 +1727,7 @@ class Info(ValidatedDict, SetChannelsMixin, MontageMixin, ContainsMixin): "events": "events cannot be set directly.", "experimenter": partial(_check_types, name="experimenter", types=(str, None)), "file_id": "file_id cannot be set directly.", + "fine_calibration": "fine_calibration cannot be set directly.", "gantry_angle": "gantry_angle cannot be set directly.", "helium_info": partial( _check_types, name="helium_info", types=(dict, None), cast=HeliumInfo @@ -2319,7 +2336,7 @@ def _write_bad_channels(fid, bads, ch_names_mapping): ch_names_mapping = {} if ch_names_mapping is None else ch_names_mapping bads = _rename_list(bads, ch_names_mapping) start_block(fid, FIFF.FIFFB_MNE_BAD_CHANNELS) - write_name_list_sanitized(fid, FIFF.FIFF_MNE_CH_NAME_LIST, bads, "bads") + write_name_list_sanitized(fid, FIFF.FIFF_MNE_CH_NAME_LIST, bads, name="bads") end_block(fid, FIFF.FIFFB_MNE_BAD_CHANNELS) @@ -2452,7 +2469,7 @@ def read_meas_info(fid, tree, clean_bads=False, verbose=None): line_freq = float(tag.data.item()) elif kind == FIFF.FIFF_GANTRY_ANGLE: tag = read_tag(fid, pos) - gantry_angle = float(tag.data.item()) + gantry_angle = int(tag.data.item()) elif kind in [FIFF.FIFF_MNE_CUSTOM_REF, 236]: # 236 used before v0.11 tag = read_tag(fid, pos) custom_ref_applied = int(tag.data.item()) @@ -2587,6 +2604,10 @@ def read_meas_info(fid, tree, clean_bads=False, verbose=None): for k in range(hpi_meas["nent"]): kind = hpi_meas["directory"][k].kind pos = hpi_meas["directory"][k].pos + if kind == FIFF.FIFF_BLOCK_ID: + hm["block_id"] = read_tag(fid, pos).data + if kind == FIFF.FIFF_PARENT_BLOCK_ID: + hm["parent_id"] = read_tag(fid, pos).data if kind == FIFF.FIFF_CREATOR: hm["creator"] = str(read_tag(fid, pos).data) elif kind == FIFF.FIFF_SFREQ: @@ -2711,6 +2732,9 @@ def read_meas_info(fid, tree, clean_bads=False, verbose=None): if kind == FIFF.FIFF_HE_LEVEL_RAW: tag = read_tag(fid, pos) hi["he_level_raw"] = float(tag.data.item()) + elif kind == FIFF.FIFF_GANTRY_ANGLE: + tag = read_tag(fid, pos) + hi["gantry_angle"] = int(tag.data.item()) elif kind == FIFF.FIFF_HELIUM_LEVEL: tag = read_tag(fid, pos) hi["helium_level"] = float(tag.data.item()) @@ -2755,6 +2779,14 @@ def read_meas_info(fid, tree, clean_bads=False, verbose=None): hs["hpi_coils"] = hc info["hpi_subsystem"] = hs + # Read cross-talk and fine cal + cross_talk = _read_mf_data(fid, tree, kind="cross_talk") + if len(cross_talk): + info["cross_talk"] = cross_talk + fine_calibration = _read_mf_data(fid, tree, kind="fine_calibration") + if len(fine_calibration): + info["fine_calibration"] = fine_calibration + # Read processing history info["proc_history"] = _read_proc_history(fid, tree) @@ -2970,6 +3002,10 @@ def write_meas_info(fid, info, data_type=None, reset_range=True): # HPI Measurement for hpi_meas in info["hpi_meas"]: start_block(fid, FIFF.FIFFB_HPI_MEAS) + if hpi_meas.get("block_id") is not None: + write_id(fid, FIFF.FIFF_BLOCK_ID, hpi_meas["block_id"]) + if hpi_meas.get("parent_id") is not None: + write_id(fid, FIFF.FIFF_PARENT_BLOCK_ID, hpi_meas["parent_id"]) if hpi_meas.get("creator") is not None: write_string(fid, FIFF.FIFF_CREATOR, hpi_meas["creator"]) if hpi_meas.get("sfreq") is not None: @@ -3023,13 +3059,6 @@ def write_meas_info(fid, info, data_type=None, reset_range=True): if info["dev_ctf_t"] is not None: write_coord_trans(fid, info["dev_ctf_t"]) - # Projectors - ch_names_mapping = _make_ch_names_mapping(info["chs"]) - _write_proj(fid, info["projs"], ch_names_mapping=ch_names_mapping) - - # Bad channels - _write_bad_channels(fid, info["bads"], ch_names_mapping=ch_names_mapping) - # General if info.get("experimenter") is not None: write_string(fid, FIFF.FIFF_EXPERIMENTER, info["experimenter"]) @@ -3051,18 +3080,15 @@ def write_meas_info(fid, info, data_type=None, reset_range=True): write_float(fid, FIFF.FIFF_HIGHPASS, info["highpass"]) if info.get("line_freq") is not None: write_float(fid, FIFF.FIFF_LINE_FREQ, info["line_freq"]) - if info.get("gantry_angle") is not None: - write_float(fid, FIFF.FIFF_GANTRY_ANGLE, info["gantry_angle"]) if data_type is not None: write_int(fid, FIFF.FIFF_DATA_PACK, data_type) + if info.get("gantry_angle") is not None: + write_int(fid, FIFF.FIFF_GANTRY_ANGLE, info["gantry_angle"]) if info.get("custom_ref_applied"): write_int(fid, FIFF.FIFF_MNE_CUSTOM_REF, info["custom_ref_applied"]) if info.get("xplotter_layout"): write_string(fid, FIFF.FIFF_XPLOTTER_LAYOUT, info["xplotter_layout"]) - # Channel information - _write_ch_infos(fid, info["chs"], reset_range, ch_names_mapping) - # Subject information if info.get("subject_info") is not None: start_block(fid, FIFF.FIFFB_SUBJECT) @@ -3090,6 +3116,16 @@ def write_meas_info(fid, info, data_type=None, reset_range=True): end_block(fid, FIFF.FIFFB_SUBJECT) del si + # Projectors + ch_names_mapping = _make_ch_names_mapping(info["chs"]) + _write_proj(fid, info["projs"], ch_names_mapping=ch_names_mapping) + + # Channel information + _write_ch_infos(fid, info["chs"], reset_range, ch_names_mapping) + + _write_mf_data(fid, info, kind="cross_talk") + _write_mf_data(fid, info, kind="fine_calibration") + if info.get("device_info") is not None: start_block(fid, FIFF.FIFFB_DEVICE) di = info["device_info"] @@ -3106,6 +3142,8 @@ def write_meas_info(fid, info, data_type=None, reset_range=True): hi = info["helium_info"] if hi.get("he_level_raw") is not None: write_float(fid, FIFF.FIFF_HE_LEVEL_RAW, hi["he_level_raw"]) + if hi.get("gantry_angle") is not None: + write_int(fid, FIFF.FIFF_GANTRY_ANGLE, hi["gantry_angle"]) if hi.get("helium_level") is not None: write_float(fid, FIFF.FIFF_HELIUM_LEVEL, hi["helium_level"]) if hi.get("orig_file_guid") is not None: @@ -3131,6 +3169,9 @@ def write_meas_info(fid, info, data_type=None, reset_range=True): end_block(fid, FIFF.FIFFB_HPI_SUBSYSTEM) del hs + # Bad channels + _write_bad_channels(fid, info["bads"], ch_names_mapping=ch_names_mapping) + # CTF compensation info comps = info["comps"] if ch_names_mapping: diff --git a/mne/_fiff/proc_history.py b/mne/_fiff/proc_history.py index caa2d3de554..6c8d0d520f7 100644 --- a/mne/_fiff/proc_history.py +++ b/mne/_fiff/proc_history.py @@ -2,10 +2,12 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. +from functools import partial + import numpy as np from ..fixes import _csc_array_cast -from ..utils import _check_fname, warn +from ..utils import _check_fname, logger, warn from .constants import FIFF from .open import fiff_open, read_tag from .tag import _float_item, _int_item, find_tag @@ -203,15 +205,49 @@ def _write_proc_history(fid, info): _max_st_writers = (write_int, write_float, write_float) _max_st_casters = (_int_item, _float_item, _float_item) -_sss_ctc_keys = ("block_id", "date", "creator", "decoupler") + +def _sss_ctc_ch_name_clean(tag_data): + """Clean channel names from CTC files.""" + chs = _safe_name_list(tag_data, "read", "ch_names") + # CTC files can have null chars in the last entry, e.g.: + # [..., 'MEG2642', 'MEG2643', 'MEG2641\x00 ... \x00'] + if len(chs) > 0: + chs[-1] = chs[-1].split("\x00")[0] + return chs + + +_sss_ctc_keys = ( + "block_id", + "parent_block_id", + "date", + "creator", + "decoupler", + "ch_names", +) _sss_ctc_ids = ( FIFF.FIFF_BLOCK_ID, + FIFF.FIFF_PARENT_BLOCK_ID, FIFF.FIFF_MEAS_DATE, FIFF.FIFF_CREATOR, FIFF.FIFF_DECOUPLER_MATRIX, + FIFF.FIFF_PROJ_ITEM_CH_NAME_LIST, +) +_sss_ctc_writers = ( + write_id, + write_id, + write_int, + write_string, + write_float_sparse, + partial(write_name_list_sanitized, name="ch_names"), +) +_sss_ctc_casters = ( + dict, + dict, + np.array, + str, + _csc_array_cast, + _sss_ctc_ch_name_clean, ) -_sss_ctc_writers = (write_id, write_int, write_string, write_float_sparse) -_sss_ctc_casters = (dict, np.array, str, _csc_array_cast) _sss_cal_keys = ("cal_chans", "cal_corrs") _sss_cal_ids = (FIFF.FIFF_SSS_CAL_CHANS, FIFF.FIFF_SSS_CAL_CORRS) @@ -219,12 +255,46 @@ def _write_proc_history(fid, info): _sss_cal_casters = (np.array, np.array) +def _write_mf_data(fid, info, *, kind): + this_data = info.get(kind, None) + if not this_data: # empty or None + return + logger.debug("Writing %s info with keys: %s", kind, list(this_data)) + del info + if kind == "cross_talk": + keys = _sss_ctc_keys + ids = _sss_ctc_ids + writers = _sss_ctc_writers + block = FIFF.FIFFB_CHANNEL_DECOUPLER + elif kind == "fine_calibration": + keys = _sss_cal_keys + ids = _sss_cal_ids + writers = _sss_cal_writers + block = FIFF.FIFFB_SSS_CAL + elif kind == "max_st": + keys = _max_st_keys + ids = _max_st_ids + writers = _max_st_writers + block = FIFF.FIFFB_SSS_ST_INFO + else: + assert kind == "sss_info" + keys = _sss_info_keys + ids = _sss_info_ids + writers = _sss_info_writers + block = FIFF.FIFFB_SSS_INFO + start_block(fid, block) + for key, id_, writer in zip(keys, ids, writers): + if key in this_data: + writer(fid, id_, this_data[key]) + end_block(fid, block) + + def _read_ctc(fname): """Read cross-talk correction matrix.""" fname = _check_fname(fname, overwrite="read", must_exist=True) f, tree, _ = fiff_open(fname) with f as fid: - sss_ctc = _read_maxfilter_record(fid, tree)["sss_ctc"] + sss_ctc = _read_mf_data(fid, tree, kind="cross_talk") bad_str = f"Invalid cross-talk FIF: {fname}" if len(sss_ctc) == 0: raise ValueError(bad_str) @@ -239,107 +309,60 @@ def _read_ctc(fname): def _read_maxfilter_record(fid, tree): """Read maxfilter processing record from file.""" - sss_info_block = dir_tree_find(tree, FIFF.FIFFB_SSS_INFO) # 502 - sss_info = dict() - if len(sss_info_block) > 0: - sss_info_block = sss_info_block[0] - for i_ent in range(sss_info_block["nent"]): - kind = sss_info_block["directory"][i_ent].kind - pos = sss_info_block["directory"][i_ent].pos - for key, id_, cast in zip(_sss_info_keys, _sss_info_ids, _sss_info_casters): - if kind == id_: - tag = read_tag(fid, pos) - sss_info[key] = cast(tag.data) - break - - max_st_block = dir_tree_find(tree, FIFF.FIFFB_SSS_ST_INFO) # 504 - max_st = dict() - if len(max_st_block) > 0: - max_st_block = max_st_block[0] - for i_ent in range(max_st_block["nent"]): - kind = max_st_block["directory"][i_ent].kind - pos = max_st_block["directory"][i_ent].pos - for key, id_, cast in zip(_max_st_keys, _max_st_ids, _max_st_casters): - if kind == id_: - tag = read_tag(fid, pos) - max_st[key] = cast(tag.data) - break + max_info = dict() + for key, kind in ( + ("sss_info", "sss_info"), + ("sss_ctc", "cross_talk"), + ("sss_cal", "fine_calibration"), + ("max_st", "max_st"), + ): + this_data = _read_mf_data(fid, tree, kind=kind) + if this_data: + max_info[key] = this_data + return max_info - sss_ctc_block = dir_tree_find(tree, FIFF.FIFFB_CHANNEL_DECOUPLER) # 501 - sss_ctc = dict() - if len(sss_ctc_block) > 0: - sss_ctc_block = sss_ctc_block[0] - for i_ent in range(sss_ctc_block["nent"]): - kind = sss_ctc_block["directory"][i_ent].kind - pos = sss_ctc_block["directory"][i_ent].pos - for key, id_, cast in zip(_sss_ctc_keys, _sss_ctc_ids, _sss_ctc_casters): - if kind == id_: - tag = read_tag(fid, pos) - sss_ctc[key] = cast(tag.data) - break - else: - if kind == FIFF.FIFF_PROJ_ITEM_CH_NAME_LIST: - tag = read_tag(fid, pos) - chs = _safe_name_list(tag.data, "read", "proj_items_chs") - # This list can null chars in the last entry, e.g.: - # [..., 'MEG2642', 'MEG2643', 'MEG2641\x00 ... \x00'] - chs[-1] = chs[-1].split("\x00")[0] - sss_ctc["proj_items_chs"] = chs - sss_cal_block = dir_tree_find(tree, FIFF.FIFFB_SSS_CAL) # 503 - sss_cal = dict() - if len(sss_cal_block) > 0: - sss_cal_block = sss_cal_block[0] - for i_ent in range(sss_cal_block["nent"]): - kind = sss_cal_block["directory"][i_ent].kind - pos = sss_cal_block["directory"][i_ent].pos - for key, id_, cast in zip(_sss_cal_keys, _sss_cal_ids, _sss_cal_casters): +def _read_mf_data(fid, tree, *, kind): + if kind == "cross_talk": + block = FIFF.FIFFB_CHANNEL_DECOUPLER # 501 + keys = _sss_ctc_keys + ids = _sss_ctc_ids + casters = _sss_ctc_casters + elif kind == "fine_calibration": + block = FIFF.FIFFB_SSS_CAL # 503 + keys = _sss_cal_keys + ids = _sss_cal_ids + casters = _sss_cal_casters + elif kind == "max_st": + block = FIFF.FIFFB_SSS_ST_INFO # 504 + keys = _max_st_keys + ids = _max_st_ids + casters = _max_st_casters + else: + assert kind == "sss_info" + block = FIFF.FIFFB_SSS_INFO # 502 + keys = _sss_info_keys + ids = _sss_info_ids + casters = _sss_info_casters + sss_kind_block = dir_tree_find(tree, block) + sss_out = dict() + if len(sss_kind_block) > 0: + logger.debug("Reading %s info with keys: %s", kind, list(keys)) + sss_kind_block = sss_kind_block[0] + for i_ent in range(sss_kind_block["nent"]): + kind = sss_kind_block["directory"][i_ent].kind + pos = sss_kind_block["directory"][i_ent].pos + for key, id_, cast in zip(keys, ids, casters): if kind == id_: tag = read_tag(fid, pos) - sss_cal[key] = cast(tag.data) + sss_out[key] = cast(tag.data) break - - max_info = dict(sss_info=sss_info, sss_ctc=sss_ctc, sss_cal=sss_cal, max_st=max_st) - return max_info + return sss_out def _write_maxfilter_record(fid, record): """Write maxfilter processing record to file.""" - sss_info = record["sss_info"] - if len(sss_info) > 0: - start_block(fid, FIFF.FIFFB_SSS_INFO) - for key, id_, writer in zip(_sss_info_keys, _sss_info_ids, _sss_info_writers): - if key in sss_info: - writer(fid, id_, sss_info[key]) - end_block(fid, FIFF.FIFFB_SSS_INFO) - - max_st = record["max_st"] - if len(max_st) > 0: - start_block(fid, FIFF.FIFFB_SSS_ST_INFO) - for key, id_, writer in zip(_max_st_keys, _max_st_ids, _max_st_writers): - if key in max_st: - writer(fid, id_, max_st[key]) - end_block(fid, FIFF.FIFFB_SSS_ST_INFO) - - sss_ctc = record["sss_ctc"] - if len(sss_ctc) > 0: # dict has entries - start_block(fid, FIFF.FIFFB_CHANNEL_DECOUPLER) - for key, id_, writer in zip(_sss_ctc_keys, _sss_ctc_ids, _sss_ctc_writers): - if key in sss_ctc: - writer(fid, id_, sss_ctc[key]) - if "proj_items_chs" in sss_ctc: - write_name_list_sanitized( - fid, - FIFF.FIFF_PROJ_ITEM_CH_NAME_LIST, - sss_ctc["proj_items_chs"], - "proj_items_chs", - ) - end_block(fid, FIFF.FIFFB_CHANNEL_DECOUPLER) - - sss_cal = record["sss_cal"] - if len(sss_cal) > 0: - start_block(fid, FIFF.FIFFB_SSS_CAL) - for key, id_, writer in zip(_sss_cal_keys, _sss_cal_ids, _sss_cal_writers): - if key in sss_cal: - writer(fid, id_, sss_cal[key]) - end_block(fid, FIFF.FIFFB_SSS_CAL) + _write_mf_data(fid, record, kind="sss_info") + _write_mf_data(fid, record, kind="max_st") + _write_mf_data(fid, record, kind="cross_talk") + _write_mf_data(fid, record, kind="fine_calibration") diff --git a/mne/_fiff/proj.py b/mne/_fiff/proj.py index 65125c45d3d..cf01df3fdfb 100644 --- a/mne/_fiff/proj.py +++ b/mne/_fiff/proj.py @@ -697,12 +697,10 @@ def _write_proj(fid, projs, *, ch_names_mapping=None): for proj in projs: start_block(fid, FIFF.FIFFB_PROJ_ITEM) - write_int(fid, FIFF.FIFF_NCHAN, len(proj["data"]["col_names"])) names = _rename_list(proj["data"]["col_names"], ch_names_mapping) - write_name_list_sanitized( - fid, FIFF.FIFF_PROJ_ITEM_CH_NAME_LIST, names, "col_names" - ) + write_name_list_sanitized(fid, FIFF.FIFF_PROJ_ITEM_CH_NAME_LIST, names) write_string(fid, FIFF.FIFF_NAME, proj["desc"]) + write_int(fid, FIFF.FIFF_NCHAN, len(proj["data"]["col_names"])) write_int(fid, FIFF.FIFF_PROJ_ITEM_KIND, proj["kind"]) if proj["kind"] == FIFF.FIFFV_PROJ_ITEM_FIELD: write_float(fid, FIFF.FIFF_PROJ_ITEM_TIME, 0.0) diff --git a/mne/_fiff/tests/test_meas_info.py b/mne/_fiff/tests/test_meas_info.py index 24a6ca04e26..3bb4caf1dc1 100644 --- a/mne/_fiff/tests/test_meas_info.py +++ b/mne/_fiff/tests/test_meas_info.py @@ -91,7 +91,9 @@ data_path = testing.data_path(download=False) sss_path = data_path / "SSS" +triux_path = data_path / "SSS" / "TRIUX" sss_ctc_fname = sss_path / "test_move_anon_crossTalk_raw_sss.fif" +tri_sss_ctc_cal_fname = triux_path / "triux_bmlhus_erm_ctc_cal_raw_sss.fif" ctf_fname = data_path / "CTF" / "testdata_ctf.ds" raw_invalid_bday_fname = data_path / "misc" / "sample_invalid_birthday_raw.fif" @@ -303,7 +305,7 @@ def test_read_write_info(tmp_path): with info._unlock(): if info["gantry_angle"] is None: # future testing data may include it - info["gantry_angle"] = 0.0 # Elekta supine position + info["gantry_angle"] = 0 # Elekta supine position gantry_angle = info["gantry_angle"] meas_id = info["meas_id"] @@ -1108,8 +1110,9 @@ def test_csr_csc(tmp_path): # CSC assert isinstance(ct, sparse.csc_array) fname = tmp_path / "test.fif" - write_info(fname, info) - info_read = read_info(fname) + write_info(fname, info, verbose="debug") + info_read = read_info(fname, verbose="debug") + assert "decoupler" in info_read["proc_history"][0]["max_info"]["sss_ctc"] ct_read = info_read["proc_history"][0]["max_info"]["sss_ctc"]["decoupler"] assert isinstance(ct_read, sparse.csc_array) assert_array_equal(ct_read.toarray(), ct.toarray()) @@ -1506,3 +1509,32 @@ def test_proj_id_entries(): info["proj_id"] = "bad" with pytest.raises(TypeError, match="must be an instance"): info["proj_id"] = np.array([123]) + + +@testing.requires_testing_data +def test_ct_fc_infd(tmp_path): + """Test that cross-talk and fine calibration info are read correctly.""" + raw = read_raw_fif(raw_fname) + raw_sss = read_raw_fif(tri_sss_ctc_cal_fname) + with raw.info._unlock(): + for key_to, key_from in ( + ("cross_talk", "sss_ctc"), + ("fine_calibration", "sss_cal"), + ): + raw.info[key_to] = raw_sss.info["proc_history"][0]["max_info"][key_from] + raw.info["cross_talk"]["parent_block_id"] = dict( + version=4, + machid=np.ones(2, int), + secs=1, + usecs=2, + ) + del raw_sss + raw.info._check_consistency() + # all entries present + assert len(raw.info["cross_talk"]) == 6 + assert len(raw.info["fine_calibration"]) == 2 + fname = tmp_path / "test-raw.fif" + raw.save(fname) + raw_read = read_raw_fif(fname) + assert_object_equal(raw.info["cross_talk"], raw_read.info["cross_talk"]) + assert_object_equal(raw.info["fine_calibration"], raw_read.info["fine_calibration"]) diff --git a/mne/_fiff/tests/test_proc_history.py b/mne/_fiff/tests/test_proc_history.py index 82f2d6262be..3f35cfa045d 100644 --- a/mne/_fiff/tests/test_proc_history.py +++ b/mne/_fiff/tests/test_proc_history.py @@ -25,9 +25,7 @@ def test_maxfilter_io(): assert mf["sss_info"]["out_order"] <= 5 assert mf["sss_info"]["nchan"] > len(mf["sss_info"]["components"]) - assert ( - info["ch_names"][: mf["sss_info"]["nchan"]] == mf["sss_ctc"]["proj_items_chs"] - ) + assert info["ch_names"][: mf["sss_info"]["nchan"]] == mf["sss_ctc"]["ch_names"] assert mf["sss_ctc"]["decoupler"].shape == ( mf["sss_info"]["nchan"], mf["sss_info"]["nchan"], diff --git a/mne/_fiff/write.py b/mne/_fiff/write.py index 8486ca13121..2bc48d9788c 100644 --- a/mne/_fiff/write.py +++ b/mne/_fiff/write.py @@ -149,7 +149,7 @@ def write_name_list(fid, kind, data): write_string(fid, kind, ":".join(data)) -def write_name_list_sanitized(fid, kind, lst, name): +def write_name_list_sanitized(fid, kind, lst, *, name="ch_names"): """Write a sanitized, colon-separated list of names.""" write_string(fid, kind, _safe_name_list(lst, "write", name)) @@ -408,9 +408,7 @@ def write_dig_points(fid, dig, block=False, coord_frame=None, *, ch_names=None): fid.write(np.array(d["ident"], ">i4").tobytes()) fid.write(np.array(d["r"][:3], ">f4").tobytes()) if ch_names is not None: - write_name_list_sanitized( - fid, FIFF.FIFF_MNE_CH_NAME_LIST, ch_names, "ch_names" - ) + write_name_list_sanitized(fid, FIFF.FIFF_MNE_CH_NAME_LIST, ch_names) if block: end_block(fid, FIFF.FIFFB_ISOTRAK) diff --git a/mne/cov.py b/mne/cov.py index 07af31476d8..23bec5de778 100644 --- a/mne/cov.py +++ b/mne/cov.py @@ -2527,7 +2527,7 @@ def _write_cov(fid, cov): # Channel names if cov["names"] is not None and len(cov["names"]) > 0: write_name_list_sanitized( - fid, FIFF.FIFF_MNE_ROW_NAMES, cov["names"], 'cov["names"]' + fid, FIFF.FIFF_MNE_ROW_NAMES, cov["names"], name='cov["names"]' ) # Data diff --git a/mne/preprocessing/maxwell.py b/mne/preprocessing/maxwell.py index 8c270252bb2..059a9a39158 100644 --- a/mne/preprocessing/maxwell.py +++ b/mne/preprocessing/maxwell.py @@ -2901,7 +2901,7 @@ def _read_cross_talk(cross_talk, ch_names): ctc = None if cross_talk is not None: sss_ctc = _read_ctc(cross_talk) - ctc_chs = sss_ctc["proj_items_chs"] + ctc_chs = sss_ctc["ch_names"] # checking for extra space ambiguity in channel names # between old and new fif files if ch_names[0] not in ctc_chs: From 04218e1b50125e0f7210316b148899e18607bfbb Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 22 May 2026 19:34:25 +0000 Subject: [PATCH 2/5] [autofix.ci] apply automated fixes --- doc/changes/dev/{newfeature.rst => 13911.newfeature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename doc/changes/dev/{newfeature.rst => 13911.newfeature.rst} (100%) diff --git a/doc/changes/dev/newfeature.rst b/doc/changes/dev/13911.newfeature.rst similarity index 100% rename from doc/changes/dev/newfeature.rst rename to doc/changes/dev/13911.newfeature.rst From 489a193b7ef3aa918c13fd3eb50a509cc65fa8e4 Mon Sep 17 00:00:00 2001 From: Eric Larson Date: Fri, 22 May 2026 17:01:13 -0400 Subject: [PATCH 3/5] FIX: Mapping --- mne/_fiff/meas_info.py | 8 +-- mne/_fiff/proc_history.py | 100 +++++++++++++----------------- mne/_fiff/tests/test_meas_info.py | 2 + 3 files changed, 49 insertions(+), 61 deletions(-) diff --git a/mne/_fiff/meas_info.py b/mne/_fiff/meas_info.py index 376b6975e40..c5e0873eb7f 100644 --- a/mne/_fiff/meas_info.py +++ b/mne/_fiff/meas_info.py @@ -2780,10 +2780,10 @@ def read_meas_info(fid, tree, clean_bads=False, verbose=None): info["hpi_subsystem"] = hs # Read cross-talk and fine cal - cross_talk = _read_mf_data(fid, tree, kind="cross_talk") + cross_talk = _read_mf_data(fid, tree, kind="sss_ctc") if len(cross_talk): info["cross_talk"] = cross_talk - fine_calibration = _read_mf_data(fid, tree, kind="fine_calibration") + fine_calibration = _read_mf_data(fid, tree, kind="sss_cal") if len(fine_calibration): info["fine_calibration"] = fine_calibration @@ -3123,8 +3123,8 @@ def write_meas_info(fid, info, data_type=None, reset_range=True): # Channel information _write_ch_infos(fid, info["chs"], reset_range, ch_names_mapping) - _write_mf_data(fid, info, kind="cross_talk") - _write_mf_data(fid, info, kind="fine_calibration") + _write_mf_data(fid, info, kind="sss_ctc", key="cross_talk") + _write_mf_data(fid, info, kind="sss_cal", key="fine_calibration") if info.get("device_info") is not None: start_block(fid, FIFF.FIFFB_DEVICE) diff --git a/mne/_fiff/proc_history.py b/mne/_fiff/proc_history.py index 6c8d0d520f7..cfd035d497c 100644 --- a/mne/_fiff/proc_history.py +++ b/mne/_fiff/proc_history.py @@ -254,34 +254,46 @@ def _sss_ctc_ch_name_clean(tag_data): _sss_cal_writers = (write_int_matrix, write_float_matrix) _sss_cal_casters = (np.array, np.array) +_io_map = dict( + sss_ctc=dict( + keys=_sss_ctc_keys, + ids=_sss_ctc_ids, + casters=_sss_ctc_casters, + writers=_sss_ctc_writers, + block=FIFF.FIFFB_CHANNEL_DECOUPLER, + ), + sss_cal=dict( + keys=_sss_cal_keys, + ids=_sss_cal_ids, + casters=_sss_cal_casters, + writers=_sss_cal_writers, + block=FIFF.FIFFB_SSS_CAL, + ), + sss_info=dict( + keys=_sss_info_keys, + ids=_sss_info_ids, + casters=_sss_info_casters, + writers=_sss_info_writers, + block=FIFF.FIFFB_SSS_INFO, + ), + max_st=dict( + keys=_max_st_keys, + ids=_max_st_ids, + casters=_max_st_casters, + writers=_max_st_writers, + block=FIFF.FIFFB_SSS_ST_INFO, + ), +) + -def _write_mf_data(fid, info, *, kind): - this_data = info.get(kind, None) +def _write_mf_data(fid, info, *, kind, key=None): + key = kind if key is None else key + this_data = info.get(key, dict()) if not this_data: # empty or None return + del info, key logger.debug("Writing %s info with keys: %s", kind, list(this_data)) - del info - if kind == "cross_talk": - keys = _sss_ctc_keys - ids = _sss_ctc_ids - writers = _sss_ctc_writers - block = FIFF.FIFFB_CHANNEL_DECOUPLER - elif kind == "fine_calibration": - keys = _sss_cal_keys - ids = _sss_cal_ids - writers = _sss_cal_writers - block = FIFF.FIFFB_SSS_CAL - elif kind == "max_st": - keys = _max_st_keys - ids = _max_st_ids - writers = _max_st_writers - block = FIFF.FIFFB_SSS_ST_INFO - else: - assert kind == "sss_info" - keys = _sss_info_keys - ids = _sss_info_ids - writers = _sss_info_writers - block = FIFF.FIFFB_SSS_INFO + keys, ids, _, writers, block = _io_map[kind].values() start_block(fid, block) for key, id_, writer in zip(keys, ids, writers): if key in this_data: @@ -294,7 +306,7 @@ def _read_ctc(fname): fname = _check_fname(fname, overwrite="read", must_exist=True) f, tree, _ = fiff_open(fname) with f as fid: - sss_ctc = _read_mf_data(fid, tree, kind="cross_talk") + sss_ctc = _read_mf_data(fid, tree, kind="sss_ctc") bad_str = f"Invalid cross-talk FIF: {fname}" if len(sss_ctc) == 0: raise ValueError(bad_str) @@ -310,40 +322,14 @@ def _read_ctc(fname): def _read_maxfilter_record(fid, tree): """Read maxfilter processing record from file.""" max_info = dict() - for key, kind in ( - ("sss_info", "sss_info"), - ("sss_ctc", "cross_talk"), - ("sss_cal", "fine_calibration"), - ("max_st", "max_st"), - ): - this_data = _read_mf_data(fid, tree, kind=kind) - if this_data: - max_info[key] = this_data + for key in _io_map: + this_data = _read_mf_data(fid, tree, kind=key) + max_info[key] = this_data or dict() # always add, even if empty return max_info def _read_mf_data(fid, tree, *, kind): - if kind == "cross_talk": - block = FIFF.FIFFB_CHANNEL_DECOUPLER # 501 - keys = _sss_ctc_keys - ids = _sss_ctc_ids - casters = _sss_ctc_casters - elif kind == "fine_calibration": - block = FIFF.FIFFB_SSS_CAL # 503 - keys = _sss_cal_keys - ids = _sss_cal_ids - casters = _sss_cal_casters - elif kind == "max_st": - block = FIFF.FIFFB_SSS_ST_INFO # 504 - keys = _max_st_keys - ids = _max_st_ids - casters = _max_st_casters - else: - assert kind == "sss_info" - block = FIFF.FIFFB_SSS_INFO # 502 - keys = _sss_info_keys - ids = _sss_info_ids - casters = _sss_info_casters + keys, ids, casters, _, block = _io_map[kind].values() sss_kind_block = dir_tree_find(tree, block) sss_out = dict() if len(sss_kind_block) > 0: @@ -364,5 +350,5 @@ def _write_maxfilter_record(fid, record): """Write maxfilter processing record to file.""" _write_mf_data(fid, record, kind="sss_info") _write_mf_data(fid, record, kind="max_st") - _write_mf_data(fid, record, kind="cross_talk") - _write_mf_data(fid, record, kind="fine_calibration") + _write_mf_data(fid, record, kind="sss_ctc") + _write_mf_data(fid, record, kind="sss_cal") diff --git a/mne/_fiff/tests/test_meas_info.py b/mne/_fiff/tests/test_meas_info.py index 56915f6d265..d9b86af0c14 100644 --- a/mne/_fiff/tests/test_meas_info.py +++ b/mne/_fiff/tests/test_meas_info.py @@ -1112,6 +1112,8 @@ def test_csr_csc(tmp_path): fname = tmp_path / "test.fif" write_info(fname, info, verbose="debug") info_read = read_info(fname, verbose="debug") + assert "max_info" in info_read["proc_history"][0] + assert "sss_ctc" in info_read["proc_history"][0]["max_info"] assert "decoupler" in info_read["proc_history"][0]["max_info"]["sss_ctc"] ct_read = info_read["proc_history"][0]["max_info"]["sss_ctc"]["decoupler"] assert isinstance(ct_read, sparse.csc_array) From eb328c147e8de5b28783b8320858ffd82115ab1a Mon Sep 17 00:00:00 2001 From: Eric Larson Date: Fri, 22 May 2026 17:14:23 -0400 Subject: [PATCH 4/5] FIX: Order --- mne/_fiff/proc_history.py | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/mne/_fiff/proc_history.py b/mne/_fiff/proc_history.py index cfd035d497c..3a6b1b57787 100644 --- a/mne/_fiff/proc_history.py +++ b/mne/_fiff/proc_history.py @@ -255,20 +255,6 @@ def _sss_ctc_ch_name_clean(tag_data): _sss_cal_casters = (np.array, np.array) _io_map = dict( - sss_ctc=dict( - keys=_sss_ctc_keys, - ids=_sss_ctc_ids, - casters=_sss_ctc_casters, - writers=_sss_ctc_writers, - block=FIFF.FIFFB_CHANNEL_DECOUPLER, - ), - sss_cal=dict( - keys=_sss_cal_keys, - ids=_sss_cal_ids, - casters=_sss_cal_casters, - writers=_sss_cal_writers, - block=FIFF.FIFFB_SSS_CAL, - ), sss_info=dict( keys=_sss_info_keys, ids=_sss_info_ids, @@ -283,6 +269,20 @@ def _sss_ctc_ch_name_clean(tag_data): writers=_max_st_writers, block=FIFF.FIFFB_SSS_ST_INFO, ), + sss_ctc=dict( + keys=_sss_ctc_keys, + ids=_sss_ctc_ids, + casters=_sss_ctc_casters, + writers=_sss_ctc_writers, + block=FIFF.FIFFB_CHANNEL_DECOUPLER, + ), + sss_cal=dict( + keys=_sss_cal_keys, + ids=_sss_cal_ids, + casters=_sss_cal_casters, + writers=_sss_cal_writers, + block=FIFF.FIFFB_SSS_CAL, + ), ) @@ -348,7 +348,5 @@ def _read_mf_data(fid, tree, *, kind): def _write_maxfilter_record(fid, record): """Write maxfilter processing record to file.""" - _write_mf_data(fid, record, kind="sss_info") - _write_mf_data(fid, record, kind="max_st") - _write_mf_data(fid, record, kind="sss_ctc") - _write_mf_data(fid, record, kind="sss_cal") + for key in _io_map: + _write_mf_data(fid, record, kind=key) From b1c53cc155924be5852eac72d9eea94c8645c8dc Mon Sep 17 00:00:00 2001 From: Eric Larson Date: Fri, 22 May 2026 17:16:05 -0400 Subject: [PATCH 5/5] FIX: Check --- mne/_fiff/meas_info.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mne/_fiff/meas_info.py b/mne/_fiff/meas_info.py index c5e0873eb7f..5cb60fccc77 100644 --- a/mne/_fiff/meas_info.py +++ b/mne/_fiff/meas_info.py @@ -1079,7 +1079,7 @@ class HeliumInfo(ValidatedDict): "gantry_angle": partial( _check_types, name='helium_info["gantry_angle"]', - types="numeric", + types="int-like", cast=int, ), "helium_level": partial(