Merged
24 changes: 24 additions & 0 deletions AGENTS.MD
@@ -0,0 +1,24 @@
# AGENTS: High-Volume Transfer Playbook

This repo pushes millions of legacy rows through SQLAlchemy. When Codex or any other agent has to work on
these transfers, keep the following rules in mind to avoid hour-long runs:

## 1. Skip ORM object construction once volume climbs
- **Do not call `session.bulk_save_objects`** for high-frequency tables (e.g., transducer observations,
  water levels, chemistry results); it still requires a fully instantiated mapped object per row, which kills throughput.
- Instead, build plain dictionaries/tuples and call `session.execute(insert(Model), data)`; SQLAlchemy 2.0
  routes that call through its `insertmanyvalues` fast path. (The analogous shortcut for bulk ORM
  updates/deletes is `session.execute(stmt, execution_options={"synchronize_session": False})`.)
- If validation is required (Pydantic models, bound schemas), validate first and dump to dicts before the
  Core insert, as in the sketch below.
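
A minimal sketch of the dict-based fast path, assuming a hypothetical `Observation` model and
`ObservationIn` schema (neither is from this repo):

```python
from pydantic import BaseModel
from sqlalchemy import insert
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Observation(Base):  # hypothetical high-frequency table
    __tablename__ = "observation"
    id: Mapped[int] = mapped_column(primary_key=True)
    point_id: Mapped[str]
    value: Mapped[float]


class ObservationIn(BaseModel):  # hypothetical validation schema
    point_id: str
    value: float


def bulk_insert(session: Session, raw_rows: list[dict]) -> None:
    # Validate first, then dump to plain dicts -- no mapped objects are built.
    rows = [ObservationIn(**raw).model_dump() for raw in raw_rows]
    # One Core INSERT; SQLAlchemy 2.0 batches it via "insertmanyvalues".
    session.execute(insert(Observation), rows)
```

Dict keys that don't correspond to mapped columns typically fail the statement with an
"Unconsumed column names" error, which is why the transfer code in this PR filters each dict
against the table's columns first.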


## 2. Running pytest safely
- Activate the repo virtualenv before testing: `source .venv/bin/activate` from the project root so all
dependencies (sqlalchemy, fastapi, etc.) are available.
- Load environment variables from `.env` so pytest sees the same DB creds the app uses. For quick shells:
`set -a; source .env; set +a`, or use `ENV_FILE=.env pytest ...` with `python-dotenv` installed.
- Many tests expect a running Postgres bound to the vars in `.env`; confirm `POSTGRES_*` values point to the
right instance before running destructive suites.
- When done, `deactivate` to exit the venv and avoid polluting other shells. The full sequence is sketched below.
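
A combined sketch of the steps above, assuming the `.venv`/`.env` layout this playbook describes
(`POSTGRES_HOST` is an illustrative member of the `POSTGRES_*` family, not a confirmed variable name):

```bash
cd "$(git rev-parse --show-toplevel)"         # jump to the project root
source .venv/bin/activate                     # repo virtualenv
set -a; source .env; set +a                   # export every var in .env
echo "POSTGRES_HOST=${POSTGRES_HOST:-unset}"  # sanity-check creds before destructive suites
pytest -q                                     # run the suite (add paths or -k to narrow it)
deactivate                                    # leave the venv clean
```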

Following this playbook keeps ETL runs measured in seconds or minutes instead of hours.
22 changes: 13 additions & 9 deletions alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py
@@ -35,22 +35,26 @@ def upgrade() -> None:
nullable=False,
),
sa.Column("WellID", postgresql.UUID(as_uuid=True), nullable=True),
sa.Column("PointID", sa.String(length=10), nullable=False),
sa.Column("PointID", sa.String(length=50), nullable=False),
sa.Column(
"thing_id",
sa.Integer(),
sa.ForeignKey("thing.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("StratTop", sa.Float(), nullable=True),
sa.Column("StratBottom", sa.Float(), nullable=True),
sa.Column("UnitIdentifier", sa.String(length=50), nullable=True),
sa.Column("Lithology", sa.String(length=100), nullable=True),
sa.Column("LithologicModifier", sa.String(length=100), nullable=True),
sa.Column("ContributingUnit", sa.String(length=10), nullable=True),
sa.Column("StratSource", sa.Text(), nullable=True),
sa.Column("StratNotes", sa.Text(), nullable=True),
sa.Column("StratTop", sa.SmallInteger(), nullable=False),
sa.Column("StratBottom", sa.SmallInteger(), nullable=False),
sa.Column("UnitIdentifier", sa.String(length=20), nullable=True),
sa.Column("Lithology", sa.String(length=4), nullable=True),
sa.Column("LithologicModifier", sa.String(length=255), nullable=True),
sa.Column("ContributingUnit", sa.String(length=2), nullable=True),
sa.Column("StratSource", sa.String(100), nullable=True),
sa.Column("StratNotes", sa.String(255), nullable=True),
sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True),
+        sa.CheckConstraint(
+            'char_length("PointID") > 0',
+            name="ck_nma_stratigraphy_pointid_len",
+        ),
)
op.create_index(
"ix_nma_stratigraphy_point_id",
27 changes: 18 additions & 9 deletions db/nma_legacy.py
@@ -22,6 +22,7 @@

from sqlalchemy import (
Boolean,
+    CheckConstraint,
Date,
DateTime,
Float,
@@ -211,28 +212,36 @@ class NMA_Stratigraphy(Base):
"""Legacy stratigraphy (lithology log) data from AMPAPI."""

__tablename__ = "NMA_Stratigraphy"
+    __table_args__ = (
+        CheckConstraint(
+            'char_length("PointID") > 0',
+            name="ck_nma_stratigraphy_pointid_len",
+        ),
+    )

global_id: Mapped[uuid.UUID] = mapped_column(
"GlobalID", UUID(as_uuid=True), primary_key=True
)
well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True))
point_id: Mapped[str] = mapped_column("PointID", String(10), nullable=False)
point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False)
thing_id: Mapped[int] = mapped_column(
Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False
)

-    strat_top: Mapped[Optional[float]] = mapped_column("StratTop", Float)
-    strat_bottom: Mapped[Optional[float]] = mapped_column("StratBottom", Float)
-    unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(50))
-    lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(100))
+    strat_top: Mapped[int] = mapped_column("StratTop", SmallInteger, nullable=False)
+    strat_bottom: Mapped[int] = mapped_column(
+        "StratBottom", SmallInteger, nullable=False
+    )
+    unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(20))
+    lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(4))
    lithologic_modifier: Mapped[Optional[str]] = mapped_column(
-        "LithologicModifier", String(100)
+        "LithologicModifier", String(255)
    )
    contributing_unit: Mapped[Optional[str]] = mapped_column(
-        "ContributingUnit", String(10)
+        "ContributingUnit", String(2)
    )
-    strat_source: Mapped[Optional[str]] = mapped_column("StratSource", Text)
-    strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", Text)
+    strat_source: Mapped[Optional[str]] = mapped_column("StratSource", String(100))
+    strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", String(255))
object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True)

thing: Mapped["Thing"] = relationship("Thing", back_populates="stratigraphy_logs")
18 changes: 15 additions & 3 deletions transfers/waterlevels_transducer_transfer.py
@@ -18,6 +18,7 @@
import pandas as pd
from pandas import Timestamp
from pydantic import ValidationError
+from sqlalchemy import insert
from sqlalchemy.exc import DatabaseError
from sqlalchemy.orm import Session

@@ -42,6 +43,9 @@ def __init__(self, *args, **kw):
self.groundwater_parameter_id = get_groundwater_parameter_id()
self._itertuples_field_map = {}
self._df_columns = set()
+        self._observation_columns = {
+            column.key for column in TransducerObservation.__table__.columns
+        }
if self._sensor_types is None:
raise ValueError("_sensor_types must be set")
if self._partition_field is None:
@@ -134,7 +138,15 @@ def _transfer_hook(self, session: Session) -> None:
]

observations = [obs for obs in observations if obs is not None]
-        session.bulk_save_objects(observations)
+        if observations:
+            filtered_observations = [
+                {k: v for k, v in obs.items() if k in self._observation_columns}
+                for obs in observations
+            ]
+            session.execute(
+                insert(TransducerObservation),
+                filtered_observations,
+            )
session.add(block)
logger.info(
f"Added {len(observations)} water levels {release_status} block"
@@ -164,7 +176,7 @@ def _make_observation(
release_status: str,
deps_sorted: list,
nodeployments: dict,
-    ) -> TransducerObservation | None:
+    ) -> dict | None:
deployment = _find_deployment(row.DateMeasured, deps_sorted)

if deployment is None:
@@ -195,7 +207,7 @@
payload
).model_dump()
legacy_payload = self._legacy_payload(row)
-            return TransducerObservation(**obspayload, **legacy_payload)
+            return {**obspayload, **legacy_payload}

except ValidationError as e:
logger.critical(f"Observation validation error: {e.errors()}")