-
Notifications
You must be signed in to change notification settings - Fork 1
use-globalid #373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use-globalid #373
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| """Make GlobalID the primary key for NMA_MinorTraceChemistry. | ||
| Revision ID: d2f4c6a8b1c2 | ||
| Revises: 6e1c90f6135a | ||
| Create Date: 2026-03-01 00:00:00.000000 | ||
| """ | ||
|
|
||
| from typing import Sequence, Union | ||
|
|
||
| from alembic import op | ||
| import sqlalchemy as sa | ||
| from sqlalchemy import inspect | ||
| from sqlalchemy.dialects import postgresql | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision: str = "d2f4c6a8b1c2" | ||
| down_revision: Union[str, Sequence[str], None] = "6e1c90f6135a" | ||
| branch_labels: Union[str, Sequence[str], None] = None | ||
| depends_on: Union[str, Sequence[str], None] = None | ||
|
|
||
|
|
||
def upgrade() -> None:
    """Upgrade schema: make ``GlobalID`` the primary key of NMA_MinorTraceChemistry.

    Steps, each guarded so the migration can run against partially
    migrated databases:

    1. Add ``GlobalID`` (UUID) if it is missing -- as a *nullable* column,
       because a NOT NULL column without a default cannot be added to a
       table that already contains rows.
    2. Backfill any NULL ``GlobalID`` values, then enforce NOT NULL.
    3. Drop the existing primary-key constraint (whatever its name is).
    4. Drop the legacy integer ``id`` column if present.
    5. Create the new primary key on ``GlobalID``.

    Does nothing when the table does not exist.
    """
    table = "NMA_MinorTraceChemistry"
    bind = op.get_bind()
    inspector = inspect(bind)
    if not inspector.has_table(table):
        return

    columns = {col["name"] for col in inspector.get_columns(table)}
    if "GlobalID" not in columns:
        op.add_column(
            table,
            sa.Column("GlobalID", postgresql.UUID(as_uuid=True), nullable=True),
        )

    # Backfill rows that have no GlobalID so the NOT NULL / primary-key
    # constraints below can be applied to a populated table.
    # NOTE(review): gen_random_uuid() is built in from PostgreSQL 13; older
    # servers need the pgcrypto extension. The generated values are
    # placeholders -- confirm whether existing rows should instead be
    # re-transferred so they carry their source GlobalID.
    op.execute(
        f'UPDATE "{table}" SET "GlobalID" = gen_random_uuid() '
        'WHERE "GlobalID" IS NULL'
    )
    op.alter_column(
        table,
        "GlobalID",
        existing_type=postgresql.UUID(as_uuid=True),
        nullable=False,
    )

    # The current PK name is looked up rather than assumed.
    pk = inspector.get_pk_constraint(table)
    pk_name = pk.get("name")
    if pk_name:
        op.drop_constraint(pk_name, table, type_="primary")

    if "id" in columns:
        op.drop_column(table, "id")

    op.create_primary_key(
        "NMA_MinorTraceChemistry_pkey",
        table,
        ["GlobalID"],
    )
|
|
||
|
|
||
def downgrade() -> None:
    """Downgrade schema: restore an integer ``id`` primary key.

    Drops the current primary key, re-adds ``id`` as an auto-numbered
    integer column, makes it the primary key, and removes ``GlobalID``.
    Does nothing when the table does not exist.
    """
    table = "NMA_MinorTraceChemistry"
    bind = op.get_bind()
    inspector = inspect(bind)
    if not inspector.has_table(table):
        return

    columns = {col["name"] for col in inspector.get_columns(table)}

    # Look the PK name up instead of assuming it, mirroring upgrade().
    pk_name = inspector.get_pk_constraint(table).get("name")
    if pk_name:
        op.drop_constraint(pk_name, table, type_="primary")

    if "id" not in columns:
        # A plain ``INTEGER NOT NULL`` column cannot be added to a populated
        # table, and ``autoincrement=True`` alone emits no default for a
        # column added via ALTER TABLE. An identity column makes PostgreSQL
        # number the existing rows as the column is created.
        # NOTE(review): requires PostgreSQL 10+; confirm an identity column
        # is acceptable in place of the original id definition.
        op.add_column(
            table,
            sa.Column("id", sa.Integer(), sa.Identity(), nullable=False),
        )

    op.create_primary_key(
        "NMA_MinorTraceChemistry_id_pkey",
        table,
        ["id"],
    )

    if "GlobalID" in columns:
        op.drop_column(table, "GlobalID")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| """Add search_vector triggers for searchable tables. | ||
|
|
||
| Revision ID: e4f7a9c0b2d3 | ||
| Revises: d2f4c6a8b1c2 | ||
| Create Date: 2026-03-01 00:00:00.000000 | ||
| """ | ||
|
|
||
| from typing import Sequence, Union | ||
|
|
||
| from alembic import op | ||
| from sqlalchemy import inspect | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision: str = "e4f7a9c0b2d3" | ||
| down_revision: Union[str, Sequence[str], None] = "d2f4c6a8b1c2" | ||
| branch_labels: Union[str, Sequence[str], None] = None | ||
| depends_on: Union[str, Sequence[str], None] = None | ||
|
|
||
|
|
||
| SEARCH_VECTOR_TRIGGERS = { | ||
| "contact": ("name", "role", "organization", "nma_pk_owners"), | ||
| "phone": ("phone_number",), | ||
| "email": ("email",), | ||
| "address": ( | ||
| "address_line_1", | ||
| "address_line_2", | ||
| "city", | ||
| "state", | ||
| "postal_code", | ||
| "country", | ||
| ), | ||
| "asset": ("name", "mime_type", "storage_service", "storage_path"), | ||
| "thing": ("name",), | ||
| "well_purpose": ("purpose",), | ||
| "well_casing_material": ("material",), | ||
| "publication": ("title", "abstract", "doi", "publisher", "url"), | ||
| "pub_author": ("name", "affiliation"), | ||
| } | ||
|
|
||
|
|
||
def _create_trigger(table: str, columns: Sequence[str]) -> None:
    """(Re)create the ``<table>_search_vector_update`` trigger on *table*.

    Any existing trigger of that name is dropped first. The new trigger
    fires BEFORE INSERT OR UPDATE for each row and calls
    ``tsvector_update_trigger`` with the ``search_vector`` column, the
    ``pg_catalog.simple`` configuration, and *columns* as source columns.
    """
    name = f"{table}_search_vector_update"
    quoted_columns = ", ".join("'" + column + "'" for column in columns)

    drop_sql = f'DROP TRIGGER IF EXISTS "{name}" ON "{table}"'
    create_sql = (
        "\n"
        f'CREATE TRIGGER "{name}"\n'
        f'BEFORE INSERT OR UPDATE ON "{table}"\n'
        "FOR EACH ROW EXECUTE FUNCTION\n"
        "tsvector_update_trigger('search_vector', 'pg_catalog.simple', "
        f"{quoted_columns});\n"
    )

    op.execute(drop_sql)
    op.execute(create_sql)
|
|
||
|
|
||
def _drop_trigger(table: str) -> None:
    """Drop the ``<table>_search_vector_update`` trigger from *table* if it exists."""
    statement = f'DROP TRIGGER IF EXISTS "{table}_search_vector_update" ON "{table}"'
    op.execute(statement)
|
|
||
|
|
||
def upgrade() -> None:
    """Create the search-vector trigger on every listed table that can take one.

    A table is skipped when it does not exist or has no ``search_vector``
    column.
    """
    inspector = inspect(op.get_bind())
    for table, source_columns in SEARCH_VECTOR_TRIGGERS.items():
        if inspector.has_table(table):
            existing = {column["name"] for column in inspector.get_columns(table)}
            if "search_vector" in existing:
                _create_trigger(table, source_columns)
|
|
||
|
|
||
def downgrade() -> None:
    """Drop the search-vector trigger from every listed table that exists."""
    inspector = inspect(op.get_bind())
    for table in SEARCH_VECTOR_TRIGGERS:
        if not inspector.has_table(table):
            continue
        _drop_trigger(table)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,7 +116,7 @@ def _transfer_hook(self, session: Session) -> None: | |
| logger.warning("No valid rows to transfer") | ||
| return | ||
|
|
||
| # Dedupe by unique key (chemistry_sample_info_id, analyte) | ||
| # Dedupe by GlobalID to avoid PK conflicts. | ||
| rows = self._dedupe_rows(row_dicts) | ||
| logger.info(f"Upserting {len(rows)} MinorTraceChemistry records") | ||
|
|
||
|
|
@@ -127,7 +127,7 @@ def _transfer_hook(self, session: Session) -> None: | |
| chunk = rows[i : i + self.batch_size] | ||
| logger.info(f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows)") | ||
| stmt = insert_stmt.values(chunk).on_conflict_do_update( | ||
| constraint="uq_minor_trace_chemistry_sample_analyte", | ||
| index_elements=["GlobalID"], | ||
| set_={ | ||
|
Comment on lines
129
to
131
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The upsert now targets Useful? React with 👍 / 👎. |
||
| "sample_value": excluded.sample_value, | ||
| "units": excluded.units, | ||
|
|
@@ -164,7 +164,17 @@ def _row_to_dict(self, row) -> Optional[dict[str, Any]]: | |
| ) | ||
| return None | ||
|
|
||
| global_id = self._uuid_val(getattr(row, "GlobalID", None)) | ||
| if global_id is None: | ||
| self._capture_error( | ||
| getattr(row, "GlobalID", None), | ||
| f"Invalid GlobalID: {getattr(row, 'GlobalID', None)}", | ||
| "GlobalID", | ||
| ) | ||
| return None | ||
|
|
||
| return { | ||
| "global_id": global_id, | ||
| "chemistry_sample_info_id": sample_pt_id, | ||
| "analyte": self._safe_str(row, "Analyte"), | ||
| "sample_value": self._safe_float(row, "SampleValue"), | ||
|
|
@@ -183,7 +193,9 @@ def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" | ||
| deduped = {} | ||
| for row in rows: | ||
| key = (row["chemistry_sample_info_id"], row["analyte"]) | ||
| key = row.get("global_id") | ||
| if key is None: | ||
| continue | ||
| deduped[key] = row | ||
| return list(deduped.values()) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The upgrade adds a new `GlobalID` column with `nullable=False` on an existing table, but there is no backfill or default before enforcing the constraint. On PostgreSQL, this migration will fail if `NMA_MinorTraceChemistry` already contains rows because the new column is NULL for existing records. To make this safe on populated databases, add the column as nullable (or with a server default), populate it, then alter it to NOT NULL.
Useful? React with 👍 / 👎.