Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/cenace_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

import argparse

import pandas as pd

from src.data.cenace.aggregate.core import build_hourly_partitions
from src.evaluation.cenace.core import run_evaluation
from src.forecast.cenace.core import run_forecast


def run_cenace_pipeline(
    cutoff: str,
    model: str,
    h: int = 24,
    max_window_size: int = 48,
    skip_aggregate: bool = False,
) -> tuple[str, str]:
    """Run the CENACE pipeline end to end: aggregate, forecast, evaluate.

    Args:
        cutoff: Timestamp string marking the end of the training window.
        model: Name of the forecasting model to run.
        h: Forecast horizon in hours.
        max_window_size: Maximum number of hourly observations to train on.
        skip_aggregate: When True, reuse existing hourly partitions instead
            of rebuilding them from the processed CSV.

    Returns:
        Tuple of (forecast parquet path, metrics parquet path) as strings.

    Raises:
        ValueError: If ``cutoff`` cannot be parsed as a timestamp.
    """
    try:
        parsed_cutoff = pd.Timestamp(cutoff)
    except Exception as exc:
        raise ValueError(f"Invalid cutoff timestamp: {cutoff}") from exc

    if not skip_aggregate:
        partition_count = build_hourly_partitions()
        print(f"Aggregated {partition_count} partitions")

    # Forecasting and evaluation take the same keyword arguments.
    shared_kwargs = dict(
        cutoff=parsed_cutoff,
        model=model,
        h=h,
        max_window_size=max_window_size,
    )

    forecast_path = run_forecast(**shared_kwargs)
    print(f"Forecasts saved to: {forecast_path}")

    eval_path = run_evaluation(**shared_kwargs)
    print(f"Metrics saved to: {eval_path}")

    return str(forecast_path), str(eval_path)
Comment on lines +12 to +44
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR introduces a new dataset pipeline (aggregation → forecasting → evaluation) but adds no tests. The repo already has pytest coverage for analogous GH Archive components (data loaders/partition logic/aggregation), so adding at least unit tests for CENACEData.get_df/get_actuals (partition selection + DuckDB query), baseline models output shape/columns, and evaluate_forecasts would help prevent regressions.

Copilot uses AI. Check for mistakes.


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments for the CENACE pipeline entry point."""
    parser = argparse.ArgumentParser(
        description="Run the CENACE aggregate -> forecast -> evaluate pipeline."
    )
    parser.add_argument(
        "--cutoff",
        required=True,
        help="Training cutoff timestamp, e.g. 2024-01-01T23:00.",
    )
    parser.add_argument(
        "--model",
        required=True,
        help="Name of the forecasting model to run.",
    )
    parser.add_argument(
        "--h",
        type=int,
        default=24,
        help="Forecast horizon in hours (default: 24).",
    )
    parser.add_argument(
        "--max-window-size",
        type=int,
        default=48,
        help="Maximum training window size in hours (default: 48).",
    )
    parser.add_argument(
        "--skip-aggregate",
        action="store_true",
        help="Skip rebuilding the hourly partitions before forecasting.",
    )
    return parser.parse_args()


def main() -> None:
    """CLI entry point: parse arguments and run the full pipeline."""
    cli = parse_args()
    run_cenace_pipeline(
        cutoff=cli.cutoff,
        model=cli.model,
        h=cli.h,
        max_window_size=cli.max_window_size,
        skip_aggregate=cli.skip_aggregate,
    )


if __name__ == "__main__":
    main()
Empty file added src/data/cenace/__init__.py
Empty file.
48 changes: 48 additions & 0 deletions src/data/cenace/aggregate/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

import pandas as pd

from src.data.cenace.config import PROCESSED_CSV, PROCESSED_EVENTS_HOURLY_DIR

INPUT_CSV = PROCESSED_CSV
OUTPUT_ROOT = PROCESSED_EVENTS_HOURLY_DIR


def build_hourly_partitions() -> int:
    """Split the processed CENACE CSV into daily Hive-style parquet partitions.

    Reads INPUT_CSV, cleans it (timestamps and values coerced, NaNs dropped,
    duplicate (unique_id, ds) pairs removed), and writes one
    ``year=YYYY/month=MM/day=DD/series.parquet`` file per calendar day under
    OUTPUT_ROOT.

    Returns:
        Number of daily partition files written.
    """
    frame = pd.read_csv(INPUT_CSV)

    frame["ds"] = pd.to_datetime(frame["ds"], errors="coerce")
    frame["y"] = pd.to_numeric(frame["y"], errors="coerce")

    # Keep only complete rows and the first occurrence of each (series, hour).
    frame = (
        frame.dropna(subset=["unique_id", "ds", "y"])
        .sort_values(["unique_id", "ds"])
        .drop_duplicates(["unique_id", "ds"])
        .copy()
    )

    frame["year"] = frame["ds"].dt.year
    frame["month"] = frame["ds"].dt.month
    frame["day"] = frame["ds"].dt.day

    OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

    written = 0
    for (year, month, day), day_df in frame.groupby(
        ["year", "month", "day"], sort=True
    ):
        target_dir = (
            OUTPUT_ROOT / f"year={year:04d}" / f"month={month:02d}" / f"day={day:02d}"
        )
        target_dir.mkdir(parents=True, exist_ok=True)

        target_path = target_dir / "series.parquet"
        day_df[["unique_id", "ds", "y"]].to_parquet(target_path, index=False)

        print(f"Saved: {target_path}")
        written += 1

    return written


def main() -> None:
    """CLI entry point for the daily-partition aggregation step."""
    count = build_hourly_partitions()
    print(f"\nDone. Wrote {count} daily partitions.")


if __name__ == "__main__":
    main()
15 changes: 15 additions & 0 deletions src/data/cenace/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from pathlib import Path

# Repository root: this file lives at src/data/cenace/config.py, so three
# directory levels above this file's parent is the project root.
ROOT = Path(__file__).resolve().parents[3]

# All CENACE artifacts live under <repo>/data/cenace.
DATA_ROOT = ROOT / "data" / "cenace"

TMP_DIR = DATA_ROOT / "tmp"  # temporary working directory
PROCESSED_DIR = DATA_ROOT / "processed"  # cleaned long-format data
PROCESSED_CSV = PROCESSED_DIR / "cenace.csv"  # input to the aggregation step

# Hive-style (year=/month=/day=) partition roots used by the pipeline stages.
PROCESSED_EVENTS_HOURLY_DIR = DATA_ROOT / "processed-events" / "hourly"
FORECASTS_HOURLY_DIR = DATA_ROOT / "forecasts" / "hourly"
EVALUATIONS_HOURLY_DIR = DATA_ROOT / "evaluations" / "hourly"
Empty file.
88 changes: 88 additions & 0 deletions src/data/cenace/utils/cenace_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

import duckdb
import pandas as pd


@dataclass
class CENACEData:
base_path: Path
freq: str = "hourly"
h: int = 24
max_window_size: int = 24 * 90

def __post_init__(self) -> None:
self.base_path = Path(self.base_path)

def _date_to_partition(self, d: pd.Timestamp) -> Path:
return (
self.base_path
/ f"year={d.year:04d}"
/ f"month={d.month:02d}"
/ f"day={d.day:02d}"
/ "series.parquet"
)

def _paths_for_range(self, start: pd.Timestamp, end: pd.Timestamp) -> list[str]:
days = pd.date_range(start.normalize(), end.normalize(), freq="D")
paths = [self._date_to_partition(d) for d in days]
existing = [str(p) for p in paths if p.exists()]
if not existing:
raise FileNotFoundError(
f"No parquet files found between {start} and \
{end} under {self.base_path}"
Comment on lines +35 to +36
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FileNotFoundError message uses a line-continuation backslash inside the f-string, which will embed a newline and indentation spaces into the exception text. Format this as a single-line f-string (or use textwrap.dedent) so the error message is stable and readable.

Suggested change
f"No parquet files found between {start} and \
{end} under {self.base_path}"
f"No parquet files found between {start} and {end} under {self.base_path}"

Copilot uses AI. Check for mistakes.
)
return existing

def get_df(
self,
cutoff: str | pd.Timestamp,
max_window_size: int | None = None,
sort: bool = True,
) -> pd.DataFrame:
cutoff = pd.Timestamp(cutoff)
window = max_window_size or self.max_window_size
start = cutoff - pd.Timedelta(hours=window - 1)

paths = self._paths_for_range(start, cutoff)

query = f"""
SELECT unique_id, ds, y
FROM read_parquet({paths})
WHERE ds >= TIMESTAMP '{start}'
AND ds <= TIMESTAMP '{cutoff}'
"""

df = duckdb.sql(query).df()
df["ds"] = pd.to_datetime(df["ds"])
Comment on lines +50 to +60
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DuckDB parquet reads are built via duckdb.sql(query) with read_parquet({paths}), where {paths} is a Python list repr. This is brittle (path escaping/backslashes on Windows, quoting, and no explicit INSTALL/LOAD parquet like other modules) and can break unexpectedly. Build a connection (duckdb.connect(':memory:')), INSTALL/LOAD parquet, and pass an explicit SQL list of Path(...).as_posix() strings (as done in GH Archive code) to make reads robust.

Copilot uses AI. Check for mistakes.

if sort:
df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)

return df

def get_actuals(
self, cutoff: str | pd.Timestamp, h: int | None = None
) -> pd.DataFrame:
cutoff = pd.Timestamp(cutoff)
horizon = h or self.h

start = cutoff + pd.Timedelta(hours=1)
end = cutoff + pd.Timedelta(hours=horizon)

paths = self._paths_for_range(start, end)

query = f"""
SELECT unique_id, ds, y
FROM read_parquet({paths})
WHERE ds >= TIMESTAMP '{start}'
AND ds <= TIMESTAMP '{end}'
"""

df = duckdb.sql(query).df()
df["ds"] = pd.to_datetime(df["ds"])
df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)
return df
25 changes: 25 additions & 0 deletions src/dataset_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

from collections.abc import Callable
from typing import Any

from src.cenace_pipeline import run_cenace_pipeline

# Signature shared by all dataset pipeline runners: each returns the
# (forecast_path, eval_path) pair as strings.
DatasetRunner = Callable[..., tuple[str, str]]

# Maps a dataset name to the callable that runs its full pipeline.
DATASET_REGISTRY: dict[str, DatasetRunner] = {
    "cenace": run_cenace_pipeline,
}

# Sorted dataset names, suitable for CLI choices and error messages.
PIPELINE_DATASET_CHOICES = sorted(DATASET_REGISTRY)


def run_dataset_pipeline(dataset: str, **kwargs: Any) -> tuple[str, str]:
    """Look up *dataset* in the registry and invoke its runner with **kwargs.

    Returns:
        The (forecast_path, eval_path) tuple produced by the runner.

    Raises:
        ValueError: If *dataset* has no registered runner.
    """
    try:
        runner = DATASET_REGISTRY[dataset]
    except KeyError as exc:
        message = (
            f"Unsupported dataset: {dataset}. "
            f"Available: {PIPELINE_DATASET_CHOICES}"
        )
        raise ValueError(message) from exc

    return runner(**kwargs)
Empty file.
92 changes: 92 additions & 0 deletions src/evaluation/cenace/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from __future__ import annotations

import argparse
from pathlib import Path

import pandas as pd

from src.data.cenace.config import (
EVALUATIONS_HOURLY_DIR,
FORECASTS_HOURLY_DIR,
PROCESSED_EVENTS_HOURLY_DIR,
)
from src.data.cenace.utils.cenace_data import CENACEData
from src.evaluation.cenace.metrics import evaluate_forecasts


def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path:
    """Return the Hive-style year=/month=/day= directory for *cutoff* under *root*."""
    segments = (
        f"year={cutoff.year:04d}",
        f"month={cutoff.month:02d}",
        f"day={cutoff.day:02d}",
    )
    return root.joinpath(*segments)


def run_evaluation(
    cutoff: str | pd.Timestamp,
    model: str,
    h: int = 24,
    max_window_size: int = 48,
) -> Path:
    """Evaluate saved forecasts for *model* at *cutoff* and write metrics.

    Loads the actuals for the ``h`` hours after *cutoff*, inner-joins them
    with the previously saved forecasts, computes per-series and overall
    metrics, and writes them to a partitioned ``metrics.parquet`` file.

    Args:
        cutoff: Cutoff timestamp separating training data from actuals.
        model: Model name; selects the forecast/metrics subdirectories.
        h: Evaluation horizon in hours.
        max_window_size: Training window size, forwarded to CENACEData.

    Returns:
        Path of the written metrics parquet file.

    Raises:
        ValueError: If forecasts and actuals share no (unique_id, ds) rows.
        FileNotFoundError: If no actuals partitions exist for the window.
    """
    cutoff = pd.Timestamp(cutoff)

    data = CENACEData(
        base_path=PROCESSED_EVENTS_HOURLY_DIR,
        freq="hourly",
        h=h,
        max_window_size=max_window_size,
    )

    # Reuse cutoff_partition (instead of rebuilding the year=/month=/day=
    # path inline) so forecast and evaluation layouts cannot drift apart.
    forecast_path = (
        cutoff_partition(FORECASTS_HOURLY_DIR / model, cutoff) / "forecasts.parquet"
    )

    actuals = data.get_actuals(cutoff, h=h)
    forecasts = pd.read_parquet(forecast_path)

    merged = forecasts.merge(actuals, on=["unique_id", "ds"], how="inner")
    if merged.empty:
        raise ValueError("Merged forecasts/actuals is empty")

    metrics = evaluate_forecasts(merged)

    out_dir = cutoff_partition(EVALUATIONS_HOURLY_DIR / model, cutoff)
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / "metrics.parquet"

    metrics.to_parquet(out_path, index=False)
    return out_path


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments for the evaluation entry point."""
    parser = argparse.ArgumentParser(
        description="Evaluate saved CENACE forecasts against actuals."
    )
    parser.add_argument(
        "--cutoff",
        required=True,
        help="Cutoff timestamp; actuals start one hour after it.",
    )
    parser.add_argument(
        "--model",
        required=True,
        help="Model name whose saved forecasts should be evaluated.",
    )
    parser.add_argument(
        "--h",
        type=int,
        default=24,
        help="Evaluation horizon in hours (default: 24).",
    )
    parser.add_argument(
        "--max-window-size",
        type=int,
        default=48,
        help="Training window size in hours (default: 48).",
    )
    return parser.parse_args()


def main() -> None:
    """CLI entry point: run the evaluation and print a short summary."""
    args = parse_args()
    metrics_path = run_evaluation(
        cutoff=args.cutoff,
        model=args.model,
        h=args.h,
        max_window_size=args.max_window_size,
    )
    metrics = pd.read_parquet(metrics_path)
    print(f"Saved metrics: {metrics_path}")
    print(metrics.head())
    print(metrics.shape)


if __name__ == "__main__":
    main()
47 changes: 47 additions & 0 deletions src/evaluation/cenace/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import annotations

import pandas as pd


def mae(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Mean absolute error."""
    return (y_true - y_pred).abs().mean()


def rmse(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Root mean squared error."""
    return ((y_true - y_pred) ** 2).mean() ** 0.5


def smape(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Symmetric mean absolute percentage error, in percent.

    Points where both actual and prediction are zero contribute 0 rather
    than dividing by zero.
    """
    denom = (y_true.abs() + y_pred.abs()) / 2
    out = (y_true - y_pred).abs() / denom
    out = out.where(denom != 0, 0.0)
    return 100 * out.mean()


def evaluate_forecasts(merged: pd.DataFrame) -> pd.DataFrame:
    """Compute per-series and overall metrics for merged forecasts/actuals.

    Args:
        merged: DataFrame with columns ``unique_id``, ``y`` (actuals) and
            ``y_hat`` (forecasts).

    Returns:
        One row per unique_id plus a final ``__overall__`` row, with columns
        ``unique_id``, ``mae``, ``rmse``, ``smape``.
    """
    # Build rows explicitly rather than via groupby().apply(): apply() can
    # place the group key in the index (pandas-version dependent), and the
    # previous reset_index(drop=True) then silently dropped unique_id from
    # the output. sort=True matches the default grouping order.
    rows = [
        {
            "unique_id": uid,
            "mae": mae(g["y"], g["y_hat"]),
            "rmse": rmse(g["y"], g["y_hat"]),
            "smape": smape(g["y"], g["y_hat"]),
        }
        for uid, g in merged.groupby("unique_id", sort=True)
    ]
    per_uid = pd.DataFrame(rows)

    overall = pd.DataFrame(
        [
            {
                "unique_id": "__overall__",
                "mae": mae(merged["y"], merged["y_hat"]),
                "rmse": rmse(merged["y"], merged["y_hat"]),
                "smape": smape(merged["y"], merged["y_hat"]),
            }
        ]
    )

    return pd.concat([per_uid, overall], ignore_index=True)
Empty file added src/forecast/cenace/__init__.py
Empty file.
Loading
Loading