From 80b242c1354afe76d6637c9a25a3eb7ee71f4e9d Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 20:56:57 -0700 Subject: [PATCH 1/4] feat: implement migration for NMA_Radionuclides table and add transfer logic --- alembic/env.py | 18 +- .../f3b4c5d6e7f8_create_nma_radionuclides.py | 107 +++++++ db/nma_legacy.py | 69 +++++ tests/test_radionuclides_legacy.py | 292 ++++++++++++++++++ transfers/metrics.py | 4 + transfers/radionuclides.py | 254 +++++++++++++++ transfers/transfer.py | 43 ++- 7 files changed, 772 insertions(+), 15 deletions(-) create mode 100644 alembic/versions/f3b4c5d6e7f8_create_nma_radionuclides.py create mode 100644 tests/test_radionuclides_legacy.py create mode 100644 transfers/radionuclides.py diff --git a/alembic/env.py b/alembic/env.py index 679ebec6..4224d62e 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -4,7 +4,7 @@ from alembic import context from dotenv import load_dotenv -from sqlalchemy import engine_from_config, pool, create_engine +from sqlalchemy import create_engine, engine_from_config, pool, text from services.util import get_bool_env @@ -164,6 +164,22 @@ def getconn(): ) with context.begin_transaction(): context.run_migrations() + connection.execute( + text( + """ + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'app_read') THEN + CREATE ROLE app_read; + END IF; + GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read; + ALTER DEFAULT PRIVILEGES IN SCHEMA public + GRANT SELECT ON TABLES TO app_read; + END + $$; + """ + ) + ) if context.is_offline_mode(): diff --git a/alembic/versions/f3b4c5d6e7f8_create_nma_radionuclides.py b/alembic/versions/f3b4c5d6e7f8_create_nma_radionuclides.py new file mode 100644 index 00000000..e187d616 --- /dev/null +++ b/alembic/versions/f3b4c5d6e7f8_create_nma_radionuclides.py @@ -0,0 +1,107 @@ +"""Create legacy NMA_Radionuclides table. + +Revision ID: f3b4c5d6e7f8 +Revises: e4f7a9c0b2d3 +Create Date: 2026-03-01 01:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "f3b4c5d6e7f8" +down_revision: Union[str, Sequence[str], None] = "e4f7a9c0b2d3" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy radionuclides table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_Radionuclides"): + op.create_table( + "NMA_Radionuclides", + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "SamplePtID", + postgresql.UUID(as_uuid=True), + sa.ForeignKey( + "NMA_Chemistry_SampleInfo.SamplePtID", ondelete="CASCADE" + ), + nullable=False, + ), + sa.Column("SamplePointID", sa.String(length=10), nullable=True), + sa.Column("Analyte", sa.String(length=50), nullable=True), + sa.Column("Symbol", sa.String(length=50), nullable=True), + sa.Column( + "SampleValue", sa.Float(), nullable=True, server_default=sa.text("0") + ), + sa.Column("Units", sa.String(length=50), nullable=True), + sa.Column( + "Uncertainty", sa.Float(), nullable=True, server_default=sa.text("0") + ), + sa.Column("AnalysisMethod", sa.String(length=255), nullable=True), + sa.Column("AnalysisDate", sa.DateTime(), nullable=True), + sa.Column("Notes", sa.String(length=255), nullable=True), + sa.Column( + "Volume", sa.Integer(), nullable=True, server_default=sa.text("0") + ), + sa.Column("VolumeUnit", sa.String(length=50), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + sa.Column("AnalysesAgency", sa.String(length=50), nullable=True), + sa.Column("WCLab_ID", sa.String(length=25), nullable=True), + ) + op.create_index( + "Radionuclides$AnalysesAgency", + "NMA_Radionuclides", + ["AnalysesAgency"], + ) + op.create_index( + "Radionuclides$Analyte", + "NMA_Radionuclides", + ["Analyte"], + ) + op.create_index( + "Radionuclides$Chemistry SampleInfoRadionuclides", + "NMA_Radionuclides", + ["SamplePtID"], + ) + op.create_index( + "Radionuclides$SamplePointID", + "NMA_Radionuclides", + ["SamplePointID"], + ) + op.create_index( + "Radionuclides$SamplePtID", + "NMA_Radionuclides", + ["SamplePtID"], + ) + op.create_index( + "Radionuclides$WCLab_ID", + "NMA_Radionuclides", + ["WCLab_ID"], + ) + + +def downgrade() -> None: + """Drop the legacy radionuclides table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_Radionuclides"): + op.drop_table("NMA_Radionuclides") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index b443d742..eb372564 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -267,6 +267,13 @@ class ChemistrySampleInfo(Base): passive_deletes=True, ) + radionuclides: Mapped[List["NMARadionuclides"]] = relationship( + "NMARadionuclides", + back_populates="chemistry_sample_info", + cascade="all, delete-orphan", + passive_deletes=True, + ) + @validates("thing_id") def validate_thing_id(self, key, value): """Prevent orphan ChemistrySampleInfo - must have a parent Thing.""" @@ -382,4 +389,66 @@ def validate_chemistry_sample_info_id(self, key, value): return value +class NMARadionuclides(Base): + """ + Legacy Radionuclides table from NM_Aquifer_Dev_DB. + """ + + __tablename__ = "NMA_Radionuclides" + + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) + sample_pt_id: Mapped[uuid.UUID] = mapped_column( + "SamplePtID", + UUID(as_uuid=True), + ForeignKey("NMA_Chemistry_SampleInfo.SamplePtID", ondelete="CASCADE"), + nullable=False, + ) + sample_point_id: Mapped[Optional[str]] = mapped_column("SamplePointID", String(10)) + analyte: Mapped[Optional[str]] = mapped_column("Analyte", String(50)) + symbol: Mapped[Optional[str]] = mapped_column("Symbol", String(50)) + sample_value: Mapped[Optional[float]] = mapped_column( + "SampleValue", Float, server_default=text("0") + ) + units: Mapped[Optional[str]] = mapped_column("Units", String(50)) + uncertainty: Mapped[Optional[float]] = mapped_column( + "Uncertainty", Float, server_default=text("0") + ) + analysis_method: Mapped[Optional[str]] = mapped_column( + "AnalysisMethod", String(255) + ) + analysis_date: Mapped[Optional[datetime]] = mapped_column("AnalysisDate", DateTime) + notes: Mapped[Optional[str]] = mapped_column("Notes", String(255)) + volume: Mapped[Optional[int]] = mapped_column( + "Volume", Integer, server_default=text("0") + ) + volume_unit: Mapped[Optional[str]] = mapped_column("VolumeUnit", String(50)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + analyses_agency: Mapped[Optional[str]] = mapped_column("AnalysesAgency", String(50)) + wclab_id: Mapped[Optional[str]] = mapped_column("WCLab_ID", String(25)) + + thing: Mapped["Thing"] = relationship("Thing") + chemistry_sample_info: Mapped["ChemistrySampleInfo"] = relationship( + "ChemistrySampleInfo", back_populates="radionuclides" + ) + + @validates("thing_id") + def validate_thing_id(self, key, value): + if value is None: + raise ValueError( + "NMARadionuclides requires a Thing (thing_id cannot be None)" + ) + return value + + @validates("sample_pt_id") + def validate_sample_pt_id(self, key, value): + if value is None: + raise ValueError("NMARadionuclides requires a SamplePtID") + return value + + # ============= EOF ============================================= diff --git a/tests/test_radionuclides_legacy.py b/tests/test_radionuclides_legacy.py new file mode 100644 index 00000000..d77d877d --- /dev/null +++ b/tests/test_radionuclides_legacy.py @@ -0,0 +1,292 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Unit tests for Radionuclides legacy model. + +These tests verify the migration of columns from the legacy Radionuclides table. +Migrated columns (excluding SSMA_TimeStamp): +- SamplePtID -> sample_pt_id +- SamplePointID -> sample_point_id +- Analyte -> analyte +- Symbol -> symbol +- SampleValue -> sample_value +- Units -> units +- Uncertainty -> uncertainty +- AnalysisMethod -> analysis_method +- AnalysisDate -> analysis_date +- Notes -> notes +- Volume -> volume +- VolumeUnit -> volume_unit +- OBJECTID -> object_id +- GlobalID -> global_id +- AnalysesAgency -> analyses_agency +- WCLab_ID -> wclab_id +""" + +from datetime import datetime +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import ChemistrySampleInfo, NMARadionuclides + + +def _next_sample_point_id() -> str: + return f"SP-{uuid4().hex[:7]}" + + +# ===================== CREATE tests ========================== +def test_create_radionuclides_all_fields(water_well_thing): + """Test creating a radionuclides record with all fields.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMARadionuclides( + global_id=uuid4(), + thing_id=water_well_thing.id, + sample_pt_id=sample_info.sample_pt_id, + sample_point_id=sample_info.sample_point_id, + analyte="U-238", + symbol="<", + sample_value=0.12, + units="pCi/L", + uncertainty=0.01, + analysis_method="ICP-MS", + analysis_date=datetime(2024, 6, 15, 0, 0, 0), + notes="Test notes", + volume=250, + volume_unit="mL", + analyses_agency="NMBGMR", + wclab_id="LAB-001", + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.sample_pt_id == sample_info.sample_pt_id + assert record.sample_point_id == sample_info.sample_point_id + assert record.analyte == "U-238" + assert record.sample_value == 0.12 + assert record.uncertainty == 0.01 + + session.delete(record) + session.delete(sample_info) + session.commit() + + +def test_create_radionuclides_minimal(water_well_thing): + """Test creating a radionuclides record with minimal fields.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMARadionuclides( + global_id=uuid4(), + thing_id=water_well_thing.id, + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.sample_pt_id == sample_info.sample_pt_id + assert record.analyte is None + assert record.units is None + + session.delete(record) + session.delete(sample_info) + session.commit() + + +# ===================== READ tests ========================== +def test_read_radionuclides_by_global_id(water_well_thing): + """Test reading a radionuclides record by GlobalID.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMARadionuclides( + global_id=uuid4(), + thing_id=water_well_thing.id, + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + + fetched = session.get(NMARadionuclides, record.global_id) + assert fetched is not None + assert fetched.global_id == record.global_id + + session.delete(record) + session.delete(sample_info) + session.commit() + + +def test_query_radionuclides_by_sample_point_id(water_well_thing): + """Test querying radionuclides by sample_point_id.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record1 = NMARadionuclides( + global_id=uuid4(), + thing_id=water_well_thing.id, + sample_pt_id=sample_info.sample_pt_id, + sample_point_id=sample_info.sample_point_id, + ) + record2 = NMARadionuclides( + global_id=uuid4(), + thing_id=water_well_thing.id, + sample_pt_id=sample_info.sample_pt_id, + sample_point_id="OTHER-PT", + ) + session.add_all([record1, record2]) + session.commit() + + results = ( + session.query(NMARadionuclides) + .filter(NMARadionuclides.sample_point_id == sample_info.sample_point_id) + .all() + ) + assert len(results) >= 1 + assert all(r.sample_point_id == sample_info.sample_point_id for r in results) + + session.delete(record1) + session.delete(record2) + session.delete(sample_info) + session.commit() + + +# ===================== UPDATE tests ========================== +def test_update_radionuclides(water_well_thing): + """Test updating a radionuclides record.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMARadionuclides( + global_id=uuid4(), + thing_id=water_well_thing.id, + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + + record.analyses_agency = "Updated Agency" + record.notes = "Updated notes" + session.commit() + session.refresh(record) + + assert record.analyses_agency == "Updated Agency" + assert record.notes == "Updated notes" + + session.delete(record) + session.delete(sample_info) + session.commit() + + +# ===================== DELETE tests ========================== +def test_delete_radionuclides(water_well_thing): + """Test deleting a radionuclides record.""" + with session_ctx() as session: + sample_info = ChemistrySampleInfo( + sample_pt_id=uuid4(), + sample_point_id=_next_sample_point_id(), + thing_id=water_well_thing.id, + ) + session.add(sample_info) + session.commit() + + record = NMARadionuclides( + global_id=uuid4(), + thing_id=water_well_thing.id, + sample_pt_id=sample_info.sample_pt_id, + ) + session.add(record) + session.commit() + + session.delete(record) + session.commit() + + fetched = session.get(NMARadionuclides, record.global_id) + assert fetched is None + + session.delete(sample_info) + session.commit() + + +# ===================== Column existence tests ========================== +def test_radionuclides_has_all_migrated_columns(): + """Test that the model has all expected columns.""" + expected_columns = [ + "thing_id", + "sample_pt_id", + "sample_point_id", + "analyte", + "symbol", + "sample_value", + "units", + "uncertainty", + "analysis_method", + "analysis_date", + "notes", + "volume", + "volume_unit", + "object_id", + "global_id", + "analyses_agency", + "wclab_id", + ] + + for column in expected_columns: + assert hasattr( + NMARadionuclides, column + ), f"Expected column '{column}' not found in NMARadionuclides model" + + +def test_radionuclides_table_name(): + """Test that the table name follows convention.""" + assert NMARadionuclides.__tablename__ == "NMA_Radionuclides" + + +# ============= EOF ============================================= diff --git a/transfers/metrics.py b/transfers/metrics.py index fda406b7..7f276b78 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -38,6 +38,7 @@ ThingGeologicFormationAssociation, ChemistrySampleInfo, NMAHydraulicsData, + NMARadionuclides, SurfaceWaterData, NMAWaterLevelsContinuousPressureDaily, ViewNGWMNWellConstruction, @@ -117,6 +118,9 @@ def chemistry_sampleinfo_metrics(self, *args, **kw) -> None: ChemistrySampleInfo, name="Chemistry_SampleInfo", *args, **kw ) + def radionuclides_metrics(self, *args, **kw) -> None: + self._handle_metrics(NMARadionuclides, name="Radionuclides", *args, **kw) + def ngwmn_well_construction_metrics(self, *args, **kw) -> None: self._handle_metrics( ViewNGWMNWellConstruction, name="NGWMN WellConstruction", *args, **kw diff --git a/transfers/radionuclides.py b/transfers/radionuclides.py new file mode 100644 index 00000000..eaf63e04 --- /dev/null +++ b/transfers/radionuclides.py @@ -0,0 +1,254 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import ChemistrySampleInfo, NMARadionuclides +from db.engine import session_ctx +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import read_csv + + +class RadionuclidesTransferer(Transferer): + """ + Transfer for the legacy Radionuclides table. + """ + + source_table = "Radionuclides" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + self._sample_pt_ids: set[UUID] = set() + self._thing_id_by_sample_pt_id: dict[UUID, int] = {} + self._build_sample_info_cache() + + def _build_sample_info_cache(self) -> None: + with session_ctx() as session: + sample_infos = session.query( + ChemistrySampleInfo.sample_pt_id, ChemistrySampleInfo.thing_id + ).all() + self._sample_pt_ids = {sample_pt_id for sample_pt_id, _ in sample_infos} + self._thing_id_by_sample_pt_id = { + sample_pt_id: thing_id for sample_pt_id, thing_id in sample_infos + } + logger.info( + f"Built ChemistrySampleInfo cache with {len(self._sample_pt_ids)} entries" + ) + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + input_df = read_csv(self.source_table, parse_dates=["AnalysisDate"]) + cleaned_df = self._filter_to_valid_sample_infos(input_df) + return input_df, cleaned_df + + def _filter_to_valid_sample_infos(self, df: pd.DataFrame) -> pd.DataFrame: + valid_sample_pt_ids = self._sample_pt_ids + mask = df["SamplePtID"].apply( + lambda value: self._uuid_val(value) in valid_sample_pt_ids + ) + before_count = len(df) + filtered_df = df[mask].copy() + after_count = len(filtered_df) + + if before_count > after_count: + skipped = before_count - after_count + logger.warning( + f"Filtered out {skipped} Radionuclides records without matching " + f"ChemistrySampleInfo ({after_count} valid, {skipped} orphan records prevented)" + ) + + return filtered_df + + def _transfer_hook(self, session: Session) -> None: + row_dicts = [] + skipped_global_id = 0 + skipped_thing_id = 0 + for row in self.cleaned_df.to_dict("records"): + row_dict = self._row_dict(row) + if row_dict is None: + continue + if row_dict.get("GlobalID") is None: + skipped_global_id += 1 + logger.warning( + "Skipping Radionuclides SamplePtID=%s - GlobalID missing or invalid", + row_dict.get("SamplePtID"), + ) + continue + if row_dict.get("thing_id") is None: + skipped_thing_id += 1 + logger.warning( + "Skipping Radionuclides SamplePtID=%s - Thing not found", + row_dict.get("SamplePtID"), + ) + continue + row_dicts.append(row_dict) + + if skipped_global_id > 0: + logger.warning( + "Skipped %s Radionuclides records without valid GlobalID", + skipped_global_id, + ) + if skipped_thing_id > 0: + logger.warning( + "Skipped %s Radionuclides records without valid Thing", + skipped_thing_id, + ) + + rows = self._dedupe_rows(row_dicts, key="GlobalID") + insert_stmt = insert(NMARadionuclides) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into Radionuclides" + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "thing_id": excluded.thing_id, + "SamplePtID": excluded.SamplePtID, + "SamplePointID": excluded.SamplePointID, + "Analyte": excluded.Analyte, + "Symbol": excluded.Symbol, + "SampleValue": excluded.SampleValue, + "Units": excluded.Units, + "Uncertainty": excluded.Uncertainty, + "AnalysisMethod": excluded.AnalysisMethod, + "AnalysisDate": excluded.AnalysisDate, + "Notes": excluded.Notes, + "Volume": excluded.Volume, + "VolumeUnit": excluded.VolumeUnit, + "OBJECTID": excluded.OBJECTID, + "AnalysesAgency": excluded.AnalysesAgency, + "WCLab_ID": excluded.WCLab_ID, + }, + ) + session.execute(stmt) + session.commit() + session.expunge_all() + + def _row_dict(self, row: dict[str, Any]) -> Optional[dict[str, Any]]: + def val(key: str) -> Optional[Any]: + v = row.get(key) + if pd.isna(v): + return None + return v + + def float_val(key: str) -> Optional[float]: + v = val(key) + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + def int_val(key: str) -> Optional[int]: + v = val(key) + if v is None: + return None + try: + return int(v) + except (TypeError, ValueError): + return None + + analysis_date = val("AnalysisDate") + if hasattr(analysis_date, "to_pydatetime"): + analysis_date = analysis_date.to_pydatetime() + if isinstance(analysis_date, datetime): + analysis_date = analysis_date.replace(tzinfo=None) + + sample_pt_id = self._uuid_val(val("SamplePtID")) + if sample_pt_id is None: + self._capture_error( + val("SamplePtID"), + f"Invalid SamplePtID: {val('SamplePtID')}", + "SamplePtID", + ) + return None + + global_id = self._uuid_val(val("GlobalID")) + thing_id = self._thing_id_by_sample_pt_id.get(sample_pt_id) + + return { + "thing_id": thing_id, + "SamplePtID": sample_pt_id, + "SamplePointID": val("SamplePointID"), + "Analyte": val("Analyte"), + "Symbol": val("Symbol"), + "SampleValue": float_val("SampleValue"), + "Units": val("Units"), + "Uncertainty": float_val("Uncertainty"), + "AnalysisMethod": val("AnalysisMethod"), + "AnalysisDate": analysis_date, + "Notes": val("Notes"), + "Volume": int_val("Volume"), + "VolumeUnit": val("VolumeUnit"), + "OBJECTID": val("OBJECTID"), + "GlobalID": global_id, + "AnalysesAgency": val("AnalysesAgency"), + "WCLab_ID": val("WCLab_ID"), + } + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """ + Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops. + Later rows win. + """ + deduped = {} + for row in rows: + row_key = row.get(key) + if row_key is None: + continue + deduped[row_key] = row + return list(deduped.values()) + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = RadionuclidesTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + # Allow running via `python -m transfers.radionuclides` + run() + +# ============= EOF ============================================= diff --git a/transfers/transfer.py b/transfers/transfer.py index 41b26ab1..e326fb84 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -54,6 +54,7 @@ from transfers.asset_transfer import AssetTransferer from transfers.chemistry_sampleinfo import ChemistrySampleInfoTransferer from transfers.hydraulicsdata import HydraulicsDataTransferer +from transfers.radionuclides import RadionuclidesTransferer from transfers.ngwmn_views import ( NGWMNLithologyTransferer, NGWMNWaterLevelsTransferer, @@ -211,6 +212,7 @@ def transfer_all(metrics, limit=100): transfer_surface_water_data = get_bool_env("TRANSFER_SURFACE_WATER_DATA", True) transfer_hydraulics_data = get_bool_env("TRANSFER_HYDRAULICS_DATA", True) transfer_chemistry_sampleinfo = get_bool_env("TRANSFER_CHEMISTRY_SAMPLEINFO", True) + transfer_radionuclides = get_bool_env("TRANSFER_RADIONUCLIDES", True) transfer_ngwmn_views = get_bool_env("TRANSFER_NGWMN_VIEWS", True) transfer_pressure_daily = get_bool_env("TRANSFER_WATERLEVELS_PRESSURE_DAILY", True) transfer_weather_data = get_bool_env("TRANSFER_WEATHER_DATA", True) @@ -236,6 +238,7 @@ def transfer_all(metrics, limit=100): transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, @@ -258,6 +261,7 @@ def transfer_all(metrics, limit=100): transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, @@ -281,6 +285,7 @@ def _transfer_parallel( transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, @@ -317,6 +322,8 @@ def _transfer_parallel( parallel_tasks_1.append( ("ChemistrySampleInfo", ChemistrySampleInfoTransferer, flags) ) + if transfer_radionuclides: + parallel_tasks_1.append(("Radionuclides", RadionuclidesTransferer, flags)) if transfer_ngwmn_views: parallel_tasks_1.append( ("NGWMNWellConstruction", NGWMNWellConstructionTransferer, flags) @@ -397,6 +404,8 @@ def _transfer_parallel( metrics.hydraulics_data_metrics(*results_map["HydraulicsData"]) if "ChemistrySampleInfo" in results_map and results_map["ChemistrySampleInfo"]: metrics.chemistry_sampleinfo_metrics(*results_map["ChemistrySampleInfo"]) + if "Radionuclides" in results_map and results_map["Radionuclides"]: + metrics.radionuclides_metrics(*results_map["Radionuclides"]) if "NGWMNWellConstruction" in results_map and results_map["NGWMNWellConstruction"]: metrics.ngwmn_well_construction_metrics(*results_map["NGWMNWellConstruction"]) if "NGWMNWaterLevels" in results_map and results_map["NGWMNWaterLevels"]: @@ -480,6 +489,7 @@ def _transfer_sequential( transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, + transfer_radionuclides, transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, @@ -515,20 +525,6 @@ def _transfer_sequential( results = _execute_transfer(WaterLevelTransferer, flags=flags) metrics.water_level_metrics(*results) - if transfer_pressure: - message("TRANSFERRING WATER LEVELS PRESSURE") - results = _execute_transfer( - WaterLevelsContinuousPressureTransferer, flags=flags - ) - metrics.pressure_metrics(*results) - - if transfer_acoustic: - message("TRANSFERRING WATER LEVELS ACOUSTIC") - results = _execute_transfer( - WaterLevelsContinuousAcousticTransferer, flags=flags - ) - metrics.acoustic_metrics(*results) - if transfer_link_ids: message("TRANSFERRING LINK IDS") results = _execute_transfer(LinkIdsWellDataTransferer, flags=flags) @@ -561,6 +557,11 @@ def _transfer_sequential( results = _execute_transfer(ChemistrySampleInfoTransferer, flags=flags) metrics.chemistry_sampleinfo_metrics(*results) + if transfer_radionuclides: + message("TRANSFERRING RADIONUCLIDES") + results = _execute_transfer(RadionuclidesTransferer, flags=flags) + metrics.radionuclides_metrics(*results) + if transfer_ngwmn_views: message("TRANSFERRING NGWMN WELL CONSTRUCTION") results = _execute_transfer(NGWMNWellConstructionTransferer, flags=flags) @@ -589,6 +590,20 @@ def _transfer_sequential( results = _execute_transfer(MinorTraceChemistryTransferer, flags=flags) metrics.minor_trace_chemistry_metrics(*results) + if transfer_pressure: + message("TRANSFERRING WATER LEVELS PRESSURE") + results = _execute_transfer( + WaterLevelsContinuousPressureTransferer, flags=flags + ) + metrics.pressure_metrics(*results) + + if transfer_acoustic: + message("TRANSFERRING WATER LEVELS ACOUSTIC") + results = _execute_transfer( + WaterLevelsContinuousAcousticTransferer, flags=flags + ) + metrics.acoustic_metrics(*results) + def main(): message("START--------------------------------------") From 5819dfff9fe6747a0d977384e8cdf29e91b87d7f Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 21:00:35 -0700 Subject: [PATCH 2/4] feat: add RadionuclidesAdmin view and integrate into admin interface --- admin/config.py | 9 ++- admin/views/__init__.py | 2 + admin/views/hydraulicsdata.py | 20 +++--- admin/views/radionuclides.py | 126 ++++++++++++++++++++++++++++++++++ 4 files changed, 144 insertions(+), 13 deletions(-) create mode 100644 admin/views/radionuclides.py diff --git a/admin/config.py b/admin/config.py index 24049ea0..12349525 100644 --- a/admin/config.py +++ b/admin/config.py @@ -38,6 +38,7 @@ SampleAdmin, HydraulicsDataAdmin, ChemistrySampleInfoAdmin, + RadionuclidesAdmin, GeologicFormationAdmin, DataProvenanceAdmin, TransducerObservationAdmin, @@ -64,7 +65,12 @@ from db.group import Group from db.notes import Notes from db.sample import Sample -from db.nma_legacy import ChemistrySampleInfo, NMAHydraulicsData, SurfaceWaterData +from db.nma_legacy import ( + ChemistrySampleInfo, + NMAHydraulicsData, + NMARadionuclides, + SurfaceWaterData, +) from db.geologic_formation import GeologicFormation from db.data_provenance import DataProvenance from db.transducer import TransducerObservation @@ -136,6 +142,7 @@ def create_admin(app): # Hydraulics admin.add_view(HydraulicsDataAdmin(NMAHydraulicsData)) + admin.add_view(RadionuclidesAdmin(NMARadionuclides)) # Field admin.add_view(FieldEventAdmin(FieldEvent)) diff --git a/admin/views/__init__.py b/admin/views/__init__.py index 9b74c065..c5f0ec70 100644 --- a/admin/views/__init__.py +++ b/admin/views/__init__.py @@ -33,6 +33,7 @@ from admin.views.sample import SampleAdmin from admin.views.hydraulicsdata import HydraulicsDataAdmin from admin.views.chemistry_sampleinfo import ChemistrySampleInfoAdmin +from admin.views.radionuclides import RadionuclidesAdmin from admin.views.geologic_formation import GeologicFormationAdmin from admin.views.data_provenance import DataProvenanceAdmin from admin.views.transducer_observation import TransducerObservationAdmin @@ -61,6 +62,7 @@ "SampleAdmin", "HydraulicsDataAdmin", "ChemistrySampleInfoAdmin", + "RadionuclidesAdmin", "GeologicFormationAdmin", "DataProvenanceAdmin", "TransducerObservationAdmin", diff --git a/admin/views/hydraulicsdata.py b/admin/views/hydraulicsdata.py index 6082ce50..e11e4c9d 100644 --- a/admin/views/hydraulicsdata.py +++ b/admin/views/hydraulicsdata.py @@ -36,9 +36,10 @@ class HydraulicsDataAdmin(OcotilloModelView): # ========== List View ========== - column_list = [ + list_fields = [ "global_id", "point_id", + "thing_id", "hydraulic_unit", "hydraulic_unit_type", "test_top", @@ -48,9 +49,10 @@ class HydraulicsDataAdmin(OcotilloModelView): "data_source", ] - column_sortable_list = [ + sortable_fields = [ "global_id", "point_id", + "thing_id", "hydraulic_unit", "hydraulic_unit_type", "test_top", @@ -60,22 +62,14 @@ class HydraulicsDataAdmin(OcotilloModelView): "data_source", ] - search_fields = [ + searchable_fields = [ "global_id", "point_id", "hydraulic_unit", "hydraulic_remarks", - ] - - column_filters = [ - "hydraulic_unit", - "hydraulic_unit_type", "data_source", ] - can_export = True - export_types = ["csv", "excel"] - page_size = 50 page_size_options = [25, 50, 100, 200] @@ -84,6 +78,7 @@ class HydraulicsDataAdmin(OcotilloModelView): fields = [ "global_id", "point_id", + "thing_id", "hydraulic_unit", "hydraulic_unit_type", "hydraulic_remarks", @@ -103,9 +98,10 @@ class HydraulicsDataAdmin(OcotilloModelView): "data_source", ] - labels = { + field_labels = { "global_id": "GlobalID", "point_id": "PointID", + "thing_id": "Thing ID", "hydraulic_unit": "HydraulicUnit", "hydraulic_unit_type": "HydraulicUnitType", "hydraulic_remarks": "Hydraulic Remarks", diff --git a/admin/views/radionuclides.py b/admin/views/radionuclides.py new file mode 100644 index 00000000..53524773 --- /dev/null +++ b/admin/views/radionuclides.py @@ -0,0 +1,126 @@ +# =============================================================================== +# Copyright 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +RadionuclidesAdmin view for legacy NMA_Radionuclides. +""" +from admin.views.base import OcotilloModelView + + +class RadionuclidesAdmin(OcotilloModelView): + """ + Admin view for NMARadionuclides model. + """ + + # ========== Basic Configuration ========== + + name = "Radionuclides" + label = "Radionuclides" + icon = "fa fa-radiation" + + can_create = False + can_edit = False + can_delete = False + + # ========== List View ========== + + list_fields = [ + "global_id", + "sample_pt_id", + "sample_point_id", + "thing_id", + "analyte", + "sample_value", + "units", + "analysis_date", + "analyses_agency", + ] + + sortable_fields = [ + "global_id", + "sample_pt_id", + "sample_point_id", + "thing_id", + "analyte", + "sample_value", + "units", + "analysis_date", + "analyses_agency", + "wclab_id", + "object_id", + ] + + fields_default_sort = [("analysis_date", True)] + + searchable_fields = [ + "global_id", + "sample_pt_id", + "sample_point_id", + "analyte", + "symbol", + "analysis_method", + "analysis_date", + "notes", + "analyses_agency", + "wclab_id", + ] + + page_size = 50 + page_size_options = [25, 50, 100, 200] + + # ========== Form View ========== + + fields = [ + "global_id", + "sample_pt_id", + "sample_point_id", + "thing_id", + "analyte", + "symbol", + "sample_value", + "units", + "uncertainty", + "analysis_method", + "analysis_date", + "notes", + "volume", + "volume_unit", + "object_id", + "analyses_agency", + "wclab_id", + ] + + field_labels = { + "global_id": "GlobalID", + "sample_pt_id": "SamplePtID", + "sample_point_id": "SamplePointID", + "thing_id": "Thing ID", + "analyte": "Analyte", + "symbol": "Symbol", + "sample_value": "SampleValue", + "units": "Units", + "uncertainty": "Uncertainty", + "analysis_method": "AnalysisMethod", + "analysis_date": "AnalysisDate", + "notes": "Notes", + "volume": "Volume", + "volume_unit": "VolumeUnit", + "object_id": "OBJECTID", + "analyses_agency": "AnalysesAgency", + "wclab_id": "WCLab_ID", + } + + +# ============= EOF ============================================= From 18505930ad0149edaf217e7f2fa5df73dd6a29e6 Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Tue, 13 Jan 2026 21:40:12 -0700 Subject: [PATCH 3/4] Update transfers/radionuclides.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- transfers/radionuclides.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/transfers/radionuclides.py b/transfers/radionuclides.py index eaf63e04..73fc4333 100644 --- a/transfers/radionuclides.py +++ b/transfers/radionuclides.py @@ -229,8 +229,26 @@ def _dedupe_rows( self, rows: list[dict[str, Any]], key: str ) -> list[dict[str, Any]]: """ - Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops. - Later rows win. + Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops + when inserting into the database. + + For any given ``key`` value, only a single row is kept in the returned list. + If multiple rows share the same ``key`` value, the *last* occurrence in + ``rows`` overwrites earlier ones (i.e. "later rows win"), because the + internal mapping is updated on each encounter of that key. + + This behavior is appropriate when: + * The input batch is ordered such that later rows represent the most + recent or authoritative data for a given key, and + * Only one row per key should be written in a single batch to prevent + repeated ON CONFLICT handling for the same key. + + Callers should be aware that this can silently drop earlier rows with the + same key. If preserving all conflicting rows or applying a custom conflict + resolution strategy is important, the caller should: + * Pre-process and consolidate rows before passing them to this method, or + * Implement a different deduplication/merge strategy tailored to their + needs. """ deduped = {} for row in rows: From 5168dbf0a16aa08e82b059209b311e82925be747 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 13 Jan 2026 21:40:39 -0700 Subject: [PATCH 4/4] feat: update NMA_HydraulicsData schema with UUIDs and new columns --- admin/views/hydraulicsdata.py | 8 +++++ alembic/env.py | 20 ++++++------- .../d1a2b3c4e5f6_create_nma_hydraulicsdata.py | 29 ++++++++++++++++++- db/nma_legacy.py | 6 +++- tests/test_hydraulics_data_legacy.py | 15 ++++++++-- transfers/hydraulicsdata.py | 18 +++++++++++- 6 files changed, 81 insertions(+), 15 deletions(-) diff --git a/admin/views/hydraulicsdata.py b/admin/views/hydraulicsdata.py index e11e4c9d..8ecf683b 100644 --- a/admin/views/hydraulicsdata.py +++ b/admin/views/hydraulicsdata.py @@ -38,6 +38,7 @@ class HydraulicsDataAdmin(OcotilloModelView): list_fields = [ "global_id", + "well_id", "point_id", "thing_id", "hydraulic_unit", @@ -47,10 +48,12 @@ class HydraulicsDataAdmin(OcotilloModelView): "t_ft2_d", "k_darcy", "data_source", + "object_id", ] sortable_fields = [ "global_id", + "well_id", "point_id", "thing_id", "hydraulic_unit", @@ -60,6 +63,7 @@ class HydraulicsDataAdmin(OcotilloModelView): "t_ft2_d", "k_darcy", "data_source", + "object_id", ] searchable_fields = [ @@ -77,6 +81,7 @@ class HydraulicsDataAdmin(OcotilloModelView): fields = [ "global_id", + "well_id", "point_id", "thing_id", "hydraulic_unit", @@ -96,10 +101,12 @@ class HydraulicsDataAdmin(OcotilloModelView): "p_decimal_fraction", "k_darcy", "data_source", + "object_id", ] field_labels = { "global_id": "GlobalID", + "well_id": "WellID", "point_id": "PointID", "thing_id": "Thing ID", "hydraulic_unit": "HydraulicUnit", @@ -119,6 +126,7 @@ class HydraulicsDataAdmin(OcotilloModelView): "p_decimal_fraction": "P (decimal fraction)", "k_darcy": "k (darcy)", "data_source": "Data Source", + "object_id": "OBJECTID", } diff --git a/alembic/env.py b/alembic/env.py index 4224d62e..045bcf3b 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -157,6 +157,13 @@ def getconn(): ) with connectable.connect() as connection: + autocommit_conn = connection.execution_options(isolation_level="AUTOCOMMIT") + role_exists = autocommit_conn.execute( + text("SELECT 1 FROM pg_roles WHERE rolname = 'app_read'") + ).first() + if not role_exists: + autocommit_conn.execute(text("CREATE ROLE app_read")) + context.configure( connection=connection, target_metadata=target_metadata, @@ -167,16 +174,9 @@ def getconn(): connection.execute( text( """ - DO $$ - BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'app_read') THEN - CREATE ROLE app_read; - END IF; - GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read; - ALTER DEFAULT PRIVILEGES IN SCHEMA public - GRANT SELECT ON TABLES TO app_read; - END - $$; + GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_read; + ALTER DEFAULT PRIVILEGES IN SCHEMA public + GRANT SELECT ON TABLES TO app_read; """ ) ) diff --git a/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py b/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py index 6d87dec8..30560dd5 100644 --- a/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py +++ b/alembic/versions/d1a2b3c4e5f6_create_nma_hydraulicsdata.py @@ -10,6 +10,7 @@ from alembic import op import sqlalchemy as sa from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. revision: str = "d1a2b3c4e5f6" @@ -25,7 +26,13 @@ def upgrade() -> None: if not inspector.has_table("NMA_HydraulicsData"): op.create_table( "NMA_HydraulicsData", - sa.Column("GlobalID", sa.String(length=40), primary_key=True), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + primary_key=True, + nullable=False, + ), + sa.Column("WellID", postgresql.UUID(as_uuid=True), nullable=True), sa.Column("PointID", sa.String(length=50), nullable=True), sa.Column("HydraulicUnit", sa.String(length=18), nullable=True), sa.Column( @@ -50,6 +57,23 @@ def upgrade() -> None: sa.Column("P (decimal fraction)", sa.Float(), nullable=True), sa.Column("k (darcy)", sa.Float(), nullable=True), sa.Column("Data Source", sa.String(length=255), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True), + ) + op.create_index( + "ix_nma_hydraulicsdata_objectid", + "NMA_HydraulicsData", + ["OBJECTID"], + unique=True, + ) + op.create_index( + "ix_nma_hydraulicsdata_pointid", + "NMA_HydraulicsData", + ["PointID"], + ) + op.create_index( + "ix_nma_hydraulicsdata_wellid", + "NMA_HydraulicsData", + ["WellID"], ) @@ -58,4 +82,7 @@ def downgrade() -> None: bind = op.get_bind() inspector = inspect(bind) if inspector.has_table("NMA_HydraulicsData"): + op.drop_index("ix_nma_hydraulicsdata_wellid", table_name="NMA_HydraulicsData") + op.drop_index("ix_nma_hydraulicsdata_pointid", table_name="NMA_HydraulicsData") + op.drop_index("ix_nma_hydraulicsdata_objectid", table_name="NMA_HydraulicsData") op.drop_table("NMA_HydraulicsData") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index eb372564..07ca6efa 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -166,12 +166,16 @@ class NMAHydraulicsData(Base): __tablename__ = "NMA_HydraulicsData" - global_id: Mapped[str] = mapped_column("GlobalID", String(40), primary_key=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) data_source: Mapped[Optional[str]] = mapped_column("Data Source", String(255)) thing_id: Mapped[int] = mapped_column( Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False ) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) cs_gal_d_ft: Mapped[Optional[float]] = mapped_column("Cs (gal/d/ft)", Float) hd_ft2_d: Mapped[Optional[float]] = mapped_column("HD (ft2/d)", Float) diff --git a/tests/test_hydraulics_data_legacy.py b/tests/test_hydraulics_data_legacy.py index 4498b5a5..c4b224fd 100644 --- a/tests/test_hydraulics_data_legacy.py +++ b/tests/test_hydraulics_data_legacy.py @@ -19,6 +19,7 @@ These tests verify the migration of columns from the legacy HydraulicsData table. Migrated columns: - GlobalID -> global_id +- WellID -> well_id - PointID -> point_id - Data Source -> data_source - Cs (gal/d/ft) -> cs_gal_d_ft @@ -37,6 +38,7 @@ - HydraulicUnit -> hydraulic_unit - HydraulicUnitType -> hydraulic_unit_type - Hydraulic Remarks -> hydraulic_remarks +- OBJECTID -> object_id - thing_id -> thing_id """ @@ -46,8 +48,8 @@ from db.nma_legacy import NMAHydraulicsData -def _next_global_id() -> str: - return str(uuid4()) +def _next_global_id(): + return uuid4() # ===================== CREATE tests ========================== @@ -56,6 +58,7 @@ def test_create_hydraulics_data_all_fields(water_well_thing): with session_ctx() as session: record = NMAHydraulicsData( global_id=_next_global_id(), + well_id=uuid4(), point_id=water_well_thing.name, data_source="Legacy Source", cs_gal_d_ft=1.2, @@ -74,6 +77,7 @@ def test_create_hydraulics_data_all_fields(water_well_thing): hydraulic_unit="Unit A", hydraulic_unit_type="U", hydraulic_remarks="Test remarks", + object_id=101, thing_id=water_well_thing.id, ) session.add(record) @@ -81,10 +85,12 @@ def test_create_hydraulics_data_all_fields(water_well_thing): session.refresh(record) assert record.global_id is not None + assert record.well_id is not None assert record.point_id == water_well_thing.name assert record.data_source == "Legacy Source" assert record.test_top == 30 assert record.test_bottom == 120 + assert record.object_id == 101 assert record.thing_id == water_well_thing.id session.delete(record) @@ -105,8 +111,10 @@ def test_create_hydraulics_data_minimal(water_well_thing): session.refresh(record) assert record.global_id is not None + assert record.well_id is None assert record.point_id is None assert record.data_source is None + assert record.object_id is None assert record.thing_id == water_well_thing.id session.delete(record) @@ -139,6 +147,7 @@ def test_query_hydraulics_data_by_point_id(water_well_thing): with session_ctx() as session: record1 = NMAHydraulicsData( global_id=_next_global_id(), + well_id=uuid4(), point_id=water_well_thing.name, test_top=10, test_bottom=20, @@ -217,6 +226,7 @@ def test_hydraulics_data_has_all_migrated_columns(): """Test that the model has all expected columns.""" expected_columns = [ "global_id", + "well_id", "point_id", "data_source", "cs_gal_d_ft", @@ -235,6 +245,7 @@ def test_hydraulics_data_has_all_migrated_columns(): "hydraulic_unit", "hydraulic_unit_type", "hydraulic_remarks", + "object_id", "thing_id", ] diff --git a/transfers/hydraulicsdata.py b/transfers/hydraulicsdata.py index 9441235c..75e8d6ba 100644 --- a/transfers/hydraulicsdata.py +++ b/transfers/hydraulicsdata.py @@ -17,6 +17,7 @@ from __future__ import annotations from typing import Any, Optional +import uuid import pandas as pd from sqlalchemy.dialects.postgresql import insert @@ -100,6 +101,7 @@ def _transfer_hook(self, session: Session) -> None: stmt = insert_stmt.values(chunk).on_conflict_do_update( index_elements=["GlobalID"], set_={ + "WellID": excluded["WellID"], "PointID": excluded["PointID"], "HydraulicUnit": excluded["HydraulicUnit"], "thing_id": excluded["thing_id"], @@ -119,6 +121,7 @@ def _transfer_hook(self, session: Session) -> None: "P (decimal fraction)": excluded["P (decimal fraction)"], "k (darcy)": excluded["k (darcy)"], "Data Source": excluded["Data Source"], + "OBJECTID": excluded["OBJECTID"], }, ) session.execute(stmt) @@ -132,6 +135,17 @@ def val(key: str) -> Optional[Any]: return None return v + def as_uuid(key: str) -> Optional[uuid.UUID]: + v = val(key) + if v is None: + return None + if isinstance(v, uuid.UUID): + return v + try: + return uuid.UUID(str(v)) + except (ValueError, TypeError): + return None + def as_int(key: str) -> Optional[int]: v = val(key) if v is None: @@ -142,7 +156,8 @@ def as_int(key: str) -> Optional[int]: return None return { - "GlobalID": val("GlobalID"), + "GlobalID": as_uuid("GlobalID"), + "WellID": as_uuid("WellID"), "PointID": val("PointID"), "HydraulicUnit": val("HydraulicUnit"), "thing_id": self._thing_id_cache.get(val("PointID")), @@ -162,6 +177,7 @@ def as_int(key: str) -> Optional[int]: "P (decimal fraction)": val("P (decimal fraction)"), "k (darcy)": val("k (darcy)"), "Data Source": val("Data Source"), + "OBJECTID": as_int("OBJECTID"), } def _dedupe_rows(