diff --git a/AGENTS.MD b/AGENTS.MD
new file mode 100644
index 00000000..f4812ee3
--- /dev/null
+++ b/AGENTS.MD
@@ -0,0 +1,24 @@
+# AGENTS: High-Volume Transfer Playbook
+
+This repo pushes millions of legacy rows through SQLAlchemy. When Codex or any other agent has to work on
+these transfers, keep the following rules in mind to avoid hour-long runs:
+
+## 1. Skip ORM object construction once volume climbs
+- **Do not call `session.bulk_save_objects`** for high-frequency tables (e.g., transducer observations,
+  water levels, chemistry results). It still instantiates every mapped class and kills throughput.
+- Instead, build plain dictionaries/tuples and call `session.execute(insert(Model), data)`; a list of
+  parameter dicts takes SQLAlchemy 2.0's batched "insertmanyvalues" path (sketch after the migration diff below).
+- If validation is required (Pydantic models, bound schemas), validate first and dump to dicts before the
+  Core insert.
+
+
+## 2. Running pytest safely
+- Activate the repo virtualenv before testing: `source .venv/bin/activate` from the project root so all
+  dependencies (sqlalchemy, fastapi, etc.) are available.
+- Load environment variables from `.env` so pytest sees the same DB creds the app uses. For quick shells:
+  `set -a; source .env; set +a`, or use `ENV_FILE=.env pytest ...` with `python-dotenv` installed.
+- Many tests expect a running Postgres bound to the vars in `.env`; confirm `POSTGRES_*` values point to the
+  right instance before running destructive suites.
+- When done, `deactivate` to exit the venv and avoid polluting other shells.
+
+Following this playbook keeps ETL runs measured in seconds or minutes instead of hours.
diff --git a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py
index 97770d56..29c3cab8 100644
--- a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py
+++ b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py
@@ -35,22 +35,26 @@ def upgrade() -> None:
             nullable=False,
         ),
         sa.Column("WellID", postgresql.UUID(as_uuid=True), nullable=True),
-        sa.Column("PointID", sa.String(length=10), nullable=False),
+        sa.Column("PointID", sa.String(length=50), nullable=False),
         sa.Column(
             "thing_id",
             sa.Integer(),
             sa.ForeignKey("thing.id", ondelete="CASCADE"),
             nullable=False,
         ),
-        sa.Column("StratTop", sa.Float(), nullable=True),
-        sa.Column("StratBottom", sa.Float(), nullable=True),
-        sa.Column("UnitIdentifier", sa.String(length=50), nullable=True),
-        sa.Column("Lithology", sa.String(length=100), nullable=True),
-        sa.Column("LithologicModifier", sa.String(length=100), nullable=True),
-        sa.Column("ContributingUnit", sa.String(length=10), nullable=True),
-        sa.Column("StratSource", sa.Text(), nullable=True),
-        sa.Column("StratNotes", sa.Text(), nullable=True),
+        sa.Column("StratTop", sa.SmallInteger(), nullable=False),
+        sa.Column("StratBottom", sa.SmallInteger(), nullable=False),
+        sa.Column("UnitIdentifier", sa.String(length=20), nullable=True),
+        sa.Column("Lithology", sa.String(length=4), nullable=True),
+        sa.Column("LithologicModifier", sa.String(length=255), nullable=True),
+        sa.Column("ContributingUnit", sa.String(length=2), nullable=True),
+        sa.Column("StratSource", sa.String(100), nullable=True),
+        sa.Column("StratNotes", sa.String(255), nullable=True),
         sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True),
+        sa.CheckConstraint(
+            'char_length("PointID") > 0',
+            name="ck_nma_stratigraphy_pointid_len",
+        ),
     )
     op.create_index(
         "ix_nma_stratigraphy_point_id",
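A minimal sketch of the §1 pattern from the playbook above (validate with Pydantic, dump to dicts, then one Core-style `execute()`). `ObservationIn`, `bulk_insert_rows`, and the field names are hypothetical stand-ins, not code from this repo:

```python
from pydantic import BaseModel
from sqlalchemy import insert
from sqlalchemy.orm import Session


class ObservationIn(BaseModel):
    # Hypothetical validation schema; swap in the repo's real bound schema.
    point_id: str
    value: float


def bulk_insert_rows(session: Session, model, raw_rows: list[dict]) -> None:
    # Validate every raw row first, then dump back to plain dicts so the
    # insert below never builds mapped ORM instances.
    payloads = [ObservationIn(**row).model_dump() for row in raw_rows]
    if payloads:
        # Passing a list of parameter dicts lets SQLAlchemy 2.0 take its
        # batched "insertmanyvalues" path: one execute(), no per-row objects.
        session.execute(insert(model), payloads)
```

For multi-million-row transfers, chunking `payloads` (say 10k rows per `execute()`) keeps memory bounded without giving up the batch path.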
diff --git a/db/nma_legacy.py b/db/nma_legacy.py
index 72f39804..5ea1337e 100644
--- a/db/nma_legacy.py
+++ b/db/nma_legacy.py
@@ -22,6 +22,7 @@ from sqlalchemy import (
     Boolean,
+    CheckConstraint,
     Date,
     DateTime,
     Float,
@@ -211,28 +212,36 @@ class NMA_Stratigraphy(Base):
     """Legacy stratigraphy (lithology log) data from AMPAPI."""

     __tablename__ = "NMA_Stratigraphy"
+    __table_args__ = (
+        CheckConstraint(
+            'char_length("PointID") > 0',
+            name="ck_nma_stratigraphy_pointid_len",
+        ),
+    )

     global_id: Mapped[uuid.UUID] = mapped_column(
         "GlobalID", UUID(as_uuid=True), primary_key=True
     )
     well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True))
-    point_id: Mapped[str] = mapped_column("PointID", String(10), nullable=False)
+    point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False)
     thing_id: Mapped[int] = mapped_column(
         Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False
     )
-    strat_top: Mapped[Optional[float]] = mapped_column("StratTop", Float)
-    strat_bottom: Mapped[Optional[float]] = mapped_column("StratBottom", Float)
-    unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(50))
-    lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(100))
+    strat_top: Mapped[int] = mapped_column("StratTop", SmallInteger, nullable=False)
+    strat_bottom: Mapped[int] = mapped_column(
+        "StratBottom", SmallInteger, nullable=False
+    )
+    unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(20))
+    lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(4))
     lithologic_modifier: Mapped[Optional[str]] = mapped_column(
-        "LithologicModifier", String(100)
+        "LithologicModifier", String(255)
     )
     contributing_unit: Mapped[Optional[str]] = mapped_column(
-        "ContributingUnit", String(10)
+        "ContributingUnit", String(2)
     )
-    strat_source: Mapped[Optional[str]] = mapped_column("StratSource", Text)
-    strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", Text)
+    strat_source: Mapped[Optional[str]] = mapped_column("StratSource", String(100))
+    strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", String(255))
     object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True)

     thing: Mapped["Thing"] = relationship("Thing", back_populates="stratigraphy_logs")
diff --git a/transfers/waterlevels_transducer_transfer.py b/transfers/waterlevels_transducer_transfer.py
index 991ee5c9..d96b11d8 100644
--- a/transfers/waterlevels_transducer_transfer.py
+++ b/transfers/waterlevels_transducer_transfer.py
@@ -18,6 +18,7 @@
 import pandas as pd
 from pandas import Timestamp
 from pydantic import ValidationError
+from sqlalchemy import insert
 from sqlalchemy.exc import DatabaseError
 from sqlalchemy.orm import Session

@@ -42,6 +43,9 @@ def __init__(self, *args, **kw):
         self.groundwater_parameter_id = get_groundwater_parameter_id()
         self._itertuples_field_map = {}
         self._df_columns = set()
+        self._observation_columns = {
+            column.key for column in TransducerObservation.__table__.columns
+        }
         if self._sensor_types is None:
             raise ValueError("_sensor_types must be set")
         if self._partition_field is None:
@@ -134,7 +138,15 @@ def _transfer_hook(self, session: Session) -> None:
         ]
         observations = [obs for obs in observations if obs is not None]
-        session.bulk_save_objects(observations)
+        if observations:
+            filtered_observations = [
+                {k: v for k, v in obs.items() if k in self._observation_columns}
+                for obs in observations
+            ]
+            session.execute(
+                insert(TransducerObservation),
+                filtered_observations,
+            )
         session.add(block)
         logger.info(
             f"Added {len(observations)} water levels {release_status} block"
         )
@@ -164,7 +176,7 @@ def _make_observation(
         release_status: str,
         deps_sorted: list,
         nodeployments: dict,
-    ) -> TransducerObservation | None:
+    ) -> dict | None:

         deployment = _find_deployment(row.DateMeasured, deps_sorted)
         if deployment is None:
@@ -195,7 +207,7 @@ def _make_observation(
             payload
         ).model_dump()
         legacy_payload = self._legacy_payload(row)
-        return TransducerObservation(**obspayload, **legacy_payload)
+        return {**obspayload, **legacy_payload}
     except ValidationError as e:
         logger.critical(f"Observation validation error: {e.errors()}")
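For reviewers: the `_transfer_hook` change above, condensed into a standalone sketch. Build the set of mapped column keys once, strip unknown keys from each payload, then hand the dicts to a single `execute()`. `Observation`, `OBSERVATION_COLUMNS`, and `insert_observations` are hypothetical stand-ins for `TransducerObservation`, `self._observation_columns`, and the hook itself:

```python
from typing import Optional

from sqlalchemy import String, insert
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Observation(Base):
    # Stand-in for TransducerObservation; the real mapping lives in db/.
    __tablename__ = "observation"

    id: Mapped[int] = mapped_column(primary_key=True)
    point_id: Mapped[str] = mapped_column(String(50))
    note: Mapped[Optional[str]] = mapped_column(String(255))


# Computed once, as the new __init__ does with self._observation_columns.
OBSERVATION_COLUMNS = {column.key for column in Observation.__table__.columns}


def insert_observations(session: Session, payloads: list[dict]) -> None:
    # Legacy payloads can carry extra keys; an unmapped key would fail when
    # the INSERT is compiled, so filter each dict down to known columns first.
    rows = [
        {k: v for k, v in payload.items() if k in OBSERVATION_COLUMNS}
        for payload in payloads
    ]
    if rows:
        session.execute(insert(Observation), rows)
```

Returning plain dicts from `_make_observation` (the last two hunks) is what makes this shape possible: nothing in the hot path ever instantiates the mapped class.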