diff --git a/README.md b/README.md index af238454af..08c71ee16e 100644 --- a/README.md +++ b/README.md @@ -333,7 +333,7 @@ you will need to set up the variables for validating the token via cognito: - `export COGNITO_AWS_REGION=eu-west-2` - This is unlikely to change - `export COGNITO_USER_POOL=eu-west-2_a123bc4DE` - Can be found be checking the `User pool ID` value for your environment on the [AWS console] (https://eu-west-2.console.aws.amazon.com/cognito/v2/idp/user-pools?region=eu-west-2) -- `export COGNITO_JWT_AUTH_HEADER=HTTP_X_UHD_AUTH` - This is unlikely to change +- `export JWT_AUTH_HEADER=HTTP_X_UHD_AUTH` - This is unlikely to change --- diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py deleted file mode 100644 index 3a69470a4a..0000000000 --- a/common/auth/cognito_jwt/user_manager.py +++ /dev/null @@ -1,37 +0,0 @@ -import logging - -from django.contrib.auth import get_user_model -from django.contrib.auth.models import BaseUserManager - -logger = logging.getLogger(__name__) - - -class CognitoManager(BaseUserManager): - - @staticmethod - def get_or_create_for_cognito(jwt_payload): - """Create an ephemeral user instance for this request. - We don't need to store or retrieve any info, we use what's in the JWT, - so this speeds up the request by removing the need for any DB access - """ - try: - username = jwt_payload["entraObjectId"] - permission_sets = jwt_payload["permissionSets"] - if not permission_sets: - logger.debug( - "Empty permissionSets in token for user: '%s'", - username, - ) - return None - except KeyError: - logger.debug( - "Error getting entraObjectId and/or permissionSets field(s)" - " from jwt payload: '%s'", - jwt_payload, - ) - return None - - user_class = get_user_model() - user = user_class(username=username) - user.permission_sets = permission_sets - return user diff --git a/common/auth/cognito_jwt/__init__.py b/common/auth/jwt/__init__.py similarity index 100% rename from common/auth/cognito_jwt/__init__.py rename to common/auth/jwt/__init__.py diff --git a/common/auth/cognito_jwt/backend.py b/common/auth/jwt/backend.py similarity index 53% rename from common/auth/cognito_jwt/backend.py rename to common/auth/jwt/backend.py index 1fed42fec9..b2c633015c 100644 --- a/common/auth/cognito_jwt/backend.py +++ b/common/auth/jwt/backend.py @@ -1,5 +1,6 @@ import logging +import jwt from django.apps import apps as django_apps from django.conf import settings from django.utils.encoding import force_str @@ -8,7 +9,7 @@ from rest_framework import HTTP_HEADER_ENCODING, exceptions from rest_framework.authentication import BaseAuthentication -from .validator import TokenError, TokenValidator +from .validator import CognitoTokenValidator, EntraTokenValidator, TokenError logger = logging.getLogger(__name__) @@ -22,8 +23,9 @@ def get_authorization_header(request): Hide some test client ickyness where the header can be unicode. """ - auth_header = getattr(settings, "COGNITO_JWT_AUTH_HEADER", "Authorization") + auth_header = getattr(settings, "JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") auth = request.META.get(auth_header, b"") + if isinstance(auth, str): # Work around django test client oddness auth = auth.encode(HTTP_HEADER_ENCODING) @@ -44,38 +46,58 @@ def authenticate(self, request): # Authenticate token try: - token_validator = self.get_token_validator(request) + token_validator, provider_name = self.get_token_validator(jwt_token) + except TokenError as e: + logger.debug("Failed to identify token provider: %s", e) + raise exceptions.AuthenticationFailed( + _("Unknown or malformed token issuer.") + ) from e + + try: jwt_payload = token_validator.validate(jwt_token) - except TokenError: + except TokenError as e: + logger.debug( + "%s token validation failed: %s", provider_name.capitalize(), e + ) raise exceptions.AuthenticationFailed from None - custom_user_manager = self.get_custom_user_manager() + custom_user_manager = self.get_custom_user_manager(provider_name) + if custom_user_manager: - user = custom_user_manager.get_or_create_for_cognito(jwt_payload) + user = custom_user_manager.get_or_create(jwt_payload) else: user_model = self.get_user_model() - user = user_model.objects.get_or_create_for_cognito(jwt_payload) + user = user_model.objects.get_or_create(jwt_payload) if not user: logger.debug( "Unable to create user from JWT, defaulting to unauthenticated" ) return None + return (user, jwt_token) @staticmethod - def get_custom_user_manager(): - """If COGNITO_USER_MANAGER is set, then the user object is obtained - via get_or_create_for_cognito on the user manager, this allows use + def get_custom_user_manager(provider="cognito"): + """If COGNITO_USER_MANAGER or ENTRA_USER_MANAGER is set, then the user object is obtained + via get_or_create_for_cognito (or get_or_create_for_entra) on the user manager, this allows use of the default unmodified Django User model""" result = None - custom_user_manager_path = getattr(settings, "COGNITO_USER_MANAGER", False) + custom_user_manager_path = ( + getattr(settings, "ENTRA_USER_MANAGER", False) + if provider == "entra" + else getattr(settings, "COGNITO_USER_MANAGER", False) + ) if custom_user_manager_path: result = import_string(custom_user_manager_path)() return result @staticmethod - def get_user_model(): - user_model = getattr(settings, "COGNITO_USER_MODEL", settings.AUTH_USER_MODEL) + def get_user_model(provider="cognito"): + user_model = ( + getattr(settings, "ENTRA_USER_MODEL", settings.AUTH_USER_MODEL) + if provider == "entra" + else getattr(settings, "COGNITO_USER_MODEL", settings.AUTH_USER_MODEL) + ) return django_apps.get_model(user_model, require_ready=False) @staticmethod @@ -97,12 +119,33 @@ def get_jwt_token(request): return auth[1] @staticmethod - def get_token_validator(request): - return TokenValidator( - settings.COGNITO_AWS_REGION, - settings.COGNITO_USER_POOL, - settings.COGNITO_AUDIENCE, - ) + def get_token_validator(jwt_token): + try: + # Decode without verifying signature just to read the header/payload + unverified_payload = jwt.decode( + jwt_token, options={"verify_signature": False} # noqa: S5659 + ) + issuer = unverified_payload.get("iss", "") + except jwt.PyJWTError as e: + raise exceptions.AuthenticationFailed(_("Malformed JWT.")) from e + + if "cognito-idp" in issuer: + validator = CognitoTokenValidator( + settings.COGNITO_AWS_REGION, + settings.COGNITO_USER_POOL, + settings.COGNITO_AUDIENCE, + ) + return validator, "cognito" + + if "sts.windows.net" in issuer: + validator = EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS, + ) + return validator, "entra" + + raise exceptions.AuthenticationFailed(_("Invalid or unsupported token issuer.")) @staticmethod def authenticate_header(request): diff --git a/common/auth/jwt/user_manager.py b/common/auth/jwt/user_manager.py new file mode 100644 index 0000000000..ad0680d2e6 --- /dev/null +++ b/common/auth/jwt/user_manager.py @@ -0,0 +1,74 @@ +import logging + +from django.contrib.auth import get_user_model +from django.contrib.auth.models import BaseUserManager +from rest_framework import exceptions + +from cms.auth_content.models.users import User +from metrics.data.managers.rbac_models.user import UserManager +from metrics.utils.permission_hierarchy import build_permission_hierarchy + +logger = logging.getLogger(__name__) + + +def get_user_permission_set(user_id: str): + permissions = UserManager.get_permission_sets_for_user(user_id) + return build_permission_hierarchy(permissions) + + +class CognitoManager(BaseUserManager): + + @staticmethod + def get_or_create(jwt_payload): + """Create an ephemeral user instance for this request. + If the permissions aren't present in the JWT, queries for them in + the database based on the entraObjectId in the token + """ + try: + username = jwt_payload["entraObjectId"] + # Check if the JWT already includes permissionSets + # Use if found, if not grab user permissions from the database + if "permissionSets" in jwt_payload: + permission_sets = jwt_payload["permissionSets"] + else: + permission_sets = get_user_permission_set(username) + except KeyError: + logger.debug( + "Error getting entraObjectId and/or permissionSets field(s)" + " from jwt payload: '%s'", + jwt_payload, + ) + return None + + user_class = get_user_model() + user = user_class(username=username) + user.permission_sets = permission_sets + return user + + +class EntraManager(BaseUserManager): + + @staticmethod + def get_or_create(jwt_payload): + """Create an ephemeral user instance for this request. + If the provided appid isn't present in the database, raises + AuthenticationFailed exception + """ + try: + username = jwt_payload["appid"] + if not User.objects.filter(user_id=username).exists(): + msg = "Application not found." + raise exceptions.AuthenticationFailed(msg) + permission_sets = get_user_permission_set(username) + except KeyError: + logger.info( + "Error getting entraObjectId and/or permissionSets field(s)" + " from jwt payload: '%s'", + jwt_payload, + ) + return None + + user_class = get_user_model() + user = user_class(username=username) + user.permission_sets = permission_sets + return user diff --git a/common/auth/cognito_jwt/validator.py b/common/auth/jwt/validator.py similarity index 51% rename from common/auth/cognito_jwt/validator.py rename to common/auth/jwt/validator.py index 9ace1bd95b..b19cc89934 100644 --- a/common/auth/cognito_jwt/validator.py +++ b/common/auth/jwt/validator.py @@ -15,7 +15,7 @@ class TokenError(Exception): pass -class TokenValidator: +class CognitoTokenValidator: def __init__(self, aws_region, aws_user_pool, audience): self.aws_region = aws_region self.aws_user_pool = aws_user_pool @@ -86,3 +86,78 @@ def validate(self, token): ) as exc: raise TokenError(str(exc)) from exc return jwt_data + + +class EntraTokenValidator: + def __init__(self, tenant_id, audience, allowed_app_ids): + self.tenant_id = tenant_id + self.audience = audience + self.allowed_app_ids = allowed_app_ids + self.jwks_url = "https://login.microsoftonline.com/common/discovery/keys" + + @cached_property + def expected_issuer(self): + return f"https://sts.windows.net/{self.tenant_id}/" + + @cached_property + def _json_web_keys(self): + response = requests.get(self.jwks_url, timeout=10) + response.raise_for_status() + json_data = response.json() + return {item["kid"]: json.dumps(item) for item in json_data["keys"]} + + def _get_public_key(self, token): + try: + headers = jwt.get_unverified_header(token) + except jwt.DecodeError as exc: + raise TokenError(str(exc)) from exc + + if getattr(settings, "ENTRA_PUBLIC_KEYS_CACHING_ENABLED", False): + cache_key = "entra_jwt:{}".format(headers["kid"]) + jwk_data = cache.get(cache_key) + + if not jwk_data: + jwk_data = self._json_web_keys.get(headers["kid"]) + timeout = getattr(settings, "ENTRA_PUBLIC_KEYS_CACHING_TIMEOUT", 300) + cache.set(cache_key, jwk_data, timeout=timeout) + else: + jwk_data = self._json_web_keys.get(headers["kid"]) + + if jwk_data: + return RSAAlgorithm.from_jwk(jwk_data) + return None + + def validate(self, token): + public_key = self._get_public_key(token) + if not public_key: + msg = "No key found for this token" + raise TokenError(msg) + + params = { + "jwt": token, + "key": public_key, + "issuer": self.expected_issuer, + "audience": self.audience, + "algorithms": ["RS256"], + } + + try: + payload = jwt.decode(**params) + except ( + jwt.InvalidTokenError, + jwt.ExpiredSignatureError, + jwt.DecodeError, + ) as exc: + raise TokenError(str(exc)) from exc + + roles = payload.get("roles", []) + if "application.read" not in roles: + msg = "Missing required role: application.read" + raise TokenError(msg) + + app_id_claim = payload.get("appid") or payload.get("azp") + if app_id_claim not in self.allowed_app_ids: + msg = "Invalid app_id claim" + raise TokenError(msg) + + return payload diff --git a/config.py b/config.py index 5c10fdeb84..ebe52e127f 100644 --- a/config.py +++ b/config.py @@ -61,11 +61,18 @@ # The name of the AWS profile to use for the AWS client used for ingestion AWS_PROFILE_NAME = os.environ.get("AWS_PROFILE_NAME") +JWT_AUTH_HEADER = os.environ.get("JWT_AUTH_HEADER", "HTTP_AUTHORIZATION") + # Cognito configuration COGNITO_AWS_REGION = os.environ.get("COGNITO_AWS_REGION") -COGNITO_JWT_AUTH_HEADER = os.environ.get("COGNITO_JWT_AUTH_HEADER") COGNITO_USER_POOL = os.environ.get("COGNITO_USER_POOL") +# Entra configuration +ENTRA_AUDIENCE = os.environ.get("ENTRA_AUDIENCE") +ENTRA_APP_ID = os.environ.get("ENTRA_APP_ID") +ENTRA_ALLOWED_APP_IDS = os.environ.get("ENTRA_ALLOWED_APP_IDS", "") +ENTRA_TENANT_ID = os.environ.get("ENTRA_TENANT_ID") + # Database configuration POSTGRES_DB = os.environ.get("POSTGRES_DB") POSTGRES_USER = os.environ.get("POSTGRES_USER") diff --git a/metrics/api/settings/default.py b/metrics/api/settings/default.py index 6f8ea47095..ea75c6b9d5 100644 --- a/metrics/api/settings/default.py +++ b/metrics/api/settings/default.py @@ -116,9 +116,18 @@ }, ] -COGNITO_USER_MANAGER = "common.auth.cognito_jwt.user_manager.CognitoManager" +JWT_AUTH_HEADER = config.JWT_AUTH_HEADER + +ENTRA_USER_MANAGER = "common.auth.jwt.user_manager.EntraManager" +ENTRA_AUDIENCE = config.ENTRA_AUDIENCE +ENTRA_APP_ID = config.ENTRA_APP_ID +ENTRA_ALLOWED_APP_IDS = config.ENTRA_ALLOWED_APP_IDS.split(",") +ENTRA_TENANT_ID = config.ENTRA_TENANT_ID +ENTRA_PUBLIC_KEYS_CACHING_ENABLED = True +ENTRA_PUBLIC_KEYS_CACHING_TIMEOUT = 60 * 60 * 24 # 24h caching, default is 300s + +COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" COGNITO_AWS_REGION = config.COGNITO_AWS_REGION -COGNITO_JWT_AUTH_HEADER = config.COGNITO_JWT_AUTH_HEADER COGNITO_USER_POOL = config.COGNITO_USER_POOL COGNITO_AUDIENCE = None COGNITO_PUBLIC_KEYS_CACHING_ENABLED = True @@ -128,7 +137,7 @@ "DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema", "DEFAULT_AUTHENTICATION_CLASSES": [ "rest_framework.authentication.SessionAuthentication", - "common.auth.cognito_jwt.JSONWebTokenAuthentication", + "common.auth.jwt.JSONWebTokenAuthentication", ], } diff --git a/tests/unit/common/auth/cognito_jwt/test_user_manager.py b/tests/unit/common/auth/cognito_jwt/test_user_manager.py deleted file mode 100644 index 85c79d582c..0000000000 --- a/tests/unit/common/auth/cognito_jwt/test_user_manager.py +++ /dev/null @@ -1,46 +0,0 @@ -from django.contrib.auth import get_user_model - -from common.auth.cognito_jwt.user_manager import CognitoManager - -USER_MODEL = get_user_model() - - -def test_get_or_create_for_cognito_returns_user(): - jwt_payload = { - "entraObjectId": "unique_user_id", - "permissionSets": ["all_the_permissions"], - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user - assert user.username == jwt_payload["entraObjectId"] - assert user.permission_sets == jwt_payload["permissionSets"] - assert user.is_active is True - - -def test_get_or_create_for_cognito_returns_none_without_username(): - jwt_payload = { - "permissionSets": ["all_the_permissions"], - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user is None - - -def test_get_or_create_for_cognito_returns_none_without_permission_sets(): - jwt_payload = { - "entraObjectId": "unique_user_id", - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user is None - - -def test_get_or_create_for_cognito_returns_none_with_empty_permission_sets(): - jwt_payload = { - "entraObjectId": "unique_user_id", - "permissionSets": [], - } - - user = CognitoManager.get_or_create_for_cognito(jwt_payload) - assert user is None diff --git a/tests/unit/common/auth/cognito_jwt/test_validator.py b/tests/unit/common/auth/cognito_jwt/test_validator.py deleted file mode 100644 index 6c985ae1df..0000000000 --- a/tests/unit/common/auth/cognito_jwt/test_validator.py +++ /dev/null @@ -1,120 +0,0 @@ -import pytest -from datetime import datetime, timedelta, timezone -from utils import create_jwt_token - -from common.auth.cognito_jwt import validator - - -def test_validate_token(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - - -def test_validate_token_error_key(cognito_well_known_keys, jwk_private_key_two): - token = create_jwt_token( - jwk_private_key_two, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - with pytest.raises(validator.TokenError): - auth.validate(token) - - -def test_validate_token_valid_expiry(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - - -def test_validate_token_error_expired(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - with pytest.raises(validator.TokenError): - auth.validate(token) - - -def test_validate_token_error_aud(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "other-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - - with pytest.raises(validator.TokenError): - auth.validate(token) - - -def test_validate_token_missing_aud(cognito_well_known_keys, jwk_private_key_one): - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - # missing aud - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - - -@pytest.mark.parametrize( - "is_cache_enabled,responses_calls", [(None, 2), (False, 2), (True, 1)] -) -def test_validate_token_caching( - cognito_well_known_keys, - jwk_private_key_one, - settings, - responses, - is_cache_enabled, - responses_calls, -): - if is_cache_enabled is not None: - settings.COGNITO_PUBLIC_KEYS_CACHING_ENABLED = is_cache_enabled - - token = create_jwt_token( - jwk_private_key_one, - { - "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", - "aud": "my-audience", - "sub": "username", - }, - ) - auth = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth.validate(token) - assert len(responses.calls) == 1 - - auth_again = validator.TokenValidator("eu-central-1", "bla", "my-audience") - auth_again.validate(token) - assert len(responses.calls) == responses_calls diff --git a/tests/unit/common/auth/cognito_jwt/conftest.py b/tests/unit/common/auth/jwt/conftest.py similarity index 79% rename from tests/unit/common/auth/cognito_jwt/conftest.py rename to tests/unit/common/auth/jwt/conftest.py index 0f4c2db37b..10e1e84173 100644 --- a/tests/unit/common/auth/cognito_jwt/conftest.py +++ b/tests/unit/common/auth/jwt/conftest.py @@ -8,7 +8,7 @@ def cognito_settings(settings): settings.COGNITO_AWS_REGION = "eu-central-1" settings.COGNITO_USER_POOL = "bla" - settings.COGNITO_JWT_AUTH_HEADER = "HTTP_X_UHD_AUTH" + settings.JWT_AUTH_HEADER = "HTTP_X_UHD_AUTH" settings.COGNITO_AUDIENCE = "my-client-id" settings.COGNITO_PUBLIC_KEYS_CACHING_ENABLED = False settings.CACHES = { @@ -20,6 +20,22 @@ def cognito_settings(settings): settings.ROOT_URLCONF = "urls" +@pytest.fixture(autouse=True) +def entra_settings(settings): + settings.ENTRA_TENANT_ID = "entra_tenant" + settings.ENTRA_AUDIENCE = "entra_audience" + settings.ENTRA_ALLOWED_APP_IDS = ["entraOID", "otherEntraID"] + settings.ENTRA_PUBLIC_KEYS_CACHING_ENABLED = False + settings.JWT_AUTH_HEADER = "HTTP_AUTHORIZATION" + settings.CACHES = { + "default": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + "LOCATION": "unique-snowflake", + } + } + settings.ROOT_URLCONF = "urls" + + def _private_to_public_key(private_key): data = copy.deepcopy(private_key) del data["d"] @@ -99,3 +115,14 @@ def cognito_well_known_keys(responses, jwk_public_key_one, jwk_public_key_two): json=jwk_keys, status=200, ) + + +@pytest.fixture() +def entra_well_known_keys(responses, jwk_public_key_one, jwk_public_key_two): + jwk_keys = {"keys": [jwk_public_key_one]} + responses.add( + responses.GET, + "https://login.microsoftonline.com/common/discovery/keys", + json=jwk_keys, + status=200, + ) diff --git a/tests/unit/common/auth/cognito_jwt/test_backend.py b/tests/unit/common/auth/jwt/test_backend.py similarity index 52% rename from tests/unit/common/auth/cognito_jwt/test_backend.py rename to tests/unit/common/auth/jwt/test_backend.py index 0d2c22ec13..61b37f8d84 100644 --- a/tests/unit/common/auth/cognito_jwt/test_backend.py +++ b/tests/unit/common/auth/jwt/test_backend.py @@ -4,9 +4,11 @@ from django.test import Client, override_settings from rest_framework import status from rest_framework.exceptions import AuthenticationFailed +from unittest.mock import patch from utils import create_jwt_token +import uuid -from common.auth.cognito_jwt import backend +from common.auth.jwt import backend USER_MODEL = get_user_model() @@ -14,19 +16,31 @@ def test_get_authorization_header(rf): """test get_authorization_header correctly handles a header that is a string not a bytestring as expected""" - headers = {settings.COGNITO_JWT_AUTH_HEADER: "bearer string_token"} + headers = {settings.JWT_AUTH_HEADER: "bearer string_token"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() with pytest.raises(AuthenticationFailed): auth.authenticate(request) +@override_settings() +def test_get_auth_header_no_value(rf): + """test get_authorization_header finds no token header if + JWT_AUTH_HEADER is set to None""" + settings.JWT_AUTH_HEADER = None + headers = {"HTTP_AUTHORIZATION": b"bearer string token"} + request = rf.get("/", **headers) + auth = backend.JSONWebTokenAuthentication() + assert auth.authenticate(request) is None + + @override_settings() def test_get_default_auth_header(rf): """test get_authorization_header uses 'Authorization' header if - COGNITO_JWT_AUTH_HEADER is not specified in settings""" - del settings.COGNITO_JWT_AUTH_HEADER - headers = {"Authorization": b"bearer string token"} + JWT_AUTH_HEADER is not specified in settings""" + del settings.JWT_AUTH_HEADER + # Prepend `HTTP_` as it's normally done by Django + headers = {"HTTP_AUTHORIZATION": b"bearer string token"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() with pytest.raises(AuthenticationFailed): @@ -41,9 +55,9 @@ def test_authenticate_no_token(rf): @pytest.mark.parametrize( "cognito_user_manager", - ["common.auth.cognito_jwt.user_manager.CognitoManager", None], + ["common.auth.jwt.user_manager.CognitoManager", None], ) -def test_custom_user_manager( +def test_custom_user_manager_cognito( rf, monkeypatch, cognito_well_known_keys, jwk_private_key_one, cognito_user_manager ): settings.COGNITO_USER_MANAGER = cognito_user_manager @@ -63,14 +77,12 @@ def func(payload): if cognito_user_manager: monkeypatch.setattr( - f"{cognito_user_manager}.get_or_create_for_cognito", func, raising=False + f"{cognito_user_manager}.get_or_create", func, raising=False ) else: - monkeypatch.setattr( - USER_MODEL.objects, "get_or_create_for_cognito", func, raising=False - ) + monkeypatch.setattr(USER_MODEL.objects, "get_or_create", func, raising=False) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() user, auth_token = auth.authenticate(request) @@ -82,9 +94,7 @@ def func(payload): def test_authenticate_valid_token_with_permission_set( rf, cognito_well_known_keys, jwk_private_key_one ): - settings.COGNITO_USER_MANAGER = ( - "common.auth.cognito_jwt.user_manager.CognitoManager" - ) + settings.COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" token = create_jwt_token( jwk_private_key_one, { @@ -96,7 +106,7 @@ def test_authenticate_valid_token_with_permission_set( }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() user, auth_token = auth.authenticate(request) @@ -105,35 +115,39 @@ def test_authenticate_valid_token_with_permission_set( assert auth_token == token.encode("utf8") +@patch("common.auth.jwt.user_manager.get_user_permission_set") def test_authenticate_valid_token_without_permission_set( - rf, cognito_well_known_keys, jwk_private_key_one + mock_get_perms, rf, cognito_well_known_keys, jwk_private_key_one ): - settings.COGNITO_USER_MANAGER = ( - "common.auth.cognito_jwt.user_manager.CognitoManager" - ) + fake_permissions = ["Permission_1", "Permission_2"] + mock_get_perms.return_value = fake_permissions + + settings.COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" + entra_id = str(uuid.uuid4()) token = create_jwt_token( jwk_private_key_one, { "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", "aud": settings.COGNITO_AUDIENCE, "sub": "username", - "entraObjectId": "entraOID", + "entraObjectId": entra_id, }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() - response = auth.authenticate(request) - assert response is None + user, auth_token = auth.authenticate(request) + assert user + assert user.username == entra_id + assert auth_token == token.encode("utf8") + mock_get_perms.assert_called_once_with(entra_id) def test_authenticate_valid_token_with_empty_permission_set( rf, cognito_well_known_keys, jwk_private_key_one ): - settings.COGNITO_USER_MANAGER = ( - "common.auth.cognito_jwt.user_manager.CognitoManager" - ) + settings.COGNITO_USER_MANAGER = "common.auth.jwt.user_manager.CognitoManager" token = create_jwt_token( jwk_private_key_one, { @@ -145,11 +159,13 @@ def test_authenticate_valid_token_with_empty_permission_set( }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() - response = auth.authenticate(request) - assert response is None + user, auth_token = auth.authenticate(request) + assert user + assert user.username == "entraOID" + assert auth_token == token.encode("utf8") def test_authenticate_invalid(rf, cognito_well_known_keys, jwk_private_key_two): @@ -162,7 +178,7 @@ def test_authenticate_invalid(rf, cognito_well_known_keys, jwk_private_key_two): }, ) - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -171,7 +187,7 @@ def test_authenticate_invalid(rf, cognito_well_known_keys, jwk_private_key_two): def test_authenticate_error_segments(rf): - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer randomiets"} + headers = {settings.JWT_AUTH_HEADER: b"bearer randomiets"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -180,7 +196,7 @@ def test_authenticate_error_segments(rf): def test_authenticate_error_invalid_header(rf): - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer"} + headers = {settings.JWT_AUTH_HEADER: b"bearer"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -189,7 +205,7 @@ def test_authenticate_error_invalid_header(rf): def test_authenticate_error_spaces(rf): - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer random iets"} + headers = {settings.JWT_AUTH_HEADER: b"bearer random iets"} request = rf.get("/", **headers) auth = backend.JSONWebTokenAuthentication() @@ -199,7 +215,63 @@ def test_authenticate_error_spaces(rf): def test_authenticate_error_response_code(): client = Client() - headers = {settings.COGNITO_JWT_AUTH_HEADER: b"bearer random iets"} + headers = {settings.JWT_AUTH_HEADER: b"bearer random iets"} resp = client.get("/", **headers) assert resp.status_code == status.HTTP_401_UNAUTHORIZED + + +def test_authenticate_invalid_entra(rf, entra_well_known_keys, jwk_private_key_two): + token = create_jwt_token( + jwk_private_key_two, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + }, + ) + + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + request = rf.get("/", **headers) + auth = backend.JSONWebTokenAuthentication() + + with pytest.raises(AuthenticationFailed): + auth.authenticate(request) + + +@pytest.mark.parametrize( + "entra_user_manager", + ["common.auth.jwt.user_manager.EntraManager", None], +) +def test_custom_user_manager_entra( + rf, monkeypatch, entra_well_known_keys, jwk_private_key_one, entra_user_manager +): + settings.ENTRA_USER_MANAGER = entra_user_manager + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + }, + ) + + @staticmethod + def func(payload): + return USER_MODEL(username=payload["appid"]) + + if entra_user_manager: + monkeypatch.setattr(f"{entra_user_manager}.get_or_create", func, raising=False) + else: + monkeypatch.setattr(USER_MODEL.objects, "get_or_create", func, raising=False) + + headers = {settings.JWT_AUTH_HEADER: b"bearer %s" % token.encode("utf8")} + request = rf.get("/", **headers) + auth = backend.JSONWebTokenAuthentication() + user, auth_token = auth.authenticate(request) + assert user + assert user.username == "entraOID" + assert auth_token == token.encode("utf8") diff --git a/tests/unit/common/auth/jwt/test_user_manager.py b/tests/unit/common/auth/jwt/test_user_manager.py new file mode 100644 index 0000000000..209ba2ffdf --- /dev/null +++ b/tests/unit/common/auth/jwt/test_user_manager.py @@ -0,0 +1,101 @@ +import pytest +from unittest.mock import patch +import uuid + +from django.contrib.auth import get_user_model +from rest_framework.exceptions import AuthenticationFailed + +from common.auth.jwt.user_manager import CognitoManager, EntraManager + +USER_MODEL = get_user_model() + + +def test_get_or_create_for_cognito_returns_user(): + jwt_payload = { + "entraObjectId": "unique_user_id", + "permissionSets": ["all_the_permissions"], + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["entraObjectId"] + assert user.permission_sets == jwt_payload["permissionSets"] + assert user.is_active is True + + +def test_get_or_create_for_cognito_returns_none_without_username(): + jwt_payload = { + "permissionSets": ["all_the_permissions"], + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user is None + + +@patch("common.auth.jwt.user_manager.get_user_permission_set") +def test_get_or_create_for_cognito_returns_without_permission_sets(mock_get_perms): + fake_permissions = ["Permission_1", "Permission_2"] + mock_get_perms.return_value = fake_permissions + jwt_payload = { + "entraObjectId": "unique_user_id", + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["entraObjectId"] + assert user.permission_sets == fake_permissions + assert user.is_active is True + mock_get_perms.assert_called_once_with(jwt_payload["entraObjectId"]) + + +def test_get_or_create_for_cognito_returns_with_empty_permission_sets(): + jwt_payload = { + "entraObjectId": uuid.uuid4(), + "permissionSets": [], + } + + user = CognitoManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["entraObjectId"] + assert user.permission_sets == jwt_payload["permissionSets"] + assert user.is_active is True + + +@patch("cms.auth_content.models.users.User.objects.filter") +@patch("common.auth.jwt.user_manager.get_user_permission_set") +def test_get_or_create_for_entra_returns_with_permission_sets_lookup( + mock_get_perms, mock_user_filter +): + fake_permissions = ["Permission_1", "Permission_2"] + mock_get_perms.return_value = fake_permissions + mock_user_filter.return_value.exists.return_value = True + jwt_payload = { + "appid": uuid.uuid4(), + } + + user = EntraManager.get_or_create(jwt_payload) + assert user + assert user.username == jwt_payload["appid"] + assert user.permission_sets == fake_permissions + assert user.is_active is True + mock_get_perms.assert_called_once_with(jwt_payload["appid"]) + + +@patch("cms.auth_content.models.users.User.objects.filter") +def test_get_or_create_for_entra_raises_exception_if_user_not_found(mock_user_filter): + mock_user_filter.return_value.exists.return_value = False + jwt_payload = { + "appid": uuid.uuid4(), + } + + with pytest.raises(AuthenticationFailed): + EntraManager.get_or_create(jwt_payload) + + +def test_get_or_create_for_entra_returns_none_without_username(): + jwt_payload = { + "permissionSets": ["all_the_permissions"], + } + + user = EntraManager.get_or_create(jwt_payload) + assert user is None diff --git a/tests/unit/common/auth/jwt/test_validator.py b/tests/unit/common/auth/jwt/test_validator.py new file mode 100644 index 0000000000..fdb0543157 --- /dev/null +++ b/tests/unit/common/auth/jwt/test_validator.py @@ -0,0 +1,282 @@ +import pytest +from datetime import datetime, timedelta, timezone +from django.conf import settings +from utils import create_jwt_token + +from common.auth.jwt import validator + + +def test_validate_token(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + + +def test_validate_token_error_key(cognito_well_known_keys, jwk_private_key_two): + token = create_jwt_token( + jwk_private_key_two, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_valid_expiry(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + + +def test_validate_token_error_expired(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_error_aud(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "other-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_missing_aud(cognito_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + # missing aud + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + + +@pytest.mark.parametrize( + "is_cache_enabled,responses_calls", [(None, 2), (False, 2), (True, 1)] +) +def test_validate_token_caching( + cognito_well_known_keys, + jwk_private_key_one, + settings, + responses, + is_cache_enabled, + responses_calls, +): + if is_cache_enabled is not None: + settings.COGNITO_PUBLIC_KEYS_CACHING_ENABLED = is_cache_enabled + + token = create_jwt_token( + jwk_private_key_one, + { + "iss": "https://cognito-idp.eu-central-1.amazonaws.com/bla", + "aud": "my-audience", + "sub": "username", + }, + ) + auth = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth.validate(token) + assert len(responses.calls) == 1 + + auth_again = validator.CognitoTokenValidator("eu-central-1", "bla", "my-audience") + auth_again.validate(token) + assert len(responses.calls) == responses_calls + + +def test_validate_entra_token(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + }, + ) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + auth.validate(token) + + +def test_validate_token_error_key_entra(entra_well_known_keys, jwk_private_key_two): + token = create_jwt_token( + jwk_private_key_two, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + }, + ) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_valid_expiry_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + "exp": datetime.now(tz=timezone.utc) + timedelta(minutes=15), + }, + ) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + auth.validate(token) + + +def test_validate_token_error_expired_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + "exp": datetime.now(tz=timezone.utc) - timedelta(minutes=15), + }, + ) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_error_aud_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": "other-aud", + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + }, + ) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + + with pytest.raises(validator.TokenError): + auth.validate(token) + + +def test_validate_token_missing_aud_entra(entra_well_known_keys, jwk_private_key_one): + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + }, + ) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + with pytest.raises(validator.TokenError): + auth.validate(token) + + +@pytest.mark.parametrize( + "is_cache_enabled,responses_calls", [(None, 2), (False, 2), (True, 1)] +) +def test_validate_token_caching_entra( + entra_well_known_keys, + jwk_private_key_one, + settings, + responses, + is_cache_enabled, + responses_calls, +): + if is_cache_enabled is not None: + settings.ENTRA_PUBLIC_KEYS_CACHING_ENABLED = is_cache_enabled + + token = create_jwt_token( + jwk_private_key_one, + { + "iss": f"https://sts.windows.net/{settings.ENTRA_TENANT_ID}/", + "aud": settings.ENTRA_AUDIENCE, + "sub": "username", + "appid": settings.ENTRA_ALLOWED_APP_IDS[0], + "roles": ["application.read"], + }, + ) + auth = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + auth.validate(token) + assert len(responses.calls) == 1 + + auth_again = validator.EntraTokenValidator( + settings.ENTRA_TENANT_ID, + settings.ENTRA_AUDIENCE, + settings.ENTRA_ALLOWED_APP_IDS[0], + ) + auth_again.validate(token) + assert len(responses.calls) == responses_calls diff --git a/tests/unit/common/auth/cognito_jwt/urls.py b/tests/unit/common/auth/jwt/urls.py similarity index 85% rename from tests/unit/common/auth/cognito_jwt/urls.py rename to tests/unit/common/auth/jwt/urls.py index dc211481cf..57c954e866 100644 --- a/tests/unit/common/auth/cognito_jwt/urls.py +++ b/tests/unit/common/auth/jwt/urls.py @@ -2,7 +2,7 @@ from rest_framework.decorators import api_view, authentication_classes from rest_framework.response import Response -from common.auth.cognito_jwt import JSONWebTokenAuthentication +from common.auth.jwt import JSONWebTokenAuthentication @api_view(http_method_names=["GET"]) diff --git a/tests/unit/common/auth/cognito_jwt/utils.py b/tests/unit/common/auth/jwt/utils.py similarity index 100% rename from tests/unit/common/auth/cognito_jwt/utils.py rename to tests/unit/common/auth/jwt/utils.py