diff --git a/gtep/gtep_data.py b/gtep/gtep_data.py index 251fcbc..8c48ad6 100644 --- a/gtep/gtep_data.py +++ b/gtep/gtep_data.py @@ -57,13 +57,13 @@ def load_prescient( self, data_path, representative_dates=None, - representative_weights={}, + representative_weights=None, options_dict=None, ): """Loads data structured via Prescient data loader. :param data_path: Folder containing the data to be loaded - :param representative_dates: List of time points to include. Note: Change the last date for whatever extreme day is needed based on the given run(s) + :param representative_dates: List of time points to include. :param representative_weights: dictionary of weights for each representative date, defaults to empty Dict :param options_dict: Options dictionary to pass to the Prescient data loader, defaults to None @@ -72,21 +72,21 @@ def load_prescient( # create prescient config object with defaults prescient_options = PrescientConfig() - # work around for prescient throwing an error with Path objects - if isinstance(data_path, Path): - data_path = str(data_path) - if options_dict is None: # set basic configurations that do not match prescient defaults options_dict = { - "data_path": data_path, + "data_path": str( + data_path + ), # work around for prescient (error with Path objects) "num_days": 365, "ruc_horizon": 36, } else: # ensure data path is included in options dictionary - options_dict["data_path"] = data_path + options_dict["data_path"] = str( + data_path + ) # work around for prescient (error with Path objects) # update configuration values based on options dictionary prescient_options.set_value(options_dict) @@ -153,15 +153,22 @@ def load_prescient( "2020-07-05 00:00", "2020-10-14 00:00", ## Change the last date for whatever extreme day is needed based on the given run(s) ] + #enforce representative dates as a list + if not isinstance(representative_dates, list): + representative_dates = [representative_dates] + #check that the representative dates has at 
least one value + if not len(representative_dates) >= 1: + raise ValueError('Invalid input for representative_dates. representative_dates should be a list of date strings') self.representative_dates = representative_dates + self.representative_weights = representative_weights - if not representative_weights: + if representative_weights is None: # set the weight for each day to the total weight divided by number of days total_weight = prescient_options.num_days * self.stages weight_per_date = int(total_weight / (len(representative_dates))) self.representative_weights = { key: weight_per_date - for date, key in enumerate(self.representative_dates) + for key, date in enumerate(self.representative_dates) } time_keys = self.md.data["system"]["time_keys"] @@ -197,7 +204,7 @@ def import_load_scaling(self, load_file_name, forecast_years=None): adjusted_forecast_by_period = adjusted_forecast[ adjusted_forecast["year"].isin(forecast_years) - ] + ].copy() base_zones = [ "base_economic_coast", @@ -263,12 +270,15 @@ def import_outage_data(self, load_file_name): r" (\d+):" ) filtered_outages = filtered_outages[["fips_code", "hour"]] - county_to_fips = pd.read_csv( - "./gtep/data/123_Bus_Resil_Week/county_fips_match.csv" - ) - bus_to_county = pd.read_csv( - "./gtep/data/123_Bus_Resil_Week/Bus_data_gen_weights_mappings.csv" - ) + + base_dir = Path(load_file_name).parent + + county_fips_path = base_dir / "county_fips_match.csv" + bus_to_county_path = base_dir / "Bus_data_gen_weights_mappings.csv" + + county_to_fips = pd.read_csv(county_fips_path) + bus_to_county = pd.read_csv(bus_to_county_path) + county_to_fips = county_to_fips[["County", "FIPS"]] bus_to_county = bus_to_county[["Bus Number", "County"]] bus_to_county = bus_to_county.merge(county_to_fips, how="inner", on="County") @@ -280,7 +290,8 @@ def import_outage_data(self, load_file_name): how="left", ) bus_hours = bus_hours[bus_hours["Bus Number"].notna()] - bus_hours.to_csv("./gtep/data/123_Bus_Resil_Week/not_right.csv") + 
csv_path = base_dir / "not_right.csv" + bus_hours.to_csv(csv_path) self.bus_hours = bus_hours[["hour", "Bus Number"]] self.bus_hours = self.bus_hours.astype(int) @@ -346,8 +357,12 @@ def load_storage_csv(self, data_path): :param data_path: filepath for storage data csv file """ + # enforce pathlib object + if not isinstance(data_path, Path): + data_path = Path(data_path) + try: - storage_path = data_path + "/storage.csv" + storage_path = data_path / "storage.csv" storage_df = pd.read_csv(storage_path) storage_data = {} @@ -368,10 +383,18 @@ def texas_case_study_updates(self, data_path): :param data_path: filepath for generator data csv file """ # check that datapath is coming from a texas case study directory - if "Texas" or "Coal" not in data_path: + if ( + ("Texas" not in str(data_path)) + and ("Coal" not in str(data_path)) + and ("Resil_Week" not in str(data_path)) + ): raise ValueError("The data path provided is not a Texas case study") - generator_update_path = data_path + "/gen.csv" + # enforce pathlib object + if not isinstance(data_path, Path): + data_path = Path(data_path) + + generator_update_path = data_path / "gen.csv" generator_df = pd.read_csv(generator_update_path) bonus_feature_list = [ "capex1", @@ -391,6 +414,8 @@ def texas_case_study_updates(self, data_path): for col in bonus_feature_list: for gen in data_point.data["elements"]["generator"]: if not data_point.data["elements"]["generator"][gen].get(col): - data_point.data["elements"]["generator"][gen][col] = float( - generator_df[generator_df["GEN UID"] == gen][col] - ) + matching_rows = generator_df[generator_df["GEN UID"] == gen] + if not matching_rows.empty: + data_point.data["elements"]["generator"][gen][col] = float( + matching_rows[col].iloc[0] + ) diff --git a/gtep/tests/unit/test_gtep_data.py b/gtep/tests/unit/test_gtep_data.py new file mode 100644 index 0000000..1eaab4c --- /dev/null +++ b/gtep/tests/unit/test_gtep_data.py @@ -0,0 +1,397 @@ 
+################################################################################# +# The Institute for the Design of Advanced Energy Systems Integrated Platform +# Framework (IDAES IP) was produced under the DOE Institute for the +# Design of Advanced Energy Systems (IDAES). +# +# Copyright (c) 2018-2026 by the software owners: The Regents of the +# University of California, through Lawrence Berkeley National Laboratory, +# National Technology & Engineering Solutions of Sandia, LLC, Carnegie Mellon +# University, West Virginia University Research Corporation, et al. +# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md +# for full copyright and license information. +################################################################################# + + +import pyomo.common.unittest as unittest +from unittest.mock import patch +import pytest +from gtep.gtep_data import ExpansionPlanningData +import pandas as pd +from pathlib import Path +import tempfile +import os + +curr_dir = Path(__file__).resolve().parent +input_data_source = (curr_dir / ".." / ".." / "data" / "5bus").resolve() + +load_scaling_file = ( + curr_dir / ".." / ".." / "data" / "Texas_2000" / "ERCOT-Adjusted-Forecast.xlsb" +).resolve() + +storage_file = (curr_dir / ".." / ".." / "data" / "9_bus_GTEP_dir").resolve() + +texas_data_path = (curr_dir / ".." / ".." / "data" / "123_Bus_Coal").resolve() + +outage_data_path = ( + curr_dir / ".." / ".." 
/ "data" / "123_Bus_Resil_Week" / "may_20.csv" +).resolve() + + +class TestExpansionPlanningData(unittest.TestCase): + + def test_data_init(self): + # Test that the ExpansionPlanningData object initializes properly with default values + testObject = ExpansionPlanningData() + self.assertIsInstance(testObject, ExpansionPlanningData) + self.assertEqual(testObject.stages, 2) + self.assertEqual(testObject.num_reps, 4) + self.assertEqual(testObject.len_reps, 1) + self.assertEqual(testObject.num_commit, 24) + self.assertEqual(testObject.num_dispatch, 1) + self.assertEqual(testObject.duration_dispatch, 60) + + # Test that the ExpansionPlanningData object initializes properly with input values + testObject = ExpansionPlanningData(1, 2, 2, 2, 2, 15) + self.assertEqual(testObject.stages, 1) + self.assertEqual(testObject.num_reps, 2) + self.assertEqual(testObject.len_reps, 2) + self.assertEqual(testObject.num_commit, 2) + self.assertEqual(testObject.num_dispatch, 2) + self.assertEqual(testObject.duration_dispatch, 15) + + # Test that the ExpansionPlanningData object initializes properly with partial input values + testObject = ExpansionPlanningData(duration_dispatch=15) + self.assertEqual(testObject.stages, 2) + self.assertEqual(testObject.num_reps, 4) + self.assertEqual(testObject.len_reps, 1) + self.assertEqual(testObject.num_commit, 24) + self.assertEqual(testObject.num_dispatch, 1) + self.assertEqual(testObject.duration_dispatch, 15) + + ### LOAD_PRESCIENT ### + def test_default_representative_dates(self): + # Test no representative dates passed in, initializing with defaults + testObject = ExpansionPlanningData() + testObject.load_prescient(data_path=input_data_source) + # default dates: + expected_dates = [ + "2020-01-28 00:00", + "2020-04-23 00:00", + "2020-07-05 00:00", + "2020-10-14 00:00", + ] + self.assertEqual(testObject.representative_dates, expected_dates) + + def test_passed_representative_dates(self): + # Test new representative dates passed in, replacing 
defaults + expected_dates = [ + "2020-01-28 00:00", + "2020-04-23 00:00", + "2020-07-05 00:00", + "2020-12-14 00:00", + ] + + testObject = ExpansionPlanningData() + testObject.load_prescient( + data_path=input_data_source, + representative_dates=expected_dates, + ) + + self.assertEqual(testObject.representative_dates, expected_dates) + + def test_representative_date_not_in_time_keys(self): + # Test passing invalid/not covered dates + testObject = ExpansionPlanningData() + bad_dates = [ + "2020-01-28 00:00", + "2020-04-23 00:00", + "2020-07-05 00:00", + "2099-01-01 00:00", # Not in time_keys + ] + with self.assertRaises(ValueError): + testObject.load_prescient( + data_path=input_data_source, representative_dates=bad_dates + ) + + def test_empty_representative_dates(self): + # Test that an empty list of representative dates is rejected with a ValueError + testObject = ExpansionPlanningData() + with self.assertRaises(ValueError): + testObject.load_prescient( + data_path=input_data_source, representative_dates=[] + ) + + def test_default_representative_weights(self): + # Test no representative weights passed in, so the function calculates them + testObject = ExpansionPlanningData() + testObject.load_prescient(data_path=input_data_source) + + total_weight = 365 * testObject.stages + expected_weight = int(total_weight / len(testObject.representative_dates)) + + for w in testObject.representative_weights.values(): + self.assertEqual(w, expected_weight) + + def test_no_representative_weights_passed_5_dates(self): + # Test no representative weights passed in and custom dates passed in, so the function calculates them + dates = [ + "2020-01-28 00:00", + "2020-04-23 00:00", + "2020-07-05 00:00", + "2020-10-14 00:00", + "2020-12-14 00:00", + ] + + testObject = ExpansionPlanningData() + testObject.load_prescient( + data_path=input_data_source, representative_dates=dates + ) + + total_weight = 365 * testObject.stages + expected_weight = int(total_weight / 
len(testObject.representative_dates)) + + for w in testObject.representative_weights.values(): + self.assertEqual(w, expected_weight) + + def test_passed_representative_weights(self): + # Test representative weights passed in + weights = {1: 91, 2: 91, 3: 91, 4: 91} + testObject = ExpansionPlanningData() + testObject.load_prescient( + data_path=input_data_source, representative_weights=weights + ) + self.assertEqual(testObject.representative_weights, weights) + + def test_in_service_flags_set(self): + # Test in service flags are being set properly + testObject = ExpansionPlanningData() + testObject.load_prescient(data_path=input_data_source) + + self.assertEqual( + testObject.md.data["elements"]["generator"]["3_CT"]["in_service"], True + ) + self.assertEqual( + testObject.md.data["elements"]["branch"]["branch_3_4_1"]["in_service"], + True, + ) + + def test_missing_simulation_objects_csv(self): + # test if a data path is missing the simulation objects, it should throw an error + with tempfile.TemporaryDirectory() as tmpdirname: + testObject = ExpansionPlanningData() + with self.assertRaises(FileNotFoundError): + testObject.load_prescient(data_path=tmpdirname) + + def test_clone_at_time_keys_called_correctly(self): + testObject = ExpansionPlanningData() + testObject.load_prescient(data_path=input_data_source) + + model_class = type(testObject.md) + + # Patch clone_at_time_keys on the existing model instance + with unittest.mock.patch.object( + model_class, "clone_at_time_keys", wraps=testObject.md.clone_at_time_keys + ) as mock_clone: + + # Call load_prescient again to trigger cloning with patched method + testObject.load_prescient(data_path=input_data_source) + + # Check that clone_at_time_keys was called once per representative date + expected_calls = len(testObject.representative_dates) + self.assertEqual(mock_clone.call_count, expected_calls) + + ### IMPORT_LOAD_SCALING ### + @pytest.mark.skipif( + not os.path.exists(load_scaling_file), + reason=f"Data file 
{load_scaling_file} not found", + ) + def test_import_load_scaling_normal(self): + # test successful passthrough of load scaling function + testObject = ExpansionPlanningData() + testObject.import_load_scaling(load_scaling_file) + + df = testObject.load_scaling + self.assertIsInstance(df, pd.DataFrame) + expected_columns = ["year", "month", "day", "hour"] + [ + str(i) for i in range(1, 9) + ] + for col in expected_columns: + self.assertIn(col, df.columns) + self.assertFalse(df.empty) + + @pytest.mark.skipif( + not os.path.exists(load_scaling_file), + reason=f"Data file {load_scaling_file} not found", + ) + def test_import_load_scaling_incorrect_num_years(self): + # Test value error raised if the length of forecast years is incorrect + testObject = ExpansionPlanningData(stages=3) + forecast_years = [2025, 2030] + + with self.assertRaises(ValueError): + testObject.import_load_scaling(load_scaling_file, forecast_years) + + @pytest.mark.skipif( + not os.path.exists(load_scaling_file), + reason=f"Data file {load_scaling_file} not found", + ) + def test_import_load_scaling_incorrect_years_too_early(self): + # Test value error raised if the forecast years are outside the supported ranges + testObject = ExpansionPlanningData(stages=3) + forecast_years = [2019, 2030, 2055] + + with self.assertRaises(ValueError): + testObject.import_load_scaling(load_scaling_file, forecast_years) + + ### IMPORT_OUTAGE_DATA ### + @pytest.mark.skipif( + not os.path.exists(outage_data_path), + reason=f"Data file {outage_data_path} not found", + ) + def test_import_outage_data(self): + testObject = ExpansionPlanningData() + + testObject.import_outage_data(outage_data_path) + + df = testObject.bus_hours + + self.assertTrue(hasattr(testObject, "bus_hours")) + self.assertIsInstance(df, pd.DataFrame) + self.assertIn("hour", df.columns) + self.assertIn("Bus Number", df.columns) + + ### LOAD_DEFAULT_DATA_SETTINGS ### + def test_load_default_data_settings(self): + testObject = ExpansionPlanningData() + 
testObject.load_prescient(data_path=input_data_source) + + testObject.load_default_data_settings() + + # Check generators + for gen_name, gen in testObject.md.data["elements"]["generator"].items(): + if gen.get("fuel") == "C": + if gen.get("in_service") is False: + self.assertEqual(gen["lifetime"], 1) + else: + self.assertEqual(gen["lifetime"], 2) + else: + self.assertEqual(gen["lifetime"], 3) + + # Check other fixed attributes + self.assertEqual(gen["spinning_reserve_frac"], 0.1) + self.assertEqual(gen["quickstart_reserve_frac"], 0.1) + self.assertEqual(gen["capital_multiplier"], 1) + self.assertEqual(gen["extension_multiplier"], 0) + self.assertEqual(gen["max_operating_reserve"], 1) + self.assertEqual(gen["max_spinning_reserve"], 1) + self.assertEqual(gen["max_quickstart_reserve"], 1) + self.assertEqual(gen["ramp_up_rate"], 0.1) + self.assertEqual(gen["ramp_down_rate"], 0.1) + self.assertEqual(gen["emissions_factor"], 1) + self.assertEqual(gen["start_fuel"], 1) + self.assertEqual(gen["investment_cost"], 1) + + # Check branches + for branch in testObject.md.data["elements"]["branch"].values(): + self.assertEqual(branch["loss_rate"], 0) + self.assertEqual(branch["distance"], 1) + self.assertEqual(branch["capital_cost"], 10000000) + + # Check system + system = testObject.md.data["system"] + self.assertEqual(system["min_operating_reserve"], 0.1) + self.assertEqual(system["min_spinning_reserve"], 0.1) + + ### LOAD_STORAGE_CSV ### + @pytest.mark.skipif( + not os.path.exists(storage_file), + reason=f"Data file {storage_file} not found", + ) + def test_load_storage_csv_success(self): + testObject = ExpansionPlanningData() + testObject.load_prescient(data_path=input_data_source) + testObject.load_storage_csv(storage_file) + + # Check that storage data was loaded into md.data["elements"]["storage"] + storage = testObject.md.data["elements"].get("storage", None) + self.assertIsNotNone(storage) + self.assertIsInstance(storage, dict) + + # Check some expected keys in storage 
data + expected_keys = { + "bus", + "generator", + "storage_type", + "energy_capacity", + "initial_state_of_charge", + "investment_cost", + "investment_cost_kwh", + } + for key in expected_keys: + self.assertIn(key, storage["100MW_400MWh_1"].keys()) + + @pytest.mark.skipif( + not os.path.exists(storage_file), + reason=f"Data file {storage_file} not found", + ) + def test_load_storage_string_path(self): + testObject = ExpansionPlanningData() + testObject.load_prescient(data_path=input_data_source) + testObject.load_storage_csv(str(storage_file)) # should not throw an error + self.assertIn('storage', testObject.md.data["elements"].keys()) + # check that the storage data is not an empty dict + self.assertTrue(testObject.md.data["elements"]['storage']) + + def test_load_storage_csv_file_not_found(self): + testObject = ExpansionPlanningData() + testObject.load_prescient(input_data_source) + testObject.load_storage_csv(input_data_source) + + # Storage should be set to empty dict + storage = testObject.md.data["elements"].get("storage", None) + self.assertIsInstance(storage, dict) + self.assertEqual(storage, {}) + + ### TEXAS_CASE_STUDY_UPDATES ### + @pytest.mark.skipif( + not os.path.exists(texas_data_path), + reason=f"Data file {texas_data_path} not found", + ) + def test_texas_case_study(self): + testObject = ExpansionPlanningData() + testObject.load_prescient(data_path=texas_data_path) + + # Call the method under test + testObject.texas_case_study_updates(texas_data_path) + + generator = testObject.md.data["elements"].get("generator", None) + self.assertIsNotNone(generator) + + expected_columns = [ + "capex1", + "capex2", + "capex3", + "fuel_cost1", + "fuel_cost2", + "fuel_cost3", + "fixed_ops1", + "fixed_ops2", + "fixed_ops3", + "var_ops1", + "var_ops2", + "var_ops3", + ] + + # Check that each expected column is added to each generator + for gen_name, gen_data in generator.items(): + for col in expected_columns: + self.assertIn( + col, gen_data, f"Column {col} missing 
in generator {gen_name}" + ) + + def test_texas_case_study_invalid_data_path(self): + # Test that an error is raised if not a Texas case Study + testObject = ExpansionPlanningData() + + with self.assertRaises(ValueError): + testObject.texas_case_study_updates(input_data_source) diff --git a/pyproject.toml b/pyproject.toml index 6515abb..d1f2d6b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ dependencies = [ "matplotlib", "ipython", "openpyxl", + "pyxlsb", ] [project.urls]