Skip to content

Commit d9b4cad

Browse files
committed
Back off USGS publishers on rate limits
1 parent 0b192fe commit d9b4cad

2 files changed

Lines changed: 74 additions & 4 deletions

File tree

publishers/usgs_nims/usgs_nims_publisher.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,22 @@
5555
FILENAME_TS_RE = re.compile(r"___(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})Z\.jpg$")
5656

5757

58+
class UpstreamRateLimit(RuntimeError):
59+
def __init__(self, message: str, retry_after: float | None = None):
60+
super().__init__(message)
61+
self.retry_after = retry_after
62+
63+
64+
def _retry_after_seconds(error: HTTPError) -> float | None:
65+
raw = error.headers.get("Retry-After") if error.headers else None
66+
if not raw:
67+
return None
68+
try:
69+
return max(0.0, float(raw))
70+
except ValueError:
71+
return None
72+
73+
5874
def _load_cameras() -> list[dict]:
5975
here = os.path.dirname(os.path.abspath(__file__))
6076
with open(os.path.join(here, "cameras.json")) as f:
@@ -100,6 +116,11 @@ def fetch_latest_image(cam: dict, api_key: str | None = None) -> dict | None:
100116
except HTTPError as e:
101117
if e.code == 404:
102118
return None
119+
if e.code == 429:
120+
raise UpstreamRateLimit(
121+
f"HTTP 429 Too Many Requests for {cam_id}",
122+
_retry_after_seconds(e),
123+
) from e
103124
raise
104125
except Exception as e:
105126
print(f" [WARN] NIMS listFiles failed for {cam_id}: {e}")
@@ -175,6 +196,9 @@ def __init__(self, camera_filter: list[str] | None = None):
175196

176197
# Track last image filename per camera to avoid duplicate publishes
177198
self._last_filename: dict[str, str] = {}
199+
self._usgs_cooldown_until = 0.0
200+
self._request_delay = float(os.environ.get("USGS_NIMS_REQUEST_DELAY", "3.0"))
201+
self._rate_limit_backoff = float(os.environ.get("USGS_NIMS_429_BACKOFF", "900"))
178202

179203
# REST config
180204
self._base_url = os.environ.get(
@@ -276,13 +300,25 @@ def publish_cycle(self, dry_run: bool = False) -> int:
276300
for cam in self.cameras:
277301
nwis_id = cam["nwisId"]
278302
cam_id = cam["camId"]
303+
cooldown_remaining = self._usgs_cooldown_until - time.time()
304+
if cooldown_remaining > 0:
305+
self.stats["skipped"] += 1
306+
print(f" [{ts_label}] NIMS cooldown active; skipping fetches for {cooldown_remaining:.0f}s")
307+
return published
308+
279309
ds_id = self._ds_ids.get(nwis_id)
280310
if ds_id is None and not dry_run:
281311
continue
282312

283313
# Fetch latest image from NIMS
284314
try:
285315
img = fetch_latest_image(cam, api_key=self.api_key)
316+
except UpstreamRateLimit as e:
317+
backoff = e.retry_after or self._rate_limit_backoff
318+
self._usgs_cooldown_until = time.time() + backoff
319+
self.stats["skipped"] += 1
320+
print(f" [{ts_label}] {nwis_id}/{cam_id}: RATE LIMITED; backing off {backoff:.0f}s")
321+
return published
286322
except Exception as e:
287323
self.stats["errors"] += 1
288324
print(f" [{ts_label}] {nwis_id}/{cam_id}: FETCH ERR {e}")
@@ -333,8 +369,7 @@ def publish_cycle(self, dry_run: bool = False) -> int:
333369
self.stats["errors"] += 1
334370
print(f" [{ts_label}] {nwis_id}/{cam_id}: ERR {e}")
335371

336-
# Be polite to NIMS API
337-
time.sleep(0.5)
372+
time.sleep(self._request_delay)
338373

339374
return published
340375

publishers/usgs_water/usgs_water_publisher.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,22 @@
6868
FETCH_LIMIT = 5
6969

7070

71+
class UpstreamRateLimit(RuntimeError):
72+
def __init__(self, message: str, retry_after: float | None = None):
73+
super().__init__(message)
74+
self.retry_after = retry_after
75+
76+
77+
def _retry_after_seconds(error: HTTPError) -> float | None:
78+
raw = error.headers.get("Retry-After") if error.headers else None
79+
if not raw:
80+
return None
81+
try:
82+
return max(0.0, float(raw))
83+
except ValueError:
84+
return None
85+
86+
7187
def _load_stations() -> list[dict]:
7288
here = os.path.dirname(os.path.abspath(__file__))
7389
with open(os.path.join(here, "stations.json")) as f:
@@ -106,6 +122,11 @@ def fetch_continuous_values(nwis_id: str, parameter_code: str,
106122
except HTTPError as e:
107123
if e.code == 404:
108124
return []
125+
if e.code == 429:
126+
raise UpstreamRateLimit(
127+
f"HTTP 429 Too Many Requests for {nwis_id}/{parameter_code}",
128+
_retry_after_seconds(e),
129+
) from e
109130
raise
110131
except Exception as e:
111132
print(f" [WARN] USGS fetch failed for {nwis_id}/{parameter_code}: {e}")
@@ -185,6 +206,9 @@ def __init__(self, station_filter: list[str] | None = None):
185206

186207
# Track last observation timestamp per (station, param) to avoid duplicates
187208
self._last_obs_ts: dict[str, float] = {}
209+
self._usgs_cooldown_until = 0.0
210+
self._request_delay = float(os.environ.get("USGS_REQUEST_DELAY", "2.0"))
211+
self._rate_limit_backoff = float(os.environ.get("USGS_429_BACKOFF", "900"))
188212

189213
# REST config
190214
self._base_url = os.environ.get(
@@ -327,6 +351,12 @@ def publish_cycle(self, dry_run: bool = False) -> int:
327351
station_ds = self._ds_ids.get(nwis_id, {})
328352

329353
for param_code in st.get("parameterCodes", []):
354+
cooldown_remaining = self._usgs_cooldown_until - time.time()
355+
if cooldown_remaining > 0:
356+
self.stats["skipped"] += 1
357+
print(f" [{ts_label}] USGS cooldown active; skipping fetches for {cooldown_remaining:.0f}s")
358+
return published
359+
330360
ds_id = station_ds.get(param_code)
331361
if ds_id is None and not dry_run:
332362
continue
@@ -338,6 +368,12 @@ def publish_cycle(self, dry_run: bool = False) -> int:
338368
values = fetch_continuous_values(
339369
nwis_id, param_code,
340370
api_key=self.api_key, limit=1)
371+
except UpstreamRateLimit as e:
372+
backoff = e.retry_after or self._rate_limit_backoff
373+
self._usgs_cooldown_until = time.time() + backoff
374+
self.stats["skipped"] += 1
375+
print(f" [{ts_label}] {nwis_id}/{param_code}: RATE LIMITED; backing off {backoff:.0f}s")
376+
return published
341377
except Exception as e:
342378
self.stats["errors"] += 1
343379
print(f" [{ts_label}] {nwis_id}/{param_code}: FETCH ERR {e}")
@@ -389,8 +425,7 @@ def publish_cycle(self, dry_run: bool = False) -> int:
389425
self.stats["errors"] += 1
390426
print(f" [{ts_label}] {nwis_id}/{param_code}: ERR {e}")
391427

392-
# Be polite to USGS API
393-
time.sleep(0.3)
428+
time.sleep(self._request_delay)
394429

395430
return published
396431

0 commit comments

Comments
 (0)