diff --git a/.github/workflows/CD_production.yml b/.github/workflows/CD_production.yml index 73863baf2..bc6847138 100644 --- a/.github/workflows/CD_production.yml +++ b/.github/workflows/CD_production.yml @@ -40,11 +40,6 @@ jobs: echo "runtime: python313" >> app.yaml echo "entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" >> app.yaml echo "instance_class: F4" >> app.yaml - echo "inbound_services:" >> app.yaml - echo " - warmup" >> app.yaml - echo "automatic_scaling:" >> app.yaml - echo " min_instances: 0" >> app.yaml - echo " max_instances: 10" >> app.yaml echo "" >> app.yaml echo "env_variables:" >> app.yaml echo " MODE: \"production\"" >> app.yaml diff --git a/.github/workflows/CD_staging.yml b/.github/workflows/CD_staging.yml index 7b5d7c2b9..e2d9e83a4 100644 --- a/.github/workflows/CD_staging.yml +++ b/.github/workflows/CD_staging.yml @@ -40,11 +40,6 @@ jobs: echo "runtime: python313" >> app.yaml echo "entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" >> app.yaml echo "instance_class: F4" >> app.yaml - echo "inbound_services:" >> app.yaml - echo " - warmup" >> app.yaml - echo "automatic_scaling:" >> app.yaml - echo " min_instances: 0" >> app.yaml - echo " max_instances: 10" >> app.yaml echo "" >> app.yaml echo "env_variables:" >> app.yaml echo " MODE: \"production\"" >> app.yaml diff --git a/.github/workflows/dev_deploy.yml b/.github/workflows/dev_deploy.yml deleted file mode 100644 index da47376f1..000000000 --- a/.github/workflows/dev_deploy.yml +++ /dev/null @@ -1,83 +0,0 @@ -name: CD (Development) - -on: - push: - branches: [dev] - -permissions: - contents: write - -jobs: - staging-deploy: - - runs-on: ubuntu-latest - environment: staging - - steps: - - name: Check out source repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: Install uv in container - uses: astral-sh/setup-uv@v6 - with: - version: "latest" - - - name: Generate requirements.txt - run: | - uv export -o requirements.txt - - - name: Authenticate to Google Cloud - uses: 'google-github-actions/auth@v2' - with: - credentials_json: ${{ secrets.CLOUD_DEPLOY_SERVICE_ACCOUNT_KEY }} - - # Uses Google Cloud Secret Manager to store secret credentials - - name: Create app.yaml - run: | - echo "service: dev-ocotillo-api" > app.yaml - echo "runtime: python313" >> app.yaml - echo "entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" >> app.yaml - echo "instance_class: F4" >> app.yaml - echo "inbound_services:" >> app.yaml - echo " - warmup" >> app.yaml - echo "automatic_scaling:" >> app.yaml - echo " min_instances: 0" >> app.yaml - echo " max_instances: 10" >> app.yaml - echo "" >> app.yaml - echo "env_variables:" >> app.yaml - echo " MODE: \"production\"" >> app.yaml - echo " DB_DRIVER: \"cloudsql\"" >> app.yaml - echo " CLOUD_SQL_INSTANCE_NAME: \"${{ secrets.CLOUD_SQL_INSTANCE_NAME }}\"" >> app.yaml - echo " CLOUD_SQL_DATABASE: \"${{ secrets.CLOUD_SQL_DATABASE }}\"" >> app.yaml - echo " CLOUD_SQL_USER: \"${{ secrets.CLOUD_SQL_USER }}\"" >> app.yaml - echo " CLOUD_SQL_PASSWORD: \"${{ secrets.CLOUD_SQL_PASSWORD }}\"" >> app.yaml - echo " GCS_SERVICE_ACCOUNT_KEY: \"${{ secrets.GCS_SERVICE_ACCOUNT_KEY }}\"" >> app.yaml - echo " GCS_BUCKET_NAME: \"${{secrets.GCS_BUCKET_NAME}}\"" >> app.yaml - echo " AUTHENTIK_URL: \"${{secrets.AUTHENTIK_URL}}\"" >> app.yaml - echo " AUTHENTIK_CLIENT_ID: \"${{secrets.AUTHENTIK_CLIENT_ID}}\"" >> app.yaml - echo " AUTHENTIK_AUTHORIZE_URL: \"${{secrets.AUTHENTIK_AUTHORIZE_URL}}\"" >> app.yaml - echo " AUTHENTIK_TOKEN_URL: \"${{secrets.AUTHENTIK_TOKEN_URL}}\"" >> app.yaml - - - - name: Deploy to Google Cloud - run: | - gcloud app deploy app.yaml --quiet --project ${{ secrets.GCP_PROJECT_ID }} - - # Clean up old versions - delete only the oldest version, one created and one destroyed - - name: Clean up oldest version - run: | - OLDEST_VERSION=$(gcloud app versions list --service=dev-ocotillo-api --project=${{ secrets.GCP_PROJECT_ID}} --format="value(id)" --sort-by="version.createTime" | head -n 1) - if [ ! -z "$OLDEST_VERSION" ]; then - echo "Deleting oldest version: $OLDEST_VERSION" - gcloud app versions delete $OLDEST_VERSION --service=dev-ocotillo-api --project=${{ secrets.GCP_PROJECT_ID }} --quiet - echo "Deleted oldest version: $OLDEST_VERSION" - else - echo "No versions to delete" - fi - - - name: Remove app.yaml - run: | - rm app.yaml - diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 34a7c677a..8e7ef3d18 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -5,7 +5,7 @@ name: Tests on: pull_request: - branches: [ "main",'pre-production', 'transfer'] + branches: ['production', 'staging', 'transfer'] permissions: contents: read diff --git a/api/geospatial.py b/api/geospatial.py index 4f25c2ff6..86951601d 100644 --- a/api/geospatial.py +++ b/api/geospatial.py @@ -14,7 +14,7 @@ # limitations under the License. # =============================================================================== import json -from typing import Annotated, List, Union +from typing import Annotated, List from fastapi import APIRouter, Query, HTTPException from fastapi.responses import FileResponse @@ -100,7 +100,9 @@ def get_feature_collection( things = get_thing_features(session, thing_type, group) - def make_feature_dict(thing, geometry, *other): + def make_feature_dict(thing, geometry, elevation, *other): + geometry = json.loads(geometry) + geometry["coordinates"].append(elevation) return { "type": "Feature", "properties": { @@ -109,7 +111,7 @@ def make_feature_dict(thing, geometry, *other): "name": thing.name, "group": group, }, - "geometry": json.loads(geometry), + "geometry": geometry, } features = [make_feature_dict(*item) for item in things] diff --git a/api/location.py b/api/location.py index 253663ab4..5e2b9abda 100644 --- a/api/location.py +++ b/api/location.py @@ -30,6 +30,7 @@ from services.geospatial_helper import make_within_wkt from services.query_helper import make_query, order_sort_filter, simple_get_by_id from services.crud_helper import model_patcher, model_deleter, model_adder +from services.location_helper import set_geographic_attributes from fastapi import APIRouter @@ -48,7 +49,9 @@ async def create_location( """ Create a new sample location in the database. """ - return model_adder(session, Location, location_data, user=user) + location = model_adder(session, Location, location_data, user=user) + set_geographic_attributes(session, location_data, location) + return location @router.patch( @@ -64,7 +67,9 @@ async def update_location( """ Update a sample location in the database. """ - return model_patcher(session, Location, location_id, location_data, user=user) + location = model_patcher(session, Location, location_id, location_data, user=user) + set_geographic_attributes(session, location_data, location) + return location # @router.get("/shapefile", summary="Get location as shapefile") diff --git a/constants.py b/constants.py index 2944f549f..93179ddb1 100644 --- a/constants.py +++ b/constants.py @@ -15,4 +15,5 @@ # =============================================================================== SRID_WGS84 = 4326 +SRID_UTM_ZONE_13N = 26913 # ============= EOF ============================================= diff --git a/db/base.py b/db/base.py index fe17e7905..31ec9ecd4 100644 --- a/db/base.py +++ b/db/base.py @@ -13,6 +13,29 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +""" +db/base.py + +This file defines the foundational components for the SQLAlchemy models. +It includes: +1. The declarative base class (`Base`) that all models will inherit from. +2. A helper function (`lexicon_term`) to create standardized foreign key columns + referencing the `lexicon_term` table. +3. A helper function (`pascal_to_snake`) to convert class names from PascalCase to snake_case + for automatic table naming. +4. Mixins for common functionality: + - `AutoBaseMixin`: Adds automatic table naming and an auto-incrementing primary key. + - `PropertiesMixin`: Adds a JSONB properties column for storing additional attributes. + - `ReleaseMixin`: Adds a release status column referencing the `lexicon_term` table. + - `AuditMixin`: Adds standard audit columns (created_at, created_by, updated_at, updated_by). +5. A simple `User` model for tracking user information in audit columns. +6. Polymorphic helper mixins (`StatusHistoryMixin`, `NotesMixin`, `AttributionMixin`, `PermissionMixin`.) + which provide a clean, reusable way to add relationships to the polymorphic + metadata tables. Any model that can have a status history (like Thing or Location) + can simply inherit from the `StatusHistoryMixin` mixin. +7. An `AuditMixin` to add standard audit columns to tables. +""" + from sqlalchemy import ( Column, DateTime, @@ -20,11 +43,15 @@ Integer, JSON, String, - Boolean, - Text, ForeignKey, ) -from sqlalchemy.orm import DeclarativeBase, declared_attr, Mapped, mapped_column +from sqlalchemy.orm import ( + DeclarativeBase, + declared_attr, + Mapped, + mapped_column, + relationship, +) from sqlalchemy_searchable import make_searchable from sqlalchemy_continuum import make_versioned import re @@ -41,6 +68,12 @@ class Base(DeclarativeBase): def lexicon_term(foreignkeykw=None, **kw): + """Create a SQLAlchemy mapped column for a self-referencing lexicon term. + + This helper function simplifies the creation of a string column that also + acts as a foreign key to the 'term' column of the 'lexicon_term' table. + It standardizes the column type to String(100) and sets the onupdate + behavior to "CASCADE".""" fkw = foreignkeykw if foreignkeykw else {} @@ -55,13 +88,18 @@ def pascal_to_snake(name): return re.sub(r"(? None: shp.field("id", "L") shp.field("name", "C") - for thing, point in things: + for thing, point, elevation in things: # Assume loc.point is WKT or a Shapely geometry or GeoJSON if isinstance(point, str): try: @@ -88,7 +85,7 @@ def create_shapefile(things: list, filename: str = "things.shp") -> None: geom = to_shape(point) shp.point(geom.x, geom.y) - shp.record(thing.id, thing.name) + shp.record(thing.id, thing.name, elevation) def make_within_wkt(sql: Select, wkt: str) -> Select: diff --git a/services/location_helper.py b/services/location_helper.py new file mode 100644 index 000000000..92eac854b --- /dev/null +++ b/services/location_helper.py @@ -0,0 +1,27 @@ +from shapely.wkt import loads +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from db.location import Location +from services.util import ( + get_state_from_point, + get_county_from_point, + get_quad_name_from_point, +) + + +def set_geographic_attributes( + session: Session, payload: BaseModel, location: Location +) -> None: + """ + Set geographic attributes for a location based off of the point. This function + is to be used for both POST and PATCH requests. + """ + if payload.point is not None: + point = loads(payload.point) + longitude = point.x + latitude = point.y + location.state = get_state_from_point(longitude, latitude) + location.county = get_county_from_point(longitude, latitude) + location.quad_name = get_quad_name_from_point(longitude, latitude) + session.commit() diff --git a/services/util.py b/services/util.py new file mode 100644 index 000000000..6d2d5ace5 --- /dev/null +++ b/services/util.py @@ -0,0 +1,110 @@ +from shapely.ops import transform +import pyproj +import httpx + +from constants import SRID_WGS84 + +TRANSFORMERS = {} + + +def transform_srid(geometry, source_srid, target_srid): + """ + geometry must be a shapely geometry object, like Point, Polygon, or MultiPolygon + """ + transformer_key = (source_srid, target_srid) + if transformer_key not in TRANSFORMERS: + source_crs = pyproj.CRS(f"EPSG:{source_srid}") + target_crs = pyproj.CRS(f"EPSG:{target_srid}") + transformer = pyproj.Transformer.from_crs( + source_crs, target_crs, always_xy=True + ) + TRANSFORMERS[transformer_key] = transformer + else: + transformer = TRANSFORMERS[transformer_key] + return transform(transformer.transform, geometry) + + +def get_tiger_data( + lon: float, lat: float, layer: int, outfields: str = "*" +) -> dict | None: + url = f"https://tigerweb.geo.census.gov/arcgis/rest/services/TIGERweb/State_County/MapServer/{layer}/query" + params = { + "f": "json", + "where": "1=1", + "geometry": f"{lon},{lat}", + "geometryType": "esriGeometryPoint", + "inSR": f"{SRID_WGS84}", + "spatialRel": "esriSpatialRelIntersects", + "outFields": outfields, + "returnGeometry": "false", + } + resp = httpx.get(url, params=params, timeout=30) + data = resp.json() + if not data.get("features"): + return None + + return data["features"][0]["attributes"] + + +def get_state_from_point(lon: float, lat: float) -> str: + attrs = get_tiger_data(lon, lat, layer=0, outfields="BASENAME") + return attrs["BASENAME"] + + +def get_county_from_point(lon: float, lat: float) -> str: + """ + Look up county for a given longitude/latitude + using the US Census TIGERWeb REST API. + """ + + attrs = get_tiger_data(lon, lat, layer=1, outfields="BASENAME") + return attrs["BASENAME"] + + +def get_quad_name_from_point(lon: float, lat: float) -> str: + url = "https://carto.nationalmap.gov/arcgis/rest/services/map_indices/MapServer/10/query" + params = { + "f": "json", + "geometry": f"{lon},{lat}", + "geometryType": "esriGeometryPoint", + "inSR": f"{SRID_WGS84}", + "spatialRel": "esriSpatialRelIntersects", + "outFields": "CELL_NAME,CELL_MAPCODE", + "returnGeometry": "false", + } + + resp = httpx.get(url, params=params, timeout=30) + data = resp.json() + + if data["features"]: + attrs = data["features"][0]["attributes"] + return attrs["CELL_NAME"] + else: + print(f"No quad name found for POINT ({lon} {lat})") + return None + + +def get_epqs_elevation_from_point(lon: float, lat: float) -> float: + url = "https://epqs.nationalmap.gov/v1/json" + params = { + "x": lon, + "y": lat, + "units": "Meters", + "wkid": f"{SRID_WGS84}", + "includeDate": False, + } + + resp = httpx.get(url, params=params) + data = resp.json() + + return data["value"] + + +if __name__ == "__main__": + x = -106.904107 + y = 34.068198 + + print(get_state_from_point(x, y)) + print(get_county_from_point(x, y)) + print(get_quad_name_from_point(x, y)) + print(get_epqs_elevation_from_point(x, y)) diff --git a/tests/conftest.py b/tests/conftest.py index 85092920e..2e14dbdc2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,17 +9,18 @@ def location(): with session_ctx() as session: loc = Location( - name="first location", + # name="first location", notes="these are some test notes", - point="POINT(0 0 0)", + point="POINT(-107.949533 33.809665)", + elevation=2464.9, release_status="draft", elevation_accuracy=100, elevation_method="Survey-grade GPS", coordinate_accuracy=50, coordinate_method="GPS, uncorrected", state="New Mexico", - county="Socorro", - quad_name="some NM quad", + county="Catron", + quad_name="Luera Mountains West", ) session.add(loc) session.commit() @@ -33,8 +34,9 @@ def location(): def second_location(): with session_ctx() as session: location = Location( - name="second location", - point="POINT (10.2 10.2 0)", + # name="second location", + point="POINT (10.2 10.2)", + elevation=0, release_status="draft", ) session.add(location) diff --git a/tests/test_collabnet.py b/tests/test_collabnet.py index 2a8ba0d7f..96712e081 100644 --- a/tests/test_collabnet.py +++ b/tests/test_collabnet.py @@ -13,17 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime - import pytest +from constants import SRID_WGS84 from db import Location -from db.engine import get_db_session, session_ctx +from db.engine import session_ctx -# from db import get_db_session, database_sessionmaker -# from db.base import Location -# from db.thing.well import WellThing -# from db.collabnet import CollaborativeNetworkWell from services.thing_helper import add_thing from tests import client @@ -31,7 +26,7 @@ @pytest.fixture(scope="function") def well(): with session_ctx() as session: - loc = Location(point="SRID=4326;POINT(0 0)") + loc = Location(point=f"SRID={SRID_WGS84};POINT(0 0)") session.add(loc) session.commit() diff --git a/tests/test_geospatial.py b/tests/test_geospatial.py index 5759a87d4..d8ff95e14 100644 --- a/tests/test_geospatial.py +++ b/tests/test_geospatial.py @@ -17,6 +17,7 @@ import pytest from main import app +from constants import SRID_WGS84 from core.dependencies import ( admin_function, editor_function, @@ -75,12 +76,14 @@ def populate(): session.commit() loc1 = Location( - name="Test Location 1", - point=geofunc.ST_GeomFromText("POINT(10.1 10.1 0)", srid=4326), + # name="Test Location 1", + point=geofunc.ST_GeomFromText("POINT(10.1 10.1)", srid=SRID_WGS84), + elevation=0, ) loc2 = Location( - name="Test Location 2", - point=geofunc.ST_GeomFromText("POINT(20 20 0)", srid=4326), + # name="Test Location 2", + point=geofunc.ST_GeomFromText("POINT(20 20)", srid=SRID_WGS84), + elevation=0, ) session.add(loc1) session.add(loc2) diff --git a/tests/test_location.py b/tests/test_location.py index a27ddf775..804db5632 100644 --- a/tests/test_location.py +++ b/tests/test_location.py @@ -42,9 +42,10 @@ def override_dependencies_fixture(): def test_add_location(): payload = { - "name": "test location", + # "name": "test location", "notes": "these are some test notes", - "point": "POINT Z (10.1 10.1 0)", + "point": "POINT (-106.607784 35.118924)", + "elevation": 1558.8, "release_status": "draft", "elevation_accuracy": 1.0, "elevation_method": "Survey-grade GPS", @@ -57,14 +58,18 @@ def test_add_location(): data = response.json() assert "id" in data assert "created_at" in data - assert data["name"] == payload["name"] + # assert data["name"] == payload["name"] assert data["notes"] == payload["notes"] assert data["point"] == payload["point"] + assert data["elevation"] == payload["elevation"] assert data["release_status"] == payload["release_status"] assert data["elevation_accuracy"] == payload["elevation_accuracy"] assert data["elevation_method"] == payload["elevation_method"] assert data["coordinate_accuracy"] == payload["coordinate_accuracy"] assert data["coordinate_method"] == payload["coordinate_method"] + assert data["state"] == "New Mexico" + assert data["county"] == "Bernalillo" + assert data["quad_name"] == "Albuquerque East" # cleanup after test cleanup_post_test(Location, data["id"]) @@ -75,9 +80,10 @@ def test_add_location(): def test_update_location(location): payload = { - "name": "patched name", + # "name": "patched name", "notes": "these are some patched notes", - "point": "POINT Z (10.1 20.2 0)", + "point": "POINT (-106.904107 34.068198)", + "elevation": 1408.3, "release_status": "draft", "elevation_accuracy": 2.0, "elevation_method": "Survey-grade GPS", @@ -88,16 +94,23 @@ def test_update_location(location): assert response.status_code == 200 data = response.json() assert data["id"] == location.id - assert data["name"] == payload["name"] + # assert data["name"] == payload["name"] assert data["notes"] == payload["notes"] assert data["point"] == payload["point"] + assert data["elevation"] == payload["elevation"] assert data["release_status"] == payload["release_status"] assert data["elevation_accuracy"] == payload["elevation_accuracy"] assert data["elevation_method"] == payload["elevation_method"] assert data["coordinate_accuracy"] == payload["coordinate_accuracy"] assert data["coordinate_method"] == payload["coordinate_method"] + assert data["state"] == "New Mexico" + assert data["county"] == "Socorro" + assert data["quad_name"] == "Socorro" # cleanup after test + payload["state"] = location.state + payload["county"] = location.county + payload["quad_name"] = location.quad_name cleanup_patch_test(Location, payload, location) @@ -106,9 +119,9 @@ def test_patch_location_404_not_found(location): Testing updating a location that does not exist """ bad_location_id = 99999 - location_name_patch = "another test name" + location_notes_patch = "patched notes" response = client.patch( - f"/location/{bad_location_id}", json={"name": location_name_patch} + f"/location/{bad_location_id}", json={"notes": location_notes_patch} ) data = response.json() assert response.status_code == 404 @@ -125,14 +138,18 @@ def test_get_locations(location): response = client.get("/location") assert response.status_code == 200 data = response.json() + from pprint import pprint + + pprint(data, indent=2) assert data["total"] == 1 assert data["items"][0]["id"] == location.id assert data["items"][0]["created_at"] == location.created_at.isoformat().replace( "+00:00", "Z" ) - assert data["items"][0]["name"] == location.name + # assert data["items"][0]["name"] == location.name assert data["items"][0]["notes"] == location.notes assert data["items"][0]["point"] == to_shape(location.point).wkt + assert data["items"][0]["elevation"] == location.elevation assert data["items"][0]["release_status"] == location.release_status assert data["items"][0]["elevation_accuracy"] == location.elevation_accuracy assert data["items"][0]["elevation_method"] == location.elevation_method @@ -149,8 +166,9 @@ def test_get_location_by_id(location): data = response.json() assert data["id"] == location.id assert data["created_at"] == location.created_at.isoformat().replace("+00:00", "Z") - assert data["name"] == location.name + # assert data["name"] == location.name assert data["point"] == to_shape(location.point).wkt + assert data["elevation"] == location.elevation assert data["release_status"] == location.release_status assert data["elevation_accuracy"] == location.elevation_accuracy assert data["elevation_method"] == location.elevation_method diff --git a/transfers/contact_transfer.py b/transfers/contact_transfer.py index 41a46d1eb..053f90ae3 100644 --- a/transfers/contact_transfer.py +++ b/transfers/contact_transfer.py @@ -15,7 +15,7 @@ # =============================================================================== import numpy as np import pandas as pd -from transfers.util import read_csv, filter_to_valid_point_ids, logger +from transfers.util import read_csv, filter_to_valid_point_ids, logger, replace_nans from db import Thing, Contact, ThingContactAssociation, Email, Phone, Address from schemas.contact import CreateContact, CreateAddress @@ -53,8 +53,8 @@ def transfer_contacts(session): odf = odf.join(ldf.set_index("OwnerKey"), on="OwnerKey") - odf = odf.replace(pd.NA, None) - odf = odf.replace({np.nan: None}) + odf = replace_nans(odf) + odf = filter_to_valid_point_ids(session, odf) for i, row in odf.iterrows(): thing = session.query(Thing).where(Thing.name == row.PointID).first() diff --git a/transfers/link_ids_transfer.py b/transfers/link_ids_transfer.py index fc1cd8a97..4748a7e46 100644 --- a/transfers/link_ids_transfer.py +++ b/transfers/link_ids_transfer.py @@ -14,6 +14,7 @@ # limitations under the License. # =============================================================================== import re + import pandas as pd from db import Thing, ThingIdLink @@ -22,6 +23,7 @@ logger, extract_organization, read_csv, + replace_nans, ) @@ -34,7 +36,9 @@ def transfer_link_ids_welldata(session): # RULE: exclude rows where both ids are null if pd.isna(row.OSEWellID) and pd.isna(row.OSEWelltagID): - logger.warning(f"Both OSEWellID and OSEWelltagID are null for row {i}") + logger.warning( + f"Both OSEWellID and OSEWelltagID are null for {row.PointID}" + ) continue thing = session.query(Thing).where(Thing.name == row.PointID).first() @@ -150,6 +154,7 @@ def transfer_link_ids(session, site_type="GW"): ldf = ldf[ldf["SiteType"] == site_type] ldf = ldf[ldf["Easting"].notna() & ldf["Northing"].notna()] # ldf = ldf[ldf["AlternateSiteID"].notna()] + ldf = replace_nans(ldf) ldf = filter_to_valid_point_ids(session, ldf) diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index 306df6533..39a688cf7 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -18,7 +18,7 @@ from db import LocationThingAssociation from services.thing_helper import add_thing -from transfers.util import make_location, read_csv, logger +from transfers.util import make_location, read_csv, logger, replace_nans def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -> None: @@ -26,20 +26,30 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - ldf = read_csv("Location") ldf = ldf[ldf["SiteType"] == site_type] ldf = ldf[ldf["Easting"].notna() & ldf["Northing"].notna()] + ldf = replace_nans(ldf) n = len(ldf) start_time = time.time() for i, row in enumerate(ldf.itertuples()): + pointid = row.PointID + if ldf[ldf["PointID"] == pointid].shape[0] > 1: + logger.warning(f"PointID {pointid} has duplicate records. Skipping.") + continue + if limit and i >= limit: logger.warning(f"Reached limit of {limit} rows. Stopping migration.") break - if i and not i % 100: + if i and not i % 25: logger.info( f"Processing row {i} of {n}. {row.PointID}, avg rows per second: {i / (time.time() - start_time):.2f}" ) session.commit() - location = make_location(row) + try: + location = make_location(row) + except Exception as e: + logger.error(f"Error creating location for {row.PointID}: {e}") + continue session.add(location) payload = make_payload(row) thing_type = payload.pop("thing_type") @@ -49,6 +59,7 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - assoc.location = location assoc.thing = spring session.add(assoc) + session.commit() def transfer_springs(session, limit=None): diff --git a/transfers/transfer.py b/transfers/transfer.py index efd71c103..5d5be3cbc 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import time + from dotenv import load_dotenv load_dotenv() @@ -38,21 +40,34 @@ def erase_and_initalize(session: Session) -> None: + logger.info("Erasing existing data and initializing lexicon and sensors") + starttime = time.time() Base.metadata.drop_all(session.bind) Base.metadata.create_all(session.bind) + elapsed_time = time.time() - starttime + logger.info(f"Done erasing existing data. {elapsed_time:0.2f}s") + logger.info("Initializing lexicon and sensors") + starttime = time.time() init_lexicon() + elapsed_time = time.time() - starttime + logger.info(f"Done initializing lexicon. {elapsed_time:0.2f}s") + + starttime = time.time() init_sensor(session) + elapsed_time = time.time() - starttime + logger.info(f"Done initializing sensors. {elapsed_time:0.2f}s") -def message(msg, pad=10): +def message(msg, pad=10, new_line_at_top=True): pad = "*" * pad - logger.info("") + if new_line_at_top: + logger.info("") logger.info(f"{pad} {msg} {pad}") def main_transfer(): - logger.info("Starting transfer") + message("STARTING TRANSFER", new_line_at_top=False) init = True @@ -69,7 +84,20 @@ def main_transfer(): cleanup_wells_flag = False - limit = 500 + transfer_well_flag = True + transfer_spring_flag = True + transfer_perennial_stream_flag = True + transfer_ephemeral_stream_flag = True + transfer_met_flag = True + transfer_contacts_flag = True + transfer_waterlevels_flag = True + transfer_link_ids_flag = True + transfer_assets_flag = True + transfer_groups_flag = True + + cleanup_wells_flag = True + + limit = 15 with session_ctx() as sess: if init: erase_and_initalize(sess) diff --git a/transfers/util.py b/transfers/util.py index 79fec1c2c..327c55391 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -13,21 +13,28 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime +from datetime import datetime, timezone, timedelta +import pytz import re import io import logging -import httpx -import pyproj from shapely import Point -from shapely.ops import transform + from sqlalchemy.orm import Session import pandas as pd +import numpy as np +from constants import SRID_WGS84, SRID_UTM_ZONE_13N from db import Thing, Location from services.gcs_helper import get_storage_bucket - +from services.util import ( + transform_srid, + get_epqs_elevation_from_point, + get_state_from_point, + get_county_from_point, + get_quad_name_from_point, +) import sys @@ -58,10 +65,16 @@ def flush(self): ) logger = logging.getLogger(__name__) +# workaround to not redirect httpx logging +logging.getLogger("httpx").setLevel(logging.WARNING) + # redirect stderr to the logger sys.stderr = StreamToLogger(logger, logging.ERROR) -TRANSFORMERS = {} + +def replace_nans(df: pd.DataFrame, default=None) -> pd.DataFrame: + df = df.replace(pd.NA, default) + return df.replace({np.nan: default}) def read_csv(name: str, dtype: dict | None = None) -> pd.DataFrame: @@ -75,23 +88,6 @@ def read_csv(name: str, dtype: dict | None = None) -> pd.DataFrame: return pd.read_csv(io.BytesIO(data)) -def transform_srid(geometry, source_srid, target_srid): - """ - geometry must be a shapely geometry object, like Point, Polygon, or MultiPolygon - """ - transformer_key = (source_srid, target_srid) - if transformer_key not in TRANSFORMERS: - source_crs = pyproj.CRS(f"EPSG:{source_srid}") - target_crs = pyproj.CRS(f"EPSG:{target_srid}") - transformer = pyproj.Transformer.from_crs( - source_crs, target_crs, always_xy=True - ) - TRANSFORMERS[transformer_key] = transformer - else: - transformer = TRANSFORMERS[transformer_key] - return transform(transformer.transform, geometry) - - def get_valid_point_ids(session, thing_type="water well"): things = get_valid_things(session, thing_type) valid_pointids = [thing.name for thing in things] @@ -132,143 +128,222 @@ def convert_to_wgs84_vertical_datum(row, z): return z -def get_state_from_point(lon: float, lat: float) -> str: - attrs = get_tiger_data(lon, lat, layer=0, outfields="BASENAME") - return attrs["BASENAME"] - - -def get_county_from_point(lon: float, lat: float) -> str: - """ - Look up county for a given longitude/latitude - using the US Census TIGERWeb REST API. - """ - - attrs = get_tiger_data(lon, lat, layer=1, outfields="BASENAME") - return attrs["BASENAME"] - - -def get_tiger_data( - lon: float, lat: float, layer: int, outfields: str = "*" -) -> dict | None: - url = f"https://tigerweb.geo.census.gov/arcgis/rest/services/TIGERweb/State_County/MapServer/{layer}/query" - params = { - "f": "json", - "where": "1=1", - "geometry": f"{lon},{lat}", - "geometryType": "esriGeometryPoint", - "inSR": "4326", - "spatialRel": "esriSpatialRelIntersects", - "outFields": outfields, - "returnGeometry": "false", - } - resp = httpx.get(url, params=params, timeout=15) - data = resp.json() - if not data.get("features"): - return None - - return data["features"][0]["attributes"] - - -def get_quad_name_from_point(lon: float, lat: float) -> str: - url = "https://carto.nationalmap.gov/arcgis/rest/services/map_indices/MapServer/10/query" - params = { - "f": "json", - "geometry": f"{lon},{lat}", - "geometryType": "esriGeometryPoint", - "inSR": "4326", - "spatialRel": "esriSpatialRelIntersects", - "outFields": "CELL_NAME,CELL_MAPCODE", - "returnGeometry": "false", - } - - resp = httpx.get(url, params=params, timeout=15) - logger.info(resp) - data = resp.json() - - if data["features"]: - attrs = data["features"][0]["attributes"] - return attrs["CELL_NAME"] +def convert_mt_to_utc(dt_record: datetime): + t = dt_record.time() + if t.hour == 0 and t.minute == 0: + # no time was measured, so just set the timezone to UTC and keep + # time at 00:00 + dt_record = dt_record.replace(tzinfo=timezone.utc) else: - logger.warning(f"No quad name found for POINT ({lon} {lat})") - - -def get_epqs_elevation(lon: float, lat: float) -> float: - url = "https://epqs.nationalmap.gov/v1/json" - params = { - "x": lon, - "y": lat, - "units": "Meters", - "wkid": "4326", - "includeDate": False, - } - - resp = httpx.get(url, params=params) - data = resp.json() - - return data["value"] + tz = pytz.timezone("America/Denver") + dt_record = tz.localize(dt_record) + if dt_record.dst() == timedelta(0): + # MST + utc_offset = 7 + else: + # MDT + utc_offset = 6 + dt_record = dt_record - timedelta(hours=utc_offset) + dt_record = dt_record.replace(tzinfo=timezone.utc) + return dt_record def make_location(row: pd.Series) -> Location: + point = Point(row.Easting, row.Northing) - # TODO: should the altitude be fetched from USGS' - # Elevation Point Query Service https://epqs.nationalmap.gov/v1/docs - xypoint = transform_srid( - Point(row.Easting, row.Northing), - source_srid=26913, - target_srid=4326, # WGS84 SRID + # Convert the point to a WGS84 coordinate system + transformed_point = transform_srid( + point, source_srid=SRID_UTM_ZONE_13N, target_srid=SRID_WGS84 ) - z = 0 + state = get_state_from_point(transformed_point.x, transformed_point.y) + county = get_county_from_point(transformed_point.x, transformed_point.y) + quad_name = get_quad_name_from_point(transformed_point.x, transformed_point.y) - # idx = row.index - # idx = df.index.get_loc(row.name) - # print('asdfa', idx, row.name) - # if not z: - # z = get_epqs_elevation(xypoint.x, xypoint.y) + z = row.Altitude + if z: + elevation_from_epqs = False + z = z * 0.3048 + else: + elevation_from_epqs = True + logger.info( + f"Location {row.PointID} has no Altitude. Setting from National Map EPQS for " + ) + z = get_epqs_elevation_from_point(transformed_point.x, transformed_point.y) - # z = row.Altitude if row.Altitude else 0 - # convert z from ft to meters - z = z * 0.3048 + if elevation_from_epqs: + elevation_method = "USGS National Elevation Dataset (NED)" + elif not (pd.isna(row.AltitudeMethod)): + elevation_method = lu_to_lexicon_map[f"LU_AltitudeMethod:{row.AltitudeMethod}"] + else: + elevation_method = None - point = Point(row.Easting, row.Northing, z) + if not (pd.isna(row.CoordinateMethod)): + coordinate_method = lu_to_lexicon_map[ + f"LU_CoordinateMethod:{row.CoordinateMethod}" + ] + else: + coordinate_method = None - # Convert the point to a WGS84 coordinate system - transformed_point = transform_srid( - point, source_srid=26913, target_srid=4326 # WGS84 SRID - ) + """ + Developer's notes + + AMP folks said that the earlier date between DateCreated and SiteDate is when + the site was inventoried, whereas the later is when the record was made in + the database. This was because they were used interchangeably. + """ + if row.DateCreated and row.SiteDate: - # TODO: Add tests for these functions. move to a different location - # use in Location API + date_created = datetime.strptime(row.DateCreated, "%Y-%m-%d %H:%M:%S.%f") + site_date = datetime.strptime(row.SiteDate, "%Y-%m-%d %H:%M:%S.%f") - # TODO: determine correct created_at value - # created_at = row.DateCreated - name = row.PointID + if date_created > site_date: + created_at = date_created + else: + created_at = site_date + elif row.DateCreated and not row.SiteDate: + created_at = datetime.strptime(row.DateCreated, "%Y-%m-%d %H:%M:%S.%f") + else: + # TODO: should this be set to SiteDate if DateCreated is None and SiteDate is populated? + created_at = None + + # convert created_at from MST/MDT to UTC + if created_at is not None: + created_at = convert_mt_to_utc(created_at) + + # TODO: AMP feedback is required for transfering coordinate accuracy values + # from NM_Aquifer to Ocotillo + # if row.CoordinateAccuracy == "U" or pd.isna(row.CoordinateAccuracy): + # # map "Unknown" to None + # row.CoordinateAccuracy = None + # elif row.CoordinateAccuracy == "5m": + # row.CoordinateAccuracy = 5.0 + # else: + # seconds = 0 + # minutes = 0 + # if row.CoordinateAccuracy == "1": + # seconds = 0.1 + # elif row.CoordinateAccuracy == "5": + # seconds = 0.5 + # elif row.CoordinateAccuracy == "F": + # seconds = 5 + # elif row.CoordinateAccuracy == "H": + # seconds = 0.01 + # elif row.CoordinateAccuracy == "M": + # minutes = 1 + # elif row.CoordinateAccuracy == "R": + # seconds = 3 + # elif row.CoordinateAccuracy == "S": + # seconds = 1 + # else: + # seconds = 10 + # coordinate_accuracy_decimal_deg = minutes/60 + seconds / 3600 + + # """ + # Developer's notes + + # To convert accuracy from decimal degrees to meters we do the following: + + # 1. Add the coordinate accuracy to both the latitude and longitude to + # find the "+" distance from the location + # 2. Convert "+" accuracy coordinates from decimal degrees to UTM Zone 13 + # N + # 3. Find the distance in meters from the original Easting/Northing and + # define this as the "+" accuracy in meters + # 4. Subtract the coordinate accuracy to both the latitude and longitude + # to find the "-" distance from the location + # 5. Convert the "-" accuracy coordinates from decimal degrees to UTM Zone + # 13 N + # 6. Find the distance in meters from the original Easting/Northing and + # define this as the "-" accuracy in meters + # 7. Set the coordinate accuracy in meters as the mean of the "+" and "-" + # distances from the location + # """ + # original_longitude = transformed_point.x + # original_latitude = transformed_point.y + + # plus_longitude = original_longitude + coordinate_accuracy_decimal_deg + # plus_latitude = original_latitude + coordinate_accuracy_decimal_deg + # plus_point_decimal_deg = Point(plus_longitude, plus_latitude) + # plus_point_utm_zone_13_n = transform_srid( + # plus_point_decimal_deg, + # SRID_WGS84, + # SRID_UTM_ZONE_13N) + + # minus_longitude = original_longitude - coordinate_accuracy_decimal_deg + # minus_latitude = original_latitude - coordinate_accuracy_decimal_deg + # minus_point_decimal_deg = Point(minus_longitude, minus_latitude) location = Location( nma_pk_location=row.LocationId, - # TODO: determine if PointID should map to location.name or thing.name or if the Location table needs a name field at all. - name=row.PointID, - point=transformed_point.wkt, + # name=row.PointID, + point=point.wkt, + elevation=z, release_status="public" if row.PublicRelease else "private", elevation_accuracy=row.AltitudeAccuracy, - elevation_method=row.AltitudeMethod, - # created_at=created_at, - coordinate_accuracy=row.CoordinateAccuracy, - coordinate_method=row.CoordinateMethod, + elevation_method=elevation_method, + created_at=created_at, + # TODO: get AMP feedback on transfering these values. See above note + # coordinate_accuracy=row.CoordinateAccuracy, + coordinate_method=coordinate_method, nma_coordinate_notes=row.CoordinateNotes, nma_notes_location=row.LocationNotes, + state=state, + county=county, + quad_name=quad_name, ) return location +def make_lu_to_lexicon_mapper(): + lu_tables = [ + # "LU_AltitudeDatum", # the code is the value, so no need for mapping + "LU_AltitudeMethod", # CODE/MEANING + "LU_CollectionMethod", # CODE/MEANING + "LU_ConstructionMethod", # CODE/MEANING + "LU_CoordinateAccuracy", # CODE/MEANING + # "LU_CoordinateDatum", # the code is the value, so no need for mapping + "LU_CoordinateMethod", # CODE/MEANING + "LU_CurrentUse", # CODE/MEANING + "LU_DataQuality", # CODE/MEANING + "LU_DataSource", # CODE/MEANING + "LU_Depth_CompletionSource", # CODE/MEANING + "LU_Discharge_ChemistrySource", # CODE/MEANING + # "LU_FieldNoteTypes", # not being used in the transfers since there are no records + # "LU_Formations", # needs to be cleaned before it can be used + "LU_LevelStatus", # CODE/MEANING + # "LU_Lithology", # needs to be cleaned before it can be used + "LU_MajorAnalyte", # CODE/MEANING + "LU_MeasurementMethod", # CODE/MEANING + # "LU_MeasuringAgency", # the abreviation is what is used in the new schema + "LU_MinorTraceAnalyte", # CODE/MEANING + "LU_MonitoringStatus", # CODE/MEANING + "LU_SampleType", # CODE/MEANING + "LU_SiteType", # CODE/MEANING + "LU_Status", # CODE/MEANING + ] + + mappers = {} + + for lu_table in lu_tables: + table = read_csv(lu_table) + + for i, row in table.iterrows(): + if lu_table == "LU_Formations": + code = row.Code + meaning = row.Meaning + else: + code = row.CODE + meaning = row.MEANING + + mappers.update({f"{lu_table}:{code}": meaning}) + return mappers + + +lu_to_lexicon_map = make_lu_to_lexicon_mapper() + + if __name__ == "__main__": - # quad = get_quad_name_from_point(-106.5, 34.2) - # print(quad) - # state = get_state_from_point(-106.5, 34.2) - # print(state) - # county = get_county_from_point(-106.5, 34.2) - # print(county) - z = get_epqs_elevation(-106.5, 34.2) - print(z) + print(lu_to_lexicon_map) # ============= EOF ============================================= diff --git a/transfers/waterlevels_transfer.py b/transfers/waterlevels_transfer.py index 5dfe41191..53eff8d0c 100644 --- a/transfers/waterlevels_transfer.py +++ b/transfers/waterlevels_transfer.py @@ -13,13 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import time import uuid from datetime import datetime import pandas as pd from db import Thing, Sample, Observation -from transfers.util import filter_to_valid_point_ids, logger, read_csv +from transfers.util import ( + filter_to_valid_point_ids, + logger, + read_csv, + convert_mt_to_utc, +) def transfer_water_levels(session): @@ -28,13 +34,29 @@ def transfer_water_levels(session): wd = filter_to_valid_point_ids(session, wd) gwd = wd.groupby(["PointID"]) + start_time = time.time() for index, group in gwd: - for row in group.itertuples(): + logger.info(f"Processing PointID: {index[0]}") + n = len(group) + for i, row in enumerate(group.itertuples()): + if i and not i % 25: + logger.info( + f"Processing row {i} of {n}. {row.PointID}, avg rows per second: {i / (time.time() - start_time):.2f}" + ) + session.commit() + if pd.isna(row.DepthToWater) or pd.isna(row.DateMeasured): logger.warning(f"Skipping row {row.Index} due to missing data.") continue - dt = datetime.fromisoformat(row.DateMeasured) + if not pd.isna(row.TimeMeasured): + dt_measured = f"{row.DateMeasured} {row.TimeMeasured}" + else: + dt_measured = f"{row.DateMeasured} 12:00:00 AM" + + dt = datetime.strptime(dt_measured, "%Y-%m-%d %I:%M:%S %p") + dt_utc = convert_mt_to_utc(dt) + thing = session.query(Thing).where(Thing.name == row.PointID).first() if thing is None: logger.warning( @@ -47,7 +69,7 @@ def transfer_water_levels(session): sample.sample_type = "groundwater level" sample.field_sample_id = str(uuid.uuid4()) - sample.sample_date = dt + sample.sample_date = dt_utc sample.thing = thing session.add(sample) @@ -67,7 +89,7 @@ def transfer_water_levels(session): obs.unit = "ft" session.add(obs) - session.commit() + session.commit() # ============= EOF ============================================= diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 19d726bdf..bd70cb00c 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -14,35 +14,38 @@ # limitations under the License. # =============================================================================== import time -import numpy as np -import pandas as pd from pydantic import ValidationError from sqlalchemy import select from db import LocationThingAssociation, Thing, WellScreen, Location from schemas.thing import CreateWellScreen from services.thing_helper import add_thing +from services.util import ( + get_state_from_point, + get_county_from_point, + get_quad_name_from_point, +) from transfers.util import ( make_location, filter_to_valid_point_ids, read_csv, - get_state_from_point, - get_county_from_point, - get_quad_name_from_point, logger, + replace_nans, ) ADDED = [] -def transfer_wells(session, start_index=0, limit=0): +def transfer_wells(session, limit=0): wdf = read_csv("WellData", dtype={"OSEWelltagID": str}) ldf = read_csv("Location") ldf = ldf.drop(["PointID", "SSMA_TimeStamp"], axis=1) wdf = wdf.join(ldf.set_index("LocationId"), on="LocationId") wdf = wdf[wdf["SiteType"] == "GW"] wdf = wdf[wdf["Easting"].notna() & wdf["Northing"].notna()] - wdf = wdf.iloc[start_index : start_index + limit] + + wdf = replace_nans(wdf) + n = len(wdf) start_time = time.time() results = { @@ -50,6 +53,11 @@ def transfer_wells(session, start_index=0, limit=0): } made_things = [] for i, row in enumerate(wdf.itertuples()): + pointid = row.PointID + if wdf[wdf["PointID"] == pointid].shape[0] > 1: + logger.warning(f"PointID {pointid} has duplicate records. Skipping.") + continue + if limit and i >= limit: logger.warning("Reached limit of %d rows. Stopping migration.", limit) break @@ -63,8 +71,8 @@ def transfer_wells(session, start_index=0, limit=0): try: location = make_location(row) except Exception as e: - logger.warning(f"Error making location for row {i}: {e}") - break + logger.warning(f"Error making location for {row.PointID}: {e}") + continue # print(location_row) session.add(location) @@ -109,13 +117,13 @@ def transfer_wells(session, start_index=0, limit=0): made_things.append(row.PointID) results["made_things"] = made_things + session.commit() return results def transfer_wellscreens(session, limit=None): wdf = read_csv("WellScreens") - wdf = wdf.replace(pd.NA, None) - wdf = wdf.replace({np.nan: None}) + wdf = replace_nans(wdf) wdf = filter_to_valid_point_ids(session, wdf) @@ -158,13 +166,14 @@ def transfer_wellscreens(session, limit=None): CreateWellScreen.model_validate(well_screen_data) well_screen = WellScreen(**well_screen_data) session.add(well_screen) - session.commit() except ValidationError as e: logger.warning( f"Validation error for row {i} with PointID {row.PointID}: {e}" ) continue + session.commit() + def cleanup_wells(session): locations = session.query(Location).all()