diff --git a/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py b/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py index 82dbe471..9ac9a99a 100644 --- a/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py +++ b/alembic/versions/95d8b982cd5d_add_nma_chemistry_lineage_relations.py @@ -83,7 +83,7 @@ def upgrade() -> None: # Create NMA_MinorTraceChemistry table op.create_table( "NMA_MinorTraceChemistry", - sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=False), sa.Column( "chemistry_sample_info_id", postgresql.UUID(as_uuid=True), @@ -100,7 +100,7 @@ def upgrade() -> None: sa.Column("uncertainty", sa.Float(), nullable=True), sa.Column("volume", sa.Float(), nullable=True), sa.Column("volume_unit", sa.String(20), nullable=True), - sa.PrimaryKeyConstraint("id"), + sa.PrimaryKeyConstraint("GlobalID"), sa.ForeignKeyConstraint( ["chemistry_sample_info_id"], ["NMA_Chemistry_SampleInfo.SamplePtID"], diff --git a/alembic/versions/d2f4c6a8b1c2_make_minor_trace_globalid_pk.py b/alembic/versions/d2f4c6a8b1c2_make_minor_trace_globalid_pk.py new file mode 100644 index 00000000..d624237d --- /dev/null +++ b/alembic/versions/d2f4c6a8b1c2_make_minor_trace_globalid_pk.py @@ -0,0 +1,72 @@ +"""Make GlobalID the primary key for NMA_MinorTraceChemistry. + +Revision ID: d2f4c6a8b1c2 +Revises: 6e1c90f6135a +Create Date: 2026-03-01 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "d2f4c6a8b1c2" +down_revision: Union[str, Sequence[str], None] = "6e1c90f6135a" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_MinorTraceChemistry"): + return + + columns = {col["name"] for col in inspector.get_columns("NMA_MinorTraceChemistry")} + if "GlobalID" not in columns: + op.add_column( + "NMA_MinorTraceChemistry", + sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=False), + ) + + pk = inspector.get_pk_constraint("NMA_MinorTraceChemistry") + pk_name = pk.get("name") + if pk_name: + op.drop_constraint(pk_name, "NMA_MinorTraceChemistry", type_="primary") + + if "id" in columns: + op.drop_column("NMA_MinorTraceChemistry", "id") + + op.create_primary_key( + "NMA_MinorTraceChemistry_pkey", + "NMA_MinorTraceChemistry", + ["GlobalID"], + ) + + +def downgrade() -> None: + """Downgrade schema.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_MinorTraceChemistry"): + return + + op.drop_constraint( + "NMA_MinorTraceChemistry_pkey", + "NMA_MinorTraceChemistry", + type_="primary", + ) + op.add_column( + "NMA_MinorTraceChemistry", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + ) + op.create_primary_key( + "NMA_MinorTraceChemistry_id_pkey", + "NMA_MinorTraceChemistry", + ["id"], + ) + op.drop_column("NMA_MinorTraceChemistry", "GlobalID") diff --git a/alembic/versions/e4f7a9c0b2d3_add_search_vector_triggers.py b/alembic/versions/e4f7a9c0b2d3_add_search_vector_triggers.py new file mode 100644 index 00000000..cdf3164e --- /dev/null +++ b/alembic/versions/e4f7a9c0b2d3_add_search_vector_triggers.py @@ -0,0 +1,77 @@ +"""Add search_vector triggers for searchable tables. + +Revision ID: e4f7a9c0b2d3 +Revises: d2f4c6a8b1c2 +Create Date: 2026-03-01 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "e4f7a9c0b2d3" +down_revision: Union[str, Sequence[str], None] = "d2f4c6a8b1c2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +SEARCH_VECTOR_TRIGGERS = { + "contact": ("name", "role", "organization", "nma_pk_owners"), + "phone": ("phone_number",), + "email": ("email",), + "address": ( + "address_line_1", + "address_line_2", + "city", + "state", + "postal_code", + "country", + ), + "asset": ("name", "mime_type", "storage_service", "storage_path"), + "thing": ("name",), + "well_purpose": ("purpose",), + "well_casing_material": ("material",), + "publication": ("title", "abstract", "doi", "publisher", "url"), + "pub_author": ("name", "affiliation"), +} + + +def _create_trigger(table: str, columns: Sequence[str]) -> None: + trigger_name = f"{table}_search_vector_update" + column_list = ", ".join(f"'{col}'" for col in columns) + op.execute(f'DROP TRIGGER IF EXISTS "{trigger_name}" ON "{table}"') + op.execute( + f""" + CREATE TRIGGER "{trigger_name}" + BEFORE INSERT OR UPDATE ON "{table}" + FOR EACH ROW EXECUTE FUNCTION + tsvector_update_trigger('search_vector', 'pg_catalog.simple', {column_list}); + """ + ) + + +def _drop_trigger(table: str) -> None: + trigger_name = f"{table}_search_vector_update" + op.execute(f'DROP TRIGGER IF EXISTS "{trigger_name}" ON "{table}"') + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + for table, columns in SEARCH_VECTOR_TRIGGERS.items(): + if not inspector.has_table(table): + continue + column_names = {col["name"] for col in inspector.get_columns(table)} + if "search_vector" not in column_names: + continue + _create_trigger(table, columns) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + for table in SEARCH_VECTOR_TRIGGERS: + if inspector.has_table(table): + _drop_trigger(table) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 053276af..98d920fe 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -299,7 +299,9 @@ class NMAMinorTraceChemistry(Base): ), ) - id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) # FK to ChemistrySampleInfo - required (no orphans) chemistry_sample_info_id: Mapped[uuid.UUID] = mapped_column( diff --git a/transfers/minor_trace_chemistry_transfer.py b/transfers/minor_trace_chemistry_transfer.py index 1c527c6e..d89a20c9 100644 --- a/transfers/minor_trace_chemistry_transfer.py +++ b/transfers/minor_trace_chemistry_transfer.py @@ -116,7 +116,7 @@ def _transfer_hook(self, session: Session) -> None: logger.warning("No valid rows to transfer") return - # Dedupe by unique key (chemistry_sample_info_id, analyte) + # Dedupe by GlobalID to avoid PK conflicts. rows = self._dedupe_rows(row_dicts) logger.info(f"Upserting {len(rows)} MinorTraceChemistry records") @@ -127,7 +127,7 @@ def _transfer_hook(self, session: Session) -> None: chunk = rows[i : i + self.batch_size] logger.info(f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows)") stmt = insert_stmt.values(chunk).on_conflict_do_update( - constraint="uq_minor_trace_chemistry_sample_analyte", + index_elements=["GlobalID"], set_={ "sample_value": excluded.sample_value, "units": excluded.units, @@ -164,7 +164,17 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: ) return None + global_id = self._uuid_val(getattr(row, "GlobalID", None)) + if global_id is None: + self._capture_error( + getattr(row, "GlobalID", None), + f"Invalid GlobalID: {getattr(row, 'GlobalID', None)}", + "GlobalID", + ) + return None + return { + "global_id": global_id, "chemistry_sample_info_id": sample_pt_id, "analyte": self._safe_str(row, "Analyte"), "sample_value": self._safe_float(row, "SampleValue"), @@ -183,7 +193,9 @@ def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" deduped = {} for row in rows: - key = (row["chemistry_sample_info_id"], row["analyte"]) + key = row.get("global_id") + if key is None: + continue deduped[key] = row return list(deduped.values())