diff --git a/geonode/base/api/tests.py b/geonode/base/api/tests.py index 43d37b5663e..61f3375a0f1 100644 --- a/geonode/base/api/tests.py +++ b/geonode/base/api/tests.py @@ -1301,6 +1301,87 @@ def test_perms_resources(self): response = self.client.put(set_perms_url, data=resource_perm_spec, format="json") self.assertEqual(response.status_code, 200) + @override_settings( + EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS=True, + EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS=True, + ) + def test_resource_service_permissions_patch_merges_permissions(self): + self.assertTrue(self.client.login(username="admin", password="admin")) + admin = get_user_model().objects.get(username="admin") + bobby = get_user_model().objects.get(username="bobby") + norman = get_user_model().objects.get(username="norman") + resource = resource_manager.create( + str(uuid4()), resource_type=Dataset, defaults={"title": "api_perms_patch_merge", "owner": admin} + ) + resource.set_permissions({"users": {bobby: ["base.view_resourcebase"]}}) + + set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions") + response = self.client.patch( + set_perms_url, + data={ + "uuid": resource.uuid, + "users": [{"id": norman.id, "permissions": "edit"}], + }, + format="json", + ) + self.assertEqual(response.status_code, 200) + resp_js = json.loads(response.content.decode("utf-8")) + resouce_service_dispatcher.apply((resp_js.get("execution_id"),)) + response = self.client.get(resp_js.get("status_url")) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json().get("status"), "finished") + + response = self.client.get(set_perms_url, format="json") + self.assertEqual(response.status_code, 200) + user_permissions = {u["username"]: u["permissions"] for u in response.data.get("users", [])} + self.assertEqual(user_permissions.get("bobby"), "view") + self.assertEqual(user_permissions.get("norman"), "edit") + + @override_settings( + EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS=True, + EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS=True, + ) + def test_resource_service_permissions_put_replaces_permissions(self): + self.assertTrue(self.client.login(username="admin", password="admin")) + admin = get_user_model().objects.get(username="admin") + bobby = get_user_model().objects.get(username="bobby") + norman = get_user_model().objects.get(username="norman") + resource = resource_manager.create( + str(uuid4()), resource_type=Dataset, defaults={"title": "api_perms_put_replace", "owner": admin} + ) + resource.set_permissions( + { + "users": { + bobby: ["base.view_resourcebase"], + norman: ["base.view_resourcebase"], + } + } + ) + + set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions") + response = self.client.put( + set_perms_url, + data={ + "uuid": resource.uuid, + "users": [{"id": norman.id, "permissions": "edit"}], + "organizations": [], + "groups": [], + }, + format="json", + ) + self.assertEqual(response.status_code, 200) + resp_js = json.loads(response.content.decode("utf-8")) + resouce_service_dispatcher.apply((resp_js.get("execution_id"),)) + response = self.client.get(resp_js.get("status_url")) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json().get("status"), "finished") + + response = self.client.get(set_perms_url, format="json") + self.assertEqual(response.status_code, 200) + user_permissions = {u["username"]: u["permissions"] for u in response.data.get("users", [])} + self.assertNotIn("bobby", user_permissions) + self.assertEqual(user_permissions.get("norman"), "edit") + def test_featured_and_published_resources(self): """ Ensure we can Get & Set Permissions across the Resource Base list. @@ -2392,21 +2473,22 @@ def test_resource_service_permissions_with_restricted_settings(self): # Try to update permissions including anonymous and registered members groups set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions") - perm_spec = { - "uuid": resource.uuid, - "groups": [ - { - "id": anonymous_group.id, - "name": "anonymous", - "permissions": "view", - }, - { - "id": registered_group.id, - "name": "registered-members", - "permissions": "download", - }, - ], - } + response = self.client.get(set_perms_url, format="json") + self.assertEqual(response.status_code, 200) + perm_spec = response.data + perm_spec["uuid"] = resource.uuid + perm_spec["groups"] = [ + { + "id": anonymous_group.id, + "name": "anonymous", + "permissions": "view", + }, + { + "id": registered_group.id, + "name": "registered-members", + "permissions": "download", + }, + ] response = self.client.put(set_perms_url, data=perm_spec, format="json") self.assertEqual(response.status_code, 200) @@ -2454,21 +2536,22 @@ def test_resource_service_permissions_with_restricted_settings(self): # login as admin (staff user) and verify admin can modify these permissions self.assertTrue(self.client.login(username="admin", password="admin")) - perm_spec_admin = { - "uuid": resource.uuid, - "groups": [ - { - "id": anonymous_group.id, - "name": "anonymous", - "permissions": "view", - }, - { - "id": registered_group.id, - "name": "registered-members", - "permissions": "view", - }, - ], - } + response = self.client.get(set_perms_url, format="json") + self.assertEqual(response.status_code, 200) + perm_spec_admin = response.data + perm_spec_admin["uuid"] = resource.uuid + perm_spec_admin["groups"] = [ + { + "id": anonymous_group.id, + "name": "anonymous", + "permissions": "view", + }, + { + "id": registered_group.id, + "name": "registered-members", + "permissions": "view", + }, + ] response = self.client.put(set_perms_url, data=perm_spec_admin, format="json") self.assertEqual(response.status_code, 200) @@ -2539,21 +2622,22 @@ def test_resource_service_permissions_with_partial_restriction(self): registered_group = Group.objects.get(name="registered-members") set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions") - perm_spec = { - "uuid": resource.uuid, - "groups": [ - { - "id": anonymous_group.id, - "name": "anonymous", - "permissions": "view", - }, - { - "id": registered_group.id, - "name": "registered-members", - "permissions": "download", - }, - ], - } + response = self.client.get(set_perms_url, format="json") + self.assertEqual(response.status_code, 200) + perm_spec = response.data + perm_spec["uuid"] = resource.uuid + perm_spec["groups"] = [ + { + "id": anonymous_group.id, + "name": "anonymous", + "permissions": "view", + }, + { + "id": registered_group.id, + "name": "registered-members", + "permissions": "download", + }, + ] response = self.client.put(set_perms_url, data=perm_spec, format="json") self.assertEqual(response.status_code, 200) @@ -2599,6 +2683,79 @@ def test_resource_service_permissions_with_partial_restriction(self): "Bobby should not have been able to set registered-members to download", ) + @override_settings( + EDITORS_CAN_MANAGE_ANONYMOUS_PERMISSIONS=False, + EDITORS_CAN_MANAGE_REGISTERED_MEMBERS_PERMISSIONS=False, + ) + def test_resource_service_permissions_patch_with_restricted_settings(self): + resource = Dataset.objects.filter(owner__username="admin").first() + bobby = get_user_model().objects.get(username="bobby") + resource.set_permissions( + { + "users": { + bobby: [ + "base.change_resourcebase", + "base.change_resourcebase_metadata", + "base.change_resourcebase_permissions", + ] + } + } + ) + + self.assertTrue(self.client.login(username="bobby", password="bob")) + + anonymous_group = Group.objects.get(name="anonymous") + registered_group = Group.objects.get(name="registered-members") + set_perms_url = urljoin(f"{reverse('base-resources-detail', kwargs={'pk': resource.pk})}/", "permissions") + + response = self.client.get(set_perms_url, format="json") + self.assertEqual(response.status_code, 200) + initial_group_permissions = {g["name"]: g["permissions"] for g in response.data.get("groups", [])} + anonymous_target = "download" if initial_group_permissions.get("anonymous") != "download" else "view" + registered_target = "download" if initial_group_permissions.get("registered-members") != "download" else "view" + + response = self.client.patch( + set_perms_url, + data={ + "uuid": resource.uuid, + "groups": [ + { + "id": anonymous_group.id, + "name": "anonymous", + "permissions": anonymous_target, + }, + { + "id": registered_group.id, + "name": "registered-members", + "permissions": registered_target, + }, + ], + }, + format="json", + ) + self.assertEqual(response.status_code, 200) + + resp_js = json.loads(response.content.decode("utf-8")) + execution_id = resp_js.get("execution_id", "") + status_url = resp_js.get("status_url", None) + for _cnt in range(0, 10): + response = self.client.get(f"{status_url}") + self.assertEqual(response.status_code, 200) + resp_js = json.loads(response.content.decode("utf-8")) + if resp_js.get("status", "") == "finished": + break + else: + resouce_service_dispatcher.apply((execution_id,)) + sleep(3.0) + + response = self.client.get(set_perms_url, format="json") + self.assertEqual(response.status_code, 200) + group_permissions = {g["name"]: g["permissions"] for g in response.data.get("groups", [])} + self.assertEqual(group_permissions.get("anonymous"), initial_group_permissions.get("anonymous")) + self.assertEqual( + group_permissions.get("registered-members"), initial_group_permissions.get("registered-members") + ) + def test_resource_service_copy(self): files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp") files_as_dict, _ = get_files(files) diff --git a/geonode/base/api/views.py b/geonode/base/api/views.py index 045a5da1b7e..f2247e99c92 100644 --- a/geonode/base/api/views.py +++ b/geonode/base/api/views.py @@ -68,7 +68,7 @@ AdvertisedFilter, ) from geonode.groups.models import GroupProfile, Group -from geonode.security.permissions import get_compact_perms_list, PermSpec +from geonode.security.permissions import get_compact_perms_list, PermSpec, PermSpecCompact from geonode.security.utils import ( get_visible_resources, get_resources_with_perms, @@ -105,7 +105,7 @@ ) from geonode.people.api.serializers import UserSerializer from .pagination import GeoNodeApiPagination -from geonode.base.utils import validate_extra_metadata +from geonode.base.utils import validate_extra_metadata, patch_perms from geonode.assets.models import Asset from geonode.assets.utils import create_asset_and_link, unlink_asset from geonode.assets.handlers import asset_handler_registry @@ -620,23 +620,33 @@ def resource_service_permissions(self, request, pk, *args, **kwargs): ) elif request.method in ["PUT", "PATCH"]: user_perms = permissions_registry.get_perms(instance=resource, user=request.user) - if request.data.get("groups"): - excluded_ids = [] - if "can_manage_anonymous_permissions" not in user_perms: - anonymous_group = Group.objects.get(name="anonymous") - excluded_ids.append(anonymous_group.id) - logger.info( - f"User {request.user.username} cannot manage anonymous permissions on resource {resource.pk}" - ) - if "can_manage_registered_member_permissions" not in user_perms: - registered_group = Group.objects.get(name=groups_settings.REGISTERED_MEMBERS_GROUP_NAME) - excluded_ids.append(registered_group.id) - logger.info( - f"User {request.user.username} cannot manage registered members permissions on resource {resource.pk}" - ) - if excluded_ids: - request.data["groups"] = [g for g in request.data["groups"] if g.get("id") not in excluded_ids] - perms_spec_compact_resource = patch_perms(request.data, perms_spec.compact, resource) + current_compact = PermSpecCompact(perms_spec.compact, resource) + if request.method == "PATCH": + proposed_compact = PermSpecCompact(perms_spec.compact, resource) + proposed_compact.merge(PermSpecCompact(request.data, resource)) + else: + proposed_compact = PermSpecCompact(request.data, resource) + + perms_diff = current_compact.diff(proposed_compact) + excluded_group_ids = [] + if "can_manage_anonymous_permissions" not in user_perms: + anonymous_group = Group.objects.get(name="anonymous") + excluded_group_ids.append(anonymous_group.id) + logger.info( + f"User {request.user.username} cannot manage anonymous permissions on resource {resource.pk}" + ) + if "can_manage_registered_member_permissions" not in user_perms: + registered_group = Group.objects.get(name=groups_settings.REGISTERED_MEMBERS_GROUP_NAME) + excluded_group_ids.append(registered_group.id) + logger.info( + f"User {request.user.username} cannot manage registered members permissions on resource {resource.pk}" + ) + if excluded_group_ids: + for diff_action in ("added", "removed", "changed"): + perms_diff.groups[diff_action] = [ + item for item in perms_diff.groups[diff_action] if item.get("id") not in excluded_group_ids + ] + perms_spec_compact_resource = patch_perms(perms_spec.compact, perms_diff, resource) if resource.dirty_state: raise Exception("Cannot update if the resource is in dirty state") diff --git a/geonode/base/utils.py b/geonode/base/utils.py index f5b543b5066..c73623ce146 100644 --- a/geonode/base/utils.py +++ b/geonode/base/utils.py @@ -39,7 +39,7 @@ from geonode.security.utils import AdvancedSecurityWorkflowManager from geonode.thumbs.utils import get_thumbs, remove_thumb from geonode.utils import get_legend_url -from geonode.security.permissions import PermSpecCompact +from geonode.security.permissions import PermSpecCompactDiff logger = logging.getLogger("geonode.base.utils") @@ -216,11 +216,14 @@ def remove_country_from_languagecode(language: str): return lang -def patch_perms(updated_perms_compact, current_perms_compact, resource): +def patch_perms(current_perms_compact, perms_diff, resource): """ - Patch updated permission changes with current permissions. + Apply a permission diff to a current compact spec. + + ``perms_diff`` may be a :class:`PermSpecCompactDiff` instance or its dict + representation (as produced by :meth:`PermSpecCompactDiff.to_dict` or + :meth:`PermSpecCompact.diff`). Returns the resulting ``PermSpecCompact``. """ - perms_spec_compact_patch = PermSpecCompact(updated_perms_compact, resource) - perms_spec_compact_resource = PermSpecCompact(current_perms_compact, resource) - perms_spec_compact_resource.merge(perms_spec_compact_patch) - return perms_spec_compact_resource + if not isinstance(perms_diff, PermSpecCompactDiff): + perms_diff = PermSpecCompactDiff.from_dict(perms_diff) + return perms_diff.apply(current_perms_compact, resource) diff --git a/geonode/security/permissions.py b/geonode/security/permissions.py index 7ab61866744..fe0a980d8ce 100644 --- a/geonode/security/permissions.py +++ b/geonode/security/permissions.py @@ -342,6 +342,11 @@ def _bind_json(self, json): # if possible if not isinstance(json, dict): self._binding_failed("expected dict, got %s", type(json)) + # ``pop`` below mutates this dict; copy so callers (and any + # caller-owned references via shared list items) stay intact. Each + # recursive ``_bind_json`` call does the same for its own input, so a + # shallow copy at every level is sufficient. + json = dict(json) for binding in self._bindings: val = json.pop(binding.name, None) if binding.expected and val is None: @@ -670,6 +675,75 @@ def extended(self): ) return json.copy() + def diff(self, other: "PermSpecCompact") -> "PermSpecCompactDiff": + """Compute the differences between ``self`` and ``other``. + + ``self`` is treated as the current state and ``other`` as the proposed + one. Returns a :class:`PermSpecCompactDiff` describing the mutations + needed to move from ``self`` to ``other``, bucketed by + ``users`` / ``organizations`` / ``groups``: + + - ``added`` — entries present in ``other`` but not in ``self``. + - ``removed`` — entries present in ``self`` but not in ``other``. + - ``changed`` — entries present in both with a different ``permissions`` + value; each item carries ``from`` and ``to`` to describe the + transition. + + Entities are matched by ``id``. ``"none"`` is treated as a permission + value, not as absence — callers wanting to collapse it to a removal + can post-process the result. + """ + + # Index each bucket's entries by id so membership/lookup is O(1). + # Duplicate ids in the same bucket are first-wins (matches merge()). + def _index(entries): + indexed = {} + for entry in entries or []: + entry_id = getattr(entry, "id", None) + if entry_id is None or entry_id in indexed: + continue + indexed[entry_id] = entry + return indexed + + # Serialize an indexed entry back to its dict form for the diff payload. + def _payload(entry): + return entry._to_json_object(top_level=False) + + buckets = {} + # Iterate the same buckets the compact spec already defines + # (users / organizations / groups), driven by _bindings so the diff + # stays in lockstep with the schema. + for bucket in (b.name for b in self._bindings): + current = _index(getattr(self, bucket, None)) + proposed = _index(getattr(other, bucket, None)) + + # Set algebra over ids gives us the three mutation kinds in one shot: + # added = proposed \ current (ids only in other) + # removed = current \ proposed (ids only in self) + # changed = proposed ∩ current (ids in both, filtered by perm) + # "none" is a legal permission value, not absence — equal perms on + # both sides (including "none" == "none") yield no entry at all. + added = [_payload(proposed[i]) for i in proposed.keys() - current.keys()] + removed = [_payload(current[i]) for i in current.keys() - proposed.keys()] + changed = [] + for entry_id in current.keys() & proposed.keys(): + before = current[entry_id].permissions + after = proposed[entry_id].permissions + if before == after: + continue + # Build the changed entry from the proposed side (so identifier + # fields reflect the latest values), then replace the single + # "permissions" key with the explicit from/to transition — + # validators downstream need direction, not just the new value. + payload = _payload(proposed[entry_id]) + payload.pop("permissions", None) + payload["from"] = before + payload["to"] = after + changed.append(payload) + + buckets[bucket] = {"added": added, "removed": removed, "changed": changed} + return PermSpecCompactDiff(**buckets) + def merge(self, perm_spec_compact_patch: "PermSpecCompact"): """Merges 'perm_spec_compact_patch' to the current one. @@ -693,6 +767,98 @@ def merge(self, perm_spec_compact_patch: "PermSpecCompact"): getattr(self, _elem).add(_up) +class PermSpecCompactDiff: + """Structured result of diffing two ``PermSpecCompact`` instances. + + The diff is bucketed (``users`` / ``organizations`` / ``groups``); each + bucket carries ``added`` / ``removed`` / ``changed`` lists describing the + mutations needed to move from a current to a proposed state. ``added`` and + ``removed`` items are full compact entries (with ``permissions``); + ``changed`` items carry identifying fields plus ``from`` / ``to`` to + describe the permission transition. + + See :meth:`PermSpecCompact.diff` for how an instance is produced and + :meth:`apply` for how to materialize the result onto a current compact + spec. + """ + + _BUCKETS = ("users", "organizations", "groups") + + def __init__(self, users=None, organizations=None, groups=None): + self.users = self._normalize_bucket(users) + self.organizations = self._normalize_bucket(organizations) + self.groups = self._normalize_bucket(groups) + + @staticmethod + def _normalize_bucket(bucket): + bucket = bucket or {} + return { + "added": list(bucket.get("added", []) or []), + "removed": list(bucket.get("removed", []) or []), + "changed": list(bucket.get("changed", []) or []), + } + + @classmethod + def from_dict(cls, data): + data = data or {} + return cls( + users=data.get("users"), + organizations=data.get("organizations"), + groups=data.get("groups"), + ) + + def to_dict(self): + return {bucket: dict(getattr(self, bucket)) for bucket in self._BUCKETS} + + def is_empty(self): + for bucket in self._BUCKETS: + entries = getattr(self, bucket) + if entries["added"] or entries["removed"] or entries["changed"]: + return False + return True + + def apply(self, current_perms_compact, resource) -> "PermSpecCompact": + """Materialize the diff on top of a current compact spec. + + ``current_perms_compact`` is the dict-shaped compact spec to start + from; ``resource`` is the target resource (used by the underlying + ``PermSpecCompact`` constructor). Within each bucket: ``removed`` + entries are dropped by id, ``changed`` entries update the + ``permissions`` of existing entries (matched by id), and ``added`` + entries are appended (replacing any pre-existing entry with the same + id so the operation is idempotent against stale diffs). + """ + result = PermSpecCompact(current_perms_compact, resource) + binding_cls_by_bucket = {b.name: b.binding for b in result._bindings} + + for bucket, binding_cls in binding_cls_by_bucket.items(): + bucket_diff = getattr(self, bucket) + entries = getattr(result, bucket, None) or [] + setattr(result, bucket, entries) + + removed_ids = {e["id"] for e in bucket_diff["removed"] if e.get("id") is not None} + if removed_ids: + entries[:] = [e for e in entries if e.id not in removed_ids] + + change_map = {e["id"]: e["to"] for e in bucket_diff["changed"] if e.get("id") is not None} + if change_map: + for e in entries: + if e.id in change_map: + e.permissions = change_map[e.id] + + added_ids = {new.get("id") for new in bucket_diff["added"] if new.get("id") is not None} + if added_ids: + entries[:] = [e for e in entries if e.id not in added_ids] + for new in bucket_diff["added"]: + if new.get("id") is not None: + entries.append(binding_cls(new, resource, parent=result)) + + return result + + def __repr__(self): + return f"PermSpecCompactDiff({self.to_dict()!r})" + + def get_compact_perms_list( perms: list, resource_type: str = None, diff --git a/geonode/security/tests.py b/geonode/security/tests.py index 0f650b6f68b..41c6038f7a1 100644 --- a/geonode/security/tests.py +++ b/geonode/security/tests.py @@ -1834,6 +1834,259 @@ def test_perm_spec_conversion(self): _p.compact, ) + def test_perm_spec_compact_diff(self): + """PermSpecCompact.diff reports added/removed/changed per bucket.""" + standard_user = get_user_model().objects.get(username="bobby") + dataset = Dataset.objects.filter(owner=standard_user).first() + + def _user(uid, perm): + return { + "id": uid, + "username": f"u{uid}", + "first_name": "", + "last_name": "", + "avatar": "", + "permissions": perm, + "is_staff": False, + "is_superuser": False, + } + + def _group(gid, perm, name="g"): + return {"id": gid, "title": name, "name": name, "permissions": perm} + + empty_bucket = {"added": [], "removed": [], "changed": []} + + # 1. Identical specs -> empty diff in every bucket. + current = PermSpecCompact( + { + "users": [_user(10, "view"), _user(11, "edit")], + "organizations": [_group(20, "view", "org")], + "groups": [_group(3, "view", "anonymous")], + }, + dataset, + ) + proposed = PermSpecCompact( + { + "users": [_user(10, "view"), _user(11, "edit")], + "organizations": [_group(20, "view", "org")], + "groups": [_group(3, "view", "anonymous")], + }, + dataset, + ) + diff = current.diff(proposed) + self.assertEqual(diff.users, empty_bucket) + self.assertEqual(diff.organizations, empty_bucket) + self.assertEqual(diff.groups, empty_bucket) + + # 2. Mixed mutations across all buckets. + current = PermSpecCompact( + { + "users": [_user(10, "view"), _user(11, "edit")], + "organizations": [_group(20, "view", "org")], + "groups": [_group(3, "view", "anonymous")], + }, + dataset, + ) + proposed = PermSpecCompact( + { + # user 10 changed view -> manage, 11 removed, 12 added. + "users": [_user(10, "manage"), _user(12, "view")], + # org 21 added, 20 unchanged. + "organizations": [_group(20, "view", "org"), _group(21, "edit", "org2")], + # anonymous group view -> download. + "groups": [_group(3, "download", "anonymous")], + }, + dataset, + ) + + diff = current.diff(proposed) + + # users bucket + self.assertEqual([u["id"] for u in diff.users["added"]], [12]) + self.assertEqual(diff.users["added"][0]["permissions"], "view") + self.assertEqual([u["id"] for u in diff.users["removed"]], [11]) + self.assertEqual(diff.users["removed"][0]["permissions"], "edit") + self.assertEqual(len(diff.users["changed"]), 1) + changed_user = diff.users["changed"][0] + self.assertEqual(changed_user["id"], 10) + self.assertEqual(changed_user["from"], "view") + self.assertEqual(changed_user["to"], "manage") + # changed entries carry identifier fields but drop "permissions" in favor of from/to. + self.assertNotIn("permissions", changed_user) + self.assertEqual(changed_user["username"], "u10") + + # organizations bucket + self.assertEqual([o["id"] for o in diff.organizations["added"]], [21]) + self.assertEqual(diff.organizations["added"][0]["permissions"], "edit") + self.assertEqual(diff.organizations["removed"], []) + self.assertEqual(diff.organizations["changed"], []) + + # groups bucket + self.assertEqual(diff.groups["added"], []) + self.assertEqual(diff.groups["removed"], []) + self.assertEqual(len(diff.groups["changed"]), 1) + self.assertEqual(diff.groups["changed"][0]["id"], 3) + self.assertEqual(diff.groups["changed"][0]["from"], "view") + self.assertEqual(diff.groups["changed"][0]["to"], "download") + + # 3. "none" on both sides is a no-op (treated as a value, not absence). + current = PermSpecCompact({"users": [_user(10, "none")]}, dataset) + proposed = PermSpecCompact({"users": [_user(10, "none")]}, dataset) + self.assertEqual(current.diff(proposed).users, empty_bucket) + + # 4. "none" -> value is reported as a transition, not as added. + proposed = PermSpecCompact({"users": [_user(10, "view")]}, dataset) + diff = current.diff(proposed) + self.assertEqual(diff.users["added"], []) + self.assertEqual(diff.users["removed"], []) + self.assertEqual(len(diff.users["changed"]), 1) + self.assertEqual(diff.users["changed"][0]["from"], "none") + self.assertEqual(diff.users["changed"][0]["to"], "view") + + # 5. Missing buckets on either side do not blow up. + current = PermSpecCompact({"users": [_user(10, "view")]}, dataset) + proposed = PermSpecCompact({}, dataset) + diff = current.diff(proposed) + self.assertEqual([u["id"] for u in diff.users["removed"]], [10]) + self.assertEqual(diff.users["added"], []) + self.assertEqual(diff.users["changed"], []) + self.assertEqual(diff.organizations, empty_bucket) + self.assertEqual(diff.groups, empty_bucket) + + # 6. Duplicate ids in a bucket: first wins (consistent with merge()). + current = PermSpecCompact({"users": [_user(10, "view"), _user(10, "edit")]}, dataset) + proposed = PermSpecCompact({"users": [_user(10, "view")]}, dataset) + self.assertEqual(current.diff(proposed).users, empty_bucket) + + def test_perm_spec_compact_diff_class(self): + """PermSpecCompactDiff supports is_empty / to_dict / from_dict.""" + from geonode.security.permissions import PermSpecCompactDiff + + empty = PermSpecCompactDiff() + self.assertTrue(empty.is_empty()) + self.assertEqual( + empty.to_dict(), + { + "users": {"added": [], "removed": [], "changed": []}, + "organizations": {"added": [], "removed": [], "changed": []}, + "groups": {"added": [], "removed": [], "changed": []}, + }, + ) + + # Round-trip a populated diff. + payload = { + "users": { + "added": [{"id": 5, "username": "alice", "permissions": "view"}], + "removed": [], + "changed": [{"id": 3, "username": "carol", "from": "view", "to": "edit"}], + }, + "organizations": {"added": [], "removed": [], "changed": []}, + "groups": {"added": [], "removed": [], "changed": []}, + } + diff = PermSpecCompactDiff.from_dict(payload) + self.assertFalse(diff.is_empty()) + self.assertEqual(diff.to_dict(), payload) + + # Missing/partial buckets normalize to empty lists. + partial = PermSpecCompactDiff.from_dict({"users": {"added": [{"id": 1, "permissions": "view"}]}}) + self.assertEqual(partial.users["added"], [{"id": 1, "permissions": "view"}]) + self.assertEqual(partial.users["removed"], []) + self.assertEqual(partial.users["changed"], []) + self.assertEqual(partial.organizations, {"added": [], "removed": [], "changed": []}) + self.assertEqual(partial.groups, {"added": [], "removed": [], "changed": []}) + + def test_perm_spec_compact_diff_apply(self): + """PermSpecCompactDiff.apply materializes added/removed/changed onto a current spec.""" + from geonode.security.permissions import PermSpecCompactDiff + + standard_user = get_user_model().objects.get(username="bobby") + dataset = Dataset.objects.filter(owner=standard_user).first() + + def _user(uid, perm): + return { + "id": uid, + "username": f"u{uid}", + "first_name": "", + "last_name": "", + "avatar": "", + "permissions": perm, + "is_staff": False, + "is_superuser": False, + } + + def _group(gid, perm, name="g"): + return {"id": gid, "title": name, "name": name, "permissions": perm} + + current_compact = { + "users": [_user(10, "view"), _user(11, "edit")], + "organizations": [_group(20, "view", "org")], + "groups": [_group(3, "view", "anonymous")], + } + proposed_compact = { + "users": [_user(10, "manage"), _user(12, "view")], + "organizations": [_group(20, "view", "org"), _group(21, "edit", "org2")], + "groups": [_group(3, "download", "anonymous")], + } + + current = PermSpecCompact(current_compact, dataset) + proposed = PermSpecCompact(proposed_compact, dataset) + diff = current.diff(proposed) + + # Apply on a fresh copy and verify the result matches `proposed`. + applied = diff.apply(current_compact, dataset) + + def _by_id(entries): + return {e.id: e.permissions for e in entries or []} + + self.assertEqual(_by_id(applied.users), {10: "manage", 12: "view"}) + self.assertEqual(_by_id(applied.organizations), {20: "view", 21: "edit"}) + self.assertEqual(_by_id(applied.groups), {3: "download"}) + + # Applying an empty diff leaves the current state untouched. + unchanged = PermSpecCompactDiff().apply(current_compact, dataset) + self.assertEqual(_by_id(unchanged.users), {10: "view", 11: "edit"}) + self.assertEqual(_by_id(unchanged.organizations), {20: "view"}) + self.assertEqual(_by_id(unchanged.groups), {3: "view"}) + + def test_patch_perms_accepts_diff(self): + """patch_perms accepts both PermSpecCompactDiff and its dict form.""" + from geonode.base.utils import patch_perms + from geonode.security.permissions import PermSpecCompactDiff + + standard_user = get_user_model().objects.get(username="bobby") + dataset = Dataset.objects.filter(owner=standard_user).first() + + def _user(uid, perm): + return { + "id": uid, + "username": f"u{uid}", + "first_name": "", + "last_name": "", + "avatar": "", + "permissions": perm, + "is_staff": False, + "is_superuser": False, + } + + current_compact = {"users": [_user(10, "view"), _user(11, "edit")]} + diff = PermSpecCompactDiff.from_dict( + { + "users": { + "added": [_user(12, "view")], + "removed": [_user(11, "edit")], + "changed": [{"id": 10, "username": "u10", "from": "view", "to": "manage"}], + } + } + ) + + # Object form + result = patch_perms(current_compact, diff, dataset) + self.assertEqual({u.id: u.permissions for u in result.users}, {10: "manage", 12: "view"}) + + # Dict form (back-compat) + result_from_dict = patch_perms(current_compact, diff.to_dict(), dataset) + self.assertEqual({u.id: u.permissions for u in result_from_dict.users}, {10: "manage", 12: "view"}) + def test_admin_whitelisted_access_backend(self): from geonode.security.backends import AdminRestrictedAccessBackend from django.core.exceptions import PermissionDenied