-
Notifications
You must be signed in to change notification settings - Fork 1
Add initial CENACE pipeline integration #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import argparse | ||
|
|
||
| import pandas as pd | ||
|
|
||
| from src.data.cenace.aggregate.core import build_hourly_partitions | ||
| from src.evaluation.cenace.core import run_evaluation | ||
| from src.forecast.cenace.core import run_forecast | ||
|
|
||
|
|
||
| def run_cenace_pipeline( | ||
| cutoff: str, | ||
| model: str, | ||
| h: int = 24, | ||
| max_window_size: int = 48, | ||
| skip_aggregate: bool = False, | ||
| ) -> tuple[str, str]: | ||
| try: | ||
| cutoff_ts = pd.Timestamp(cutoff) | ||
| except Exception as exc: | ||
| raise ValueError(f"Invalid cutoff timestamp: {cutoff}") from exc | ||
|
|
||
| if not skip_aggregate: | ||
| n_written = build_hourly_partitions() | ||
| print(f"Aggregated {n_written} partitions") | ||
|
|
||
| forecast_path = run_forecast( | ||
| cutoff=cutoff_ts, | ||
| model=model, | ||
| h=h, | ||
| max_window_size=max_window_size, | ||
| ) | ||
| print(f"Forecasts saved to: {forecast_path}") | ||
|
|
||
| eval_path = run_evaluation( | ||
| cutoff=cutoff_ts, | ||
| model=model, | ||
| h=h, | ||
| max_window_size=max_window_size, | ||
| ) | ||
| print(f"Metrics saved to: {eval_path}") | ||
|
|
||
| return str(forecast_path), str(eval_path) | ||
|
|
||
|
|
||
| def parse_args() -> argparse.Namespace: | ||
| parser = argparse.ArgumentParser() | ||
| parser.add_argument("--cutoff", required=True) | ||
| parser.add_argument("--model", required=True) | ||
| parser.add_argument("--h", type=int, default=24) | ||
| parser.add_argument("--max-window-size", type=int, default=48) | ||
| parser.add_argument("--skip-aggregate", action="store_true") | ||
| return parser.parse_args() | ||
|
|
||
|
|
||
| def main() -> None: | ||
| args = parse_args() | ||
| run_cenace_pipeline( | ||
| cutoff=args.cutoff, | ||
| model=args.model, | ||
| h=args.h, | ||
| max_window_size=args.max_window_size, | ||
| skip_aggregate=args.skip_aggregate, | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import pandas as pd | ||
|
|
||
| from src.data.cenace.config import PROCESSED_CSV, PROCESSED_EVENTS_HOURLY_DIR | ||
|
|
||
| INPUT_CSV = PROCESSED_CSV | ||
| OUTPUT_ROOT = PROCESSED_EVENTS_HOURLY_DIR | ||
|
|
||
|
|
||
| def build_hourly_partitions() -> int: | ||
| df = pd.read_csv(INPUT_CSV) | ||
|
|
||
| df["ds"] = pd.to_datetime(df["ds"], errors="coerce") | ||
| df["y"] = pd.to_numeric(df["y"], errors="coerce") | ||
|
|
||
| df = df.dropna(subset=["unique_id", "ds", "y"]).copy() | ||
| df = df.sort_values(["unique_id", "ds"]).drop_duplicates(["unique_id", "ds"]) | ||
|
|
||
| df["year"] = df["ds"].dt.year | ||
| df["month"] = df["ds"].dt.month | ||
| df["day"] = df["ds"].dt.day | ||
|
|
||
| OUTPUT_ROOT.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| n_written = 0 | ||
| for (year, month, day), part in df.groupby(["year", "month", "day"], sort=True): | ||
| part_dir = ( | ||
| OUTPUT_ROOT / f"year={year:04d}" / f"month={month:02d}" / f"day={day:02d}" | ||
| ) | ||
| part_dir.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| out_path = part_dir / "series.parquet" | ||
| part[["unique_id", "ds", "y"]].to_parquet(out_path, index=False) | ||
|
|
||
| print(f"Saved: {out_path}") | ||
| n_written += 1 | ||
|
|
||
| return n_written | ||
|
|
||
|
|
||
| def main() -> None: | ||
| n_written = build_hourly_partitions() | ||
| print(f"\nDone. Wrote {n_written} daily partitions.") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from pathlib import Path | ||
|
|
||
| ROOT = Path(__file__).resolve().parents[3] | ||
|
|
||
| DATA_ROOT = ROOT / "data" / "cenace" | ||
|
|
||
| TMP_DIR = DATA_ROOT / "tmp" | ||
| PROCESSED_DIR = DATA_ROOT / "processed" | ||
| PROCESSED_CSV = PROCESSED_DIR / "cenace.csv" | ||
|
|
||
| PROCESSED_EVENTS_HOURLY_DIR = DATA_ROOT / "processed-events" / "hourly" | ||
| FORECASTS_HOURLY_DIR = DATA_ROOT / "forecasts" / "hourly" | ||
| EVALUATIONS_HOURLY_DIR = DATA_ROOT / "evaluations" / "hourly" |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,88 @@ | ||||||||
| from __future__ import annotations | ||||||||
|
|
||||||||
| from dataclasses import dataclass | ||||||||
| from pathlib import Path | ||||||||
|
|
||||||||
| import duckdb | ||||||||
| import pandas as pd | ||||||||
|
|
||||||||
|
|
||||||||
| @dataclass | ||||||||
| class CENACEData: | ||||||||
| base_path: Path | ||||||||
| freq: str = "hourly" | ||||||||
| h: int = 24 | ||||||||
| max_window_size: int = 24 * 90 | ||||||||
|
|
||||||||
| def __post_init__(self) -> None: | ||||||||
| self.base_path = Path(self.base_path) | ||||||||
|
|
||||||||
| def _date_to_partition(self, d: pd.Timestamp) -> Path: | ||||||||
| return ( | ||||||||
| self.base_path | ||||||||
| / f"year={d.year:04d}" | ||||||||
| / f"month={d.month:02d}" | ||||||||
| / f"day={d.day:02d}" | ||||||||
| / "series.parquet" | ||||||||
| ) | ||||||||
|
|
||||||||
| def _paths_for_range(self, start: pd.Timestamp, end: pd.Timestamp) -> list[str]: | ||||||||
| days = pd.date_range(start.normalize(), end.normalize(), freq="D") | ||||||||
| paths = [self._date_to_partition(d) for d in days] | ||||||||
| existing = [str(p) for p in paths if p.exists()] | ||||||||
| if not existing: | ||||||||
| raise FileNotFoundError( | ||||||||
| f"No parquet files found between {start} and \ | ||||||||
| {end} under {self.base_path}" | ||||||||
|
Comment on lines +35 to +36
|
||||||||
| f"No parquet files found between {start} and \ | |
| {end} under {self.base_path}" | |
| f"No parquet files found between {start} and {end} under {self.base_path}" |
Copilot
AI
Apr 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DuckDB parquet reads are built via duckdb.sql(query) with read_parquet({paths}), where {paths} is a Python list repr. This is brittle (path escaping/backslashes on Windows, quoting, and no explicit INSTALL/LOAD parquet like other modules) and can break unexpectedly. Build a connection (duckdb.connect(':memory:')), INSTALL/LOAD parquet, and pass an explicit SQL list of Path(...).as_posix() strings (as done in GH Archive code) to make reads robust.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Callable | ||
| from typing import Any | ||
|
|
||
| from src.cenace_pipeline import run_cenace_pipeline | ||
|
|
||
| DatasetRunner = Callable[..., tuple[str, str]] | ||
|
|
||
| DATASET_REGISTRY: dict[str, DatasetRunner] = { | ||
| "cenace": run_cenace_pipeline, | ||
| } | ||
|
|
||
| PIPELINE_DATASET_CHOICES = sorted(DATASET_REGISTRY) | ||
|
|
||
|
|
||
| def run_dataset_pipeline(dataset: str, **kwargs: Any) -> tuple[str, str]: | ||
| try: | ||
| runner = DATASET_REGISTRY[dataset] | ||
| except KeyError as exc: | ||
| raise ValueError( | ||
| f"Unsupported dataset: {dataset}. " f"Available: {PIPELINE_DATASET_CHOICES}" | ||
| ) from exc | ||
|
|
||
| return runner(**kwargs) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import argparse | ||
| from pathlib import Path | ||
|
|
||
| import pandas as pd | ||
|
|
||
| from src.data.cenace.config import ( | ||
| EVALUATIONS_HOURLY_DIR, | ||
| FORECASTS_HOURLY_DIR, | ||
| PROCESSED_EVENTS_HOURLY_DIR, | ||
| ) | ||
| from src.data.cenace.utils.cenace_data import CENACEData | ||
| from src.evaluation.cenace.metrics import evaluate_forecasts | ||
|
|
||
|
|
||
| def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path: | ||
| return ( | ||
| root | ||
| / f"year={cutoff.year:04d}" | ||
| / f"month={cutoff.month:02d}" | ||
| / f"day={cutoff.day:02d}" | ||
| ) | ||
|
|
||
|
|
||
| def run_evaluation( | ||
| cutoff: str | pd.Timestamp, | ||
| model: str, | ||
| h: int = 24, | ||
| max_window_size: int = 48, | ||
| ) -> Path: | ||
| cutoff = pd.Timestamp(cutoff) | ||
|
|
||
| data = CENACEData( | ||
| base_path=PROCESSED_EVENTS_HOURLY_DIR, | ||
| freq="hourly", | ||
| h=h, | ||
| max_window_size=max_window_size, | ||
| ) | ||
|
|
||
| forecast_path = ( | ||
| FORECASTS_HOURLY_DIR | ||
| / model | ||
| / f"year={cutoff.year:04d}" | ||
| / f"month={cutoff.month:02d}" | ||
| / f"day={cutoff.day:02d}" | ||
| / "forecasts.parquet" | ||
| ) | ||
|
|
||
| actuals = data.get_actuals(cutoff, h=h) | ||
| forecasts = pd.read_parquet(forecast_path) | ||
|
|
||
| merged = forecasts.merge(actuals, on=["unique_id", "ds"], how="inner") | ||
| if merged.empty: | ||
| raise ValueError("Merged forecasts/actuals is empty") | ||
|
|
||
| metrics = evaluate_forecasts(merged) | ||
|
|
||
| eval_root = EVALUATIONS_HOURLY_DIR / model | ||
| out_dir = cutoff_partition(eval_root, cutoff) | ||
| out_dir.mkdir(parents=True, exist_ok=True) | ||
| out_path = out_dir / "metrics.parquet" | ||
|
|
||
| metrics.to_parquet(out_path, index=False) | ||
| return out_path | ||
|
|
||
|
|
||
| def parse_args() -> argparse.Namespace: | ||
| parser = argparse.ArgumentParser() | ||
| parser.add_argument("--cutoff", required=True) | ||
| parser.add_argument("--model", required=True) | ||
| parser.add_argument("--h", type=int, default=24) | ||
| parser.add_argument("--max-window-size", type=int, default=48) | ||
| return parser.parse_args() | ||
|
|
||
|
|
||
| def main() -> None: | ||
| args = parse_args() | ||
| out_path = run_evaluation( | ||
| cutoff=args.cutoff, | ||
| model=args.model, | ||
| h=args.h, | ||
| max_window_size=args.max_window_size, | ||
| ) | ||
| metrics = pd.read_parquet(out_path) | ||
| print(f"Saved metrics: {out_path}") | ||
| print(metrics.head()) | ||
| print(metrics.shape) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import pandas as pd | ||
|
|
||
|
|
||
| def mae(y_true: pd.Series, y_pred: pd.Series) -> float: | ||
| return (y_true - y_pred).abs().mean() | ||
|
|
||
|
|
||
| def rmse(y_true: pd.Series, y_pred: pd.Series) -> float: | ||
| return ((y_true - y_pred) ** 2).mean() ** 0.5 | ||
|
|
||
|
|
||
| def smape(y_true: pd.Series, y_pred: pd.Series) -> float: | ||
| denom = (y_true.abs() + y_pred.abs()) / 2 | ||
| out = (y_true - y_pred).abs() / denom | ||
| out = out.where(denom != 0, 0.0) | ||
| return 100 * out.mean() | ||
|
|
||
|
|
||
| def evaluate_forecasts(merged: pd.DataFrame) -> pd.DataFrame: | ||
| per_uid = ( | ||
| merged.groupby("unique_id", as_index=False) | ||
| .apply( | ||
| lambda g: pd.Series( | ||
| { | ||
| "mae": mae(g["y"], g["y_hat"]), | ||
| "rmse": rmse(g["y"], g["y_hat"]), | ||
| "smape": smape(g["y"], g["y_hat"]), | ||
| } | ||
| ) | ||
| ) | ||
| .reset_index(drop=True) | ||
| ) | ||
|
Comment on lines +21 to +34
|
||
|
|
||
| overall = pd.DataFrame( | ||
| [ | ||
| { | ||
| "unique_id": "__overall__", | ||
| "mae": mae(merged["y"], merged["y_hat"]), | ||
| "rmse": rmse(merged["y"], merged["y_hat"]), | ||
| "smape": smape(merged["y"], merged["y_hat"]), | ||
| } | ||
| ] | ||
| ) | ||
|
|
||
| return pd.concat([per_uid, overall], ignore_index=True) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR introduces a new dataset pipeline (aggregation → forecasting → evaluation) but adds no tests. The repo already has pytest coverage for analogous GH Archive components (data loaders/partition logic/aggregation), so adding at least unit tests for
CENACEData.get_df / get_actuals (partition selection + DuckDB query), baseline model output shape/columns, and evaluate_forecasts would help prevent regressions.