From d4e226383994d0b90114cf650f7b5b562843c535 Mon Sep 17 00:00:00 2001 From: Gaurav Agarwal Date: Thu, 18 Jun 2026 17:43:59 -0400 Subject: [PATCH 1/4] added extra arguments for autoplot --- src/labcore/scripts/monitr_server.py | 34 +++++++++++++++++++--------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/labcore/scripts/monitr_server.py b/src/labcore/scripts/monitr_server.py index c4ee3dc..ce0dc2d 100644 --- a/src/labcore/scripts/monitr_server.py +++ b/src/labcore/scripts/monitr_server.py @@ -18,6 +18,10 @@ def make_template(data_root: Union[str, Path] = ".") -> Any: def data_selected_cb(*events: Any) -> None: loader.file_path = events[0].new + if loader.auto_load_toggle.value: + import asyncio + # Schedule async load in event loop + asyncio.create_task(loader.load_and_preprocess()) ds.param.watch(data_selected_cb, ["selected_path"]) @@ -33,28 +37,36 @@ def refilter_data_select(*events: Any) -> None: return temp -def run_autoplot() -> None: +def run_autoplot(): parser = argparse.ArgumentParser( description="Data monitoring program made for Pfaff lab by Rocky Daehler, building" - " on Plottr made by Wolfgang Pfaff. Run command on it's own to start the" - " application, and pass an (optional) path to the data directory as a" - " second argument." - ) - parser.add_argument("Datapath", nargs="?", default=".") + " on Plottr made by Wolfgang Pfaff. Run command on it's own to start the" + " application, and pass an (optional) path to the data directory as a" + " second argument.") + parser.add_argument('Datapath', nargs='?', default='.', help='Path to the data directory (default: current directory)') + parser.add_argument('-p', '--port', type=int, default=19530, help='Port to run the server on (default: 19530)') + parser.add_argument('-a', '--address', type=str, default='0.0.0.0', help='Address to bind to (default: 0.0.0.0)') + parser.add_argument('-o', '--allow-origin', type=str, default=None, + help='Allowed websocket origins (comma-separated, e.g., "host1:19530,host2:19530")') args = parser.parse_args() data_root = Path(args.Datapath) - if not data_root.is_dir(): - logger.error( - "Provided Path was invalid.\nPlease provide a path to an existing directory housing your data." - ) + if (not data_root.is_dir()): + logger.error("Provided Path was invalid.\nPlease provide a path to an existing directory housing your data.") return logger.info(f"Running Labcore.Autoplot on data from {data_root}") + logger.info(f"Server running on http://{args.address}:{args.port}") template = make_template(data_root) - template.show() + # Parse websocket origins + websocket_origin = None + if args.allow_origin: + websocket_origin = [o.strip() for o in args.allow_origin.split(',')] + logger.info(f"Allowed websocket origins: {websocket_origin}") + + template.show(port=args.port, address=args.address, websocket_origin=websocket_origin) make_template(".").servable() From 68d23e5062cf38b4240101997758f84fbd83852a Mon Sep 17 00:00:00 2001 From: Gaurav Agarwal Date: Thu, 18 Jun 2026 17:44:43 -0400 Subject: [PATCH 2/4] Add firle archival to Dataset Analysis --- src/labcore/analysis/analysis_base.py | 39 +++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/labcore/analysis/analysis_base.py b/src/labcore/analysis/analysis_base.py index 5b744b2..55ad4e5 100644 --- a/src/labcore/analysis/analysis_base.py +++ b/src/labcore/analysis/analysis_base.py @@ -5,6 +5,7 @@ from pathlib import Path from types import TracebackType from typing import Any, Dict, List, Optional, Type, Union +from shutil import copy2 # Needed to generate hvplot from a script import holoviews as hv @@ -21,6 +22,7 @@ class AnalysisExistsError(Exception): + logger.info('Analysis already exists, adding another') pass @@ -161,6 +163,43 @@ def add_figure( make_figure = add_figure + def archive_file(self, file_path, name: Optional[str] = None, folder: Optional[Path] = None): + """Copy a source file into the analysis output folder(s). + + Parameters + ---------- + file_path + Path to the file that should be archived. + name + Optional base name for the archived copy. Defaults to the source stem. + folder + Optional destination folder. Defaults to every folder managed by this analysis. + + Returns + ------- + list[Path] + Paths of the archived copies. + """ + source = Path(file_path) + if not source.exists(): + raise FileNotFoundError(f"source file does not exist: {source}") + + archive_name = source.stem if name is None else name + suffix = source.suffix.lstrip(".") + target_folders = self.savefolders if folder is None else [Path(folder)] + + archived_paths = [] + for target_folder in target_folders: + if not target_folder.exists(): + target_folder.mkdir(parents=True, exist_ok=True) + + archived_path = self._new_file_path(target_folder, archive_name, suffix) + copy2(source, archived_path) + archived_paths.append(archived_path) + + self.files.extend(path for path in archived_paths if path not in self.files) + return archived_paths + def to_table(self, name: str, data: Dict[str, Any]) -> None: data.update( { From e807da9a1928c1e22b62142eb50c2c841d2ba264 Mon Sep 17 00:00:00 2001 From: Gaurav Agarwal Date: Thu, 18 Jun 2026 17:45:18 -0400 Subject: [PATCH 3/4] added a fit func --- src/labcore/analysis/__init__.py | 1 + src/labcore/analysis/fitfuncs/generic.py | 37 ++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/labcore/analysis/__init__.py b/src/labcore/analysis/__init__.py index de0195c..7531f79 100644 --- a/src/labcore/analysis/__init__.py +++ b/src/labcore/analysis/__init__.py @@ -1,6 +1,7 @@ from .analysis_base import DatasetAnalysis as DatasetAnalysis from .fit import Fit as Fit from .fit import FitResult as FitResult +from .fit import fit_and_add_to_ds from .hvplotting import ComplexHist as ComplexHist from .hvplotting import Node as Node from .hvplotting import ValuePlot as ValuePlot diff --git a/src/labcore/analysis/fitfuncs/generic.py b/src/labcore/analysis/fitfuncs/generic.py index f3905d5..3379998 100644 --- a/src/labcore/analysis/fitfuncs/generic.py +++ b/src/labcore/analysis/fitfuncs/generic.py @@ -144,6 +144,43 @@ def guess( return dict(A=A, of=of, phi=phi, f=f, tau=tau) +class ExponentialDecayCosinetoground(Fit): + @staticmethod + def model(coordinates, A, of, f, phi, tau) -> np.ndarray: + """$A \sin(2*\pi*(f*x + \phi/360)) \exp(-x/\tau) + of$""" + return A * (1+np.sin(2 * np.pi * (f * coordinates + phi/360))) * np.exp(-coordinates/tau) + of + + @staticmethod + def guess(coordinates, data): + """This guess will ignore the first value because since it usually is not relaiable.""" + + # offset guess: The mean of the data + of = np.mean(data) + + # amplitude guess: difference between max and min. + A = np.abs(np.max(data) - np.min(data)) / 2. + if data[0] < data[-1]: + A *= -1 + + # f guess: Maximum of the absolute value of the fourier transform. + fft_data = np.fft.rfft(data)[1:] + fft_coordinates = np.fft.rfftfreq(data.size, coordinates[1] - coordinates[0])[1:] + + # note to confirm, could there be multiple peaks? I am always taking the first one here. + f_max_index = np.argmax(fft_data) + f = fft_coordinates[f_max_index] + + # phi guess + phi = -np.angle(fft_data[f_max_index], deg=True) + + # tau guess: pick the point where we reach roughly 1/e + one_over_e_val = of + A/3. + one_over_e_idx = np.argmin(np.abs(data-one_over_e_val)) + tau = coordinates[one_over_e_idx] + + return dict(A=A, of=of, phi=phi, f=f, tau=tau) + + class Gaussian(Fit): @staticmethod From aae4a86ea706ffecfb4ee4db73e715f9cd41329d Mon Sep 17 00:00:00 2001 From: Gaurav Agarwal Date: Thu, 18 Jun 2026 17:46:31 -0400 Subject: [PATCH 4/4] Added 'ddh5-xr' which facilitates a non-datadict pipeline for ddh5 to xarray conversion. SWMR reading and writing supported. --- src/labcore/data/__init__.py | 17 + src/labcore/data/ddh5_xr.py | 1945 +++++++++++++++++++++++++++ src/labcore/measurement/__init__.py | 1 + src/labcore/measurement/storage.py | 222 +++ 4 files changed, 2185 insertions(+) create mode 100644 src/labcore/data/ddh5_xr.py diff --git a/src/labcore/data/__init__.py b/src/labcore/data/__init__.py index e69de29..1a27194 100644 --- a/src/labcore/data/__init__.py +++ b/src/labcore/data/__init__.py @@ -0,0 +1,17 @@ +from .datadict import DataDict, MeshgridDataDict, dd2xr +from .tools import split_complex +from .datadict_storage import datadict_from_hdf5, datadict_to_hdf5, all_datadicts_from_hdf5, DDH5Writer, load_as_xr +from .ddh5_xr import ( + ddh5_to_xarray, + ddh5_to_gridded_ddh5, + ddh5_schema, + ddh5_info, + validate_ddh5, + MissingMode, + DDH5Schema, + DDH5FieldInfo, + GridInfo, + DDH5ValidationReport, + GridInferenceError, + DDH5Writer_swmr, +) \ No newline at end of file diff --git a/src/labcore/data/ddh5_xr.py b/src/labcore/data/ddh5_xr.py new file mode 100644 index 0000000..0f36a2f --- /dev/null +++ b/src/labcore/data/ddh5_xr.py @@ -0,0 +1,1945 @@ +""" +ddh5_xr.py -- Direct DDH5-to-xarray conversion without DataDict overhead. + +Optimized pipeline that skips DataDict/MeshgridDataDict intermediate +representations. Reads DDH5 files with SWMR support for lock-free +concurrent access, supports lazy loading via h5py/dask, preserves +coordinate order/axis dependencies/units/labels, and handles +multi-dimensional independents. + +Gridding strategies for incomplete data: + 'pad' -- fill missing grid slots with NaN (raises warning) + 'truncate' -- trim to smallest complete grid (raises warning) + 'raise' -- fail on shape mismatch + +Modular layers: + Schema discovery -> Grid inference -> Coordinate building -> Data var + building -> Dataset assembly, plus Validation and Gridded-DDH5 export. +""" + +from __future__ import annotations + +import datetime +import json +import logging +import os +import re +import shutil +import time +import uuid +import warnings +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Collection, Dict, List, Optional, Tuple, Union + +import numpy as np +import h5py + +from .datadict_storage import FileOpener, deh5ify, DATAFILEXT, NumpyEncoder + +try: + from labcore.utils.num import guess_grid_from_sweep_direction +except ImportError: + guess_grid_from_sweep_direction = None # type: ignore + +try: + from labcore.utils.misc import reorder_indices as _reorder_indices +except ImportError: + _reorder_indices = None # type: ignore + +logger = logging.getLogger(__name__) + +try: + import dask.array as da + + _DASK_AVAILABLE = True +except ImportError: + _DASK_AVAILABLE = False + da = None + +try: + import xarray as xr + + _XARRAY_AVAILABLE = True +except ImportError: + _XARRAY_AVAILABLE = False + xr = None + + +# =========================================================================== +# Constants +# =========================================================================== + +_TIMESTRFORMAT = "%Y-%m-%d %H:%M:%S" + + +class DDH5SWMRReader: + """Context manager for reading DDH5 with SWMR for lock-free reads. + + Tries SWMR mode first; falls back to regular HDF5 opening. + Unlike :class:`FileOpener`, does **not** create a lock file, + so it is safe for concurrent read-while-write scenarios. + + Parameters + ---------- + path : str, Path + Full path to the ``.ddh5`` file. + timeout : float, optional + Maximum time (seconds) to wait for the file to appear / become + accessible. Default: 30 s. + """ + + def __init__(self, path: Union[str, Path], timeout: float = 30.0): + self._path = Path(path) + self._timeout = timeout + self._h5: Optional[h5py.File] = None + self.swmr_active: bool = False + + def __enter__(self) -> h5py.File: + t0 = time.time() + while True: + if not self._path.exists(): + if time.time() - t0 > self._timeout: + raise FileNotFoundError( + f"File {self._path} not found within {self._timeout} s" + ) + time.sleep(0.1) + continue + + try: + self._h5 = h5py.File(str(self._path), "r", swmr=True) + self.swmr_active = True + return self._h5 + except Exception: + pass + + try: + self._h5 = h5py.File(str(self._path), "r") + self.swmr_active = False + return self._h5 + except Exception: + if time.time() - t0 > self._timeout: + raise + time.sleep(0.1) + + def __exit__(self, *args: Any) -> None: + if self._h5 is not None: + self._h5.close() + self._h5 = None + + +def timestamp_from_path(p: Path) -> datetime.datetime: + """Return a `datetime` timestamp from a standard-formatted path. + Assumes that the path stem has a timestamp that begins in ISO-like format + ``YYYY-mm-ddTHHMMSS``. + """ + timestring = str(p.stem)[:13] + ":" + str(p.stem)[13:15] + ":" + str(p.stem)[15:17] + return datetime.datetime.fromisoformat(timestring) + + +def find_data( + root, + newer_than: Optional[datetime.datetime] = None, + older_than: Optional[datetime.datetime] = None, + folder_filter: Optional[str] = None, +) -> dict: + if not isinstance(root, Path): + root = Path(root) + + folders = {} + for f, dirs, files in os.walk(root): + if "data.ddh5" in files: + fp = Path(f) + ts = timestamp_from_path(fp) + if newer_than is not None and ts <= newer_than: + continue + if older_than is not None and ts >= older_than: + continue + if folder_filter is not None: + pattern = re.compile(folder_filter) + if not pattern.search(str(fp.stem)): + continue + + folders[fp] = (dirs, files) + return folders + + +# =========================================================================== +# Enums / modes +# =========================================================================== + + +class MissingMode(str, Enum): + """ + How to handle data that does not perfectly fill the inferred grid. + + * ``pad`` -- fill empty grid slots with ``np.nan``; raise ``RuntimeWarning``. + * ``truncate`` -- trim trailing incomplete cycle; raise ``RuntimeWarning``. + * ``raise`` -- raise ``ValueError`` on any shape mismatch. + """ + + PAD = "pad" + TRUNCATE = "truncate" + RAISE = "raise" + + +class GridInferenceError(ValueError): + """Raised when the grid shape cannot be determined from the DDH5 data.""" + + +# =========================================================================== +# Dataclasses +# =========================================================================== + + +@dataclass +class DDH5FieldInfo: + """Metadata for a single data field (independent or dependent). + + Parameters + ---------- + name : str + Field name as used in the HDF5 dataset. + shape : tuple of int + Stored shape of the dataset ``(nrecords, ...)``. + dtype : numpy.dtype + Data type of the stored values. + axes : list of str + For dependents: list of independent axis names. + For independents: empty list. + unit : str + Physical unit string (empty if not specified). + label : str + Human-readable label (falls back to *name* if unset). + """ + + name: str + shape: Tuple[int, ...] + dtype: np.dtype + axes: List[str] = field(default_factory=list) + unit: str = "" + label: str = "" + + +@dataclass +class DDH5Schema: + """Complete structural description of a DDH5 ``/data`` group. + + Parameters + ---------- + fields : dict[str, DDH5FieldInfo] + Map from field name to its metadata. + dependents : list of str + Names of fields that have an ``axes`` attribute (non-empty). + independents : list of str + Names of fields that have no ``axes`` attribute (empty list). + nrecords : int + First dimension length shared by all datasets. + meta : dict[str, Any] + Global group-level metadata (cleaned by :func:`deh5ify`). + """ + + fields: Dict[str, DDH5FieldInfo] + dependents: List[str] + independents: List[str] + nrecords: int + meta: Dict[str, Any] + + def select(self, names: List[str]) -> DDH5Schema: + """Return a sub-schema containing only *names* (plus required axes).""" + keep = set(names) + for name in names: + info = self.fields.get(name) + if info and info.axes: + keep.update(info.axes) + fields = {n: self.fields[n] for n in keep if n in self.fields} + deps = [n for n in self.dependents if n in keep] + indeps = [n for n in self.independents if n in keep] + return DDH5Schema( + fields=fields, + dependents=deps, + independents=indeps, + nrecords=self.nrecords, + meta=self.meta, + ) + + +@dataclass +class GridInfo: + """Inferred multi-dimensional grid description. + + Distinguishes between *record-grid* axes (1-D sweep axes that reshape + the record dimension) and *sub-dimension* axes (multi-dimensional + axes whose inner dimensions become additional xarray dims). + + Parameters + ---------- + axes_order : list of str + Final xarray dimension names matching the ``axes`` attribute + convention. + shape : tuple of int + Final xarray shape = *record_shape* + *sub_dim_shapes*. + record_shape : tuple of int + Shape from 1-D sweep axes (reshapes the record dimension). + record_axes : list of str + Axes that form the record grid, in sweep order (slow→fast). + sweep_shape : tuple of int or None + Natural storage order shape from + :func:`guess_grid_from_sweep_direction` (only record-grid axes). + sweep_axes : list of str or None + Natural storage order of record-grid axes (slow→fast). + sub_dim_shapes : tuple of int + Per-record sub-dimension sizes from multi-dimensional axes. + sub_dim_axes : list of str + Names of multi-dimensional axes that contribute sub-dimensions. + is_complete : bool + ``True`` when ``prod(record_shape) == nrecords``. + expected_len : int + ``prod(record_shape)`` -- elements needed for a perfect record grid. + actual_len : int + Number of records stored (first-dimension length). + """ + + axes_order: List[str] = field(default_factory=list) + shape: Tuple[int, ...] = () + record_shape: Tuple[int, ...] = () + record_axes: List[str] = field(default_factory=list) + sweep_shape: Optional[Tuple[int, ...]] = None + sweep_axes: Optional[List[str]] = None + sub_dim_shapes: Tuple[int, ...] = () + sub_dim_axes: List[str] = field(default_factory=list) + is_complete: bool = True + expected_len: int = 0 + actual_len: int = 0 + + +@dataclass +class DDH5ValidationReport: + """Result of :func:`validate_ddh5`. + + Parameters + ---------- + is_valid : bool + ``True`` when no fatal errors were found. + errors : list of str + Fatal issues (missing axes, non-positive lengths, ...). + warnings : list of str + Non-fatal issues (NaN in axis data, non-monotonic, ...). + schema : DDH5Schema or None + Extracted schema (available even when validation fails partially). + grid_info : GridInfo or None + Inferred grid (``None`` when validation fails or axes cannot be + determined). + """ + + is_valid: bool + errors: List[str] = field(default_factory=list) + warnings_list: List[str] = field(default_factory=list) + schema: Optional[DDH5Schema] = None + grid_info: Optional[GridInfo] = None + + +# =========================================================================== +# Internal helpers +# =========================================================================== + + + +def _data_file_path(file: Union[str, Path]) -> Path: + """Normalise a filepath, appending ``.ddh5`` if missing.""" + path = Path(file) + if path.suffix != f".{DATAFILEXT}": + path = Path(path.parent, path.stem + f".{DATAFILEXT}") + return path + + +def _lazy_array(h5ds: h5py.Dataset) -> Any: + """Create a lazy array from an h5py Dataset. + + Uses ``dask.array`` when available, otherwise returns the h5py dataset + itself (which supports slicing). + """ + if _DASK_AVAILABLE: + chunks = h5ds.chunks + if chunks is None: + chunks = "auto" + return da.from_array(h5ds, chunks=chunks) + return h5ds + + +def _set_h5_attr(h5obj: Any, name: str, val: Any) -> None: + """Set HDF5 attribute with automatic type adaptation. + + Handles :class:`bytes`, :class:`numpy.ndarray`, :class:`list` of strings, + and plain scalars. + """ + try: + if isinstance(val, list): + all_str = all(isinstance(v, str) for v in val) + if all_str: + h5obj.attrs[name] = np.array(val, dtype=h5py.string_dtype()) + else: + h5obj.attrs[name] = np.array(val) + elif isinstance(val, (np.ndarray,)): + h5obj.attrs[name] = val + elif isinstance(val, bool): + h5obj.attrs[name] = int(val) + elif isinstance(val, bytes): + h5obj.attrs[name] = val + elif val is None: + h5obj.attrs[name] = "__NONE__" + else: + h5obj.attrs[name] = val + except Exception: + try: + h5obj.attrs[name] = str(val) + except Exception: + logger.debug("Could not set HDF5 attribute %s=%s", name, val) + + +# =========================================================================== +# Layer 1 -- Schema discovery +# =========================================================================== + + +def ddh5_schema( + path: Union[str, Path], + groupname: str = "data", + file_timeout: Optional[float] = None, + swmr: bool = True, +) -> DDH5Schema: + """Read DDH5 structure **without** loading value data. + + Only dataset shapes, attributes, and group-level metadata are read. + This is lightweight and suitable for use before deciding whether + to load data. + + Parameters + ---------- + path : str or Path + Path to the ``.ddh5`` file. + groupname : str, optional + HDF5 group name (default ``"data"``). + file_timeout : float, optional + Passed through to the file opener. + swmr : bool, optional + Attempt SWMR reading (default ``True``). + + Returns + ------- + DDH5Schema + """ + filepath = _data_file_path(path) + if not filepath.exists(): + raise FileNotFoundError(f"DDH5 file not found: {filepath}") + + if swmr: + opener = DDH5SWMRReader(filepath, timeout=file_timeout or 30.0) + else: + opener = FileOpener(filepath, "r", timeout=file_timeout) + + with opener as f: + if groupname not in f: + raise ValueError(f"HDF5 group '{groupname}' not found in {filepath}") + + grp = f[groupname] + keys = list(grp.keys()) + + if not keys: + nrecords = 0 + else: + nrecords = grp[keys[0]].shape[0] + + fields: Dict[str, DDH5FieldInfo] = {} + meta: Dict[str, Any] = {} + + for attr_name in grp.attrs: + meta[attr_name] = deh5ify(grp.attrs[attr_name]) + + for k in keys: + ds = grp[k] + try: + shp = ds.shape + except Exception: + shp = (0,) + try: + dtype = ds.dtype + except Exception: + dtype = np.dtype("float64") + + raw_axes = ds.attrs.get("axes", None) + if raw_axes is not None: + axes_val = deh5ify(raw_axes) + if isinstance(axes_val, (np.ndarray,)): + axes_val = axes_val.tolist() + if isinstance(axes_val, str): + axes_val = [axes_val] + axes = list(axes_val) if axes_val else [] + else: + axes = [] + + unit_raw = ds.attrs.get("unit", "") + unit = deh5ify(unit_raw) if unit_raw else "" + if isinstance(unit, bytes): + unit = unit.decode("utf-8", errors="replace") + unit = str(unit) if unit else "" + + label = k + for attr_name in ds.attrs: + cleaned = str(attr_name).strip("_") + if cleaned == "label": + lv = deh5ify(ds.attrs[attr_name]) + if isinstance(lv, bytes): + lv = lv.decode("utf-8", errors="replace") + if lv and str(lv): + label = str(lv) + break + + fields[k] = DDH5FieldInfo( + name=k, + shape=shp, + dtype=np.dtype(dtype), + axes=axes, + unit=str(unit), + label=str(label), + ) + + # A field is independent if it is referenced as an axis by ANY field, + # or if it has no axes of its own. + # A field is dependent only if it has axes and is NOT used as an axis. + all_axes: set = set() + for fi in fields.values(): + all_axes.update(fi.axes) + + deps = [n for n, fi in fields.items() if fi.axes and n not in all_axes] + indeps = [n for n, fi in fields.items() if n in all_axes or not fi.axes] + + return DDH5Schema( + fields=fields, + dependents=deps, + independents=indeps, + nrecords=nrecords, + meta=meta, + ) + + +# =========================================================================== +# Layer 2 -- Grid inference +# =========================================================================== + + +def _infer_axis_lengths( + grp: h5py.Group, + axes: List[str], + fields: Dict[str, DDH5FieldInfo], +) -> List[int]: + """Determine the grid length along each axis. + + For 1-D axes calls :func:`guess_grid_from_sweep_direction` to infer + the sweep period. For multi-dimensional axes the inner dimension + product is returned. + + Parameters + ---------- + grp : h5py.Group + Open HDF5 group containing the datasets. + axes : list of str + Axis names in the **sweep** order (slowest→fastest). + fields : dict[str, DDH5FieldInfo] + Field metadata keyed by name. + + Returns + ------- + list of int + Grid length for each axis, in the same order as *axes*. + """ + lengths: List[int] = [] + for ax_name in axes: + fi = fields[ax_name] + if len(fi.shape) == 1: + ds = grp[ax_name] + ax_data = ds[:] + lengths.append(len(np.unique(ax_data))) + else: + inner_prod = int(np.prod(fi.shape[1:])) + if fi.shape[0] <= 1: + lengths.append(inner_prod) + else: + ds = grp[ax_name] + first_slice = ds[0] + repeated = True + for i in range(1, min(fi.shape[0], 20)): + if not np.array_equal(ds[i], first_slice): + repeated = False + break + if repeated: + lengths.append(inner_prod) + else: + lengths.append(inner_prod) + return lengths + + +def _infer_grid( + grp: h5py.Group, + schema: DDH5Schema, + missing: MissingMode, +) -> GridInfo: + """Infer the multi-dimensional grid shape from axis data. + + Uses :func:`guess_grid_from_sweep_direction` to detect the natural + sweep ordering (slowest→fastest) and, when it differs from the + ``axes`` attribute order, applies the same transpose that + :func:`datadict_to_meshgrid` performs to match the nominal axis order. + + Parameters + ---------- + grp : h5py.Group + HDF5 group containing the datasets. + schema : DDH5Schema + Discovered schema. + missing : MissingMode + How to handle incomplete grids. + + Returns + ------- + GridInfo + + Raises + ------ + GridInferenceError + When the grid shape cannot be determined. + ValueError + When *missing='raise'* and data does not perfectly fill the grid. + """ + if not schema.dependents: + return GridInfo( + axes_order=[], + shape=(schema.nrecords,), + is_complete=True, + expected_len=schema.nrecords, + actual_len=schema.nrecords, + ) + + primary_dep = schema.dependents[0] + axes_attr = list(schema.fields[primary_dep].axes) + + if not axes_attr: + return GridInfo( + axes_order=[], + shape=(schema.nrecords,), + is_complete=True, + expected_len=schema.nrecords, + actual_len=schema.nrecords, + ) + + for ax in axes_attr: + if ax not in schema.fields: + raise GridInferenceError( + f"Axis '{ax}' referenced by '{primary_dep}' " + f"does not exist in the DDH5 file." + ) + + # Separate 1-D sweep (record-grid) axes from multi-dimensional (sub-dim) ones. + axes_1d: List[str] = [] + axes_md: List[str] = [] + for ax in axes_attr: + fi = schema.fields[ax] + if len(fi.shape) == 1: + axes_1d.append(ax) + else: + axes_md.append(ax) + + # ---- Determine record-grid shape from 1-D sweep axes ---- + sweep_axes: List[str] = [] + sweep_shape_tuple: Tuple[int, ...] = () + record_shape: Tuple[int, ...] = () + record_axes: List[str] = [] + + if axes_1d and guess_grid_from_sweep_direction is not None: + axis_data = {ax: grp[ax][:] for ax in axes_1d} + result = guess_grid_from_sweep_direction(**axis_data) + if result is None: + raise GridInferenceError( + "Could not determine sweep order from 1-D axis data. " + "The data may not form a regular grid." + ) + sweep_axes_slow_fast, raw_shape = result + sweep_axes = list(sweep_axes_slow_fast) + sweep_shape_tuple = tuple(int(s) for s in raw_shape) + record_shape = sweep_shape_tuple + record_axes = list(sweep_axes) + elif axes_1d: + lengths = _infer_axis_lengths(grp, axes_1d, schema.fields) + sweep_axes = list(axes_1d) + sweep_shape_tuple = tuple(lengths) + record_shape = sweep_shape_tuple + record_axes = list(axes_1d) + + # ---- Sub-dimensions from multi-dimensional axes ---- + sub_dim_shapes: Tuple[int, ...] = () + sub_dim_axes: List[str] = [] + for ax in axes_md: + md_len = _infer_axis_lengths(grp, [ax], schema.fields)[0] + sub_dim_shapes = sub_dim_shapes + (md_len,) + sub_dim_axes.append(ax) + + # ---- Compute final shape & axes order matching the *axes* attribute ---- + record_len = int(np.prod(record_shape)) if record_shape else schema.nrecords + sub_len = int(np.prod(sub_dim_shapes)) + dep_info = schema.fields[primary_dep] + actual_len = dep_info.shape[0] + + # Transpose record axes if sweep order differs from attribute order + if record_axes and axes_1d and record_axes != axes_1d: + if _reorder_indices is not None and len(record_axes) == len(axes_1d): + transpose_idxs = _reorder_indices(record_axes, axes_1d) + else: + transpose_idxs = list(range(len(record_axes))) + record_shape = tuple(record_shape[i] for i in transpose_idxs) + record_axes = list(axes_1d) + else: + record_axes = list(axes_1d) if axes_1d else record_axes + + final_axes = list(record_axes) + list(sub_dim_axes) + final_shape = record_shape + sub_dim_shapes + + if record_len != actual_len: + if missing == MissingMode.RAISE: + raise ValueError( + f"Record grid shape {record_shape} (={record_len} records) " + f"does not match the data ({actual_len} records). " + f"Use missing='pad' or missing='truncate'." + ) + elif missing == MissingMode.TRUNCATE: + truncated_shape = list(record_shape) + truncated_axes = list(record_axes) + found = False + for i in range(len(truncated_shape)): + reduced = int(np.prod(truncated_shape[: i + 1])) + if actual_len >= reduced and (i + 1 < len(truncated_shape)): + continue + if actual_len >= reduced: + truncated_shape = truncated_shape[: i + 1] + truncated_axes = truncated_axes[: i + 1] + found = True + break + else: + inner = ( + int(np.prod(truncated_shape[1: i + 1])) + if i > 0 + else 1 + ) + max_outer = actual_len // inner + if max_outer > 0: + truncated_shape[0] = max_outer + truncated_shape = truncated_shape[: i + 1] + truncated_axes = truncated_axes[: i + 1] + else: + truncated_shape = [actual_len] + truncated_axes = [] + found = True + break + if not found: + truncated_shape = [actual_len] + truncated_axes = [] + record_shape = tuple(truncated_shape) + record_axes = truncated_axes + sweep_shape_tuple = record_shape + sweep_axes = list(record_axes) + record_len = int(np.prod(record_shape)) + final_axes = list(record_axes) + list(sub_dim_axes) + final_shape = record_shape + sub_dim_shapes + warnings.warn( + f"Data truncated to grid shape {record_shape}: " + f"got {actual_len} records. " + f"{actual_len - record_len} trailing " + f"elements dropped.", + RuntimeWarning, + ) + elif missing == MissingMode.PAD: + warnings.warn( + f"Record grid shape {record_shape} requires " + f"{record_len} records, but only {actual_len} found. " + f"Missing {record_len - actual_len} slots will be " + f"filled with NaN.", + RuntimeWarning, + ) + + return GridInfo( + axes_order=final_axes, + shape=final_shape, + record_shape=record_shape, + record_axes=record_axes, + sweep_shape=sweep_shape_tuple if sweep_shape_tuple else None, + sweep_axes=sweep_axes if sweep_axes else None, + sub_dim_shapes=sub_dim_shapes, + sub_dim_axes=sub_dim_axes, + is_complete=(record_len == actual_len), + expected_len=record_len, + actual_len=actual_len, + ) + + +# =========================================================================== +# Layer 3 -- Data loading & reshaping +# =========================================================================== + + +def _reshape_to_grid( + data: np.ndarray, + grid_info: GridInfo, + missing: MissingMode, +) -> np.ndarray: + """Reshape the record (first) dimension of *data* to the record grid. + + Only the leading dimension (records) is reshaped; any trailing inner + dimensions (from multi-dimensional axes or array-type dependents) are + preserved. + + When the sweep order from :func:`guess_grid_from_sweep_direction` + differs from the nominal axis order, a transpose is applied to the + record-grid portion so that the final shape matches *axes_order*. + + Parameters + ---------- + data : np.ndarray + Data with first axis = record count. + grid_info : GridInfo + Inferred grid. + missing : MissingMode + Strategy for incomplete grids. + + Returns + ------- + np.ndarray + Reshaped data with leading dims = *grid_info.shape*. + """ + inner_shp = data.shape[1:] + record_len = grid_info.expected_len + actual_len = data.shape[0] + + if record_len < actual_len: + if missing == MissingMode.TRUNCATE: + data = data[:record_len] + elif missing == MissingMode.RAISE: + raise ValueError( + f"Data has {actual_len} records, " + f"grid expects {record_len}" + ) + else: + data = data[:record_len] + elif record_len > actual_len: + if missing == MissingMode.PAD: + pad_shape = (record_len - actual_len,) + inner_shp + padding = np.full(pad_shape, np.nan, dtype=data.dtype) + data = np.concatenate([data, padding], axis=0) + elif missing == MissingMode.RAISE: + raise ValueError( + f"Data has {actual_len} records, " + f"grid expects {record_len}" + ) + + record_shape = grid_info.record_shape + sweep_shape = grid_info.sweep_shape + + if not record_shape: + return data + + if sweep_shape is None: + target = record_shape + inner_shp + return data.reshape(target) + + # Reshape records → sweep_shape (natural storage order) + target = sweep_shape + inner_shp + data = data.reshape(target) + + # Transpose record-grid axes if sweep order ≠ nominal order + if sweep_shape != record_shape and grid_info.sweep_axes is not None: + if _reorder_indices is not None and len(sweep_shape) == len(record_shape): + transpose_idxs = tuple( + _reorder_indices(grid_info.sweep_axes, grid_info.record_axes) + ) + else: + transpose_idxs = tuple(range(len(sweep_shape))) + if transpose_idxs != tuple(range(len(sweep_shape))): + inner_axes = tuple( + range(len(sweep_shape), len(sweep_shape) + len(inner_shp)) + ) + data = data.transpose(transpose_idxs + inner_axes) + + return data + + +def _build_coordinates( + grp: h5py.Group, + schemas: DDH5Schema, + grid_info: GridInfo, + missing: MissingMode = MissingMode.PAD, + lazy: bool = False, +) -> Tuple[Dict[str, Any], Dict[str, Dict[str, Any]]]: + """Build xarray coordinate arrays and their metadata. + + For 1D grid axes, extracts ordered unique values from the axis dataset + (avoiding unnecessary reshape-to-full-grid). + + For multi-dimensional axes that repeat across records, takes the first + record's array as the coordinate. + + For multi-dimensional axes that vary per record, reshapes them to the + grid and includes them as multi-dimensional coordinates. + + Parameters + ---------- + grp : h5py.Group + HDF5 group. + schemas : DDH5Schema + Schema. + grid_info : GridInfo + Inferred grid. + missing : MissingMode + Strategy for incomplete grids. + lazy : bool + If True, use lazy arrays. + + Returns + ------- + coords : dict[str, array-like] + Coordinate arrays keyed by name. + coord_attrs : dict[str, dict[str, Any]] + Per-coordinate metadata (units, label, axes). + """ + coords: Dict[str, Any] = {} + coord_attrs: Dict[str, Dict[str, Any]] = {} + grid_axes_set = set(grid_info.axes_order) + n_axes = len(grid_info.axes_order) + + for name in schemas.independents: + ds = grp[name] + fi = schemas.fields[name] + + attrs: Dict[str, Any] = {} + if fi.unit: + attrs["units"] = fi.unit + if fi.label and fi.label != name: + attrs["label"] = fi.label + + if name in grid_axes_set and n_axes > 0: + axis_idx = grid_info.axes_order.index(name) + ndim = len(fi.shape) + axis_grid_len = grid_info.shape[axis_idx] if axis_idx < len(grid_info.shape) else fi.shape[0] + + if ndim == 1: + raw_1d = ds[:] + unique_vals = np.unique(raw_1d) + if len(unique_vals) == axis_grid_len: + coord_data = unique_vals + else: + _, idx = np.unique(raw_1d, return_index=True) + coord_data = raw_1d[np.sort(idx)] + if len(coord_data) > axis_grid_len: + coord_data = coord_data[:axis_grid_len] + coords[name] = coord_data + else: + raw_md = ds[:] + same = True + first = raw_md[0] + for i in range(1, min(raw_md.shape[0], 30)): + if not np.array_equal(raw_md[i], first): + same = False + break + if same: + coords[name] = first + else: + reshaped = _reshape_to_grid( + raw_md, grid_info, missing + ) + coords[name] = reshaped + else: + if lazy: + coords[name] = _lazy_array(ds) + else: + coords[name] = ds[:] + + coord_attrs[name] = attrs + + return coords, coord_attrs + + +def _build_data_variables( + grp: h5py.Group, + schemas: DDH5Schema, + grid_info: GridInfo, + missing: MissingMode, + lazy: bool = False, + fields_filter: Optional[List[str]] = None, +) -> Dict[str, Any]: + """Build xarray DataArray entries for dependent datasets. + + Parameters + ---------- + grp : h5py.Group + HDF5 group. + schemas : DDH5Schema + Schema. + grid_info : GridInfo + Inferred grid. + missing : MissingMode + Handling for incomplete grids. + lazy : bool + If True, use lazy arrays where possible. + fields_filter : list of str, optional + If given, only include these dependent names. + + Returns + ------- + dict[str, xr.DataArray or tuple] + Entries suitable for ``xr.Dataset(..., coords=...)``. + """ + data_vars: Dict[str, Any] = {} + dims = grid_info.axes_order if grid_info.axes_order else ["dim_0"] + + for name in schemas.dependents: + if fields_filter is not None and name not in fields_filter: + continue + + ds = grp[name] + fi = schemas.fields[name] + unit = fi.unit + axes = fi.axes + + if grid_info.shape and grid_info.shape != (schemas.nrecords,): + raw_data = ds[:] + reshaped = _reshape_to_grid( + raw_data, grid_info, missing + ) + else: + if lazy: + reshaped = _lazy_array(ds) + else: + reshaped = ds[:] + + if _XARRAY_AVAILABLE: + da_attrs: Dict[str, Any] = {} + if unit: + da_attrs["units"] = unit + if axes: + da_attrs["axes"] = axes + da_attrs["label"] = fi.label + + data_vars[name] = xr.DataArray( + data=reshaped, + dims=dims[: reshaped.ndim] if reshaped.ndim <= len(dims) else dims, + attrs=da_attrs, + ) + else: + data_vars[name] = (tuple(dims), reshaped) + + return data_vars + + +# =========================================================================== +# Layer 4 -- Public conversion API +# =========================================================================== + + +def ddh5_to_xarray( + path: Union[str, Path], + groupname: str = "data", + lazy: bool = False, + fields: Optional[List[str]] = None, + startidx: Optional[int] = None, + stopidx: Optional[int] = None, + missing: MissingMode = MissingMode.PAD, + swmr: bool = True, + file_timeout: Optional[float] = None, +) -> "xr.Dataset": + """Convert a DDH5 file directly to :class:`xarray.Dataset`. + + Skips :class:`DataDict` and :class:`MeshgridDataDict` intermediate + representations. Supports SWMR reading, lazy loading, and two + strategies (:attr:`MissingMode.PAD` / :attr:`MissingMode.TRUNCATE`) + for incomplete grids. + + Parameters + ---------- + path : str or Path + Path to the ``.ddh5`` file (``.ddh5`` extension optional). + groupname : str, optional + HDF5 group name (default ``"data"``). + lazy : bool, optional + Use :mod:`dask` / h5py lazy arrays. Default ``False``. + fields : list of str, optional + Limit output to these data fields (dependents + their axes). + If ``None``, all fields are included. + startidx : int, optional + Start record index (0-based). + stopidx : int, optional + End record index (exclusive). + missing : MissingMode, optional + - ``'pad'`` -- fill missing grid slots with NaN + warning + - ``'truncate'`` -- trim trailing incomplete cycle + warning + - ``'raise'`` -- fail on shape mismatch + swmr : bool, optional + Attempt SWMR mode for lock-free reads (default ``True``). + file_timeout : float, optional + Max wait time for file access (seconds). + + Returns + ------- + xarray.Dataset + + Raises + ------ + ImportError + When ``xarray`` is not installed. + GridInferenceError + When the grid shape cannot be inferred from axis data. + ValueError + When *missing='raise'* and shape mismatch is detected. + """ + if not _XARRAY_AVAILABLE: + raise ImportError( + "xarray is required for ddh5_to_xarray. " + "Install with: pip install xarray" + ) + + filepath = _data_file_path(path) + if not filepath.exists(): + raise FileNotFoundError(f"DDH5 file not found: {filepath}") + + if swmr: + opener = DDH5SWMRReader(filepath, timeout=file_timeout or 30.0) + else: + opener = FileOpener(filepath, "r", timeout=file_timeout) + + with opener as f: + if groupname not in f: + raise ValueError(f"HDF5 group '{groupname}' not found.") + + grp = f[groupname] + + schemas = ddh5_schema( + filepath, groupname=groupname, file_timeout=file_timeout, swmr=False + ) + + if fields is not None: + schemas = schemas.select(fields) + + if startidx is not None or stopidx is not None: + start = startidx or 0 + stop = stopidx or schemas.nrecords + schemas.nrecords = stop - start + for fi in schemas.fields.values(): + fi.shape = (schemas.nrecords,) + fi.shape[1:] + + grid_info = _infer_grid(grp, schemas, missing) + + coords_raw, coord_attrs = _build_coordinates( + grp, schemas, grid_info, missing=missing, lazy=lazy + ) + data_vars = _build_data_variables( + + grp, + schemas, + grid_info, + missing=missing, + lazy=lazy, + fields_filter=fields, + ) + + coords: Dict[str, Any] = {} + grid_dims = grid_info.axes_order if grid_info.axes_order else ["dim_0"] + for cname, cdata in coords_raw.items(): + if isinstance(cdata, np.ndarray) and cdata.ndim > 1: + nd = min(cdata.ndim, len(grid_dims)) + coords[cname] = (tuple(grid_dims[:nd]), cdata) + else: + coords[cname] = cdata + + ddh5_attrs: Dict[str, Any] = {} + for k, v in schemas.meta.items(): + clean_key = k.strip("_") + ddh5_attrs[clean_key] = v + ddh5_attrs["source_file"] = str(filepath.resolve()) + + try: + dataset = xr.Dataset(data_vars=data_vars, coords=coords, attrs=ddh5_attrs) + except Exception: + logger.exception("Failed to assemble xarray Dataset") + raise + + for dim_name, dim_attrs in coord_attrs.items(): + if dim_name in dataset.coords: + dataset[dim_name].attrs.update(dim_attrs) + + return dataset + + +# =========================================================================== +# Gridded DDH5 export +# =========================================================================== + + +def ddh5_to_gridded_ddh5( + path: Union[str, Path], + groupname: str = "data", + output_path: Optional[Union[str, Path]] = None, + missing: MissingMode = MissingMode.PAD, + swmr: bool = True, + file_timeout: Optional[float] = None, +) -> Path: + """Convert a record-format DDH5 to a gridded DDH5. + + Reads the source DDH5, reshapes all data to the inferred + multi-dimensional grid, and writes a new ``.ddh5`` file in the same + directory. The gridded file is written with gzip compression and is + SWMR-compatible for subsequent efficient reads. + + Parameters + ---------- + path : str or Path + Path to the source ``.ddh5`` file. + groupname : str, optional + HDF5 group name. + output_path : str or Path, optional + Output path. Defaults to ``/_gridded.ddh5``. + missing : MissingMode, optional + Same semantics as :func:`ddh5_to_xarray`. + swmr : bool, optional + Attempt SWMR for reading the source. + file_timeout : float, optional + Max wait time for file access (seconds). + + Returns + ------- + Path + Path to the created gridded DDH5 file. + """ + filepath = _data_file_path(path) + if not filepath.exists(): + raise FileNotFoundError(f"Source DDH5 not found: {filepath}") + + if output_path is None: + output_path = filepath.parent / f"{filepath.stem}_gridded.ddh5" + else: + output_path = _data_file_path(output_path) + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + if swmr: + reader = DDH5SWMRReader(filepath, timeout=file_timeout or 30.0) + else: + reader = FileOpener(filepath, "r", timeout=file_timeout) + + with reader as f: + if groupname not in f: + raise ValueError(f"Group '{groupname}' not found.") + + grp = f[groupname] + + schemas = ddh5_schema( + filepath, groupname=groupname, file_timeout=file_timeout, swmr=False + ) + grid_info = _infer_grid(grp, schemas, missing) + + all_fields: Dict[str, np.ndarray] = {} + source_meta_attrs: Dict[str, Dict[str, Any]] = {} + + for name in schemas.independents: + ds = grp[name] + raw = ds[:] + if name in grid_info.axes_order and grid_info.shape: + reshaped = _reshape_to_grid( + raw, grid_info, missing + ) + all_fields[name] = reshaped + else: + all_fields[name] = raw + source_meta_attrs[name] = { + a: deh5ify(grp[name].attrs[a]) + for a in grp[name].attrs + if a.startswith("__") + } + + for name in schemas.dependents: + ds = grp[name] + raw = ds[:] + if grid_info.shape and grid_info.shape != (schemas.nrecords,): + reshaped = _reshape_to_grid( + raw, grid_info, missing + ) + all_fields[name] = reshaped + else: + all_fields[name] = raw + source_meta_attrs[name] = { + a: deh5ify(grp[name].attrs[a]) + for a in grp[name].attrs + if a.startswith("__") + } + + t = time.localtime() + tsec = time.mktime(t) + tstr = time.strftime(_TIMESTRFORMAT, t) + + with h5py.File(str(output_path), "w", libver="latest") as outf: + out_grp = outf.create_group(groupname) + + try: + out_grp.attrs["__creation_time_sec__"] = tsec + except Exception: + out_grp.attrs["__creation_time_sec__"] = str(tsec) + try: + out_grp.attrs["__creation_time_str__"] = tstr + except Exception: + pass + + for k, v in schemas.meta.items(): + _set_h5_attr(out_grp, k, v) + + _set_h5_attr(out_grp, "__gridded__", True) + _set_h5_attr(out_grp, "__grid_shape__", list(grid_info.shape)) + _set_h5_attr(out_grp, "__grid_axes__", grid_info.axes_order) + _set_h5_attr(out_grp, "__source_file__", str(filepath.resolve())) + + for name, data in all_fields.items(): + fi = schemas.fields[name] + maxshp = (None,) * data.ndim if data.ndim > 0 else (None,) + try: + out_ds = out_grp.create_dataset( + name, + data=data, + maxshape=maxshp, + compression="gzip", + compression_opts=4, + ) + except Exception: + out_ds = out_grp.create_dataset( + name, + data=data, + maxshape=maxshp, + ) + + if fi.axes: + _set_h5_attr(out_ds, "axes", fi.axes) + if fi.unit: + _set_h5_attr(out_ds, "unit", fi.unit) + if fi.label and fi.label != name: + _set_h5_attr(out_ds, "label", fi.label) + _set_h5_attr(out_ds, "__creation_time_sec__", tsec) + + for attr_name, attr_val in source_meta_attrs.get(name, {}).items(): + _set_h5_attr(out_ds, attr_name, attr_val) + + outf.flush() + + logger.info("Gridded DDH5 written: %s", output_path) + return output_path + + +# =========================================================================== +# Layer 5 -- Validation +# =========================================================================== + + +def validate_ddh5( + path: Union[str, Path], + groupname: str = "data", + file_timeout: Optional[float] = None, +) -> DDH5ValidationReport: + """Validate a DDH5 file for metadata and structural consistency. + + Checks performed: + 1. Group existence and accessibility. + 2. All axes referenced by dependents exist as datasets. + 3. Axes must be independents (empty ``axes``). + 4. All datasets share the same record-count (first dimension length). + 5. For gridded data, check axis monotonicity. + 6. Units and labels presence is noted but not enforced. + + Parameters + ---------- + path : str or Path + Path to the ``.ddh5`` file. + groupname : str, optional + HDF5 group name. + file_timeout : float, optional + Max wait time for file access. + + Returns + ------- + DDH5ValidationReport + Structured report with ``is_valid``, ``errors``, and ``warnings``. + """ + errors: List[str] = [] + warnings_list: List[str] = [] + schemas: Optional[DDH5Schema] = None + grid_info: Optional[GridInfo] = None + + filepath = _data_file_path(path) + if not filepath.exists(): + errors.append(f"File not found: {filepath}") + return DDH5ValidationReport( + is_valid=False, errors=errors, warnings_list=warnings_list + ) + + try: + schemas = ddh5_schema(filepath, groupname=groupname, file_timeout=file_timeout) + except Exception as exc: + errors.append(f"Schema discovery failed: {exc}") + return DDH5ValidationReport( + is_valid=False, errors=errors, warnings_list=warnings_list + ) + + with FileOpener(filepath, "r", timeout=file_timeout) as f: + grp = f[groupname] + + # --- Axes existence --- + for dep_name in schemas.dependents: + for ax in schemas.fields[dep_name].axes: + if ax not in schemas.fields: + errors.append( + f"Dependent '{dep_name}' references axis '{ax}' " + f"which is not a dataset." + ) + + # --- Record count consistency --- + lens: Dict[str, int] = {} + for name, fi in schemas.fields.items(): + try: + ds = grp[name] + lens[name] = ds.shape[0] + except Exception as exc: + errors.append(f"Cannot read shape of '{name}': {exc}") + + if lens: + unique_lens = set(lens.values()) + if len(unique_lens) > 1: + min_len = min(unique_lens) + offenders = [ + f"{n}={l}" for n, l in lens.items() if l != min_len + ] + warnings_list.append( + f"Record counts differ across datasets. Minimum={min_len}. " + f"Offenders: {', '.join(offenders)}" + ) + + # --- Grid inference --- + try: + grid_info = _infer_grid(grp, schemas, MissingMode.PAD) + except GridInferenceError as exc: + warnings_list.append(f"Grid inference: {exc}") + except Exception as exc: + warnings_list.append(f"Grid inference unexpected error: {exc}") + + # --- Monotonicity check for gridded axes --- + if grid_info and grid_info.axes_order: + for axis_idx, ax_name in enumerate(grid_info.axes_order): + fi = schemas.fields[ax_name] + if len(fi.shape) != 1: + continue + try: + ax_data = grp[ax_name][:] + if grid_info.shape and axis_idx < len(grid_info.shape): + reshaped = np.reshape( + ax_data[: grid_info.expected_len], grid_info.shape + ) + slices_list = [0] * len(grid_info.shape) + slices_list[axis_idx] = slice(None) + sliced = reshaped[tuple(slices_list)] + if len(sliced) > 1: + diffs = np.diff(sliced.astype(float)) + diffs = diffs[~np.isnan(diffs)] + if len(diffs) > 0: + signs = np.sign(diffs) + unique_signs = np.unique(signs[signs != 0]) + if 0 in unique_signs or len(unique_signs) > 1: + warnings_list.append( + f"Axis '{ax_name}' is not monotonic." + ) + except Exception as exc: + warnings_list.append( + f"Monotonicity check failed for '{ax_name}': {exc}" + ) + + # --- NaN presence in axis data --- + for ax_name in schemas.independents: + try: + ax_data = grp[ax_name][:] + if np.any(np.isnan(ax_data.astype(float))): + warnings_list.append(f"Axis '{ax_name}' contains NaN values.") + except Exception: + pass + + is_valid = len(errors) == 0 + return DDH5ValidationReport( + is_valid=is_valid, + errors=errors, + warnings_list=warnings_list, + schema=schemas, + grid_info=grid_info, + ) + + +# =========================================================================== +# Utility +# =========================================================================== + + +def ddh5_info( + path: Union[str, Path], + groupname: str = "data", + file_timeout: Optional[float] = None, +) -> str: + """Return a human-readable summary of DDH5 contents. + + Includes field names, shapes, axes relationships, units, and + grid information. + + Parameters + ---------- + path : str or Path + Path to the ``.ddh5`` file. + groupname : str, optional + HDF5 group name. + file_timeout : float, optional + Max wait time for file access. + + Returns + ------- + str + """ + filepath = _data_file_path(path) + if not filepath.exists(): + return f"DDH5 file not found: {filepath}" + + schemas = ddh5_schema( + path, groupname=groupname, file_timeout=file_timeout, swmr=False + ) + + lines: List[str] = [] + lines.append(f"DDH5: {filepath}") + lines.append(f" Group: /{groupname}") + lines.append(f" Records: {schemas.nrecords}") + lines.append(f" Fields: {len(schemas.fields)}") + lines.append(f" Dependents: {len(schemas.dependents)}") + lines.append(f" Independents: {len(schemas.independents)}") + + if schemas.dependents: + lines.append("") + lines.append(" Dependents:") + for name in schemas.dependents: + fi = schemas.fields[name] + axes_str = ", ".join(fi.axes) if fi.axes else "(none)" + unit_str = f" [{fi.unit}]" if fi.unit else "" + lines.append( + f" {name}{unit_str} shape={fi.shape} axes=({axes_str})" + ) + + if schemas.independents: + lines.append("") + lines.append(" Independents:") + for name in schemas.independents: + fi = schemas.fields[name] + unit_str = f" [{fi.unit}]" if fi.unit else "" + deps_str = f" axes=({', '.join(fi.axes)})" if fi.axes else "" + lines.append(f" {name}{unit_str} shape={fi.shape}{deps_str}") + + if schemas.meta: + lines.append("") + lines.append(" Global meta:") + for k, v in schemas.meta.items(): + clean_k = k.strip("_") + lines.append(f" {clean_k}: {v}") + + return "\n".join(lines) + + +# =========================================================================== +# SWMR Writing +# =========================================================================== + + +def _add_cur_time_attr(h5obj: Any, name: str = "creation") -> None: + """Add current time as HDF5 attributes to *h5obj*. + + Writes ``___time_sec__`` and ``___time_str__``. + """ + t = time.localtime() + tsec = time.mktime(t) + tstr = time.strftime(_TIMESTRFORMAT, t) + _set_h5_attr(h5obj, f"__{name}_time_sec__", tsec) + _set_h5_attr(h5obj, f"__{name}_time_str__", tstr) + + +def _auto_chunks( + shape: Tuple[int, ...], + dtype: np.dtype, + chunk_size: Optional[int] = None, +) -> Tuple[int, ...]: + """Compute an optimised chunk shape for SWMR HDF5 datasets. + + Chunks along the first axis only; inner dimensions are kept + contiguous. Targets ~1 MB per chunk unless *chunk_size* is + explicitly given. + """ + if not shape: + return (1,) + + target_bytes = 1024 * 1024 # 1 MB + inner_elems = int(np.prod(shape[1:])) if len(shape) > 1 else 1 + bytes_per_row = max(1, int(dtype.itemsize) * max(1, inner_elems)) + + if chunk_size is not None: + rows = chunk_size + else: + rows = max(1, min(1024, target_bytes // bytes_per_row)) + + return (rows,) + shape[1:] + + +class DDH5Writer_swmr: + """Context manager for SWMR (Single Writer Multiple Reader) DDH5 writing. + + Creates a ``.ddh5`` file with chunked datasets suitable for + concurrent reads while data is being appended. Uses h5py's + ``libver='latest'`` and SWMR mode underneath. + + Chunk sizes are auto-optimised from dtype and shape but can be + overridden via *chunk_size*. + + .. note:: + HDF5 attributes cannot be modified once SWMR mode is active. + Only dataset resizing and data writing are permitted during + ``add_data`` calls. + + Parameters + ---------- + structure : dict + Field definitions. Each key is a dataset name; each value is a + dict optionally containing: + + - ``axes`` : list[str] -- axis names (empty or absent for + independent axes) + - ``unit`` : str -- physical unit + - ``label`` : str -- human-readable label + - ``values`` : np.ndarray -- optional initial data (determines + shape and dtype) + - ``dtype`` : numpy dtype -- if *values* is not given (default + ``float64``) + - ``shape`` : tuple[int] -- inner dimensions past the record + axis (required if *values* is not given) + + basedir : str or Path, optional + Root directory for auto-generated data folders. + groupname : str, optional + HDF5 group name (default ``"data"``). + name : str, optional + Dataset name, used in folder and metadata. + filename : str, optional + Filename stem (default ``"data"``). + filepath : str or Path, optional + Explicit output path. Overrides *basedir*/*name*/*filename*. + chunk_size : int, optional + Override auto-computed chunk size (rows per chunk). + """ + + def __init__( + self, + structure: Dict[str, Dict[str, Any]], + basedir: Union[str, Path] = ".", + groupname: str = "data", + name: Optional[str] = None, + filename: str = "data", + filepath: Optional[Union[str, Path]] = None, + chunk_size: Optional[int] = None, + ): + self._structure = structure + self.basedir = Path(basedir) + self.name = name or "" + self.groupname = groupname + self.filename = Path(filename) + self._chunk_size = chunk_size + self._filepath: Optional[Path] = ( + Path(filepath) if filepath is not None else None + ) + self._h5file: Optional[h5py.File] = None + self._swmr_active: bool = False + self._uuid = uuid.uuid1() + self._datasets_meta: Dict[str, Dict[str, Any]] = {} + + @property + def filepath(self) -> Path: + assert self._filepath is not None + return self._filepath + + # -- path helpers --------------------------------------------------- + + def data_folder(self) -> Path: + """Return the sub-folder relative to *basedir*.""" + ID = str(self._uuid).split("-")[0] + parent = ( + f"{datetime.datetime.now().replace(microsecond=0).isoformat().replace(':', '')}" + f"_{ID}" + ) + if self.name: + parent += f"-{self.name}" + return Path(time.strftime("%Y-%m-%d"), parent) + + def data_file_path(self) -> Path: + """Determine the output filepath, avoiding clashes with existing folders.""" + folder = Path(self.basedir, self.data_folder()) + appendix = "" + idx = 2 + while folder.exists(): + appendix = f"-{idx}" + folder = Path(self.basedir, str(self.data_folder()) + appendix) + idx += 1 + return Path(folder, self.filename) + + # -- context manager ------------------------------------------------- + + def __enter__(self) -> "DDH5Writer_swmr": + if self._filepath is None: + self._filepath = _data_file_path(self.data_file_path()) + self._filepath.parent.mkdir(parents=True, exist_ok=True) + else: + self._filepath = _data_file_path(self._filepath) + self._filepath.parent.mkdir(parents=True, exist_ok=True) + + logger.info("SWMR data location: %s", self._filepath) + + # Phase 1: create file with chunked datasets (not SWMR yet) + with h5py.File(str(self._filepath), "w", libver="latest") as f: + grp = f.create_group(self.groupname) + _add_cur_time_attr(grp, "creation") + + if self.name: + _set_h5_attr(grp, "dataset_name", self.name) + + self._datasets_meta = {} + for field_name, info in self._structure.items(): + values = info.get("values", None) + axes = info.get("axes", []) + unit = info.get("unit", "") + label = info.get("label", "") + + if values is not None: + data = np.asarray(values) + dtype = data.dtype + inner_shape = data.shape[1:] + shp = data.shape + else: + dtype = np.dtype(info.get("dtype", "float64")) + inner_shape = tuple(info.get("shape", ())) + shp = (0,) + inner_shape + data = np.empty(shp, dtype=dtype) + + chunks = _auto_chunks(shp, dtype, self._chunk_size) + maxshp = (None,) + inner_shape + + ds = grp.create_dataset( + field_name, + data=data, + maxshape=maxshp, + chunks=chunks, + compression="gzip", + compression_opts=4, + ) + + _add_cur_time_attr(ds) + if axes: + _set_h5_attr(ds, "axes", axes) + if unit: + _set_h5_attr(ds, "unit", unit) + if label and label != field_name: + _set_h5_attr(ds, "label", label) + + self._datasets_meta[field_name] = { + "inner_shape": inner_shape, + "dtype": dtype, + "nrecords": shp[0], + } + + f.flush() + + # Phase 2: reopen for append writes (try SWMR, fall back) + try: + self._h5file = h5py.File(str(self._filepath), "r+") + self._h5file.swmr_mode = True + self._swmr_active = True + except Exception: + logger.info( + "SWMR write mode not available, using regular mode", + exc_info=True, + ) + # Close the stale handle from the failed SWMR attempt + stale = self._h5file + self._h5file = None + if stale is not None: + try: + stale.close() + except Exception: + pass + self._h5file = h5py.File(str(self._filepath), "r+") + self._swmr_active = False + + return self + + def __exit__( + self, + exc_type: Optional[type], + exc_value: Optional[BaseException], + exc_tb: Any, + ) -> None: + close_time_written = False + if self._h5file is not None: + try: + if not self._swmr_active and self.groupname in self._h5file: + _add_cur_time_attr( + self._h5file[self.groupname], "close" + ) + close_time_written = True + self._h5file.flush() + finally: + self._h5file.close() + self._h5file = None + self._swmr_active = False + + # In SWMR mode attrs were read-only — reopen to write close time. + if not close_time_written and self._filepath is not None and self._filepath.exists(): + try: + with h5py.File(str(self._filepath), "r+") as f: + if self.groupname in f: + _add_cur_time_attr(f[self.groupname], "close") + f.flush() + except Exception: + logger.debug("Could not write close time attrs", exc_info=True) + + if exc_type is None: + self.add_tag("__complete__") + else: + self.add_tag("__interrupted__") + + # -- data writing ---------------------------------------------------- + + def add_data(self, **kwargs: Any) -> None: + """Append data to the DDH5 file. + + Each keyword argument must match a field name defined in + *structure*. Data arrays must have matching inner dimensions + and must all have the same number of records (first axis). + Datasets are resized, written, and flushed so that SWMR + readers see the update immediately. + """ + if self._h5file is None: + raise RuntimeError( + "DDH5Writer_swmr is not active (use as context manager)." + ) + + grp = self._h5file[self.groupname] + + # Validate and prepare + nrows: Optional[int] = None + prepared: Dict[str, np.ndarray] = {} + for name, data in kwargs.items(): + if name not in self._datasets_meta: + raise KeyError( + f"Field '{name}' was not defined in structure. " + f"Known fields: {list(self._datasets_meta)}." + ) + arr = np.asarray(data) + if arr.ndim == 0: + arr = arr.reshape(1) + expected_inner = self._datasets_meta[name]["inner_shape"] + if arr.shape[1:] != expected_inner: + raise ValueError( + f"Field '{name}': expected inner shape {expected_inner}, " + f"got {arr.shape[1:]}" + ) + if nrows is None: + nrows = arr.shape[0] + elif arr.shape[0] != nrows: + raise ValueError( + f"All fields in add_data must have the same number of " + f"records. '{name}' has {arr.shape[0]}, expected {nrows}." + ) + prepared[name] = arr + + if nrows is None or nrows == 0: + return + + # Write each field + for name, arr in prepared.items(): + ds = grp[name] + old_len = ds.shape[0] + new_len = old_len + nrows + ds.resize(new_len, axis=0) + ds[old_len:new_len] = arr + self._datasets_meta[name]["nrecords"] = new_len + + self._h5file.flush() + + # -- tags & filesystem helpers --------------------------------------- + + def add_tag(self, tags: Union[str, Collection[str]]) -> None: + """Create ``.tag`` marker files in the data directory. + + Parameters + ---------- + tags : str or collection of str + Tag name(s). A file ``.tag`` is created for each. + """ + assert self._filepath is not None + if isinstance(tags, str): + tags = [tags] + for tag in tags: + tagpath = self._filepath.parent / f"{tag}.tag" + if not tagpath.exists(): + tagpath.touch() + + def backup_file(self, paths: Union[str, Collection[str]]) -> None: + """Copy one or more files into the data directory. + + Parameters + ---------- + paths : str or collection of str + Path(s) to copy. + """ + assert self._filepath is not None + if isinstance(paths, str): + paths = [paths] + for p in paths: + shutil.copy(p, self._filepath.parent) + + def save_text(self, fname: str, text: str) -> None: + """Write a text file in the data directory. + + Parameters + ---------- + fname : str + Output filename (created in the data directory). + text : str + Text content. + """ + assert self._filepath is not None + out = self._filepath.parent / fname + out.write_text(text, encoding="utf-8") + + def save_dict(self, fname: str, d: Dict[str, Any]) -> None: + """Write a JSON file in the data directory. + + Parameters + ---------- + fname : str + Output filename (created in the data directory). + d : dict + Object to serialise. + """ + assert self._filepath is not None + out = self._filepath.parent / fname + out.write_text( + json.dumps(d, indent=4, ensure_ascii=False, cls=NumpyEncoder), + encoding="utf-8", + ) diff --git a/src/labcore/measurement/__init__.py b/src/labcore/measurement/__init__.py index 68393cf..058d960 100644 --- a/src/labcore/measurement/__init__.py +++ b/src/labcore/measurement/__init__.py @@ -29,3 +29,4 @@ from .sweep import once as once from .sweep import pointer as pointer from .sweep import sweep_parameter as sweep_parameter +from .storage import run_and_save_sweep, run_and_save_sweep_swmr \ No newline at end of file diff --git a/src/labcore/measurement/storage.py b/src/labcore/measurement/storage.py index b04ecac..99bd415 100644 --- a/src/labcore/measurement/storage.py +++ b/src/labcore/measurement/storage.py @@ -29,6 +29,7 @@ from ..data.datadict import DataDict from ..data.datadict_storage import DDH5Writer +from ..data.ddh5_xr import DDH5Writer_swmr, ddh5_to_xarray from .sweep import Sweep __author__ = "Wolfgang Pfaff" @@ -69,6 +70,19 @@ def _create_datadict_structure(sweep: Sweep) -> DataDict: return data_dict +def _create_swmr_structure(sweep: Sweep) -> Dict[str, Dict[str, Any]]: + data_specs = sweep.get_data_specs() + structure: Dict[str, Dict[str, Any]] = {} + for spec in data_specs: + entry: Dict[str, Any] = {} + if spec.depends_on is not None: + entry['axes'] = spec.depends_on + if spec.unit: + entry['unit'] = spec.unit + entry['label'] = spec.name + structure[spec.name] = entry + return structure + def _check_none(line: Dict, all: bool = True) -> bool: """ @@ -250,3 +264,211 @@ def run_and_save_sweep( ) ret = (dir, data_dict) if return_data else (dir, None) return ret + + + +def _broadcast_line(line: Dict[str, Any]) -> Dict[str, np.ndarray]: + seqtypes = (np.ndarray, tuple, list) + records: Dict[str, np.ndarray] = {} + for name, val in line.items(): + if isinstance(val, seqtypes): + records[name] = np.asarray(val) + elif val is None: + records[name] = np.array([np.nan]) + else: + records[name] = np.array([val]) + + possible = {name: [1, arr.shape[0]] for name, arr in records.items()} + commons = [] + for name, opts in possible.items(): + for n in opts: + if n in commons: + continue + if all(n in other for other in possible.values()): + commons.append(n) + nrecs = max(commons) + + for name, arr in records.items(): + if nrecs == 1 and arr.shape[0] > 1: + records[name] = arr.reshape((1,) + arr.shape) + + return records + + +def run_and_save_sweep_swmr(sweep: Sweep, + data_dir: str, + name: str, + ignore_all_None_results: bool = True, + save_action_kwargs: bool = False, + add_timestamps = False, + archive_files: Optional[List[str]] = None, + return_data: bool = False, + chunk_size: Optional[int] = None, + **extra_saving_items) -> Tuple[Union[str, Path], Optional[Any]]: + """ + Iterates through a sweep, saving data via SWMR writer for + concurrent read-while-write access. + + Uses :class:`DDH5Writer_swmr` which creates chunked HDF5 datasets + suitable for live monitoring. Returns an :class:`xarray.Dataset` + when *return_data* is ``True`` (via :func:`ddh5_to_xarray`). + + :param sweep: Sweep object to iterate through. + :param data_dir: Directory of file location. + :param name: Name of the file. + :param ignore_all_None_results: if ``True``, don't save any records that contain a ``None``. + if ``False``, only do not save records that are all-``None``. + :param save_action_kwargs: If ``True``, the action_kwargs of the sweep will be saved + as a json file in the same directory as the data. + :param add_timestamps: If ``True``, prepend timestamps to auxiliary file names. + :param archive_files: List of files to copy into a folder called 'archive_files' + in the same directory that the data is saved. + :param return_data: If ``True``, return an :class:`xarray.Dataset` read back from + the saved file via :func:`ddh5_to_xarray`. + :param chunk_size: Row chunk size for SWMR datasets. If ``None``, + auto-computed from dtype and shape. + :param extra_saving_items: Kwargs for extra objects that should be saved. + Dictionaries are saved as JSON (falling back to pickle), other objects + are pickled. + + :raises TypeError: A Typerror is raised if the object passed for archive_files + is not correct. + """ + structure = _create_swmr_structure(sweep) + + sweep_iter = iter(sweep) + try: + first_line = next(sweep_iter) + except StopIteration: + with DDH5Writer_swmr(structure, basedir=data_dir, name=name, chunk_size=chunk_size) as writer: + dir = writer.filepath.parent + if add_timestamps: + t = time.localtime() + time_stamp = time.strftime(TIMESTRFORMAT, t) + '_' + for key, val in extra_saving_items.items(): + if callable(val): + value = val() + else: + value = val + if add_timestamps: + pickle_path_file = os.path.join(dir, time_stamp + key + '.pickle') + json_path_file = os.path.join(dir, time_stamp + key + '.json') + else: + pickle_path_file = os.path.join(dir, key + '.pickle') + json_path_file = os.path.join(dir, key + '.json') + if isinstance(value, dict): + try: + _save_dictionary(value, json_path_file) + except TypeError as error: + if os.path.isfile(json_path_file): + os.remove(json_path_file) + logging.info(f'{key} has not been able to save to json: {error.args}.' + f' The item will be pickled instead.') + _pickle_and_save(value, pickle_path_file) + else: + _pickle_and_save(value, pickle_path_file) + if save_action_kwargs: + if add_timestamps: + json_path_file = os.path.join(dir, time_stamp + 'sweep_action_kwargs.json') + else: + json_path_file = os.path.join(dir, 'sweep_action_kwargs.json') + _save_dictionary(sweep.action_kwargs, json_path_file) + logger.info('The measurement has finished successfully and all of the data has been saved.') + return dir, None + + broadcast_first = _broadcast_line(first_line) + for fname, arr in broadcast_first.items(): + if fname in structure: + inner = arr.shape[1:] + if inner: + structure[fname]['shape'] = inner + structure[fname]['dtype'] = arr.dtype + + try: + with DDH5Writer_swmr(structure, basedir=data_dir, name=name, chunk_size=chunk_size) as writer: + + dir: Path = writer.filepath.parent + if add_timestamps: + t = time.localtime() + time_stamp = time.strftime(TIMESTRFORMAT, t) + '_' + + for key, val in extra_saving_items.items(): + if callable(val): + value = val() + else: + value = val + + if add_timestamps: + pickle_path_file = os.path.join(dir, time_stamp + key + '.pickle') + json_path_file = os.path.join(dir, time_stamp + key + '.json') + else: + pickle_path_file = os.path.join(dir, key + '.pickle') + json_path_file = os.path.join(dir, key + '.json') + + if isinstance(value, dict): + try: + _save_dictionary(value, json_path_file) + except TypeError as error: + if os.path.isfile(json_path_file): + os.remove(json_path_file) + logging.info(f'{key} has not been able to save to json: {error.args}.' + f' The item will be pickled instead.') + _pickle_and_save(value, pickle_path_file) + else: + _pickle_and_save(value, pickle_path_file) + + if save_action_kwargs: + if add_timestamps: + json_path_file = os.path.join(dir, time_stamp + 'sweep_action_kwargs.json') + else: + json_path_file = os.path.join(dir, 'sweep_action_kwargs.json') + _save_dictionary(sweep.action_kwargs, json_path_file) + + if archive_files is not None: + archive_files_dir = os.path.join(dir, 'archive_files') + os.mkdir(archive_files_dir) + if not isinstance(archive_files, list) and not isinstance(archive_files, tuple): + if isinstance(archive_files, str): + archive_files = [archive_files] + else: + raise TypeError(f'{type(archive_files)} is not a list.') + for path in archive_files: + if os.path.isdir(path): + folder_name = os.path.basename(path) + if folder_name == '': + folder_name = os.path.basename(os.path.dirname(path)) + shutil.copytree(path, os.path.join(archive_files_dir, folder_name), dirs_exist_ok=True) + elif os.path.isfile(path): + shutil.copy(path, archive_files_dir) + else: + matches = glob.glob(path, recursive=True) + if len(matches) == 0: + logging.info(f'{path} could not be found. Measurement will continue without archiving {path}') + for file in matches: + shutil.copy(file, archive_files_dir) + + if not _check_none(first_line, all=ignore_all_None_results): + writer.add_data(**broadcast_first) + while True: + try: + line = next(sweep_iter) + except StopIteration: + break + if not _check_none(line, all=ignore_all_None_results): + writer.add_data(**_broadcast_line(line)) + + except KeyboardInterrupt: + logger.warning('Sweep stopped by Keyboard interrupt. Data completed before interrupt should be saved.') + if return_data: + try: + dataset = ddh5_to_xarray(writer.filepath) + return dir, dataset + except Exception: + return dir, None + return dir, None + + logger.info('The measurement has finished successfully and all of the data has been saved.') + if return_data: + return dir, ddh5_to_xarray(writer.filepath) + return dir, None +