From 40559a35d79fabcf95f9e26e5a04d1f1d531e3ab Mon Sep 17 00:00:00 2001
From: elmartinj
Date: Wed, 8 Apr 2026 17:22:26 -0600
Subject: [PATCH] Add initial CENACE pipeline integration

---
 src/cenace_pipeline.py               |  69 +++++++++++++++
 src/data/cenace/__init__.py          |   0
 src/data/cenace/aggregate/core.py    |  48 +++++++++++
 src/data/cenace/config.py            |  15 ++++
 src/data/cenace/transform/core.py    |   0
 src/data/cenace/utils/cenace_data.py |  88 +++++++++++++++++++
 src/dataset_registry.py              |  25 ++++++
 src/evaluation/cenace/__init__.py    |   0
 src/evaluation/cenace/core.py        |  92 ++++++++++++++++++++
 src/evaluation/cenace/metrics.py     |  47 +++++++++++
 src/forecast/cenace/__init__.py      |   0
 src/forecast/cenace/core.py          |  93 ++++++++++++++++++++
 src/forecast/cenace/models.py        | 122 +++++++++++++++++++++++++++
 src/pipeline.py                      |  32 +++++++
 14 files changed, 631 insertions(+)
 create mode 100644 src/cenace_pipeline.py
 create mode 100644 src/data/cenace/__init__.py
 create mode 100644 src/data/cenace/aggregate/core.py
 create mode 100644 src/data/cenace/config.py
 create mode 100644 src/data/cenace/transform/core.py
 create mode 100644 src/data/cenace/utils/cenace_data.py
 create mode 100644 src/dataset_registry.py
 create mode 100644 src/evaluation/cenace/__init__.py
 create mode 100644 src/evaluation/cenace/core.py
 create mode 100644 src/evaluation/cenace/metrics.py
 create mode 100644 src/forecast/cenace/__init__.py
 create mode 100644 src/forecast/cenace/core.py
 create mode 100644 src/forecast/cenace/models.py
 create mode 100644 src/pipeline.py

diff --git a/src/cenace_pipeline.py b/src/cenace_pipeline.py
new file mode 100644
index 0000000..e437e7c
--- /dev/null
+++ b/src/cenace_pipeline.py
@@ -0,0 +1,69 @@
+from __future__ import annotations
+
+import argparse
+
+import pandas as pd
+
+from src.data.cenace.aggregate.core import build_hourly_partitions
+from src.evaluation.cenace.core import run_evaluation
+from src.forecast.cenace.core import run_forecast
+
+
+def run_cenace_pipeline(
+    cutoff: str,
+    model: str,
+    h: int = 24,
+    max_window_size: int = 48,
+    skip_aggregate: bool = False,
+) -> tuple[str, str]:
+    try:
+        cutoff_ts = pd.Timestamp(cutoff)
+    except Exception as exc:
+        raise ValueError(f"Invalid cutoff timestamp: {cutoff}") from exc
+
+    if not skip_aggregate:
+        n_written = build_hourly_partitions()
+        print(f"Aggregated {n_written} partitions")
+
+    forecast_path = run_forecast(
+        cutoff=cutoff_ts,
+        model=model,
+        h=h,
+        max_window_size=max_window_size,
+    )
+    print(f"Forecasts saved to: {forecast_path}")
+
+    eval_path = run_evaluation(
+        cutoff=cutoff_ts,
+        model=model,
+        h=h,
+        max_window_size=max_window_size,
+    )
+    print(f"Metrics saved to: {eval_path}")
+
+    return str(forecast_path), str(eval_path)
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cutoff", required=True)
+    parser.add_argument("--model", required=True)
+    parser.add_argument("--h", type=int, default=24)
+    parser.add_argument("--max-window-size", type=int, default=48)
+    parser.add_argument("--skip-aggregate", action="store_true")
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    run_cenace_pipeline(
+        cutoff=args.cutoff,
+        model=args.model,
+        h=args.h,
+        max_window_size=args.max_window_size,
+        skip_aggregate=args.skip_aggregate,
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/data/cenace/__init__.py b/src/data/cenace/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/data/cenace/aggregate/core.py b/src/data/cenace/aggregate/core.py
new file mode 100644
index 0000000..99433f8
--- /dev/null
+++ b/src/data/cenace/aggregate/core.py
@@ -0,0 +1,48 @@
+from __future__ import annotations
+
+import pandas as pd
+
+from src.data.cenace.config import PROCESSED_CSV, PROCESSED_EVENTS_HOURLY_DIR
+
+INPUT_CSV = PROCESSED_CSV
+OUTPUT_ROOT = PROCESSED_EVENTS_HOURLY_DIR
+
+
+def build_hourly_partitions() -> int:
+    df = pd.read_csv(INPUT_CSV)
+
+    df["ds"] = pd.to_datetime(df["ds"], errors="coerce")
+    df["y"] = pd.to_numeric(df["y"], errors="coerce")
+
+    df = df.dropna(subset=["unique_id", "ds", "y"]).copy()
+    df = df.sort_values(["unique_id", "ds"]).drop_duplicates(["unique_id", "ds"])
+
+    df["year"] = df["ds"].dt.year
+    df["month"] = df["ds"].dt.month
+    df["day"] = df["ds"].dt.day
+
+    OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)
+
+    n_written = 0
+    for (year, month, day), part in df.groupby(["year", "month", "day"], sort=True):
+        part_dir = (
+            OUTPUT_ROOT / f"year={year:04d}" / f"month={month:02d}" / f"day={day:02d}"
+        )
+        part_dir.mkdir(parents=True, exist_ok=True)
+
+        out_path = part_dir / "series.parquet"
+        part[["unique_id", "ds", "y"]].to_parquet(out_path, index=False)
+
+        print(f"Saved: {out_path}")
+        n_written += 1
+
+    return n_written
+
+
+def main() -> None:
+    n_written = build_hourly_partitions()
+    print(f"\nDone. Wrote {n_written} daily partitions.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/data/cenace/config.py b/src/data/cenace/config.py
new file mode 100644
index 0000000..baf3893
--- /dev/null
+++ b/src/data/cenace/config.py
@@ -0,0 +1,15 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parents[3]
+
+DATA_ROOT = ROOT / "data" / "cenace"
+
+TMP_DIR = DATA_ROOT / "tmp"
+PROCESSED_DIR = DATA_ROOT / "processed"
+PROCESSED_CSV = PROCESSED_DIR / "cenace.csv"
+
+PROCESSED_EVENTS_HOURLY_DIR = DATA_ROOT / "processed-events" / "hourly"
+FORECASTS_HOURLY_DIR = DATA_ROOT / "forecasts" / "hourly"
+EVALUATIONS_HOURLY_DIR = DATA_ROOT / "evaluations" / "hourly"
diff --git a/src/data/cenace/transform/core.py b/src/data/cenace/transform/core.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/data/cenace/utils/cenace_data.py b/src/data/cenace/utils/cenace_data.py
new file mode 100644
index 0000000..5142720
--- /dev/null
+++ b/src/data/cenace/utils/cenace_data.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+
+import duckdb
+import pandas as pd
+
+
+@dataclass
+class CENACEData:
+    base_path: Path
+    freq: str = "hourly"
+    h: int = 24
+    max_window_size: int = 24 * 90
+
+    def __post_init__(self) -> None:
+        self.base_path = Path(self.base_path)
+
+    def _date_to_partition(self, d: pd.Timestamp) -> Path:
+        return (
+            self.base_path
+            / f"year={d.year:04d}"
+            / f"month={d.month:02d}"
+            / f"day={d.day:02d}"
+            / "series.parquet"
+        )
+
+    def _paths_for_range(self, start: pd.Timestamp, end: pd.Timestamp) -> list[str]:
+        days = pd.date_range(start.normalize(), end.normalize(), freq="D")
+        paths = [self._date_to_partition(d) for d in days]
+        existing = [str(p) for p in paths if p.exists()]
+        if not existing:
+            raise FileNotFoundError(
+                f"No parquet files found between {start} and "
+                f"{end} under {self.base_path}"
+            )
+        return existing
+
+    def get_df(
+        self,
+        cutoff: str | pd.Timestamp,
+        max_window_size: int | None = None,
+        sort: bool = True,
+    ) -> pd.DataFrame:
+        cutoff = pd.Timestamp(cutoff)
+        window = max_window_size or self.max_window_size
+        start = cutoff - pd.Timedelta(hours=window - 1)
+
+        paths = self._paths_for_range(start, cutoff)
+
+        query = f"""
+        SELECT unique_id, ds, y
+        FROM read_parquet({paths})
+        WHERE ds >= TIMESTAMP '{start}'
+          AND ds <= TIMESTAMP '{cutoff}'
+        """
+
+        df = duckdb.sql(query).df()
+        df["ds"] = pd.to_datetime(df["ds"])
+
+        if sort:
+            df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)
+
+        return df
+
+    def get_actuals(
+        self, cutoff: str | pd.Timestamp, h: int | None = None
+    ) -> pd.DataFrame:
+        cutoff = pd.Timestamp(cutoff)
+        horizon = h or self.h
+
+        start = cutoff + pd.Timedelta(hours=1)
+        end = cutoff + pd.Timedelta(hours=horizon)
+
+        paths = self._paths_for_range(start, end)
+
+        query = f"""
+        SELECT unique_id, ds, y
+        FROM read_parquet({paths})
+        WHERE ds >= TIMESTAMP '{start}'
+          AND ds <= TIMESTAMP '{end}'
+        """
+
+        df = duckdb.sql(query).df()
+        df["ds"] = pd.to_datetime(df["ds"])
+        df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)
+        return df
diff --git a/src/dataset_registry.py b/src/dataset_registry.py
new file mode 100644
index 0000000..87e674c
--- /dev/null
+++ b/src/dataset_registry.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import Any
+
+from src.cenace_pipeline import run_cenace_pipeline
+
+DatasetRunner = Callable[..., tuple[str, str]]
+
+DATASET_REGISTRY: dict[str, DatasetRunner] = {
+    "cenace": run_cenace_pipeline,
+}
+
+PIPELINE_DATASET_CHOICES = sorted(DATASET_REGISTRY)
+
+
+def run_dataset_pipeline(dataset: str, **kwargs: Any) -> tuple[str, str]:
+    try:
+        runner = DATASET_REGISTRY[dataset]
+    except KeyError as exc:
+        raise ValueError(
+            f"Unsupported dataset: {dataset}. " f"Available: {PIPELINE_DATASET_CHOICES}"
+        ) from exc
+
+    return runner(**kwargs)
diff --git a/src/evaluation/cenace/__init__.py b/src/evaluation/cenace/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/evaluation/cenace/core.py b/src/evaluation/cenace/core.py
new file mode 100644
index 0000000..b6207e9
--- /dev/null
+++ b/src/evaluation/cenace/core.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+import argparse
+from pathlib import Path
+
+import pandas as pd
+
+from src.data.cenace.config import (
+    EVALUATIONS_HOURLY_DIR,
+    FORECASTS_HOURLY_DIR,
+    PROCESSED_EVENTS_HOURLY_DIR,
+)
+from src.data.cenace.utils.cenace_data import CENACEData
+from src.evaluation.cenace.metrics import evaluate_forecasts
+
+
+def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path:
+    return (
+        root
+        / f"year={cutoff.year:04d}"
+        / f"month={cutoff.month:02d}"
+        / f"day={cutoff.day:02d}"
+    )
+
+
+def run_evaluation(
+    cutoff: str | pd.Timestamp,
+    model: str,
+    h: int = 24,
+    max_window_size: int = 48,
+) -> Path:
+    cutoff = pd.Timestamp(cutoff)
+
+    data = CENACEData(
+        base_path=PROCESSED_EVENTS_HOURLY_DIR,
+        freq="hourly",
+        h=h,
+        max_window_size=max_window_size,
+    )
+
+    forecast_path = (
+        FORECASTS_HOURLY_DIR
+        / model
+        / f"year={cutoff.year:04d}"
+        / f"month={cutoff.month:02d}"
+        / f"day={cutoff.day:02d}"
+        / "forecasts.parquet"
+    )
+
+    actuals = data.get_actuals(cutoff, h=h)
+    forecasts = pd.read_parquet(forecast_path)
+
+    merged = forecasts.merge(actuals, on=["unique_id", "ds"], how="inner")
+    if merged.empty:
+        raise ValueError("Merged forecasts/actuals is empty")
+
+    metrics = evaluate_forecasts(merged)
+
+    eval_root = EVALUATIONS_HOURLY_DIR / model
+    out_dir = cutoff_partition(eval_root, cutoff)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    out_path = out_dir / "metrics.parquet"
+
+    metrics.to_parquet(out_path, index=False)
+    return out_path
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cutoff", required=True)
+    parser.add_argument("--model", required=True)
+    parser.add_argument("--h", type=int, default=24)
+    parser.add_argument("--max-window-size", type=int, default=48)
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    out_path = run_evaluation(
+        cutoff=args.cutoff,
+        model=args.model,
+        h=args.h,
+        max_window_size=args.max_window_size,
+    )
+    metrics = pd.read_parquet(out_path)
+    print(f"Saved metrics: {out_path}")
+    print(metrics.head())
+    print(metrics.shape)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/evaluation/cenace/metrics.py b/src/evaluation/cenace/metrics.py
new file mode 100644
index 0000000..435fe70
--- /dev/null
+++ b/src/evaluation/cenace/metrics.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+import pandas as pd
+
+
+def mae(y_true: pd.Series, y_pred: pd.Series) -> float:
+    return (y_true - y_pred).abs().mean()
+
+
+def rmse(y_true: pd.Series, y_pred: pd.Series) -> float:
+    return ((y_true - y_pred) ** 2).mean() ** 0.5
+
+
+def smape(y_true: pd.Series, y_pred: pd.Series) -> float:
+    denom = (y_true.abs() + y_pred.abs()) / 2
+    out = (y_true - y_pred).abs() / denom
+    out = out.where(denom != 0, 0.0)
+    return 100 * out.mean()
+
+
+def evaluate_forecasts(merged: pd.DataFrame) -> pd.DataFrame:
+    per_uid = (
+        merged.groupby("unique_id")
+        .apply(
+            lambda g: pd.Series(
+                {
+                    "mae": mae(g["y"], g["y_hat"]),
+                    "rmse": rmse(g["y"], g["y_hat"]),
+                    "smape": smape(g["y"], g["y_hat"]),
+                }
+            )
+        )
+        .reset_index()
+    )
+
+    overall = pd.DataFrame(
+        [
+            {
+                "unique_id": "__overall__",
+                "mae": mae(merged["y"], merged["y_hat"]),
+                "rmse": rmse(merged["y"], merged["y_hat"]),
+                "smape": smape(merged["y"], merged["y_hat"]),
+            }
+        ]
+    )
+
+    return pd.concat([per_uid, overall], ignore_index=True)
diff --git a/src/forecast/cenace/__init__.py b/src/forecast/cenace/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/forecast/cenace/core.py b/src/forecast/cenace/core.py
new file mode 100644
index 0000000..1bc8b72
--- /dev/null
+++ b/src/forecast/cenace/core.py
@@ -0,0 +1,93 @@
+from __future__ import annotations
+
+import argparse
+from pathlib import Path
+
+import pandas as pd
+
+from src.data.cenace.config import FORECASTS_HOURLY_DIR, PROCESSED_EVENTS_HOURLY_DIR
+from src.data.cenace.utils.cenace_data import CENACEData
+from src.forecast.cenace.models import MODEL_REGISTRY
+
+
+def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path:
+    return (
+        root
+        / f"year={cutoff.year:04d}"
+        / f"month={cutoff.month:02d}"
+        / f"day={cutoff.day:02d}"
+    )
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cutoff", required=True)
+    parser.add_argument("--model", required=True, choices=sorted(MODEL_REGISTRY))
+    parser.add_argument("--h", type=int, default=24)
+    parser.add_argument("--max-window-size", type=int, default=48)
+    return parser.parse_args()
+
+
+def run_forecast(
+    cutoff: str | pd.Timestamp,
+    model: str,
+    h: int = 24,
+    max_window_size: int = 48,
+) -> Path:
+    cutoff = pd.Timestamp(cutoff)
+
+    data = CENACEData(
+        base_path=PROCESSED_EVENTS_HOURLY_DIR,
+        freq="hourly",
+        h=h,
+        max_window_size=max_window_size,
+    )
+
+    train = data.get_df(cutoff, max_window_size=max_window_size)
+
+    if model not in MODEL_REGISTRY:
+        raise ValueError(
+            f"Unknown CENACE model: {model}. " f"Available: {sorted(MODEL_REGISTRY)}"
+        )
+
+    model_fn = MODEL_REGISTRY[model]
+    forecasts = model_fn(train, cutoff=cutoff, h=h)
+
+    forecast_root = FORECASTS_HOURLY_DIR / model
+    out_dir = cutoff_partition(forecast_root, cutoff)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    out_path = out_dir / "forecasts.parquet"
+
+    forecasts.to_parquet(out_path, index=False)
+    return out_path
+
+
+def main() -> None:
+    args = parse_args()
+    cutoff = pd.Timestamp(args.cutoff)
+
+    data = CENACEData(
+        base_path=PROCESSED_EVENTS_HOURLY_DIR,
+        freq="hourly",
+        h=args.h,
+        max_window_size=args.max_window_size,
+    )
+
+    train = data.get_df(cutoff, max_window_size=args.max_window_size)
+    model_fn = MODEL_REGISTRY[args.model]
+    forecasts = model_fn(train, cutoff=cutoff, h=args.h)
+
+    forecast_root = FORECASTS_HOURLY_DIR / args.model
+    out_dir = cutoff_partition(forecast_root, cutoff)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    out_path = out_dir / "forecasts.parquet"
+
+    forecasts.to_parquet(out_path, index=False)
+
+    print(f"Saved forecasts: {out_path}")
+    print(forecasts.head())
+    print(forecasts.shape)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/forecast/cenace/models.py b/src/forecast/cenace/models.py
new file mode 100644
index 0000000..3a11d21
--- /dev/null
+++ b/src/forecast/cenace/models.py
@@ -0,0 +1,122 @@
+from __future__ import annotations
+
+import pandas as pd
+
+from src.forecast.forecast import generate_forecast
+
+
+def naive_last_value(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame:
+    cutoff = pd.Timestamp(cutoff)
+
+    last_values = (
+        train_df.sort_values(["unique_id", "ds"])
+        .groupby("unique_id", as_index=False)
+        .tail(1)[["unique_id", "y"]]
+        .rename(columns={"y": "y_hat"})
+    )
+
+    future_ds = pd.date_range(
+        cutoff + pd.Timedelta(hours=1),
+        periods=h,
+        freq="h",
+    )
+
+    out = []
+    for ds in future_ds:
+        tmp = last_values.copy()
+        tmp["ds"] = ds
+        out.append(tmp)
+
+    fcst = pd.concat(out, ignore_index=True)
+    return (
+        fcst[["unique_id", "ds", "y_hat"]]
+        .sort_values(["unique_id", "ds"])
+        .reset_index(drop=True)
+    )
+
+
+def seasonal_naive_24(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame:
+    cutoff = pd.Timestamp(cutoff)
+
+    expected_start = cutoff - pd.Timedelta(hours=23)
+    last_day = train_df.loc[
+        (train_df["ds"] >= expected_start) & (train_df["ds"] <= cutoff),
+        ["unique_id", "ds", "y"],
+    ].copy()
+
+    counts = last_day.groupby("unique_id")["ds"].count()
+    bad_ids = counts[counts != 24]
+    if not bad_ids.empty:
+        raise ValueError(
+            "seasonal_naive_24 needs exactly 24 hourly observations in the last day "
+            f"for every series. Bad series count: {len(bad_ids)}"
+        )
+
+    last_day["hour_ahead"] = (
+        (last_day["ds"] - expected_start) / pd.Timedelta(hours=1)
+    ).astype(int)
+    profile = last_day[["unique_id", "hour_ahead", "y"]].rename(columns={"y": "y_hat"})
+
+    unique_ids = profile["unique_id"].drop_duplicates().sort_values().tolist()
+    future_ds = pd.date_range(cutoff + pd.Timedelta(hours=1), periods=h, freq="h")
+
+    future_index = pd.DataFrame(
+        [(uid, ds, i % 24) for i, ds in enumerate(future_ds) for uid in unique_ids],
+        columns=["unique_id", "ds", "hour_ahead"],
+    )
+
+    fcst = (
+        future_index.merge(profile, on=["unique_id", "hour_ahead"], how="left")[
+            ["unique_id", "ds", "y_hat"]
+        ]
+        .sort_values(["unique_id", "ds"])
+        .reset_index(drop=True)
+    )
+
+    if fcst["y_hat"].isna().any():
+        raise ValueError(
+            "seasonal_naive_24" " produced missing forecasts after profile merge."
+        )
+
+    return fcst
+
+
+def auto_arima(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame:
+    cutoff = pd.Timestamp(cutoff)
+
+    raw = generate_forecast(
+        model_name="auto_arima",
+        df=train_df,
+        h=h,
+        freq="h",
+    ).copy()
+
+    point_candidates = [
+        col
+        for col in raw.columns
+        if col not in {"unique_id", "ds"} and "-q-" not in col
+    ]
+    if "auto_arima" in point_candidates:
+        point_col = "auto_arima"
+    elif len(point_candidates) == 1:
+        point_col = point_candidates[0]
+    else:
+        raise ValueError(
+            "Could not detect AutoARIMA point forecast column. "
+            f"Candidates found: {point_candidates}"
+        )
+
+    fcst = (
+        raw[["unique_id", "ds", point_col]]
+        .rename(columns={point_col: "y_hat"})
+        .sort_values(["unique_id", "ds"])
+        .reset_index(drop=True)
+    )
+    return fcst
+
+
+MODEL_REGISTRY = {
+    "naive_last_value": naive_last_value,
+    "seasonal_naive_24": seasonal_naive_24,
+    "auto_arima": auto_arima,
+}
diff --git a/src/pipeline.py b/src/pipeline.py
new file mode 100644
index 0000000..28bc14b
--- /dev/null
+++ b/src/pipeline.py
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+import argparse
+
+from src.dataset_registry import PIPELINE_DATASET_CHOICES, run_dataset_pipeline
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--dataset", required=True, choices=PIPELINE_DATASET_CHOICES)
+    parser.add_argument("--model", required=True)
+    parser.add_argument("--cutoff", required=True)
+    parser.add_argument("--h", type=int, default=24)
+    parser.add_argument("--max-window-size", type=int, default=48)
+    parser.add_argument("--skip-aggregate", action="store_true")
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    run_dataset_pipeline(
+        dataset=args.dataset,
+        cutoff=args.cutoff,
+        model=args.model,
+        h=args.h,
+        max_window_size=args.max_window_size,
+        skip_aggregate=args.skip_aggregate,
+    )
+
+
+if __name__ == "__main__":
+    main()