From cad4d2ee37956433ee22ac110ea940d2d2c42211 Mon Sep 17 00:00:00 2001 From: huacchob Date: Tue, 6 Jan 2026 17:00:24 -0500 Subject: [PATCH 1/2] adding TYPE_API, and API remediation to perform config_merge operations for API based devices, such as Meraki, associated tests and documentation --- changes/1064.added | 1 + docs/user/app_feature_remediation.md | 26 +- nautobot_golden_config/choices.py | 2 + nautobot_golden_config/models.py | 382 +++++++++++++++++- .../fixtures/remediation/actual_config.json | 7 + .../fixtures/remediation/config_context.json | 5 + .../fixtures/remediation/intended_config.json | 7 + nautobot_golden_config/tests/test_models.py | 113 +++++- 8 files changed, 538 insertions(+), 5 deletions(-) create mode 100644 changes/1064.added create mode 100644 nautobot_golden_config/tests/fixtures/remediation/actual_config.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/config_context.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/intended_config.json diff --git a/changes/1064.added b/changes/1064.added new file mode 100644 index 000000000..07f477cfa --- /dev/null +++ b/changes/1064.added @@ -0,0 +1 @@ +Adding API remediation feature to support API dispatchers. diff --git a/docs/user/app_feature_remediation.md b/docs/user/app_feature_remediation.md index bc6e6c6fb..2cfbd70a4 100644 --- a/docs/user/app_feature_remediation.md +++ b/docs/user/app_feature_remediation.md @@ -1,6 +1,6 @@ # Navigating Configuration Remediation -Automated network configuration remediation is a systematic approach that leverages technology and processes to address and rectify configuration issues in network devices. +Automated network configuration remediation is a systematic approach that leverages technology and processes to address and rectify configuration issues in network devices. It involves the use of the Golden Configuration app to understand the current configuration state, compare it against the intended configuration state, and automatically generate remediation data. Automated network configuration remediation improves efficiency by eliminating manual efforts and reducing the risk of human errors. It enables rapid response to security vulnerabilities, minimizes downtime, and enhances compliance with regulatory and industry standards. @@ -48,6 +48,28 @@ Default Hier config options can be used or customized on a per platform basis, a For additional information on how to customize Hier Config options, please refer to the Hierarchical Configuration development guide: https://hier-config.readthedocs.io/en/latest/ +### API Remediation Type + +You can use the TYPE_API option to enable a device to use the API type of remediation. To use this, you would need to pass the settings +that the API request would use as config context. Here is an example using Cisco Meraki platform. + +```json +org_remediation: + - endpoint: "/organizations/{{ obj.get_config_context().get('organization_id', '')}}" + method: "PUT" + query: [] + fields: + - "name" +``` + +The way to create this is like this: + +- The high level key should be '**feature-name**\_remediation', in this case the feature is **org** + - endpoint: This is the endpoint you should call. You could pass jinja to the endpoint to dynamically create the endpoint. + - method: This is the HTTP method to use for the call. + - query: You add strings here, used as a filter if the endpoint supports it, like for example '?user=NTC' if you would like to filter a response searching for the NTC user. + - fields: This is also a list of strings, and it should hold the key names of the response you got from the device, to include that in the payload you will send to the device when you execute the Config Plan. In this example, we only want the "name" field from the response. + ### Custom Config Remediation Type When a Network Operating System delivers configuration data in a format that is not CLI/Hierarchical, we can still perform remediation by using the Custom Remediation options. Custom Remediation is defined within a Python function that takes as input a Configuration Compliance object and returns a Remediation Field. @@ -66,4 +88,4 @@ Once remediation settings are configured for a particular platform, remediation Once remediation is configured for a particular Platform/Feature pair, it is possible to validate remediation operations by running a compliance job. Navigate to **Jobs -> Perform Configuration Compliance** and run a compliance job for a device that has remediation enabled. Verify that remediation data has been generated by navigating to **Golden Config -> Config Compliance**, select the device and check the compliance status for the feature with remediation enabled and the "Remediating Configuration" field, as shown below: -![Validate Configuration Remediation](../images/remediation_validate_feature.png) \ No newline at end of file +![Validate Configuration Remediation](../images/remediation_validate_feature.png) diff --git a/nautobot_golden_config/choices.py b/nautobot_golden_config/choices.py index ef896fa1b..bbe51de72 100644 --- a/nautobot_golden_config/choices.py +++ b/nautobot_golden_config/choices.py @@ -21,10 +21,12 @@ class RemediationTypeChoice(ChoiceSet): """Choiceset used by RemediationSetting.""" TYPE_HIERCONFIG = "hierconfig" + TYPE_API = "api" TYPE_CUSTOM = "custom_remediation" CHOICES = ( (TYPE_HIERCONFIG, "HIERCONFIG"), + (TYPE_API, "API"), (TYPE_CUSTOM, "CUSTOM_REMEDIATION"), ) diff --git a/nautobot_golden_config/models.py b/nautobot_golden_config/models.py index 472e1b206..1fd0f990c 100644 --- a/nautobot_golden_config/models.py +++ b/nautobot_golden_config/models.py @@ -3,6 +3,9 @@ import json import logging import os +from collections import deque +from dataclasses import dataclass +from typing import Any from deepdiff import DeepDiff from django.core.exceptions import ValidationError @@ -24,6 +27,8 @@ from nautobot_golden_config.choices import ComplianceRuleConfigTypeChoice, ConfigPlanTypeChoice, RemediationTypeChoice from nautobot_golden_config.utilities.constant import ENABLE_SOTAGG, PLUGIN_CFG +# pylint: disable=too-many-lines + LOGGER = logging.getLogger(__name__) GRAPHQL_STR_START = "query ($device_id: ID!)" @@ -178,8 +183,7 @@ def _verify_get_custom_compliance_data(compliance_details): def _get_hierconfig_remediation(obj): - """ - Generate the remediation configuration for a device using HierConfig. + """Generate the remediation configuration for a device using HierConfig. This function determines the remediating configuration required to bring a device's actual configuration in line with its intended configuration, using the HierConfig library. It performs the following steps: @@ -238,12 +242,386 @@ def _get_hierconfig_remediation(obj): return remediation_config +@dataclass(frozen=True) +class DictKey: + """Dict key dataclass. + + Attrs: + key (Any): The key. + """ + + key: Any + + +class ApiRemediation: # pylint: disable=too-few-public-methods + """Remediation class for controllers.""" + + def __init__( + self, + compliance_obj, + ) -> None: + """Controller remediation. + + Args: + compliance_obj (ConfigCompliance): Golden Config Compliance object. + """ + self.compliance_obj = compliance_obj + self.feature_name: str = compliance_obj.rule.feature.name.lower() + self.intended_config: dict[str, Any] = compliance_obj.intended + self.backup_config: dict[str, Any] = compliance_obj.actual + + def _filter_allowed_params( + self, + feature_name: str, + config: dict[str, Any], + config_context: dict[str, Any] | None, + ) -> dict[str, Any]: + """Filter allowed parameters and remove unwanted parameters. + + Args: + feature_name (str): Compliance feature name. + config (Optional[dict[str, Any]]): Intended or actual config. + config_context (ConfigContext): Device config context. + + Returns: + dict[str, Any]: Filtered config. + """ + if not config_context: + return {} + endpoint_fields: list[str] = [] + for endpoint in config_context: + if not endpoint.get("fields"): + return {} + endpoint_fields.extend(endpoint["fields"]) + + if isinstance(config[feature_name], dict): + valid_payload_config: dict[str, Any] = {feature_name: {}} + for key, value in config[feature_name].items(): + if key in endpoint_fields: + valid_payload_config[feature_name][key] = value + return valid_payload_config + + if isinstance(config[feature_name], list): + valid_payload_config: dict[str, Any] = {feature_name: []} + for item in config[feature_name]: + params_dict = {} + for key, value in item.items(): + if key in endpoint_fields: + params_dict[key] = value + if params_dict: + valid_payload_config[feature_name].append(params_dict) + return valid_payload_config + return {} + + def _process_diff( # pylint: disable=too-many-branches + self, + diff: dict[Any, Any], + path: tuple[str, ...], + value: str, + ) -> None: + """Process the diff. + + Args: + diff (dict[Any, Any]): Diff dictionary. + path (tuple[str, ...]): Path of dictionary keys. + value (str): The key's value. + + Raises: + TypeError: If an unexpected type is encountered. + """ + current = diff + + for i, key in enumerate(path): + is_last = i == len(path) - 1 + next_key = path[i + 1] if not is_last else None + + if isinstance(key, DictKey): + dict_key: Any = key.key + if is_last: + current[dict_key] = value + else: + if dict_key not in current: + current[dict_key] = [] if isinstance(next_key, int) else {} + current = current[dict_key] + elif isinstance(key, (str, float)): + if is_last: + current[key] = value + else: + if key not in current: + current[key] = [] if isinstance(next_key, int) else {} + current = current[key] + + elif isinstance(key, int): + # current must be a list + if not isinstance(current, list): + exc_msg: str = f"Expected list at index {i}, got {type(current)}" + raise TypeError(exc_msg) + while len(current) <= key: + current.append({}) + if is_last: + current[key] = value + else: + if not isinstance(current[key], (dict, list)): + current[key] = [] if isinstance(next_key, int) else {} + current = current[key] + + else: + exc_msg: str = f"Unsupported key type: {key}" + raise TypeError(exc_msg) + + def _dict_config( # pylint: disable=too-many-arguments + self, + intended: dict[Any, Any], + actual: dict[Any, Any], + diff: dict[Any, Any], + path: tuple[Any], + stack: deque[tuple[tuple[str, ...], Any, Any]], + ) -> None: + """Dictionary config. + + Args: + intended (dict[Any, Any]): Intended config. + actual (dict[Any, Any]): Actual config. + diff (dict[Any, Any]): Diff dictionary. + path (tuple[Any]): Path of keys. + stack (deque[Tuple[Tuple[str, ...], Any, Any]]): Stack of tuples. + """ + for key, value in intended.items(): + if isinstance(value, dict): + stack.append( + ( + path + (DictKey(key=key),), + actual.get(key, {}), + value, + ), + ) + self._dict_config( + intended=value, + actual=actual.get(key, {}), + diff=diff, + path=path + (DictKey(key=key),), + stack=stack, + ) + elif isinstance(value, list): + stack.append( + ( + path + (DictKey(key=key),), + actual.get(key, []), + value, + ), + ) + self._list_config( + intended=value, + actual=actual.get(key, []), + diff=diff, + path=path + (DictKey(key=key),), + stack=stack, + ) + elif isinstance(value, (str, int, float, bool)): + if key not in actual: + self._process_diff( + diff=diff, + path=path + (DictKey(key=key),), + value=value, + ) + else: + self._str_int_float_config( + intended=value, + actual=actual.get(key, ""), + diff=diff, + path=path + (DictKey(key=key),), + ) + + def _list_config( # pylint: disable=too-many-arguments + self, + intended: list[Any], + actual: list[Any], + diff: dict[Any, Any], + path: tuple[Any], + stack: deque[tuple[tuple[str, ...], Any, Any]], + ) -> None: + """List config. + + Args: + intended (list[Any]): Intended config. + actual (list[Any]): Actual config. + required_params (list[Any]): Required parameters. + diff (dict[Any, Any]): Diff dictionary. + path (tuple[Any]): Path of keys. + stack (deque[Tuple[Tuple[str, ...], Any, Any]]): Stack of tuples. + """ + for index, intended_item in enumerate(intended): + if index >= len(actual): + self._process_diff( + diff=diff, + path=path + (index,), + value=intended_item, + ) + continue + try: + actual_item = actual[index] + except IndexError: + actual_item = None + + if isinstance(intended_item, dict): + stack.append((path + (index,), actual_item, intended_item)) + self._dict_config( + intended=intended_item, + actual=actual_item if isinstance(actual_item, dict) else {}, + diff=diff, + path=path + (index,), + stack=stack, + ) + elif isinstance(intended_item, list): + stack.append((path + (index,), actual_item, intended_item)) + self._list_config( + intended=intended_item, + actual=actual_item if isinstance(actual_item, list) else [], + diff=diff, + path=path + (index,), + stack=stack, + ) + else: + self._str_int_float_config( + intended=intended_item, + actual=actual_item if isinstance(actual_item, (str, int, float, bool)) else "", + diff=diff, + path=path + (index,), + ) + + def _str_int_float_config( + self, + intended: str, + actual: str, + diff: dict[Any, Any], + path: tuple[Any], + ) -> None: + """Str config. + + Args: + intended (str): Intended config. + actual (str): Actual config. + diff (dict[Any, Any]): Diff dictionary. + path (tuple[Any]): Path of keys. + """ + if actual != intended: + self._process_diff(diff=diff, path=path, value=intended) + + def _clean_diff(self, diff: list[Any] | dict[Any, Any]) -> dict[Any, Any]: + """Recursively remove empty dicts/lists in diff. + + Args: + diff (Union[list[Any], dict[Any, Any]]): Diff dictionary. + + Returns: + dict[Any, Any]: Cleaned diff dictionary. + """ + if isinstance(diff, dict): + cleaned = {} + for k, v in diff.items(): + cleaned_value = self._clean_diff(diff=v) + if cleaned_value not in ({}, [], None): + cleaned[k] = cleaned_value + return cleaned + + if isinstance(diff, list): + cleaned = [self._clean_diff(item) for item in diff] + cleaned = [item for item in cleaned if item not in ({}, [], None)] + return cleaned or [] + + return diff + + def controller_remediation(self) -> str: + """Controller remediation. + + Raises: + ValidationError: Intended or Actual does not have the feature name as the top level key. + + Returns: + str: Remediation config. + """ + config_context: dict[str, Any] = self.compliance_obj.device.get_config_context() + if config_context.get("remediate_full_intended"): + if isinstance(self.intended_config, str): + self.intended_config: dict[Any, Any] = json.loads(self.intended_config) + return json.dumps( + obj=self.intended_config, + indent=4, + ) + intended: list[Any] | dict[Any, Any] = self._filter_allowed_params( + feature_name=self.feature_name, + config=self.intended_config, + config_context=config_context.get(f"{self.feature_name}_remediation"), + ) + actual: list[Any] | dict[Any, Any] = self._filter_allowed_params( + feature_name=self.feature_name, + config=self.backup_config, + config_context=config_context.get(f"{self.feature_name}_remediation"), + ) + if not actual or not intended: + exc_msg: str = "There was no config context passed." + raise ValidationError(exc_msg) + diff: dict[str, Any] = {} + stack: deque[tuple[tuple[str, ...], Any, Any]] = deque() + stack.append((tuple(), actual, intended)) + + while stack: + path, actual, intended = stack.pop() + + if isinstance(actual, dict) and isinstance(intended, dict): + self._dict_config( + intended=intended, + actual=actual, + diff=diff, + path=path, + stack=stack, + ) + + elif isinstance(actual, list) and isinstance(intended, list): + self._list_config( + intended=intended, + actual=actual, + diff=diff, + path=path, + stack=stack, + ) + else: + self._str_int_float_config( + intended=intended, + actual=actual, + diff=diff, + path=path, + ) + + if not diff: + return "" + if not diff.get(self.feature_name): + exc_msg: str = f"No differences found for feature {self.feature_name}." + raise ValidationError(exc_msg) + cleaned_diff: dict[Any, Any] = self._clean_diff(diff=diff) + return json.dumps(cleaned_diff, indent=4) + + +def _get_api_remediation(obj) -> str: + """Generate the remediation configuration for a device using API. + + Args: + obj (ConfigCompliance): The ConfigCompliance instance. + + Returns: + str: The remediation configuration as a string. + """ + json_controller = ApiRemediation(compliance_obj=obj) + return json_controller.controller_remediation() + + # The below maps the provided compliance types FUNC_MAPPER = { ComplianceRuleConfigTypeChoice.TYPE_CLI: _get_cli_compliance, ComplianceRuleConfigTypeChoice.TYPE_JSON: _get_json_compliance, ComplianceRuleConfigTypeChoice.TYPE_XML: _get_xml_compliance, RemediationTypeChoice.TYPE_HIERCONFIG: _get_hierconfig_remediation, + RemediationTypeChoice.TYPE_API: _get_api_remediation, } # The below conditionally add the custom provided compliance type for custom_function, custom_type in CUSTOM_FUNCTIONS.items(): diff --git a/nautobot_golden_config/tests/fixtures/remediation/actual_config.json b/nautobot_golden_config/tests/fixtures/remediation/actual_config.json new file mode 100644 index 000000000..2d3f1ce3e --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/actual_config.json @@ -0,0 +1,7 @@ +{ + "feature": { + "param1": "value1", + "param2": "DIFFERENT", + "param4": "extra" + } +} diff --git a/nautobot_golden_config/tests/fixtures/remediation/config_context.json b/nautobot_golden_config/tests/fixtures/remediation/config_context.json new file mode 100644 index 000000000..b47b33cfe --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/config_context.json @@ -0,0 +1,5 @@ +[ + { + "fields": ["params1", "params2", "params3"] + } +] diff --git a/nautobot_golden_config/tests/fixtures/remediation/intended_config.json b/nautobot_golden_config/tests/fixtures/remediation/intended_config.json new file mode 100644 index 000000000..1b59ae770 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/intended_config.json @@ -0,0 +1,7 @@ +{ + "feature": { + "param1": "value1", + "param2": "value2", + "param3": "value3" + } +} diff --git a/nautobot_golden_config/tests/test_models.py b/nautobot_golden_config/tests/test_models.py index e94bb3c8d..a2d9b2493 100644 --- a/nautobot_golden_config/tests/test_models.py +++ b/nautobot_golden_config/tests/test_models.py @@ -1,6 +1,10 @@ """Unit tests for nautobot_golden_config models.""" -from unittest.mock import patch +import json +import pathlib +from collections import deque +from typing import Any +from unittest.mock import MagicMock, patch from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError @@ -11,10 +15,12 @@ from nautobot_golden_config.choices import RemediationTypeChoice from nautobot_golden_config.models import ( + ApiRemediation, ConfigCompliance, ConfigPlan, ConfigRemove, ConfigReplace, + DictKey, GoldenConfigSetting, RemediationSetting, _get_hierconfig_remediation, @@ -31,6 +37,8 @@ create_saved_queries, ) +# pylint: disable=protected-access + class ConfigComplianceModelTestCase(TestCase): """Test CRUD operations for ConfigCompliance Model.""" @@ -734,3 +742,106 @@ def test_hierconfig_instantiation_error(self, mock_workflow_remediation, mock_ge mock_get_hconfig.assert_called_once() # WorkflowRemediation should never be called since get_hconfig raises exception mock_workflow_remediation.assert_not_called() + + +def load_fixture(filename: str) -> Any: + """Load a JSON fixture from a file. + + Args: + filename (str): Path to the JSON file. + + Returns: + Any: The loaded JSON data. + """ + with pathlib.Path(filename).open(encoding="utf-8") as f: + return json.load(fp=f) + + +class TestApiRemediation(TestCase): + """Test ApiRemediation class.""" + + @classmethod + def setUpClass(cls) -> None: + cls.base_fixtures_path: str = "nautobot_golden_config/tests/fixtures/remediation/" + cls.intended_config = load_fixture(filename=f"{cls.base_fixtures_path}intended_config.json") + cls.actual_config = load_fixture(filename=f"{cls.base_fixtures_path}actual_config.json") + cls.config_context = load_fixture(filename=f"{cls.base_fixtures_path}config_context.json") + super().setUpClass() + + def setUp(self): + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"feature_remediation": self.config_context} + self.compliance_obj = MagicMock() + self.compliance_obj.rule = rule + self.compliance_obj.device = device + self.compliance_obj.intended = self.intended_config + self.compliance_obj.actual = self.actual_config + + def test_process_diff_dictkey(self): + diff = {} + path = (DictKey("foo"),) + value = "bar" + remediation = ApiRemediation(MagicMock()) + remediation._process_diff(diff, path, value) + self.assertEqual(diff["foo"], "bar") + + def test_process_diff_str_key(self): + diff = {} + path = ("foo",) + value = "bar" + remediation = ApiRemediation(MagicMock()) + remediation._process_diff(diff, path, value) + self.assertEqual(diff["foo"], "bar") + + def test_process_diff_int_key(self): + diff = [] + path = (0,) + value = "bar" + remediation = ApiRemediation(MagicMock()) + remediation._process_diff(diff, path, value) + self.assertEqual(diff[0], "bar") + + def test_dict_config(self): + remediation = ApiRemediation(MagicMock()) + intended = {"foo": {"bar": 1}} + actual = {"foo": {}} + diff = {} + stack = deque() + remediation._dict_config(intended, actual, diff, tuple(), stack) + self.assertEqual(diff["foo"]["bar"], 1) + + def test_list_config(self): + remediation = ApiRemediation(MagicMock()) + intended = [{"bar": 1}] + actual = [{}] + diff = [] + stack = deque() + remediation._list_config(intended, actual, diff, tuple(), stack) + self.assertEqual(diff[0]["bar"], 1) + + def test_str_int_float_config(self): + remediation = ApiRemediation(MagicMock()) + diff = {} + remediation._str_int_float_config("foo", "bar", diff, ("baz",)) + self.assertEqual(diff["baz"], "foo") + + def test_clean_diff(self): + remediation = ApiRemediation(MagicMock()) + diff = {"foo": {}, "bar": {"baz": "qux"}, "empty": []} + cleaned = remediation._clean_diff(diff) + self.assertNotIn("foo", cleaned) + self.assertIn("bar", cleaned) + self.assertNotIn("empty", cleaned) + + def test_controller_remediation_no_context(self): + compliance_obj = MagicMock() + compliance_obj.rule.feature.name = "feature" + compliance_obj.intended = {"feature": {}} + compliance_obj.actual = {"feature": {}} + compliance_obj.device.get_config_context.return_value = {} + remediation = ApiRemediation(compliance_obj) + with self.assertRaises(Exception): + remediation.controller_remediation() From 4192390d4bd130746ff88e238cc55f0f97501aa9 Mon Sep 17 00:00:00 2001 From: huacchob Date: Wed, 14 Jan 2026 12:41:56 -0500 Subject: [PATCH 2/2] updating ApiRemediation to use the deepdiff library instead of custom logic --- nautobot_golden_config/models.py | 482 +++++++++--------- .../fixtures/remediation/actual_config.json | 7 - .../fixtures/remediation/config_context.json | 5 - .../remediation/dict_actual_config.json | 37 ++ .../remediation/dict_config_context.json | 14 + .../remediation/dict_intended_config.json | 32 ++ .../fixtures/remediation/intended_config.json | 7 - .../remediation/list_actual_config.json | 37 ++ .../remediation/list_config_context.json | 5 + .../remediation/list_intended_config.json | 37 ++ nautobot_golden_config/tests/test_models.py | 236 ++++++--- 11 files changed, 556 insertions(+), 343 deletions(-) delete mode 100644 nautobot_golden_config/tests/fixtures/remediation/actual_config.json delete mode 100644 nautobot_golden_config/tests/fixtures/remediation/config_context.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/dict_actual_config.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/dict_config_context.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/dict_intended_config.json delete mode 100644 nautobot_golden_config/tests/fixtures/remediation/intended_config.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/list_actual_config.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/list_config_context.json create mode 100644 nautobot_golden_config/tests/fixtures/remediation/list_intended_config.json diff --git a/nautobot_golden_config/models.py b/nautobot_golden_config/models.py index 1fd0f990c..e84bdf016 100644 --- a/nautobot_golden_config/models.py +++ b/nautobot_golden_config/models.py @@ -3,7 +3,6 @@ import json import logging import os -from collections import deque from dataclasses import dataclass from typing import Any @@ -242,46 +241,109 @@ def _get_hierconfig_remediation(obj): return remediation_config -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class DictKey: """Dict key dataclass. + A wrapper class for dictionary keys to enable custom behavior or identification. + Primarily used to distinguish dictionary keys with list indexes. + Attrs: - key (Any): The key. + key (Any): The dictionary key. """ key: Any + def __str__(self) -> str: + """Return the string representation of the key. + + Returns: + str: The string representation of the key. + """ + return str(self.key) + + def __repr__(self) -> str: + """Return the official string representation of the DictKey. + + Returns: + str: The official string representation of the DictKey. + """ + return f"DictKey({self.key!r})" + + +def _wrap_dict_keys(obj: list[Any] | dict[Any, Any]) -> list[Any] | dict[Any, Any]: + """Recursively walk dicts and lists and wrap *only* dict keys as DictKey(key=). + + Scalars are returned unchanged. + + Args: + obj (list[Any] | dict[Any, Any]): The object to wrap. + + Returns: + list[Any] | dict[Any, Any]: The object with dict keys wrapped as DictKey, or the original scalar. + """ + if isinstance(obj, dict): + return {DictKey(k): _wrap_dict_keys(v) for k, v in obj.items()} + + if isinstance(obj, list): + return [_wrap_dict_keys(x) for x in obj] + return obj + + +def _create_deepdiff_object(actual: list[Any] | dict[Any, Any], intended: list[Any] | dict[Any, Any]) -> DeepDiff: + """Create a DeepDiff object. + + Args: + actual (list[Any] | dict[Any, Any]): Actual configuration. + intended (list[Any] | dict[Any, Any]): Intended configuration. + + Returns: + DeepDiff: DeepDiff object representing the differences. + """ + dd = DeepDiff( + t1=actual, + t2=intended, + view="tree", + verbose_level=2, + ) + return dd + class ApiRemediation: # pylint: disable=too-few-public-methods """Remediation class for controllers.""" + _DEEPCONFIG_CHANGE_TYPES: tuple[str, ...] = ( + "values_changed", + "dictionary_item_added", + "iterable_item_added", + ) + def __init__( self, compliance_obj, ) -> None: - """Controller remediation. + """Initialize controller remediation. Args: compliance_obj (ConfigCompliance): Golden Config Compliance object. """ self.compliance_obj = compliance_obj self.feature_name: str = compliance_obj.rule.feature.name.lower() - self.intended_config: dict[str, Any] = compliance_obj.intended - self.backup_config: dict[str, Any] = compliance_obj.actual + self.intended_config: dict[str, Any] | str = compliance_obj.intended + self.backup_config: dict[str, Any] | str = compliance_obj.actual - def _filter_allowed_params( + def _filter_allowed_params( # pylint: disable=too-many-branches self, feature_name: str, config: dict[str, Any], - config_context: dict[str, Any] | None, + config_context: list[dict[str, Any]] | None, ) -> dict[str, Any]: """Filter allowed parameters and remove unwanted parameters. Args: feature_name (str): Compliance feature name. - config (Optional[dict[str, Any]]): Intended or actual config. - config_context (ConfigContext): Device config context. + config (dict[str, Any]): Intended or actual config. + config_context (list[dict[str, Any]] | None): Device config context. Returns: dict[str, Any]: Filtered config. @@ -294,16 +356,23 @@ def _filter_allowed_params( return {} endpoint_fields.extend(endpoint["fields"]) - if isinstance(config[feature_name], dict): + feature_value: Any = config.get(feature_name) + + if feature_value is None: + return {} + + if isinstance(feature_value, dict): valid_payload_config: dict[str, Any] = {feature_name: {}} - for key, value in config[feature_name].items(): + for key, value in feature_value.items(): if key in endpoint_fields: valid_payload_config[feature_name][key] = value return valid_payload_config - if isinstance(config[feature_name], list): + if isinstance(feature_value, list): valid_payload_config: dict[str, Any] = {feature_name: []} - for item in config[feature_name]: + for item in feature_value: + if not isinstance(item, dict): + continue params_dict = {} for key, value in item.items(): if key in endpoint_fields: @@ -313,241 +382,178 @@ def _filter_allowed_params( return valid_payload_config return {} + def _prune_empty_containers(self, obj: list[Any] | dict[Any, Any]) -> list[Any] | dict[Any, Any]: + """Recursively remove empty dicts/lists while preserving None. + + This is appropriate for API payload generation where `None` (JSON null) is a valid value. + + Args: + obj (list[Any] | dict[Any, Any]): Arbitrary nested structure. + + Returns: + list[Any] | dict[Any, Any]: Structure with empty dict/list containers removed; None preserved. + """ + if isinstance(obj, dict): + cleaned: dict[Any, Any] = {} + for k, v in obj.items(): + cv = self._prune_empty_containers(v) + if cv not in ({}, []): + cleaned[k] = cv + return cleaned + + if isinstance(obj, list): + cleaned_list = [self._prune_empty_containers(x) for x in obj] + cleaned_list = [x for x in cleaned_list if x not in ({}, [])] + return cleaned_list or [] + + return obj + def _process_diff( # pylint: disable=too-many-branches self, diff: dict[Any, Any], - path: tuple[str, ...], - value: str, + path: tuple[Any, ...], + value: Any, ) -> None: - """Process the diff. + """Populate a nested delta structure given a DeepDiff path and value. + + This method walks the supplied path tokens and creates intermediate dicts/lists + as required, then sets the final leaf node to `value`. Args: - diff (dict[Any, Any]): Diff dictionary. - path (tuple[str, ...]): Path of dictionary keys. - value (str): The key's value. + diff (dict[Any, Any]): Delta dictionary being populated. + path (tuple[Any, ...]): Path tokens (DictKey, str, or int list indices). + value (Any): Value to assign at the leaf path. Raises: - TypeError: If an unexpected type is encountered. + TypeError: If an unexpected container type is encountered during traversal. """ - current = diff + cur: Any = diff - for i, key in enumerate(path): - is_last = i == len(path) - 1 - next_key = path[i + 1] if not is_last else None + for i, raw_key in enumerate(path): + is_last: bool = i == len(path) - 1 + next_key = None if is_last else path[i + 1] + + key = raw_key.key if isinstance(raw_key, DictKey) else raw_key + + if isinstance(key, int): + if not isinstance(cur, list): + err_msg: str = f"Expected list at path[{i}] (index {key}), got {type(cur)}" + raise TypeError(err_msg) + + while len(cur) <= key: + cur.append(None) - if isinstance(key, DictKey): - dict_key: Any = key.key - if is_last: - current[dict_key] = value - else: - if dict_key not in current: - current[dict_key] = [] if isinstance(next_key, int) else {} - current = current[dict_key] - elif isinstance(key, (str, float)): if is_last: - current[key] = value - else: - if key not in current: - current[key] = [] if isinstance(next_key, int) else {} - current = current[key] - - elif isinstance(key, int): - # current must be a list - if not isinstance(current, list): - exc_msg: str = f"Expected list at index {i}, got {type(current)}" - raise TypeError(exc_msg) - while len(current) <= key: - current.append({}) + cur[key] = value + return + + if cur[key] is None or not isinstance(cur[key], (dict, list)): + cur[key] = [] if isinstance(next_key, int) else {} + + cur = cur[key] + continue + + if isinstance(key, (str, float)): + if not isinstance(cur, dict): + err_msg: str = f"Expected dict at path[{i}] (key {key!r}), got {type(cur)}" + raise TypeError(err_msg) + if is_last: - current[key] = value - else: - if not isinstance(current[key], (dict, list)): - current[key] = [] if isinstance(next_key, int) else {} - current = current[key] + cur[key] = value + return - else: - exc_msg: str = f"Unsupported key type: {key}" - raise TypeError(exc_msg) + if key not in cur or not isinstance(cur[key], (dict, list)): + cur[key] = [] if isinstance(next_key, int) else {} - def _dict_config( # pylint: disable=too-many-arguments - self, - intended: dict[Any, Any], - actual: dict[Any, Any], - diff: dict[Any, Any], - path: tuple[Any], - stack: deque[tuple[tuple[str, ...], Any, Any]], - ) -> None: - """Dictionary config. + cur = cur[key] + continue - Args: - intended (dict[Any, Any]): Intended config. - actual (dict[Any, Any]): Actual config. - diff (dict[Any, Any]): Diff dictionary. - path (tuple[Any]): Path of keys. - stack (deque[Tuple[Tuple[str, ...], Any, Any]]): Stack of tuples. - """ - for key, value in intended.items(): - if isinstance(value, dict): - stack.append( - ( - path + (DictKey(key=key),), - actual.get(key, {}), - value, - ), - ) - self._dict_config( - intended=value, - actual=actual.get(key, {}), - diff=diff, - path=path + (DictKey(key=key),), - stack=stack, - ) - elif isinstance(value, list): - stack.append( - ( - path + (DictKey(key=key),), - actual.get(key, []), - value, - ), - ) - self._list_config( - intended=value, - actual=actual.get(key, []), - diff=diff, - path=path + (DictKey(key=key),), - stack=stack, - ) - elif isinstance(value, (str, int, float, bool)): - if key not in actual: - self._process_diff( - diff=diff, - path=path + (DictKey(key=key),), - value=value, - ) - else: - self._str_int_float_config( - intended=value, - actual=actual.get(key, ""), - diff=diff, - path=path + (DictKey(key=key),), - ) - - def _list_config( # pylint: disable=too-many-arguments - self, - intended: list[Any], - actual: list[Any], - diff: dict[Any, Any], - path: tuple[Any], - stack: deque[tuple[tuple[str, ...], Any, Any]], - ) -> None: - """List config. + err_msg: str = f"Unsupported key type at path[{i}]: {type(key)} ({key!r})" + raise TypeError(err_msg) + + def _apply_deepdiff_changes(self, delta: dict[Any, Any], changes: list[Any]) -> None: + """Apply DeepDiff change objects onto the delta payload using intended-side values (t2). Args: - intended (list[Any]): Intended config. - actual (list[Any]): Actual config. - required_params (list[Any]): Required parameters. - diff (dict[Any, Any]): Diff dictionary. - path (tuple[Any]): Path of keys. - stack (deque[Tuple[Tuple[str, ...], Any, Any]]): Stack of tuples. + delta (dict[Any, Any]): Delta payload being constructed. + changes (list[Any]): DeepDiff tree change objects. """ - for index, intended_item in enumerate(intended): - if index >= len(actual): - self._process_diff( - diff=diff, - path=path + (index,), - value=intended_item, - ) + for change in changes: + if not hasattr(change, "t2"): continue + + if not hasattr(change, "path"): + continue + try: - actual_item = actual[index] - except IndexError: - actual_item = None - - if isinstance(intended_item, dict): - stack.append((path + (index,), actual_item, intended_item)) - self._dict_config( - intended=intended_item, - actual=actual_item if isinstance(actual_item, dict) else {}, - diff=diff, - path=path + (index,), - stack=stack, - ) - elif isinstance(intended_item, list): - stack.append((path + (index,), actual_item, intended_item)) - self._list_config( - intended=intended_item, - actual=actual_item if isinstance(actual_item, list) else [], - diff=diff, - path=path + (index,), - stack=stack, - ) - else: - self._str_int_float_config( - intended=intended_item, - actual=actual_item if isinstance(actual_item, (str, int, float, bool)) else "", - diff=diff, - path=path + (index,), - ) + tokens = change.path(output_format="list") + except (TypeError, AttributeError): + continue - def _str_int_float_config( - self, - intended: str, - actual: str, - diff: dict[Any, Any], - path: tuple[Any], - ) -> None: - """Str config. + if tokens and tokens[0] == "root": + tokens = tokens[1:] - Args: - intended (str): Intended config. - actual (str): Actual config. - diff (dict[Any, Any]): Diff dictionary. - path (tuple[Any]): Path of keys. - """ - if actual != intended: - self._process_diff(diff=diff, path=path, value=intended) + self._process_diff( + diff=delta, + path=tuple(tokens), + value=change.t2, + ) + + def _clean_diff(self, diff: Any) -> list[Any] | dict[Any, Any]: + """Convert DeepDiff(tree) into an intended-shaped delta, then prune empty containers. + + If `diff` resembles DeepDiff(tree) output, a delta is built from intended-side values (t2). + Otherwise, the object is only pruned for empty dict/list containers. - def _clean_diff(self, diff: list[Any] | dict[Any, Any]) -> dict[Any, Any]: - """Recursively remove empty dicts/lists in diff. + None values are preserved because they are meaningful in API payloads. Args: - diff (Union[list[Any], dict[Any, Any]]): Diff dictionary. + diff (Any): DeepDiff(tree) result or empty dictionary. Returns: - dict[Any, Any]: Cleaned diff dictionary. + list[Any] | dict[Any, Any]: Intended-shaped delta with empty containers removed (None preserved). """ - if isinstance(diff, dict): - cleaned = {} - for k, v in diff.items(): - cleaned_value = self._clean_diff(diff=v) - if cleaned_value not in ({}, [], None): - cleaned[k] = cleaned_value - return cleaned + if not isinstance(diff, dict): + return self._prune_empty_containers(obj=diff) + + if not any(k in diff for k in self._DEEPCONFIG_CHANGE_TYPES): + return self._prune_empty_containers(obj=diff) - if isinstance(diff, list): - cleaned = [self._clean_diff(item) for item in diff] - cleaned = [item for item in cleaned if item not in ({}, [], None)] - return cleaned or [] + delta: dict[Any, Any] = {} + for change_type in self._DEEPCONFIG_CHANGE_TYPES: + self._apply_deepdiff_changes(delta=delta, changes=diff.get(change_type, []) or []) + return self._prune_empty_containers(obj=delta) - return diff + def api_remediation(self) -> str: + """Generate the remediation payload for the current feature. - def controller_remediation(self) -> str: - """Controller remediation. + Workflow: + 1. Load device config context and honor the "remediate_full_intended" flag. + 2. Filter intended/actual configs down to the fields allowed by the remediation context. + 3. Wrap dictionary keys with DictKey to preserve dict-key semantics in DeepDiff paths. + 4. Compute DeepDiff(tree) between actual (t1) and intended (t2). + 5. Convert DeepDiff output into an intended-shaped delta via `_clean_diff`. + 6. Return the remediation payload as JSON. Raises: - ValidationError: Intended or Actual does not have the feature name as the top level key. + ValidationError: If remediation context is missing or intended/actual cannot be filtered. Returns: - str: Remediation config. + str: JSON remediation payload (delta), or an empty string when there are no differences. """ config_context: dict[str, Any] = self.compliance_obj.device.get_config_context() - if config_context.get("remediate_full_intended"): + try: + if isinstance(self.backup_config, str): + self.backup_config = json.loads(self.backup_config) if isinstance(self.intended_config, str): - self.intended_config: dict[Any, Any] = json.loads(self.intended_config) - return json.dumps( - obj=self.intended_config, - indent=4, - ) + self.intended_config = json.loads(self.intended_config) + except json.JSONDecodeError as exc: + err_msg: str = f"Invalid JSON config: {exc}" + raise ValidationError(err_msg) from exc + + if config_context.get("remediate_full_intended"): + return json.dumps(obj=self.intended_config, indent=4) + intended: list[Any] | dict[Any, Any] = self._filter_allowed_params( feature_name=self.feature_name, config=self.intended_config, @@ -558,47 +564,27 @@ def controller_remediation(self) -> str: config=self.backup_config, config_context=config_context.get(f"{self.feature_name}_remediation"), ) + if not actual or not intended: - exc_msg: str = "There was no config context passed." - raise ValidationError(exc_msg) - diff: dict[str, Any] = {} - stack: deque[tuple[tuple[str, ...], Any, Any]] = deque() - stack.append((tuple(), actual, intended)) - - while stack: - path, actual, intended = stack.pop() - - if isinstance(actual, dict) and isinstance(intended, dict): - self._dict_config( - intended=intended, - actual=actual, - diff=diff, - path=path, - stack=stack, - ) + err_msg: str = "There was no config context fields that matched the intended or actual configuration." + raise ValidationError(err_msg) - elif isinstance(actual, list) and isinstance(intended, list): - self._list_config( - intended=intended, - actual=actual, - diff=diff, - path=path, - stack=stack, - ) - else: - self._str_int_float_config( - intended=intended, - actual=actual, - diff=diff, - path=path, - ) + dict_key_intended: list[Any] | dict[Any, Any] = _wrap_dict_keys(obj=intended) + dict_key_actual: list[Any] | dict[Any, Any] = _wrap_dict_keys(obj=actual) - if not diff: + dd: DeepDiff = _create_deepdiff_object( + actual=dict_key_actual, + intended=dict_key_intended, + ) + + if not dd: + return "" + + cleaned_diff: list[Any] | dict[Any, Any] = self._clean_diff(diff=dd) + + if not cleaned_diff: return "" - if not diff.get(self.feature_name): - exc_msg: str = f"No differences found for feature {self.feature_name}." - raise ValidationError(exc_msg) - cleaned_diff: dict[Any, Any] = self._clean_diff(diff=diff) + return json.dumps(cleaned_diff, indent=4) @@ -612,7 +598,7 @@ def _get_api_remediation(obj) -> str: str: The remediation configuration as a string. """ json_controller = ApiRemediation(compliance_obj=obj) - return json_controller.controller_remediation() + return json_controller.api_remediation() # The below maps the provided compliance types diff --git a/nautobot_golden_config/tests/fixtures/remediation/actual_config.json b/nautobot_golden_config/tests/fixtures/remediation/actual_config.json deleted file mode 100644 index 2d3f1ce3e..000000000 --- a/nautobot_golden_config/tests/fixtures/remediation/actual_config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "feature": { - "param1": "value1", - "param2": "DIFFERENT", - "param4": "extra" - } -} diff --git a/nautobot_golden_config/tests/fixtures/remediation/config_context.json b/nautobot_golden_config/tests/fixtures/remediation/config_context.json deleted file mode 100644 index b47b33cfe..000000000 --- a/nautobot_golden_config/tests/fixtures/remediation/config_context.json +++ /dev/null @@ -1,5 +0,0 @@ -[ - { - "fields": ["params1", "params2", "params3"] - } -] diff --git a/nautobot_golden_config/tests/fixtures/remediation/dict_actual_config.json b/nautobot_golden_config/tests/fixtures/remediation/dict_actual_config.json new file mode 100644 index 000000000..12e4fa117 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/dict_actual_config.json @@ -0,0 +1,37 @@ +{ + "feature": { + "param1": "value1", + "param2": "DIFFERENT", + "param4": "extra", + "nested_dict": { + "a": 1, + "b": false, + "c": null, + "d": 2.71, + "e": [ + { "x": "foo", "y": 10 }, + { "x": "baz", "y": 99 } + ] + }, + "nested_list": [ + { + "id": 1, + "values": [1, 2, 3], + "flag": false + }, + { + "id": 2, + "values": [4, 5], + "flag": false + }, + { + "id": 3, + "values": [], + "flag": true + } + ], + "scalar_none": null, + "scalar_float": 2.718, + "scalar_bool": true + } +} diff --git a/nautobot_golden_config/tests/fixtures/remediation/dict_config_context.json b/nautobot_golden_config/tests/fixtures/remediation/dict_config_context.json new file mode 100644 index 000000000..126b840fb --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/dict_config_context.json @@ -0,0 +1,14 @@ +[ + { + "fields": [ + "param1", + "param2", + "param3", + "nested_dict", + "nested_list", + "scalar_none", + "scalar_float", + "scalar_bool" + ] + } +] diff --git a/nautobot_golden_config/tests/fixtures/remediation/dict_intended_config.json b/nautobot_golden_config/tests/fixtures/remediation/dict_intended_config.json new file mode 100644 index 000000000..47665b04a --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/dict_intended_config.json @@ -0,0 +1,32 @@ +{ + "feature": { + "param1": "value1", + "param2": "value2", + "param3": "value3", + "nested_dict": { + "a": 1, + "b": true, + "c": null, + "d": 3.14, + "e": [ + { "x": "foo", "y": 10 }, + { "x": "bar", "y": 20 } + ] + }, + "nested_list": [ + { + "id": 1, + "values": [1, 2, 3], + "flag": false + }, + { + "id": 2, + "values": [4, 5, 6], + "flag": true + } + ], + "scalar_none": null, + "scalar_float": 2.718, + "scalar_bool": false + } +} diff --git a/nautobot_golden_config/tests/fixtures/remediation/intended_config.json b/nautobot_golden_config/tests/fixtures/remediation/intended_config.json deleted file mode 100644 index 1b59ae770..000000000 --- a/nautobot_golden_config/tests/fixtures/remediation/intended_config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "feature": { - "param1": "value1", - "param2": "value2", - "param3": "value3" - } -} diff --git a/nautobot_golden_config/tests/fixtures/remediation/list_actual_config.json b/nautobot_golden_config/tests/fixtures/remediation/list_actual_config.json new file mode 100644 index 000000000..67169da03 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/list_actual_config.json @@ -0,0 +1,37 @@ +{ + "feature": [ + { + "id": 1, + "name": "alpha", + "active": false, + "values": [1, 2], + "meta": { + "score": 9.5, + "tags": ["a", "b", "x"], + "extra": null + } + }, + { + "id": 2, + "name": "beta", + "active": false, + "values": [4, 5], + "meta": { + "score": 6.5, + "tags": ["c", "d"], + "extra": null + } + }, + { + "id": 4, + "name": "delta", + "active": true, + "values": [7, 8], + "meta": { + "score": 1.0, + "tags": ["z"], + "extra": "new" + } + } + ] +} diff --git a/nautobot_golden_config/tests/fixtures/remediation/list_config_context.json b/nautobot_golden_config/tests/fixtures/remediation/list_config_context.json new file mode 100644 index 000000000..e5cc1a070 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/list_config_context.json @@ -0,0 +1,5 @@ +[ + { + "fields": ["id", "name", "active", "values", "meta"] + } +] diff --git a/nautobot_golden_config/tests/fixtures/remediation/list_intended_config.json b/nautobot_golden_config/tests/fixtures/remediation/list_intended_config.json new file mode 100644 index 000000000..144623da8 --- /dev/null +++ b/nautobot_golden_config/tests/fixtures/remediation/list_intended_config.json @@ -0,0 +1,37 @@ +{ + "feature": [ + { + "id": 1, + "name": "alpha", + "active": true, + "values": [1, 2, 3], + "meta": { + "score": 9.5, + "tags": ["a", "b"], + "extra": null + } + }, + { + "id": 2, + "name": "beta", + "active": false, + "values": [4, 5, 6], + "meta": { + "score": 7.0, + "tags": ["c"], + "extra": "something" + } + }, + { + "id": 3, + "name": "gamma", + "active": true, + "values": [], + "meta": { + "score": 0.0, + "tags": [], + "extra": null + } + } + ] +} diff --git a/nautobot_golden_config/tests/test_models.py b/nautobot_golden_config/tests/test_models.py index a2d9b2493..3aa4ee3eb 100644 --- a/nautobot_golden_config/tests/test_models.py +++ b/nautobot_golden_config/tests/test_models.py @@ -2,7 +2,6 @@ import json import pathlib -from collections import deque from typing import Any from unittest.mock import MagicMock, patch @@ -23,7 +22,9 @@ DictKey, GoldenConfigSetting, RemediationSetting, + _create_deepdiff_object, _get_hierconfig_remediation, + _wrap_dict_keys, ) from nautobot_golden_config.tests.conftest import create_git_repos @@ -757,15 +758,53 @@ def load_fixture(filename: str) -> Any: return json.load(fp=f) +class TestDictKey(TestCase): + """Test cases for the DictKey class.""" + + def test_str_and_repr(self): + k = DictKey("foo") + self.assertEqual(str(k), "foo") + self.assertEqual(repr(k), "DictKey('foo')") + + +class TestWrapDictKeys(TestCase): + """Test cases for the _wrap_dict_keys function.""" + + def test_wrap_dict_keys(self): + obj = {"a": 1, "b": {"c": 2}, "d": [3, {"e": 4}]} + wrapped = _wrap_dict_keys(obj) + # Check top-level keys are DictKey + self.assertTrue(all(isinstance(k, DictKey) for k in wrapped.keys())) + # Check nested dict keys are DictKey + self.assertTrue(all(isinstance(k, DictKey) for k in wrapped[DictKey("b")].keys())) + # Check list elements are preserved + self.assertEqual(wrapped[DictKey("d")][0], 3) + self.assertTrue(isinstance(wrapped[DictKey("d")][1], dict)) + self.assertTrue(isinstance(list(wrapped[DictKey("d")][1].keys())[0], DictKey)) + + +class TestCreateDeepDiffObject(TestCase): + """Test cases for the _create_deepdiff_object function.""" + + def test_deepdiff_object(self): + a = {"foo": 1, "bar": 2} + b = {"foo": 1, "bar": 3} + dd = _create_deepdiff_object(a, b) + self.assertIn("values_changed", dd) + + class TestApiRemediation(TestCase): - """Test ApiRemediation class.""" + """Test ApiRemediation class using mocks and fixture files.""" @classmethod - def setUpClass(cls) -> None: - cls.base_fixtures_path: str = "nautobot_golden_config/tests/fixtures/remediation/" - cls.intended_config = load_fixture(filename=f"{cls.base_fixtures_path}intended_config.json") - cls.actual_config = load_fixture(filename=f"{cls.base_fixtures_path}actual_config.json") - cls.config_context = load_fixture(filename=f"{cls.base_fixtures_path}config_context.json") + def setUpClass(cls): + cls.base_fixtures_path = "nautobot_golden_config/tests/fixtures/remediation/" + cls.dict_intended_config = load_fixture(filename=f"{cls.base_fixtures_path}dict_intended_config.json") + cls.dict_actual_config = load_fixture(filename=f"{cls.base_fixtures_path}dict_actual_config.json") + cls.list_intended_config = load_fixture(filename=f"{cls.base_fixtures_path}list_intended_config.json") + cls.list_actual_config = load_fixture(filename=f"{cls.base_fixtures_path}list_actual_config.json") + cls.dict_config_context = load_fixture(filename=f"{cls.base_fixtures_path}dict_config_context.json") + cls.list_config_context = load_fixture(filename=f"{cls.base_fixtures_path}list_config_context.json") super().setUpClass() def setUp(self): @@ -773,75 +812,120 @@ def setUp(self): rule.feature.name = "feature" rule.config_type = "json" device = MagicMock() - device.get_config_context.return_value = {"feature_remediation": self.config_context} - self.compliance_obj = MagicMock() - self.compliance_obj.rule = rule - self.compliance_obj.device = device - self.compliance_obj.intended = self.intended_config - self.compliance_obj.actual = self.actual_config - - def test_process_diff_dictkey(self): - diff = {} - path = (DictKey("foo"),) - value = "bar" - remediation = ApiRemediation(MagicMock()) - remediation._process_diff(diff, path, value) - self.assertEqual(diff["foo"], "bar") - - def test_process_diff_str_key(self): - diff = {} - path = ("foo",) - value = "bar" - remediation = ApiRemediation(MagicMock()) - remediation._process_diff(diff, path, value) - self.assertEqual(diff["foo"], "bar") - - def test_process_diff_int_key(self): - diff = [] - path = (0,) - value = "bar" - remediation = ApiRemediation(MagicMock()) - remediation._process_diff(diff, path, value) - self.assertEqual(diff[0], "bar") - - def test_dict_config(self): - remediation = ApiRemediation(MagicMock()) - intended = {"foo": {"bar": 1}} - actual = {"foo": {}} - diff = {} - stack = deque() - remediation._dict_config(intended, actual, diff, tuple(), stack) - self.assertEqual(diff["foo"]["bar"], 1) - - def test_list_config(self): - remediation = ApiRemediation(MagicMock()) - intended = [{"bar": 1}] - actual = [{}] - diff = [] - stack = deque() - remediation._list_config(intended, actual, diff, tuple(), stack) - self.assertEqual(diff[0]["bar"], 1) - - def test_str_int_float_config(self): - remediation = ApiRemediation(MagicMock()) - diff = {} - remediation._str_int_float_config("foo", "bar", diff, ("baz",)) - self.assertEqual(diff["baz"], "foo") + device.get_config_context.return_value = {"feature_remediation": self.dict_config_context} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.dict_intended_config + compliance_obj.actual = self.dict_actual_config + self.compliance_obj = compliance_obj + self.api = ApiRemediation(self.compliance_obj) + + def test_dict_remediation_delta(self): + api = ApiRemediation(compliance_obj=self.compliance_obj) + payload = api.api_remediation() + self.assertTrue(isinstance(payload, str)) + if payload: + data = json.loads(payload) + self.assertIn("feature", data) + self.assertIsInstance(data["feature"], dict) + + def test_list_remediation_delta(self): + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"feature_remediation": self.list_config_context} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.list_intended_config + compliance_obj.actual = self.list_actual_config + self.compliance_obj = compliance_obj + api = ApiRemediation(compliance_obj=self.compliance_obj) + payload = api.api_remediation() + self.assertTrue(isinstance(payload, str)) + if payload: + data = json.loads(payload) + self.assertIn("feature", data) + self.assertIsInstance(data["feature"], list) def test_clean_diff(self): - remediation = ApiRemediation(MagicMock()) - diff = {"foo": {}, "bar": {"baz": "qux"}, "empty": []} - cleaned = remediation._clean_diff(diff) - self.assertNotIn("foo", cleaned) - self.assertIn("bar", cleaned) - self.assertNotIn("empty", cleaned) - - def test_controller_remediation_no_context(self): + a = {"feature": {"x": 1, "y": 0}} + b = {"feature": {"x": 1, "y": 2}} + dd = _create_deepdiff_object(a, b) + cleaned = self.api._clean_diff(dd) + self.assertIn("feature", cleaned) + self.assertIn("y", cleaned["feature"]) + + def test_api_remediation_delta(self): + payload = self.api.api_remediation() + # Should be a JSON string with only changed fields + if payload: + data = json.loads(payload) + self.assertIn("feature", data) + # The actual changed fields depend on the fixture content + self.assertIsInstance(data["feature"], dict) + + def test_dict_remediation_full_intended(self): + # If remediate_full_intended is True, should return full intended config + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"remediate_full_intended": True} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.dict_intended_config + compliance_obj.actual = self.dict_actual_config + api = ApiRemediation(compliance_obj) + payload = api.api_remediation() + self.assertEqual(json.loads(payload), self.dict_intended_config) + + def test_list_remediation_full_intended(self): + # If remediate_full_intended is True, should return full intended config + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"remediate_full_intended": True} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = self.list_intended_config + compliance_obj.actual = self.list_actual_config + api = ApiRemediation(compliance_obj) + payload = api.api_remediation() + self.assertEqual(json.loads(payload), self.list_intended_config) + + def test_api_remediation_no_context(self): + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {} + compliance_obj = MagicMock() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = {"feature": {"x": 1}} + compliance_obj.actual = {"feature": {"x": 0}} + api = ApiRemediation(compliance_obj) + with self.assertRaises(ValidationError): + api.api_remediation() + + def test_api_remediation_no_diff(self): + # Should return empty string if no diff + rule = MagicMock() + rule.feature.name = "feature" + rule.config_type = "json" + device = MagicMock() + device.get_config_context.return_value = {"feature_remediation": self.dict_config_context} compliance_obj = MagicMock() - compliance_obj.rule.feature.name = "feature" - compliance_obj.intended = {"feature": {}} - compliance_obj.actual = {"feature": {}} - compliance_obj.device.get_config_context.return_value = {} - remediation = ApiRemediation(compliance_obj) - with self.assertRaises(Exception): - remediation.controller_remediation() + compliance_obj.rule = rule + compliance_obj.device = device + compliance_obj.intended = {"feature": {"x": 1}} + compliance_obj.actual = {"feature": {"x": 1}} + api = ApiRemediation(compliance_obj) + payload = api.api_remediation() + self.assertEqual(payload, "")