From 4732442f59df79de3fe21ceb2ff0025ce387d8de Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Tue, 11 Mar 2025 22:09:40 -0600 Subject: [PATCH 01/38] Create dependabot.yml --- .github/dependabot.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .github/dependabot.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..9d866e3 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file + +version: 2 +updates: + - package-ecosystem: "pip" # See documentation for possible values + directory: "/" # Location of package manifests + schedule: + interval: "weekly" From 535ed50670d6a49f130aa12ebd913ac72748f2b4 Mon Sep 17 00:00:00 2001 From: Jake Ross Date: Tue, 11 Mar 2025 22:13:25 -0600 Subject: [PATCH 02/38] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2314f70..2b33ca8 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Format code](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/format_code.yml/badge.svg?branch=main)](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/format_code.yml) [![Publish Python 🐍 distributions 📦 to PyPI and TestPyPI](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/publish-to-pypi.yml/badge.svg)](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/publish-to-pypi.yml) [![CI/CD](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/cicd.yml/badge.svg)](https://github.com/DataIntegrationGroup/PyWeaver/actions/workflows/cicd.yml) - +[![Dependabot Updates](https://github.com/DataIntegrationGroup/DataIntegrationEngine/actions/workflows/dependabot/dependabot-updates/badge.svg)](https://github.com/DataIntegrationGroup/DataIntegrationEngine/actions/workflows/dependabot/dependabot-updates) ![NMWDI](https://newmexicowaterdata.org/wp-content/uploads/2023/11/newmexicowaterdatalogoNov2023.png) ![NMBGMR](https://waterdata.nmt.edu/static/nmbgmr_logo_resized.png) @@ -218,4 +218,4 @@ Use die wells ``` -to print wells to the terminal. \ No newline at end of file +to print wells to the terminal. 
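Patch 01 above enables Dependabot only for the `pip` ecosystem, and patch 02 adds the corresponding badge to the README. Since the series also touches several workflow files under `.github/workflows`, the same `dependabot.yml` could keep the GitHub Actions versions current as well — a minimal sketch of that hypothetical extension (not part of this patch series), assuming the default repository layout:

```yaml
# Hypothetical extension of .github/dependabot.yml (illustrative only):
version: 2
updates:
  - package-ecosystem: "pip"            # Python dependencies from requirements.txt / setup.py
    directory: "/"                      # location of the package manifests
    schedule:
      interval: "weekly"
  - package-ecosystem: "github-actions" # actions/checkout, setup-python, etc.
    directory: "/"                      # Dependabot scans .github/workflows automatically
    schedule:
      interval: "weekly"
```
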
From 0e3422f95cd082d7447cb4695b437f450e6b2302 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 18 Mar 2025 08:58:37 -0600 Subject: [PATCH 03/38] Update for AMPAPI v 0.33.0 - pagination and removal of well from waterlevel records --- backend/connectors/nmbgmr/source.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index d75adae..8700a98 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -15,8 +15,6 @@ # =============================================================================== import os -import httpx - from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.connectors.nmbgmr.transformer import ( NMBGMRSiteTransformer, @@ -47,12 +45,12 @@ def _make_url(endpoint): if os.getenv("DEBUG") == "1": return f"http://localhost:8000/latest/{endpoint}" - return f"https://waterdata.nmt.edu/latest/{endpoint}" + return f"https://waterdata.nmt.edu//latest/{endpoint}" class NMBGMRSiteSource(BaseSiteSource): transformer_klass = NMBGMRSiteTransformer - chunk_size = 10 + chunk_size = 100 bounding_polygon = NM_STATE_BOUNDING_POLYGON def __repr__(self): @@ -168,7 +166,7 @@ def __repr__(self): def _clean_records(self, records): # remove records with no depth to water value - return [r for r in records if r["DepthToWaterBGS"] is not None] + return [r for r in records if r["DepthToWaterBGS"] is not None and r["DateMeasured"] is not None] def _extract_parameter_record(self, record, *args, **kw): record[PARAMETER_NAME] = DTW @@ -195,7 +193,7 @@ def _extract_source_parameter_results(self, records): return [r["DepthToWaterBGS"] for r in records] def _extract_site_records(self, records, site_record): - return [ri for ri in records if ri["Well"]["PointID"] == site_record.id] + return [ri for ri in records if ri["PointID"] == site_record.id] def _extract_source_parameter_names(self, records): return ["DepthToWaterBGS" for r in records] @@ -212,7 +210,19 @@ def get_records(self, site_record): # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") - return self._execute_json_request(url, params) + paginated_records = self._execute_json_request(url, params, tag="") + items = paginated_records["items"] + page = paginated_records["page"] + pages = paginated_records["pages"] + + while page < pages: + page += 1 + params["page"] = page + new_records = self._execute_json_request(url, params, tag="") + items.extend(new_records["items"]) + pages = new_records["pages"] + + return items # ============= EOF ============================================= From 3c5ffcdb2e4b8795e277c3904ceda282d9e4844b Mon Sep 17 00:00:00 2001 From: jakeross Date: Mon, 14 Apr 2025 23:34:15 -0600 Subject: [PATCH 04/38] added ci/cd caching --- .github/workflows/cicd.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 5ea32ac..9ca846e 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -19,25 +19,38 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Set up Python 3.10 uses: actions/setup-python@v3 with: python-version: "3.10" cache: "pip" + + - name: Cache pip dependencies + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + - name: Install dependencies run: | python -m pip install --upgrade pip pip install flake8 pytest mypy if [ -f 
requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Type check with mypy run: | mypy . --ignore-missing-imports + - name: Test with pytest run: | pytest -s -rx tests From 20977952408cf72ea34031a29c9e7076e814075f Mon Sep 17 00:00:00 2001 From: jakeross Date: Mon, 14 Apr 2025 23:43:47 -0600 Subject: [PATCH 05/38] workflow trigger updates --- .github/workflows/cicd.yml | 4 ++-- .github/workflows/publish-to-pypi.yml | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 9ca846e..77ce402 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -5,9 +5,9 @@ name: CI/CD on: push: - branches: [ "main", "feature/jir", "dev/jab"] + branches: [ "pre-production", "feature/jir", "dev/jab"] pull_request: - branches: [ "main"] + branches: [ "pre-production"] permissions: contents: read diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml index 9c64e4b..699394f 100644 --- a/.github/workflows/publish-to-pypi.yml +++ b/.github/workflows/publish-to-pypi.yml @@ -3,10 +3,12 @@ name: Publish Python 🐍 distributions 📦 to PyPI and TestPyPI # pushes only occur on successful pull request merges # this makes it so that the main branch gets published to PyPi, not the # target branch from the pull request (pre-production) + +# pushing tags should be the only way to trigger this workflow on: push: - branches: - - main + tags: + - '*' jobs: build-and-publish-if-merged: @@ -20,7 +22,7 @@ jobs: uses: actions/setup-python@v1 with: python-version: 3.9 - cache: 'pip' + - name: Install pypa/build run: >- python -m From 6cc5a15244867e0edc3210c6643b78a836e6f1de Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 15 Apr 2025 16:52:50 -0700 Subject: [PATCH 06/38] Implement GeoJSON for sites & summary --- backend/config.py | 5 +-- backend/persister.py | 100 ++++++++++++++++++++++++++++--------------- backend/record.py | 9 ++-- backend/unifier.py | 46 ++++++++++---------- frontend/cli.py | 36 +++++++--------- setup.py | 2 +- 6 files changed, 112 insertions(+), 86 deletions(-) diff --git a/backend/config.py b/backend/config.py index 0ec1dd7..d59a8ea 100644 --- a/backend/config.py +++ b/backend/config.py @@ -142,15 +142,13 @@ class Config(Loggable): output_summary: bool = False output_timeseries_unified: bool = False output_timeseries_separated: bool = False + output_site_file_type: str = "csv" latest_water_level_only: bool = False analyte_output_units: str = MILLIGRAMS_PER_LITER waterlevel_output_units: str = FEET - use_csv: bool = True - use_geojson: bool = False - def __init__(self, model=None, payload=None): # need to initialize logger super().__init__() @@ -417,6 +415,7 @@ def _report_attributes(title, attrs): "output_summary", "output_timeseries_unified", "output_timeseries_separated", + "output_site_file_type", "output_horizontal_datum", "output_elevation_units", ), diff --git a/backend/persister.py b/backend/persister.py index a89572c..d7fb56e 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -17,9 +17,12 @@ import io import os import shutil +from pprint import pprint +import json import pandas as pd import geopandas as gpd 
+from shapely import Point from backend.logger import Loggable @@ -34,9 +37,7 @@ class BasePersister(Loggable): Class to persist the data to a file or cloud storage. If persisting to a file, the output directory is created by config._make_output_path() """ - - extension: str - # output_id: str + add_extension: str = "csv" def __init__(self): self.records = [] @@ -75,7 +76,7 @@ def dump_timeseries_unified(self, path: str): path = os.path.join(path, "timeseries_unified") path = self.add_extension(path) self.log(f"dumping unified timeseries to {os.path.abspath(path)}") - self._dump_timeseries_unified(path, self.timeseries) + self._dump_timeseries(path, self.timeseries) else: self.log("no timeseries records to dump", fg="red") @@ -85,21 +86,16 @@ def dump_timeseries_separated(self, path: str): # the individual site timeseries will be dumped timeseries_path = os.path.join(path, "timeseries") self._make_output_directory(timeseries_path) - for site, records in self.timeseries: - path = os.path.join(timeseries_path, str(site.id).replace(" ", "_")) + for records in self.timeseries: + site_id = records[0].id + path = os.path.join(timeseries_path, str(site_id).replace(" ", "_")) path = self.add_extension(path) - self.log(f"dumping {site.id} to {os.path.abspath(path)}") - self._write(path, records) - else: - self.log("no timeseries records to dump", fg="red") + self.log(f"dumping {site_id} to {os.path.abspath(path)}") - def save(self, path: str): - if self.records: - path = self.add_extension(path) - self.log(f"saving to {path}") - self._write(path, self.records) + list_of_records = [records] + self._dump_timeseries(path, list_of_records) else: - self.log("no records to save", fg="red") + self.log("no timeseries records to dump", fg="red") def add_extension(self, path: str): if not self.extension: @@ -111,15 +107,14 @@ def add_extension(self, path: str): def _write(self, path: str, records): raise NotImplementedError - - def _dump_timeseries_unified(self, path: str, timeseries: list): + + def _dump_timeseries(self, path: str, timeseries: list): raise NotImplementedError def _make_output_directory(self, output_directory: str): os.mkdir(output_directory) - -def write_file(path, func, records): +def write_csv_file(path, func, records): with open(path, "w", newline="") as f: func(csv.writer(f), records) @@ -130,10 +125,16 @@ def write_memory(path, func, records): return f.getvalue() -def dump_timeseries_unified(writer, timeseries): +def dump_timeseries(writer, timeseries: list[list]): + """ + Dumps timeseries records to a CSV file. The timeseries must be a list of + lists, where each inner list contains the records for a single site. In the case + of timeseries separated, the inner list will contain the records for a single site + and this function will be called multiple times, once for each site. 
+ """ headers_have_not_been_written = True - for i, (site, records) in enumerate(timeseries): - for j, record in enumerate(records): + for i, records in enumerate(timeseries): + for record in records: if i == 0 and headers_have_not_been_written: writer.writerow(record.keys) headers_have_not_been_written = False @@ -192,7 +193,7 @@ def _add_content(self, path: str, content: str): self._content.append((path, content)) def _dump_timeseries_unified(self, path: str, timeseries: list): - content = write_memory(path, dump_timeseries_unified, timeseries) + content = write_memory(path, dump_timeseries, timeseries) self._add_content(path, content) @@ -200,24 +201,53 @@ class CSVPersister(BasePersister): extension = "csv" def _write(self, path: str, records: list): - write_file(path, dump_sites, records) + write_csv_file(path, dump_sites, records) - def _dump_timeseries_unified(self, path: str, timeseries: list): - write_file(path, dump_timeseries_unified, timeseries) + def _dump_timeseries(self, path: str, timeseries: list): + write_csv_file(path, dump_timeseries, timeseries) class GeoJSONPersister(BasePersister): extension = "geojson" def _write(self, path: str, records: list): - r0 = records[0] - df = pd.DataFrame([r.to_row() for r in records], columns=r0.keys) - - gdf = gpd.GeoDataFrame( - df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326" - ) - gdf.to_file(path, driver="GeoJSON") - + feature_collection = { + "type": "FeatureCollection", + "features": [], + } + + features = [ + { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [record.get("longitude"), record.get("latitude"), record.get("elevation")], + }, + "properties": {k: record.get(k) for k in record.keys if k not in ["latitude", "longitude", "elevation"]}, + } + for record in records + ] + feature_collection["features"].extend(features) + + + with open(path, "w") as f: + json.dump(feature_collection, f, indent=4) + + + def _get_gdal_type(self, dtype): + """ + Map pandas dtypes to GDAL-compatible types for the schema. 
+ """ + if pd.api.types.is_integer_dtype(dtype): + return "int" + elif pd.api.types.is_float_dtype(dtype): + return "float" + elif pd.api.types.is_string_dtype(dtype): + return "str" + elif pd.api.types.is_datetime64_any_dtype(dtype): + return "datetime" + else: + return "str" # Default to string for unsupported types # class ST2Persister(BasePersister): # extension = "st2" diff --git a/backend/record.py b/backend/record.py index 5cfe5e9..3185363 100644 --- a/backend/record.py +++ b/backend/record.py @@ -30,10 +30,8 @@ def to_csv(self): def __init__(self, payload): self._payload = payload - - def to_row(self): - - def get(attr): + + def get(self, attr): # v = self._payload.get(attr) # if v is None and self.defaults: # v = self.defaults.get(attr) @@ -64,7 +62,8 @@ def get(attr): break return v - return [get(k) for k in self.keys] + def to_row(self): + return [self.get(k) for k in self.keys] def update(self, **kw): self._payload.update(kw) diff --git a/backend/unifier.py b/backend/unifier.py index ce03a3d..4539f11 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -100,9 +100,9 @@ def _perister_factory(config): persister_klass = CSVPersister if config.use_cloud_storage: persister_klass = CloudStoragePersister - elif config.use_csv: + elif config.output_site_file_type == "csv": persister_klass = CSVPersister - elif config.use_geojson: + elif config.output_site_file_type == "geojson": persister_klass = GeoJSONPersister return persister_klass() @@ -114,7 +114,7 @@ def _perister_factory(config): # persister.save(config.output_path) -def _site_wrapper(site_source, parameter_source, persister, config): +def _site_wrapper(site_source, parameter_source, sites_summary_persister, timeseries_persister, config): try: # TODO: fully develop checks/discoveries below @@ -142,7 +142,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): first_flag = True if config.sites_only: - persister.sites.extend(sites) + sites_summary_persister.sites.extend(sites) else: for site_records in site_source.chunks(sites): if type(site_records) == list: @@ -159,7 +159,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): site_records, use_summarize, start_ind, end_ind ) if summary_records: - persister.records.extend(summary_records) + sites_summary_persister.records.extend(summary_records) sites_with_records_count += len(summary_records) else: continue @@ -176,8 +176,8 @@ def _site_wrapper(site_source, parameter_source, persister, config): sites_with_records_count += len(results) for site, records in results: - persister.timeseries.append((site, records)) - persister.sites.append(site) + timeseries_persister.timeseries.append(records) + sites_summary_persister.sites.append(site) if site_limit: # print( @@ -203,15 +203,15 @@ def _site_wrapper(site_source, parameter_source, persister, config): # num_sites_to_remove from the length of the list # to remove the last num_sites_to_remove sites if use_summarize: - persister.records = persister.records[ - : len(persister.records) - num_sites_to_remove + sites_summary_persister.records = sites_summary_persister.records[ + : len(sites_summary_persister.records) - num_sites_to_remove ] else: - persister.timeseries = persister.timeseries[ - : len(persister.timeseries) - num_sites_to_remove + timeseries_persister.timeseries = timeseries_persister.timeseries[ + : len(timeseries_persister.timeseries) - num_sites_to_remove ] - persister.sites = persister.sites[ - : len(persister.sites) - num_sites_to_remove + sites_summary_persister.sites = 
sites_summary_persister.sites[ + : len(sites_summary_persister.sites) - num_sites_to_remove ] break @@ -227,22 +227,24 @@ def _unify_parameter( config, sources, ): - persister = _perister_factory(config) + sites_summary_persister = _perister_factory(config) + timeseries_persister = CSVPersister() for site_source, parameter_source in sources: - _site_wrapper(site_source, parameter_source, persister, config) + _site_wrapper(site_source, parameter_source, sites_summary_persister, timeseries_persister, config) if config.output_summary: - persister.dump_summary(config.output_path) + sites_summary_persister.dump_summary(config.output_path) elif config.output_timeseries_unified: - persister.dump_timeseries_unified(config.output_path) - persister.dump_sites(config.output_path) + timeseries_persister.dump_timeseries_unified(config.output_path) + sites_summary_persister.dump_sites(config.output_path) elif config.sites_only: - persister.dump_sites(config.output_path) + sites_summary_persister.dump_sites(config.output_path) else: # config.output_timeseries_separated - persister.dump_timeseries_separated(config.output_path) - persister.dump_sites(config.output_path) + timeseries_persister.dump_timeseries_separated(config.output_path) + sites_summary_persister.dump_sites(config.output_path) - persister.finalize(config.output_name) + timeseries_persister.finalize(config.output_name) + sites_summary_persister.finalize(config.output_name) def get_sources_in_polygon(polygon): diff --git a/frontend/cli.py b/frontend/cli.py index 04e2949..c0c4f15 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -169,24 +169,6 @@ def cli(): help="End date in the form 'YYYY', 'YYYY-MM', 'YYYY-MM-DD', 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'", ), ] - -TIMESERIES_OPTIONS = [ - click.option( - "--separated_timeseries", - is_flag=True, - default=False, - show_default=True, - help="Output separate timeseries files for every site", - ), - click.option( - "--unified_timeseries", - is_flag=True, - default=False, - show_default=True, - help="Output single timeseries file, which includes all sites", - ), -] - OUTPUT_OPTIONS = [ click.option( "--output", @@ -195,6 +177,7 @@ def cli(): help="Output summary file, single unified timeseries file, or separated timeseries files", ), ] + PERSISTER_OPTIONS = [ click.option( "--output-dir", @@ -203,6 +186,15 @@ def cli(): ) ] +SITE_OUTPUT_TYPE_OPTIONS = [ + click.option( + "--site-output-type", + type=click.Choice(["csv", "geojson"]), + default="csv", + help="Output file format for sites (csv or geoson). 
Default is csv", + ) +] + def add_options(options): def _add_options(func): @@ -225,6 +217,7 @@ def _add_options(func): @add_options(SPATIAL_OPTIONS) @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) +@add_options(SITE_OUTPUT_TYPE_OPTIONS) def weave( parameter, output, @@ -249,12 +242,13 @@ def weave( site_limit, dry, yes, + site_output_type, ): """ Get parameter timeseries or summary data """ # instantiate config and set up parameter - config = setup_config(parameter, bbox, wkt, county, site_limit, dry) + config = setup_config(parameter, bbox, wkt, county, site_limit, dry, site_output_type) config.parameter = parameter # output type @@ -402,7 +396,7 @@ def sources(sources, bbox, wkt, county): click.echo(s) -def setup_config(tag, bbox, wkt, county, site_limit, dry): +def setup_config(tag, bbox, wkt, county, site_limit, dry, site_output_type="csv"): config = Config() if county: click.echo(f"Getting {tag} for county {county}") @@ -421,6 +415,8 @@ def setup_config(tag, bbox, wkt, county, site_limit, dry): config.site_limit = None config.dry = dry + config.output_site_file_type = site_output_type + return config diff --git a/setup.py b/setup.py index f06990d..05b81a8 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ setup( name="nmuwd", - version="0.8.2", + version="0.9.0", author="Jake Ross", description="New Mexico Water Data Integration Engine", long_description=long_description, From 60ef303437b5beae83564cb82a4315dfa510a59d Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 15 Apr 2025 17:28:00 -0700 Subject: [PATCH 07/38] geojson output for sites and summary --- backend/config.py | 4 +- backend/unifier.py | 4 +- frontend/cli.py | 10 +-- tests/test_cli/__init__.py | 25 ++++++ tests/test_sources/__init__.py | 141 ++++++++++++++++++++++++++------- 5 files changed, 147 insertions(+), 37 deletions(-) diff --git a/backend/config.py b/backend/config.py index d59a8ea..de2bc64 100644 --- a/backend/config.py +++ b/backend/config.py @@ -142,7 +142,7 @@ class Config(Loggable): output_summary: bool = False output_timeseries_unified: bool = False output_timeseries_separated: bool = False - output_site_file_type: str = "csv" + site_file_type: str = "csv" latest_water_level_only: bool = False @@ -415,7 +415,7 @@ def _report_attributes(title, attrs): "output_summary", "output_timeseries_unified", "output_timeseries_separated", - "output_site_file_type", + "site_file_type", "output_horizontal_datum", "output_elevation_units", ), diff --git a/backend/unifier.py b/backend/unifier.py index 4539f11..9e4e653 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -100,9 +100,9 @@ def _perister_factory(config): persister_klass = CSVPersister if config.use_cloud_storage: persister_klass = CloudStoragePersister - elif config.output_site_file_type == "csv": + elif config.site_file_type == "csv": persister_klass = CSVPersister - elif config.output_site_file_type == "geojson": + elif config.site_file_type == "geojson": persister_klass = GeoJSONPersister return persister_klass() diff --git a/frontend/cli.py b/frontend/cli.py index c0c4f15..cd9e180 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -188,7 +188,7 @@ def cli(): SITE_OUTPUT_TYPE_OPTIONS = [ click.option( - "--site-output-type", + "--site-file-type", type=click.Choice(["csv", "geojson"]), default="csv", help="Output file format for sites (csv or geoson). 
Default is csv", @@ -242,13 +242,13 @@ def weave( site_limit, dry, yes, - site_output_type, + site_file_type, ): """ Get parameter timeseries or summary data """ # instantiate config and set up parameter - config = setup_config(parameter, bbox, wkt, county, site_limit, dry, site_output_type) + config = setup_config(parameter, bbox, wkt, county, site_limit, dry, site_file_type) config.parameter = parameter # output type @@ -396,7 +396,7 @@ def sources(sources, bbox, wkt, county): click.echo(s) -def setup_config(tag, bbox, wkt, county, site_limit, dry, site_output_type="csv"): +def setup_config(tag, bbox, wkt, county, site_limit, dry, site_file_type="csv"): config = Config() if county: click.echo(f"Getting {tag} for county {county}") @@ -415,7 +415,7 @@ def setup_config(tag, bbox, wkt, county, site_limit, dry, site_output_type="csv" config.site_limit = None config.dry = dry - config.output_site_file_type = site_output_type + config.site_file_type = site_file_type return config diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index 84923b8..0903403 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -50,6 +50,7 @@ def _test_weave( self, parameter: str, output: str, + site_output_type: str = "csv", site_limit: int = 4, start_date: str = "1990-08-10", end_date: str = "1990-08-11", @@ -92,6 +93,13 @@ def _test_weave( end_date, ] + if site_output_type == "csv": + arguments.append("--site-file-type") + arguments.append(site_output_type) + elif site_output_type == "geojson": + arguments.append("--site-file-type") + arguments.append(site_output_type) + if geographic_filter_name and geographic_filter_value: arguments.extend([f"--{geographic_filter_name}", geographic_filter_value]) @@ -115,6 +123,7 @@ def _test_weave( 6. The start date is set correctly 7. The end date is set correctly 8. The geographic filter is set correctly + 9. 
The site output type is set correctly """ config = result.return_value @@ -166,6 +175,12 @@ def _test_weave( else: assert getattr(config, _geographic_filter_name) == "" + # 9 + if site_output_type == "csv": + assert getattr(config, "site_file_type") == "csv" + elif site_output_type == "geojson": + assert getattr(config, "site_file_type") == "geojson" + def test_weave_summary(self): self._test_weave(parameter=WATERLEVELS, output="summary") @@ -175,6 +190,16 @@ def test_weave_timeseries_unified(self): def test_weave_timeseries_separated(self): self._test_weave(parameter=WATERLEVELS, output="timeseries_separated") + def test_weave_csv(self): + self._test_weave( + parameter=WATERLEVELS, output="summary", site_output_type="csv" + ) + + def test_weave_geojson(self): + self._test_weave( + parameter=WATERLEVELS, output="summary", site_output_type="geojson" + ) + def test_weave_bbox(self): self._test_weave( parameter=WATERLEVELS, output="summary", bbox="32.0,-106.0,36.0,-102.0" diff --git a/tests/test_sources/__init__.py b/tests/test_sources/__init__.py index a18dd94..d020b90 100644 --- a/tests/test_sources/__init__.py +++ b/tests/test_sources/__init__.py @@ -1,3 +1,4 @@ +import json from logging import shutdown as logger_shutdown from pathlib import Path import pytest @@ -10,8 +11,14 @@ from backend.unifier import unify_analytes, unify_waterlevels from tests import recursively_clean_directory -SUMMARY_RECORD_HEADERS = list(SummaryRecord.keys) -SITE_RECORD_HEADERS = list(SiteRecord.keys) +EXCLUDED_GEOJSON_KEYS = ["latitude", "longitude", "elevation"] + +SUMMARY_RECORD_CSV_HEADERS = list(SummaryRecord.keys) +SUMMARY_RECORD_GEOJSON_KEYS = [k for k in SUMMARY_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS] + +SITE_RECORD_CSV_HEADERS = list(SiteRecord.keys) +SITE_RECORD_GEOJSON_KEYS = [k for k in SITE_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS] + PARAMETER_RECORD_HEADERS = list(ParameterRecord.keys) @@ -68,18 +75,57 @@ def _run_unifier(self): else: unify_analytes(self.config) - def _check_sites_file(self): - sites_file = Path(self.config.output_path) / "sites.csv" - assert sites_file.exists() + def _check_summary_file(self, extension: str): + summary_file = Path(self.config.output_path) / f"summary.{extension}" + assert summary_file.exists() - with open(sites_file, "r") as f: - headers = f.readline().strip().split(",") - assert headers == SITE_RECORD_HEADERS + if extension == "csv": + with open(summary_file, "r") as f: + headers = f.readline().strip().split(",") + assert headers == SUMMARY_RECORD_CSV_HEADERS + + # +1 for the header + with open(summary_file, "r") as f: + lines = f.readlines() + assert len(lines) == self.site_limit + 1 + elif extension == "geojson": + with open(summary_file, "r") as f: + summary = json.load(f) + assert len(summary["features"]) == self.site_limit + assert summary["type"] == "FeatureCollection" + for feature in summary["features"]: + assert feature["geometry"]["type"] == "Point" + assert len(feature["geometry"]["coordinates"]) == 3 + assert sorted(feature["properties"].keys()) == sorted(SUMMARY_RECORD_GEOJSON_KEYS) + assert summary["features"][0]["type"] == "Feature" + else: + raise ValueError(f"Unsupported file extension: {extension}") - # +1 for the header - with open(sites_file, "r") as f: - lines = f.readlines() - assert len(lines) == self.site_limit + 1 + def _check_sites_file(self, extension: str): + sites_file = Path(self.config.output_path) / f"sites.{extension}" + assert sites_file.exists() + + if extension == "csv": + with open(sites_file, "r") as 
f: + headers = f.readline().strip().split(",") + assert headers == SITE_RECORD_CSV_HEADERS + + # +1 for the header + with open(sites_file, "r") as f: + lines = f.readlines() + assert len(lines) == self.site_limit + 1 + elif extension == "geojson": + with open(sites_file, "r") as f: + sites = json.load(f) + assert len(sites["features"]) == self.site_limit + assert sites["type"] == "FeatureCollection" + for feature in sites["features"]: + assert feature["geometry"]["type"] == "Point" + assert len(feature["geometry"]["coordinates"]) == 3 + assert sorted(feature["properties"].keys()) == sorted(SITE_RECORD_GEOJSON_KEYS) + assert sites["features"][0]["type"] == "Feature" + else: + raise ValueError(f"Unsupported file extension: {extension}") def _check_timeseries_file(self, timeseries_dir, timeseries_file_name): timeseries_file = Path(timeseries_dir) / timeseries_file_name @@ -94,7 +140,7 @@ def test_health(self): source = self.config.all_site_sources()[0][0] assert source.health() - def test_summary(self): + def test_summary_csv(self): # Arrange -------------------------------------------------------------- self.config.output_summary = True self.config.report() @@ -103,21 +149,21 @@ def test_summary(self): self._run_unifier() # Assert --------------------------------------------------------------- - # Check the summary file - summary_file = Path(self.config.output_path) / "summary.csv" - assert summary_file.exists() + self._check_summary_file("csv") - # Check the column headers - with open(summary_file, "r") as f: - headers = f.readline().strip().split(",") - assert headers == SUMMARY_RECORD_HEADERS + def test_summary_geojson(self): + # Arrange -------------------------------------------------------------- + self.config.output_summary = True + self.config.site_file_type = "geojson" + self.config.report() - # +1 for the header - with open(summary_file, "r") as f: - lines = f.readlines() - assert len(lines) == self.site_limit + 1 + # Act ------------------------------------------------------------------ + self._run_unifier() - def test_timeseries_unified(self): + # Assert --------------------------------------------------------------- + self._check_summary_file("geojson") + + def test_timeseries_unified_csv(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_unified = True self.config.report() @@ -127,16 +173,54 @@ def test_timeseries_unified(self): # Assert --------------------------------------------------------------- # Check the sites file - self._check_sites_file() + self._check_sites_file("csv") # Check the timeseries file timeseries_dir = Path(self.config.output_path) timeseries_file_name = "timeseries_unified.csv" self._check_timeseries_file(timeseries_dir, timeseries_file_name) - def test_timeseries_separated(self): + def test_timeseries_unified_geojson(self): + # Arrange -------------------------------------------------------------- + self.config.output_timeseries_unified = True + self.config.site_file_type = "geojson" + self.config.report() + + # Act ------------------------------------------------------------------ + self._run_unifier() + + # Assert --------------------------------------------------------------- + # Check the sites file + self._check_sites_file("geojson") + + # Check the timeseries file + timeseries_dir = Path(self.config.output_path) + timeseries_file_name = "timeseries_unified.csv" + self._check_timeseries_file(timeseries_dir, timeseries_file_name) + + def test_timeseries_separated_csv(self): + # Arrange 
-------------------------------------------------------------- + self.config.output_timeseries_separated = True + self.config.report() + + # Act ------------------------------------------------------------------ + self._run_unifier() + + # Assert --------------------------------------------------------------- + # Check the sites file + self._check_sites_file("csv") + + # Check the timeseries files + timeseries_dir = Path(self.config.output_path) / "timeseries" + assert len([f for f in timeseries_dir.iterdir()]) == self.site_limit + + for timeseries_file in timeseries_dir.iterdir(): + self._check_timeseries_file(timeseries_dir, timeseries_file.name) + + def test_timeseries_separated_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_separated = True + self.config.site_file_type = "geojson" self.config.report() # Act ------------------------------------------------------------------ @@ -144,7 +228,7 @@ def test_timeseries_separated(self): # Assert --------------------------------------------------------------- # Check the sites file - self._check_sites_file() + self._check_sites_file("geojson") # Check the timeseries files timeseries_dir = Path(self.config.output_path) / "timeseries" @@ -152,6 +236,7 @@ def test_timeseries_separated(self): for timeseries_file in timeseries_dir.iterdir(): self._check_timeseries_file(timeseries_dir, timeseries_file.name) + @pytest.mark.skip(reason="test_date_range not implemented yet") def test_date_range(self): From 0f1d7272925609d543fcf5b504a7cb07a2e69ae1 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 16 Apr 2025 00:31:30 +0000 Subject: [PATCH 08/38] Formatting changes --- backend/persister.py | 21 ++++++++---- backend/record.py | 60 +++++++++++++++++----------------- backend/unifier.py | 39 +++++++++++++++------- tests/test_sources/__init__.py | 17 +++++++--- 4 files changed, 85 insertions(+), 52 deletions(-) diff --git a/backend/persister.py b/backend/persister.py index d7fb56e..09d32e6 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -17,7 +17,7 @@ import io import os import shutil -from pprint import pprint +from pprint import pprint import json import pandas as pd @@ -37,6 +37,7 @@ class BasePersister(Loggable): Class to persist the data to a file or cloud storage. 
If persisting to a file, the output directory is created by config._make_output_path() """ + add_extension: str = "csv" def __init__(self): @@ -107,13 +108,14 @@ def add_extension(self, path: str): def _write(self, path: str, records): raise NotImplementedError - + def _dump_timeseries(self, path: str, timeseries: list): raise NotImplementedError def _make_output_directory(self, output_directory: str): os.mkdir(output_directory) + def write_csv_file(path, func, records): with open(path, "w", newline="") as f: func(csv.writer(f), records) @@ -221,19 +223,25 @@ def _write(self, path: str, records: list): "type": "Feature", "geometry": { "type": "Point", - "coordinates": [record.get("longitude"), record.get("latitude"), record.get("elevation")], + "coordinates": [ + record.get("longitude"), + record.get("latitude"), + record.get("elevation"), + ], + }, + "properties": { + k: record.get(k) + for k in record.keys + if k not in ["latitude", "longitude", "elevation"] }, - "properties": {k: record.get(k) for k in record.keys if k not in ["latitude", "longitude", "elevation"]}, } for record in records ] feature_collection["features"].extend(features) - with open(path, "w") as f: json.dump(feature_collection, f, indent=4) - def _get_gdal_type(self, dtype): """ Map pandas dtypes to GDAL-compatible types for the schema. @@ -249,6 +257,7 @@ def _get_gdal_type(self, dtype): else: return "str" # Default to string for unsupported types + # class ST2Persister(BasePersister): # extension = "st2" # diff --git a/backend/record.py b/backend/record.py index 3185363..61d9769 100644 --- a/backend/record.py +++ b/backend/record.py @@ -30,37 +30,37 @@ def to_csv(self): def __init__(self, payload): self._payload = payload - + def get(self, attr): - # v = self._payload.get(attr) - # if v is None and self.defaults: - # v = self.defaults.get(attr) - v = self.__getattr__(attr) - - field_sigfigs = [ - ("elevation", 2), - ("well_depth", 2), - ("latitude", 6), - ("longitude", 6), - ("min", 2), - ("max", 2), - ("mean", 2), - ] - - # both analyte and water level tables have the same fields, but the - # rounding should only occur for water level tables - if isinstance(self, WaterLevelRecord): - field_sigfigs.append((PARAMETER_VALUE, 2)) - - for field, sigfigs in field_sigfigs: - if v is not None and field == attr: - try: - v = round(v, sigfigs) - except TypeError as e: - print(field, attr) - raise e - break - return v + # v = self._payload.get(attr) + # if v is None and self.defaults: + # v = self.defaults.get(attr) + v = self.__getattr__(attr) + + field_sigfigs = [ + ("elevation", 2), + ("well_depth", 2), + ("latitude", 6), + ("longitude", 6), + ("min", 2), + ("max", 2), + ("mean", 2), + ] + + # both analyte and water level tables have the same fields, but the + # rounding should only occur for water level tables + if isinstance(self, WaterLevelRecord): + field_sigfigs.append((PARAMETER_VALUE, 2)) + + for field, sigfigs in field_sigfigs: + if v is not None and field == attr: + try: + v = round(v, sigfigs) + except TypeError as e: + print(field, attr) + raise e + break + return v def to_row(self): return [self.get(k) for k in self.keys] diff --git a/backend/unifier.py b/backend/unifier.py index 9e4e653..d90705c 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -114,7 +114,9 @@ def _perister_factory(config): # persister.save(config.output_path) -def _site_wrapper(site_source, parameter_source, sites_summary_persister, timeseries_persister, config): +def _site_wrapper( + site_source, parameter_source, 
sites_summary_persister, timeseries_persister, config +): try: # TODO: fully develop checks/discoveries below @@ -203,16 +205,25 @@ def _site_wrapper(site_source, parameter_source, sites_summary_persister, timese # num_sites_to_remove from the length of the list # to remove the last num_sites_to_remove sites if use_summarize: - sites_summary_persister.records = sites_summary_persister.records[ - : len(sites_summary_persister.records) - num_sites_to_remove - ] + sites_summary_persister.records = ( + sites_summary_persister.records[ + : len(sites_summary_persister.records) + - num_sites_to_remove + ] + ) else: - timeseries_persister.timeseries = timeseries_persister.timeseries[ - : len(timeseries_persister.timeseries) - num_sites_to_remove - ] - sites_summary_persister.sites = sites_summary_persister.sites[ - : len(sites_summary_persister.sites) - num_sites_to_remove - ] + timeseries_persister.timeseries = ( + timeseries_persister.timeseries[ + : len(timeseries_persister.timeseries) + - num_sites_to_remove + ] + ) + sites_summary_persister.sites = ( + sites_summary_persister.sites[ + : len(sites_summary_persister.sites) + - num_sites_to_remove + ] + ) break except BaseException: @@ -230,7 +241,13 @@ def _unify_parameter( sites_summary_persister = _perister_factory(config) timeseries_persister = CSVPersister() for site_source, parameter_source in sources: - _site_wrapper(site_source, parameter_source, sites_summary_persister, timeseries_persister, config) + _site_wrapper( + site_source, + parameter_source, + sites_summary_persister, + timeseries_persister, + config, + ) if config.output_summary: sites_summary_persister.dump_summary(config.output_path) diff --git a/tests/test_sources/__init__.py b/tests/test_sources/__init__.py index d020b90..78886bc 100644 --- a/tests/test_sources/__init__.py +++ b/tests/test_sources/__init__.py @@ -14,10 +14,14 @@ EXCLUDED_GEOJSON_KEYS = ["latitude", "longitude", "elevation"] SUMMARY_RECORD_CSV_HEADERS = list(SummaryRecord.keys) -SUMMARY_RECORD_GEOJSON_KEYS = [k for k in SUMMARY_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS] +SUMMARY_RECORD_GEOJSON_KEYS = [ + k for k in SUMMARY_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS +] SITE_RECORD_CSV_HEADERS = list(SiteRecord.keys) -SITE_RECORD_GEOJSON_KEYS = [k for k in SITE_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS] +SITE_RECORD_GEOJSON_KEYS = [ + k for k in SITE_RECORD_CSV_HEADERS if k not in EXCLUDED_GEOJSON_KEYS +] PARAMETER_RECORD_HEADERS = list(ParameterRecord.keys) @@ -96,7 +100,9 @@ def _check_summary_file(self, extension: str): for feature in summary["features"]: assert feature["geometry"]["type"] == "Point" assert len(feature["geometry"]["coordinates"]) == 3 - assert sorted(feature["properties"].keys()) == sorted(SUMMARY_RECORD_GEOJSON_KEYS) + assert sorted(feature["properties"].keys()) == sorted( + SUMMARY_RECORD_GEOJSON_KEYS + ) assert summary["features"][0]["type"] == "Feature" else: raise ValueError(f"Unsupported file extension: {extension}") @@ -122,7 +128,9 @@ def _check_sites_file(self, extension: str): for feature in sites["features"]: assert feature["geometry"]["type"] == "Point" assert len(feature["geometry"]["coordinates"]) == 3 - assert sorted(feature["properties"].keys()) == sorted(SITE_RECORD_GEOJSON_KEYS) + assert sorted(feature["properties"].keys()) == sorted( + SITE_RECORD_GEOJSON_KEYS + ) assert sites["features"][0]["type"] == "Feature" else: raise ValueError(f"Unsupported file extension: {extension}") @@ -236,7 +244,6 @@ def 
test_timeseries_separated_geojson(self): for timeseries_file in timeseries_dir.iterdir(): self._check_timeseries_file(timeseries_dir, timeseries_file.name) - @pytest.mark.skip(reason="test_date_range not implemented yet") def test_date_range(self): From 34eeddc0064b51b6c43226f30e074b473c7f7537 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 16 Apr 2025 07:48:43 -0700 Subject: [PATCH 09/38] Comment out cache because of errors --- .github/workflows/cicd.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 5ea32ac..8cc7a39 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -23,7 +23,7 @@ jobs: uses: actions/setup-python@v3 with: python-version: "3.10" - cache: "pip" + # cache: "pip" - name: Install dependencies run: | python -m pip install --upgrade pip From a047ba1dda1dcdc8db864801a99fd6d5ddac6153 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 16 Apr 2025 08:51:54 -0700 Subject: [PATCH 10/38] add well depth units to sites table output --- backend/record.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/record.py b/backend/record.py index 61d9769..0f8e34b 100644 --- a/backend/record.py +++ b/backend/record.py @@ -161,6 +161,7 @@ class SiteRecord(BaseRecord): "formation", "aquifer", "well_depth", + "well_depth_units", ) defaults: dict = { @@ -178,6 +179,7 @@ class SiteRecord(BaseRecord): "formation": "", "aquifer": "", "well_depth": None, + "well_depth_units": FEET, } From 0edebcd426bb327bd854fb206fec3f6b9df8dcff Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 16 Apr 2025 08:52:12 -0700 Subject: [PATCH 11/38] mypy fixes --- backend/persister.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/backend/persister.py b/backend/persister.py index 09d32e6..9cae709 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -38,7 +38,7 @@ class BasePersister(Loggable): If persisting to a file, the output directory is created by config._make_output_path() """ - add_extension: str = "csv" + extension: str = "csv" def __init__(self): self.records = [] @@ -213,11 +213,7 @@ class GeoJSONPersister(BasePersister): extension = "geojson" def _write(self, path: str, records: list): - feature_collection = { - "type": "FeatureCollection", - "features": [], - } - + features = [ { "type": "Feature", @@ -237,7 +233,10 @@ def _write(self, path: str, records: list): } for record in records ] - feature_collection["features"].extend(features) + feature_collection = { + "type": "FeatureCollection", + "features": features + } with open(path, "w") as f: json.dump(feature_collection, f, indent=4) From dd47369b86672146bf8adf7ab63f285c213efa2f Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 16 Apr 2025 16:28:38 +0000 Subject: [PATCH 12/38] Formatting changes --- backend/persister.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/backend/persister.py b/backend/persister.py index 9cae709..2288dc3 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -213,7 +213,7 @@ class GeoJSONPersister(BasePersister): extension = "geojson" def _write(self, path: str, records: list): - + features = [ { "type": "Feature", @@ -233,10 +233,7 @@ def _write(self, path: str, records: list): } for record in records ] - feature_collection = { - "type": "FeatureCollection", - "features": features - } + feature_collection = {"type": "FeatureCollection", "features": features} with open(path, "w") as f: json.dump(feature_collection, f, 
indent=4) From 1a0511dc49feb7f87dc427896c53d5c71fe65be7 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 16 Apr 2025 09:57:00 -0700 Subject: [PATCH 13/38] work on caching in workflow --- .github/workflows/cicd.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 77ce402..05fe71e 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -18,10 +18,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python 3.10 - uses: actions/setup-python@v3 + uses: actions/setup-python@v4 with: python-version: "3.10" cache: "pip" From 6d9ea6f9fda7d7e80be6846e6dcbc9660a747bd9 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 16 Apr 2025 14:34:32 -0700 Subject: [PATCH 14/38] Bulk well retrieval --- backend/connectors/nmbgmr/source.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 8700a98..2b2fd62 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -82,19 +82,21 @@ def get_records(self): sites = self._execute_json_request( _make_url("locations"), params, tag="features", timeout=30 ) + pointids = [site["properties"]["point_id"] for site in sites] + wells = self._execute_json_request( + _make_url("wells"), params={"pointid": ",".join(pointids)}, tag="" + ) for site in sites: - print(f"Obtaining well data for {site['properties']['point_id']}") - well_data = self._execute_json_request( - _make_url("wells"), - params={"pointid": site["properties"]["point_id"]}, - tag="", - ) - site["properties"]["formation"] = well_data["formation"] - site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] - site["properties"]["well_depth_units"] = FEET - # site["properties"]["formation"] = None - # site["properties"]["well_depth"] = None - # site["properties"]["well_depth_units"] = FEET + pointid = site["properties"]["point_id"] + well_data = wells.get(pointid) + if well_data: + site["properties"]["formation"] = well_data["formation"] + site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] + site["properties"]["well_depth_units"] = FEET + else: + site["properties"]["formation"] = None + site["properties"]["well_depth"] = None + site["properties"]["well_depth_units"] = None return sites From d25716e794c8921fc188db02587e6c326ed9715d Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 23 Apr 2025 15:07:04 -0600 Subject: [PATCH 15/38] fixes and updates after merging pre-production --- README.md | 7 ++- backend/config.py | 15 ++--- backend/persister.py | 70 ++++++++-------------- backend/unifier.py | 6 +- frontend/cli.py | 104 +++++++++++++++++++++------------ requirements.txt | 5 +- tests/test_cli/__init__.py | 22 +++---- tests/test_sources/__init__.py | 6 +- 8 files changed, 118 insertions(+), 117 deletions(-) diff --git a/README.md b/README.md index 715c1c1..0173df4 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,7 @@ A log of the inputs and processes, called `die.log`, is also saved to the output | formation | geologic formation in which the well terminates | string | N | | aquifer | aquifer from which the well draws water | string | N | | well_depth | depth of well | float | N | +| well_depth_units | units of well depth. 
Defaults to ft | string | N | **CABQ elevation is calculated as [elevation at top of casing] - [stickup height]; if stickup height < 0 the measuring point is assumed to be beneath the ground surface @@ -214,12 +215,12 @@ die sources {parameter} to print the sources that report that parameter to the terminal. -### Wells [In Development] +### Sites Use ``` -die wells +die sites ``` -to print wells to the terminal. +to export site information only \ No newline at end of file diff --git a/backend/config.py b/backend/config.py index 3808ee7..9c54e7c 100644 --- a/backend/config.py +++ b/backend/config.py @@ -68,6 +68,7 @@ ) from .connectors.usgs.source import NWISSiteSource, NWISWaterLevelSource from .connectors.wqp.source import WQPSiteSource, WQPAnalyteSource, WQPWaterLevelSource +from backend.logger import Loggable SOURCE_DICT = { @@ -143,17 +144,13 @@ class Config(Loggable): output_summary: bool = False output_timeseries_unified: bool = False output_timeseries_separated: bool = False - site_file_type: str = "csv" latest_water_level_only: bool = False analyte_output_units: str = MILLIGRAMS_PER_LITER waterlevel_output_units: str = FEET - # use_csv: bool = True - # use_geojson: bool = False - - output_format: OutputFormat = OutputFormat.CSV + sites_output_format: OutputFormat = OutputFormat.CSV yes: bool = True @@ -161,7 +158,6 @@ def __init__(self, model=None, payload=None, path=None): # need to initialize logger super().__init__() - self.bbox = {} if path: payload = self._load_from_yaml(path) @@ -197,7 +193,7 @@ def __init__(self, model=None, payload=None, path=None): "output_name", "dry", "latest_water_level_only", - "output_format", + "sites_output_format", "use_cloud_storage", "yes"): if attr in payload: @@ -289,7 +285,7 @@ def get_config_and_false_agencies(self): def finalize(self): self._update_output_units() - if self.output_format != OutputFormat.GEOSERVER: + if self.sites_output_format != OutputFormat.GEOSERVER: self.update_output_name() self.make_output_directory() @@ -448,11 +444,10 @@ def _report_attributes(title, attrs): "output_summary", "output_timeseries_unified", "output_timeseries_separated", - "site_file_type", "output_horizontal_datum", "output_elevation_units", "use_cloud_storage", - "output_format" + "sites_output_format" ), ) diff --git a/backend/persister.py b/backend/persister.py index 21c5781..efa76c4 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -130,6 +130,30 @@ def write_csv_file(path, func, records): with open(path, "w", newline="") as f: func(csv.writer(f), records) +def write_sites_geojson_file(path, records): + features = [ + { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [ + record.get("longitude"), + record.get("latitude"), + record.get("elevation"), + ], + }, + "properties": { + k: record.get(k) + for k in record.keys + if k not in ["latitude", "longitude", "elevation"] + }, + } + for record in records + ] + feature_collection = {"type": "FeatureCollection", "features": features} + + with open(path, "w") as f: + json.dump(feature_collection, f, indent=4) def write_memory(func, records, output_format=None): f = io.BytesIO() @@ -169,12 +193,6 @@ def dump_sites(filehandle, records, output_format): ) gdf.to_file(filehandle, driver="GeoJSON") - - - - - - class CloudStoragePersister(BasePersister): extension = "csv" _content: list @@ -243,45 +261,7 @@ class GeoJSONPersister(BasePersister): extension = "geojson" def _write(self, path: str, records: list): - - features = [ - { - "type": "Feature", - "geometry": { - 
"type": "Point", - "coordinates": [ - record.get("longitude"), - record.get("latitude"), - record.get("elevation"), - ], - }, - "properties": { - k: record.get(k) - for k in record.keys - if k not in ["latitude", "longitude", "elevation"] - }, - } - for record in records - ] - feature_collection = {"type": "FeatureCollection", "features": features} - - with open(path, "w") as f: - json.dump(feature_collection, f, indent=4) - - def _get_gdal_type(self, dtype): - """ - Map pandas dtypes to GDAL-compatible types for the schema. - """ - if pd.api.types.is_integer_dtype(dtype): - return "int" - elif pd.api.types.is_float_dtype(dtype): - return "float" - elif pd.api.types.is_string_dtype(dtype): - return "str" - elif pd.api.types.is_datetime64_any_dtype(dtype): - return "datetime" - else: - return "str" # Default to string for unsupported types + write_sites_geojson_file(path, records) # class ST2Persister(BasePersister): diff --git a/backend/unifier.py b/backend/unifier.py index 639a823..44c49e0 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -394,14 +394,14 @@ def get_datastreams(): print(si, si.id, ds["@iot.id"]) -if __name__ == "__main__": +# if __name__ == "__main__": # test_waterlevel_unification() # root = logging.getLogger() # root.setLevel(logging.DEBUG) # shandler = logging.StreamHandler() # get_sources(Config()) - setup_logging() - site_unification_test() + # setup_logging() + # site_unification_test() # waterlevel_unification_test() # analyte_unification_test() # print(health_check("nwis")) diff --git a/frontend/cli.py b/frontend/cli.py index c621c0d..d7d3c01 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -17,6 +17,7 @@ import click +from backend import OutputFormat from backend.config import Config from backend.constants import PARAMETER_OPTIONS from backend.unifier import unify_sites, unify_waterlevels, unify_analytes @@ -178,7 +179,7 @@ def cli(): ), ] -PERSISTER_OPTIONS = [ +OUTPUT_DIR_OPTIONS = [ click.option( "--output-dir", default=".", @@ -186,15 +187,25 @@ def cli(): ) ] -SITE_OUTPUT_TYPE_OPTIONS = [ +SITES_OUTPUT_FORMATS = sorted([value for value in OutputFormat]) +SITES_OUTPUT_FORMAT_OPTIONS = [ click.option( - "--site-file-type", - type=click.Choice(["csv", "geojson"]), + "--sites_output_format", + type=click.Choice(SITES_OUTPUT_FORMATS), default="csv", - help="Output file format for sites (csv or geoson). Default is csv", + help=f"Output file format for sites: {SITES_OUTPUT_FORMATS}. Default is csv", ) ] +CONFIG_PATH_OPTIONS = [ + click.option( + "--config-path", + type=click.Path(exists=True), + default=None, + help="Path to config file. 
Default is config.yaml", + ), +] + def add_options(options): def _add_options(func): @@ -211,14 +222,14 @@ def _add_options(func): type=click.Choice(PARAMETER_OPTIONS, case_sensitive=False), required=True, ) -@add_options(CONFIG_OPTIONS) +@add_options(CONFIG_PATH_OPTIONS) @add_options(OUTPUT_OPTIONS) -@add_options(PERSISTER_OPTIONS) +@add_options(OUTPUT_DIR_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) -@add_options(SITE_OUTPUT_TYPE_OPTIONS) +@add_options(SITES_OUTPUT_FORMAT_OPTIONS) def weave( parameter, config_path, @@ -228,6 +239,7 @@ def weave( end_date, bbox, county, + wkt, no_bernco, no_bor, no_cabq, @@ -235,18 +247,29 @@ def weave( no_nmbgmr_amp, no_nmed_dwb, no_nmose_isc_seven_rivers, + no_nmose_pod, no_nmose_roswell, no_nwis, no_pvacd, no_wqp, site_limit, dry, - yes): + yes, + sites_output_format): """ Get parameter timeseries or summary data """ # instantiate config and set up parameter - config = setup_config(parameter, config_path, bbox, county, site_limit, dry, site_file_type) + config = setup_config( + tag=parameter, + config_path=config_path, + bbox=bbox, + county=county, + wkt=wkt, + site_limit=site_limit, + dry=dry, + sites_output_format=sites_output_format + ) config.parameter = parameter @@ -296,41 +319,48 @@ def weave( if not click.confirm("Do you want to continue?", default=True): return - if parameter.lower() == "waterlevels": - unify_waterlevels(config) - else: - unify_analytes(config) + if parameter.lower() == "waterlevels": + unify_waterlevels(config) + else: + unify_analytes(config) + return config @cli.command() -@add_options(CONFIG_OPTIONS) +@add_options(CONFIG_PATH_OPTIONS) @add_options(SPATIAL_OPTIONS) -@add_options(PERSISTER_OPTIONS) +@add_options(OUTPUT_DIR_OPTIONS) @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) -def sites(config_path, - bbox, county, - output_dir, - no_bernco, - no_bor, - no_cabq, - no_ebid, - no_nmbgmr_amp, - no_nmed_dwb, - no_nmose_isc_seven_rivers, - no_nmose_roswell, - no_nwis, - no_pvacd, - no_wqp, - site_limit, - dry, - yes): +@add_options(SITES_OUTPUT_FORMAT_OPTIONS) +def sites( + config_path, + bbox, + county, + wkt, + output_dir, + no_bernco, + no_bor, + no_cabq, + no_ebid, + no_nmbgmr_amp, + no_nmed_dwb, + no_nmose_isc_seven_rivers, + no_nmose_pod, + no_nmose_roswell, + no_nwis, + no_pvacd, + no_wqp, + site_limit, + dry, + yes, + sites_output_format +): """ Get sites """ - - config = setup_config("sites", config_path, bbox, county, site_limit, dry) + config = setup_config("sites", config_path, bbox, county, wkt, site_limit, dry, sites_output_format) config_agencies = ["bernco", "bor", "cabq", "ebid", "nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd", "wqp", "nmose_pod"] @@ -388,7 +418,7 @@ def sources(sources, bbox, wkt, county): click.echo(s) -def setup_config(tag, config_path, bbox, county, site_limit, dry, site_file_type="csv"): +def setup_config(tag, config_path, bbox, county, wkt, site_limit, dry, sites_output_format=OutputFormat.CSV): config = Config(path=config_path) if county: @@ -408,7 +438,7 @@ def setup_config(tag, config_path, bbox, county, site_limit, dry, site_file_type config.site_limit = None config.dry = dry - config.site_file_type = site_file_type + config.sites_output_format = sites_output_format return config diff --git a/requirements.txt b/requirements.txt index 4e9f7c5..6aea17c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,7 @@ geopandas frost_sta_client 
google-cloud-storage pytest -urllib3>=2.2.0,<3.0.0 \ No newline at end of file +urllib3>=2.2.0,<3.0.0 +Geoalchemy2 +sqlalchemy +psycopg2 \ No newline at end of file diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index 0903403..12c3084 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -50,7 +50,7 @@ def _test_weave( self, parameter: str, output: str, - site_output_type: str = "csv", + sites_output_format: str = "csv", site_limit: int = 4, start_date: str = "1990-08-10", end_date: str = "1990-08-11", @@ -91,20 +91,15 @@ def _test_weave( start_date, "--end-date", end_date, + "--sites_output_format", + sites_output_format ] - if site_output_type == "csv": - arguments.append("--site-file-type") - arguments.append(site_output_type) - elif site_output_type == "geojson": - arguments.append("--site-file-type") - arguments.append(site_output_type) - if geographic_filter_name and geographic_filter_value: arguments.extend([f"--{geographic_filter_name}", geographic_filter_value]) arguments.extend(no_agencies) - + # Act result = self.runner.invoke(weave, arguments, standalone_mode=False) @@ -176,10 +171,7 @@ def _test_weave( assert getattr(config, _geographic_filter_name) == "" # 9 - if site_output_type == "csv": - assert getattr(config, "site_file_type") == "csv" - elif site_output_type == "geojson": - assert getattr(config, "site_file_type") == "geojson" + assert getattr(config, "sites_output_format") == sites_output_format def test_weave_summary(self): self._test_weave(parameter=WATERLEVELS, output="summary") @@ -192,12 +184,12 @@ def test_weave_timeseries_separated(self): def test_weave_csv(self): self._test_weave( - parameter=WATERLEVELS, output="summary", site_output_type="csv" + parameter=WATERLEVELS, output="summary", sites_output_format="csv" ) def test_weave_geojson(self): self._test_weave( - parameter=WATERLEVELS, output="summary", site_output_type="geojson" + parameter=WATERLEVELS, output="summary", sites_output_format="geojson" ) def test_weave_bbox(self): diff --git a/tests/test_sources/__init__.py b/tests/test_sources/__init__.py index 78886bc..5e33ec0 100644 --- a/tests/test_sources/__init__.py +++ b/tests/test_sources/__init__.py @@ -162,7 +162,7 @@ def test_summary_csv(self): def test_summary_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_summary = True - self.config.site_file_type = "geojson" + self.config.sites_output_format = "geojson" self.config.report() # Act ------------------------------------------------------------------ @@ -191,7 +191,7 @@ def test_timeseries_unified_csv(self): def test_timeseries_unified_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_unified = True - self.config.site_file_type = "geojson" + self.config.sites_output_format = "geojson" self.config.report() # Act ------------------------------------------------------------------ @@ -228,7 +228,7 @@ def test_timeseries_separated_csv(self): def test_timeseries_separated_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_separated = True - self.config.site_file_type = "geojson" + self.config.sites_output_format = "geojson" self.config.report() # Act ------------------------------------------------------------------ From 8ce5bb97ed79352fb42f8a4000d7a625bcfd4d5a Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 23 Apr 2025 21:08:19 +0000 Subject: [PATCH 16/38] 
Formatting changes --- backend/__init__.py | 2 +- backend/config.py | 38 +++++----- backend/connectors/nmose/source.py | 8 ++- backend/connectors/nmose/transformer.py | 2 +- backend/persister.py | 50 +++++++------ backend/persisters/geoserver.py | 95 +++++++++++++++++------- backend/unifier.py | 22 +++--- frontend/cli.py | 96 +++++++++++++++---------- tests/test_cli/__init__.py | 4 +- 9 files changed, 201 insertions(+), 116 deletions(-) diff --git a/backend/__init__.py b/backend/__init__.py index 2034a38..06a630a 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -4,4 +4,4 @@ class OutputFormat(str, Enum): GEOJSON = "geojson" CSV = "csv" - GEOSERVER = "geoserver" \ No newline at end of file + GEOSERVER = "geoserver" diff --git a/backend/config.py b/backend/config.py index 9c54e7c..f6d8642 100644 --- a/backend/config.py +++ b/backend/config.py @@ -99,9 +99,6 @@ def get_source(source): return klass() - - - class Config(Loggable): site_limit: int = 0 dry: bool = False @@ -183,24 +180,29 @@ def __init__(self, model=None, payload=None, path=None): if value is not None: setattr(self, f"use_source_{sk}", value) - for attr in ("wkt", "county", "bbox", - "output_summary", - "output_timeseries_unified", - "output_timeseries_separated", - "start_date", - "end_date", - "parameter", - "output_name", - "dry", - "latest_water_level_only", - "sites_output_format", - "use_cloud_storage", - "yes"): + for attr in ( + "wkt", + "county", + "bbox", + "output_summary", + "output_timeseries_unified", + "output_timeseries_separated", + "start_date", + "end_date", + "parameter", + "output_name", + "dry", + "latest_water_level_only", + "sites_output_format", + "use_cloud_storage", + "yes", + ): if attr in payload: setattr(self, attr, payload[attr]) def _load_from_yaml(self, path): import yaml + path = os.path.abspath(path) if os.path.exists(path): self.log(f"Loading config from {path}") @@ -447,7 +449,7 @@ def _report_attributes(title, attrs): "output_horizontal_datum", "output_elevation_units", "use_cloud_storage", - "sites_output_format" + "sites_output_format", ), ) @@ -570,4 +572,6 @@ def output_path(self): def get(self, attr): if self._payload: return self._payload.get(attr) + + # ============= EOF ============================================= diff --git a/backend/connectors/nmose/source.py b/backend/connectors/nmose/source.py index ad180ce..5def1bf 100644 --- a/backend/connectors/nmose/source.py +++ b/backend/connectors/nmose/source.py @@ -41,9 +41,11 @@ def get_records(self, *args, **kw) -> List[Dict]: "https://services2.arcgis.com/qXZbWTdPDbTjl7Dy/arcgis/rest/services/OSE_PODs/FeatureServer/0/query" ) - params['where'] = "pod_status = 'ACT' AND pod_basin NOT IN ('SP', 'SD', 'LWD')" - params["outFields"] = ("OBJECTID,pod_basin,pod_status,easting,northing,datum,utm_accura,status,county" - "pod_name,pod_nbr,pod_suffix,pod_file,depth_well,aquifer,elevation") + params["where"] = "pod_status = 'ACT' AND pod_basin NOT IN ('SP', 'SD', 'LWD')" + params["outFields"] = ( + "OBJECTID,pod_basin,pod_status,easting,northing,datum,utm_accura,status,county" + "pod_name,pod_nbr,pod_suffix,pod_file,depth_well,aquifer,elevation" + ) params["outSR"] = 4326 params["f"] = "json" diff --git a/backend/connectors/nmose/transformer.py b/backend/connectors/nmose/transformer.py index e3a2b64..8f26ebb 100644 --- a/backend/connectors/nmose/transformer.py +++ b/backend/connectors/nmose/transformer.py @@ -24,7 +24,7 @@ def _transform(self, record) -> dict: # "name": record["station_nm"], "latitude": geometry["y"], "longitude": 
geometry["x"], - "elevation": properties['elevation'], + "elevation": properties["elevation"], "elevation_units": "ft", # "horizontal_datum": datum, # "vertical_datum": record["alt_datum_cd"], diff --git a/backend/persister.py b/backend/persister.py index efa76c4..327202c 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -130,31 +130,33 @@ def write_csv_file(path, func, records): with open(path, "w", newline="") as f: func(csv.writer(f), records) + def write_sites_geojson_file(path, records): features = [ - { - "type": "Feature", - "geometry": { - "type": "Point", - "coordinates": [ - record.get("longitude"), - record.get("latitude"), - record.get("elevation"), - ], - }, - "properties": { - k: record.get(k) - for k in record.keys - if k not in ["latitude", "longitude", "elevation"] - }, - } - for record in records - ] + { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [ + record.get("longitude"), + record.get("latitude"), + record.get("elevation"), + ], + }, + "properties": { + k: record.get(k) + for k in record.keys + if k not in ["latitude", "longitude", "elevation"] + }, + } + for record in records + ] feature_collection = {"type": "FeatureCollection", "features": features} with open(path, "w") as f: json.dump(feature_collection, f, indent=4) + def write_memory(func, records, output_format=None): f = io.BytesIO() func(f, records, output_format) @@ -193,6 +195,7 @@ def dump_sites(filehandle, records, output_format): ) gdf.to_file(filehandle, driver="GeoJSON") + class CloudStoragePersister(BasePersister): extension = "csv" _content: list @@ -224,12 +227,19 @@ def finalize(self, output_name: str): else: path, cnt = self._content[0] - #this is a hack. need a better way to specify the output path + # this is a hack. 
need a better way to specify the output path dirname = os.path.basename(os.path.dirname(path)) path = os.path.join(dirname, os.path.basename(path)) blob = bucket.blob(path) - blob.upload_from_string(cnt, content_type="application/json" if self.config.output_format == OutputFormat.GEOJSON else "text/csv") + blob.upload_from_string( + cnt, + content_type=( + "application/json" + if self.config.output_format == OutputFormat.GEOJSON + else "text/csv" + ), + ) def _make_output_directory(self, output_directory: str): # prevent making root directory, because we are not saving to disk diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index bdfb0c8..9fc261d 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -16,10 +16,22 @@ from backend.persister import BasePersister -from sqlalchemy import Column, ForeignKey, create_engine, UUID, String, Integer, Float, Date, Time +from sqlalchemy import ( + Column, + ForeignKey, + create_engine, + UUID, + String, + Integer, + Float, + Date, + Time, +) from geoalchemy2 import Geometry Base = declarative_base() + + # dbname=db.get('dbname'), # user=db.get('user'), # password=db.get('password'), @@ -101,8 +113,8 @@ def __init__(self, *args, **kwargs): def dump_sites(self, path: str): if self.sites: - db = self.config.get('geoserver').get('db') - dbname = db.get('db_name') + db = self.config.get("geoserver").get("db") + dbname = db.get("db_name") self.log(f"dumping sites to {dbname}") self._write_to_sites(self.sites) else: @@ -110,8 +122,8 @@ def dump_sites(self, path: str): def dump_summary(self, path: str): if self.records: - db = self.config.get('geoserver').get('db') - dbname = db.get('db_name') + db = self.config.get("geoserver").get("db") + dbname = db.get("db_name") self.log(f"dumping summary to {dbname}") self._write_to_summary(self.records) else: @@ -121,22 +133,38 @@ def _connect(self): """ Connect to a PostgreSQL database on Cloud SQL. 
""" - sf = session_factory(self.config.get('geoserver').get('db')) + sf = session_factory(self.config.get("geoserver").get("db")) self._connection = sf() def _write_sources(self, records: list): sources = {r.source for r in records} with self._connection as conn: - sql = insert(Sources).values([{"name": source} for source in sources]).on_conflict_do_nothing( - index_elements=[Sources.name],) + sql = ( + insert(Sources) + .values([{"name": source} for source in sources]) + .on_conflict_do_nothing( + index_elements=[Sources.name], + ) + ) conn.execute(sql) conn.commit() def _write_parameters(self): with self._connection as conn: - sql = insert(Parameters).values([{"name": self.config.parameter, - "units": self.config.analyte_output_units}]).on_conflict_do_nothing( - index_elements=[Parameters.name],) + sql = ( + insert(Parameters) + .values( + [ + { + "name": self.config.parameter, + "units": self.config.analyte_output_units, + } + ] + ) + .on_conflict_do_nothing( + index_elements=[Parameters.name], + ) + ) print(sql) conn.execute(sql) conn.commit() @@ -146,7 +174,14 @@ def _write_to_summary(self, records: list): self._write_parameters() for r in records: print(r, [r.to_dict()]) - keys = ["usgs_site_id", "alternate_site_id", "formation", "aquifer", "well_depth"] + keys = [ + "usgs_site_id", + "alternate_site_id", + "formation", + "aquifer", + "well_depth", + ] + def make_stmt(chunk): values = [ { @@ -165,7 +200,9 @@ def make_stmt(chunk): "latest_time": record.latest_time if record.latest_time else None, "earliest_value": record.earliest_value, "earliest_date": record.earliest_date, - "earliest_time": record.earliest_time if record.earliest_time else None, + "earliest_time": ( + record.earliest_time if record.earliest_time else None + ), } for record in chunk ] @@ -173,15 +210,17 @@ def make_stmt(chunk): linsert = insert(Summary) return linsert.values(values).on_conflict_do_update( index_elements=[Summary.data_source_uid], - set_={"properties": linsert.excluded.properties} + set_={"properties": linsert.excluded.properties}, ) self._chunk_insert(make_stmt, records) def _chunk_insert(self, make_stmt, records: list, chunk_size: int = 10): for i in range(0, len(records), chunk_size): - chunk = records[i:i + chunk_size] - print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") + chunk = records[i : i + chunk_size] + print( + f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}" + ) st = time.time() stmt = make_stmt(chunk) @@ -189,7 +228,7 @@ def _chunk_insert(self, make_stmt, records: list, chunk_size: int = 10): conn.execute(stmt) conn.commit() - print('Chunk write time:', time.time() - st) + print("Chunk write time:", time.time() - st) def _write_to_sites(self, records: list): """ @@ -198,7 +237,13 @@ def _write_to_sites(self, records: list): self._write_sources(records) - keys = ["usgs_site_id", "alternate_site_id", "formation", "aquifer", "well_depth"] + keys = [ + "usgs_site_id", + "alternate_site_id", + "formation", + "aquifer", + "well_depth", + ] chunk_size = 1000 # Larger chunk size for fewer commits def make_stmt(chunk): @@ -215,7 +260,7 @@ def make_stmt(chunk): linsert = insert(Location) stmt = linsert.values(values).on_conflict_do_update( index_elements=[Location.data_source_uid], - set_={"properties": linsert.excluded.properties} + set_={"properties": linsert.excluded.properties}, ) return stmt @@ -268,11 +313,11 @@ def make_stmt(chunk): # # print('Chunk write time:', time.time() - st) - # # Pre-serialize properties to reduce processing 
time - # values = [ - # (record.name, json.dumps(record.to_dict(keys)), record.longitude, record.latitude, record.source) - # for record in chunk - # ] + # # Pre-serialize properties to reduce processing time + # values = [ + # (record.name, json.dumps(record.to_dict(keys)), record.longitude, record.latitude, record.source) + # for record in chunk + # ] # # with self._connection.cursor() as cursor: # sql = """INSERT INTO public.tbl_location (name, properties, geometry, source_slug) @@ -283,4 +328,6 @@ def make_stmt(chunk): # self._connection.commit() # Commit once per chunk # print('Chunk write time:', time.time() - st) # break + + # ============= EOF ============================================= diff --git a/backend/unifier.py b/backend/unifier.py index 44c49e0..5a1d6ae 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -395,16 +395,16 @@ def get_datastreams(): # if __name__ == "__main__": - # test_waterlevel_unification() - # root = logging.getLogger() - # root.setLevel(logging.DEBUG) - # shandler = logging.StreamHandler() - # get_sources(Config()) - # setup_logging() - # site_unification_test() - # waterlevel_unification_test() - # analyte_unification_test() - # print(health_check("nwis")) - # generate_site_bounds() +# test_waterlevel_unification() +# root = logging.getLogger() +# root.setLevel(logging.DEBUG) +# shandler = logging.StreamHandler() +# get_sources(Config()) +# setup_logging() +# site_unification_test() +# waterlevel_unification_test() +# analyte_unification_test() +# print(health_check("nwis")) +# generate_site_bounds() # ============= EOF ============================================= diff --git a/frontend/cli.py b/frontend/cli.py index d7d3c01..e8ac8c2 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -231,31 +231,32 @@ def _add_options(func): @add_options(DEBUG_OPTIONS) @add_options(SITES_OUTPUT_FORMAT_OPTIONS) def weave( - parameter, - config_path, - output, - output_dir, - start_date, - end_date, - bbox, - county, - wkt, - no_bernco, - no_bor, - no_cabq, - no_ebid, - no_nmbgmr_amp, - no_nmed_dwb, - no_nmose_isc_seven_rivers, - no_nmose_pod, - no_nmose_roswell, - no_nwis, - no_pvacd, - no_wqp, - site_limit, - dry, - yes, - sites_output_format): + parameter, + config_path, + output, + output_dir, + start_date, + end_date, + bbox, + county, + wkt, + no_bernco, + no_bor, + no_cabq, + no_ebid, + no_nmbgmr_amp, + no_nmed_dwb, + no_nmose_isc_seven_rivers, + no_nmose_pod, + no_nmose_roswell, + no_nwis, + no_pvacd, + no_wqp, + site_limit, + dry, + yes, + sites_output_format, +): """ Get parameter timeseries or summary data """ @@ -268,9 +269,9 @@ def weave( wkt=wkt, site_limit=site_limit, dry=dry, - sites_output_format=sites_output_format - ) - + sites_output_format=sites_output_format, + ) + config.parameter = parameter # output type @@ -303,7 +304,7 @@ def weave( lcs = locals() if config_agencies: for agency in config_agencies: - setattr(config, f"use_source_{agency}", lcs.get(f'no_{agency}', False)) + setattr(config, f"use_source_{agency}", lcs.get(f"no_{agency}", False)) # dates config.start_date = start_date config.end_date = end_date @@ -325,7 +326,6 @@ def weave( unify_analytes(config) return config - @cli.command() @add_options(CONFIG_PATH_OPTIONS) @@ -355,20 +355,33 @@ def sites( site_limit, dry, yes, - sites_output_format + sites_output_format, ): """ Get sites """ - config = setup_config("sites", config_path, bbox, county, wkt, site_limit, dry, sites_output_format) - config_agencies = ["bernco", "bor", "cabq", "ebid", "nmbgmr_amp", "nmed_dwb", - 
"nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd", - "wqp", "nmose_pod"] + config = setup_config( + "sites", config_path, bbox, county, wkt, site_limit, dry, sites_output_format + ) + config_agencies = [ + "bernco", + "bor", + "cabq", + "ebid", + "nmbgmr_amp", + "nmed_dwb", + "nmose_isc_seven_rivers", + "nmose_roswell", + "nwis", + "pvacd", + "wqp", + "nmose_pod", + ] if config_path is None: lcs = locals() for agency in config_agencies: - setattr(config, f"use_source_{agency}", lcs.get(f'no_{agency}', False)) + setattr(config, f"use_source_{agency}", lcs.get(f"no_{agency}", False)) config.output_dir = output_dir config.sites_only = True @@ -418,7 +431,16 @@ def sources(sources, bbox, wkt, county): click.echo(s) -def setup_config(tag, config_path, bbox, county, wkt, site_limit, dry, sites_output_format=OutputFormat.CSV): +def setup_config( + tag, + config_path, + bbox, + county, + wkt, + site_limit, + dry, + sites_output_format=OutputFormat.CSV, +): config = Config(path=config_path) if county: diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index 12c3084..1cb0ba6 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -92,14 +92,14 @@ def _test_weave( "--end-date", end_date, "--sites_output_format", - sites_output_format + sites_output_format, ] if geographic_filter_name and geographic_filter_value: arguments.extend([f"--{geographic_filter_name}", geographic_filter_value]) arguments.extend(no_agencies) - + # Act result = self.runner.invoke(weave, arguments, standalone_mode=False) From cf2c0b3d3b5b0d14dbc20d404c79b1921d61670a Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 10:46:00 -0600 Subject: [PATCH 17/38] integration and unification of dev branches --- backend/config.py | 8 +- backend/persister.py | 190 +++++++++++++-------------------- backend/unifier.py | 122 ++++++++++----------- frontend/cli.py | 42 ++++---- tests/test_cli/__init__.py | 71 ++++++------ tests/test_sources/__init__.py | 8 +- 6 files changed, 197 insertions(+), 244 deletions(-) diff --git a/backend/config.py b/backend/config.py index f6d8642..6dd2e0e 100644 --- a/backend/config.py +++ b/backend/config.py @@ -147,7 +147,7 @@ class Config(Loggable): analyte_output_units: str = MILLIGRAMS_PER_LITER waterlevel_output_units: str = FEET - sites_output_format: OutputFormat = OutputFormat.CSV + output_format: str = OutputFormat.CSV yes: bool = True @@ -193,7 +193,7 @@ def __init__(self, model=None, payload=None, path=None): "output_name", "dry", "latest_water_level_only", - "sites_output_format", + "output_format", "use_cloud_storage", "yes", ): @@ -287,7 +287,7 @@ def get_config_and_false_agencies(self): def finalize(self): self._update_output_units() - if self.sites_output_format != OutputFormat.GEOSERVER: + if self.output_format != OutputFormat.GEOSERVER: self.update_output_name() self.make_output_directory() @@ -449,7 +449,7 @@ def _report_attributes(title, attrs): "output_horizontal_datum", "output_elevation_units", "use_cloud_storage", - "sites_output_format", + "output_format", ), ) diff --git a/backend/persister.py b/backend/persister.py index 327202c..b985e7c 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -32,7 +32,65 @@ try: from google.cloud import storage except ImportError: - print("google cloud storage not available") + print("google cloud storage not available") + + +def write_memory(func, records, output_format=None): + f = io.BytesIO() + func(f, records, output_format) + return f.getvalue() + + +def dump_timeseries(path, 
timeseries: list[list]): + """ + Dumps timeseries records to a CSV file. The timeseries must be a list of + lists, where each inner list contains the records for a single site. In the case + of timeseries separated, the inner list will contain the records for a single site + and this function will be called multiple times, once for each site. + """ + with open(path, "w", newline="") as f: + writer = csv.writer(f) + headers_have_not_been_written = True + for i, records in enumerate(timeseries): + for record in records: + if i == 0 and headers_have_not_been_written: + writer.writerow(record.keys) + headers_have_not_been_written = False + writer.writerow(record.to_row()) + + +def dump_sites_summary(path, records, output_format: OutputFormat): + if output_format == OutputFormat.CSV: + with open(path, "w", newline="") as f: + writer = csv.writer(f) + for i, site in enumerate(records): + if i == 0: + writer.writerow(site.keys) + writer.writerow(site.to_row()) + else: + features = [ + { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [ + getattr(record, "longitude"), + getattr(record, "latitude"), + getattr(record, "elevation"), + ], + }, + "properties": { + k: getattr(record, k) + for k in record.keys + if k not in ["latitude", "longitude", "elevation"] + }, + } + for record in records + ] + feature_collection = {"type": "FeatureCollection", "features": features} + + with open(path, "w") as f: + json.dump(feature_collection, f, indent=4) class BasePersister(Loggable): @@ -40,9 +98,6 @@ class BasePersister(Loggable): Class to persist the data to a file or cloud storage. If persisting to a file, the output directory is created by config._make_output_path() """ - - extension: str = "csv" - def __init__(self, config=None): self.records = [] self.timeseries = [] @@ -61,25 +116,25 @@ def finalize(self, output_name: str): def dump_sites(self, path: str): if self.sites: path = os.path.join(path, "sites") - path = self.add_extension(path) + path = self.add_extension(path, self.config.output_format) self.log(f"dumping sites to {os.path.abspath(path)}") - self._write(path, self.sites) + self._dump_sites_summary(path, self.sites, self.config.output_format) else: self.log("no sites to dump", fg="red") def dump_summary(self, path: str): if self.records: path = os.path.join(path, "summary") - path = self.add_extension(path) + path = self.add_extension(path, self.config.output_format) self.log(f"dumping summary to {os.path.abspath(path)}") - self._write(path, self.records) + self._dump_sites_summary(path, self.records, self.config.output_format) else: self.log("no records to dump", fg="red") def dump_timeseries_unified(self, path: str): if self.timeseries: path = os.path.join(path, "timeseries_unified") - path = self.add_extension(path) + path = self.add_extension(path, OutputFormat.CSV) self.log(f"dumping unified timeseries to {os.path.abspath(path)}") self._dump_timeseries(path, self.timeseries) else: @@ -94,7 +149,7 @@ def dump_timeseries_separated(self, path: str): for records in self.timeseries: site_id = records[0].id path = os.path.join(timeseries_path, str(site_id).replace(" ", "_")) - path = self.add_extension(path) + path = self.add_extension(path, OutputFormat.CSV) self.log(f"dumping {site_id} to {os.path.abspath(path)}") list_of_records = [records] @@ -102,100 +157,26 @@ def dump_timeseries_separated(self, path: str): else: self.log("no timeseries records to dump", fg="red") - def add_extension(self, path: str): - if not self.extension: + def add_extension(self, path: str, 
extension: OutputFormat): + if not extension: raise NotImplementedError - - ext = self.extension - if self.config.output_format == OutputFormat.CSV: - ext = "csv" - elif self.config.output_format == OutputFormat.GEOJSON: - ext = "geojson" + else: + ext = extension if not path.endswith(ext): path = f"{path}.{ext}" return path - def _write(self, path: str, records): - raise NotImplementedError + def _dump_sites_summary(self, path: str, records: list, output_format: OutputFormat): + dump_sites_summary(path, records, output_format) def _dump_timeseries(self, path: str, timeseries: list): - raise NotImplementedError + dump_timeseries(path, timeseries) def _make_output_directory(self, output_directory: str): os.mkdir(output_directory) -def write_csv_file(path, func, records): - with open(path, "w", newline="") as f: - func(csv.writer(f), records) - - -def write_sites_geojson_file(path, records): - features = [ - { - "type": "Feature", - "geometry": { - "type": "Point", - "coordinates": [ - record.get("longitude"), - record.get("latitude"), - record.get("elevation"), - ], - }, - "properties": { - k: record.get(k) - for k in record.keys - if k not in ["latitude", "longitude", "elevation"] - }, - } - for record in records - ] - feature_collection = {"type": "FeatureCollection", "features": features} - - with open(path, "w") as f: - json.dump(feature_collection, f, indent=4) - - -def write_memory(func, records, output_format=None): - f = io.BytesIO() - func(f, records, output_format) - return f.getvalue() - - -def dump_timeseries(writer, timeseries: list[list]): - """ - Dumps timeseries records to a CSV file. The timeseries must be a list of - lists, where each inner list contains the records for a single site. In the case - of timeseries separated, the inner list will contain the records for a single site - and this function will be called multiple times, once for each site. 
- """ - headers_have_not_been_written = True - for i, records in enumerate(timeseries): - for record in records: - if i == 0 and headers_have_not_been_written: - writer.writerow(record.keys) - headers_have_not_been_written = False - writer.writerow(record.to_row()) - - -def dump_sites(filehandle, records, output_format): - if output_format == OutputFormat.CSV: - writer = csv.writer(filehandle) - for i, site in enumerate(records): - if i == 0: - writer.writerow(site.keys) - writer.writerow(site.to_row()) - else: - r0 = records[0] - df = pd.DataFrame([r.to_row() for r in records], columns=r0.keys) - - gdf = gpd.GeoDataFrame( - df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326" - ) - gdf.to_file(filehandle, driver="GeoJSON") - - class CloudStoragePersister(BasePersister): extension = "csv" _content: list @@ -245,35 +226,18 @@ def _make_output_directory(self, output_directory: str): # prevent making root directory, because we are not saving to disk pass - def _write(self, path: str, records: list): - content = write_memory(dump_sites, records, self.config.output_format) - self._add_content(path, content) - def _add_content(self, path: str, content: str): self._content.append((path, content)) + def _dump_sites_summary(self, path: str, records: list): + content = write_memory(dump_sites_summary, records, self.config.output_format) + self._add_content(path, content) + + def _dump_timeseries_unified(self, path: str, timeseries: list): content = write_memory(path, dump_timeseries, timeseries) self._add_content(path, content) - -class CSVPersister(BasePersister): - extension = "csv" - - def _write(self, path: str, records: list): - write_csv_file(path, dump_sites, records) - - def _dump_timeseries(self, path: str, timeseries: list): - write_csv_file(path, dump_timeseries, timeseries) - - -class GeoJSONPersister(BasePersister): - extension = "geojson" - - def _write(self, path: str, records: list): - write_sites_geojson_file(path, records) - - # class ST2Persister(BasePersister): # extension = "st2" # diff --git a/backend/unifier.py b/backend/unifier.py index 5a1d6ae..15eef07 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -18,7 +18,7 @@ from backend.config import Config, get_source, OutputFormat from backend.logger import setup_logging from backend.constants import WATERLEVELS -from backend.persister import CSVPersister, GeoJSONPersister, CloudStoragePersister +from backend.persister import BasePersister from backend.persisters.geoserver import GeoServerPersister from backend.source import BaseSiteSource @@ -79,36 +79,36 @@ def unify_sites(config): return True -def _perister_factory(config): - """ - Determines the type of persister to use based on the configuration. The - persister types are: +# def _perister_factory(config): +# """ +# Determines the type of persister to use based on the configuration. 
The +# persister types are: - - CSVPersister - - CloudStoragePersister - - GeoJSONPersister +# - CSVPersister +# - CloudStoragePersister +# - GeoJSONPersister - Parameters - ------- - config: Config - The configuration object +# Parameters +# ------- +# config: Config +# The configuration object - Returns - ------- - Persister - The persister object to use - """ - persister_klass = CSVPersister - if config.use_cloud_storage: - persister_klass = CloudStoragePersister - elif config.output_format == OutputFormat.CSV: - persister_klass = CSVPersister - elif config.output_format == OutputFormat.GEOJSON: - persister_klass = GeoJSONPersister - elif config.output_format == OutputFormat.GEOSERVER: - persister_klass = GeoServerPersister +# Returns +# ------- +# Persister +# The persister object to use +# """ +# persister_klass = CSVPersister +# if config.use_cloud_storage: +# persister_klass = CloudStoragePersister +# elif config.output_format == OutputFormat.CSV: +# persister_klass = CSVPersister +# elif config.output_format == OutputFormat.GEOJSON: +# persister_klass = GeoJSONPersister +# elif config.output_format == OutputFormat.GEOSERVER: +# persister_klass = GeoServerPersister - return persister_klass(config) +# return persister_klass(config) # def _unify_wrapper(config, func): @@ -118,7 +118,7 @@ def _perister_factory(config): def _site_wrapper( - site_source, parameter_source, sites_summary_persister, timeseries_persister, config + site_source, parameter_source, persister, config ): try: @@ -147,7 +147,7 @@ def _site_wrapper( first_flag = True if config.sites_only: - sites_summary_persister.sites.extend(sites) + persister.sites.extend(sites) else: for site_records in site_source.chunks(sites): if type(site_records) == list: @@ -164,7 +164,7 @@ def _site_wrapper( site_records, use_summarize, start_ind, end_ind ) if summary_records: - sites_summary_persister.records.extend(summary_records) + persister.records.extend(summary_records) sites_with_records_count += len(summary_records) else: continue @@ -181,49 +181,35 @@ def _site_wrapper( sites_with_records_count += len(results) for site, records in results: - timeseries_persister.timeseries.append(records) - sites_summary_persister.sites.append(site) + persister.timeseries.append(records) + persister.sites.append(site) if site_limit: - # print( - # "sites_with_records_count:", - # sites_with_records_count, - # "|", - # "site_limit:", - # site_limit, - # "|", - # "chunk_size:", - # site_source.chunk_size, - # ) - if sites_with_records_count >= site_limit: # remove any extra sites that were gathered. 
removes 0 if site_limit is not exceeded num_sites_to_remove = sites_with_records_count - site_limit - # print( - # f"removing {num_sites_to_remove} to avoid exceeding the site limit" - # ) # if sites_with_records_count == sit_limit then num_sites_to_remove = 0 # and calling list[:0] will retur an empty list, so subtract # num_sites_to_remove from the length of the list # to remove the last num_sites_to_remove sites if use_summarize: - sites_summary_persister.records = ( - sites_summary_persister.records[ - : len(sites_summary_persister.records) + persister.records = ( + persister.records[ + : len(persister.records) - num_sites_to_remove ] ) else: - timeseries_persister.timeseries = ( - timeseries_persister.timeseries[ - : len(timeseries_persister.timeseries) + persister.timeseries = ( + persister.timeseries[ + : len(persister.timeseries) - num_sites_to_remove ] ) - sites_summary_persister.sites = ( - sites_summary_persister.sites[ - : len(sites_summary_persister.sites) + persister.sites = ( + persister.sites[ + : len(persister.sites) - num_sites_to_remove ] ) @@ -241,30 +227,32 @@ def _unify_parameter( config, sources, ): - sites_summary_persister = _perister_factory(config) - timeseries_persister = CSVPersister() + + if config.output_format == OutputFormat.GEOSERVER: + persister = GeoServerPersister(config) + else: + persister = BasePersister(config) + for site_source, parameter_source in sources: _site_wrapper( site_source, parameter_source, - sites_summary_persister, - timeseries_persister, + persister, config, ) if config.output_summary: - sites_summary_persister.dump_summary(config.output_path) + persister.dump_summary(config.output_path) elif config.output_timeseries_unified: - timeseries_persister.dump_timeseries_unified(config.output_path) - sites_summary_persister.dump_sites(config.output_path) + persister.dump_timeseries_unified(config.output_path) + persister.dump_sites(config.output_path) elif config.sites_only: - sites_summary_persister.dump_sites(config.output_path) + persister.dump_sites(config.output_path) else: # config.output_timeseries_separated - timeseries_persister.dump_timeseries_separated(config.output_path) - sites_summary_persister.dump_sites(config.output_path) + persister.dump_timeseries_separated(config.output_path) + persister.dump_sites(config.output_path) - timeseries_persister.finalize(config.output_name) - sites_summary_persister.finalize(config.output_name) + persister.finalize(config.output_name) def get_sources_in_polygon(polygon): diff --git a/frontend/cli.py b/frontend/cli.py index e8ac8c2..879e5d3 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -170,9 +170,9 @@ def cli(): help="End date in the form 'YYYY', 'YYYY-MM', 'YYYY-MM-DD', 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'", ), ] -OUTPUT_OPTIONS = [ +OUTPUT_TYPE_OPTIONS = [ click.option( - "--output", + "--output-type", type=click.Choice(["summary", "timeseries_unified", "timeseries_separated"]), required=True, help="Output summary file, single unified timeseries file, or separated timeseries files", @@ -187,13 +187,13 @@ def cli(): ) ] -SITES_OUTPUT_FORMATS = sorted([value for value in OutputFormat]) -SITES_OUTPUT_FORMAT_OPTIONS = [ +OUTPUT_FORMATS = sorted([value for value in OutputFormat]) +OUTPUT_FORMAT_OPTIONS = [ click.option( - "--sites_output_format", - type=click.Choice(SITES_OUTPUT_FORMATS), + "--output-format", + type=click.Choice(OUTPUT_FORMATS), default="csv", - help=f"Output file format for sites: {SITES_OUTPUT_FORMATS}. 
Default is csv", + help=f"Output file format for sites: {OUTPUT_FORMATS}. Default is csv", ) ] @@ -223,17 +223,17 @@ def _add_options(func): required=True, ) @add_options(CONFIG_PATH_OPTIONS) -@add_options(OUTPUT_OPTIONS) +@add_options(OUTPUT_TYPE_OPTIONS) @add_options(OUTPUT_DIR_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) -@add_options(SITES_OUTPUT_FORMAT_OPTIONS) +@add_options(OUTPUT_FORMAT_OPTIONS) def weave( parameter, config_path, - output, + output_type, output_dir, start_date, end_date, @@ -255,7 +255,7 @@ def weave( site_limit, dry, yes, - sites_output_format, + output_format, ): """ Get parameter timeseries or summary data @@ -269,26 +269,26 @@ def weave( wkt=wkt, site_limit=site_limit, dry=dry, - sites_output_format=sites_output_format, + output_format=output_format, ) config.parameter = parameter # output type - if output == "summary": + if output_type == "summary": summary = True timeseries_unified = False timeseries_separated = False - elif output == "timeseries_unified": + elif output_type == "timeseries_unified": summary = False timeseries_unified = True timeseries_separated = False - elif output == "timeseries_separated": + elif output_type == "timeseries_separated": summary = False timeseries_unified = False timeseries_separated = True else: - click.echo(f"Invalid output type: {output}") + click.echo(f"Invalid output type: {output_type}") return config.output_summary = summary @@ -333,7 +333,7 @@ def weave( @add_options(OUTPUT_DIR_OPTIONS) @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) -@add_options(SITES_OUTPUT_FORMAT_OPTIONS) +@add_options(OUTPUT_FORMAT_OPTIONS) def sites( config_path, bbox, @@ -355,13 +355,13 @@ def sites( site_limit, dry, yes, - sites_output_format, + output_format, ): """ Get sites """ config = setup_config( - "sites", config_path, bbox, county, wkt, site_limit, dry, sites_output_format + "sites", config_path, bbox, county, wkt, site_limit, dry, output_format ) config_agencies = [ "bernco", @@ -439,7 +439,7 @@ def setup_config( wkt, site_limit, dry, - sites_output_format=OutputFormat.CSV, + output_format=OutputFormat.CSV, ): config = Config(path=config_path) @@ -460,7 +460,7 @@ def setup_config( config.site_limit = None config.dry = dry - config.sites_output_format = sites_output_format + config.output_format = output_format.value return config diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index 1cb0ba6..b6e3f6a 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -49,8 +49,8 @@ def setup(self): def _test_weave( self, parameter: str, - output: str, - sites_output_format: str = "csv", + output_type: str, + output_format: str = "csv", site_limit: int = 4, start_date: str = "1990-08-10", end_date: str = "1990-08-11", @@ -82,8 +82,8 @@ def _test_weave( arguments = [ parameter, - "--output", - output, + "--output-type", + output_type, "--dry", "--site-limit", str(site_limit), @@ -91,8 +91,8 @@ def _test_weave( start_date, "--end-date", end_date, - "--sites_output_format", - sites_output_format, + "--output-format", + output_format, ] if geographic_filter_name and geographic_filter_value: @@ -102,6 +102,7 @@ def _test_weave( # Act result = self.runner.invoke(weave, arguments, standalone_mode=False) + print(result) # Assert assert result.exit_code == 0 @@ -141,11 +142,11 @@ def _test_weave( # 3 output_types = ["summary", "timeseries_unified", "timeseries_separated"] - for output_type in output_types: - if output_type == 
output: - assert getattr(config, f"output_{output_type}") is True + for ot in output_types: + if ot == output_type: + assert getattr(config, f"output_{ot}") is True else: - assert getattr(config, f"output_{output_type}") is False + assert getattr(config, f"output_{ot}") is False # 4 assert getattr(config, "site_limit") == 4 @@ -171,86 +172,86 @@ def _test_weave( assert getattr(config, _geographic_filter_name) == "" # 9 - assert getattr(config, "sites_output_format") == sites_output_format + assert getattr(config, "output_format") == output_format def test_weave_summary(self): - self._test_weave(parameter=WATERLEVELS, output="summary") + self._test_weave(parameter=WATERLEVELS, output_type="summary") def test_weave_timeseries_unified(self): - self._test_weave(parameter=WATERLEVELS, output="timeseries_unified") + self._test_weave(parameter=WATERLEVELS, output_type="timeseries_unified") def test_weave_timeseries_separated(self): - self._test_weave(parameter=WATERLEVELS, output="timeseries_separated") + self._test_weave(parameter=WATERLEVELS, output_type="timeseries_separated") def test_weave_csv(self): self._test_weave( - parameter=WATERLEVELS, output="summary", sites_output_format="csv" + parameter=WATERLEVELS, output_type="summary", output_format="csv" ) def test_weave_geojson(self): self._test_weave( - parameter=WATERLEVELS, output="summary", sites_output_format="geojson" + parameter=WATERLEVELS, output_type="summary", output_format="geojson" ) def test_weave_bbox(self): self._test_weave( - parameter=WATERLEVELS, output="summary", bbox="32.0,-106.0,36.0,-102.0" + parameter=WATERLEVELS, output_type="summary", bbox="32.0,-106.0,36.0,-102.0" ) def test_weave_county(self): - self._test_weave(parameter=WATERLEVELS, output="summary", county="Bernalillo") + self._test_weave(parameter=WATERLEVELS, output_type="summary", county="Bernalillo") def test_weave_wkt(self): self._test_weave( parameter=WATERLEVELS, - output="summary", + output_type="summary", wkt="POLYGON((-106.0 32.0, -102.0 32.0, -102.0 36.0, -106.0 36.0, -106.0 32.0))", ) def test_weave_waterlevels(self): - self._test_weave(parameter=WATERLEVELS, output="summary") + self._test_weave(parameter=WATERLEVELS, output_type="summary") def test_weave_arsenic(self): - self._test_weave(parameter=ARSENIC, output="summary") + self._test_weave(parameter=ARSENIC, output_type="summary") def test_weave_bicarbonate(self): - self._test_weave(parameter=BICARBONATE, output="summary") + self._test_weave(parameter=BICARBONATE, output_type="summary") def test_weave_calcium(self): - self._test_weave(parameter=CALCIUM, output="summary") + self._test_weave(parameter=CALCIUM, output_type="summary") def test_weave_carbonate(self): - self._test_weave(parameter=CARBONATE, output="summary") + self._test_weave(parameter=CARBONATE, output_type="summary") def test_weave_chloride(self): - self._test_weave(parameter=CHLORIDE, output="summary") + self._test_weave(parameter=CHLORIDE, output_type="summary") def test_weave_fluoride(self): - self._test_weave(parameter=FLUORIDE, output="summary") + self._test_weave(parameter=FLUORIDE, output_type="summary") def test_weave_magnesium(self): - self._test_weave(parameter=MAGNESIUM, output="summary") + self._test_weave(parameter=MAGNESIUM, output_type="summary") def test_weave_nitrate(self): - self._test_weave(parameter=NITRATE, output="summary") + self._test_weave(parameter=NITRATE, output_type="summary") def test_weave_ph(self): - self._test_weave(parameter=PH, output="summary") + self._test_weave(parameter=PH, 
output_type="summary") def test_weave_potassium(self): - self._test_weave(parameter=POTASSIUM, output="summary") + self._test_weave(parameter=POTASSIUM, output_type="summary") def test_weave_silica(self): - self._test_weave(parameter=SILICA, output="summary") + self._test_weave(parameter=SILICA, output_type="summary") def test_weave_sodium(self): - self._test_weave(parameter=SODIUM, output="summary") + self._test_weave(parameter=SODIUM, output_type="summary") def test_weave_sulfate(self): - self._test_weave(parameter=SULFATE, output="summary") + self._test_weave(parameter=SULFATE, output_type="summary") def test_weave_tds(self): - self._test_weave(parameter=TDS, output="summary") + self._test_weave(parameter=TDS, output_type="summary") def test_weave_uranium(self): - self._test_weave(parameter=URANIUM, output="summary") + self._test_weave(parameter=URANIUM, output_type="summary") diff --git a/tests/test_sources/__init__.py b/tests/test_sources/__init__.py index 5e33ec0..dd1b9fe 100644 --- a/tests/test_sources/__init__.py +++ b/tests/test_sources/__init__.py @@ -66,7 +66,7 @@ def setup(self): # 2: delete newly created dirs and files path_to_clean = Path(self.config.output_path) print(f"Cleaning and removing {path_to_clean}") - recursively_clean_directory(path_to_clean) + # recursively_clean_directory(path_to_clean) # reset test attributes self.dirs_to_delete = [] @@ -162,7 +162,7 @@ def test_summary_csv(self): def test_summary_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_summary = True - self.config.sites_output_format = "geojson" + self.config.output_format = "geojson" self.config.report() # Act ------------------------------------------------------------------ @@ -191,7 +191,7 @@ def test_timeseries_unified_csv(self): def test_timeseries_unified_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_unified = True - self.config.sites_output_format = "geojson" + self.config.output_format = "geojson" self.config.report() # Act ------------------------------------------------------------------ @@ -228,7 +228,7 @@ def test_timeseries_separated_csv(self): def test_timeseries_separated_geojson(self): # Arrange -------------------------------------------------------------- self.config.output_timeseries_separated = True - self.config.sites_output_format = "geojson" + self.config.output_format = "geojson" self.config.report() # Act ------------------------------------------------------------------ From 7fe0fa057eaef93b100be4ac3dd121d33fdb4c24 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 10:51:18 -0600 Subject: [PATCH 18/38] remove dirs made by tests --- tests/test_sources/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_sources/__init__.py b/tests/test_sources/__init__.py index dd1b9fe..34d0485 100644 --- a/tests/test_sources/__init__.py +++ b/tests/test_sources/__init__.py @@ -66,7 +66,7 @@ def setup(self): # 2: delete newly created dirs and files path_to_clean = Path(self.config.output_path) print(f"Cleaning and removing {path_to_clean}") - # recursively_clean_directory(path_to_clean) + recursively_clean_directory(path_to_clean) # reset test attributes self.dirs_to_delete = [] From 2c09d46ddd6724f5a5ec1eea386a709fd88aeb27 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Fri, 25 Apr 2025 16:52:58 +0000 Subject: [PATCH 19/38] Formatting changes --- backend/persister.py | 9 ++++++--- backend/unifier.py | 33 
+++++++++++---------------------- tests/test_cli/__init__.py | 4 +++- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/backend/persister.py b/backend/persister.py index b985e7c..d4dc898 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -32,7 +32,7 @@ try: from google.cloud import storage except ImportError: - print("google cloud storage not available") + print("google cloud storage not available") def write_memory(func, records, output_format=None): @@ -98,6 +98,7 @@ class BasePersister(Loggable): Class to persist the data to a file or cloud storage. If persisting to a file, the output directory is created by config._make_output_path() """ + def __init__(self, config=None): self.records = [] self.timeseries = [] @@ -167,7 +168,9 @@ def add_extension(self, path: str, extension: OutputFormat): path = f"{path}.{ext}" return path - def _dump_sites_summary(self, path: str, records: list, output_format: OutputFormat): + def _dump_sites_summary( + self, path: str, records: list, output_format: OutputFormat + ): dump_sites_summary(path, records, output_format) def _dump_timeseries(self, path: str, timeseries: list): @@ -232,12 +235,12 @@ def _add_content(self, path: str, content: str): def _dump_sites_summary(self, path: str, records: list): content = write_memory(dump_sites_summary, records, self.config.output_format) self._add_content(path, content) - def _dump_timeseries_unified(self, path: str, timeseries: list): content = write_memory(path, dump_timeseries, timeseries) self._add_content(path, content) + # class ST2Persister(BasePersister): # extension = "st2" # diff --git a/backend/unifier.py b/backend/unifier.py index 15eef07..b070631 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -117,9 +117,7 @@ def unify_sites(config): # persister.save(config.output_path) -def _site_wrapper( - site_source, parameter_source, persister, config -): +def _site_wrapper(site_source, parameter_source, persister, config): try: # TODO: fully develop checks/discoveries below @@ -194,25 +192,16 @@ def _site_wrapper( # num_sites_to_remove from the length of the list # to remove the last num_sites_to_remove sites if use_summarize: - persister.records = ( - persister.records[ - : len(persister.records) - - num_sites_to_remove - ] - ) + persister.records = persister.records[ + : len(persister.records) - num_sites_to_remove + ] else: - persister.timeseries = ( - persister.timeseries[ - : len(persister.timeseries) - - num_sites_to_remove - ] - ) - persister.sites = ( - persister.sites[ - : len(persister.sites) - - num_sites_to_remove - ] - ) + persister.timeseries = persister.timeseries[ + : len(persister.timeseries) - num_sites_to_remove + ] + persister.sites = persister.sites[ + : len(persister.sites) - num_sites_to_remove + ] break except BaseException: @@ -227,7 +216,7 @@ def _unify_parameter( config, sources, ): - + if config.output_format == OutputFormat.GEOSERVER: persister = GeoServerPersister(config) else: diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index b6e3f6a..ecfaee0 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -199,7 +199,9 @@ def test_weave_bbox(self): ) def test_weave_county(self): - self._test_weave(parameter=WATERLEVELS, output_type="summary", county="Bernalillo") + self._test_weave( + parameter=WATERLEVELS, output_type="summary", county="Bernalillo" + ) def test_weave_wkt(self): self._test_weave( From 3ba04ee91324f02bf93895bce853a708d8a7e999 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 
10:53:55 -0600 Subject: [PATCH 20/38] rearranged requirements to be alphabetical --- requirements.txt | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/requirements.txt b/requirements.txt index 6aea17c..2d2a138 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,13 @@ flask +frost_sta_client +Geoalchemy2 +geopandas +google-cloud-storage gunicorn httpx +mypy pandas -geopandas -frost_sta_client -google-cloud-storage +psycopg2 pytest -urllib3>=2.2.0,<3.0.0 -Geoalchemy2 sqlalchemy -psycopg2 \ No newline at end of file +urllib3>=2.2.0,<3.0.0 \ No newline at end of file From 0aee4e73c95c4daa2f0229a950a369e2a6e13979 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 10:57:37 -0600 Subject: [PATCH 21/38] updated CHANGELOG --- CHANGELOG.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80bf398..583fa8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased: 0.8.0 +## Unreleased: 0.9.0 + +### Added +- `--sites-only` flag to only retrieve site data +- `--output-format` flag to write out sites/summary tables as csv or geojson. + - options are `csv` or `geojson` +- NM OSE POD data for sites. + - can be removed from output with `--no-nmose-pod` +- `--output-dir` to change the output directory to a location other than `.` (the current working directory) + +### Changed +- `output` to `output-type` for CLI + +### Fixed +- a bug with `--site-limit`. it now exports the number of sets requested by the + +## 0.8.0 ### Added - water level for WQP From acf680a37b92bf4d2150d23732fae81e5bb0eabd Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 11:03:22 -0600 Subject: [PATCH 22/38] NMBGMR water level pagination --- backend/connectors/nmbgmr/source.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 8c5db49..e16ecf7 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -46,8 +46,10 @@ def _make_url(endpoint): if os.getenv("DEBUG") == "1": - return f"http://localhost:8000/latest/{endpoint}" - return f"https://waterdata.nmt.edu//latest/{endpoint}" + url = f"http://localhost:8000/latest/{endpoint}" + else: + url = f"https://waterdata.nmt.edu/latest/{endpoint}" + return url class NMBGMRSiteSource(BaseSiteSource): From ad8b0f18d5d7e5dc7f329ac234ce6fc3ffefb6fb Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Fri, 25 Apr 2025 17:05:53 +0000 Subject: [PATCH 23/38] Formatting changes --- backend/connectors/nmbgmr/source.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index e16ecf7..bb45cab 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -170,7 +170,11 @@ def __repr__(self): def _clean_records(self, records): # remove records with no depth to water value - return [r for r in records if r["DepthToWaterBGS"] is not None and r["DateMeasured"] is not None] + return [ + r + for r in records + if r["DepthToWaterBGS"] is not None and r["DateMeasured"] is not None + ] def _extract_parameter_record(self, record, *args, **kw): record[PARAMETER_NAME] = 
DTW From de46920e63ec4b7a2ceab7ce6138d8287fdb839e Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 14:30:55 -0600 Subject: [PATCH 24/38] mypy fixes --- backend/config.py | 4 ++-- backend/persister.py | 4 ++-- backend/persisters/geoserver.py | 15 +++++---------- mypy.ini | 3 ++- requirements.txt | 4 +++- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/backend/config.py b/backend/config.py index 6dd2e0e..56decd0 100644 --- a/backend/config.py +++ b/backend/config.py @@ -18,6 +18,8 @@ from datetime import datetime, timedelta from enum import Enum import shapely.wkt +import yaml + from . import OutputFormat from .bounding_polygons import get_county_polygon from .connectors.nmbgmr.source import ( @@ -201,8 +203,6 @@ def __init__(self, model=None, payload=None, path=None): setattr(self, attr, payload[attr]) def _load_from_yaml(self, path): - import yaml - path = os.path.abspath(path) if os.path.exists(path): self.log(f"Loading config from {path}") diff --git a/backend/persister.py b/backend/persister.py index d4dc898..23964ad 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -232,8 +232,8 @@ def _make_output_directory(self, output_directory: str): def _add_content(self, path: str, content: str): self._content.append((path, content)) - def _dump_sites_summary(self, path: str, records: list): - content = write_memory(dump_sites_summary, records, self.config.output_format) + def _dump_sites_summary(self, path: str, records: list, output_format: OutputFormat): + content = write_memory(dump_sites_summary, records, output_format) self._add_content(path, content) def _dump_timeseries_unified(self, path: str, timeseries: list): diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index 9fc261d..8885614 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -9,10 +9,11 @@ import os import time from itertools import groupby - +from typing import Type import psycopg2 from sqlalchemy.dialects.postgresql import JSONB, insert -from sqlalchemy.orm import declarative_base, sessionmaker, relationship +from sqlalchemy.orm import declarative_base, sessionmaker, relationship, Mapped + from backend.persister import BasePersister @@ -31,12 +32,6 @@ Base = declarative_base() - -# dbname=db.get('dbname'), -# user=db.get('user'), -# password=db.get('password'), -# host=db.get('host'), -# port=db.get('port'), def session_factory(connection: dict): user = connection.get("user", "postgres") password = connection.get("password", "") @@ -61,7 +56,7 @@ class Location(Base): geometry = Column(Geometry(geometry_type="POINT", srid=4326)) source_slug = Column(String, ForeignKey("tbl_sources.name")) - source = relationship("Sources", backref="locations") + source: Mapped["Sources"] = relationship("Sources", backref="locations", uselist=False) class Summary(Base): @@ -76,7 +71,7 @@ class Summary(Base): source_slug = Column(String, ForeignKey("tbl_sources.name")) parameter_slug = Column(String, ForeignKey("tbl_parameters.name")) - source = relationship("Sources", backref="summaries") + source: Mapped["Sources"] = relationship("Sources", backref="summaries", uselist=False) value = Column(Float) nrecords = Column(Integer) diff --git a/mypy.ini b/mypy.ini index 380b366..4904098 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,3 +1,4 @@ [mypy] ignore_missing_imports = True -exclude = ^(venv|.github|.mypy_cache|.pytest_cache|nmuwd.egg-info|__pycache__|build|tests/archived) \ No newline at end of file +exclude = 
^(venv|.github|.mypy_cache|.pytest_cache|nmuwd.egg-info|__pycache__|build|tests/archived) +plugins = sqlalchemy.ext.mypy.plugin diff --git a/requirements.txt b/requirements.txt index 2d2a138..648458d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,5 +9,7 @@ mypy pandas psycopg2 pytest -sqlalchemy +pyyaml +sqlalchemy[mypy] +types-pyyaml urllib3>=2.2.0,<3.0.0 \ No newline at end of file From ec4689e82bfc371d5712413440f45d0daa687756 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 14:50:56 -0600 Subject: [PATCH 25/38] populate nmbgmr well fields --- backend/connectors/nmbgmr/source.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index bb45cab..19311a9 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -87,18 +87,18 @@ def get_records(self): ) if not config.sites_only: for site in sites: - # print(f"Obtaining well data for {site['properties']['point_id']}") - # well_data = self._execute_json_request( - # _make_url("wells"), - # params={"pointid": site["properties"]["point_id"]}, - # tag="", - # ) - # site["properties"]["formation"] = well_data["formation"] - # site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] - # site["properties"]["well_depth_units"] = FEET - site["properties"]["formation"] = None - site["properties"]["well_depth"] = None + print(f"Obtaining well data for {site['properties']['point_id']}") + well_data = self._execute_json_request( + _make_url("wells"), + params={"pointid": site["properties"]["point_id"]}, + tag="", + ) + site["properties"]["formation"] = well_data["formation"] + site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] site["properties"]["well_depth_units"] = FEET + # site["properties"]["formation"] = None + # site["properties"]["well_depth"] = None + # site["properties"]["well_depth_units"] = FEET return sites From 8322949d9feb2441547a9acf138070933c102205 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Fri, 25 Apr 2025 20:53:17 +0000 Subject: [PATCH 26/38] Formatting changes --- backend/persister.py | 4 +++- backend/persisters/geoserver.py | 22 ++++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/backend/persister.py b/backend/persister.py index 23964ad..bf3d11c 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -232,7 +232,9 @@ def _make_output_directory(self, output_directory: str): def _add_content(self, path: str, content: str): self._content.append((path, content)) - def _dump_sites_summary(self, path: str, records: list, output_format: OutputFormat): + def _dump_sites_summary( + self, path: str, records: list, output_format: OutputFormat + ): content = write_memory(dump_sites_summary, records, output_format) self._add_content(path, content) diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index b011c17..9b79f3b 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -34,6 +34,7 @@ Base = declarative_base() + def session_factory(connection: dict): user = connection.get("user", "postgres") password = connection.get("password", "") @@ -58,7 +59,9 @@ class Location(Base): geometry = Column(Geometry(geometry_type="POINT", srid=4326)) source_slug = Column(String, ForeignKey("tbl_sources.name")) - source: Mapped["Sources"] = relationship("Sources", backref="locations", uselist=False) + source: Mapped["Sources"] = relationship( + "Sources", 
backref="locations", uselist=False + ) class Summary(Base): @@ -73,7 +76,9 @@ class Summary(Base): source_slug = Column(String, ForeignKey("tbl_sources.name")) parameter_slug = Column(String, ForeignKey("tbl_parameters.name")) - source: Mapped["Sources"] = relationship("Sources", backref="summaries", uselist=False) + source: Mapped["Sources"] = relationship( + "Sources", backref="summaries", uselist=False + ) value = Column(Float) nrecords = Column(Integer) @@ -150,6 +155,7 @@ def _write_sources(self, records: list): def _write_sources_with_convex_hull(self, records: list): # sources = {r.source for r in records} with self._connection as conn: + def key(r): return str(r.source) @@ -159,15 +165,19 @@ def key(r): # calculate convex hull for the source from the records # Create a MultiPoint object - points = MultiPoint([Point(record.longitude, record.latitude) for record in group]) + points = MultiPoint( + [Point(record.longitude, record.latitude) for record in group] + ) # Calculate the convex hull sinsert = insert(Sources) print("Writing source", source_name, points.convex_hull) - sql = sinsert.values([{"name": source_name, - "convex_hull": points.convex_hull.wkt}]).on_conflict_do_update( + sql = sinsert.values( + [{"name": source_name, "convex_hull": points.convex_hull.wkt}] + ).on_conflict_do_update( index_elements=[Sources.name], - set_={"convex_hull": sinsert.excluded.convex_hull}) + set_={"convex_hull": sinsert.excluded.convex_hull}, + ) # sql = insert(Sources).values([{"name": source,} for source in sources]).on_conflict_do_nothing( # index_elements=[Sources.name],) conn.execute(sql) From f0c7973f02c38454490fc96f75e03ff961def3ba Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 14:55:07 -0600 Subject: [PATCH 27/38] mypy fix --- backend/persisters/geoserver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index 9b79f3b..486e3a8 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -161,12 +161,12 @@ def key(r): records = sorted(records, key=key) for source_name, group in groupby(records, key=key): - group = list(group) + listed_group = list(group) # calculate convex hull for the source from the records # Create a MultiPoint object points = MultiPoint( - [Point(record.longitude, record.latitude) for record in group] + [Point(record.longitude, record.latitude) for record in listed_group] ) # Calculate the convex hull From e05ef53d2ae77341f167909a14878b3cf213fb15 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 14:56:19 -0600 Subject: [PATCH 28/38] variable name clarity --- backend/persisters/geoserver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index 486e3a8..3f09acb 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -161,12 +161,12 @@ def key(r): records = sorted(records, key=key) for source_name, group in groupby(records, key=key): - listed_group = list(group) + source_records = list(group) # calculate convex hull for the source from the records # Create a MultiPoint object points = MultiPoint( - [Point(record.longitude, record.latitude) for record in listed_group] + [Point(record.longitude, record.latitude) for record in source_records] ) # Calculate the convex hull From dab407b8347755f49663f8ac78b0729316ddfe9b Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Fri, 25 Apr 2025 20:56:41 +0000 Subject: 
[PATCH 29/38] Formatting changes --- backend/persisters/geoserver.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index 486e3a8..f5e83ff 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -166,7 +166,10 @@ def key(r): # Create a MultiPoint object points = MultiPoint( - [Point(record.longitude, record.latitude) for record in listed_group] + [ + Point(record.longitude, record.latitude) + for record in listed_group + ] ) # Calculate the convex hull From 074f62edc0e4131481eca7ae831ef00ecc5931d0 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Fri, 25 Apr 2025 20:58:24 +0000 Subject: [PATCH 30/38] Formatting changes --- backend/persisters/geoserver.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index 3f09acb..4461246 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -166,7 +166,10 @@ def key(r): # Create a MultiPoint object points = MultiPoint( - [Point(record.longitude, record.latitude) for record in source_records] + [ + Point(record.longitude, record.latitude) + for record in source_records + ] ) # Calculate the convex hull From 93a3878e7b60552472d0f2561afec4e89624daec Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 15:19:51 -0600 Subject: [PATCH 31/38] skip well data retrieval for NMBGMR until it can be retrieved in batches --- backend/__init__.py | 10 +++++++-- backend/connectors/nmbgmr/source.py | 30 +++++++++++++++------------ tests/test_cli/__init__.py | 1 - tests/test_sources/test_nmbgmr_amp.py | 17 ++++++++++++++- 4 files changed, 41 insertions(+), 17 deletions(-) diff --git a/backend/__init__.py b/backend/__init__.py index 06a630a..d531f86 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -1,7 +1,13 @@ from enum import Enum - - +from os import environ class OutputFormat(str, Enum): GEOJSON = "geojson" CSV = "csv" GEOSERVER = "geoserver" + + +def get_bool_env_variable(var) -> bool: + if environ.get(var).lower() in ["true", "1", "yes"]: + return True + else: + return False \ No newline at end of file diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 19311a9..ec26aae 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -15,6 +15,7 @@ # =============================================================================== import os +from backend import get_bool_env_variable from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.connectors.nmbgmr.transformer import ( NMBGMRSiteTransformer, @@ -87,19 +88,22 @@ def get_records(self): ) if not config.sites_only: for site in sites: - print(f"Obtaining well data for {site['properties']['point_id']}") - well_data = self._execute_json_request( - _make_url("wells"), - params={"pointid": site["properties"]["point_id"]}, - tag="", - ) - site["properties"]["formation"] = well_data["formation"] - site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] - site["properties"]["well_depth_units"] = FEET - # site["properties"]["formation"] = None - # site["properties"]["well_depth"] = None - # site["properties"]["well_depth_units"] = FEET - + if get_bool_env_variable("IS_TESTING_ENV"): + print(f"Skipping well data for {site['properties']['point_id']} for testing (until well data can be retrieved in batches)") + site["properties"]["formation"] = None + 
site["properties"]["well_depth"] = None + site["properties"]["well_depth_units"] = FEET + else: + print(f"Obtaining well data for {site['properties']['point_id']}") + well_data = self._execute_json_request( + _make_url("wells"), + params={"pointid": site["properties"]["point_id"]}, + tag="", + ) + site["properties"]["formation"] = well_data["formation"] + site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] + site["properties"]["well_depth_units"] = FEET + return sites diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index ecfaee0..4d342ae 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -102,7 +102,6 @@ def _test_weave( # Act result = self.runner.invoke(weave, arguments, standalone_mode=False) - print(result) # Assert assert result.exit_code == 0 diff --git a/tests/test_sources/test_nmbgmr_amp.py b/tests/test_sources/test_nmbgmr_amp.py index 90bba2c..cb3e031 100644 --- a/tests/test_sources/test_nmbgmr_amp.py +++ b/tests/test_sources/test_nmbgmr_amp.py @@ -1,6 +1,21 @@ +import os +import pytest + from backend.constants import WATERLEVELS, CALCIUM, MILLIGRAMS_PER_LITER, FEET from tests.test_sources import BaseSourceTestClass +os.environ["IS_TESTING_ENV"] = "True" + +@pytest.fixture(autouse=True) +def setup(): + # SETUP CODE ----------------------------------------------------------- + os.environ["IS_TESTING_ENV"] = "True" + + # RUN TESTS ------------------------------------------------------------ + yield + + # TEARDOWN CODE --------------------------------------------------------- + os.environ["IS_TESTING_ENV"] = "False" class TestNMBGMRWaterlevels(BaseSourceTestClass): @@ -13,4 +28,4 @@ class TestNMBGMRAnalyte(BaseSourceTestClass): parameter = CALCIUM units = MILLIGRAMS_PER_LITER - agency = "nmbgmr_amp" + agency = "nmbgmr_amp" \ No newline at end of file From 373cc9d68ae2216177845d632f619150aabc9766 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Fri, 25 Apr 2025 21:21:30 +0000 Subject: [PATCH 32/38] Formatting changes --- backend/__init__.py | 4 +++- backend/connectors/nmbgmr/source.py | 6 ++++-- tests/test_sources/test_nmbgmr_amp.py | 4 +++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/backend/__init__.py b/backend/__init__.py index d531f86..5b3ab77 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -1,5 +1,7 @@ from enum import Enum from os import environ + + class OutputFormat(str, Enum): GEOJSON = "geojson" CSV = "csv" @@ -10,4 +12,4 @@ def get_bool_env_variable(var) -> bool: if environ.get(var).lower() in ["true", "1", "yes"]: return True else: - return False \ No newline at end of file + return False diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index ec26aae..d01cd11 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -89,7 +89,9 @@ def get_records(self): if not config.sites_only: for site in sites: if get_bool_env_variable("IS_TESTING_ENV"): - print(f"Skipping well data for {site['properties']['point_id']} for testing (until well data can be retrieved in batches)") + print( + f"Skipping well data for {site['properties']['point_id']} for testing (until well data can be retrieved in batches)" + ) site["properties"]["formation"] = None site["properties"]["well_depth"] = None site["properties"]["well_depth_units"] = FEET @@ -103,7 +105,7 @@ def get_records(self): site["properties"]["formation"] = well_data["formation"] site["properties"]["well_depth"] = well_data["well_depth_ftbgs"] 
site["properties"]["well_depth_units"] = FEET - + return sites diff --git a/tests/test_sources/test_nmbgmr_amp.py b/tests/test_sources/test_nmbgmr_amp.py index cb3e031..b56fd5b 100644 --- a/tests/test_sources/test_nmbgmr_amp.py +++ b/tests/test_sources/test_nmbgmr_amp.py @@ -6,6 +6,7 @@ os.environ["IS_TESTING_ENV"] = "True" + @pytest.fixture(autouse=True) def setup(): # SETUP CODE ----------------------------------------------------------- @@ -17,6 +18,7 @@ def setup(): # TEARDOWN CODE --------------------------------------------------------- os.environ["IS_TESTING_ENV"] = "False" + class TestNMBGMRWaterlevels(BaseSourceTestClass): parameter = WATERLEVELS @@ -28,4 +30,4 @@ class TestNMBGMRAnalyte(BaseSourceTestClass): parameter = CALCIUM units = MILLIGRAMS_PER_LITER - agency = "nmbgmr_amp" \ No newline at end of file + agency = "nmbgmr_amp" From 87e2225a38fda6a419f38814d041adf15e3907e4 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 25 Apr 2025 15:33:53 -0600 Subject: [PATCH 33/38] mypy fix --- backend/__init__.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/backend/__init__.py b/backend/__init__.py index 5b3ab77..804491c 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -8,8 +8,9 @@ class OutputFormat(str, Enum): GEOSERVER = "geoserver" -def get_bool_env_variable(var) -> bool: - if environ.get(var).lower() in ["true", "1", "yes"]: - return True - else: +def get_bool_env_variable(var: str) -> bool: + env_var = environ.get(var, None) + if env_var is None or env_var.strip().lower() not in ["true", "1", "yes"]: return False + else: + return True From a5d0b5987acf2d2a3290e16875a7c34cbe65cd1b Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 28 Apr 2025 08:29:21 -0600 Subject: [PATCH 34/38] default 'yes' to False to enable prompt --- backend/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/config.py b/backend/config.py index 56decd0..0b16ed7 100644 --- a/backend/config.py +++ b/backend/config.py @@ -151,7 +151,7 @@ class Config(Loggable): output_format: str = OutputFormat.CSV - yes: bool = True + yes: bool = False def __init__(self, model=None, payload=None, path=None): # need to initialize logger From a23d1d8bbca9191316c7b8dae4c00042e8f3dfb5 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 28 Apr 2025 13:55:46 -0600 Subject: [PATCH 35/38] add agency to nwis site numbers --- backend/connectors/usgs/source.py | 11 ++++++++--- backend/connectors/usgs/transformer.py | 6 +++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 25e4e87..cac4f2a 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -76,11 +76,12 @@ def parse_json(data): for location in data["timeSeries"]: site_code = location["sourceInfo"]["siteCode"][0]["value"] + agency = location["sourceInfo"]["siteCode"][0]["agencyCode"] source_parameter_name = location["variable"]["variableName"] source_parameter_units = location["variable"]["unit"]["unitCode"] for value in location["values"][0]["value"]: record = { - "site_code": site_code, + "site_id": f"{agency}-{site_code}", "source_parameter_name": source_parameter_name, "value": value["value"], "datetime_measured": value["dateTime"], @@ -150,12 +151,16 @@ def __repr__(self): return "NWISWaterLevelSource" def get_records(self, site_record): + # query sites with the agency, which need to be in the form of "{agency}:{site number}" + sites = make_site_list(site_record) + 
sites_with_colons = [s.replace("-", ":") for s in sites] + params = { "format": "json", "siteType": "GW", "siteStatus": "all", "parameterCd": "72019", - "sites": ",".join(make_site_list(site_record)), + "sites": ",".join(sites_with_colons), } config = self.config @@ -178,7 +183,7 @@ def get_records(self, site_record): return records def _extract_site_records(self, records, site_record): - return [ri for ri in records if ri["site_code"] == site_record.id] + return [ri for ri in records if ri["site_id"] == site_record.id] def _clean_records(self, records): return [ diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 1f61cf5..379b8bd 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -32,9 +32,13 @@ def _transform(self, record): # if not self.contained(lng, lat): # return + agency = record["agency_cd"] + site_no = record["site_no"] + site_id = f"{agency}-{site_no}" + rec = { "source": "USGS-NWIS", - "id": record["site_no"], + "id": site_id, "name": record["station_nm"], "latitude": lat, "longitude": lng, From 89042a9d722ac7704bc151a9fe34ae0b078450b4 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 28 Apr 2025 17:17:31 -0600 Subject: [PATCH 36/38] ose pods are false agencies for all parameters --- backend/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/config.py b/backend/config.py index 0b16ed7..86ef36a 100644 --- a/backend/config.py +++ b/backend/config.py @@ -220,7 +220,6 @@ def get_config_and_false_agencies(self): "ebid", "nmbgmr_amp", "nmose_isc_seven_rivers", - "nmose_pod", "nmose_roswell", "nwis", "pvacd", From ada99cb875ebbeda454435d1da44be84ae5fda92 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 28 Apr 2025 17:18:00 -0600 Subject: [PATCH 37/38] bump to 0.9.1 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 05b81a8..abfc56f 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ setup( name="nmuwd", - version="0.9.0", + version="0.9.1", author="Jake Ross", description="New Mexico Water Data Integration Engine", long_description=long_description, From 08a0aebe671ab67b686e90365e57619eb8262bb1 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 29 Apr 2025 08:18:14 -0600 Subject: [PATCH 38/38] update documentation --- CHANGELOG.md | 3 ++- README.md | 18 +++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 583fa8d..c92224a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased: 0.9.0 +## 0.9.2 ### Added - `--sites-only` flag to only retrieve site data - `--output-format` flag to write out sites/summary tables as csv or geojson. - options are `csv` or `geojson` + - timeseries data is always written to a csv - NM OSE POD data for sites. 
- can be removed from output with `--no-nmose-pod` - `--output-dir` to change the output directory to a location other than `.` (the current working directory) diff --git a/README.md b/README.md index 0173df4..0cb15ac 100644 --- a/README.md +++ b/README.md @@ -71,22 +71,22 @@ where `{parameter}` is the name of the parameter whose data is to be retrieved, | **pvacd** | X | - | - | - | - | - | - | - | - | - | - | - | - | - | - | - | | **wqp** | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | X | -### Output -The `--output` option is required and used to set the output type: +### Output Type +The `--output-type` option is required and used to set the output type: ``` ---output summary +--output-type summary ``` - A summary table consisting of location information as well as summary statistics for the parameter of interest for every location that has observations. ``` ---output timeseries_unified +--output-type timeseries_unified ``` - A single table consisting of time series data for all locations for the parameter of interest. - A single table of site data that contains information such as latitude, longitude, and elevation ``` ---output timeseries_separated +--output-type timeseries_separated ``` - Separate time series tables for all locations for the parameter of interest. - A single table of site data that contains information such as latitude, longitude, and elevation @@ -181,7 +181,7 @@ The Data Integration Engine enables the user to obtain groundwater level and gro - `--no-pvacd` to exclude Pecos Valley Artesian Convservancy District (PVACD) data - `--no-wqp` to exclude Water Quality Portal (WQP) data -### Geographic Filters +### Geographic Filters [In Development] The following flags can be used to geographically filter data: @@ -193,7 +193,11 @@ The following flags can be used to geographically filter data: -- bbox 'x1 y1, x2 y2' ``` -### Date Filters +``` +-- wkt {wkt polygon or multipolygon} +``` + +### Date Filters [In Development] The following flags can be used to filter by dates: