Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 65 additions & 52 deletions src/chronos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def predict_df(
target: str = "target",
prediction_length: int | None = None,
quantile_levels: list[float] = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
batch_size: int = 256,
validate_inputs: bool = True,
freq: str | None = None,
**predict_kwargs,
Expand All @@ -165,6 +166,8 @@ def predict_df(
Number of steps to predict for each time series
quantile_levels
Quantile levels to compute
batch_size
The number of time series to predict in a single forward pass, by default 256
validate_inputs
[ADVANCED] When True (default), validates dataframes before prediction. Setting to False removes the
validation overhead, but may silently lead to wrong predictions if data is misformatted. When False, you
Expand Down Expand Up @@ -212,25 +215,30 @@ def predict_df(
df, prediction_length, freq=freq, id_column=id_column, timestamp_column=timestamp_column
)

# Generate forecasts
quantiles, mean = self.predict_quantiles(
inputs=context,
prediction_length=prediction_length,
quantile_levels=quantile_levels,
limit_prediction_length=False,
**predict_kwargs,
)
# Generate forecasts in batches of at most `batch_size` series to bound memory usage.
quantiles_all = []
mean_all = []
for start in range(0, len(context), batch_size):
quantiles, mean = self.predict_quantiles(
inputs=context[start : start + batch_size],
prediction_length=prediction_length,
quantile_levels=quantile_levels,
limit_prediction_length=False,
**predict_kwargs,
)
quantiles_all.append(quantiles.numpy())
mean_all.append(mean.numpy())

quantiles_np = quantiles.numpy() # [n_series, horizon, num_quantiles]
mean_np = mean.numpy() # [n_series, horizon]
quantiles_np = np.concatenate(quantiles_all, axis=0) # [n_series, horizon, num_quantiles]
mean_np = np.concatenate(mean_all, axis=0) # [n_series, horizon]

# `future` has prediction_length rows per item, in the same item order as the predictions,
# so it lines up with `mean` / `quantiles` directly (single target, no per-variate repeat).
result = future.copy()
result["target_name"] = target
result["predictions"] = mean_np.ravel()

quantiles_flat = quantiles_np.reshape(-1, len(quantile_levels))
quantiles_flat = quantiles_np.reshape(len(result), len(quantile_levels))
for q_idx, q_level in enumerate(quantile_levels):
result[str(q_level)] = quantiles_flat[:, q_idx]

Expand Down Expand Up @@ -258,68 +266,73 @@ def predict_fev(
inference_time_s
Total time that it took to make predictions for all windows (in seconds).
"""
import datasets

try:
import fev
except ImportError:
raise ImportError("fev is required for predict_fev. Please install it with `pip install fev`.")

def batchify(lst: list, batch_size: int = 32):
"""Convert list into batches of desired size."""
for i in range(0, len(lst), batch_size):
yield lst[i : i + batch_size]

# `predict_df` puts `predict_quantiles`'s `mean` in the "predictions" column. For point-forecast metrics
# the mean is the right target; for the others the median (0.5 quantile) is, so we request it and swap it in.
use_median_point_forecast = task.eval_metric not in ["MSE", "RMSE", "RMSSE"]
quantile_levels = task.quantile_levels.copy()
if 0.5 not in quantile_levels:
if use_median_point_forecast and 0.5 not in quantile_levels:
quantile_levels.append(0.5)

predictions_per_window = []
inference_time_s = 0.0
for window in task.iter_windows():
past_data, _ = fev.convert_input_data(window, adapter="datasets", as_univariate=True)
past_data = past_data.with_format("torch").cast_column(
"target", datasets.Sequence(datasets.Value("float32"))
)

quantiles_all = []
mean_all = []
# Base pipelines are univariate, so we always split multivariate targets and drop covariates.
past_df, _, _ = self._fev_window_to_df(window, as_univariate=True)

start_time = time.monotonic()
for batch in batchify(past_data["target"], batch_size=batch_size):
quantiles, mean = self.predict_quantiles(
inputs=batch,
prediction_length=task.horizon,
limit_prediction_length=False,
**kwargs,
quantile_levels=quantile_levels,
)

quantiles_all.append(quantiles.numpy())
mean_all.append(mean.numpy())

forecast_df = self.predict_df(
past_df,
id_column=window.id_column,
timestamp_column=window.timestamp_column,
target="target",
prediction_length=task.horizon,
quantile_levels=quantile_levels,
batch_size=batch_size,
**kwargs,
)
inference_time_s += time.monotonic() - start_time

quantiles_np = np.concatenate(quantiles_all, axis=0) # [num_items, horizon, num_quantiles]
mean_np = np.concatenate(mean_all, axis=0) # [num_items, horizon]

if task.eval_metric in ["MSE", "RMSE", "RMSSE"]:
point_forecast = mean_np # [num_items, horizon]
else:
# use median as the point forecast
point_forecast = quantiles_np[:, :, quantile_levels.index(0.5)] # [num_items, horizon]
predictions_dict = {"predictions": point_forecast}

for idx, level in enumerate(task.quantile_levels):
predictions_dict[str(level)] = quantiles_np[:, :, idx]
if use_median_point_forecast:
forecast_df["predictions"] = forecast_df["0.5"]

predictions_per_window.append(
fev.utils.combine_univariate_predictions_to_multivariate(
datasets.Dataset.from_dict(predictions_dict), target_columns=task.target_columns
fev.utils.convert_forecast_df_to_predictions(
forecast_df,
horizon=task.horizon,
quantile_levels=task.quantile_levels,
target_columns=task.target_columns,
)
)
return predictions_per_window, inference_time_s

@staticmethod
def _fev_window_to_df(
    window: "fev.EvaluationWindow", as_univariate: bool
) -> Tuple[pd.DataFrame, Optional[pd.DataFrame], List[str]]:
    """
    Build the ``(past_df, future_df, target_columns)`` inputs for `predict_df` from a fev evaluation window.

    Parameters
    ----------
    window
        The fev evaluation window to convert
    as_univariate
        Forwarded to ``fev.convert_input_data``. When True, only the id, timestamp and "target" columns of the
        past data are kept, no future dataframe is returned and the target columns are ``["target"]``

    Returns
    -------
    past_df
        The past data produced by the pandas adapter (restricted to three columns when ``as_univariate`` is True)
    future_df
        The adapter's future dataframe, or None when ``as_univariate`` is True or the window has no
        known dynamic columns
    target_columns
        ``["target"]`` when ``as_univariate`` is True, otherwise a list copy of ``window.target_columns``
    """
    import fev

    past_df, future_df, _ = fev.convert_input_data(window, adapter="pandas", as_univariate=as_univariate)

    if not as_univariate:
        # The pandas adapter's future_df only contains the known-future covariates, so when the window has
        # none of them there is nothing useful to hand over and None is passed instead.
        known_future_df = future_df if window.known_dynamic_columns else None
        return past_df, known_future_df, list(window.target_columns)

    # `as_univariate=True` splits multivariate targets into separate univariate series. The adapter keeps the
    # covariate columns, so they are dropped here: each target is predicted independently, without covariates.
    univariate_columns = [window.id_column, window.timestamp_column, "target"]
    return past_df[univariate_columns], None, ["target"]

@classmethod
def from_pretrained(
cls,
Expand Down
113 changes: 8 additions & 105 deletions src/chronos/chronos2/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import math
from enum import Enum
from typing import TYPE_CHECKING, Any, Iterator, Mapping, Sequence, TypeAlias, cast
from typing import Any, Iterator, Mapping, Sequence, TypeAlias, cast

import numpy as np
import torch
Expand All @@ -18,16 +18,8 @@
"Chronos2Dataset",
"DatasetMode",
"PreparedInput",
"convert_fev_window_to_list_of_dicts_input",
"left_pad_and_cat_2D",
"validate_prepared_schema",
]

if TYPE_CHECKING:
import datasets
import fev


TensorOrArray: TypeAlias = torch.Tensor | np.ndarray


Expand Down Expand Up @@ -79,102 +71,6 @@ def validate_prepared_schema(prepared_input: Any) -> None:
)


def _cast_fev_features(
    past_data: "datasets.Dataset",
    future_data: "datasets.Dataset",
    target_columns: list[str],
    past_dynamic_columns: list[str],
    known_dynamic_columns: list[str],
) -> tuple["datasets.Dataset", "datasets.Dataset"]:
    """
    Cast target and covariate columns of the past/future datasets to uniform sequence feature types.

    A dynamic column is treated as categorical when the dtype of its value in the first row of ``past_data``
    is not a numpy number. Targets and the remaining dynamic columns are cast to sequences of ``float64``;
    categorical columns are cast to sequences of ``string``. In ``future_data`` only the known dynamic
    columns are cast.

    Returns
    -------
    The ``(past_data, future_data)`` pair after casting.
    """
    import datasets

    def _sequence_features(numeric: list[str], categorical: list[str]) -> dict:
        # float64 sequences for numeric columns, string sequences for categorical ones
        as_float = {col: datasets.Sequence(datasets.Value("float64")) for col in numeric}
        as_string = {col: datasets.Sequence(datasets.Value("string")) for col in categorical}
        return as_float | as_string

    dynamic_columns = [*past_dynamic_columns, *known_dynamic_columns]
    # The first row is only inspected when there is at least one dynamic column to classify.
    cat_cols = [col for col in dynamic_columns if not np.issubdtype(past_data[0][col].dtype, np.number)]
    numeric_cols = target_columns + list(set(dynamic_columns) - set(cat_cols))

    past_updates = _sequence_features(numeric_cols, cat_cols)
    past_features = past_data.features
    past_features.update(past_updates)
    past_data = past_data.cast(past_features)

    future_cat_cols = [col for col in cat_cols if col in known_dynamic_columns]
    future_numeric_cols = list(set(known_dynamic_columns) - set(future_cat_cols))
    future_updates = _sequence_features(future_numeric_cols, future_cat_cols)
    future_features = future_data.features
    future_features.update(future_updates)
    future_data = future_data.cast(future_features)

    return past_data, future_data


def convert_fev_window_to_list_of_dicts_input(
    window: "fev.EvaluationWindow", as_univariate: bool
) -> tuple[list[dict[str, np.ndarray | dict[str, np.ndarray]]], list[str], list[str], list[str]]:
    """
    Convert a fev evaluation window into a list of per-series input dictionaries.

    Parameters
    ----------
    window
        The fev evaluation window to convert
    as_univariate
        When True, the data comes from ``fev.convert_input_data(..., as_univariate=True)``, the single target
        column is "target" and no covariates are used. Otherwise the data comes from
        ``window.get_input_data()`` and the window's target and covariate columns are used

    Returns
    -------
    inputs
        One dict per series with a stacked "target" array, plus "past_covariates" when any dynamic column
        exists and "future_covariates" when any known dynamic column exists
    target_columns, past_dynamic_columns, known_dynamic_columns
        The column names that were used to build ``inputs``
    """
    import fev

    if as_univariate:
        past_data, future_data = fev.convert_input_data(window, adapter="datasets", as_univariate=True)
        target_columns, past_dynamic_columns, known_dynamic_columns = ["target"], [], []
    else:
        past_data, future_data = window.get_input_data()
        target_columns = window.target_columns
        past_dynamic_columns = window.past_dynamic_columns
        known_dynamic_columns = window.known_dynamic_columns

    past_data, future_data = _cast_fev_features(
        past_data=past_data,
        future_data=future_data,
        target_columns=target_columns,
        past_dynamic_columns=past_dynamic_columns,
        known_dynamic_columns=known_dynamic_columns,
    )

    num_series: int = len(past_data)
    has_covariates = len(past_dynamic_columns) + len(known_dynamic_columns) > 0
    has_known_future = len(known_dynamic_columns) > 0

    # numpy format is used because torch does not support str covariates
    target_data = past_data.select_columns(target_columns).with_format("numpy")
    # past values of both the past-only and the known-future covariates
    dynamic_columns = [*past_dynamic_columns, *known_dynamic_columns]
    past_covariate_data = past_data.select_columns(dynamic_columns).with_format("numpy")
    future_known_data = future_data.select_columns(known_dynamic_columns).with_format("numpy")

    if has_covariates:
        assert len(past_covariate_data) == num_series
    if has_known_future:
        assert len(future_known_data) == num_series

    inputs: list[dict[str, np.ndarray | dict[str, np.ndarray]]] = []
    for row_idx, raw_target_row in enumerate(target_data):
        target_row = cast(dict, raw_target_row)
        # stacking assumes that the targets have the same length for multivariate tasks
        entry: dict[str, np.ndarray | dict[str, np.ndarray]] = {
            "target": np.stack([target_row[col] for col in target_columns])
        }

        if len(dynamic_columns) > 0:
            past_row = past_covariate_data[row_idx]
            entry["past_covariates"] = {col: past_row[col] for col in dynamic_columns}

        if has_known_future:
            future_row = future_known_data[row_idx]
            entry["future_covariates"] = {col: future_row[col] for col in known_dynamic_columns}

        inputs.append(entry)

    return inputs, target_columns, past_dynamic_columns, known_dynamic_columns


class DatasetMode(str, Enum):
TRAIN = "train"
VALIDATION = "validation"
Expand Down Expand Up @@ -441,3 +337,10 @@ def convert_tensor_input_to_list_of_dicts_input(*args, **kwargs):
"`convert_tensor_input_to_list_of_dicts_input` has been deprecated. "
"Please use `chronos.chronos2.preprocess.from_tensor` instead."
)


def convert_fev_window_to_list_of_dicts_input(*args, **kwargs):
    """Stub for the removed helper: accepts any arguments and always raises ``RuntimeError``."""
    message = (
        "`convert_fev_window_to_list_of_dicts_input` has been deprecated. "
        "`predict_fev` now builds inputs directly from `fev.convert_input_data`."
    )
    raise RuntimeError(message)
Loading
Loading