Skip to content

Commit 9f048cd

Browse files
committed
Add nightly cronjob script for EarthCARE validation processing
1 parent 7885f0b commit 9f048cd

2 files changed

Lines changed: 87 additions & 1 deletion

File tree

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#!/usr/bin/env python3
2+
3+
import logging
4+
import traceback
5+
from datetime import timedelta
6+
7+
from cloudnet_api_client import APIClient
8+
from earthcare_downloader import search
9+
from requests import Session
10+
11+
from processing import utils
12+
from processing.config import Config
13+
14+
logging.basicConfig(level=logging.INFO)
15+
16+
PRODUCTS = ("cpr-validation", "cpr-tc-validation")
17+
SEARCH_RADIUS_KM = 200
18+
19+
20+
def main() -> None:
21+
config = Config()
22+
session = utils.make_session()
23+
try:
24+
_process(config, session)
25+
except Exception as err:
26+
logging.exception("Fatal error in cronjob")
27+
utils.send_slack_alert(
28+
config,
29+
err,
30+
source="earthcare-validation-cronjob",
31+
log=traceback.format_exc(),
32+
)
33+
34+
35+
def _process(config: Config, session: Session) -> None:
36+
yesterday = utils.utctoday() - timedelta(days=1)
37+
client = APIClient()
38+
sites = client.sites("cloudnet")
39+
logging.info(f"Checking {len(sites)} sites for EarthCARE overpasses on {yesterday}")
40+
for site in sites:
41+
files = search(
42+
product="CPR_NOM_1B",
43+
lat=site.latitude,
44+
lon=site.longitude,
45+
radius=SEARCH_RADIUS_KM,
46+
date=yesterday,
47+
)
48+
if not files:
49+
continue
50+
logging.info(
51+
f"Found {len(files)} EarthCARE overpass(es) for {site.id} on {yesterday}"
52+
)
53+
for product in PRODUCTS:
54+
_publish_task(config, session, site.id, product, str(yesterday))
55+
56+
57+
def _publish_task(
58+
config: Config,
59+
session: Session,
60+
site_id: str,
61+
product_id: str,
62+
date: str,
63+
) -> None:
64+
task = {
65+
"type": "process",
66+
"siteId": site_id,
67+
"productId": product_id,
68+
"measurementDate": date,
69+
"scheduledAt": utils.utcnow().isoformat(),
70+
"priority": 100,
71+
}
72+
logging.info(f"Publish task: {task}")
73+
res = session.post(
74+
f"{config.dataportal_url}/api/queue/publish",
75+
json=task,
76+
auth=config.data_submission_auth,
77+
)
78+
res.raise_for_status()
79+
80+
81+
if __name__ == "__main__":
82+
main()

src/processing/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
from processing.config import Config
2020
from processing.version import __version__ as cloudnet_processing_version
2121

22-
ErrorSource = Literal["data", "worker", "freeze-cronjob", "qc-cronjob"]
22+
ErrorSource = Literal[
23+
"data", "worker", "freeze-cronjob", "qc-cronjob", "earthcare-validation-cronjob"
24+
]
2325

2426

2527
class Uuid:
@@ -116,6 +118,8 @@ def send_slack_alert(
116118
label = ":cold_face: Freeze cronjob"
117119
case "qc-cronjob":
118120
label = ":first_place_medal: Yesterday's QC cronjob"
121+
case "earthcare-validation-cronjob":
122+
label = ":satellite: EarthCARE validation cronjob"
119123
case unknown_source:
120124
label = f":interrobang: Unknown error source ({unknown_source})"
121125

0 commit comments

Comments
 (0)