diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..9d866e3 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file + +version: 2 +updates: + - package-ecosystem: "pip" # See documentation for possible values + directory: "/" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 5ea32ac..05fe71e 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -5,9 +5,9 @@ name: CI/CD on: push: - branches: [ "main", "feature/jir", "dev/jab"] + branches: [ "pre-production", "feature/jir", "dev/jab"] pull_request: - branches: [ "main"] + branches: [ "pre-production"] permissions: contents: read @@ -18,26 +18,39 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + - name: Set up Python 3.10 - uses: actions/setup-python@v3 + uses: actions/setup-python@v4 with: python-version: "3.10" cache: "pip" + + - name: Cache pip dependencies + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + - name: Install dependencies run: | python -m pip install --upgrade pip pip install flake8 pytest mypy if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Type check with mypy run: | mypy . --ignore-missing-imports + - name: Test with pytest run: | pytest -s -rx tests diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml index 9c64e4b..699394f 100644 --- a/.github/workflows/publish-to-pypi.yml +++ b/.github/workflows/publish-to-pypi.yml @@ -3,10 +3,12 @@ name: Publish Python 🐍 distributions 📦 to PyPI and TestPyPI # pushes only occur on successful pull request merges # this makes it so that the main branch gets published to PyPi, not the # target branch from the pull request (pre-production) + +# pushing tags should be the only way to trigger this workflow on: push: - branches: - - main + tags: + - '*' jobs: build-and-publish-if-merged: @@ -20,7 +22,7 @@ jobs: uses: actions/setup-python@v1 with: python-version: 3.9 - cache: 'pip' + - name: Install pypa/build run: >- python -m diff --git a/CHANGELOG.md b/CHANGELOG.md index 80bf398..c92224a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,24 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased: 0.8.0 +## 0.9.2 + +### Added +- `--sites-only` flag to only retrieve site data +- `--output-format` flag to write out sites/summary tables as csv or geojson. 
+ - options are `csv` or `geojson` + - timeseries data is always written to a csv +- NM OSE POD data for sites. + - can be removed from output with `--no-nmose-pod` +- `--output-dir` to change the output directory to a location other than `.` (the current working directory) + +### Changed +- `output` to `output-type` for CLI + +### Fixed +- a bug with `--site-limit`. it now exports the number of sites requested by the user. + +## 0.8.0 ### Added - water level for WQP diff --git a/README.md b/README.md index 4e43866..0cb15ac 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Format code](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/format_code.yml/badge.svg?branch=main)](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/format_code.yml) [![Publish Python 🐍 distributions 📦 to PyPI and TestPyPI](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/publish-to-pypi.yml/badge.svg)](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/publish-to-pypi.yml) [![CI/CD](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/cicd.yml/badge.svg)](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/cicd.yml) - +[![Dependabot Updates](https://github.com/DataIntegrationGroup/DataIntegrationEngine/actions/workflows/dependabot/dependabot-updates/badge.svg)](https://github.com/DataIntegrationGroup/DataIntegrationEngine/actions/workflows/dependabot/dependabot-updates) ![NMWDI](https://newmexicowaterdata.org/wp-content/uploads/2023/11/newmexicowaterdatalogoNov2023.png) ![NMBGMR](https://waterdata.nmt.edu/static/nmbgmr_logo_resized.png) @@ -71,22 +71,22 @@ where `{parameter}` is the name of the parameter whose data is to be retrieved, | **pvacd** | X | - | - | - | - | - | - | - | - | - | - | - | - | - | - | - | | **wqp** | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | -### Output -The `--output` option is required and used to set the output type: +### Output Type +The `--output-type` option is required and used to set the output type: ``` ---output summary +--output-type summary ``` - A summary table consisting of location information as well as summary statistics for the parameter of interest for every location that has observations. ``` ---output timeseries_unified +--output-type timeseries_unified ``` - A single table consisting of time series data for all locations for the parameter of interest. - A single table of site data that contains information such as latitude, longitude, and elevation ``` ---output timeseries_separated +--output-type timeseries_separated ``` - Separate time series tables for all locations for the parameter of interest. - A single table of site data that contains information such as latitude, longitude, and elevation @@ -146,6 +146,7 @@ A log of the inputs and processes, called `die.log`, is also saved to the output | formation | geologic formation in which the well terminates | string | N | | aquifer | aquifer from which the well draws water | string | N | | well_depth | depth of well | float | N | +| well_depth_units | units of well depth. 
Defaults to ft | string | N | **CABQ elevation is calculated as [elevation at top of casing] - [stickup height]; if stickup height < 0 the measuring point is assumed to be beneath the ground surface @@ -180,7 +181,7 @@ The Data Integration Engine enables the user to obtain groundwater level and gro - `--no-pvacd` to exclude Pecos Valley Artesian Convservancy District (PVACD) data - `--no-wqp` to exclude Water Quality Portal (WQP) data -### Geographic Filters +### Geographic Filters [In Development] The following flags can be used to geographically filter data: @@ -192,7 +193,11 @@ The following flags can be used to geographically filter data: -- bbox 'x1 y1, x2 y2' ``` -### Date Filters +``` +-- wkt {wkt polygon or multipolygon} +``` + +### Date Filters [In Development] The following flags can be used to filter by dates: @@ -214,12 +219,12 @@ die sources {parameter} to print the sources that report that parameter to the terminal. -### Wells [In Development] +### Sites Use ``` -die wells +die sites ``` -to print wells to the terminal. \ No newline at end of file +to export site information only \ No newline at end of file diff --git a/backend/__init__.py b/backend/__init__.py index 2034a38..804491c 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -1,7 +1,16 @@ from enum import Enum +from os import environ class OutputFormat(str, Enum): GEOJSON = "geojson" CSV = "csv" - GEOSERVER = "geoserver" \ No newline at end of file + GEOSERVER = "geoserver" + + +def get_bool_env_variable(var: str) -> bool: + env_var = environ.get(var, None) + if env_var is None or env_var.strip().lower() not in ["true", "1", "yes"]: + return False + else: + return True diff --git a/backend/config.py b/backend/config.py index 96d379d..86ef36a 100644 --- a/backend/config.py +++ b/backend/config.py @@ -18,6 +18,8 @@ from datetime import datetime, timedelta from enum import Enum import shapely.wkt +import yaml + from . 
import OutputFormat from .bounding_polygons import get_county_polygon from .connectors.nmbgmr.source import ( @@ -68,7 +70,8 @@ ) from .connectors.usgs.source import NWISSiteSource, NWISWaterLevelSource from .connectors.wqp.source import WQPSiteSource, WQPAnalyteSource, WQPWaterLevelSource -from .logger import Loggable +from backend.logger import Loggable + SOURCE_DICT = { "bernco": BernCoSiteSource, @@ -98,9 +101,6 @@ def get_source(source): return klass() - - - class Config(Loggable): site_limit: int = 0 dry: bool = False @@ -149,18 +149,14 @@ class Config(Loggable): analyte_output_units: str = MILLIGRAMS_PER_LITER waterlevel_output_units: str = FEET - # use_csv: bool = True - # use_geojson: bool = False - - output_format: OutputFormat = OutputFormat.CSV + output_format: str = OutputFormat.CSV - yes: bool = True + yes: bool = False def __init__(self, model=None, payload=None, path=None): # need to initialize logger super().__init__() - self.bbox = {} if path: payload = self._load_from_yaml(path) @@ -186,24 +182,27 @@ def __init__(self, model=None, payload=None, path=None): if value is not None: setattr(self, f"use_source_{sk}", value) - for attr in ("wkt", "county", "bbox", - "output_summary", - "output_timeseries_unified", - "output_timeseries_separated", - "start_date", - "end_date", - "parameter", - "output_name", - "dry", - "latest_water_level_only", - "output_format", - "use_cloud_storage", - "yes"): + for attr in ( + "wkt", + "county", + "bbox", + "output_summary", + "output_timeseries_unified", + "output_timeseries_separated", + "start_date", + "end_date", + "parameter", + "output_name", + "dry", + "latest_water_level_only", + "output_format", + "use_cloud_storage", + "yes", + ): if attr in payload: setattr(self, attr, payload[attr]) def _load_from_yaml(self, path): - import yaml path = os.path.abspath(path) if os.path.exists(path): self.log(f"Loading config from {path}") @@ -221,7 +220,6 @@ def get_config_and_false_agencies(self): "ebid", "nmbgmr_amp", "nmose_isc_seven_rivers", - "nmose_pod", "nmose_roswell", "nwis", "pvacd", @@ -450,7 +448,7 @@ def _report_attributes(title, attrs): "output_horizontal_datum", "output_elevation_units", "use_cloud_storage", - "output_format" + "output_format", ), ) @@ -573,4 +571,6 @@ def output_path(self): def get(self, attr): if self._payload: return self._payload.get(attr) + + # ============= EOF ============================================= diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 2ec5d10..d01cd11 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -15,8 +15,7 @@ # =============================================================================== import os -import httpx - +from backend import get_bool_env_variable from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.connectors.nmbgmr.transformer import ( NMBGMRSiteTransformer, @@ -48,13 +47,15 @@ def _make_url(endpoint): if os.getenv("DEBUG") == "1": - return f"http://localhost:8000/latest/{endpoint}" - return f"https://waterdata.nmt.edu/latest/{endpoint}" + url = f"http://localhost:8000/latest/{endpoint}" + else: + url = f"https://waterdata.nmt.edu/latest/{endpoint}" + return url class NMBGMRSiteSource(BaseSiteSource): transformer_klass = NMBGMRSiteTransformer - chunk_size = 10 + chunk_size = 100 bounding_polygon = NM_STATE_BOUNDING_POLYGON def __repr__(self): @@ -87,18 +88,23 @@ def get_records(self): ) if not config.sites_only: for site in sites: - # print(f"Obtaining well data for 
{site['properties']['point_id']}") - # well_data = self._execute_json_request( - # _make_url("wells"), - # params={"pointid": site["properties"]["point_id"]}, - # tag="", - # ) - # site["properties"]["formation"] = well_data["formation"] - # site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] - # site["properties"]["well_depth_units"] = FEET - site["properties"]["formation"] = None - site["properties"]["well_depth"] = None - site["properties"]["well_depth_units"] = FEET + if get_bool_env_variable("IS_TESTING_ENV"): + print( + f"Skipping well data for {site['properties']['point_id']} for testing (until well data can be retrieved in batches)" + ) + site["properties"]["formation"] = None + site["properties"]["well_depth"] = None + site["properties"]["well_depth_units"] = FEET + else: + print(f"Obtaining well data for {site['properties']['point_id']}") + well_data = self._execute_json_request( + _make_url("wells"), + params={"pointid": site["properties"]["point_id"]}, + tag="", + ) + site["properties"]["formation"] = well_data["formation"] + site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] + site["properties"]["well_depth_units"] = FEET return sites @@ -170,7 +176,11 @@ def __repr__(self): def _clean_records(self, records): # remove records with no depth to water value - return [r for r in records if r["DepthToWaterBGS"] is not None] + return [ + r + for r in records + if r["DepthToWaterBGS"] is not None and r["DateMeasured"] is not None + ] def _extract_parameter_record(self, record, *args, **kw): record[PARAMETER_NAME] = DTW @@ -197,7 +207,7 @@ def _extract_source_parameter_results(self, records): return [r["DepthToWaterBGS"] for r in records] def _extract_site_records(self, records, site_record): - return [ri for ri in records if ri["Well"]["PointID"] == site_record.id] + return [ri for ri in records if ri["PointID"] == site_record.id] def _extract_source_parameter_names(self, records): return ["DepthToWaterBGS" for r in records] @@ -214,7 +224,19 @@ def get_records(self, site_record): # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") - return self._execute_json_request(url, params) + paginated_records = self._execute_json_request(url, params, tag="") + items = paginated_records["items"] + page = paginated_records["page"] + pages = paginated_records["pages"] + + while page < pages: + page += 1 + params["page"] = page + new_records = self._execute_json_request(url, params, tag="") + items.extend(new_records["items"]) + pages = new_records["pages"] + + return items # ============= EOF ============================================= diff --git a/backend/connectors/nmose/source.py b/backend/connectors/nmose/source.py index ad180ce..5def1bf 100644 --- a/backend/connectors/nmose/source.py +++ b/backend/connectors/nmose/source.py @@ -41,9 +41,11 @@ def get_records(self, *args, **kw) -> List[Dict]: "https://services2.arcgis.com/qXZbWTdPDbTjl7Dy/arcgis/rest/services/OSE_PODs/FeatureServer/0/query" ) - params['where'] = "pod_status = 'ACT' AND pod_basin NOT IN ('SP', 'SD', 'LWD')" - params["outFields"] = ("OBJECTID,pod_basin,pod_status,easting,northing,datum,utm_accura,status,county" - "pod_name,pod_nbr,pod_suffix,pod_file,depth_well,aquifer,elevation") + params["where"] = "pod_status = 'ACT' AND pod_basin NOT IN ('SP', 'SD', 'LWD')" + params["outFields"] = ( + "OBJECTID,pod_basin,pod_status,easting,northing,datum,utm_accura,status,county" + "pod_name,pod_nbr,pod_suffix,pod_file,depth_well,aquifer,elevation" + ) params["outSR"] = 4326 
params["f"] = "json" diff --git a/backend/connectors/nmose/transformer.py b/backend/connectors/nmose/transformer.py index e3a2b64..8f26ebb 100644 --- a/backend/connectors/nmose/transformer.py +++ b/backend/connectors/nmose/transformer.py @@ -24,7 +24,7 @@ def _transform(self, record) -> dict: # "name": record["station_nm"], "latitude": geometry["y"], "longitude": geometry["x"], - "elevation": properties['elevation'], + "elevation": properties["elevation"], "elevation_units": "ft", # "horizontal_datum": datum, # "vertical_datum": record["alt_datum_cd"], diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 25e4e87..cac4f2a 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -76,11 +76,12 @@ def parse_json(data): for location in data["timeSeries"]: site_code = location["sourceInfo"]["siteCode"][0]["value"] + agency = location["sourceInfo"]["siteCode"][0]["agencyCode"] source_parameter_name = location["variable"]["variableName"] source_parameter_units = location["variable"]["unit"]["unitCode"] for value in location["values"][0]["value"]: record = { - "site_code": site_code, + "site_id": f"{agency}-{site_code}", "source_parameter_name": source_parameter_name, "value": value["value"], "datetime_measured": value["dateTime"], @@ -150,12 +151,16 @@ def __repr__(self): return "NWISWaterLevelSource" def get_records(self, site_record): + # query sites with the agency, which need to be in the form of "{agency}:{site number}" + sites = make_site_list(site_record) + sites_with_colons = [s.replace("-", ":") for s in sites] + params = { "format": "json", "siteType": "GW", "siteStatus": "all", "parameterCd": "72019", - "sites": ",".join(make_site_list(site_record)), + "sites": ",".join(sites_with_colons), } config = self.config @@ -178,7 +183,7 @@ def get_records(self, site_record): return records def _extract_site_records(self, records, site_record): - return [ri for ri in records if ri["site_code"] == site_record.id] + return [ri for ri in records if ri["site_id"] == site_record.id] def _clean_records(self, records): return [ diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 1f61cf5..379b8bd 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -32,9 +32,13 @@ def _transform(self, record): # if not self.contained(lng, lat): # return + agency = record["agency_cd"] + site_no = record["site_no"] + site_id = f"{agency}-{site_no}" + rec = { "source": "USGS-NWIS", - "id": record["site_no"], + "id": site_id, "name": record["station_nm"], "latitude": lat, "longitude": lng, diff --git a/backend/persister.py b/backend/persister.py index d20e1ed..bf3d11c 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -17,10 +17,13 @@ import io import os import shutil +from pprint import pprint +import json import pandas as pd import geopandas as gpd import psycopg2 +from shapely import Point from backend import OutputFormat from backend.logger import Loggable @@ -32,15 +35,70 @@ print("google cloud storage not available") +def write_memory(func, records, output_format=None): + f = io.BytesIO() + func(f, records, output_format) + return f.getvalue() + + +def dump_timeseries(path, timeseries: list[list]): + """ + Dumps timeseries records to a CSV file. The timeseries must be a list of + lists, where each inner list contains the records for a single site. 
In the case + of timeseries separated, the inner list will contain the records for a single site + and this function will be called multiple times, once for each site. + """ + with open(path, "w", newline="") as f: + writer = csv.writer(f) + headers_have_not_been_written = True + for i, records in enumerate(timeseries): + for record in records: + if i == 0 and headers_have_not_been_written: + writer.writerow(record.keys) + headers_have_not_been_written = False + writer.writerow(record.to_row()) + + +def dump_sites_summary(path, records, output_format: OutputFormat): + if output_format == OutputFormat.CSV: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + for i, site in enumerate(records): + if i == 0: + writer.writerow(site.keys) + writer.writerow(site.to_row()) + else: + features = [ + { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [ + getattr(record, "longitude"), + getattr(record, "latitude"), + getattr(record, "elevation"), + ], + }, + "properties": { + k: getattr(record, k) + for k in record.keys + if k not in ["latitude", "longitude", "elevation"] + }, + } + for record in records + ] + feature_collection = {"type": "FeatureCollection", "features": features} + + with open(path, "w") as f: + json.dump(feature_collection, f, indent=4) + + class BasePersister(Loggable): """ Class to persist the data to a file or cloud storage. If persisting to a file, the output directory is created by config._make_output_path() """ - extension: str - # output_id: str - def __init__(self, config=None): self.records = [] self.timeseries = [] @@ -59,27 +117,27 @@ def finalize(self, output_name: str): def dump_sites(self, path: str): if self.sites: path = os.path.join(path, "sites") - path = self.add_extension(path) + path = self.add_extension(path, self.config.output_format) self.log(f"dumping sites to {os.path.abspath(path)}") - self._write(path, self.sites) + self._dump_sites_summary(path, self.sites, self.config.output_format) else: self.log("no sites to dump", fg="red") def dump_summary(self, path: str): if self.records: path = os.path.join(path, "summary") - path = self.add_extension(path) + path = self.add_extension(path, self.config.output_format) self.log(f"dumping summary to {os.path.abspath(path)}") - self._write(path, self.records) + self._dump_sites_summary(path, self.records, self.config.output_format) else: self.log("no records to dump", fg="red") def dump_timeseries_unified(self, path: str): if self.timeseries: path = os.path.join(path, "timeseries_unified") - path = self.add_extension(path) + path = self.add_extension(path, OutputFormat.CSV) self.log(f"dumping unified timeseries to {os.path.abspath(path)}") - self._dump_timeseries_unified(path, self.timeseries) + self._dump_timeseries(path, self.timeseries) else: self.log("no timeseries records to dump", fg="red") @@ -89,89 +147,39 @@ def dump_timeseries_separated(self, path: str): # the individual site timeseries will be dumped timeseries_path = os.path.join(path, "timeseries") self._make_output_directory(timeseries_path) - for site, records in self.timeseries: - path = os.path.join(timeseries_path, str(site.id).replace(" ", "_")) - path = self.add_extension(path) - self.log(f"dumping {site.id} to {os.path.abspath(path)}") - self._write(path, records) + for records in self.timeseries: + site_id = records[0].id + path = os.path.join(timeseries_path, str(site_id).replace(" ", "_")) + path = self.add_extension(path, OutputFormat.CSV) + self.log(f"dumping {site_id} to {os.path.abspath(path)}") + + 
list_of_records = [records] + self._dump_timeseries(path, list_of_records) else: self.log("no timeseries records to dump", fg="red") - def save(self, path: str): - if self.records: - path = self.add_extension(path) - self.log(f"saving to {path}") - self._write(path, self.records) - else: - self.log("no records to save", fg="red") - - def add_extension(self, path: str): - if not self.extension: + def add_extension(self, path: str, extension: OutputFormat): + if not extension: raise NotImplementedError - - ext = self.extension - if self.config.output_format == OutputFormat.CSV: - ext = "csv" - elif self.config.output_format == OutputFormat.GEOJSON: - ext = "geojson" + else: + ext = extension if not path.endswith(ext): path = f"{path}.{ext}" return path - def _write(self, path: str, records): - raise NotImplementedError + def _dump_sites_summary( + self, path: str, records: list, output_format: OutputFormat + ): + dump_sites_summary(path, records, output_format) - def _dump_timeseries_unified(self, path: str, timeseries: list): - raise NotImplementedError + def _dump_timeseries(self, path: str, timeseries: list): + dump_timeseries(path, timeseries) def _make_output_directory(self, output_directory: str): os.mkdir(output_directory) -def write_file(path, func, records): - with open(path, "w", newline="") as f: - func(csv.writer(f), records) - - -def write_memory(func, records, output_format=None): - f = io.BytesIO() - func(f, records, output_format) - return f.getvalue() - - -def dump_timeseries_unified(writer, timeseries): - headers_have_not_been_written = True - for i, (site, records) in enumerate(timeseries): - for j, record in enumerate(records): - if i == 0 and headers_have_not_been_written: - writer.writerow(record.keys) - headers_have_not_been_written = False - writer.writerow(record.to_row()) - - -def dump_sites(filehandle, records, output_format): - if output_format == OutputFormat.CSV: - writer = csv.writer(filehandle) - for i, site in enumerate(records): - if i == 0: - writer.writerow(site.keys) - writer.writerow(site.to_row()) - else: - r0 = records[0] - df = pd.DataFrame([r.to_row() for r in records], columns=r0.keys) - - gdf = gpd.GeoDataFrame( - df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326" - ) - gdf.to_file(filehandle, driver="GeoJSON") - - - - - - - class CloudStoragePersister(BasePersister): extension = "csv" _content: list @@ -203,50 +211,36 @@ def finalize(self, output_name: str): else: path, cnt = self._content[0] - #this is a hack. need a better way to specify the output path + # this is a hack. 
need a better way to specify the output path dirname = os.path.basename(os.path.dirname(path)) path = os.path.join(dirname, os.path.basename(path)) blob = bucket.blob(path) - blob.upload_from_string(cnt, content_type="application/json" if self.config.output_format == OutputFormat.GEOJSON else "text/csv") + blob.upload_from_string( + cnt, + content_type=( + "application/json" + if self.config.output_format == OutputFormat.GEOJSON + else "text/csv" + ), + ) def _make_output_directory(self, output_directory: str): # prevent making root directory, because we are not saving to disk pass - def _write(self, path: str, records: list): - content = write_memory(dump_sites, records, self.config.output_format) - self._add_content(path, content) - def _add_content(self, path: str, content: str): self._content.append((path, content)) - def _dump_timeseries_unified(self, path: str, timeseries: list): - content = write_memory(dump_timeseries_unified, timeseries) + def _dump_sites_summary( + self, path: str, records: list, output_format: OutputFormat + ): + content = write_memory(dump_sites_summary, records, output_format) self._add_content(path, content) - -class CSVPersister(BasePersister): - extension = "csv" - - def _write(self, path: str, records: list): - write_file(path, dump_sites, records) - def _dump_timeseries_unified(self, path: str, timeseries: list): - write_file(path, dump_timeseries_unified, timeseries) - - -class GeoJSONPersister(BasePersister): - extension = "geojson" - - def _write(self, path: str, records: list): - r0 = records[0] - df = pd.DataFrame([r.to_row() for r in records], columns=r0.keys) - - gdf = gpd.GeoDataFrame( - df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326" - ) - gdf.to_file(path, driver="GeoJSON") + content = write_memory(path, dump_timeseries, timeseries) + self._add_content(path, content) # class ST2Persister(BasePersister): diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index a6a38de..4461246 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -9,24 +9,32 @@ import os import time from itertools import groupby - +from typing import Type import psycopg2 from shapely.geometry.multipoint import MultiPoint from shapely.geometry.point import Point from sqlalchemy.dialects.postgresql import JSONB, insert -from sqlalchemy.orm import declarative_base, sessionmaker, relationship +from sqlalchemy.orm import declarative_base, sessionmaker, relationship, Mapped + from backend.persister import BasePersister -from sqlalchemy import Column, ForeignKey, create_engine, UUID, String, Integer, Float, Date, Time +from sqlalchemy import ( + Column, + ForeignKey, + create_engine, + UUID, + String, + Integer, + Float, + Date, + Time, +) from geoalchemy2 import Geometry Base = declarative_base() -# dbname=db.get('dbname'), -# user=db.get('user'), -# password=db.get('password'), -# host=db.get('host'), -# port=db.get('port'), + + def session_factory(connection: dict): user = connection.get("user", "postgres") password = connection.get("password", "") @@ -51,7 +59,9 @@ class Location(Base): geometry = Column(Geometry(geometry_type="POINT", srid=4326)) source_slug = Column(String, ForeignKey("tbl_sources.name")) - source = relationship("Sources", backref="locations") + source: Mapped["Sources"] = relationship( + "Sources", backref="locations", uselist=False + ) class Summary(Base): @@ -66,7 +76,9 @@ class Summary(Base): source_slug = Column(String, ForeignKey("tbl_sources.name")) parameter_slug = 
Column(String, ForeignKey("tbl_parameters.name")) - source = relationship("Sources", backref="summaries") + source: Mapped["Sources"] = relationship( + "Sources", backref="summaries", uselist=False + ) value = Column(Float) nrecords = Column(Integer) @@ -104,8 +116,8 @@ def __init__(self, *args, **kwargs): def dump_sites(self, path: str): if self.sites: - db = self.config.get('geoserver').get('db') - dbname = db.get('db_name') + db = self.config.get("geoserver").get("db") + dbname = db.get("db_name") self.log(f"dumping sites to {dbname}") self._write_to_sites(self.sites) else: @@ -113,8 +125,8 @@ def dump_sites(self, path: str): def dump_summary(self, path: str): if self.records: - db = self.config.get('geoserver').get('db') - dbname = db.get('db_name') + db = self.config.get("geoserver").get("db") + dbname = db.get("db_name") self.log(f"dumping summary to {dbname}") self._write_to_summary(self.records) else: @@ -124,38 +136,51 @@ def _connect(self): """ Connect to a PostgreSQL database on Cloud SQL. """ - sf = session_factory(self.config.get('geoserver').get('db')) + sf = session_factory(self.config.get("geoserver").get("db")) self._connection = sf() def _write_sources(self, records: list): sources = {r.source for r in records} with self._connection as conn: - sql = insert(Sources).values([{"name": source} for source in sources]).on_conflict_do_nothing( - index_elements=[Sources.name],) + sql = ( + insert(Sources) + .values([{"name": source} for source in sources]) + .on_conflict_do_nothing( + index_elements=[Sources.name], + ) + ) conn.execute(sql) conn.commit() def _write_sources_with_convex_hull(self, records: list): # sources = {r.source for r in records} with self._connection as conn: + def key(r): return str(r.source) records = sorted(records, key=key) for source_name, group in groupby(records, key=key): - group = list(group) + source_records = list(group) # calculate convex hull for the source from the records # Create a MultiPoint object - points = MultiPoint([Point(record.longitude, record.latitude) for record in group]) + points = MultiPoint( + [ + Point(record.longitude, record.latitude) + for record in source_records + ] + ) # Calculate the convex hull sinsert = insert(Sources) print("Writing source", source_name, points.convex_hull) - sql = sinsert.values([{"name": source_name, - "convex_hull": points.convex_hull.wkt}]).on_conflict_do_update( + sql = sinsert.values( + [{"name": source_name, "convex_hull": points.convex_hull.wkt}] + ).on_conflict_do_update( index_elements=[Sources.name], - set_={"convex_hull": sinsert.excluded.convex_hull}) + set_={"convex_hull": sinsert.excluded.convex_hull}, + ) # sql = insert(Sources).values([{"name": source,} for source in sources]).on_conflict_do_nothing( # index_elements=[Sources.name],) conn.execute(sql) @@ -163,9 +188,20 @@ def key(r): def _write_parameters(self): with self._connection as conn: - sql = insert(Parameters).values([{"name": self.config.parameter, - "units": self.config.analyte_output_units}]).on_conflict_do_nothing( - index_elements=[Parameters.name],) + sql = ( + insert(Parameters) + .values( + [ + { + "name": self.config.parameter, + "units": self.config.analyte_output_units, + } + ] + ) + .on_conflict_do_nothing( + index_elements=[Parameters.name], + ) + ) print(sql) conn.execute(sql) conn.commit() @@ -175,7 +211,14 @@ def _write_to_summary(self, records: list): self._write_parameters() for r in records: print(r, [r.to_dict()]) - keys = ["usgs_site_id", "alternate_site_id", "formation", "aquifer", "well_depth"] + keys 
= [ + "usgs_site_id", + "alternate_site_id", + "formation", + "aquifer", + "well_depth", + ] + def make_stmt(chunk): values = [ { @@ -194,7 +237,9 @@ def make_stmt(chunk): "latest_time": record.latest_time if record.latest_time else None, "earliest_value": record.earliest_value, "earliest_date": record.earliest_date, - "earliest_time": record.earliest_time if record.earliest_time else None, + "earliest_time": ( + record.earliest_time if record.earliest_time else None + ), } for record in chunk ] @@ -202,15 +247,17 @@ def make_stmt(chunk): linsert = insert(Summary) return linsert.values(values).on_conflict_do_update( index_elements=[Summary.data_source_uid], - set_={"properties": linsert.excluded.properties} + set_={"properties": linsert.excluded.properties}, ) self._chunk_insert(make_stmt, records) def _chunk_insert(self, make_stmt, records: list, chunk_size: int = 10): for i in range(0, len(records), chunk_size): - chunk = records[i:i + chunk_size] - print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") + chunk = records[i : i + chunk_size] + print( + f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}" + ) st = time.time() stmt = make_stmt(chunk) @@ -218,7 +265,7 @@ def _chunk_insert(self, make_stmt, records: list, chunk_size: int = 10): conn.execute(stmt) conn.commit() - print('Chunk write time:', time.time() - st) + print("Chunk write time:", time.time() - st) def _write_to_sites(self, records: list): """ @@ -227,7 +274,13 @@ def _write_to_sites(self, records: list): self._write_sources_with_convex_hull(records) - keys = ["usgs_site_id", "alternate_site_id", "formation", "aquifer", "well_depth"] + keys = [ + "usgs_site_id", + "alternate_site_id", + "formation", + "aquifer", + "well_depth", + ] chunk_size = 1000 # Larger chunk size for fewer commits def make_stmt(chunk): @@ -244,7 +297,7 @@ def make_stmt(chunk): linsert = insert(Location) stmt = linsert.values(values).on_conflict_do_update( index_elements=[Location.data_source_uid], - set_={"properties": linsert.excluded.properties} + set_={"properties": linsert.excluded.properties}, ) return stmt @@ -297,11 +350,11 @@ def make_stmt(chunk): # # print('Chunk write time:', time.time() - st) - # # Pre-serialize properties to reduce processing time - # values = [ - # (record.name, json.dumps(record.to_dict(keys)), record.longitude, record.latitude, record.source) - # for record in chunk - # ] + # # Pre-serialize properties to reduce processing time + # values = [ + # (record.name, json.dumps(record.to_dict(keys)), record.longitude, record.latitude, record.source) + # for record in chunk + # ] # # with self._connection.cursor() as cursor: # sql = """INSERT INTO public.tbl_location (name, properties, geometry, source_slug) @@ -312,4 +365,6 @@ def make_stmt(chunk): # self._connection.commit() # Commit once per chunk # print('Chunk write time:', time.time() - st) # break + + # ============= EOF ============================================= diff --git a/backend/record.py b/backend/record.py index a14f549..ac8a9f9 100644 --- a/backend/record.py +++ b/backend/record.py @@ -169,6 +169,7 @@ class SiteRecord(BaseRecord): "formation", "aquifer", "well_depth", + "well_depth_units", ) defaults: dict = { @@ -186,6 +187,7 @@ class SiteRecord(BaseRecord): "formation": "", "aquifer": "", "well_depth": None, + "well_depth_units": FEET, } diff --git a/backend/unifier.py b/backend/unifier.py index 143f967..b070631 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -18,7 +18,7 @@ from backend.config 
import Config, get_source, OutputFormat from backend.logger import setup_logging from backend.constants import WATERLEVELS -from backend.persister import CSVPersister, GeoJSONPersister, CloudStoragePersister +from backend.persister import BasePersister from backend.persisters.geoserver import GeoServerPersister from backend.source import BaseSiteSource @@ -79,36 +79,36 @@ def unify_sites(config): return True -def _perister_factory(config): - """ - Determines the type of persister to use based on the configuration. The - persister types are: +# def _perister_factory(config): +# """ +# Determines the type of persister to use based on the configuration. The +# persister types are: - - CSVPersister - - CloudStoragePersister - - GeoJSONPersister +# - CSVPersister +# - CloudStoragePersister +# - GeoJSONPersister - Parameters - ------- - config: Config - The configuration object +# Parameters +# ------- +# config: Config +# The configuration object - Returns - ------- - Persister - The persister object to use - """ - persister_klass = CSVPersister - if config.use_cloud_storage: - persister_klass = CloudStoragePersister - elif config.output_format == OutputFormat.CSV: - persister_klass = CSVPersister - elif config.output_format == OutputFormat.GEOJSON: - persister_klass = GeoJSONPersister - elif config.output_format == OutputFormat.GEOSERVER: - persister_klass = GeoServerPersister +# Returns +# ------- +# Persister +# The persister object to use +# """ +# persister_klass = CSVPersister +# if config.use_cloud_storage: +# persister_klass = CloudStoragePersister +# elif config.output_format == OutputFormat.CSV: +# persister_klass = CSVPersister +# elif config.output_format == OutputFormat.GEOJSON: +# persister_klass = GeoJSONPersister +# elif config.output_format == OutputFormat.GEOSERVER: +# persister_klass = GeoServerPersister - return persister_klass(config) +# return persister_klass(config) # def _unify_wrapper(config, func): @@ -179,27 +179,13 @@ def _site_wrapper(site_source, parameter_source, persister, config): sites_with_records_count += len(results) for site, records in results: - persister.timeseries.append((site, records)) + persister.timeseries.append(records) persister.sites.append(site) if site_limit: - # print( - # "sites_with_records_count:", - # sites_with_records_count, - # "|", - # "site_limit:", - # site_limit, - # "|", - # "chunk_size:", - # site_source.chunk_size, - # ) - if sites_with_records_count >= site_limit: # remove any extra sites that were gathered. 
removes 0 if site_limit is not exceeded num_sites_to_remove = sites_with_records_count - site_limit - # print( - # f"removing {num_sites_to_remove} to avoid exceeding the site limit" - # ) # if sites_with_records_count == sit_limit then num_sites_to_remove = 0 # and calling list[:0] will retur an empty list, so subtract @@ -230,9 +216,19 @@ def _unify_parameter( config, sources, ): - persister = _perister_factory(config) + + if config.output_format == OutputFormat.GEOSERVER: + persister = GeoServerPersister(config) + else: + persister = BasePersister(config) + for site_source, parameter_source in sources: - _site_wrapper(site_source, parameter_source, persister, config) + _site_wrapper( + site_source, + parameter_source, + persister, + config, + ) if config.output_summary: persister.dump_summary(config.output_path) @@ -375,17 +371,17 @@ def get_datastreams(): print(si, si.id, ds["@iot.id"]) -if __name__ == "__main__": - # test_waterlevel_unification() - # root = logging.getLogger() - # root.setLevel(logging.DEBUG) - # shandler = logging.StreamHandler() - # get_sources(Config()) - setup_logging() - site_unification_test() - # waterlevel_unification_test() - # analyte_unification_test() - # print(health_check("nwis")) - # generate_site_bounds() +# if __name__ == "__main__": +# test_waterlevel_unification() +# root = logging.getLogger() +# root.setLevel(logging.DEBUG) +# shandler = logging.StreamHandler() +# get_sources(Config()) +# setup_logging() +# site_unification_test() +# waterlevel_unification_test() +# analyte_unification_test() +# print(health_check("nwis")) +# generate_site_bounds() # ============= EOF ============================================= diff --git a/frontend/cli.py b/frontend/cli.py index 68d8d4a..879e5d3 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -17,6 +17,7 @@ import click +from backend import OutputFormat from backend.config import Config from backend.constants import PARAMETER_OPTIONS from backend.unifier import unify_sites, unify_waterlevels, unify_analytes @@ -169,33 +170,16 @@ def cli(): help="End date in the form 'YYYY', 'YYYY-MM', 'YYYY-MM-DD', 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'", ), ] - -TIMESERIES_OPTIONS = [ - click.option( - "--separated_timeseries", - is_flag=True, - default=False, - show_default=True, - help="Output separate timeseries files for every site", - ), - click.option( - "--unified_timeseries", - is_flag=True, - default=False, - show_default=True, - help="Output single timeseries file, which includes all sites", - ), -] - -OUTPUT_OPTIONS = [ +OUTPUT_TYPE_OPTIONS = [ click.option( - "--output", + "--output-type", type=click.Choice(["summary", "timeseries_unified", "timeseries_separated"]), required=True, help="Output summary file, single unified timeseries file, or separated timeseries files", ), ] -PERSISTER_OPTIONS = [ + +OUTPUT_DIR_OPTIONS = [ click.option( "--output-dir", default=".", @@ -203,7 +187,17 @@ def cli(): ) ] -CONFIG_OPTIONS = [ +OUTPUT_FORMATS = sorted([value for value in OutputFormat]) +OUTPUT_FORMAT_OPTIONS = [ + click.option( + "--output-format", + type=click.Choice(OUTPUT_FORMATS), + default="csv", + help=f"Output file format for sites: {OUTPUT_FORMATS}. 
Default is csv", + ) +] + +CONFIG_PATH_OPTIONS = [ click.option( "--config-path", type=click.Path(exists=True), @@ -212,6 +206,7 @@ def cli(): ), ] + def add_options(options): def _add_options(func): for option in reversed(options): @@ -227,61 +222,73 @@ def _add_options(func): type=click.Choice(PARAMETER_OPTIONS, case_sensitive=False), required=True, ) -@add_options(CONFIG_OPTIONS) -@add_options(OUTPUT_OPTIONS) -@add_options(PERSISTER_OPTIONS) +@add_options(CONFIG_PATH_OPTIONS) +@add_options(OUTPUT_TYPE_OPTIONS) +@add_options(OUTPUT_DIR_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) +@add_options(OUTPUT_FORMAT_OPTIONS) def weave( - parameter, - config_path, - output, - output_dir, - start_date, - end_date, - bbox, - county, - wkt, - no_bernco, - no_bor, - no_cabq, - no_ebid, - no_nmbgmr_amp, - no_nmed_dwb, - no_nmose_isc_seven_rivers, - no_nmose_pod, - no_nmose_roswell, - no_nwis, - no_pvacd, - no_wqp, - site_limit, - dry, - yes): + parameter, + config_path, + output_type, + output_dir, + start_date, + end_date, + bbox, + county, + wkt, + no_bernco, + no_bor, + no_cabq, + no_ebid, + no_nmbgmr_amp, + no_nmed_dwb, + no_nmose_isc_seven_rivers, + no_nmose_pod, + no_nmose_roswell, + no_nwis, + no_pvacd, + no_wqp, + site_limit, + dry, + yes, + output_format, +): """ Get parameter timeseries or summary data """ # instantiate config and set up parameter - config = setup_config(parameter, config_path, bbox, county, wkt, site_limit, dry) - + config = setup_config( + tag=parameter, + config_path=config_path, + bbox=bbox, + county=county, + wkt=wkt, + site_limit=site_limit, + dry=dry, + output_format=output_format, + ) + config.parameter = parameter # output type - if output == "summary": + if output_type == "summary": summary = True timeseries_unified = False timeseries_separated = False - elif output == "timeseries_unified": + elif output_type == "timeseries_unified": summary = False timeseries_unified = True timeseries_separated = False - elif output == "timeseries_separated": + elif output_type == "timeseries_separated": summary = False timeseries_unified = False timeseries_separated = True else: - click.echo(f"Invalid output type: {output}") + click.echo(f"Invalid output type: {output_type}") return config.output_summary = summary @@ -297,7 +304,7 @@ def weave( lcs = locals() if config_agencies: for agency in config_agencies: - setattr(config, f"use_source_{agency}", lcs.get(f'no_{agency}', False)) + setattr(config, f"use_source_{agency}", lcs.get(f"no_{agency}", False)) # dates config.start_date = start_date config.end_date = end_date @@ -313,50 +320,68 @@ def weave( if not click.confirm("Do you want to continue?", default=True): return - if parameter.lower() == "waterlevels": - unify_waterlevels(config) - else: - unify_analytes(config) + if parameter.lower() == "waterlevels": + unify_waterlevels(config) + else: + unify_analytes(config) + return config - @cli.command() -@add_options(CONFIG_OPTIONS) +@add_options(CONFIG_PATH_OPTIONS) @add_options(SPATIAL_OPTIONS) -@add_options(PERSISTER_OPTIONS) +@add_options(OUTPUT_DIR_OPTIONS) @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) -def sites(config_path, - bbox, county, wkt, - output_dir, - no_bernco, - no_bor, - no_cabq, - no_ebid, - no_nmbgmr_amp, - no_nmed_dwb, - no_nmose_isc_seven_rivers, - no_nmose_pod, - no_nmose_roswell, - no_nwis, - no_pvacd, - no_wqp, - site_limit, - dry, - yes): +@add_options(OUTPUT_FORMAT_OPTIONS) +def sites( + config_path, + bbox, + 
county, + wkt, + output_dir, + no_bernco, + no_bor, + no_cabq, + no_ebid, + no_nmbgmr_amp, + no_nmed_dwb, + no_nmose_isc_seven_rivers, + no_nmose_pod, + no_nmose_roswell, + no_nwis, + no_pvacd, + no_wqp, + site_limit, + dry, + yes, + output_format, +): """ Get sites """ - - config = setup_config("sites", config_path, bbox, county, wkt, site_limit, dry) - config_agencies = ["bernco", "bor", "cabq", "ebid", "nmbgmr_amp", "nmed_dwb", - "nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd", - "wqp", "nmose_pod"] + config = setup_config( + "sites", config_path, bbox, county, wkt, site_limit, dry, output_format + ) + config_agencies = [ + "bernco", + "bor", + "cabq", + "ebid", + "nmbgmr_amp", + "nmed_dwb", + "nmose_isc_seven_rivers", + "nmose_roswell", + "nwis", + "pvacd", + "wqp", + "nmose_pod", + ] if config_path is None: lcs = locals() for agency in config_agencies: - setattr(config, f"use_source_{agency}", lcs.get(f'no_{agency}', False)) + setattr(config, f"use_source_{agency}", lcs.get(f"no_{agency}", False)) config.output_dir = output_dir config.sites_only = True @@ -406,7 +431,16 @@ def sources(sources, bbox, wkt, county): click.echo(s) -def setup_config(tag, config_path, bbox, county, wkt, site_limit, dry): +def setup_config( + tag, + config_path, + bbox, + county, + wkt, + site_limit, + dry, + output_format=OutputFormat.CSV, +): config = Config(path=config_path) if county: @@ -426,6 +460,8 @@ def setup_config(tag, config_path, bbox, county, wkt, site_limit, dry): config.site_limit = None config.dry = dry + config.output_format = output_format.value + return config diff --git a/mypy.ini b/mypy.ini index 380b366..4904098 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,3 +1,4 @@ [mypy] ignore_missing_imports = True -exclude = ^(venv|.github|.mypy_cache|.pytest_cache|nmuwd.egg-info|__pycache__|build|tests/archived) \ No newline at end of file +exclude = ^(venv|.github|.mypy_cache|.pytest_cache|nmuwd.egg-info|__pycache__|build|tests/archived) +plugins = sqlalchemy.ext.mypy.plugin diff --git a/requirements.txt b/requirements.txt index 4e9f7c5..648458d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,15 @@ flask +frost_sta_client +Geoalchemy2 +geopandas +google-cloud-storage gunicorn httpx +mypy pandas -geopandas -frost_sta_client -google-cloud-storage +psycopg2 pytest +pyyaml +sqlalchemy[mypy] +types-pyyaml urllib3>=2.2.0,<3.0.0 \ No newline at end of file diff --git a/setup.py b/setup.py index f06990d..abfc56f 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ setup( name="nmuwd", - version="0.8.2", + version="0.9.1", author="Jake Ross", description="New Mexico Water Data Integration Engine", long_description=long_description, diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index 84923b8..4d342ae 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -49,7 +49,8 @@ def setup(self): def _test_weave( self, parameter: str, - output: str, + output_type: str, + output_format: str = "csv", site_limit: int = 4, start_date: str = "1990-08-10", end_date: str = "1990-08-11", @@ -81,8 +82,8 @@ def _test_weave( arguments = [ parameter, - "--output", - output, + "--output-type", + output_type, "--dry", "--site-limit", str(site_limit), @@ -90,6 +91,8 @@ def _test_weave( start_date, "--end-date", end_date, + "--output-format", + output_format, ] if geographic_filter_name and geographic_filter_value: @@ -115,6 +118,7 @@ def _test_weave( 6. The start date is set correctly 7. The end date is set correctly 8. 
The geographic filter is set correctly + 9. The site output type is set correctly """ config = result.return_value @@ -137,11 +141,11 @@ def _test_weave( # 3 output_types = ["summary", "timeseries_unified", "timeseries_separated"] - for output_type in output_types: - if output_type == output: - assert getattr(config, f"output_{output_type}") is True + for ot in output_types: + if ot == output_type: + assert getattr(config, f"output_{ot}") is True else: - assert getattr(config, f"output_{output_type}") is False + assert getattr(config, f"output_{ot}") is False # 4 assert getattr(config, "site_limit") == 4 @@ -166,74 +170,89 @@ def _test_weave( else: assert getattr(config, _geographic_filter_name) == "" + # 9 + assert getattr(config, "output_format") == output_format + def test_weave_summary(self): - self._test_weave(parameter=WATERLEVELS, output="summary") + self._test_weave(parameter=WATERLEVELS, output_type="summary") def test_weave_timeseries_unified(self): - self._test_weave(parameter=WATERLEVELS, output="timeseries_unified") + self._test_weave(parameter=WATERLEVELS, output_type="timeseries_unified") def test_weave_timeseries_separated(self): - self._test_weave(parameter=WATERLEVELS, output="timeseries_separated") + self._test_weave(parameter=WATERLEVELS, output_type="timeseries_separated") + + def test_weave_csv(self): + self._test_weave( + parameter=WATERLEVELS, output_type="summary", output_format="csv" + ) + + def test_weave_geojson(self): + self._test_weave( + parameter=WATERLEVELS, output_type="summary", output_format="geojson" + ) def test_weave_bbox(self): self._test_weave( - parameter=WATERLEVELS, output="summary", bbox="32.0,-106.0,36.0,-102.0" + parameter=WATERLEVELS, output_type="summary", bbox="32.0,-106.0,36.0,-102.0" ) def test_weave_county(self): - self._test_weave(parameter=WATERLEVELS, output="summary", county="Bernalillo") + self._test_weave( + parameter=WATERLEVELS, output_type="summary", county="Bernalillo" + ) def test_weave_wkt(self): self._test_weave( parameter=WATERLEVELS, - output="summary", + output_type="summary", wkt="POLYGON((-106.0 32.0, -102.0 32.0, -102.0 36.0, -106.0 36.0, -106.0 32.0))", ) def test_weave_waterlevels(self): - self._test_weave(parameter=WATERLEVELS, output="summary") + self._test_weave(parameter=WATERLEVELS, output_type="summary") def test_weave_arsenic(self): - self._test_weave(parameter=ARSENIC, output="summary") + self._test_weave(parameter=ARSENIC, output_type="summary") def test_weave_bicarbonate(self): - self._test_weave(parameter=BICARBONATE, output="summary") + self._test_weave(parameter=BICARBONATE, output_type="summary") def test_weave_calcium(self): - self._test_weave(parameter=CALCIUM, output="summary") + self._test_weave(parameter=CALCIUM, output_type="summary") def test_weave_carbonate(self): - self._test_weave(parameter=CARBONATE, output="summary") + self._test_weave(parameter=CARBONATE, output_type="summary") def test_weave_chloride(self): - self._test_weave(parameter=CHLORIDE, output="summary") + self._test_weave(parameter=CHLORIDE, output_type="summary") def test_weave_fluoride(self): - self._test_weave(parameter=FLUORIDE, output="summary") + self._test_weave(parameter=FLUORIDE, output_type="summary") def test_weave_magnesium(self): - self._test_weave(parameter=MAGNESIUM, output="summary") + self._test_weave(parameter=MAGNESIUM, output_type="summary") def test_weave_nitrate(self): - self._test_weave(parameter=NITRATE, output="summary") + self._test_weave(parameter=NITRATE, output_type="summary") def test_weave_ph(self): 
- self._test_weave(parameter=PH, output="summary") + self._test_weave(parameter=PH, output_type="summary") def test_weave_potassium(self): - self._test_weave(parameter=POTASSIUM, output="summary") + self._test_weave(parameter=POTASSIUM, output_type="summary") def test_weave_silica(self): - self._test_weave(parameter=SILICA, output="summary") + self._test_weave(parameter=SILICA, output_type="summary") def test_weave_sodium(self): - self._test_weave(parameter=SODIUM, output="summary") + self._test_weave(parameter=SODIUM, output_type="summary") def test_weave_sulfate(self): - self._test_weave(parameter=SULFATE, output="summary") + self._test_weave(parameter=SULFATE, output_type="summary") def test_weave_tds(self): - self._test_weave(parameter=TDS, output="summary") + self._test_weave(parameter=TDS, output_type="summary") def test_weave_uranium(self): - self._test_weave(parameter=URANIUM, output="summary") + self._test_weave(parameter=URANIUM, output_type="summary") diff --git a/tests/test_sources/__init__.py b/tests/test_sources/__init__.py index a18dd94..34d0485 100644 --- a/tests/test_sources/__init__.py +++ b/tests/test_sources/__init__.py @@ -1,3 +1,4 @@ +import json from logging import shutdown as logger_shutdown from pathlib import Path import pytest @@ -10,8 +11,18 @@ from backend.unifier import unify_analytes, unify_waterlevels from tests import recursively_clean_directory -SUMMARY_RECORD_HEADERS = list(SummaryRecord.keys) -SITE_RECORD_HEADERS = list(SiteRecord.keys) +EXCLUDED_GEOJSON_KEYS = ["latitude", "longitude", "elevation"] + +SUMMARY_RECORD_CSV_HEADERS = list(SummaryRecord.keys) +SUMMARY_RECORD_GEOJSON_KEYS = [ + k for k in SUMMARY_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS +] + +SITE_RECORD_CSV_HEADERS = list(SiteRecord.keys) +SITE_RECORD_GEOJSON_KEYS = [ + k for k in SITE_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS +] + PARAMETER_RECORD_HEADERS = list(ParameterRecord.keys) @@ -68,18 +79,61 @@ def _run_unifier(self): else: unify_analytes(self.config) - def _check_sites_file(self): - sites_file = Path(self.config.output_path) / "sites.csv" - assert sites_file.exists() + def _check_summary_file(self, extension: str): + summary_file = Path(self.config.output_path) / f"summary.{extension}" + assert summary_file.exists() - with open(sites_file, "r") as f: - headers = f.readline().strip().split(",") - assert headers == SITE_RECORD_HEADERS + if extension == "csv": + with open(summary_file, "r") as f: + headers = f.readline().strip().split(",") + assert headers == SUMMARY_RECORD_CSV_HEADERS + + # +1 for the header + with open(summary_file, "r") as f: + lines = f.readlines() + assert len(lines) == self.site_limit + 1 + elif extension == "geojson": + with open(summary_file, "r") as f: + summary = json.load(f) + assert len(summary["features"]) == self.site_limit + assert summary["type"] == "FeatureCollection" + for feature in summary["features"]: + assert feature["geometry"]["type"] == "Point" + assert len(feature["geometry"]["coordinates"]) == 3 + assert sorted(feature["properties"].keys()) == sorted( + SUMMARY_RECORD_GEOJSON_KEYS + ) + assert summary["features"][0]["type"] == "Feature" + else: + raise ValueError(f"Unsupported file extension: {extension}") - # +1 for the header - with open(sites_file, "r") as f: - lines = f.readlines() - assert len(lines) == self.site_limit + 1 + def _check_sites_file(self, extension: str): + sites_file = Path(self.config.output_path) / f"sites.{extension}" + assert sites_file.exists() + + if extension == "csv": + with 
open(sites_file, "r") as f: + headers = f.readline().strip().split(",") + assert headers == SITE_RECORD_CSV_HEADERS + + # +1 for the header + with open(sites_file, "r") as f: + lines = f.readlines() + assert len(lines) == self.site_limit + 1 + elif extension == "geojson": + with open(sites_file, "r") as f: + sites = json.load(f) + assert len(sites["features"]) == self.site_limit + assert sites["type"] == "FeatureCollection" + for feature in sites["features"]: + assert feature["geometry"]["type"] == "Point" + assert len(feature["geometry"]["coordinates"]) == 3 + assert sorted(feature["properties"].keys()) == sorted( + SITE_RECORD_GEOJSON_KEYS + ) + assert sites["features"][0]["type"] == "Feature" + else: + raise ValueError(f"Unsupported file extension: {extension}") def _check_timeseries_file(self, timeseries_dir, timeseries_file_name): timeseries_file = Path(timeseries_dir) / timeseries_file_name @@ -94,7 +148,7 @@ def test_health(self): source = self.config.all_site_sources()[0][0] assert source.health() - def test_summary(self): + def test_summary_csv(self): # Arrange -------------------------------------------------------------- self.config.output_summary = True self.config.report() @@ -103,21 +157,21 @@ def test_summary(self): self._run_unifier() # Assert --------------------------------------------------------------- - # Check the summary file - summary_file = Path(self.config.output_path) / "summary.csv" - assert summary_file.exists() + self._check_summary_file("csv") - # Check the column headers - with open(summary_file, "r") as f: - headers = f.readline().strip().split(",") - assert headers == SUMMARY_RECORD_HEADERS + def test_summary_geojson(self): + # Arrange -------------------------------------------------------------- + self.config.output_summary = True + self.config.output_format = "geojson" + self.config.report() - # +1 for the header - with open(summary_file, "r") as f: - lines = f.readlines() - assert len(lines) == self.site_limit + 1 + # Act ------------------------------------------------------------------ + self._run_unifier() - def test_timeseries_unified(self): + # Assert --------------------------------------------------------------- + self._check_summary_file("geojson") + + def test_timeseries_unified_csv(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_unified = True self.config.report() @@ -127,16 +181,54 @@ def test_timeseries_unified(self): # Assert --------------------------------------------------------------- # Check the sites file - self._check_sites_file() + self._check_sites_file("csv") # Check the timeseries file timeseries_dir = Path(self.config.output_path) timeseries_file_name = "timeseries_unified.csv" self._check_timeseries_file(timeseries_dir, timeseries_file_name) - def test_timeseries_separated(self): + def test_timeseries_unified_geojson(self): + # Arrange -------------------------------------------------------------- + self.config.output_timeseries_unified = True + self.config.output_format = "geojson" + self.config.report() + + # Act ------------------------------------------------------------------ + self._run_unifier() + + # Assert --------------------------------------------------------------- + # Check the sites file + self._check_sites_file("geojson") + + # Check the timeseries file + timeseries_dir = Path(self.config.output_path) + timeseries_file_name = "timeseries_unified.csv" + self._check_timeseries_file(timeseries_dir, timeseries_file_name) + + def 
test_timeseries_separated_csv(self): + # Arrange -------------------------------------------------------------- + self.config.output_timeseries_separated = True + self.config.report() + + # Act ------------------------------------------------------------------ + self._run_unifier() + + # Assert --------------------------------------------------------------- + # Check the sites file + self._check_sites_file("csv") + + # Check the timeseries files + timeseries_dir = Path(self.config.output_path) / "timeseries" + assert len([f for f in timeseries_dir.iterdir()]) == self.site_limit + + for timeseries_file in timeseries_dir.iterdir(): + self._check_timeseries_file(timeseries_dir, timeseries_file.name) + + def test_timeseries_separated_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_separated = True + self.config.output_format = "geojson" self.config.report() # Act ------------------------------------------------------------------ @@ -144,7 +236,7 @@ def test_timeseries_separated(self): # Assert --------------------------------------------------------------- # Check the sites file - self._check_sites_file() + self._check_sites_file("geojson") # Check the timeseries files timeseries_dir = Path(self.config.output_path) / "timeseries" diff --git a/tests/test_sources/test_nmbgmr_amp.py b/tests/test_sources/test_nmbgmr_amp.py index 90bba2c..b56fd5b 100644 --- a/tests/test_sources/test_nmbgmr_amp.py +++ b/tests/test_sources/test_nmbgmr_amp.py @@ -1,6 +1,23 @@ +import os +import pytest + from backend.constants import WATERLEVELS, CALCIUM, MILLIGRAMS_PER_LITER, FEET from tests.test_sources import BaseSourceTestClass +os.environ["IS_TESTING_ENV"] = "True" + + +@pytest.fixture(autouse=True) +def setup(): + # SETUP CODE ----------------------------------------------------------- + os.environ["IS_TESTING_ENV"] = "True" + + # RUN TESTS ------------------------------------------------------------ + yield + + # TEARDOWN CODE --------------------------------------------------------- + os.environ["IS_TESTING_ENV"] = "False" + class TestNMBGMRWaterlevels(BaseSourceTestClass):