Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion compliance_checker/cf/cf_1_7.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,35 @@ def _check_gmattr_existence_condition_geoid_name_geoptl_datum_name(self, var):
else:
return (True, msg)

def _check_gmattr_existence_condition_crs_name(self, var):
"""
If projected_crs_name is defined then geographic_crs_name must be also.

:param netCDF4.Variable var
:rtype tuple
:return two-tuple (bool, str)
"""
msg = "projected_crs_name is defined then geographic_crs_name " "must be also."

_ncattrs = set(var.ncattrs())

if any(
x in _ncattrs
for x in [
"projected_crs_name",
"geographic_crs_name",
]
) and (
not {
"projected_crs_name",
"geographic_crs_name",
}.issubset(_ncattrs)
):
return (False, msg)

else:
return (True, msg)

def _check_gmattr_existence_condition_ell_pmerid_hdatum(self, var):
"""
If one of reference_ellipsoid_name, prime_meridian_name, or
Expand All @@ -518,7 +547,7 @@ def _check_gmattr_existence_condition_ell_pmerid_hdatum(self, var):

msg = (
"If any of reference_ellipsoid_name, prime_meridian_name, "
"or horizontal_datum_name are defined, all must be defined."
"or horizontal_datum_name or geographic_crs_name are defined, all must be defined."
)

_ncattrs = set(var.ncattrs())
Expand All @@ -529,12 +558,14 @@ def _check_gmattr_existence_condition_ell_pmerid_hdatum(self, var):
"reference_ellipsoid_name",
"prime_meridian_name",
"horizontal_datum_name",
"geographic_crs_name",
]
) and (
not {
"reference_ellipsoid_name",
"prime_meridian_name",
"horizontal_datum_name",
"geographic_crs_name",
}.issubset(_ncattrs)
):
return (False, msg)
Expand Down Expand Up @@ -739,6 +770,7 @@ def _evaluate_towgs84(self, val):
def check_grid_mapping(self, ds):
prev_return = super().check_grid_mapping(ds)
grid_mapping_variables = cfutil.get_grid_mapping_variables(ds)

for var_name in sorted(grid_mapping_variables):
var = ds.variables[var_name]
test_ctx = self.get_test_ctx(
Expand Down
26 changes: 15 additions & 11 deletions compliance_checker/cf/cf_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ def check_grid_mapping(self, ds):
)
defines_grid_mapping.assert_true(
(isinstance(grid_mapping, str) and grid_mapping),
f"{variable.name}'s grid_mapping attribute must be a "
"space-separated non-empty string",
f"Variable '{variable.name}' must have a non-empty, space-separated 'grid_mapping' attribute.",
)
if isinstance(grid_mapping, str):
# TODO (badams): refactor functionality to split functionality
Expand All @@ -206,7 +205,7 @@ def check_grid_mapping(self, ds):
for grid_var_name, coord_var_str in re_all:
defines_grid_mapping.assert_true(
grid_var_name in ds.variables,
f"grid mapping variable {grid_var_name} must exist in this dataset",
f"Grid mapping variable {grid_var_name} must exist in the dataset.",
)
for ref_var in coord_var_str.split():
defines_grid_mapping.assert_true(
Expand All @@ -218,7 +217,7 @@ def check_grid_mapping(self, ds):
for grid_var_name in grid_mapping.split():
defines_grid_mapping.assert_true(
grid_var_name in ds.variables,
f"grid mapping variable {grid_var_name} must exist in this dataset",
f"Grid mapping variable '{grid_var_name}' must exist in the dataset.",
)
ret_val[variable.name] = defines_grid_mapping.to_result()

Expand All @@ -231,12 +230,17 @@ def check_grid_mapping(self, ds):
)
grid_var = ds.variables[grid_var_name]

# Grid mapping variables should have 0 dimensions.
valid_grid_mapping.assert_true(
len(grid_var.dimensions) == 0,
f"Grid mapping variable '{grid_var_name}' has dimensions but should be scalar (0-dimensional).",
)
grid_mapping_name = getattr(grid_var, "grid_mapping_name", None)

# Grid mapping name must be in appendix F
valid_grid_mapping.assert_true(
grid_mapping_name in self.grid_mapping_dict,
f"{grid_mapping_name} is not a valid grid_mapping_name."
f"Grid mapping variable '{grid_var_name}' has an invalid grid_mapping_name: '{grid_mapping_name}'."
+ " See Appendix F for valid grid mappings",
)

Expand All @@ -253,11 +257,12 @@ def check_grid_mapping(self, ds):

grid_mapping = self.grid_mapping_dict[grid_mapping_name]
required_attrs = grid_mapping[0]

# Make sure all the required attributes are defined
for req in required_attrs:
valid_grid_mapping.assert_true(
hasattr(grid_var, req),
f"{req} is a required attribute for grid mapping {grid_mapping_name}",
f"The attribute '{req}' is required for grid mapping variable '{grid_var_name}' with the grid_mapping_name '{grid_mapping_name}'.",
)

# Make sure that exactly one of the exclusive attributes exist
Expand All @@ -267,10 +272,10 @@ def check_grid_mapping(self, ds):
for attr in at_least_attr:
if hasattr(grid_var, attr):
number_found += 1

valid_grid_mapping.assert_true(
number_found == 1,
f"grid mapping {grid_mapping_name}"
+ "must define exactly one of these attributes: "
f"Grid mapping variable '{grid_var_name}' with grid_mapping_name '{grid_mapping_name}' must define exactly one of the following attributes:"
+ "{}".format(" or ".join(at_least_attr)),
)

Expand All @@ -280,11 +285,10 @@ def check_grid_mapping(self, ds):
found_vars = ds.get_variables_by_attributes(
standard_name=expected_std_name,
)

valid_grid_mapping.assert_true(
len(found_vars) == 1,
f"grid mapping {grid_mapping_name} requires exactly "
+ "one variable with standard_name "
+ f"{expected_std_name} to be defined",
f"Grid mapping variable '{grid_var_name}' with grid_mapping_name '{grid_mapping_name}' requires exactly one variable with standard_name '{expected_std_name}' to be defined.",
)

ret_val[grid_var_name] = valid_grid_mapping.to_result()
Expand Down
25 changes: 22 additions & 3 deletions compliance_checker/cf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,16 +930,35 @@ def get_flag_variables(nc):
return flag_variables


def extract_grid_mapping_names(grid_mapping_string):
"""
Extracts all grid mapping variable names from a grid_mapping string.

:param str grid_mapping_string: The grid_mapping attribute string
:return list[str]: List of grid mapping variable names
"""
return (
re.findall(r"\b(\w+):", grid_mapping_string)
if ":" in grid_mapping_string
else grid_mapping_string.split()
)


Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed it to include the only two possible formats:

Key-value-like: "key: value" (key is the grid mapping variables
Space-separated names: "x y z" ( x y z are the grid mapping variables)

def get_grid_mapping_variables(nc):
"""
Returns a list of grid mapping variables
Returns a set of all grid mapping variable names that are present in the dataset.

:param netCDF4.Dataset nc: An open netCDF4 Dataset
:return set[str]: Set of grid mapping variable names
"""
grid_mapping_variables = set()
for ncvar in nc.get_variables_by_attributes(grid_mapping=lambda x: x is not None):
if ncvar.grid_mapping in nc.variables:
grid_mapping_variables.add(ncvar.grid_mapping)
grid_mapping_string = ncvar.grid_mapping
grid_mapping_names = extract_grid_mapping_names(grid_mapping_string)
for name in grid_mapping_names:
if name in nc.variables:
grid_mapping_variables.add(name)

return grid_mapping_variables


Expand Down
60 changes: 57 additions & 3 deletions compliance_checker/tests/test_cf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1455,11 +1455,42 @@ def test_check_reduced_horizontal_grid(self):
assert all(r.name == "§5.3 Reduced Horizontal Grid" for r in results)

def test_check_grid_mapping(self):
dataset = self.load_dataset(STATIC_FILES["mapping"])

dataset = MockTimeSeries()
dataset.createVariable("temp", "d", ("time"))
dataset.createVariable("crsOSGB", "d")
dataset.createVariable("crsWGS84", "d")

temp = dataset.variables["temp"]
temp.standard_name = "air_temperature"
temp.units = "K"
temp.coordinates = "lat lon"
temp.grid_mapping = "crsOSGB: time crsWGS84: lat lon"

# create grid_mapping crsOSGB ;
crsOSGB = dataset.variables["crsOSGB"]
crsOSGB.grid_mapping_name = "transverse_mercator"
crsOSGB.semi_major_axis = 6377563.396
crsOSGB.inverse_flattening = 299.3249646
crsOSGB.longitude_of_prime_meridian = 0.0
crsOSGB.latitude_of_projection_origin = 49.0
crsOSGB.longitude_of_central_meridian = -2.0
crsOSGB.scale_factor_at_central_meridian = 0.9996012717
crsOSGB.false_easting = 400000.0
crsOSGB.false_northing = -100000.0
crsOSGB.unit = "metre"

# create grid_mapping crsWGS84
crsWGS84 = dataset.variables["crsWGS84"]
crsWGS84.grid_mapping_name = "latitude_longitude"
crsWGS84.longitude_of_prime_meridian = 0.0
crsWGS84.semi_major_axis = 6378137.0
crsWGS84.inverse_flattening = 298.257223563

results = self.cf.check_grid_mapping(dataset)

assert len(results) == 6
assert len([r.value for r in results.values() if r.value[0] < r.value[1]]) == 0
assert len(results) == 3
assert len([r.value for r in results.values() if r.value[0] < r.value[1]]) == 2
expected_name = (
"§5.6 Horizontal Coordinate Reference Systems, Grid Mappings, Projections"
)
Expand Down Expand Up @@ -2682,13 +2713,36 @@ def test_check_gmattr_existence_condition_geoid_name_geoptl_datum_name(self):
assert not res[0]
dataset.close()

def test_check_gmattr_existence_condition_crs_name(self):
# test good all
dataset = MockTimeSeries()
dataset.createVariable("lev", "d") # dtype=double, dims=1
dataset.variables["lev"].setncattr("projected_crs_name", "blah")
dataset.variables["lev"].setncattr("geographic_crs_name", "blah")
res = self.cf._check_gmattr_existence_condition_crs_name(
dataset.variables["lev"],
)
assert res[0]
dataset.close()

# test bad (not all)
dataset = MockTimeSeries()
dataset.createVariable("lev", "d") # dtype=double, dims=1
dataset.variables["lev"].setncattr("geographic_crs_name", "blah")
res = self.cf._check_gmattr_existence_condition_crs_name(
dataset.variables["lev"],
)
assert not res[0]
dataset.close()

def test_check_gmattr_existence_condition_ell_pmerid_hdatum(self):
# test good (all)
dataset = MockTimeSeries()
dataset.createVariable("lev", "d") # dtype=double, dims=1
dataset.variables["lev"].setncattr("reference_ellipsoid_name", "blah")
dataset.variables["lev"].setncattr("prime_meridian_name", "blah")
dataset.variables["lev"].setncattr("horizontal_datum_name", "blah")
dataset.variables["lev"].setncattr("geographic_crs_name", "blah")
res = self.cf._check_gmattr_existence_condition_ell_pmerid_hdatum(
dataset.variables["lev"],
)
Expand Down
Loading