From 0ea2f6d67ee54a6e3e05f118fdb309dd4e18d117 Mon Sep 17 00:00:00 2001 From: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com> Date: Mon, 8 Jun 2026 17:11:54 +0200 Subject: [PATCH] feat(okta): add API token STIG checks (#11464) Co-authored-by: Daniel Barranquero --- .../providers/okta/authentication.mdx | 16 +- .../providers/okta/getting-started-okta.mdx | 18 +- prowler/CHANGELOG.md | 1 + prowler/providers/okta/okta_provider.py | 3 + .../okta/services/apitoken/__init__.py | 0 .../services/apitoken/api_token_client.py | 4 + .../services/apitoken/api_token_service.py | 327 ++++++++++ .../apitoken_not_super_admin/__init__.py | 0 .../apitoken_not_super_admin.metadata.json | 37 ++ .../apitoken_not_super_admin.py | 70 ++ .../__init__.py | 0 ...n_restricted_to_network_zone.metadata.json | 37 ++ .../apitoken_restricted_to_network_zone.py | 55 ++ .../okta/services/apitoken/lib/__init__.py | 0 .../apitoken/lib/api_token_helpers.py | 140 ++++ .../okta/lib/service/pagination_test.py | 147 +++++ tests/providers/okta/okta_fixtures.py | 6 + .../services/api_token/api_token_fixtures.py | 46 ++ .../api_token/api_token_service_test.py | 616 ++++++++++++++++++ .../apitoken_not_super_admin_test.py | 99 +++ ...pitoken_restricted_to_network_zone_test.py | 114 ++++ 21 files changed, 1724 insertions(+), 12 deletions(-) create mode 100644 prowler/providers/okta/services/apitoken/__init__.py create mode 100644 prowler/providers/okta/services/apitoken/api_token_client.py create mode 100644 prowler/providers/okta/services/apitoken/api_token_service.py create mode 100644 prowler/providers/okta/services/apitoken/apitoken_not_super_admin/__init__.py create mode 100644 prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.metadata.json create mode 100644 prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.py create mode 100644 prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/__init__.py create mode 100644 prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.metadata.json create mode 100644 prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.py create mode 100644 prowler/providers/okta/services/apitoken/lib/__init__.py create mode 100644 prowler/providers/okta/services/apitoken/lib/api_token_helpers.py create mode 100644 tests/providers/okta/lib/service/pagination_test.py create mode 100644 tests/providers/okta/services/api_token/api_token_fixtures.py create mode 100644 tests/providers/okta/services/api_token/api_token_service_test.py create mode 100644 tests/providers/okta/services/api_token/apitoken_not_super_admin/apitoken_not_super_admin_test.py create mode 100644 tests/providers/okta/services/api_token/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone_test.py diff --git a/docs/user-guide/providers/okta/authentication.mdx b/docs/user-guide/providers/okta/authentication.mdx index 67eb72e153..74e8ef7805 100644 --- a/docs/user-guide/providers/okta/authentication.mdx +++ b/docs/user-guide/providers/okta/authentication.mdx @@ -36,6 +36,9 @@ The bundled checks require the following read-only scopes: - `okta.brands.read` - `okta.apps.read` - `okta.networkZones.read` +- `okta.apiTokens.read` +- `okta.roles.read` +- `okta.groups.read` - `okta.logStreams.read` - `okta.idps.read` @@ -46,7 +49,10 @@ Additional scopes will be needed as more services and checks are added. These ar | `okta.policies.read` | Sign-on, password, authentication, and `USER_LIFECYCLE` (Workflow > Automations) policies | | `okta.brands.read` | Sign-in page customizations (DOD Notice and Consent Banner check) | | `okta.apps.read` | First-party app settings (Okta Admin Console session), integrated app inventory, and the Authentication Policies bound to Okta applications | -| `okta.networkZones.read` | Network Zone inventory and anonymized-proxy blocklist checks | +| `okta.networkZones.read` | Network Zone inventory, anonymized-proxy blocklist checks, and API token Network Zone validation | +| `okta.apiTokens.read` | API token metadata and token network conditions | +| `okta.roles.read` | Admin role assignments for API token owners (both direct and group-inherited) | +| `okta.groups.read` | Group memberships of API token owners, used to resolve admin roles inherited via group assignment (e.g. Super Admin granted through the default admin group) | | `okta.logStreams.read` | Log Stream configuration (`/api/v1/logStreams`) | | `okta.idps.read` | Identity Providers, including Smart Card (X509) IdPs (`/api/v1/idps`) | @@ -130,7 +136,7 @@ Okta displays the private key **only once**. If you close the modal without copy ### 5. Grant the required OAuth scopes -On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled checks require `okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.logStreams.read`, and `okta.idps.read`. +On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled checks require `okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.apiTokens.read`, `okta.roles.read`, `okta.groups.read`, `okta.logStreams.read`, and `okta.idps.read`. ![Okta — grant OAuth scopes](/user-guide/providers/okta/images/grant-permissions.png) @@ -166,8 +172,8 @@ export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem" # or export OKTA_PRIVATE_KEY="$(cat /secure/path/to/prowler-okta.pem)" -# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" +# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.apiTokens.read,okta.roles.read,okta.groups.read,okta.logStreams.read,okta.idps.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.apiTokens.read,okta.roles.read,okta.groups.read,okta.logStreams.read,okta.idps.read" uv run python prowler-cli.py okta ``` @@ -208,7 +214,7 @@ Prowler validates credentials at startup by listing one sign-on policy. This err Raised when the credential probe succeeds at the OAuth layer but the request is rejected because the service app lacks the required scope or admin role: -- **`invalid_scope`** — one of the requested scopes (`okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.logStreams.read`, and `okta.idps.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**. +- **`invalid_scope`** — one of the requested scopes (`okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.apiTokens.read`, `okta.roles.read`, `okta.groups.read`, `okta.logStreams.read`, and `okta.idps.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**. - **`Forbidden` / `not authorized`** — no admin role is assigned to the service app. Assign **Read-Only Administrator** (or **Super Administrator** for the first-party application checks) from **Admin roles**. ### Application-service checks return MANUAL on first-party apps diff --git a/docs/user-guide/providers/okta/getting-started-okta.mdx b/docs/user-guide/providers/okta/getting-started-okta.mdx index 740cfbf729..32c08433b7 100644 --- a/docs/user-guide/providers/okta/getting-started-okta.mdx +++ b/docs/user-guide/providers/okta/getting-started-okta.mdx @@ -12,7 +12,7 @@ Set up authentication for Okta with the [Okta Authentication](/user-guide/provid - An Okta organization. The UI examples below use **Identity Engine** terminology such as **Global Session Policy**; Classic Engine exposes the equivalent sign-on policy concepts under older names. - A **Super Administrator** account on that organization for the one-time service-app setup. -- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.logStreams.read`, and `okta.idps.read` scopes granted and an admin role assigned. **Read-Only Administrator** covers the Sign-On, Network, User, System Log, and Identity Provider checks, and runs the per-app application network-zone check against the apps the service app can see (under Read-Only Administrator that is typically only the service app's own row — the rest of the org's app inventory stays invisible). **Super Administrator** is required additionally to evaluate the five first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, phishing-resistant authentication) and to widen the application network-zone check to the full app inventory — see [Okta Authentication](/user-guide/providers/okta/authentication#required-admin-role) for the full breakdown. +- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read`, `okta.brands.read`, `okta.apps.read`, `okta.networkZones.read`, `okta.apiTokens.read`, `okta.roles.read`, `okta.groups.read`, `okta.logStreams.read`, and `okta.idps.read` scopes granted and an admin role assigned. **Read-Only Administrator** covers the Sign-On, Network, API Token, User, System Log, and Identity Provider checks, and runs the per-app application network-zone check against the apps the service app can see (under Read-Only Administrator that is typically only the service app's own row — the rest of the org's app inventory stays invisible). **Super Administrator** is required additionally to evaluate the five first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, phishing-resistant authentication) and to widen the application network-zone check to the full app inventory — see [Okta Authentication](/user-guide/providers/okta/authentication#required-admin-role) for the full breakdown. - Python 3.10+ and Prowler 5.27.0 or later installed locally. @@ -85,8 +85,8 @@ Follow the [Okta Authentication](/user-guide/providers/okta/authentication) guid export OKTA_ORG_DOMAIN="acme.okta.com" export OKTA_CLIENT_ID="0oa1234567890abcdef" export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem" -# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read" +# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.apiTokens.read,okta.roles.read,okta.groups.read,okta.logStreams.read,okta.idps.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.apiTokens.read,okta.roles.read,okta.groups.read,okta.logStreams.read,okta.idps.read" ``` The private key file may contain either a PEM-encoded RSA key or a JWK JSON document. @@ -148,6 +148,7 @@ Prowler for Okta includes security checks across the following services: | **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) | | **Application** | Okta Admin Console sign-on settings plus Authentication Policy controls for Okta applications (session idle, MFA, phishing resistance, network zones) | | **Network** | Network Zone blocklists for anonymized proxy sources | +| **API Token** | API token owner-role validation and Network Zone restrictions | | **User** | User lifecycle automations (inactivity-based deprovisioning) | | **System Log** | Log Stream configuration that off-loads audit records to a central SIEM | | **Identity Provider** | Identity Providers, including Smart Card (X509) IdP status and certificate-chain visibility | @@ -162,25 +163,28 @@ This is stricter than simply finding the same timeout value somewhere else in th ### Default Scopes -Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On, Application, Network, User, System Log, and Identity Provider services: +Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On, Application, Network, API Token, User, System Log, and Identity Provider services: - `okta.policies.read` - `okta.brands.read` - `okta.apps.read` - `okta.networkZones.read` +- `okta.apiTokens.read` +- `okta.roles.read` +- `okta.groups.read` - `okta.logStreams.read` - `okta.idps.read` -The service app must have these scopes granted in the **Okta API Scopes** tab. When the granted set is narrower than the requested set, the token request fails with an `invalid_scope` error and the scan stops at provider initialization. +The service app must have these scopes granted in the **Okta API Scopes** tab. `okta.groups.read` is required so the API token Super Admin check can resolve admin roles inherited via group membership; without it the check falls back to direct-only role assignments and emits a best-effort caveat. When the granted set is narrower than the requested set, the token request fails with an `invalid_scope` error and the scan stops at provider initialization. When additional checks are enabled — or when running against a service app that exposes a different scope set — override the default with `OKTA_SCOPES` (comma-separated string for the env var) or `--okta-scopes` (space-separated list for the CLI): ```bash # Environment variable — comma-separated -export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.logStreams.read,okta.idps.read,okta.users.read" +export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read,okta.networkZones.read,okta.apiTokens.read,okta.roles.read,okta.groups.read,okta.logStreams.read,okta.idps.read,okta.users.read" # CLI flag — space-separated -prowler okta --okta-scopes okta.policies.read okta.brands.read okta.apps.read okta.networkZones.read okta.logStreams.read okta.idps.read okta.users.read +prowler okta --okta-scopes okta.policies.read okta.brands.read okta.apps.read okta.networkZones.read okta.apiTokens.read okta.roles.read okta.groups.read okta.logStreams.read okta.idps.read okta.users.read ``` For the full catalog of OAuth scopes exposed by the Okta Management API, refer to the [Okta OAuth 2.0 scopes documentation](https://developer.okta.com/docs/api/oauth2/). diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index fab236e9e1..7893dd36b3 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - `sagemaker_models_monitor_enabled` check for AWS provider, verifying that each SageMaker monitoring schedule is in the `Scheduled` state so data and model drift is actively detected [(#11278)](https://github.com/prowler-cloud/prowler/pull/11278) - DORA (Digital Operational Resilience Act, Regulation (EU) 2022/2554) universal compliance framework with AWS provider coverage across the five DORA pillars [(#11131)](https://github.com/prowler-cloud/prowler/pull/11131) - Okta network zone check to detect whether anonymized proxy traffic is blocked [(#11463)](https://github.com/prowler-cloud/prowler/pull/11463) +- Okta API token checks for super admin ownership and network zone restrictions [(#11464)](https://github.com/prowler-cloud/prowler/pull/11464) - `elbv2_alb_drop_invalid_header_fields_enabled` check for AWS provider, verifying Application Load Balancers have `routing.http.drop_invalid_header_fields.enabled` set to `true` to mitigate HTTP desync attacks (AWS FSBP ELB.4) [(#11471)](https://github.com/prowler-cloud/prowler/pull/11471) - `user`, `systemlog` and `idp` service for Okta provider with `user_inactivity_automation_35d_enabled`, `systemlog_streaming_enabled` and `idp_smart_card_dod_approved_ca` checks [(#11496)](https://github.com/prowler-cloud/prowler/pull/11496) diff --git a/prowler/providers/okta/okta_provider.py b/prowler/providers/okta/okta_provider.py index 4356adbc57..e5c968e947 100644 --- a/prowler/providers/okta/okta_provider.py +++ b/prowler/providers/okta/okta_provider.py @@ -37,6 +37,9 @@ DEFAULT_SCOPES = [ "okta.brands.read", "okta.apps.read", "okta.networkZones.read", + "okta.apiTokens.read", + "okta.roles.read", + "okta.groups.read", "okta.logStreams.read", "okta.idps.read", ] diff --git a/prowler/providers/okta/services/apitoken/__init__.py b/prowler/providers/okta/services/apitoken/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/api_token_client.py b/prowler/providers/okta/services/apitoken/api_token_client.py new file mode 100644 index 0000000000..fbe10d7c7f --- /dev/null +++ b/prowler/providers/okta/services/apitoken/api_token_client.py @@ -0,0 +1,4 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.okta.services.apitoken.api_token_service import ApiToken + +api_token_client = ApiToken(Provider.get_global_provider()) diff --git a/prowler/providers/okta/services/apitoken/api_token_service.py b/prowler/providers/okta/services/apitoken/api_token_service.py new file mode 100644 index 0000000000..5fafd71c35 --- /dev/null +++ b/prowler/providers/okta/services/apitoken/api_token_service.py @@ -0,0 +1,327 @@ +from typing import Optional + +from pydantic import BaseModel, Field, ValidationError + +from prowler.lib.logger import logger +from prowler.providers.okta.lib.service.pagination import paginate as _paginate_shared +from prowler.providers.okta.lib.service.raw_fetch import ( + get_json_paginated as _raw_get_json_paginated, +) +from prowler.providers.okta.lib.service.service import OktaService + +REQUIRED_SCOPES: dict[str, str] = { + "api_tokens": "okta.apiTokens.read", + "network_zones": "okta.networkZones.read", + "user_roles": "okta.roles.read", + # Needed to resolve admin roles inherited via group membership. + # `/api/v1/users/{id}/roles` returns only direct role assignments; + # group-inherited Super Admin is invisible without `okta.groups.read` + # to enumerate the user's groups. + "user_groups": "okta.groups.read", +} + + +def _value(value) -> str: + """Return plain string values from Okta SDK enums and raw strings.""" + if value is None: + return "" + enum_value = getattr(value, "value", None) + if enum_value is not None: + return str(enum_value) + return str(value) + + +def _role_to_string(role) -> str: + """Pick the most specific role identifier from an SDK Role object. + + `list_assigned_roles_for_user` and `list_group_assigned_roles` return + `ListGroupAssignedRoles200ResponseInner` — a oneOf wrapper that holds + the real `StandardRole`/`CustomRole` on `.actual_instance`. Reading + `.type`/`.label` from the wrapper returns None and the role silently + disappears, so unwrap first. + """ + inner = getattr(role, "actual_instance", None) or role + return _value(getattr(inner, "type", None)) or _value(getattr(inner, "label", None)) + + +def _raw_value(item, key: str) -> str: + """Return a string value from an SDK model or raw dictionary.""" + if isinstance(item, dict): + return _value(item.get(key)) + return _value(getattr(item, key, None)) + + +class ApiToken(OktaService): + """Fetches Okta API token metadata, token owners' roles, and zones.""" + + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + granted = set(getattr(provider.identity, "granted_scopes", None) or []) + self.missing_scope: dict[str, Optional[str]] = { + resource: (scope if granted and scope not in granted else None) + for resource, scope in REQUIRED_SCOPES.items() + } + # Per-resource caches keyed on the Okta resource id. API tokens + # commonly share owners (e.g. a service user holding multiple + # tokens) and admin groups frequently overlap across users, so we + # memoize the resolutions within a single service instance. + self._user_roles_cache: dict[str, list[str]] = {} + self._group_roles_cache: dict[str, list[str]] = {} + self.known_network_zone_ids: set[str] = ( + set() + if self.missing_scope["api_tokens"] or self.missing_scope["network_zones"] + else self._list_known_network_zone_ids() + ) + self.api_tokens: dict[str, OktaApiToken] = ( + {} if self.missing_scope["api_tokens"] else self._list_api_tokens() + ) + + def _list_api_tokens(self) -> dict[str, "OktaApiToken"]: + """List active API token metadata and owner roles.""" + logger.info("ApiToken - Listing Okta API tokens...") + try: + return self._run(self._fetch_api_tokens()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return {} + + async def _fetch_api_tokens(self) -> dict[str, "OktaApiToken"]: + # `list_api_tokens` is non-paginated in the SDK (no `after` + # parameter); we inline the tuple unwrap rather than going + # through `paginate`. Same pattern application_service uses for + # `get_first_party_app_settings`. + result: dict[str, OktaApiToken] = {} + sdk_result = await self.client.list_api_tokens() + err = sdk_result[-1] + if err is not None: + logger.error(f"Error listing API tokens: {err}") + return result + items = sdk_result[0] or [] + + for token in items: + token_id = _value(getattr(token, "id", None)) + user_id = _value(getattr(token, "user_id", None)) + roles = ( + await self._fetch_effective_user_role_types(user_id) if user_id else [] + ) + network = getattr(token, "network", None) + token_obj = OktaApiToken( + id=token_id, + name=_value(getattr(token, "name", None)) or token_id, + client_name=_value(getattr(token, "client_name", None)), + user_id=user_id, + network_connection=_value(getattr(network, "connection", None)), + network_includes=list(getattr(network, "include", None) or []), + network_excludes=list(getattr(network, "exclude", None) or []), + owner_roles=roles, + ) + result[token_obj.id] = token_obj + return result + + async def _fetch_effective_user_role_types(self, user_id: str) -> list[str]: + """Return direct + group-inherited admin role types for `user_id`. + + Okta's `/api/v1/users/{userId}/roles` (the SDK's + `list_assigned_roles_for_user`) only returns roles assigned + *directly* to the user. Roles inherited via group membership are + invisible to that endpoint — but they are how Okta normally + grants Super Admin (e.g. the org creator joins the default + "Okta Super Admins" group). Without resolving group-inherited + roles, the Super Admin check would falsely PASS for any token + whose owner gets admin via a group. + + Results are memoized per `user_id` so multiple tokens with the + same owner cost a single resolution. + """ + if user_id in self._user_roles_cache: + return self._user_roles_cache[user_id] + direct = await self._fetch_direct_user_role_types(user_id) + inherited = await self._fetch_group_inherited_role_types(user_id) + # Dedupe while preserving first-seen order (direct first, then + # inherited) so the status_extended reads from most-specific. + seen: set[str] = set() + combined: list[str] = [] + for role in (*direct, *inherited): + if role and role not in seen: + combined.append(role) + seen.add(role) + self._user_roles_cache[user_id] = combined + return combined + + async def _fetch_direct_user_role_types(self, user_id: str) -> list[str]: + """Return roles assigned directly to the user (no group inheritance).""" + if self.missing_scope["user_roles"]: + return [] + # `list_assigned_roles_for_user` is non-paginated in the SDK + # (no `after` parameter); inline the tuple unwrap. + sdk_result = await self.client.list_assigned_roles_for_user(user_id) + err = sdk_result[-1] + if err is not None: + logger.error(f"Error listing roles for token owner {user_id}: {err}") + return [] + items = sdk_result[0] or [] + roles = [_role_to_string(role) for role in items if _role_to_string(role)] + if roles or not items: + return roles + + # Belt-and-suspenders: when the SDK's typed parse returns items + # but every projection ends up empty (a discriminator surface we + # don't yet handle, a future schema change, …), fall back to the + # raw JSON. The `_role_to_string` unwrap above already covers the + # known `ListGroupAssignedRoles200ResponseInner` oneOf wrapper + # bug — this fallback exists for whatever the next SDK quirk is. + return await self._fetch_user_role_types_raw(user_id) + + async def _fetch_user_role_types_raw(self, user_id: str) -> list[str]: + """Return user role types from the raw response when typed models are empty. + + Uses the shared `get_json_paginated` helper so any `Link: next` + header the API returns is followed (role lists are typically + small, but the SDK doesn't paginate this endpoint at all so the + only correct way to drain it lives here). + """ + raw_items = await _raw_get_json_paginated( + self.client, + f"/api/v1/users/{user_id}/roles", + context=f"user roles for {user_id}", + ) + if raw_items is None: + return [] + roles = [ + _value(role.get("type")) or _value(role.get("label")) + for role in raw_items + if isinstance(role, dict) + ] + return [role for role in roles if role] + + async def _fetch_group_inherited_role_types(self, user_id: str) -> list[str]: + """Return roles inherited via the user's group memberships. + + Each group's role list is itself memoized — admin groups are + commonly shared across many users. + """ + if self.missing_scope["user_roles"] or self.missing_scope["user_groups"]: + return [] + # Defensive try/except: tenants we've seen in the wild return 403 + # on `/api/v1/users/{id}/groups` even when `okta.groups.read` is + # granted (admin-role on the service app gates the response + # separately). Treat any failure as "no inherited roles" so the + # caller still surfaces direct roles cleanly. + try: + sdk_result = await self.client.list_user_groups(user_id) + except Exception as error: + logger.error( + f"Error listing groups for token owner {user_id}: " + f"{error.__class__.__name__}: {error}" + ) + return [] + err = sdk_result[-1] + if err is not None: + logger.error(f"Error listing groups for token owner {user_id}: {err}") + return [] + groups = sdk_result[0] or [] + roles: list[str] = [] + for group in groups: + group_id = _value(getattr(group, "id", None)) + if not group_id: + continue + if group_id in self._group_roles_cache: + roles.extend(self._group_roles_cache[group_id]) + continue + # Per-group try/except: one group's parse or auth failure + # must not erase admin-role coverage for other groups. + try: + group_roles = await self._fetch_group_role_types(group_id) + except Exception as error: + logger.error( + f"Error listing roles for group {group_id} " + f"(owner={user_id}): {error.__class__.__name__}: {error}" + ) + group_roles = [] + self._group_roles_cache[group_id] = group_roles + roles.extend(group_roles) + return roles + + async def _fetch_group_role_types(self, group_id: str) -> list[str]: + """Return role types assigned to `group_id`.""" + sdk_result = await self.client.list_group_assigned_roles(group_id) + err = sdk_result[-1] + if err is not None: + logger.error(f"Error listing roles for group {group_id}: {err}") + return [] + items = sdk_result[0] or [] + return [_role_to_string(role) for role in items if _role_to_string(role)] + + def _list_known_network_zone_ids(self) -> set[str]: + """List known Network Zone ids and names for token condition validation.""" + logger.info("ApiToken - Listing Network Zones for token restrictions...") + try: + return self._run(self._fetch_known_network_zone_ids()) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + return set() + + async def _fetch_known_network_zone_ids(self) -> set[str]: + identifiers: set[str] = set() + items, err = await self._fetch_all_network_zones() + if err is not None: + logger.error(f"Error listing Network Zones for API token checks: {err}") + return identifiers + for zone in items: + zone_id = _raw_value(zone, "id") + zone_name = _raw_value(zone, "name") + if zone_id: + identifiers.add(zone_id) + if zone_name: + identifiers.add(zone_name) + return identifiers + + async def _fetch_all_network_zones(self) -> tuple[list, object]: + """Drain all Network Zone pages for API token reference validation. + + Catches the upstream Okta SDK ↔ Management API schema drift on + Enhanced Dynamic Zones (object-shaped pydantic model where the + API returns a JSON array) the same way `network_zone_service` + does. `(ValueError, ValidationError)` covers both discriminator + misses and model mismatches — matching the `user_service` + precedent. + """ + try: + return await _paginate_shared( + lambda after: self.client.list_network_zones(after=after, limit=200) + ) + except (ValueError, ValidationError) as ex: + logger.warning( + f"Okta SDK raised {type(ex).__name__} parsing Network Zones " + "for API token validation — falling back to raw-JSON parse." + ) + return await self._fetch_all_network_zones_raw() + + async def _fetch_all_network_zones_raw(self) -> tuple[list, object]: + """Drain Network Zone pages via the shared raw-JSON helper.""" + items = await _raw_get_json_paginated( + self.client, + "/api/v1/zones", + page_size=200, + context="Network Zones for API token validation", + ) + if items is None: + return [], Exception("raw Network Zones fetch failed; see logs") + return items, None + + +class OktaApiToken(BaseModel): + """Normalized Okta API token metadata used by checks.""" + + id: str + name: str + client_name: str = "" + user_id: str = "" + network_connection: str = "" + network_includes: list[str] = Field(default_factory=list) + network_excludes: list[str] = Field(default_factory=list) + owner_roles: list[str] = Field(default_factory=list) diff --git a/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/__init__.py b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.metadata.json b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.metadata.json new file mode 100644 index 0000000000..d15c5b4680 --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "apitoken_not_super_admin", + "CheckTitle": "Okta API tokens are not owned by Super Admin users", + "CheckType": [], + "ServiceName": "apitoken", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "**Okta API token ownership** should avoid Super Admin users because API tokens inherit the admin permissions of the user that created them.", + "Risk": "**Super Admin-owned API tokens** become high-impact secrets: if one is exposed, an attacker can perform broad organization administration with the token owner privileges.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/guides/roles", + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/apitoken" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Create a dedicated service account, assign only required admin roles, rotate the API token, and revoke Super Admin-owned tokens.", + "Terraform": "" + }, + "Recommendation": { + "Text": "**Use dedicated Okta service accounts** for API tokens and assign only the least-privilege admin roles required; rotate and revoke tokens created by Super Admin users.", + "Url": "https://hub.prowler.com/check/apitoken_not_super_admin" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.py b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.py new file mode 100644 index 0000000000..0971af4aab --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_not_super_admin/apitoken_not_super_admin.py @@ -0,0 +1,70 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.apitoken.api_token_client import api_token_client +from prowler.providers.okta.services.apitoken.lib.api_token_helpers import ( + missing_api_token_scope_finding, + missing_user_roles_scope_for_token_finding, + owner_has_super_admin, +) + + +class apitoken_not_super_admin(Check): + """Ensure Okta API tokens are not owned by Super Admin users.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate every active API token owner's assigned admin roles.""" + org_domain = api_token_client.provider.identity.org_domain + missing_api_token_scope = api_token_client.missing_scope.get("api_tokens") + if missing_api_token_scope: + return [ + missing_api_token_scope_finding( + self.metadata(), + org_domain, + missing_api_token_scope, + additional_required=["okta.roles.read", "okta.groups.read"], + ) + ] + + missing_user_roles_scope = api_token_client.missing_scope.get("user_roles") + # `okta.groups.read` is needed to resolve admin roles inherited via + # group membership. Without it we fall back to direct-only role + # assignments, which Okta returns for `/api/v1/users/{id}/roles` — + # commonly empty for trial accounts where Super Admin is granted + # through the default admin group. The finding stays evaluable but + # is flagged as best-effort so operators know to grant the scope. + missing_user_groups_scope = api_token_client.missing_scope.get("user_groups") + findings: list[CheckReportOkta] = [] + for token in api_token_client.api_tokens.values(): + report = CheckReportOkta( + metadata=self.metadata(), resource=token, org_domain=org_domain + ) + if missing_user_roles_scope: + report = missing_user_roles_scope_for_token_finding( + self.metadata(), org_domain, token, missing_user_roles_scope + ) + elif owner_has_super_admin(token): + report.status = "FAIL" + report.status_extended = ( + f"API token {token.name} is owned by user {token.user_id} " + "with the Super Admin role. Use a dedicated service account " + "with least-privilege admin roles instead." + ) + else: + roles = ( + ", ".join(token.owner_roles) + if token.owner_roles + else "no admin roles returned" + ) + caveat = ( + " Group-inherited roles were not checked because the " + f"`{missing_user_groups_scope}` scope is missing — grant " + "it to detect Super Admin assigned via group membership." + if missing_user_groups_scope + else "" + ) + report.status = "PASS" + report.status_extended = ( + f"API token {token.name} owner {token.user_id} is not " + f"assigned Super Admin ({roles}).{caveat}" + ) + findings.append(report) + return findings diff --git a/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/__init__.py b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.metadata.json b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.metadata.json new file mode 100644 index 0000000000..4a088a0c1c --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "okta", + "CheckID": "apitoken_restricted_to_network_zone", + "CheckTitle": "Okta API tokens are restricted to known Network Zones", + "CheckType": [], + "ServiceName": "apitoken", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "NotDefined", + "ResourceGroup": "IAM", + "Description": "**Okta API token network restrictions** should prevent token use from Any IP by tying each token to known Okta Network Zones.", + "Risk": "**API tokens allowed from Any IP** can be replayed from attacker-controlled infrastructure if the secret is exposed, removing an important network boundary.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://developer.okta.com/docs/api/openapi/okta-management/management/tags/apitoken", + "https://help.okta.com/oie/en-us/content/topics/security/api.htm" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "Security > API > Tokens: edit each token Security section and select specific Network Zones instead of Any IP.", + "Terraform": "" + }, + "Recommendation": { + "Text": "**Restrict every Okta API token to trusted IP-based Network Zones** and review token network conditions whenever service locations change.", + "Url": "https://hub.prowler.com/check/apitoken_restricted_to_network_zone" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.py b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.py new file mode 100644 index 0000000000..04cfcf2df7 --- /dev/null +++ b/prowler/providers/okta/services/apitoken/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone.py @@ -0,0 +1,55 @@ +from prowler.lib.check.models import Check, CheckReportOkta +from prowler.providers.okta.services.apitoken.api_token_client import api_token_client +from prowler.providers.okta.services.apitoken.lib.api_token_helpers import ( + definite_network_zone_restriction_failure, + missing_api_token_scope_finding, + missing_network_zone_scope_for_token_finding, + network_zone_restriction_status, +) + + +class apitoken_restricted_to_network_zone(Check): + """Ensure Okta API tokens are restricted to known Network Zones.""" + + def execute(self) -> list[CheckReportOkta]: + """Evaluate every active API token's network condition.""" + org_domain = api_token_client.provider.identity.org_domain + missing_api_token_scope = api_token_client.missing_scope.get("api_tokens") + if missing_api_token_scope: + return [ + missing_api_token_scope_finding( + self.metadata(), + org_domain, + missing_api_token_scope, + additional_required=["okta.networkZones.read"], + ) + ] + + missing_network_zone_scope = api_token_client.missing_scope.get("network_zones") + findings: list[CheckReportOkta] = [] + for token in api_token_client.api_tokens.values(): + if missing_network_zone_scope: + definite_failure = definite_network_zone_restriction_failure(token) + if definite_failure: + report = CheckReportOkta( + metadata=self.metadata(), + resource=token, + org_domain=org_domain, + ) + report.status, report.status_extended = definite_failure + else: + report = missing_network_zone_scope_for_token_finding( + self.metadata(), org_domain, token, missing_network_zone_scope + ) + else: + report = CheckReportOkta( + metadata=self.metadata(), resource=token, org_domain=org_domain + ) + ( + report.status, + report.status_extended, + ) = network_zone_restriction_status( + token, api_token_client.known_network_zone_ids + ) + findings.append(report) + return findings diff --git a/prowler/providers/okta/services/apitoken/lib/__init__.py b/prowler/providers/okta/services/apitoken/lib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/okta/services/apitoken/lib/api_token_helpers.py b/prowler/providers/okta/services/apitoken/lib/api_token_helpers.py new file mode 100644 index 0000000000..3a871714ae --- /dev/null +++ b/prowler/providers/okta/services/apitoken/lib/api_token_helpers.py @@ -0,0 +1,140 @@ +from prowler.lib.check.models import CheckReportOkta +from prowler.providers.okta.services.apitoken.api_token_service import OktaApiToken + +ANYWHERE_CONNECTIONS = {"", "ANYWHERE", "ANY_IP"} +_SCOPE_ADVICE = ( + "Grant it on the Okta API Scopes tab of the service app in the Okta Admin " + "Console, then re-run the check." +) + + +def network_zone_restriction_status( + token: OktaApiToken, known_network_zone_ids: set[str] +) -> tuple[str, str]: + """Evaluate whether an API token is restricted to known Network Zones.""" + connection = token.network_connection.upper() + if connection in ANYWHERE_CONNECTIONS: + return ( + "FAIL", + f"API token {token.name} can be used from any IP address. " + "Restrict the token to one or more known Okta Network Zones.", + ) + + if not token.network_includes: + return ( + "FAIL", + f"API token {token.name} does not allowlist a specific Okta " + "Network Zone. Excluded zones do not restrict the token to trusted " + "source networks.", + ) + + unknown_zones = [ + zone for zone in token.network_includes if zone not in known_network_zone_ids + ] + if unknown_zones: + return ( + "FAIL", + f"API token {token.name} references unknown Network Zone(s): " + f"{', '.join(unknown_zones)}.", + ) + + return ( + "PASS", + f"API token {token.name} is restricted to known Okta Network Zone(s): " + f"{', '.join(token.network_includes)}.", + ) + + +def definite_network_zone_restriction_failure( + token: OktaApiToken, +) -> tuple[str, str] | None: + """Return a definite network restriction failure that does not need zone lookup.""" + connection = token.network_connection.upper() + if connection in ANYWHERE_CONNECTIONS or not token.network_includes: + return network_zone_restriction_status(token, set()) + return None + + +def owner_has_super_admin(token: OktaApiToken) -> bool: + """Return True when any token owner role is Super Admin.""" + for role in token.owner_roles: + normalized = role.strip().replace(" ", "_").upper() + if normalized in {"SUPER_ADMIN", "SUPER_ADMINISTRATOR"}: + return True + return False + + +def missing_api_token_scope_finding( + metadata, + org_domain: str, + scope: str, + additional_required: list[str] | None = None, +) -> CheckReportOkta: + """Build the MANUAL finding emitted when API tokens cannot be listed. + + `additional_required` lets the calling check name the secondary + scopes it also needs (e.g. `okta.roles.read` for the Super Admin + check, `okta.networkZones.read` for the zone-restriction check) so + the operator can grant everything in one go instead of re-running + once per missing scope. + """ + resource = OktaApiToken( + id="api-tokens-scope-missing", + name="(scope not granted)", + ) + report = CheckReportOkta( + metadata=metadata, resource=resource, org_domain=org_domain + ) + report.status = "MANUAL" + if additional_required: + extras = f" This check also requires {_format_scope_list(additional_required)}." + advice = ( + "Grant them on the service app's Okta API Scopes tab in the Okta " + "Admin Console, then re-run the check." + ) + else: + extras = "" + advice = _SCOPE_ADVICE + report.status_extended = ( + f"Could not retrieve Okta API token metadata: the Okta service app " + f"is missing the required `{scope}` API scope.{extras} {advice}" + ) + return report + + +def _format_scope_list(scopes: list[str]) -> str: + """Format a list of scope names as backticked, comma-joined text.""" + formatted = [f"`{scope}`" for scope in scopes] + if len(formatted) == 1: + return formatted[0] + if len(formatted) == 2: + return " and ".join(formatted) + return ", ".join(formatted[:-1]) + f", and {formatted[-1]}" + + +def missing_network_zone_scope_for_token_finding( + metadata, org_domain: str, token: OktaApiToken, scope: str +) -> CheckReportOkta: + """Build the MANUAL finding emitted when token zones cannot be validated.""" + report = CheckReportOkta(metadata=metadata, resource=token, org_domain=org_domain) + report.status = "MANUAL" + report.status_extended = ( + f"Could not validate Network Zone restrictions for API token " + f"{token.name}: the Okta service app is missing the required " + f"`{scope}` API scope. {_SCOPE_ADVICE}" + ) + return report + + +def missing_user_roles_scope_for_token_finding( + metadata, org_domain: str, token: OktaApiToken, scope: str +) -> CheckReportOkta: + """Build the MANUAL finding emitted when token owner roles cannot be listed.""" + report = CheckReportOkta(metadata=metadata, resource=token, org_domain=org_domain) + report.status = "MANUAL" + report.status_extended = ( + f"Could not retrieve admin roles for API token {token.name} owner " + f"{token.user_id}: the Okta service app is missing the required " + f"`{scope}` API scope. {_SCOPE_ADVICE}" + ) + return report diff --git a/tests/providers/okta/lib/service/pagination_test.py b/tests/providers/okta/lib/service/pagination_test.py new file mode 100644 index 0000000000..a14baeb51e --- /dev/null +++ b/tests/providers/okta/lib/service/pagination_test.py @@ -0,0 +1,147 @@ +"""Tests for the shared Okta pagination helpers in +`prowler.providers.okta.lib.service.pagination`. + +Covers `next_after_cursor` (extracts the `after` query param from an +RFC 5988 `Link: rel="next"` header) and `paginate` (drains all pages +of an SDK list call by following the cursor). + +These tests were carved out of `network_zone_service_test.py` when its +local pagination helpers were replaced by the shared module — they now +cover code that six Okta services depend on. +""" + +import asyncio +from types import SimpleNamespace + +from prowler.providers.okta.lib.service.pagination import ( + next_after_cursor, + paginate, +) + + +def _run(coro): + return asyncio.run(coro) + + +def _resp(headers: dict = None): + return SimpleNamespace(headers=headers or {}) + + +class Test_next_after_cursor: + """Behaviours previously covered in `network_zone_service_test.py` + under `Test_network_zone_pagination` — relocated here when the + local helper was replaced by the shared module. + """ + + def test_returns_none_when_response_is_none(self): + assert next_after_cursor(None) is None + + def test_returns_none_when_no_link_header(self): + assert next_after_cursor(_resp({})) is None + + def test_extracts_next_after_cursor(self): + link = ( + '; rel="self", ' + '; rel="next"' + ) + assert next_after_cursor(_resp({"Link": link})) == "next-page" + + def test_reads_lowercase_link_header(self): + # aiohttp's `CIMultiDict` is case-insensitive in practice, but + # callers occasionally pass a dict, so we check both spellings. + link = '; rel="next"' + assert next_after_cursor(_resp({"link": link})) == "cursor-1" + + def test_next_link_without_after_query_returns_none(self): + link = ( + '; rel="self", ' + '; rel="next"' + ) + assert next_after_cursor(_resp({"Link": link})) is None + + def test_no_next_segment_returns_none(self): + link = '; rel="self"' + assert next_after_cursor(_resp({"Link": link})) is None + + def test_url_decodes_after_cursor(self): + # `parse_qs` decodes percent-encoded values — opaque cursors with + # `=` or `+` must round-trip through callers that re-encode. + link = ( + "; " 'rel="next"' + ) + assert next_after_cursor(_resp({"Link": link})) == "cursor=abc+1" + + +class Test_paginate: + def test_returns_items_for_single_page_response(self): + async def fetch(_after): + return (["a", "b"], _resp({}), None) + + items, err = _run(paginate(fetch)) + assert items == ["a", "b"] + assert err is None + + def test_drains_multiple_pages(self): + link = '; rel="next"' + seen_cursors: list = [] + + async def fetch(after): + seen_cursors.append(after) + if after is None: + return (["a"], _resp({"link": link}), None) + return (["b"], _resp({}), None) + + items, err = _run(paginate(fetch)) + assert items == ["a", "b"] + assert err is None + assert seen_cursors == [None, "p2"] + + def test_returns_empty_when_first_page_is_empty(self): + async def fetch(_after): + return ([], _resp({}), None) + + items, err = _run(paginate(fetch)) + assert items == [] + assert err is None + + def test_returns_empty_and_error_when_first_page_fails(self): + async def fetch(_after): + return ([], _resp({}), Exception("forbidden")) + + items, err = _run(paginate(fetch)) + assert items == [] + assert str(err) == "forbidden" + + def test_returns_partial_items_when_subsequent_page_errors(self): + # Carved out of `network_zone_service_test.py`'s + # `test_pagination_returns_partial_items_when_second_page_errors`. + link = '; rel="next"' + + async def fetch(after): + if after is None: + return (["page-1"], _resp({"link": link}), None) + return ([], _resp({}), Exception("page failed")) + + items, err = _run(paginate(fetch)) + assert items == ["page-1"] + assert str(err) == "page failed" + + def test_accepts_early_error_two_tuple_shape(self): + # The Okta SDK returns `(items, err)` on request-build failures + # (no response) and `(items, resp, err)` on transport responses. + # `paginate` reads `result[-1]` for err so the 2-tuple shape is + # handled — verify explicitly. + async def fetch(_after): + return ([], Exception("create failed")) + + items, err = _run(paginate(fetch)) + assert items == [] + assert str(err) == "create failed" + + def test_treats_none_items_as_empty_list(self): + async def fetch(_after): + return (None, _resp({}), None) + + items, err = _run(paginate(fetch)) + assert items == [] + assert err is None diff --git a/tests/providers/okta/okta_fixtures.py b/tests/providers/okta/okta_fixtures.py index 0ecb4a9eef..d1018a65eb 100644 --- a/tests/providers/okta/okta_fixtures.py +++ b/tests/providers/okta/okta_fixtures.py @@ -21,6 +21,9 @@ def set_mocked_okta_provider( "okta.brands.read", "okta.apps.read", "okta.networkZones.read", + "okta.apiTokens.read", + "okta.roles.read", + "okta.groups.read", "okta.logStreams.read", "okta.idps.read", ], @@ -35,6 +38,9 @@ def set_mocked_okta_provider( "okta.brands.read", "okta.apps.read", "okta.networkZones.read", + "okta.apiTokens.read", + "okta.roles.read", + "okta.groups.read", "okta.logStreams.read", "okta.idps.read", ], diff --git a/tests/providers/okta/services/api_token/api_token_fixtures.py b/tests/providers/okta/services/api_token/api_token_fixtures.py new file mode 100644 index 0000000000..63a67c1605 --- /dev/null +++ b/tests/providers/okta/services/api_token/api_token_fixtures.py @@ -0,0 +1,46 @@ +from unittest import mock + +from prowler.providers.okta.services.apitoken.api_token_service import OktaApiToken +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def build_api_token_client( + tokens: dict = None, + known_network_zone_ids: set[str] = None, + missing_scope: dict = None, +): + client = mock.MagicMock() + client.api_tokens = tokens or {} + client.known_network_zone_ids = known_network_zone_ids or {"nzo-corp"} + client.missing_scope = missing_scope or { + "api_tokens": None, + "network_zones": None, + "user_roles": None, + "user_groups": None, + } + client.provider = set_mocked_okta_provider() + return client + + +def api_token( + token_id: str = "00Tabcdefg1234567890", + name: str = "CI token", + *, + user_id: str = "00uabcdefg1234567890", + network_connection: str = "ZONE", + network_includes: list[str] = None, + network_excludes: list[str] = None, + owner_roles: list[str] = None, +): + return OktaApiToken( + id=token_id, + name=name, + client_name="Okta API", + user_id=user_id, + network_connection=network_connection, + network_includes=( + network_includes if network_includes is not None else ["nzo-corp"] + ), + network_excludes=network_excludes or [], + owner_roles=owner_roles or ["READ_ONLY_ADMIN"], + ) diff --git a/tests/providers/okta/services/api_token/api_token_service_test.py b/tests/providers/okta/services/api_token/api_token_service_test.py new file mode 100644 index 0000000000..0958bdaaad --- /dev/null +++ b/tests/providers/okta/services/api_token/api_token_service_test.py @@ -0,0 +1,616 @@ +import json +from types import SimpleNamespace +from unittest import mock + +from prowler.providers.okta.models import OktaIdentityInfo +from prowler.providers.okta.services.apitoken.api_token_service import ApiToken +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider + + +def _resp(headers: dict = None): + return SimpleNamespace(headers=headers or {}) + + +def _sdk_token( + token_id: str = "00Tabcdefg1234567890", + name: str = "CI token", + *, + user_id: str = "00uabcdefg1234567890", + connection: str = "ZONE", + include: list[str] = None, + exclude: list[str] = None, +): + return SimpleNamespace( + id=token_id, + name=name, + client_name="Okta API", + user_id=user_id, + network=SimpleNamespace( + connection=connection, + include=include if include is not None else ["nzo-corp"], + exclude=exclude or [], + ), + ) + + +def _sdk_role(role_type: str): + return SimpleNamespace(type=role_type, label=role_type.replace("_", " ").title()) + + +def _sdk_role_wrapped(role_type: str): + """Mimic `ListGroupAssignedRoles200ResponseInner` — a oneOf wrapper + holding the real StandardRole on `.actual_instance`. The Okta SDK + actually returns this shape; treating it like the bare role yields + `type=None, label=None` and the role silently vanishes from the + check. + """ + inner = _sdk_role(role_type) + return SimpleNamespace(actual_instance=inner, type=None, label=None) + + +def _sdk_zone(zone_id: str, name: str): + return SimpleNamespace(id=zone_id, name=name) + + +def _sdk_group(group_id: str): + return SimpleNamespace(id=group_id) + + +async def _empty_list(*_a, **_k): + return ([], _resp({}), None) + + +class Test_ApiToken_service: + def test_fetches_tokens_roles_and_known_network_zones(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(user_id, *_a, **_k): + assert user_id == token.user_id + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fake_list_network_zones(*_a, **_k): + return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked.list_user_groups = _empty_list + mocked.list_group_assigned_roles = _empty_list + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + + service = ApiToken(provider) + + assert set(service.api_tokens.keys()) == {token.id} + assert service.api_tokens[token.id].network_connection == "ZONE" + assert service.api_tokens[token.id].owner_roles == ["READ_ONLY_ADMIN"] + assert service.known_network_zone_ids == {"nzo-corp", "Corporate"} + + def test_role_fetch_error_keeps_token_with_empty_roles(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_roles_error(*_a, **_k): + return ([], _resp({}), Exception("forbidden")) + + async def fake_list_network_zones(*_a, **_k): + return ([], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_roles_error + mocked.list_user_groups = _empty_list + mocked.list_group_assigned_roles = _empty_list + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens[token.id].owner_roles == [] + + def test_falls_back_to_raw_roles_when_sdk_role_is_empty(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(user_id, *_a, **_k): + assert user_id == token.user_id + return ([SimpleNamespace(type=None, label=None)], _resp({}), None) + + async def fake_create_request(*_a, **_k): + return ("raw-role-request", None) + + async def fake_execute(request, *_a, **_k): + assert request == "raw-role-request" + return ( + _resp({}), + json.dumps( + [ + { + "id": "ra-super-admin", + "type": "SUPER_ADMIN", + "label": "Super Administrator", + } + ] + ), + None, + ) + + async def fake_list_network_zones(*_a, **_k): + return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked._list_assigned_roles_for_user_serialize.return_value = ( + "GET", + "/api/v1/users/00uabcdefg1234567890/roles", + {}, + None, + None, + ) + mocked._request_executor.create_request = fake_create_request + mocked._request_executor.execute = fake_execute + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens[token.id].owner_roles == ["SUPER_ADMIN"] + + def test_paginates_known_network_zones_for_token_validation(self): + provider = set_mocked_okta_provider() + token = _sdk_token(include=["nzo-page-2"]) + next_link = '; rel="next"' + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(*_a, **_k): + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fake_list_network_zones(*_a, **kwargs): + if kwargs.get("after") is None: + return ( + [_sdk_zone("nzo-page-1", "First")], + _resp({"link": next_link}), + None, + ) + return ([_sdk_zone("nzo-page-2", "Second")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked.list_user_groups = _empty_list + mocked.list_group_assigned_roles = _empty_list + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.known_network_zone_ids == { + "nzo-page-1", + "First", + "nzo-page-2", + "Second", + } + + def test_falls_back_to_raw_network_zones_when_sdk_listing_fails(self): + provider = set_mocked_okta_provider() + token = _sdk_token(include=["nzo-raw"]) + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(*_a, **_k): + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fake_list_network_zones(*_a, **_k): + raise ValueError("EnhancedDynamicNetworkZone SDK deserialization failed") + + async def fake_create_request(*_a, **_k): + return ("raw-zones-request", None) + + async def fake_execute(request, *_a, **_k): + assert request == "raw-zones-request" + return ( + _resp({}), + json.dumps([{"id": "nzo-raw", "name": "Raw Corporate"}]), + None, + ) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked.list_network_zones = fake_list_network_zones + mocked._list_network_zones_serialize.return_value = ( + "GET", + "/api/v1/zones", + {}, + None, + None, + ) + mocked._request_executor.create_request = fake_create_request + mocked._request_executor.execute = fake_execute + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.known_network_zone_ids == {"nzo-raw", "Raw Corporate"} + + def test_returns_empty_on_token_api_error(self): + provider = set_mocked_okta_provider() + + async def failing(*_a, **_k): + return ([], _resp({}), Exception("forbidden")) + + async def fake_list_network_zones(*_a, **_k): + return ([], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = failing + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens == {} + + def test_missing_api_token_scope_skips_dependent_api_calls(self): + provider = set_mocked_okta_provider( + identity=OktaIdentityInfo( + org_domain="acme.okta.com", + client_id="0oa1234567890abcdef", + granted_scopes=["okta.networkZones.read", "okta.roles.read"], + ) + ) + + async def fail_if_called(*_a, **_k): + raise AssertionError("API calls should not run without apiTokens scope") + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fail_if_called + mocked.list_network_zones = fail_if_called + mocked.list_assigned_roles_for_user = fail_if_called + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.missing_scope["api_tokens"] == "okta.apiTokens.read" + assert service.api_tokens == {} + assert service.known_network_zone_ids == set() + + def test_missing_network_zone_scope_skips_zone_api_call(self): + provider = set_mocked_okta_provider( + identity=OktaIdentityInfo( + org_domain="acme.okta.com", + client_id="0oa1234567890abcdef", + granted_scopes=["okta.apiTokens.read", "okta.roles.read"], + ) + ) + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_list_assigned_roles_for_user(*_a, **_k): + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fail_if_called(*_a, **_k): + raise AssertionError("list_network_zones should not be called") + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_list_assigned_roles_for_user + mocked.list_network_zones = fail_if_called + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.missing_scope["network_zones"] == "okta.networkZones.read" + assert service.known_network_zone_ids == set() + assert set(service.api_tokens.keys()) == {token.id} + + def test_missing_role_scope_skips_role_api_call(self): + provider = set_mocked_okta_provider( + identity=OktaIdentityInfo( + org_domain="acme.okta.com", + client_id="0oa1234567890abcdef", + granted_scopes=["okta.apiTokens.read", "okta.networkZones.read"], + ) + ) + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fail_if_called(*_a, **_k): + raise AssertionError("list_assigned_roles_for_user should not be called") + + async def fake_list_network_zones(*_a, **_k): + return ([_sdk_zone("nzo-corp", "Corporate")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fail_if_called + mocked.list_network_zones = fake_list_network_zones + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.missing_scope["user_roles"] == "okta.roles.read" + assert service.api_tokens[token.id].owner_roles == [] + + +class Test_ApiToken_service_group_inherited_roles: + """Verifies effective-role resolution combines direct + group-inherited. + + Okta's `/api/v1/users/{userId}/roles` returns only directly-assigned + admin roles. Roles inherited via group membership — the common path + for Super Admin on trial tenants — are invisible to that endpoint. + The service must enumerate the user's groups and combine each + group's role assignments. + """ + + def test_group_inherited_super_admin_surfaces(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_direct_roles(*_a, **_k): + return ([], _resp({}), None) + + async def fake_user_groups(user_id, *_a, **_k): + assert user_id == token.user_id + return ( + [_sdk_group("0gp-admins"), _sdk_group("0gp-eng")], + _resp({}), + None, + ) + + async def fake_group_roles(group_id, *_a, **_k): + if group_id == "0gp-admins": + return ([_sdk_role("SUPER_ADMIN")], _resp({}), None) + return ([], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_direct_roles + mocked.list_user_groups = fake_user_groups + mocked.list_group_assigned_roles = fake_group_roles + mocked.list_network_zones = _empty_list + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens[token.id].owner_roles == ["SUPER_ADMIN"] + + def test_direct_plus_group_roles_combined_and_deduped(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_direct_roles(*_a, **_k): + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fake_user_groups(*_a, **_k): + return ([_sdk_group("0gp-1")], _resp({}), None) + + async def fake_group_roles(*_a, **_k): + # READ_ONLY_ADMIN already comes from the direct path; the + # dedupe should keep a single entry. SUPER_ADMIN is new. + return ( + [_sdk_role("READ_ONLY_ADMIN"), _sdk_role("SUPER_ADMIN")], + _resp({}), + None, + ) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_direct_roles + mocked.list_user_groups = fake_user_groups + mocked.list_group_assigned_roles = fake_group_roles + mocked.list_network_zones = _empty_list + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens[token.id].owner_roles == [ + "READ_ONLY_ADMIN", + "SUPER_ADMIN", + ] + + def test_role_resolution_cached_per_user_and_group(self): + provider = set_mocked_okta_provider() + token_a = _sdk_token(token_id="00Ttoken-a", user_id="00uowner-1") + token_b = _sdk_token(token_id="00Ttoken-b", user_id="00uowner-1") + token_c = _sdk_token(token_id="00Ttoken-c", user_id="00uowner-2") + + direct_calls: list[str] = [] + groups_calls: list[str] = [] + group_role_calls: list[str] = [] + + async def fake_list_api_tokens(*_a, **_k): + return ([token_a, token_b, token_c], _resp({}), None) + + async def fake_direct_roles(user_id, *_a, **_k): + direct_calls.append(user_id) + return ([], _resp({}), None) + + async def fake_user_groups(user_id, *_a, **_k): + groups_calls.append(user_id) + return ([_sdk_group("0gp-shared")], _resp({}), None) + + async def fake_group_roles(group_id, *_a, **_k): + group_role_calls.append(group_id) + return ([_sdk_role("HELP_DESK_ADMIN")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_direct_roles + mocked.list_user_groups = fake_user_groups + mocked.list_group_assigned_roles = fake_group_roles + mocked.list_network_zones = _empty_list + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + # Owner 00uowner-1 appears twice but is resolved once. + assert sorted(direct_calls) == ["00uowner-1", "00uowner-2"] + assert sorted(groups_calls) == ["00uowner-1", "00uowner-2"] + # Shared group resolved once even though both owners belong to it. + assert group_role_calls == ["0gp-shared"] + for token in (token_a, token_b, token_c): + assert service.api_tokens[token.id].owner_roles == ["HELP_DESK_ADMIN"] + + def test_missing_groups_scope_falls_back_to_direct_only(self): + provider = set_mocked_okta_provider( + identity=OktaIdentityInfo( + org_domain="acme.okta.com", + client_id="0oa1234567890abcdef", + granted_scopes=[ + "okta.apiTokens.read", + "okta.networkZones.read", + "okta.roles.read", + ], + ) + ) + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_direct_roles(*_a, **_k): + return ([_sdk_role("READ_ONLY_ADMIN")], _resp({}), None) + + async def fail_if_called(*_a, **_k): + raise AssertionError( + "list_user_groups must not be called without okta.groups.read" + ) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_direct_roles + mocked.list_user_groups = fail_if_called + mocked.list_group_assigned_roles = fail_if_called + mocked.list_network_zones = _empty_list + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.missing_scope["user_groups"] == "okta.groups.read" + assert service.api_tokens[token.id].owner_roles == ["READ_ONLY_ADMIN"] + + def test_wrapped_oneof_role_shape_is_unwrapped(self): + """Regression: the SDK returns each role as a oneOf wrapper with + the real StandardRole on `.actual_instance`. The previous + `_role_to_string` read `.type`/`.label` from the wrapper, got + None back, and produced an empty `owner_roles` — causing a + Super Admin token to silently PASS the check.""" + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_direct_roles(*_a, **_k): + return ([_sdk_role_wrapped("SUPER_ADMIN")], _resp({}), None) + + async def fake_user_groups(*_a, **_k): + return ([_sdk_group("0gp-extra")], _resp({}), None) + + async def fake_group_roles(*_a, **_k): + return ([_sdk_role_wrapped("APP_ADMIN")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_direct_roles + mocked.list_user_groups = fake_user_groups + mocked.list_group_assigned_roles = fake_group_roles + mocked.list_network_zones = _empty_list + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens[token.id].owner_roles == [ + "SUPER_ADMIN", + "APP_ADMIN", + ] + + def test_group_role_fetch_failure_does_not_drop_other_groups(self): + provider = set_mocked_okta_provider() + token = _sdk_token() + + async def fake_list_api_tokens(*_a, **_k): + return ([token], _resp({}), None) + + async def fake_direct_roles(*_a, **_k): + return ([], _resp({}), None) + + async def fake_user_groups(*_a, **_k): + return ( + [_sdk_group("0gp-broken"), _sdk_group("0gp-good")], + _resp({}), + None, + ) + + async def fake_group_roles(group_id, *_a, **_k): + if group_id == "0gp-broken": + raise RuntimeError("upstream parse failure") + return ([_sdk_role("SUPER_ADMIN")], _resp({}), None) + + with mock.patch( + "prowler.providers.okta.lib.service.service.OktaSDKClient" + ) as mocked_client_cls: + mocked = mock.MagicMock() + mocked.list_api_tokens = fake_list_api_tokens + mocked.list_assigned_roles_for_user = fake_direct_roles + mocked.list_user_groups = fake_user_groups + mocked.list_group_assigned_roles = fake_group_roles + mocked.list_network_zones = _empty_list + mocked_client_cls.return_value = mocked + service = ApiToken(provider) + + assert service.api_tokens[token.id].owner_roles == ["SUPER_ADMIN"] diff --git a/tests/providers/okta/services/api_token/apitoken_not_super_admin/apitoken_not_super_admin_test.py b/tests/providers/okta/services/api_token/apitoken_not_super_admin/apitoken_not_super_admin_test.py new file mode 100644 index 0000000000..6177d4b422 --- /dev/null +++ b/tests/providers/okta/services/api_token/apitoken_not_super_admin/apitoken_not_super_admin_test.py @@ -0,0 +1,99 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.api_token.api_token_fixtures import ( + api_token, + build_api_token_client, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.apitoken." + "apitoken_not_super_admin.apitoken_not_super_admin.api_token_client" +) + + +def _run_check(api_token_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=api_token_client), + ): + from prowler.providers.okta.services.apitoken.apitoken_not_super_admin.apitoken_not_super_admin import ( + apitoken_not_super_admin, + ) + + return apitoken_not_super_admin().execute() + + +class Test_apitoken_not_super_admin: + def test_no_tokens_returns_no_findings(self): + findings = _run_check(build_api_token_client({})) + assert findings == [] + + def test_missing_api_token_scope_is_manual(self): + findings = _run_check( + build_api_token_client( + {}, + missing_scope={"api_tokens": "okta.apiTokens.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "okta.apiTokens.read" in findings[0].status_extended + assert "okta.roles.read" in findings[0].status_extended + assert "okta.groups.read" in findings[0].status_extended + + def test_missing_user_roles_scope_is_manual(self): + token = api_token(owner_roles=[]) + findings = _run_check( + build_api_token_client( + {token.id: token}, + missing_scope={"user_roles": "okta.roles.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert findings[0].resource_id == token.id + assert "okta.roles.read" in findings[0].status_extended + + def test_token_owner_without_super_admin_passes(self): + token = api_token(owner_roles=["READ_ONLY_ADMIN"]) + findings = _run_check(build_api_token_client({token.id: token})) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == token.id + + def test_token_owner_with_super_admin_fails(self): + token = api_token(owner_roles=["SUPER_ADMIN"]) + findings = _run_check(build_api_token_client({token.id: token})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "Super Admin" in findings[0].status_extended + + def test_missing_groups_scope_adds_best_effort_caveat_on_pass(self): + token = api_token(owner_roles=["READ_ONLY_ADMIN"]) + findings = _run_check( + build_api_token_client( + {token.id: token}, + missing_scope={"user_groups": "okta.groups.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert "Group-inherited roles were not checked" in findings[0].status_extended + assert "okta.groups.read" in findings[0].status_extended + + def test_missing_groups_scope_does_not_caveat_when_owner_is_super_admin(self): + token = api_token(owner_roles=["SUPER_ADMIN"]) + findings = _run_check( + build_api_token_client( + {token.id: token}, + missing_scope={"user_groups": "okta.groups.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "Super Admin" in findings[0].status_extended + assert "Group-inherited" not in findings[0].status_extended diff --git a/tests/providers/okta/services/api_token/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone_test.py b/tests/providers/okta/services/api_token/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone_test.py new file mode 100644 index 0000000000..9da6f92939 --- /dev/null +++ b/tests/providers/okta/services/api_token/apitoken_restricted_to_network_zone/apitoken_restricted_to_network_zone_test.py @@ -0,0 +1,114 @@ +from unittest import mock + +from tests.providers.okta.okta_fixtures import set_mocked_okta_provider +from tests.providers.okta.services.api_token.api_token_fixtures import ( + api_token, + build_api_token_client, +) + +CHECK_PATH = ( + "prowler.providers.okta.services.apitoken." + "apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone.api_token_client" +) + + +def _run_check(api_token_client): + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_okta_provider(), + ), + mock.patch(CHECK_PATH, new=api_token_client), + ): + from prowler.providers.okta.services.apitoken.apitoken_restricted_to_network_zone.apitoken_restricted_to_network_zone import ( + apitoken_restricted_to_network_zone, + ) + + return apitoken_restricted_to_network_zone().execute() + + +class Test_apitoken_restricted_to_network_zone: + def test_no_tokens_returns_no_findings(self): + findings = _run_check(build_api_token_client({})) + assert findings == [] + + def test_missing_api_token_scope_is_manual(self): + findings = _run_check( + build_api_token_client( + {}, + missing_scope={"api_tokens": "okta.apiTokens.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert "okta.apiTokens.read" in findings[0].status_extended + assert "okta.networkZones.read" in findings[0].status_extended + + def test_missing_network_zone_scope_is_manual(self): + token = api_token(network_connection="ZONE", network_includes=["nzo-corp"]) + findings = _run_check( + build_api_token_client( + {token.id: token}, + missing_scope={"network_zones": "okta.networkZones.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "MANUAL" + assert findings[0].resource_id == token.id + assert "okta.networkZones.read" in findings[0].status_extended + + def test_missing_network_zone_scope_still_fails_anywhere_token(self): + token = api_token(network_connection="ANYWHERE", network_includes=[]) + findings = _run_check( + build_api_token_client( + {token.id: token}, + missing_scope={"network_zones": "okta.networkZones.read"}, + ) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "from any IP" in findings[0].status_extended + + def test_token_restricted_to_known_network_zone_passes(self): + token = api_token(network_connection="ZONE", network_includes=["nzo-corp"]) + findings = _run_check( + build_api_token_client( + {token.id: token}, known_network_zone_ids={"nzo-corp"} + ) + ) + assert len(findings) == 1 + assert findings[0].status == "PASS" + assert findings[0].resource_id == token.id + + def test_token_with_only_excluded_network_zone_fails(self): + token = api_token( + network_connection="ZONE", + network_includes=[], + network_excludes=["nzo-blocked"], + ) + findings = _run_check( + build_api_token_client( + {token.id: token}, known_network_zone_ids={"nzo-blocked"} + ) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "does not allowlist" in findings[0].status_extended + + def test_token_open_to_anywhere_fails(self): + token = api_token(network_connection="ANYWHERE", network_includes=[]) + findings = _run_check(build_api_token_client({token.id: token})) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "from any IP" in findings[0].status_extended + + def test_token_restricted_to_unknown_zone_fails(self): + token = api_token(network_connection="ZONE", network_includes=["nzo-missing"]) + findings = _run_check( + build_api_token_client( + {token.id: token}, known_network_zone_ids={"nzo-corp"} + ) + ) + assert len(findings) == 1 + assert findings[0].status == "FAIL" + assert "unknown Network Zone" in findings[0].status_extended