Skip to content
20 changes: 16 additions & 4 deletions backend/connectors/nmbgmr/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
)


# Timeout (seconds) for analyte and water-level requests: some sites have a
# large number of records and the NMBGMR API can be slow to respond.
# Don't use timeout=None, since that can cause a request to hang indefinitely
# if the API misbehaves; instead, the request helpers catch timeouts and other
# errors and retry up to 7 times with a delay between retries.
TIMEOUT = 15 * 60

def _make_url(endpoint):
if os.getenv("DEBUG") == "1":
url = f"http://localhost:8000/latest/{endpoint}"
Expand Down Expand Up @@ -82,10 +88,10 @@ def get_records(self):
else:
params["parameter"] = "Manual groundwater levels"

# tags="features" because the response object is a GeoJSON
sites = self._execute_json_request(
_make_url("locations"), params, tag="features", timeout=30
_make_url("locations"), params, tag="features", timeout=TIMEOUT
)

if not config.sites_only:
for site in sites:
if get_bool_env_variable("IS_TESTING_ENV"):
Expand Down Expand Up @@ -119,14 +125,17 @@ def get_records(self, site_record):
analyte = get_analyte_search_param(
self.config.parameter, NMBGMR_ANALYTE_MAPPING
)

records = self._execute_json_request(
_make_url("waterchemistry"),
params={
"pointid": ",".join(make_site_list(site_record)),
"analyte": analyte,
},
tag="",
timeout=TIMEOUT
)

records_sorted_by_pointid = {}
for pointid in records.keys():
records_sorted_by_pointid[pointid] = records[pointid][analyte]
Expand Down Expand Up @@ -224,15 +233,18 @@ def get_records(self, site_record):
# just use manual waterlevels temporarily
url = _make_url("waterlevels/manual")

paginated_records = self._execute_json_request(url, params, tag="")
paginated_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT)

items = paginated_records["items"]
page = paginated_records["page"]
pages = paginated_records["pages"]

while page < pages:
page += 1
params["page"] = page
new_records = self._execute_json_request(url, params, tag="")

new_records = self._execute_json_request(url, params, tag="", timeout=TIMEOUT)

items.extend(new_records["items"])
pages = new_records["pages"]

Expand Down
87 changes: 60 additions & 27 deletions backend/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import shapely.wkt
from shapely import MultiPoint
from typing import Union, List, Callable, Dict
import time

from backend.constants import (
FEET,
Expand Down Expand Up @@ -200,7 +201,7 @@ def discover(self, *args, **kw):
# Methods Already Implemented
# ==========================================================================

def _execute_text_request(self, url: str, params: dict | None = None, max_tries: int = 7, **kw) -> str:
    """
    Executes a GET request to the provided url and returns the text response.

    The request is attempted up to ``max_tries`` times. A non-200 status or a
    transport error (e.g. a timeout) is logged via ``self.warn`` and retried
    after a linearly increasing delay (1s, 2s, ...).

    Parameters
    ----------
    url : str
        the url to send the get request to

    params : dict
        key-value query parameters to pass to the get request

    max_tries : int
        the maximum number of times to attempt the request if it fails
        # NOTE(review): sibling _execute_json_request calls this "max_retries";
        # consider unifying the names in a follow-up (renaming here would break
        # keyword callers, so it is left as-is).

    Returns
    -------
    str
        the text response, or "" if every attempt failed
    """
    # Default timeout of 10s when the caller did not supply one; callers that
    # need the long NMBGMR timeout pass timeout= explicitly.
    if "timeout" not in kw:
        kw["timeout"] = 10

    tries: int = 0
    while tries < max_tries:
        try:
            resp = httpx.get(url, params=params, **kw)
            if resp.status_code == 200:
                return resp.text
            self.warn(f"service responded with status {resp.status_code}")
            self.warn(f"service responded with text {resp.text}")
            self.warn(f"URL: {url}")
            self.warn(f"Retrying... {tries+1}/{max_tries}")
        except Exception as e:
            # httpx raises on timeouts, connection errors, etc.; treat all of
            # them as retryable.
            self.warn(f"Error during request: {e}")
            self.warn(f"Retrying... {tries+1}/{max_tries}")
        tries += 1
        # Only back off if another attempt is coming -- sleeping after the
        # final failure would just delay the caller before giving up.
        if tries < max_tries:
            time.sleep(tries)

    return ""

def _execute_json_request(
    self,
    url: str,
    params: dict | None = None,
    tag: str | None = None,
    max_retries: int = 7,
    **kw
) -> dict | None:
    """
    Executes a GET request to the provided url and returns the json response.

    The request is attempted up to ``max_retries`` times. A non-200 status or
    a transport error (e.g. a timeout) is logged via ``self.warn`` and retried
    after a linearly increasing delay (1s, 2s, ...). A 200 response whose body
    is not valid JSON is treated as "no data", not as a transient failure, and
    returns None immediately without retrying.

    Parameters
    ----------
    url : str
        the url to send the get request to

    params : dict
        key-value query parameters to pass to the get request

    tag : str
        the key to extract from the json response if required; defaults to
        "data" when omitted

    max_retries : int
        the maximum number of times to attempt the request if it fails

    Returns
    -------
    dict
        the json response (or the value under ``tag`` when the response is a
        dict), or None if every attempt failed
    """
    # Loop-invariant: resolve the default tag once, not on every attempt.
    if tag is None:
        tag = "data"

    tries: int = 0
    while tries < max_retries:
        try:
            resp = httpx.get(url, params=params, **kw)
            if resp.status_code == 200:
                try:
                    obj = resp.json()
                except JSONDecodeError:
                    # Malformed body on a successful status: give up rather
                    # than retry, matching the previous behavior.
                    self.warn(f"service responded but with no data. \n{resp.text}")
                    self.warn(f"URL: {url}")
                    return None
                # A missing `tag` key raises KeyError, which the outer except
                # turns into a retry.
                if tag and isinstance(obj, dict):
                    return obj[tag]
                return obj
            self.warn(f"service responded with status {resp.status_code}")
            self.warn(f"service responded with text {resp.text} for url {resp.url}")
            self.warn(f"URL: {url}")
            self.warn(f"Retrying... {tries+1}/{max_retries}")
        except Exception as e:
            # httpx raises on timeouts, connection errors, etc.; treat all of
            # them as retryable.
            self.warn(f"Error during request: {e}")
            self.warn(f"Retrying... {tries+1}/{max_retries}")
        tries += 1
        # Only back off if another attempt is coming -- sleeping after the
        # final failure would just delay the caller before giving up.
        if tries < max_retries:
            time.sleep(tries)
    return None
Comment thread
jirhiker marked this conversation as resolved.

# ==========================================================================
# Methods Implemented in BaseSiteSource and BaseParameterSource
Expand Down