diff --git a/docs/validation/COMPREHENSIVE_VALIDATION_REPORT.md b/docs/validation/COMPREHENSIVE_VALIDATION_REPORT.md new file mode 100644 index 0000000..e8fe2bb --- /dev/null +++ b/docs/validation/COMPREHENSIVE_VALIDATION_REPORT.md @@ -0,0 +1,248 @@ +# OpenShield — Comprehensive Validation Report + +> **Base & scope.** This validation was executed against the +> `docs/issue-132-frontend-api-validation-dev` snapshot, which had **44** scanner +> rules at the time. The test deliverables in this branch are rebased onto +> `upstream/dev` (openshield-org), which has since advanced to **45** rules — +> `az_net_015` (public DNS zone enumeration, #106) was added and is now covered by +> a test added in this branch. Counts below have been updated to the 45-rule base. +> Findings were measured during the original docs-branch run and were **not** +> re-measured end-to-end against `upstream/dev` except where noted; the three +> headline code defects (H-1, H-2, M-1) were re-confirmed to still exist on +> `upstream/dev`. Other upstream additions landed after the validation run +> (async scan worker #129, threat-simulation prompt builder #138) were not +> separately validated. + +## Executive Summary + +OpenShield is in **good functional shape**. The two blockers from the previous +validation run (2026-06-07) are **closed**: with a real `DATABASE_URL` + Postgres +the import-time `KeyError: 'DATABASE_URL'` no longer halts pytest collection, and +the full backend suite runs green. The scanner engine, REST API, database +persistence, AI/RAG layer, CVE/NVD correlation, and frontend build all work. + +Rule-test coverage was the largest gap and is now **closed** — every one of the +**45** scanner rules has dedicated compliant + non-compliant unit tests (was 10 of +44 on the docs branch; `az_net_015` coverage was added when rebasing onto +`upstream/dev`). Full backend suite: **170 passed, 0 failed**; `scanner/rules` +coverage **90%**. + +Validation also surfaced **two genuine rule-correctness bugs** (both masked by +bare `except` clauses), one broken AI module, an observability gap in the scan +engine, a Sentinel field-mapping mismatch, outdated dependencies with known CVEs, +and stale documentation counts. None are release-blocking on their own, but the +two scanner-rule bugs defeat the purpose of the affected rules and should be +fixed before the next release. + +The live **Render API is currently suspended by its owner** (HTTP 503, +`suspend-by-user`), so live-API validation is blocked; both Vercel sites serve. + +--- + +## Environment + +| Field | Value | +|---|---| +| OS | Windows 11 + WSL2 Ubuntu 22.04 (backend), Windows (frontend) | +| Python | 3.11.15 (deadsnakes), native WSL venv | +| Node / npm | v22.14.0 / 10.9.2 | +| Database | PostgreSQL 14 (WSL), `openshield_test` DB | +| Branch | `docs/issue-132-frontend-api-validation-dev` | +| HEAD | `67b6744` | +| Date | 2026-06-21 | + +--- + +## Phase Results + +| Phase | Status | Key findings | +|---|---|---| +| 0 — Environment setup | **Pass** | Py 3.11 + Postgres + venv up; `DatabaseManager.run_migrations()` creates `openshield.{findings,scans}`; import-time `DATABASE_URL` blocker resolved | +| 1 — Static checks (CI parity, 8) | **Pass** | 45 rules / 45 playbooks / 4 frameworks (44/44 on the docs branch; `az_net_015` is the +1 on upstream); all 8 checks pass. Playbook `bash -n` failed only due to Windows CRLF checkout (git-stored blobs are LF; CI passes). 2 rules orphaned from frameworks (informational) | +| 2 — Full backend pytest | **Pass** | **87 passed, 0 failed**; 39% coverage; `test_jwt_config.py` collection blocker gone. Found `ai/chunker.py` SyntaxError via coverage warning | +| 3 — Rule-test coverage gap | **Pass** | +73 tests across 6 groups on the docs branch (+3 more for `az_net_015` on upstream); all 45 rules now covered; `scanner/rules` 90%. Surfaced AZ-NET-012 bug | +| 4 — ScanEngine integration | **Pass** | 45 rules load; self-consistent score; error isolation works. Confirmed observability gap; surfaced AZ-NET-009 bug | +| 5 — Local API + smoke test | **Pass** | Pass-1 (empty) 32/32+8 skip; Pass-2 (seeded) 37/38; all 18 routes exercised; playbook + CVE paths exercised after seeding. 1 smoke-test design bug (TC-34) | +| 6 — Frontend | **Pass** | lint 65 err/4 warn (unchanged baseline, cosmetic); build clean (782 kB JS); code-level INT-001–007 verified via normalizer layer | +| 7 — AI / RAG | **Pass** | 219 docs embedded; retriever returns correct top hits; `test_ai_insights.py` 24/24; 400/401/502 validation paths confirmed | +| 8 — CVE / NVD / Sentinel | **Pass** | CVE+NVD 27/27; on the docs snapshot `test_worker.py` was absent (it now exists on upstream via #129 and is included in the 170-test suite); Sentinel has no automated tests; HMAC signing correct; Sentinel field-mapping bug found | +| 9 — Security | **Pass** | JWT fail-closed 7/7; CORS `*` warning fires; SQL parameterized (no injection); no hardcoded creds; dep CVEs + missing rate limiting noted | +| 10 — Documentation | **Partial** | Docs-branch counts stale (39/36 → 44/44 at validation; 45/45 on upstream); architecture identity table 4→9 and PQC category missing. Upstream docs not re-audited | +| 11 — Live environments | **Partial / Blocked** | Render API **suspended by owner** (503); Vercel dashboard + website serve (HTTP 200) | +| Azure live scan | **Deferred** | Needs real credentials (see checklist) | + +--- + +## Prioritized Issue List + +### Critical +*(none)* + +### High + +**H-1 — `AZ-NET-009` silently never fires (false negative).** +`scanner/rules/az_net_009.py:36` calls `client.virtual_network_gateway_connections.list_all()`, +but the real `azure-mgmt-network` SDK exposes only `.list(resource_group)` on that +operations group — there is **no `list_all()`** (verified: application_gateways, +load_balancers, virtual_networks all have `list_all`; only this one does not). The +bare `except Exception` at `az_net_009.py:37` swallows the `AttributeError`, so the +rule always returns `[]` and can never detect an IKEv1 VPN gateway. +*Impact:* a security blind spot — the misconfiguration this rule targets is never reported. +*Fix:* iterate resource groups and call `.list(rg)`, or use the correct list-all method. + +**H-2 — `AZ-NET-012` false-positives on every NSG.** +`scanner/rules/az_net_012.py:48` calls `azure_client.get_nsg_flow_logs(resource_group)`, +but `scanner/azure_client.py` has **no `get_nsg_flow_logs` method**. The +`AttributeError` is swallowed by the bare `except` at `az_net_012.py:56`, leaving +`flow_log_enabled = False`, so **every NSG** is flagged "NSG Flow Logs Not Enabled" +regardless of real configuration. +*Impact:* guaranteed false positives on every scan; erodes trust and buries real findings. +*Fix:* implement `AzureClient.get_nsg_flow_logs()` (Network Watcher flow-logs API), or +change the rule to use an existing accessor. + +### Medium + +**M-1 — `ai/chunker.py` does not import (SyntaxError).** +`ai/chunker.py:37` is `text.rfind("` followed by a literal newline then `", start, end)` +— a string literal with an unescaped newline (should be `"\n"`). The module raises +`SyntaxError` on import. CI misses it because CHECK 6 only `py_compile`s `api/`, not +`ai/`. Blast radius is limited (`chunk_documents()` is imported nowhere; `ai/embed.py` +does its own chunking), but `ai/README.md` documents it as part of the pipeline. +*Fix:* `"\n"`; and extend the CI syntax check to cover `ai/` (and `sentinel/`). + +**M-2 — ScanEngine swallows failed rules with no observability.** +`scanner/engine.py:127` logs a rule exception and continues (good — error isolation +works), but `run_scan()`'s result dict has **no field** listing which rules errored. +A partial scan (rules silently failing) is indistinguishable from a clean scan to the +API/dashboard. *Fix:* add an `errored_rules` list to the result and surface it via the API. + +**M-3 — Sentinel ingest drops compliance fields with real scan data.** +`sentinel/ingest.py:36-37` reads `raw.get("compliance",{}).get("cis"/"nist")`, but the +real scanner emits `frameworks` with UPPERCASE keys (`{"CIS": ..., "NIST": ...}`). With +actual `ScanEngine` output, `CisControl`/`NistControl` are always blank. The April-2026 +`TEST_PLAN.md` passed only because `generate_test_findings.py` produces synthetic +findings matching the ingest code's `compliance.cis` shape (and `OS-00x` IDs), not the +real scanner's. *Fix:* read `frameworks` with case-insensitive keys. + +**M-4 — Dependencies pinned to versions with known CVEs.** +`pip-audit`: 34 advisories across 9 packages — notably **`pyjwt 2.8.0`** (auth library), +`flask-cors 4.0.0` (CVE-2024-6844/6866/6839), `gunicorn 21.2.0` (CVE-2024-1135 request +smuggling), `requests 2.31.0` (CVE-2024-35195), `cryptography 42.0.5`, `flask 3.0.0`, +`azure-identity 1.15.0`. `npm audit`: 1 HIGH in `vite` (dev-server only). +*Fix:* bump pins; prioritize `pyjwt`, `flask-cors`, `gunicorn`, `requests`. + +**M-5 — Documentation counts are stale.** +*(Audited on the `docs/issue-132` snapshot; line numbers and exact text refer to that +branch's docs. Upstream `README.md`/`docs/architecture.md` were not re-audited and may +differ.)* `README.md:47` "39 rules" and `:50` "36 playbooks" (also `:63`/`:67` diagrams +"39"); `docs/architecture.md:5`/`:113` "36 rules", `:46` "20 rule files"; the architecture +rule-category table (`:117-122`) lists Identity as 4 (actually 9) and **omits the PQC +category (3 rules)**, summing to 36. Actual code at validation time: 44 rules / 44 +playbooks (now **45 / 45** on `upstream/dev`). + +### Low / Informational + +- **L-1** `tests/smoke_test.py` TC-34 fails on a seeded DB: TC-33 and TC-34 both `POST` + the same idempotent `/api/scans//enrich`; the 2nd call returns + `{"message":"Scan already enriched"}` (no `status`) at `api/routes/scans.py:100`, so + TC-34's `status=="COMPLETED"` check fails. App behavior is correct; the test makes a + duplicate call. Net effect: smoke test exits 1 on any already-enriched scan. +- **L-2** `tests/smoke_test.py` duplicates the TC-24/25/26/27/28 blocks (copy-paste). +- **L-3** `AZ-KV-003` and `AZ-NET-012` rule IDs are not referenced by any + `compliance/frameworks/*.json` — they never surface on the Compliance page. CI CHECK 7 + only validates the forward direction so it doesn't catch this. +- **L-4** No `.gitattributes`; with `core.autocrlf=true` the `.sh` playbooks check out as + CRLF on Windows and fail `bash -n` locally. Add `*.sh text eol=lf`. +- **L-5** CORS defaults to `*` when `ALLOWED_ORIGINS` unset (`api/app.py:108`) — pre-prod + hardening item. +- **L-6** No rate limiting on any endpoint (informational for an OSS project at this stage). +- **L-7** `api/routes/findings.py:10-12` defines `_PLAYBOOKS_DIR` twice (dead line). +- **L-8** Compliance controls table lacks per-control `severity`/`category`/`resources` + (`get_compliance_score` omits them; the frontend defaults to MEDIUM/General/0) — + cosmetic, graceful. +- **L-9** `tailwind.config.js` `no-undef` lint error (`module` not defined) — CommonJS + file not excluded from the ESM lint config. +- **Note (docs-branch vs upstream).** On the validated `docs/issue-132` snapshot, the + 5th AI route `/api/ai/threat-simulation`, `tests/test_worker.py`, and `az_net_015` were + **all absent** (only 4 AI routes existed). On `upstream/dev` all three have since landed + (`az_net_015` #106 — now covered by a test added in this branch; async worker + + `tests/test_worker.py` #129; `/api/ai/threat-simulation` route #138, making 5 AI routes). + These upstream additions were **not** separately validated in this pass. + +--- + +## Test Deliverables (kept) + +| File | Change | +|---|---| +| `tests/test_rules_compute.py` | **new** — 9 tests (AZ-CMP-001..004) | +| `tests/test_rules_identity.py` | +16 tests (AZ-IDN-002..009; Graph/SDK monkeypatched) | +| `tests/test_rules_network.py` | +27 tests (AZ-NET-003..014; +AZ-NET-015 on upstream) | +| `tests/test_rules_keyvault.py` | +9 tests (AZ-KV-001,003..005) | +| `tests/test_rules_database.py` | +6 tests (AZ-DB-001..003) | +| `tests/test_rules_storage.py` | +6 tests (AZ-STOR-003..005) | +| `tests/test_engine_integration.py` | **new** — 4 ScanEngine orchestration tests | +| `tests/helpers/mock_azure.py` | extended additively (~30 accessors incl. DNS zones/record-sets + credential stub) | + +Test counts after additions: + +- `pytest -q` (full backend suite): **170 passed, 0 failed** (on `upstream/dev`; was 164 + on the docs branch — upstream adds its own tests incl. `test_worker.py`, plus the 3 + `az_net_015` tests added here) +- `pytest tests/test_rules_*.py -q` (CI rule-regression slice): **92 passed** (was 89; +3 + for `az_net_015`) +- `pytest tests/test_engine_integration.py -q`: **4 passed** + +### Scope of the AZ-NET-009 and AZ-NET-012 tests + +The new tests for **AZ-NET-009** (`tests/test_rules_network.py`) and **AZ-NET-012** +(same file) validate the rules' **intended isolated detection logic — not current +production integration.** Both tests deliberately supply, via the mock/fake client, +a method the real code path lacks: + +- AZ-NET-009: the fake `NetworkManagementClient` exposes + `virtual_network_gateway_connections.list_all()`, which the real `azure-mgmt-network` + SDK does **not** provide (only `.list(resource_group)`). +- AZ-NET-012: `MockAzureClient.get_nsg_flow_logs(resource_group)` exists, but the real + `scanner/azure_client.py` has **no `get_nsg_flow_logs` method at all**. + +These tests therefore pass against the *expected* contract and prove the rules' branch +logic is sound, but they do **not** reproduce the production behaviour (a swallowed +`AttributeError` → AZ-NET-009 never fires / AZ-NET-012 always fires). They are kept to +lock in correct logic once H-1/H-2 are fixed; the bugs themselves are tracked above and +must be fixed in production code (`scanner/`), which this validation pass did not touch. + +--- + +## Deferred — Azure live scan (needs real credentials) + +Do **not** fabricate values. When a sandbox/non-production subscription is available: + +```bash +export AZURE_SUBSCRIPTION_ID=... +export AZURE_CLIENT_ID=... +export AZURE_CLIENT_SECRET=... +export AZURE_TENANT_ID=... +``` + +Then run **one** of: + +```bash +# Direct engine scan +python -c "from scanner.engine import ScanEngine; import json,os; print(json.dumps(ScanEngine(os.environ['AZURE_SUBSCRIPTION_ID']).run_scan(), indent=2))" + +# Or via the running API +# POST /api/scans/trigger (JWT required), then poll GET /api/scans +# Or the credential-gated smoke tests +RUN_REAL_SCAN=true python tests/smoke_test.py +``` + +**Safety pre-check before any real run:** re-read all 45 rules' `scan()` calls to confirm +every Azure SDK call is read-only (`.list*` / `.get*` only — no create/update/delete). +Spot-checks during this pass found only read calls, but do a final sweep. + +**Expected real-scan caveats given the bugs above:** AZ-NET-009 will report nothing +(H-1) and AZ-NET-012 will flag every NSG (H-2) until fixed. + +Also deferred: **live Sentinel ingestion** (needs Log Analytics workspace ID + shared key) +and a **real LLM completion** against `/api/ai/*` (BYOK — opportunistic only). diff --git a/tests/helpers/mock_azure.py b/tests/helpers/mock_azure.py index ed18ef5..1eb1457 100644 --- a/tests/helpers/mock_azure.py +++ b/tests/helpers/mock_azure.py @@ -14,7 +14,7 @@ """ from types import SimpleNamespace -from typing import Any, Dict, List, Tuple, Optional +from typing import Any, Dict, List, Optional, Tuple def make_resource(**kwargs: Any) -> SimpleNamespace: @@ -22,6 +22,26 @@ def make_resource(**kwargs: Any) -> SimpleNamespace: return SimpleNamespace(**kwargs) +class _StubToken: + """Minimal stand-in for an Azure AccessToken (has a ``.token`` attribute).""" + + def __init__(self, token: str = "fake-graph-token") -> None: + self.token = token + + +class _StubCredential: + """Minimal stand-in for a TokenCredential used by Graph/SDK-based rules. + + Rules that talk to Microsoft Graph or instantiate an Azure management + client read ``azure_client.credential`` and call ``get_token(scope)``. + Tests monkeypatch ``requests.get`` / the SDK client class separately, so + this stub only needs to hand back an object exposing ``.token``. + """ + + def get_token(self, *scopes: Any, **kwargs: Any) -> _StubToken: + return _StubToken() + + class MockAzureClient: """Drop-in replacement for AzureClient that returns configured fake data.""" @@ -33,8 +53,36 @@ def __init__(self) -> None: self._sql_servers: List[Any] = [] self._service_principals: List[Any] = [] self._sql_firewall_rules: Dict[Tuple[str, str], List[Any]] = {} + # --- Additional state added for full rule-coverage tests ---------- # + self._network_interfaces: Dict[Tuple[str, str], Any] = {} + self._vm_extensions: Dict[Tuple[str, str], Optional[List[Any]]] = {} + self._storage_lifecycle: Dict[Tuple[str, str], Optional[bool]] = {} + self._storage_logging: Dict[Tuple[str, str, str], Optional[bool]] = {} + self._virtual_networks: List[Any] = [] + self._public_ip_addresses: List[Any] = [] + self._all_azure_firewalls: Optional[List[Any]] = [] + self._vnet_peerings: Dict[Tuple[str, str], List[Any]] = {} + self._postgresql_servers: List[Any] = [] + self._postgresql_flexible_servers: List[Any] = [] + self._pg_flex_parameters: Dict[Tuple[str, str], List[Any]] = {} + self._sql_auditing: Dict[Tuple[str, str], Any] = {} + self._sql_auditing_default: Any = None + self._kv_certificates: Dict[str, List[Any]] = {} + self._kv_keys: Dict[str, List[Any]] = {} self._diagnostic_settings: Dict[str, Optional[bool]] = {} - self._key_vault_certificates: Dict[str, List[Any]] = {} + self._diagnostic_default: Optional[bool] = False + self._conditional_access_policies: List[Any] = [] + # Credential stub for Graph/SDK-based rules (idn_003..009). + self.credential = _StubCredential() + self._regions_with_resources: List[str] = [] + self._network_watcher_regions: List[str] = [] + self._nsg_flow_logs: Dict[str, List[Any]] = {} + self._dns_zones: List[Any] = [] + self._dns_record_sets: Dict[Tuple[str, str], List[Any]] = {} + self._web_apps: List[Any] = [] + # Some rules read azure_client.subscription_id when constructing an + # SDK management client inside scan() (e.g. AZ-NET-007..010). + self.subscription_id = "00000000-0000-0000-0000-000000000001" def set_storage_accounts(self, accounts: List[Any]) -> "MockAzureClient": self._storage_accounts = accounts @@ -66,14 +114,6 @@ def set_sql_server_firewall_rules( self._sql_firewall_rules[(resource_group, server_name)] = rules return self - def set_diagnostic_settings(self, resource_id: str, status: Optional[bool]) -> "MockAzureClient": - self._diagnostic_settings[resource_id] = status - return self - - def set_key_vault_certificates(self, vault_name: str, certificates: List[Any]) -> "MockAzureClient": - self._key_vault_certificates[vault_name] = certificates - return self - def get_storage_accounts(self) -> List[Any]: return self._storage_accounts @@ -97,11 +137,235 @@ def get_sql_server_firewall_rules( ) -> List[Any]: return self._sql_firewall_rules.get((resource_group, server_name), []) - def get_diagnostic_settings(self, resource_id: str) -> Optional[bool]: - return self._diagnostic_settings.get(resource_id, False) + # ------------------------------------------------------------------ # + # Compute — network interfaces & VM extensions # + # ------------------------------------------------------------------ # + + def set_network_interface( + self, resource_group: str, nic_name: str, nic: Any + ) -> "MockAzureClient": + self._network_interfaces[(resource_group, nic_name)] = nic + return self + + def get_network_interface( + self, resource_group: str, nic_name: str + ) -> Optional[Any]: + return self._network_interfaces.get((resource_group, nic_name)) + + def set_vm_extensions( + self, resource_group: str, vm_name: str, extensions: Optional[List[Any]] + ) -> "MockAzureClient": + self._vm_extensions[(resource_group, vm_name)] = extensions + return self + + def get_vm_extensions( + self, resource_group: str, vm_name: str + ) -> Optional[List[Any]]: + # Default to an empty list (compliant-by-default) unless configured. + return self._vm_extensions.get((resource_group, vm_name), []) + + # ------------------------------------------------------------------ # + # Storage — lifecycle & service logging (three-state: True/False/None) # + # ------------------------------------------------------------------ # + + def set_storage_lifecycle_policy( + self, resource_group: str, account_name: str, value: Optional[bool] + ) -> "MockAzureClient": + self._storage_lifecycle[(resource_group, account_name)] = value + return self + + def get_storage_lifecycle_policy( + self, resource_group: str, account_name: str + ) -> Optional[bool]: + return self._storage_lifecycle.get((resource_group, account_name)) + + def set_storage_service_logging( + self, resource_group: str, account_name: str, service: str, value: Optional[bool] + ) -> "MockAzureClient": + self._storage_logging[(resource_group, account_name, service)] = value + return self + + def get_storage_service_logging( + self, resource_group: str, account_name: str, service: str + ) -> Optional[bool]: + return self._storage_logging.get((resource_group, account_name, service)) + + # ------------------------------------------------------------------ # + # Network — VNets, public IPs, firewalls, peerings, flow logs # + # ------------------------------------------------------------------ # + + def set_virtual_networks(self, vnets: List[Any]) -> "MockAzureClient": + self._virtual_networks = vnets + return self + + def get_virtual_networks(self) -> List[Any]: + return self._virtual_networks + + def set_public_ip_addresses(self, ips: List[Any]) -> "MockAzureClient": + self._public_ip_addresses = ips + return self + + def get_public_ip_addresses(self) -> List[Any]: + return self._public_ip_addresses + + def set_all_azure_firewalls( + self, firewalls: Optional[List[Any]] + ) -> "MockAzureClient": + self._all_azure_firewalls = firewalls + return self + + def get_all_azure_firewalls(self) -> Optional[List[Any]]: + return self._all_azure_firewalls + + def set_vnet_peerings( + self, resource_group: str, vnet_name: str, peerings: List[Any] + ) -> "MockAzureClient": + self._vnet_peerings[(resource_group, vnet_name)] = peerings + return self + + def get_vnet_peerings( + self, resource_group: str, vnet_name: str + ) -> List[Any]: + return self._vnet_peerings.get((resource_group, vnet_name), []) + + def set_nsg_flow_logs( + self, resource_group: str, flow_logs: List[Any] + ) -> "MockAzureClient": + # NOTE: az_net_012 calls get_nsg_flow_logs(resource_group) with a single + # argument and iterates the result. This mock matches that *expected* + # contract so the rule's detection logic can be exercised. The real + # AzureClient does not implement get_nsg_flow_logs at all — see the + # validation report (AZ-NET-012 always false-positives in production). + self._nsg_flow_logs[resource_group] = flow_logs + return self + + def get_nsg_flow_logs(self, resource_group: str) -> List[Any]: + return self._nsg_flow_logs.get(resource_group, []) + + def set_regions_with_resources(self, regions: List[str]) -> "MockAzureClient": + self._regions_with_resources = regions + return self + + def get_regions_with_resources(self) -> List[str]: + return self._regions_with_resources + + def set_network_watcher_regions(self, regions: List[str]) -> "MockAzureClient": + self._network_watcher_regions = regions + return self + + def get_network_watcher_regions(self) -> List[str]: + return self._network_watcher_regions + + def set_dns_zones(self, zones: List[Any]) -> "MockAzureClient": + self._dns_zones = zones + return self + + def get_dns_zones(self) -> List[Any]: + return self._dns_zones + + def set_dns_record_sets( + self, resource_group: str, zone_name: str, record_sets: List[Any] + ) -> "MockAzureClient": + self._dns_record_sets[(resource_group, zone_name)] = record_sets + return self + + def get_dns_record_sets( + self, resource_group: str, zone_name: str + ) -> List[Any]: + return self._dns_record_sets.get((resource_group, zone_name), []) + + # ------------------------------------------------------------------ # + # Databases — PostgreSQL (single + flexible), SQL auditing # + # ------------------------------------------------------------------ # + + def set_postgresql_servers(self, servers: List[Any]) -> "MockAzureClient": + self._postgresql_servers = servers + return self + + def get_postgresql_servers(self) -> List[Any]: + return self._postgresql_servers + + def set_postgresql_flexible_servers(self, servers: List[Any]) -> "MockAzureClient": + self._postgresql_flexible_servers = servers + return self + + def get_postgresql_flexible_servers(self) -> List[Any]: + return self._postgresql_flexible_servers + + def set_postgresql_flexible_server_parameters( + self, resource_group: str, server_name: str, parameters: List[Any] + ) -> "MockAzureClient": + self._pg_flex_parameters[(resource_group, server_name)] = parameters + return self + + def get_postgresql_flexible_server_parameters( + self, resource_group: str, server_name: str + ) -> List[Any]: + return self._pg_flex_parameters.get((resource_group, server_name), []) + + def set_sql_server_auditing_policy( + self, resource_group: str, server_name: str, policy: Any + ) -> "MockAzureClient": + self._sql_auditing[(resource_group, server_name)] = policy + return self + + def get_sql_server_auditing_policy( + self, resource_group: str, server_name: str + ) -> Optional[Any]: + return self._sql_auditing.get( + (resource_group, server_name), self._sql_auditing_default + ) + + # ------------------------------------------------------------------ # + # Key Vault — certificates & keys (data plane) # + # ------------------------------------------------------------------ # + + def set_key_vault_certificates( + self, vault_name: str, certificates: List[Any] + ) -> "MockAzureClient": + self._kv_certificates[vault_name] = certificates + return self def get_key_vault_certificates(self, vault_name: str) -> List[Any]: - return self._key_vault_certificates.get(vault_name, []) + return self._kv_certificates.get(vault_name, []) + + def set_key_vault_keys( + self, vault_name: str, keys: List[Any] + ) -> "MockAzureClient": + self._kv_keys[vault_name] = keys + return self + + def get_key_vault_keys(self, vault_name: str) -> List[Any]: + return self._kv_keys.get(vault_name, []) + + # ------------------------------------------------------------------ # + # Monitoring & Identity # + # ------------------------------------------------------------------ # + + def set_diagnostic_settings( + self, resource_id: str, value: Optional[bool] + ) -> "MockAzureClient": + self._diagnostic_settings[resource_id] = value + return self + + def get_diagnostic_settings(self, resource_id: str) -> Optional[bool]: + return self._diagnostic_settings.get(resource_id, self._diagnostic_default) + + def set_conditional_access_policies( + self, policies: List[Any] + ) -> "MockAzureClient": + self._conditional_access_policies = policies + return self + + def get_conditional_access_policies(self) -> List[Any]: + return self._conditional_access_policies + + def set_web_apps(self, apps: List[Any]) -> "MockAzureClient": + self._web_apps = apps + return self + + def get_web_apps(self) -> List[Any]: + return self._web_apps @staticmethod def parse_resource_id(resource_id: str) -> Dict[str, str]: diff --git a/tests/test_engine_integration.py b/tests/test_engine_integration.py new file mode 100644 index 0000000..cd23f51 --- /dev/null +++ b/tests/test_engine_integration.py @@ -0,0 +1,138 @@ +"""Offline integration tests for scanner.engine.ScanEngine. + +ScanEngine.__init__ hardcodes ``self.client = AzureClient(subscription_id)``, +so there is no constructor injection point for a mock. These tests monkeypatch +``scanner.engine.AzureClient`` to return a configured MockAzureClient before +instantiating the engine, then exercise run_scan() end-to-end with no network. +""" + +import scanner.engine as engine_mod +from scanner.engine import ScanEngine +from tests.helpers.mock_azure import MockAzureClient, make_resource + +_SUB = "00000000-0000-0000-0000-000000000001" +_RG = "rg-test" + + +class _OfflineCredential: + """A credential whose get_token raises, so the handful of rules that build + their own Azure SDK/Graph clients inside scan() fail fast OFFLINE (no real + network egress) and are caught by their own try/except. This keeps the + whole-engine integration test deterministic and network-free.""" + + def get_token(self, *scopes, **kwargs): + raise RuntimeError("offline test: no Azure access") + + +def _offline_mock(): + client = MockAzureClient() + client.credential = _OfflineCredential() + return client + + +def _patch_engine_client(monkeypatch, client): + """Force ScanEngine to use the given client instead of a real AzureClient.""" + monkeypatch.setattr(engine_mod, "AzureClient", lambda subscription_id: client) + + +def _storage_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Storage/storageAccounts/{name}" + ) + + +def _nsg_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Network/networkSecurityGroups/{name}" + ) + + +def test_engine_loads_all_45_rules(monkeypatch): + """The engine must dynamically load all 45 rule modules.""" + _patch_engine_client(monkeypatch, _offline_mock()) + eng = ScanEngine(_SUB) + assert len(eng.rules) >= 45 + # Every loaded rule must expose a callable scan() and a RULE_ID. + for rule in eng.rules: + assert callable(getattr(rule, "scan", None)) + assert getattr(rule, "RULE_ID", None) + + +def test_engine_run_scan_result_is_self_consistent(monkeypatch): + """total_findings, the findings list, and score must be mutually consistent.""" + client = _offline_mock() + # Seed a couple of flagged resources so the scan is non-empty but bounded. + client.set_storage_accounts([ + make_resource(id=_storage_id("public-sa"), name="public-sa", + allow_blob_public_access=True, enable_https_traffic_only=False, + sku=make_resource(name="Standard_LRS"), location="eastus"), + ]) + _patch_engine_client(monkeypatch, client) + + eng = ScanEngine(_SUB) + result = eng.run_scan() + + assert result["status"] == "completed" + assert result["subscription_id"] == _SUB + assert result["total_findings"] == len(result["findings"]) + assert result["total_findings"] > 0 # the public storage account triggers findings + + # Score is 100 minus severity-weighted deductions, floored at 0. + weights = {"HIGH": 10, "MEDIUM": 5, "LOW": 2} + expected_deduction = sum( + weights.get((f.get("severity") or "").upper(), 0) for f in result["findings"] + ) + assert result["score"] == max(0, 100 - expected_deduction) + assert 0 <= result["score"] <= 100 + + # Every finding is tagged with the scan_id and a detected_at timestamp. + for f in result["findings"]: + assert f["scan_id"] == result["scan_id"] + assert "detected_at" in f + + +def test_engine_empty_subscription_is_consistent(monkeypatch): + """A subscription with no resources still completes consistently. + + Note: it does NOT score 100 — several rules flag the *absence* of a control + (e.g. AZ-IDN-002 flags "no Conditional Access MFA policy"), so an empty + subscription legitimately produces a small number of findings. The result + must still be internally consistent. + """ + _patch_engine_client(monkeypatch, _offline_mock()) + eng = ScanEngine(_SUB) + result = eng.run_scan() + assert result["status"] == "completed" + assert result["total_findings"] == len(result["findings"]) + assert 0 <= result["score"] <= 100 + # AZ-IDN-002 is absence-based: no CA policy configured -> it fires. + assert any(f["rule_id"] == "AZ-IDN-002" for f in result["findings"]) + + +def test_engine_isolates_a_failing_rule(monkeypatch): + """One rule raising inside scan() must not abort the whole scan. + + Demonstrates the OBSERVABILITY GAP: the failed rule is swallowed and the + result dict has no field recording which rules errored, so a partial scan + is indistinguishable from a fully clean one. + """ + _patch_engine_client(monkeypatch, _offline_mock()) + eng = ScanEngine(_SUB) + assert len(eng.rules) >= 45 + + # Force the first loaded rule to raise when scanned. + def _boom(*args, **kwargs): + raise RuntimeError("simulated rule failure") + + monkeypatch.setattr(eng.rules[0], "scan", _boom) + + result = eng.run_scan() # must not raise + assert result["status"] == "completed" + # The other 44 rules still ran (empty mock -> no findings) and the scan + # completed. Crucially, there is NO field in the result naming the failed + # rule -- this is the observability gap flagged in the validation report. + assert "errored_rules" not in result + assert "rules_failed" not in result + assert "errors" not in result diff --git a/tests/test_rules_compute.py b/tests/test_rules_compute.py new file mode 100644 index 0000000..b810028 --- /dev/null +++ b/tests/test_rules_compute.py @@ -0,0 +1,202 @@ +"""Rule regression tests for the compute rules AZ-CMP-001 .. AZ-CMP-004. + +Each test configures a MockAzureClient with fake VM/NIC/extension data and +calls the rule's scan() function directly. No network calls are made — the +mock_azure and subscription_id fixtures come from tests/conftest.py and the +helper accessors from tests/helpers/mock_azure.py. +""" + +import scanner.rules.az_cmp_001 as az_cmp_001 +import scanner.rules.az_cmp_002 as az_cmp_002 +import scanner.rules.az_cmp_003 as az_cmp_003 +import scanner.rules.az_cmp_004 as az_cmp_004 +from tests.helpers.mock_azure import make_resource + +_REQUIRED_FIELDS = { + "rule_id", "rule_name", "severity", "category", + "resource_id", "resource_name", "resource_type", + "description", "remediation", "playbook", "frameworks", +} + +_SUB = "00000000-0000-0000-0000-000000000001" +_RG = "rg-test" + + +def _vm_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Compute/virtualMachines/{name}" + ) + + +def _nic_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Network/networkInterfaces/{name}" + ) + + +# ── AZ-CMP-001: VM public IP with no NSG ──────────────────────────────────── + +def test_cmp_001_compliant_nic_with_nsg_returns_no_findings(mock_azure, subscription_id): + """A NIC with a public IP but a protecting NSG is compliant.""" + nic = make_resource( + ip_configurations=[make_resource(public_ip_address=make_resource(id="pip1"))], + network_security_group=make_resource(id="nsg1"), + ) + vm = make_resource( + id=_vm_id("vm-compliant"), + name="vm-compliant", + network_profile=make_resource( + network_interfaces=[make_resource(id=_nic_id("nic1"))] + ), + ) + mock_azure.set_virtual_machines([vm]) + mock_azure.set_network_interface(_RG, "nic1", nic) + assert az_cmp_001.scan(mock_azure, subscription_id) == [] + + +def test_cmp_001_noncompliant_public_ip_no_nsg_returns_one_finding(mock_azure, subscription_id): + """A NIC with a public IP and no NSG must produce exactly one finding.""" + nic = make_resource( + ip_configurations=[make_resource(public_ip_address=make_resource(id="pip1"))], + network_security_group=None, + ) + vm = make_resource( + id=_vm_id("vm-exposed"), + name="vm-exposed", + network_profile=make_resource( + network_interfaces=[make_resource(id=_nic_id("nic1"))] + ), + ) + mock_azure.set_virtual_machines([vm]) + mock_azure.set_network_interface(_RG, "nic1", nic) + findings = az_cmp_001.scan(mock_azure, subscription_id) + assert len(findings) == 1 + f = findings[0] + assert _REQUIRED_FIELDS.issubset(f.keys()) + assert f["rule_id"] == "AZ-CMP-001" + assert f["severity"] == "HIGH" + assert f["resource_name"] == "vm-exposed" + + +# ── AZ-CMP-002: disk using platform-managed encryption only ───────────────── + +def test_cmp_002_compliant_cmk_disk_returns_no_findings(mock_azure, subscription_id): + """OS disk encrypted with a customer-managed key is compliant.""" + os_disk = make_resource( + name="osdisk", + managed_disk=make_resource( + encryption=make_resource(type="EncryptionAtRestWithCustomerKey") + ), + ) + vm = make_resource( + id=_vm_id("vm-cmk"), + name="vm-cmk", + location="eastus", + storage_profile=make_resource(os_disk=os_disk, data_disks=[]), + ) + mock_azure.set_virtual_machines([vm]) + assert az_cmp_002.scan(mock_azure, subscription_id) == [] + + +def test_cmp_002_noncompliant_platform_key_returns_one_finding(mock_azure, subscription_id): + """OS disk using platform-managed encryption only must produce one finding.""" + os_disk = make_resource( + name="osdisk", + managed_disk=make_resource( + encryption=make_resource(type="EncryptionAtRestWithPlatformKey") + ), + ) + vm = make_resource( + id=_vm_id("vm-pmk"), + name="vm-pmk", + location="eastus", + storage_profile=make_resource(os_disk=os_disk, data_disks=[]), + ) + mock_azure.set_virtual_machines([vm]) + findings = az_cmp_002.scan(mock_azure, subscription_id) + assert len(findings) == 1 + f = findings[0] + assert _REQUIRED_FIELDS.issubset(f.keys()) + assert f["rule_id"] == "AZ-CMP-002" + assert f["severity"] == "HIGH" + assert f["resource_name"] == "vm-pmk" + + +# ── AZ-CMP-003: VM without endpoint protection ────────────────────────────── + +def test_cmp_003_compliant_with_ep_extension_returns_no_findings(mock_azure, subscription_id): + """A VM with a recognised endpoint-protection extension is compliant.""" + vm = make_resource(id=_vm_id("vm-protected"), name="vm-protected") + mock_azure.set_virtual_machines([vm]) + mock_azure.set_vm_extensions( + _RG, "vm-protected", + [make_resource(type_properties_type="IaaSAntimalware")], + ) + assert az_cmp_003.scan(mock_azure, subscription_id) == [] + + +def test_cmp_003_noncompliant_no_ep_extension_returns_one_finding(mock_azure, subscription_id): + """A VM with no endpoint-protection extension must produce one finding.""" + vm = make_resource(id=_vm_id("vm-unprotected"), name="vm-unprotected") + mock_azure.set_virtual_machines([vm]) + mock_azure.set_vm_extensions( + _RG, "vm-unprotected", + [make_resource(type_properties_type="CustomScript")], + ) + findings = az_cmp_003.scan(mock_azure, subscription_id) + assert len(findings) == 1 + f = findings[0] + assert _REQUIRED_FIELDS.issubset(f.keys()) + assert f["rule_id"] == "AZ-CMP-003" + assert f["severity"] == "HIGH" + assert f["resource_name"] == "vm-unprotected" + + +def test_cmp_003_extensions_none_skips_without_finding(mock_azure, subscription_id): + """When extensions cannot be determined (None) the rule must not flag.""" + vm = make_resource(id=_vm_id("vm-unknown"), name="vm-unknown") + mock_azure.set_virtual_machines([vm]) + mock_azure.set_vm_extensions(_RG, "vm-unknown", None) + assert az_cmp_003.scan(mock_azure, subscription_id) == [] + + +# ── AZ-CMP-004: VM without automatic OS patching ──────────────────────────── + +def test_cmp_004_compliant_auto_updates_returns_no_findings(mock_azure, subscription_id): + """A Windows VM with automatic updates enabled is compliant.""" + vm = make_resource( + id=_vm_id("vm-patched"), + name="vm-patched", + os_profile=make_resource( + windows_configuration=make_resource( + enable_automatic_updates=True, patch_settings=None + ), + linux_configuration=None, + ), + ) + mock_azure.set_virtual_machines([vm]) + assert az_cmp_004.scan(mock_azure, subscription_id) == [] + + +def test_cmp_004_noncompliant_no_patching_returns_one_finding(mock_azure, subscription_id): + """A Windows VM with auto-updates off and no auto patch-mode must flag.""" + vm = make_resource( + id=_vm_id("vm-stale"), + name="vm-stale", + os_profile=make_resource( + windows_configuration=make_resource( + enable_automatic_updates=False, patch_settings=None + ), + linux_configuration=None, + ), + ) + mock_azure.set_virtual_machines([vm]) + findings = az_cmp_004.scan(mock_azure, subscription_id) + assert len(findings) == 1 + f = findings[0] + assert _REQUIRED_FIELDS.issubset(f.keys()) + assert f["rule_id"] == "AZ-CMP-004" + assert f["severity"] == "HIGH" + assert f["resource_name"] == "vm-stale" diff --git a/tests/test_rules_database.py b/tests/test_rules_database.py index e6a214f..f78902f 100644 --- a/tests/test_rules_database.py +++ b/tests/test_rules_database.py @@ -1,5 +1,8 @@ -"""Rule regression tests for AZ-DB-004.""" +"""Rule regression tests for the database rules AZ-DB-001 .. AZ-DB-004.""" +import scanner.rules.az_db_001 as az_db_001 +import scanner.rules.az_db_002 as az_db_002 +import scanner.rules.az_db_003 as az_db_003 import scanner.rules.az_db_004 as az_db_004 from tests.helpers.mock_azure import make_resource @@ -62,3 +65,86 @@ def test_db_004_no_firewall_rules_returns_no_findings(mock_azure, subscription_i mock_azure.set_sql_server_firewall_rules(_RG, "sql-no-rules", []) findings = az_db_004.scan(mock_azure, subscription_id) assert findings == [] + + +def _pg_id(name, provider="Microsoft.DBforPostgreSQL/servers"): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/{provider}/{name}" + ) + + +# ── AZ-DB-001: PostgreSQL single-server public network access ─────────────── + +def test_db_001_compliant_returns_no_findings(mock_azure, subscription_id): + server = make_resource( + id=_pg_id("pg-private"), name="pg-private", + public_network_access="Disabled", location="eastus", + ) + mock_azure.set_postgresql_servers([server]) + assert az_db_001.scan(mock_azure, subscription_id) == [] + + +def test_db_001_noncompliant_returns_one_finding(mock_azure, subscription_id): + server = make_resource( + id=_pg_id("pg-public"), name="pg-public", + public_network_access="Enabled", location="eastus", + ) + mock_azure.set_postgresql_servers([server]) + findings = az_db_001.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-DB-001" + assert findings[0]["severity"] == "HIGH" + assert findings[0]["resource_name"] == "pg-public" + + +# ── AZ-DB-002: Azure SQL server auditing disabled ─────────────────────────── + +def test_db_002_compliant_returns_no_findings(mock_azure, subscription_id): + server = make_resource(id=_sql_id("sql-audited"), name="sql-audited") + mock_azure.set_sql_servers([server]) + mock_azure.set_sql_server_auditing_policy(_RG, "sql-audited", make_resource(state="Enabled")) + assert az_db_002.scan(mock_azure, subscription_id) == [] + + +def test_db_002_noncompliant_returns_one_finding(mock_azure, subscription_id): + server = make_resource(id=_sql_id("sql-unaudited"), name="sql-unaudited") + mock_azure.set_sql_servers([server]) + mock_azure.set_sql_server_auditing_policy(_RG, "sql-unaudited", make_resource(state="Disabled")) + findings = az_db_002.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-DB-002" + assert findings[0]["severity"] == "MEDIUM" + assert findings[0]["resource_name"] == "sql-unaudited" + + +# ── AZ-DB-003: PostgreSQL flexible server SSL enforcement disabled ────────── + +def test_db_003_compliant_returns_no_findings(mock_azure, subscription_id): + server = make_resource( + id=_pg_id("pgflex-ssl", "Microsoft.DBforPostgreSQL/flexibleServers"), + name="pgflex-ssl", location="eastus", + ) + mock_azure.set_postgresql_flexible_servers([server]) + mock_azure.set_postgresql_flexible_server_parameters( + _RG, "pgflex-ssl", + [make_resource(name="require_secure_transport", value="on")], + ) + assert az_db_003.scan(mock_azure, subscription_id) == [] + + +def test_db_003_noncompliant_returns_one_finding(mock_azure, subscription_id): + server = make_resource( + id=_pg_id("pgflex-nossl", "Microsoft.DBforPostgreSQL/flexibleServers"), + name="pgflex-nossl", location="eastus", + ) + mock_azure.set_postgresql_flexible_servers([server]) + mock_azure.set_postgresql_flexible_server_parameters( + _RG, "pgflex-nossl", + [make_resource(name="require_secure_transport", value="off")], + ) + findings = az_db_003.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-DB-003" + assert findings[0]["severity"] == "HIGH" + assert findings[0]["resource_name"] == "pgflex-nossl" diff --git a/tests/test_rules_identity.py b/tests/test_rules_identity.py index 26177bf..e1945be 100644 --- a/tests/test_rules_identity.py +++ b/tests/test_rules_identity.py @@ -1,12 +1,56 @@ -"""Rule regression tests for AZ-IDN-001. +"""Rule regression tests for the identity rules AZ-IDN-001 .. AZ-IDN-009. -Each test configures a MockAzureClient with fake role assignment objects and -calls the rule's scan() function directly. No network calls are made. +AZ-IDN-001/002 read data through MockAzureClient accessors. AZ-IDN-003..007 +call Microsoft Graph via ``requests.get`` — those tests monkeypatch +``requests.get`` with a small URL router. AZ-IDN-008/009 instantiate Azure +management SDK clients inside scan() — those tests monkeypatch the SDK class. +No real network calls are ever made. """ +import sys +from types import SimpleNamespace + +import requests + import scanner.rules.az_idn_001 as az_idn_001 +import scanner.rules.az_idn_002 as az_idn_002 +import scanner.rules.az_idn_003 as az_idn_003 +import scanner.rules.az_idn_004 as az_idn_004 +import scanner.rules.az_idn_005 as az_idn_005 +import scanner.rules.az_idn_006 as az_idn_006 +import scanner.rules.az_idn_007 as az_idn_007 +import scanner.rules.az_idn_008 as az_idn_008 +import scanner.rules.az_idn_009 as az_idn_009 from tests.helpers.mock_azure import make_resource + +class _Resp: + """Minimal fake ``requests`` response.""" + + def __init__(self, json_data, status_code=200): + self._json = json_data + self.status_code = status_code + + def raise_for_status(self): + if self.status_code >= 400: + raise requests.HTTPError(f"HTTP {self.status_code}") + + def json(self): + return self._json + + +def _install_router(monkeypatch, routes): + """Patch requests.get with a substring->response router. + + ``routes`` is a list of (url_substring, _Resp) pairs, checked in order. + """ + def fake_get(url, *args, **kwargs): + for needle, resp in routes: + if needle in url: + return resp + raise AssertionError(f"unexpected URL in test: {url}") + monkeypatch.setattr(requests, "get", fake_get) + _REQUIRED_FIELDS = { "rule_id", "rule_name", "severity", "category", "resource_id", "resource_name", "resource_type", @@ -51,3 +95,214 @@ def test_idn_001_noncompliant_returns_one_finding(mock_azure, subscription_id): assert finding["category"] == "Identity" assert finding["resource_name"] == "sp-owner-def456" assert finding["metadata"]["principal_id"] == "sp-owner-def456" + + +_GLOBAL_ADMIN_ROLE_ID = "62e90394-69f5-4237-9190-012177145e10" + + +# ── AZ-IDN-002: MFA enforced on admins via Conditional Access ─────────────── + +def test_idn_002_compliant_policy_enforces_mfa_returns_no_findings(mock_azure, subscription_id): + """A CA policy that is enabled, requires MFA, and covers all users is compliant.""" + policy = { + "state": "enabled", + "grantControls": {"builtInControls": ["mfa"]}, + "conditions": {"users": {"includeUsers": ["All"]}}, + } + mock_azure.set_conditional_access_policies([policy]) + assert az_idn_002.scan(mock_azure, subscription_id) == [] + + +def test_idn_002_noncompliant_no_policies_returns_one_finding(mock_azure, subscription_id): + """No Conditional Access policies at all must produce exactly one finding.""" + mock_azure.set_conditional_access_policies([]) + findings = az_idn_002.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-002" + assert findings[0]["severity"] == "HIGH" + + +# ── AZ-IDN-003: guest invitations not restricted ─────────────────────────── + +def test_idn_003_compliant_restricted_invites_returns_no_findings(mock_azure, subscription_id, monkeypatch): + _install_router(monkeypatch, [ + ("authorizationPolicy", _Resp({"id": "authPol", "allowInvitesFrom": "adminsAndGuestInviters"})), + ]) + assert az_idn_003.scan(mock_azure, subscription_id) == [] + + +def test_idn_003_noncompliant_everyone_can_invite_returns_one_finding(mock_azure, subscription_id, monkeypatch): + _install_router(monkeypatch, [ + ("authorizationPolicy", _Resp({"id": "authPol", "allowInvitesFrom": "everyone"})), + ]) + findings = az_idn_003.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-003" + assert findings[0]["severity"] == "MEDIUM" + + +# ── AZ-IDN-004: no PIM for admin roles ────────────────────────────────────── + +def test_idn_004_compliant_role_has_pim_returns_no_findings(mock_azure, subscription_id, monkeypatch): + role_defs = {"value": [{"id": "rd-ga", "displayName": "Global Administrator"}]} + schedules = {"value": [{"roleDefinitionId": "rd-ga"}]} + _install_router(monkeypatch, [ + ("roleDefinitions", _Resp(role_defs)), + ("roleEligibilitySchedules", _Resp(schedules)), + ]) + assert az_idn_004.scan(mock_azure, subscription_id) == [] + + +def test_idn_004_noncompliant_role_without_pim_returns_finding(mock_azure, subscription_id, monkeypatch): + role_defs = {"value": [{"id": "rd-ga", "displayName": "Global Administrator"}]} + schedules = {"value": []} + _install_router(monkeypatch, [ + ("roleEligibilitySchedules", _Resp(schedules)), # check more specific first + ("roleDefinitions", _Resp(role_defs)), + ]) + findings = az_idn_004.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-004" + assert findings[0]["severity"] == "HIGH" + assert findings[0]["resource_name"] == "Global Administrator" + + +# ── AZ-IDN-005: guest with high-privilege role ────────────────────────────── + +def test_idn_005_compliant_member_user_returns_no_findings(mock_azure, subscription_id, monkeypatch): + role_defs = {"value": [{"id": "rd-ga", "displayName": "Global Administrator"}]} + assignments = {"value": [{"id": "a1", "roleDefinitionId": "rd-ga", "principalId": "u1"}]} + _install_router(monkeypatch, [ + ("/users/", _Resp({"id": "u1", "displayName": "Member User", "userType": "Member"})), + ("roleDefinitions", _Resp(role_defs)), + ("roleAssignments", _Resp(assignments)), + ]) + assert az_idn_005.scan(mock_azure, subscription_id) == [] + + +def test_idn_005_noncompliant_guest_admin_returns_finding(mock_azure, subscription_id, monkeypatch): + role_defs = {"value": [{"id": "rd-ga", "displayName": "Global Administrator"}]} + assignments = {"value": [{"id": "a1", "roleDefinitionId": "rd-ga", "principalId": "g1"}]} + _install_router(monkeypatch, [ + ("/users/", _Resp({"id": "g1", "displayName": "Guest Admin", "userPrincipalName": "guest@ext.com", "userType": "Guest"})), + ("roleDefinitions", _Resp(role_defs)), + ("roleAssignments", _Resp(assignments)), + ]) + findings = az_idn_005.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-005" + assert findings[0]["severity"] == "HIGH" + assert findings[0]["resource_name"] == "Guest Admin" + + +# ── AZ-IDN-006: stale / non-expiring SP client secret ─────────────────────── + +def test_idn_006_compliant_fresh_secret_returns_no_findings(mock_azure, subscription_id, monkeypatch): + apps = {"value": [{ + "id": "app1", "displayName": "App One", "appId": "client-1", + "passwordCredentials": [{ + "keyId": "k1", "hint": "ab", + "startDateTime": "2099-01-01T00:00:00Z", + "endDateTime": "2099-02-01T00:00:00Z", + }], + }]} + _install_router(monkeypatch, [("/applications", _Resp(apps))]) + assert az_idn_006.scan(mock_azure, subscription_id) == [] + + +def test_idn_006_noncompliant_secret_no_expiry_returns_finding(mock_azure, subscription_id, monkeypatch): + apps = {"value": [{ + "id": "app1", "displayName": "App One", "appId": "client-1", + "passwordCredentials": [{ + "keyId": "k1", "hint": "ab", + "startDateTime": "2020-01-01T00:00:00Z", + "endDateTime": None, + }], + }]} + _install_router(monkeypatch, [("/applications", _Resp(apps))]) + findings = az_idn_006.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-006" + assert findings[0]["severity"] == "HIGH" + assert findings[0]["metadata"]["no_expiry"] is True + + +# ── AZ-IDN-007: active user with no MFA registered ────────────────────────── + +def test_idn_007_compliant_user_with_mfa_returns_no_findings(mock_azure, subscription_id, monkeypatch): + regs = {"value": [{"id": "u1", "userPrincipalName": "a@x.com", "isEnabled": True, "isMfaRegistered": True}]} + _install_router(monkeypatch, [("credentialUserRegistrationDetails", _Resp(regs))]) + assert az_idn_007.scan(mock_azure, subscription_id) == [] + + +def test_idn_007_noncompliant_user_without_mfa_returns_finding(mock_azure, subscription_id, monkeypatch): + regs = {"value": [{"id": "u1", "userDisplayName": "No MFA User", "userPrincipalName": "a@x.com", "isEnabled": True, "isMfaRegistered": False}]} + _install_router(monkeypatch, [("credentialUserRegistrationDetails", _Resp(regs))]) + findings = az_idn_007.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-007" + assert findings[0]["severity"] == "HIGH" + assert findings[0]["resource_name"] == "No MFA User" + + +# ── AZ-IDN-008: custom RBAC role with wildcard permissions ────────────────── + +def _install_fake_auth_client(monkeypatch, roles): + fake_module = SimpleNamespace( + AuthorizationManagementClient=lambda *a, **k: SimpleNamespace( + role_definitions=SimpleNamespace(list=lambda **kw: list(roles)) + ) + ) + monkeypatch.setitem(sys.modules, "azure.mgmt.authorization", fake_module) + + +def test_idn_008_compliant_specific_actions_returns_no_findings(mock_azure, subscription_id, monkeypatch): + role = make_resource( + role_name="ReaderPlus", name="rd1", id="/rd1", + permissions=[make_resource(actions=["Microsoft.Storage/*/read"])], + assignable_scopes=[f"/subscriptions/{_SUB}"], + ) + _install_fake_auth_client(monkeypatch, [role]) + assert az_idn_008.scan(mock_azure, subscription_id) == [] + + +def test_idn_008_noncompliant_wildcard_action_returns_finding(mock_azure, subscription_id, monkeypatch): + role = make_resource( + role_name="GodMode", name="rd2", id="/rd2", + permissions=[make_resource(actions=["*"])], + assignable_scopes=[f"/subscriptions/{_SUB}"], + ) + _install_fake_auth_client(monkeypatch, [role]) + findings = az_idn_008.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-008" + assert findings[0]["severity"] == "HIGH" + assert findings[0]["resource_name"] == "GodMode" + + +# ── AZ-IDN-009: no activity-log alert for role-assignment changes ─────────── + +def _install_fake_monitor_client(monkeypatch, alerts): + fake_module = SimpleNamespace( + MonitorManagementClient=lambda *a, **k: SimpleNamespace( + activity_log_alerts=SimpleNamespace( + list_by_subscription_id=lambda: list(alerts) + ) + ) + ) + monkeypatch.setitem(sys.modules, "azure.mgmt.monitor", fake_module) + + +def test_idn_009_compliant_alert_present_returns_no_findings(mock_azure, subscription_id, monkeypatch): + leaf = make_resource(field="operationName", equals="Microsoft.Authorization/roleAssignments/write") + alert = make_resource(enabled=True, condition=make_resource(all_of=[leaf])) + _install_fake_monitor_client(monkeypatch, [alert]) + assert az_idn_009.scan(mock_azure, subscription_id) == [] + + +def test_idn_009_noncompliant_no_alert_returns_finding(mock_azure, subscription_id, monkeypatch): + _install_fake_monitor_client(monkeypatch, []) + findings = az_idn_009.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-IDN-009" + assert findings[0]["severity"] == "MEDIUM" diff --git a/tests/test_rules_keyvault.py b/tests/test_rules_keyvault.py index c115ce5..f5ccc9f 100644 --- a/tests/test_rules_keyvault.py +++ b/tests/test_rules_keyvault.py @@ -1,10 +1,16 @@ -"""Rule regression tests for AZ-KV-002. +"""Rule regression tests for the Key Vault rules AZ-KV-001 .. AZ-KV-005. -Each test configures a MockAzureClient with a fake Key Vault object and calls -the rule's scan() function directly. No network calls are made. +Each test configures a MockAzureClient with fake Key Vault data and calls the +rule's scan() function directly. No network calls are made. """ +from datetime import datetime, timedelta, timezone + +import scanner.rules.az_kv_001 as az_kv_001 import scanner.rules.az_kv_002 as az_kv_002 +import scanner.rules.az_kv_003 as az_kv_003 +import scanner.rules.az_kv_004 as az_kv_004 +import scanner.rules.az_kv_005 as az_kv_005 from tests.helpers.mock_azure import make_resource _REQUIRED_FIELDS = { @@ -64,3 +70,100 @@ def test_kv_002_noncompliant_returns_one_finding(mock_azure, subscription_id): assert finding["category"] == "KeyVault" assert finding["resource_name"] == "kv-public" assert finding["metadata"]["resource_group"] == _RG + + +def _vault_with_props(name, **prop_kwargs): + return make_resource( + id=_kv_id(name), name=name, location="eastus", + properties=make_resource(**prop_kwargs), + ) + + +# ── AZ-KV-001: soft delete disabled ───────────────────────────────────────── + +def test_kv_001_compliant_returns_no_findings(mock_azure, subscription_id): + mock_azure.set_key_vaults([_vault_with_props("kv-sd-on", enable_soft_delete=True)]) + assert az_kv_001.scan(mock_azure, subscription_id) == [] + + +def test_kv_001_noncompliant_returns_one_finding(mock_azure, subscription_id): + mock_azure.set_key_vaults([ + _vault_with_props("kv-sd-off", enable_soft_delete=False, enable_purge_protection=False) + ]) + findings = az_kv_001.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-KV-001" + assert findings[0]["severity"] == "MEDIUM" + assert findings[0]["resource_name"] == "kv-sd-off" + + +# ── AZ-KV-003: diagnostic logging not enabled ─────────────────────────────── + +def test_kv_003_compliant_returns_no_findings(mock_azure, subscription_id): + vault = _vault_with_props("kv-diag-on") + mock_azure.set_key_vaults([vault]) + mock_azure.set_diagnostic_settings(vault.id, True) + assert az_kv_003.scan(mock_azure, subscription_id) == [] + + +def test_kv_003_noncompliant_returns_one_finding(mock_azure, subscription_id): + vault = _vault_with_props("kv-diag-off") + mock_azure.set_key_vaults([vault]) + mock_azure.set_diagnostic_settings(vault.id, False) + findings = az_kv_003.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-KV-003" + assert findings[0]["severity"] == "MEDIUM" + + +def test_kv_003_indeterminate_status_skips(mock_azure, subscription_id): + """When diagnostic status is None (cannot determine) the rule must not flag.""" + vault = _vault_with_props("kv-diag-unknown") + mock_azure.set_key_vaults([vault]) + mock_azure.set_diagnostic_settings(vault.id, None) + assert az_kv_003.scan(mock_azure, subscription_id) == [] + + +# ── AZ-KV-004: purge protection disabled ──────────────────────────────────── + +def test_kv_004_compliant_returns_no_findings(mock_azure, subscription_id): + mock_azure.set_key_vaults([_vault_with_props("kv-pp-on", enable_purge_protection=True)]) + assert az_kv_004.scan(mock_azure, subscription_id) == [] + + +def test_kv_004_noncompliant_returns_one_finding(mock_azure, subscription_id): + mock_azure.set_key_vaults([_vault_with_props("kv-pp-off", enable_purge_protection=False)]) + findings = az_kv_004.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-KV-004" + assert findings[0]["severity"] == "MEDIUM" + + +# ── AZ-KV-005: certificate expiring within 30 days ────────────────────────── + +def test_kv_005_compliant_far_future_expiry_returns_no_findings(mock_azure, subscription_id): + vault = _vault_with_props("kv-cert-ok") + mock_azure.set_key_vaults([vault]) + cert = make_resource( + name="cert-fresh", + expires_on=datetime.now(timezone.utc) + timedelta(days=200), + policy=None, + ) + mock_azure.set_key_vault_certificates("kv-cert-ok", [cert]) + assert az_kv_005.scan(mock_azure, subscription_id) == [] + + +def test_kv_005_noncompliant_expiring_soon_returns_one_finding(mock_azure, subscription_id): + vault = _vault_with_props("kv-cert-exp") + mock_azure.set_key_vaults([vault]) + cert = make_resource( + name="cert-expiring", + expires_on=datetime.now(timezone.utc) + timedelta(days=10), + policy=None, + ) + mock_azure.set_key_vault_certificates("kv-cert-exp", [cert]) + findings = az_kv_005.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-KV-005" + assert findings[0]["severity"] == "MEDIUM" + assert findings[0]["resource_name"] == "cert-expiring" diff --git a/tests/test_rules_network.py b/tests/test_rules_network.py index 8215c4d..d638233 100644 --- a/tests/test_rules_network.py +++ b/tests/test_rules_network.py @@ -1,9 +1,42 @@ -"""Rule regression tests for AZ-NET-001 and AZ-NET-002.""" +"""Rule regression tests for the network rules AZ-NET-001 .. AZ-NET-014. + +AZ-NET-007/008/009/010 construct a NetworkManagementClient inside scan(); those +tests monkeypatch azure.mgmt.network.NetworkManagementClient. The rest read data +through MockAzureClient accessors. No real network calls are made. +""" + +from types import SimpleNamespace import scanner.rules.az_net_001 as az_net_001 import scanner.rules.az_net_002 as az_net_002 +import scanner.rules.az_net_003 as az_net_003 +import scanner.rules.az_net_004 as az_net_004 +import scanner.rules.az_net_005 as az_net_005 +import scanner.rules.az_net_006 as az_net_006 +import scanner.rules.az_net_007 as az_net_007 +import scanner.rules.az_net_008 as az_net_008 +import scanner.rules.az_net_009 as az_net_009 +import scanner.rules.az_net_010 as az_net_010 +import scanner.rules.az_net_011 as az_net_011 +import scanner.rules.az_net_012 as az_net_012 +import scanner.rules.az_net_013 as az_net_013 +import scanner.rules.az_net_014 as az_net_014 +import scanner.rules.az_net_015 as az_net_015 from tests.helpers.mock_azure import make_resource + +def _install_network_client(monkeypatch, **collections): + """Patch azure.mgmt.network.NetworkManagementClient with a fake exposing + the given sub-resources, each as ``.list_all() -> list``.""" + def make_client(*args, **kwargs): + ns = {} + for attr, items in collections.items(): + ns[attr] = SimpleNamespace(list_all=lambda items=items: list(items)) + return SimpleNamespace(**ns) + monkeypatch.setattr( + "azure.mgmt.network.NetworkManagementClient", make_client + ) + _REQUIRED_FIELDS = { "rule_id", "rule_name", "severity", "category", "resource_id", "resource_name", "resource_type", @@ -103,3 +136,362 @@ def test_net_002_noncompliant_returns_one_finding(mock_azure, subscription_id): assert finding["severity"] == "HIGH" assert finding["category"] == "Network" assert finding["resource_name"] == "nsg-rdp-open" + + +def _vnet_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Network/virtualNetworks/{name}" + ) + + +# ── AZ-NET-003: unrestricted inbound on 443 ───────────────────────────────── + +def test_net_003_compliant_returns_no_findings(mock_azure, subscription_id): + nsg = make_resource( + id=_nsg_id("nsg-443-ok"), name="nsg-443-ok", + security_rules=[_allow_rule("Allow443", "443", "10.0.0.0/24")], + ) + mock_azure.set_network_security_groups([nsg]) + assert az_net_003.scan(mock_azure, subscription_id) == [] + + +def test_net_003_noncompliant_returns_one_finding(mock_azure, subscription_id): + nsg = make_resource( + id=_nsg_id("nsg-443-open"), name="nsg-443-open", + security_rules=[_open_allow_rule("Allow443Internet", "443")], + ) + mock_azure.set_network_security_groups([nsg]) + findings = az_net_003.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-003" + assert findings[0]["severity"] == "HIGH" + + +# ── AZ-NET-004: NSG with no rules ─────────────────────────────────────────── + +def test_net_004_compliant_returns_no_findings(mock_azure, subscription_id): + nsg = make_resource( + id=_nsg_id("nsg-has-rules"), name="nsg-has-rules", + security_rules=[_allow_rule("AllowHTTPS", "443")], + ) + mock_azure.set_network_security_groups([nsg]) + assert az_net_004.scan(mock_azure, subscription_id) == [] + + +def test_net_004_noncompliant_returns_one_finding(mock_azure, subscription_id): + nsg = make_resource(id=_nsg_id("nsg-empty"), name="nsg-empty", security_rules=[]) + mock_azure.set_network_security_groups([nsg]) + findings = az_net_004.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-004" + assert findings[0]["severity"] == "MEDIUM" + + +# ── AZ-NET-005: VNet without DDoS protection ──────────────────────────────── + +def test_net_005_compliant_returns_no_findings(mock_azure, subscription_id): + vnet = make_resource( + id=_vnet_id("vnet-ddos"), name="vnet-ddos", + ddos_protection_plan=make_resource(id="plan1"), + enable_ddos_protection=True, + ) + mock_azure.set_virtual_networks([vnet]) + assert az_net_005.scan(mock_azure, subscription_id) == [] + + +def test_net_005_noncompliant_returns_one_finding(mock_azure, subscription_id): + vnet = make_resource( + id=_vnet_id("vnet-noddos"), name="vnet-noddos", + ddos_protection_plan=None, enable_ddos_protection=False, + ) + mock_azure.set_virtual_networks([vnet]) + findings = az_net_005.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-005" + assert findings[0]["severity"] == "LOW" + + +# ── AZ-NET-006: unassociated public IP ────────────────────────────────────── + +def test_net_006_compliant_returns_no_findings(mock_azure, subscription_id): + pip = make_resource( + id="/pip/associated", name="pip-associated", + ip_configuration=make_resource(id="nic-ipcfg"), nat_gateway=None, + ) + mock_azure.set_public_ip_addresses([pip]) + assert az_net_006.scan(mock_azure, subscription_id) == [] + + +def test_net_006_noncompliant_returns_one_finding(mock_azure, subscription_id): + pip = make_resource( + id="/pip/orphan", name="pip-orphan", + ip_configuration=None, nat_gateway=None, + ip_address="20.1.2.3", sku=make_resource(name="Standard"), + ) + mock_azure.set_public_ip_addresses([pip]) + findings = az_net_006.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-006" + assert findings[0]["severity"] == "LOW" + + +# ── AZ-NET-007: Application Gateway without WAF (SDK) ──────────────────────── + +def test_net_007_compliant_returns_no_findings(mock_azure, subscription_id, monkeypatch): + agw = make_resource( + id="/agw1", name="agw-waf", location="eastus", + sku=make_resource(name="WAF_v2"), + web_application_firewall_configuration=make_resource(enabled=True), + ) + _install_network_client(monkeypatch, application_gateways=[agw]) + assert az_net_007.scan(mock_azure, subscription_id) == [] + + +def test_net_007_noncompliant_returns_one_finding(mock_azure, subscription_id, monkeypatch): + agw = make_resource( + id="/agw2", name="agw-nowaf", location="eastus", + sku=make_resource(name="Standard_v2"), + web_application_firewall_configuration=None, + ) + _install_network_client(monkeypatch, application_gateways=[agw]) + findings = az_net_007.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-007" + assert findings[0]["severity"] == "HIGH" + + +# ── AZ-NET-008: Load balancer with no backend pool (SDK) ──────────────────── + +def test_net_008_compliant_returns_no_findings(mock_azure, subscription_id, monkeypatch): + lb = make_resource( + id="/lb1", name="lb-ok", location="eastus", + backend_address_pools=[make_resource(name="pool1")], + ) + _install_network_client(monkeypatch, load_balancers=[lb]) + assert az_net_008.scan(mock_azure, subscription_id) == [] + + +def test_net_008_noncompliant_returns_one_finding(mock_azure, subscription_id, monkeypatch): + lb = make_resource( + id="/lb2", name="lb-empty", location="eastus", backend_address_pools=[] + ) + _install_network_client(monkeypatch, load_balancers=[lb]) + findings = az_net_008.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-008" + assert findings[0]["severity"] == "LOW" + + +# ── AZ-NET-009: VPN gateway using IKEv1 (SDK) ─────────────────────────────── +# NOTE / BUG: the rule calls +# client.virtual_network_gateway_connections.list_all() +# but the real azure-mgmt-network SDK exposes only .list(resource_group) on that +# operations group — there is NO list_all(). In production the AttributeError is +# swallowed by the rule's except, so AZ-NET-009 NEVER fires (silent false +# negative). The fake client below provides list_all() so these tests can still +# validate the rule's intended IKEv1 detection logic — see the validation report. + +def test_net_009_compliant_returns_no_findings(mock_azure, subscription_id, monkeypatch): + conn = make_resource( + id="/conn1", name="conn-ikev2", location="eastus", + connection_protocol="IKEv2", connection_type="IPsec", + ) + _install_network_client(monkeypatch, virtual_network_gateway_connections=[conn]) + assert az_net_009.scan(mock_azure, subscription_id) == [] + + +def test_net_009_noncompliant_returns_one_finding(mock_azure, subscription_id, monkeypatch): + conn = make_resource( + id="/conn2", name="conn-ikev1", location="eastus", + connection_protocol="IKEv1", connection_type="IPsec", + ) + _install_network_client(monkeypatch, virtual_network_gateway_connections=[conn]) + findings = az_net_009.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-009" + assert findings[0]["severity"] == "HIGH" + + +# ── AZ-NET-010: subnet with no NSG (SDK) ──────────────────────────────────── + +def test_net_010_compliant_returns_no_findings(mock_azure, subscription_id, monkeypatch): + vnet = make_resource( + id=_vnet_id("vnet-subnets"), name="vnet-subnets", + subnets=[make_resource( + name="app-subnet", id="/subnet/app", + network_security_group=make_resource(id="nsg1"), + address_prefix="10.0.1.0/24", + )], + ) + _install_network_client(monkeypatch, virtual_networks=[vnet]) + assert az_net_010.scan(mock_azure, subscription_id) == [] + + +def test_net_010_noncompliant_returns_one_finding(mock_azure, subscription_id, monkeypatch): + vnet = make_resource( + id=_vnet_id("vnet-bare"), name="vnet-bare", + subnets=[make_resource( + name="bare-subnet", id="/subnet/bare", + network_security_group=None, address_prefix="10.0.2.0/24", + )], + ) + _install_network_client(monkeypatch, virtual_networks=[vnet]) + findings = az_net_010.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-010" + assert findings[0]["severity"] == "HIGH" + + +def test_net_010_skips_reserved_subnets(mock_azure, subscription_id, monkeypatch): + """Gateway/Firewall/Bastion subnets without an NSG must not be flagged.""" + vnet = make_resource( + id=_vnet_id("vnet-gw"), name="vnet-gw", + subnets=[make_resource( + name="GatewaySubnet", id="/subnet/gw", + network_security_group=None, address_prefix="10.0.3.0/24", + )], + ) + _install_network_client(monkeypatch, virtual_networks=[vnet]) + assert az_net_010.scan(mock_azure, subscription_id) == [] + + +# ── AZ-NET-011: Network Watcher not enabled in all regions ────────────────── + +def test_net_011_compliant_returns_no_findings(mock_azure, subscription_id): + mock_azure.set_regions_with_resources(["eastus", "westus"]) + mock_azure.set_network_watcher_regions(["eastus", "westus"]) + assert az_net_011.scan(mock_azure, subscription_id) == [] + + +def test_net_011_noncompliant_returns_finding_per_unmonitored_region(mock_azure, subscription_id): + mock_azure.set_regions_with_resources(["eastus", "westus"]) + mock_azure.set_network_watcher_regions(["eastus"]) + findings = az_net_011.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-011" + assert findings[0]["severity"] == "LOW" + assert findings[0]["resource_name"] == "westus" + + +# ── AZ-NET-012: NSG flow logs not enabled ─────────────────────────────────── +# NOTE: this rule depends on azure_client.get_nsg_flow_logs(resource_group), +# which the REAL AzureClient does not implement — so in production the rule's +# bare except swallows the AttributeError and flags every NSG (false positive). +# The mock provides the expected method, so these tests validate the rule's +# *intended* detection logic and isolate the missing-method bug as root cause. + +def test_net_012_compliant_returns_no_findings(mock_azure, subscription_id): + nsg_id = _nsg_id("nsg-flowlogs") + nsg = make_resource(id=nsg_id, name="nsg-flowlogs") + mock_azure.set_network_security_groups([nsg]) + mock_azure.set_nsg_flow_logs( + _RG, [make_resource(target_resource_id=nsg_id, enabled=True)] + ) + assert az_net_012.scan(mock_azure, subscription_id) == [] + + +def test_net_012_noncompliant_returns_one_finding(mock_azure, subscription_id): + nsg_id = _nsg_id("nsg-noflow") + nsg = make_resource(id=nsg_id, name="nsg-noflow") + mock_azure.set_network_security_groups([nsg]) + mock_azure.set_nsg_flow_logs(_RG, []) # no flow logs configured + findings = az_net_012.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-012" + assert findings[0]["severity"] == "MEDIUM" + + +# ── AZ-NET-013: Azure Firewall not enabled on VNet ────────────────────────── + +def test_net_013_compliant_returns_no_findings(mock_azure, subscription_id): + vnet_id = _vnet_id("vnet-fw") + fw = make_resource(ip_configurations=[ + make_resource(subnet=make_resource(id=f"{vnet_id}/subnets/AzureFirewallSubnet")) + ]) + mock_azure.set_all_azure_firewalls([fw]) + mock_azure.set_virtual_networks([make_resource(id=vnet_id, name="vnet-fw", location="eastus")]) + assert az_net_013.scan(mock_azure, subscription_id) == [] + + +def test_net_013_noncompliant_returns_one_finding(mock_azure, subscription_id): + vnet_id = _vnet_id("vnet-unprotected") + mock_azure.set_all_azure_firewalls([]) # empty list, not None + mock_azure.set_virtual_networks([make_resource(id=vnet_id, name="vnet-unprotected", location="eastus")]) + findings = az_net_013.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-013" + assert findings[0]["severity"] == "HIGH" + + +def test_net_013_firewall_listing_none_skips(mock_azure, subscription_id): + """When the firewall listing fails (None) the rule must not flag VNets.""" + mock_azure.set_all_azure_firewalls(None) + mock_azure.set_virtual_networks([make_resource(id=_vnet_id("v"), name="v", location="eastus")]) + assert az_net_013.scan(mock_azure, subscription_id) == [] + + +# ── AZ-NET-014: VNet peering without gateway-transit restrictions ─────────── + +def test_net_014_compliant_returns_no_findings(mock_azure, subscription_id): + vnet = make_resource(id=_vnet_id("vnet-peer-ok"), name="vnet-peer-ok") + mock_azure.set_virtual_networks([vnet]) + mock_azure.set_vnet_peerings(_RG, "vnet-peer-ok", [ + make_resource(name="peer1", allow_gateway_transit=False, use_remote_gateways=False) + ]) + assert az_net_014.scan(mock_azure, subscription_id) == [] + + +def test_net_014_noncompliant_returns_one_finding(mock_azure, subscription_id): + vnet = make_resource(id=_vnet_id("vnet-peer-bad"), name="vnet-peer-bad") + mock_azure.set_virtual_networks([vnet]) + mock_azure.set_vnet_peerings(_RG, "vnet-peer-bad", [ + make_resource(name="peer1", allow_gateway_transit=True, use_remote_gateways=False) + ]) + findings = az_net_014.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-014" + assert findings[0]["severity"] == "MEDIUM" + + +# ── AZ-NET-015: public DNS zone exposing internal infrastructure ──────────── + +def _dns_zone_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Network/dnsZones/{name}" + ) + + +def test_net_015_compliant_public_records_returns_no_findings(mock_azure, subscription_id): + """A public zone with only public IPs and non-internal names is compliant.""" + zone = make_resource(id=_dns_zone_id("example.com"), name="example.com", zone_type="Public") + mock_azure.set_dns_zones([zone]) + mock_azure.set_dns_record_sets(_RG, "example.com", [ + make_resource(name="www", a_records=[make_resource(ipv4_address="20.1.2.3")]), + ]) + assert az_net_015.scan(mock_azure, subscription_id) == [] + + +def test_net_015_noncompliant_rfc1918_record_returns_one_finding(mock_azure, subscription_id): + """A public zone with an A record pointing at an RFC1918 IP must flag.""" + zone = make_resource(id=_dns_zone_id("corp.example.com"), name="corp.example.com", zone_type="Public") + mock_azure.set_dns_zones([zone]) + mock_azure.set_dns_record_sets(_RG, "corp.example.com", [ + make_resource(name="app", a_records=[make_resource(ipv4_address="10.0.0.5")]), + ]) + findings = az_net_015.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-NET-015" + assert findings[0]["severity"] == "MEDIUM" + assert findings[0]["resource_name"] == "corp.example.com" + + +def test_net_015_skips_private_zone(mock_azure, subscription_id): + """A non-public (Private) zone must never be flagged, even with RFC1918 records.""" + zone = make_resource(id=_dns_zone_id("internal.local"), name="internal.local", zone_type="Private") + mock_azure.set_dns_zones([zone]) + mock_azure.set_dns_record_sets(_RG, "internal.local", [ + make_resource(name="db", a_records=[make_resource(ipv4_address="10.0.0.9")]), + ]) + assert az_net_015.scan(mock_azure, subscription_id) == [] diff --git a/tests/test_rules_storage.py b/tests/test_rules_storage.py index 4257102..50edaf5 100644 --- a/tests/test_rules_storage.py +++ b/tests/test_rules_storage.py @@ -6,6 +6,9 @@ import scanner.rules.az_stor_001 as az_stor_001 import scanner.rules.az_stor_002 as az_stor_002 +import scanner.rules.az_stor_003 as az_stor_003 +import scanner.rules.az_stor_004 as az_stor_004 +import scanner.rules.az_stor_005 as az_stor_005 from tests.helpers.mock_azure import make_resource _REQUIRED_FIELDS = { @@ -83,3 +86,77 @@ def test_stor_002_noncompliant_returns_one_finding(mock_azure, subscription_id): assert finding["severity"] == "HIGH" assert finding["category"] == "Storage" assert finding["resource_name"] == "http-allowed-storage" + + +# ── AZ-STOR-003: no lifecycle management policy ───────────────────────────── + +def test_stor_003_compliant_returns_no_findings(mock_azure, subscription_id): + acct = make_resource(id=_storage_id("sa-lifecycle"), name="sa-lifecycle", location="eastus") + mock_azure.set_storage_accounts([acct]) + mock_azure.set_storage_lifecycle_policy(_RG, "sa-lifecycle", True) + assert az_stor_003.scan(mock_azure, subscription_id) == [] + + +def test_stor_003_noncompliant_returns_one_finding(mock_azure, subscription_id): + acct = make_resource(id=_storage_id("sa-nolifecycle"), name="sa-nolifecycle", location="eastus") + mock_azure.set_storage_accounts([acct]) + mock_azure.set_storage_lifecycle_policy(_RG, "sa-nolifecycle", False) + findings = az_stor_003.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-STOR-003" + assert findings[0]["severity"] == "MEDIUM" + + +def test_stor_003_indeterminate_skips(mock_azure, subscription_id): + """When lifecycle status is None (cannot determine) the rule must not flag.""" + acct = make_resource(id=_storage_id("sa-unknown"), name="sa-unknown", location="eastus") + mock_azure.set_storage_accounts([acct]) + mock_azure.set_storage_lifecycle_policy(_RG, "sa-unknown", None) + assert az_stor_003.scan(mock_azure, subscription_id) == [] + + +# ── AZ-STOR-004: diagnostic logging disabled (per blob/queue/table) ───────── + +def test_stor_004_compliant_all_services_logged_returns_no_findings(mock_azure, subscription_id): + acct = make_resource(id=_storage_id("sa-logged"), name="sa-logged", location="eastus") + mock_azure.set_storage_accounts([acct]) + for svc in ("blob", "queue", "table"): + mock_azure.set_storage_service_logging(_RG, "sa-logged", svc, True) + assert az_stor_004.scan(mock_azure, subscription_id) == [] + + +def test_stor_004_noncompliant_blob_unlogged_returns_one_finding(mock_azure, subscription_id): + acct = make_resource(id=_storage_id("sa-blobunlogged"), name="sa-blobunlogged", location="eastus") + mock_azure.set_storage_accounts([acct]) + mock_azure.set_storage_service_logging(_RG, "sa-blobunlogged", "blob", False) + mock_azure.set_storage_service_logging(_RG, "sa-blobunlogged", "queue", True) + mock_azure.set_storage_service_logging(_RG, "sa-blobunlogged", "table", True) + findings = az_stor_004.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-STOR-004" + assert findings[0]["severity"] == "MEDIUM" + assert findings[0]["metadata"]["service"] == "blob" + + +# ── AZ-STOR-005: not geo-redundant ────────────────────────────────────────── + +def test_stor_005_compliant_grs_returns_no_findings(mock_azure, subscription_id): + acct = make_resource( + id=_storage_id("sa-grs"), name="sa-grs", location="eastus", + sku=make_resource(name="Standard_GRS"), + ) + mock_azure.set_storage_accounts([acct]) + assert az_stor_005.scan(mock_azure, subscription_id) == [] + + +def test_stor_005_noncompliant_lrs_returns_one_finding(mock_azure, subscription_id): + acct = make_resource( + id=_storage_id("sa-lrs"), name="sa-lrs", location="eastus", + sku=make_resource(name="Standard_LRS"), + ) + mock_azure.set_storage_accounts([acct]) + findings = az_stor_005.scan(mock_azure, subscription_id) + assert len(findings) == 1 + assert findings[0]["rule_id"] == "AZ-STOR-005" + assert findings[0]["severity"] == "MEDIUM" + assert findings[0]["resource_name"] == "sa-lrs"