From 4eaac1c86064f418880f13be1fb954a57881fdb2 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:20:02 -0600 Subject: [PATCH 01/10] fix(nmbgmr): set timeout to 30 minutes and retry failures --- backend/connectors/nmbgmr/source.py | 66 +++++++++++++++++++++++------ 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 45e0a16..3bcbe75 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -44,6 +44,11 @@ make_site_list, ) +""" +Set timeout to 30 minutes for analyte and water level requests since some sites have a large number of records and the NMBGMR API can be slow to respond. +Don't use timeout=None since that can cause the request to hang indefinitely if there are issues with the API. Instead, catch timeout exceptions and retry the request until it succeeds or a different exception is raised. +""" +TIMEOUT=1800 def _make_url(endpoint): if os.getenv("DEBUG") == "1": @@ -83,9 +88,16 @@ def get_records(self): params["parameter"] = "Manual groundwater levels" # tags="features" because the response object is a GeoJSON - sites = self._execute_json_request( - _make_url("locations"), params, tag="features", timeout=30 - ) + request_finished: bool = False + while not request_finished: + try: + sites = self._execute_json_request( + _make_url("locations"), params, tag="features", timeout=TIMEOUT + ) + request_finished = True + except Exception as e: + self.warning(f"Error retrieving site data: {e}. 
Retrying...") + if not config.sites_only: for site in sites: if get_bool_env_variable("IS_TESTING_ENV"): @@ -119,14 +131,23 @@ def get_records(self, site_record): analyte = get_analyte_search_param( self.config.parameter, NMBGMR_ANALYTE_MAPPING ) - records = self._execute_json_request( - _make_url("waterchemistry"), - params={ - "pointid": ",".join(make_site_list(site_record)), - "analyte": analyte, - }, - tag="", - ) + + request_finished: bool = False + + while not request_finished: + try: + records = self._execute_json_request( + _make_url("waterchemistry"), + params={ + "pointid": ",".join(make_site_list(site_record)), + "analyte": analyte, + }, + tag="", + timeout=TIMEOUT + ) + request_finished = True + except Exception as e: + self.warning(f"Error retrieving analyte data: {e}. Retrying...") records_sorted_by_pointid = {} for pointid in records.keys(): records_sorted_by_pointid[pointid] = records[pointid][analyte] @@ -221,10 +242,17 @@ def get_records(self, site_record): # url = _make_url("waterlevels/latest") # else: params = {"pointid": ",".join(make_site_list(site_record))} + print(make_site_list(site_record)) # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") - paginated_records = self._execute_json_request(url, params, tag="") + request_finished: bool = False + while not request_finished: + try: + paginated_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) + request_finished = True + except Exception as e: + self.warning(f"Error retrieving water level data: {e}. 
Retrying...") items = paginated_records["items"] page = paginated_records["page"] pages = paginated_records["pages"] @@ -232,7 +260,19 @@ def get_records(self, site_record): while page < pages: page += 1 params["page"] = page - new_records = self._execute_json_request(url, params, tag="") + + request_finished: bool = False + while not request_finished: + try: + new_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) + # status_code != 200 makes _execute_json_request return None, so check for that and retry if it happens + if new_records is None: + self.warning("Retrying...") + continue + request_finished = True + except Exception as e: + self.warning(f"Error retrieving page {page} of water level data: {e}. Retrying...") + items.extend(new_records["items"]) pages = new_records["pages"] From 4fd3ac67c2bf1f43e0124ab9dfd2519f29718fa7 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:42:11 -0600 Subject: [PATCH 02/10] fix(nmbgmr): retry when status_code!=200 --- backend/connectors/nmbgmr/source.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 3bcbe75..42ef9d4 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -94,6 +94,9 @@ def get_records(self): sites = self._execute_json_request( _make_url("locations"), params, tag="features", timeout=TIMEOUT ) + if sites is None: + self.warning("Retrying...") + continue request_finished = True except Exception as e: self.warning(f"Error retrieving site data: {e}. Retrying...") @@ -145,6 +148,9 @@ def get_records(self, site_record): tag="", timeout=TIMEOUT ) + if records is None: + self.warning("Retrying...") + continue request_finished = True except Exception as e: self.warning(f"Error retrieving analyte data: {e}. 
Retrying...") @@ -250,6 +256,9 @@ def get_records(self, site_record): while not request_finished: try: paginated_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) + if paginated_records is None: + self.warning("Retrying...") + continue request_finished = True except Exception as e: self.warning(f"Error retrieving water level data: {e}. Retrying...") From 1fcede39cad4116ed2adac4e409be4bc371a1a52 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 12:58:19 -0600 Subject: [PATCH 03/10] fix(nmbgmr): remove print debugging statement --- backend/connectors/nmbgmr/source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 42ef9d4..8dbe702 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -248,7 +248,6 @@ def get_records(self, site_record): # url = _make_url("waterlevels/latest") # else: params = {"pointid": ",".join(make_site_list(site_record))} - print(make_site_list(site_record)) # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") From c1771d7b8afc3a1a9e2d335b26880498e56fc5a3 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 13:22:12 -0600 Subject: [PATCH 04/10] fix(nmbgmr): call self.warn not warning --- backend/connectors/nmbgmr/source.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 8dbe702..ab4c1fc 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -95,11 +95,11 @@ def get_records(self): _make_url("locations"), params, tag="features", timeout=TIMEOUT ) if sites is None: - self.warning("Retrying...") + self.warn("Retrying...") continue request_finished = True except Exception as e: - self.warning(f"Error retrieving site data: {e}. Retrying...") + self.warn(f"Error retrieving site data: {e}. 
Retrying...") if not config.sites_only: for site in sites: @@ -149,11 +149,11 @@ def get_records(self, site_record): timeout=TIMEOUT ) if records is None: - self.warning("Retrying...") + self.warn("Retrying...") continue request_finished = True except Exception as e: - self.warning(f"Error retrieving analyte data: {e}. Retrying...") + self.warn(f"Error retrieving analyte data: {e}. Retrying...") records_sorted_by_pointid = {} for pointid in records.keys(): records_sorted_by_pointid[pointid] = records[pointid][analyte] @@ -256,11 +256,11 @@ def get_records(self, site_record): try: paginated_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) if paginated_records is None: - self.warning("Retrying...") + self.warn("Retrying...") continue request_finished = True except Exception as e: - self.warning(f"Error retrieving water level data: {e}. Retrying...") + self.warn(f"Error retrieving water level data: {e}. Retrying...") items = paginated_records["items"] page = paginated_records["page"] pages = paginated_records["pages"] @@ -275,11 +275,11 @@ def get_records(self, site_record): new_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) # status_code != 200 makes _execute_json_request return None, so check for that and retry if it happens if new_records is None: - self.warning("Retrying...") + self.warn("Retrying...") continue request_finished = True except Exception as e: - self.warning(f"Error retrieving page {page} of water level data: {e}. Retrying...") + self.warn(f"Error retrieving page {page} of water level data: {e}. 
Retrying...") items.extend(new_records["items"]) pages = new_records["pages"] From 43e24571fa55ed46960e6dc88abf57d18ea7ed21 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 15:28:45 -0600 Subject: [PATCH 05/10] refactor(nmbgmr): change note style at top of doc --- backend/connectors/nmbgmr/source.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index ab4c1fc..d30d3a2 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -44,10 +44,11 @@ make_site_list, ) -""" -Set timeout to 30 minutes for analyte and water level requests since some sites have a large number of records and the NMBGMR API can be slow to respond. -Don't use timeout=None since that can cause the request to hang indefinitely if there are issues with the API. Instead, catch timeout exceptions and retry the request until it succeeds or a different exception is raised. -""" + +# Set timeout to 30 minutes for analyte and water level requests since some sites have a large number of records and the NMBGMR API can be slow to respond. +# Don't use timeout=None since that can cause the request to hang indefinitely if there are issues with the API. +# Instead, catch timeout exceptions and retry the request until it succeeds or a different exception is raised. + TIMEOUT=1800 def _make_url(endpoint): From c17e806ff91778729a6a2740ece1c53b5d0e1082 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 16:00:43 -0600 Subject: [PATCH 06/10] refactor(retries): move retry logic to BaseSource this prevents code duplication and allows all sources to benefit from retry logic when making requests to external APIs. 
--- backend/connectors/nmbgmr/source.py | 69 ++++++------------------ backend/source.py | 82 ++++++++++++++++++++--------- 2 files changed, 72 insertions(+), 79 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index d30d3a2..765a893 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -88,19 +88,9 @@ def get_records(self): else: params["parameter"] = "Manual groundwater levels" - # tags="features" because the response object is a GeoJSON - request_finished: bool = False - while not request_finished: - try: - sites = self._execute_json_request( - _make_url("locations"), params, tag="features", timeout=TIMEOUT - ) - if sites is None: - self.warn("Retrying...") - continue - request_finished = True - except Exception as e: - self.warn(f"Error retrieving site data: {e}. Retrying...") + sites = self._execute_json_request( + _make_url("locations"), params, tag="features", timeout=TIMEOUT + ) if not config.sites_only: for site in sites: @@ -136,25 +126,16 @@ def get_records(self, site_record): self.config.parameter, NMBGMR_ANALYTE_MAPPING ) - request_finished: bool = False - - while not request_finished: - try: - records = self._execute_json_request( - _make_url("waterchemistry"), - params={ - "pointid": ",".join(make_site_list(site_record)), - "analyte": analyte, - }, - tag="", - timeout=TIMEOUT - ) - if records is None: - self.warn("Retrying...") - continue - request_finished = True - except Exception as e: - self.warn(f"Error retrieving analyte data: {e}. 
Retrying...") + records = self._execute_json_request( + _make_url("waterchemistry"), + params={ + "pointid": ",".join(make_site_list(site_record)), + "analyte": analyte, + }, + tag="", + timeout=TIMEOUT + ) + records_sorted_by_pointid = {} for pointid in records.keys(): records_sorted_by_pointid[pointid] = records[pointid][analyte] @@ -252,16 +233,8 @@ def get_records(self, site_record): # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") - request_finished: bool = False - while not request_finished: - try: - paginated_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) - if paginated_records is None: - self.warn("Retrying...") - continue - request_finished = True - except Exception as e: - self.warn(f"Error retrieving water level data: {e}. Retrying...") + paginated_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) + items = paginated_records["items"] page = paginated_records["page"] pages = paginated_records["pages"] @@ -270,17 +243,7 @@ def get_records(self, site_record): page += 1 params["page"] = page - request_finished: bool = False - while not request_finished: - try: - new_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) - # status_code != 200 makes _execute_json_request return None, so check for that and retry if it happens - if new_records is None: - self.warn("Retrying...") - continue - request_finished = True - except Exception as e: - self.warn(f"Error retrieving page {page} of water level data: {e}. 
Retrying...") + new_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) items.extend(new_records["items"]) pages = new_records["pages"] diff --git a/backend/source.py b/backend/source.py index 5189258..cafb447 100644 --- a/backend/source.py +++ b/backend/source.py @@ -19,6 +19,7 @@ import shapely.wkt from shapely import MultiPoint from typing import Union, List, Callable, Dict +import time from backend.constants import ( FEET, @@ -200,7 +201,7 @@ def discover(self, *args, **kw): # Methods Already Implemented # ========================================================================== - def _execute_text_request(self, url: str, params: dict | None = None, **kw) -> str: + def _execute_text_request(self, url: str, params: dict | None = None, max_tries: int = 7, **kw) -> str: """ Executes a get request to the provided url and returns the text response. @@ -212,6 +213,9 @@ def _execute_text_request(self, url: str, params: dict | None = None, **kw) -> s params : dict key-value query parameters to pass to the get request + max_tries : int + the maximum number of times to retry the request if it fails + Returns ------- str @@ -220,17 +224,32 @@ def _execute_text_request(self, url: str, params: dict | None = None, **kw) -> s if "timeout" not in kw: kw["timeout"] = 10 - resp = httpx.get(url, params=params, **kw) - if resp.status_code == 200: - return resp.text - else: - self.warn(f"service url {resp.url}") - self.warn(f"service responded with status {resp.status_code}") - self.warn(f"service responded with text {resp.text}") + tries: int = 0 + + while tries < max_tries: + try: + resp = httpx.get(url, params=params, **kw) + if resp.status_code == 200: + return resp.text + else: + self.warn(f"service responded with status {resp.status_code}") + self.warn(f"service responded with text {resp.text}") + self.warn(f"Retrying... {tries+1}/{max_tries}") + except Exception as e: + self.warn(f"Error during request: {e}") + self.warn(f"Retrying... 
{tries+1}/{max_tries}") + tries += 1 + time.sleep(tries) + return "" def _execute_json_request( - self, url: str, params: dict | None = None, tag: str | None = None, **kw + self, + url: str, + params: dict | None = None, + tag: str | None = None, + max_retries: int = 7, + **kw ) -> dict | None: """ Executes a get request to the provided url and returns the json response. @@ -245,30 +264,41 @@ def _execute_json_request( tag : str the key to extract from the json response if required + + max_retries : int + the maximum number of times to retry the request if it fails Returns ------- dict the json response """ - resp = httpx.get(url, params=params, **kw) - if tag is None: - tag = "data" - - if resp.status_code == 200: + tries: int = 0 + while tries < max_retries: try: - obj = resp.json() - if tag and isinstance(obj, dict): - return obj[tag] - return obj - except JSONDecodeError: - self.warn(f"service responded but with no data. \n{resp.text}") - return None - else: - self.warn(f"service responded with status {resp.status_code}") - self.warn(f"service responded with text {resp.text}") - self.warn(f"service at url: {resp.url}") - return None + resp = httpx.get(url, params=params, **kw) + if tag is None: + tag = "data" + + if resp.status_code == 200: + try: + obj = resp.json() + if tag and isinstance(obj, dict): + return obj[tag] + return obj + except JSONDecodeError: + self.warn(f"service responded but with no data. \n{resp.text}") + return None + else: + self.warn(f"service responded with status {resp.status_code}") + self.warn(f"service responded with text {resp.text}") + self.warn(f"Retrying... {tries+1}/{max_retries}") + except Exception as e: + self.warn(f"Error during request: {e}") + self.warn(f"Retrying... 
{tries+1}/{max_retries}") + tries += 1 + time.sleep(tries) + return None # ========================================================================== # Methods Implemented in BaseSiteSource and BaseParameterSource From 4a64c6ffd395fc6328d8db5e74d220a4fe691811 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Wed, 29 Apr 2026 16:01:27 -0600 Subject: [PATCH 07/10] refactor(nmbgmr): change timeout to 15 minutes --- backend/connectors/nmbgmr/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 765a893..023aea7 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -49,7 +49,7 @@ # Don't use timeout=None since that can cause the request to hang indefinitely if there are issues with the API. # Instead, catch timeout exceptions and retry the request until it succeeds or a different exception is raised. -TIMEOUT=1800 +TIMEOUT=15*60 def _make_url(endpoint): if os.getenv("DEBUG") == "1": From 0e961a4fa8831066b63a9dfdd5dce8ce5e43d2f5 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 08:29:21 -0600 Subject: [PATCH 08/10] fix(nmbgmr): update developers note --- backend/connectors/nmbgmr/source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 023aea7..1050d36 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -45,9 +45,9 @@ ) -# Set timeout to 30 minutes for analyte and water level requests since some sites have a large number of records and the NMBGMR API can be slow to respond. +# Set timeout to 15 minutes for analyte and water level requests since some sites have a large number of records and the NMBGMR API can be slow to respond. # Don't use timeout=None since that can cause the request to hang indefinitely if there are issues with the API. 
-# Instead, catch timeout exceptions and retry the request until it succeeds or a different exception is raised. +# Instead, catch timeout and other exceptions and retry the request up to 7 times with a delay between retries. TIMEOUT=15*60 From 8dd54d71f21f7878e57418882e363a00ffcb6910 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 08:30:46 -0600 Subject: [PATCH 09/10] fix(source): return null string outside of while loop --- backend/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/source.py b/backend/source.py index cafb447..dd89c19 100644 --- a/backend/source.py +++ b/backend/source.py @@ -241,7 +241,7 @@ def _execute_text_request(self, url: str, params: dict | None = None, max_tries: tries += 1 time.sleep(tries) - return "" + return "" def _execute_json_request( self, From 32b6b2542f0a8a0344bf885db19aff2faa2a2dda Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 30 Apr 2026 08:32:30 -0600 Subject: [PATCH 10/10] fix(source): add URL to error message and log --- backend/source.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/source.py b/backend/source.py index dd89c19..1309343 100644 --- a/backend/source.py +++ b/backend/source.py @@ -234,6 +234,7 @@ def _execute_text_request(self, url: str, params: dict | None = None, max_tries: else: self.warn(f"service responded with status {resp.status_code}") self.warn(f"service responded with text {resp.text}") + self.warn(f"URL: {url}") self.warn(f"Retrying... {tries+1}/{max_tries}") except Exception as e: self.warn(f"Error during request: {e}") @@ -288,10 +289,12 @@ def _execute_json_request( return obj except JSONDecodeError: self.warn(f"service responded but with no data. 
\n{resp.text}") + self.warn(f"URL: {url}") return None else: self.warn(f"service responded with status {resp.status_code}") - self.warn(f"service responded with text {resp.text}") + self.warn(f"service responded with text {resp.text} for url {resp.url}") + self.warn(f"URL: {url}") self.warn(f"Retrying... {tries+1}/{max_retries}") except Exception as e: self.warn(f"Error during request: {e}")