From c3224025c98bda041603a907a967d2c31891175c Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Mon, 27 Apr 2026 13:32:41 -0600 Subject: [PATCH 01/44] fix(source): enable tag parameter for json requests to be None Previously if the tag parameter was set to None or was not provided it would default to "data". This caused issues for some endpoints that did not have a "data" key in their response or for which the full object was desired. --- backend/connectors/bor/source.py | 8 +++++--- backend/connectors/isc_seven_rivers/source.py | 6 ++++-- backend/source.py | 4 +--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/backend/connectors/bor/source.py b/backend/connectors/bor/source.py index 5ad03e1..4a28ee2 100644 --- a/backend/connectors/bor/source.py +++ b/backend/connectors/bor/source.py @@ -57,7 +57,7 @@ def get_records(self): # locationTypeId 10 is for wells url = "https://data.usbr.gov/rise/api/location" params = {"stateId": "NM", "locationTypeId": 10} - return self._execute_json_request(url, params) + return self._execute_json_request(url, params, tag="data") def parse_dt(dt): @@ -119,13 +119,14 @@ def get_records(self, site_record): code = get_analyte_search_param(self.config.parameter, BOR_ANALYTE_MAPPING) catalog_record_data = self._execute_json_request( - f"https://data.usbr.gov{site_record.catalogRecords[0]['id']}" + f"https://data.usbr.gov{site_record.catalogRecords[0]['id']}", + tag="data" ) catalog_items = catalog_record_data["relationships"]["catalogItems"]["data"] for i, item in enumerate(self._reorder_catalog_items(catalog_items)): - data = self._execute_json_request(f'https://data.usbr.gov{item["id"]}') + data = self._execute_json_request(f'https://data.usbr.gov{item["id"]}', tag="data") if not data: continue @@ -142,6 +143,7 @@ def get_records(self, site_record): return self._execute_json_request( "https://data.usbr.gov/rise/api/result", params={"itemId": data["attributes"]["_id"]}, + tag="data" ) diff --git 
a/backend/connectors/isc_seven_rivers/source.py b/backend/connectors/isc_seven_rivers/source.py index 5679fad..691bbac 100644 --- a/backend/connectors/isc_seven_rivers/source.py +++ b/backend/connectors/isc_seven_rivers/source.py @@ -85,6 +85,7 @@ def health(self): def get_records(self): return self._execute_json_request( _make_url("getMonitoringPoints.ashx"), + tag="data", ) @@ -100,7 +101,7 @@ def _get_analyte_id_and_name(self, analyte): """ """ if self._analyte_ids is None: - resp = self._execute_json_request(_make_url("getAnalytes.ashx")) + resp = self._execute_json_request(_make_url("getAnalytes.ashx"), tag="data") if resp: self._analyte_ids = {r["name"]: r["id"] for r in resp} @@ -164,7 +165,7 @@ def get_records(self, site_record): self._source_parameter_name = analyte_id_and_name["name"] return self._execute_json_request( - _make_url("getReadings.ashx"), params=params + _make_url("getReadings.ashx"), params=params, tag="data" ) @@ -184,6 +185,7 @@ def get_records(self, site_record): return self._execute_json_request( _make_url("getWaterLevels.ashx"), params=params, + tag="data", ) def _clean_records(self, records): diff --git a/backend/source.py b/backend/source.py index 5189258..ff6160c 100644 --- a/backend/source.py +++ b/backend/source.py @@ -252,9 +252,7 @@ def _execute_json_request( the json response """ resp = httpx.get(url, params=params, **kw) - if tag is None: - tag = "data" - + if resp.status_code == 200: try: obj = resp.json() From ec9f6e31c35b80a7f71933986adf933f8f7d269a Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Mon, 27 Apr 2026 13:33:44 -0600 Subject: [PATCH 02/44] fix(config): update config/false agencies for bicarbonate BoR does not report bicarbonate data --- backend/config.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/backend/config.py b/backend/config.py index 2b4af34..17a7bee 100644 --- a/backend/config.py +++ b/backend/config.py @@ -281,6 +281,18 @@ def get_config_and_false_agencies(self): "nwis", "pvacd", ] 
+ elif self.parameter in [BICARBONATE]: + config_agencies = ["nmbgmr_amp", "nmed_dwb", "nmose_isc_seven_rivers", "wqp"] + false_agencies = [ + "bor", + "bernco", + "cabq", + "ebid", + "nmose_roswell", + "nmose_pod", + "nwis", + "pvacd", + ] elif self.parameter in [ BICARBONATE, CALCIUM, From dcaf8d3bb4f56c26202ee597d9d0836d108928e1 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Mon, 27 Apr 2026 14:48:07 -0600 Subject: [PATCH 03/44] fix(USGS): use new USGS API endpoints for water level retrieval The USGS deprecated the old water level retrieval API endpoints, so this commit updates the NWISWaterLevelSource to use the new OGC API endpoints. Additionally, the chunk size for site retrieval is reduced to 5 to avoid URI length issues and httpx read timeouts. --- backend/connectors/usgs/source.py | 223 +++++++++++++++---------- backend/connectors/usgs/transformer.py | 32 ++-- backend/transformer.py | 1 + 3 files changed, 144 insertions(+), 112 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index cac4f2a..675d546 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime import httpx from backend.connectors import NM_STATE_BOUNDING_POLYGON @@ -42,55 +41,28 @@ get_terminal_record, ) +""" +-- sites -- +https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items? 
+state_code=35 +site_type_code=GW -def parse_rdb(text): - """' - Parses rdb tab-delimited responses for NWIS Site Services - """ - - def line_generator(): - header = None - for line in text.split("\n"): - if line.startswith("#"): - continue - elif line.startswith("agency_cd"): - header = [h.strip() for h in line.split("\t")] - continue - elif line.startswith("5s"): - continue - elif line == "": - continue +-- water levels -- +https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items? +monitoring_location_id=<>&monitoring_location_id=<>... - vals = [v.strip() for v in line.split("\t")] - if header and any(vals): - yield dict(zip(header, vals)) +parameter_code=72019 +""" - return list(line_generator()) +KEY = "55MILtQrayXw1NgufxcqRfkkRrg4Rg6KNCyJZ004" - -def parse_json(data): +def parse_waterlevels_json(data): """ - Parses JSON responses for NWIS Groundwater Level Services + Parses JSON responses for USGS field measurements (water levels) into a list of records with standardized keys. 
""" records = [] - for location in data["timeSeries"]: - site_code = location["sourceInfo"]["siteCode"][0]["value"] - agency = location["sourceInfo"]["siteCode"][0]["agencyCode"] - source_parameter_name = location["variable"]["variableName"] - source_parameter_units = location["variable"]["unit"]["unitCode"] - for value in location["values"][0]["value"]: - record = { - "site_id": f"{agency}-{site_code}", - "source_parameter_name": source_parameter_name, - "value": value["value"], - "datetime_measured": value["dateTime"], - # "date_measured": value["dateTime"].split("T")[0], - # "time_measured": value["dateTime"].split("T")[1], - "source_parameter_units": source_parameter_units, - } - records.append(record) - return records + class NWISSiteSource(BaseSiteSource): @@ -107,80 +79,147 @@ def tag(self): def health(self): try: - self._execute_text_request( - "https://waterservices.usgs.gov/nwis/site/", - { - "format": "rdb", - "siteOutput": "expanded", - "siteType": "GW", - "site": "325754103461301", - }, + params = { + "state_code": "35", + "site_type_code": "GW" + } + self._execute_json_request( + url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items", + params=params ) return True except httpx.HTTPStatusError: pass def get_records(self): - params = {"format": "rdb", "siteOutput": "expanded", "siteType": "GW"} + params = { + "site_type_code": "GW", + "limit": self.chunk_size, + } config = self.config if config.has_bounds(): bbox = config.bbox_bounding_points() - params["bBox"] = ",".join([str(b) for b in bbox]) + params["bbox"] = ",".join([str(b) for b in bbox]) else: - params["stateCd"] = "NM" + params["state_code"] = "35" + + # if config.start_date: + # params["startDt"] = config.start_dt.date().isoformat() + # if config.end_date: + # params["endDt"] = config.end_dt.date().isoformat() + + reached_end: bool = False + records: list = [] + sites_url: str = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items" + 
+ """ + TODO + + update the site transformer to transform into standardized format + """ + + while not reached_end: + response = self._execute_json_request( + url=sites_url, + params=params, + headers={"X-API-Key": KEY} + ) - if config.start_date: - params["startDt"] = config.start_dt.date().isoformat() - if config.end_date: - params["endDt"] = config.end_dt.date().isoformat() + records.extend(response.get("features", [])) - text = self._execute_text_request( - "https://waterservices.usgs.gov/nwis/site/", params - ) - if text: - records = parse_rdb(text) - self.log(f"Retrieved {len(records)} records") - return records + found_next_link: bool = False + for link in response["links"]: + if link["rel"] == "next": + sites_url = link["href"] + params = None # next link already has params encoded + found_next_link = True + break + + if not found_next_link : + reached_end = True + + return records + + +# TODO: IMPLEMENT! and transform as necessary. keep in mind "next" links for pagination class NWISWaterLevelSource(BaseWaterLevelSource): transformer_klass = NWISWaterLevelTransformer + # chunk_size=5 to avoid URI length and httpx read timed out issue + chunk_size = 5 def __repr__(self): return "NWISWaterLevelSource" def get_records(self, site_record): - # query sites with the agency, which need to be in the form of "{agency}:{site number}" - sites = make_site_list(site_record) - sites_with_colons = [s.replace("-", ":") for s in sites] + records: list = [] - params = { - "format": "json", - "siteType": "GW", - "siteStatus": "all", - "parameterCd": "72019", - "sites": ",".join(sites_with_colons), - } + # if more than 5 sites are provided the URI is too long + sites: list = make_site_list(site_record) - config = self.config - if config.start_date: - params["startDt"] = config.start_dt.date().isoformat() - else: - params["startDt"] = "1900-01-01" - - if config.end_date: - params["endDt"] = config.end_dt.date().isoformat() - - data = self._execute_json_request( - 
url="https://waterservices.usgs.gov/nwis/gwlevels/", - params=params, - tag="value", - ) - if data: - records = parse_json(data) - self.log(f"Retrieved {len(records)} records") - return records + # chunk the sites into groups of 5 to avoid URI length issues + chunks_of_sites: list = [] + for i in range(0, len(sites), self.chunk_size): + chunks_of_sites.append(sites[i:i + self.chunk_size]) + + + for chunked_sites in chunks_of_sites: + delineated_sites: str = ",".join(chunked_sites) + + obs_url: str = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items" + + reached_end: bool = False + + params: dict = { + "parameter_code": "72019", + "monitoring_location_id": delineated_sites, + "limit": 500, + } + + config = self.config + # if config.start_date: + # params["startDt"] = config.start_dt.date().isoformat() + # else: + # params["startDt"] = "1900-01-01" + + # if config.end_date: + # params["endDt"] = config.end_dt.date().isoformat() + + while not reached_end: + response = self._execute_json_request( + url=obs_url, + params=params, + headers={"X-API-Key": KEY} + ) + + data: list[dict] = response.get("features", []) + if data: + for feature in data: + record = { + "site_id": feature["properties"]["monitoring_location_id"], + "source_parameter_name": "Water level, depth LSD", + "value": feature["properties"]["value"], + "datetime_measured": feature["properties"]["time"], + "source_parameter_units": feature["properties"]["unit_of_measure"] + } + records.append(record) + + found_next_link: bool = False + for link in response["links"]: + if link["rel"] == "next": + obs_url = link["href"] + params = None # next link already has params encoded + found_next_link = True + break + + if not found_next_link : + reached_end = True + self.log(f"Retrieved {len(records)} records") + + + return records def _extract_site_records(self, records, site_record): return [ri for ri in records if ri["site_id"] == site_record.id] diff --git 
a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 379b8bd..4060f9f 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -19,35 +19,27 @@ class NWISSiteTransformer(SiteTransformer): def _transform(self, record): - elevation = record["alt_va"] + elevation = record["properties"]["altitude"] try: elevation = float(elevation) except (ValueError, TypeError): elevation = None - lng = record["dec_long_va"] - lat = record["dec_lat_va"] - datum = record["coord_datum_cd"] - - # if not self.contained(lng, lat): - # return - - agency = record["agency_cd"] - site_no = record["site_no"] - site_id = f"{agency}-{site_no}" + # this data comes from OGC API, which requires the use of WGS84 for the horizontal datum + datum = "WGS84" rec = { - "source": "USGS-NWIS", - "id": site_id, - "name": record["station_nm"], - "latitude": lat, - "longitude": lng, + "source": "USGS", + "id": record["properties"]["id"], + "name": record["properties"]["monitoring_location_name"], + "latitude": record["geometry"]["coordinates"][1], + "longitude": record["geometry"]["coordinates"][0], "elevation": elevation, "elevation_units": "ft", - "horizontal_datum": datum, - "vertical_datum": record["alt_datum_cd"], - "aquifer": record["nat_aqfr_cd"], - "well_depth": record["well_depth_va"], + "horizontal_datum": "WGS84", + "vertical_datum": record["properties"]["vertical_datum"], + "aquifer": record["properties"]["national_aquifer_code"], + "well_depth": record["properties"]["well_constructed_depth"], "well_depth_units": "ft", } return rec diff --git a/backend/transformer.py b/backend/transformer.py index a42ca89..4c8cbb8 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -259,6 +259,7 @@ def standardize_datetime(dt, record_id): "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S+00:00", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S+00:00", "%Y-%m-%d %H:%M", From 
036e9e7b1bb1c87df957a9f0d5e6adeadd4e91b3 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Tue, 28 Apr 2026 17:10:46 -0600 Subject: [PATCH 04/44] fix(usgs): update transformer for combined-metadata endpoint --- backend/connectors/usgs/transformer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 4060f9f..f0344f2 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -30,7 +30,7 @@ def _transform(self, record): rec = { "source": "USGS", - "id": record["properties"]["id"], + "id": record["properties"]["monitoring_location_id"], "name": record["properties"]["monitoring_location_name"], "latitude": record["geometry"]["coordinates"][1], "longitude": record["geometry"]["coordinates"][0], From 2425094f599436447c7e53d8de59a31dac0a0f02 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Tue, 28 Apr 2026 17:10:50 -0600 Subject: [PATCH 05/44] fix(usgs): update USGS water level retrieval for new APIs Use the new USGS APIs to retrieve water level data. --- backend/connectors/usgs/source.py | 270 ++++++++++++++++++------------ 1 file changed, 159 insertions(+), 111 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 675d546..049c6f2 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -43,9 +43,10 @@ """ -- sites -- -https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items? +https://api.waterdata.usgs.gov/ogcapi/v0/collections/combined-metadata/items? state_code=35 site_type_code=GW +parameter_code=72019 -- water levels -- https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items? @@ -56,19 +57,117 @@ KEY = "55MILtQrayXw1NgufxcqRfkkRrg4Rg6KNCyJZ004" -def parse_waterlevels_json(data): - """ - Parses JSON responses for USGS field measurements (water levels) into a list of records with standardized keys. 
+def transform_usgs_waterlevels_record(record: dict) -> dict: + return { + "site_id": record["properties"]["monitoring_location_id"], + "source_parameter_name": "Water level, depth LSD", + "value": record["properties"]["value"], + "datetime_measured": record["properties"]["time"], + "source_parameter_units": record["properties"]["unit_of_measure"] + } + +def retrieve_usgs_data( + url: str, + json_data: dict, + headers: dict = None, + params: dict = None, + timeout: int = None, + transformation_hook=None +) -> list: """ - records = [] + Start with a POST request to retrieve the initial batch of data using complex queries, then + follow the "next" links in the response to retrieve all paginated data with GET requests. + The transformation_hook can be used to transform each batch of records as they are retrieved + """ + records: list = [] + + response = httpx.post( + url=url, + json=json_data, + headers=headers, + params=params, + timeout=timeout, + ) + data: dict = response.json() + features: list[dict] = data.get("features", []) + + if transformation_hook: + transformed_features = [transformation_hook(feature) for feature in features] + records.extend(transformed_features) + else: + records.extend(features) + # print(f"Retrieved {len(records)} records") + + found_next_link: bool = False + links: list = data.get("links", []) + for link in links: + if link["rel"] == "next": + next_link_url = link["href"] + found_next_link = True + break + + # use GET requests for the paginated responses after the initial POST to avoid issues with httpx and long URLs with many site ids + # USGS APIs use cursor pagination, so we can just follow the "next" links until there are no more + while found_next_link: + # print(f"Following next link: {next_link_url}") + response = httpx.get( + url=next_link_url, + headers=headers, + timeout=timeout, + ) + data: dict = response.json() + features = data.get("features", []) + if transformation_hook: + transformed_features = 
[transformation_hook(feature) for feature in features] + records.extend(transformed_features) + else: + records.extend(features) + + # print(f"Retrieved {len(records)} records") + + found_next_link: bool = False + links: list = data.get("links", []) + for link in links: + if link["rel"] == "next": + next_link_url = link["href"] + found_next_link = True + break + + return records class NWISSiteSource(BaseSiteSource): transformer_klass = NWISSiteTransformer chunk_size = 500 bounding_polygon = NM_STATE_BOUNDING_POLYGON + json_data: dict = { + "op": "and", + "args": [ + { + "op": "in", + "args": [ + {"property": "state_code"}, + ["35"] + ] + }, + { + "op": "in", + "args": [ + {"property": "site_type_code"}, + ["GW"] + ] + }, + { + "op": "in", + "args": [ + {"property": "parameter_code"}, + ["72019"] + ] + } + ] + } def __repr__(self): return "NWISSiteSource" @@ -79,145 +178,94 @@ def tag(self): def health(self): try: - params = { - "state_code": "35", - "site_type_code": "GW" - } - self._execute_json_request( - url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items", - params=params + httpx.post( + url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/combined-metadata/items", + data=self.json_data, + headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, + timeout=None ) return True except httpx.HTTPStatusError: pass def get_records(self): - params = { - "site_type_code": "GW", - "limit": self.chunk_size, - } - config = self.config + # TODO: handle date filters + # config = self.config - if config.has_bounds(): - bbox = config.bbox_bounding_points() - params["bbox"] = ",".join([str(b) for b in bbox]) - else: - params["state_code"] = "35" + # if config.has_bounds(): + # bbox = config.bbox_bounding_points() + # params["bbox"] = ",".join([str(b) for b in bbox]) + # else: + # params["state_code"] = "35" # if config.start_date: # params["startDt"] = config.start_dt.date().isoformat() # if config.end_date: # 
params["endDt"] = config.end_dt.date().isoformat() + sites_url: str = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/combined-metadata/items" - reached_end: bool = False - records: list = [] - sites_url: str = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/monitoring-locations/items" - - """ - TODO - - update the site transformer to transform into standardized format - """ - - while not reached_end: - response = self._execute_json_request( - url=sites_url, - params=params, - headers={"X-API-Key": KEY} - ) - - records.extend(response.get("features", [])) + data = self._execute_json_request( + url=sites_url, + params={"limit": 50000, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, + timeout=None, + headers={"X-API-Key": KEY}, + ) - found_next_link: bool = False - for link in response["links"]: - if link["rel"] == "next": - sites_url = link["href"] - params = None # next link already has params encoded - found_next_link = True - break - - if not found_next_link : - reached_end = True - + records: list = data.get("features", []) return records -# TODO: IMPLEMENT! and transform as necessary. 
keep in mind "next" links for pagination - class NWISWaterLevelSource(BaseWaterLevelSource): transformer_klass = NWISWaterLevelTransformer - # chunk_size=5 to avoid URI length and httpx read timed out issue - chunk_size = 5 + # USGS complex queries allow up to 250 sites to be queried at once + # https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries + num_sites = 250 def __repr__(self): return "NWISWaterLevelSource" def get_records(self, site_record): - records: list = [] - - # if more than 5 sites are provided the URI is too long - sites: list = make_site_list(site_record) + # TODO: handle date filters + # config = self.config + # if config.start_date: + # params["startDt"] = config.start_dt.date().isoformat() + # else: + # params["startDt"] = "1900-01-01" - # chunk the sites into groups of 5 to avoid URI length issues - chunks_of_sites: list = [] - for i in range(0, len(sites), self.chunk_size): - chunks_of_sites.append(sites[i:i + self.chunk_size]) + # if config.end_date: + # params["endDt"] = config.end_dt.date().isoformat() + records: list = [] + sites: list = make_site_list(site_record) - for chunked_sites in chunks_of_sites: - delineated_sites: str = ",".join(chunked_sites) - - obs_url: str = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items" + # group sites into batches of num_sites to pass to the API + # since USGS APIs allow up to 250 sites to be queried at once with complex queries + list_of_lists_of_sites: list = [] + for i in range(0, len(sites), self.num_sites): + list_of_lists_of_sites.append(sites[i:i + self.num_sites]) - reached_end: bool = False - params: dict = { - "parameter_code": "72019", - "monitoring_location_id": delineated_sites, - "limit": 500, + for list_of_sites in list_of_lists_of_sites: + json_data: dict = { + "op": "in", + "args": [ + {"property": "monitoring_location_id"}, + list_of_sites + ] } - config = self.config - # if config.start_date: - # params["startDt"] = config.start_dt.date().isoformat() 
- # else: - # params["startDt"] = "1900-01-01" - - # if config.end_date: - # params["endDt"] = config.end_dt.date().isoformat() - - while not reached_end: - response = self._execute_json_request( - url=obs_url, - params=params, - headers={"X-API-Key": KEY} - ) - - data: list[dict] = response.get("features", []) - if data: - for feature in data: - record = { - "site_id": feature["properties"]["monitoring_location_id"], - "source_parameter_name": "Water level, depth LSD", - "value": feature["properties"]["value"], - "datetime_measured": feature["properties"]["time"], - "source_parameter_units": feature["properties"]["unit_of_measure"] - } - records.append(record) - - found_next_link: bool = False - for link in response["links"]: - if link["rel"] == "next": - obs_url = link["href"] - params = None # next link already has params encoded - found_next_link = True - break - - if not found_next_link : - reached_end = True - self.log(f"Retrieved {len(records)} records") + records_batch: list = retrieve_usgs_data( + url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", + json_data=json_data, + headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, + params={"limit": 50000, "parameter_code": "72019"}, + timeout=None, + transformation_hook=transform_usgs_waterlevels_record + ) + records.extend(records_batch) + self.log(f"Retrieved {len(records)} records") return records From 500a0b136dd3b6fa0994d59925cb1d2a05c41b82 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 10:41:45 -0600 Subject: [PATCH 06/44] fix(usgs): set limit to 50000 to avoid pagination/rate limits --- backend/connectors/usgs/source.py | 183 +++++++++--------------------- 1 file changed, 56 insertions(+), 127 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 049c6f2..da28158 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -41,133 +41,14 @@ 
get_terminal_record, ) -""" --- sites -- -https://api.waterdata.usgs.gov/ogcapi/v0/collections/combined-metadata/items? -state_code=35 -site_type_code=GW -parameter_code=72019 - --- water levels -- -https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items? -monitoring_location_id=<>&monitoring_location_id=<>... - -parameter_code=72019 -""" KEY = "55MILtQrayXw1NgufxcqRfkkRrg4Rg6KNCyJZ004" - -def transform_usgs_waterlevels_record(record: dict) -> dict: - return { - "site_id": record["properties"]["monitoring_location_id"], - "source_parameter_name": "Water level, depth LSD", - "value": record["properties"]["value"], - "datetime_measured": record["properties"]["time"], - "source_parameter_units": record["properties"]["unit_of_measure"] - } - -def retrieve_usgs_data( - url: str, - json_data: dict, - headers: dict = None, - params: dict = None, - timeout: int = None, - transformation_hook=None -) -> list: - """ - Start with a POST request to retrieve the initial batch of data using complex queries, then - follow the "next" links in the response to retrieve all paginated data with GET requests. 
- - The transformation_hook can be used to transform each batch of records as they are retrieved - """ - records: list = [] - - response = httpx.post( - url=url, - json=json_data, - headers=headers, - params=params, - timeout=timeout, - ) - data: dict = response.json() - features: list[dict] = data.get("features", []) - - if transformation_hook: - transformed_features = [transformation_hook(feature) for feature in features] - records.extend(transformed_features) - else: - records.extend(features) - - # print(f"Retrieved {len(records)} records") - - found_next_link: bool = False - links: list = data.get("links", []) - for link in links: - if link["rel"] == "next": - next_link_url = link["href"] - found_next_link = True - break - - # use GET requests for the paginated responses after the initial POST to avoid issues with httpx and long URLs with many site ids - # USGS APIs use cursor pagination, so we can just follow the "next" links until there are no more - while found_next_link: - # print(f"Following next link: {next_link_url}") - response = httpx.get( - url=next_link_url, - headers=headers, - timeout=timeout, - ) - data: dict = response.json() - features = data.get("features", []) - if transformation_hook: - transformed_features = [transformation_hook(feature) for feature in features] - records.extend(transformed_features) - else: - records.extend(features) - - # print(f"Retrieved {len(records)} records") - - found_next_link: bool = False - links: list = data.get("links", []) - for link in links: - if link["rel"] == "next": - next_link_url = link["href"] - found_next_link = True - break - - return records - +LIMIT = 50000 class NWISSiteSource(BaseSiteSource): transformer_klass = NWISSiteTransformer chunk_size = 500 bounding_polygon = NM_STATE_BOUNDING_POLYGON - json_data: dict = { - "op": "and", - "args": [ - { - "op": "in", - "args": [ - {"property": "state_code"}, - ["35"] - ] - }, - { - "op": "in", - "args": [ - {"property": "site_type_code"}, - ["GW"] - ] - 
}, - { - "op": "in", - "args": [ - {"property": "parameter_code"}, - ["72019"] - ] - } - ] - } def __repr__(self): return "NWISSiteSource" @@ -206,7 +87,7 @@ def get_records(self): data = self._execute_json_request( url=sites_url, - params={"limit": 50000, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, + params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=None, headers={"X-API-Key": KEY}, ) @@ -240,7 +121,7 @@ def get_records(self, site_record): sites: list = make_site_list(site_record) # group sites into batches of num_sites to pass to the API - # since USGS APIs allow up to 250 sites to be queried at once with complex queries + # USGS APIs allow up to 250 sites to be queried at once with complex queries list_of_lists_of_sites: list = [] for i in range(0, len(sites), self.num_sites): list_of_lists_of_sites.append(sites[i:i + self.num_sites]) @@ -255,19 +136,67 @@ def get_records(self, site_record): ] } - records_batch: list = retrieve_usgs_data( + response = httpx.post( url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", - json_data=json_data, + json=json_data, headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, - params={"limit": 50000, "parameter_code": "72019"}, + params={"limit": LIMIT, "parameter_code": "72019"}, timeout=None, - transformation_hook=transform_usgs_waterlevels_record ) - records.extend(records_batch) + data: dict = response.json() + features: list[dict] = data.get("features", []) + + standard_features: list[dict] = [self._standardize_record(feature) for feature in features] + records.extend(standard_features) + + """ + The following commented-out code handles pagination for cases where there are more than LIMIT records for a given batch of sites. + However, in testing, I have not encountered any cases where this is necessary. 
Furthermore, cursor-based pagination is broken as + of 4/29/26 when the limit query parameter is used, and it can't be used in combination with other parameters via complex queries. + If we do encounter cases where there are more than LIMIT records, we can use the following code to handle pagination (when it is fixed). + + found_next_link: bool = False + links: list[dict] = data.get("links", []) + for link in links: + if link["rel"] == "next": + next_link_url = link["href"] + found_next_link = True + break + + # use GET requests for the paginated responses after the initial POST to avoid issues with httpx and long URLs with many site ids + # USGS APIs use cursor pagination, so we can just follow the "next" links until there are no more + while found_next_link: + response = httpx.get( + url=next_link_url, + headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, + timeout=None, + ) + data: dict = response.json() + features: list[dict] = data.get("features", []) + standard_features: list[dict] = [self._standardize_record(feature) for feature in features] + records.extend(standard_features) + + found_next_link: bool = False + links: list = data.get("links", []) + for link in links: + if link["rel"] == "next": + next_link_url = link["href"] + found_next_link = True + break + """ self.log(f"Retrieved {len(records)} records") return records + + def _standardize_record(self, record: dict) -> dict: + return { + "site_id": record["properties"]["monitoring_location_id"], + "source_parameter_name": "Water level, depth LSD", + "value": record["properties"]["value"], + "datetime_measured": record["properties"]["time"], + "source_parameter_units": record["properties"]["unit_of_measure"] + } def _extract_site_records(self, records, site_record): return [ri for ri in records if ri["site_id"] == site_record.id] From d93869d6584778e6ca7588cb43dbd79e16341ed7 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 11:09:12 -0600 Subject: [PATCH 07/44] 
refactor(usgs): check usgs health with get request --- backend/connectors/usgs/source.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index da28158..9f2f6ed 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -49,6 +49,7 @@ class NWISSiteSource(BaseSiteSource): transformer_klass = NWISSiteTransformer chunk_size = 500 bounding_polygon = NM_STATE_BOUNDING_POLYGON + sites_url: str = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/combined-metadata/items" def __repr__(self): return "NWISSiteSource" @@ -59,12 +60,12 @@ def tag(self): def health(self): try: - httpx.post( - url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/combined-metadata/items", - data=self.json_data, - headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, - timeout=None - ) + data = self._execute_json_request( + url=self.sites_url, + params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, + timeout=None, + headers={"X-API-Key": KEY}, + ) return True except httpx.HTTPStatusError: pass @@ -83,10 +84,9 @@ def get_records(self): # params["startDt"] = config.start_dt.date().isoformat() # if config.end_date: # params["endDt"] = config.end_dt.date().isoformat() - sites_url: str = "https://api.waterdata.usgs.gov/ogcapi/v0/collections/combined-metadata/items" data = self._execute_json_request( - url=sites_url, + url=self.sites_url, params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=None, headers={"X-API-Key": KEY}, From 12247f52e69f1da48bba081a6cfdb02ab525346d Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 11:14:02 -0600 Subject: [PATCH 08/44] fix(config): set agencies for bicarbonate --- backend/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/config.py b/backend/config.py index 17a7bee..d45e95d 100644 
--- a/backend/config.py +++ b/backend/config.py @@ -294,7 +294,6 @@ def get_config_and_false_agencies(self): "pvacd", ] elif self.parameter in [ - BICARBONATE, CALCIUM, CHLORIDE, FLUORIDE, From b34292207f123aaf68143f849435ff1a8f25d9cb Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:23:58 -0600 Subject: [PATCH 09/44] fix(usgs): add timeout to usgs api calls and retry logic if errors --- backend/connectors/usgs/source.py | 46 ++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 9f2f6ed..ecb0224 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -44,6 +44,7 @@ KEY = "55MILtQrayXw1NgufxcqRfkkRrg4Rg6KNCyJZ004" LIMIT = 50000 +TIMEOUT=1800 class NWISSiteSource(BaseSiteSource): transformer_klass = NWISSiteTransformer @@ -85,12 +86,22 @@ def get_records(self): # if config.end_date: # params["endDt"] = config.end_dt.date().isoformat() - data = self._execute_json_request( - url=self.sites_url, - params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, - timeout=None, - headers={"X-API-Key": KEY}, - ) + finished_request: bool = False + while not finished_request: + try: + data = self._execute_json_request( + url=self.sites_url, + params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, + timeout=TIMEOUT, + headers={"X-API-Key": KEY}, + ) + # _execute_json_request returns None for non-200 responses, so we need to check for that as well + if data is None: + self.warning("Retrying...") + else: + finished_request = True + except Exception as e: + self.warning(f"Error retrieving site records: {e}. 
Retrying...") records: list = data.get("features", []) @@ -135,14 +146,23 @@ def get_records(self, site_record): list_of_sites ] } + finished_request: bool = False + while not finished_request: + try: + response = httpx.post( + url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", + json=json_data, + headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, + params={"limit": LIMIT, "parameter_code": "72019"}, + timeout=None, + ) + if response.status_code != 200: + self.warning(f"Received status code {response.status_code} for sites {list_of_sites}. Retrying...") + else: + finished_request = True + except Exception as e: + self.warning(f"Error retrieving water level records: {e}. Retrying...") - response = httpx.post( - url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", - json=json_data, - headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, - params={"limit": LIMIT, "parameter_code": "72019"}, - timeout=None, - ) data: dict = response.json() features: list[dict] = data.get("features", []) From 3f2d104cb35cf154f91f986b38a7b62537e729f3 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:25:32 -0600 Subject: [PATCH 10/44] fix(usgs): add retry logic to pagination retrieval --- backend/connectors/usgs/source.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index ecb0224..6b850ae 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -154,7 +154,7 @@ def get_records(self, site_record): json=json_data, headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, params={"limit": LIMIT, "parameter_code": "72019"}, - timeout=None, + timeout=TIMEOUT, ) if response.status_code != 200: self.warning(f"Received status code {response.status_code} for sites {list_of_sites}. 
Retrying...") @@ -186,11 +186,21 @@ def get_records(self, site_record): # use GET requests for the paginated responses after the initial POST to avoid issues with httpx and long URLs with many site ids # USGS APIs use cursor pagination, so we can just follow the "next" links until there are no more while found_next_link: - response = httpx.get( - url=next_link_url, - headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, - timeout=None, - ) + finished_request: bool = False + while not finished_request: + try: + response = httpx.get( + url=next_link_url, + headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, + timeout=TIMEOUT, + ) + if response.status_code != 200: + self.warning(f"Received status code {response.status_code} for paginated request. Retrying...") + else: + finished_request = True + except Exception as e: + self.warning(f"Error retrieving paginated water level records: {e}. Retrying... + data: dict = response.json() features: list[dict] = data.get("features", []) standard_features: list[dict] = [self._standardize_record(feature) for feature in features] From 6005f70de6c95db366f0e2f220bee0d5ea0b62ef Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:26:34 -0600 Subject: [PATCH 11/44] refactor(usgs): set datum in site transformer --- backend/connectors/usgs/transformer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index f0344f2..bfffd4c 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -36,7 +36,7 @@ def _transform(self, record): "longitude": record["geometry"]["coordinates"][0], "elevation": elevation, "elevation_units": "ft", - "horizontal_datum": "WGS84", + "horizontal_datum": datum, "vertical_datum": record["properties"]["vertical_datum"], "aquifer": record["properties"]["national_aquifer_code"], "well_depth": 
record["properties"]["well_constructed_depth"], From dba8213abbb41ec1b515a30ccce46b07668398d6 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:29:33 -0600 Subject: [PATCH 12/44] fix(usgs): return False if health check fails --- backend/connectors/usgs/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 6b850ae..7cf5e72 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -69,7 +69,7 @@ def health(self): ) return True except httpx.HTTPStatusError: - pass + return False def get_records(self): # TODO: handle date filters From b17bbde11585d3ea976cb098c4646098b43f661f Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:31:37 -0600 Subject: [PATCH 13/44] fix(usgs): shorten error message --- backend/connectors/usgs/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 7cf5e72..957afd0 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -157,7 +157,7 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code != 200: - self.warning(f"Received status code {response.status_code} for sites {list_of_sites}. Retrying...") + self.warning(f"Received status code {response.status_code}. 
Retrying...") else: finished_request = True except Exception as e: From 5570c36e9a1a8694485baea6f4a7fa3417ea11df Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:34:22 -0600 Subject: [PATCH 14/44] fix(usgs): set limit of 1 for health check --- backend/connectors/usgs/source.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 957afd0..1084e33 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -61,12 +61,12 @@ def tag(self): def health(self): try: - data = self._execute_json_request( - url=self.sites_url, - params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, - timeout=None, - headers={"X-API-Key": KEY}, - ) + self._execute_json_request( + url=self.sites_url, + params={"limit": 1, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, + timeout=TIMEOUT, + headers={"X-API-Key": KEY}, + ) return True except httpx.HTTPStatusError: return False From 836243e109e19896dfade0225fab10281d9feb40 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:35:12 -0600 Subject: [PATCH 15/44] fix(usgs): remove unused imports --- backend/connectors/usgs/source.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 1084e33..9863e92 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -17,24 +17,19 @@ from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.constants import ( - FEET, DTW, - DTW_UNITS, DT_MEASURED, PARAMETER_NAME, PARAMETER_VALUE, PARAMETER_UNITS, SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, - EARLIEST, - LATEST, ) from backend.connectors.usgs.transformer import ( NWISSiteTransformer, NWISWaterLevelTransformer, ) from backend.source import ( - BaseSource, BaseWaterLevelSource, BaseSiteSource, 
make_site_list, From a51b13e1c900214d783ec9114654fd5ffe55cfa9 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 13:22:57 -0600 Subject: [PATCH 16/44] fix(usgs): call self.warn not warning --- backend/connectors/usgs/source.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 9863e92..7963f69 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -92,11 +92,11 @@ def get_records(self): ) # _execute_json_request returns None for non-200 responses, so we need to check for that as well if data is None: - self.warning("Retrying...") + self.warn("Retrying...") else: finished_request = True except Exception as e: - self.warning(f"Error retrieving site records: {e}. Retrying...") + self.warn(f"Error retrieving site records: {e}. Retrying...") records: list = data.get("features", []) @@ -152,11 +152,11 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code != 200: - self.warning(f"Received status code {response.status_code}. Retrying...") + self.warn(f"Received status code {response.status_code}. Retrying...") else: finished_request = True except Exception as e: - self.warning(f"Error retrieving water level records: {e}. Retrying...") + self.warn(f"Error retrieving water level records: {e}. Retrying...") data: dict = response.json() features: list[dict] = data.get("features", []) @@ -190,11 +190,11 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code != 200: - self.warning(f"Received status code {response.status_code} for paginated request. Retrying...") + self.warn(f"Received status code {response.status_code} for paginated request. Retrying...") else: finished_request = True except Exception as e: - self.warning(f"Error retrieving paginated water level records: {e}. Retrying... + self.warn(f"Error retrieving paginated water level records: {e}. Retrying... 
data: dict = response.json() features: list[dict] = data.get("features", []) From 31271c670c64bd010a1d85e14884b8ce0e192242 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 10:27:57 -0600 Subject: [PATCH 17/44] fix(usgs): remove hardcoded API key --- backend/connectors/usgs/source.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 7963f69..c5eedb5 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -37,9 +37,9 @@ ) -KEY = "55MILtQrayXw1NgufxcqRfkkRrg4Rg6KNCyJZ004" +# KEY = "55MILtQrayXw1NgufxcqRfkkRrg4Rg6KNCyJZ004" LIMIT = 50000 -TIMEOUT=1800 +TIMEOUT=15*60 # 15 minutes, to allow for retries and large requests class NWISSiteSource(BaseSiteSource): transformer_klass = NWISSiteTransformer @@ -60,7 +60,6 @@ def health(self): url=self.sites_url, params={"limit": 1, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=TIMEOUT, - headers={"X-API-Key": KEY}, ) return True except httpx.HTTPStatusError: @@ -88,7 +87,6 @@ def get_records(self): url=self.sites_url, params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=TIMEOUT, - headers={"X-API-Key": KEY}, ) # _execute_json_request returns None for non-200 responses, so we need to check for that as well if data is None: @@ -147,7 +145,7 @@ def get_records(self, site_record): response = httpx.post( url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", json=json_data, - headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, + headers={"Content-Type": "application/query-cql-json"}, params={"limit": LIMIT, "parameter_code": "72019"}, timeout=TIMEOUT, ) @@ -186,7 +184,7 @@ def get_records(self, site_record): try: response = httpx.get( url=next_link_url, - headers={"X-API-Key": KEY, "Content-Type": "application/query-cql-json"}, + 
headers={"Content-Type": "application/query-cql-json"}, timeout=TIMEOUT, ) if response.status_code != 200: From deed95f063c5320c946e66f4b8c0b5404ff80bcc Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 10:47:10 -0600 Subject: [PATCH 18/44] fix(usgs): have users pass USGS API keys --- README.md | 11 ++++++++++- backend/connectors/usgs/source.py | 21 +++++++++++++++++---- frontend/cli.py | 21 ++++++++++++++++++++- 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index bb05d17..1ae7243 100644 --- a/README.md +++ b/README.md @@ -38,8 +38,9 @@ Data comes from the following sources. We are continuously adding new sources as - Available data: `water levels` - [Pecos Valley Artesian Conservancy District (PVACD)](https://st2.newmexicowaterdata.org/FROST-Server/v1.1/Locations?$filter=properties/agency%20eq%20%27PVACD%27) - Available data: `water levels` -- [USGS (NWIS)](https://waterdata.usgs.gov/nwis) +- [USGS (NWIS)](https://api.waterdata.usgs.gov/docs/) - Available data: `water levels` + - **IMPORTANT** The USGS now uses API keys. To prevent yourself from hitting the rate limit please acquire an API key and provide it via the `--usgs-api-key` flag when gathering water level data from the USGS. - [Water Quality Portal (WQP)](https://www.waterqualitydata.us/) - Available data: `water levels`, `water quality` @@ -189,6 +190,14 @@ The Data Integration Engine enables the user to obtain groundwater level and gro - `--no-pvacd` to exclude Pecos Valley Artesian Convservancy District (PVACD) data - `--no-wqp` to exclude Water Quality Portal (WQP) data +### USGS API Keys + +The USGS now uses [API keys](https://api.waterdata.usgs.gov/signup/) to increase the query rate limit to their APIs. If you intend to include USGS water level data in your output please acquire an API key, save it somewhere, and provide it via the `--usgs-api-key` flag. 
For example: + +``` +die weave waterlevels --output-type timeseries_unified --usgs-api-key FAKE_API_KEY +``` + ### Geographic Filters [In Development] The following flags can be used to geographically filter data: diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index c5eedb5..4a4dcbf 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -14,6 +14,7 @@ # limitations under the License. # =============================================================================== import httpx +import os from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.constants import ( @@ -36,8 +37,6 @@ get_terminal_record, ) - -# KEY = "55MILtQrayXw1NgufxcqRfkkRrg4Rg6KNCyJZ004" LIMIT = 50000 TIMEOUT=15*60 # 15 minutes, to allow for retries and large requests @@ -56,10 +55,15 @@ def tag(self): def health(self): try: + if os.environ.get("USGS_API_KEY") is not None: + headers = {"X-API-Key": os.environ["USGS_API_KEY"]} + else: + headers = {} self._execute_json_request( url=self.sites_url, params={"limit": 1, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=TIMEOUT, + headers=headers ) return True except httpx.HTTPStatusError: @@ -83,10 +87,15 @@ def get_records(self): finished_request: bool = False while not finished_request: try: + if os.environ.get("USGS_API_KEY") is not None: + headers = {"X-API-Key": os.environ["USGS_API_KEY"]} + else: + headers = {} data = self._execute_json_request( url=self.sites_url, params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=TIMEOUT, + headers=headers ) # _execute_json_request returns None for non-200 responses, so we need to check for that as well if data is None: @@ -142,10 +151,14 @@ def get_records(self, site_record): finished_request: bool = False while not finished_request: try: + if os.environ.get("USGS_API_KEY") is not None: + headers = {"X-API-Key": os.environ["USGS_API_KEY"], 
"Content-Type": "application/query-cql-json"} + else: + headers = {"Content-Type": "application/query-cql-json"} response = httpx.post( url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", json=json_data, - headers={"Content-Type": "application/query-cql-json"}, + headers=headers, params={"limit": LIMIT, "parameter_code": "72019"}, timeout=TIMEOUT, ) @@ -184,7 +197,7 @@ def get_records(self, site_record): try: response = httpx.get( url=next_link_url, - headers={"Content-Type": "application/query-cql-json"}, + headers=headers, timeout=TIMEOUT, ) if response.status_code != 200: diff --git a/frontend/cli.py b/frontend/cli.py index 3efb46b..34a3a34 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import sys +import os import click @@ -206,6 +206,13 @@ def cli(): ), ] +USGS_API_KEY_OPTION = [ + click.option( + "--usgs-api-key", + default=None, + help="USGS API key. 
Can also be set via USGS_API_KEY environment variable", + ) +] def add_options(options): def _add_options(func): @@ -230,6 +237,7 @@ def _add_options(func): @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) @add_options(OUTPUT_FORMAT_OPTIONS) +@add_options(USGS_API_KEY_OPTION) def weave( parameter, config_path, @@ -256,10 +264,15 @@ def weave( dry, yes, output_format, + usgs_api_key, ): """ Get parameter timeseries or summary data """ + # set USGS_API_KEY environment variable if usgs_api_key is provided + if usgs_api_key is not None: + os.environ["USGS_API_KEY"] = usgs_api_key + # instantiate config and set up parameter config = setup_config( tag=parameter, @@ -334,6 +347,7 @@ def weave( @add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) @add_options(OUTPUT_FORMAT_OPTIONS) +@add_options(USGS_API_KEY_OPTION) def sites( config_path, bbox, @@ -356,10 +370,15 @@ def sites( dry, yes, output_format, + usgs_api_key, ): """ Get sites """ + # set USGS_API_KEY environment variable if usgs_api_key is provided + if usgs_api_key is not None: + os.environ["USGS_API_KEY"] = usgs_api_key + config = setup_config( "sites", config_path, bbox, county, wkt, site_limit, dry, output_format ) From 539fb13fd705b138c4cea4ff2c05052537bd6f97 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 10:49:21 -0600 Subject: [PATCH 19/44] fix(usgs): update README documentation --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1ae7243..3f0f0a7 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Data comes from the following sources. We are continuously adding new sources as - Available data: `water levels` - [USGS (NWIS)](https://api.waterdata.usgs.gov/docs/) - Available data: `water levels` - - **IMPORTANT** The USGS now uses API keys. To prevent yourself from hitting the rate limit please acquire an API key and provide it via the `--usgs-api-key` flag when gathering water level data from the USGS. 
+ - **IMPORTANT** The USGS now uses API keys. To prevent yourself from hitting the rate limit please acquire an API key, save it, and provide it via the `--usgs-api-key` flag when gathering water level data from the USGS. - [Water Quality Portal (WQP)](https://www.waterqualitydata.us/) - Available data: `water levels`, `water quality` From 6897f28a7feb35363f66284ae9f100e8f89a4b4f Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 10:51:17 -0600 Subject: [PATCH 20/44] fix(documentation): make README more readable --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3f0f0a7..16317e8 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Data comes from the following sources. We are continuously adding new sources as - Available data: `water levels` - [USGS (NWIS)](https://api.waterdata.usgs.gov/docs/) - Available data: `water levels` - - **IMPORTANT** The USGS now uses API keys. To prevent yourself from hitting the rate limit please acquire an API key, save it, and provide it via the `--usgs-api-key` flag when gathering water level data from the USGS. + - **IMPORTANT:** The USGS now uses API keys. To prevent yourself from hitting the rate limit please acquire an API key, save it, and provide it via the `--usgs-api-key` flag when gathering water level data from the USGS. - [Water Quality Portal (WQP)](https://www.waterqualitydata.us/) - Available data: `water levels`, `water quality` From 1d2bac9729430c9164eedc781d222e555961621e Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 10:52:11 -0600 Subject: [PATCH 21/44] fix(documentation): link to USGS API key acquisition --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 16317e8..e8db69e 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Data comes from the following sources. 
We are continuously adding new sources as - Available data: `water levels` - [USGS (NWIS)](https://api.waterdata.usgs.gov/docs/) - Available data: `water levels` - - **IMPORTANT:** The USGS now uses API keys. To prevent yourself from hitting the rate limit please acquire an API key, save it, and provide it via the `--usgs-api-key` flag when gathering water level data from the USGS. + - **IMPORTANT:** The USGS now uses API keys. To prevent yourself from hitting the rate limit please [acquire an API key](https://api.waterdata.usgs.gov/signup/), save it, and provide it via the `--usgs-api-key` flag when gathering water level data from the USGS. - [Water Quality Portal (WQP)](https://www.waterqualitydata.us/) - Available data: `water levels`, `water quality` From 5f886bb455f430140cb5b86cf3111d76ba6250be Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 11:28:45 -0600 Subject: [PATCH 22/44] fix(usgs): exit if rate limit exceeded --- backend/connectors/usgs/source.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 4a4dcbf..cdc6689 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -15,6 +15,7 @@ # =============================================================================== import httpx import os +import sys from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.constants import ( @@ -91,13 +92,21 @@ def get_records(self): headers = {"X-API-Key": os.environ["USGS_API_KEY"]} else: headers = {} - data = self._execute_json_request( + response = httpx.get( url=self.sites_url, params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=TIMEOUT, headers=headers ) - # _execute_json_request returns None for non-200 responses, so we need to check for that as well + + if response.status_code == 429: + self.warn("Rate limit exceeded. 
Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") + sys.exit(1) + elif response.status_code != 200: + self.warn(f"Received status code {response.status_code}. Retrying...") + continue + else: + data = response.json() if data is None: self.warn("Retrying...") else: finished_request = True @@ -162,7 +171,10 @@ def get_records(self, site_record): params={"limit": LIMIT, "parameter_code": "72019"}, timeout=TIMEOUT, ) - if response.status_code != 200: + if response.status_code == 429: + self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") + sys.exit(1) + elif response.status_code != 200: self.warn(f"Received status code {response.status_code}. Retrying...") else: finished_request = True except Exception as e: From f9908fdc4b562176644b90840966f3f44945c633 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 12:38:54 -0600 Subject: [PATCH 23/44] fix(usgs): disallow partial records if a rate limit error is hit the tool will inform the user if a rate limit error is hit and will not save any records for USGS if a rate limit error is hit, so there won't be partial records for USGS. The tool will still save records for other sources if a rate limit error is hit for USGS since the user may want to save those records and they won't be affected by the rate limit error for USGS.
--- backend/connectors/usgs/source.py | 17 +++++++++++----- backend/unifier.py | 34 ++++++++++++++++++++++++------- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index cdc6689..90ab60c 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -15,7 +15,6 @@ # =============================================================================== import httpx import os -import sys from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.constants import ( @@ -41,6 +40,10 @@ LIMIT = 50000 TIMEOUT=15*60 # 15 minutes, to allow for retries and large requests + +class USGSRateLimitError(Exception): + pass + class NWISSiteSource(BaseSiteSource): transformer_klass = NWISSiteTransformer chunk_size = 500 @@ -100,8 +103,7 @@ def get_records(self): ) if response.status_code == 429: - self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") - sys.exit(1) + raise USGSRateLimitError() elif response.status_code != 200: self.warn(f"Received status code {response.status_code}. Retrying...") continue @@ -111,6 +113,9 @@ def get_records(self): self.warn("Retrying...") else: finished_request = True + except USGSRateLimitError: + self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") + raise USGSRateLimitError("Rate limit exceeded") except Exception as e: self.warn(f"Error retrieving site records: {e}. Retrying...") @@ -172,12 +177,14 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code == 429: - self.warn("Rate limit exceeded. 
Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") - sys.exit(1) + raise USGSRateLimitError("Rate limit exceeded") elif response.status_code != 200: self.warn(f"Received status code {response.status_code}. Retrying...") else: finished_request = True + except USGSRateLimitError: + self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") + raise USGSRateLimitError("Rate limit exceeded") except Exception as e: self.warn(f"Error retrieving water level records: {e}. Retrying...") diff --git a/backend/unifier.py b/backend/unifier.py index b070631..130cae9 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -21,6 +21,7 @@ from backend.persister import BasePersister from backend.persisters.geoserver import GeoServerPersister from backend.source import BaseSiteSource +from backend.connectors.usgs.source import USGSRateLimitError def health_check(source: BaseSiteSource) -> bool | None: @@ -131,10 +132,17 @@ def _site_wrapper(site_source, parameter_source, persister, config): # in the future make discover required # return + # used to revert back to initial state if a rate limit error is hit, so there aren't partial records + initial_sites_len = len(persister.sites) + initial_timeseries_len = len(persister.timeseries) + use_summarize = config.output_summary site_limit = config.site_limit - sites = site_source.read() + try: + sites = site_source.read() + except USGSRateLimitError: + sites = [] if not sites: return @@ -158,18 +166,30 @@ def _site_wrapper(site_source, parameter_source, persister, config): end_ind += n if use_summarize: - summary_records = parameter_source.read( - site_records, use_summarize, start_ind, end_ind - ) + try: + summary_records = parameter_source.read( + site_records, use_summarize, start_ind, end_ind + ) + except USGSRateLimitError: + # if a rate limit error is hit we want to remove USGS sites so there 
aren't partial records + persister.sites = persister.sites[:initial_sites_len] + persister.timeseries = persister.timeseries[:initial_timeseries_len] + break if summary_records: persister.records.extend(summary_records) sites_with_records_count += len(summary_records) else: continue else: - results = parameter_source.read( - site_records, use_summarize, start_ind, end_ind - ) + try: + results = parameter_source.read( + site_records, use_summarize, start_ind, end_ind + ) + except USGSRateLimitError: + # if a rate limit error is hit we want to remove USGS sites so there aren't partial records + persister.sites = persister.sites[:initial_sites_len] + persister.timeseries = persister.timeseries[:initial_timeseries_len] + break # no records are returned if there is no site record for parameter # or if the record isn't clean (doesn't have the correct fields) # don't count these sites to apply to site_limit From 7082957cc8e473a53498b35f4f98487a9b3ec5e3 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 12:57:56 -0600 Subject: [PATCH 24/44] fix(usgs): implement retries with exponential backoff --- backend/connectors/usgs/source.py | 71 +++++++++++++++++++------------ 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 90ab60c..b1214d4 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -15,6 +15,7 @@ # =============================================================================== import httpx import os +import time from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.constants import ( @@ -39,7 +40,7 @@ LIMIT = 50000 TIMEOUT=15*60 # 15 minutes, to allow for retries and large requests - +MAX_RETRIES = 7 class USGSRateLimitError(Exception): pass @@ -88,8 +89,9 @@ def get_records(self): # if config.end_date: # params["endDt"] = config.end_dt.date().isoformat() - finished_request: bool = False - while not 
finished_request: + tries: int = 0 + + while tries < MAX_RETRIES: try: if os.environ.get("USGS_API_KEY") is not None: headers = {"X-API-Key": os.environ["USGS_API_KEY"]} @@ -102,23 +104,23 @@ def get_records(self): headers=headers ) - if response.status_code == 429: + if response.status_code == 200: + break + elif response.status_code == 429: raise USGSRateLimitError() - elif response.status_code != 200: - self.warn(f"Received status code {response.status_code}. Retrying...") - continue - else: - data = response.json() - if data is None: - self.warn("Retrying...") else: - finished_request = True + self.warn(f"Received status code {response.status_code}. Retrying... {tries + 1}/{MAX_RETRIES}") + except USGSRateLimitError: self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") raise USGSRateLimitError("Rate limit exceeded") except Exception as e: - self.warn(f"Error retrieving site records: {e}. Retrying...") + self.warn(f"Error retrieving site records: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") + + tries += 1 + time.sleep(tries) + data: dict = response.json() records: list = data.get("features", []) return records @@ -162,8 +164,9 @@ def get_records(self, site_record): list_of_sites ] } - finished_request: bool = False - while not finished_request: + tries: int = 0 + + while tries < MAX_RETRIES: try: if os.environ.get("USGS_API_KEY") is not None: headers = {"X-API-Key": os.environ["USGS_API_KEY"], "Content-Type": "application/query-cql-json"} @@ -176,17 +179,21 @@ def get_records(self, site_record): params={"limit": LIMIT, "parameter_code": "72019"}, timeout=TIMEOUT, ) - if response.status_code == 429: - raise USGSRateLimitError("Rate limit exceeded") - elif response.status_code != 200: - self.warn(f"Received status code {response.status_code}. 
Retrying...") + if response.status_code == 200: + break + elif response.status_code == 429: + raise USGSRateLimitError() else: - finished_request = True + self.warn(f"Received status code {response.status_code}. Retrying... {tries + 1}/{MAX_RETRIES}") + except USGSRateLimitError: self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") raise USGSRateLimitError("Rate limit exceeded") except Exception as e: - self.warn(f"Error retrieving water level records: {e}. Retrying...") + self.warn(f"Error retrieving water level records: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") + + tries += 1 + time.sleep(tries) data: dict = response.json() features: list[dict] = data.get("features", []) @@ -211,21 +218,29 @@ def get_records(self, site_record): # use GET requests for the paginated responses after the initial POST to avoid issues with httpx and long URLs with many site ids # USGS APIs use cursor pagination, so we can just follow the "next" links until there are no more while found_next_link: - finished_request: bool = False - while not finished_request: + tries: int = 0 + while tries < MAX_RETRIES: try: response = httpx.get( url=next_link_url, headers=headers, timeout=TIMEOUT, ) - if response.status_code != 200: - self.warn(f"Received status code {response.status_code} for paginated request. Retrying...") + if response.status_code == 200: + break + elif response.status_code == 429: + raise USGSRateLimitError() else: - finished_request = True + self.warn(f"Received status code {response.status_code} for paginated request. Retrying... {tries + 1}/{MAX_RETRIES}") + except USGSRateLimitError: + self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") + raise USGSRateLimitError("Rate limit exceeded") except Exception as e: - self.warn(f"Error retrieving paginated water level records: {e}. Retrying... 
- + self.warn(f"Error retrieving paginated water level records: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") + + tries += 1 + time.sleep(tries) + data: dict = response.json() features: list[dict] = data.get("features", []) standard_features: list[dict] = [self._standardize_record(feature) for feature in features] From 148208b46a9a07ba48bdce6993848babe17b3e2a Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:00:14 -0600 Subject: [PATCH 25/44] refactor(exceptions): put custom exceptions in own script --- backend/connectors/usgs/source.py | 4 +--- backend/exceptions.py | 2 ++ backend/unifier.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) create mode 100644 backend/exceptions.py diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index b1214d4..7a385a2 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -31,6 +31,7 @@ NWISSiteTransformer, NWISWaterLevelTransformer, ) +from backend.exceptions import USGSRateLimitError from backend.source import ( BaseWaterLevelSource, BaseSiteSource, @@ -42,9 +43,6 @@ TIMEOUT=15*60 # 15 minutes, to allow for retries and large requests MAX_RETRIES = 7 -class USGSRateLimitError(Exception): - pass - class NWISSiteSource(BaseSiteSource): transformer_klass = NWISSiteTransformer chunk_size = 500 diff --git a/backend/exceptions.py b/backend/exceptions.py new file mode 100644 index 0000000..1a1d0eb --- /dev/null +++ b/backend/exceptions.py @@ -0,0 +1,2 @@ +class USGSRateLimitError(Exception): + pass \ No newline at end of file diff --git a/backend/unifier.py b/backend/unifier.py index 130cae9..1dd3bed 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -21,7 +21,7 @@ from backend.persister import BasePersister from backend.persisters.geoserver import GeoServerPersister from backend.source import BaseSiteSource -from backend.connectors.usgs.source import USGSRateLimitError +from backend.exceptions import USGSRateLimitError def 
health_check(source: BaseSiteSource) -> bool | None: From f760bbb7f225783810025fc966c4af1ea427946a Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:02:52 -0600 Subject: [PATCH 26/44] fix(usgs): catch JSONDecodeError and retry --- backend/connectors/usgs/source.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 7a385a2..58f4841 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -16,6 +16,7 @@ import httpx import os import time +import json from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.constants import ( @@ -103,6 +104,7 @@ def get_records(self): ) if response.status_code == 200: + data: dict = response.json() break elif response.status_code == 429: raise USGSRateLimitError() @@ -112,13 +114,14 @@ def get_records(self): except USGSRateLimitError: self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") raise USGSRateLimitError("Rate limit exceeded") + except json.JSONDecodeError as e: + self.warn(f"Failed to decode JSON response: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") except Exception as e: self.warn(f"Error retrieving site records: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") tries += 1 time.sleep(tries) - data: dict = response.json() records: list = data.get("features", []) return records @@ -178,6 +181,7 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code == 200: + data: dict = response.json() break elif response.status_code == 429: raise USGSRateLimitError() @@ -187,13 +191,14 @@ def get_records(self, site_record): except USGSRateLimitError: self.warn("Rate limit exceeded. 
Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") raise USGSRateLimitError("Rate limit exceeded") + except json.JSONDecodeError as e: + self.warn(f"Failed to decode JSON response: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") except Exception as e: self.warn(f"Error retrieving water level records: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") tries += 1 time.sleep(tries) - data: dict = response.json() features: list[dict] = data.get("features", []) standard_features: list[dict] = [self._standardize_record(feature) for feature in features] @@ -225,6 +230,7 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code == 200: + data: dict = response.json() break elif response.status_code == 429: raise USGSRateLimitError() @@ -233,13 +239,14 @@ def get_records(self, site_record): except USGSRateLimitError: self.warn("Rate limit exceeded. Please provide a valid USGS API key via the --usgs-api-key flag to increase your rate limit and try again.") raise USGSRateLimitError("Rate limit exceeded") + except json.JSONDecodeError as e: + self.warn(f"Failed to decode JSON response: {e}. Retrying... {tries + 1}/{MAX_RETRIES}") except Exception as e: self.warn(f"Error retrieving paginated water level records: {e}. Retrying... 
{tries + 1}/{MAX_RETRIES}") tries += 1 time.sleep(tries) - data: dict = response.json() features: list[dict] = data.get("features", []) standard_features: list[dict] = [self._standardize_record(feature) for feature in features] records.extend(standard_features) From 1aff1108d07007fcd9534cf2b2e7aa7ff7a2858b Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:04:48 -0600 Subject: [PATCH 27/44] fix(usgs): check status_code for health check --- backend/connectors/usgs/source.py | 7 +++++-- backend/source.py | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 58f4841..2a12d2e 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -63,13 +63,16 @@ def health(self): headers = {"X-API-Key": os.environ["USGS_API_KEY"]} else: headers = {} - self._execute_json_request( + response = httpx.get( url=self.sites_url, params={"limit": 1, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, timeout=TIMEOUT, headers=headers ) - return True + if response.status_code == 200: + return True + else: + return False except httpx.HTTPStatusError: return False diff --git a/backend/source.py b/backend/source.py index ff6160c..be4208d 100644 --- a/backend/source.py +++ b/backend/source.py @@ -252,7 +252,6 @@ def _execute_json_request( the json response """ resp = httpx.get(url, params=params, **kw) - if resp.status_code == 200: try: obj = resp.json() From 0af670eacfb40cc85809a2c1b51f72993b1b1bf8 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:06:25 -0600 Subject: [PATCH 28/44] fix(usgs): revert persister.records on partial completion --- backend/unifier.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/unifier.py b/backend/unifier.py index 1dd3bed..774014e 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -135,6 +135,7 @@ def _site_wrapper(site_source, parameter_source, persister, 
config): # used to revert back to initial state if a rate limit error is hit, so there aren't partial records initial_sites_len = len(persister.sites) initial_timeseries_len = len(persister.timeseries) + initial_records_len = len(persister.records) use_summarize = config.output_summary site_limit = config.site_limit @@ -174,6 +175,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): # if a rate limit error is hit we want to remove USGS sites so there aren't partial records persister.sites = persister.sites[:initial_sites_len] persister.timeseries = persister.timeseries[:initial_timeseries_len] + persister.records = persister.records[:initial_records_len] break if summary_records: persister.records.extend(summary_records) From 690ba4190b5896812e3e342ce80c76cbbd48aae1 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:26:38 -0600 Subject: [PATCH 29/44] fix(partial records): don't persist partial records --- backend/connectors/usgs/source.py | 24 ++++++++++++++++++++---- backend/exceptions.py | 3 +++ backend/unifier.py | 10 +++++----- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 2a12d2e..540a5f5 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -32,7 +32,7 @@ NWISSiteTransformer, NWISWaterLevelTransformer, ) -from backend.exceptions import USGSRateLimitError +from backend.exceptions import USGSRateLimitError, PartialOrNoDataError from backend.source import ( BaseWaterLevelSource, BaseSiteSource, @@ -91,6 +91,7 @@ def get_records(self): # if config.end_date: # params["endDt"] = config.end_dt.date().isoformat() + data: dict = {} tries: int = 0 while tries < MAX_RETRIES: @@ -107,7 +108,7 @@ def get_records(self): ) if response.status_code == 200: - data: dict = response.json() + data = response.json() break elif response.status_code == 429: raise USGSRateLimitError() @@ -125,6 +126,10 @@ def 
get_records(self): tries += 1 time.sleep(tries) + if data == {}: + self.warn("Failed to retrieve site records after multiple attempts.") + raise PartialOrNoDataError("Failed to retrieve site records after multiple attempts.") + records: list = data.get("features", []) return records @@ -168,6 +173,8 @@ def get_records(self, site_record): list_of_sites ] } + + data: dict = {} tries: int = 0 while tries < MAX_RETRIES: @@ -184,7 +191,7 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code == 200: - data: dict = response.json() + data = response.json() break elif response.status_code == 429: raise USGSRateLimitError() @@ -202,6 +209,10 @@ def get_records(self, site_record): tries += 1 time.sleep(tries) + if data == {}: + self.warn("Failed to retrieve water level records after multiple attempts.") + raise PartialOrNoDataError("Failed to retrieve water level records after multiple attempts.") + features: list[dict] = data.get("features", []) standard_features: list[dict] = [self._standardize_record(feature) for feature in features] @@ -225,6 +236,7 @@ def get_records(self, site_record): # USGS APIs use cursor pagination, so we can just follow the "next" links until there are no more while found_next_link: tries: int = 0 + data: dict = {} while tries < MAX_RETRIES: try: response = httpx.get( @@ -233,7 +245,7 @@ def get_records(self, site_record): timeout=TIMEOUT, ) if response.status_code == 200: - data: dict = response.json() + data = response.json() break elif response.status_code == 429: raise USGSRateLimitError() @@ -250,6 +262,10 @@ def get_records(self, site_record): tries += 1 time.sleep(tries) + if data == {}: + self.warn("Failed to retrieve paginated water level records after multiple attempts.") + raise PartialOrNoDataError("Failed to retrieve paginated water level records after multiple attempts + features: list[dict] = data.get("features", []) standard_features: list[dict] = [self._standardize_record(feature) for feature in 
features] records.extend(standard_features) diff --git a/backend/exceptions.py b/backend/exceptions.py index 1a1d0eb..509e44d 100644 --- a/backend/exceptions.py +++ b/backend/exceptions.py @@ -1,2 +1,5 @@ class USGSRateLimitError(Exception): + pass + +class PartialOrNoDataError(Exception): pass \ No newline at end of file diff --git a/backend/unifier.py b/backend/unifier.py index 774014e..d9aede1 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -21,7 +21,7 @@ from backend.persister import BasePersister from backend.persisters.geoserver import GeoServerPersister from backend.source import BaseSiteSource -from backend.exceptions import USGSRateLimitError +from backend.exceptions import USGSRateLimitError, PartialOrNoDataError def health_check(source: BaseSiteSource) -> bool | None: @@ -171,8 +171,8 @@ def _site_wrapper(site_source, parameter_source, persister, config): summary_records = parameter_source.read( site_records, use_summarize, start_ind, end_ind ) - except USGSRateLimitError: - # if a rate limit error is hit we want to remove USGS sites so there aren't partial records + except (USGSRateLimitError, PartialOrNoDataError): + # remove partial records to prevent incomplete data from being saved persister.sites = persister.sites[:initial_sites_len] persister.timeseries = persister.timeseries[:initial_timeseries_len] persister.records = persister.records[:initial_records_len] @@ -187,8 +187,8 @@ def _site_wrapper(site_source, parameter_source, persister, config): results = parameter_source.read( site_records, use_summarize, start_ind, end_ind ) - except USGSRateLimitError: - # if a rate limit error is hit we want to remove USGS sites so there aren't partial records + except (USGSRateLimitError, PartialOrNoDataError): + # remove partial records to prevent incomplete data from being saved persister.sites = persister.sites[:initial_sites_len] persister.timeseries = persister.timeseries[:initial_timeseries_len] break From 
74241771af86874d507896e1fd990c70cd8cfc89 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:40:52 -0600 Subject: [PATCH 30/44] fix(usgs): make site source USGS-NWIS to correspond with field measurements --- backend/connectors/usgs/transformer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index bfffd4c..fc95212 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -29,7 +29,7 @@ def _transform(self, record): datum = "WGS84" rec = { - "source": "USGS", + "source": "USGS-NWIS", "id": record["properties"]["monitoring_location_id"], "name": record["properties"]["monitoring_location_name"], "latitude": record["geometry"]["coordinates"][1], From 013406dc0df8c5812a9498d7e692dd3f1f0b3d4d Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:54:37 -0600 Subject: [PATCH 31/44] feature(usgs): add dt params --- backend/connectors/usgs/source.py | 65 ++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 540a5f5..014b1d5 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -77,19 +77,26 @@ def health(self): return False def get_records(self): - # TODO: handle date filters - # config = self.config - - # if config.has_bounds(): - # bbox = config.bbox_bounding_points() - # params["bbox"] = ",".join([str(b) for b in bbox]) - # else: - # params["state_code"] = "35" + params = { + "limit": LIMIT, + "parameter_code": "72019", + "site_type_code": "GW", + } - # if config.start_date: - # params["startDt"] = config.start_dt.date().isoformat() - # if config.end_date: - # params["endDt"] = config.end_dt.date().isoformat() + if self.config.has_bounds(): + bbox = self.config.bbox_bounding_points() + params["bbox"] = ",".join([str(b) for b in bbox]) + else: + 
params["state_code"] = "35" + + if self.config.start_date: + begin: str = self.config.start_dt.date().isoformat() + begin = f"{begin}T00:00:00Z" + params["begin"] = begin + if self.config.end_date: + end: str = self.config.end_dt.date().isoformat() + end = f"{end}T23:59:59Z" + params["end"] = end data: dict = {} tries: int = 0 @@ -102,7 +109,7 @@ def get_records(self): headers = {} response = httpx.get( url=self.sites_url, - params={"limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", "state_code": "35"}, + params=params, timeout=TIMEOUT, headers=headers ) @@ -145,15 +152,27 @@ def __repr__(self): return "NWISWaterLevelSource" def get_records(self, site_record): - # TODO: handle date filters - # config = self.config - # if config.start_date: - # params["startDt"] = config.start_dt.date().isoformat() - # else: - # params["startDt"] = "1900-01-01" + params = { + "limit": LIMIT, + "parameter_code": "72019", + } + + begin: str = "" + end: str = "" + + if self.config.start_date: + begin: str = self.config.start_dt.date().isoformat() + begin = f"{begin}T00:00:00Z" + if self.config.end_date: + end: str = self.config.end_dt.date().isoformat() + end = f"{end}T23:59:59Z" - # if config.end_date: - # params["endDt"] = config.end_dt.date().isoformat() + if begin and end: + params["datetime"] = f"{begin}/{end}" + elif begin: + params["datetime"] = f"{begin}/.." 
+ elif end: + params["datetime"] = f"../{end}" records: list = [] sites: list = make_site_list(site_record) @@ -187,7 +206,7 @@ def get_records(self, site_record): url="https://api.waterdata.usgs.gov/ogcapi/v0/collections/field-measurements/items", json=json_data, headers=headers, - params={"limit": LIMIT, "parameter_code": "72019"}, + params=params, timeout=TIMEOUT, ) if response.status_code == 200: @@ -264,7 +283,7 @@ def get_records(self, site_record): if data == {}: self.warn("Failed to retrieve paginated water level records after multiple attempts.") - raise PartialOrNoDataError("Failed to retrieve paginated water level records after multiple attempts + raise PartialOrNoDataError("Failed to retrieve paginated water level records after multiple attempts") features: list[dict] = data.get("features", []) standard_features: list[dict] = [self._standardize_record(feature) for feature in features] From 7d38c37bc337180543ae9e34b4b734d5a9c1032e Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 13:56:15 -0600 Subject: [PATCH 32/44] note(usgs): type hint variables --- backend/connectors/usgs/source.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 014b1d5..36a3736 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -77,14 +77,14 @@ def health(self): return False def get_records(self): - params = { + params: dict = { "limit": LIMIT, "parameter_code": "72019", "site_type_code": "GW", } if self.config.has_bounds(): - bbox = self.config.bbox_bounding_points() + bbox: tuple = self.config.bbox_bounding_points() params["bbox"] = ",".join([str(b) for b in bbox]) else: params["state_code"] = "35" @@ -152,7 +152,7 @@ def __repr__(self): return "NWISWaterLevelSource" def get_records(self, site_record): - params = { + params: dict = { "limit": LIMIT, "parameter_code": "72019", } @@ -161,10 +161,10 @@ def get_records(self, 
site_record): end: str = "" if self.config.start_date: - begin: str = self.config.start_dt.date().isoformat() + begin = self.config.start_dt.date().isoformat() begin = f"{begin}T00:00:00Z" if self.config.end_date: - end: str = self.config.end_dt.date().isoformat() + end = self.config.end_dt.date().isoformat() end = f"{end}T23:59:59Z" if begin and end: From 818330004ace4bc2747996a72771f629ae51285d Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:04:39 -0600 Subject: [PATCH 33/44] test(usgs): use .env for api key for tests --- requirements.txt | 1 + setup.py | 1 + tests/test_sources/test_nwis.py | 14 ++++++++++++++ 3 files changed, 16 insertions(+) diff --git a/requirements.txt b/requirements.txt index 15193eb..ba9ce76 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ click==8.2.1 +dotenv flask frost_sta_client Geoalchemy2 diff --git a/setup.py b/setup.py index fb987e1..923b472 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ ], install_requires=[ "click==8.2.1", + "dotenv", "flask", "frost_sta_client", "Geoalchemy2", diff --git a/tests/test_sources/test_nwis.py b/tests/test_sources/test_nwis.py index b7bf272..e67e367 100644 --- a/tests/test_sources/test_nwis.py +++ b/tests/test_sources/test_nwis.py @@ -1,6 +1,20 @@ +import os +from dotenv import load_dotenv +import pytest + from backend.constants import WATERLEVELS, FEET from tests.test_sources import BaseSourceTestClass +@pytest.fixture(autouse=True) +def setup(): + # SETUP CODE ----------------------------------------------------------- + load_dotenv() + + # RUN TESTS ------------------------------------------------------------ + yield + + # TEARDOWN CODE --------------------------------------------------------- + os.environ["USGS_API_KEY"] = "" class TestNWISWaterlevels(BaseSourceTestClass): From 9bf3fc2f3379dcaa491dd19765674f8b99703f43 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:04:46 -0600 Subject: [PATCH 34/44] fix(usgs): Handle USGS 
rate limit and partial/no data errors --- backend/unifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/unifier.py b/backend/unifier.py index d9aede1..fef4ce2 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -142,7 +142,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): try: sites = site_source.read() - except USGSRateLimitError: + except (USGSRateLimitError, PartialOrNoDataError): sites = [] if not sites: From 43409f3dcfa43a1f03e9a18573d6a11a159c81b7 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:11:35 -0600 Subject: [PATCH 35/44] fix(test): don't override parent setup fixture --- tests/test_sources/test_nmbgmr_amp.py | 2 +- tests/test_sources/test_nwis.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_sources/test_nmbgmr_amp.py b/tests/test_sources/test_nmbgmr_amp.py index b56fd5b..e15b717 100644 --- a/tests/test_sources/test_nmbgmr_amp.py +++ b/tests/test_sources/test_nmbgmr_amp.py @@ -8,7 +8,7 @@ @pytest.fixture(autouse=True) -def setup(): +def setup_nmbgmr_amp(): # SETUP CODE ----------------------------------------------------------- os.environ["IS_TESTING_ENV"] = "True" diff --git a/tests/test_sources/test_nwis.py b/tests/test_sources/test_nwis.py index e67e367..02a716a 100644 --- a/tests/test_sources/test_nwis.py +++ b/tests/test_sources/test_nwis.py @@ -6,7 +6,7 @@ from tests.test_sources import BaseSourceTestClass @pytest.fixture(autouse=True) -def setup(): +def setup_nwis(): # SETUP CODE ----------------------------------------------------------- load_dotenv() From 441ff81d073cb6d6d0fe9dec5cafe33346fd2ed1 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:12:00 -0600 Subject: [PATCH 36/44] fix(test): remove env var outside of fixture --- tests/test_sources/test_nmbgmr_amp.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_sources/test_nmbgmr_amp.py b/tests/test_sources/test_nmbgmr_amp.py index 
e15b717..be75957 100644 --- a/tests/test_sources/test_nmbgmr_amp.py +++ b/tests/test_sources/test_nmbgmr_amp.py @@ -4,9 +4,6 @@ from backend.constants import WATERLEVELS, CALCIUM, MILLIGRAMS_PER_LITER, FEET from tests.test_sources import BaseSourceTestClass -os.environ["IS_TESTING_ENV"] = "True" - - @pytest.fixture(autouse=True) def setup_nmbgmr_amp(): # SETUP CODE ----------------------------------------------------------- From e524ec74b0f89c3405ffaa782bc736a73ed0e411 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:35:54 -0600 Subject: [PATCH 37/44] fix(requirements): download python-dotenv --- requirements.txt | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index ba9ce76..bd7f502 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ click==8.2.1 -dotenv +python-dotenv flask frost_sta_client Geoalchemy2 diff --git a/setup.py b/setup.py index 923b472..81bc3c1 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ ], install_requires=[ "click==8.2.1", - "dotenv", + "python-dotenv", "flask", "frost_sta_client", "Geoalchemy2", From 7af57d6444e531adbf60f2c6687383da77b3a821 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:36:10 -0600 Subject: [PATCH 38/44] fix(test): load usgs api key from .env for every test --- tests/test_sources/test_nwis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_sources/test_nwis.py b/tests/test_sources/test_nwis.py index 02a716a..0c45618 100644 --- a/tests/test_sources/test_nwis.py +++ b/tests/test_sources/test_nwis.py @@ -8,7 +8,7 @@ @pytest.fixture(autouse=True) def setup_nwis(): # SETUP CODE ----------------------------------------------------------- - load_dotenv() + load_dotenv(override=True) # RUN TESTS ------------------------------------------------------------ yield From d7382bd566f63636e78d934703327ac6c81231b4 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 
Apr 2026 14:39:03 -0600 Subject: [PATCH 39/44] fix(unifier): remove partial persister.records on failure --- backend/unifier.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/unifier.py b/backend/unifier.py index fef4ce2..d044d71 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -191,6 +191,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): # remove partial records to prevent incomplete data from being saved persister.sites = persister.sites[:initial_sites_len] persister.timeseries = persister.timeseries[:initial_timeseries_len] + persister.records = persister.records[:initial_records_len] break # no records are returned if there is no site record for parameter # or if the record isn't clean (doesn't have the correct fields) From 245b01fb288cf388749d2992316062d273d234a9 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:39:20 -0600 Subject: [PATCH 40/44] fix(usgs): use truthiness check for usgs api key header --- backend/connectors/usgs/source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 36a3736..e4e056d 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -59,7 +59,7 @@ def tag(self): def health(self): try: - if os.environ.get("USGS_API_KEY") is not None: + if os.environ.get("USGS_API_KEY"): headers = {"X-API-Key": os.environ["USGS_API_KEY"]} else: headers = {} @@ -103,7 +103,7 @@ def get_records(self): while tries < MAX_RETRIES: try: - if os.environ.get("USGS_API_KEY") is not None: + if os.environ.get("USGS_API_KEY"): headers = {"X-API-Key": os.environ["USGS_API_KEY"]} else: headers = {} @@ -198,7 +198,7 @@ def get_records(self, site_record): while tries < MAX_RETRIES: try: - if os.environ.get("USGS_API_KEY") is not None: + if os.environ.get("USGS_API_KEY"): headers = {"X-API-Key": os.environ["USGS_API_KEY"], "Content-Type": "application/query-cql-json"} else: 
headers = {"Content-Type": "application/query-cql-json"} From f5b2e2fa56eb8ff5470de9d259744b66b46e6d63 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 14:39:42 -0600 Subject: [PATCH 41/44] fix(style): two blank lines between classes --- backend/exceptions.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/exceptions.py b/backend/exceptions.py index 509e44d..80b7b84 100644 --- a/backend/exceptions.py +++ b/backend/exceptions.py @@ -1,5 +1,6 @@ class USGSRateLimitError(Exception): pass + class PartialOrNoDataError(Exception): pass \ No newline at end of file From 85ab811a0f3930bf50d886115e0c6aa6cf518837 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 15:39:39 -0600 Subject: [PATCH 42/44] fix(usgs): fail health check on non 2xx status codes --- backend/connectors/usgs/source.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index e4e056d..3da177f 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -69,10 +69,8 @@ def health(self): timeout=TIMEOUT, headers=headers ) - if response.status_code == 200: - return True - else: - return False + response.raise_for_status() + return True except httpx.HTTPStatusError: return False From 5cf9904cbfbfc5907a51ba0ca75442f384888458 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 15:47:14 -0600 Subject: [PATCH 43/44] test(usgs api key): test usgs api key is env var when set via --usgs-api-key flag --- tests/test_cli/__init__.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index 2029c38..8d5b596 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -3,6 +3,7 @@ from pathlib import Path import pytest from typing import List +import os from backend.config import SOURCE_KEYS from backend.constants import ( @@ -57,6 +58,7 @@ def _test_weave( 
bbox: str | None = None, county: str | None = None, wkt: str | None = None, + usgs_api_key: str | None = None, ): # Arrange # turn off all sources except for the one being tested @@ -100,6 +102,9 @@ def _test_weave( arguments.extend(no_agencies) + if usgs_api_key: + arguments.extend(["--usgs-api-key", usgs_api_key]) + # Act result = self.runner.invoke(weave, arguments, standalone_mode=False) @@ -177,6 +182,10 @@ def _test_weave( # 9 assert getattr(config, "output_format") == output_format + + # 10 + if usgs_api_key: + assert os.getenv("USGS_API_KEY") == usgs_api_key except Exception as e: print(result) assert False @@ -217,6 +226,13 @@ def test_weave_wkt(self): wkt="POLYGON((-106.0 32.0, -102.0 32.0, -102.0 36.0, -106.0 36.0, -106.0 32.0))", ) + def test_weave_usgs_api_key(self): + self._test_weave( + parameter=WATERLEVELS, + output_type="summary", + usgs_api_key="TEST_API_KEY", + ) + def test_weave_waterlevels(self): self._test_weave(parameter=WATERLEVELS, output_type="summary") From 4f3b913c9cf7f380ccf00e402b41638dd03e24e8 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Fri, 1 May 2026 09:05:58 -0600 Subject: [PATCH 44/44] fix(test): reset usgs api key after each test --- tests/test_cli/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_cli/__init__.py b/tests/test_cli/__init__.py index 8d5b596..7e3f21a 100644 --- a/tests/test_cli/__init__.py +++ b/tests/test_cli/__init__.py @@ -102,6 +102,8 @@ def _test_weave( arguments.extend(no_agencies) + # save original USGS_API_KEY to reset after test, and set to test value if provided + original_usgs_api_key = os.getenv("USGS_API_KEY", None) if usgs_api_key: arguments.extend(["--usgs-api-key", usgs_api_key]) @@ -186,6 +188,11 @@ def _test_weave( # 10 if usgs_api_key: assert os.getenv("USGS_API_KEY") == usgs_api_key + + if original_usgs_api_key is not None: + os.environ["USGS_API_KEY"] = original_usgs_api_key + else: + del os.environ["USGS_API_KEY"] except Exception as e: print(result) assert 
False