From 7692a1d76a1dabb74adf2eda13f47e4db2b23b09 Mon Sep 17 00:00:00 2001 From: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com> Date: Mon, 8 Jun 2026 16:51:58 +0200 Subject: [PATCH] feat(okta): add network zone STIG check (#11463) Co-authored-by: Daniel Barranquero --- .../providers/okta/authentication.mdx | 10 +- .../providers/okta/getting-started-okta.mdx | 14 +- prowler/CHANGELOG.md | 1 + prowler/providers/okta/okta_provider.py | 1 + .../okta/services/network/__init__.py | 0 .../okta/services/network/lib/__init__.py | 0 .../network/lib/network_zone_helpers.py | 88 +++ .../__init__.py | 0 ...one_block_anonymized_proxies.metadata.json | 37 ++ .../network_zone_block_anonymized_proxies.py | 77 +++ .../services/network/network_zone_client.py | 4 + .../services/network/network_zone_service.py | 234 ++++++++ tests/providers/okta/okta_fixtures.py | 2 + ...work_zone_block_anonymized_proxies_test.py | 153 +++++ .../network_zone/network_zone_fixtures.py | 42 ++ .../network_zone/network_zone_service_test.py | 560 ++++++++++++++++++ 16 files changed, 1213 insertions(+), 10 deletions(-) create mode 100644 prowler/providers/okta/services/network/__init__.py create mode 100644 prowler/providers/okta/services/network/lib/__init__.py create mode 100644 prowler/providers/okta/services/network/lib/network_zone_helpers.py create mode 100644 prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/__init__.py create mode 100644 prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.metadata.json create mode 100644 prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.py create mode 100644 prowler/providers/okta/services/network/network_zone_client.py create mode 100644 prowler/providers/okta/services/network/network_zone_service.py create mode 100644 tests/providers/okta/services/network_zone/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies_test.py create mode 100644 tests/providers/okta/services/network_zone/network_zone_fixtures.py create mode 100644 tests/providers/okta/services/network_zone/network_zone_service_test.py diff --git a/docs/user-guide/providers/okta/authentication.mdx b/docs/user-guide/providers/okta/authentication.mdx index ca56a0535b..67eb72e153 100644 --- a/docs/user-guide/providers/okta/authentication.mdx +++ b/docs/user-guide/providers/okta/authentication.mdx @@ -35,6 +35,7 @@ The bundled checks require the following read-only scopes: - `okta.policies.read` - `okta.brands.read` - `okta.apps.read` +- `okta.networkZones.read` - `okta.logStreams.read` - `okta.idps.read` @@ -45,6 +46,7 @@ Additional scopes will be needed as more services and checks are added. These ar | `okta.policies.read` | Sign-on, password, authentication, and `USER_LIFECYCLE` (Workflow > Automations) policies | | `okta.brands.read` | Sign-in page customizations (DOD Notice and Consent Banner check) | | `okta.apps.read` | First-party app settings (Okta Admin Console session), integrated app inventory, and the Authentication Policies bound to Okta applications | +| `okta.networkZones.read` | Network Zone inventory and anonymized-proxy blocklist checks | | `okta.logStreams.read` | Log Stream configuration (`/api/v1/logStreams`) | | `okta.idps.read` | Identity Providers, including Smart Card (X509) IdPs (`/api/v1/idps`) | @@ -128,7 +130,7 @@ Okta displays the private key **only once**. If you close the modal without copy ### 5. Grant the required OAuth scopes -On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled checks require `okta.policies.read`, `okta.brands.read`, and `okta.apps.read`. +On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled checks require `okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.logStreams.read`, and `okta.idps.read`. ![Okta — grant OAuth scopes](/user-guide/providers/okta/images/grant-permissions.png) @@ -164,8 +166,8 @@ export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem" # or export OKTA_PRIVATE_KEY="$(cat /secure/path/to/prowler-okta.pem)" -# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" +# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" uv run python prowler-cli.py okta ``` @@ -206,7 +208,7 @@ Prowler validates credentials at startup by listing one sign-on policy. This err Raised when the credential probe succeeds at the OAuth layer but the request is rejected because the service app lacks the required scope or admin role: -- **`invalid_scope`** — one of the requested scopes (`okta.policies.read`, `okta.brands.read`, or `okta.apps.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**. +- **`invalid_scope`** — one of the requested scopes (`okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.logStreams.read`, and `okta.idps.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**. - **`Forbidden` / `not authorized`** — no admin role is assigned to the service app. Assign **Read-Only Administrator** (or **Super Administrator** for the first-party application checks) from **Admin roles**. ### Application-service checks return MANUAL on first-party apps diff --git a/docs/user-guide/providers/okta/getting-started-okta.mdx b/docs/user-guide/providers/okta/getting-started-okta.mdx index 6086afefb3..740cfbf729 100644 --- a/docs/user-guide/providers/okta/getting-started-okta.mdx +++ b/docs/user-guide/providers/okta/getting-started-okta.mdx @@ -12,7 +12,7 @@ Set up authentication for Okta with the [Okta Authentication](/user-guide/provid - An Okta organization. The UI examples below use **Identity Engine** terminology such as **Global Session Policy**; Classic Engine exposes the equivalent sign-on policy concepts under older names. - A **Super Administrator** account on that organization for the one-time service-app setup. -- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read`, `okta.brands.read`, and `okta.apps.read` scopes granted and an admin role assigned. **Read-Only Administrator** covers every `signon` check and runs the per-app network-zone check against the apps the service app can see (under Read-Only Administrator that is typically only the service app's own row — the rest of the org's app inventory stays invisible). **Super Administrator** is required additionally to evaluate the five first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, phishing-resistant authentication) and to widen the network-zone check to the full app inventory — see [Okta Authentication](/user-guide/providers/okta/authentication#required-admin-role) for the full breakdown. +- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.logStreams.read`, and `okta.idps.read` scopes granted and an admin role assigned. **Read-Only Administrator** covers the Sign-On, Network, User, System Log, and Identity Provider checks, and runs the per-app application network-zone check against the apps the service app can see (under Read-Only Administrator that is typically only the service app's own row — the rest of the org's app inventory stays invisible). **Super Administrator** is required additionally to evaluate the five first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, phishing-resistant authentication) and to widen the application network-zone check to the full app inventory — see [Okta Authentication](/user-guide/providers/okta/authentication#required-admin-role) for the full breakdown. - Python 3.10+ and Prowler 5.27.0 or later installed locally. @@ -85,8 +85,8 @@ Follow the [Okta Authentication](/user-guide/providers/okta/authentication) guid export OKTA_ORG_DOMAIN="acme.okta.com" export OKTA_CLIENT_ID="0oa1234567890abcdef" export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem" -# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read" +# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" ``` The private key file may contain either a PEM-encoded RSA key or a JWK JSON document. @@ -147,6 +147,7 @@ Prowler for Okta includes security checks across the following services: | --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- | | **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) | | **Application** | Okta Admin Console sign-on settings plus Authentication Policy controls for Okta applications (session idle, MFA, phishing resistance, network zones) | +| **Network** | Network Zone blocklists for anonymized proxy sources | | **User** | User lifecycle automations (inactivity-based deprovisioning) | | **System Log** | Log Stream configuration that off-loads audit records to a central SIEM | | **Identity Provider** | Identity Providers, including Smart Card (X509) IdP status and certificate-chain visibility | @@ -161,11 +162,12 @@ This is stricter than simply finding the same timeout value somewhere else in th ### Default Scopes -Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On, Application, User, System Log, and Identity Provider services: +Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On, Application, Network, User, System Log, and Identity Provider services: - `okta.policies.read` - `okta.brands.read` - `okta.apps.read` +- `okta.networkZones.read` - `okta.logStreams.read` - `okta.idps.read` @@ -175,10 +177,10 @@ When additional checks are enabled — or when running against a service app tha ```bash # Environment variable — comma-separated -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.logStreams.read,okta.idps.read,okta.users.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read,okta.users.read" # CLI flag — space-separated -prowler okta --okta-scopes okta.policies.read okta.brands.read okta.apps.read okta.logStreams.read okta.idps.read okta.users.read +prowler okta --okta-scopes okta.policies.read okta.brands.read okta.apps.read okta.networkZones.read okta.logStreams.read okta.idps.read okta.users.read ``` For the full catalog of OAuth scopes exposed by the Okta Management API, refer to the [Okta OAuth 2.0 scopes documentation](https://developer.okta.com/docs/api/oauth2/). diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 1389b684cf..fab236e9e1 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - `sagemaker_models_monitor_enabled` check for AWS provider, verifying that each SageMaker monitoring schedule is in the `Scheduled` state so data and model drift is actively detected [(#11278)](https://github.com/prowler-cloud/prowler/pull/11278) - DORA (Digital Operational Resilience Act, Regulation (EU) 2022/2554) universal compliance framework with AWS provider coverage across the five DORA pillars [(#11131)](https://github.com/prowler-cloud/prowler/pull/11131) +- Okta network zone check to detect whether anonymized proxy traffic is blocked [(#11463)](https://github.com/prowler-cloud/prowler/pull/11463) - `elbv2_alb_drop_invalid_header_fields_enabled` check for AWS provider, verifying Application Load Balancers have `routing.http.drop_invalid_header_fields.enabled` set to `true` to mitigate HTTP desync attacks (AWS FSBP ELB.4) [(#11471)](https://github.com/prowler-cloud/prowler/pull/11471) - `user`, `systemlog` and `idp` service for Okta provider with `user_inactivity_automation_35d_enabled`, `systemlog_streaming_enabled` and `idp_smart_card_dod_approved_ca` checks [(#11496)](https://github.com/prowler-cloud/prowler/pull/11496) diff --git a/prowler/providers/okta/okta_provider.py b/prowler/providers/okta/okta_provider.py index 678e9edbd4..4356adbc57 100644 --- a/prowler/providers/okta/okta_provider.py +++ b/prowler/providers/okta/okta_provider.py @@ -36,6 +36,7 @@ DEFAULT_SCOPES = [ "okta.policies.read", "okta.brands.read", "okta.apps.read", + "okta.networkZones.read", "okta.logStreams.read", "okta.idps.read", ] diff --git a/prowler/providers/okta/services/network/__init__.py b/prowler/providers/okta/services/network/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/network/lib/__init__.py b/prowler/providers/okta/services/network/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/network/lib/network_zone_helpers.py b/prowler/providers/okta/services/network/lib/network_zone_helpers.py new file mode 100644 index 0000000000..bcd906b01a --- /dev/null +++ b/prowler/providers/okta/services/network/lib/network_zone_helpers.py @@ -0,0 +1,88 @@ +from prowler.lib.check.models import CheckReportOkta +from prowler.providers.okta.services.network.network_zone_service import ( + NetworkZoneSummary, + OktaNetworkZone, +) + +ANONYMIZER_CATEGORY_MARKERS = ( + "ANONYM", + "PROXY", + "TOR", + "VPN", +) + + +def active_blocklist_zones( + network_zones: dict[str, OktaNetworkZone], +) -> list[OktaNetworkZone]: + """Return active Network Zones configured for blocklist usage.""" + return sorted( + [ + zone + for zone in network_zones.values() + if zone.status.upper() == "ACTIVE" and zone.usage.upper() == "BLOCKLIST" + ], + key=lambda zone: (zone.name, zone.id), + ) + + +def is_ip_blocklist_with_entries(zone: OktaNetworkZone) -> bool: + """Return True when an IP blocklist zone contains gateway/proxy entries.""" + return zone.type.upper() == "IP" and bool(zone.gateways or zone.proxies) + + +def is_enhanced_dynamic_anonymizer_blocklist(zone: OktaNetworkZone) -> bool: + """Return True for active Enhanced Dynamic blocklists covering anonymizers.""" + if zone.type.upper() != "DYNAMIC_V2": + return False + categories = [category.upper() for category in zone.ip_service_categories] + return any( + marker in category + for category in categories + for marker in ANONYMIZER_CATEGORY_MARKERS + ) + + +def compliant_anonymized_proxy_blocklist( + network_zones: dict[str, OktaNetworkZone], +) -> tuple[OktaNetworkZone | None, str]: + """Find the Network Zone that satisfies anonymized-proxy blocklisting.""" + for zone in active_blocklist_zones(network_zones): + if is_enhanced_dynamic_anonymizer_blocklist(zone): + return zone, "active Enhanced Dynamic Zone blocklist for anonymizers" + return None, "" + + +def static_ip_blocklist_evidence( + network_zones: dict[str, OktaNetworkZone], +) -> OktaNetworkZone | None: + """Return static IP blocklist evidence that requires human validation.""" + for zone in active_blocklist_zones(network_zones): + if is_ip_blocklist_with_entries(zone): + return zone + return None + + +_SCOPE_ADVICE = ( + "Grant it on the Okta API Scopes tab of the service app in the Okta Admin " + "Console, then re-run the check." +) + + +def missing_network_zone_scope_finding( + metadata, org_domain: str, scope: str +) -> CheckReportOkta: + """Build the MANUAL finding emitted when Network Zones cannot be listed.""" + resource = NetworkZoneSummary( + id="network-zones-scope-missing", + name="(scope not granted)", + ) + report = CheckReportOkta( + metadata=metadata, resource=resource, org_domain=org_domain + ) + report.status = "MANUAL" + report.status_extended = ( + f"Could not retrieve Okta Network Zones: the Okta service app " + f"is missing the required `{scope}` API scope. {_SCOPE_ADVICE}" + ) + return report diff --git a/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/__init__.py b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.metadata.json b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.metadata.json new file mode 100644 index 0000000000..d2a4791c17 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "network_zone_block_anonymized_proxies", + "CheckTitle": "Okta uses active Network Zone blocklists for anonymized proxy sources", + "CheckType": [], + "ServiceName": "network", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "network", + "Description": "**Okta Network Zone blocklists** should block anonymized proxy access before authentication. Enhanced Dynamic Zone anonymizer categories provide direct coverage; static IP blocklists show gateway/proxy blocking but cannot prove full anonymizer-provider coverage.", + "Risk": "**Anonymized proxy access** lets attackers hide source networks while attempting credential attacks, session establishment, or policy bypass from untrusted infrastructure.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/networkzone", + "https://help.okta.com/en-us/content/topics/security/network/about-enhanced-dynamic-zones.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > Networks: configure BlockedIpZone gateway/proxy IP entries or activate DefaultEnhancedDynamicZone / Enhanced Dynamic Zone blocklisting for anonymizers.", + "Terraform": "" + }, + "Recommendation": { + "Text": "**Prefer an active Enhanced Dynamic Zone blocklist** for anonymizers. If you use a static IP Network Zone blocklist, keep gateway/proxy entries actively maintained because Prowler cannot prove full anonymizer-provider coverage from static entries alone.", + "Url": "https://hub.prowler.com/check/network_zone_block_anonymized_proxies" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.py b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.py new file mode 100644 index 0000000000..3da973b415 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies.py @@ -0,0 +1,77 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.network.lib.network_zone_helpers import ( + compliant_anonymized_proxy_blocklist, + missing_network_zone_scope_finding, + static_ip_blocklist_evidence, +) +from prowler.providers.okta.services.network.network_zone_client import ( + network_zone_client, +) +from prowler.providers.okta.services.network.network_zone_service import ( + NetworkZoneSummary, +) + + +class network_zone_block_anonymized_proxies(Check): + """Ensure Okta actively blocks anonymized proxy sources before auth.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate whether an active blocklist covers anonymized proxies.""" + org_domain = network_zone_client.provider.identity.org_domain + missing_scope = network_zone_client.missing_scope.get("network_zones") + if missing_scope: + return [ + missing_network_zone_scope_finding( + self.metadata(), org_domain, missing_scope + ) + ] + + retrieval_error = getattr(network_zone_client, "retrieval_error", None) + if retrieval_error: + resource = NetworkZoneSummary( + id="network-zones-retrieval-error", + name="(retrieval failed)", + ) + report = CheckReportOkta( + metadata=self.metadata(), resource=resource, org_domain=org_domain + ) + report.status = "MANUAL" + report.status_extended = ( + "Okta Network Zones could not be retrieved or validated. " + f"Reason: {retrieval_error}" + ) + return [report] + + matching_zone, reason = compliant_anonymized_proxy_blocklist( + network_zone_client.network_zones + ) + manual_zone = ( + None + if matching_zone + else static_ip_blocklist_evidence(network_zone_client.network_zones) + ) + + resource = matching_zone or manual_zone or NetworkZoneSummary() + report = CheckReportOkta( + metadata=self.metadata(), resource=resource, org_domain=org_domain + ) + if matching_zone: + report.status = "PASS" + report.status_extended = ( + f"Okta Network Zone {matching_zone.name} is an {reason}." + ) + elif manual_zone: + report.status = "MANUAL" + report.status_extended = ( + f"Okta Network Zone {manual_zone.name} is an active manual IP " + "blocklist with gateway or proxy IP entries; Prowler cannot " + "verify full anonymizer coverage for static entries." + ) + else: + report.status = "FAIL" + report.status_extended = ( + "No active Okta Network Zone blocklist was found that blocks " + "anonymized proxies. Existing zones do not actively block gateway " + "or proxy IPs, nor an Enhanced Dynamic Zone anonymizer category." + ) + return [report] diff --git a/prowler/providers/okta/services/network/network_zone_client.py b/prowler/providers/okta/services/network/network_zone_client.py new file mode 100644 index 0000000000..6d4f603f80 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_client.py @@ -0,0 +1,4 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.network.network_zone_service import NetworkZone + +network_zone_client = NetworkZone(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/network/network_zone_service.py b/prowler/providers/okta/services/network/network_zone_service.py new file mode 100644 index 0000000000..550c9b0d49 --- /dev/null +++ b/prowler/providers/okta/services/network/network_zone_service.py @@ -0,0 +1,234 @@ +from typing import Optional + +from pydantic import BaseModel, Field, ValidationError + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import paginate as _paginate_shared +from prowler.providers.okta.lib.service.raw_fetch import ( + get_json_paginated as _raw_get_json_paginated, +) +from prowler.providers.okta.lib.service.service import OktaService + +REQUIRED_SCOPES: dict[str, str] = { + "network_zones": "okta.networkZones.read", +} + + +def _value(value) -> str: + """Return plain string values from Okta SDK enums and raw strings.""" + if value is None: + return "" + enum_value = getattr(value, "value", None) + if enum_value is not None: + return str(enum_value) + return str(value) + + +class NetworkZone(OktaService): + """Fetches Okta Network Zones for STIG network-zone checks.""" + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + granted = set(getattr(provider.identity, "granted_scopes", None) or []) + self.missing_scope: dict[str, Optional[str]] = { + resource: (scope if granted and scope not in granted else None) + for resource, scope in REQUIRED_SCOPES.items() + } + self.retrieval_error: str | None = None + self.network_zones: dict[str, OktaNetworkZone] = ( + {} if self.missing_scope["network_zones"] else self._list_network_zones() + ) + + def _set_retrieval_error(self, message: str) -> None: + self.retrieval_error = message + logger.error(message) + + def _list_network_zones(self) -> dict[str, "OktaNetworkZone"]: + """List all Network Zones visible to the configured Okta service app.""" + logger.info("NetworkZone - Listing Okta Network Zones...") + try: + return self._run(self._fetch_all()) + except Exception as error: + line_number = getattr(error.__traceback__, "tb_lineno", "unknown") + self._set_retrieval_error( + f"{error.__class__.__name__}[{line_number}]: {error}" + ) + return {} + + async def _fetch_all(self) -> dict[str, "OktaNetworkZone"]: + result: dict[str, OktaNetworkZone] = {} + try: + all_zones, err = await _paginate_shared( + lambda after: self.client.list_network_zones(after=after, limit=200) + ) + except (ValueError, ValidationError) as ex: + # Upstream Okta SDK ↔ Management API schema drift: the SDK + # generates `EnhancedDynamicNetworkZoneAllOfAsnsInclude` as an + # object-shaped pydantic model, but the API returns + # `asns.include` as a JSON array (typically `[]`), so pydantic + # rejects the whole zone with `model_type` errors. Fall back + # to a raw-JSON fetch so STIG evaluation isn't blocked by an + # upstream SDK bug. Same workaround shape as + # `application_service._fetch_access_policy_raw`. The wider + # `(ValueError, ValidationError)` catch matches the + # `user_service` precedent — the SDK raises either depending + # on whether the failure is a discriminator miss or a model + # mismatch. + logger.warning( + f"Okta SDK raised {type(ex).__name__} parsing Network Zones — " + "falling back to raw-JSON parse. This is an okta-sdk-python " + "deserialization bug; the workaround should be removed once " + "upstream fixes it." + ) + return await self._fetch_all_raw() + if err is not None: + self._set_retrieval_error(f"Error listing Network Zones: {err}") + return result + + for zone in all_zones: + zone_obj = self._build_zone(zone) + result[zone_obj.id] = zone_obj + return result + + async def _fetch_all_raw(self) -> dict[str, "OktaNetworkZone"]: + """Raw-JSON fallback for `list_network_zones`. + + Bypasses the SDK's typed deserialization via the shared + `get_json_paginated` helper, then projects each zone onto our + own pydantic snapshot — which only validates the fields the + STIG checks actually read. + """ + result: dict[str, OktaNetworkZone] = {} + zones_data = await _raw_get_json_paginated( + self.client, + "/api/v1/zones", + page_size=200, + context="Network Zones", + ) + if zones_data is None: + self._set_retrieval_error( + "Raw Network Zones fetch failed; see logs for details." + ) + return result + for zone_dict in zones_data: + if not isinstance(zone_dict, dict): + continue + zone_obj = _raw_zone_to_model(zone_dict) + result[zone_obj.id] = zone_obj + return result + + @staticmethod + def _build_zone(zone) -> "OktaNetworkZone": + zone_id = _value(getattr(zone, "id", None)) + return OktaNetworkZone( + id=zone_id, + name=_value(getattr(zone, "name", None)) or zone_id, + status=_value(getattr(zone, "status", None)), + type=_value(getattr(zone, "type", None)), + usage=_value(getattr(zone, "usage", None)), + system=bool(getattr(zone, "system", False)), + gateways=_address_values(getattr(zone, "gateways", None)), + proxies=_address_values(getattr(zone, "proxies", None)), + asns=_condition_values(getattr(zone, "asns", None)), + locations=_condition_values(getattr(zone, "locations", None)), + ip_service_categories=_condition_values( + getattr(zone, "ip_service_categories", None) + ), + ) + + +def _raw_zone_to_model(zone_dict: dict) -> "OktaNetworkZone": + """Project a raw `/api/v1/zones` JSON zone onto our model. + + Mirrors `NetworkZone._build_zone` but reads camelCase JSON keys + (`ipServiceCategories`) instead of the SDK's snake_case attributes. + Used by the raw-JSON fallback that activates when the Okta SDK's + strict pydantic validators reject zone payloads the Management API + returns (e.g. Enhanced Dynamic Zones with `asns.include: []`). + """ + zone_id = str(zone_dict.get("id") or "") + categories = _condition_values(zone_dict.get("ipServiceCategories")) + # IP-typed zones return `gateways`/`proxies` as `[{type, value}]` + # arrays; Enhanced Dynamic Zones return `asns`/`locations` and + # `ipServiceCategories` as `{include, exclude}` objects. Keep the + # `list[str]` shape by extracting address values and included + # condition values from both SDK models and raw JSON. + return OktaNetworkZone( + id=zone_id, + name=str(zone_dict.get("name") or zone_id), + status=str(zone_dict.get("status") or ""), + type=str(zone_dict.get("type") or ""), + usage=str(zone_dict.get("usage") or ""), + system=bool(zone_dict.get("system", False)), + gateways=_address_values(zone_dict.get("gateways")), + proxies=_address_values(zone_dict.get("proxies")), + asns=_condition_values(zone_dict.get("asns")), + locations=_condition_values(zone_dict.get("locations")), + ip_service_categories=categories, + ) + + +def _address_values(raw) -> list[str]: + """Return string values from an Okta address-style JSON array. + + Each entry in `gateways`/`proxies` is `{"type": ..., "value": ...}`; + `asns`/`locations` may be a `{include, exclude}` object on Enhanced + Dynamic Zones. Non-list inputs collapse to `[]` so the resulting + list satisfies the pydantic `list[str]` field. + """ + if not isinstance(raw, list): + return [] + out: list[str] = [] + for entry in raw: + if isinstance(entry, dict): + value = entry.get("value") + elif entry is not None: + value = getattr(entry, "value", entry) + else: + value = None + if value is not None: + out.append(_value(value)) + return out + + +def _condition_values(raw) -> list[str]: + """Return string values from Okta include/exclude-style conditions.""" + if raw is None: + return [] + values = ( + raw.get("include") if isinstance(raw, dict) else getattr(raw, "include", raw) + ) + if values is None: + return [] + if not isinstance(values, list): + values = [values] + normalized = [] + for value in values: + if isinstance(value, dict): + value = value.get("value") + if value is not None: + normalized.append(_value(value)) + return normalized + + +class OktaNetworkZone(BaseModel): + """Normalized Okta Network Zone attributes used by checks.""" + + id: str + name: str + status: str = "" + type: str = "" + usage: str = "" + system: bool = False + gateways: list[str] = Field(default_factory=list) + proxies: list[str] = Field(default_factory=list) + asns: list[str] = Field(default_factory=list) + locations: list[str] = Field(default_factory=list) + ip_service_categories: list[str] = Field(default_factory=list) + + +class NetworkZoneSummary(BaseModel): + """Synthetic resource for org-level Network Zone findings.""" + + id: str = "okta-network-zones" + name: str = "Okta Network Zones" diff --git a/tests/providers/okta/okta_fixtures.py b/tests/providers/okta/okta_fixtures.py index 83c8812495..0ecb4a9eef 100644 --- a/tests/providers/okta/okta_fixtures.py +++ b/tests/providers/okta/okta_fixtures.py @@ -20,6 +20,7 @@ def set_mocked_okta_provider( "okta.policies.read", "okta.brands.read", "okta.apps.read", + "okta.networkZones.read", "okta.logStreams.read", "okta.idps.read", ], @@ -33,6 +34,7 @@ def set_mocked_okta_provider( "okta.policies.read", "okta.brands.read", "okta.apps.read", + "okta.networkZones.read", "okta.logStreams.read", "okta.idps.read", ], diff --git a/tests/providers/okta/services/network_zone/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies_test.py b/tests/providers/okta/services/network_zone/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies_test.py new file mode 100644 index 0000000000..89cc2fd8ed --- /dev/null +++ b/tests/providers/okta/services/network_zone/network_zone_block_anonymized_proxies/network_zone_block_anonymized_proxies_test.py @@ -0,0 +1,153 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.network_zone.network_zone_fixtures import ( + build_network_zone_client, + network_zone, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.network." + "network_zone_block_anonymized_proxies." + "network_zone_block_anonymized_proxies.network_zone_client" +) + + +def _run_check(network_zone_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=network_zone_client), + ): + from prowler.providers.okta.services.network.network_zone_block_anonymized_proxies.network_zone_block_anonymized_proxies import ( + network_zone_block_anonymized_proxies, + ) + + return network_zone_block_anonymized_proxies().execute() + + +class Test_network_zone_block_anonymized_proxies: + def test_no_zones_fails(self): + findings = _run_check(build_network_zone_client({})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "No active Okta Network Zone blocklist" in findings[0].status_extended + + def test_missing_network_zone_scope_is_manual(self): + findings = _run_check( + build_network_zone_client( + {}, + missing_scope={"network_zones": "okta.networkZones.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "okta.networkZones.read" in findings[0].status_extended + + def test_sdk_network_zone_retrieval_error_is_manual(self): + findings = _run_check( + build_network_zone_client( + {}, + retrieval_error="Error listing Network Zones: forbidden", + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "could not be retrieved" in findings[0].status_extended + assert "forbidden" in findings[0].status_extended + + def test_raw_network_zone_retrieval_error_is_manual(self): + findings = _run_check( + build_network_zone_client( + {}, + retrieval_error="Raw Network Zones fetch (execute) failed: timeout", + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "could not be retrieved" in findings[0].status_extended + assert "timeout" in findings[0].status_extended + + def test_active_ip_blocklist_gateway_is_manual(self): + zone = network_zone(gateways=["198.51.100.10/32"]) + findings = _run_check(build_network_zone_client({zone.id: zone})) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert findings[0].resource_id == zone.id + assert "manual IP blocklist" in findings[0].status_extended + assert "cannot verify full anonymizer coverage" in findings[0].status_extended + + def test_pass_with_active_enhanced_dynamic_anonymizer_blocklist(self): + zone = network_zone( + zone_id="nzo-enhanced", + name="DefaultEnhancedDynamicZone", + zone_type="DYNAMIC_V2", + system=True, + ip_service_categories=["ANONYMIZER"], + ) + findings = _run_check(build_network_zone_client({zone.id: zone})) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert "Enhanced Dynamic" in findings[0].status_extended + + def test_pass_with_custom_enhanced_dynamic_anonymizer_blocklist(self): + zone = network_zone( + zone_id="nzo-custom-enhanced", + name="Custom Anonymous VPN Blocklist", + zone_type="DYNAMIC_V2", + system=False, + ip_service_categories=["VPN"], + ) + findings = _run_check(build_network_zone_client({zone.id: zone})) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert "Enhanced Dynamic" in findings[0].status_extended + + def test_dynamic_blocklist_without_anonymizer_category_fails(self): + zone = network_zone( + zone_id="nzo-dynamic", + name="Dynamic Blocklist Without Anonymizers", + zone_type="DYNAMIC", + ip_service_categories=["RISKY_IPS"], + ) + findings = _run_check(build_network_zone_client({zone.id: zone})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "do not actively block" in findings[0].status_extended + + def test_default_enhanced_dynamic_zone_without_categories_fails(self): + zone = network_zone( + zone_id="nzo-default-enhanced", + name="DefaultEnhancedDynamicZone", + zone_type="DYNAMIC_V2", + system=True, + ip_service_categories=[], + ) + findings = _run_check(build_network_zone_client({zone.id: zone})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "do not actively block" in findings[0].status_extended + + def test_existing_zones_without_anonymized_proxy_blocklist_fail(self): + policy_zone = network_zone( + zone_id="nzo-policy", + name="Corporate Policy Zone", + usage="POLICY", + gateways=["10.0.0.0/8"], + ) + inactive_blocklist = network_zone( + zone_id="nzo-inactive", + name="Inactive Blocklist", + status="INACTIVE", + gateways=["203.0.113.0/24"], + ) + findings = _run_check( + build_network_zone_client( + {policy_zone.id: policy_zone, inactive_blocklist.id: inactive_blocklist} + ) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "do not actively block" in findings[0].status_extended diff --git a/tests/providers/okta/services/network_zone/network_zone_fixtures.py b/tests/providers/okta/services/network_zone/network_zone_fixtures.py new file mode 100644 index 0000000000..09328a5d73 --- /dev/null +++ b/tests/providers/okta/services/network_zone/network_zone_fixtures.py @@ -0,0 +1,42 @@ +from unittest import mock + +from prowler.providers.okta.services.network.network_zone_service import OktaNetworkZone +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_network_zone_client( + zones: dict = None, + missing_scope: dict = None, + retrieval_error: str | None = None, +): + client = mock.MagicMock() + client.network_zones = zones or {} + client.missing_scope = missing_scope or {"network_zones": None} + client.retrieval_error = retrieval_error + client.provider = set_mocked_okta_provider() + return client + + +def network_zone( + zone_id: str = "nzo-1", + name: str = "BlockedIpZone", + *, + status: str = "ACTIVE", + zone_type: str = "IP", + usage: str = "BLOCKLIST", + system: bool = False, + gateways: list[str] = None, + proxies: list[str] = None, + ip_service_categories: list[str] = None, +): + return OktaNetworkZone( + id=zone_id, + name=name, + status=status, + type=zone_type, + usage=usage, + system=system, + gateways=gateways or [], + proxies=proxies or [], + ip_service_categories=ip_service_categories or [], + ) diff --git a/tests/providers/okta/services/network_zone/network_zone_service_test.py b/tests/providers/okta/services/network_zone/network_zone_service_test.py new file mode 100644 index 0000000000..36790c1328 --- /dev/null +++ b/tests/providers/okta/services/network_zone/network_zone_service_test.py @@ -0,0 +1,560 @@ +import json +from types import SimpleNamespace +from unittest import mock + +from pydantic import ValidationError + +from prowler.providers.okta.models import OktaIdentityInfo +from prowler.providers.okta.services.network.network_zone_service import ( + NetworkZone, + OktaNetworkZone, + _raw_zone_to_model, + _value, +) +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def _resp(headers: dict = None): + return SimpleNamespace(headers=headers or {}) + + +def _sdk_zone( + zone_id: str, + name: str, + *, + status: str = "ACTIVE", + zone_type: str = "IP", + usage: str = "BLOCKLIST", + system: bool = False, + gateways: list[str] = None, + proxies: list[str] = None, + ip_service_categories: list[str] = None, +): + return SimpleNamespace( + id=zone_id, + name=name, + status=status, + type=zone_type, + usage=usage, + system=system, + gateways=gateways or [], + proxies=proxies or [], + ip_service_categories=ip_service_categories or [], + ) + + +class _ValueObject: + def __init__(self, value: str): + self.value = value + + +class Test_value_helper: + def test_value_returns_empty_string_for_none(self): + assert _value(None) == "" + + +class Test_NetworkZone_service: + def test_fetches_ip_and_enhanced_dynamic_zones(self): + provider = set_mocked_okta_provider() + ip_zone = _sdk_zone( + "nzo-ip", + "Blocked IPs", + gateways=["203.0.113.10/32"], + ) + enhanced_zone = _sdk_zone( + "nzo-enhanced", + "DefaultEnhancedDynamicZone", + zone_type="DYNAMIC_V2", + system=True, + ip_service_categories=["ANONYMIZER"], + ) + + async def fake_list_network_zones(*_a, **_k): + return ([ip_zone, enhanced_zone], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + + service = NetworkZone(provider) + + assert set(service.network_zones.keys()) == {"nzo-ip", "nzo-enhanced"} + assert isinstance(service.network_zones["nzo-ip"], OktaNetworkZone) + assert service.network_zones["nzo-ip"].gateways == ["203.0.113.10/32"] + assert service.network_zones["nzo-enhanced"].type == "DYNAMIC_V2" + assert service.network_zones["nzo-enhanced"].ip_service_categories == [ + "ANONYMIZER" + ] + + def test_paginates_network_zones(self): + provider = set_mocked_okta_provider() + page_1 = _sdk_zone("nzo-1", "First") + page_2 = _sdk_zone("nzo-2", "Second") + next_link = '; rel="next"' + calls = [] + + async def fake_list_network_zones(*_a, **kwargs): + calls.append(kwargs.get("after")) + if kwargs.get("after") is None: + return ([page_1], _resp({"link": next_link}), None) + return ([page_2], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = NetworkZone(provider) + + assert calls == [None, "cursor-2"] + assert set(service.network_zones.keys()) == {"nzo-1", "nzo-2"} + + def test_preserves_sdk_error_reason_on_api_error(self): + provider = set_mocked_okta_provider() + + async def failing(*_a, **_k): + return ([], _resp({}), Exception("forbidden")) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = failing + mocked_client_cls.return_value = mocked + service = NetworkZone(provider) + + assert service.network_zones == {} + assert service.retrieval_error == "Error listing Network Zones: forbidden" + + def test_build_zone_extracts_sdk_network_zone_address_values(self): + from okta.models.network_zone_address import NetworkZoneAddress + + zone = _sdk_zone( + "nzo-ip", + "Blocked IPs", + gateways=[ + NetworkZoneAddress( + type="CIDR", + value="203.0.113.10/32", + ) + ], + proxies=[ + NetworkZoneAddress( + type="CIDR", + value="198.51.100.10/32", + ) + ], + ) + + built_zone = NetworkZone._build_zone(zone) + + assert built_zone.gateways == ["203.0.113.10/32"] + assert built_zone.proxies == ["198.51.100.10/32"] + + def test_build_zone_normalizes_sdk_value_objects_to_strings(self): + zone = _sdk_zone( + "nzo-sdk-values", + "SDK Values", + gateways=[_ValueObject("203.0.113.10/32")], + proxies=[_ValueObject("198.51.100.10/32")], + ) + zone.asns = SimpleNamespace(include=[_ValueObject("64512")], exclude=[]) + zone.locations = SimpleNamespace(include=[_ValueObject("US")], exclude=[]) + + built_zone = NetworkZone._build_zone(zone) + + assert built_zone.gateways == ["203.0.113.10/32"] + assert built_zone.proxies == ["198.51.100.10/32"] + assert built_zone.asns == ["64512"] + assert built_zone.locations == ["US"] + + def test_build_zone_extracts_sdk_enhanced_dynamic_category_values(self): + from okta.models.enhanced_dynamic_network_zone_all_of_ip_service_categories import ( + EnhancedDynamicNetworkZoneAllOfIpServiceCategories, + ) + + zone = _sdk_zone( + "nzo-enhanced", + "Enhanced Anonymizers", + zone_type="DYNAMIC_V2", + system=False, + ) + zone.ip_service_categories = EnhancedDynamicNetworkZoneAllOfIpServiceCategories( + include=["ALL_ANONYMIZERS"], + exclude=[], + ) + + built_zone = NetworkZone._build_zone(zone) + + assert built_zone.ip_service_categories == ["ALL_ANONYMIZERS"] + + def test_missing_network_zone_scope_skips_api_call(self): + provider = set_mocked_okta_provider( + identity=OktaIdentityInfo( + org_domain="acme.okta.com", + client_id="0oa1234567890abcdef", + granted_scopes=["okta.policies.read", "okta.brands.read"], + ) + ) + + async def fail_if_called(*_a, **_k): + raise AssertionError("list_network_zones should not be called") + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_network_zones = fail_if_called + mocked_client_cls.return_value = mocked + service = NetworkZone(provider) + + assert service.missing_scope["network_zones"] == "okta.networkZones.read" + assert service.network_zones == {} + + +class Test_NetworkZone_service_sdk_validation_fallback: + """Verifies the raw-JSON fallback for the Okta SDK Enhanced Dynamic + Zone deserialization bug. + + The Okta Management API returns `asns.include` as a JSON array + (typically `[]`) but the SDK's `EnhancedDynamicNetworkZoneAllOfAsnsInclude` + is an object-shaped pydantic model — so listing zones raises + ValidationError. Without a fallback the whole fetch crashes and + every check FAILs as if no zones exist; with the fallback we parse + the raw JSON and STIG evaluation continues. + """ + + @staticmethod + def _trigger_real_validation_error() -> ValidationError: + try: + from okta.models.enhanced_dynamic_network_zone_all_of_asns_include import ( # noqa: E501 + EnhancedDynamicNetworkZoneAllOfAsnsInclude, + ) + + EnhancedDynamicNetworkZoneAllOfAsnsInclude.from_dict([]) + except ValidationError as ve: + return ve + raise AssertionError("Expected pydantic ValidationError from Okta SDK model") + + def _build_service_with_raw_payload( + self, raw_zones_payload, response=None, body_factory=None + ): + response_body = ( + body_factory(raw_zones_payload) + if body_factory + else json.dumps(raw_zones_payload) + ) + return self._build_service_with_raw_response(response_body, response=response) + + def _build_service_with_raw_response( + self, response_body, response=None, execute_error=None + ): + provider = set_mocked_okta_provider() + ve = self._trigger_real_validation_error() + + async def failing_list_network_zones(*_a, **_k): + raise ve + + async def fake_raw_create(*_a, **_k): + return ({"url": "/api/v1/zones"}, None) + + async def fake_raw_execute(_request): + return (response, response_body, execute_error) + + sdk_mock = mock.MagicMock() + sdk_mock.list_network_zones = failing_list_network_zones + sdk_mock._request_executor.create_request = fake_raw_create + sdk_mock._request_executor.execute = fake_raw_execute + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk_mock, + ): + return NetworkZone(provider) + + def test_raw_fallback_projects_ip_and_enhanced_dynamic_zones(self): + zones_payload = [ + { + "id": "nzo-ip", + "name": "Blocked IPs", + "status": "ACTIVE", + "type": "IP", + "usage": "BLOCKLIST", + "system": False, + "gateways": [{"type": "CIDR", "value": "203.0.113.10/32"}], + "proxies": [], + }, + { + "id": "nzo-enhanced", + "name": "DefaultEnhancedDynamicZone", + "status": "ACTIVE", + "type": "DYNAMIC_V2", + "usage": "BLOCKLIST", + "system": True, + "asns": {"include": [], "exclude": []}, + "locations": {"include": [], "exclude": []}, + "ipServiceCategories": [{"value": "ANONYMIZER"}], + }, + ] + + service = self._build_service_with_raw_payload(zones_payload) + + assert set(service.network_zones.keys()) == {"nzo-ip", "nzo-enhanced"} + ip_zone = service.network_zones["nzo-ip"] + assert ip_zone.type == "IP" + assert ip_zone.gateways == ["203.0.113.10/32"] + enhanced = service.network_zones["nzo-enhanced"] + assert enhanced.type == "DYNAMIC_V2" + assert enhanced.system is True + assert enhanced.ip_service_categories == ["ANONYMIZER"] + assert enhanced.asns == [] + assert enhanced.locations == [] + + def test_raw_fallback_handles_empty_payload(self): + service = self._build_service_with_raw_payload([]) + assert service.network_zones == {} + + def test_raw_fallback_handles_executor_error(self): + provider = set_mocked_okta_provider() + ve = self._trigger_real_validation_error() + + async def failing_list_network_zones(*_a, **_k): + raise ve + + async def fake_raw_create(*_a, **_k): + return (None, Exception("network down")) + + sdk_mock = mock.MagicMock() + sdk_mock.list_network_zones = failing_list_network_zones + sdk_mock._request_executor.create_request = fake_raw_create + sdk_mock._request_executor.execute = mock.AsyncMock() + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk_mock, + ): + service = NetworkZone(provider) + + assert service.network_zones == {} + assert service.retrieval_error == ( + "Raw Network Zones fetch failed; see logs for details." + ) + + def test_raw_fallback_handles_execute_error(self): + service = self._build_service_with_raw_response( + None, + execute_error=Exception("timeout"), + ) + + assert service.network_zones == {} + assert service.retrieval_error == ( + "Raw Network Zones fetch failed; see logs for details." + ) + + def test_raw_fallback_decodes_bytes_response_body(self): + service = self._build_service_with_raw_payload( + [ + { + "id": "nzo-bytes", + "name": "Bytes", + "status": "ACTIVE", + "type": "IP", + "usage": "BLOCKLIST", + } + ], + body_factory=lambda payload: json.dumps(payload).encode("utf-8"), + ) + + assert set(service.network_zones.keys()) == {"nzo-bytes"} + + def test_raw_fallback_handles_invalid_utf8_response_body(self): + service = self._build_service_with_raw_response(b"\xff") + + assert service.network_zones == {} + assert service.retrieval_error == ( + "Raw Network Zones fetch failed; see logs for details." + ) + + def test_raw_fallback_handles_invalid_json_response_body(self): + service = self._build_service_with_raw_response("{") + + assert service.network_zones == {} + assert service.retrieval_error == ( + "Raw Network Zones fetch failed; see logs for details." + ) + + def test_raw_fallback_handles_unexpected_payload_shape(self): + service = self._build_service_with_raw_payload({"id": "nzo-not-a-list"}) + + assert service.network_zones == {} + assert service.retrieval_error == ( + "Raw Network Zones fetch failed; see logs for details." + ) + + def test_raw_fallback_skips_non_dict_payload_items(self): + service = self._build_service_with_raw_payload( + [ + "not-a-zone", + { + "id": "nzo-valid", + "name": "Valid", + "status": "ACTIVE", + "type": "IP", + "usage": "BLOCKLIST", + }, + ] + ) + + assert set(service.network_zones.keys()) == {"nzo-valid"} + + def test_raw_fallback_paginates_via_link_header(self): + next_link = '; rel="next"' + page_1 = [ + { + "id": "nzo-1", + "name": "First", + "status": "ACTIVE", + "type": "IP", + "usage": "BLOCKLIST", + } + ] + page_2 = [ + { + "id": "nzo-2", + "name": "Second", + "status": "ACTIVE", + "type": "IP", + "usage": "BLOCKLIST", + } + ] + + provider = set_mocked_okta_provider() + ve = self._trigger_real_validation_error() + execute_calls = [] + + async def failing_list_network_zones(*_a, **_k): + raise ve + + async def fake_raw_create(*_a, **kwargs): + return ({"url": kwargs.get("url", "")}, None) + + async def fake_raw_execute(request): + execute_calls.append(request) + if len(execute_calls) == 1: + return ( + SimpleNamespace(headers={"link": next_link}), + json.dumps(page_1), + None, + ) + return (SimpleNamespace(headers={}), json.dumps(page_2), None) + + sdk_mock = mock.MagicMock() + sdk_mock.list_network_zones = failing_list_network_zones + sdk_mock._request_executor.create_request = fake_raw_create + sdk_mock._request_executor.execute = fake_raw_execute + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient", + return_value=sdk_mock, + ): + service = NetworkZone(provider) + + assert len(execute_calls) == 2 + assert "after=cursor-2" in execute_calls[1]["url"] + assert set(service.network_zones.keys()) == {"nzo-1", "nzo-2"} + + +class Test_raw_zone_to_model: + def test_extracts_address_values_and_categories(self): + zone = _raw_zone_to_model( + { + "id": "nzo-ip", + "name": "IPs", + "status": "ACTIVE", + "type": "IP", + "usage": "BLOCKLIST", + "system": False, + "gateways": [ + {"type": "CIDR", "value": "203.0.113.0/24"}, + {"type": "RANGE", "value": "198.51.100.5-198.51.100.10"}, + ], + "proxies": [{"type": "CIDR", "value": "192.0.2.0/24"}], + "ipServiceCategories": [ + {"value": "ANONYMIZER"}, + {"value": "TOR_ANONYMIZER"}, + ], + } + ) + assert zone.gateways == [ + "203.0.113.0/24", + "198.51.100.5-198.51.100.10", + ] + assert zone.proxies == ["192.0.2.0/24"] + assert zone.ip_service_categories == ["ANONYMIZER", "TOR_ANONYMIZER"] + + def test_collapses_non_list_asns_and_locations_to_empty(self): + zone = _raw_zone_to_model( + { + "id": "nzo-enhanced", + "name": "Enhanced", + "type": "DYNAMIC_V2", + "asns": {"include": [], "exclude": []}, + "locations": {"include": [], "exclude": []}, + } + ) + assert zone.asns == [] + assert zone.locations == [] + assert isinstance(zone, OktaNetworkZone) + + def test_extracts_ip_service_categories_from_raw_include_condition(self): + zone = _raw_zone_to_model( + { + "id": "nzo-enhanced", + "name": "Enhanced", + "type": "DYNAMIC_V2", + "ipServiceCategories": { + "include": ["ALL_ANONYMIZERS"], + "exclude": [], + }, + } + ) + assert zone.ip_service_categories == ["ALL_ANONYMIZERS"] + + def test_extracts_scalar_ip_service_category_condition(self): + zone = _raw_zone_to_model( + { + "id": "nzo-enhanced", + "name": "Enhanced", + "type": "DYNAMIC_V2", + "ipServiceCategories": { + "include": {"value": "VPN_ANONYMIZER"}, + "exclude": [], + }, + } + ) + assert zone.ip_service_categories == ["VPN_ANONYMIZER"] + + def test_ignores_none_address_entries_and_empty_condition_values(self): + zone = _raw_zone_to_model( + { + "id": "nzo-ip", + "gateways": [None], + "ipServiceCategories": { + "include": None, + "exclude": [], + }, + } + ) + assert zone.gateways == [] + assert zone.ip_service_categories == [] + + def test_falls_back_name_to_id_when_missing(self): + zone = _raw_zone_to_model({"id": "nzo-1"}) + assert zone.id == "nzo-1" + assert zone.name == "nzo-1" + assert zone.status == "" + assert zone.system is False