diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 45e0a16..1050d36 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -45,6 +45,12 @@ ) +# Set timeout to 15 minutes for analyte and water level requests since some sites have a large number of records and the NMBGMR API can be slow to respond. +# Don't use timeout=None since that can cause the request to hang indefinitely if there are issues with the API. +# Instead, catch timeout and other exceptions and retry the request up to 7 times with a delay between retries. + +TIMEOUT=15*60 + def _make_url(endpoint): if os.getenv("DEBUG") == "1": url = f"http://localhost:8000/latest/{endpoint}" @@ -82,10 +88,10 @@ def get_records(self): else: params["parameter"] = "Manual groundwater levels" - # tags="features" because the response object is a GeoJSON sites = self._execute_json_request( - _make_url("locations"), params, tag="features", timeout=30 + _make_url("locations"), params, tag="features", timeout=TIMEOUT ) + if not config.sites_only: for site in sites: if get_bool_env_variable("IS_TESTING_ENV"): @@ -119,6 +125,7 @@ def get_records(self, site_record): analyte = get_analyte_search_param( self.config.parameter, NMBGMR_ANALYTE_MAPPING ) + records = self._execute_json_request( _make_url("waterchemistry"), params={ @@ -126,7 +133,9 @@ def get_records(self, site_record): "analyte": analyte, }, tag="", + timeout=TIMEOUT ) + records_sorted_by_pointid = {} for pointid in records.keys(): records_sorted_by_pointid[pointid] = records[pointid][analyte] @@ -224,7 +233,8 @@ def get_records(self, site_record): # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") - paginated_records = self._execute_json_request(url, params, tag="") + paginated_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) + items = paginated_records["items"] page = paginated_records["page"] pages = paginated_records["pages"] @@ -232,7 +242,9 @@ def get_records(self, site_record): while page < pages: page += 1 params["page"] = page - new_records = self._execute_json_request(url, params, tag="") + + new_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT) + items.extend(new_records["items"]) pages = new_records["pages"] diff --git a/backend/source.py b/backend/source.py index 5189258..1309343 100644 --- a/backend/source.py +++ b/backend/source.py @@ -19,6 +19,7 @@ import shapely.wkt from shapely import MultiPoint from typing import Union, List, Callable, Dict +import time from backend.constants import ( FEET, @@ -200,7 +201,7 @@ def discover(self, *args, **kw): # Methods Already Implemented # ========================================================================== - def _execute_text_request(self, url: str, params: dict | None = None, **kw) -> str: + def _execute_text_request(self, url: str, params: dict | None = None, max_tries: int = 7, **kw) -> str: """ Executes a get request to the provided url and returns the text response. @@ -212,6 +213,9 @@ def _execute_text_request(self, url: str, params: dict | None = None, **kw) -> s params : dict key-value query parameters to pass to the get request + max_tries : int + the maximum number of times to retry the request if it fails + Returns ------- str @@ -220,17 +224,33 @@ def _execute_text_request(self, url: str, params: dict | None = None, **kw) -> s if "timeout" not in kw: kw["timeout"] = 10 - resp = httpx.get(url, params=params, **kw) - if resp.status_code == 200: - return resp.text - else: - self.warn(f"service url {resp.url}") - self.warn(f"service responded with status {resp.status_code}") - self.warn(f"service responded with text {resp.text}") - return "" + tries: int = 0 + + while tries < max_tries: + try: + resp = httpx.get(url, params=params, **kw) + if resp.status_code == 200: + return resp.text + else: + self.warn(f"service responded with status {resp.status_code}") + self.warn(f"service responded with text {resp.text}") + self.warn(f"URL: {url}") + self.warn(f"Retrying... {tries+1}/{max_tries}") + except Exception as e: + self.warn(f"Error during request: {e}") + self.warn(f"Retrying... {tries+1}/{max_tries}") + tries += 1 + time.sleep(tries) + + return "" def _execute_json_request( - self, url: str, params: dict | None = None, tag: str | None = None, **kw + self, + url: str, + params: dict | None = None, + tag: str | None = None, + max_retries: int = 7, + **kw ) -> dict | None: """ Executes a get request to the provided url and returns the json response. @@ -245,30 +265,43 @@ def _execute_json_request( tag : str the key to extract from the json response if required + + max_retries : int + the maximum number of times to retry the request if it fails Returns ------- dict the json response """ - resp = httpx.get(url, params=params, **kw) - if tag is None: - tag = "data" - - if resp.status_code == 200: + tries: int = 0 + while tries < max_retries: try: - obj = resp.json() - if tag and isinstance(obj, dict): - return obj[tag] - return obj - except JSONDecodeError: - self.warn(f"service responded but with no data. \n{resp.text}") - return None - else: - self.warn(f"service responded with status {resp.status_code}") - self.warn(f"service responded with text {resp.text}") - self.warn(f"service at url: {resp.url}") - return None + resp = httpx.get(url, params=params, **kw) + if tag is None: + tag = "data" + + if resp.status_code == 200: + try: + obj = resp.json() + if tag and isinstance(obj, dict): + return obj[tag] + return obj + except JSONDecodeError: + self.warn(f"service responded but with no data. \n{resp.text}") + self.warn(f"URL: {url}") + return None + else: + self.warn(f"service responded with status {resp.status_code}") + self.warn(f"service responded with text {resp.text} for url {resp.url}") + self.warn(f"URL: {url}") + self.warn(f"Retrying... {tries+1}/{max_retries}") + except Exception as e: + self.warn(f"Error during request: {e}") + self.warn(f"Retrying... {tries+1}/{max_retries}") + tries += 1 + time.sleep(tries) + return None # ========================================================================== # Methods Implemented in BaseSiteSource and BaseParameterSource