Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions scripts/cronjobs/earthcare-validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python3

import logging
import traceback
from datetime import timedelta

from cloudnet_api_client import APIClient
from earthcare_downloader import search
from requests import Session

from processing import utils
from processing.config import Config

logging.basicConfig(level=logging.INFO)

PRODUCTS = ("cpr-validation", "cpr-tc-validation")
SEARCH_RADIUS_KM = 200


def main() -> None:
config = Config()
session = utils.make_session()
try:
_process(config, session)
except Exception as err:
logging.exception("Fatal error in cronjob")
utils.send_slack_alert(
config,
err,
source="ec-cronjob",
log=traceback.format_exc(),
)


def _process(config: Config, session: Session) -> None:
yesterday = utils.utctoday() - timedelta(days=1)
client = APIClient()
sites = client.sites("cloudnet")
logging.info(f"Checking {len(sites)} sites for EarthCARE overpasses on {yesterday}")
for site in sites:
files = search(
product="CPR_NOM_1B",
lat=site.latitude,
lon=site.longitude,
radius=SEARCH_RADIUS_KM,
date=yesterday,
)
if not files:
continue
logging.info(
f"Found {len(files)} EarthCARE overpass(es) for {site.id} on {yesterday}"
)
for product in PRODUCTS:
_publish_task(config, session, site.id, product, str(yesterday))


def _publish_task(
config: Config,
session: Session,
site_id: str,
product_id: str,
date: str,
) -> None:
task = {
"type": "process",
"siteId": site_id,
"productId": product_id,
"measurementDate": date,
"scheduledAt": utils.utcnow().isoformat(),
"priority": 100,
}
logging.info(f"Publish task: {task}")
res = session.post(
f"{config.dataportal_url}/api/queue/publish",
json=task,
auth=config.data_submission_auth,
)
res.raise_for_status()


if __name__ == "__main__":
main()
4 changes: 3 additions & 1 deletion src/processing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from processing.config import Config
from processing.version import __version__ as cloudnet_processing_version

ErrorSource = Literal["data", "worker", "freeze-cronjob", "qc-cronjob"]
ErrorSource = Literal["data", "worker", "freeze-cronjob", "qc-cronjob", "ec-cronjob"]


class Uuid:
Expand Down Expand Up @@ -116,6 +116,8 @@ def send_slack_alert(
label = ":cold_face: Freeze cronjob"
case "qc-cronjob":
label = ":first_place_medal: Yesterday's QC cronjob"
case "ec-cronjob":
label = ":satellite: EarthCARE validation cronjob"
case unknown_source:
label = f":interrobang: Unknown error source ({unknown_source})"

Expand Down