Skip to content

Commit 0b192fe

Browse files
committed
Harden ISS and water publishers
1 parent 5e1d62a commit 0b192fe

2 files changed

Lines changed: 128 additions & 28 deletions

File tree

publishers/iss/iss_publisher.py

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
import math
2828
import os
2929
import sys
30+
import tempfile
3031
import time
3132
from datetime import datetime, timedelta, timezone
33+
from pathlib import Path
3234
from typing import Any
3335
from urllib.request import Request, urlopen
3436

@@ -50,6 +52,25 @@
5052
NORAD_ID = os.environ.get("NORAD_ID", "25544")
5153
ASSET_NAME = os.environ.get("ASSET_NAME", "ISS (ZARYA)")
5254
CELESTRAK_URL = f"https://celestrak.org/NORAD/elements/gp.php?CATNR={NORAD_ID}&FORMAT=JSON"
55+
TLE_CACHE_FILE = Path(os.environ.get(
56+
"ISS_TLE_CACHE_FILE",
57+
os.path.join(tempfile.gettempdir(), f"iss-{NORAD_ID}-omm.json"),
58+
))
59+
FALLBACK_OMM = {
60+
"OBJECT_NAME": "ISS (ZARYA)",
61+
"OBJECT_ID": "1998-067A",
62+
"EPOCH": "2026-05-25T10:07:58.537056",
63+
"MEAN_MOTION": 15.49365963,
64+
"ECCENTRICITY": 0.00074701,
65+
"INCLINATION": 51.633,
66+
"RA_OF_ASC_NODE": 52.7989,
67+
"ARG_OF_PERICENTER": 96.3597,
68+
"MEAN_ANOMALY": 263.8242,
69+
"NORAD_CAT_ID": 25544,
70+
"BSTAR": 0.00018076148,
71+
"MEAN_MOTION_DOT": 9.654e-5,
72+
"MEAN_MOTION_DDOT": 0,
73+
}
5374

5475

5576
# ═══════════════════════════════════════════════════════════════════════════
@@ -63,18 +84,9 @@
6384
_tle_refresh_interval: float = 3600.0
6485

6586

66-
def fetch_tle_from_celestrak() -> Satrec:
87+
def _satrec_from_omm(omm: dict) -> Satrec:
6788
global _cached_satrec, _tle_fetched_at, _tle_epoch_str, _tle_epoch_dt
6889

69-
req = Request(CELESTRAK_URL, headers={"Accept": "application/json"})
70-
with urlopen(req, timeout=30) as resp:
71-
data = json.loads(resp.read().decode())
72-
73-
if isinstance(data, list) and len(data) > 0:
74-
omm = data[0]
75-
else:
76-
raise RuntimeError(f"Unexpected CelesTrak response: {str(data)[:200]}")
77-
7890
sat = Satrec()
7991
sat.sgp4init(
8092
WGS72, 'i',
@@ -102,6 +114,51 @@ def fetch_tle_from_celestrak() -> Satrec:
102114
return sat
103115

104116

117+
def _normalize_omm_response(data: Any) -> dict:
118+
if isinstance(data, list) and len(data) > 0:
119+
return data[0]
120+
if isinstance(data, dict):
121+
return data
122+
raise RuntimeError(f"Unexpected CelesTrak response: {str(data)[:200]}")
123+
124+
125+
def _write_tle_cache(omm: dict) -> None:
126+
try:
127+
TLE_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
128+
TLE_CACHE_FILE.write_text(json.dumps(omm), encoding="utf-8")
129+
except OSError as e:
130+
print(f" [WARN] Could not write TLE cache {TLE_CACHE_FILE}: {e}")
131+
132+
133+
def load_cached_or_fallback_tle(reason: str) -> Satrec:
134+
print(f" [WARN] TLE fetch failed ({reason}); using cached/fallback OMM")
135+
if TLE_CACHE_FILE.exists():
136+
try:
137+
omm = _normalize_omm_response(json.loads(TLE_CACHE_FILE.read_text(encoding="utf-8")))
138+
sat = _satrec_from_omm(omm)
139+
print(f" TLE cache epoch: {_tle_epoch_str}")
140+
return sat
141+
except Exception as e:
142+
print(f" [WARN] Could not load TLE cache {TLE_CACHE_FILE}: {e}")
143+
144+
sat = _satrec_from_omm(FALLBACK_OMM)
145+
print(f" TLE fallback epoch: {_tle_epoch_str}")
146+
return sat
147+
148+
149+
def fetch_tle_from_celestrak() -> Satrec:
150+
global _cached_satrec, _tle_fetched_at, _tle_epoch_str, _tle_epoch_dt
151+
152+
req = Request(CELESTRAK_URL, headers={"Accept": "application/json"})
153+
with urlopen(req, timeout=30) as resp:
154+
data = json.loads(resp.read().decode())
155+
156+
omm = _normalize_omm_response(data)
157+
sat = _satrec_from_omm(omm)
158+
_write_tle_cache(omm)
159+
return sat
160+
161+
105162
def _epoch_to_jdsatepoch(epoch_str: str) -> float:
106163
dt = datetime.strptime(epoch_str, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc)
107164
jd, fr = _datetime_to_jd(dt)
@@ -111,7 +168,14 @@ def _epoch_to_jdsatepoch(epoch_str: str) -> float:
111168
def get_satrec() -> Satrec:
112169
global _cached_satrec, _tle_fetched_at
113170
if _cached_satrec is None or (time.time() - _tle_fetched_at) > _tle_refresh_interval:
114-
fetch_tle_from_celestrak()
171+
try:
172+
fetch_tle_from_celestrak()
173+
except Exception as e:
174+
if _cached_satrec is None:
175+
load_cached_or_fallback_tle(str(e))
176+
else:
177+
_tle_fetched_at = time.time()
178+
print(f" [WARN] TLE refresh failed ({e}); keeping epoch {_tle_epoch_str}")
115179
return _cached_satrec
116180

117181

@@ -227,8 +291,7 @@ def on_startup(self, args):
227291
fetch_tle_from_celestrak()
228292
print(f" TLE epoch: {_tle_epoch_str}")
229293
except Exception as e:
230-
print(f" FATAL: Could not fetch TLE: {e}")
231-
sys.exit(1)
294+
load_cached_or_fallback_tle(str(e))
232295

233296
def connect(self):
234297
"""Connect to server. Uses REST mode when OSH_BASE_URL is set, SDK otherwise."""

publishers/usgs_water/usgs_water_publisher.py

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import json
3131
import os
3232
import random
33+
import re
3334
import ssl
3435
import sys
3536
import time
@@ -197,29 +198,63 @@ def __init__(self, station_filter: list[str] | None = None):
197198
def _system_uid(self, nwis_id: str) -> str:
198199
return f"urn:os4csapi:system:usgs-water:{nwis_id}:v1"
199200

201+
def _raw_datastream_ids(self, sys_id: str) -> dict[str, str]:
202+
url = f"{self._base_url}/systems/{sys_id}/datastreams"
203+
headers = {"Accept": "application/json", "Authorization": self._auth}
204+
ctx = ssl.create_default_context()
205+
ctx.check_hostname = False
206+
ctx.verify_mode = ssl.CERT_NONE
207+
with urlopen(Request(url, headers=headers), timeout=30, context=ctx) as resp:
208+
text = resp.read().decode()
209+
210+
station_ds: dict[str, str] = {}
211+
current_id = None
212+
for key, value in re.findall(r'"(id|outputName)"\s*:\s*"([^"]+)"', text):
213+
if key == "id":
214+
current_id = value
215+
elif key == "outputName" and current_id:
216+
if value == DS_DISCHARGE_OUTPUT:
217+
station_ds["00060"] = current_id
218+
elif value == DS_GAGE_HEIGHT_OUTPUT:
219+
station_ds["00065"] = current_id
220+
current_id = None
221+
return station_ds
222+
200223
def connect(self):
201224
"""Resolve system and datastream IDs for each station via REST API."""
202225
self._ds_ids.clear()
203226
connected = 0
204227
for st in self.stations:
205228
nwis_id = st["nwisId"]
206229
uid = self._system_uid(nwis_id)
207-
sys_id = find_by_uid(self._base_url, self._auth, "systems", uid, no_cache=True)
208-
if not sys_id:
209-
print(f" [WARN] System '{uid}' not found -- skipping {nwis_id}")
210-
continue
211-
212-
# Find datastreams
213-
ds_list = api_get(self._base_url, f"systems/{sys_id}/datastreams", self._auth)
230+
sys_id = None
214231
station_ds = {}
215-
if ds_list:
216-
for item in ds_list.get("items", []):
217-
output_name = item.get("outputName", "")
218-
ds_id = item.get("id")
219-
if output_name == DS_DISCHARGE_OUTPUT and ds_id:
220-
station_ds["00060"] = ds_id
221-
elif output_name == DS_GAGE_HEIGHT_OUTPUT and ds_id:
222-
station_ds["00065"] = ds_id
232+
try:
233+
sys_id = find_by_uid(self._base_url, self._auth, "systems", uid, no_cache=True)
234+
if not sys_id:
235+
print(f" [WARN] System '{uid}' not found -- skipping {nwis_id}")
236+
continue
237+
238+
ds_list = api_get(self._base_url, f"systems/{sys_id}/datastreams", self._auth)
239+
if ds_list:
240+
for item in ds_list.get("items", []):
241+
output_name = item.get("outputName", "")
242+
ds_id = item.get("id")
243+
if output_name == DS_DISCHARGE_OUTPUT and ds_id:
244+
station_ds["00060"] = ds_id
245+
elif output_name == DS_GAGE_HEIGHT_OUTPUT and ds_id:
246+
station_ds["00065"] = ds_id
247+
except Exception as e:
248+
if sys_id:
249+
station_ds = self._raw_datastream_ids(sys_id)
250+
if station_ds:
251+
print(f" [WARN] Used raw datastream fallback for {nwis_id}: {e}")
252+
else:
253+
print(f" [WARN] Could not resolve datastreams for {nwis_id}: {e}")
254+
continue
255+
else:
256+
print(f" [WARN] Could not resolve system for {nwis_id}: {e}")
257+
continue
223258

224259
if not station_ds:
225260
print(f" [WARN] No datastreams found for {nwis_id} -- skipping")
@@ -231,6 +266,8 @@ def connect(self):
231266
print(f" Connected: {nwis_id} -> sys={sys_id} ds=[{ds_summary}]")
232267

233268
print(f" Ready: {connected}/{len(self.stations)} stations connected")
269+
if connected == 0:
270+
raise RuntimeError("No USGS Water stations connected")
234271

235272
def connect_with_retry(self, max_attempts=10, base_delay=5.0, max_delay=120.0):
236273
for attempt in range(1, max_attempts + 1):

0 commit comments

Comments
 (0)