-
Notifications
You must be signed in to change notification settings - Fork 48
Expand Dataset.from_files so it works properly with derived variables
#2777
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 72 commits
4b989d3
b0c44f6
1dd5671
8989549
1f6dfa3
b6a6651
6793e0c
878e310
be6e55d
b1caf65
19dbff9
d8ea7d9
367bfe7
5bbe6ce
d10de1e
7323866
3ab2cdf
099349f
86b308b
369a811
c2a3d81
a3dab12
001eafa
debd589
c3df13e
e794817
7c1bfd7
b971d50
f6b6d22
1f4de86
8ee3e04
c6d303b
40147fb
6ec04fc
58b12d1
ea3386e
dc7e122
73ddc24
efa2ac1
f9c47a9
3de7bc8
77fd1e8
312fafa
e8c7bf2
6cdd714
9057cf9
ebc82ba
58dd666
b1c66fd
6be3169
5744b0d
cbcf37b
14e8b5e
ecbecc6
d7c73aa
69e0502
6eedca2
62c1996
8f2f179
7bc1bee
62067fc
b12df84
22ab6e7
36724ef
6ad2fef
3ce06fc
1116641
74983d5
acaf9fd
f6e531b
30b6f53
1cdfef2
cc91794
7ec3281
2bfc1fa
f0f2b6e
cffdeea
dec25bc
ff0cdd5
de27a4b
58b21ef
d8f5d08
0962489
81da6e7
ade0bce
d725654
d5234a7
2226ebd
bb72b6a
8dd2fdf
f3477c0
9b28c0f
6d8ba22
d3adbce
8c055a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,6 @@ | |
| from esmvalcore.esgf.facets import FACETS | ||
| from esmvalcore.exceptions import RecipeError | ||
| from esmvalcore.local import LocalFile, _replace_years_with_timerange | ||
| from esmvalcore.preprocessor._derive import get_required | ||
| from esmvalcore.preprocessor._io import DATASET_KEYS | ||
| from esmvalcore.preprocessor._supplementary_vars import ( | ||
| PREPROCESSOR_SUPPLEMENTARIES, | ||
|
|
@@ -188,28 +187,6 @@ | |
| return list(merged.values()) | ||
|
|
||
|
|
||
| def _fix_cmip5_fx_ensemble(dataset: Dataset) -> None: | ||
| """Automatically correct the wrong ensemble for CMIP5 fx variables.""" | ||
| if ( | ||
| dataset.facets.get("project") == "CMIP5" | ||
| and dataset.facets.get("mip") == "fx" | ||
| and dataset.facets.get("ensemble") != "r0i0p0" | ||
| and not dataset.files | ||
| ): | ||
| original_ensemble = dataset["ensemble"] | ||
| copy = dataset.copy() | ||
| copy.facets["ensemble"] = "r0i0p0" | ||
| if copy.files: | ||
| dataset.facets["ensemble"] = "r0i0p0" | ||
| logger.info( | ||
| "Corrected wrong 'ensemble' from '%s' to '%s' for %s", | ||
| original_ensemble, | ||
| dataset["ensemble"], | ||
| dataset.summary(shorten=True), | ||
| ) | ||
| dataset.find_files() | ||
|
|
||
|
|
||
| def _get_supplementary_short_names( | ||
| facets: Facets, | ||
| step: str, | ||
|
|
@@ -428,9 +405,7 @@ | |
| return datasets | ||
|
|
||
|
|
||
| def _dataset_from_files( # noqa: C901 | ||
| dataset: Dataset, | ||
| ) -> list[Dataset]: | ||
| def _dataset_from_files(dataset: Dataset) -> list[Dataset]: | ||
| """Replace facet values of '*' based on available files.""" | ||
| result: list[Dataset] = [] | ||
| errors: list[str] = [] | ||
|
|
@@ -441,53 +416,32 @@ | |
| dataset.summary(shorten=True), | ||
| ) | ||
|
|
||
| representative_datasets = _representative_datasets(dataset) | ||
|
|
||
| # For derived variables, representative_datasets might contain more than | ||
| # one element | ||
| all_datasets: list[list[tuple[dict, Dataset]]] = [] | ||
| for representative_dataset in representative_datasets: | ||
| all_datasets.append([]) | ||
| for expanded_ds in representative_dataset.from_files(): | ||
| updated_facets = {} | ||
| unexpanded_globs = {} | ||
| for key, value in dataset.facets.items(): | ||
| if _isglob(value): | ||
| if key in expanded_ds.facets and not _isglob( | ||
| expanded_ds[key], | ||
| ): | ||
| updated_facets[key] = expanded_ds.facets[key] | ||
| else: | ||
| unexpanded_globs[key] = value | ||
|
|
||
| if unexpanded_globs: | ||
| msg = _report_unexpanded_globs( | ||
| dataset, | ||
| expanded_ds, | ||
| unexpanded_globs, | ||
| ) | ||
| errors.append(msg) | ||
| continue | ||
| for expanded_ds in dataset.from_files(): | ||
| updated_facets = {} | ||
| unexpanded_globs = {} | ||
| for key, value in dataset.facets.items(): | ||
| if _isglob(value): | ||
| if key in expanded_ds.facets and not _isglob( | ||
| expanded_ds[key], | ||
| ): | ||
| updated_facets[key] = expanded_ds.facets[key] | ||
| else: | ||
| unexpanded_globs[key] = value | ||
|
|
||
| if unexpanded_globs: | ||
| msg = _report_unexpanded_globs( | ||
| dataset, | ||
| expanded_ds, | ||
| unexpanded_globs, | ||
| ) | ||
| errors.append(msg) | ||
| continue | ||
|
|
||
| new_ds = dataset.copy() | ||
| new_ds.facets.update(updated_facets) | ||
| new_ds.supplementaries = expanded_ds.supplementaries | ||
| new_ds = dataset.copy() | ||
| new_ds.facets.update(updated_facets) | ||
| new_ds.supplementaries = expanded_ds.supplementaries | ||
|
|
||
| all_datasets[-1].append((updated_facets, new_ds)) | ||
|
|
||
| # If globs have been expanded, only consider those datasets that contain | ||
| # all necessary input variables if derivation is necessary | ||
| for updated_facets, new_ds in all_datasets[0]: | ||
| other_facets = [[d[0] for d in ds] for ds in all_datasets[1:]] | ||
| if all(updated_facets in facets for facets in other_facets): | ||
| result.append(new_ds) | ||
| else: | ||
| logger.debug( | ||
| "Not all necessary input variables to derive '%s' are " | ||
| "available for dataset %s", | ||
| dataset["short_name"], | ||
| updated_facets, | ||
| ) | ||
| result.append(new_ds) | ||
|
|
||
| if errors: | ||
| raise RecipeError("\n".join(errors)) | ||
|
|
@@ -538,59 +492,23 @@ | |
| return msg | ||
|
|
||
|
|
||
| def _derive_needed(dataset: Dataset) -> bool: | ||
| """Check if dataset needs to be derived from other datasets.""" | ||
| if not dataset.facets.get("derive"): | ||
| return False | ||
| if dataset.facets.get("force_derivation"): | ||
| return True | ||
| if _isglob(dataset.facets.get("timerange", "")): | ||
| # Our file finding routines are not able to handle globs. | ||
| dataset = dataset.copy() | ||
| dataset.facets.pop("timerange") | ||
|
|
||
| copy = dataset.copy() | ||
| copy.supplementaries = [] | ||
| return not copy.files | ||
|
|
||
|
|
||
| def _get_input_datasets(dataset: Dataset) -> list[Dataset]: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this function still needed now that the dataset provides these as an attribute?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. This function removes non-existent optional required datasets prior to loading them. This can/will be moved to the |
||
| """Determine the input datasets needed for deriving `dataset`.""" | ||
| facets = dataset.facets | ||
| if not _derive_needed(dataset): | ||
| _fix_cmip5_fx_ensemble(dataset) | ||
| return [dataset] | ||
| if not dataset._derivation_necessary(): # noqa: SLF001 | ||
| return dataset.input_datasets | ||
|
|
||
| # Configure input datasets needed to derive variable | ||
| datasets = [] | ||
| required_vars = get_required(facets["short_name"], facets["project"]) # type: ignore | ||
| # idea: add option to specify facets in list of dicts that is value of | ||
| # 'derive' in the recipe and use that instead of get_required? | ||
| for input_facets in required_vars: | ||
| input_dataset = dataset.copy() | ||
| keep = {"alias", "recipe_dataset_index", *dataset.minimal_facets} | ||
| input_dataset.facets = { | ||
| k: v for k, v in input_dataset.facets.items() if k in keep | ||
| } | ||
| input_dataset.facets.update(input_facets) | ||
| input_dataset.augment_facets() | ||
| _fix_cmip5_fx_ensemble(input_dataset) | ||
| if input_facets.get("optional") and not input_dataset.files: | ||
| # Skip optional datasets if no data is available | ||
| input_datasets: list[Dataset] = [] | ||
| for input_dataset in dataset.input_datasets: | ||
| if input_dataset.facets.get("optional") and not input_dataset.files: | ||
| logger.info( | ||
| "Skipping: no data found for %s which is marked as 'optional'", | ||
| input_dataset, | ||
| ) | ||
| else: | ||
| datasets.append(input_dataset) | ||
| input_datasets.append(input_dataset) | ||
|
|
||
| # Check timeranges of available input data. | ||
| timeranges: set[str] = set() | ||
| for input_dataset in datasets: | ||
| if "timerange" in input_dataset.facets: | ||
| timeranges.add(input_dataset.facets["timerange"]) # type: ignore | ||
| check.differing_timeranges(timeranges, required_vars) | ||
|
|
||
| return datasets | ||
| return input_datasets | ||
|
|
||
|
|
||
| def _representative_datasets(dataset: Dataset) -> list[Dataset]: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This function seems no longer needed either.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Again, this can be removed once |
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.