diff --git a/src/chronos/base.py b/src/chronos/base.py index 42321a30..b023695c 100644 --- a/src/chronos/base.py +++ b/src/chronos/base.py @@ -142,6 +142,7 @@ def predict_df( target: str = "target", prediction_length: int | None = None, quantile_levels: list[float] = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], + batch_size: int = 256, validate_inputs: bool = True, freq: str | None = None, **predict_kwargs, @@ -165,6 +166,8 @@ def predict_df( Number of steps to predict for each time series quantile_levels Quantile levels to compute + batch_size + The number of time series to predict in a single forward pass, by default 256 validate_inputs [ADVANCED] When True (default), validates dataframes before prediction. Setting to False removes the validation overhead, but may silently lead to wrong predictions if data is misformatted. When False, you @@ -212,17 +215,22 @@ def predict_df( df, prediction_length, freq=freq, id_column=id_column, timestamp_column=timestamp_column ) - # Generate forecasts - quantiles, mean = self.predict_quantiles( - inputs=context, - prediction_length=prediction_length, - quantile_levels=quantile_levels, - limit_prediction_length=False, - **predict_kwargs, - ) + # Generate forecasts in batches of at most `batch_size` series to bound memory usage. + quantiles_all = [] + mean_all = [] + for start in range(0, len(context), batch_size): + quantiles, mean = self.predict_quantiles( + inputs=context[start : start + batch_size], + prediction_length=prediction_length, + quantile_levels=quantile_levels, + limit_prediction_length=False, + **predict_kwargs, + ) + quantiles_all.append(quantiles.numpy()) + mean_all.append(mean.numpy()) - quantiles_np = quantiles.numpy() # [n_series, horizon, num_quantiles] - mean_np = mean.numpy() # [n_series, horizon] + quantiles_np = np.concatenate(quantiles_all, axis=0) # [n_series, horizon, num_quantiles] + mean_np = np.concatenate(mean_all, axis=0) # [n_series, horizon] # `future` has prediction_length rows per item, in the same item order as the predictions, # so it lines up with `mean` / `quantiles` directly (single target, no per-variate repeat). @@ -230,7 +238,7 @@ def predict_df( result["target_name"] = target result["predictions"] = mean_np.ravel() - quantiles_flat = quantiles_np.reshape(-1, len(quantile_levels)) + quantiles_flat = quantiles_np.reshape(len(result), len(quantile_levels)) for q_idx, q_level in enumerate(quantile_levels): result[str(q_level)] = quantiles_flat[:, q_idx] @@ -258,68 +266,73 @@ def predict_fev( inference_time_s Total time that it took to make predictions for all windows (in seconds). """ - import datasets - try: import fev except ImportError: raise ImportError("fev is required for predict_fev. Please install it with `pip install fev`.") - def batchify(lst: list, batch_size: int = 32): - """Convert list into batches of desired size.""" - for i in range(0, len(lst), batch_size): - yield lst[i : i + batch_size] - + # `predict_df` puts `predict_quantiles`'s `mean` in the "predictions" column. For point-forecast metrics + # the mean is the right target; for the others the median (0.5 quantile) is, so we request it and swap it in. + use_median_point_forecast = task.eval_metric not in ["MSE", "RMSE", "RMSSE"] quantile_levels = task.quantile_levels.copy() - if 0.5 not in quantile_levels: + if use_median_point_forecast and 0.5 not in quantile_levels: quantile_levels.append(0.5) predictions_per_window = [] inference_time_s = 0.0 for window in task.iter_windows(): - past_data, _ = fev.convert_input_data(window, adapter="datasets", as_univariate=True) - past_data = past_data.with_format("torch").cast_column( - "target", datasets.Sequence(datasets.Value("float32")) - ) - - quantiles_all = [] - mean_all = [] + # Base pipelines are univariate, so we always split multivariate targets and drop covariates. + past_df, _, _ = self._fev_window_to_df(window, as_univariate=True) start_time = time.monotonic() - for batch in batchify(past_data["target"], batch_size=batch_size): - quantiles, mean = self.predict_quantiles( - inputs=batch, - prediction_length=task.horizon, - limit_prediction_length=False, - **kwargs, - quantile_levels=quantile_levels, - ) - - quantiles_all.append(quantiles.numpy()) - mean_all.append(mean.numpy()) - + forecast_df = self.predict_df( + past_df, + id_column=window.id_column, + timestamp_column=window.timestamp_column, + target="target", + prediction_length=task.horizon, + quantile_levels=quantile_levels, + batch_size=batch_size, + **kwargs, + ) inference_time_s += time.monotonic() - start_time - quantiles_np = np.concatenate(quantiles_all, axis=0) # [num_items, horizon, num_quantiles] - mean_np = np.concatenate(mean_all, axis=0) # [num_items, horizon] - - if task.eval_metric in ["MSE", "RMSE", "RMSSE"]: - point_forecast = mean_np # [num_items, horizon] - else: - # use median as the point forecast - point_forecast = quantiles_np[:, :, quantile_levels.index(0.5)] # [num_items, horizon] - predictions_dict = {"predictions": point_forecast} - - for idx, level in enumerate(task.quantile_levels): - predictions_dict[str(level)] = quantiles_np[:, :, idx] + if use_median_point_forecast: + forecast_df["predictions"] = forecast_df["0.5"] predictions_per_window.append( - fev.utils.combine_univariate_predictions_to_multivariate( - datasets.Dataset.from_dict(predictions_dict), target_columns=task.target_columns + fev.utils.convert_forecast_df_to_predictions( + forecast_df, + horizon=task.horizon, + quantile_levels=task.quantile_levels, + target_columns=task.target_columns, ) ) return predictions_per_window, inference_time_s + @staticmethod + def _fev_window_to_df( + window: "fev.EvaluationWindow", as_univariate: bool + ) -> Tuple[pd.DataFrame, Optional[pd.DataFrame], List[str]]: + """Convert a fev evaluation window into the (past_df, future_df, target_columns) inputs for `predict_df`.""" + import fev + + past_df, future_df, _ = fev.convert_input_data(window, adapter="pandas", as_univariate=as_univariate) + + if as_univariate: + # `as_univariate=True` splits multivariate targets into separate univariate series. The adapter keeps + # the covariate columns, so we drop them here to predict each target independently and ignore covariates. + past_df = past_df[[window.id_column, window.timestamp_column, "target"]] + future_df = None + target_columns = ["target"] + else: + # The pandas adapter's future_df only contains the known-future covariates; pass None when there are none. + if not window.known_dynamic_columns: + future_df = None + target_columns = list(window.target_columns) + + return past_df, future_df, target_columns + @classmethod def from_pretrained( cls, diff --git a/src/chronos/chronos2/dataset.py b/src/chronos/chronos2/dataset.py index 31bc1e54..029003e0 100644 --- a/src/chronos/chronos2/dataset.py +++ b/src/chronos/chronos2/dataset.py @@ -5,7 +5,7 @@ import math from enum import Enum -from typing import TYPE_CHECKING, Any, Iterator, Mapping, Sequence, TypeAlias, cast +from typing import Any, Iterator, Mapping, Sequence, TypeAlias, cast import numpy as np import torch @@ -18,16 +18,8 @@ "Chronos2Dataset", "DatasetMode", "PreparedInput", - "convert_fev_window_to_list_of_dicts_input", - "left_pad_and_cat_2D", - "validate_prepared_schema", ] -if TYPE_CHECKING: - import datasets - import fev - - TensorOrArray: TypeAlias = torch.Tensor | np.ndarray @@ -79,102 +71,6 @@ def validate_prepared_schema(prepared_input: Any) -> None: ) -def _cast_fev_features( - past_data: "datasets.Dataset", - future_data: "datasets.Dataset", - target_columns: list[str], - past_dynamic_columns: list[str], - known_dynamic_columns: list[str], -) -> tuple["datasets.Dataset", "datasets.Dataset"]: - import datasets - - dynamic_columns = [*past_dynamic_columns, *known_dynamic_columns] - cat_cols = [] - for col in dynamic_columns: - item = past_data[0][col] - if not np.issubdtype(item.dtype, np.number): - cat_cols.append(col) - - numeric_cols = target_columns + list(set(dynamic_columns) - set(cat_cols)) - past_feature_updates = {col: datasets.Sequence(datasets.Value("float64")) for col in numeric_cols} | { - col: datasets.Sequence(datasets.Value("string")) for col in cat_cols - } - past_data_features = past_data.features - past_data_features.update(past_feature_updates) - past_data = past_data.cast(past_data_features) - - future_cat_cols = [k for k in cat_cols if k in known_dynamic_columns] - future_numeric_cols = list(set(known_dynamic_columns) - set(future_cat_cols)) - future_feature_updates = {col: datasets.Sequence(datasets.Value("float64")) for col in future_numeric_cols} | { - col: datasets.Sequence(datasets.Value("string")) for col in future_cat_cols - } - future_data_features = future_data.features - future_data_features.update(future_feature_updates) - future_data = future_data.cast(future_data_features) - - return past_data, future_data - - -def convert_fev_window_to_list_of_dicts_input( - window: "fev.EvaluationWindow", as_univariate: bool -) -> tuple[list[dict[str, np.ndarray | dict[str, np.ndarray]]], list[str], list[str], list[str]]: - import fev - - if as_univariate: - past_data, future_data = fev.convert_input_data(window, adapter="datasets", as_univariate=True) - target_columns = ["target"] - past_dynamic_columns = [] - known_dynamic_columns = [] - else: - past_data, future_data = window.get_input_data() - target_columns = window.target_columns - past_dynamic_columns = window.past_dynamic_columns - known_dynamic_columns = window.known_dynamic_columns - - past_data, future_data = _cast_fev_features( - past_data=past_data, - future_data=future_data, - target_columns=target_columns, - past_dynamic_columns=past_dynamic_columns, - known_dynamic_columns=known_dynamic_columns, - ) - - num_series: int = len(past_data) - num_past_covariates: int = len(past_dynamic_columns) - num_future_covariates: int = len(known_dynamic_columns) - - # We use numpy format because torch does not support str covariates - target_data = past_data.select_columns(target_columns).with_format("numpy") - # past of past-only and known-future covariates - dynamic_columns = [*past_dynamic_columns, *known_dynamic_columns] - past_covariate_data = past_data.select_columns(dynamic_columns).with_format("numpy") - future_known_data = future_data.select_columns(known_dynamic_columns).with_format("numpy") - - if num_past_covariates + num_future_covariates > 0: - assert len(past_covariate_data) == num_series - if num_future_covariates > 0: - assert len(future_known_data) == num_series - - inputs: list[dict[str, np.ndarray | dict[str, np.ndarray]]] = [] - for idx, target_row in enumerate(target_data): - target_row = cast(dict, target_row) - # this assumes that the targets have the same length for multivariate tasks - target_tensor_i = np.stack([target_row[col] for col in target_columns]) - entry: dict[str, np.ndarray | dict[str, np.ndarray]] = {"target": target_tensor_i} - - if len(dynamic_columns) > 0: - past_covariate_row = past_covariate_data[idx] - entry["past_covariates"] = {col: past_covariate_row[col] for col in dynamic_columns} - - if len(known_dynamic_columns) > 0: - future_known_row = future_known_data[idx] - entry["future_covariates"] = {col: future_known_row[col] for col in known_dynamic_columns} - - inputs.append(entry) - - return inputs, target_columns, past_dynamic_columns, known_dynamic_columns - - class DatasetMode(str, Enum): TRAIN = "train" VALIDATION = "validation" @@ -441,3 +337,10 @@ def convert_tensor_input_to_list_of_dicts_input(*args, **kwargs): "`convert_tensor_input_to_list_of_dicts_input` has been deprecated. " "Please use `chronos.chronos2.preprocess.from_tensor` instead." ) + + +def convert_fev_window_to_list_of_dicts_input(*args, **kwargs): + raise RuntimeError( + "`convert_fev_window_to_list_of_dicts_input` has been deprecated. " + "`predict_fev` now builds inputs directly from `fev.convert_input_data`." + ) diff --git a/src/chronos/chronos2/pipeline.py b/src/chronos/chronos2/pipeline.py index 10e42207..04e426ba 100644 --- a/src/chronos/chronos2/pipeline.py +++ b/src/chronos/chronos2/pipeline.py @@ -955,82 +955,12 @@ def predict_df( result["target_name"] = np.tile(np.repeat(target, prediction_length), n_inputs) result["predictions"] = mean_np.ravel() - quantiles_flat = quantiles_np.reshape(-1, len(quantile_levels)) + quantiles_flat = quantiles_np.reshape(len(result), len(quantile_levels)) for q_idx, q_level in enumerate(quantile_levels): result[str(q_level)] = quantiles_flat[:, q_idx] return result - def _predict_fev_window( - self, - window: "fev.EvaluationWindow", - quantile_levels: list[float], - batch_size: int, - as_univariate: bool, - **predict_kwargs, - ) -> tuple["datasets.DatasetDict", float]: - import datasets - import fev - - from chronos.chronos2.dataset import convert_fev_window_to_list_of_dicts_input - - inputs, target_columns, past_dynamic_columns, known_dynamic_columns = ( - convert_fev_window_to_list_of_dicts_input(window=window, as_univariate=as_univariate) - ) - - num_variates: int = len(target_columns) + len(past_dynamic_columns) + len(known_dynamic_columns) - if batch_size < num_variates: - warnings.warn( - f"batch_size ({batch_size}) is smaller than num_variates ({num_variates}) in the task. " - f"Setting batch_size = num_variates = num_targets + num_covariates", - category=UserWarning, - stacklevel=3, - ) - batch_size = num_variates - - start_time = time.monotonic() - - quantiles, mean = self.predict_quantiles( - inputs=inputs, - prediction_length=window.horizon, - quantile_levels=quantile_levels, - limit_prediction_length=False, - batch_size=batch_size, - **predict_kwargs, - ) - # since fev tasks are homogenous, we can safely stack the list of tensors into a single tensor - quantiles_np = torch.stack(quantiles).numpy() # [n_tasks, n_variates, horizon, num_quantiles] - mean_np = torch.stack(mean).numpy() # [n_tasks, n_variates, horizon] - - inference_time_s = time.monotonic() - start_time - - multivariate_forecast: dict[str, dict[str, np.ndarray]] = {variate_name: {} for variate_name in target_columns} - # mean_np is actually the median here - point_forecast = mean_np # [num_items, n_variates, horizon] - - for v_idx, variate_name in enumerate(target_columns): - multivariate_forecast[variate_name]["predictions"] = point_forecast[:, v_idx] - - for q_idx, level in enumerate(quantile_levels): - for v_idx, variate_name in enumerate(target_columns): - multivariate_forecast[variate_name][str(level)] = quantiles_np[:, v_idx, :, q_idx] - - predictions_dict: dict = {} - for variate_name in target_columns: - predictions_dict[variate_name] = datasets.Dataset.from_dict( - { - k: multivariate_forecast[variate_name][k] - for k in ["predictions"] + [str(q) for q in quantile_levels] - } - ) - predictions = datasets.DatasetDict(predictions_dict) - predictions.set_format("numpy") - - if as_univariate: - predictions = fev.utils.combine_univariate_predictions_to_multivariate(predictions, window.target_columns) - - return predictions, inference_time_s - def predict_fev( self, task: "fev.Task", @@ -1063,31 +993,40 @@ def predict_fev( inference_time_s Total time that it took to make predictions for all windows (in seconds) """ - from chronos.chronos2.dataset import convert_fev_window_to_list_of_dicts_input - try: import fev except ImportError: raise ImportError("fev is required for predict_fev. Please install it with `pip install fev`.") + # The number of variates per task is the same across all windows, so we check it once here. + # Covariates are ignored when `as_univariate=True`, so each task then has a single variate. + if as_univariate: + num_variates = 1 + else: + num_variates = len(task.target_columns) + len(task.past_dynamic_columns) + len(task.known_dynamic_columns) + if batch_size < num_variates: + warnings.warn( + f"batch_size ({batch_size}) is smaller than num_variates ({num_variates}) in the task. " + f"Setting batch_size = num_variates = num_targets + num_covariates", + category=UserWarning, + stacklevel=2, + ) + batch_size = num_variates + pipeline = self if finetune_kwargs is not None: # only fine-tune the model on the first window first_window = task.get_window(0) - inputs, target_columns, past_dynamic_columns, known_dynamic_columns = ( - convert_fev_window_to_list_of_dicts_input(window=first_window, as_univariate=as_univariate) + past_df, future_df, target_columns = self._fev_window_to_df(first_window, as_univariate=as_univariate) + inputs = from_data_frame( + past_df, + target_columns=target_columns, + prediction_length=first_window.horizon, + future_df=future_df, + id_column=first_window.id_column, + timestamp_column=first_window.timestamp_column, ) - num_variates: int = len(target_columns) + len(past_dynamic_columns) + len(known_dynamic_columns) - if batch_size < num_variates: - warnings.warn( - f"batch_size ({batch_size}) is smaller than num_variates ({num_variates}) in the task. " - f"Setting batch_size = num_variates = num_targets + num_covariates", - category=UserWarning, - stacklevel=2, - ) - batch_size = num_variates - finetune_kwargs = deepcopy(finetune_kwargs) finetune_kwargs["prediction_length"] = first_window.horizon finetune_kwargs["batch_size"] = finetune_kwargs.get("batch_size", batch_size) @@ -1097,15 +1036,30 @@ def predict_fev( predictions_per_window = [] inference_time_s = 0.0 for window in task.iter_windows(): - predictions, window_inference_time_s = pipeline._predict_fev_window( - window, + past_df, future_df, target_columns = self._fev_window_to_df(window, as_univariate=as_univariate) + + start_time = time.monotonic() + forecast_df = pipeline.predict_df( + past_df, + future_df=future_df, + id_column=window.id_column, + timestamp_column=window.timestamp_column, + target=target_columns, + prediction_length=window.horizon, quantile_levels=task.quantile_levels, batch_size=batch_size, - as_univariate=as_univariate, **kwargs, ) - predictions_per_window.append(predictions) - inference_time_s += window_inference_time_s + inference_time_s += time.monotonic() - start_time + + predictions_per_window.append( + fev.utils.convert_forecast_df_to_predictions( + forecast_df, + horizon=window.horizon, + quantile_levels=task.quantile_levels, + target_columns=window.target_columns, + ) + ) return predictions_per_window, inference_time_s diff --git a/src/chronos/chronos2/preprocess.py b/src/chronos/chronos2/preprocess.py index d96e0cfc..b1ca42e3 100644 --- a/src/chronos/chronos2/preprocess.py +++ b/src/chronos/chronos2/preprocess.py @@ -181,10 +181,13 @@ def from_data_frame( target = df[target_columns].to_numpy(dtype=np.float32, na_value=np.nan).T - # Normalize each past covariate to float32 (numeric) or "category"; this dtype drives encoding. + # Normalize each past covariate to float32 (numeric except bool) or "category"; this dtype drives encoding. # Future columns are passed through raw — _encode_categorical re-maps them onto the past categories. past_covariates = { - c: df[c].astype(np.float32 if ptypes.is_numeric_dtype(df[c]) else "category") for c in covariate_columns + c: df[c].astype( + np.float32 if ptypes.is_numeric_dtype(df[c]) and not ptypes.is_bool_dtype(df[c]) else "category" + ) + for c in covariate_columns } if future_df is not None: diff --git a/src/chronos/df_utils.py b/src/chronos/df_utils.py index 233c95bf..8cf8e840 100644 --- a/src/chronos/df_utils.py +++ b/src/chronos/df_utils.py @@ -124,7 +124,9 @@ def normalize_df( missing = pd.unique(df[id_column][codes < 0]) raise ValueError(f"future_df has ids not present in df: {list(missing)[:5]}") - ts = df[timestamp_column].to_numpy() + # View as int64 (datetime64 is int64-backed) so np.diff yields integers; comparing + # the timedelta64 from np.diff against 0 raises UFuncTypeError on numpy<2.0. + ts = df[timestamp_column].to_numpy().view("int64") code_diff = np.diff(codes) grouped = bool(np.all(code_diff >= 0)) sorted_within = grouped and bool(np.all((np.diff(ts) >= 0) | (code_diff > 0))) diff --git a/test/test_preprocess.py b/test/test_preprocess.py index faa98dd7..bd5c867c 100644 --- a/test/test_preprocess.py +++ b/test/test_preprocess.py @@ -346,6 +346,31 @@ def test_from_data_frame_with_categorical_covariate_target_encoding(): assert torch.isfinite(prepared["context"][1]).all() +def test_from_data_frame_treats_bool_covariate_as_categorical(): + """Bool covariates must be target-encoded as categories, not passed through as raw 0/1 floats.""" + rng = np.random.default_rng(0) + flags = np.array([True, False] * 10) + base = { + "item_id": ["A"] * 10 + ["B"] * 10, + "timestamp": list(pd.date_range(end="2020-01-10", periods=10, freq="D")) * 2, + "target": rng.standard_normal(20).astype(np.float32), + } + bool_df = pd.DataFrame({**base, "flag": flags}) + str_df = pd.DataFrame({**base, "flag": flags.astype(str)}) + float_df = pd.DataFrame({**base, "flag": flags.astype(np.float32)}) + + kwargs = dict(target_columns=["target"], prediction_length=3, known_covariates_names=["flag"]) + bool_out = from_data_frame(df=bool_df, **kwargs) + str_out = from_data_frame(df=str_df, **kwargs) + float_out = from_data_frame(df=float_df, **kwargs) + + for bool_prepared, str_prepared, float_prepared in zip(bool_out, str_out, float_out): + # bool is encoded identically to the equivalent string categorical ... + torch.testing.assert_close(bool_prepared["context"], str_prepared["context"]) + # ... and differently from raw float 0/1 (which is passed through unencoded). + assert not torch.allclose(bool_prepared["context"], float_prepared["context"]) + + # Tests for _target_encode (the core categorical encoder)