From 71d6a96626ed352d445444e44bf773fe8e40119e Mon Sep 17 00:00:00 2001 From: jross Date: Wed, 7 Jan 2026 18:03:27 -0700 Subject: [PATCH 1/7] feat: add NMA_HydraulicsData model and admin view, including backfill functionality --- admin/config.py | 9 +- admin/views/__init__.py | 2 + admin/views/hydraulicsdata.py | 129 +++++++++++++++ .../d1a2b3c4e5f6_create_nma_hydraulicsdata.py | 55 +++++++ db/nma_legacy.py | 47 +++++- transfers/backfill/hydraulicsdata.py | 151 ++++++++++++++++++ transfers/backfill/staging.py | 4 + 7 files changed, 386 insertions(+), 11 deletions(-) create mode 100644 admin/views/hydraulicsdata.py create mode 100644 alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py create mode 100644 transfers/backfill/hydraulicsdata.py diff --git a/admin/config.py b/admin/config.py index e88dfdc3..adeab0c7 100644 --- a/admin/config.py +++ b/admin/config.py @@ -36,11 +36,11 @@ GroupAdmin, NotesAdmin, SampleAdmin, + HydraulicsDataAdmin, GeologicFormationAdmin, DataProvenanceAdmin, FieldEventAdmin, FieldActivityAdmin, - FieldEventParticipantAdmin, ParameterAdmin, ) from db.engine import engine @@ -60,10 +60,10 @@ from db.group import Group from db.notes import Notes from db.sample import Sample +from db.nma_legacy import NMAHydraulicsData from db.geologic_formation import GeologicFormation from db.data_provenance import DataProvenance -from db.field import FieldEvent, FieldActivity, FieldEventParticipant -from db.permission_history import PermissionHistory +from db.field import FieldEvent, FieldActivity from db.parameter import Parameter @@ -127,6 +127,9 @@ def create_admin(app): # Samples admin.add_view(SampleAdmin(Sample)) + # Hydraulics + admin.add_view(HydraulicsDataAdmin(NMAHydraulicsData)) + # Field admin.add_view(FieldEventAdmin(FieldEvent)) admin.add_view(FieldActivityAdmin(FieldActivity)) diff --git a/admin/views/__init__.py b/admin/views/__init__.py index 74c2c141..55b7bbd2 100644 --- a/admin/views/__init__.py +++ b/admin/views/__init__.py @@ -31,6 +31,7 @@ from admin.views.group import GroupAdmin from admin.views.notes import NotesAdmin from admin.views.sample import SampleAdmin +from admin.views.hydraulicsdata import HydraulicsDataAdmin from admin.views.geologic_formation import GeologicFormationAdmin from admin.views.data_provenance import DataProvenanceAdmin from admin.views.field import ( @@ -55,6 +56,7 @@ "GroupAdmin", "NotesAdmin", "SampleAdmin", + "HydraulicsDataAdmin", "GeologicFormationAdmin", "DataProvenanceAdmin", "FieldEventAdmin", diff --git a/admin/views/hydraulicsdata.py b/admin/views/hydraulicsdata.py new file mode 100644 index 00000000..6082ce50 --- /dev/null +++ b/admin/views/hydraulicsdata.py @@ -0,0 +1,129 @@ +# =============================================================================== +# Copyright 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +HydraulicsDataAdmin view for legacy NMA_HydraulicsData. +""" +from admin.views.base import OcotilloModelView + + +class HydraulicsDataAdmin(OcotilloModelView): + """ + Admin view for NMAHydraulicsData model. + """ + + # ========== Basic Configuration ========== + + name = "Hydraulics Data" + label = "Hydraulics Data" + icon = "fa fa-tint" + + can_create = False + can_edit = False + can_delete = False + + # ========== List View ========== + + column_list = [ + "global_id", + "point_id", + "hydraulic_unit", + "hydraulic_unit_type", + "test_top", + "test_bottom", + "t_ft2_d", + "k_darcy", + "data_source", + ] + + column_sortable_list = [ + "global_id", + "point_id", + "hydraulic_unit", + "hydraulic_unit_type", + "test_top", + "test_bottom", + "t_ft2_d", + "k_darcy", + "data_source", + ] + + search_fields = [ + "global_id", + "point_id", + "hydraulic_unit", + "hydraulic_remarks", + ] + + column_filters = [ + "hydraulic_unit", + "hydraulic_unit_type", + "data_source", + ] + + can_export = True + export_types = ["csv", "excel"] + + page_size = 50 + page_size_options = [25, 50, 100, 200] + + # ========== Form View ========== + + fields = [ + "global_id", + "point_id", + "hydraulic_unit", + "hydraulic_unit_type", + "hydraulic_remarks", + "test_top", + "test_bottom", + "t_ft2_d", + "s_dimensionless", + "ss_ft_1", + "sy_decimalfractn", + "kh_ft_d", + "kv_ft_d", + "hl_day_1", + "hd_ft2_d", + "cs_gal_d_ft", + "p_decimal_fraction", + "k_darcy", + "data_source", + ] + + labels = { + "global_id": "GlobalID", + "point_id": "PointID", + "hydraulic_unit": "HydraulicUnit", + "hydraulic_unit_type": "HydraulicUnitType", + "hydraulic_remarks": "Hydraulic Remarks", + "test_top": "TestTop", + "test_bottom": "TestBottom", + "t_ft2_d": "T (ft2/d)", + "s_dimensionless": "S (dimensionless)", + "ss_ft_1": "Ss (ft-1)", + "sy_decimalfractn": "Sy (decimalfractn)", + "kh_ft_d": "KH (ft/d)", + "kv_ft_d": "KV (ft/d)", + "hl_day_1": "HL (day-1)", + "hd_ft2_d": "HD (ft2/d)", + "cs_gal_d_ft": "Cs (gal/d/ft)", + "p_decimal_fraction": "P (decimal fraction)", + "k_darcy": "k (darcy)", + "data_source": "Data Source", + } + + +# ============= EOF ============================================= diff --git a/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py b/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py new file mode 100644 index 00000000..59368b50 --- /dev/null +++ b/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py @@ -0,0 +1,55 @@ +"""Create legacy NMA_HydraulicsData table. + +Revision ID: d1a2b3c4e5f6 +Revises: c9f1d2e3a4b5 +Create Date: 2026-02-10 04:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "d1a2b3c4e5f6" +down_revision: Union[str, Sequence[str], None] = "c9f1d2e3a4b5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy hydraulics data table used for backfill.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_HydraulicsData"): + op.create_table( + "NMA_HydraulicsData", + sa.Column("GlobalID", sa.String(length=40), primary_key=True), + sa.Column("PointID", sa.String(length=50), nullable=True), + sa.Column("HydraulicUnit", sa.String(length=18), nullable=True), + sa.Column("TestTop", sa.SmallInteger(), nullable=False), + sa.Column("TestBottom", sa.SmallInteger(), nullable=False), + sa.Column("HydraulicUnitType", sa.String(length=2), nullable=True), + sa.Column("Hydraulic Remarks", sa.String(length=200), nullable=True), + sa.Column("T (ft2/d)", sa.Float(), nullable=True), + sa.Column("S (dimensionless)", sa.Float(), nullable=True), + sa.Column("Ss (ft-1)", sa.Float(), nullable=True), + sa.Column("Sy (decimalfractn)", sa.Float(), nullable=True), + sa.Column("KH (ft/d)", sa.Float(), nullable=True), + sa.Column("KV (ft/d)", sa.Float(), nullable=True), + sa.Column("HL (day-1)", sa.Float(), nullable=True), + sa.Column("HD (ft2/d)", sa.Float(), nullable=True), + sa.Column("Cs (gal/d/ft)", sa.Float(), nullable=True), + sa.Column("P (decimal fraction)", sa.Float(), nullable=True), + sa.Column("k (darcy)", sa.Float(), nullable=True), + sa.Column("Data Source", sa.String(length=255), nullable=True), + ) + + +def downgrade() -> None: + """Drop the legacy hydraulics data table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_HydraulicsData"): + op.drop_table("NMA_HydraulicsData") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 8033dcc4..cc7e02d4 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -19,14 +19,7 @@ from datetime import date, datetime from typing import Optional -from sqlalchemy import ( - Boolean, - Date, - DateTime, - Float, - Integer, - String, -) +from sqlalchemy import Boolean, Date, DateTime, Float, Integer, SmallInteger, String from sqlalchemy.orm import Mapped, mapped_column from db.base import Base @@ -148,4 +141,42 @@ class ViewNGWMNLithology(Base): ) +class NMAHydraulicsData(Base): + """ + Legacy HydraulicsData table from AMPAPI. + """ + + __tablename__ = "NMA_HydraulicsData" + + global_id: Mapped[str] = mapped_column("GlobalID", String(40), primary_key=True) + point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) + data_source: Mapped[Optional[str]] = mapped_column("Data Source", String(255)) + + cs_gal_d_ft: Mapped[Optional[float]] = mapped_column("Cs (gal/d/ft)", Float) + hd_ft2_d: Mapped[Optional[float]] = mapped_column("HD (ft2/d)", Float) + hl_day_1: Mapped[Optional[float]] = mapped_column("HL (day-1)", Float) + kh_ft_d: Mapped[Optional[float]] = mapped_column("KH (ft/d)", Float) + kv_ft_d: Mapped[Optional[float]] = mapped_column("KV (ft/d)", Float) + p_decimal_fraction: Mapped[Optional[float]] = mapped_column( + "P (decimal fraction)", Float + ) + s_dimensionless: Mapped[Optional[float]] = mapped_column("S (dimensionless)", Float) + ss_ft_1: Mapped[Optional[float]] = mapped_column("Ss (ft-1)", Float) + sy_decimalfractn: Mapped[Optional[float]] = mapped_column( + "Sy (decimalfractn)", Float + ) + t_ft2_d: Mapped[Optional[float]] = mapped_column("T (ft2/d)", Float) + k_darcy: Mapped[Optional[float]] = mapped_column("k (darcy)", Float) + + test_bottom: Mapped[int] = mapped_column("TestBottom", SmallInteger, nullable=False) + test_top: Mapped[int] = mapped_column("TestTop", SmallInteger, nullable=False) + hydraulic_unit: Mapped[Optional[str]] = mapped_column("HydraulicUnit", String(18)) + hydraulic_unit_type: Mapped[Optional[str]] = mapped_column( + "HydraulicUnitType", String(2) + ) + hydraulic_remarks: Mapped[Optional[str]] = mapped_column( + "Hydraulic Remarks", String(200) + ) + + # ============= EOF ============================================= diff --git a/transfers/backfill/hydraulicsdata.py b/transfers/backfill/hydraulicsdata.py new file mode 100644 index 00000000..8972eec7 --- /dev/null +++ b/transfers/backfill/hydraulicsdata.py @@ -0,0 +1,151 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +from __future__ import annotations + +from typing import Any, Optional + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import NMAHydraulicsData +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import read_csv + + +class NMAHydraulicsDataBackfill(Transferer): + """ + Backfill for the legacy NMA_HydraulicsData table. + """ + + source_table = "HydraulicsData" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + input_df = read_csv(self.source_table) + return input_df, input_df + + def _transfer_hook(self, session: Session) -> None: + rows = self._dedupe_rows( + [self._row_dict(row) for row in self.cleaned_df.to_dict("records")], + key="GlobalID", + ) + + insert_stmt = insert(NMAHydraulicsData) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into NMA_HydraulicsData" + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "PointID": excluded["PointID"], + "HydraulicUnit": excluded["HydraulicUnit"], + "TestTop": excluded["TestTop"], + "TestBottom": excluded["TestBottom"], + "HydraulicUnitType": excluded["HydraulicUnitType"], + "Hydraulic Remarks": excluded["Hydraulic Remarks"], + "T (ft2/d)": excluded["T (ft2/d)"], + "S (dimensionless)": excluded["S (dimensionless)"], + "Ss (ft-1)": excluded["Ss (ft-1)"], + "Sy (decimalfractn)": excluded["Sy (decimalfractn)"], + "KH (ft/d)": excluded["KH (ft/d)"], + "KV (ft/d)": excluded["KV (ft/d)"], + "HL (day-1)": excluded["HL (day-1)"], + "HD (ft2/d)": excluded["HD (ft2/d)"], + "Cs (gal/d/ft)": excluded["Cs (gal/d/ft)"], + "P (decimal fraction)": excluded["P (decimal fraction)"], + "k (darcy)": excluded["k (darcy)"], + "Data Source": excluded["Data Source"], + }, + ) + session.execute(stmt) + session.commit() + session.expunge_all() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def val(key: str) -> Optional[Any]: + v = row.get(key) + if pd.isna(v): + return None + return v + + def as_int(key: str) -> Optional[int]: + v = val(key) + if v is None: + return None + try: + return int(v) + except (TypeError, ValueError): + return None + + return { + "GlobalID": val("GlobalID"), + "PointID": val("PointID"), + "HydraulicUnit": val("HydraulicUnit"), + "TestTop": as_int("TestTop"), + "TestBottom": as_int("TestBottom"), + "HydraulicUnitType": val("HydraulicUnitType"), + "Hydraulic Remarks": val("Hydraulic Remarks"), + "T (ft2/d)": val("T (ft2/d)"), + "S (dimensionless)": val("S (dimensionless)"), + "Ss (ft-1)": val("Ss (ft-1)"), + "Sy (decimalfractn)": val("Sy (decimalfractn)"), + "KH (ft/d)": val("KH (ft/d)"), + "KV (ft/d)": val("KV (ft/d)"), + "HL (day-1)": val("HL (day-1)"), + "HD (ft2/d)": val("HD (ft2/d)"), + "Cs (gal/d/ft)": val("Cs (gal/d/ft)"), + "P (decimal fraction)": val("P (decimal fraction)"), + "k (darcy)": val("k (darcy)"), + "Data Source": val("Data Source"), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """ + Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops. + Later rows win. + """ + deduped = {} + for row in rows: + gid = row.get(key) + if gid is None: + continue + deduped[gid] = row + return list(deduped.values()) + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the backfill.""" + transferer = NMAHydraulicsDataBackfill(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + # Allow running via `python -m transfers.backfill.hydraulicsdata` + run() + +# ============= EOF ============================================= diff --git a/transfers/backfill/staging.py b/transfers/backfill/staging.py index 172b6737..679db5c4 100644 --- a/transfers/backfill/staging.py +++ b/transfers/backfill/staging.py @@ -32,6 +32,8 @@ from transfers.backfill.waterlevelscontinuous_pressure_daily import ( run as run_pressure_daily, ) +from transfers.backfill.chemistry_sampleinfo import run as run_chemistry_sampleinfo +from transfers.backfill.hydraulicsdata import run as run_hydraulicsdata from transfers.logger import logger @@ -41,6 +43,8 @@ def run(batch_size: int = 1000) -> None: """ steps = ( ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), + ("Chemistry_SampleInfo", run_chemistry_sampleinfo), + ("HydraulicsData", run_hydraulicsdata), ("NGWMN views", run_ngwmn_views), ) From fd3f1c33c7b06afcccf9278a80d62fb1c44a88de Mon Sep 17 00:00:00 2001 From: jross Date: Tue, 13 Jan 2026 17:31:27 -0700 Subject: [PATCH 2/7] feat: replace primary key with GlobalID in NMA_MinorTraceChemistry and update related logic --- ...cd5d_add_nma_chemistry_lineage_relations.py | 4 ++-- db/nma_legacy.py | 4 +++- transfers/minor_trace_chemistry_transfer.py | 18 +++++++++++++++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py b/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py index 82dbe471..9ac9a99a 100644 --- a/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py +++ b/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py @@ -83,7 +83,7 @@ def upgrade() -> None: # Create NMA_MinorTraceChemistry table op.create_table( "NMA_MinorTraceChemistry", - sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=False), sa.Column( "chemistry_sample_info_id", postgresql.UUID(as_uuid=True), @@ -100,7 +100,7 @@ def upgrade() -> None: sa.Column("uncertainty", sa.Float(), nullable=True), sa.Column("volume", sa.Float(), nullable=True), sa.Column("volume_unit", sa.String(20), nullable=True), - sa.PrimaryKeyConstraint("id"), + sa.PrimaryKeyConstraint("GlobalID"), sa.ForeignKeyConstraint( ["chemistry_sample_info_id"], ["NMA_Chemistry_SampleInfo.SamplePtID"], diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 053276af..98d920fe 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -299,7 +299,9 @@ class NMAMinorTraceChemistry(Base): ), ) - id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) # FK to ChemistrySampleInfo - required (no orphans) chemistry_sample_info_id: Mapped[uuid.UUID] = mapped_column( diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 1c527c6e..d89a20c9 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -116,7 +116,7 @@ def _transfer_hook(self, session: Session) -> None: logger.warning("No valid rows to transfer") return - # Dedupe by unique key (chemistry_sample_info_id, analyte) + # Dedupe by GlobalID to avoid PK conflicts. rows = self._dedupe_rows(row_dicts) logger.info(f"Upserting {len(rows)} MinorTraceChemistry records") @@ -127,7 +127,7 @@ def _transfer_hook(self, session: Session) -> None: chunk = rows[i : i + self.batch_size] logger.info(f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows)") stmt = insert_stmt.values(chunk).on_conflict_do_update( - constraint="uq_minor_trace_chemistry_sample_analyte", + index_elements=["GlobalID"], set_={ "sample_value": excluded.sample_value, "units": excluded.units, @@ -164,7 +164,17 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: ) return None + global_id = self._uuid_val(getattr(row, "GlobalID", None)) + if global_id is None: + self._capture_error( + getattr(row, "GlobalID", None), + f"Invalid GlobalID: {getattr(row, 'GlobalID', None)}", + "GlobalID", + ) + return None + return { + "global_id": global_id, "chemistry_sample_info_id": sample_pt_id, "analyte": self._safe_str(row, "Analyte"), "sample_value": self._safe_float(row, "SampleValue"), @@ -183,7 +193,9 @@ def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" deduped = {} for row in rows: - key = (row["chemistry_sample_info_id"], row["analyte"]) + key = row.get("global_id") + if key is None: + continue deduped[key] = row return list(deduped.values()) From 8dab067e0c14250bdb439988d9c5ece046e92084 Mon Sep 17 00:00:00 2001 From: jross Date: Tue, 13 Jan 2026 17:41:26 -0700 Subject: [PATCH 3/7] feat: set GlobalID as primary key for NMA_MinorTraceChemistry and add search_vector triggers for searchable tables --- ...f4c6a8b1c2_make_minor_trace_globalid_pk.py | 72 +++++++++++++++++ ...e4f7a9c0b2d3_add_search_vector_triggers.py | 77 +++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 alembic/versions/d2f4c6a8b1c2_make_minor_trace_globalid_pk.py create mode 100644 alembic/versions/e4f7a9c0b2d3_add_search_vector_triggers.py diff --git a/alembic/versions/d2f4c6a8b1c2_make_minor_trace_globalid_pk.py b/alembic/versions/d2f4c6a8b1c2_make_minor_trace_globalid_pk.py new file mode 100644 index 00000000..d624237d --- /dev/null +++ b/alembic/versions/d2f4c6a8b1c2_make_minor_trace_globalid_pk.py @@ -0,0 +1,72 @@ +"""Make GlobalID the primary key for NMA_MinorTraceChemistry. + +Revision ID: d2f4c6a8b1c2 +Revises: 6e1c90f6135a +Create Date: 2026-03-01 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "d2f4c6a8b1c2" +down_revision: Union[str, Sequence[str], None] = "6e1c90f6135a" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_MinorTraceChemistry"): + return + + columns = {col["name"] for col in inspector.get_columns("NMA_MinorTraceChemistry")} + if "GlobalID" not in columns: + op.add_column( + "NMA_MinorTraceChemistry", + sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=False), + ) + + pk = inspector.get_pk_constraint("NMA_MinorTraceChemistry") + pk_name = pk.get("name") + if pk_name: + op.drop_constraint(pk_name, "NMA_MinorTraceChemistry", type_="primary") + + if "id" in columns: + op.drop_column("NMA_MinorTraceChemistry", "id") + + op.create_primary_key( + "NMA_MinorTraceChemistry_pkey", + "NMA_MinorTraceChemistry", + ["GlobalID"], + ) + + +def downgrade() -> None: + """Downgrade schema.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_MinorTraceChemistry"): + return + + op.drop_constraint( + "NMA_MinorTraceChemistry_pkey", + "NMA_MinorTraceChemistry", + type_="primary", + ) + op.add_column( + "NMA_MinorTraceChemistry", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + ) + op.create_primary_key( + "NMA_MinorTraceChemistry_id_pkey", + "NMA_MinorTraceChemistry", + ["id"], + ) + op.drop_column("NMA_MinorTraceChemistry", "GlobalID") diff --git a/alembic/versions/e4f7a9c0b2d3_add_search_vector_triggers.py b/alembic/versions/e4f7a9c0b2d3_add_search_vector_triggers.py new file mode 100644 index 00000000..cdf3164e --- /dev/null +++ b/alembic/versions/e4f7a9c0b2d3_add_search_vector_triggers.py @@ -0,0 +1,77 @@ +"""Add search_vector triggers for searchable tables. + +Revision ID: e4f7a9c0b2d3 +Revises: d2f4c6a8b1c2 +Create Date: 2026-03-01 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "e4f7a9c0b2d3" +down_revision: Union[str, Sequence[str], None] = "d2f4c6a8b1c2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +SEARCH_VECTOR_TRIGGERS = { + "contact": ("name", "role", "organization", "nma_pk_owners"), + "phone": ("phone_number",), + "email": ("email",), + "address": ( + "address_line_1", + "address_line_2", + "city", + "state", + "postal_code", + "country", + ), + "asset": ("name", "mime_type", "storage_service", "storage_path"), + "thing": ("name",), + "well_purpose": ("purpose",), + "well_casing_material": ("material",), + "publication": ("title", "abstract", "doi", "publisher", "url"), + "pub_author": ("name", "affiliation"), +} + + +def _create_trigger(table: str, columns: Sequence[str]) -> None: + trigger_name = f"{table}_search_vector_update" + column_list = ", ".join(f"'{col}'" for col in columns) + op.execute(f'DROP TRIGGER IF EXISTS "{trigger_name}" ON "{table}"') + op.execute( + f""" + CREATE TRIGGER "{trigger_name}" + BEFORE INSERT OR UPDATE ON "{table}" + FOR EACH ROW EXECUTE FUNCTION + tsvector_update_trigger('search_vector', 'pg_catalog.simple', {column_list}); + """ + ) + + +def _drop_trigger(table: str) -> None: + trigger_name = f"{table}_search_vector_update" + op.execute(f'DROP TRIGGER IF EXISTS "{trigger_name}" ON "{table}"') + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + for table, columns in SEARCH_VECTOR_TRIGGERS.items(): + if not inspector.has_table(table): + continue + column_names = {col["name"] for col in inspector.get_columns(table)} + if "search_vector" not in column_names: + continue + _create_trigger(table, columns) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + for table in SEARCH_VECTOR_TRIGGERS: + if inspector.has_table(table): + _drop_trigger(table) From 6f5fc8d7dcd160b80e507e0a612a4bf47cdfec07 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 20:02:29 -0700 Subject: [PATCH 4/7] feat: migrate NMA_HydraulicsData schema and implement data transfer logic --- README.md | 6 + .../d1a2b3c4e5f6_create_nma_hydraulicsdata.py | 8 +- db/nma_legacy.py | 5 + tests/test_hydraulics_data_legacy.py | 252 ++++++++++++++++++ transfers/{backfill => }/hydraulicsdata.py | 66 ++++- transfers/metrics.py | 4 + transfers/transfer.py | 44 ++- 7 files changed, 370 insertions(+), 15 deletions(-) create mode 100644 tests/test_hydraulics_data_legacy.py rename transfers/{backfill => }/hydraulicsdata.py (68%) diff --git a/README.md b/README.md index 37f625db..b35d4933 100644 --- a/README.md +++ b/README.md @@ -240,3 +240,9 @@ python -m transfers.transfer ``` Configure the `.env` file with the appropriate credentials before running transfers. + +To drop the existing schema and rebuild from migrations before transferring data, set: + +```bash +export DROP_AND_REBUILD_DB=true +``` diff --git a/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py b/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py index 59368b50..6d87dec8 100644 --- a/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py +++ b/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. revision: str = "d1a2b3c4e5f6" -down_revision: Union[str, Sequence[str], None] = "c9f1d2e3a4b5" +down_revision: Union[str, Sequence[str], None] = "6e1c90f6135a" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -28,6 +28,12 @@ def upgrade() -> None: sa.Column("GlobalID", sa.String(length=40), primary_key=True), sa.Column("PointID", sa.String(length=50), nullable=True), sa.Column("HydraulicUnit", sa.String(length=18), nullable=True), + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=False, + ), sa.Column("TestTop", sa.SmallInteger(), nullable=False), sa.Column("TestBottom", sa.SmallInteger(), nullable=False), sa.Column("HydraulicUnitType", sa.String(length=2), nullable=True), diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 03ac2dd4..2cbc1e0d 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -169,6 +169,9 @@ class NMAHydraulicsData(Base): global_id: Mapped[str] = mapped_column("GlobalID", String(40), primary_key=True) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) data_source: Mapped[Optional[str]] = mapped_column("Data Source", String(255)) + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) cs_gal_d_ft: Mapped[Optional[float]] = mapped_column("Cs (gal/d/ft)", Float) hd_ft2_d: Mapped[Optional[float]] = mapped_column("HD (ft2/d)", Float) @@ -196,6 +199,8 @@ class NMAHydraulicsData(Base): "Hydraulic Remarks", String(200) ) + thing: Mapped["Thing"] = relationship("Thing") + class ChemistrySampleInfo(Base): """ diff --git a/tests/test_hydraulics_data_legacy.py b/tests/test_hydraulics_data_legacy.py new file mode 100644 index 00000000..4498b5a5 --- /dev/null +++ b/tests/test_hydraulics_data_legacy.py @@ -0,0 +1,252 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Unit tests for HydraulicsData legacy model. + +These tests verify the migration of columns from the legacy HydraulicsData table. +Migrated columns: +- GlobalID -> global_id +- PointID -> point_id +- Data Source -> data_source +- Cs (gal/d/ft) -> cs_gal_d_ft +- HD (ft2/d) -> hd_ft2_d +- HL (day-1) -> hl_day_1 +- KH (ft/d) -> kh_ft_d +- KV (ft/d) -> kv_ft_d +- P (decimal fraction) -> p_decimal_fraction +- S (dimensionless) -> s_dimensionless +- Ss (ft-1) -> ss_ft_1 +- Sy (decimalfractn) -> sy_decimalfractn +- T (ft2/d) -> t_ft2_d +- k (darcy) -> k_darcy +- TestBottom -> test_bottom +- TestTop -> test_top +- HydraulicUnit -> hydraulic_unit +- HydraulicUnitType -> hydraulic_unit_type +- Hydraulic Remarks -> hydraulic_remarks +- thing_id -> thing_id +""" + +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import NMAHydraulicsData + + +def _next_global_id() -> str: + return str(uuid4()) + + +# ===================== CREATE tests ========================== +def test_create_hydraulics_data_all_fields(water_well_thing): + """Test creating a hydraulics data record with all fields.""" + with session_ctx() as session: + record = NMAHydraulicsData( + global_id=_next_global_id(), + point_id=water_well_thing.name, + data_source="Legacy Source", + cs_gal_d_ft=1.2, + hd_ft2_d=3.4, + hl_day_1=0.02, + kh_ft_d=12.5, + kv_ft_d=1.1, + p_decimal_fraction=0.15, + s_dimensionless=0.2, + ss_ft_1=0.003, + sy_decimalfractn=0.12, + t_ft2_d=45.6, + k_darcy=2.5, + test_bottom=120, + test_top=30, + hydraulic_unit="Unit A", + hydraulic_unit_type="U", + hydraulic_remarks="Test remarks", + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.point_id == water_well_thing.name + assert record.data_source == "Legacy Source" + assert record.test_top == 30 + assert record.test_bottom == 120 + assert record.thing_id == water_well_thing.id + + session.delete(record) + session.commit() + + +def test_create_hydraulics_data_minimal(water_well_thing): + """Test creating a hydraulics data record with minimal fields.""" + with session_ctx() as session: + record = NMAHydraulicsData( + global_id=_next_global_id(), + test_top=10, + test_bottom=20, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.point_id is None + assert record.data_source is None + assert record.thing_id == water_well_thing.id + + session.delete(record) + session.commit() + + +# ===================== READ tests ========================== +def test_read_hydraulics_data_by_global_id(water_well_thing): + """Test reading a hydraulics data record by GlobalID.""" + with session_ctx() as session: + record = NMAHydraulicsData( + global_id=_next_global_id(), + test_top=5, + test_bottom=15, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + + fetched = session.get(NMAHydraulicsData, record.global_id) + assert fetched is not None + assert fetched.global_id == record.global_id + + session.delete(record) + session.commit() + + +def test_query_hydraulics_data_by_point_id(water_well_thing): + """Test querying hydraulics data by point_id.""" + with session_ctx() as session: + record1 = NMAHydraulicsData( + global_id=_next_global_id(), + point_id=water_well_thing.name, + test_top=10, + test_bottom=20, + thing_id=water_well_thing.id, + ) + record2 = NMAHydraulicsData( + global_id=_next_global_id(), + point_id="OTHER-POINT", + test_top=30, + test_bottom=40, + thing_id=water_well_thing.id, + ) + session.add_all([record1, record2]) + session.commit() + + results = ( + session.query(NMAHydraulicsData) + .filter(NMAHydraulicsData.point_id == water_well_thing.name) + .all() + ) + assert len(results) >= 1 + assert all(r.point_id == water_well_thing.name for r in results) + + session.delete(record1) + session.delete(record2) + session.commit() + + +# ===================== UPDATE tests ========================== +def test_update_hydraulics_data(water_well_thing): + """Test updating a hydraulics data record.""" + with session_ctx() as session: + record = NMAHydraulicsData( + global_id=_next_global_id(), + test_top=5, + test_bottom=15, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + + record.hydraulic_remarks = "Updated remarks" + record.data_source = "Updated source" + session.commit() + session.refresh(record) + + assert record.hydraulic_remarks == "Updated remarks" + assert record.data_source == "Updated source" + + session.delete(record) + session.commit() + + +# ===================== DELETE tests ========================== +def test_delete_hydraulics_data(water_well_thing): + """Test deleting a hydraulics data record.""" + with session_ctx() as session: + record = NMAHydraulicsData( + global_id=_next_global_id(), + test_top=5, + test_bottom=15, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + + session.delete(record) + session.commit() + + fetched = session.get(NMAHydraulicsData, record.global_id) + assert fetched is None + + +# ===================== Column existence tests ========================== +def test_hydraulics_data_has_all_migrated_columns(): + """Test that the model has all expected columns.""" + expected_columns = [ + "global_id", + "point_id", + "data_source", + "cs_gal_d_ft", + "hd_ft2_d", + "hl_day_1", + "kh_ft_d", + "kv_ft_d", + "p_decimal_fraction", + "s_dimensionless", + "ss_ft_1", + "sy_decimalfractn", + "t_ft2_d", + "k_darcy", + "test_bottom", + "test_top", + "hydraulic_unit", + "hydraulic_unit_type", + "hydraulic_remarks", + "thing_id", + ] + + for column in expected_columns: + assert hasattr( + NMAHydraulicsData, column + ), f"Expected column '{column}' not found in NMAHydraulicsData model" + + +def test_hydraulics_data_table_name(): + """Test that the table name follows convention.""" + assert NMAHydraulicsData.__tablename__ == "NMA_HydraulicsData" + + +# ============= EOF ============================================= diff --git a/transfers/backfill/hydraulicsdata.py b/transfers/hydraulicsdata.py similarity index 68% rename from transfers/backfill/hydraulicsdata.py rename to transfers/hydraulicsdata.py index 8972eec7..9441235c 100644 --- a/transfers/backfill/hydraulicsdata.py +++ b/transfers/hydraulicsdata.py @@ -22,15 +22,16 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import NMAHydraulicsData +from db import NMAHydraulicsData, Thing +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import read_csv -class NMAHydraulicsDataBackfill(Transferer): +class HydraulicsDataTransferer(Transferer): """ - Backfill for the legacy NMA_HydraulicsData table. + Transfer for the legacy NMA_HydraulicsData table. """ source_table = "HydraulicsData" @@ -38,16 +39,55 @@ class NMAHydraulicsDataBackfill(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.name, Thing.id).all() + self._thing_id_cache = {name: thing_id for name, thing_id in things} + logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: - input_df = read_csv(self.source_table) - return input_df, input_df + df = read_csv(self.source_table) + cleaned_df = self._filter_to_valid_things(df) + return df, cleaned_df + + def _filter_to_valid_things(self, df: pd.DataFrame) -> pd.DataFrame: + valid_point_ids = set(self._thing_id_cache.keys()) + before_count = len(df) + filtered_df = df[df["PointID"].isin(valid_point_ids)].copy() + after_count = len(filtered_df) + if before_count > after_count: + skipped = before_count - after_count + logger.warning( + f"Filtered out {skipped} HydraulicsData records without matching Things " + f"({after_count} valid, {skipped} orphan records prevented)" + ) + return filtered_df def _transfer_hook(self, session: Session) -> None: - rows = self._dedupe_rows( - [self._row_dict(row) for row in self.cleaned_df.to_dict("records")], - key="GlobalID", - ) + row_dicts = [] + skipped_count = 0 + for row in self.cleaned_df.to_dict("records"): + row_dict = self._row_dict(row) + if row_dict.get("thing_id") is None: + skipped_count += 1 + logger.warning( + "Skipping HydraulicsData GlobalID=%s PointID=%s - Thing not found", + row_dict.get("GlobalID"), + row_dict.get("PointID"), + ) + continue + row_dicts.append(row_dict) + + if skipped_count > 0: + logger.warning( + f"Skipped {skipped_count} HydraulicsData records without valid Thing " + f"(orphan prevention)" + ) + + rows = self._dedupe_rows(row_dicts, key="GlobalID") insert_stmt = insert(NMAHydraulicsData) excluded = insert_stmt.excluded @@ -62,6 +102,7 @@ def _transfer_hook(self, session: Session) -> None: set_={ "PointID": excluded["PointID"], "HydraulicUnit": excluded["HydraulicUnit"], + "thing_id": excluded["thing_id"], "TestTop": excluded["TestTop"], "TestBottom": excluded["TestBottom"], "HydraulicUnitType": excluded["HydraulicUnitType"], @@ -104,6 +145,7 @@ def as_int(key: str) -> Optional[int]: "GlobalID": val("GlobalID"), "PointID": val("PointID"), "HydraulicUnit": val("HydraulicUnit"), + "thing_id": self._thing_id_cache.get(val("PointID")), "TestTop": as_int("TestTop"), "TestBottom": as_int("TestBottom"), "HydraulicUnitType": val("HydraulicUnitType"), @@ -139,13 +181,13 @@ def _dedupe_rows( def run(batch_size: int = 1000) -> None: - """Entrypoint to execute the backfill.""" - transferer = NMAHydraulicsDataBackfill(batch_size=batch_size) + """Entrypoint to execute the transfer.""" + transferer = HydraulicsDataTransferer(batch_size=batch_size) transferer.transfer() if __name__ == "__main__": - # Allow running via `python -m transfers.backfill.hydraulicsdata` + # Allow running via `python -m transfers.hydraulicsdata` run() # ============= EOF ============================================= diff --git a/transfers/metrics.py b/transfers/metrics.py index beb2eb89..fda406b7 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -37,6 +37,7 @@ PermissionHistory, ThingGeologicFormationAssociation, ChemistrySampleInfo, + NMAHydraulicsData, SurfaceWaterData, NMAWaterLevelsContinuousPressureDaily, ViewNGWMNWellConstruction, @@ -108,6 +109,9 @@ def group_metrics(self, *args, **kw) -> None: def surface_water_data_metrics(self, *args, **kw) -> None: self._handle_metrics(SurfaceWaterData, *args, **kw) + def hydraulics_data_metrics(self, *args, **kw) -> None: + self._handle_metrics(NMAHydraulicsData, name="HydraulicsData", *args, **kw) + def chemistry_sampleinfo_metrics(self, *args, **kw) -> None: self._handle_metrics( ChemistrySampleInfo, name="Chemistry_SampleInfo", *args, **kw diff --git a/transfers/transfer.py b/transfers/transfer.py index 1135b55d..41b26ab1 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -19,7 +19,11 @@ from dotenv import load_dotenv +from alembic import command +from alembic.config import Config + from db.engine import session_ctx +from sqlalchemy import text from services.util import get_bool_env from transfers.aquifer_system_transfer import transfer_aquifer_systems from transfers.geologic_formation_transfer import transfer_geologic_formations @@ -34,7 +38,7 @@ ) from transfers.metrics import Metrics -from core.initializers import erase_and_rebuild_db +from core.initializers import erase_and_rebuild_db, init_lexicon, init_parameter from transfers.group_transfer import ProjectGroupTransferer from transfers.link_ids_transfer import ( @@ -49,6 +53,7 @@ from transfers.asset_transfer import AssetTransferer from transfers.chemistry_sampleinfo import ChemistrySampleInfoTransferer +from transfers.hydraulicsdata import HydraulicsDataTransferer from transfers.ngwmn_views import ( NGWMNLithologyTransferer, NGWMNWaterLevelsTransferer, @@ -124,10 +129,31 @@ def _execute_foundational_transfer_with_timing(name: str, transfer_func, limit: return name, result, elapsed +def _alembic_config() -> Config: + root = os.path.dirname(os.path.dirname(__file__)) + cfg = Config(os.path.join(root, "alembic.ini")) + cfg.set_main_option("script_location", os.path.join(root, "alembic")) + return cfg + + +def _drop_and_rebuild_db() -> None: + with session_ctx() as session: + session.execute(text("DROP SCHEMA public CASCADE")) + session.execute(text("CREATE SCHEMA public")) + session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) + session.commit() + command.upgrade(_alembic_config(), "head") + init_lexicon() + init_parameter() + + @timeit def transfer_all(metrics, limit=100): message("STARTING TRANSFER", new_line_at_top=False) - if get_bool_env("ERASE_AND_REBUILD", False): + if get_bool_env("DROP_AND_REBUILD_DB", False): + logger.info("Dropping schema and rebuilding database from migrations") + _drop_and_rebuild_db() + elif get_bool_env("ERASE_AND_REBUILD", False): logger.info("Erase and rebuilding database") erase_and_rebuild_db() @@ -183,6 +209,7 @@ def transfer_all(metrics, limit=100): transfer_groups = get_bool_env("TRANSFER_GROUPS", True) transfer_assets = get_bool_env("TRANSFER_ASSETS", False) transfer_surface_water_data = get_bool_env("TRANSFER_SURFACE_WATER_DATA", True) + transfer_hydraulics_data = get_bool_env("TRANSFER_HYDRAULICS_DATA", True) transfer_chemistry_sampleinfo = get_bool_env("TRANSFER_CHEMISTRY_SAMPLEINFO", True) transfer_ngwmn_views = get_bool_env("TRANSFER_NGWMN_VIEWS", True) transfer_pressure_daily = get_bool_env("TRANSFER_WATERLEVELS_PRESSURE_DAILY", True) @@ -207,6 +234,7 @@ def transfer_all(metrics, limit=100): transfer_groups, transfer_assets, transfer_surface_water_data, + transfer_hydraulics_data, transfer_chemistry_sampleinfo, transfer_ngwmn_views, transfer_pressure_daily, @@ -228,6 +256,7 @@ def transfer_all(metrics, limit=100): transfer_groups, transfer_assets, transfer_surface_water_data, + transfer_hydraulics_data, transfer_chemistry_sampleinfo, transfer_ngwmn_views, transfer_pressure_daily, @@ -250,6 +279,7 @@ def _transfer_parallel( transfer_groups, transfer_assets, transfer_surface_water_data, + transfer_hydraulics_data, transfer_chemistry_sampleinfo, transfer_ngwmn_views, transfer_pressure_daily, @@ -281,6 +311,8 @@ def _transfer_parallel( parallel_tasks_1.append(("Assets", AssetTransferer, flags)) if transfer_surface_water_data: parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer, flags)) + if transfer_hydraulics_data: + parallel_tasks_1.append(("HydraulicsData", HydraulicsDataTransferer, flags)) if transfer_chemistry_sampleinfo: parallel_tasks_1.append( ("ChemistrySampleInfo", ChemistrySampleInfoTransferer, flags) @@ -361,6 +393,8 @@ def _transfer_parallel( metrics.asset_metrics(*results_map["Assets"]) if "SurfaceWaterData" in results_map and results_map["SurfaceWaterData"]: metrics.surface_water_data_metrics(*results_map["SurfaceWaterData"]) + if "HydraulicsData" in results_map and results_map["HydraulicsData"]: + metrics.hydraulics_data_metrics(*results_map["HydraulicsData"]) if "ChemistrySampleInfo" in results_map and results_map["ChemistrySampleInfo"]: metrics.chemistry_sampleinfo_metrics(*results_map["ChemistrySampleInfo"]) if "NGWMNWellConstruction" in results_map and results_map["NGWMNWellConstruction"]: @@ -444,6 +478,7 @@ def _transfer_sequential( transfer_groups, transfer_assets, transfer_surface_water_data, + transfer_hydraulics_data, transfer_chemistry_sampleinfo, transfer_ngwmn_views, transfer_pressure_daily, @@ -516,6 +551,11 @@ def _transfer_sequential( results = _execute_transfer(SurfaceWaterDataTransferer, flags=flags) metrics.surface_water_data_metrics(*results) + if transfer_hydraulics_data: + message("TRANSFERRING HYDRAULICS DATA") + results = _execute_transfer(HydraulicsDataTransferer, flags=flags) + metrics.hydraulics_data_metrics(*results) + if transfer_chemistry_sampleinfo: message("TRANSFERRING CHEMISTRY SAMPLEINFO") results = _execute_transfer(ChemistrySampleInfoTransferer, flags=flags) From 7a9efa7c2672fd4d341114ba13e7f50b41f0335c Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 20:07:59 -0700 Subject: [PATCH 5/7] feat: add GlobalID column to NMA_MinorTraceChemistry and update data transfer logic --- .../95d8b982cd5d_add_nma_chemistry_lineage_relations.py | 5 +++++ db/nma_legacy.py | 3 +++ tests/test_nma_chemistry_lineage.py | 2 ++ transfers/minor_trace_chemistry_transfer.py | 3 ++- 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py b/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py index 82dbe471..d82be3b0 100644 --- a/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py +++ b/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py @@ -84,6 +84,11 @@ def upgrade() -> None: op.create_table( "NMA_MinorTraceChemistry", sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + nullable=False, + ), sa.Column( "chemistry_sample_info_id", postgresql.UUID(as_uuid=True), diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 2cbc1e0d..098bfcc4 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -344,6 +344,9 @@ class NMAMinorTraceChemistry(Base): ) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), nullable=False, default=uuid.uuid4 + ) # FK to ChemistrySampleInfo - required (no orphans) chemistry_sample_info_id: Mapped[uuid.UUID] = mapped_column( diff --git a/tests/test_nma_chemistry_lineage.py b/tests/test_nma_chemistry_lineage.py index f3ae0a0a..341b7c58 100644 --- a/tests/test_nma_chemistry_lineage.py +++ b/tests/test_nma_chemistry_lineage.py @@ -102,6 +102,7 @@ def test_nma_minor_trace_chemistry_columns(): expected_columns = [ "id", # new PK + "global_id", "chemistry_sample_info_id", # new FK (UUID, not string) # from legacy "analyte", @@ -159,6 +160,7 @@ def test_nma_minor_trace_chemistry_save_all_columns(shared_well): # Verify all columns saved assert mtc.id is not None + assert mtc.global_id is not None assert mtc.chemistry_sample_info_id == sample_info.sample_pt_id assert mtc.analyte == "As" assert mtc.sample_value == 0.015 diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 1c527c6e..bb4404bf 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -25,7 +25,7 @@ from datetime import date, datetime from typing import Any, Optional -from uuid import UUID +from uuid import UUID, uuid4 import pandas as pd from sqlalchemy.dialects.postgresql import insert @@ -165,6 +165,7 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: return None return { + "global_id": self._uuid_val(getattr(row, "GlobalID", None)) or uuid4(), "chemistry_sample_info_id": sample_pt_id, "analyte": self._safe_str(row, "Analyte"), "sample_value": self._safe_float(row, "SampleValue"), From 1096d5aaff9c092d0cea6151739e06f05f06ae51 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 20:15:15 -0700 Subject: [PATCH 6/7] Clean up legacy imports --- admin/config.py | 4 +--- db/nma_legacy.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/admin/config.py b/admin/config.py index 038b3d2d..24049ea0 100644 --- a/admin/config.py +++ b/admin/config.py @@ -64,9 +64,7 @@ from db.group import Group from db.notes import Notes from db.sample import Sample -from db.nma_legacy import ChemistrySampleInfo, SurfaceWaterData -from db.nma_legacy import NMAHydraulicsData -from db.nma_legacy import ChemistrySampleInfo +from db.nma_legacy import ChemistrySampleInfo, NMAHydraulicsData, SurfaceWaterData from db.geologic_formation import GeologicFormation from db.data_provenance import DataProvenance from db.transducer import TransducerObservation diff --git a/db/nma_legacy.py b/db/nma_legacy.py index c82eaa1c..b443d742 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -21,7 +21,6 @@ from datetime import date, datetime from typing import TYPE_CHECKING, List, Optional -from sqlalchemy import Boolean, Date, DateTime, Float, Integer, SmallInteger, String from sqlalchemy import ( Boolean, Date, @@ -29,10 +28,11 @@ Float, ForeignKey, Integer, + SmallInteger, String, Text, - text, UniqueConstraint, + text, ) from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Mapped, mapped_column, relationship, validates From 02100a634c22ea8e79befad3b745e1dc4e633df8 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 20:20:51 -0700 Subject: [PATCH 7/7] feat: add _next_global_id function and assign global_id in NMAMinorTraceChemistry instances --- tests/test_nma_chemistry_lineage.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/test_nma_chemistry_lineage.py b/tests/test_nma_chemistry_lineage.py index c13d758e..b58edb91 100644 --- a/tests/test_nma_chemistry_lineage.py +++ b/tests/test_nma_chemistry_lineage.py @@ -47,6 +47,10 @@ def _next_sample_point_id() -> str: return f"SP-{uuid4().hex[:7]}" +def _next_global_id(): + return uuid4() + + @pytest.fixture(scope="module") def shared_well(): """Create a single Thing for all tests in this module.""" @@ -140,6 +144,7 @@ def test_nma_minor_trace_chemistry_save_all_columns(shared_well): session.commit() mtc = NMAMinorTraceChemistry( + global_id=_next_global_id(), chemistry_sample_info=sample_info, analyte="As", sample_value=0.015, @@ -334,6 +339,7 @@ def test_assign_sample_info_to_mtc(shared_well): session.commit() mtc = NMAMinorTraceChemistry( + global_id=_next_global_id(), analyte="As", sample_value=0.01, units="mg/L", @@ -368,6 +374,7 @@ def test_append_mtc_to_sample_info(shared_well): session.commit() mtc = NMAMinorTraceChemistry( + global_id=_next_global_id(), analyte="U", sample_value=15.2, units="ug/L", @@ -428,6 +435,7 @@ def test_full_lineage_navigation(shared_well): session.commit() mtc = NMAMinorTraceChemistry( + global_id=_next_global_id(), analyte="Se", sample_value=0.005, units="mg/L", @@ -461,6 +469,7 @@ def test_reverse_lineage_navigation(shared_well): session.commit() mtc = NMAMinorTraceChemistry( + global_id=_next_global_id(), analyte="Pb", sample_value=0.002, units="mg/L", @@ -508,6 +517,7 @@ def test_cascade_delete_sample_info_deletes_mtc(shared_well): for analyte in ["As", "U", "Se", "Pb"]: sample_info.minor_trace_chemistries.append( NMAMinorTraceChemistry( + global_id=_next_global_id(), analyte=analyte, sample_value=0.01, units="mg/L", @@ -629,6 +639,7 @@ def test_multiple_mtc_per_sample_info(shared_well): for analyte in analytes: sample_info.minor_trace_chemistries.append( NMAMinorTraceChemistry( + global_id=_next_global_id(), analyte=analyte, sample_value=0.01, units="mg/L",