diff --git a/orion/pipelines/pretrained/chronos2/chronos2.json b/orion/pipelines/pretrained/chronos2/chronos2.json new file mode 100644 index 00000000..e6ad52d0 --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2.json @@ -0,0 +1,36 @@ +{ + "primitives": [ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate", + "sklearn.impute.SimpleImputer", + "mlstars.custom.timeseries_preprocessing.rolling_window_sequences", + "orion.primitives.chronos2.Chronos2", + "orion.primitives.timeseries_errors.regression_errors", + "orion.primitives.timeseries_anomalies.find_anomalies" + ], + "init_params": { + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 21600, + "method": "mean" + }, + "mlstars.custom.timeseries_preprocessing.rolling_window_sequences#1": { + "target_column": 0, + "window_size": 250 + }, + "orion.primitives.timeseries_anomalies.find_anomalies#1": { + "window_size_portion": 0.33, + "window_step_size_portion": 0.1, + "fixed_threshold": true + } + }, + "input_names": { + "orion.primitives.timeseries_anomalies.find_anomalies#1": { + "index": "target_index" + } + }, + "output_names": { + "orion.primitives.timeseries_anomalies.find_anomalies#1": { + "y": "anomalies" + } + } +} \ No newline at end of file diff --git a/orion/pipelines/pretrained/chronos2/chronos2_artificialwithanomaly.json b/orion/pipelines/pretrained/chronos2/chronos2_artificialwithanomaly.json new file mode 100644 index 00000000..eebcc81d --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_artificialwithanomaly.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 600 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_msl.json b/orion/pipelines/pretrained/chronos2/chronos2_msl.json new file mode 100644 index 00000000..e4fe0c11 --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_msl.json @@ 
-0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 21600 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_realadexchange.json b/orion/pipelines/pretrained/chronos2/chronos2_realadexchange.json new file mode 100644 index 00000000..6b8aac0a --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_realadexchange.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 3600 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_realawscloudwatch.json b/orion/pipelines/pretrained/chronos2/chronos2_realawscloudwatch.json new file mode 100644 index 00000000..eebcc81d --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_realawscloudwatch.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 600 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_realtraffic.json b/orion/pipelines/pretrained/chronos2/chronos2_realtraffic.json new file mode 100644 index 00000000..eebcc81d --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_realtraffic.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 600 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_realtweets.json b/orion/pipelines/pretrained/chronos2/chronos2_realtweets.json new file mode 100644 index 00000000..eebcc81d --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_realtweets.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 600 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_smap.json b/orion/pipelines/pretrained/chronos2/chronos2_smap.json new file mode 100644 index 00000000..e4fe0c11 
--- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_smap.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 21600 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_ucr.json b/orion/pipelines/pretrained/chronos2/chronos2_ucr.json new file mode 100644 index 00000000..f3ca6b04 --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_ucr.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 300 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_yahooa1.json b/orion/pipelines/pretrained/chronos2/chronos2_yahooa1.json new file mode 100644 index 00000000..0fdb0776 --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_yahooa1.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 1 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_yahooa2.json b/orion/pipelines/pretrained/chronos2/chronos2_yahooa2.json new file mode 100644 index 00000000..0fdb0776 --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_yahooa2.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 1 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_yahooa3.json b/orion/pipelines/pretrained/chronos2/chronos2_yahooa3.json new file mode 100644 index 00000000..0fdb0776 --- /dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_yahooa3.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 1 + } +} diff --git a/orion/pipelines/pretrained/chronos2/chronos2_yahooa4.json b/orion/pipelines/pretrained/chronos2/chronos2_yahooa4.json new file mode 100644 index 00000000..0fdb0776 --- 
/dev/null +++ b/orion/pipelines/pretrained/chronos2/chronos2_yahooa4.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 1 + } +} diff --git a/orion/primitives/chronos2.py b/orion/primitives/chronos2.py new file mode 100644 index 00000000..de9c4aab --- /dev/null +++ b/orion/primitives/chronos2.py @@ -0,0 +1,102 @@ +""" +This primitive is an implementation of Amazon's Chronos2 model for timeseries forecasting. + +The model implementation can be found at +https://huggingface.co/amazon/chronos-2 + +Note: This primitive assumes that Chronos2 doesn't care about specific timestamps +of the data. We fill in the timestamps with a linear sequence of timestamps in order +for the model to work. +""" + +import numpy as np +import pandas as pd +import torch + +from chronos import Chronos2Pipeline + + +class Chronos2: + """Chronos2 model for timeseries forecasting. + + Args: + pred_len (int): + Prediction horizon length. Default to 1. + repo_id (str): + Directory of the model checkpoint. Default to "amazon/chronos-2". + batch_size (int): + Size of one batch. Default to 32. + target (int): + Index of target column in multivariate case. Default to 0. + start_time (datetime): + Start time of the timeseries. Default to Jan 1, 2000 00:00:00. + time_interval (int): + Time interval between two samples in seconds. Default to 600. 
+ """ + + def __init__(self, + pred_len=1, + repo_id="amazon/chronos-2", + batch_size=32, + target=0, + start_time=pd.to_datetime("2000-01-01 00:00:00"), + time_interval=600): + + self.pred_len = pred_len + self.batch_size = batch_size + self.target = f"{target}" + self.start_time = start_time + self.time_interval = pd.Timedelta(seconds=time_interval) + + device = "cuda" if torch.cuda.is_available() else "cpu" + self.model = Chronos2Pipeline.from_pretrained(repo_id, device_map=device) + + def predict(self, X, force=False): + """Forecasting timeseries + + Args: + X (ndarray): + input timeseries with shape (n_windows, window_size, n_features). + Return: + ndarray: + forecasted timeseries. + """ + n_windows, window_size, n_features = X.shape + + outs = [] + + for i in range(0, n_windows, self.batch_size): + x_batch = self.convert_to_df(X[i:i + self.batch_size], start_batch_at=i) + y_batch = self.model.predict_df( + df=x_batch, + prediction_length=self.pred_len, + quantile_levels=[0.5], + id_column="item_id", + timestamp_column="timestamp", + target=self.target, + ) + + y_batch = y_batch.sort_values(["item_id", "timestamp"]) + preds = np.stack( + y_batch.groupby("item_id", sort=False)["predictions"] + .apply(lambda s: s.to_numpy()) + .to_list() + ) + outs.append(preds) + + return np.concatenate(outs, axis=0) + + def convert_to_df(self, x_batch, start_batch_at=0): + n_windows_in_batch, window_size, n_features = x_batch.shape + + rows = [] + for window in range(n_windows_in_batch): + for data_entry in range(window_size): + rows.append({ + "timestamp": self.start_time + self.time_interval * data_entry, + "item_id": f"window_{start_batch_at + window}", + **{f"{i}": x_batch[window, data_entry, i] for i in range(n_features)} + }) + + rows = pd.DataFrame(rows) + return rows diff --git a/orion/primitives/jsons/orion.primitives.chronos2.Chronos2.json b/orion/primitives/jsons/orion.primitives.chronos2.Chronos2.json new file mode 100644 index 00000000..1fdc5a29 --- /dev/null +++ 
b/orion/primitives/jsons/orion.primitives.chronos2.Chronos2.json @@ -0,0 +1,58 @@ +{ + "name": "orion.primitives.chronos2.Chronos2", + "contributors": [ + "Allen Baranov " + ], + "documentation": "https://huggingface.co/amazon/chronos-2", + "description": "Amazon Chronos2 model for timeseries forecasting", + "classifiers": { + "type": "estimator", + "subtype": "regressor" + }, + "modalities": [], + "primitive": "orion.primitives.chronos2.Chronos2", + "produce": { + "method": "predict", + "args": [ + { + "name": "X", + "type": "ndarray" + }, + { + "name": "force", + "type": "bool", + "default": false + } + ], + "output": [ + { + "name": "y_hat", + "type": "ndarray" + } + ] + }, + "hyperparameters": { + "fixed": { + "pred_len": { + "type": "int", + "default": 1 + }, + "repo_id": { + "type": "str", + "default": "amazon/chronos-2" + }, + "batch_size": { + "type": "int", + "default": 32 + }, + "target": { + "type": "int", + "default": 0 + }, + "time_interval": { + "type": "int", + "default": 600 + } + } + } +} diff --git a/setup.py b/setup.py index 45858daf..3341ecd3 100644 --- a/setup.py +++ b/setup.py @@ -48,6 +48,9 @@ "timesfm[torch]>=1.2.0,<1.5;python_version>='3.11'", "jax;python_version>='3.11'", + #chronos2 + 'chronos-forecasting>=2.2.0,<2.3.0', + 'wrapt>=1.14,<1.15', ] diff --git a/tutorials/pipelines/chronos2.ipynb b/tutorials/pipelines/chronos2.ipynb new file mode 100644 index 00000000..48f1531b --- /dev/null +++ b/tutorials/pipelines/chronos2.ipynb @@ -0,0 +1,413 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from orion.data import load_signal, load_anomalies" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 1. 
Data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "signal_name = 'multivariate/S-1'\n", + "\n", + "data = load_signal(signal_name)\n", + "data = data[8000:10000]\n", + "\n", + "data.head()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 2. Pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from mlblocks import MLPipeline\n", + "\n", + "pipeline_name = 'chronos2'\n", + "\n", + "pipeline = MLPipeline(pipeline_name)\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Hyperparameters\n", + "\n", + "The Chronos2 pipeline can be customized with the following hyperparameters:\n", + "\n", + "| Primitive | Parameter | Default | Description |\n", + "|-----------|-----------|---------|-------------|\n", + "| time_segments_aggregate | `interval` | 21600 | Aggregation interval in seconds |\n", + "| time_segments_aggregate | `method` | \"mean\" | Aggregation method (mean, median, sum) |\n", + "| rolling_window_sequences | `window_size` | 250 | Context window size |\n", + "| **Chronos2** | `pred_len` | 1 | Prediction horizon length |\n", + "| **Chronos2** | `batch_size` | 32 | Batch size for inference |\n", + "| **Chronos2** | `target` | 0 | Target column index (multivariate) |\n", + "| find_anomalies | `window_size_portion` | 0.33 | Portion of data for window |\n", + "| find_anomalies | `fixed_threshold` | True | Use fixed vs dynamic threshold |" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "hyperparameters = {\n", + " \"mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1\": {\n", + " \"time_column\": \"timestamp\",\n", + " \"interval\": 21600, \n", + " \"method\": \"mean\" \n", + " },\n", + " \n", + " 
\"mlstars.custom.timeseries_preprocessing.rolling_window_sequences#1\": {\n", + " \"target_column\": 0,\n", + " \"window_size\": 250\n", + " },\n", + " \n", + " \"orion.primitives.chronos2.Chronos2#1\": {\n", + " \"pred_len\": 1, \n", + " \"batch_size\": 32, \n", + " \"target\": 0, \n", + " },\n", + " \n", + " \"orion.primitives.timeseries_anomalies.find_anomalies#1\": {\n", + " \"window_size_portion\": 0.33,\n", + " \"window_step_size_portion\": 0.1,\n", + " \"fixed_threshold\": True\n", + " }\n", + "}\n", + "\n", + "pipeline.set_hyperparameters(hyperparameters)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## step by step execution\n", + "\n", + "MLPipelines are compose of a squence of primitives, these primitives apply tranformation and calculation operations to the data and updates the variables within the pipeline. To view the primitives used by the pipeline, we access its `primtivies` attribute. \n", + "\n", + "The `UniTS` contains 6 primitives. we will observe how the `context` (which are the variables held within the pipeline) are updated after the execution of each primitive." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline.primitives" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### time segments aggregate\n", + "this primitive creates an equi-spaced time series by aggregating values over fixed specified interval.\n", + "\n", + "* **input**: `X` which is an n-dimensional sequence of values.\n", + "* **output**:\n", + " - `X` sequence of aggregated values, one column for each aggregation method.\n", + " - `index` sequence of index values (first index of each aggregated segment)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "context = pipeline.fit(data, output_=0)\n", + "context.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for i, x in list(zip(context['index'], context['X']))[:5]:\n", + " print(\"entry at {} has value {}\".format(i, x))" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### SimpleImputer\n", + "this primitive is an imputation transformer for filling missing values.\n", + "* **input**: `X` which is an n-dimensional sequence of values.\n", + "* **output**: `X` which is a transformed version of X." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step = 1\n", + "\n", + "context = pipeline.fit(**context, output_=step, start_=step)\n", + "context.keys()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### rolling window sequence\n", + "this primitive generates many sub-sequences of the original sequence. it uses a rolling window approach to create the sub-sequences out of time series data.\n", + "\n", + "* **input**: \n", + " - `X` n-dimensional sequence to iterate over.\n", + " - `index` array containing the index values of X.\n", + "* **output**:\n", + " - `X` input sequences.\n", + " - `y` target sequences.\n", + " - `index` first index value of each input sequence.\n", + " - `target_index` first index value of each target sequence." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step = 2\n", + "\n", + "context = pipeline.fit(**context, output_=step, start_=step)\n", + "context.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# after slicing X into multiple sub-sequences\n", + "# we obtain a 3 dimensional matrix X where\n", + "# the shape indicates (# slices, window size, 1)\n", + "# and similarly y is (# slices, target size)\n", + "\n", + "print(\"X shape = {}\\ny shape = {}\\nindex shape = {}\\ntarget index shape = {}\".format(\n", + " context['X'].shape, context['y'].shape, context['index'].shape, context['target_index'].shape))" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Chronos-2\n", + "This is the forecasting step using Amazon Chronos-2 Time Series Foundation Model. You can read more about it in the [related paper](https://arxiv.org/abs/2510.15821). The [Huggingface Repo](https://huggingface.co/amazon/chronos-2) has additional helpful information about the use of the model. 
This is a multivariate model that does single target predictions.\n", + "\n", + "* **input**: \n", + " - `X` n-dimensional array containing the input sequences for the model.\n", + "* **output**: \n", + " - `y_hat` predicted values for target column" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step = 3\n", + "\n", + "context = pipeline.fit(**context, output_=step, start_=step)\n", + "context.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "context['y_hat'].shape" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### regression errors\n", + "\n", + "this primitive computes an array of errors comparing predictions and expected output.\n", + "\n", + "* **input**: \n", + " - `y` ground truth.\n", + " - `y_hat` forecasted values.\n", + "* **output**: `errors` array of errors." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step = 4\n", + "\n", + "context = pipeline.fit(**context, output_=step, start_=step)\n", + "context.keys()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### find anomalies\n", + "\n", + "this primitive finds anomalies from sequence of errors\n", + "\n", + "* **input**: \n", + " - `errors` array of errors\n", + " - `target_index` indices\n", + "* **output**: `anomalies`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "step = 5\n", + "\n", + "context = pipeline.fit(**context, output_=step, start_=step)\n", + "context.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "context['anomalies']" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. 
Evaluate performance\n", + "\n", + "In this next step we will load some already known anomalous intervals and evaluate how\n", + "good our anomaly detection was by comparing those with our detected intervals.\n", + "\n", + "For this, we will first load the known anomalies for the signal that we are using:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from orion.data import load_anomalies\n", + "\n", + "ground_truth = load_anomalies('S-1')\n", + "ground_truth" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "anomalies = []\n", + "for ano in context['anomalies']:\n", + " anomalies.append((ano[0], ano[1]))\n", + "anomalies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from orion.evaluation import contextual_confusion_matrix, contextual_f1_score\n", + "\n", + "start, end = context['index'][0], context['index'][-1]\n", + "\n", + "contextual_confusion_matrix(ground_truth, anomalies, start = start, end = end, weighted=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "contextual_f1_score(ground_truth, anomalies, start = start, end = end, weighted=False)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "test_orion_dependencies", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.0" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +}