diff --git a/src/mdio/builder/templates/base.py b/src/mdio/builder/templates/base.py
index 544e290f..644ba91c 100644
--- a/src/mdio/builder/templates/base.py
+++ b/src/mdio/builder/templates/base.py
@@ -18,6 +18,7 @@
 from mdio.builder.schemas.v1.units import AllUnitModel
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.schemas.v1.variable import VariableMetadata
+from mdio.builder.templates.types import CoordinateSpec
 
 if TYPE_CHECKING:
     from mdio.builder.schemas.v1.dataset import Dataset
@@ -43,6 +44,13 @@ def __init__(self, data_domain: SeismicDataDomain) -> None:
         self._physical_coord_names: tuple[str, ...] = ()
         self._logical_coord_names: tuple[str, ...] = ()
         self._var_chunk_shape: tuple[int, ...] = ()
+        self.synthesize_missing_dims: tuple[str, ...] = ()
+
+        # TEMPORARY (removed with declare_coordinate_specs): set when grid overrides mutate this
+        # template in-place (dims collapsed into 'trace', extra coordinates added). Once mutated,
+        # the runtime layout intentionally diverges from the static declare_coordinate_specs()
+        # contract, so the drift guard in build_dataset() must not run.
+        self._grid_overrides_applied: bool = False
 
         self._builder: MDIODatasetBuilder | None = None
         self._dim_sizes: tuple[int, ...] = ()
@@ -67,6 +75,45 @@ def _repr_html_(self) -> str:
         """Return an HTML representation of the template for Jupyter notebooks."""
         return template_repr_html(self)
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare the non-dimension coordinate specs (name, dims, dtype) for this template.
+
+        The ingestion ``SchemaResolver`` uses these specs to determine which trace-header
+        fields to read and how to rewrite coordinate dimensions under grid overrides.
+
+        .. note::
+            TEMPORARY (to be removed before the next minor release): these specs currently
+            duplicate the non-dimension coordinates created in :meth:`_add_coordinates`.
+            :meth:`build_dataset` validates that the two stay in sync (see
+            :meth:`_validate_declared_coordinate_specs`). Once the ingestion pipeline builds
+            datasets directly from the resolved schema, ``_add_coordinates`` will be derived
+            from these specs and the duplication will disappear.
+
+        The default implementation assumes every non-dimension coordinate spans **all**
+        spatial dimensions. Subclasses whose coordinates span only a subset (or use a
+        non-default dtype) must override this method, otherwise ``build_dataset`` raises.
+
+        Returns:
+            The declared non-dimension coordinate specs.
+        """
+        specs = [
+            CoordinateSpec(
+                name=coord_name,
+                dimensions=self.spatial_dimension_names,
+                dtype=ScalarType.FLOAT64,
+            )
+            for coord_name in self.physical_coordinate_names
+        ]
+        specs.extend(
+            CoordinateSpec(
+                name=coord_name,
+                dimensions=self.spatial_dimension_names,
+                dtype=ScalarType.UINT8 if coord_name == "gun" else ScalarType.INT32,
+            )
+            for coord_name in self.logical_coordinate_names
+        )
+        return tuple(specs)
+
     def build_dataset(
         self,
         name: str,
@@ -107,6 +154,10 @@ def build_dataset(
             except ValueError as exc:  # coordinate may already exist
                 if "same name twice" not in str(exc):
                     raise
+        # Skip the static drift guard when grid overrides have transformed the template: the
+        # runtime layout no longer matches the declared (override-free) specs by design.
+        if not self._grid_overrides_applied:
+            self._validate_declared_coordinate_specs()
         self._add_variables()
         self._add_trace_mask()
 
@@ -123,6 +174,87 @@ def add_units(self, units: dict[str, AllUnitModel]) -> None:
                 raise ValueError(msg)
         self._units |= units
 
+    def apply_resolved_dimensions(
+        self,
+        dim_names: tuple[str, ...],
+        chunk_shape: tuple[int, ...],
+    ) -> None:
+        """Update the template's dimension layout from a resolved schema.
+
+        Supported entry point for the ingestion pipeline to push back dimension names
+        and chunk shape after the SchemaResolver has applied grid overrides
+        (e.g. NonBinned, HasDuplicates), instead of mutating private attributes.
+
+        When ``dim_names`` differs from the current layout, the template is also flagged as
+        transformed so that ``build_dataset`` skips the declared-coordinate drift guard.
+
+        Args:
+            dim_names: Final ordered dimension names.
+            chunk_shape: Chunk shape matching ``dim_names`` length.
+
+        Raises:
+            ValueError: If ``len(chunk_shape) != len(dim_names)``.
+        """
+        if len(chunk_shape) != len(dim_names):
+            msg = f"chunk_shape length {len(chunk_shape)} does not match dim_names length {len(dim_names)}"
+            raise ValueError(msg)
+        resolved_dim_names = tuple(dim_names)
+        if resolved_dim_names != tuple(self._dim_names):
+            # The layout now diverges from the static declare_coordinate_specs() contract.
+            self._grid_overrides_applied = True
+        self._dim_names = resolved_dim_names
+        self._var_chunk_shape = tuple(chunk_shape)
+
+    def _validate_declared_coordinate_specs(self) -> None:
+        """Fail the build if :meth:`declare_coordinate_specs` drifted from the built coordinates.
+
+        TEMPORARY (to be removed before the next minor release): while
+        :meth:`declare_coordinate_specs` duplicates the non-dimension coordinates created in
+        :meth:`_add_coordinates`, this guard ensures the two never diverge in name, dimensions,
+        or dtype. The ingestion ``SchemaResolver`` trusts the declared specs, so silent drift
+        would corrupt resolved schemas. The check runs for every template (built-in and
+        user-defined) on every ``build_dataset`` call that does not apply grid overrides. Grid
+        overrides mutate the template in-place (collapsing dims into ``trace`` and adding
+        coordinates), so the runtime layout intentionally diverges from the declared specs and
+        the guard is skipped for those builds. It is removed once ``_add_coordinates`` is derived
+        from the resolved schema and the duplication no longer exists.
+
+        Raises:
+            ValueError: If the declared specs do not match the built non-dimension coordinates.
+        """
+        dim_names = set(self._dim_names)
+        built = {coord.name: coord for coord in self._builder._coordinates if coord.name not in dim_names}
+        declared = {spec.name: spec for spec in self.declare_coordinate_specs()}
+
+        if set(declared) != set(built):
+            built_only = sorted(set(built) - set(declared))
+            declared_only = sorted(set(declared) - set(built))
+            msg = (
+                f"declare_coordinate_specs() for template {self.name!r} is out of sync with the "
+                f"coordinates built by _add_coordinates(). Built but not declared: {built_only}. "
+                f"Declared but not built: {declared_only}. Override declare_coordinate_specs() so "
+                f"it matches the non-dimension coordinates this template creates."
+            )
+            raise ValueError(msg)
+
+        for coord_name, spec in declared.items():
+            coord = built[coord_name]
+            built_dims = tuple(dim.name for dim in coord.dimensions)
+            if built_dims != spec.dimensions:
+                msg = (
+                    f"declare_coordinate_specs() for template {self.name!r} declares coordinate "
+                    f"{coord_name!r} over dimensions {spec.dimensions}, but _add_coordinates() built "
+                    f"it over {built_dims}."
+                )
+                raise ValueError(msg)
+            if coord.data_type != spec.dtype:
+                msg = (
+                    f"declare_coordinate_specs() for template {self.name!r} declares coordinate "
+                    f"{coord_name!r} as {spec.dtype}, but _add_coordinates() built it as "
+                    f"{coord.data_type}."
+                )
+                raise ValueError(msg)
+
     @property
     def name(self) -> str:
         """Returns the name of the template."""
diff --git a/src/mdio/builder/templates/seismic_2d_cdp.py b/src/mdio/builder/templates/seismic_2d_cdp.py
index 3dc6f9d4..fd2ca808 100644
--- a/src/mdio/builder/templates/seismic_2d_cdp.py
+++ b/src/mdio/builder/templates/seismic_2d_cdp.py
@@ -8,6 +8,7 @@
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
 from mdio.builder.templates.types import CdpGatherDomain
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -35,6 +36,17 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "2D", "gatherType": "cdp"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare CDP-indexed X/Y coordinates for the 2D CDP gathers template.
+
+        Returns:
+            ``cdp_x`` and ``cdp_y`` specs, both FLOAT64 over ``cdp``.
+        """
+        return (
+            CoordinateSpec(name="cdp_x", dimensions=("cdp",), dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="cdp_y", dimensions=("cdp",), dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         self._builder.add_coordinate(
diff --git a/src/mdio/builder/templates/seismic_2d_streamer_shot.py b/src/mdio/builder/templates/seismic_2d_streamer_shot.py
index 40a1931a..4999fba8 100644
--- a/src/mdio/builder/templates/seismic_2d_streamer_shot.py
+++ b/src/mdio/builder/templates/seismic_2d_streamer_shot.py
@@ -6,6 +6,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -26,6 +27,22 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "2D", "gatherType": "common_source"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare shot- and receiver-indexed coordinates for the 2D streamer shot gathers template.
+
+        Returns:
+            FLOAT64 ``source_coord_x``/``source_coord_y`` specs over ``shot_point``, followed by
+            FLOAT64 ``group_coord_x``/``group_coord_y`` specs over ``(shot_point, channel)``.
+        """
+        shot_dim = ("shot_point",)
+        receiver_dims = ("shot_point", "channel")
+        return (
+            CoordinateSpec(name="source_coord_x", dimensions=shot_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="source_coord_y", dimensions=shot_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_x", dimensions=receiver_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_y", dimensions=receiver_dims, dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         for name in self._dim_names:
diff --git a/src/mdio/builder/templates/seismic_3d_cdp.py b/src/mdio/builder/templates/seismic_3d_cdp.py
index 5948bab3..6490a7de 100644
--- a/src/mdio/builder/templates/seismic_3d_cdp.py
+++ b/src/mdio/builder/templates/seismic_3d_cdp.py
@@ -8,6 +8,7 @@
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
 from mdio.builder.templates.types import CdpGatherDomain
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -35,6 +36,17 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "3D", "gatherType": "cdp"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare inline/crossline-indexed X/Y coordinates for the 3D CDP gathers template.
+
+        Returns:
+            ``cdp_x`` and ``cdp_y`` specs, both FLOAT64 over ``(inline, crossline)``.
+        """
+        return (
+            CoordinateSpec(name="cdp_x", dimensions=("inline", "crossline"), dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="cdp_y", dimensions=("inline", "crossline"), dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         self._builder.add_coordinate(
diff --git a/src/mdio/builder/templates/seismic_3d_coca.py b/src/mdio/builder/templates/seismic_3d_coca.py
index 9f3b30b1..fc296a33
100644
--- a/src/mdio/builder/templates/seismic_3d_coca.py
+++ b/src/mdio/builder/templates/seismic_3d_coca.py
@@ -6,6 +6,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -26,6 +27,17 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "3D", "gatherType": "common_offset_common_azimuth"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare inline/crossline-indexed X/Y coordinates for the 3D CoCA gathers template.
+
+        Returns:
+            ``cdp_x`` and ``cdp_y`` specs, both FLOAT64 over ``(inline, crossline)``.
+        """
+        return (
+            CoordinateSpec(name="cdp_x", dimensions=("inline", "crossline"), dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="cdp_y", dimensions=("inline", "crossline"), dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         self._builder.add_coordinate(
diff --git a/src/mdio/builder/templates/seismic_3d_obn.py b/src/mdio/builder/templates/seismic_3d_obn.py
index a74b5635..e5b81659 100644
--- a/src/mdio/builder/templates/seismic_3d_obn.py
+++ b/src/mdio/builder/templates/seismic_3d_obn.py
@@ -5,6 +5,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -49,6 +50,25 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "3D", "gatherType": "common_receiver"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare receiver- and shot-indexed coordinates for the 3D OBN receiver gathers template.
+
+        Returns:
+            FLOAT64 ``group_coord_x``/``group_coord_y`` specs over ``receiver``, followed by
+            UINT32 ``shot_point``/``orig_field_record_num`` and FLOAT64
+            ``source_coord_x``/``source_coord_y`` specs over ``(shot_line, gun, shot_index)``.
+        """
+        receiver_dim = ("receiver",)
+        shot_dims = ("shot_line", "gun", "shot_index")
+        return (
+            CoordinateSpec(name="group_coord_x", dimensions=receiver_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_y", dimensions=receiver_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="shot_point", dimensions=shot_dims, dtype=ScalarType.UINT32),
+            CoordinateSpec(name="orig_field_record_num", dimensions=shot_dims, dtype=ScalarType.UINT32),
+            CoordinateSpec(name="source_coord_x", dimensions=shot_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="source_coord_y", dimensions=shot_dims, dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         # EXCLUDE: `shot_index` since it's 0-N (calculated dimension)
diff --git a/src/mdio/builder/templates/seismic_3d_offset_tiles.py b/src/mdio/builder/templates/seismic_3d_offset_tiles.py
index 65567a5a..3ec802c7 100644
--- a/src/mdio/builder/templates/seismic_3d_offset_tiles.py
+++ b/src/mdio/builder/templates/seismic_3d_offset_tiles.py
@@ -6,6 +6,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -33,6 +34,17 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "3D", "gatherType": "offset_tiles"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare inline/crossline-indexed X/Y coordinates for the 3D offset tiles template.
+
+        Returns:
+            ``cdp_x`` and ``cdp_y`` specs, both FLOAT64 over ``(inline, crossline)``.
+        """
+        return (
+            CoordinateSpec(name="cdp_x", dimensions=("inline", "crossline"), dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="cdp_y", dimensions=("inline", "crossline"), dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         self._builder.add_coordinate(
diff --git a/src/mdio/builder/templates/seismic_3d_receiver_gathers.py
b/src/mdio/builder/templates/seismic_3d_receiver_gathers.py
index 4f5ef5f1..232f7a22 100644
--- a/src/mdio/builder/templates/seismic_3d_receiver_gathers.py
+++ b/src/mdio/builder/templates/seismic_3d_receiver_gathers.py
@@ -6,6 +6,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 
 
 class Seismic3DReceiverGathersTemplate(AbstractDatasetTemplate):
@@ -32,6 +33,24 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "3D", "gatherType": "receiver_gathers"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare receiver- and shot-indexed coordinates for the 3D receiver gathers template.
+
+        Returns:
+            FLOAT64 ``receiver_x``/``receiver_y`` specs over ``receiver``, followed by a UINT32
+            ``shot_point`` spec and FLOAT64 ``source_coord_x``/``source_coord_y`` specs over
+            ``(shot_line, shot_index)``.
+        """
+        receiver_dim = ("receiver",)
+        shot_dims = ("shot_line", "shot_index")
+        return (
+            CoordinateSpec(name="receiver_x", dimensions=receiver_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="receiver_y", dimensions=receiver_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="shot_point", dimensions=shot_dims, dtype=ScalarType.UINT32),
+            CoordinateSpec(name="source_coord_x", dimensions=shot_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="source_coord_y", dimensions=shot_dims, dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         # Note: shot_index is calculated (0-N), so we don't add a coordinate for it
diff --git a/src/mdio/builder/templates/seismic_3d_shot_receiver_line.py b/src/mdio/builder/templates/seismic_3d_shot_receiver_line.py
index 5f5b5157..e5e2dbad 100644
--- a/src/mdio/builder/templates/seismic_3d_shot_receiver_line.py
+++ b/src/mdio/builder/templates/seismic_3d_shot_receiver_line.py
@@ -5,6 +5,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import
AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -32,6 +33,24 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "3D", "gatherType": "common_source"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare shot-line- and receiver-line-indexed coordinates for the 3D shot/receiver-line template.
+
+        Returns:
+            FLOAT64 ``source_coord_x``/``source_coord_y`` specs over ``(shot_line, shot_point)``,
+            FLOAT64 ``group_coord_x``/``group_coord_y`` specs over ``(receiver_line, receiver)``,
+            and a UINT32 ``orig_field_record_num`` spec over ``(shot_line, shot_point)``.
+        """
+        source_dims = ("shot_line", "shot_point")
+        group_dims = ("receiver_line", "receiver")
+        return (
+            CoordinateSpec(name="source_coord_x", dimensions=source_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="source_coord_y", dimensions=source_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_x", dimensions=group_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_y", dimensions=group_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="orig_field_record_num", dimensions=source_dims, dtype=ScalarType.UINT32),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         self._builder.add_coordinate(
diff --git a/src/mdio/builder/templates/seismic_3d_streamer_field.py b/src/mdio/builder/templates/seismic_3d_streamer_field.py
index 9036320a..c2613e6a 100644
--- a/src/mdio/builder/templates/seismic_3d_streamer_field.py
+++ b/src/mdio/builder/templates/seismic_3d_streamer_field.py
@@ -5,6 +5,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -38,6 +39,26 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyDimensionality": "3D", "gatherType": "common_source"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare shot- and receiver-indexed coordinates for the 3D streamer field records template.
+
+        Returns:
+            UINT32 ``orig_field_record_num``/``shot_point`` and FLOAT64
+            ``source_coord_x``/``source_coord_y`` specs over ``(sail_line, gun, shot_index)``,
+            followed by FLOAT64 ``group_coord_x``/``group_coord_y`` specs over
+            ``(sail_line, gun, shot_index, cable, channel)``.
+        """
+        shot_dims = ("sail_line", "gun", "shot_index")
+        receiver_dims = ("sail_line", "gun", "shot_index", "cable", "channel")
+        return (
+            CoordinateSpec(name="orig_field_record_num", dimensions=shot_dims, dtype=ScalarType.UINT32),
+            CoordinateSpec(name="shot_point", dimensions=shot_dims, dtype=ScalarType.UINT32),
+            CoordinateSpec(name="source_coord_x", dimensions=shot_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="source_coord_y", dimensions=shot_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_x", dimensions=receiver_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_y", dimensions=receiver_dims, dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         # EXCLUDE: `shot_index` since its 0-N
diff --git a/src/mdio/builder/templates/seismic_3d_streamer_shot.py b/src/mdio/builder/templates/seismic_3d_streamer_shot.py
index dd4b7940..4709dba3 100644
--- a/src/mdio/builder/templates/seismic_3d_streamer_shot.py
+++ b/src/mdio/builder/templates/seismic_3d_streamer_shot.py
@@ -6,6 +6,7 @@
 from mdio.builder.schemas.dtype import ScalarType
 from mdio.builder.schemas.v1.variable import CoordinateMetadata
 from mdio.builder.templates.base import AbstractDatasetTemplate
+from mdio.builder.templates.types import CoordinateSpec
 from mdio.builder.templates.types import SeismicDataDomain
 
 
@@ -27,6 +28,24 @@ def _name(self) -> str:
     def _load_dataset_attributes(self) -> dict[str, Any]:
         return {"surveyType": "3D", "gatherType": "common_source"}
 
+    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
+        """Declare shot- and receiver-indexed coordinates for the 3D streamer shot gathers template.
+
+        Returns:
+            A UINT8 ``gun`` spec and FLOAT64 ``source_coord_x``/``source_coord_y`` specs over
+            ``shot_point``, followed by FLOAT64 ``group_coord_x``/``group_coord_y`` specs over
+            ``(shot_point, cable, channel)``.
+        """
+        shot_dim = ("shot_point",)
+        receiver_dims = ("shot_point", "cable", "channel")
+        return (
+            CoordinateSpec(name="gun", dimensions=shot_dim, dtype=ScalarType.UINT8),
+            CoordinateSpec(name="source_coord_x", dimensions=shot_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="source_coord_y", dimensions=shot_dim, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_x", dimensions=receiver_dims, dtype=ScalarType.FLOAT64),
+            CoordinateSpec(name="group_coord_y", dimensions=receiver_dims, dtype=ScalarType.FLOAT64),
+        )
+
     def _add_coordinates(self) -> None:
         # Add dimension coordinates
         for name in self._dim_names:
diff --git a/src/mdio/builder/templates/types.py b/src/mdio/builder/templates/types.py
index ed60392c..70d9de57 100644
--- a/src/mdio/builder/templates/types.py
+++ b/src/mdio/builder/templates/types.py
@@ -1,8 +1,30 @@
-"""Module that contains type aliases for templates."""
+"""Type aliases and declarative specs for templates."""
 
 from typing import Literal
 from typing import TypeAlias
 
+from pydantic import BaseModel
+
+from mdio.builder.schemas.dtype import ScalarType
+
 SeismicDataDomain: TypeAlias = Literal["depth", "time"]
 
 CdpGatherDomain: TypeAlias = Literal["offset", "angle"]
+
+
+class CoordinateSpec(BaseModel):
+    """Specification for a non-dimension coordinate declared by a template.
+
+    Templates declare their non-dimension coordinates via
+    :meth:`AbstractDatasetTemplate.declare_coordinate_specs`. The ingestion
+    ``SchemaResolver`` consumes these specs to build the final resolved schema.
+
+    Attributes:
+        name: Coordinate name (e.g. ``"cdp_x"``, ``"gun"``, ``"source_coord_x"``).
+        dimensions: Names of the dimensions this coordinate is indexed by.
+        dtype: Data type for the coordinate.
+    """
+
+    name: str
+    dimensions: tuple[str, ...]
+    dtype: ScalarType
diff --git a/src/mdio/converters/segy.py b/src/mdio/converters/segy.py
index f00f7bbf..1dd9d20a 100644
--- a/src/mdio/converters/segy.py
+++ b/src/mdio/converters/segy.py
@@ -176,6 +176,7 @@ def _update_template_from_grid_overrides(
             actual_spatial_dims,
         )
         template._dim_names = actual_spatial_dims + (template.trace_domain,)
+        template._grid_overrides_applied = True
 
     # If using NonBinned override, expose non-binned dims as logical coordinates on the template instance
     # and patch _add_coordinates to skip adding them as 1D dimension coordinates
@@ -190,6 +191,7 @@ def _update_template_from_grid_overrides(
         to_add = tuple(n for n in non_binned_dims if n not in existing)
         if to_add:
             template._logical_coord_names = template._logical_coord_names + to_add
+            template._grid_overrides_applied = True
 
         # Patch _add_coordinates to skip adding non-binned dims as 1D dimension coordinates
         # This prevents them from being added with wrong dimensions (e.g., just "trace")
diff --git a/src/mdio/ingestion/schema_resolver.py b/src/mdio/ingestion/schema_resolver.py
new file mode 100644
index 00000000..f28da1df
--- /dev/null
+++ b/src/mdio/ingestion/schema_resolver.py
@@ -0,0 +1,226 @@
+"""Schema resolution: turn a template + grid overrides into a final, ingestion-ready schema."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from typing import Any
+
+from pydantic import BaseModel
+from pydantic import Field
+
+from mdio.builder.templates.types import CoordinateSpec  # noqa: TC001 (pydantic needs this at runtime)
+
+if TYPE_CHECKING:
+    from mdio.builder.templates.base import AbstractDatasetTemplate
+    from mdio.segy.geometry import GridOverrides
+
+
+class DimensionSpec(BaseModel):
+    """Specification for a dimension in the final dataset.
+
+    Attributes:
+        name: Dimension name (e.g. ``"inline"``, ``"shot_point"``, ``"trace"``, ``"time"``).
+        is_spatial: Whether this is a spatial dimension. ``False`` only for the vertical
+            (data-domain) dimension.
+        is_calculated: Whether this dimension's coordinate values are produced by an index
+            strategy at ingest time (e.g. ``shot_index`` from a template, or ``trace`` added
+            by a grid override) rather than read directly. The pipeline uses this to give a
+            clear error if a required strategy was not enabled.
+    """
+
+    name: str
+    is_spatial: bool = True
+    is_calculated: bool = False
+
+
+class ResolvedSchema(BaseModel):
+    """Final resolved schema for dataset ingestion.
+
+    This represents the complete, resolved schema after applying template configuration
+    and grid overrides. It contains everything needed to build the dataset without
+    any further decision-making.
+
+    Attributes:
+        name: Name of the dataset/template
+        dimensions: Ordered list of dimension specifications
+        coordinates: List of coordinate specifications
+        chunk_shape: Tuple of chunk sizes for each dimension
+        metadata: Additional metadata attributes
+        default_variable_name: Name of the main data variable
+    """
+
+    name: str
+    dimensions: list[DimensionSpec]
+    coordinates: list[CoordinateSpec]
+    chunk_shape: tuple[int, ...]
+    metadata: dict[str, Any] = Field(default_factory=dict)
+    default_variable_name: str = "amplitude"
+
+    def required_header_fields(self) -> set[str]:
+        """Names that must be readable from SEG-Y trace headers to materialize this schema."""
+        fields = {dim.name for dim in self.dimensions if dim.is_spatial and not dim.is_calculated}
+        fields.update(coord.name for coord in self.coordinates)
+        # coordinate_scalar is always needed to scale X/Y coordinates.
+        fields.add("coordinate_scalar")
+        return fields
+
+    def spatial_dimensions(self) -> list[DimensionSpec]:
+        """Get the spatial dimensions, i.e. every dimension except the vertical one.
+
+        A ``trace`` dimension added by a grid override is spatial and is therefore included.
+        """
+        return [dim for dim in self.dimensions if dim.is_spatial]
+
+
+class SchemaResolver:
+    """Resolves template + grid overrides into a final schema.
+
+    This class takes a template and optional grid overrides and produces
+    a ResolvedSchema that completely specifies the dataset structure before
+    any data is scanned or processed.
+    """
+
+    def resolve(
+        self,
+        template: AbstractDatasetTemplate,
+        grid_overrides: GridOverrides | None = None,
+    ) -> ResolvedSchema:
+        """Resolve template and overrides into final schema.
+
+        Args:
+            template: The MDIO dataset template
+            grid_overrides: Optional grid override configuration
+
+        Returns:
+            ResolvedSchema with all dimensions, coordinates, and metadata resolved
+        """
+        schema = self._template_to_schema(template)
+
+        if grid_overrides:
+            schema = self._apply_override_transformations(schema, grid_overrides)
+
+        return schema
+
+    def _template_to_schema(self, template: AbstractDatasetTemplate) -> ResolvedSchema:
+        """Convert a template to a resolved schema without overrides."""
+        calculated = set(template.calculated_dimension_names)
+        dimensions = [
+            DimensionSpec(name=name, is_spatial=True, is_calculated=name in calculated)
+            for name in template.spatial_dimension_names
+        ]
+        dimensions.append(DimensionSpec(name=template.dimension_names[-1], is_spatial=False))
+
+        return ResolvedSchema(
+            name=template.name,
+            dimensions=dimensions,
+            coordinates=list(template.declare_coordinate_specs()),
+            chunk_shape=template.full_chunk_shape,
+            metadata=template._load_dataset_attributes() or {},
+            default_variable_name=template.default_variable_name,
+        )
+
+    def _apply_override_transformations(
+        self,
+        schema: ResolvedSchema,
+        grid_overrides: GridOverrides,
+    ) -> ResolvedSchema:
+        """Apply grid override transformations to the schema."""
+        schema_dict = schema.model_dump()
+
+        if grid_overrides.non_binned:
+            schema_dict = self._apply_non_binned_transform(schema_dict, grid_overrides)
+        elif grid_overrides.has_duplicates:
+            schema_dict = self._apply_duplicate_transform(schema_dict)
+
+        schema_dict["metadata"]["gridOverrides"] = grid_overrides.to_legacy_dict()
+
+        return ResolvedSchema(**schema_dict)
+
+    def _apply_non_binned_transform(
+        self,
+        schema_dict: dict,
+        grid_overrides: GridOverrides,
+    ) -> dict:
+        """Replace selected spatial dimensions with a single ``trace`` dimension.
+
+        Only spatial dimensions that exist in the schema are collapsed, so the vertical
+        dimension always survives and ``trace`` is inserted right before it.
+
+        Args:
+            schema_dict: Dumped ``ResolvedSchema``; it is updated in place.
+            grid_overrides: Provides ``non_binned_dims`` and the ``chunksize`` used for ``trace``.
+
+        Returns:
+            ``schema_dict`` with dimensions, chunk shape, and coordinate dimensions rewritten.
+        """
+        dimensions = schema_dict["dimensions"]
+        chunk_shape = list(schema_dict["chunk_shape"])
+        spatial_names = [d["name"] for d in dimensions if d["is_spatial"]]
+
+        replace_dims = grid_overrides.non_binned_dims
+        if replace_dims is None:
+            # Default: replace all spatial dims except the first.
+            replace_dims = spatial_names[1:]
+        # Restrict to spatial dims present in the schema: the vertical dim must never be dropped.
+        replaced_dims_set = set(replace_dims).intersection(spatial_names)
+
+        new_dimensions = []
+        new_chunk_shape = []
+        replaced_count = 0
+
+        for i, dim in enumerate(dimensions):
+            if dim["name"] in replaced_dims_set:
+                replaced_count += 1
+                continue
+            if dim["is_spatial"]:
+                new_dimensions.append(dim)
+                new_chunk_shape.append(chunk_shape[i])
+            else:
+                if replaced_count > 0:
+                    new_dimensions.append(DimensionSpec(name="trace", is_spatial=True, is_calculated=True).model_dump())
+                    new_chunk_shape.append(grid_overrides.chunksize)
+                new_dimensions.append(dim)
+                new_chunk_shape.append(chunk_shape[i])
+
+        schema_dict["dimensions"] = new_dimensions
+        schema_dict["chunk_shape"] = tuple(new_chunk_shape)
+
+        # Rewrite coordinate dimension references: collapsed dims drop out, replaced by ``trace``.
+        updated_coordinates = []
+        for coord in schema_dict["coordinates"]:
+            original_dims = coord["dimensions"]
+            had_collapsed_dims = any(d in replaced_dims_set for d in original_dims)
+            coord_dims = [d for d in original_dims if d not in replaced_dims_set]
+
+            if had_collapsed_dims and replaced_count > 0:
+                coord_dims.append("trace")
+
+            updated_coord = dict(coord)
+            updated_coord["dimensions"] = tuple(coord_dims)
+            updated_coordinates.append(updated_coord)
+        schema_dict["coordinates"] = updated_coordinates
+
+        return schema_dict
+
+    def _apply_duplicate_transform(self, schema_dict: dict) -> dict:
+        """Insert a ``trace`` dimension with chunksize 1 before the vertical dimension."""
+        dimensions = schema_dict["dimensions"]
+        chunk_shape = list(schema_dict["chunk_shape"])
+
+        new_dimensions = []
+        new_chunk_shape = []
+
+        for i, dim in enumerate(dimensions):
+            if dim["is_spatial"]:
+                new_dimensions.append(dim)
+                new_chunk_shape.append(chunk_shape[i])
+            else:
+                new_dimensions.append(DimensionSpec(name="trace", is_spatial=True, is_calculated=True).model_dump())
+                new_chunk_shape.append(1)
+                new_dimensions.append(dim)
+                new_chunk_shape.append(chunk_shape[i])
+
+        schema_dict["dimensions"] = new_dimensions
+        schema_dict["chunk_shape"] = tuple(new_chunk_shape)
+
+        return schema_dict
diff --git a/tests/unit/test_ingestion_schema_resolver.py b/tests/unit/test_ingestion_schema_resolver.py
new file mode 100644
index 00000000..365b3df2
--- /dev/null
+++ b/tests/unit/test_ingestion_schema_resolver.py
@@ -0,0 +1,107 @@
+"""Unit tests for the v1.2 SchemaResolver."""
+
+from __future__ import annotations
+
+from mdio.builder.templates.seismic_3d_cdp import Seismic3DCdpGathersTemplate
+from mdio.builder.templates.seismic_3d_obn import Seismic3DObnReceiverGathersTemplate
+from mdio.builder.templates.seismic_3d_streamer_shot import Seismic3DStreamerShotGathersTemplate
+from mdio.ingestion.schema_resolver import SchemaResolver
+from
class TestSchemaResolverNoOverrides:
    """Without grid overrides, the resolved schema follows the template's own layout."""

    def test_streamer_shot_template_basic(self) -> None:
        """Name, dimension order, vertical-axis flags, and chunk shape match the template."""
        streamer = Seismic3DStreamerShotGathersTemplate(data_domain="time")
        resolved = SchemaResolver().resolve(streamer, grid_overrides=None)

        assert resolved.name == "StreamerShotGathers3D"

        dim_names = [dim.name for dim in resolved.dimensions]
        assert dim_names == ["shot_point", "cable", "channel", "time"]

        # The trailing dimension is the vertical axis: neither spatial nor calculated.
        vertical = resolved.dimensions[-1]
        assert vertical.is_spatial is False
        assert vertical.is_calculated is False

        # With no override in play, the chunk shape is the template's own.
        assert resolved.chunk_shape == streamer.full_chunk_shape

    def test_obn_template_marks_shot_index_as_calculated(self) -> None:
        """``shot_index`` on the OBN template is flagged both calculated and spatial."""
        obn = Seismic3DObnReceiverGathersTemplate(data_domain="time")
        resolved = SchemaResolver().resolve(obn, grid_overrides=None)

        shot_index_dim = next(dim for dim in resolved.dimensions if dim.name == "shot_index")
        assert shot_index_dim.is_calculated is True
        assert shot_index_dim.is_spatial is True

    def test_cdp_required_header_fields(self) -> None:
        """The required header set includes dims, coordinates, and ``coordinate_scalar``."""
        cdp = Seismic3DCdpGathersTemplate(data_domain="time", gather_domain="offset")
        resolved = SchemaResolver().resolve(cdp, grid_overrides=None)

        # Header keys for the spatial dims and the coordinates, plus coordinate_scalar,
        # which is always present.
        expected_fields = {"inline", "crossline", "offset", "cdp_x", "cdp_y", "coordinate_scalar"}
        header_fields = resolved.required_header_fields()
        assert expected_fields.issubset(header_fields)


class TestSchemaResolverNonBinned:
    """NonBinned overrides fold spatial dimensions into one ``trace`` axis."""

    def test_default_collapse_keeps_first_spatial_dim(self) -> None:
        """By default only the first spatial dim survives; the others become ``trace``."""
        streamer = Seismic3DStreamerShotGathersTemplate(data_domain="time")
        # Streamer shot template default chunk shape is (8, 1, 128, 2048).
        resolved = SchemaResolver().resolve(streamer, GridOverrides(non_binned=True, chunksize=64))

        dim_names = [dim.name for dim in resolved.dimensions]
        assert dim_names == ["shot_point", "trace", "time"]
        # Chunks: shot_point stays at 8, trace takes the override (64), vertical stays at 2048.
        assert resolved.chunk_shape == (8, 64, 2048)

    def test_explicit_non_binned_dims(self) -> None:
        """Only the dimensions listed in ``non_binned_dims`` are folded into ``trace``."""
        streamer = Seismic3DStreamerShotGathersTemplate(data_domain="time")
        channel_only = GridOverrides(non_binned=True, chunksize=128, non_binned_dims=["channel"])
        resolved = SchemaResolver().resolve(streamer, channel_only)

        dim_names = [dim.name for dim in resolved.dimensions]
        assert dim_names == ["shot_point", "cable", "trace", "time"]
        # Chunks: shot_point=8 and cable=1 untouched; trace=128 from the override; vertical=2048.
        assert resolved.chunk_shape == (8, 1, 128, 2048)

    def test_coordinate_dimensions_collapsed_when_referenced(self) -> None:
        """A coordinate that used collapsed dims is re-expressed in terms of ``trace``."""
        streamer = Seismic3DStreamerShotGathersTemplate(data_domain="time")
        resolved = SchemaResolver().resolve(streamer, GridOverrides(non_binned=True, chunksize=64))

        # group_coord_x starts out over (shot_point, cable, channel); once cable and channel
        # are collapsed by NonBinned it should span (shot_point, trace).
        group_x = next(coord for coord in resolved.coordinates if coord.name == "group_coord_x")
        assert group_x.dimensions == ("shot_point", "trace")

    def test_non_binned_flag_recorded_in_metadata(self) -> None:
        """``gridOverrides`` metadata carries ``NonBinned`` set to ``True``."""
        streamer = Seismic3DStreamerShotGathersTemplate(data_domain="time")
        non_binned = GridOverrides(non_binned=True, chunksize=64)
        resolved = SchemaResolver().resolve(streamer, non_binned)

        assert "gridOverrides" in resolved.metadata
        recorded = resolved.metadata["gridOverrides"]
        assert recorded.get("NonBinned") is True


class TestSchemaResolverHasDuplicates:
    """HasDuplicates overrides add a 1-wide ``trace`` dimension ahead of the vertical axis."""

    def test_inserts_trace_dim_with_chunksize_one(self) -> None:
        """A ``trace`` dim with chunk size 1 appears right before the vertical dim."""
        streamer = Seismic3DStreamerShotGathersTemplate(data_domain="time")
        resolved = SchemaResolver().resolve(streamer, GridOverrides(has_duplicates=True))

        dim_names = [dim.name for dim in resolved.dimensions]
        assert dim_names == ["shot_point", "cable", "channel", "trace", "time"]
        # Default streamer chunks are (8, 1, 128, 2048); the 1-wide trace chunk slots in
        # just ahead of the vertical dim.
        assert resolved.chunk_shape == (8, 1, 128, 1, 2048)

    def test_has_duplicates_metadata(self) -> None:
        """``gridOverrides`` metadata carries ``HasDuplicates`` set to ``True``."""
        streamer = Seismic3DStreamerShotGathersTemplate(data_domain="time")
        resolved = SchemaResolver().resolve(streamer, GridOverrides(has_duplicates=True))

        recorded = resolved.metadata["gridOverrides"]
        assert recorded.get("HasDuplicates") is True
class _SubsetCoordTemplate(AbstractDatasetTemplate):
    """Minimal user-style template with a coordinate indexed by only one spatial dim."""

    def __init__(self, *, declare_correct_specs: bool) -> None:
        super().__init__(data_domain="time")
        self._dim_names = ("shot", "channel", "time")
        self._var_chunk_shape = (8, 8, 128)
        self._physical_coord_names = ("src_x",)
        # Selects whether declare_coordinate_specs() reports the real layout or the default.
        self._declare_correct_specs = declare_correct_specs

    @property
    def _name(self) -> str:
        return "SubsetCoordTest"

    def _load_dataset_attributes(self) -> dict[str, Any]:
        return {}

    def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
        """Return the accurate ``src_x`` spec when asked, else the inherited (wrong) default."""
        if not self._declare_correct_specs:
            return super().declare_coordinate_specs()
        src_x_spec = CoordinateSpec(name="src_x", dimensions=("shot",), dtype=ScalarType.FLOAT64)
        return (src_x_spec,)

    def _add_coordinates(self) -> None:
        builder = self._builder
        for dim_name in self._dim_names:
            builder.add_coordinate(dim_name, dimensions=(dim_name,), data_type=ScalarType.INT32)
        # Only `shot` indexes src_x, i.e. a subset of the spatial dims (shot, channel).
        builder.add_coordinate("src_x", dimensions=("shot",), data_type=ScalarType.FLOAT64)


def test_matching_specs_build_successfully() -> None:
    """Building succeeds when the declared specs agree with the coordinates that get built."""
    subset_template = _SubsetCoordTemplate(declare_correct_specs=True)
    built = subset_template.build_dataset("ok", sizes=(4, 8, 128))
    variable_names = [variable.name for variable in built.variables]
    assert "src_x" in variable_names


def test_dimension_drift_is_rejected() -> None:
    """Falling back to the default specs while building a subset-indexed coordinate raises."""
    drifting_template = _SubsetCoordTemplate(declare_correct_specs=False)
    expected_message = "declares coordinate 'src_x' over dimensions"
    with pytest.raises(ValueError, match=expected_message):
        drifting_template.build_dataset("drift", sizes=(4, 8, 128))


def test_missing_declaration_is_rejected() -> None:
    """Building a coordinate that was never declared is flagged as out of sync."""

    class _UndeclaredCoordTemplate(_SubsetCoordTemplate):
        def declare_coordinate_specs(self) -> tuple[CoordinateSpec, ...]:
            return ()

    undeclared_template = _UndeclaredCoordTemplate(declare_correct_specs=True)
    with pytest.raises(ValueError, match="out of sync"):
        undeclared_template.build_dataset("missing", sizes=(4, 8, 128))