From 01eaf15750c3b2629fcd8dd82c9d8115668cdf6c Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Fri, 12 Jun 2026 10:43:25 +0100 Subject: [PATCH 01/13] Enabling Entra auth --- common/auth/cognito_jwt/user_manager.py | 37 --- common/auth/cognito_jwt/validator.py | 88 ------ common/auth/{cognito_jwt => jwt}/__init__.py | 0 common/auth/{cognito_jwt => jwt}/backend.py | 71 +++-- common/auth/jwt/user_manager.py | 72 +++++ common/auth/jwt/validator.py | 175 ++++++++++++ config.py | 5 + metrics/api/settings/default.py | 11 +- .../auth/cognito_jwt/test_user_manager.py | 46 ---- .../common/auth/cognito_jwt/test_validator.py | 120 --------- .../auth/{cognito_jwt => jwt}/conftest.py | 26 ++ .../auth/{cognito_jwt => jwt}/test_backend.py | 100 ++++++- .../unit/common/auth/jwt/test_user_manager.py | 100 +++++++ tests/unit/common/auth/jwt/test_validator.py | 250 ++++++++++++++++++ .../common/auth/{cognito_jwt => jwt}/urls.py | 2 +- .../common/auth/{cognito_jwt => jwt}/utils.py | 0 16 files changed, 777 insertions(+), 326 deletions(-) delete mode 100644 common/auth/cognito_jwt/user_manager.py delete mode 100644 common/auth/cognito_jwt/validator.py rename common/auth/{cognito_jwt => jwt}/__init__.py (100%) rename common/auth/{cognito_jwt => jwt}/backend.py (54%) create mode 100644 common/auth/jwt/user_manager.py create mode 100644 common/auth/jwt/validator.py delete mode 100644 tests/unit/common/auth/cognito_jwt/test_user_manager.py delete mode 100644 tests/unit/common/auth/cognito_jwt/test_validator.py rename tests/unit/common/auth/{cognito_jwt => jwt}/conftest.py (80%) rename tests/unit/common/auth/{cognito_jwt => jwt}/test_backend.py (66%) create mode 100644 tests/unit/common/auth/jwt/test_user_manager.py create mode 100644 tests/unit/common/auth/jwt/test_validator.py rename tests/unit/common/auth/{cognito_jwt => jwt}/urls.py (85%) rename tests/unit/common/auth/{cognito_jwt => jwt}/utils.py (100%) diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py deleted file mode 100644 index 3a69470a4a..0000000000 --- a/common/auth/cognito_jwt/user_manager.py +++ /dev/null @@ -1,37 +0,0 @@ -import logging - -from django.contrib.auth import get_user_model -from django.contrib.auth.models import BaseUserManager - -logger = logging.getLogger(__name__) - - -class CognitoManager(BaseUserManager): - - @staticmethod - def get_or_create_for_cognito(jwt_payload): - """Create an ephemeral user instance for this request. - We don't need to store or retrieve any info, we use what's in the JWT, - so this speeds up the request by removing the need for any DB access - """ - try: - username = jwt_payload["entraObjectId"] - permission_sets = jwt_payload["permissionSets"] - if not permission_sets: - logger.debug( - "Empty permissionSets in token for user: '%s'", - username, - ) - return None - except KeyError: - logger.debug( - "Error getting entraObjectId and/or permissionSets field(s)" - " from jwt payload: '%s'", - jwt_payload, - ) - return None - - user_class = get_user_model() - user = user_class(username=username) - user.permission_sets = permission_sets - return user diff --git a/common/auth/cognito_jwt/validator.py b/common/auth/cognito_jwt/validator.py deleted file mode 100644 index 9ace1bd95b..0000000000 --- a/common/auth/cognito_jwt/validator.py +++ /dev/null @@ -1,88 +0,0 @@ -import json -import logging - -import jwt -import requests -from django.conf import settings -from django.core.cache import cache -from django.utils.functional import cached_property -from jwt.algorithms import RSAAlgorithm - -logger = logging.getLogger(__name__) - - -class TokenError(Exception): - pass - - -class TokenValidator: - def __init__(self, aws_region, aws_user_pool, audience): - self.aws_region = aws_region - self.aws_user_pool = aws_user_pool - self.audience = audience - - @cached_property - def pool_url(self): - return ( - f"https://cognito-idp.{self.aws_region}.amazonaws.com/{self.aws_user_pool}" - ) - - @cached_property - def _json_web_keys(self): - response = requests.get(self.pool_url + "/.well-known/jwks.json", timeout=10) - response.raise_for_status() - json_data = response.json() - return {item["kid"]: json.dumps(item) for item in json_data["keys"]} - - def _get_public_key(self, token): - try: - headers = jwt.get_unverified_header(token) - except jwt.DecodeError as exc: - raise TokenError(str(exc)) from exc - - if getattr(settings, "COGNITO_PUBLIC_KEYS_CACHING_ENABLED", False): - cache_key = "cognito_jwt:{}".format(headers["kid"]) - jwk_data = cache.get(cache_key) - - if not jwk_data: - jwk_data = self._json_web_keys.get(headers["kid"]) - timeout = getattr(settings, "COGNITO_PUBLIC_KEYS_CACHING_TIMEOUT", 300) - cache.set(cache_key, jwk_data, timeout=timeout) - else: - jwk_data = self._json_web_keys.get(headers["kid"]) - - if jwk_data: - return RSAAlgorithm.from_jwk(jwk_data) - return None - - def validate(self, token): - public_key = self._get_public_key(token) - if not public_key: - msg = "No key found for this token" - raise TokenError(msg) - - params = { - "jwt": token, - "key": public_key, - "issuer": self.pool_url, - "algorithms": ["RS256"], - } - - logger.debug("JWT - %s", params) - token_payload = jwt.decode( - token, options={"verify_signature": False} # noqa: S5659 - ) - logger.debug("JWT decoded - %s", token_payload) - - if "aud" in token_payload: - params.update({"audience": self.audience}) - - try: - jwt_data = jwt.decode(**params) - except ( - jwt.InvalidTokenError, - jwt.ExpiredSignatureError, - jwt.DecodeError, - ) as exc: - raise TokenError(str(exc)) from exc - return jwt_data diff --git a/common/auth/cognito_jwt/__init__.py b/common/auth/jwt/__init__.py similarity index 100% rename from common/auth/cognito_jwt/__init__.py rename to common/auth/jwt/__init__.py diff --git a/common/auth/cognito_jwt/backend.py b/common/auth/jwt/backend.py similarity index 54% rename from common/auth/cognito_jwt/backend.py rename to common/auth/jwt/backend.py index 1fed42fec9..d1c214fe24 100644 --- a/common/auth/cognito_jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -1,3 +1,4 @@ +import jwt import logging from django.apps import apps as django_apps @@ -8,7 +9,7 @@ from rest_framework import HTTP_HEADER_ENCODING, exceptions from rest_framework.authentication import BaseAuthentication -from .validator import TokenError, TokenValidator +from .validator import TokenError, CognitoTokenValidator, EntraTokenValidator logger = logging.getLogger(__name__) @@ -24,6 +25,12 @@ def get_authorization_header(request): """ auth_header = getattr(settings, "COGNITO_JWT_AUTH_HEADER", "Authorization") auth = request.META.get(auth_header, b"") + + # If the Cognito header isn't present, look for the Entra auth header + if not auth: + auth_header = getattr(settings, "ENTRA_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") + auth = request.META.get(auth_header, b"") + if isinstance(auth, str): # Work around django test client oddness auth = auth.encode(HTTP_HEADER_ENCODING) @@ -44,38 +51,46 @@ def authenticate(self, request): # Authenticate token try: - token_validator = self.get_token_validator(request) - jwt_payload = token_validator.validate(jwt_token) + token_validator, provider_name = self.get_token_validator(jwt_token) except TokenError: + logger.debug(f"Failed to identify token provider: {e}") + raise exceptions.AuthenticationFailed(_("Unknown or malformed token issuer.")) + + try: + jwt_payload = token_validator.validate(jwt_token) + except TokenError as e: + logger.debug(f"{provider_name.capitalize()} token validation failed: {e}") raise exceptions.AuthenticationFailed from None - custom_user_manager = self.get_custom_user_manager() + custom_user_manager = self.get_custom_user_manager(provider_name) + if custom_user_manager: - user = custom_user_manager.get_or_create_for_cognito(jwt_payload) + user = custom_user_manager.get_or_create(jwt_payload) else: user_model = self.get_user_model() - user = user_model.objects.get_or_create_for_cognito(jwt_payload) + user = user_model.objects.get_or_create(jwt_payload) if not user: logger.debug( "Unable to create user from JWT, defaulting to unauthenticated" ) return None + return (user, jwt_token) @staticmethod - def get_custom_user_manager(): - """If COGNITO_USER_MANAGER is set, then the user object is obtained - via get_or_create_for_cognito on the user manager, this allows use + def get_custom_user_manager(provider="cognito"): + """If COGNITO_USER_MANAGER or ENTRA_USER_MANAGER is set, then the user object is obtained + via get_or_create_for_cognito (or get_or_create_for_entra) on the user manager, this allows use of the default unmodified Django User model""" result = None - custom_user_manager_path = getattr(settings, "COGNITO_USER_MANAGER", False) + custom_user_manager_path = getattr(settings, "ENTRA_USER_MANAGER", False) if provider == "entra" else getattr(settings, "COGNITO_USER_MANAGER", False) if custom_user_manager_path: result = import_string(custom_user_manager_path)() return result @staticmethod - def get_user_model(): - user_model = getattr(settings, "COGNITO_USER_MODEL", settings.AUTH_USER_MODEL) + def get_user_model(provider="cognito"): + user_model = getattr(settings, "ENTRA_USER_MODEL", settings.AUTH_USER_MODEL) if provider == "entra" else getattr(settings, "COGNITO_USER_MODEL", settings.AUTH_USER_MODEL) return django_apps.get_model(user_model, require_ready=False) @staticmethod @@ -97,12 +112,32 @@ def get_jwt_token(request): return auth[1] @staticmethod - def get_token_validator(request): - return TokenValidator( - settings.COGNITO_AWS_REGION, - settings.COGNITO_USER_POOL, - settings.COGNITO_AUDIENCE, - ) + def get_token_validator(jwt_token): + try: + # Decode without verifying signature just to read the header/payload + unverified_payload = jwt.decode(jwt_token, options={"verify_signature": False}) + issuer = unverified_payload.get("iss", "") + except jwt.PyJWTError: + raise exceptions.AuthenticationFailed(_("Malformed JWT.")) + + if "cognito-idp" in issuer: + validator = CognitoTokenValidator( + settings.COGNITO_AWS_REGION, + settings.COGNITO_USER_POOL, + settings.COGNITO_AUDIENCE, + ) + return validator, "cognito" + + elif "sts.windows.net" in issuer: + validator = EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_APP_ID, + ) + return validator, "entra" + + else: + raise exceptions.AuthenticationFailed(_("Invalid or unsupported token issuer.")) @staticmethod def authenticate_header(request): diff --git a/common/auth/jwt/user_manager.py b/common/auth/jwt/user_manager.py new file mode 100644 index 0000000000..2b9aedfecb --- /dev/null +++ b/common/auth/jwt/user_manager.py @@ -0,0 +1,72 @@ +import logging + +from django.contrib.auth import get_user_model +from django.contrib.auth.models import BaseUserManager + +from metrics.data.managers.rbac_models.user import UserManager +from cms.auth_content.models.users import User +from metrics.utils.permission_hierarchy import build_permission_hierarchy +from rest_framework import exceptions + +logger = logging.getLogger(__name__) + +def get_user_permission_set(id: str): + permissions = UserManager.get_permission_sets_for_user(id) + return build_permission_hierarchy(permissions) + + +class CognitoManager(BaseUserManager): + + @staticmethod + def get_or_create(jwt_payload): + """Create an ephemeral user instance for this request. + If the permissions aren't present in the JWT, queries for them in + the database based on the entraObjectId in the token + """ + try: + username = jwt_payload["entraObjectId"] + # Check if the JWT already includes permissionSets + # Use if found, if not grab user permissions from the database + if "permissionSets" in jwt_payload: + permission_sets = jwt_payload["permissionSets"] + else: + permission_sets = get_user_permission_set(username) + except KeyError: + logger.debug( + "Error getting entraObjectId and/or permissionSets field(s)" + " from jwt payload: '%s'", + jwt_payload, + ) + return None + + user_class = get_user_model() + user = user_class(username=username) + user.permission_sets = permission_sets + return user + + +class EntraManager(BaseUserManager): + + @staticmethod + def get_or_create(jwt_payload): + """Create an ephemeral user instance for this request. + If the provided appid isn't present in the database, raises + AuthenticationFailed exception + """ + try: + username = jwt_payload["appid"] + if not User.objects.filter(user_id=username).exists(): + raise exceptions.AuthenticationFailed(("Application not found.")) + permission_sets = get_user_permission_set(username) + except KeyError: + logger.info( + "Error getting entraObjectId and/or permissionSets field(s)" + " from jwt payload: '%s'", + jwt_payload, + ) + return None + + user_class = get_user_model() + user = user_class(username=username) + user.permission_sets = permission_sets + return user diff --git a/common/auth/jwt/validator.py b/common/auth/jwt/validator.py new file mode 100644 index 0000000000..82a89e64f8 --- /dev/null +++ b/common/auth/jwt/validator.py @@ -0,0 +1,175 @@ +import json +import logging + +import jwt +import requests +from django.conf import settings +from django.core.cache import cache +from django.utils.functional import cached_property +from jwt.algorithms import RSAAlgorithm + +logger = logging.getLogger(__name__) + + +class TokenError(Exception): + pass + + +class CognitoTokenValidator: + def __init__(self, aws_region, aws_user_pool, audience): + self.aws_region = aws_region + self.aws_user_pool = aws_user_pool + self.audience = audience + + @cached_property + def pool_url(self): + return ( + f"https://cognito-idp.{self.aws_region}.amazonaws.com/{self.aws_user_pool}" + ) + + @cached_property + def _json_web_keys(self): + response = requests.get(self.pool_url + "/.well-known/jwks.json", timeout=10) + response.raise_for_status() + json_data = response.json() + return {item["kid"]: json.dumps(item) for item in json_data["keys"]} + + def _get_public_key(self, token): + try: + headers = jwt.get_unverified_header(token) + except jwt.DecodeError as exc: + raise TokenError(str(exc)) from exc + + if getattr(settings, "COGNITO_PUBLIC_KEYS_CACHING_ENABLED", False): + cache_key = "cognito_jwt:{}".format(headers["kid"]) + jwk_data = cache.get(cache_key) + + if not jwk_data: + jwk_data = self._json_web_keys.get(headers["kid"]) + timeout = getattr(settings, "COGNITO_PUBLIC_KEYS_CACHING_TIMEOUT", 300) + cache.set(cache_key, jwk_data, timeout=timeout) + else: + jwk_data = self._json_web_keys.get(headers["kid"]) + + if jwk_data: + return RSAAlgorithm.from_jwk(jwk_data) + return None + + def validate(self, token): + public_key = self._get_public_key(token) + if not public_key: + msg = "No key found for this token" + raise TokenError(msg) + + params = { + "jwt": token, + "key": public_key, + "issuer": self.pool_url, + "algorithms": ["RS256"], + } + + logger.debug("JWT - %s", params) + token_payload = jwt.decode( + token, options={"verify_signature": False} # noqa: S5659 + ) + logger.debug("JWT decoded - %s", token_payload) + + if "aud" in token_payload: + params.update({"audience": self.audience}) + + try: + jwt_data = jwt.decode(**params) + except ( + jwt.InvalidTokenError, + jwt.ExpiredSignatureError, + jwt.DecodeError, + ) as exc: + raise TokenError(str(exc)) from exc + return jwt_data + + +class EntraTokenValidator: + def __init__(self, tenant_id, audience, app_id): + self.tenant_id = tenant_id + self.audience = audience + self.app_id = app_id + + @cached_property + def expected_issuer(self): + return f"https://sts.windows.net/{self.tenant_id}/" + + @cached_property + def jwks_url(self): + return "https://login.microsoftonline.com/common/discovery/keys" + + @cached_property + def _json_web_keys(self): + response = requests.get(self.jwks_url, timeout=10) + response.raise_for_status() + json_data = response.json() + return {item["kid"]: json.dumps(item) for item in json_data["keys"]} + + def _get_public_key(self, token): + try: + headers = jwt.get_unverified_header(token) + except jwt.DecodeError as exc: + raise TokenError(str(exc)) from exc + + if getattr(settings, "ENTRA_PUBLIC_KEYS_CACHING_ENABLED", False): + cache_key = "entra_jwt:{}".format(headers["kid"]) + jwk_data = cache.get(cache_key) + + if not jwk_data: + jwk_data = self._json_web_keys.get(headers["kid"]) + timeout = getattr(settings, "ENTRA_PUBLIC_KEYS_CACHING_TIMEOUT", 300) + cache.set(cache_key, jwk_data, timeout=timeout) + else: + jwk_data = self._json_web_keys.get(headers["kid"]) + + if jwk_data: + return RSAAlgorithm.from_jwk(jwk_data) + return None + + def validate(self, token): + try: + unverified_payload = jwt.decode( + token, options={"verify_signature": False} + ) + except jwt.DecodeError as exc: + raise TokenError(str(exc)) from exc + + if "permission_sets" in unverified_payload: + logger.debug("permission_sets found in token, skipping validation.") + return unverified_payload + + public_key = self._get_public_key(token) + if not public_key: + msg = "No key found for this token" + raise TokenError(msg) + + params = { + "jwt": token, + "key": public_key, + "issuer": self.expected_issuer, + "audience": self.audience, + "algorithms": ["RS256"], + } + + try: + payload = jwt.decode(**params) + except ( + jwt.InvalidTokenError, + jwt.ExpiredSignatureError, + jwt.DecodeError, + ) as exc: + raise TokenError(str(exc)) from exc + + roles = payload.get("roles", []) + if "Application.Read" not in roles: + raise TokenError("Missing required role: Application.Read") + + app_id_claim = payload.get("appid") or payload.get("azp") + if app_id_claim != self.app_id: + raise TokenError("Invalid app_id claim") + + return payload diff --git a/config.py b/config.py index 5c10fdeb84..108521b864 100644 --- a/config.py +++ b/config.py @@ -66,6 +66,11 @@ COGNITO_JWT_AUTH_HEADER = os.environ.get("COGNITO_JWT_AUTH_HEADER") COGNITO_USER_POOL = os.environ.get("COGNITO_USER_POOL") +# Entra configuration +ENTRA_AUDIENCE = os.environ.get("ENTRA_AUDIENCE") +ENTRA_APP_ID = os.environ.get("ENTRA_APP_ID") +ENTRA_TENANT_ID = os.environ.get("ENTRA_TENANT_ID") + # Database configuration POSTGRES_DB = os.environ.get("POSTGRES_DB") POSTGRES_USER = os.environ.get("POSTGRES_USER") diff --git a/metrics/api/settings/default.py b/metrics/api/settings/default.py index 6f8ea47095..79b54c72cd 100644 --- a/metrics/api/settings/default.py +++ b/metrics/api/settings/default.py @@ -116,7 +116,14 @@ }, ] -COGNITO_USER_MANAGER = "common.auth.cognito_jwt.user_manager.CognitoManager" +ENTRA_USER_MANAGER = "common.auth.jwt.user_manager.EntraManager" +ENTRA_AUDIENCE = config.ENTRA_AUDIENCE +ENTRA_APP_ID = config.ENTRA_APP_ID +ENTRA_TENANT_ID = config.ENTRA_TENANT_ID +ENTRA_PUBLIC_KEYS_CACHING_ENABLED = True +ENTRA_PUBLIC_KEYS_CACHING_TIMEOUT = 60 * 60 * 24 # 24h caching, default is 300s + +COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" COGNITO_AWS_REGION = config.COGNITO_AWS_REGION COGNITO_JWT_AUTH_HEADER = config.COGNITO_JWT_AUTH_HEADER COGNITO_USER_POOL = config.COGNITO_USER_POOL @@ -128,7 +135,7 @@ "DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema", "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", - "common.auth.cognito_jwt.JSONWebTokenAuthentication", + "common.auth.jwt.JSONWebTokenAuthentication", ], } diff --git a/tests/unit/common/auth/cognito_jwt/test_user_manager.py b/tests/unit/common/auth/cognito_jwt/test_user_manager.py deleted file mode 100644 index 85c79d582c..0000000000 --- a/tests/unit/common/auth/cognito_jwt/test_user_manager.py +++ /dev/null @@ -1,46 +0,0 @@ -from django.contrib.auth import get_user_model - -from common.auth.cognito_jwt.user_manager import CognitoManager - -USER_MODEL = get_user_model() - - -def test_get_or_create_for_cognito_returns_user(): - jwt_payload = { - "entraObjectId": "unique_user_id", - "permissionSets": ["all_the_permissions"], - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user - assert user.username == jwt_payload["entraObjectId"] - assert user.permission_sets == jwt_payload["permissionSets"] - assert user.is_active is True - - -def test_get_or_create_for_cognito_returns_none_without_username(): - jwt_payload = { - "permissionSets": ["all_the_permissions"], - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user is None - - -def test_get_or_create_for_cognito_returns_none_without_permission_sets(): - jwt_payload = { - "entraObjectId": "unique_user_id", - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user is None - - -def test_get_or_create_for_cognito_returns_none_with_empty_permission_sets(): - jwt_payload = { - "entraObjectId": "unique_user_id", - "permissionSets": [], - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user is None diff --git a/tests/unit/common/auth/cognito_jwt/test_validator.py b/tests/unit/common/auth/cognito_jwt/test_validator.py deleted file mode 100644 index 6c985ae1df..0000000000 --- a/tests/unit/common/auth/cognito_jwt/test_validator.py +++ /dev/null @@ -1,120 +0,0 @@ -import pytest -from datetime import datetime, timedelta, timezone -from utils import create_jwt_token - -from common.auth.cognito_jwt import validator - - -def test_validate_token(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - - -def test_validate_token_error_key(cognito_well_known_keys, jwk_private_key_two): - token = create_jwt_token( - jwk_private_key_two, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - with pytest.raises(validator.TokenError): - auth.validate(token) - - -def test_validate_token_valid_expiry(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - - -def test_validate_token_error_expired(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - with pytest.raises(validator.TokenError): - auth.validate(token) - - -def test_validate_token_error_aud(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "other-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - - with pytest.raises(validator.TokenError): - auth.validate(token) - - -def test_validate_token_missing_aud(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - # missing aud - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - - -@pytest.mark.parametrize( - "is_cache_enabled,responses_calls", [(None, 2), (False, 2), (True, 1)] -) -def test_validate_token_caching( - cognito_well_known_keys, - jwk_private_key_one, - settings, - responses, - is_cache_enabled, - responses_calls, -): - if is_cache_enabled is not None: - settings.COGNITO_PUBLIC_KEYS_CACHING_ENABLED = is_cache_enabled - - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - assert len(responses.calls) == 1 - - auth_again = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth_again.validate(token) - assert len(responses.calls) == responses_calls diff --git a/tests/unit/common/auth/cognito_jwt/conftest.py b/tests/unit/common/auth/jwt/conftest.py similarity index 80% rename from tests/unit/common/auth/cognito_jwt/conftest.py rename to tests/unit/common/auth/jwt/conftest.py index 0f4c2db37b..d11bc0564e 100644 --- a/tests/unit/common/auth/cognito_jwt/conftest.py +++ b/tests/unit/common/auth/jwt/conftest.py @@ -19,6 +19,21 @@ def cognito_settings(settings): } settings.ROOT_URLCONF = "urls" +@pytest.fixture(autouse=True) +def entra_settings(settings): + settings.ENTRA_TENANT_ID = "entra_tenant" + settings.ENTRA_AUDIENCE = "entra_audience" + settings.ENTRA_APP_ID = "entraOID" + settings.ENTRA_PUBLIC_KEYS_CACHING_ENABLED = False + settings.ENTRA_JWT_AUTH_HEADER = "HTTP_AUTHORIZATION" + settings.CACHES = { + "default": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + "LOCATION": "unique-snowflake", + } + } + settings.ROOT_URLCONF = "urls" + def _private_to_public_key(private_key): data = copy.deepcopy(private_key) @@ -99,3 +114,14 @@ def cognito_well_known_keys(responses, jwk_public_key_one, jwk_public_key_two): json=jwk_keys, status=200, ) + + +@pytest.fixture() +def entra_well_known_keys(responses, jwk_public_key_one, jwk_public_key_two): + jwk_keys = {"keys": [jwk_public_key_one]} + responses.add( + responses.GET, + "https://login.microsoftonline.com/common/discovery/keys", + json=jwk_keys, + status=200, + ) diff --git a/tests/unit/common/auth/cognito_jwt/test_backend.py b/tests/unit/common/auth/jwt/test_backend.py similarity index 66% rename from tests/unit/common/auth/cognito_jwt/test_backend.py rename to tests/unit/common/auth/jwt/test_backend.py index 0d2c22ec13..e404aa86eb 100644 --- a/tests/unit/common/auth/cognito_jwt/test_backend.py +++ b/tests/unit/common/auth/jwt/test_backend.py @@ -4,9 +4,11 @@ from django.test import Client, override_settings from rest_framework import status from rest_framework.exceptions import AuthenticationFailed +from unittest.mock import patch from utils import create_jwt_token +import uuid -from common.auth.cognito_jwt import backend +from common.auth.jwt import backend USER_MODEL = get_user_model() @@ -41,9 +43,9 @@ def test_authenticate_no_token(rf): @pytest.mark.parametrize( "cognito_user_manager", - ["common.auth.cognito_jwt.user_manager.CognitoManager", None], + ["common.auth.jwt.user_manager.CognitoManager", None], ) -def test_custom_user_manager( +def test_custom_user_manager_cognito( rf, monkeypatch, cognito_well_known_keys, jwk_private_key_one, cognito_user_manager ): settings.COGNITO_USER_MANAGER = cognito_user_manager @@ -63,11 +65,11 @@ def func(payload): if cognito_user_manager: monkeypatch.setattr( - f"{cognito_user_manager}.get_or_create_for_cognito", func, raising=False + f"{cognito_user_manager}.get_or_create", func, raising=False ) else: monkeypatch.setattr( - USER_MODEL.objects, "get_or_create_for_cognito", func, raising=False + USER_MODEL.objects, "get_or_create", func, raising=False ) headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} @@ -83,7 +85,7 @@ def test_authenticate_valid_token_with_permission_set( rf, cognito_well_known_keys, jwk_private_key_one ): settings.COGNITO_USER_MANAGER = ( - "common.auth.cognito_jwt.user_manager.CognitoManager" + "common.auth.jwt.user_manager.CognitoManager" ) token = create_jwt_token( jwk_private_key_one, @@ -105,34 +107,42 @@ def test_authenticate_valid_token_with_permission_set( assert auth_token == token.encode("utf8") +@patch("common.auth.jwt.user_manager.get_user_permission_set") def test_authenticate_valid_token_without_permission_set( - rf, cognito_well_known_keys, jwk_private_key_one + mock_get_perms, rf, cognito_well_known_keys, jwk_private_key_one ): + fake_permissions = ["Permission_1", "Permission_2"] + mock_get_perms.return_value = fake_permissions + settings.COGNITO_USER_MANAGER = ( - "common.auth.cognito_jwt.user_manager.CognitoManager" + "common.auth.jwt.user_manager.CognitoManager" ) + entra_id = str(uuid.uuid4()) token = create_jwt_token( jwk_private_key_one, { "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", "aud": settings.COGNITO_AUDIENCE, "sub": "username", - "entraObjectId": "entraOID", + "entraObjectId": entra_id, }, ) headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() - response = auth.authenticate(request) - assert response is None + user, auth_token = auth.authenticate(request) + assert user + assert user.username == entra_id + assert auth_token == token.encode("utf8") + mock_get_perms.assert_called_once_with(entra_id) def test_authenticate_valid_token_with_empty_permission_set( rf, cognito_well_known_keys, jwk_private_key_one ): settings.COGNITO_USER_MANAGER = ( - "common.auth.cognito_jwt.user_manager.CognitoManager" + "common.auth.jwt.user_manager.CognitoManager" ) token = create_jwt_token( jwk_private_key_one, @@ -148,8 +158,10 @@ def test_authenticate_valid_token_with_empty_permission_set( headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() - response = auth.authenticate(request) - assert response is None + user, auth_token = auth.authenticate(request) + assert user + assert user.username == "entraOID" + assert auth_token == token.encode("utf8") def test_authenticate_invalid(rf, cognito_well_known_keys, jwk_private_key_two): @@ -203,3 +215,63 @@ def test_authenticate_error_response_code(): resp = client.get("/", **headers) assert resp.status_code == status.HTTP_401_UNAUTHORIZED + + +def test_authenticate_invalid_entra(rf, entra_well_known_keys, jwk_private_key_two): + token = create_jwt_token( + jwk_private_key_two, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + }, + ) + + headers = {settings.ENTRA_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + request = rf.get("/", **headers) + auth = backend.JSONWebTokenAuthentication() + + with pytest.raises(AuthenticationFailed): + auth.authenticate(request) + + +@pytest.mark.parametrize( + "entra_user_manager", + ["common.auth.jwt.user_manager.EntraManager", None], +) +def test_custom_user_manager_entra( + rf, monkeypatch, entra_well_known_keys, jwk_private_key_one, entra_user_manager +): + settings.ENTRA_USER_MANAGER = entra_user_manager + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + }, + ) + + @staticmethod + def func(payload): + return USER_MODEL(username=payload["appid"]) + + if entra_user_manager: + monkeypatch.setattr( + f"{entra_user_manager}.get_or_create", func, raising=False + ) + else: + monkeypatch.setattr( + USER_MODEL.objects, "get_or_create", func, raising=False + ) + + headers = {settings.ENTRA_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + request = rf.get("/", **headers) + auth = backend.JSONWebTokenAuthentication() + user, auth_token = auth.authenticate(request) + assert user + assert user.username == "entraOID" + assert auth_token == token.encode("utf8") diff --git a/tests/unit/common/auth/jwt/test_user_manager.py b/tests/unit/common/auth/jwt/test_user_manager.py new file mode 100644 index 0000000000..557d97855d --- /dev/null +++ b/tests/unit/common/auth/jwt/test_user_manager.py @@ -0,0 +1,100 @@ +import pytest +from unittest.mock import patch +import uuid + +from django.contrib.auth import get_user_model +from rest_framework.exceptions import AuthenticationFailed + +from common.auth.jwt.user_manager import CognitoManager, EntraManager + +USER_MODEL = get_user_model() + + +def test_get_or_create_for_cognito_returns_user(): + jwt_payload = { + "entraObjectId": "unique_user_id", + "permissionSets": ["all_the_permissions"], + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["entraObjectId"] + assert user.permission_sets == jwt_payload["permissionSets"] + assert user.is_active is True + + +def test_get_or_create_for_cognito_returns_none_without_username(): + jwt_payload = { + "permissionSets": ["all_the_permissions"], + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user is None + + +@patch("common.auth.jwt.user_manager.get_user_permission_set") +def test_get_or_create_for_cognito_returns_without_permission_sets(mock_get_perms): + fake_permissions = ["Permission_1", "Permission_2"] + mock_get_perms.return_value = fake_permissions + jwt_payload = { + "entraObjectId": "unique_user_id", + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["entraObjectId"] + assert user.permission_sets == fake_permissions + assert user.is_active is True + mock_get_perms.assert_called_once_with(jwt_payload["entraObjectId"]) + + +def test_get_or_create_for_cognito_returns_with_empty_permission_sets(): + jwt_payload = { + "entraObjectId": uuid.uuid4(), + "permissionSets": [], + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["entraObjectId"] + assert user.permission_sets == jwt_payload["permissionSets"] + assert user.is_active is True + + +@patch("cms.auth_content.models.users.User.objects.filter") +@patch("common.auth.jwt.user_manager.get_user_permission_set") +def test_get_or_create_for_entra_returns_with_permission_sets_lookup(mock_get_perms, mock_user_filter): + fake_permissions = ["Permission_1", "Permission_2"] + mock_get_perms.return_value = fake_permissions + mock_user_filter.return_value.exists.return_value = True + jwt_payload = { + "appid": uuid.uuid4(), + } + + user = EntraManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["appid"] + assert user.permission_sets == fake_permissions + assert user.is_active is True + mock_get_perms.assert_called_once_with(jwt_payload["appid"]) + + +@patch("cms.auth_content.models.users.User.objects.filter") +def test_get_or_create_for_entra_raises_exception_if_user_not_found(mock_user_filter): + mock_user_filter.return_value.exists.return_value = False + jwt_payload = { + "appid": uuid.uuid4(), + } + + with pytest.raises(AuthenticationFailed): + EntraManager.get_or_create(jwt_payload) + + +def test_get_or_create_for_entra_returns_none_without_username(): + jwt_payload = { + "permissionSets": ["all_the_permissions"], + } + + user = EntraManager.get_or_create(jwt_payload) + assert user is None + diff --git a/tests/unit/common/auth/jwt/test_validator.py b/tests/unit/common/auth/jwt/test_validator.py new file mode 100644 index 0000000000..6ac90d2b99 --- /dev/null +++ b/tests/unit/common/auth/jwt/test_validator.py @@ -0,0 +1,250 @@ +import pytest +from datetime import datetime, timedelta, timezone +from django.conf import settings +from utils import create_jwt_token + +from common.auth.jwt import validator + + +def test_validate_token(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + + +def test_validate_token_error_key(cognito_well_known_keys, jwk_private_key_two): + token = create_jwt_token( + jwk_private_key_two, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_valid_expiry(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + + +def test_validate_token_error_expired(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_error_aud(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "other-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_missing_aud(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + # missing aud + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + + +@pytest.mark.parametrize( + "is_cache_enabled,responses_calls", [(None, 2), (False, 2), (True, 1)] +) +def test_validate_token_caching( + cognito_well_known_keys, + jwk_private_key_one, + settings, + responses, + is_cache_enabled, + responses_calls, +): + if is_cache_enabled is not None: + settings.COGNITO_PUBLIC_KEYS_CACHING_ENABLED = is_cache_enabled + + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + assert len(responses.calls) == 1 + + auth_again = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth_again.validate(token) + assert len(responses.calls) == responses_calls + + +def test_validate_entra_token(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + }, + ) + auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth.validate(token) + + +def test_validate_token_error_key_entra(entra_well_known_keys, jwk_private_key_two): + token = create_jwt_token( + jwk_private_key_two, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + }, + ) + auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_valid_expiry_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), + }, + ) + auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth.validate(token) + + +def test_validate_token_error_expired_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), + }, + ) + auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_error_aud_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": "other-aud", + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + }, + ) + auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_missing_aud_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + }, + ) + auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + with pytest.raises(validator.TokenError): + auth.validate(token) + + +@pytest.mark.parametrize( + "is_cache_enabled,responses_calls", [(None, 2), (False, 2), (True, 1)] +) +def test_validate_token_caching_entra( + entra_well_known_keys, + jwk_private_key_one, + settings, + responses, + is_cache_enabled, + responses_calls, +): + if is_cache_enabled is not None: + settings.ENTRA_PUBLIC_KEYS_CACHING_ENABLED = is_cache_enabled + + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_APP_ID, + "roles": ["Application.Read"], + }, + ) + auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth.validate(token) + assert len(responses.calls) == 1 + + auth_again = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth_again.validate(token) + assert len(responses.calls) == responses_calls diff --git a/tests/unit/common/auth/cognito_jwt/urls.py b/tests/unit/common/auth/jwt/urls.py similarity index 85% rename from tests/unit/common/auth/cognito_jwt/urls.py rename to tests/unit/common/auth/jwt/urls.py index dc211481cf..57c954e866 100644 --- a/tests/unit/common/auth/cognito_jwt/urls.py +++ b/tests/unit/common/auth/jwt/urls.py @@ -2,7 +2,7 @@ from rest_framework.decorators import api_view, authentication_classes from rest_framework.response import Response -from common.auth.cognito_jwt import JSONWebTokenAuthentication +from common.auth.jwt import JSONWebTokenAuthentication @api_view(http_method_names=["GET"]) diff --git a/tests/unit/common/auth/cognito_jwt/utils.py b/tests/unit/common/auth/jwt/utils.py similarity index 100% rename from tests/unit/common/auth/cognito_jwt/utils.py rename to tests/unit/common/auth/jwt/utils.py From debed12413adad1133d67e319a38952da50b4f10 Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Fri, 12 Jun 2026 10:47:18 +0100 Subject: [PATCH 02/13] formatting --- common/auth/jwt/backend.py | 28 ++++++++++++---- common/auth/jwt/user_manager.py | 1 + common/auth/jwt/validator.py | 4 +-- tests/unit/common/auth/jwt/conftest.py | 1 + tests/unit/common/auth/jwt/test_backend.py | 24 ++++---------- .../unit/common/auth/jwt/test_user_manager.py | 5 +-- tests/unit/common/auth/jwt/test_validator.py | 32 ++++++++++++++----- 7 files changed, 57 insertions(+), 38 deletions(-) diff --git a/common/auth/jwt/backend.py b/common/auth/jwt/backend.py index d1c214fe24..3141031d89 100644 --- a/common/auth/jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -54,7 +54,9 @@ def authenticate(self, request): token_validator, provider_name = self.get_token_validator(jwt_token) except TokenError: logger.debug(f"Failed to identify token provider: {e}") - raise exceptions.AuthenticationFailed(_("Unknown or malformed token issuer.")) + raise exceptions.AuthenticationFailed( + _("Unknown or malformed token issuer.") + ) try: jwt_payload = token_validator.validate(jwt_token) @@ -83,14 +85,22 @@ def get_custom_user_manager(provider="cognito"): via get_or_create_for_cognito (or get_or_create_for_entra) on the user manager, this allows use of the default unmodified Django User model""" result = None - custom_user_manager_path = getattr(settings, "ENTRA_USER_MANAGER", False) if provider == "entra" else getattr(settings, "COGNITO_USER_MANAGER", False) + custom_user_manager_path = ( + getattr(settings, "ENTRA_USER_MANAGER", False) + if provider == "entra" + else getattr(settings, "COGNITO_USER_MANAGER", False) + ) if custom_user_manager_path: result = import_string(custom_user_manager_path)() return result @staticmethod def get_user_model(provider="cognito"): - user_model = getattr(settings, "ENTRA_USER_MODEL", settings.AUTH_USER_MODEL) if provider == "entra" else getattr(settings, "COGNITO_USER_MODEL", settings.AUTH_USER_MODEL) + user_model = ( + getattr(settings, "ENTRA_USER_MODEL", settings.AUTH_USER_MODEL) + if provider == "entra" + else getattr(settings, "COGNITO_USER_MODEL", settings.AUTH_USER_MODEL) + ) return django_apps.get_model(user_model, require_ready=False) @staticmethod @@ -115,7 +125,9 @@ def get_jwt_token(request): def get_token_validator(jwt_token): try: # Decode without verifying signature just to read the header/payload - unverified_payload = jwt.decode(jwt_token, options={"verify_signature": False}) + unverified_payload = jwt.decode( + jwt_token, options={"verify_signature": False} + ) issuer = unverified_payload.get("iss", "") except jwt.PyJWTError: raise exceptions.AuthenticationFailed(_("Malformed JWT.")) @@ -127,7 +139,7 @@ def get_token_validator(jwt_token): settings.COGNITO_AUDIENCE, ) return validator, "cognito" - + elif "sts.windows.net" in issuer: validator = EntraTokenValidator( settings.ENTRA_TENANT_ID, @@ -135,9 +147,11 @@ def get_token_validator(jwt_token): settings.ENTRA_APP_ID, ) return validator, "entra" - + else: - raise exceptions.AuthenticationFailed(_("Invalid or unsupported token issuer.")) + raise exceptions.AuthenticationFailed( + _("Invalid or unsupported token issuer.") + ) @staticmethod def authenticate_header(request): diff --git a/common/auth/jwt/user_manager.py b/common/auth/jwt/user_manager.py index 2b9aedfecb..39b778a594 100644 --- a/common/auth/jwt/user_manager.py +++ b/common/auth/jwt/user_manager.py @@ -10,6 +10,7 @@ logger = logging.getLogger(__name__) + def get_user_permission_set(id: str): permissions = UserManager.get_permission_sets_for_user(id) return build_permission_hierarchy(permissions) diff --git a/common/auth/jwt/validator.py b/common/auth/jwt/validator.py index 82a89e64f8..4ba4933f77 100644 --- a/common/auth/jwt/validator.py +++ b/common/auth/jwt/validator.py @@ -132,9 +132,7 @@ def _get_public_key(self, token): def validate(self, token): try: - unverified_payload = jwt.decode( - token, options={"verify_signature": False} - ) + unverified_payload = jwt.decode(token, options={"verify_signature": False}) except jwt.DecodeError as exc: raise TokenError(str(exc)) from exc diff --git a/tests/unit/common/auth/jwt/conftest.py b/tests/unit/common/auth/jwt/conftest.py index d11bc0564e..121f400682 100644 --- a/tests/unit/common/auth/jwt/conftest.py +++ b/tests/unit/common/auth/jwt/conftest.py @@ -19,6 +19,7 @@ def cognito_settings(settings): } settings.ROOT_URLCONF = "urls" + @pytest.fixture(autouse=True) def entra_settings(settings): settings.ENTRA_TENANT_ID = "entra_tenant" diff --git a/tests/unit/common/auth/jwt/test_backend.py b/tests/unit/common/auth/jwt/test_backend.py index e404aa86eb..d4e80c5d3f 100644 --- a/tests/unit/common/auth/jwt/test_backend.py +++ b/tests/unit/common/auth/jwt/test_backend.py @@ -68,9 +68,7 @@ def func(payload): f"{cognito_user_manager}.get_or_create", func, raising=False ) else: - monkeypatch.setattr( - USER_MODEL.objects, "get_or_create", func, raising=False - ) + monkeypatch.setattr(USER_MODEL.objects, "get_or_create", func, raising=False) headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) @@ -84,9 +82,7 @@ def func(payload): def test_authenticate_valid_token_with_permission_set( rf, cognito_well_known_keys, jwk_private_key_one ): - settings.COGNITO_USER_MANAGER = ( - "common.auth.jwt.user_manager.CognitoManager" - ) + settings.COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" token = create_jwt_token( jwk_private_key_one, { @@ -114,9 +110,7 @@ def test_authenticate_valid_token_without_permission_set( fake_permissions = ["Permission_1", "Permission_2"] mock_get_perms.return_value = fake_permissions - settings.COGNITO_USER_MANAGER = ( - "common.auth.jwt.user_manager.CognitoManager" - ) + settings.COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" entra_id = str(uuid.uuid4()) token = create_jwt_token( jwk_private_key_one, @@ -141,9 +135,7 @@ def test_authenticate_valid_token_without_permission_set( def test_authenticate_valid_token_with_empty_permission_set( rf, cognito_well_known_keys, jwk_private_key_one ): - settings.COGNITO_USER_MANAGER = ( - "common.auth.jwt.user_manager.CognitoManager" - ) + settings.COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" token = create_jwt_token( jwk_private_key_one, { @@ -260,13 +252,9 @@ def func(payload): return USER_MODEL(username=payload["appid"]) if entra_user_manager: - monkeypatch.setattr( - f"{entra_user_manager}.get_or_create", func, raising=False - ) + monkeypatch.setattr(f"{entra_user_manager}.get_or_create", func, raising=False) else: - monkeypatch.setattr( - USER_MODEL.objects, "get_or_create", func, raising=False - ) + monkeypatch.setattr(USER_MODEL.objects, "get_or_create", func, raising=False) headers = {settings.ENTRA_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) diff --git a/tests/unit/common/auth/jwt/test_user_manager.py b/tests/unit/common/auth/jwt/test_user_manager.py index 557d97855d..209ba2ffdf 100644 --- a/tests/unit/common/auth/jwt/test_user_manager.py +++ b/tests/unit/common/auth/jwt/test_user_manager.py @@ -63,7 +63,9 @@ def test_get_or_create_for_cognito_returns_with_empty_permission_sets(): @patch("cms.auth_content.models.users.User.objects.filter") @patch("common.auth.jwt.user_manager.get_user_permission_set") -def test_get_or_create_for_entra_returns_with_permission_sets_lookup(mock_get_perms, mock_user_filter): +def test_get_or_create_for_entra_returns_with_permission_sets_lookup( + mock_get_perms, mock_user_filter +): fake_permissions = ["Permission_1", "Permission_2"] mock_get_perms.return_value = fake_permissions mock_user_filter.return_value.exists.return_value = True @@ -97,4 +99,3 @@ def test_get_or_create_for_entra_returns_none_without_username(): user = EntraManager.get_or_create(jwt_payload) assert user is None - diff --git a/tests/unit/common/auth/jwt/test_validator.py b/tests/unit/common/auth/jwt/test_validator.py index 6ac90d2b99..3ea7112ff2 100644 --- a/tests/unit/common/auth/jwt/test_validator.py +++ b/tests/unit/common/auth/jwt/test_validator.py @@ -132,7 +132,9 @@ def test_validate_entra_token(entra_well_known_keys, jwk_private_key_one): "roles": ["Application.Read"], }, ) - auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) auth.validate(token) @@ -147,7 +149,9 @@ def test_validate_token_error_key_entra(entra_well_known_keys, jwk_private_key_t "roles": ["Application.Read"], }, ) - auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) with pytest.raises(validator.TokenError): auth.validate(token) @@ -164,7 +168,9 @@ def test_validate_token_valid_expiry_entra(entra_well_known_keys, jwk_private_ke "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), }, ) - auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) auth.validate(token) @@ -180,7 +186,9 @@ def test_validate_token_error_expired_entra(entra_well_known_keys, jwk_private_k "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), }, ) - auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) with pytest.raises(validator.TokenError): auth.validate(token) @@ -196,7 +204,9 @@ def test_validate_token_error_aud_entra(entra_well_known_keys, jwk_private_key_o "roles": ["Application.Read"], }, ) - auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) with pytest.raises(validator.TokenError): auth.validate(token) @@ -212,7 +222,9 @@ def test_validate_token_missing_aud_entra(entra_well_known_keys, jwk_private_key "roles": ["Application.Read"], }, ) - auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) with pytest.raises(validator.TokenError): auth.validate(token) @@ -241,10 +253,14 @@ def test_validate_token_caching_entra( "roles": ["Application.Read"], }, ) - auth = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) auth.validate(token) assert len(responses.calls) == 1 - auth_again = validator.EntraTokenValidator(settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID) + auth_again = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + ) auth_again.validate(token) assert len(responses.calls) == responses_calls From 5a43fcc5d531f61e315174a86e72a50501ae1e4e Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Fri, 12 Jun 2026 11:09:21 +0100 Subject: [PATCH 03/13] Ruff formatting fixes --- common/auth/jwt/backend.py | 25 ++++++++++++------------- common/auth/jwt/user_manager.py | 11 ++++++----- common/auth/jwt/validator.py | 11 +++++------ 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/common/auth/jwt/backend.py b/common/auth/jwt/backend.py index 3141031d89..0e41919e37 100644 --- a/common/auth/jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -1,6 +1,6 @@ -import jwt import logging +import jwt from django.apps import apps as django_apps from django.conf import settings from django.utils.encoding import force_str @@ -9,7 +9,7 @@ from rest_framework import HTTP_HEADER_ENCODING, exceptions from rest_framework.authentication import BaseAuthentication -from .validator import TokenError, CognitoTokenValidator, EntraTokenValidator +from .validator import CognitoTokenValidator, EntraTokenValidator, TokenError logger = logging.getLogger(__name__) @@ -52,16 +52,18 @@ def authenticate(self, request): # Authenticate token try: token_validator, provider_name = self.get_token_validator(jwt_token) - except TokenError: - logger.debug(f"Failed to identify token provider: {e}") + except TokenError as e: + logger.debug("Failed to identify token provider: %s", e) raise exceptions.AuthenticationFailed( _("Unknown or malformed token issuer.") - ) + ) from e try: jwt_payload = token_validator.validate(jwt_token) except TokenError as e: - logger.debug(f"{provider_name.capitalize()} token validation failed: {e}") + logger.debug( + "%s token validation failed: %s", provider_name.capitalize(), e + ) raise exceptions.AuthenticationFailed from None custom_user_manager = self.get_custom_user_manager(provider_name) @@ -129,8 +131,8 @@ def get_token_validator(jwt_token): jwt_token, options={"verify_signature": False} ) issuer = unverified_payload.get("iss", "") - except jwt.PyJWTError: - raise exceptions.AuthenticationFailed(_("Malformed JWT.")) + except jwt.PyJWTError as e: + raise exceptions.AuthenticationFailed(_("Malformed JWT.")) from e if "cognito-idp" in issuer: validator = CognitoTokenValidator( @@ -140,7 +142,7 @@ def get_token_validator(jwt_token): ) return validator, "cognito" - elif "sts.windows.net" in issuer: + if "sts.windows.net" in issuer: validator = EntraTokenValidator( settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, @@ -148,10 +150,7 @@ def get_token_validator(jwt_token): ) return validator, "entra" - else: - raise exceptions.AuthenticationFailed( - _("Invalid or unsupported token issuer.") - ) + raise exceptions.AuthenticationFailed(_("Invalid or unsupported token issuer.")) @staticmethod def authenticate_header(request): diff --git a/common/auth/jwt/user_manager.py b/common/auth/jwt/user_manager.py index 39b778a594..ad0680d2e6 100644 --- a/common/auth/jwt/user_manager.py +++ b/common/auth/jwt/user_manager.py @@ -2,17 +2,17 @@ from django.contrib.auth import get_user_model from django.contrib.auth.models import BaseUserManager +from rest_framework import exceptions -from metrics.data.managers.rbac_models.user import UserManager from cms.auth_content.models.users import User +from metrics.data.managers.rbac_models.user import UserManager from metrics.utils.permission_hierarchy import build_permission_hierarchy -from rest_framework import exceptions logger = logging.getLogger(__name__) -def get_user_permission_set(id: str): - permissions = UserManager.get_permission_sets_for_user(id) +def get_user_permission_set(user_id: str): + permissions = UserManager.get_permission_sets_for_user(user_id) return build_permission_hierarchy(permissions) @@ -57,7 +57,8 @@ def get_or_create(jwt_payload): try: username = jwt_payload["appid"] if not User.objects.filter(user_id=username).exists(): - raise exceptions.AuthenticationFailed(("Application not found.")) + msg = "Application not found." + raise exceptions.AuthenticationFailed(msg) permission_sets = get_user_permission_set(username) except KeyError: logger.info( diff --git a/common/auth/jwt/validator.py b/common/auth/jwt/validator.py index 4ba4933f77..fdb1390b9d 100644 --- a/common/auth/jwt/validator.py +++ b/common/auth/jwt/validator.py @@ -93,15 +93,12 @@ def __init__(self, tenant_id, audience, app_id): self.tenant_id = tenant_id self.audience = audience self.app_id = app_id + self.jwks_url = "https://login.microsoftonline.com/common/discovery/keys" @cached_property def expected_issuer(self): return f"https://sts.windows.net/{self.tenant_id}/" - @cached_property - def jwks_url(self): - return "https://login.microsoftonline.com/common/discovery/keys" - @cached_property def _json_web_keys(self): response = requests.get(self.jwks_url, timeout=10) @@ -164,10 +161,12 @@ def validate(self, token): roles = payload.get("roles", []) if "Application.Read" not in roles: - raise TokenError("Missing required role: Application.Read") + msg = "Missing required role: Application.Read" + raise TokenError(msg) app_id_claim = payload.get("appid") or payload.get("azp") if app_id_claim != self.app_id: - raise TokenError("Invalid app_id claim") + msg = "Invalid app_id claim" + raise TokenError(msg) return payload From cadcb4bf07f2a68407d099caf151f9ec7a359b64 Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Fri, 12 Jun 2026 11:19:57 +0100 Subject: [PATCH 04/13] Removed redundant check --- common/auth/jwt/backend.py | 2 +- common/auth/jwt/validator.py | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/common/auth/jwt/backend.py b/common/auth/jwt/backend.py index 0e41919e37..89edda15ad 100644 --- a/common/auth/jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -128,7 +128,7 @@ def get_token_validator(jwt_token): try: # Decode without verifying signature just to read the header/payload unverified_payload = jwt.decode( - jwt_token, options={"verify_signature": False} + jwt_token, options={"verify_signature": False} # noqa: S5659 ) issuer = unverified_payload.get("iss", "") except jwt.PyJWTError as e: diff --git a/common/auth/jwt/validator.py b/common/auth/jwt/validator.py index fdb1390b9d..2dec1c347c 100644 --- a/common/auth/jwt/validator.py +++ b/common/auth/jwt/validator.py @@ -128,15 +128,6 @@ def _get_public_key(self, token): return None def validate(self, token): - try: - unverified_payload = jwt.decode(token, options={"verify_signature": False}) - except jwt.DecodeError as exc: - raise TokenError(str(exc)) from exc - - if "permission_sets" in unverified_payload: - logger.debug("permission_sets found in token, skipping validation.") - return unverified_payload - public_key = self._get_public_key(token) if not public_key: msg = "No key found for this token" From 48b35b89e90c10d59632d607b7b9b6777545a1af Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Tue, 16 Jun 2026 11:13:49 +0100 Subject: [PATCH 05/13] Updating role capitalisation --- common/auth/jwt/validator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/auth/jwt/validator.py b/common/auth/jwt/validator.py index 2dec1c347c..73bd4822ff 100644 --- a/common/auth/jwt/validator.py +++ b/common/auth/jwt/validator.py @@ -151,8 +151,8 @@ def validate(self, token): raise TokenError(str(exc)) from exc roles = payload.get("roles", []) - if "Application.Read" not in roles: - msg = "Missing required role: Application.Read" + if "application.read" not in roles: + msg = "Missing required role: application.read" raise TokenError(msg) app_id_claim = payload.get("appid") or payload.get("azp") From 89c0adb3f6c7e5b1c9b33eb9eb5fe873358b75fa Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Tue, 16 Jun 2026 11:18:20 +0100 Subject: [PATCH 06/13] capitalisation in tests --- tests/unit/common/auth/jwt/test_backend.py | 4 ++-- tests/unit/common/auth/jwt/test_validator.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/unit/common/auth/jwt/test_backend.py b/tests/unit/common/auth/jwt/test_backend.py index d4e80c5d3f..b5860f7580 100644 --- a/tests/unit/common/auth/jwt/test_backend.py +++ b/tests/unit/common/auth/jwt/test_backend.py @@ -216,7 +216,7 @@ def test_authenticate_invalid_entra(rf, entra_well_known_keys, jwk_private_key_t "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], }, ) @@ -243,7 +243,7 @@ def test_custom_user_manager_entra( "aud": settings.ENTRA_AUDIENCE, "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], }, ) diff --git a/tests/unit/common/auth/jwt/test_validator.py b/tests/unit/common/auth/jwt/test_validator.py index 3ea7112ff2..9be654cb2e 100644 --- a/tests/unit/common/auth/jwt/test_validator.py +++ b/tests/unit/common/auth/jwt/test_validator.py @@ -129,7 +129,7 @@ def test_validate_entra_token(entra_well_known_keys, jwk_private_key_one): "aud": settings.ENTRA_AUDIENCE, "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( @@ -146,7 +146,7 @@ def test_validate_token_error_key_entra(entra_well_known_keys, jwk_private_key_t "aud": settings.ENTRA_AUDIENCE, "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( @@ -164,7 +164,7 @@ def test_validate_token_valid_expiry_entra(entra_well_known_keys, jwk_private_ke "aud": settings.ENTRA_AUDIENCE, "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), }, ) @@ -182,7 +182,7 @@ def test_validate_token_error_expired_entra(entra_well_known_keys, jwk_private_k "aud": settings.ENTRA_AUDIENCE, "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), }, ) @@ -201,7 +201,7 @@ def test_validate_token_error_aud_entra(entra_well_known_keys, jwk_private_key_o "aud": "other-aud", "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( @@ -219,7 +219,7 @@ def test_validate_token_missing_aud_entra(entra_well_known_keys, jwk_private_key "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( @@ -250,7 +250,7 @@ def test_validate_token_caching_entra( "aud": settings.ENTRA_AUDIENCE, "sub": "username", "appid": settings.ENTRA_APP_ID, - "roles": ["Application.Read"], + "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( From 31779485874e58ad7373b2a25821364dcdc24bfc Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Tue, 16 Jun 2026 11:23:45 +0100 Subject: [PATCH 07/13] Requested cryptography package upgrade --- requirements-prod.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-prod.txt b/requirements-prod.txt index ad09cb2006..7466cf1316 100644 --- a/requirements-prod.txt +++ b/requirements-prod.txt @@ -10,7 +10,7 @@ click==8.4.1 colorama==0.4.6 coreapi==2.3.3 coreschema==0.0.4 -cryptography==48.0.0 +cryptography==48.0.1 defusedxml==0.7.1 distlib==0.4.0 django-cors-headers==4.8.0 From f67df3ef1b7b10c6a5cb026c2f7323226a78387c Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Thu, 18 Jun 2026 15:06:47 +0100 Subject: [PATCH 08/13] Fixing header bug --- common/auth/jwt/backend.py | 2 +- config.py | 3 ++- metrics/api/settings/default.py | 1 + tests/unit/common/auth/jwt/test_backend.py | 16 +++++++++++++++- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/common/auth/jwt/backend.py b/common/auth/jwt/backend.py index 89edda15ad..72764f30b5 100644 --- a/common/auth/jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -23,7 +23,7 @@ def get_authorization_header(request): Hide some test client ickyness where the header can be unicode. """ - auth_header = getattr(settings, "COGNITO_JWT_AUTH_HEADER", "Authorization") + auth_header = getattr(settings, "COGNITO_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") auth = request.META.get(auth_header, b"") # If the Cognito header isn't present, look for the Entra auth header diff --git a/config.py b/config.py index 108521b864..06112a4005 100644 --- a/config.py +++ b/config.py @@ -63,13 +63,14 @@ # Cognito configuration COGNITO_AWS_REGION = os.environ.get("COGNITO_AWS_REGION") -COGNITO_JWT_AUTH_HEADER = os.environ.get("COGNITO_JWT_AUTH_HEADER") +COGNITO_JWT_AUTH_HEADER = os.environ.get("COGNITO_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") COGNITO_USER_POOL = os.environ.get("COGNITO_USER_POOL") # Entra configuration ENTRA_AUDIENCE = os.environ.get("ENTRA_AUDIENCE") ENTRA_APP_ID = os.environ.get("ENTRA_APP_ID") ENTRA_TENANT_ID = os.environ.get("ENTRA_TENANT_ID") +ENTRA_JWT_AUTH_HEADER = os.environ.get("ENTRA_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") # Database configuration POSTGRES_DB = os.environ.get("POSTGRES_DB") diff --git a/metrics/api/settings/default.py b/metrics/api/settings/default.py index 79b54c72cd..40d04228a5 100644 --- a/metrics/api/settings/default.py +++ b/metrics/api/settings/default.py @@ -122,6 +122,7 @@ ENTRA_TENANT_ID = config.ENTRA_TENANT_ID ENTRA_PUBLIC_KEYS_CACHING_ENABLED = True ENTRA_PUBLIC_KEYS_CACHING_TIMEOUT = 60 * 60 * 24 # 24h caching, default is 300s +ENTRA_JWT_AUTH_HEADER = config.ENTRA_JWT_AUTH_HEADER COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" COGNITO_AWS_REGION = config.COGNITO_AWS_REGION diff --git a/tests/unit/common/auth/jwt/test_backend.py b/tests/unit/common/auth/jwt/test_backend.py index b5860f7580..d7631dae16 100644 --- a/tests/unit/common/auth/jwt/test_backend.py +++ b/tests/unit/common/auth/jwt/test_backend.py @@ -23,12 +23,26 @@ def test_get_authorization_header(rf): auth.authenticate(request) +@override_settings() +def test_get_auth_header_no_value(rf): + """test get_authorization_header finds no token header if + COGNITO_JWT_AUTH_HEADER and ENTRA_JWT_AUTH_HEADER is set to None""" + settings.COGNITO_JWT_AUTH_HEADER = None + settings.ENTRA_JWT_AUTH_HEADER = None + headers = {"HTTP_AUTHORIZATION": b"bearer string token"} + request = rf.get("/", **headers) + auth = backend.JSONWebTokenAuthentication() + assert auth.authenticate(request) is None + + @override_settings() def test_get_default_auth_header(rf): """test get_authorization_header uses 'Authorization' header if COGNITO_JWT_AUTH_HEADER is not specified in settings""" del settings.COGNITO_JWT_AUTH_HEADER - headers = {"Authorization": b"bearer string token"} + del settings.ENTRA_JWT_AUTH_HEADER + # Prepend `HTTP_` as it's normally done by Django + headers = {"HTTP_AUTHORIZATION": b"bearer string token"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() with pytest.raises(AuthenticationFailed): From 454116db993021418620d1e157aac2ebe9acfcf6 Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Thu, 18 Jun 2026 15:08:54 +0100 Subject: [PATCH 09/13] Linting --- config.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/config.py b/config.py index 06112a4005..7200d6ea4f 100644 --- a/config.py +++ b/config.py @@ -63,7 +63,9 @@ # Cognito configuration COGNITO_AWS_REGION = os.environ.get("COGNITO_AWS_REGION") -COGNITO_JWT_AUTH_HEADER = os.environ.get("COGNITO_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") +COGNITO_JWT_AUTH_HEADER = os.environ.get( + "COGNITO_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION" +) COGNITO_USER_POOL = os.environ.get("COGNITO_USER_POOL") # Entra configuration From 17cdbcbd0654184bb5fb2d317da89c5bebe9eb57 Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Fri, 19 Jun 2026 14:57:06 +0100 Subject: [PATCH 10/13] Unified jwt header env var --- common/auth/jwt/backend.py | 7 +---- config.py | 6 ++-- metrics/api/settings/default.py | 4 +-- tests/unit/common/auth/jwt/conftest.py | 4 +-- tests/unit/common/auth/jwt/test_backend.py | 34 ++++++++++------------ 5 files changed, 23 insertions(+), 32 deletions(-) diff --git a/common/auth/jwt/backend.py b/common/auth/jwt/backend.py index 72764f30b5..53346f3b33 100644 --- a/common/auth/jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -23,14 +23,9 @@ def get_authorization_header(request): Hide some test client ickyness where the header can be unicode. """ - auth_header = getattr(settings, "COGNITO_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") + auth_header = getattr(settings, "JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") auth = request.META.get(auth_header, b"") - # If the Cognito header isn't present, look for the Entra auth header - if not auth: - auth_header = getattr(settings, "ENTRA_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") - auth = request.META.get(auth_header, b"") - if isinstance(auth, str): # Work around django test client oddness auth = auth.encode(HTTP_HEADER_ENCODING) diff --git a/config.py b/config.py index 7200d6ea4f..4cdd2f9969 100644 --- a/config.py +++ b/config.py @@ -61,18 +61,16 @@ # The name of the AWS profile to use for the AWS client used for ingestion AWS_PROFILE_NAME = os.environ.get("AWS_PROFILE_NAME") +JWT_AUTH_HEADER = os.environ.get("JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") + # Cognito configuration COGNITO_AWS_REGION = os.environ.get("COGNITO_AWS_REGION") -COGNITO_JWT_AUTH_HEADER = os.environ.get( - "COGNITO_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION" -) COGNITO_USER_POOL = os.environ.get("COGNITO_USER_POOL") # Entra configuration ENTRA_AUDIENCE = os.environ.get("ENTRA_AUDIENCE") ENTRA_APP_ID = os.environ.get("ENTRA_APP_ID") ENTRA_TENANT_ID = os.environ.get("ENTRA_TENANT_ID") -ENTRA_JWT_AUTH_HEADER = os.environ.get("ENTRA_JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") # Database configuration POSTGRES_DB = os.environ.get("POSTGRES_DB") diff --git a/metrics/api/settings/default.py b/metrics/api/settings/default.py index 40d04228a5..7d9edb542f 100644 --- a/metrics/api/settings/default.py +++ b/metrics/api/settings/default.py @@ -116,17 +116,17 @@ }, ] +JWT_AUTH_HEADER = config.JWT_AUTH_HEADER + ENTRA_USER_MANAGER = "common.auth.jwt.user_manager.EntraManager" ENTRA_AUDIENCE = config.ENTRA_AUDIENCE ENTRA_APP_ID = config.ENTRA_APP_ID ENTRA_TENANT_ID = config.ENTRA_TENANT_ID ENTRA_PUBLIC_KEYS_CACHING_ENABLED = True ENTRA_PUBLIC_KEYS_CACHING_TIMEOUT = 60 * 60 * 24 # 24h caching, default is 300s -ENTRA_JWT_AUTH_HEADER = config.ENTRA_JWT_AUTH_HEADER COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" COGNITO_AWS_REGION = config.COGNITO_AWS_REGION -COGNITO_JWT_AUTH_HEADER = config.COGNITO_JWT_AUTH_HEADER COGNITO_USER_POOL = config.COGNITO_USER_POOL COGNITO_AUDIENCE = None COGNITO_PUBLIC_KEYS_CACHING_ENABLED = True diff --git a/tests/unit/common/auth/jwt/conftest.py b/tests/unit/common/auth/jwt/conftest.py index 121f400682..584bf036ab 100644 --- a/tests/unit/common/auth/jwt/conftest.py +++ b/tests/unit/common/auth/jwt/conftest.py @@ -8,7 +8,7 @@ def cognito_settings(settings): settings.COGNITO_AWS_REGION = "eu-central-1" settings.COGNITO_USER_POOL = "bla" - settings.COGNITO_JWT_AUTH_HEADER = "HTTP_X_UHD_AUTH" + settings.JWT_AUTH_HEADER = "HTTP_X_UHD_AUTH" settings.COGNITO_AUDIENCE = "my-client-id" settings.COGNITO_PUBLIC_KEYS_CACHING_ENABLED = False settings.CACHES = { @@ -26,7 +26,7 @@ def entra_settings(settings): settings.ENTRA_AUDIENCE = "entra_audience" settings.ENTRA_APP_ID = "entraOID" settings.ENTRA_PUBLIC_KEYS_CACHING_ENABLED = False - settings.ENTRA_JWT_AUTH_HEADER = "HTTP_AUTHORIZATION" + settings.JWT_AUTH_HEADER = "HTTP_AUTHORIZATION" settings.CACHES = { "default": { "BACKEND": "django.core.cache.backends.locmem.LocMemCache", diff --git a/tests/unit/common/auth/jwt/test_backend.py b/tests/unit/common/auth/jwt/test_backend.py index d7631dae16..1813a70070 100644 --- a/tests/unit/common/auth/jwt/test_backend.py +++ b/tests/unit/common/auth/jwt/test_backend.py @@ -16,7 +16,7 @@ def test_get_authorization_header(rf): """test get_authorization_header correctly handles a header that is a string not a bytestring as expected""" - headers = {settings.COGNITO_JWT_AUTH_HEADER: "bearer string_token"} + headers = {settings.JWT_AUTH_HEADER: "bearer string_token"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() with pytest.raises(AuthenticationFailed): @@ -26,9 +26,8 @@ def test_get_authorization_header(rf): @override_settings() def test_get_auth_header_no_value(rf): """test get_authorization_header finds no token header if - COGNITO_JWT_AUTH_HEADER and ENTRA_JWT_AUTH_HEADER is set to None""" - settings.COGNITO_JWT_AUTH_HEADER = None - settings.ENTRA_JWT_AUTH_HEADER = None + JWT_AUTH_HEADER is set to None""" + settings.JWT_AUTH_HEADER = None headers = {"HTTP_AUTHORIZATION": b"bearer string token"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -38,9 +37,8 @@ def test_get_auth_header_no_value(rf): @override_settings() def test_get_default_auth_header(rf): """test get_authorization_header uses 'Authorization' header if - COGNITO_JWT_AUTH_HEADER is not specified in settings""" - del settings.COGNITO_JWT_AUTH_HEADER - del settings.ENTRA_JWT_AUTH_HEADER + JWT_AUTH_HEADER is not specified in settings""" + del settings.JWT_AUTH_HEADER # Prepend `HTTP_` as it's normally done by Django headers = {"HTTP_AUTHORIZATION": b"bearer string token"} request = rf.get("/", **headers) @@ -84,7 +82,7 @@ def func(payload): else: monkeypatch.setattr(USER_MODEL.objects, "get_or_create", func, raising=False) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() user, auth_token = auth.authenticate(request) @@ -108,7 +106,7 @@ def test_authenticate_valid_token_with_permission_set( }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() user, auth_token = auth.authenticate(request) @@ -136,7 +134,7 @@ def test_authenticate_valid_token_without_permission_set( }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() user, auth_token = auth.authenticate(request) @@ -161,7 +159,7 @@ def test_authenticate_valid_token_with_empty_permission_set( }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() user, auth_token = auth.authenticate(request) @@ -180,7 +178,7 @@ def test_authenticate_invalid(rf, cognito_well_known_keys, jwk_private_key_two): }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -189,7 +187,7 @@ def test_authenticate_invalid(rf, cognito_well_known_keys, jwk_private_key_two): def test_authenticate_error_segments(rf): - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer randomiets"} + headers = {settings.JWT_AUTH_HEADER: b"bearer randomiets"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -198,7 +196,7 @@ def test_authenticate_error_segments(rf): def test_authenticate_error_invalid_header(rf): - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer"} + headers = {settings.JWT_AUTH_HEADER: b"bearer"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -207,7 +205,7 @@ def test_authenticate_error_invalid_header(rf): def test_authenticate_error_spaces(rf): - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer random iets"} + headers = {settings.JWT_AUTH_HEADER: b"bearer random iets"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -217,7 +215,7 @@ def test_authenticate_error_spaces(rf): def test_authenticate_error_response_code(): client = Client() - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer random iets"} + headers = {settings.JWT_AUTH_HEADER: b"bearer random iets"} resp = client.get("/", **headers) assert resp.status_code == status.HTTP_401_UNAUTHORIZED @@ -234,7 +232,7 @@ def test_authenticate_invalid_entra(rf, entra_well_known_keys, jwk_private_key_t }, ) - headers = {settings.ENTRA_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -270,7 +268,7 @@ def func(payload): else: monkeypatch.setattr(USER_MODEL.objects, "get_or_create", func, raising=False) - headers = {settings.ENTRA_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() user, auth_token = auth.authenticate(request) From 9870815ca69cc91840b76366677e38d0b4253105 Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Tue, 23 Jun 2026 09:35:11 +0100 Subject: [PATCH 11/13] Multiple app ids allowed --- common/auth/jwt/backend.py | 2 +- common/auth/jwt/validator.py | 6 +++--- config.py | 1 + metrics/api/settings/default.py | 1 + tests/unit/common/auth/jwt/conftest.py | 2 +- tests/unit/common/auth/jwt/test_backend.py | 4 ++-- 6 files changed, 9 insertions(+), 7 deletions(-) diff --git a/common/auth/jwt/backend.py b/common/auth/jwt/backend.py index 53346f3b33..b2c633015c 100644 --- a/common/auth/jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -141,7 +141,7 @@ def get_token_validator(jwt_token): validator = EntraTokenValidator( settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, - settings.ENTRA_APP_ID, + settings.ENTRA_ALLOWED_APP_IDS, ) return validator, "entra" diff --git a/common/auth/jwt/validator.py b/common/auth/jwt/validator.py index 73bd4822ff..b19cc89934 100644 --- a/common/auth/jwt/validator.py +++ b/common/auth/jwt/validator.py @@ -89,10 +89,10 @@ def validate(self, token): class EntraTokenValidator: - def __init__(self, tenant_id, audience, app_id): + def __init__(self, tenant_id, audience, allowed_app_ids): self.tenant_id = tenant_id self.audience = audience - self.app_id = app_id + self.allowed_app_ids = allowed_app_ids self.jwks_url = "https://login.microsoftonline.com/common/discovery/keys" @cached_property @@ -156,7 +156,7 @@ def validate(self, token): raise TokenError(msg) app_id_claim = payload.get("appid") or payload.get("azp") - if app_id_claim != self.app_id: + if app_id_claim not in self.allowed_app_ids: msg = "Invalid app_id claim" raise TokenError(msg) diff --git a/config.py b/config.py index 4cdd2f9969..ebe52e127f 100644 --- a/config.py +++ b/config.py @@ -70,6 +70,7 @@ # Entra configuration ENTRA_AUDIENCE = os.environ.get("ENTRA_AUDIENCE") ENTRA_APP_ID = os.environ.get("ENTRA_APP_ID") +ENTRA_ALLOWED_APP_IDS = os.environ.get("ENTRA_ALLOWED_APP_IDS", "") ENTRA_TENANT_ID = os.environ.get("ENTRA_TENANT_ID") # Database configuration diff --git a/metrics/api/settings/default.py b/metrics/api/settings/default.py index 7d9edb542f..ea75c6b9d5 100644 --- a/metrics/api/settings/default.py +++ b/metrics/api/settings/default.py @@ -121,6 +121,7 @@ ENTRA_USER_MANAGER = "common.auth.jwt.user_manager.EntraManager" ENTRA_AUDIENCE = config.ENTRA_AUDIENCE ENTRA_APP_ID = config.ENTRA_APP_ID +ENTRA_ALLOWED_APP_IDS = config.ENTRA_ALLOWED_APP_IDS.split(",") ENTRA_TENANT_ID = config.ENTRA_TENANT_ID ENTRA_PUBLIC_KEYS_CACHING_ENABLED = True ENTRA_PUBLIC_KEYS_CACHING_TIMEOUT = 60 * 60 * 24 # 24h caching, default is 300s diff --git a/tests/unit/common/auth/jwt/conftest.py b/tests/unit/common/auth/jwt/conftest.py index 584bf036ab..c81e9f66a2 100644 --- a/tests/unit/common/auth/jwt/conftest.py +++ b/tests/unit/common/auth/jwt/conftest.py @@ -24,7 +24,7 @@ def cognito_settings(settings): def entra_settings(settings): settings.ENTRA_TENANT_ID = "entra_tenant" settings.ENTRA_AUDIENCE = "entra_audience" - settings.ENTRA_APP_ID = "entraOID" + settings.ENTRA_ALLOWED_APP_IDS = ["entraOID","otherEntraID"] settings.ENTRA_PUBLIC_KEYS_CACHING_ENABLED = False settings.JWT_AUTH_HEADER = "HTTP_AUTHORIZATION" settings.CACHES = { diff --git a/tests/unit/common/auth/jwt/test_backend.py b/tests/unit/common/auth/jwt/test_backend.py index 1813a70070..61b37f8d84 100644 --- a/tests/unit/common/auth/jwt/test_backend.py +++ b/tests/unit/common/auth/jwt/test_backend.py @@ -227,7 +227,7 @@ def test_authenticate_invalid_entra(rf, entra_well_known_keys, jwk_private_key_t { "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], }, ) @@ -254,7 +254,7 @@ def test_custom_user_manager_entra( "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "aud": settings.ENTRA_AUDIENCE, "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], }, ) From f9d0c71781263264155eb69bf45dff78ff3a6e1b Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Tue, 23 Jun 2026 09:43:38 +0100 Subject: [PATCH 12/13] Linting and test fixes --- tests/unit/common/auth/jwt/conftest.py | 2 +- tests/unit/common/auth/jwt/test_validator.py | 46 +++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/tests/unit/common/auth/jwt/conftest.py b/tests/unit/common/auth/jwt/conftest.py index c81e9f66a2..10e1e84173 100644 --- a/tests/unit/common/auth/jwt/conftest.py +++ b/tests/unit/common/auth/jwt/conftest.py @@ -24,7 +24,7 @@ def cognito_settings(settings): def entra_settings(settings): settings.ENTRA_TENANT_ID = "entra_tenant" settings.ENTRA_AUDIENCE = "entra_audience" - settings.ENTRA_ALLOWED_APP_IDS = ["entraOID","otherEntraID"] + settings.ENTRA_ALLOWED_APP_IDS = ["entraOID", "otherEntraID"] settings.ENTRA_PUBLIC_KEYS_CACHING_ENABLED = False settings.JWT_AUTH_HEADER = "HTTP_AUTHORIZATION" settings.CACHES = { diff --git a/tests/unit/common/auth/jwt/test_validator.py b/tests/unit/common/auth/jwt/test_validator.py index 9be654cb2e..fdb0543157 100644 --- a/tests/unit/common/auth/jwt/test_validator.py +++ b/tests/unit/common/auth/jwt/test_validator.py @@ -128,12 +128,14 @@ def test_validate_entra_token(entra_well_known_keys, jwk_private_key_one): "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "aud": settings.ENTRA_AUDIENCE, "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) auth.validate(token) @@ -145,12 +147,14 @@ def test_validate_token_error_key_entra(entra_well_known_keys, jwk_private_key_t "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "aud": settings.ENTRA_AUDIENCE, "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) with pytest.raises(validator.TokenError): auth.validate(token) @@ -163,13 +167,15 @@ def test_validate_token_valid_expiry_entra(entra_well_known_keys, jwk_private_ke "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "aud": settings.ENTRA_AUDIENCE, "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), }, ) auth = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) auth.validate(token) @@ -181,13 +187,15 @@ def test_validate_token_error_expired_entra(entra_well_known_keys, jwk_private_k "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "aud": settings.ENTRA_AUDIENCE, "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), }, ) auth = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) with pytest.raises(validator.TokenError): auth.validate(token) @@ -200,12 +208,14 @@ def test_validate_token_error_aud_entra(entra_well_known_keys, jwk_private_key_o "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "aud": "other-aud", "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) with pytest.raises(validator.TokenError): @@ -218,12 +228,14 @@ def test_validate_token_missing_aud_entra(entra_well_known_keys, jwk_private_key { "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) with pytest.raises(validator.TokenError): auth.validate(token) @@ -249,18 +261,22 @@ def test_validate_token_caching_entra( "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", "aud": settings.ENTRA_AUDIENCE, "sub": "username", - "appid": settings.ENTRA_APP_ID, + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], "roles": ["application.read"], }, ) auth = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) auth.validate(token) assert len(responses.calls) == 1 auth_again = validator.EntraTokenValidator( - settings.ENTRA_TENANT_ID, settings.ENTRA_AUDIENCE, settings.ENTRA_APP_ID + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], ) auth_again.validate(token) assert len(responses.calls) == responses_calls From 278e2ea56bd43685b3cba9e9884772ea955dc837 Mon Sep 17 00:00:00 2001 From: itsthatianguy Date: Fri, 26 Jun 2026 10:28:32 +0100 Subject: [PATCH 13/13] Env var update in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index af238454af..08c71ee16e 100644 --- a/README.md +++ b/README.md @@ -333,7 +333,7 @@ you will need to set up the variables for validating the token via cognito: - `export COGNITO_AWS_REGION=eu-west-2` - This is unlikely to change - `export COGNITO_USER_POOL=eu-west-2_a123bc4DE` - Can be found be checking the `User pool ID` value for your environment on the [AWS console] (https://eu-west-2.console.aws.amazon.com/cognito/v2/idp/user-pools?region=eu-west-2) -- `export COGNITO_JWT_AUTH_HEADER=HTTP_X_UHD_AUTH` - This is unlikely to change +- `export JWT_AUTH_HEADER=HTTP_X_UHD_AUTH` - This is unlikely to change ---